From 9c3be977f7b9d135eb0ab1f70a084843311d7d07 Mon Sep 17 00:00:00 2001 From: andyqzb Date: Fri, 6 Nov 2015 20:44:34 +0800 Subject: [PATCH] ORListener use poll to block for new events --- .../databus2/producers/ORListener.java | 26 ++++++++++++++++--- .../OpenReplicatorEventProducer.java | 2 +- 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/ORListener.java b/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/ORListener.java index 0723d62c..c6f33d7b 100644 --- a/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/ORListener.java +++ b/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/ORListener.java @@ -136,6 +136,9 @@ public interface TransactionProcessor /** Shared queue to transfer binlog events from OpenReplicator to ORlistener thread **/ private BlockingQueue _binlogEventQueue = null; + /** Milli sec timeout for _binlogEventQueue operation **/ + private long _queueTimeoutMs = 100L; + public ORListener(String name, int currentFileNumber, Logger log, @@ -144,7 +147,8 @@ public ORListener(String name, Map tableUriToSrcIdMap, Map tableUriToSrcNameMap, SchemaRegistryService schemaRegistryService, - int maxQueueSize) + int maxQueueSize, + long queueTimeoutMs) { super("ORListener_" + name); _log = log; @@ -155,6 +159,7 @@ public ORListener(String name, _schemaRegistryService = schemaRegistryService; _currFileNum = currentFileNumber; _binlogEventQueue = new LinkedBlockingQueue(maxQueueSize); + _queueTimeoutMs = queueTimeoutMs; } @Override @@ -163,7 +168,7 @@ public void onEvents(BinlogEventV4 event) boolean isPut = false; do { try { - isPut = _binlogEventQueue.offer(event, 100, TimeUnit.MILLISECONDS); + isPut = _binlogEventQueue.offer(event, _queueTimeoutMs, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { _log.error("failed to put binlog event to binlogEventQueue event: " + event, e); } @@ -638,9 +643,24 @@ public void run() { eventList.clear(); int eventNumber = _binlogEventQueue.drainTo(eventList); + if (eventNumber == 0) + { + try + { + event = _binlogEventQueue.poll(_queueTimeoutMs, TimeUnit.MILLISECONDS); + if (event != null) + { + eventList.add(event); + eventNumber = eventList.size(); + } + } + catch (InterruptedException e) + { + _log.info("Interrupted when poll from _binlogEventQueue!!"); + } + } for (int i = 0; i < eventNumber; i++) { - event = eventList.get(i); if (event == null) { diff --git a/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/OpenReplicatorEventProducer.java b/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/OpenReplicatorEventProducer.java index 78c76d6e..f06d31d7 100644 --- a/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/OpenReplicatorEventProducer.java +++ b/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/OpenReplicatorEventProducer.java @@ -330,7 +330,7 @@ void initOpenReplicator(long scn) String binlogFile = String.format("%s.%06d", _binlogFilePrefix, logid); // we should use a new ORListener to drop the left events in binlogEventQueue and the half processed transaction. _orListener = new ORListener(_sourceName, logid, _log, _binlogFilePrefix, _producerThread, _tableUriToSrcIdMap, - _tableUriToSrcNameMap, _schemaRegistryService, 200); + _tableUriToSrcNameMap, _schemaRegistryService, 200, 100L); _or.setBinlogFileName(binlogFile); _or.setBinlogPosition(offset);