Commit

Merge pull request #49 from nlnwa/fix-abort-job
Add guard against submitting seeds to a job which has finished
maeb authored Jun 11, 2021
2 parents 9416745 + ed595fa commit cc02e5b
Showing 4 changed files with 107 additions and 8 deletions.
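
The core of the change: Frontier.scheduleSeed now checks the job execution's state before creating a crawl execution and rejects the seed with FAILED_PRECONDITION if the job has already reached a terminal state. A minimal, self-contained sketch of that guard follows; the JobStateGuard class and checkJobIsRunning method are illustrative names, while the terminal-state list and the gRPC status come from the diff below.

```java
// Minimal sketch of the guard this commit adds. Class and method names
// are hypothetical; the state regex and status mirror Frontier.scheduleSeed.
import io.grpc.Status;

public class JobStateGuard {
    private static final String TERMINAL_STATES =
            "FINISHED|ABORTED_TIMEOUT|ABORTED_SIZE|ABORTED_MANUAL|FAILED|DIED";

    /** Throws FAILED_PRECONDITION if the job execution has already finished. */
    public static void checkJobIsRunning(String jobExecutionId, String jobState) {
        if (jobState != null && jobState.matches(TERMINAL_STATES)) {
            throw Status.FAILED_PRECONDITION
                    .withDescription("Job execution '" + jobExecutionId + "' has finished")
                    .asRuntimeException();
        }
    }
}
```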
@@ -17,6 +17,7 @@

import com.google.protobuf.Empty;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import no.nb.nna.veidemann.api.frontier.v1.CountResponse;
@@ -30,6 +31,7 @@
import no.nb.nna.veidemann.frontier.worker.Frontier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import java.util.concurrent.TimeUnit;

@@ -61,10 +63,15 @@ public void awaitTermination() throws InterruptedException {

@Override
public void crawlSeed(CrawlSeedRequest request, StreamObserver<CrawlExecutionId> responseObserver) {
MDC.clear();
MDC.put("uri", request.getSeed().getMeta().getName());
try {
CrawlExecutionStatus reply = ctx.getFrontier().scheduleSeed(request);
responseObserver.onNext(CrawlExecutionId.newBuilder().setId(reply.getId()).build());
responseObserver.onCompleted();
} catch (StatusRuntimeException e) {
LOG.error("Crawl seed error: " + e.getMessage());
responseObserver.onError(e);
} catch (Exception e) {
LOG.error("Crawl seed error: " + e.getMessage(), e);
Status status = Status.UNKNOWN.withDescription(e.toString());
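
Note the ordering of the catch blocks above: a StatusRuntimeException (such as the FAILED_PRECONDITION raised by the new guard) is passed to responseObserver.onError unchanged, while any other exception is still wrapped as UNKNOWN. A caller could rely on that distinction as sketched here; the generated stub name FrontierGrpc.FrontierBlockingStub is an assumption, not part of this diff.

```java
// Hypothetical caller-side handling of the new error contract. The stub
// class is assumed; the status codes come from the service code above.
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import no.nb.nna.veidemann.api.frontier.v1.CrawlExecutionId;
import no.nb.nna.veidemann.api.frontier.v1.CrawlSeedRequest;
import no.nb.nna.veidemann.api.frontier.v1.FrontierGrpc; // assumed generated class

public class SeedClientExample {
    /** Returns the new execution id, or null if the job had already finished. */
    static String submitSeed(FrontierGrpc.FrontierBlockingStub stub, CrawlSeedRequest request) {
        try {
            CrawlExecutionId id = stub.crawlSeed(request);
            return id.getId();
        } catch (StatusRuntimeException e) {
            if (e.getStatus().getCode() == Status.Code.FAILED_PRECONDITION) {
                return null; // rejected by the new guard: job execution is terminal
            }
            throw e; // UNKNOWN (wrapped server error), UNAVAILABLE, etc.
        }
    }
}
```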
36 changes: 29 additions & 7 deletions src/main/java/no/nb/nna/veidemann/frontier/worker/Frontier.java
@@ -22,6 +22,7 @@
import com.google.common.util.concurrent.ListenableFuture;
import com.rethinkdb.RethinkDB;
import com.rethinkdb.gen.ast.Insert;
import io.grpc.Status;
import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
import io.opentracing.Scope;
import io.opentracing.Span;
@@ -62,6 +63,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

/**
@@ -157,6 +159,12 @@ public CrawlExecutionStatus scheduleSeed(CrawlSeedRequest request) throws DbExce
Tracer tracer = getTracer();
Span span = tracer.activeSpan();

// Check that job is still running before allowing new seeds
String jobState = conn.exec(r.table(Tables.JOB_EXECUTIONS.name).get(request.getJobExecutionId()).g("state"));
if (jobState.matches("FINISHED|ABORTED_TIMEOUT|ABORTED_SIZE|ABORTED_MANUAL|FAILED|DIED")) {
throw Status.FAILED_PRECONDITION.withDescription("Job execution '" + request.getJobExecutionId() + "' has finished").asRuntimeException();
}

// Create crawl execution
StatusWrapper status = StatusWrapper.getStatusWrapper(this,
createCrawlExecutionStatus(
@@ -166,14 +174,28 @@ public CrawlExecutionStatus scheduleSeed(CrawlSeedRequest request) throws DbExce

LOG.debug("New crawl execution: " + status.getId());

getAsyncFunctionsThreadPool().submit(() -> {
try (Scope scope = tracer.scopeManager().activate(span)) {
preprocessAndQueueSeed(request, status);
} catch (DbException e) {
e.printStackTrace();
// Do not process seed if job is aborted
if (CrawlExecutionHelpers.isAborted(this, status)) {
return status.getCrawlExecutionStatus();
}

try {
getAsyncFunctionsThreadPool().submit(() -> {
try (Scope scope = tracer.scopeManager().activate(span)) {
preprocessAndQueueSeed(request, status);
} catch (DbException e) {
LOG.error(e.toString(), e);
}
});
return status.getCrawlExecutionStatus();
} catch (RejectedExecutionException e) {
if (getAsyncFunctionsThreadPool().isShutdown()) {
status.setEndState(State.FAILED).saveStatus();
throw Status.UNAVAILABLE.asRuntimeException();
} else {
throw e;
}
});
return status.getCrawlExecutionStatus();
}
}

public void preprocessAndQueueSeed(CrawlSeedRequest request, StatusWrapper status) throws DbException {
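
Besides the state guard, scheduleSeed gains a second failure path: if the async pool rejects the task because the Frontier is shutting down, the execution is persisted as FAILED and the client sees UNAVAILABLE; any other rejection is rethrown. The pattern in isolation, with a stand-in pool and task (only the status code mirrors the diff):

```java
// Sketch of the shutdown-aware submit added to scheduleSeed. The pool and
// task are stand-ins for the Frontier's async thread pool and the seed
// preprocessing closure.
import io.grpc.Status;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

class SubmitHelper {
    static void submitOrFail(ExecutorService pool, Runnable task) {
        try {
            pool.submit(task);
        } catch (RejectedExecutionException e) {
            if (pool.isShutdown()) {
                // Frontier is stopping: report a retryable condition to the client.
                throw Status.UNAVAILABLE.asRuntimeException();
            }
            // Any other rejection (e.g. a bounded queue is full) is unexpected:
            // rethrow so it surfaces as an error.
            throw e;
        }
    }
}
```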
@@ -277,7 +277,7 @@ public CrawlExecutionStatus.State getState() {
}

public CrawlExecutionStatus.State getDesiredState() {
return getCrawlExecutionStatus().getDesiredState();
return status.getDesiredState();
}

public StatusWrapper setState(CrawlExecutionStatus.State state) {
@@ -358,6 +358,7 @@ public synchronized StatusWrapper incrementDocumentsDenied(long val) {

public CrawlExecutionStatus getCrawlExecutionStatus() {
if (dirty) {
new RuntimeException("CES").printStackTrace();
throw new IllegalStateException("CES is dirty " + change);
}
return status.build();
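
The one-line change to getDesiredState() avoids the wrapper's dirty check: getCrawlExecutionStatus() throws IllegalStateException when the wrapper has unsaved changes, so the desired state (for example, a pending abort request) can now be read even on a dirty wrapper. A stripped-down sketch of the pattern, with assumed field names:

```java
// Hypothetical reduction of the wrapper touched here; field names are
// assumptions, the behavioral contrast mirrors the diff above.
import no.nb.nna.veidemann.api.frontier.v1.CrawlExecutionStatus;

class WrapperSketch {
    private boolean dirty;
    private final CrawlExecutionStatus.Builder status = CrawlExecutionStatus.newBuilder();

    // Reads that must reflect a saved snapshot fail fast on unsaved changes.
    CrawlExecutionStatus getCrawlExecutionStatus() {
        if (dirty) {
            throw new IllegalStateException("CES is dirty");
        }
        return status.build();
    }

    // After this commit: read the builder directly, so the desired state
    // is visible even while a save is pending.
    CrawlExecutionStatus.State getDesiredState() {
        return status.getDesiredState();
    }
}
```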
69 changes: 69 additions & 0 deletions src/test/java/no/nb/nna/veidemann/frontier/api/HarvestTest.java
@@ -16,6 +16,7 @@
import org.testcontainers.junit.jupiter.Testcontainers;
import redis.clients.jedis.Jedis;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -802,6 +803,74 @@ public void testAbortJobExecution() throws Exception {
.readyQueue().hasNumberOfElements(0);
}

@Test
public void testAbortBigJobExecution() throws Exception {
int seedCount = 1000;
int linksPerLevel = 3;
int maxHopsFromSeed = 2;

scopeCheckerServiceMock.withMaxHopsFromSeed(maxHopsFromSeed);
harvesterMock.withLinksPerLevel(linksPerLevel);

ConfigObject job = crawlRunner.genJob("job1");
List<SeedAndExecutions> seeds = new ArrayList<>();
for (int i = 0; i < 2; i++) {
String hostPrefix = String.format("a%03d.seed", i);
seeds.addAll(crawlRunner.genSeeds(seedCount, hostPrefix, job));
}
RunningCrawl crawl = crawlRunner.runCrawl(job, seeds);

// Abort the job as soon as more than ten seeds are sleeping.
await().pollDelay(10, TimeUnit.MILLISECONDS).pollInterval(10, TimeUnit.MILLISECONDS).atMost(30, TimeUnit.MINUTES)
.until(() -> {
try (Jedis jedis = jedisPool.getResource()) {
Map<String, String> f = jedis.hgetAll(CrawlQueueManager.JOB_EXECUTION_PREFIX + crawl.getStatus().getId());
if (Integer.parseInt(f.getOrDefault("SLEEPING", "0")) > 10) {
return true;
}
return false;
}
});
DbService.getInstance().getExecutionsAdapter().setJobExecutionStateAborted(crawl.getStatus().getId());

// Wait for crawl to finish
crawlRunner.awaitCrawlFinished(5, TimeUnit.MINUTES, crawl);

assertThat(logServiceMock.crawlLogs).hasNumberOfRequests(0);

assertThat(rethinkDbData)
.hasQueueTotalCount(0)
.jobStatsMatchesCrawlExecutions()
.jobExecutionStatuses().hasSize(1)
.hasEntrySatisfying(crawl.getStatus().getId(), j -> {
assertThat(j)
.hasState(JobExecutionStatus.State.ABORTED_MANUAL)
.hasStartTime(true)
.hasEndTime(true)
.documentsCrawledSatisfies(d -> d.isGreaterThan(0))
.documentsDeniedEquals(0)
.documentsFailedEquals(0)
.documentsRetriedEquals(0)
.executionsStateCountSatifies(CrawlExecutionStatus.State.ABORTED_MANUAL, d -> d.isGreaterThan(0))
.executionsStateCountEquals(CrawlExecutionStatus.State.ABORTED_TIMEOUT, 0)
.executionsStateCountEquals(CrawlExecutionStatus.State.ABORTED_SIZE, 0)
.executionsStateCountEquals(CrawlExecutionStatus.State.FAILED, 0)
.executionsStateCountSatifies(CrawlExecutionStatus.State.CREATED, d -> d.isBetween(0, seedCount))
.executionsStateCountEquals(CrawlExecutionStatus.State.FETCHING, 0)
.executionsStateCountEquals(CrawlExecutionStatus.State.SLEEPING, 0);
});

assertThat(redisData)
.hasQueueTotalCount(0)
.crawlHostGroups().hasNumberOfElements(0);
assertThat(redisData)
.crawlExecutionQueueCounts().hasNumberOfElements(0);
assertThat(redisData)
.sessionTokens().hasNumberOfElements(0);
assertThat(redisData)
.readyQueue().hasNumberOfElements(0);
}

@Test
public void testAbortTimeout() throws Exception {
int seedCount = 20;
