Skip to content

Commit

Permalink
[log] report average utilization of cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
curiosityyy committed Mar 7, 2019
1 parent 1ce79fc commit 69bb483
Show file tree
Hide file tree
Showing 14 changed files with 143 additions and 36 deletions.
40 changes: 20 additions & 20 deletions conf/workers10.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,71 +2,71 @@
"worker" : [
{
"id" : 0,
"cpu" : 20,
"memory" : 40,
"cpu" : 32,
"memory" : 128,
"disk" : 1,
"network" : 1
},
{
"id" : 1,
"cpu" : 20,
"memory" : 40,
"cpu" : 32,
"memory" : 128,
"disk" : 1,
"network" : 1
},
{
"id" : 2,
"cpu" : 20,
"memory" : 40,
"cpu" : 32,
"memory" : 128,
"disk" : 1,
"network" : 1
},
{
"id" : 3,
"cpu" : 20,
"memory" : 40,
"cpu" : 32,
"memory" : 128,
"disk" : 1,
"network" : 1
},
{
"id" : 4,
"cpu" : 20,
"memory" : 40,
"cpu" : 32,
"memory" : 128,
"disk" : 1,
"network" : 1
},
{
"id" : 5,
"cpu" : 20,
"memory" : 40,
"cpu" : 32,
"memory" : 128,
"disk" : 1,
"network" : 1
},
{
"id" : 6,
"cpu" : 20,
"memory" : 40,
"cpu" : 32,
"memory" : 128,
"disk" : 1,
"network" : 1
},
{
"id" : 7,
"cpu" : 20,
"memory" : 40,
"cpu" : 32,
"memory" : 128,
"disk" : 1,
"network" : 1
},
{
"id" : 8,
"cpu" : 20,
"memory" : 40,
"cpu" : 32,
"memory" : 128,
"disk" : 1,
"network" : 1
},
{
"id" : 9,
"cpu" : 20,
"memory" : 40,
"cpu" : 32,
"memory" : 128,
"disk" : 1,
"network" : 1
}
Expand Down
2 changes: 1 addition & 1 deletion scripts/plot.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def plot(self, size, df, output_file):
ax.set_xlabel(self.xlabel)
ax.set_ylabel(self.ylabel)
ax.set_xlim([0, size-1])
ax.set_ylim([0, 1])
ax.set_ylim([0, 1.5])

# TODO xlim
df.plot(linewidth=self.linewidth, ax=ax)
Expand Down
5 changes: 5 additions & 0 deletions src/job/job.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ class Job {
sg.SetIsNodeManager();
}
}
void SetResourcesReq() {
for (auto &sg : subgraphs_) {
sg.SetResourcesReq();
}
}

