Refactor and fix bugs, released as stable async leader
CoderSherlock committed Oct 20, 2018
1 parent 7c4e48f commit 2a976a5
Showing 6 changed files with 179 additions and 120 deletions.
10 changes: 6 additions & 4 deletions dlib/dnn/syncer/syncer.h
@@ -56,12 +56,12 @@ class dnn_syncer {
dnn_syncer (const dnn_syncer &) = default;
dnn_syncer &operator= (const dnn_syncer &) = default;

[[deprecated("Please use dnn_leader/dnn_async_leader or dnn_worker instead of dnn_syncer.")]]
[[deprecated ("Please use dnn_leader/dnn_async_leader or dnn_worker instead of dnn_syncer.")]]
dnn_syncer (int ism) {
ismaster = ism;
}

[[deprecated("Please use dnn_leader/dnn_async_leader or dnn_worker instead of dnn_syncer.")]]
[[deprecated ("Please use dnn_leader/dnn_async_leader or dnn_worker instead of dnn_syncer.")]]
dnn_syncer (trainer_type *trainer, int ism) {
this->trainer = trainer;
this->ismaster = ism;
@@ -210,7 +210,7 @@ class dnn_leader : public dnn_syncer<trainer_type> {

void recieve_gradients_parallism (std::vector<std::vector<resizable_tensor>> &all_tensors);

void update_gradients(std::vector<tensor*> & gradients);
void update_gradients (std::vector<tensor *> &gradients);

void sn_sync();

@@ -237,10 +237,12 @@ class dnn_async_leader : public dnn_leader<trainer_type> {

int recieve_gradients_from_one (int slave_index, std::vector<resizable_tensor> &cli_tensors);

void send_parameters (int slave_index, std::vector<resizable_tensor> &parameters);

void sync();

private:
void async_thread(int);
void async_thread (int);

std::vector<std::thread *> recievers;
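
Note: the recievers vector above holds one heap-allocated std::thread per connected worker; each thread runs async_thread(i), and sync() joins the pool once the work loop ends (see the joins at the bottom of syncer_async.h below). A self-contained sketch of that pattern, where the worker count and the thread body are hypothetical stand-ins for the leader's connection list and async_thread():

    #include <thread>
    #include <vector>

    // Hypothetical stand-in: the real threads run dnn_async_leader::async_thread.
    void receive_for_worker (int slave_index) { (void) slave_index; /* receive gradients, queue a task */ }

    int main () {
        const int n_workers = 4;                      // assumed worker count
        std::vector<std::thread *> recievers;
        for (int i = 0; i < n_workers; ++i)
            recievers.push_back (new std::thread (receive_for_worker, i));
        for (auto *r : recievers) {                   // sync() joins its pool the same way
            r->join();
            delete r;
        }
    }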

114 changes: 84 additions & 30 deletions dlib/dnn/syncer/syncer_async.h
@@ -36,34 +36,6 @@ void dnn_async_leader<trainer_type>::init_reciever_pool() {
}
};

template<typename trainer_type>
void dnn_async_leader<trainer_type>::sync() {
int last = -1;

while (1) {
int now = 0;

while (this->tq.queue_lock.trylock() == 0) {};

for (auto i = this->tq.queue.begin(); i != this->tq.queue.end(); i ++) {
if ((*i).ready == 1) {
now ++;
}
}

this->tq.queue_lock.unlock();

if (now != last) {
std::cout << "Now we have " << now << " tasks." << std::endl;
last = now;
}

}

for (size_t i = 0; i < this->recievers.size(); i ++) {
this->recievers[i]->join();
}
};

template<typename trainer_type>
void dnn_async_leader<trainer_type>::async_thread (int slave_index) {
@@ -88,18 +60,20 @@ void dnn_async_leader<trainer_type>::async_thread (int slave_index) {

task t (slave_index, 1, gradients);
this->tq.add_task (t);
this->trainer->train_noop(); // HPZ: Very important

while (this->send_back_flags[slave_index] == 0) {}

// this->send_parameters();
this->send_parameters (slave_index, this->send_back_paras[slave_index]);

this->send_back_flags[slave_index] = 0;
}
};
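
Note on the loop above: each pass queues the freshly received gradients as a task, then parks on send_back_flags[slave_index] until sync() has produced new parameters for that worker, sends them, and re-arms the flag. A standalone sketch of that flag handshake; the committed code spins on plain ints, std::atomic is used here only so the standalone example is well-defined:

    #include <atomic>

    std::atomic<int> send_back_flag{0};    // the real code keeps one such slot per worker

    void receiver_side () {                // mirrors the tail of async_thread() above
        // ... gradients received and pushed into the task queue ...
        while (send_back_flag == 0) { }    // wait until sync() publishes fresh parameters
        // send_parameters (slave_index, send_back_paras[slave_index]) runs here
        send_back_flag = 0;                // re-arm the slot for the next round
    }

    void sync_side () {                    // mirrors the end of sync() further down
        // ... gradients merged, parameters copied into send_back_paras ...
        send_back_flag = 1;                // release the waiting receiver thread
    }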

template<typename trainer_type>
int dnn_async_leader<trainer_type>::recieve_gradients_from_one (int slave_index, std::vector<resizable_tensor> &cli_tensors) {
std::cout << slave_index << ":"<<&this->slaves_conns << std::endl;
// std::cout << slave_index << ":" << &this->slaves_conns << std::endl;

for (size_t i = 0; i < cli_tensors.size(); i++) {
if (cli_tensors[i].size() != 0) {
network::recieve_compressed_tensor (this->slaves_conns[slave_index], &cli_tensors[i]);
@@ -110,6 +84,86 @@ int dnn_async_leader<trainer_type>::recieve_gradients_from_one (int slave_index,
};


template<typename trainer_type>
void dnn_async_leader<trainer_type>::send_parameters (int slave_index, std::vector<resizable_tensor> &tensors) {

for (size_t i = 0; i < tensors.size(); i++) {
if (tensors[i].size() != 0) {
print_tensor (&tensors[i], 10);
network::send_compressed_tensor (this->slaves_conns[slave_index], &tensors[i]);
}
}

}


template<typename trainer_type>
void dnn_async_leader<trainer_type>::sync() {

while (1) {

while (this->tq.queue_lock.trylock() == 0) {};

auto i = this->tq.queue.begin();

for (i = this->tq.queue.begin(); i != this->tq.queue.end(); i ++) {
if ((*i).ready == 1) {

while (this->trainer->status_lock.trylock() == 0);

this->trainer->synchronization_status = 0;
this->trainer->status_lock.unlock();

// Let's rock it!
(*i).ready = 0;
this->tq.queue_lock.unlock();

// Update to trainer
std::vector<tensor *> temp (this->trainer->num_computational_layers);

for (size_t j = 0; j < temp.size(); j ++) {
temp[j] = & ((*i).tensors[j]);
}

while (this->trainer->synchronization_status != 1) { }

this->update_gradients (temp);

while (this->trainer->status_lock.trylock() == 0);

if (this->trainer->synchronization_status != 1)
std::cout << "Something wrong with sync lock: current: " << this->trainer->synchronization_status << "\t Going to set: 2" << std::endl;

this->trainer->synchronization_status = 2;
this->trainer->status_lock.unlock();

// Wait for result
while (this->trainer->synchronization_status != 3) { }

visit_layer_parameters (this->trainer->devices[0]->net, [&] (size_t k, tensor & t) {
std::cout << "SP get parameteres from" << &t << std::endl;
this->send_back_paras[ (*i).slave_index][k] = t;
});

this->send_back_flags[ (*i).slave_index] = 1;

while (this->tq.queue_lock.trylock() == 0) {};

this->tq.queue.erase (i);

this->tq.queue_lock.unlock();

break;
}
}


}

for (size_t i = 0; i < this->recievers.size(); i ++) {
this->recievers[i]->join();
}
};
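
Note on the handshake in sync(): for each ready task the leader sets synchronization_status to 0, waits for the trainer to reach 1 before merging the queued gradients with update_gradients(), advances the status to 2, and waits for 3 before copying the parameters back for the worker. The trainer-side counterpart is not part of this diff, so the meaning attached to states 1 and 3 below is an assumption; the sketch only restates the ordering visible above, again with std::atomic so the standalone example is well-defined:

    #include <atomic>

    std::atomic<int> status{0};           // stand-in for trainer->synchronization_status

    void leader_side () {                 // the sync() loop above, per ready task
        status = 0;                       // claim the trainer for this round
        while (status != 1) { }           // wait until the trainer is ready (assumed meaning)
        // update_gradients (temp) runs here
        status = 2;                       // merged gradients handed to the trainer
        while (status != 3) { }           // wait for the trainer to apply the update
        // parameters are copied into send_back_paras here
    }

    void trainer_side () {                // assumed counterpart, not in this diff
        status = 1;                       // own gradients staged, leader may merge
        while (status != 2) { }           // wait for the merged gradients
        // the weight update would be applied here
        status = 3;                       // parameters are current again
    }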

}

4 changes: 2 additions & 2 deletions dlib/dnn/syncer/syncer_leader_default.h
@@ -149,13 +149,13 @@ void dnn_leader<trainer_type>::send_parameters (connection *slave) {
tensors.resize (this->trainer->num_computational_layers);

visit_layer_parameters (this->trainer->devices[0]->net, [&] (size_t i, tensor & t) {
std::cout<<"SP get parameteres from" << &t << std::endl;
std::cout << "SP get parameteres from" << &t << std::endl;
tensors[i] = &t;
});

for (size_t i = 0; i < tensors.size(); i++) {
if (tensors[i]->size() != 0) {
print_tensor(tensors[i], 10);
print_tensor (tensors[i], 10);
network::send_compressed_tensor (slave, tensors[i]);
}
}
7 changes: 2 additions & 5 deletions dlib/dnn/syncer/utils.h
Expand Up @@ -48,10 +48,7 @@ struct task {
tensors = tensors_;
}

~task () {
slave_index = -1;
ready = 0;
};
~task () = default;
}; // End of class task

class task_queue {
@@ -92,7 +89,7 @@ class task_queue {
return false;
}

// private:
// private:
std::list<task> queue;
mutex queue_lock;
};
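
Note: with the destructor defaulted, task is a plain value type (a slave index, a ready flag, and the tensors), and task_queue is just a std::list guarded by a dlib mutex that the leader polls with trylock(). A minimal usage sketch of that pairing, mirroring async_thread() and sync() above; it assumes the declarations in this header and dlib's resizable_tensor:

    // Producer side: a receiver thread queues a finished gradient set.
    void producer_sketch (task_queue &tq, int slave_index, std::vector<resizable_tensor> &grads) {
        task t (slave_index, 1, grads);              // ready flag set to 1, as in async_thread()
        tq.add_task (t);
    }

    // Consumer side: the sync thread spins on trylock(), scans for ready work,
    // and releases the lock before doing anything expensive.
    void consumer_sketch (task_queue &tq) {
        while (tq.queue_lock.trylock() == 0) { }
        for (auto i = tq.queue.begin(); i != tq.queue.end(); ++i) {
            if (i->ready == 1) {
                // hand i->tensors to the trainer here
            }
        }
        tq.queue_lock.unlock();
    }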
3 changes: 2 additions & 1 deletion examples/dnn_dist_leader.cpp
@@ -133,7 +133,7 @@ int main (int argc, char **argv) try {
#if !ASYNC
dnn_leader<trainer_type> syncer (&trainer, 0);
#else
dnn_async_leader<trainer_type> syncer(&trainer, 0);
dnn_async_leader<trainer_type> syncer (&trainer, 0);
#endif
syncer.set_this_device (me);
syncer.set_isMaster (1);
@@ -202,6 +202,7 @@ int main (int argc, char **argv) try {
}
}
}

#else
syncer.sync();
#endif
