From 285e737b4c0c519dd0980e13481cb60cec156b52 Mon Sep 17 00:00:00 2001
From: John Erik Halse
Date: Fri, 28 May 2021 10:44:13 +0200
Subject: [PATCH 1/4] Tests for parallel jobs

---
 .../frontier/api/HarvestMultipleJobsTest.java | 398 ++++++++++++++++++
 .../veidemann/frontier/api/HarvestTest.java   | 192 +++++----
 .../veidemann/frontier/api/TracerTest.java    |  12 +-
 .../frontier/testutil/CrawlRunner.java        | 168 +++++---
 .../frontier/testutil/DnsResolverMock.java    |   2 +-
 src/test/resources/log4j2.xml                 |   8 +-
 6 files changed, 630 insertions(+), 150 deletions(-)
 create mode 100644 src/test/java/no/nb/nna/veidemann/frontier/api/HarvestMultipleJobsTest.java

diff --git a/src/test/java/no/nb/nna/veidemann/frontier/api/HarvestMultipleJobsTest.java b/src/test/java/no/nb/nna/veidemann/frontier/api/HarvestMultipleJobsTest.java
new file mode 100644
index 0000000..ff4175f
--- /dev/null
+++ b/src/test/java/no/nb/nna/veidemann/frontier/api/HarvestMultipleJobsTest.java
@@ -0,0 +1,398 @@
+package no.nb.nna.veidemann.frontier.api;
+
+import no.nb.nna.veidemann.api.config.v1.ConfigObject;
+import no.nb.nna.veidemann.api.frontier.v1.CrawlExecutionStatus;
+import no.nb.nna.veidemann.api.frontier.v1.JobExecutionStatus;
+import no.nb.nna.veidemann.frontier.testutil.CrawlRunner.RunningCrawl;
+import no.nb.nna.veidemann.frontier.testutil.CrawlRunner.SeedAndExecutions;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static no.nb.nna.veidemann.frontier.testutil.FrontierAssertions.assertThat;
+
+@Testcontainers
+@Tag("integration")
+@Tag("redis")
+@Tag("rethinkDb")
+public class HarvestMultipleJobsTest extends no.nb.nna.veidemann.frontier.testutil.AbstractIntegrationTest {
+    private static final Logger LOG = LoggerFactory.getLogger(HarvestMultipleJobsTest.class);
+
+    int seedCount = 2;
+    int linksPerLevel = 1;
+    int maxHopsFromSeed = 1;
+    int numberOfJobs = 40;
+
+    @Test
+    public void testSameSeedsInParallellJobs() throws Exception {
+        scopeCheckerServiceMock.withMaxHopsFromSeed(maxHopsFromSeed);
+        harvesterMock.withLinksPerLevel(linksPerLevel);
+
+        ConfigObject[] jobs = new ConfigObject[numberOfJobs];
+        for (int i = 0; i < numberOfJobs; i++) {
+            jobs[i] = crawlRunner.genJob("job" + (i + 1));
+        }
+        List<SeedAndExecutions> seeds = crawlRunner.genSeeds(seedCount, "a.seed", jobs);
+        RunningCrawl[] crawls = new RunningCrawl[numberOfJobs];
+        for (int i = 0; i < numberOfJobs; i++) {
+            crawls[i] = crawlRunner.runCrawl(jobs[i], seeds);
+        }
+        crawlRunner.awaitCrawlFinished(5, TimeUnit.MINUTES, crawls);
+
+        assertThat(rethinkDbData)
+                .hasQueueTotalCount(0);
+        assertThat(rethinkDbData)
+                .jobExecutionStatuses().hasSize(numberOfJobs)
+                .hasEntrySatisfying(crawls[1].getStatus().getId(), j -> {
+                    assertThat(j)
+                            .hasState(JobExecutionStatus.State.FINISHED)
+                            .hasStartTime(true)
+                            .hasEndTime(true)
+                            .documentsCrawledEquals(2 * seedCount)
+                            .documentsDeniedEquals(0)
+                            .documentsFailedEquals(0)
+                            .documentsRetriedEquals(0)
+                            .documentsOutOfScopeEquals(seedCount);
+                })
+                .hasEntrySatisfying(crawls[2].getStatus().getId(), j -> {
+                    assertThat(j)
+                            .hasState(JobExecutionStatus.State.FINISHED)
+                            .hasStartTime(true)
+                            .hasEndTime(true)
+                            .documentsCrawledEquals(2 * seedCount)
+                            .documentsDeniedEquals(0)
+                            .documentsFailedEquals(0)
+                            .documentsRetriedEquals(0)
+                            .documentsOutOfScopeEquals(seedCount);
+                });
+        String crawlExecutionId1 =
+                seeds.get(0).getCrawlExecution(jobs[1]).get().getId();
+
+        assertThat(rethinkDbData)
+                .crawlExecutionStatuses().hasSize(seedCount * numberOfJobs)
+                .hasEntrySatisfying(crawlExecutionId1, s -> {
+                    assertThat(s)
+                            .hasState(CrawlExecutionStatus.State.FINISHED)
+                            .hasStartTime(true)
+                            .hasEndTime(true)
+                            .documentsCrawledEquals(2)
+                            .documentsDeniedEquals(0)
+                            .documentsFailedEquals(0)
+                            .documentsRetriedEquals(0)
+                            .documentsOutOfScopeEquals(1)
+                            .currentUriIdCountIsEqualTo(0);
+                });
+
+        assertThat(logServiceMock.crawlLogs).hasNumberOfRequests(0);
+
+        assertThat(redisData)
+                .hasQueueTotalCount(0)
+                .crawlHostGroups().hasNumberOfElements(0);
+        assertThat(redisData)
+                .crawlExecutionQueueCounts().hasNumberOfElements(0);
+        assertThat(redisData)
+                .sessionTokens().hasNumberOfElements(0);
+        assertThat(redisData)
+                .readyQueue().hasNumberOfElements(0);
+    }
+
+    @Test
+    public void testUniqueSeedsWithSameIpInParallellJobs() throws Exception {
+        scopeCheckerServiceMock.withMaxHopsFromSeed(maxHopsFromSeed);
+        harvesterMock.withLinksPerLevel(linksPerLevel);
+
+        ConfigObject[] jobs = new ConfigObject[numberOfJobs];
+        for (int i = 0; i < numberOfJobs; i++) {
+            jobs[i] = crawlRunner.genJob("job" + (i + 1));
+        }
+        List<SeedAndExecutions>[] seeds = new List[numberOfJobs];
+        for (int i = 0; i < numberOfJobs; i++) {
+            String hostPrefix = String.format("a%03d.seed", i);
+            seeds[i] = crawlRunner.genSeeds(seedCount, hostPrefix, jobs[i]);
+        }
+        RunningCrawl[] crawls = new RunningCrawl[numberOfJobs];
+        for (int i = 0; i < numberOfJobs; i++) {
+            crawls[i] = crawlRunner.runCrawl(jobs[i], seeds[i]);
+        }
+        crawlRunner.awaitCrawlFinished(5, TimeUnit.MINUTES, crawls);
+
+        assertThat(rethinkDbData)
+                .hasQueueTotalCount(0);
+        assertThat(rethinkDbData)
+                .jobExecutionStatuses().hasSize(numberOfJobs)
+                .hasEntrySatisfying(crawls[1].getStatus().getId(), j -> {
+                    assertThat(j)
+                            .hasState(JobExecutionStatus.State.FINISHED)
+                            .hasStartTime(true)
+                            .hasEndTime(true)
+                            .documentsCrawledEquals(2 * seedCount)
+                            .documentsDeniedEquals(0)
+                            .documentsFailedEquals(0)
+                            .documentsRetriedEquals(0)
+                            .documentsOutOfScopeEquals(seedCount);
+                })
+                .hasEntrySatisfying(crawls[2].getStatus().getId(), j -> {
+                    assertThat(j)
+                            .hasState(JobExecutionStatus.State.FINISHED)
+                            .hasStartTime(true)
+                            .hasEndTime(true)
+                            .documentsCrawledEquals(2 * seedCount)
+                            .documentsDeniedEquals(0)
+                            .documentsFailedEquals(0)
+                            .documentsRetriedEquals(0)
+                            .documentsOutOfScopeEquals(seedCount);
+                });
+        String crawlExecutionId1 = seeds[1].get(0).getCrawlExecution(jobs[1]).get().getId();
+
+        assertThat(rethinkDbData)
+                .crawlExecutionStatuses().hasSize(seedCount * numberOfJobs)
+                .hasEntrySatisfying(crawlExecutionId1, s -> {
+                    assertThat(s)
+                            .hasState(CrawlExecutionStatus.State.FINISHED)
+                            .hasStartTime(true)
+                            .hasEndTime(true)
+                            .documentsCrawledEquals(2)
+                            .documentsDeniedEquals(0)
+                            .documentsFailedEquals(0)
+                            .documentsRetriedEquals(0)
+                            .documentsOutOfScopeEquals(1)
+                            .currentUriIdCountIsEqualTo(0);
+                });
+
+        assertThat(logServiceMock.crawlLogs).hasNumberOfRequests(0);
+
+        assertThat(redisData)
+                .hasQueueTotalCount(0)
+                .crawlHostGroups().hasNumberOfElements(0);
+        assertThat(redisData)
+                .crawlExecutionQueueCounts().hasNumberOfElements(0);
+        assertThat(redisData)
+                .sessionTokens().hasNumberOfElements(0);
+        assertThat(redisData)
+                .readyQueue().hasNumberOfElements(0);
+    }
+
+    @Test
+    public void testUniqueSeedsInParallellJobs() throws Exception {
+        scopeCheckerServiceMock.withMaxHopsFromSeed(maxHopsFromSeed);
+        harvesterMock.withLinksPerLevel(linksPerLevel);
+
+        ConfigObject[] jobs = new ConfigObject[numberOfJobs];
+        for (int i = 0; i < numberOfJobs; i++) {
+            jobs[i] = crawlRunner.genJob("job" + (i + 1));
+        }
+        List<SeedAndExecutions>[] seeds = new List[numberOfJobs];
+        int offset = 0;
+        for (int i = 0; i < numberOfJobs; i++) {
+            String hostPrefix = String.format("a%03d.seed", i);
+            seeds[i] = crawlRunner.genSeeds(offset, seedCount, hostPrefix, jobs[i]);
+            offset += seedCount;
+        }
+        RunningCrawl[] crawls = new RunningCrawl[numberOfJobs];
+        for (int i = 0; i < numberOfJobs; i++) {
+            crawls[i] = crawlRunner.runCrawl(jobs[i], seeds[i]);
+        }
+        crawlRunner.awaitCrawlFinished(1, TimeUnit.MINUTES, crawls);
+
+        assertThat(rethinkDbData)
+                .hasQueueTotalCount(0);
+        assertThat(rethinkDbData)
+                .jobExecutionStatuses().hasSize(numberOfJobs)
+                .hasEntrySatisfying(crawls[1].getStatus().getId(), j -> {
+                    assertThat(j)
+                            .hasState(JobExecutionStatus.State.FINISHED)
+                            .hasStartTime(true)
+                            .hasEndTime(true)
+                            .documentsCrawledEquals(2 * seedCount)
+                            .documentsDeniedEquals(0)
+                            .documentsFailedEquals(0)
+                            .documentsRetriedEquals(0)
+                            .documentsOutOfScopeEquals(seedCount);
+                })
+                .hasEntrySatisfying(crawls[2].getStatus().getId(), j -> {
+                    assertThat(j)
+                            .hasState(JobExecutionStatus.State.FINISHED)
+                            .hasStartTime(true)
+                            .hasEndTime(true)
+                            .documentsCrawledEquals(2 * seedCount)
+                            .documentsDeniedEquals(0)
+                            .documentsFailedEquals(0)
+                            .documentsRetriedEquals(0)
+                            .documentsOutOfScopeEquals(seedCount);
+                });
+        String crawlExecutionId1 = seeds[1].get(0).getCrawlExecution(jobs[1]).get().getId();
+
+        assertThat(rethinkDbData)
+                .crawlExecutionStatuses().hasSize(seedCount * numberOfJobs)
+                .hasEntrySatisfying(crawlExecutionId1, s -> {
+                    assertThat(s)
+                            .hasState(CrawlExecutionStatus.State.FINISHED)
+                            .hasStartTime(true)
+                            .hasEndTime(true)
+                            .documentsCrawledEquals(2)
+                            .documentsDeniedEquals(0)
+                            .documentsFailedEquals(0)
+                            .documentsRetriedEquals(0)
+                            .documentsOutOfScopeEquals(1)
+                            .currentUriIdCountIsEqualTo(0);
+                });
+
+        assertThat(logServiceMock.crawlLogs).hasNumberOfRequests(0);
+
+        assertThat(redisData)
+                .hasQueueTotalCount(0)
+                .crawlHostGroups().hasNumberOfElements(0);
+        assertThat(redisData)
+                .crawlExecutionQueueCounts().hasNumberOfElements(0);
+        assertThat(redisData)
+                .sessionTokens().hasNumberOfElements(0);
+        assertThat(redisData)
+                .readyQueue().hasNumberOfElements(0);
+    }
+
+    @Test
+    public void testUniqueSeedsWithSameIpInOneJob() throws Exception {
+        scopeCheckerServiceMock.withMaxHopsFromSeed(maxHopsFromSeed);
+        harvesterMock.withLinksPerLevel(linksPerLevel);
+
+        ConfigObject job = crawlRunner.genJob("job");
+        List<SeedAndExecutions>[] seeds = new List[numberOfJobs];
+        for (int i = 0; i < numberOfJobs; i++) {
+            String hostPrefix = String.format("a%03d.seed", i);
+            seeds[i] = crawlRunner.genSeeds(seedCount, hostPrefix, job);
+        }
+        RunningCrawl[] crawls = new RunningCrawl[numberOfJobs];
+        for (int i = 0; i < numberOfJobs; i++) {
+            crawls[i] = crawlRunner.runCrawl(job, seeds[i]);
+        }
+        crawlRunner.awaitCrawlFinished(1, TimeUnit.MINUTES, crawls);
+
+        assertThat(rethinkDbData)
+                .hasQueueTotalCount(0);
+        assertThat(rethinkDbData)
+                .jobExecutionStatuses().hasSize(numberOfJobs)
+                .hasEntrySatisfying(crawls[1].getStatus().getId(), j -> {
+                    assertThat(j)
+                            .hasState(JobExecutionStatus.State.FINISHED)
+                            .hasStartTime(true)
+                            .hasEndTime(true)
+                            .documentsCrawledEquals(2 * seedCount)
+                            .documentsDeniedEquals(0)
+                            .documentsFailedEquals(0)
+                            .documentsRetriedEquals(0)
+                            .documentsOutOfScopeEquals(seedCount);
+                })
+                .hasEntrySatisfying(crawls[2].getStatus().getId(), j -> {
+                    assertThat(j)
+                            .hasState(JobExecutionStatus.State.FINISHED)
+                            .hasStartTime(true)
+                            .hasEndTime(true)
+                            .documentsCrawledEquals(2 * seedCount)
+                            .documentsDeniedEquals(0)
+                            .documentsFailedEquals(0)
+                            .documentsRetriedEquals(0)
+                            .documentsOutOfScopeEquals(seedCount);
+                });
+        String crawlExecutionId1 = seeds[1].get(0).getCrawlExecution(job).get().getId();
+
+        assertThat(rethinkDbData)
+                .crawlExecutionStatuses().hasSize(seedCount * numberOfJobs)
+                .hasEntrySatisfying(crawlExecutionId1, s -> {
+                    assertThat(s)
+                            .hasState(CrawlExecutionStatus.State.FINISHED)
+                            .hasStartTime(true)
+                            .hasEndTime(true)
+                            .documentsCrawledEquals(2)
+                            .documentsDeniedEquals(0)
+                            .documentsFailedEquals(0)
+                            .documentsRetriedEquals(0)
+                            .documentsOutOfScopeEquals(1)
+                            .currentUriIdCountIsEqualTo(0);
+                });
+
+        assertThat(logServiceMock.crawlLogs).hasNumberOfRequests(0);
+
+        assertThat(redisData)
+                .hasQueueTotalCount(0)
+                .crawlHostGroups().hasNumberOfElements(0);
+        assertThat(redisData)
+                .crawlExecutionQueueCounts().hasNumberOfElements(0);
+        assertThat(redisData)
+                .sessionTokens().hasNumberOfElements(0);
+        assertThat(redisData)
+                .readyQueue().hasNumberOfElements(0);
+    }
+
+    @Test
+    public void testSameSeedsInOneJob() throws Exception {
+        scopeCheckerServiceMock.withMaxHopsFromSeed(maxHopsFromSeed);
+        harvesterMock.withLinksPerLevel(linksPerLevel);
+
+        ConfigObject job = crawlRunner.genJob("job");
+        List<SeedAndExecutions> seeds = crawlRunner.genSeeds(seedCount, "a.seed", job);
+        RunningCrawl[] crawls = new RunningCrawl[numberOfJobs];
+        for (int i = 0; i < numberOfJobs; i++) {
+            crawls[i] = crawlRunner.runCrawl(job, seeds);
+        }
+        crawlRunner.awaitCrawlFinished(1, TimeUnit.MINUTES, crawls);
+
+        assertThat(rethinkDbData)
+                .hasQueueTotalCount(0);
+        assertThat(rethinkDbData)
+                .jobExecutionStatuses().hasSize(numberOfJobs)
+                .hasEntrySatisfying(crawls[1].getStatus().getId(), j -> {
+                    assertThat(j)
+                            .hasState(JobExecutionStatus.State.FINISHED)
+                            .hasStartTime(true)
+                            .hasEndTime(true)
+                            .documentsCrawledEquals(2 * seedCount)
+                            .documentsDeniedEquals(0)
+                            .documentsFailedEquals(0)
+                            .documentsRetriedEquals(0)
+                            .documentsOutOfScopeEquals(seedCount);
+                })
+                .hasEntrySatisfying(crawls[2].getStatus().getId(), j -> {
+                    assertThat(j)
+                            .hasState(JobExecutionStatus.State.FINISHED)
+                            .hasStartTime(true)
+                            .hasEndTime(true)
+                            .documentsCrawledEquals(2 * seedCount)
+                            .documentsDeniedEquals(0)
+                            .documentsFailedEquals(0)
+                            .documentsRetriedEquals(0)
+                            .documentsOutOfScopeEquals(seedCount);
+                });
+        String crawlExecutionId1 = seeds.get(0).getCrawlExecution(job).get().getId();
+
+        assertThat(rethinkDbData)
+                .crawlExecutionStatuses().hasSize(seedCount * numberOfJobs)
+                .hasEntrySatisfying(crawlExecutionId1, s -> {
+                    assertThat(s)
+                            .hasState(CrawlExecutionStatus.State.FINISHED)
+                            .hasStartTime(true)
+                            .hasEndTime(true)
+                            .documentsCrawledEquals(2)
+                            .documentsDeniedEquals(0)
+                            .documentsFailedEquals(0)
+                            .documentsRetriedEquals(0)
+                            .documentsOutOfScopeEquals(1)
+                            .currentUriIdCountIsEqualTo(0);
+                });
+
+        assertThat(logServiceMock.crawlLogs).hasNumberOfRequests(0);
+
+        assertThat(redisData)
+                .hasQueueTotalCount(0)
+                .crawlHostGroups().hasNumberOfElements(0);
+        assertThat(redisData)
+                .crawlExecutionQueueCounts().hasNumberOfElements(0);
+        assertThat(redisData)
+                .sessionTokens().hasNumberOfElements(0);
+        assertThat(redisData)
+                .readyQueue().hasNumberOfElements(0);
+    }
+}
diff --git a/src/test/java/no/nb/nna/veidemann/frontier/api/HarvestTest.java b/src/test/java/no/nb/nna/veidemann/frontier/api/HarvestTest.java
index 23d6c95..2e6f109 100644
--- a/src/test/java/no/nb/nna/veidemann/frontier/api/HarvestTest.java
+++ b/src/test/java/no/nb/nna/veidemann/frontier/api/HarvestTest.java
@@ -1,10 +1,13 @@
 package no.nb.nna.veidemann.frontier.api;
 
+import no.nb.nna.veidemann.api.config.v1.ConfigObject;
 import no.nb.nna.veidemann.api.config.v1.CrawlLimitsConfig;
 import no.nb.nna.veidemann.api.frontier.v1.CrawlExecutionStatus;
 import no.nb.nna.veidemann.api.frontier.v1.JobExecutionStatus;
 import no.nb.nna.veidemann.commons.db.DbService;
 import no.nb.nna.veidemann.frontier.db.CrawlQueueManager;
+import no.nb.nna.veidemann.frontier.testutil.CrawlRunner.RunningCrawl;
+import no.nb.nna.veidemann.frontier.testutil.CrawlRunner.SeedAndExecutions;
 import no.nb.nna.veidemann.frontier.testutil.HarvesterMock;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
@@ -13,6 +16,7 @@
 import org.testcontainers.junit.jupiter.Testcontainers;
 import redis.clients.jedis.Jedis;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -36,14 +40,15 @@ public void testOneSuccessfullSeed() throws Exception {
         scopeCheckerServiceMock.withMaxHopsFromSeed(maxHopsFromSeed);
         harvesterMock.withLinksPerLevel(linksPerLevel);
 
-        crawlRunner.setup(seedCount);
-        JobExecutionStatus jes = crawlRunner.runCrawl();
-        crawlRunner.awaitCrawlFinished();
+        ConfigObject job = crawlRunner.genJob("job1");
+        List<SeedAndExecutions> seeds = crawlRunner.genSeeds(seedCount, "a.seed", job);
+        RunningCrawl crawl = crawlRunner.runCrawl(job, seeds);
+        crawlRunner.awaitCrawlFinished(crawl);
 
         assertThat(rethinkDbData)
                 .hasQueueTotalCount(0);
         assertThat(rethinkDbData)
-                .jobExecutionStatuses().hasSize(1).hasEntrySatisfying(jes.getId(), j -> {
+                .jobExecutionStatuses().hasSize(1).hasEntrySatisfying(crawl.getStatus().getId(), j -> {
                     assertThat(j)
                             .hasState(JobExecutionStatus.State.FINISHED)
                             .hasStartTime(true)
@@ -54,7 +59,8 @@ public void testOneSuccessfullSeed() throws Exception {
                             .documentsRetriedEquals(0)
                             .documentsOutOfScopeEquals(27);
                 });
-        String crawlExecutionId1 = crawlRunner.crawlExecutions.get(crawlRunner.seeds.get(0).getId()).get().getId();
+        String crawlExecutionId1 = seeds.get(0).getCrawlExecution(job).get().getId();
+
         assertThat(rethinkDbData)
                 .crawlExecutionStatuses().hasSize(seedCount)
                 .hasEntrySatisfying(crawlExecutionId1, s -> {
@@ -92,24 +98,25 @@ public void testHarvesterException() throws Exception {
         scopeCheckerServiceMock.withMaxHopsFromSeed(maxHopsFromSeed);
 //        logServiceMock.withExpectedNrOfWrites(2);
         harvesterMock
-                .withExceptionForAllUrlRequests("http://stress-000000.com")
-                .withExceptionForUrlRequests("http://stress-000001.com", 1, 1)
-                .withExceptionForAllUrlRequests("http://stress-000002.com/p0/p2")
+                .withExceptionForAllUrlRequests("http://a.seed-000000.com")
+                .withExceptionForUrlRequests("http://a.seed-000001.com", 1, 1)
+                .withExceptionForAllUrlRequests("http://a.seed-000002.com/p0/p2")
                 .withLinksPerLevel(linksPerLevel);
 
-        crawlRunner.setup(seedCount);
-        JobExecutionStatus jes = crawlRunner.runCrawl();
-        crawlRunner.awaitCrawlFinished();
+        ConfigObject job = crawlRunner.genJob("job1");
+        List<SeedAndExecutions> seeds = crawlRunner.genSeeds(seedCount, "a.seed", job);
+        RunningCrawl crawl = crawlRunner.runCrawl(job, seeds);
+        crawlRunner.awaitCrawlFinished(crawl);
 
         assertThat(logServiceMock.crawlLogs).hasNumberOfRequests(2);
         assertThat(logServiceMock.crawlLogs.get(0))
                 .hasWarcId()
                 .statusCodeEquals(RETRY_LIMIT_REACHED)
-                .requestedUriEquals("http://stress-000000.com")
+                .requestedUriEquals("http://a.seed-000000.com")
                 .error().isNotNull().codeEquals(RUNTIME_EXCEPTION);
         assertThat(logServiceMock.crawlLogs.get(1))
                 .hasWarcId()
                 .statusCodeEquals(RETRY_LIMIT_REACHED.getCode())
-                .requestedUriEquals("http://stress-000002.com/p0/p2")
+                .requestedUriEquals("http://a.seed-000002.com/p0/p2")
                 .error().isNotNull().codeEquals(RUNTIME_EXCEPTION);
 
         assertThat(rethinkDbData)
@@ -134,18 +141,20 @@ public void testHarvesterTimeout() throws Exception {
         scopeCheckerServiceMock.withMaxHopsFromSeed(maxHopsFromSeed);
         harvesterMock
-                .withLongFetchTimeForAllUrlRequests("http://stress-000000.com")
-                .withLongFetchTimeForUrlRequests("http://stress-000001.com/p0", 1, 1)
+                .withLongFetchTimeForAllUrlRequests("http://a.seed-000000.com")
+                .withLongFetchTimeForUrlRequests("http://a.seed-000001.com/p0", 1, 1)
                 .withLinksPerLevel(linksPerLevel);
-        crawlRunner.setup(seedCount);
-        JobExecutionStatus jes = crawlRunner.runCrawl();
-        crawlRunner.awaitCrawlFinished();
+
+        ConfigObject job = crawlRunner.genJob("job1");
+        List<SeedAndExecutions> seeds = crawlRunner.genSeeds(seedCount, "a.seed", job);
+        RunningCrawl crawl = crawlRunner.runCrawl(job, seeds);
+        crawlRunner.awaitCrawlFinished(crawl);
 
         assertThat(logServiceMock.crawlLogs).hasNumberOfRequests(1);
         assertThat(logServiceMock.crawlLogs.get(0))
                 .hasWarcId()
                 .statusCodeEquals(RETRY_LIMIT_REACHED)
-                .requestedUriEquals("http://stress-000000.com")
+                .requestedUriEquals("http://a.seed-000000.com")
                 .error().isNotNull().codeEquals(RUNTIME_EXCEPTION);
 
         assertThat(rethinkDbData)
@@ -169,14 +178,15 @@ public void testHarvesterClosed() throws Exception {
         int maxHopsFromSeed = 2;
         scopeCheckerServiceMock.withMaxHopsFromSeed(maxHopsFromSeed);
 
-        crawlRunner.setup(seedCount);
+        ConfigObject job = crawlRunner.genJob("job1");
+        List<SeedAndExecutions> seeds = crawlRunner.genSeeds(seedCount, "a.seed", job);
 
-        Thread.sleep(1000);
         harvesterMock.close();
+//        Thread.sleep(1000);
         harvesterMock = new HarvesterMock(settings).start();
 
-        JobExecutionStatus jes = crawlRunner.runCrawl();
-        crawlRunner.awaitCrawlFinished(60, TimeUnit.SECONDS);
+        RunningCrawl crawl = crawlRunner.runCrawl(job, seeds);
+        crawlRunner.awaitCrawlFinished(crawl);
 
         assertThat(logServiceMock.crawlLogs).hasNumberOfRequests(0);
 
@@ -202,11 +212,12 @@ public void testDnsFailureOnce() throws Exception {
         scopeCheckerServiceMock.withMaxHopsFromSeed(maxHopsFromSeed);
         harvesterMock.withLinksPerLevel(linksPerLevel);
-        dnsResolverMock.withFetchErrorForHostRequests("stress-000000.com", 1, 1);
+        dnsResolverMock.withFetchErrorForHostRequests("a.seed-000000.com", 1, 1);
 
-        crawlRunner.setup(seedCount);
-        JobExecutionStatus jes = crawlRunner.runCrawl();
-        crawlRunner.awaitCrawlFinished();
+        ConfigObject job = crawlRunner.genJob("job1");
+        List<SeedAndExecutions> seeds = crawlRunner.genSeeds(seedCount, "a.seed", job);
+        RunningCrawl crawl = crawlRunner.runCrawl(job, seeds);
+        crawlRunner.awaitCrawlFinished(crawl);
 
         assertThat(logServiceMock.crawlLogs).hasNumberOfRequests(0);
 
@@ -232,11 +243,12 @@ public void testDnsFailureTwice() throws Exception {
         scopeCheckerServiceMock.withMaxHopsFromSeed(maxHopsFromSeed);
         harvesterMock.withLinksPerLevel(linksPerLevel);
-        dnsResolverMock.withFetchErrorForHostRequests("stress-000000.com", 1, 2);
+        dnsResolverMock.withFetchErrorForHostRequests("a.seed-000000.com", 1, 2);
 
-        crawlRunner.setup(seedCount);
-        JobExecutionStatus jes = crawlRunner.runCrawl();
-        crawlRunner.awaitCrawlFinished();
+        ConfigObject job = crawlRunner.genJob("job1");
+        List<SeedAndExecutions> seeds = crawlRunner.genSeeds(seedCount, "a.seed", job);
+        RunningCrawl crawl = crawlRunner.runCrawl(job, seeds);
+        crawlRunner.awaitCrawlFinished(crawl);
 
         assertThat(logServiceMock.crawlLogs).hasNumberOfRequests(0);
 
@@ -262,17 +274,18 @@ public void testDnsFailureThreeTimes() throws Exception {
         scopeCheckerServiceMock.withMaxHopsFromSeed(maxHopsFromSeed);
         harvesterMock.withLinksPerLevel(linksPerLevel);
-        dnsResolverMock.withFetchErrorForAllHostRequests("stress-000000.com");
+        dnsResolverMock.withFetchErrorForAllHostRequests("a.seed-000000.com");
 
-        crawlRunner.setup(seedCount);
-        JobExecutionStatus jes = crawlRunner.runCrawl();
-        crawlRunner.awaitCrawlFinished();
+        ConfigObject job = crawlRunner.genJob("job1");
+        List<SeedAndExecutions> seeds = crawlRunner.genSeeds(seedCount, "a.seed", job);
+        RunningCrawl crawl = crawlRunner.runCrawl(job, seeds);
+        crawlRunner.awaitCrawlFinished(crawl);
 
         assertThat(logServiceMock.crawlLogs).hasNumberOfRequests(1);
         assertThat(logServiceMock.crawlLogs.get(0))
                 .hasWarcId()
                 .statusCodeEquals(RETRY_LIMIT_REACHED)
-                .requestedUriEquals("http://stress-000000.com")
+                .requestedUriEquals("http://a.seed-000000.com")
                 .error().isNotNull().codeEquals(FAILED_DNS);
 
         assertThat(rethinkDbData)
@@ -297,17 +310,18 @@ public void testDnsExceptionThreeTimes() throws Exception {
         scopeCheckerServiceMock.withMaxHopsFromSeed(maxHopsFromSeed);
         harvesterMock.withLinksPerLevel(linksPerLevel);
-        dnsResolverMock.withExceptionForAllHostRequests("stress-000000.com");
+        dnsResolverMock.withExceptionForAllHostRequests("a.seed-000000.com");
 
-        crawlRunner.setup(seedCount);
-        JobExecutionStatus jes = crawlRunner.runCrawl();
-        crawlRunner.awaitCrawlFinished();
+        ConfigObject job = crawlRunner.genJob("job1");
+        List<SeedAndExecutions> seeds = crawlRunner.genSeeds(seedCount, "a.seed", job);
+        RunningCrawl crawl = crawlRunner.runCrawl(job, seeds);
+        crawlRunner.awaitCrawlFinished(crawl);
 
         assertThat(logServiceMock.crawlLogs).hasNumberOfRequests(1);
         assertThat(logServiceMock.crawlLogs.get(0))
                 .hasWarcId()
                 .statusCodeEquals(RETRY_LIMIT_REACHED)
-                .requestedUriEquals("http://stress-000000.com")
+                .requestedUriEquals("http://a.seed-000000.com")
                 .error().isNotNull().codeEquals(FAILED_DNS);
 
         assertThat(rethinkDbData)
@@ -333,42 +347,43 @@ public void testDeniedByRobotsTxt() throws Exception {
         scopeCheckerServiceMock.withMaxHopsFromSeed(maxHopsFromSeed);
         harvesterMock.withLinksPerLevel(linksPerLevel);
         robotsEvaluatorMock
-                .withFetchDenialForUrl("http://stress-000000.com")
-                .withFetchDenialForUrl("http://stress-000001.com/p0")
-                .withFetchDenialForUrl("http://stress-000001.com/p1")
-                .withExceptionForUrl("http://stress-000001.com/p2");
+                .withFetchDenialForUrl("http://a.seed-000000.com")
+                .withFetchDenialForUrl("http://a.seed-000001.com/p0")
+                .withFetchDenialForUrl("http://a.seed-000001.com/p1")
+                .withExceptionForUrl("http://a.seed-000001.com/p2");
 
-        crawlRunner.setup(seedCount);
-        JobExecutionStatus jes = crawlRunner.runCrawl();
-        crawlRunner.awaitCrawlFinished();
+        ConfigObject job = crawlRunner.genJob("job1");
+        List<SeedAndExecutions> seeds = crawlRunner.genSeeds(seedCount, "a.seed", job);
+        RunningCrawl crawl = crawlRunner.runCrawl(job, seeds);
+        crawlRunner.awaitCrawlFinished(crawl);
 
         assertThat(logServiceMock.crawlLogs).hasNumberOfRequests(3)
                 .hasRequestSatisfying(r -> {
                     assertThat(r)
                             .hasWarcId()
                             .statusCodeEquals(PRECLUDED_BY_ROBOTS)
-                            .requestedUriEquals("http://stress-000000.com")
+                            .requestedUriEquals("http://a.seed-000000.com")
                             .error().isNotNull().codeEquals(PRECLUDED_BY_ROBOTS);
                 })
                 .hasRequestSatisfying(r -> {
                     assertThat(r)
                             .hasWarcId()
                             .statusCodeEquals(PRECLUDED_BY_ROBOTS)
-                            .requestedUriEquals("http://stress-000001.com/p0")
+                            .requestedUriEquals("http://a.seed-000001.com/p0")
                             .error().isNotNull().codeEquals(PRECLUDED_BY_ROBOTS);
                 })
                 .hasRequestSatisfying(r -> {
                     assertThat(r)
                             .hasWarcId()
                             .statusCodeEquals(PRECLUDED_BY_ROBOTS)
-                            .requestedUriEquals("http://stress-000001.com/p1")
+                            .requestedUriEquals("http://a.seed-000001.com/p1")
                             .error().isNotNull().codeEquals(PRECLUDED_BY_ROBOTS);
                 });
 
         assertThat(rethinkDbData)
                 .hasQueueTotalCount(0);
         assertThat(rethinkDbData)
-                .jobExecutionStatuses().hasSize(1).hasEntrySatisfying(jes.getId(), j -> {
+                .jobExecutionStatuses().hasSize(1).hasEntrySatisfying(crawl.getStatus().getId(), j -> {
                     assertThat(j)
                             .hasState(JobExecutionStatus.State.FINISHED)
                             .hasStartTime(true)
@@ -379,8 +394,8 @@ public void testDeniedByRobotsTxt() throws Exception {
                             .documentsRetriedEquals(0)
                             .documentsOutOfScopeEquals(9);
                 });
-        String crawlExecutionId1 = crawlRunner.crawlExecutions.get(crawlRunner.seeds.get(0).getId()).get().getId();
-        String crawlExecutionId2 = crawlRunner.crawlExecutions.get(crawlRunner.seeds.get(1).getId()).get().getId();
+        String crawlExecutionId1 = seeds.get(0).getCrawlExecution(job).get().getId();
+        String crawlExecutionId2 = seeds.get(1).getCrawlExecution(job).get().getId();
         assertThat(rethinkDbData)
                 .crawlExecutionStatuses().hasSize(seedCount)
                 .hasEntrySatisfying(crawlExecutionId1, s -> {
@@ -427,19 +442,20 @@ public void testRecheckScope() throws Exception {
 
         scopeCheckerServiceMock
                 .withMaxHopsFromSeed(maxHopsFromSeed)
-                .withDenialForUrlRequests("http://stress-000000.com/p0", 2, 2);
+                .withDenialForUrlRequests("http://a.seed-000000.com/p0", 2, 2);
         harvesterMock.withLinksPerLevel(linksPerLevel);
 
-        crawlRunner.setup(seedCount);
-        JobExecutionStatus jes = crawlRunner.runCrawl();
-        crawlRunner.awaitCrawlFinished();
+        ConfigObject job = crawlRunner.genJob("job1");
+        List<SeedAndExecutions> seeds = crawlRunner.genSeeds(seedCount, "a.seed", job);
+        RunningCrawl crawl = crawlRunner.runCrawl(job, seeds);
+        crawlRunner.awaitCrawlFinished(crawl);
 
         assertThat(logServiceMock.crawlLogs).hasNumberOfRequests(0);
 
         assertThat(rethinkDbData)
                 .hasQueueTotalCount(0);
         assertThat(rethinkDbData)
-                .jobExecutionStatuses().hasSize(1).hasEntrySatisfying(jes.getId(), j -> {
+                .jobExecutionStatuses().hasSize(1).hasEntrySatisfying(crawl.getStatus().getId(), j -> {
                     assertThat(j)
                             .hasState(JobExecutionStatus.State.FINISHED)
                             .hasStartTime(true)
@@ -458,7 +474,7 @@ public void testRecheckScope() throws Exception {
                             .executionsStateCountEquals(CrawlExecutionStatus.State.FETCHING, 0)
                             .executionsStateCountEquals(CrawlExecutionStatus.State.SLEEPING, 0);
                 });
-        String crawlExecutionId1 = crawlRunner.crawlExecutions.get(crawlRunner.seeds.get(0).getId()).get().getId();
+        String crawlExecutionId1 = seeds.get(0).getCrawlExecution(job).get().getId();
         assertThat(rethinkDbData)
                 .crawlExecutionStatuses().hasSize(seedCount)
                 .hasEntrySatisfying(crawlExecutionId1, s -> {
@@ -494,14 +510,15 @@ public void testAbortCrawlExecutionEarly() throws Exception {
         scopeCheckerServiceMock.withMaxHopsFromSeed(maxHopsFromSeed);
         harvesterMock.withLinksPerLevel(linksPerLevel);
 
-        crawlRunner.setup(seedCount);
-        JobExecutionStatus jes = crawlRunner.runCrawl();
+        ConfigObject job = crawlRunner.genJob("job1");
+        List<SeedAndExecutions> seeds = crawlRunner.genSeeds(seedCount, "a.seed", job);
+        RunningCrawl crawl = crawlRunner.runCrawl(job, seeds);
 
         // Abort the first execution as soon as it is created
-        String crawlExecutionId = crawlRunner.crawlExecutions.get(crawlRunner.seeds.get(0).getId()).get().getId();
+        String crawlExecutionId = seeds.get(0).getCrawlExecution(job).get().getId();
         DbService.getInstance().getExecutionsAdapter().setCrawlExecutionStateAborted(crawlExecutionId, CrawlExecutionStatus.State.ABORTED_MANUAL);
 
-        crawlRunner.awaitCrawlFinished();
+        crawlRunner.awaitCrawlFinished(crawl);
 
         assertThat(logServiceMock.crawlLogs).hasNumberOfRequests(0);
 
@@ -523,7 +540,7 @@ public void testAbortCrawlExecutionEarly() throws Exception {
                 });
         assertThat(rethinkDbData)
                 .jobExecutionStatuses().hasSize(1)
-                .hasEntrySatisfying(jes.getId(), s -> {
+                .hasEntrySatisfying(crawl.getStatus().getId(), s -> {
                     assertThat(s)
                             .hasState(JobExecutionStatus.State.FINISHED)
                             .hasStartTime(true)
@@ -555,16 +572,17 @@ public void testAbortCrawlExecutionLate() throws Exception {
         scopeCheckerServiceMock.withMaxHopsFromSeed(maxHopsFromSeed);
         harvesterMock.withLinksPerLevel(linksPerLevel);
 
-        crawlRunner.setup(seedCount);
-        JobExecutionStatus jes = crawlRunner.runCrawl();
+        ConfigObject job = crawlRunner.genJob("job1");
+        List<SeedAndExecutions> seeds = crawlRunner.genSeeds(seedCount, "a.seed", job);
+        RunningCrawl crawl = crawlRunner.runCrawl(job, seeds);
 
-        String crawlExecutionId1 = crawlRunner.crawlExecutions.get(crawlRunner.seeds.get(0).getId()).get().getId();
-        String crawlExecutionId2 = crawlRunner.crawlExecutions.get(crawlRunner.seeds.get(1).getId()).get().getId();
-        String crawlExecutionId3 = crawlRunner.crawlExecutions.get(crawlRunner.seeds.get(2).getId()).get().getId();
-        String crawlExecutionId4 = crawlRunner.crawlExecutions.get(crawlRunner.seeds.get(3).getId()).get().getId();
+        String crawlExecutionId1 = seeds.get(0).getCrawlExecution(job).get().getId();
+        String crawlExecutionId2 = seeds.get(1).getCrawlExecution(job).get().getId();
+        String crawlExecutionId3 = seeds.get(2).getCrawlExecution(job).get().getId();
+        String crawlExecutionId4 = seeds.get(3).getCrawlExecution(job).get().getId();
 
         // Abort the first execution as soon as it is fetching
-        await().pollDelay(100, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS).atMost(30, TimeUnit.SECONDS)
+        await().pollDelay(10, TimeUnit.MILLISECONDS).pollInterval(10, TimeUnit.MILLISECONDS).atMost(30, TimeUnit.SECONDS)
                 .until(() -> {
                     CrawlExecutionStatus ces = DbService.getInstance().getExecutionsAdapter().getCrawlExecutionStatus(crawlExecutionId1);
                     if (ces.getState() == CrawlExecutionStatus.State.FETCHING) {
@@ -576,7 +594,7 @@ public void testAbortCrawlExecutionLate() throws Exception {
 
         // Abort the second execution as soon as it is sleeping
-        await().pollDelay(100, TimeUnit.MILLISECONDS).pollInterval(10, TimeUnit.MILLISECONDS).atMost(30, TimeUnit.SECONDS)
+        await().pollDelay(10, TimeUnit.MILLISECONDS).pollInterval(10, TimeUnit.MILLISECONDS).atMost(30, TimeUnit.SECONDS)
                 .until(() -> {
                     CrawlExecutionStatus ces = DbService.getInstance().getExecutionsAdapter().getCrawlExecutionStatus(crawlExecutionId2);
                     if (ces.getState() == CrawlExecutionStatus.State.SLEEPING) {
@@ -588,7 +606,7 @@ public void testAbortCrawlExecutionLate() throws Exception {
 
         // Abort the third execution as soon as it is finished
-        await().pollDelay(100, TimeUnit.MILLISECONDS).pollInterval(10, TimeUnit.MILLISECONDS).atMost(30, TimeUnit.SECONDS)
+        await().pollDelay(10, TimeUnit.MILLISECONDS).pollInterval(10, TimeUnit.MILLISECONDS).atMost(30, TimeUnit.SECONDS)
                 .until(() -> {
                     CrawlExecutionStatus ces = DbService.getInstance().getExecutionsAdapter().getCrawlExecutionStatus(crawlExecutionId3);
                     if (ces.getState() == CrawlExecutionStatus.State.FINISHED) {
@@ -599,7 +617,7 @@ public void testAbortCrawlExecutionLate() throws Exception {
                 });
 
         // Wait for crawl to finish
-        crawlRunner.awaitCrawlFinished();
+        crawlRunner.awaitCrawlFinished(crawl);
 
         assertThat(logServiceMock.crawlLogs).hasNumberOfRequests(0);
 
@@ -661,7 +679,7 @@ public void testAbortCrawlExecutionLate() throws Exception {
                 });
         assertThat(rethinkDbData)
                 .jobExecutionStatuses().hasSize(1)
-                .hasEntrySatisfying(jes.getId(), j -> {
+                .hasEntrySatisfying(crawl.getStatus().getId(), j -> {
                     assertThat(j)
                             .hasState(JobExecutionStatus.State.FINISHED)
                             .hasStartTime(true)
@@ -702,24 +720,25 @@ public void testAbortJobExecution() throws Exception {
         harvesterMock.withLinksPerLevel(linksPerLevel);
         dnsResolverMock.withSimulatedLookupTimeMs(300);
 
-        crawlRunner.setup(seedCount);
-        JobExecutionStatus jes = crawlRunner.runCrawl();
+        ConfigObject job = crawlRunner.genJob("job1");
+        List<SeedAndExecutions> seeds = crawlRunner.genSeeds(seedCount, "a.seed", job);
+        RunningCrawl crawl = crawlRunner.runCrawl(job, seeds);
 
         // Abort the first execution as soon as one seed is completed
         await().pollDelay(100, TimeUnit.MILLISECONDS).pollInterval(100, TimeUnit.MILLISECONDS).atMost(30, TimeUnit.SECONDS)
                 .until(() -> {
                     try (Jedis jedis = jedisPool.getResource()) {
-                        Map<String, String> f = jedis.hgetAll(CrawlQueueManager.JOB_EXECUTION_PREFIX + jes.getId());
+                        Map<String, String> f = jedis.hgetAll(CrawlQueueManager.JOB_EXECUTION_PREFIX + crawl.getStatus().getId());
                         if (!f.getOrDefault("FINISHED", "0").equals("0")) {
                             return true;
                         }
                         return false;
                     }
                 });
-        DbService.getInstance().getExecutionsAdapter().setJobExecutionStateAborted(jes.getId());
+        DbService.getInstance().getExecutionsAdapter().setJobExecutionStateAborted(crawl.getStatus().getId());
 
         // Wait for crawl to finish
-        crawlRunner.awaitCrawlFinished();
+        crawlRunner.awaitCrawlFinished(crawl);
 
         assertThat(logServiceMock.crawlLogs).hasNumberOfRequests(0);
 
@@ -729,7 +748,7 @@ public void testAbortJobExecution() throws Exception {
                 .crawlExecutionStatuses().hasSize(seedCount);
         assertThat(rethinkDbData)
                 .jobExecutionStatuses().hasSize(1)
-                .hasEntrySatisfying(jes.getId(), j -> {
+                .hasEntrySatisfying(crawl.getStatus().getId(), j -> {
                     assertThat(j)
                             .hasState(JobExecutionStatus.State.ABORTED_MANUAL)
                             .hasStartTime(true)
@@ -773,9 +792,10 @@ public void testAbortTimeout() throws Exception {
                 .withFetchTime(200);
         dnsResolverMock.withSimulatedLookupTimeMs(300);
 
-        crawlRunner.setup(seedCount, CrawlLimitsConfig.newBuilder().setMaxDurationS(5).build());
-        JobExecutionStatus jes = crawlRunner.runCrawl();
-        crawlRunner.awaitCrawlFinished();
+        ConfigObject job = crawlRunner.genJob("job1", CrawlLimitsConfig.newBuilder().setMaxDurationS(5).build());
+        List<SeedAndExecutions> seeds = crawlRunner.genSeeds(seedCount, "a.seed", job);
+        RunningCrawl crawl = crawlRunner.runCrawl(job, seeds);
+        crawlRunner.awaitCrawlFinished(2, TimeUnit.MINUTES, crawl);
 
         assertThat(logServiceMock.crawlLogs).hasNumberOfRequests(0);
 
@@ -785,7 +805,7 @@ public void testAbortTimeout() throws Exception {
                 .crawlExecutionStatuses().hasSize(20);
         assertThat(rethinkDbData)
                 .jobExecutionStatuses().hasSize(1)
-                .hasEntrySatisfying(jes.getId(), j -> {
+                .hasEntrySatisfying(crawl.getStatus().getId(), j -> {
                     assertThat(j)
                             .hasState(JobExecutionStatus.State.FINISHED)
                             .hasStartTime(true)
diff --git a/src/test/java/no/nb/nna/veidemann/frontier/api/TracerTest.java b/src/test/java/no/nb/nna/veidemann/frontier/api/TracerTest.java
index 6fa1445..e215118 100644
--- a/src/test/java/no/nb/nna/veidemann/frontier/api/TracerTest.java
+++ b/src/test/java/no/nb/nna/veidemann/frontier/api/TracerTest.java
@@ -1,7 +1,9 @@
 package no.nb.nna.veidemann.frontier.api;
 
 import io.opentracing.mock.MockSpan;
-import no.nb.nna.veidemann.api.frontier.v1.JobExecutionStatus;
+import no.nb.nna.veidemann.api.config.v1.ConfigObject;
+import no.nb.nna.veidemann.frontier.testutil.CrawlRunner.RunningCrawl;
+import no.nb.nna.veidemann.frontier.testutil.CrawlRunner.SeedAndExecutions;
 import org.jetbrains.annotations.NotNull;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
@@ -14,7 +16,6 @@
 import java.util.Map;
 import java.util.SortedSet;
 import java.util.TreeSet;
-import java.util.concurrent.TimeUnit;
 
 @Testcontainers
 @Tag("integration")
@@ -37,9 +38,10 @@ public void testOneSuccessfullSeed() throws Exception {
         scopeCheckerServiceMock.withMaxHopsFromSeed(maxHopsFromSeed);
         harvesterMock.withLinksPerLevel(linksPerLevel);
 
-        crawlRunner.setup(seedCount);
-        JobExecutionStatus jes = crawlRunner.runCrawl();
-        crawlRunner.awaitCrawlFinished(30, TimeUnit.SECONDS);
+        ConfigObject job = crawlRunner.genJob("job1");
+        List<SeedAndExecutions> seeds = crawlRunner.genSeeds(seedCount, "a.seed", job);
+        RunningCrawl crawl = crawlRunner.runCrawl(job, seeds);
+        crawlRunner.awaitCrawlFinished(crawl);
 
         List<MockSpan> finishedSpans = tracer.finishedSpans();
         class item implements Comparable<item> {
diff --git a/src/test/java/no/nb/nna/veidemann/frontier/testutil/CrawlRunner.java b/src/test/java/no/nb/nna/veidemann/frontier/testutil/CrawlRunner.java
index 2d58973..e20cff6 100644
--- a/src/test/java/no/nb/nna/veidemann/frontier/testutil/CrawlRunner.java
+++ b/src/test/java/no/nb/nna/veidemann/frontier/testutil/CrawlRunner.java
@@ -41,15 +41,20 @@
 import redis.clients.jedis.JedisPool;
 
 import java.time.Duration;
-import java.time.Instant;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.awaitility.Awaitility.await;
 
 public class CrawlRunner implements AutoCloseable {
@@ -57,16 +62,11 @@ public class CrawlRunner implements AutoCloseable {
     ConfigAdapter c = DbService.getInstance().getConfigAdapter();
     ExecutionsAdapter e = DbService.getInstance().getExecutionsAdapter();
 
-    public List<ConfigObject> seeds = new ArrayList<>();
-    public Map<String, SettableFuture<CrawlExecutionId>> crawlExecutions = new HashMap<>();
-    public ConfigObject crawlJob;
     private final ManagedChannel frontierChannel;
     private final FrontierGrpc.FrontierBlockingStub frontierStub;
     private final RethinkDbData rethinkDbData;
     private final JedisPool jedisPool;
-
-    JobExecutionStatus jes;
-    Instant testStart;
+    private final Map<String, String> jobExecIdToJobName = new HashMap<>();
 
     public CrawlRunner(Settings settings, RethinkDbData rethinkDbData, JedisPool jedisPool) {
         frontierChannel = ManagedChannelBuilder.forAddress("localhost", settings.getApiPort()).usePlaintext().build();
@@ -75,11 +75,11 @@ public CrawlRunner(Settings settings, RethinkDbData rethinkDbData, JedisPool jed
         this.jedisPool = jedisPool;
     }
 
-    public void setup(int seedCount) throws DbException {
-        setup(seedCount, CrawlLimitsConfig.getDefaultInstance());
+    public ConfigObject genJob(String name) throws DbException {
+        return genJob(name, CrawlLimitsConfig.getDefaultInstance());
     }
 
-    public void setup(int seedCount, CrawlLimitsConfig limits) throws DbException {
+    public ConfigObject genJob(String name, CrawlLimitsConfig limits) throws DbException {
         ConfigObject.Builder defaultCrawlHostGroupConfig = c.getConfigObject(ConfigRef.newBuilder()
                 .setKind(Kind.crawlHostGroupConfig).setId("chg-default")
                 .build())
@@ -131,96 +131,150 @@ public void setup(int seedCount, CrawlLimitsConfig limits) throws DbException {
         ConfigObject.Builder crawlJobBuilder = ConfigObject.newBuilder()
                 .setApiVersion("v1")
                 .setKind(Kind.crawlJob);
-        crawlJobBuilder.getMetaBuilder().setName("stress");
+        crawlJobBuilder.getMetaBuilder().setName(name);
         crawlJobBuilder.getCrawlJobBuilder()
                 .setCrawlConfigRef(ApiTools.refForConfig(crawlConfig))
                 .setScopeScriptRef(ApiTools.refForConfig(scopeScript))
                 .setLimits(limits);
-        crawlJob = c.saveConfigObject(crawlJobBuilder.build());
+        return c.saveConfigObject(crawlJobBuilder.build());
+    }
 
-        genSeeds(ApiTools.refForConfig(crawlJob), seedCount);
+    public List<SeedAndExecutions> genSeeds(int count, String hostPrefix, ConfigObject... jobs) throws DbException {
+        return genSeeds(0, count, hostPrefix, jobs);
     }
 
-    public void genSeeds(ConfigRef jobRef, int count) throws DbException {
-        System.out.print("Generating seeds ");
-        for (int i = 0; i < count; i++) {
+    public List<SeedAndExecutions> genSeeds(int offset, int count, String hostPrefix, ConfigObject... jobs) throws DbException {
+        LOG.info("Generating {} seeds with prefix '{}'", count, hostPrefix);
+
+        Set<ConfigRef> jobRefs = Arrays.stream(jobs).map(j -> ApiTools.refForConfig(j)).collect(Collectors.toSet());
+        ArrayList<SeedAndExecutions> seeds = new ArrayList<>();
+
+        for (int i = offset; i < offset + count; i++) {
+            String name = String.format("%s-%06d", hostPrefix, i);
+            String url = String.format("http://%s-%06d.com", hostPrefix, i);
+
             ConfigObject.Builder entityBuilder = ConfigObject.newBuilder()
                     .setApiVersion("v1")
                     .setKind(Kind.crawlEntity);
-            entityBuilder.getMetaBuilder().setName("stress-" + i);
+            entityBuilder.getMetaBuilder().setName(name);
             ConfigObject entity = c.saveConfigObject(entityBuilder.build());
-            String url = String.format("http://stress-%06d.com", i);
             ConfigObject.Builder seedBuilder = ConfigObject.newBuilder()
                     .setApiVersion("v1")
                     .setKind(Kind.seed);
             seedBuilder.getMetaBuilder().setName(url);
             seedBuilder.getSeedBuilder()
                     .setEntityRef(ApiTools.refForConfig(entity))
-                    .addJobRef(jobRef);
+                    .addAllJobRef(jobRefs);
             ConfigObject seed = c.saveConfigObject(seedBuilder.build());
-            seeds.add(seed);
-            crawlExecutions.put(seed.getId(), SettableFuture.create());
-            System.out.print(".");
-//            if (i == 10) {
-//                seed = c.saveConfigObject(seedBuilder.build());
-//                seeds.add(seed);
-//            }
+            seeds.add(new SeedAndExecutions(seed, jobRefs));
         }
-        System.out.println(" DONE");
         System.out.flush();
-        try {
-            Thread.sleep(500);
-        } catch (InterruptedException interruptedException) {
-            interruptedException.printStackTrace();
-        }
+        return seeds;
     }
 
-    public JobExecutionStatus runCrawl() throws DbException {
-        System.out.print("Submitting seeds to job ");
-        jes = e.createJobExecutionStatus(crawlJob.getId());
-        for (ConfigObject seed : seeds) {
-            ForkJoinPool.commonPool().submit((Callable<Void>) () -> {
+    public RunningCrawl runCrawl(ConfigObject crawlJob, List<SeedAndExecutions> seeds) throws DbException {
+        LOG.info("Submitting seeds to job '{}'", crawlJob.getMeta().getName());
+        JobExecutionStatus jes = e.createJobExecutionStatus(crawlJob.getId());
+        ForkJoinPool.commonPool().submit((Callable<Void>) () -> {
+            for (SeedAndExecutions seed : seeds) {
                 Builder requestBuilder = CrawlSeedRequest.newBuilder()
                         .setJob(crawlJob)
-                        .setSeed(seed)
+                        .setSeed(seed.seed)
                         .setJobExecutionId(jes.getId());
                 CrawlExecutionId ceid = frontierStub.crawlSeed(requestBuilder.build());
-                crawlExecutions.get(seed.getId()).set(ceid);
-                return null;
-            });
-            System.out.print(".");
-        }
-        System.out.println(" DONE");
-        testStart = Instant.now();
-        return jes;
+                seed.crawlExecutions.get(crawlJob.getId()).set(ceid);
+            }
+            return null;
+        });
+        RunningCrawl c = new RunningCrawl();
+        c.jobName = crawlJob.getMeta().getName();
+        c.jes = jes;
+        return c;
     }
 
-    public void awaitCrawlFinished() {
-        awaitCrawlFinished(30, TimeUnit.SECONDS);
+    public void awaitCrawlFinished(RunningCrawl... runningCrawls) {
+        awaitCrawlFinished(30, TimeUnit.SECONDS, runningCrawls);
     }
 
-    public Duration awaitCrawlFinished(long timeout, TimeUnit unit) {
+    public Duration awaitCrawlFinished(long timeout, TimeUnit unit, RunningCrawl... runningCrawls) {
+        AtomicInteger emptyChgKeysCount = new AtomicInteger(0);
         await().pollDelay(1, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS).atMost(timeout, unit)
                 .until(() -> {
-                    JobExecutionStatus j = DbService.getInstance().getExecutionsAdapter().getJobExecutionStatus(jes.getId());
-                    if (LOG.isInfoEnabled() && j.getExecutionsStateCount() > 0) {
-                        LOG.info("Job State {}, Executions: {}", j.getState(), j.getExecutionsStateMap());
+                    Set<String> chgKeys = jedisPool.getResource().keys("chg*");
+                    if (chgKeys.isEmpty()) {
+                        emptyChgKeysCount.incrementAndGet();
                     }
-                    if (State.RUNNING != j.getState() && rethinkDbData.getQueuedUris().isEmpty() && jedisPool.getResource().keys("*").size() <= 1) {
+
+                    List<RunningCrawl> statuses = Arrays.stream(runningCrawls)
+                            .map(j -> {
+                                try {
+                                    j.jes = DbService.getInstance().getExecutionsAdapter().getJobExecutionStatus(j.jes.getId());
+                                    return j;
+                                } catch (DbException e) {
+                                    throw new RuntimeException(e);
+                                }
+                            })
+                            .filter(j -> j.jes.getState() == State.RUNNING)
+                            .peek(j -> {
+                                if (LOG.isTraceEnabled()) {
+                                    LOG.trace("Job '{}' {}, Executions: CREATED={}, FETCHING={}, SLEEPING={}, FINISHED={}, ABORTED_TIMEOUT={}, ABORTED_SIZE={}, ABORTED_MANUAL={}, FAILED={}",
+                                            j.jobName, j.jes.getState(),
+                                            j.jes.getExecutionsStateMap().getOrDefault("CREATED", 0),
+                                            j.jes.getExecutionsStateMap().getOrDefault("FETCHING", 0),
+                                            j.jes.getExecutionsStateMap().getOrDefault("SLEEPING", 0),
+                                            j.jes.getExecutionsStateMap().getOrDefault("FINISHED", 0),
+                                            j.jes.getExecutionsStateMap().getOrDefault("ABORTED_TIMEOUT", 0),
+                                            j.jes.getExecutionsStateMap().getOrDefault("ABORTED_SIZE", 0),
+                                            j.jes.getExecutionsStateMap().getOrDefault("ABORTED_MANUAL", 0),
+                                            j.jes.getExecutionsStateMap().getOrDefault("FAILED", 0));
+                                }
+                            }).collect(Collectors.toList());
+
+                    if (statuses.stream().allMatch(j -> State.RUNNING != j.jes.getState()) && rethinkDbData.getQueuedUris().isEmpty() && jedisPool.getResource().keys("*").size() <= 1) {
                         return true;
                     }
+                    if (statuses.stream().anyMatch(j -> State.RUNNING == j.jes.getState())) {
+                        assertThat(emptyChgKeysCount).as("Crawl is not finished, but redis chg keys are missing").hasValueLessThan(3);
+                    }
+                    LOG.debug("Still running: {}", statuses.size());
                     return false;
                 });
-        Duration testTime = Duration.between(testStart, Instant.now());
-        LOG.info(String.format("Test time: %02d:%02d:%02d.%d",
-                testTime.toHoursPart(), testTime.toMinutesPart(), testTime.toSecondsPart(), testTime.toMillisPart()));
-        return testTime;
+        return null;
     }
 
     @Override
     public void close() throws Exception {
         frontierChannel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
     }
+
+    public static class SeedAndExecutions {
+        final ConfigObject seed;
+        Map<String, SettableFuture<CrawlExecutionId>> crawlExecutions = new HashMap<>();
+
+        public SeedAndExecutions(ConfigObject seed, Collection<ConfigRef> jobRefs) {
+            this.seed = seed;
+            for (ConfigRef r : jobRefs) {
+                crawlExecutions.put(r.getId(), SettableFuture.create());
+            }
+        }
+
+        public ConfigObject getSeed() {
+            return seed;
+        }
+
+        public SettableFuture<CrawlExecutionId> getCrawlExecution(ConfigObject job) {
+            return crawlExecutions.get(job.getId());
+        }
+    }
+
+    public static class RunningCrawl {
+        String jobName;
+        JobExecutionStatus jes;
+
+        public JobExecutionStatus getStatus() {
+            return jes;
+        }
+    }
 }
diff --git a/src/test/java/no/nb/nna/veidemann/frontier/testutil/DnsResolverMock.java b/src/test/java/no/nb/nna/veidemann/frontier/testutil/DnsResolverMock.java
index 6220477..06f36d0 100644
--- a/src/test/java/no/nb/nna/veidemann/frontier/testutil/DnsResolverMock.java
+++ b/src/test/java/no/nb/nna/veidemann/frontier/testutil/DnsResolverMock.java
@@ -36,7 +36,7 @@ public class DnsResolverMock implements AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(DnsResolverMock.class);
 
-    Pattern seedNumPattern = Pattern.compile("stress-(\\d\\d)(\\d\\d)(\\d\\d).com");
+    Pattern seedNumPattern = Pattern.compile(".+-(\\d\\d)(\\d\\d)(\\d\\d).com");
 
     long simulatedLookupTimeMs = 0L;
 
diff --git a/src/test/resources/log4j2.xml b/src/test/resources/log4j2.xml
index 178fb3d..cc1078c 100644
--- a/src/test/resources/log4j2.xml
+++ b/src/test/resources/log4j2.xml
@@ -2,7 +2,7 @@
-
+
@@ -12,6 +12,12 @@
+
+
+
+
+
+

From 0925ba89201d5fc50a9a19c9f6ca9554a34980a5 Mon Sep 17 00:00:00 2001
From: John Erik Halse
Date: Wed, 2 Jun 2021 16:46:56 +0200
Subject: [PATCH 2/4] Improved tests

---
 .../frontier/api/HarvestMultipleJobsTest.java | 97 ++++++-------------
 .../db/script/CrawlHostGroupCodecTest.java    |  6 +-
 .../frontier/testutil/CrawlRunner.java        | 36 ++++++-
 .../frontier/worker/CrawlHostGroupTest.java   | 28 +++---
 src/test/resources/log4j2.xml                 | 13 ++-
 5 files changed, 92 insertions(+), 88 deletions(-)

diff --git a/src/test/java/no/nb/nna/veidemann/frontier/api/HarvestMultipleJobsTest.java b/src/test/java/no/nb/nna/veidemann/frontier/api/HarvestMultipleJobsTest.java
index ff4175f..08ce94e 100644
--- a/src/test/java/no/nb/nna/veidemann/frontier/api/HarvestMultipleJobsTest.java
+++ b/src/test/java/no/nb/nna/veidemann/frontier/api/HarvestMultipleJobsTest.java
@@ -5,6 +5,8 @@
 import no.nb.nna.veidemann.api.frontier.v1.JobExecutionStatus;
 import no.nb.nna.veidemann.frontier.testutil.CrawlRunner.RunningCrawl;
 import no.nb.nna.veidemann.frontier.testutil.CrawlRunner.SeedAndExecutions;
+import org.assertj.core.api.Assertions;
+import org.assertj.core.presentation.StandardRepresentation;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
@@ -28,8 +30,27 @@ public class HarvestMultipleJobsTest extends no.nb.nna.veidemann.frontier.testut
     int maxHopsFromSeed = 1;
     int numberOfJobs = 40;
 
+    public class CustomRepresentation extends StandardRepresentation {
+        // override fallbackToStringOf to handle Example formatting
+        @Override
+        public String fallbackToStringOf(Object o) {
+            if (o instanceof JobExecutionStatus) {
+                JobExecutionStatus jes = (JobExecutionStatus) o;
+                return jes.getId();
+            }
+            if (o instanceof CrawlExecutionStatus) {
+                CrawlExecutionStatus ces = (CrawlExecutionStatus) o;
+                return ces.getId();
+            }
+            // fallback to default formatting.
+            return super.fallbackToStringOf(o);
+        }
+    }
+
     @Test
     public void testSameSeedsInParallellJobs() throws Exception {
+        Assertions.useRepresentation(new CustomRepresentation());
+
         scopeCheckerServiceMock.withMaxHopsFromSeed(maxHopsFromSeed);
         harvesterMock.withLinksPerLevel(linksPerLevel);
 
@@ -48,18 +69,7 @@ public void testSameSeedsInParallellJobs() throws Exception {
                 .hasQueueTotalCount(0);
         assertThat(rethinkDbData)
                 .jobExecutionStatuses().hasSize(numberOfJobs)
-                .hasEntrySatisfying(crawls[1].getStatus().getId(), j -> {
-                    assertThat(j)
-                            .hasState(JobExecutionStatus.State.FINISHED)
-                            .hasStartTime(true)
-                            .hasEndTime(true)
-                            .documentsCrawledEquals(2 * seedCount)
-                            .documentsDeniedEquals(0)
-                            .documentsFailedEquals(0)
-                            .documentsRetriedEquals(0)
-                            .documentsOutOfScopeEquals(seedCount);
-                })
-                .hasEntrySatisfying(crawls[2].getStatus().getId(), j -> {
+                .allSatisfy((id, j) -> {
                     assertThat(j)
                             .hasState(JobExecutionStatus.State.FINISHED)
                             .hasStartTime(true)
@@ -70,11 +80,10 @@ public void testSameSeedsInParallellJobs() throws Exception {
                             .documentsRetriedEquals(0)
                             .documentsOutOfScopeEquals(seedCount);
                 });
-        String crawlExecutionId1 =
-                seeds.get(0).getCrawlExecution(jobs[1]).get().getId();
 
         assertThat(rethinkDbData)
                 .crawlExecutionStatuses().hasSize(seedCount * numberOfJobs)
-                .hasEntrySatisfying(crawlExecutionId1, s -> {
+                .allSatisfy((id, s) -> {
                     assertThat(s)
                             .hasState(CrawlExecutionStatus.State.FINISHED)
@@ -124,18 +133,7 @@ public void testUniqueSeedsWithSameIpInParallellJobs() throws Exception {
                 .hasQueueTotalCount(0);
         assertThat(rethinkDbData)
                 .jobExecutionStatuses().hasSize(numberOfJobs)
-                .hasEntrySatisfying(crawls[1].getStatus().getId(), j -> {
-                    assertThat(j)
-                            .hasState(JobExecutionStatus.State.FINISHED)
-                            .hasStartTime(true)
-                            .hasEndTime(true)
-                            .documentsCrawledEquals(2 * seedCount)
-                            .documentsDeniedEquals(0)
-                            .documentsFailedEquals(0)
-                            .documentsRetriedEquals(0)
-                            .documentsOutOfScopeEquals(seedCount);
-                })
-                .hasEntrySatisfying(crawls[2].getStatus().getId(), j -> {
+                .allSatisfy((id, j) -> {
                     assertThat(j)
                             .hasState(JobExecutionStatus.State.FINISHED)
                             .hasStartTime(true)
@@ -150,7 +148,7 @@ public void testUniqueSeedsWithSameIpInParallellJobs() throws Exception {
         String crawlExecutionId1 = seeds[1].get(0).getCrawlExecution(jobs[1]).get().getId();
 
         assertThat(rethinkDbData)
                 .crawlExecutionStatuses().hasSize(seedCount * numberOfJobs)
-                .hasEntrySatisfying(crawlExecutionId1, s -> {
+                .allSatisfy((id, s) -> {
                     assertThat(s)
                             .hasState(CrawlExecutionStatus.State.FINISHED)
                             .hasStartTime(true)
@@ -202,18 +200,7 @@ public void testUniqueSeedsInParallellJobs() throws Exception {
                 .hasQueueTotalCount(0);
         assertThat(rethinkDbData)
                 .jobExecutionStatuses().hasSize(numberOfJobs)
-                .hasEntrySatisfying(crawls[1].getStatus().getId(), j -> {
-                    assertThat(j)
-                            .hasState(JobExecutionStatus.State.FINISHED)
-                            .hasStartTime(true)
-                            .hasEndTime(true)
-                            .documentsCrawledEquals(2 * seedCount)
-                            .documentsDeniedEquals(0)
-                            .documentsFailedEquals(0)
-                            .documentsRetriedEquals(0)
-                            .documentsOutOfScopeEquals(seedCount);
-                })
-                .hasEntrySatisfying(crawls[2].getStatus().getId(), j -> {
+                .allSatisfy((id, j) -> {
                     assertThat(j)
                             .hasState(JobExecutionStatus.State.FINISHED)
                             .hasStartTime(true)
@@ -228,7 +215,7 @@ public void testUniqueSeedsInParallellJobs() throws Exception {
         String crawlExecutionId1 = seeds[1].get(0).getCrawlExecution(jobs[1]).get().getId();
 
         assertThat(rethinkDbData)
                 .crawlExecutionStatuses().hasSize(seedCount * numberOfJobs)
-                .hasEntrySatisfying(crawlExecutionId1, s -> {
+                .allSatisfy((id, s) -> {
                     assertThat(s)
                             .hasState(CrawlExecutionStatus.State.FINISHED)
                             .hasStartTime(true)
@@ -275,18 +262,7 @@ public void testUniqueSeedsWithSameIpInOneJob() throws Exception {
                 .hasQueueTotalCount(0);
         assertThat(rethinkDbData)
                 .jobExecutionStatuses().hasSize(numberOfJobs)
-                .hasEntrySatisfying(crawls[1].getStatus().getId(), j -> {
-                    assertThat(j)
-                            .hasState(JobExecutionStatus.State.FINISHED)
-                            .hasStartTime(true)
-                            .hasEndTime(true)
-                            .documentsCrawledEquals(2 * seedCount)
-                            .documentsDeniedEquals(0)
-                            .documentsFailedEquals(0)
-                            .documentsRetriedEquals(0)
-                            .documentsOutOfScopeEquals(seedCount);
-                })
-                .hasEntrySatisfying(crawls[2].getStatus().getId(), j -> {
+                .allSatisfy((id, j) -> {
                     assertThat(j)
                             .hasState(JobExecutionStatus.State.FINISHED)
                             .hasStartTime(true)
@@ -301,7 +277,7 @@ public void testUniqueSeedsWithSameIpInOneJob() throws Exception {
         String crawlExecutionId1 = seeds[1].get(0).getCrawlExecution(job).get().getId();
 
         assertThat(rethinkDbData)
                 .crawlExecutionStatuses().hasSize(seedCount * numberOfJobs)
-                .hasEntrySatisfying(crawlExecutionId1, s -> {
+                .allSatisfy((id, s) -> {
                     assertThat(s)
                             .hasState(CrawlExecutionStatus.State.FINISHED)
                             .hasStartTime(true)
@@ -344,18 +320,7 @@ public void testSameSeedsInOneJob() throws Exception {
                 .hasQueueTotalCount(0);
         assertThat(rethinkDbData)
                 .jobExecutionStatuses().hasSize(numberOfJobs)
-                .hasEntrySatisfying(crawls[1].getStatus().getId(), j -> {
-                    assertThat(j)
-                            .hasState(JobExecutionStatus.State.FINISHED)
-                            .hasStartTime(true)
-                            .hasEndTime(true)
-                            .documentsCrawledEquals(2 * seedCount)
-                            .documentsDeniedEquals(0)
-                            .documentsFailedEquals(0)
-                            .documentsRetriedEquals(0)
-                            .documentsOutOfScopeEquals(seedCount);
-                })
-                .hasEntrySatisfying(crawls[2].getStatus().getId(), j -> {
+                .allSatisfy((id, j) -> {
                     assertThat(j)
                             .hasState(JobExecutionStatus.State.FINISHED)
                             .hasStartTime(true)
@@ -370,7 +335,7 @@ public void testSameSeedsInOneJob() throws Exception {
         String crawlExecutionId1 = seeds.get(0).getCrawlExecution(job).get().getId();
 
         assertThat(rethinkDbData)
                 .crawlExecutionStatuses().hasSize(seedCount * numberOfJobs)
-                .hasEntrySatisfying(crawlExecutionId1, s -> {
+                .allSatisfy((id, s) -> {
                     assertThat(s)
                             .hasState(CrawlExecutionStatus.State.FINISHED)
                             .hasStartTime(true)
diff --git a/src/test/java/no/nb/nna/veidemann/frontier/db/script/CrawlHostGroupCodecTest.java b/src/test/java/no/nb/nna/veidemann/frontier/db/script/CrawlHostGroupCodecTest.java
index 16e5ea7..bb79f4f 100644
--- a/src/test/java/no/nb/nna/veidemann/frontier/db/script/CrawlHostGroupCodecTest.java
+++ b/src/test/java/no/nb/nna/veidemann/frontier/db/script/CrawlHostGroupCodecTest.java
@@ -18,7 +18,7 @@ class CrawlHostGroupCodecTest {
     void encodeList() {
         CrawlHostGroup chg = CrawlHostGroup.newBuilder().build();
         List<String> result = CrawlHostGroupCodec.encodeList(chg);
-        List<String> expectedResult = ImmutableList.of("mi", "0", "ma", "0", "df", "0.0", "mr", "0", "rd", "0", "qc", "0", "u", "", "st", "", "ts", "0");
+        List<String> expectedResult = ImmutableList.of("mi", "0", "ma", "0", "df", "0.0", "mr", "0", "rd", "0", "u", "", "st", "", "ts", "0");
         assertThat(result).isEqualTo(expectedResult);
 
         chg = CrawlHostGroup.newBuilder()
@@ -34,7 +34,7 @@ void encodeList() {
                 .setFetchStartTimeStamp(Timestamps.fromMillis(5))
                 .build();
         result = CrawlHostGroupCodec.encodeList(chg);
-        expectedResult = ImmutableList.of("mi", "1000", "ma", "3000", "df", "1.5", "mr", "3", "rd", "60", "qc", "45", "u", "uri1", "st", "token", "ts", "5");
+        expectedResult = ImmutableList.of("mi", "1000", "ma", "3000", "df", "1.5", "mr", "3", "rd", "60", "u", "uri1", "st", "token", "ts", "5");
         assertThat(result).isEqualTo(expectedResult);
     }
 
@@ -81,7 +81,6 @@ void encodeMap() {
                 .put("df", "0.0")
                 .put("mr", "0")
                 .put("rd", "0")
-                .put("qc", "0")
                 .put("u", "")
                 .put("st", "")
                 .put("ts", "0")
@@ -107,7 +106,6 @@ void encodeMap() {
                 .put("df", "1.5")
                 .put("mr", "3")
                 .put("rd", "60")
-                .put("qc", "45")
                 .put("u", "uri1")
                 .put("st", "token")
                 .put("ts", "5")
diff --git a/src/test/java/no/nb/nna/veidemann/frontier/testutil/CrawlRunner.java b/src/test/java/no/nb/nna/veidemann/frontier/testutil/CrawlRunner.java
index e20cff6..1b1287c 100644
--- a/src/test/java/no/nb/nna/veidemann/frontier/testutil/CrawlRunner.java
+++ b/src/test/java/no/nb/nna/veidemann/frontier/testutil/CrawlRunner.java
@@ -17,6 +17,7 @@
 package no.nb.nna.veidemann.frontier.testutil;
 
 import com.google.common.util.concurrent.SettableFuture;
+import com.rethinkdb.net.Cursor;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
 import no.nb.nna.veidemann.api.config.v1.ConfigObject;
@@ -31,13 +32,20 @@
 import no.nb.nna.veidemann.api.frontier.v1.JobExecutionStatus;
 import no.nb.nna.veidemann.api.frontier.v1.JobExecutionStatus.State;
 import no.nb.nna.veidemann.commons.db.ConfigAdapter;
+import no.nb.nna.veidemann.commons.db.DbConnectionException;
 import no.nb.nna.veidemann.commons.db.DbException;
+import no.nb.nna.veidemann.commons.db.DbQueryException;
 import no.nb.nna.veidemann.commons.db.DbService;
 import no.nb.nna.veidemann.commons.db.ExecutionsAdapter;
 import no.nb.nna.veidemann.commons.util.ApiTools;
+import no.nb.nna.veidemann.db.RethinkDbConnection;
+import no.nb.nna.veidemann.db.Tables;
+import no.nb.nna.veidemann.db.initializer.RethinkDbInitializer;
 import no.nb.nna.veidemann.frontier.settings.Settings;
+import org.assertj.core.description.Description;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import redis.clients.jedis.Jedis;
 import redis.clients.jedis.JedisPool;
 
 import java.time.Duration;
@@ -54,6 +62,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
+import static com.rethinkdb.RethinkDB.r;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.awaitility.Awaitility.await;
 
@@ -62,6 +71,7 @@ public class CrawlRunner implements AutoCloseable {
 
     ConfigAdapter c = DbService.getInstance().getConfigAdapter();
     ExecutionsAdapter e = DbService.getInstance().getExecutionsAdapter();
+    RethinkDbConnection conn = ((RethinkDbInitializer) DbService.getInstance().getDbInitializer()).getDbConnection();
     private final ManagedChannel frontierChannel;
     private final FrontierGrpc.FrontierBlockingStub frontierStub;
     private final RethinkDbData rethinkDbData;
@@ -210,7 +220,7 @@ public Duration awaitCrawlFinished(long timeout, TimeUnit unit, RunningCrawl...
                     List<RunningCrawl> statuses = Arrays.stream(runningCrawls)
                             .map(j -> {
                                 try {
-                                    j.jes = DbService.getInstance().getExecutionsAdapter().getJobExecutionStatus(j.jes.getId());
+                                    j.jes = e.getJobExecutionStatus(j.jes.getId());
                                     return j;
                                 } catch (DbException e) {
                                     throw new RuntimeException(e);
                                 }
                             })
@@ -232,11 +242,31 @@ public Duration awaitCrawlFinished(long timeout, TimeUnit unit, RunningCrawl...
diff --git a/src/test/java/no/nb/nna/veidemann/frontier/worker/CrawlHostGroupTest.java b/src/test/java/no/nb/nna/veidemann/frontier/worker/CrawlHostGroupTest.java
index fea8145..6d90868 100644
--- a/src/test/java/no/nb/nna/veidemann/frontier/worker/CrawlHostGroupTest.java
+++ b/src/test/java/no/nb/nna/veidemann/frontier/worker/CrawlHostGroupTest.java
@@ -108,7 +108,7 @@ public void testChgAddScript() throws Exception {
 
         ChgAddScript chgAddScript = new ChgAddScript();
 
-        chgAddScript.run(ctx, chgId1, eId1, earliestFetchTimestamp1, false, 1000);
+        chgAddScript.run(ctx, chgId1, eId1, earliestFetchTimestamp1, 1000);
         assertThat(redisData)
                 .hasQueueTotalCount(1)
                 .crawlExecutionQueueCounts().hasNumberOfElements(1).hasQueueCount(eId1, 1);
@@ -121,7 +121,7 @@ public void testChgAddScript() throws Exception {
 
         assertThat(redisData).readyQueue().hasNumberOfElements(0);
 
-        chgAddScript.run(ctx, chgId1, eId1, earliestFetchTimestamp2, false, 1000);
+        chgAddScript.run(ctx, chgId1, eId1, earliestFetchTimestamp2, 1000);
         assertThat(redisData)
                 .hasQueueTotalCount(2)
                 .crawlExecutionQueueCounts().hasNumberOfElements(1).hasQueueCount(eId1, 2);
@@ -134,7 +134,7 @@ public void testChgAddScript() throws Exception {
 
         assertThat(redisData).readyQueue().hasNumberOfElements(0);
 
-        chgAddScript.run(ctx, chgId1, eId2, earliestFetchTimestamp2, false, 1000);
+        chgAddScript.run(ctx, chgId1, eId2, earliestFetchTimestamp2, 1000);
         assertThat(redisData)
                 .hasQueueTotalCount(3)
                 .crawlExecutionQueueCounts().hasNumberOfElements(2).hasQueueCount(eId1, 2).hasQueueCount(eId2, 1);
@@ -147,7 +147,7 @@ public void testChgAddScript() throws Exception {
 
         assertThat(redisData).readyQueue().hasNumberOfElements(0);
 
-        chgAddScript.run(ctx, chgId2, eId2, earliestFetchTimestamp2, false, 1000);
+        chgAddScript.run(ctx, chgId2, eId2, earliestFetchTimestamp2, 1000);
         assertThat(redisData)
                 .hasQueueTotalCount(4)
                 .crawlExecutionQueueCounts().hasNumberOfElements(2).hasQueueCount(eId1, 2).hasQueueCount(eId2, 2);
@@ -180,10 +180,10 @@ public void testChgDelayedQueueScript() throws Exception {
         ChgDelayedQueueScript chgDelayedQueueScript = new ChgDelayedQueueScript();
 
         // Add some CrawlHostGroups
-        chgAddScript.run(ctx, chgId1, eId1, earliestFetchTimestamp1, false, 1000);
-        chgAddScript.run(ctx, chgId1, eId1, earliestFetchTimestamp2, false, 1000);
-        chgAddScript.run(ctx, chgId1, eId2, earliestFetchTimestamp2, false, 1000);
-        chgAddScript.run(ctx, chgId2, eId2, earliestFetchTimestamp2, false, 1000);
+        chgAddScript.run(ctx, chgId1, eId1, earliestFetchTimestamp1, 1000);
+        chgAddScript.run(ctx, chgId1, eId1, earliestFetchTimestamp2, 1000);
+        chgAddScript.run(ctx, chgId1, eId2, earliestFetchTimestamp2, 1000);
+        chgAddScript.run(ctx, chgId2, eId2, earliestFetchTimestamp2, 1000);
 
         // Check expected state
         assertThat(redisData)
@@ -255,11 +255,11 @@ public void testChgNextScript() throws Exception {
         ChgReleaseScript chgReleaseScript = new ChgReleaseScript();
 
         // Add some CrawlHostGroups and move to ready
-        chgAddScript.run(ctx, chgId1, eId1, earliestFetchTimestamp1, false, 1000);
-        chgAddScript.run(ctx, chgId1, eId1, earliestFetchTimestamp1, false, 1000);
-        chgAddScript.run(ctx, chgId1, eId2, earliestFetchTimestamp2, false, 1000);
-        chgAddScript.run(ctx, chgId2, eId3, earliestFetchTimestamp1, false, 1000);
-        chgAddScript.run(ctx, chgId3, eId4, earliestFetchTimestamp2, false, 1000);
+        chgAddScript.run(ctx, chgId1, eId1, earliestFetchTimestamp1, 1000);
+        chgAddScript.run(ctx, chgId1, eId1, earliestFetchTimestamp1, 1000);
+        chgAddScript.run(ctx, chgId1, eId2, earliestFetchTimestamp2, 1000);
+        chgAddScript.run(ctx, chgId2, eId3, earliestFetchTimestamp1, 1000);
+        chgAddScript.run(ctx, chgId3, eId4, earliestFetchTimestamp2, 1000);
         Long moved = chgDelayedQueueScript.run(ctx, CHG_WAIT_KEY, CHG_READY_KEY);
 
         // Check expected state
@@ -368,7 +368,7 @@ public void testChgNextScript() throws Exception {
         assertThat(redisData).readyQueue().hasNumberOfElements(0);
         assertThat(redisData).sessionTokens().hasNumberOfElements(1).hasCrawlHostId("sess1", chgId1);
 
-        CrawlHostGroup result = chgGetScript.run(ctx,chgId1);
+        CrawlHostGroup result = chgGetScript.run(ctx, chgId1);
         assertThat(result).isEqualTo(chg1);
 
         chgReleaseScript.run(ctx, chgId1, "sess1", 1999);
diff --git a/src/test/resources/log4j2.xml b/src/test/resources/log4j2.xml
index cc1078c..8ff10c2 100644
--- a/src/test/resources/log4j2.xml
+++ b/src/test/resources/log4j2.xml
@@ -1,8 +1,16 @@
+
+
-
+
@@ -12,6 +20,9 @@
+
+
+

From b0ab46fcc3fa095569f7eb981eb4e6db95786be8 Mon Sep 17 00:00:00 2001
From: John Erik Halse
Date: Wed, 2 Jun 2021 16:52:09 +0200
Subject: [PATCH 3/4] Cleaned up scripts and enhanced logging

---
 .../frontier/db/CrawlQueueManager.java        |  8 +++--
 .../frontier/db/script/ChgAddScript.java      | 19 ++++------
 .../frontier/db/script/ChgGetScript.java      |  1 +
 .../frontier/db/script/ChgNextScript.java     |  2 ++
 .../frontier/db/script/ChgReleaseScript.java  | 13 +++----
 .../frontier/db/script/LuaScript.java         | 17 +++++++--
 .../frontier/worker/PostFetchHandler.java     |  2 +-
 .../frontier/worker/Preconditions.java        |  2 +-
 .../frontier/worker/QueuedUriWrapper.java     |  2 +-
 src/main/resources/lua/chg_add.lua            | 36 ++++---------------
 src/main/resources/lua/chg_release.lua        |  8 ++---
 11 files changed, 51 insertions(+), 59 deletions(-)

diff --git a/src/main/java/no/nb/nna/veidemann/frontier/db/CrawlQueueManager.java b/src/main/java/no/nb/nna/veidemann/frontier/db/CrawlQueueManager.java
index cc280f9..609b465 100644
--- a/src/main/java/no/nb/nna/veidemann/frontier/db/CrawlQueueManager.java
+++ b/src/main/java/no/nb/nna/veidemann/frontier/db/CrawlQueueManager.java
@@ -45,6 +45,7 @@
 import no.nb.nna.veidemann.frontier.worker.QueuedUriWrapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
 import redis.clients.jedis.Jedis;
 import redis.clients.jedis.JedisPool;
 import redis.clients.jedis.ScanParams;
@@ -129,7 +130,10 @@ public CrawlQueueManager(Frontier frontier, RethinkDbConnection conn, JedisPool
                 () -> getPrefetchHandler(),
                 p -> releaseCrawlHostGroup(p.getQueuedUri().getCrawlHostGroupId(), "", RESCHEDULE_DELAY));
     }
 
-    public QueuedUri addToCrawlHostGroup(QueuedUri qUri, boolean attemptToSetAsBusy) throws DbException {
+    public QueuedUri addToCrawlHostGroup(QueuedUri qUri) throws DbException {
+        MDC.put("eid", qUri.getExecutionId());
+        MDC.put("uri", qUri.getUri());
+
         Objects.requireNonNull(qUri.getCrawlHostGroupId(), "CrawlHostGroupId cannot be null");
         Objects.requireNonNull(qUri.getPolitenessRef().getId(), "PolitenessId cannot be null");
         if (qUri.getSequence() <= 0L) {
@@ -161,7 +165,7 @@ public QueuedUri addToCrawlHostGroup(QueuedUri qUri, boolean attemptToSetAsBusy)
             try (JedisContext ctx = JedisContext.forPool(jedisPool)) {
                 uriAddScript.run(ctx, qUri);
                 chgAddScript.run(ctx, qUri.getCrawlHostGroupId(), qUri.getExecutionId(), qUri.getEarliestFetchTimeStamp(),
-                        attemptToSetAsBusy, frontier.getSettings().getBusyTimeout().toMillis());
+                        frontier.getSettings().getBusyTimeout().toMillis());
             }
             return qUri;
         } catch (Exception e) {
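addToCrawlHostGroup now tags the SLF4J MDC with the execution id and URI, so every log line emitted further down the call path on the same thread can carry those fields (a log4j2 layout can surface them with %X{eid} and %X{uri}, which is presumably what the enhanced test log4j2.xml above is for). MDC is thread-local, so the usual discipline is to clear it when the unit of work ends; a small, self-contained sketch of that pattern, with the same key names as above:

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.slf4j.MDC;

    public class MdcSketch {
        private static final Logger LOG = LoggerFactory.getLogger(MdcSketch.class);

        public static void main(String[] args) {
            MDC.put("eid", "execution-1");
            MDC.put("uri", "https://example.com/");
            try {
                // Any log statement on this thread can now include eid/uri
                // via %X{eid} / %X{uri} in the layout pattern.
                LOG.info("adding uri to crawl host group");
            } finally {
                // MDC is thread-local; clear it to avoid leaking context
                // onto pooled threads. (The patch itself relies on the keys
                // being overwritten on the next call.)
                MDC.remove("eid");
                MDC.remove("uri");
            }
        }
    }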
diff --git a/src/main/java/no/nb/nna/veidemann/frontier/db/script/ChgAddScript.java b/src/main/java/no/nb/nna/veidemann/frontier/db/script/ChgAddScript.java
index d6c0789..57901db 100644
--- a/src/main/java/no/nb/nna/veidemann/frontier/db/script/ChgAddScript.java
+++ b/src/main/java/no/nb/nna/veidemann/frontier/db/script/ChgAddScript.java
@@ -8,7 +8,7 @@
 
 import static no.nb.nna.veidemann.frontier.db.CrawlQueueManager.*;
 
-public class ChgAddScript extends RedisJob<Boolean> {
+public class ChgAddScript extends RedisJob<Long> {
     final LuaScript chgAddScript;
 
     public ChgAddScript() {
@@ -20,25 +20,20 @@ public ChgAddScript() {
      * Add uri to queue.
      *
      * @param ctx
-     * @param attemptToSetAsBusy if true, try to set uri's chg to busy
-     * @param busyTimeout        if it is possible to set chg as busy, this is the timeout
-     * @return true if chg was set to busy, false otherwise
+     * @param busyTimeout if it is possible to set chg as busy, this is the timeout
+     * @return number of uris in queue for this CrawlHostGroup
      */
-    public boolean run(JedisContext ctx, String chgId, String crawlExecutionId, Timestamp earliestFetchTimestamp, boolean attemptToSetAsBusy, long busyTimeout) {
+    public long run(JedisContext ctx, String chgId, String crawlExecutionId, Timestamp earliestFetchTimestamp, long busyTimeout) {
         return execute(ctx, jedis -> {
             String chgKey = CHG_PREFIX + chgId;
 
             // Handle CHG and counters
             long readyTime = Timestamps.toMillis(earliestFetchTimestamp);
             String readyTimeString = Long.toString(readyTime);
-            String busyExpireTime = String.valueOf(System.currentTimeMillis() + busyTimeout);
-            List<String> chgKeys = ImmutableList.of(chgKey, CHG_WAIT_KEY, CHG_BUSY_KEY, CHG_READY_KEY,
-                    CRAWL_EXECUTION_ID_COUNT_KEY, QUEUE_COUNT_TOTAL_KEY);
-            List<String> chgArgs = ImmutableList.of(readyTimeString, crawlExecutionId, chgId, String.valueOf(attemptToSetAsBusy), busyExpireTime);
-            String queueCount = (String) chgAddScript.runString(jedis, chgKeys, chgArgs);
-
-            return queueCount.equals("true");
+            List<String> chgKeys = ImmutableList.of(chgKey, CHG_WAIT_KEY, CRAWL_EXECUTION_ID_COUNT_KEY, QUEUE_COUNT_TOTAL_KEY);
+            List<String> chgArgs = ImmutableList.of(readyTimeString, crawlExecutionId, chgId);
+            return (Long) chgAddScript.runString(jedis, chgKeys, chgArgs);
         });
     }
 }
diff --git a/src/main/java/no/nb/nna/veidemann/frontier/db/script/ChgGetScript.java b/src/main/java/no/nb/nna/veidemann/frontier/db/script/ChgGetScript.java
index b9da8bd..ad362f7 100644
--- a/src/main/java/no/nb/nna/veidemann/frontier/db/script/ChgGetScript.java
+++ b/src/main/java/no/nb/nna/veidemann/frontier/db/script/ChgGetScript.java
@@ -17,6 +17,7 @@ public ChgGetScript() {
     public CrawlHostGroup run(JedisContext ctx, String crawlHostGroupId) {
         return execute(ctx, jedis -> {
             Map<String, String> encoded = jedis.hgetAll(CrawlQueueManager.CHG_PREFIX + crawlHostGroupId);
+            LOG.trace("HGETALL {}, RESULT: {}", CrawlQueueManager.CHG_PREFIX + crawlHostGroupId, encoded);
             return CrawlHostGroupCodec.decode(crawlHostGroupId, encoded);
         });
     }
diff --git a/src/main/java/no/nb/nna/veidemann/frontier/db/script/ChgNextScript.java b/src/main/java/no/nb/nna/veidemann/frontier/db/script/ChgNextScript.java
index d96d4f7..6e6a3f1 100644
--- a/src/main/java/no/nb/nna/veidemann/frontier/db/script/ChgNextScript.java
+++ b/src/main/java/no/nb/nna/veidemann/frontier/db/script/ChgNextScript.java
@@ -42,6 +42,8 @@ public CrawlHostGroup run(JedisContext ctx, long busyTimeout) {
                 }
             }
             return null;
+        } else {
+            LOG.trace("BLPOP {} {}, RESULT: {}", waitForReadyTimeout, CHG_READY_KEY, res);
         }
         String chgId = res.get(1);
         String chgKey = CHG_PREFIX + chgId;
diff --git a/src/main/java/no/nb/nna/veidemann/frontier/db/script/ChgReleaseScript.java b/src/main/java/no/nb/nna/veidemann/frontier/db/script/ChgReleaseScript.java
index 1a574b7..4d22eeb 100644
--- a/src/main/java/no/nb/nna/veidemann/frontier/db/script/ChgReleaseScript.java
+++ b/src/main/java/no/nb/nna/veidemann/frontier/db/script/ChgReleaseScript.java
@@ -10,7 +10,7 @@
 
 import static no.nb.nna.veidemann.frontier.db.CrawlQueueManager.*;
 
-public class ChgReleaseScript extends RedisJob<Void> {
+public class ChgReleaseScript extends RedisJob<Long> {
     private static final Logger LOG = LoggerFactory.getLogger(ChgReleaseScript.class);
     final LuaScript chgRealeaseScript;
 
@@ -19,8 +19,8 @@ public ChgReleaseScript() {
         chgRealeaseScript = new LuaScript("chg_release.lua");
     }
 
-    public void run(JedisContext ctx, String crawlHostGroupId, String sessionToken, long nextFetchDelayMs) {
-        if (nextFetchDelayMs <= 0) {
+    public Long run(JedisContext ctx, String crawlHostGroupId, String sessionToken, long nextFetchDelayMs) {
+        if (nextFetchDelayMs < 10) {
             nextFetchDelayMs = 10;
         }
         String chgKey = CrawlQueueManager.CHG_PREFIX + crawlHostGroupId;
@@ -30,13 +30,14 @@ public void run(JedisContext ctx, String crawlHostGroupId, String sessionToken,
 
         List<String> keys = ImmutableList.of(CHG_BUSY_KEY, CHG_WAIT_KEY, chgKey, SESSION_TO_CHG_KEY);
         List<String> args = ImmutableList.of(String.valueOf(waitTime), crawlHostGroupId, sessionToken);
-        execute(ctx, jedis -> {
+        return execute(ctx, jedis -> {
             try {
-                chgRealeaseScript.runString(jedis, keys, args);
+                String result = (String) chgRealeaseScript.runString(jedis, keys, args);
+                return Long.parseLong(result);
             } catch (JedisDataException e) {
                 LOG.warn(e.getMessage());
+                return 0L;
             }
-            return null;
         });
     }
 }
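Two behavioral details in the release path are easy to miss: the guard now rounds every delay below 10 ms up, not just non-positive ones, making 10 ms the de facto minimum politeness delay; and the script's result (the CHG's remaining queue count, per the reworked chg_release.lua later in this patch) is parsed into a Long, with 0L as the fallback when Redis rejects the release (the session-token check in the script raises an error, which surfaces in Java as JedisDataException). A tiny sketch of what the new clamp amounts to:

    long nextFetchDelayMs = 3;                        // caller-supplied politeness delay
    long effective = Math.max(nextFetchDelayMs, 10);  // what the new guard enforces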
diff --git a/src/main/java/no/nb/nna/veidemann/frontier/db/script/LuaScript.java b/src/main/java/no/nb/nna/veidemann/frontier/db/script/LuaScript.java
index 945e37e..e0ebe5b 100644
--- a/src/main/java/no/nb/nna/veidemann/frontier/db/script/LuaScript.java
+++ b/src/main/java/no/nb/nna/veidemann/frontier/db/script/LuaScript.java
@@ -1,5 +1,9 @@
 package no.nb.nna.veidemann.frontier.db.script;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.Marker;
+import org.slf4j.MarkerFactory;
 import redis.clients.jedis.Jedis;
 import redis.clients.jedis.exceptions.JedisNoScriptException;
 
@@ -10,11 +14,15 @@
 import java.util.stream.Collectors;
 
 public class LuaScript {
+    private static final Logger LOG = LoggerFactory.getLogger(LuaScript.class);
+
+    private final Marker scriptNameMarker;
     String scriptName;
     String sha;
     String script;
 
     public LuaScript(String scriptName) {
+        scriptNameMarker = MarkerFactory.getMarker(scriptName);
         this.scriptName = scriptName;
         InputStream in = LuaScript.class.getClassLoader().getResourceAsStream("lua/" + scriptName);
         script = new BufferedReader(new InputStreamReader(in)).lines().collect(Collectors.joining("\n"));
@@ -25,7 +33,9 @@ Object runString(Jedis jedis, List<String> keys, List<String> args) {
             sha = jedis.scriptLoad(script);
         }
         try {
-            return jedis.evalsha(sha, keys, args);
+            Object result = jedis.evalsha(sha, keys, args);
+            LOG.trace(scriptNameMarker, "{}: KEYS: {}, ARGS: {}, RESULT: {}", scriptName, keys, args, result);
+            return result;
         } catch (JedisNoScriptException ex) {
             sha = null;
             return runString(jedis, keys, args);
@@ -33,11 +43,14 @@ Object runString(Jedis jedis, List<String> keys, List<String> args) {
     }
 
     Object runBytes(Jedis jedis, List<byte[]> keys, List<byte[]> args) {
+        LOG.trace(scriptNameMarker, "{}: KEYS: {}, ARGS: {}", scriptName, keys, args);
         if (sha == null) {
             sha = jedis.scriptLoad(script);
         }
         try {
-            return jedis.evalsha(sha.getBytes(), keys, args);
+            Object result = jedis.evalsha(sha.getBytes(), keys, args);
+            LOG.trace(scriptNameMarker, "{}: KEYS: {}, ARGS: {}, RESULT: {}", scriptName, keys, args, result);
+            return result;
         } catch (JedisNoScriptException ex) {
             sha = null;
             return runBytes(jedis, keys, args);
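LuaScript loads each script into Redis once with SCRIPT LOAD, caches the returned SHA1, and then calls EVALSHA; if the server-side script cache disappears (for example after a Redis restart), Jedis throws JedisNoScriptException, the cached SHA is dropped, and the call reloads and retries. The patch above adds per-invocation tracing of keys, args and result around exactly that flow. A self-contained sketch of the same load-once/retry pattern, using a trivial script in place of the frontier's:

    import java.util.List;

    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.exceptions.JedisNoScriptException;

    public class EvalshaRetrySketch {
        private String sha;
        private final String script = "return redis.call('INCRBY', KEYS[1], ARGV[1])";

        Object run(Jedis jedis, List<String> keys, List<String> args) {
            if (sha == null) {
                // SCRIPT LOAD caches the script server-side and returns its SHA1.
                sha = jedis.scriptLoad(script);
            }
            try {
                return jedis.evalsha(sha, keys, args);
            } catch (JedisNoScriptException ex) {
                // Script cache was flushed (e.g. Redis restart): reload and retry.
                sha = null;
                return run(jedis, keys, args);
            }
        }
    }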
diff --git a/src/main/java/no/nb/nna/veidemann/frontier/worker/PostFetchHandler.java b/src/main/java/no/nb/nna/veidemann/frontier/worker/PostFetchHandler.java
index add47aa..2c4c2fe 100644
--- a/src/main/java/no/nb/nna/veidemann/frontier/worker/PostFetchHandler.java
+++ b/src/main/java/no/nb/nna/veidemann/frontier/worker/PostFetchHandler.java
@@ -181,7 +181,7 @@ public void postFetchFinally() {
             forEach(span, frontier.getPostFetchThreadPool(), outlinkQueue, outlink -> {
                 try {
                     OutlinkHandler.processOutlink(frontier, status, qUri, outlink, scriptParameters, scopeScriptRef);
-                } catch (DbException e) {
+                } catch (DbException | IllegalStateException e) {
                     // An error here indicates problems with DB communication. No idea how to handle that yet.
                     LOG.error("Error processing outlink: {}", e.toString(), e);
                 } catch (Throwable e) {
diff --git a/src/main/java/no/nb/nna/veidemann/frontier/worker/Preconditions.java b/src/main/java/no/nb/nna/veidemann/frontier/worker/Preconditions.java
index 2d01dc1..1dd51f7 100644
--- a/src/main/java/no/nb/nna/veidemann/frontier/worker/Preconditions.java
+++ b/src/main/java/no/nb/nna/veidemann/frontier/worker/Preconditions.java
@@ -219,7 +219,7 @@ public void accept(Boolean isAllowed) {
             try {
                 if (isAllowed) {
                     if (changedCrawlHostGroup) {
-                        frontier.getCrawlQueueManager().addToCrawlHostGroup(qUri.getQueuedUri(), false);
+                        frontier.getCrawlQueueManager().addToCrawlHostGroup(qUri.getQueuedUri());
                         future.set(PreconditionState.RETRY);
                     } else {
                         future.set(PreconditionState.OK);
diff --git a/src/main/java/no/nb/nna/veidemann/frontier/worker/QueuedUriWrapper.java b/src/main/java/no/nb/nna/veidemann/frontier/worker/QueuedUriWrapper.java
index f3b80d0..9ed05de 100644
--- a/src/main/java/no/nb/nna/veidemann/frontier/worker/QueuedUriWrapper.java
+++ b/src/main/java/no/nb/nna/veidemann/frontier/worker/QueuedUriWrapper.java
@@ -293,7 +293,7 @@ public boolean forceAddUriToQueue(StatusWrapper status) throws DbException {
         wrapped.clearAnnotation();
 
         QueuedUri q = wrapped.build();
-        q = frontier.getCrawlQueueManager().addToCrawlHostGroup(q, false);
+        q = frontier.getCrawlQueueManager().addToCrawlHostGroup(q);
         wrapped = q.toBuilder();
 
         oldEarliestFetchTimestamp = null;
diff --git a/src/main/resources/lua/chg_add.lua b/src/main/resources/lua/chg_add.lua
index 1e45222..f27e999 100644
--- a/src/main/resources/lua/chg_add.lua
+++ b/src/main/resources/lua/chg_add.lua
@@ -5,16 +5,12 @@
 
 local chgKey = KEYS[1]
 local waitKey = KEYS[2]
-local busyKey = KEYS[3]
-local readyKey = KEYS[4]
-local crawlExecutionIdCountKey = KEYS[5]
-local queueCountTotalKey = KEYS[6]
+local crawlExecutionIdCountKey = KEYS[3]
+local queueCountTotalKey = KEYS[4]
 
 local nextReadyTime = ARGV[1]
 local crawlExecutionId = ARGV[2]
 local chgId = ARGV[3]
-local attemptToSetAsBusy = ARGV[4]
-local busyExpireTime = ARGV[5]
 
 local chgExists = redis.call('EXISTS', chgKey)
 
@@ -27,29 +23,9 @@
 redis.call('HINCRBY', crawlExecutionIdCountKey, crawlExecutionId, 1)
 -- Increment total queue count
 redis.call('INCR', queueCountTotalKey)
 
-if attemptToSetAsBusy == 'true' then
-    -- Caller want chg to be busy immediately
-
-    -- If a new chg was created, it is safe to add as busy
-    if queueCount == 1 then
-        redis.call('ZADD', busyKey, busyExpireTime, chgId)
-        return 'true'
-    end
-
-    -- Check if chg is ready and can be added to busy
-    local isReady = redis.call('ZRANK', readyKey, chgId)
-    if isReady ~= 'nil' then
-        -- Remove from ready
-        redis.call('LREM', readyKey, 0, chgId)
-        -- Add chg to busy and return queue count
-        redis.call('ZADD', busyKey, busyExpireTime, chgId)
-        return 'true'
-    end
-else
-    if chgExists == 0 then
-        -- If new chg was created, queue it.
-        redis.call('ZADD', waitKey, nextReadyTime, chgId)
-    end
+if chgExists == 0 then
+    -- If new chg was created, queue it.
+    redis.call('ZADD', waitKey, nextReadyTime, chgId)
 end
 
-return 'false'
+return queueCount
diff --git a/src/main/resources/lua/chg_release.lua b/src/main/resources/lua/chg_release.lua
index 18f9b5b..c8829ae 100644
--- a/src/main/resources/lua/chg_release.lua
+++ b/src/main/resources/lua/chg_release.lua
@@ -22,6 +22,8 @@
 if storedSessionToken ~= sessionToken then
     error("Trying to release chg '" .. chgId .. "' with sessionToken '" .. sessionToken .. "', but chg's sessionToken was '" .. storedSessionToken .. "'")
 end
 
+local queueCount = redis.call('HGET', chgKey, "qc")
+
 -- Remove chg from busyKey
 if redis.call('ZREM', busyKey, chgId) == 1 then
     -- Remove session token
@@ -31,13 +33,9 @@
     end
 
     -- Check queue count from chgKey.
-    local queueCount = redis.call('HGET', chgKey, "qc")
     if (not queueCount) or (tonumber(queueCount) <= 0) then
         -- If queue count is zero there is no need for chg, remove it.
         redis.call('DEL', chgKey)
-        --redis.call('LREM', readyKey, 0, chgId)
-        --redis.call('ZREM', busyKey, chgId)
-        --redis.call('ZREM', waitKey, chgId)
     else
         -- Otherwise add chg to waitKey.
         redis.call('ZADD', waitKey, waitTime, chgId)
@@ -45,3 +43,5 @@
         redis.call('HDEL', chgKey, "df", "rd", "mi", "ma", "mr", "u", "st", "ts")
     end
 end
+
+return queueCount
\ No newline at end of file

From 78a30f61dd36e3cf8bffa28c00db2a055e8fcd48 Mon Sep 17 00:00:00 2001
From: John Erik Halse
Date: Wed, 2 Jun 2021 16:53:33 +0200
Subject: [PATCH 4/4] Fix for a concurrency problem that left some executions
 in the created state

---
 .../veidemann/frontier/db/script/CrawlHostGroupCodec.java | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/src/main/java/no/nb/nna/veidemann/frontier/db/script/CrawlHostGroupCodec.java b/src/main/java/no/nb/nna/veidemann/frontier/db/script/CrawlHostGroupCodec.java
index 4aa7f8e..c4db847 100644
--- a/src/main/java/no/nb/nna/veidemann/frontier/db/script/CrawlHostGroupCodec.java
+++ b/src/main/java/no/nb/nna/veidemann/frontier/db/script/CrawlHostGroupCodec.java
@@ -59,8 +59,9 @@ public static List<String> encodeList(CrawlHostGroup chg) {
         encoded.add(RETRY_DELAY_SECONDS);
         encoded.add(String.valueOf(chg.getRetryDelaySeconds()));
 
-        encoded.add(QUEUED_URI_COUNT);
-        encoded.add(String.valueOf(chg.getQueuedUriCount()));
+        // Do not encode QUEUED_URI_COUNT (qc) since that value is manipulated with Redis HINCRBY
+        // encoded.add(QUEUED_URI_COUNT);
+        // encoded.add(String.valueOf(chg.getQueuedUriCount()));
 
         encoded.add(CURRENT_URI_ID);
         encoded.add(chg.getCurrentUriId());
@@ -82,7 +83,8 @@ public static Map<String, String> encodeMap(CrawlHostGroup chg) {
         encoded.put(DELAY_FACTOR, String.valueOf(chg.getDelayFactor()));
         encoded.put(MAX_RETRIES, String.valueOf(chg.getMaxRetries()));
         encoded.put(RETRY_DELAY_SECONDS, String.valueOf(chg.getRetryDelaySeconds()));
-        encoded.put(QUEUED_URI_COUNT, String.valueOf(chg.getQueuedUriCount()));
+        // Do not encode QUEUED_URI_COUNT (qc) since that value is manipulated with Redis HINCRBY
+        // encoded.put(QUEUED_URI_COUNT, String.valueOf(chg.getQueuedUriCount()));
         encoded.put(CURRENT_URI_ID, chg.getCurrentUriId());
         encoded.put(SESSION_TOKEN, chg.getSessionToken());
         encoded.put(FETCH_START_TIME_STAMP, Long.toString(Timestamps.toMillis(chg.getFetchStartTimeStamp())));
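This codec change is the concurrency fix named in the subject: the qc field of the CHG hash is incremented by the Lua scripts with HINCRBY, so if Java code re-wrote the whole hash from a decoded CrawlHostGroup it could overwrite a count another frontier thread had just incremented. A CHG whose queue count was clobbered to zero would then be deleted on release while URIs for it were still queued, which plausibly explains executions stuck in the created state. A self-contained sketch of the lost-update interleaving; the key and field names below are illustrative, not the frontier's actual CHG_PREFIX:

    import redis.clients.jedis.Jedis;

    public class QcLostUpdateSketch {
        public static void main(String[] args) {
            try (Jedis threadA = new Jedis("localhost", 6379);
                 Jedis threadB = new Jedis("localhost", 6379)) {
                threadA.hset("CHGexample", "qc", "1");

                // Thread A decodes the hash, seeing qc = 1 ...
                String seenByA = threadA.hget("CHGexample", "qc");

                // ... thread B queues another URI; HINCRBY makes qc = 2 ...
                threadB.hincrBy("CHGexample", "qc", 1);

                // ... thread A re-encodes everything it read: qc is back to 1
                // and B's increment is lost. Hence the codec no longer writes qc.
                threadA.hset("CHGexample", "qc", seenByA);
            }
        }
    }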