Skip to content

Commit

Permalink
Merge pull request #645 from FederatedAI/dev-2.5.2-merge
Browse files Browse the repository at this point in the history
merge
  • Loading branch information
mgqa34 authored Sep 8, 2023
2 parents 28cda38 + 3eeb936 commit ccdddf2
Show file tree
Hide file tree
Showing 107 changed files with 9,092 additions and 615 deletions.
2 changes: 1 addition & 1 deletion BUILD_INFO
Original file line number Diff line number Diff line change
@@ -1 +1 @@
eggroll.version=2.5.1
eggroll.version=2.5.2
27 changes: 27 additions & 0 deletions jvm/core/main/java/com/webank/eggroll/core/util/CacheUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.webank.eggroll.core.util;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.webank.eggroll.core.meta.ErProcessor;

import java.util.concurrent.TimeUnit;

public class CacheUtil {

public static Cache<String,ErProcessor> buildErProcessorCache(int maxsize, int expireTime, TimeUnit timeUnit){
return CacheBuilder.newBuilder()
.maximumSize(maxsize)
.expireAfterWrite(expireTime,timeUnit)
.build();

}








}
22 changes: 16 additions & 6 deletions jvm/core/main/java/com/webank/eggroll/core/util/ProcessUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,21 @@

public class ProcessUtils {


public static Process createProcess(String command) throws IOException {
String[] cmd = new String[] { "/bin/sh", "-c", command };
return Runtime.getRuntime().exec(cmd);
}










public static boolean checkProcess(String processId) {

boolean flag = true;
Expand All @@ -23,13 +38,8 @@ public static boolean checkProcess(String processId) {
+
"| grep -w " + processId;
}
System.err.println(command);
String[] cmd = new String[] { "/bin/sh", "-c", command };

process = Runtime.getRuntime().exec(cmd);

process = createProcess(command);
BufferedReader br = new BufferedReader(new InputStreamReader( process.getInputStream()));

String line=null;
StringBuffer b=new StringBuffer();
while (true) {
Expand Down
57 changes: 57 additions & 0 deletions jvm/core/main/resources/upgrade-to-2.5.X.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
alter table session_processor add `processor_option` VARCHAR(512);



CREATE TABLE IF NOT EXISTS `processor_resource`
(
`id` SERIAL PRIMARY KEY,
`processor_id` BIGINT NOT NULL,
`session_id` VARCHAR(767),
`server_node_id` INT NOT NULL,
`resource_type` VARCHAR(255),
`allocated` BIGINT NOT NULL default 0,
`extention` VARCHAR(512),
`status` VARCHAR(255),
`pid` INT NOT NULL DEFAULT -1,
`created_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
`updated_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) DEFAULT CHARACTER SET latin1
COLLATE latin1_swedish_ci;
CREATE INDEX `idx_processor_id_processor_resource` ON `processor_resource` (`processor_id`);
CREATE INDEX `idx_node_id_processor_resource` ON `processor_resource` (`server_node_id`);
CREATE INDEX `idx_session_id_processor_resource` ON `processor_resource` (`session_id`);
CREATE INDEX `idx_node_status_processor_resource` ON `processor_resource` (`server_node_id`,`resource_type`,`status`);



CREATE TABLE IF NOT EXISTS `node_resource`
(
`resource_id` SERIAL PRIMARY KEY,
`server_node_id` BIGINT NOT NULL,
`resource_type` VARCHAR(255),
`total` BIGINT NOT NULL default 0,
`used` BIGINT NOT NULL default 0,
`pre_allocated` BIGINT NOT NULL default 0,
`allocated` BIGINT NOT NULL DEFAULT 0,
`extention` VARCHAR(512),
`status` VARCHAR(255),
`created_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
`updated_at` DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) DEFAULT CHARACTER SET latin1
COLLATE latin1_swedish_ci;
CREATE INDEX `idx_node_id_node_resource` ON `node_resource` (`server_node_id`);
CREATE INDEX `idx_node_status_node_resource` ON `node_resource` (`server_node_id`,`status`);
CREATE UNIQUE INDEX `idx_u_node_resource` ON `node_resource` (`server_node_id`, `resource_type`);


CREATE TABLE IF NOT EXISTS `session_ranks`
(
`container_id` SERIAL PRIMARY KEY,
`session_id` VARCHAR(767),
`server_node_id` INT NOT NULL,
`global_rank` INT UNSIGNED NOT NULL,
`local_rank` INT UNSIGNED NOT NULL
) DEFAULT CHARACTER SET latin1
COLLATE latin1_swedish_ci;

CREATE INDEX `idx_session_id_session_ranks` ON `session_ranks` (`session_id`);
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package com.webank.eggroll.core.client

import com.webank.eggroll.core.command.CommandClient
import com.webank.eggroll.core.constant._
import com.webank.eggroll.core.deepspeed.job.meta.{SubmitJobRequest, SubmitJobResponse}
import com.webank.eggroll.core.deepspeed.job.meta.{DownloadJobRequest, DownloadJobResponse, PrepareJobDownloadRequest, PrepareJobDownloadResponse, SubmitJobRequest, SubmitJobResponse}
import com.webank.eggroll.core.meta._
import com.webank.eggroll.core.session.StaticErConf

Expand Down Expand Up @@ -117,4 +117,9 @@ class ClusterManagerClient(val endpoint: ErEndpoint) {

def submitJob(job: SubmitJobRequest): SubmitJobResponse =
cc.call(JobCommands.submitJob, job)

def prepareJobDownload(job: PrepareJobDownloadRequest): PrepareJobDownloadResponse =
cc.call(JobCommands.prepareJobDownload, job)


}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class NodeManagerClient(var nodeManagerEndpoint: ErEndpoint) {

private val commandClient = new CommandClient(nodeManagerEndpoint)



def this(serverHost: String, serverPort: Int) {
this(new ErEndpoint(serverHost, serverPort))
}
Expand All @@ -40,6 +42,10 @@ class NodeManagerClient(var nodeManagerEndpoint: ErEndpoint) {
StaticErConf.getInt(NodeManagerConfKeys.CONFKEY_NODE_MANAGER_PORT, -1))
}

def getRankLog():Unit ={

}

def heartbeat(processor: ErProcessor): ErProcessor =
commandClient.call[ErProcessor](NodeManagerCommands.heartbeat, processor)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ object JobCommands {
val stopJob = new CommandURI(prefix = prefix, name = "stopJob")
val killJob = new CommandURI(prefix = prefix, name = "killJob")
val downloadJob = new CommandURI(prefix = prefix, name = "downloadJob")
val prepareJobDownload = new CommandURI(prefix = prefix,name = "prepareJobDownload")
}

object RendezvousStoreCommands {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ object ClusterManagerConfKeys {
val CONFKEY_CLUSTER_MANAGER_DATASOURCE_DB_DEFAULT_AUTO_COMMIT = "eggroll.resourcemanager.clustermanager.datasource.db.default.auto.commit"
val CONFKEY_CLUSTER_MANAGER_HOST = "eggroll.resourcemanager.clustermanager.host"
val CONFKEY_CLUSTER_MANAGER_PORT = "eggroll.resourcemanager.clustermanager.port"
var CONFKEY_CLUSTER_MANAGER_NODE_HEARTBEAT_EXPIRED_COUNT = ErConfKey("eggroll.resourcemanager.clustermanager.node.heartbeat.expire.count",2)
var CONFKEY_CLUSTER_MANAGER_NODE_HEARTBEAT_EXPIRED_COUNT = ErConfKey("eggroll.resourcemanager.clustermanager.node.heartbeat.expire.count",200)
val EGGROLL_RESOURCEMANAGER_CLUSTERMANAGER_JDBC_PASSWORD_DECRYPTOR = ErConfKey("eggroll.resourcemanager.clustermanager.jdbc.password.decryptor")
val EGGROLL_RESOURCEMANAGER_CLUSTERMANAGER_JDBC_PASSWORD_DECRYPTOR_ARGS = ErConfKey("eggroll.resourcemanager.clustermanager.jdbc.password.decryptor.args")
val EGGROLL_RESOURCEMANAGER_CLUSTERMANAGER_JDBC_PASSWORD_DECRYPTOR_ARGS_SPLITER = ErConfKey("eggroll.resourcemanager.clustermanager.jdbc.password.decryptor.args.spliter", ",")
Expand Down Expand Up @@ -160,13 +160,17 @@ object SessionConfKeys {
val CONFKEY_SESSION_NAME = "eggroll.session.name"
val CONFKEY_SESSION_PROCESSORS_PER_NODE = "eggroll.session.processors.per.node"
val EGGROLL_SESSION_START_TIMEOUT_MS = ErConfKey("eggroll.session.start.timeout.ms", 20000)
val EGGROLL_SESSION_STATUS_NEW_TIMEOUT_MS = ErConfKey("eggroll.session.status.new.timeout.ms", 8*3600*1000+20000)
val EGGROLL_SESSION_STOP_TIMEOUT_MS = ErConfKey("eggroll.session.stop.timeout.ms", 20000)
val EGGROLL_SESSION_MAX_LIVE_MS = ErConfKey("eggroll.session.max.live.ms", 48*3600*1000)
var EGGROLL_SESSION_STATUS_CHECK_INTERVAL_MS = ErConfKey("eggroll.session.status.check.interval.ms", 5000)
val EGGROLL_SESSION_PYTHON_PATH = "python.path"
val EGGROLL_SESSION_PYTHON_VENV = "python.venv"
var EGGROLL_SESSION_USE_RESOURCE_DISPATCH = "eggroll.session.use.resource.dispatch"
val EGGROLL_RESOURCE_DISPATCH_INTERVAL = ErConfKey("eggroll.session.resource.dispatch.interval", 3000)
val EGGROLL_RESOURCE_LOCK_EXPIRE_INTERVAL = ErConfKey("eggroll.resource.lock.expire.interval", 3600000)

val EGGROLL_RESOURCE_COUNT_INTERVAL = ErConfKey("eggroll.resource.count.interval", 10000)
val EGGROLL_RESOURCE_SYSTEM_UPDATE_INTERVAL = ErConfKey("eggroll.resource.count.interval", 60000)
//, false
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,20 @@ object ProcessorTypes {

val ROLL_PAIR_MASTER = "roll_pair_master"
val EGG_PAIR = "egg_pair"
val DEEPSPEED_DOWNLOAD = "deepspeed_download"
}

object LogStreamStatus{

val INIT_STATUS="init"
val PREPARE_STATUS="prepare"
val RUNNING_STATUS="running"
val ERROR_STATUS="error"
val STOP_STATUS ="stop"

}


object ServerNodeTypes {
val CLUSTER_MANAGER = "CLUSTER_MANAGER"
val NODE_MANAGER = "NODE_MANAGER"
Expand Down Expand Up @@ -135,6 +147,7 @@ object ProcessorStatus {

object SessionStatus {
val NEW = "NEW"
var BEFORE_DESTORY = "BEFORE_DESTORY"
val NEW_TIMEOUT = "NEW_TIMEOUT"
val ACTIVE = "ACTIVE"
val CLOSED = "CLOSED"
Expand Down
Loading

0 comments on commit ccdddf2

Please sign in to comment.