ALL DONE

jtirana98 committed Apr 1, 2023
1 parent 167839b commit ae499ec
Showing 9 changed files with 99 additions and 105 deletions.
24 changes: 12 additions & 12 deletions CMakeLists.txt
@@ -58,19 +58,19 @@ set(SOURCE_FILES
)

add_executable("${CMAKE_PROJECT_NAME}" ${SOURCE_FILES} main.cpp)
#add_executable(compute_node ${SOURCE_FILES} pipeline_simulation/compute_node.cpp)
#add_executable(data_owner ${SOURCE_FILES} pipeline_simulation/data_owner.cpp)
#add_executable(simulated_data_owner ${SOURCE_FILES} pipeline_simulation/profiling/data_owner_simulated.cpp)
#add_executable(aggregator ${SOURCE_FILES} pipeline_simulation/aggregator.cpp)
add_executable(compute_node ${SOURCE_FILES} pipeline_simulation/compute_node.cpp)
add_executable(data_owner ${SOURCE_FILES} pipeline_simulation/data_owner.cpp)
add_executable(simulated_data_owner ${SOURCE_FILES} pipeline_simulation/profiling/data_owner_simulated.cpp)
add_executable(aggregator ${SOURCE_FILES} pipeline_simulation/aggregator.cpp)

target_link_libraries("${CMAKE_PROJECT_NAME}" ${TORCH_LIBRARIES})
#target_link_libraries(compute_node ${TORCH_LIBRARIES})
#target_link_libraries(data_owner ${TORCH_LIBRARIES})
#target_link_libraries(simulated_data_owner ${TORCH_LIBRARIES})
#target_link_libraries(aggregator ${TORCH_LIBRARIES})
target_link_libraries(compute_node ${TORCH_LIBRARIES})
target_link_libraries(data_owner ${TORCH_LIBRARIES})
target_link_libraries(simulated_data_owner ${TORCH_LIBRARIES})
target_link_libraries(aggregator ${TORCH_LIBRARIES})

set_property(TARGET ${CMAKE_PROJECT_NAME} PROPERTY CXX_STANDARD 17)
#set_property(TARGET compute_node PROPERTY CXX_STANDARD 17)
#set_property(TARGET data_owner PROPERTY CXX_STANDARD 17)
#set_property(TARGET simulated_data_owner PROPERTY CXX_STANDARD 17)
#set_property(TARGET aggregator PROPERTY CXX_STANDARD 17)
set_property(TARGET compute_node PROPERTY CXX_STANDARD 17)
set_property(TARGET data_owner PROPERTY CXX_STANDARD 17)
set_property(TARGET simulated_data_owner PROPERTY CXX_STANDARD 17)
set_property(TARGET aggregator PROPERTY CXX_STANDARD 17)
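With the four pipeline targets enabled above, a typical out-of-source build looks like this (the LibTorch path is an assumption; point CMAKE_PREFIX_PATH at your own installation):

$ mkdir -p build && cd build
$ cmake -DCMAKE_PREFIX_PATH=/path/to/libtorch ..
$ cmake --build . -j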
50 changes: 44 additions & 6 deletions README.md
@@ -4,7 +4,7 @@

--------------------------------------------------------------------------------

For a more detailed description of the documentation, follow this [link](https://docs.google.com/document/d/1DaWOX27c4_4_VUT-l_UrgUV-zFa8UsIZ5zUv06pgc0s/edit?usp=sharing)
For a more detailed description of the documentation, follow this [link](https://docs.google.com/document/d/1DaWOX27c4_4_VUT-l_UrgUV-zFa8UsIZ5zUv06pgc0s/edit?usp=sharing) or check the wiki.


Repository structure:
@@ -65,8 +65,46 @@ How to run program and connect Libtorch:

Running SplitPipe in a distributed manner:

- configuring the routing table
- enabling multi-task (if applicable)
- parameters for each entity
- include a figure of the structure
- emulated version
*Case 0: Model profiling*
Example code is in main.cpp; you can either get the delay for each batch or the per-layer delay, as sketched below.
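A minimal sketch of the two modes, copied from the main.cpp calls in this commit (per the comments in that diff, the boolean flag toggles per-layer profiling):

    train_resnet(CIFAR_10, resnet101, false, 128, std::vector<int>(), false); // latency of one batch update, whole model
    train_resnet(CIFAR_10, resnet101, true, 128);                             // per-layer latency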

*Case 1: Real system*

In this case you will run the data owners as real devices. You can run all entities on one machine or on different devices (within the same network).

- If you cannot use multicast:
  - comment out the following parts of the code:
    - in data_owner.cpp: comment out the findPeers() and findInit() calls
    - in compute_node.cpp: comment out the findInit() call
    - in aggregator.cpp: comment out the findInit() call
    - in network_layer.cpp: comment out line 506 of version 1.0.0
  - update the routing table (`rooting_table`) in pipeline_simulation/network_layer.h; a sketch follows below
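A minimal sketch of hardcoded routing-table entries, following the insert pattern in data_owner_simulated.cpp (the std::map type is inferred from the insert()/find() calls in this commit; the addresses are illustrative):

    #include <map>
    #include <string>
    #include <utility>

    // Hypothetical hardcoded entries mapping a node id to its <IP address, port> pair.
    std::map<int, std::pair<std::string, int>> rooting_table = {
        {0, {"192.168.1.10", 8081}}, // init data owner (illustrative address)
        {1, {"192.168.1.11", 8082}}, // compute node (illustrative address)
    };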

For each data owner device call:

$ ./data_owner -i id -d <number-of-data-owners> -c <number-of-compute-nodes> -s <split-rule>

If it is not the init data owner, you only give the node's id, as in the example below.
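For example (illustrative ids; the split-rule format follows the cut_layers_ strings in data_owner_simulated.cpp):

$ ./data_owner -i 0 -d 2 -c 1 -s 10,19
$ ./data_owner -i 4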

For each compute node device call:

$ ./compute_node -i id

or use the script run_cn.sh in pipeline_simulation/profiling

For the aggregator:

$ ./aggregator -i id -d <number-of-data-owners> -c <number-of-compute-nodes>

or use the script run_aggr.sh in pipeline_simulation/profiling
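Putting it together, a hypothetical single-machine session with two data owners and one compute node (the ids and start order are illustrative, not prescribed by this commit):

$ ./aggregator -i 2 -d 2 -c 1 &
$ ./compute_node -i 1 &
$ ./data_owner -i 4 &
$ ./data_owner -i 0 -d 2 -c 1 -s 10,19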

NOTE: There is support for logging and checkpointing, but this feature is deactivated in this version. You can use utils/pipeline_logging.sh to enable it.

*Case 2: Emulated environment*

In this case the data owners run in an emulated environment. Note that this version does not support multicast.
You can add the device characteristics in pipeline_simulation/profiling/rpi_stat.h and use the scripts run_data_owners_init.sh and run_data_owners_worker.sh in
pipeline_simulation/profiling.
The results are stored in the logging files indicated in the script files (change them accordingly).

The code for the emulated data owner is in pipeline_simulation/profiling/data_owner_simulated.cpp.
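The emulation pattern, sketched from the timing logic in data_owner_simulated.cpp (variable names follow that file; the surrounding loop and declarations are omitted):

    // If the emulated device's measured step time (my_rpi.rpi_fbm2) exceeds the
    // time this machine actually spent, sleep for the remainder, as the original does.
    long real_duration = my_rpi.rpi_fbm2;           // measured per-step delay of the device
    long elapsed = end_m2 - send_gradients.count(); // time spent locally
    if (real_duration > elapsed)
        usleep(real_duration - elapsed);            // units as in the original code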
8 changes: 4 additions & 4 deletions main.cpp
@@ -13,9 +13,9 @@ int main(int argc, char **argv) {
std::vector<int> batches{32, 64, 128};
std::vector<int> splits{2, 4, 6, 8};

// train_resnet(CIFAR_10, resnet101, false, 128, std::vector<int>(), false);
train_resnet(CIFAR_10, resnet101, false, 128, std::vector<int>(), false); //get the latency for one batch update for the whole model

//train_resnet(CIFAR_10, resnet101, true, 128);
train_vgg(CIFAR_10, v19, false, 128, std::vector<int>(), false);
//train_vgg(CIFAR_10, v19, true, 128);
train_resnet(CIFAR_10, resnet101, true, 128); // get the per-layer latency
train_vgg(CIFAR_10, v19, false, 128, std::vector<int>(), false); //get the latency for one batch update for the whole model
train_vgg(CIFAR_10, v19, true, 128); // get the per-layer latency
}
63 changes: 9 additions & 54 deletions pipeline_simulation/profiling/data_owner_simulated.cpp
@@ -28,12 +28,8 @@ int main(int argc, char **argv) {
refactoring_data client_message;
// check if you are the init
if (myID == 0) {

auto cut_layers_ = "10,19";
//auto data_owners_ = argv[2]; // CHANGE
int num_data_owners = atoi(argv[2]);
//std::cout << data_owners_ << std::endl;
//auto compute_nodes_ = atoi(argv[3]);
int num_compute_nodes = 1;

if(argc >= 4)
@@ -43,7 +39,6 @@
cut_layers_ = "3,13,19";
if (num_compute_nodes == 3)
cut_layers_ = "2,15,25,35";//"3,8,14,19";
//if (num_compute_nodes == 4)

const char separator = ',';
std::string val;
@@ -56,13 +51,6 @@
}
}

/*streamData = std::stringstream(data_owners_);
while (std::getline(streamData, val, separator)) {
if (val != "") {
data_owners.push_back(stoi(val));
}
}*/

data_owners.push_back(0);
for (int i = 0; i < num_data_owners-1; i++) {
data_owners.push_back(i+3 +1);
@@ -75,16 +63,7 @@
}

int num_parts = compute_nodes.size() + 2;

//std::cout << "found them" << std::endl;
//sleep(2);

int data_onwer_end = 2;
int data_owner_beg = 8;

int model_name = 2;
int model_type = 3;


client_message.dataset = CIFAR_10;
client_message.model_name_ = model_name::resnet;//model_name::vgg;
client_message.model_type_ =resnet_model::resnet101;//vgg_model::v19;
@@ -107,24 +86,6 @@
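// The routing table maps node ids to <IP, port>; additional data owners
// reuse a base node's IP with a port offset derived from their id.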
my_port = my_port + (data_owners[i] +3);
sys_.my_network_layer.rooting_table.insert({data_owners[i], std::pair<std::string, int>(my_addr.first, my_port)});
}
/*else if(data_owners[i] > 13 && data_owners[i] < 18) {
std::pair<std::string, int> my_addr = sys_.my_network_layer.rooting_table.find(13)->second;
int my_port = my_addr.second;
my_port = my_port + (data_owners[i] - 13);
sys_.my_network_layer.rooting_table.insert({data_owners[i], std::pair<std::string, int>(my_addr.first, my_port)});
}
else if (data_owners[i] > 18 && data_owners[i] < 33){
std::pair<std::string, int> my_addr = sys_.my_network_layer.rooting_table.find(18)->second;
int my_port = my_addr.second;
my_port = my_port + (data_owners[i] - 18);
sys_.my_network_layer.rooting_table.insert({data_owners[i], std::pair<std::string, int>(my_addr.first, my_port)});
}
else if (data_owners[i] > 33 && data_owners[i] < 43) {
std::pair<std::string, int> my_addr = sys_.my_network_layer.rooting_table.find(33)->second;
int my_port = my_addr.second;
my_port = my_port + (data_owners[i] - 33);
sys_.my_network_layer.rooting_table.insert({data_owners[i], std::pair<std::string, int>(my_addr.first, my_port)});
}*/
else if (data_owners[i] >= 18) {
std::pair<std::string, int> my_addr = sys_.my_network_layer.rooting_table.find(18)->second;
int my_port = my_addr.second;
@@ -201,20 +162,16 @@
auto send_gradients = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch());
int epoch_count = 0, g_epoch_count = 0; // communication round
bool new_r = true;
for (size_t round = 0; round != sys_.rounds; ++round) {
for (size_t round = 0; round != sys_.rounds; round++) {
int batch_index = 0;
sys_.zero_metrics();
int total_num = 0;
if (new_r) {
//std::cout << "New round " << std::endl;
init_epoch = std::chrono::steady_clock::now();
new_r = false;
}

send_activations = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch());
//long c = send_activations.count();
//std::cout << c << std::endl;
//std::cout << std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count() << std::endl;
int g_i = 0;
for (int inter_batch = 0; inter_batch < 4; inter_batch++ ) {
for (auto& batch : *train_dataloader) {
@@ -225,8 +182,7 @@
task.size_ = batch.data.size(0);
task.values = batch.data;
task = sys_.exec(task, batch.target);
//task.t_start = send_activations.count();
//std::cout << task.t_start << std::endl;

total_num += task.size_;
task.batch0 = batch_index;
auto end_f1 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
@@ -249,7 +205,7 @@
//std::cout << "f1-end: " << end_f1-send_activations.count() << std::endl;
// send task to next node
task.t_start = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
//usleep(myID*200);

std::cout << "Send forward task to C1 " << end_f1-send_activations.count() << std::endl;
sys_.my_network_layer.new_message(task, sys_.inference_path[0]);

@@ -261,7 +217,7 @@
task = sys_.exec(task, batch.target); // forward and backward
// send task - backward
auto end_m2 = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
//std::cout << "m2-end: " << end_m2-send_gradients.count() << std::endl;


real_duration = 0;
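// Emulate the device: my_rpi.rpi_fbm2 holds its measured forward/backward
// time; the usleep below sleeps off any residual so local timing matches it.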
real_duration = my_rpi.rpi_fbm2;
@@ -274,13 +230,13 @@
usleep(real_duration-(end_m2-send_gradients.count()));
}

//task.t_start = send_gradients.count();

task.t_start = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
//usleep(myID*110);

std::cout << "Send backprop task to C1 " << end_f1-send_activations.count() << std::endl;
sys_.my_network_layer.new_message(task, sys_.inference_path[1]);
//optimize task

//optimize task
auto task1 = sys_.my_network_layer.check_new_task();

task1 = sys_.exec(task1, batch.target); // optimize
@@ -297,8 +253,7 @@

//if (g_i % 50 == 0)
std::cout << "One batch: global epoch " << g_epoch_count+1 << " local epoch: " << epoch_count+1 <<" b: " << batch_index+1 << " is " << _time << std::endl;

// end of batch

batch_index++;
g_i++;
}
File renamed without changes.
File renamed without changes.
@@ -11,15 +11,14 @@ mkdir -p /root/experiments/simulations_check/compute_nodes_$2/dataowners_$1_
declare -i start=0
declare -i end=0
declare -i port=0
start=$(( $2 + 1 ))
end=$(( $1-2 + $start ))
declare -i port_start=3
start=$(( 18 ))
end=$(( $1 + $start -1))

for i in $(seq $start 1 $end)
do
port=$(( 8081 + $2 + $i ))
port=$(( 8081 + $port_start ))
sudo iptables -I INPUT -p tcp -m tcp --dport $port -j ACCEPT
../../build/simulated_data_owner $i > "/root/experiments/simulations_check/compute_nodes_$2/dataowners_$1_/d$i.data" &
done

sudo iptables -I INPUT -p tcp -m tcp --dport 8081 -j ACCEPT
../../build/simulated_data_owner 0 $1 $2 > "/root/experiments/simulations_check/compute_nodes_$2/dataowners_$1_/d0.data" &
port_start=$(( $port_start + 1 ))
done
24 changes: 24 additions & 0 deletions pipeline_simulation/profiling/run_data_owners_workser.sh
@@ -0,0 +1,24 @@
#!/bin/bash -xe

# 1: num of data owners
# 2: num of compute nodes

export LD_LIBRARY_PATH=/usr/local/lib:/usr/lib:/usr/local/lib64:/usr/lib64

mkdir -p /root/experiments/simulations_check/compute_nodes_$2
mkdir -p /root/experiments/simulations_check/compute_nodes_$2/dataowners_$1_

declare -i start=0
declare -i end=0
declare -i port=0
declare -i port_start=0
start=$(( 18 ))
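# Worker ids start at 18 so they take the data_owners[i] >= 18 branch of the
# routing-table setup in data_owner_simulated.cpp (inferred from this commit).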
end=$(( $1 + $start -1))

for i in $(seq $start 1 $end)
do
port=$(( 8081 + $port_start ))
sudo iptables -I INPUT -p tcp -m tcp --dport $port -j ACCEPT
../../build/simulated_data_owner $i > "/root/experiments/simulations_check/compute_nodes_$2/dataowners_$1_/d$i.data" &
port_start=$(( $port_start + 1 ))
done
22 changes: 0 additions & 22 deletions pipeline_simulation/profiling/run_exper.sh

This file was deleted.
