Skip to content

Commit

Permalink
Merge pull request #48 from nlnwa/priority
Browse files Browse the repository at this point in the history
Ensure all seeds get fair scheduling
  • Loading branch information
maeb authored Jun 11, 2021
2 parents 1bb0c5c + f7f56da commit 9416745
Show file tree
Hide file tree
Showing 13 changed files with 253 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -560,9 +560,16 @@ public JobExecutionStatus getTempJobExecutionStatus(JedisContext ctx, String job
return jobExecutionGetScript.run(ctx, jobExecutionId);
}

public void updateJobExecutionStatus(String jobExecutionId, State oldState, State newState, CrawlExecutionStatusChangeOrBuilder change) {
/**
* @param jobExecutionId
* @param oldState
* @param newState
* @param change
* @return true if job is running
*/
public Boolean updateJobExecutionStatus(String jobExecutionId, State oldState, State newState, CrawlExecutionStatusChangeOrBuilder change) {
try (JedisContext ctx = JedisContext.forPool(jedisPool)) {
jobExecutionUpdateScript.run(ctx, jobExecutionId, oldState, newState, change);
return jobExecutionUpdateScript.run(ctx, jobExecutionId, oldState, newState, change);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,11 @@ public void run() {
JobExecutionStatus tjes = frontier.getCrawlQueueManager().getTempJobExecutionStatus(ctx, jobExecutionId);
try {
conn.exec("db-saveJobExecutionStatus",
r.table(Tables.JOB_EXECUTIONS.name).get(jobExecutionId).update(ProtoUtils.protoToRethink(tjes)));
r.table(Tables.JOB_EXECUTIONS.name).get(jobExecutionId).update(doc ->
r.branch(doc.g("state").match("FINISHED|ABORTED_TIMEOUT|ABORTED_SIZE|ABORTED_MANUAL|FAILED|DIED"),
doc,
ProtoUtils.protoToRethink(tjes))
));
} catch (DbException e) {
LOG.warn("Could not update jobExecutionState", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@

import static no.nb.nna.veidemann.frontier.db.CrawlQueueManager.JOB_EXECUTION_PREFIX;

public class JobExecutionUpdateScript extends RedisJob<Void> {
public class JobExecutionUpdateScript extends RedisJob<Boolean> {
final LuaScript jobExecutionUpdate;

public JobExecutionUpdateScript() {
super("jobExecutionUpdate");
jobExecutionUpdate = new LuaScript("jobexecution_update.lua");
}

public void run(JedisContext ctx, String jobExecutionId, State oldState, State newState, CrawlExecutionStatusChangeOrBuilder change) {
execute(ctx, jedis -> {
public Boolean run(JedisContext ctx, String jobExecutionId, State oldState, State newState, CrawlExecutionStatusChangeOrBuilder change) {
return execute(ctx, jedis -> {
String key = JOB_EXECUTION_PREFIX + jobExecutionId;

String oState = State.UNDEFINED.name();
Expand All @@ -42,8 +42,8 @@ public void run(JedisContext ctx, String jobExecutionId, State oldState, State n
List<String> args = ImmutableList.of(oState, nState, documentsCrawled, documentsDenied, documentsFailed,
documentsOutOfScope, documentsRetried, urisCrawled, bytesCrawled);

jobExecutionUpdate.runString(jedis, keys, args);
return null;
Long runningExecutions = (Long) jobExecutionUpdate.runString(jedis, keys, args);
return runningExecutions > 0;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,11 @@ public NextUriScriptResult run(JedisContext ctx, CrawlHostGroup crawlHostGroup)
double maxScore = mResult.iterator().next().getScore();

// Choose weighted random crawl execution
Set<String> eResult = jedis.zrangeByScore(UCHG + chgId, String.valueOf(Math.random() * maxScore), "+inf", 0, 1);
String key = UCHG + chgId;
String minScore = String.valueOf(Math.random() * maxScore);
Long matchCount = jedis.zcount(key, minScore, "+inf");
long offset = (int) (Math.random() * (matchCount - 1));
Set<String> eResult = jedis.zrangeByScore(key, minScore, "+inf", (int) offset, 1);
if (eResult.isEmpty()) {
return new NextUriScriptResult(FutureOptional.empty());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,14 @@ public void run(JedisContext ctx, QueuedUri qUri) {
String uchgKey = String.format("%s%s",
UCHG,
chgId);
String weight = String.format(Locale.ENGLISH, "%1.2f", qUri.getPriorityWeight());
double priorityWeight = qUri.getPriorityWeight();

// If this is a seed, up the priority to ensure it gets out of CREATED state in reasonable time
if (qUri.getDiscoveryPath().isEmpty()) {
priorityWeight += 100d;
}
String weight = String.format(Locale.ENGLISH, "%1.2f", priorityWeight);

String eid = qUri.getExecutionId();
List<String> keys = ImmutableList.of(ueIdKey, uchgKey);
List<String> args = ImmutableList.of(ueIdVal, weight, eid);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public static boolean isAborted(Frontier frontier, StatusWrapper status) throws
case ABORTED_TIMEOUT:
case ABORTED_SIZE:
// Set end state to desired state
status.setEndState(status.getDesiredState()).saveStatus();
endCrawl(frontier, status, status.getDesiredState());
return true;
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ public synchronized StatusWrapper saveStatus() throws DbException {
boolean wasNotEnded = changes.get(0).get("old_val") == null || changes.get(0).get("old_val").get("endTime") == null;
CrawlExecutionStatus newDoc = ProtoUtils.rethinkToProto(changes.get(0).get("new_val"), CrawlExecutionStatus.class);

frontier.getCrawlQueueManager().updateJobExecutionStatus(newDoc.getJobExecutionId(), status.getState(), newDoc.getState(), change);
if (wasNotEnded && newDoc.hasEndTime()) {
Boolean hasRunningExecutions = frontier.getCrawlQueueManager().updateJobExecutionStatus(newDoc.getJobExecutionId(), status.getState(), newDoc.getState(), change);
if (!hasRunningExecutions && wasNotEnded && newDoc.hasEndTime()) {
updateJobExecution(conn, newDoc.getJobExecutionId());
}

Expand All @@ -196,51 +196,42 @@ private void updateJobExecution(RethinkDbConnection conn, String jobExecutionId)
return;
}

// Get a count of still running CrawlExecutions for this execution's JobExecution
Long notEndedCount = tjes.getExecutionsStateMap().entrySet().stream()
.filter(e -> e.getKey().matches("UNDEFINED|CREATED|FETCHING|SLEEPING"))
.map(e -> e.getValue().longValue())
.reduce(0L, Long::sum);

// If all CrawlExecutions are done for this JobExecution, update the JobExecution with end statistics
if (notEndedCount == 0) {
LOG.debug("JobExecution '{}' finished, saving stats", jobExecutionId);

// Fetch the JobExecutionStatus object this CrawlExecution is part of
JobExecutionStatus jes = conn.executeGet("db-getJobExecutionStatus",
r.table(Tables.JOB_EXECUTIONS.name).get(jobExecutionId),
JobExecutionStatus.class);
if (jes == null) {
throw new IllegalStateException("Can't find JobExecution: " + jobExecutionId);
}

// Set JobExecution's status to FINISHED if it wasn't already aborted
JobExecutionStatus.State state;
switch (jes.getState()) {
case DIED:
case FAILED:
case ABORTED_MANUAL:
state = jes.getState();
break;
default:
if (jes.getDesiredState() != null && jes.getDesiredState() != JobExecutionStatus.State.UNDEFINED) {
state = jes.getDesiredState();
} else {
state = JobExecutionStatus.State.FINISHED;
}
break;
}
LOG.debug("JobExecution '{}' finished, saving stats", jobExecutionId);

// Update aggregated statistics
JobExecutionStatus.Builder jesBuilder = jes.toBuilder()
.setState(state)
.setEndTime(ProtoUtils.getNowTs());
jesBuilder.mergeFrom(tjes);
// Fetch the JobExecutionStatus object this CrawlExecution is part of
JobExecutionStatus jes = conn.executeGet("db-getJobExecutionStatus",
r.table(Tables.JOB_EXECUTIONS.name).get(jobExecutionId),
JobExecutionStatus.class);
if (jes == null) {
throw new IllegalStateException("Can't find JobExecution: " + jobExecutionId);
}

conn.exec("db-saveJobExecutionStatus",
r.table(Tables.JOB_EXECUTIONS.name).get(jesBuilder.getId()).update(ProtoUtils.protoToRethink(jesBuilder)));
frontier.getCrawlQueueManager().removeRedisJobExecution(jobExecutionId);
// Set JobExecution's status to FINISHED if it wasn't already aborted
JobExecutionStatus.State state;
switch (jes.getState()) {
case DIED:
case FAILED:
case ABORTED_MANUAL:
state = jes.getState();
break;
default:
if (jes.getDesiredState() != null && jes.getDesiredState() != JobExecutionStatus.State.UNDEFINED) {
state = jes.getDesiredState();
} else {
state = JobExecutionStatus.State.FINISHED;
}
break;
}

// Update aggregated statistics
JobExecutionStatus.Builder jesBuilder = jes.toBuilder()
.setState(state)
.setEndTime(ProtoUtils.getNowTs());
jesBuilder.mergeFrom(tjes);

conn.exec("db-saveJobExecutionStatus",
r.table(Tables.JOB_EXECUTIONS.name).get(jesBuilder.getId()).update(ProtoUtils.protoToRethink(jesBuilder)));
frontier.getCrawlQueueManager().removeRedisJobExecution(jobExecutionId);
}

public String getId() {
Expand Down
66 changes: 40 additions & 26 deletions src/main/resources/lua/jobexecution_update.lua
Original file line number Diff line number Diff line change
@@ -1,34 +1,48 @@
---
--- KEYS[1]: jobExecutionKey
--- ARGV[1]: oldState
--- ARGV[2]: newState
--- ARGV[3]: documentsCrawled
--- ARGV[4]: documentsDenied
--- ARGV[5]: documentsFailed
--- ARGV[6]: documentsOutOfScope
--- ARGV[7]: documentsRetried
--- ARGV[8]: urisCrawled
--- ARGV[9]: bytesCrawled
---

if ARGV[2] == 'CREATED' or redis.call('EXISTS', KEYS[1]) == 1 then
local jobExecutionKey = KEYS[1]
local oldState = ARGV[1]
local newState = ARGV[2]
local documentsCrawled = ARGV[3]
local documentsDenied = ARGV[4]
local documentsFailed = ARGV[5]
local documentsOutOfScope = ARGV[6]
local documentsRetried = ARGV[7]
local urisCrawled = ARGV[8]
local bytesCrawled = ARGV[9]

if newState == 'CREATED' or redis.call('EXISTS', jobExecutionKey) == 1 then
-- Update states
if ARGV[1] ~= 'UNDEFINED' then
redis.call('HINCRBY', KEYS[1], ARGV[1], -1)
if oldState ~= 'UNDEFINED' then
redis.call('HINCRBY', jobExecutionKey, oldState, -1)
end

if ARGV[2] ~= 'UNDEFINED' then
redis.call('HINCRBY', KEYS[1], ARGV[2], 1)
if newState ~= 'UNDEFINED' then
redis.call('HINCRBY', jobExecutionKey, newState, 1)
end

-- Update stats
redis.call('HINCRBY', KEYS[1], "documentsCrawled", ARGV[3])
redis.call('HINCRBY', KEYS[1], "documentsDenied", ARGV[4])
redis.call('HINCRBY', KEYS[1], "documentsFailed", ARGV[5])
redis.call('HINCRBY', KEYS[1], "documentsOutOfScope", ARGV[6])
redis.call('HINCRBY', KEYS[1], "documentsRetried", ARGV[7])
redis.call('HINCRBY', KEYS[1], "urisCrawled", ARGV[8])
redis.call('HINCRBY', KEYS[1], "bytesCrawled", ARGV[9])
redis.call('HINCRBY', jobExecutionKey, "documentsCrawled", documentsCrawled)
redis.call('HINCRBY', jobExecutionKey, "documentsDenied", documentsDenied)
redis.call('HINCRBY', jobExecutionKey, "documentsFailed", documentsFailed)
redis.call('HINCRBY', jobExecutionKey, "documentsOutOfScope", documentsOutOfScope)
redis.call('HINCRBY', jobExecutionKey, "documentsRetried", documentsRetried)
redis.call('HINCRBY', jobExecutionKey, "urisCrawled", urisCrawled)
redis.call('HINCRBY', jobExecutionKey, "bytesCrawled", bytesCrawled)

local running = 0
if redis.call('HEXISTS', jobExecutionKey, 'UNDEFINED') == 1 then
running = running + tonumber(redis.call('HGET', jobExecutionKey, 'UNDEFINED'))
end
if redis.call('HEXISTS', jobExecutionKey, 'CREATED') == 1 then
running = running + tonumber(redis.call('HGET', jobExecutionKey, 'CREATED'))
end
if redis.call('HEXISTS', jobExecutionKey, 'FETCHING') == 1 then
running = running + tonumber(redis.call('HGET', jobExecutionKey, 'FETCHING'))
end
if redis.call('HEXISTS', jobExecutionKey, 'SLEEPING') == 1 then
running = running + tonumber(redis.call('HGET', jobExecutionKey, 'SLEEPING'))
end

return running
end

end
return -1
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
import no.nb.nna.veidemann.api.frontier.v1.JobExecutionStatus;
import no.nb.nna.veidemann.frontier.testutil.CrawlRunner.RunningCrawl;
import no.nb.nna.veidemann.frontier.testutil.CrawlRunner.SeedAndExecutions;
import org.assertj.core.api.Assertions;
import org.assertj.core.presentation.StandardRepresentation;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
Expand All @@ -30,27 +28,8 @@ public class HarvestMultipleJobsTest extends no.nb.nna.veidemann.frontier.testut
int maxHopsFromSeed = 1;
int numberOfJobs = 40;

public class CustomRepresentation extends StandardRepresentation {
// override fallbackToStringOf to handle Example formatting
@Override
public String fallbackToStringOf(Object o) {
if (o instanceof JobExecutionStatus) {
JobExecutionStatus jes = (JobExecutionStatus) o;
return jes.getId();
}
if (o instanceof CrawlExecutionStatus) {
CrawlExecutionStatus ces = (CrawlExecutionStatus) o;
return ces.getId();
}
// fallback to default formatting.
return super.fallbackToStringOf(o);
}
}

@Test
public void testSameSeedsInParallellJobs() throws Exception {
Assertions.useRepresentation(new CustomRepresentation());

scopeCheckerServiceMock.withMaxHopsFromSeed(maxHopsFromSeed);
harvesterMock.withLinksPerLevel(linksPerLevel);

Expand Down Expand Up @@ -96,6 +75,8 @@ public void testSameSeedsInParallellJobs() throws Exception {
.currentUriIdCountIsEqualTo(0);
});

assertThat(rethinkDbData).jobStatsMatchesCrawlExecutions();

assertThat(logServiceMock.crawlLogs).hasNumberOfRequests(0);

assertThat(redisData)
Expand Down Expand Up @@ -161,6 +142,8 @@ public void testUniqueSeedsWithSameIpInParallellJobs() throws Exception {
.currentUriIdCountIsEqualTo(0);
});

assertThat(rethinkDbData).jobStatsMatchesCrawlExecutions();

assertThat(logServiceMock.crawlLogs).hasNumberOfRequests(0);

assertThat(redisData)
Expand Down Expand Up @@ -211,7 +194,6 @@ public void testUniqueSeedsInParallellJobs() throws Exception {
.documentsRetriedEquals(0)
.documentsOutOfScopeEquals(seedCount);
});
String crawlExecutionId1 = seeds[1].get(0).getCrawlExecution(jobs[1]).get().getId();

assertThat(rethinkDbData)
.crawlExecutionStatuses().hasSize(seedCount * numberOfJobs)
Expand All @@ -228,6 +210,8 @@ public void testUniqueSeedsInParallellJobs() throws Exception {
.currentUriIdCountIsEqualTo(0);
});

assertThat(rethinkDbData).jobStatsMatchesCrawlExecutions();

assertThat(logServiceMock.crawlLogs).hasNumberOfRequests(0);

assertThat(redisData)
Expand Down Expand Up @@ -290,6 +274,8 @@ public void testUniqueSeedsWithSameIpInOneJob() throws Exception {
.currentUriIdCountIsEqualTo(0);
});

assertThat(rethinkDbData).jobStatsMatchesCrawlExecutions();

assertThat(logServiceMock.crawlLogs).hasNumberOfRequests(0);

assertThat(redisData)
Expand Down Expand Up @@ -348,6 +334,8 @@ public void testSameSeedsInOneJob() throws Exception {
.currentUriIdCountIsEqualTo(0);
});

assertThat(rethinkDbData).jobStatsMatchesCrawlExecutions();

assertThat(logServiceMock.crawlLogs).hasNumberOfRequests(0);

assertThat(redisData)
Expand Down
Loading

0 comments on commit 9416745

Please sign in to comment.