Skip to content

Commit

Permalink
Merge pull request #4 from nlnwa/more_metrics
Browse files Browse the repository at this point in the history
Fixed job metrics
  • Loading branch information
johnerikhalse authored Mar 8, 2019
2 parents b50dd14 + 766f835 commit e9a0bb1
Showing 1 changed file with 92 additions and 46 deletions.
138 changes: 92 additions & 46 deletions exporter/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,53 +161,99 @@ func (e *exporter) collectJobStatusJob() {
}

func (e *exporter) collectJobStatus() {
jobStates, err := r.Table("job_executions").
GetAllByIndex("state", "RUNNING").
EqJoin("jobId", r.Table("config")).Without(map[string]interface{}{"right": "id"}).Zip().
jobStates, err := r.Table("config").Filter(map[string]interface{}{"kind": "crawlJob"}).
Map(func(d r.Term) interface{} {
return map[string]interface{}{
"name": d.Field("meta").Field("name"),
"state": d.Field("state"),
"id": d.Field("id"),}
}).
Merge(func(d r.Term) interface{} {
return r.Table("executions").Map(func(doc r.Term) interface{} {
return map[string]interface{}{
"ABORTED_MANUAL": r.Branch(doc.Field("state").Eq("ABORTED_MANUAL"), 1, 0),
"ABORTED_SIZE": r.Branch(doc.Field("state").Eq("ABORTED_SIZE"), 1, 0),
"ABORTED_TIMEOUT": r.Branch(doc.Field("state").Eq("ABORTED_TIMEOUT"), 1, 0),
"CREATED": r.Branch(doc.Field("state").Eq("CREATED"), 1, 0),
"FAILED": r.Branch(doc.Field("state").Eq("FAILED"), 1, 0),
"FETCHING": r.Branch(doc.Field("state").Eq("FETCHING"), 1, 0),
"FINISHED": r.Branch(doc.Field("state").Eq("FINISHED"), 1, 0),
"SLEEPING": r.Branch(doc.Field("state").Eq("SLEEPING"), 1, 0),
"documentsCrawled": doc.Field("documentsCrawled").Default(0),
"documentsDenied": doc.Field("documentsDenied").Default(0),
"documentsFailed": doc.Field("documentsFailed").Default(0),
"documentsOutOfScope": doc.Field("documentsOutOfScope").Default(0),
"documentsRetried": doc.Field("documentsRetried").Default(0),
"urisCrawled": doc.Field("urisCrawled").Default(0),
"bytesCrawled": doc.Field("bytesCrawled").Default(0),
}
}).Reduce(func(left, right r.Term) interface{} {
return map[string]interface{}{
"ABORTED_MANUAL": left.Field("ABORTED_MANUAL").Add(right.Field("ABORTED_MANUAL")),
"ABORTED_SIZE": left.Field("ABORTED_SIZE").Add(right.Field("ABORTED_SIZE")),
"ABORTED_TIMEOUT": left.Field("ABORTED_TIMEOUT").Add(right.Field("ABORTED_TIMEOUT")),
"CREATED": left.Field("CREATED").Add(right.Field("CREATED")),
"FAILED": left.Field("FAILED").Add(right.Field("FAILED")),
"FETCHING": left.Field("FETCHING").Add(right.Field("FETCHING")),
"FINISHED": left.Field("FINISHED").Add(right.Field("FINISHED")),
"SLEEPING": left.Field("SLEEPING").Add(right.Field("SLEEPING")),
"documentsCrawled": left.Field("documentsCrawled").Add(right.Field("documentsCrawled")),
"documentsDenied": left.Field("documentsDenied").Add(right.Field("documentsDenied")),
"documentsFailed": left.Field("documentsFailed").Add(right.Field("documentsFailed")),
"documentsOutOfScope": left.Field("documentsOutOfScope").Add(right.Field("documentsOutOfScope")),
"documentsRetried": left.Field("documentsRetried").Add(right.Field("documentsRetried")),
"urisCrawled": left.Field("urisCrawled").Add(right.Field("urisCrawled")),
"bytesCrawled": left.Field("bytesCrawled").Add(right.Field("bytesCrawled")),
}
})
return r.Table("job_executions").
OrderBy(r.OrderByOpts{Index: r.Desc("jobId_startTime")}).
Between([]r.Term{d.Field("id"), r.MinVal}, []r.Term{d.Field("id"), r.MaxVal}).
Limit(1).
Map(func(doc r.Term) interface{} {
return doc.Field("executionsState").
Do(func(d2 r.Term) interface{} {
return d2.ConcatMap(func(d3 r.Term) interface{} {
return d3.CoerceTo("array")
}).CoerceTo("object")
}).Default(map[string]interface{}{}).
Merge(
map[string]interface{}{
"documentsCrawled": doc.Field("documentsCrawled").Default(0),
"documentsDenied": doc.Field("documentsDenied").Default(0),
"documentsFailed": doc.Field("documentsFailed").Default(0),
"documentsOutOfScope": doc.Field("documentsOutOfScope").Default(0),
"documentsRetried": doc.Field("documentsRetried").Default(0),
"urisCrawled": doc.Field("urisCrawled").Default(0),
"bytesCrawled": doc.Field("bytesCrawled").Default(0),
"state": doc.Field("state").Default("UNDEFINED"),
"jobExecutionId": doc.Field("id").Default(""),
})
}).Nth(0).
//Reduce(func(left, right r.Term) interface{} { return left }).
Default(map[string]interface{}{
"ABORTED_MANUAL": 0,
"ABORTED_SIZE": 0,
"ABORTED_TIMEOUT": 0,
"CREATED": 0,
"FAILED": 0,
"FETCHING": 0,
"FINISHED": 0,
"SLEEPING": 0,
"documentsCrawled": 0,
"documentsDenied": 0,
"documentsFailed": 0,
"documentsOutOfScope": 0,
"documentsRetried": 0,
"urisCrawled": 0,
"bytesCrawled": 0,
"state": "UNDEFINED",
}).
Merge(map[string]interface{}{"name": d.Field("meta").Field("name"),}).
Do(func(doc r.Term) interface{} {
return r.Branch(doc.Field("state").Eq("RUNNING"),
doc.Merge(func(d r.Term) interface{} {
return r.Table("executions").
Between([]r.Term{d.Field("jobExecutionId"), r.MinVal}, []r.Term{d.Field("jobExecutionId"), r.MaxVal}, r.BetweenOpts{
Index: "jobExecutionId_seedId",
}).
Map(func(doc r.Term) interface{} {
return map[string]interface{}{
"ABORTED_MANUAL": r.Branch(doc.Field("state").Eq("ABORTED_MANUAL"), 1, 0),
"ABORTED_SIZE": r.Branch(doc.Field("state").Eq("ABORTED_SIZE"), 1, 0),
"ABORTED_TIMEOUT": r.Branch(doc.Field("state").Eq("ABORTED_TIMEOUT"), 1, 0),
"CREATED": r.Branch(doc.Field("state").Eq("CREATED"), 1, 0),
"FAILED": r.Branch(doc.Field("state").Eq("FAILED"), 1, 0),
"FETCHING": r.Branch(doc.Field("state").Eq("FETCHING"), 1, 0),
"FINISHED": r.Branch(doc.Field("state").Eq("FINISHED"), 1, 0),
"SLEEPING": r.Branch(doc.Field("state").Eq("SLEEPING"), 1, 0),
"documentsCrawled": doc.Field("documentsCrawled").Default(0),
"documentsDenied": doc.Field("documentsDenied").Default(0),
"documentsFailed": doc.Field("documentsFailed").Default(0),
"documentsOutOfScope": doc.Field("documentsOutOfScope").Default(0),
"documentsRetried": doc.Field("documentsRetried").Default(0),
"urisCrawled": doc.Field("urisCrawled").Default(0),
"bytesCrawled": doc.Field("bytesCrawled").Default(0),
}
}).Reduce(func(left, right r.Term) interface{} {
return map[string]interface{}{
"ABORTED_MANUAL": left.Field("ABORTED_MANUAL").Add(right.Field("ABORTED_MANUAL")),
"ABORTED_SIZE": left.Field("ABORTED_SIZE").Add(right.Field("ABORTED_SIZE")),
"ABORTED_TIMEOUT": left.Field("ABORTED_TIMEOUT").Add(right.Field("ABORTED_TIMEOUT")),
"CREATED": left.Field("CREATED").Add(right.Field("CREATED")),
"FAILED": left.Field("FAILED").Add(right.Field("FAILED")),
"FETCHING": left.Field("FETCHING").Add(right.Field("FETCHING")),
"FINISHED": left.Field("FINISHED").Add(right.Field("FINISHED")),
"SLEEPING": left.Field("SLEEPING").Add(right.Field("SLEEPING")),
"documentsCrawled": left.Field("documentsCrawled").Add(right.Field("documentsCrawled")),
"documentsDenied": left.Field("documentsDenied").Add(right.Field("documentsDenied")),
"documentsFailed": left.Field("documentsFailed").Add(right.Field("documentsFailed")),
"documentsOutOfScope": left.Field("documentsOutOfScope").Add(right.Field("documentsOutOfScope")),
"documentsRetried": left.Field("documentsRetried").Add(right.Field("documentsRetried")),
"urisCrawled": left.Field("urisCrawled").Add(right.Field("urisCrawled")),
"bytesCrawled": left.Field("bytesCrawled").Add(right.Field("bytesCrawled")),
}
})
}),
doc)
})
}).
Run(e.conn.DbSession)

Expand Down

0 comments on commit e9a0bb1

Please sign in to comment.