Merge pull request #5 from nlnwa/Optimization
Reduced number of db calls
johnerikhalse authored Apr 1, 2019
2 parents e7266f4 + b84c7ef commit 60733d3
Showing 8 changed files with 305 additions and 106 deletions.
4 changes: 2 additions & 2 deletions pom.xml
@@ -27,8 +27,8 @@

<docker.java.image>openjdk:8-jdk-alpine</docker.java.image>

-<veidemann.commons.version>0.3.8</veidemann.commons.version>
-<veidemann.rethinkdbadapter.version>0.3.14</veidemann.rethinkdbadapter.version>
+<veidemann.commons.version>0.3.10</veidemann.commons.version>
+<veidemann.rethinkdbadapter.version>0.3.15</veidemann.rethinkdbadapter.version>
<veidemann.harvester.version>0.3.7</veidemann.harvester.version>

<log4j.version>2.7</log4j.version>
src/main/java/no/nb/nna/veidemann/frontier/worker/CrawlExecution.java
@@ -30,11 +30,8 @@
import no.nb.nna.veidemann.api.frontier.v1.PageHarvestSpec;
import no.nb.nna.veidemann.api.frontier.v1.QueuedUri;
import no.nb.nna.veidemann.commons.ExtraStatusCodes;
-import no.nb.nna.veidemann.commons.db.ConfigAdapter;
import no.nb.nna.veidemann.commons.db.DbException;
import no.nb.nna.veidemann.commons.db.DbService;
-import no.nb.nna.veidemann.commons.db.DistributedLock;
-import no.nb.nna.veidemann.commons.db.DistributedLock.Key;
import no.nb.nna.veidemann.frontier.worker.Preconditions.PreconditionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,21 +47,23 @@ public class CrawlExecution {

private static final Logger LOG = LoggerFactory.getLogger(CrawlExecution.class);

-private final StatusWrapper status;
+final StatusWrapper status;

-private final Frontier frontier;
+final Frontier frontier;

-private final ConfigObject crawlConfig;
+final ConfigObject crawlConfig;

-private final ConfigObject politenessConfig;
+final ConfigObject politenessConfig;

-private final ConfigObject collectionConfig;
+final ConfigObject collectionConfig;

-private final CrawlLimitsConfig limits;
+final CrawlLimitsConfig limits;

-private final QueuedUriWrapper qUri;
+final QueuedUriWrapper qUri;

-private final CrawlHostGroup crawlHostGroup;
+final CrawlHostGroup crawlHostGroup;

+private final OutlinkHandler outlinkHandler;

private long delayMs = 0L;

@@ -78,10 +77,9 @@ public class CrawlExecution {

public CrawlExecution(QueuedUri queuedUri, CrawlHostGroup crawlHostGroup, Frontier frontier) throws DbException {
this.status = StatusWrapper.getStatusWrapper(queuedUri.getExecutionId());
-ConfigAdapter configAdapter = DbService.getInstance().getConfigAdapter();
-ConfigObject job = configAdapter.getConfigObject(status.getJobId());
-this.crawlConfig = configAdapter.getConfigObject(job.getCrawlJob().getCrawlConfigRef());
-this.collectionConfig = configAdapter.getConfigObject(crawlConfig.getCrawlConfig().getCollectionRef());
+ConfigObject job = frontier.getConfig(status.getJobId());
+this.crawlConfig = frontier.getConfig(job.getCrawlJob().getCrawlConfigRef());
+this.collectionConfig = frontier.getConfig(crawlConfig.getCrawlConfig().getCollectionRef());
try {
this.qUri = QueuedUriWrapper.getQueuedUriWrapper(queuedUri, collectionConfig.getMeta().getName()).clearError();
} catch (URISyntaxException ex) {
@@ -91,8 +89,9 @@ public CrawlExecution(QueuedUri queuedUri, CrawlHostGroup crawlHostGroup, Fronti

this.crawlHostGroup = crawlHostGroup;
this.frontier = frontier;
-this.politenessConfig = configAdapter.getConfigObject(crawlConfig.getCrawlConfig().getPolitenessRef());
+this.politenessConfig = frontier.getConfig(crawlConfig.getCrawlConfig().getPolitenessRef());
this.limits = job.getCrawlJob().getLimits();
+this.outlinkHandler = new OutlinkHandler(this);
}

public String getId() {
@@ -151,7 +150,7 @@ public PageHarvestSpec preFetch() {
}
}

LOG.info("Fetching " + qUri.getUri());
LOG.debug("Fetching " + qUri.getUri());
fetchStart = System.currentTimeMillis();

return PageHarvestSpec.newBuilder()
@@ -237,6 +236,8 @@ public void postFetchFinally() {
MDC.put("uri", qUri.getUri());

try {
+outlinkHandler.finish();

status.removeCurrentUri(qUri).saveStatus();
long fetchEnd = System.currentTimeMillis();
fetchTimeMs = fetchEnd - fetchStart;
@@ -284,57 +285,7 @@ public void postFetchFinally() {
}

public void queueOutlink(QueuedUri outlink) throws DbException {
-try {
-QueuedUriWrapper outUri = QueuedUriWrapper.getQueuedUriWrapper(qUri, outlink, collectionConfig.getMeta().getName());
-
-DistributedLock lock = DbService.getInstance()
-.createDistributedLock(new Key("quri", outUri.getSurt()), 10);
-lock.lock();
-try {
-if (shouldInclude(outUri)) {
-outUri.setSequence(outUri.getDiscoveryPath().length());
-
-PreconditionState check = Preconditions.checkPreconditions(frontier, crawlConfig, status, outUri);
-switch (check) {
-case OK:
-LOG.debug("Found new URI: {}, queueing.", outUri.getSurt());
-outUri.setPriorityWeight(this.crawlConfig.getCrawlConfig().getPriorityWeight());
-outUri.addUriToQueue();
-break;
-case RETRY:
-LOG.debug("Failed preconditions for: {}, queueing for retry.", outUri.getSurt());
-outUri.setPriorityWeight(this.crawlConfig.getCrawlConfig().getPriorityWeight());
-outUri.addUriToQueue();
-break;
-case FAIL:
-case DENIED:
-break;
-}
-}
-} finally {
-lock.unlock();
-}
-} catch (URISyntaxException ex) {
-status.incrementDocumentsFailed();
-LOG.info("Illegal URI {}", ex);
-}
-}
-
-boolean shouldInclude(QueuedUriWrapper outlink) throws DbException {
-if (!LimitsCheck.isQueueable(limits, status, outlink)) {
-return false;
-}
-
-if (!frontier.getScopeChecker().isInScope(status, outlink)) {
-return false;
-}
-
-if (DbService.getInstance().getCrawlQueueAdapter().uriNotIncludedInQueue(outlink.getQueuedUri(), status.getStartTime())) {
-return true;
-}
-
-LOG.debug("Found already included URI: {}, skipping.", outlink.getSurt());
-return false;
+outlinkHandler.queueOutlink(outlink);
}

private void calculateDelay() {
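The net effect of the CrawlExecution changes above: outlinks are no longer checked and queued one at a time, each under its own distributed lock. queueOutlink() now only hands the outlink to the new OutlinkHandler, which buffers it in memory, and postFetchFinally() flushes the whole batch once via outlinkHandler.finish(). A minimal, self-contained sketch of this buffer-then-flush pattern (the types here are hypothetical stand-ins; the real implementation is the OutlinkHandler class further down):

import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in illustrating the buffer-then-flush idea from this diff.
class BufferingOutlinkHandler {
    private final List<String> buffered = new ArrayList<>();

    void queueOutlink(String uri) {
        buffered.add(uri); // no per-outlink locking or DB round trip anymore
    }

    void finish() {
        // Single flush point, analogous to the finish() call added in postFetchFinally().
        System.out.println("Flushing " + buffered.size() + " outlinks in one pass");
        buffered.clear();
    }
}

public class FlushSketch {
    public static void main(String[] args) {
        BufferingOutlinkHandler handler = new BufferingOutlinkHandler();
        handler.queueOutlink("https://example.com/a"); // discovered during fetch
        handler.queueOutlink("https://example.com/b");
        handler.finish(); // once, when the harvest of the page completes
    }
}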
33 changes: 29 additions & 4 deletions src/main/java/no/nb/nna/veidemann/frontier/worker/Frontier.java
@@ -15,7 +15,11 @@
*/
package no.nb.nna.veidemann.frontier.worker;

+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import no.nb.nna.veidemann.api.config.v1.ConfigObject;
+import no.nb.nna.veidemann.api.config.v1.ConfigRef;
import no.nb.nna.veidemann.api.frontier.v1.CrawlExecutionStatus;
import no.nb.nna.veidemann.api.frontier.v1.CrawlSeedRequest;
import no.nb.nna.veidemann.commons.ExtraStatusCodes;
@@ -25,11 +29,14 @@
import no.nb.nna.veidemann.commons.db.CrawlQueueFetcher;
import no.nb.nna.veidemann.commons.db.CrawlableUri;
import no.nb.nna.veidemann.commons.db.DbException;
+import no.nb.nna.veidemann.commons.db.DbQueryException;
import no.nb.nna.veidemann.commons.db.DbService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URISyntaxException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;

/**
*
@@ -46,11 +53,23 @@ public class Frontier implements AutoCloseable {

private final CrawlQueueFetcher crawlQueueFetcher;

+private final LoadingCache<ConfigRef, ConfigObject> configCache;

public Frontier(RobotsServiceClient robotsServiceClient, DnsServiceClient dnsServiceClient, OutOfScopeHandlerClient outOfScopeHandlerClient) {
this.robotsServiceClient = robotsServiceClient;
this.dnsServiceClient = dnsServiceClient;
this.scopeChecker = new ScopeCheck(outOfScopeHandlerClient);
this.crawlQueueFetcher = DbService.getInstance().getCrawlQueueAdapter().getCrawlQueueFetcher();

+configCache = CacheBuilder.newBuilder()
+.expireAfterWrite(5, TimeUnit.MINUTES)
+.build(
+new CacheLoader<ConfigRef, ConfigObject>() {
+public ConfigObject load(ConfigRef key) throws DbException {
+return DbService.getInstance().getConfigAdapter()
+.getConfigObject(key);
+}
+});
}

public CrawlExecutionStatus scheduleSeed(CrawlSeedRequest request) throws DbException {
@@ -67,10 +86,8 @@ public CrawlExecutionStatus scheduleSeed(CrawlSeedRequest request) throws DbExce
String uri = request.getSeed().getMeta().getName();

try {
-ConfigObject crawlConfig = DbService.getInstance().getConfigAdapter()
-.getConfigObject(request.getJob().getCrawlJob().getCrawlConfigRef());
-ConfigObject collectionConfig = DbService.getInstance().getConfigAdapter()
-.getConfigObject(crawlConfig.getCrawlConfig().getCollectionRef());
+ConfigObject crawlConfig = getConfig(request.getJob().getCrawlJob().getCrawlConfigRef());
+ConfigObject collectionConfig = getConfig(crawlConfig.getCrawlConfig().getCollectionRef());
QueuedUriWrapper qUri = QueuedUriWrapper.getQueuedUriWrapper(uri, request.getJobExecutionId(),
status.getId(), crawlConfig.getCrawlConfig().getPolitenessRef(), collectionConfig.getMeta().getName());
qUri.setPriorityWeight(crawlConfig.getCrawlConfig().getPriorityWeight());
@@ -115,6 +132,14 @@ public ScopeCheck getScopeChecker() {
return scopeChecker;
}

+public ConfigObject getConfig(ConfigRef ref) throws DbQueryException {
+try {
+return configCache.get(ref);
+} catch (ExecutionException e) {
+throw new DbQueryException(e);
+}
+}

@Override
public void close() {
}
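With getConfig() in place, callers that previously went through ConfigAdapter.getConfigObject() for every URI now share a five-minute in-memory cache, so repeated lookups of the same job, crawl, politeness, and collection configs cost one DB read each instead of one per use. A minimal, self-contained sketch of the same Guava LoadingCache pattern (the String keys and load counter are hypothetical stand-ins for the ConfigRef lookup):

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class ConfigCacheSketch {
    // Counts loader invocations so cache hits are observable.
    private static int loads = 0;

    public static void main(String[] args) throws ExecutionException {
        LoadingCache<String, String> cache = CacheBuilder.newBuilder()
                .expireAfterWrite(5, TimeUnit.MINUTES)
                .build(new CacheLoader<String, String>() {
                    public String load(String key) {
                        loads++; // in Frontier this is the getConfigObject() DB call
                        return "config-for-" + key;
                    }
                });

        cache.get("crawlConfig"); // miss: invokes the loader
        cache.get("crawlConfig"); // hit: served from memory
        cache.get("crawlConfig"); // hit
        System.out.println(loads); // prints 1, not 3
    }
}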
156 changes: 156 additions & 0 deletions src/main/java/no/nb/nna/veidemann/frontier/worker/OutlinkHandler.java
@@ -0,0 +1,156 @@
/*
* Copyright 2019 National Library of Norway.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package no.nb.nna.veidemann.frontier.worker;

import com.google.common.collect.Multimap;
import com.google.common.collect.MultimapBuilder;
import no.nb.nna.veidemann.api.frontier.v1.QueuedUri;
import no.nb.nna.veidemann.commons.db.DbException;
import no.nb.nna.veidemann.commons.db.DbService;
import no.nb.nna.veidemann.commons.db.DistributedLock;
import no.nb.nna.veidemann.commons.db.DistributedLock.Key;
import no.nb.nna.veidemann.frontier.worker.Preconditions.PreconditionState;
import org.netpreserve.commons.uri.UriFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Set;

public class OutlinkHandler {

private static final Logger LOG = LoggerFactory.getLogger(OutlinkHandler.class);

public static final UriFormat SURT_HOST_FORMAT = new UriFormat()
.surtEncoding(true)
.ignoreScheme(true)
.ignoreUser(true)
.ignorePassword(true)
.ignorePath(true)
.ignoreQuery(true)
.ignoreFragment(true)
.decodeHost(true);

private final CrawlExecution crawlExecution;

Multimap<String, QueuedUriWrapper> queuedUriMap = MultimapBuilder.hashKeys().hashSetValues().build();

public OutlinkHandler(CrawlExecution crawlExecution) {
this.crawlExecution = crawlExecution;
}

public void queueOutlink(QueuedUri outlink) throws DbException {
try {
QueuedUriWrapper outUri = QueuedUriWrapper.getQueuedUriWrapper(crawlExecution.qUri, outlink,
crawlExecution.collectionConfig.getMeta().getName());
if (shouldInclude(outUri)) {
queuedUriMap.put(outUri.getParsedSurt().toCustomString(SURT_HOST_FORMAT), outUri);
}
} catch (URISyntaxException ex) {
crawlExecution.status.incrementDocumentsFailed();
LOG.info("Illegal URI {}", ex);
}
}

public void finish() throws DbException {
processOutlinks();
}

private void processOutlinks() throws DbException {
while (!queuedUriMap.isEmpty()) {
Set<String> keySet = queuedUriMap.keySet();
if (keySet.size() == 1) {
String key = keySet.iterator().next();
DistributedLock lock = DbService.getInstance()
.createDistributedLock(new Key("quri", key), 10);
lock.lock();
try {
processOutlinks(queuedUriMap.get(key));
queuedUriMap.removeAll(key);
} finally {
lock.unlock();
}

} else {
for (String key : keySet.toArray(new String[0])) {
Collection<QueuedUriWrapper> v = queuedUriMap.get(key);
DistributedLock lock = DbService.getInstance()
.createDistributedLock(new Key("quri", key), 10);
if (lock.tryLock()) {
try {
processOutlinks(v);
queuedUriMap.removeAll(key);
} finally {
lock.unlock();
}
} else {
LOG.debug("Waiting for lock on " + key);
}
}
}
}
}

private void processOutlinks(Collection<QueuedUriWrapper> outlinks) throws DbException {
for (QueuedUriWrapper outUri : outlinks) {
if (uriNotIncludedInQueue(outUri)) {
outUri.setSequence(outUri.getDiscoveryPath().length());

PreconditionState check = Preconditions.checkPreconditions(crawlExecution.frontier,
crawlExecution.crawlConfig, crawlExecution.status, outUri);
switch (check) {
case OK:
LOG.debug("Found new URI: {}, queueing.", outUri.getSurt());
outUri.setPriorityWeight(crawlExecution.crawlConfig.getCrawlConfig().getPriorityWeight());
outUri.addUriToQueue();
break;
case RETRY:
LOG.debug("Failed preconditions for: {}, queueing for retry.", outUri.getSurt());
outUri.setPriorityWeight(crawlExecution.crawlConfig.getCrawlConfig().getPriorityWeight());
outUri.addUriToQueue();
break;
case FAIL:
case DENIED:
break;
}
}
}
}

private boolean shouldInclude(QueuedUriWrapper outlink) throws DbException {
if (!LimitsCheck.isQueueable(crawlExecution.limits, crawlExecution.status, outlink)) {
return false;
}

if (!crawlExecution.frontier.getScopeChecker().isInScope(crawlExecution.status, outlink)) {
return false;
}

return true;
}

private boolean uriNotIncludedInQueue(QueuedUriWrapper outlink) throws DbException {
if (DbService.getInstance().getCrawlQueueAdapter()
.uriNotIncludedInQueue(outlink.getQueuedUri(), crawlExecution.status.getStartTime())) {
return true;
}

LOG.debug("Found already included URI: {}, skipping.", outlink.getSurt());
return false;
}
}
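Grouping the buffered outlinks by SURT host is what cuts the lock traffic: processOutlinks() takes one distributed lock per host and queues every URI for that host inside it, and the tryLock() branch skips hosts whose lock is contended and retries them on the next pass of the while loop instead of blocking. A small sketch of the grouping step with Guava's Multimap (the plain string keys are hypothetical stand-ins for the SURT_HOST_FORMAT rendering of the parsed URI):

import com.google.common.collect.Multimap;
import com.google.common.collect.MultimapBuilder;

public class GroupingSketch {
    public static void main(String[] args) {
        // Same shape as OutlinkHandler's queuedUriMap: host key -> set of outlinks.
        Multimap<String, String> byHost = MultimapBuilder.hashKeys().hashSetValues().build();
        byHost.put("(no,nb,)", "https://www.nb.no/a");
        byHost.put("(no,nb,)", "https://www.nb.no/b");
        byHost.put("(no,vg,)", "https://www.vg.no/");

        // One lock acquisition per key instead of one per URI:
        for (String host : byHost.keySet()) {
            System.out.println(host + " -> " + byHost.get(host).size() + " URIs under one lock");
        }
    }
}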
