Skip to content

Commit

Permalink
Merge pull request #45 from nlnwa/fix-race
Browse files Browse the repository at this point in the history
Fix race condition
  • Loading branch information
maeb authored Jun 3, 2021
2 parents f1610cd + 78a30f6 commit 1bb0c5c
Show file tree
Hide file tree
Showing 20 changed files with 708 additions and 230 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import no.nb.nna.veidemann.frontier.worker.QueuedUriWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.ScanParams;
Expand Down Expand Up @@ -129,7 +130,10 @@ public CrawlQueueManager(Frontier frontier, RethinkDbConnection conn, JedisPool
() -> getPrefetchHandler(), p -> releaseCrawlHostGroup(p.getQueuedUri().getCrawlHostGroupId(), "", RESCHEDULE_DELAY));
}

public QueuedUri addToCrawlHostGroup(QueuedUri qUri, boolean attemptToSetAsBusy) throws DbException {
public QueuedUri addToCrawlHostGroup(QueuedUri qUri) throws DbException {
MDC.put("eid", qUri.getExecutionId());
MDC.put("uri", qUri.getUri());

Objects.requireNonNull(qUri.getCrawlHostGroupId(), "CrawlHostGroupId cannot be null");
Objects.requireNonNull(qUri.getPolitenessRef().getId(), "PolitenessId cannot be null");
if (qUri.getSequence() <= 0L) {
Expand Down Expand Up @@ -161,7 +165,7 @@ public QueuedUri addToCrawlHostGroup(QueuedUri qUri, boolean attemptToSetAsBusy)
try (JedisContext ctx = JedisContext.forPool(jedisPool)) {
uriAddScript.run(ctx, qUri);
chgAddScript.run(ctx, qUri.getCrawlHostGroupId(), qUri.getExecutionId(), qUri.getEarliestFetchTimeStamp(),
attemptToSetAsBusy, frontier.getSettings().getBusyTimeout().toMillis());
frontier.getSettings().getBusyTimeout().toMillis());
}
return qUri;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import static no.nb.nna.veidemann.frontier.db.CrawlQueueManager.*;

public class ChgAddScript extends RedisJob<Boolean> {
public class ChgAddScript extends RedisJob<Long> {
final LuaScript chgAddScript;

public ChgAddScript() {
Expand All @@ -20,25 +20,20 @@ public ChgAddScript() {
* Add uri to queue.
*
* @param ctx
* @param attemptToSetAsBusy if true, try to set uri's chg to busy
* @param busyTimeout if it is possible to set chg as busy, this is the timeout
* @return true if chg was set to busy, false otherwise
* @param busyTimeout if it is possible to set chg as busy, this is the timeout
* @return number of uris in queue for this CrawlHostGroup
*/
public boolean run(JedisContext ctx, String chgId, String crawlExecutionId, Timestamp earliestFetchTimestamp, boolean attemptToSetAsBusy, long busyTimeout) {
public long run(JedisContext ctx, String chgId, String crawlExecutionId, Timestamp earliestFetchTimestamp, long busyTimeout) {
return execute(ctx, jedis -> {
String chgKey = CHG_PREFIX + chgId;

// Handle CHG and counters
long readyTime = Timestamps.toMillis(earliestFetchTimestamp);
String readyTimeString = Long.toString(readyTime);
String busyExpireTime = String.valueOf(System.currentTimeMillis() + busyTimeout);

List<String> chgKeys = ImmutableList.of(chgKey, CHG_WAIT_KEY, CHG_BUSY_KEY, CHG_READY_KEY,
CRAWL_EXECUTION_ID_COUNT_KEY, QUEUE_COUNT_TOTAL_KEY);
List<String> chgArgs = ImmutableList.of(readyTimeString, crawlExecutionId, chgId, String.valueOf(attemptToSetAsBusy), busyExpireTime);
String queueCount = (String) chgAddScript.runString(jedis, chgKeys, chgArgs);

return queueCount.equals("true");
List<String> chgKeys = ImmutableList.of(chgKey, CHG_WAIT_KEY, CRAWL_EXECUTION_ID_COUNT_KEY, QUEUE_COUNT_TOTAL_KEY);
List<String> chgArgs = ImmutableList.of(readyTimeString, crawlExecutionId, chgId);
return (Long) chgAddScript.runString(jedis, chgKeys, chgArgs);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public ChgGetScript() {
public CrawlHostGroup run(JedisContext ctx, String crawlHostGroupId) {
return execute(ctx, jedis -> {
Map<String, String> encoded = jedis.hgetAll(CrawlQueueManager.CHG_PREFIX + crawlHostGroupId);
LOG.trace("HGETALL {}, RESULT: {}", CrawlQueueManager.CHG_PREFIX + crawlHostGroupId, encoded);
return CrawlHostGroupCodec.decode(crawlHostGroupId, encoded);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public CrawlHostGroup run(JedisContext ctx, long busyTimeout) {
}
}
return null;
} else {
LOG.trace("BLPOP {} {}, RESULT: {}", waitForReadyTimeout, CHG_READY_KEY, res);
}
String chgId = res.get(1);
String chgKey = CHG_PREFIX + chgId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import static no.nb.nna.veidemann.frontier.db.CrawlQueueManager.*;

public class ChgReleaseScript extends RedisJob<Void> {
public class ChgReleaseScript extends RedisJob<Long> {
private static final Logger LOG = LoggerFactory.getLogger(ChgReleaseScript.class);
final LuaScript chgRealeaseScript;

Expand All @@ -19,8 +19,8 @@ public ChgReleaseScript() {
chgRealeaseScript = new LuaScript("chg_release.lua");
}

public void run(JedisContext ctx, String crawlHostGroupId, String sessionToken, long nextFetchDelayMs) {
if (nextFetchDelayMs <= 0) {
public Long run(JedisContext ctx, String crawlHostGroupId, String sessionToken, long nextFetchDelayMs) {
if (nextFetchDelayMs < 10) {
nextFetchDelayMs = 10;
}
String chgKey = CrawlQueueManager.CHG_PREFIX + crawlHostGroupId;
Expand All @@ -30,13 +30,14 @@ public void run(JedisContext ctx, String crawlHostGroupId, String sessionToken,
List<String> keys = ImmutableList.of(CHG_BUSY_KEY, CHG_WAIT_KEY, chgKey, SESSION_TO_CHG_KEY);
List<String> args = ImmutableList.of(String.valueOf(waitTime), crawlHostGroupId, sessionToken);

execute(ctx, jedis -> {
return execute(ctx, jedis -> {
try {
chgRealeaseScript.runString(jedis, keys, args);
String result = (String) chgRealeaseScript.runString(jedis, keys, args);
return Long.parseLong(result);
} catch (JedisDataException e) {
LOG.warn(e.getMessage());
return 0L;
}
return null;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ public static List<String> encodeList(CrawlHostGroup chg) {
encoded.add(RETRY_DELAY_SECONDS);
encoded.add(String.valueOf(chg.getRetryDelaySeconds()));

encoded.add(QUEUED_URI_COUNT);
encoded.add(String.valueOf(chg.getQueuedUriCount()));
// Do not encode QUEUED_URI_COUNT (qc) since that value is manipulated with Redis HINCR
// encoded.add(QUEUED_URI_COUNT);
// encoded.add(String.valueOf(chg.getQueuedUriCount()));

encoded.add(CURRENT_URI_ID);
encoded.add(chg.getCurrentUriId());
Expand All @@ -82,7 +83,8 @@ public static Map<String, String> encodeMap(CrawlHostGroup chg) {
encoded.put(DELAY_FACTOR, String.valueOf(chg.getDelayFactor()));
encoded.put(MAX_RETRIES, String.valueOf(chg.getMaxRetries()));
encoded.put(RETRY_DELAY_SECONDS, String.valueOf(chg.getRetryDelaySeconds()));
encoded.put(QUEUED_URI_COUNT, String.valueOf(chg.getQueuedUriCount()));
// Do not encode QUEUED_URI_COUNT (qc) since that value is manipulated with Redis HINCR
// encoded.put(QUEUED_URI_COUNT, String.valueOf(chg.getQueuedUriCount()));
encoded.put(CURRENT_URI_ID, chg.getCurrentUriId());
encoded.put(SESSION_TOKEN, chg.getSessionToken());
encoded.put(FETCH_START_TIME_STAMP, Long.toString(Timestamps.toMillis(chg.getFetchStartTimeStamp())));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package no.nb.nna.veidemann.frontier.db.script;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisNoScriptException;

Expand All @@ -10,11 +14,15 @@
import java.util.stream.Collectors;

public class LuaScript {
private static final Logger LOG = LoggerFactory.getLogger(LuaScript.class);

private final Marker scriptNameMarker;
String scriptName;
String sha;
String script;

public LuaScript(String scriptName) {
scriptNameMarker = MarkerFactory.getMarker(scriptName);
this.scriptName = scriptName;
InputStream in = LuaScript.class.getClassLoader().getResourceAsStream("lua/" + scriptName);
script = new BufferedReader(new InputStreamReader(in)).lines().collect(Collectors.joining("\n"));
Expand All @@ -25,19 +33,24 @@ Object runString(Jedis jedis, List<String> keys, List<String> args) {
sha = jedis.scriptLoad(script);
}
try {
return jedis.evalsha(sha, keys, args);
Object result = jedis.evalsha(sha, keys, args);
LOG.trace(scriptNameMarker, "{}: KEYS: {}, ARGS: {}, RESULT: {}", scriptName, keys, args, result);
return result;
} catch (JedisNoScriptException ex) {
sha = null;
return runString(jedis, keys, args);
}
}

Object runBytes(Jedis jedis, List<byte[]> keys, List<byte[]> args) {
LOG.trace(scriptNameMarker, "{}: KEYS: {}, ARGS: {}", scriptName, keys, args);
if (sha == null) {
sha = jedis.scriptLoad(script);
}
try {
return jedis.evalsha(sha.getBytes(), keys, args);
Object result = jedis.evalsha(sha.getBytes(), keys, args);
LOG.trace(scriptNameMarker, "{}: KEYS: {}, ARGS: {}, RESULT: {}", scriptName, keys, args, result);
return result;
} catch (JedisNoScriptException ex) {
sha = null;
return runBytes(jedis, keys, args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public void postFetchFinally() {
forEach(span, frontier.getPostFetchThreadPool(), outlinkQueue, outlink -> {
try {
OutlinkHandler.processOutlink(frontier, status, qUri, outlink, scriptParameters, scopeScriptRef);
} catch (DbException e) {
} catch (DbException | IllegalStateException e) {
// An error here indicates problems with DB communication. No idea how to handle that yet.
LOG.error("Error processing outlink: {}", e.toString(), e);
} catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ public void accept(Boolean isAllowed) {
try {
if (isAllowed) {
if (changedCrawlHostGroup) {
frontier.getCrawlQueueManager().addToCrawlHostGroup(qUri.getQueuedUri(), false);
frontier.getCrawlQueueManager().addToCrawlHostGroup(qUri.getQueuedUri());
future.set(PreconditionState.RETRY);
} else {
future.set(PreconditionState.OK);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ public boolean forceAddUriToQueue(StatusWrapper status) throws DbException {
wrapped.clearAnnotation();

QueuedUri q = wrapped.build();
q = frontier.getCrawlQueueManager().addToCrawlHostGroup(q, false);
q = frontier.getCrawlQueueManager().addToCrawlHostGroup(q);
wrapped = q.toBuilder();

oldEarliestFetchTimestamp = null;
Expand Down
36 changes: 6 additions & 30 deletions src/main/resources/lua/chg_add.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,12 @@

local chgKey = KEYS[1]
local waitKey = KEYS[2]
local busyKey = KEYS[3]
local readyKey = KEYS[4]
local crawlExecutionIdCountKey = KEYS[5]
local queueCountTotalKey = KEYS[6]
local crawlExecutionIdCountKey = KEYS[3]
local queueCountTotalKey = KEYS[4]

local nextReadyTime = ARGV[1]
local crawlExecutionId = ARGV[2]
local chgId = ARGV[3]
local attemptToSetAsBusy = ARGV[4]
local busyExpireTime = ARGV[5]

local chgExists = redis.call('EXISTS', chgKey)

Expand All @@ -27,29 +23,9 @@ redis.call('HINCRBY', crawlExecutionIdCountKey, crawlExecutionId, 1)
--- Increment total queue count
redis.call('INCR', queueCountTotalKey)

if attemptToSetAsBusy == 'true' then
-- Caller want chg to be busy immediately

-- If a new chg was created, it is safe to add as busy
if queueCount == 1 then
redis.call('ZADD', busyKey, busyExpireTime, chgId)
return 'true'
end

-- Check if chg is ready and can be added to busy
local isReady = redis.call('ZRANK', readyKey, chgId)
if isReady ~= 'nil' then
-- Remove from ready
redis.call('LREM', readyKey, 0, chgId)
-- Add chg to busy and return queue count
redis.call('ZADD', busyKey, busyExpireTime, chgId)
return 'true'
end
else
if chgExists == 0 then
-- If new chg was created, queue it.
redis.call('ZADD', waitKey, nextReadyTime, chgId)
end
if chgExists == 0 then
-- If new chg was created, queue it.
redis.call('ZADD', waitKey, nextReadyTime, chgId)
end

return 'false'
return queueCount
8 changes: 4 additions & 4 deletions src/main/resources/lua/chg_release.lua
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ if storedSessionToken ~= sessionToken then
error("Trying to release chg '" .. chgId .. "' with sessionToken '" .. sessionToken .. "', but chg's sessionToken was '" .. storedSessionToken .. "'")
end

local queueCount = redis.call('HGET', chgKey, "qc")

-- Remove chg from busyKey
if redis.call('ZREM', busyKey, chgId) == 1 then
-- Remove session token
Expand All @@ -31,17 +33,15 @@ if redis.call('ZREM', busyKey, chgId) == 1 then
end

-- Check queue count from chgKey.
local queueCount = redis.call('HGET', chgKey, "qc")
if (not queueCount) or (tonumber(queueCount) <= 0) then
-- If queue count is zero there is no need for chg, remove it.
redis.call('DEL', chgKey)
--redis.call('LREM', readyKey, 0, chgId)
--redis.call('ZREM', busyKey, chgId)
--redis.call('ZREM', waitKey, chgId)
else
-- Otherwise add chg to waitKey.
redis.call('ZADD', waitKey, waitTime, chgId)
-- Remove unnecessary data from chg.
redis.call('HDEL', chgKey, "df", "rd", "mi", "ma", "mr", "u", "st", "ts")
end
end

return queueCount
Loading

0 comments on commit 1bb0c5c

Please sign in to comment.