Skip to content

Commit

Permalink
Reimplement async receiver thread
Browse files Browse the repository at this point in the history
  • Loading branch information
CoderSherlock committed Oct 20, 2018
1 parent b07b235 commit 7c4e48f
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 16 deletions.
23 changes: 16 additions & 7 deletions dlib/dnn/syncer/syncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,6 @@ class dnn_syncer {
device master;
connection *master_conn = NULL;

std::vector<device> slaves_list;
std::vector<connection *> slaves_conns;
std::vector<slaveStatus> slaves_status;

int verbose = 0;
int num_debug = 1;
Expand All @@ -59,10 +56,12 @@ class dnn_syncer {
dnn_syncer (const dnn_syncer &) = default;
dnn_syncer &operator= (const dnn_syncer &) = default;

[[deprecated("Please use dnn_leader/dnn_async_leader or dnn_worker instead of dnn_syncer.")]]
dnn_syncer (int ism) {
ismaster = ism;
}

[[deprecated("Please use dnn_leader/dnn_async_leader or dnn_worker instead of dnn_syncer.")]]
dnn_syncer (trainer_type *trainer, int ism) {
this->trainer = trainer;
this->ismaster = ism;
Expand Down Expand Up @@ -161,14 +160,14 @@ class dnn_syncer {

void update (std::vector<tensor *> &updated);

std::vector<device> slaves_list;
std::vector<connection *> slaves_conns;
std::vector<slaveStatus> slaves_status;

// TODO
dnn_syncer &operator<< (std::ostream &out) {
out << trainer << std::endl;
out << ismaster << std::endl;

if (ismaster)
out << slaves_list.size() << std::endl;

}

};
Expand Down Expand Up @@ -214,6 +213,7 @@ class dnn_leader : public dnn_syncer<trainer_type> {
void update_gradients(std::vector<tensor*> & gradients);

void sn_sync();

};

template<typename trainer_type>
Expand All @@ -235,9 +235,18 @@ class dnn_async_leader : public dnn_leader<trainer_type> {

void init_reciever_pool();

int recieve_gradients_from_one (int slave_index, std::vector<resizable_tensor> &cli_tensors);

void sync();

private:
void async_thread(int);

std::vector<std::thread *> recievers;

std::vector<std::vector<resizable_tensor>> send_back_paras;
std::vector<int> send_back_flags;

task_queue tq;
};

Expand Down
96 changes: 96 additions & 0 deletions dlib/dnn/syncer/syncer_async.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,110 @@ namespace dlib {

template<typename trainer_type>
void dnn_async_leader<trainer_type>::init_reciever_pool() {

// Collect pointers to the current parameter tensor of every computational
// layer, so each per-slave buffer below can be sized to match them.
std::vector<tensor *> tensors;
tensors.resize (this->trainer->num_computational_layers);
visit_layer_parameters (this->trainer->devices[0]->net, [&] (size_t i, tensor & t) {
tensors[i] = &t;
});

// Query the running-slave count once instead of three times (it walks the
// slave status list on every call).
const int slave_count = this->get_running_slaves_num();

// One send-back parameter buffer plus one hand-shake flag per running slave.
this->send_back_paras.resize (slave_count);
this->send_back_flags.resize (slave_count);

for (size_t i = 0; i < this->send_back_paras.size(); i++) {
this->send_back_paras[i].resize (this->trainer->num_computational_layers);

// Shape every per-slave tensor like the corresponding layer parameter.
for (size_t j = 0; j < this->send_back_paras[i].size(); j++) {
this->send_back_paras[i][j].copy_size (*tensors[j]);
}

this->send_back_flags[i] = 0; // 0 = nothing pending for this slave
}

// Spawn one receiver thread per slave.
// NOTE(review): these are raw owning pointers — joined in sync() but never
// deleted.  The element type of `recievers` is fixed in the class header,
// so the leak can only be flagged here, not fixed.
this->recievers.resize (slave_count);

for (size_t i = 0; i < this->recievers.size(); i++) {
this->recievers[i] = new std::thread (&dnn_async_leader::async_thread, this, i);
}
};

template<typename trainer_type>
void dnn_async_leader<trainer_type>::sync() {
int last = -1; // last reported count of ready tasks; -1 forces the first print

// Monitor loop: count how many queued tasks are marked ready and report
// whenever that number changes.
// TODO(review): this loop has no exit condition, so the join loop below is
// currently unreachable — a stop flag is needed for a clean shutdown.
while (1) {
int now = 0;

// The original spun on trylock() in a tight loop, burning a core for no
// benefit; a plain blocking lock() acquires the same mutex equivalently.
this->tq.queue_lock.lock();

for (const auto &t : this->tq.queue) {
if (t.ready == 1) {
now ++;
}
}

this->tq.queue_lock.unlock();

if (now != last) {
std::cout << "Now we have " << now << " tasks." << std::endl;
last = now;
}

}

// Wait for every receiver thread to finish (unreachable until the loop
// above gains an exit condition — see TODO).
for (size_t i = 0; i < this->recievers.size(); i ++) {
this->recievers[i]->join();
}
};

template<typename trainer_type>
void dnn_async_leader<trainer_type>::async_thread (int slave_index) {

// Per-slave receiver loop: build a local gradient buffer shaped like the
// net's layer parameters, then repeatedly receive gradients from this
// slave, enqueue them as a task, and wait until another thread signals
// (via send_back_flags) that this slave has been answered.

// Initialize the reciever structure
// Gather pointers to the layer parameter tensors so the gradient buffers
// below can be sized to match them.
std::vector<tensor *> tensors;
tensors.resize (this->trainer->num_computational_layers);
visit_layer_parameters (this->trainer->devices[0]->net, [&] (size_t i, tensor & t) {
tensors[i] = &t;
});

// One gradient tensor per computational layer, shaped like the parameters.
std::vector<resizable_tensor> gradients;
gradients.resize (this->trainer->num_computational_layers);

for (size_t i = 0; i < gradients.size(); i++) {
gradients[i].copy_size (*tensors[i]);
}

while (1) {
// Blocks until a full set of gradients arrives from this slave.
this->recieve_gradients_from_one (slave_index, gradients);
std::cout << "Recieved from slave " << slave_index << std::endl;

// Hand the gradients to the shared queue (the task constructor takes
// the tensor vector by value, so this copies the gradients).
task t (slave_index, 1, gradients);
this->tq.add_task (t);

// Busy-wait until the consumer flags that this slave was answered.
// NOTE(review): send_back_flags is a plain std::vector<int> written by
// another thread — this spin is a data race.  It should be std::atomic
// or guarded by a mutex/condition variable; confirm before relying on it.
while (this->send_back_flags[slave_index] == 0) {}

// this->send_parameters();

this->send_back_flags[slave_index] = 0;
}
};

template<typename trainer_type>
int dnn_async_leader<trainer_type>::recieve_gradients_from_one (int slave_index, std::vector<resizable_tensor> &cli_tensors) {
// Receive one compressed tensor per layer from the given slave's
// connection, filling cli_tensors in layer order.  Tensors whose size()
// is 0 (layers without parameters) are skipped.
// (A leftover debug print of &this->slaves_conns was removed here.)
for (size_t i = 0; i < cli_tensors.size(); i++) {
if (cli_tensors[i].size() != 0) {
network::recieve_compressed_tensor (this->slaves_conns[slave_index], &cli_tensors[i]);
}
}

// Always reports success; the network helper surfaces no error path here.
return 1;
};



}

#endif
2 changes: 1 addition & 1 deletion dlib/dnn/syncer/syncer_leader_default.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ int dnn_syncer<trainer_type>::get_running_slaves_num() {
DLIB_CASSERT (this->ismaster == 1, "Slave deivce doesn't have the right to get running_slaves_num.");
int ret = 0;

for (int i = 0; i < slaves_list.size(); i++) {
for (int i = 0; i < this->slaves_list.size(); i++) {
if (this->slaves_status[i] == 1)
ret ++;
}
Expand Down
12 changes: 4 additions & 8 deletions dlib/dnn/syncer/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,19 @@ struct task {
public:
size_t slave_index = -1;
bool ready = 0;
std::vector<tensor *> *tensors;
std::vector<resizable_tensor> tensors;

task () = default;
task &operator= (const task &) = default;
task (size_t si_, bool ready_, std::vector<tensor *> *tensors_) {
task (size_t si_, bool ready_, std::vector<resizable_tensor> tensors_) {
slave_index = si_;
ready = ready_;
tensors = tensors_;
}

~task () {
slave_index = -1;
ready = -1;

for (auto i : *tensors) {
free (i);
}
ready = 0;
};
}; // End of class task

Expand Down Expand Up @@ -96,7 +92,7 @@ class task_queue {
return false;
}

private:
// private:
std::list<task> queue;
mutex queue_lock;
};
Expand Down
6 changes: 6 additions & 0 deletions examples/dnn_dist_leader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,15 @@ int main (int argc, char **argv) try {
#if !ASYNC
syncer.init_slaves();
#else
syncer.init_slaves(); // TODO Need a new listener function
syncer.init_reciever_pool();
#endif

std::cout << "Finished Initialization, now start training procedures" << std::endl;
syncer.print_slaves_status();
std::cout << "Now we have " << syncer.get_running_slaves_num() << " slaves" << std::endl;

#if !ASYNC
int epoch = 0, batch = 0;
int mark = 0;

Expand Down Expand Up @@ -199,6 +202,9 @@ int main (int argc, char **argv) try {
}
}
}
#else
syncer.sync();
#endif

training.accuracy (net);
testing.accuracy (net);
Expand Down

0 comments on commit 7c4e48f

Please sign in to comment.