SINGA-135 Improve hybrid partitioning #154

Open · wants to merge 1 commit into base: master
175 changes: 97 additions & 78 deletions src/neuralnet/neuralnet.cc
@@ -19,7 +19,6 @@
*
*************************************************************/
-

#include "singa/neuralnet/neuralnet.h"
#include <unordered_map>
#include <algorithm>
@@ -132,15 +131,13 @@ NeuralNet* NeuralNet::Create(const NetProto& net_conf, Phase phase,
const NetProto NeuralNet::Unrolling(const NetProto& net_conf) {
// Step 1: Unroll each layer & set parameter sharing
NetProto conf;
-
std::vector<std::vector<int>> layer_groups;
std::unordered_map<string, int> org_layer_names;
- for (int index = 0; index < net_conf.layer_size(); index ++) {
+ for (int index = 0; index < net_conf.layer_size(); index++) {
const LayerProto& org_layer = net_conf.layer(index);
org_layer_names[org_layer.name()] = index; // layer_name -> index
-
std::vector<int> layer_group;
- for (int i = 0; i < org_layer.unroll_len(); i ++) { // unroll
+ for (int i = 0; i < org_layer.unroll_len(); i++) { // unroll
LayerProto* unroll_layer = conf.add_layer();
unroll_layer->CopyFrom(org_layer); // create a new layer conf
// update layer names
@@ -149,7 +146,7 @@ const NetProto NeuralNet::Unrolling(const NetProto& net_conf) {
unroll_layer->set_name(sstm.str());
unroll_layer->set_unroll_index(i);
// update layer parameter sharing
- for (int j = 0; j < unroll_layer->param_size(); j ++) {
+ for (int j = 0; j < unroll_layer->param_size(); j++) {
ParamProto* param = unroll_layer->mutable_param(j);
if (i > 0) {
param->set_share_from("0#" + param->name());
@@ -163,30 +160,27 @@ const NetProto NeuralNet::Unrolling(const NetProto& net_conf) {
unroll_layer->clear_unroll_conn_type();
unroll_layer->clear_shift();
unroll_layer->clear_srclayers();
-
layer_group.push_back(conf.layer_size() - 1);
// LOG(ERROR) << "unrolling layer " << unroll_layer->name();
}
layer_groups.push_back(layer_group);
}
// Step 2: Connect unrolled layers by setting `srclayers`
- for (int index = 0; index < net_conf.layer_size(); index ++) {
+ for (int index = 0; index < net_conf.layer_size(); index++) {
const LayerProto& org_layer = net_conf.layer(index);
if (org_layer.srclayers_size() == 0)
continue; // no src layer
- for (int i = 0; i < org_layer.srclayers_size(); i ++) {
+ for (int i = 0; i < org_layer.srclayers_size(); i++) {
const string& org_layer_src = org_layer.srclayers(i);
singa::UnrollConnType unroll_conn_type = kUnrollOneToOne;
if (i < org_layer.unroll_conn_type_size())
unroll_conn_type = org_layer.unroll_conn_type(i);
unsigned int shift = 0;
if (i < org_layer.shift_size())
shift = org_layer.shift(i);
-
const std::vector<int> unroll_layer_srcs
= layer_groups[org_layer_names[org_layer_src]];
-
- for (unsigned int j = 0; j < layer_groups[index].size(); j ++) {
+ for (unsigned int j = 0; j < layer_groups[index].size(); j++) {
LayerProto* unroll_layer = conf.mutable_layer(layer_groups[index][j]);
// Update src layers of `unroll_layer` by considering the types
if (unroll_conn_type == kUnrollOneToAll) {
@@ -205,10 +199,9 @@ const NetProto NeuralNet::Unrolling(const NetProto& net_conf) {
}
}
}
-
// TODO(fanju): add LSTM when it is ready
if (org_layer.type() == kGRU) { // connect GRU layers
- for (unsigned int j = 1; j < layer_groups[index].size(); j ++) {
+ for (unsigned int j = 1; j < layer_groups[index].size(); j++) {
LayerProto* unroll_layer = conf.mutable_layer(layer_groups[index][j]);
string srcname = conf.layer(layer_groups[index][j-1]).name();
unroll_layer->add_srclayers(srcname);
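
For reference, a minimal sketch of what the unrolling step produces, assuming the singa namespace and the generated job.pb.h setters; the `i#name` prefix convention is inferred from the `share_from("0#" + param->name())` call above, so treat the exact names as illustrative:

```cpp
// Hypothetical example, not part of this patch.
NetProto net_conf;
LayerProto* gru = net_conf.add_layer();
gru->set_name("gru");
gru->set_type(kGRU);
gru->set_unroll_len(3);            // unroll into 3 copies
gru->add_param()->set_name("w");

NetProto unrolled = NeuralNet::Unrolling(net_conf);
// Expected layers, one per unroll step:
//   "0#gru" (unroll_index 0) owns param "w"
//   "1#gru" (unroll_index 1) param w: share_from = "0#w"
//   "2#gru" (unroll_index 2) param w: share_from = "0#w"
// Step 2 then fills in srclayers; because the type is kGRU, copy j also
// gets copy j-1 as a src layer, chaining "0#gru" -> "1#gru" -> "2#gru".
```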
@@ -218,31 +211,31 @@
return conf;
}


NeuralNet::NeuralNet(NetProto netproto, int npartitions) {
LOG(INFO) << "Constructing NeuralNet...";
auto graph = CreateGraph(netproto, npartitions);
CreateNetFromGraph(graph);
PrepareDataStructures();

for (Node* node : graph->nodes())
delete static_cast<LayerProto*>(node->proto);
delete graph;
LOG(INFO) << "NeuralNet Constructed";
unroll_len_ = netproto.unroll_len();
LOG(INFO) << "NeuralNet Constructed";
}

NeuralNet::~NeuralNet() {
for (auto layer : layers_)
delete layer;
}

void NeuralNet::Load(const vector<string>& paths) {
unordered_map<string, Param*> params;
for (auto p : params_) {
params[p->name()] = p;
}
Load(paths, params);
}

void NeuralNet::Load(const vector<string>& paths,
const unordered_map<string, Param*>& params) {
for (const auto path : paths) {
@@ -360,24 +353,51 @@ NetProto NeuralNet::AddPartitionConnectionLayers(const NetProto& netproto,
}
/*
* Add Slice, Concate, Split Layers for Model Partition
*
*
* All cases are as follows:
- * src_pdim | dst_pdim | connection_type | Action
- *     0    |     0    |    OneToOne     | Direct Connection
- *     1    |     1    |    OneToOne     | Direct Connection
- *     0    |     0    |    OneToAll     | Direct Connection
- *     1    |     0    |    OneToOne     | Slice -> Concate
- *     0    |     1    |    OneToOne     | Slice -> Concate
- *     1    |     0    |    OneToAll     | Slice -> Concate
- *     0    |     1    |    OneToAll     | Split -> Concate
- *     1    |     1    |    OneToAll     | Split -> Concate
+ * src_pdim | dst_pdim | connection_type(+) | Action
+ *     0    |     0    |      OneToOne      | Direct Connection
+ *     1    |     1    |      OneToOne      | Direct Connection
+ *     0    |     0    |      OneToAll      | Direct Connection
+ *     1    |     0    |      OneToOne      | Slice -> Concate
+ *     0    |     1    |      OneToOne      | Slice -> Concate
+ *     1    |     0    |      OneToAll      | Slice -> Concate
+ *     0    |     1    |      OneToAll      | Split -> Concate
+ *     1    |     1    |      OneToAll      | Split -> Concate
+ * (include non-partitioned cases)
+ *    -1    |    -1    |      OneToOne      | Direct Connection
+ *    -1    |    -1    |      OneToAll      | Direct Connection
+ *     0    |    -1    |      OneToOne      | Concate
+ *     0    |    -1    |      OneToAll      | Concate
+ *     1    |    -1    |      OneToOne      | Concate
+ *     1    |    -1    |      OneToAll      | Concate
+ *    -1    |     0    |      OneToOne      | Slice
+ *    -1    |     0    |      OneToAll      | Slice
+ *    -1    |     1    |      OneToOne      | Slice
+ *    -1    |     1    |      OneToAll      | Split
+ * (+)
+ * OneToOne: each src connects to one dst
+ * OneToAll: each src connects to all dst
*
- * Logic:
- * dst_pdim = 1 && OneToAll ?
- *   (YES) Split -> Concate
- *   (NO) src_pdim = dst_pdim ?
- *     (YES) Direct Connection
- *     (NO) Slice -> Concate
+ * For each pair of (src, dst), we determine whether to add connection layers:
+ *   src -> slice/split (*optional) -> concate (*optional) -> dst
+ *
+ * The slice/split layer and the concate layer can be determined independently:
+ *
+ * + Add Slice/Split Layer
+ *   IF src_pdim != dst_pdim AND dst_pdim != -1 ?
+ *   - (YES) IF dst_pdim = 1 AND OneToAll ?
+ *     - (YES) Split
+ *     - (NO) Slice
+ *   - (NO) No Action
+ *
+ * + Add Concate Layer
+ *   IF src_pdim != dst_pdim AND src_pdim != -1 ?
+ *   - (YES) Add Concate
+ *   - (NO) IF src_pdim = dst_pdim = 1 AND OneToAll ?
+ *     - (YES) Add Concate
+ *     - (NO) No Action
*/
for (const LayerProto& origin_layer : netproto.layer()) {
LayerProto* dst_layer = name2proto[origin_layer.name()];
@@ -386,57 +406,55 @@ NetProto NeuralNet::AddPartitionConnectionLayers(const NetProto& netproto,
for (const string& src_name : origin_layer.srclayers()) {
LayerProto* src_layer = name2proto[src_name];
int src_pdim = src_layer->partition_dim();
- // dst_pdim = 1 && OneToAll ?
- if (dst_pdim == 1 && connection == kOneToAll) {
- // add split layer
- LayerProto* split_layer = net_w_connection.add_layer();
- split_layer->set_name(splitName(src_layer->name()));
- split_layer->set_type(kSplit);
- split_layer->set_partition_dim(src_layer->partition_dim());
- split_layer->add_srclayers(src_layer->name());
- split_layer->mutable_split_conf()->set_num_splits(npartitions);
- // add concate layer
- LayerProto* concate_layer = net_w_connection.add_layer();
- concate_layer->set_name(concateName(split_layer->name()));
- concate_layer->set_type(kConcate);
- concate_layer->set_partition_dim(dst_layer->partition_dim());
- // concate on src_pdim
- concate_layer->mutable_concate_conf()
- ->set_concate_dim(src_layer->partition_dim());
- concate_layer->mutable_concate_conf()->set_num_concates(npartitions);
- concate_layer->add_srclayers(split_layer->name());
- // connect dst_layer to concate layer
- dst_layer->add_srclayers(concate_layer->name());
- } else {
- // src_pdim = dst_pdim ?
- if (dst_pdim == src_pdim) {
- // direct connection
- dst_layer->add_srclayers(src_layer->name());
+ // create tmp_src and tmp_dst layers, they will be connected at the end
+ // tmp_src could be original src or split or slice
+ // tmp_dst could be original dst or concate
+ LayerProto* tmp_src = src_layer;
+ LayerProto* tmp_dst = dst_layer;
+ // Add Slice/Split Layer
+ // IF src_pdim != dst_pdim AND dst_pdim != -1 ?
+ if (src_pdim != dst_pdim && dst_pdim != -1) {
+ // IF dst_pdim = 1 AND OneToAll ?
+ if (dst_pdim == 1 && connection == kOneToAll) {
+ // add split layer
+ LayerProto* split_layer = net_w_connection.add_layer();
+ split_layer->set_name(splitName(src_layer->name()));
+ split_layer->set_type(kSplit);
+ split_layer->set_partition_dim(src_layer->partition_dim());
+ split_layer->mutable_split_conf()->set_num_splits(npartitions);
+ split_layer->add_srclayers(src_layer->name());
+ tmp_src = split_layer;
+ } else {
} else {
// add slice layer
LayerProto* slice_layer = net_w_connection.add_layer();
slice_layer->set_name(sliceName(src_layer->name()));
slice_layer->set_type(kSlice);
slice_layer->set_partition_dim(src_layer->partition_dim());
- // slice on dst_pdim
- slice_layer->mutable_slice_conf()->set_num_slices(npartitions);
slice_layer->mutable_slice_conf()
->set_slice_dim(dst_layer->partition_dim());
+ slice_layer->mutable_slice_conf()->set_num_slices(npartitions);
slice_layer->add_srclayers(src_layer->name());
- // add concate layer
- LayerProto* concate_layer = net_w_connection.add_layer();
- concate_layer->set_name(concateName(slice_layer->name()));
- concate_layer->set_type(kConcate);
- concate_layer->set_partition_dim(dst_layer->partition_dim());
- // concate on src_pdim
- concate_layer->mutable_concate_conf()
- ->set_concate_dim(src_layer->partition_dim());
- concate_layer->mutable_concate_conf()->set_num_concates(npartitions);
- concate_layer->add_srclayers(slice_layer->name());
- // connect dst_layer to concate layer
- dst_layer->add_srclayers(concate_layer->name());
+ tmp_src = slice_layer;
}
}
+ // Add Concate Layer
+ // IF src_pdim != dst_pdim AND src_pdim != -1 ?
+ // IF src_pdim = dst_pdim = 1 AND OneToAll ?
+ if ((src_pdim != dst_pdim && src_pdim != -1)
+ || (src_pdim == 1 && dst_pdim == 1 && connection == kOneToAll)) {
+ // add concate layer
+ LayerProto* concate_layer = net_w_connection.add_layer();
+ concate_layer->set_name(concateName(tmp_src->name()));
+ concate_layer->set_type(kConcate);
+ concate_layer->set_partition_dim(dst_layer->partition_dim());
+ concate_layer->mutable_concate_conf()
+ ->set_concate_dim(src_layer->partition_dim());
+ concate_layer->mutable_concate_conf()->set_num_concates(npartitions);
+ dst_layer->add_srclayers(concate_layer->name());
+ tmp_dst = concate_layer;
+ }
+ // Connect tmp_src and tmp_dst
+ tmp_dst->add_srclayers(tmp_src->name());
}
}
LOG(INFO) << "NeuralNet Config After Adding Connection Layers is\n"
@@ -448,21 +466,22 @@ Graph* NeuralNet::CreateGraph(const NetProto& netproto, int npartitions) {
NetProto net_w_split = AddModelSplitLayers(netproto);
NetProto net_w_connection =
AddPartitionConnectionLayers(net_w_split, npartitions);
- // for each original layer proto, create #npartitions of nodes
+ // for each original layer proto, create #npart of nodes
Graph* graph = new Graph();
map<string, vector<Node*>> name2nodes;
map<string, const LayerProto*> name2proto;
for (const LayerProto& layer : net_w_connection.layer()) {
vector<Node*> nodes;
- for (int i = 0; i < npartitions; i++) {
+ int npart = layer.partition_dim() == -1 ? 1 : npartitions;
+ for (int i = 0; i < npart; i++) {
LayerProto *proto = new LayerProto(layer);
// differentiate partitions
string nodename = layer.name() + "@" + std::to_string(i);
proto->set_name(nodename);
proto->set_type(layer.type());
proto->set_partition_dim(layer.partition_dim());
proto->set_partition_id(i);
- proto->set_num_partitions(npartitions);
+ proto->set_num_partitions(npart);
Node* node = graph->AddNode(nodename, layer.name(), i, proto);
nodes.push_back(node);
// TODO(wangwei) update param name
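
With the npart change above, a layer whose partition_dim is -1 now yields a single graph node instead of npartitions copies. A self-contained sketch of the resulting node naming, with assumed layer names and npartitions = 2:

```cpp
#include <iostream>

// Hypothetical illustration of node naming in CreateGraph.
int main() {
  const int npartitions = 2;
  struct L { const char* name; int partition_dim; };
  const L layers[] = {{"fc", 0}, {"loss", -1}};
  for (const L& layer : layers) {
    int npart = layer.partition_dim == -1 ? 1 : npartitions;
    for (int i = 0; i < npart; i++)
      std::cout << layer.name << "@" << i << "\n";  // fc@0 fc@1 loss@0
  }
  return 0;
}
```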
@@ -475,7 +494,8 @@ Graph* NeuralNet::CreateGraph(const NetProto& netproto, int npartitions) {
vector<Node*> dst_nodes = name2nodes[origin_layer.name()];
for (const string& src_name : origin_layer.srclayers()) {
vector<Node*> src_nodes = name2nodes[src_name];
- if (origin_layer.type() != kConcate) {
+ if (src_nodes.size() > 1 && dst_nodes.size() > 1
+ && origin_layer.type() != kConcate) {
for (size_t i = 0; i < src_nodes.size(); ++i) {
CHECK_EQ(src_nodes[i]->partition_id, i);
CHECK_EQ(dst_nodes[i]->partition_id, i);
@@ -548,7 +568,7 @@ void NeuralNet::CreateNetFromGraph(Graph* graph) {
LOG(INFO) << "constructing graph: " << node->name;
auto layer = name2layer(node->name);
layer->Setup(*(static_cast<LayerProto*>(node->proto)), srclayers(layer));
DLOG(INFO) << "constructing graph: " << layer->name();
// DLOG(INFO) << "constructing graph: " << layer->name();
layerinfo[layer->name()] = IntVecToString(layer->data(nullptr).shape());
for (auto param : layer->GetParams()) {
param->set_id(paramid++);
@@ -580,7 +600,6 @@ void NeuralNet::CreateNetFromGraph(Graph* graph) {
}
}
}
-
// share params due to laye unrolling
for (auto & entry : name2param) {
Param* param = entry.second;
2 changes: 1 addition & 1 deletion src/worker.cc
@@ -77,7 +77,7 @@ void Worker::Run() {
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
LOG(ERROR) << "Worker (group = " << grp_id_ <<", id = " << id_ << ") "
<< " start on " << (device >= 0 ? "GPU " + std::to_string(device) : "CPU");
<< "start on " << (device >= 0 ? "GPU " + std::to_string(device) : "CPU");
if (device >= 0)
context->ActivateDevice(device);