friend void from_json(const json &j, Job &job) {
j.at("jobid").get_to(job.job_id_);
Expand Down
3 changes: 1 addition & 2 deletions src/job/subgraph.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,6 @@ class SubGraph {

friend void from_json(const json &j, SubGraph &sg) {
j.at("shardtask").get_to(sg.shard_tasks_);
sg.SetResourcesReq();
sg.memory_ = sg.GetMemoryCap();
}

double GetMemoryCap() {
Expand Down Expand Up @@ -173,6 +171,7 @@ class SubGraph {
}

void SetResourcesReq() {
memory_ = GetMemoryCap();
if (is_worker_) {
for (auto &st : shard_tasks_) {
if (st.GetResourceType() == ResourceType::kCPU) {
Expand Down
4 changes: 3 additions & 1 deletion src/job_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class JobManager : public EventHandler {
if (executor == "YARN") {
job_.SetIsNodeManager();
}
job_.SetResourcesReq();
RegisterHandlers();
BuildDependencies();
}
Expand Down Expand Up @@ -132,7 +133,8 @@ class JobManager : public EventHandler {
subgraph_finished_task_[subgraph_id]++;
// subgraph finish update memory
if (sg.GetShardTasks().size() == subgraph_finished_task_[subgraph_id]) {
workers_.at(sg.GetWorkerID())->SubGraphFinish(time, sg.GetMemory());
workers_.at(sg.GetWorkerID())
->SubGraphFinish(time, sg.GetResourcePack());
users_->at(job_.GetUserID()).SubGraphFinish(time, sg.GetMemory());
DLOG(INFO) << "finish subgraph: " << subgraph_id
<< " of job id: " << sg.GetJobID();
Expand Down
36 changes: 36 additions & 0 deletions src/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <chrono>
#include <ctime>
#include <fstream>
#include <string>

Expand All @@ -26,6 +28,7 @@
using json = nlohmann::json;

json ReadJsonFromFile(const std::string &file) {
auto read_start = std::chrono::system_clock::now();
std::ifstream in(file, std::ios::in);
CHECK(in.is_open()) << "Cannot open json file " << file;
std::string ret;
Expand All @@ -34,6 +37,10 @@ json ReadJsonFromFile(const std::string &file) {
in.seekg(0, std::ios::beg);
in.read(&ret[0], ret.size());
in.close();
auto read_end = std::chrono::system_clock::now();
std::cout << "Reading " << file << " uses time : "
<< std::chrono::duration<double>(read_end - read_start).count()
<< std::endl;
return json::parse(ret);
}

Expand All @@ -59,13 +66,42 @@ int main(int argc, char **argv) {
FLAGS_logbuflevel = -1;

LOG(INFO) << "start simulation";
auto init_start = std::chrono::system_clock::now();
axe::simulation::Simulator simulator(ReadJsonFromFile(argv[1]),
ReadJsonFromFile(argv[2]), argv[4]);
SetAlgorithm(ReadJsonFromFile(argv[3]));
auto init_end = std::chrono::system_clock::now();
std::cout << "Init uses time : "
<< std::chrono::duration<double>(init_end - init_start).count()
<< std::endl;

auto init2_start = std::chrono::system_clock::now();
simulator.Init(argv[4]);
auto init2_end = std::chrono::system_clock::now();
std::cout << "Init2 uses time : "
<< std::chrono::duration<double>(init2_end - init2_start).count()
<< std::endl;

/*
auto print_start = std::chrono::system_clock::now();
simulator.Print();
auto print_end = std::chrono::system_clock::now();
std::cout << "Print uses time : " << std::chrono::duration<double>(print_end -
print_start).count() << std::endl;
*/

auto serve_start = std::chrono::system_clock::now();
simulator.Serve();
auto serve_end = std::chrono::system_clock::now();
std::cout << "Serving use time : "
<< std::chrono::duration<double>(serve_end - serve_start).count()
<< std::endl;
auto report_start = std::chrono::system_clock::now();
simulator.Report();
auto report_end = std::chrono::system_clock::now();
std::cout << "Reportation use time : "
<< std::chrono::duration<double>(report_end - report_start).count()
<< std::endl;
LOG(INFO) << "simulation end";

google::FlushLogFiles(google::INFO);
Expand Down
1 change: 1 addition & 0 deletions src/resource/resource.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class ResourcePack {
void SetNetwork(double network) {
resource_[static_cast<int>(ResourceType::kNetwork)] = network;
}
// Stores `value` into resource_ at position `idx`. The index is used as-is:
// no range check is performed here, so callers must pass a valid slot
// (the sibling setters index resource_ with static_cast<int>(ResourceType)).
void SetResourceByIndex(int idx, double value) {
  resource_[idx] = value;
}

ResourcePack Add(const ResourcePack &rhs) const {
ResourcePack result;
Expand Down
41 changes: 40 additions & 1 deletion src/simulator.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,51 @@ class Simulator {
}

void Report() {
average_records_.resize(100000);
for (int i = 0; i < 100000; i++) {
average_records_.at(i).resize(4);
}
std::string prefix = "report/worker_";
std::string suffix = ".csv";
for (int i = 0; i < workers_abstract_.size(); ++i) {
std::ofstream fout(prefix + std::to_string(i) + suffix, std::ios::out);
workers_abstract_[i]->ReportUtilization(fout);
workers_abstract_[i]->ReportUtilization(fout, average_records_);
fout.close();
std::cout << "average usage of worker " << i
<< " CPU : " << workers_abstract_[i]->GetAverageUsage()[0]
<< ", Memory : " << workers_abstract_[i]->GetAverageUsage()[1]
<< ", Disk : " << workers_abstract_[i]->GetAverageUsage()[2]
<< ", Network : " << workers_abstract_[i]->GetAverageUsage()[3]
<< "\n";
}
std::ofstream fout(std::string("report/cluster_average") + suffix,
std::ios::out);
fout << "#CPU"
<< "\t"
<< "MEMORY"
<< "\t"
<< "DISK"
<< "\t"
<< "NETWORK" << std::endl;

int worker_num = workers_abstract_.size();
std::vector<double> average_;
average_.resize(4);
for (int i = 0; i < 100000; i++) {
for (int j = 0; j < average_records_.at(i).size(); j++) {
average_[j] += average_records_[i][j] / worker_num;
fout << (average_records_[i][j] / worker_num);
if (j < average_records_.at(i).size() - 1)
fout << "\t";
}
fout << std::endl;
}
fout.close();
std::cout << "average usage of cluster "
<< " CPU : " << average_[0] / 100000
<< ", Memory : " << average_[1] / 100000
<< ", Disk : " << average_[2] / 100000
<< ", Network : " << average_[3] / 100000 << "\n";
scheduler_->Report();
prefix = "report/user_";
for (int i = 0; i < users_->size(); ++i) {
Expand Down Expand Up @@ -168,6 +206,7 @@ class Simulator {

private:
bool is_worker_ = true;
std::vector<std::vector<double>> average_records_;
ResourcePack cluster_resource_capacity_;
std::vector<Job> jobs_;
std::shared_ptr<Scheduler> scheduler_;
Expand Down
2 changes: 1 addition & 1 deletion src/worker/node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ class NodeManager : public WorkerAbstract {
records_.insert(GenerateUtilizationRecord(0));
worker_id_ = worker_id;
invalid_event_id_set_ = invalid_event_id_set;
CHECK(false) << "stop here";
worker_cpu_ = WorkerCPU(worker_id_, resource_capacity_, resource_usage_,
resource_reservation_, false);
worker_disk_ =
Expand Down Expand Up @@ -69,6 +68,7 @@ class NodeManager : public WorkerAbstract {
// Reports whether the current reservation plus `resource` would still fit
// within this node's capacity. Only the sum returned by Add() is tested;
// this method does not assign to resource_reservation_.
bool TryToReserve(ResourcePack resource) {
  auto projected_reservation = resource_reservation_->Add(resource);
  return projected_reservation.FitIn(*resource_capacity_);
}

friend void from_json(const json &j, std::shared_ptr<NodeManager> &worker) {
worker = std::make_shared<NodeManager>();
worker->resource_capacity_ = std::make_shared<ResourcePack>();
Expand Down
10 changes: 6 additions & 4 deletions src/worker/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,11 @@ class Worker : public WorkerAbstract {
}

// Subgraph finished: release its memory reservation and record utilization.
void SubGraphFinish(double time, double mem) {
DLOG(INFO) << "sg finish, release memory reservation: " << mem;
resource_reservation_->SetMemory(resource_reservation_->GetMemory() - mem);
void SubGraphFinish(double time, ResourcePack resource) {
DLOG(INFO) << "sg finish, release memory reservation: "
<< resource.GetMemory();
resource_reservation_->SetMemory(resource_reservation_->GetMemory() -
resource.GetMemory());
records_.insert(GenerateUtilizationRecord(time));
}

Expand Down Expand Up @@ -110,7 +112,7 @@ class Worker : public WorkerAbstract {
j.get_to(*(worker->resource_capacity_));
worker->resource_maximum_reservation_ = std::make_shared<ResourcePack>();
*(worker->resource_maximum_reservation_) = {
2000, worker->resource_capacity_->GetMemory(), 2000, 2000};
50000000, worker->resource_capacity_->GetMemory(), 1000000, 1000000};
worker->records_.insert(worker->GenerateUtilizationRecord(0));
}

Expand Down
Loading

0 comments on commit 69bb483

Please sign in to comment.