diff --git a/dlib/dnn/syncer/syncer.h b/dlib/dnn/syncer/syncer.h
index 10ac6a5e..6ec58c6b 100644
--- a/dlib/dnn/syncer/syncer.h
+++ b/dlib/dnn/syncer/syncer.h
@@ -56,12 +56,12 @@ class dnn_syncer {
     dnn_syncer (const dnn_syncer &) = default;
     dnn_syncer &operator= (const dnn_syncer &) = default;

-    [[deprecated("Please use dnn_leader/dnn_async_leader or dnn_worker instead of dnn_syncer.")]]
+    [[deprecated ("Please use dnn_leader/dnn_async_leader or dnn_worker instead of dnn_syncer.")]]
     dnn_syncer (int ism) {
         ismaster = ism;
     }

-    [[deprecated("Please use dnn_leader/dnn_async_leader or dnn_worker instead of dnn_syncer.")]]
+    [[deprecated ("Please use dnn_leader/dnn_async_leader or dnn_worker instead of dnn_syncer.")]]
     dnn_syncer (trainer_type *trainer, int ism) {
         this->trainer = trainer;
         this->ismaster = ism;
@@ -210,7 +210,7 @@ class dnn_leader : public dnn_syncer {
     void recieve_gradients_parallism (std::vector> &all_tensors);

-    void update_gradients(std::vector & gradients);
+    void update_gradients (std::vector &gradients);

     void sn_sync();

@@ -237,10 +237,12 @@ class dnn_async_leader : public dnn_leader {
     int recieve_gradients_from_one (int slave_index, std::vector &cli_tensors);

+    void send_parameters (int slave_index, std::vector &parameters);
+
     void sync();

 private:
-    void async_thread(int);
+    void async_thread (int);

     std::vector recievers;
diff --git a/dlib/dnn/syncer/syncer_async.h b/dlib/dnn/syncer/syncer_async.h
index c498a58c..6d7da298 100644
--- a/dlib/dnn/syncer/syncer_async.h
+++ b/dlib/dnn/syncer/syncer_async.h
@@ -36,34 +36,6 @@ void dnn_async_leader::init_reciever_pool() {
     }
 };

-template
-void dnn_async_leader::sync() {
-    int last = -1;
-
-    while (1) {
-        int now = 0;
-
-        while (this->tq.queue_lock.trylock() == 0) {};
-
-        for (auto i = this->tq.queue.begin(); i != this->tq.queue.end(); i ++) {
-            if ((*i).ready == 1) {
-                now ++;
-            }
-        }
-
-        this->tq.queue_lock.unlock();
-
-        if (now != last) {
-            std::cout << "Now we have " << now << " tasks." << std::endl;
-            last = now;
-        }
-
-    }
-
-    for (size_t i = 0; i < this->recievers.size(); i ++) {
-        this->recievers[i]->join();
-    }
-};

 template
 void dnn_async_leader::async_thread (int slave_index) {
@@ -88,10 +60,11 @@ void dnn_async_leader::async_thread (int slave_index) {
         task t (slave_index, 1, gradients);
         this->tq.add_task (t);
+        this->trainer->train_noop();    // HPZ: Very important

         while (this->send_back_flags[slave_index] == 0) {}

-        // this->send_parameters();
+        this->send_parameters (slave_index, this->send_back_paras[slave_index]);
         this->send_back_flags[slave_index] = 0;
     }
@@ -99,7 +72,8 @@ template
 int dnn_async_leader::recieve_gradients_from_one (int slave_index, std::vector &cli_tensors) {
-    std::cout << slave_index << ":"<<&this->slaves_conns << std::endl;
+    // std::cout << slave_index << ":" << &this->slaves_conns << std::endl;
+
     for (size_t i = 0; i < cli_tensors.size(); i++) {
         if (cli_tensors[i].size() != 0) {
             network::recieve_compressed_tensor (this->slaves_conns[slave_index], &cli_tensors[i]);
@@ -110,6 +84,86 @@ int dnn_async_leader::recieve_gradients_from_one (int slave_index,
     };

+template
+void dnn_async_leader::send_parameters (int slave_index, std::vector &tensors) {
+
+    for (size_t i = 0; i < tensors.size(); i++) {
+        if (tensors[i].size() != 0) {
+            print_tensor (&tensors[i], 10);
+            network::send_compressed_tensor (this->slaves_conns[slave_index], &tensors[i]);
+        }
+    }
+
+}
+
+
+template
+void dnn_async_leader::sync() {
+
+    while (1) {
+
+        while (this->tq.queue_lock.trylock() == 0) {};
+
+        auto i = this->tq.queue.begin();
+
+        for (i = this->tq.queue.begin(); i != this->tq.queue.end(); i ++) {
+            if ((*i).ready == 1) {
+
+                while (this->trainer->status_lock.trylock() == 0);
+
+                this->trainer->synchronization_status = 0;
+                this->trainer->status_lock.unlock();
+
+                // Let's rock it!
+                (*i).ready = 0;
+                this->tq.queue_lock.unlock();
+
+                // Update to trainer
+                std::vector temp (this->trainer->num_computational_layers);
+
+                for (size_t j = 0; j < temp.size(); j ++) {
+                    temp[j] = & ((*i).tensors[j]);
+                }
+
+                while (this->trainer->synchronization_status != 1) { }
+
+                this->update_gradients (temp);
+
+                while (this->trainer->status_lock.trylock() == 0);
+
+                if (this->trainer->synchronization_status != 1)
+                    std::cout << "Something wrong with sync lock: current: " << this->trainer->synchronization_status << "\t Going to set: 2" << std::endl;
+
+                this->trainer->synchronization_status = 2;
+                this->trainer->status_lock.unlock();
+
+                // Wait for result
+                while (this->trainer->synchronization_status != 3) { }
+
+                visit_layer_parameters (this->trainer->devices[0]->net, [&] (size_t k, tensor & t) {
+                    std::cout << "SP get parameteres from" << &t << std::endl;
+                    this->send_back_paras[ (*i).slave_index][k] = t;
+                });
+
+                this->send_back_flags[ (*i).slave_index] = 1;
+
+                while (this->tq.queue_lock.trylock() == 0) {};
+
+                this->tq.queue.erase (i);
+
+                this->tq.queue_lock.unlock();
+
+                break;
+            }
+        }
+
+
+    }
+
+    for (size_t i = 0; i < this->recievers.size(); i ++) {
+        this->recievers[i]->join();
+    }
+};

 }
diff --git a/dlib/dnn/syncer/syncer_leader_default.h b/dlib/dnn/syncer/syncer_leader_default.h
index 9b042076..4c93f8de 100644
--- a/dlib/dnn/syncer/syncer_leader_default.h
+++ b/dlib/dnn/syncer/syncer_leader_default.h
@@ -149,13 +149,13 @@ void dnn_leader::send_parameters (connection *slave) {
     tensors.resize (this->trainer->num_computational_layers);

     visit_layer_parameters (this->trainer->devices[0]->net, [&] (size_t i, tensor & t) {
-        std::cout<<"SP get parameteres from" << &t << std::endl;
+        std::cout << "SP get parameteres from" << &t << std::endl;
         tensors[i] = &t;
     });

     for (size_t i = 0; i < tensors.size(); i++) {
         if (tensors[i]->size() != 0) {
-            print_tensor(tensors[i], 10);
+            print_tensor (tensors[i], 10);
             network::send_compressed_tensor (slave, tensors[i]);
         }
     }
diff --git a/dlib/dnn/syncer/utils.h b/dlib/dnn/syncer/utils.h
index b09389fc..1a00cb1b 100644
--- a/dlib/dnn/syncer/utils.h
+++ b/dlib/dnn/syncer/utils.h
@@ -48,10 +48,7 @@ struct task {
         tensors = tensors_;
     }

-    ~task () {
-        slave_index = -1;
-        ready = 0;
-    };
+    ~task () = default;
 };  // End of class task

 class task_queue {
@@ -92,7 +89,7 @@ class task_queue {
         return false;
     }

-  // private:
+    // private:
     std::list queue;
     mutex queue_lock;
 };
diff --git a/examples/dnn_dist_leader.cpp b/examples/dnn_dist_leader.cpp
index 8f33e714..3c107404 100644
--- a/examples/dnn_dist_leader.cpp
+++ b/examples/dnn_dist_leader.cpp
@@ -133,7 +133,7 @@ int main (int argc, char **argv) try {
 #if !ASYNC
     dnn_leader syncer (&trainer, 0);
 #else
-    dnn_async_leader syncer(&trainer, 0);
+    dnn_async_leader syncer (&trainer, 0);
 #endif
     syncer.set_this_device (me);
     syncer.set_isMaster (1);
@@ -202,6 +202,7 @@ int main (int argc, char **argv) try {
             }
         }
     }
+
 #else
     syncer.sync();
 #endif
diff --git a/examples/dnn_dist_worker.cpp b/examples/dnn_dist_worker.cpp
index 8510b961..75de9bfb 100644
--- a/examples/dnn_dist_worker.cpp
+++ b/examples/dnn_dist_worker.cpp
@@ -9,7 +9,7 @@
     The specific network we will run is from the paper
         LeCun, Yann, et al. "Gradient-based learning applied to document recognition."
         Proceedings of the IEEE 86.11 (1998): 2278-2324.
-    except that we replace the sigmoid non-linearities with rectified linear units.
+    except that we replace the sigmoid non-linearities with rectified linear units.
     These tools will use CUDA and cuDNN to drastically accelerate network
     training and testing.  CMake should automatically find them if they are
@@ -30,38 +30,37 @@ using namespace std;
 using namespace dlib;
 using std::chrono::system_clock;

-std::vector loadSlaves(char* filename) {
-    std::ifstream f;
-    f.open(filename);
+std::vector loadSlaves (char *filename) {
+    std::ifstream f;
+    f.open (filename);

     std::vector ret;
-    int number = 0;
-    std::string ip;
-    int port = -1;
-    while(f >> ip >> port) {
-        device temp(number, ip, port);
-        ret.push_back(temp);
+    int number = 0;
+    std::string ip;
+    int port = -1;
+
+    while (f >> ip >> port) {
+        device temp (number, ip, port);
+        ret.push_back (temp);
     }

     f.close();
     return ret;
 }

-int main(int argc, char** argv) try
-{
+int main (int argc, char **argv) try {
     // signal (SIGINT, to_exit);

-    // This example is going to run on the MNIST dataset.
-    if (argc < 2)
-    {
+    // This example is going to run on the MNIST dataset.
+    if (argc < 2) {
         cout << "Master program has invalid argumnets" << endl;
         return 1;
     }

-    char* data_path;    // Training & Testing data
-    char* slave_path;   // File contains all slave ip and port information
+    char *data_path;    // Training & Testing data
+    char *slave_path;   // File contains all slave ip and port information

     int ismaster = 0;
     device me;
@@ -71,34 +70,34 @@ int main(int argc, char** argv) try
     std::vector slave_list;

     me.ip = argv[1];
-    me.port = atoi(argv[2]);
-    me.number = atoi(argv[3]);
+    me.port = atoi (argv[2]);
+    me.number = atoi (argv[3]);

     // Print self information
     std::cout << "Local Machine info:\n";
     std::cout << "slave" << " " << me.ip << ":" << me.port << " " << me.number << std::endl;

-    for(int i =1; i < argc; i++) {
-        if(strcmp(argv[i], "-d")==0){
-            data_path = argv[i+1];
+    for (int i = 1; i < argc; i++) {
+        if (strcmp (argv[i], "-d") == 0) {
+            data_path = argv[i + 1];
             std::cout << "Dataset:\t" << data_path << std::endl;
         }

-        if(strcmp(argv[i], "-s")==0){
-            slave_path = argv[i+1];
+        if (strcmp (argv[i], "-s") == 0) {
+            slave_path = argv[i + 1];
             std::cout << "Slaveset:\t" << slave_path << std::endl;
         }
     }

     // Get slaves
-    slave_list = loadSlaves(slave_path);
+    slave_list = loadSlaves (slave_path);

     // Get data
-    dataset, unsigned long> training(load_mnist_training_data, data_path);
-    dataset, unsigned long> testing(load_mnist_testing_data, data_path);
-    training = training.split(0, 1000);
-    dataset, unsigned long> local_training = training = training.split_by_group(slave_list.size(), me.number);
+    dataset, unsigned long> training (load_mnist_training_data, data_path);
+    dataset, unsigned long> testing (load_mnist_testing_data, data_path);
+    training = training.split (0, 1000);
+    dataset, unsigned long> local_training = training = training.split_by_group (slave_list.size(), me.number);

     std::cout << local_training.getData().size() << std::endl;

@@ -107,99 +106,106 @@ int main(int argc, char** argv) try
     /*
      * Define net_type (original by dlib)
      */
-    using net_type = loss_multiclass_log<
-        fc<10,
-        relu>
-        >>>>>>>>>>>>;
+    using net_type = loss_multiclass_log <
+        fc<10,
+        relu>
+        >>>>>>>>>> >>;

     net_type net;

-    dnn_trainer trainer(net);
+    dnn_trainer trainer (net);

-    trainer.set_learning_rate(0.01);
-    trainer.set_min_learning_rate(0.00001);
-    trainer.set_mini_batch_size(128);
-    trainer.be_verbose();
+    trainer.set_learning_rate (0.01);
+    trainer.set_min_learning_rate (0.00001);
+    trainer.set_mini_batch_size (128);
+    trainer.be_verbose();

     char sync_filename[30];
-    sprintf(sync_filename, "backup.%d.mm", me.number);
-    trainer.set_synchronization_file(sync_filename, std::chrono::seconds(20));
+    sprintf (sync_filename, "backup.%d.mm", me.number);
+    trainer.set_synchronization_file (sync_filename, std::chrono::seconds (20));

     // HPZ: Setup synchronized protocol and test for the connection availablitiy.
     using trainer_type = dnn_trainer;
-    dnn_worker syncer(&trainer, 0);
-    syncer.set_this_device(me);
-
+    dnn_worker syncer (&trainer, 0);
+    syncer.set_this_device (me);
+
     // TODO: Wait for master connect
-    if(!syncer.wait_for_master_init()){
+    if (!syncer.wait_for_master_init()) {
         std::cerr << "Error happens when master send init message" << std::endl;
-        exit(0);
+        exit (0);
     }

     trainer.isDistributed = 1;
-
+
     // HPZ: Manually check if any problems happened in the init
-    sleep((unsigned int) 0);
+    sleep ((unsigned int) 0);

     int epoch = 0, batch = 0;
     int mark = 0;
     auto time = 0;

-    while(true){
+
+    while (true) {
         mark += 1;
         auto epoch_time = system_clock::now();  // HPZ: Counting
-
-        while(trainer.status_lock.trylock() == 0);
+
+        while (trainer.status_lock.trylock() == 0);
+
         if (trainer.synchronization_status != 3)
             std::cout << "Something wrong with sync lock: current: " << trainer.synchronization_status << "\t Going to set: 0" << std::endl;
+
         trainer.synchronization_status = 0;
         std::cout << "[dnn_master]: init done, may start to train" << std::endl;
         trainer.status_lock.unlock();

-        epoch += trainer.train_one_batch(local_training.getData(), local_training.getLabel());
+        epoch += trainer.train_one_batch (local_training.getData(), local_training.getLabel());

         // Wait for ready
         std::cout << "Im here" << std::endl;
-        while(trainer.synchronization_status != 1) {asm("");}//std::cout<<"wait to sync" << std::endl;}
+
+        while (trainer.synchronization_status != 1) {
+            asm ("");
+        }//std::cout<<"wait to sync" << std::endl;}
+
         // std::cout << "[dnn_master]: start to sync" << std::endl;

-        std::cout << "(train time " << std::chrono::duration_cast(system_clock::now() - epoch_time).count() << std::endl; // HPZ: Counting
+        std::cout << "(train time " << std::chrono::duration_cast (system_clock::now() - epoch_time).count() << std::endl; // HPZ: Counting

         // std::cout << "[Before]" << std::endl;
         // accuracy(net, local_training_images, local_training_labels);
         // accuracy(net, testing_images, testing_labels);
+        sleep ((unsigned int) me.number);
         auto sync_time = system_clock::now();   // HPZ: Counting
         syncer.sn_sync();
-        std::cout << "(sync time " << std::chrono::duration_cast(system_clock::now() - sync_time).count() << std::endl; // HPZ: Counting
-
+        std::cout << "(sync time " << std::chrono::duration_cast (system_clock::now() - sync_time).count() << std::endl; // HPZ: Counting
+
         // serialize(trainer, std::cout);

         // Wait for all devices send back to their paramaters
-        while(trainer.synchronization_status != 3) {}//std::cout <<"wait to update"<
-        std::cout << "Time for batch is " << std::chrono::duration_cast(system_clock::now() - epoch_time).count() << std::endl; // HPZ: Counting
-        time += std::chrono::duration_cast(system_clock::now() - epoch_time).count();
+        while (trainer.synchronization_status != 3) {} //std::cout <<"wait to update"<
+
+        std::cout << "Time for batch is "
+                  << std::chrono::duration_cast (system_clock::now() - epoch_time).count() << std::endl; // HPZ: Counting
+        time += std::chrono::duration_cast (system_clock::now() - epoch_time).count();

         std::cout << trainer.learning_rate << std::endl;

         // std::cout << "[After]" << std::endl;
-        local_training.accuracy(net);
+        local_training.accuracy (net);
         // accuracy(net, testing_images, testing_labels);
-        //
-
-        if(ismaster)
-        {
+        //
+
+        if (ismaster) {
             if (trainer.learning_rate <= 0.001) {
                 std::cout << "---------------------------" << std::endl;
                 std::cout << "|Exit because l_rate |" << std::endl;
                 std::cout << "---------------------------" << std::endl;
-                break;
-            }
+                break;
+            }

             if (epoch >= 60) {
                 std::cout << "---------------------------" << std::endl;
@@ -209,12 +215,13 @@ int main(int argc, char** argv) try
             }
         }
-
+
     }
+
     // trainer.train(training_images, training_labels);

-    local_training.accuracy(net);
-    testing.accuracy(net);
+    local_training.accuracy (net);
+    testing.accuracy (net);

     std::cout << "All time: " << time << std::endl;
     std::cout << trainer << std::endl;
@@ -226,7 +233,7 @@ int main(int argc, char** argv) try
     // about that kind of transient data so that our file will be smaller.  We do this by
     // "cleaning" the network before saving it.
     net.clean();
-    serialize("mnist_network.dat") << net;
+    serialize ("mnist_network.dat") << net;

     // Now if we later wanted to recall the network from disk we can simply say:
     // deserialize("mnist_network.dat") >> net;
@@ -261,10 +268,8 @@ int main(int argc, char** argv) try
     // Finally, you can also save network parameters to XML files if you want to do
     // something with the network in another tool.  For example, you could use dlib's
     // tools/convert_dlib_nets_to_caffe to convert the network to a caffe model.
-    net_to_xml(net, "lenet.xml");
-}
-catch(std::exception& e)
-{
+    net_to_xml (net, "lenet.xml");
+} catch (std::exception &e) {
     cout << e.what() << endl;
 }
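Note on the handshake the patch relies on: the leader's new sync() and the worker loop coordinate through trainer->synchronization_status, which, as far as the hunks above show, cycles 0 -> 1 -> 2 -> 3 (0 while a batch trains, 1 once gradients are ready, 2 once the leader has merged them, 3 once refreshed parameters are available to send back). The stand-alone sketch below only illustrates that ordering; it is not part of the patch and replaces the fork's status_lock mutex and trainer fields with std::atomic and std::thread, and the names sync_status, trainer_side and leader_side are illustrative.

// Minimal sketch of the 0 -> 1 -> 2 -> 3 status handshake (illustrative names, not from the patch).
#include <atomic>
#include <iostream>
#include <thread>

std::atomic<int> sync_status{0}; // 0: training, 1: gradients ready, 2: gradients merged, 3: parameters updated

void trainer_side() {
    // ... run one mini-batch here ...
    sync_status = 1;                                          // gradients are ready for the leader
    while (sync_status != 2) { std::this_thread::yield(); }   // wait until the leader has merged them
    // ... apply merged gradients / refresh parameters here ...
    sync_status = 3;                                          // updated parameters can be sent back
}

void leader_side() {
    while (sync_status != 1) { std::this_thread::yield(); }   // wait for a worker's gradients
    // ... the equivalent of update_gradients() would run here ...
    sync_status = 2;                                          // tell the trainer the merge is done
    while (sync_status != 3) { std::this_thread::yield(); }   // wait for refreshed parameters
    std::cout << "parameters ready to send back\n";
}

int main() {
    std::thread leader(leader_side), trainer(trainer_side);
    leader.join();
    trainer.join();
}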