From a8d46efc3e9dff830ec081eb4ca5c55bb3febf5d Mon Sep 17 00:00:00 2001 From: Jason Powers Date: Thu, 24 Apr 2014 16:36:51 -0600 Subject: [PATCH 1/6] Updates to use sovrn repo for builds --- defaultEnvironment.gradle | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/defaultEnvironment.gradle b/defaultEnvironment.gradle index 3a1eef28..b372c732 100644 --- a/defaultEnvironment.gradle +++ b/defaultEnvironment.gradle @@ -8,6 +8,10 @@ repositories { maven { url "http://maven.restlet.org" } + maven { + url "http://git.15c.lijit.com:1984/nexus/content/groups/public/" + } + // local repo for libs we have to ship with def sandBoxRepo = "${project.rootDir}/sandbox-repo" From 492f687ee8acc3857a754ab2738bb3a860f58b82 Mon Sep 17 00:00:00 2001 From: Jason Powers Date: Thu, 24 Apr 2014 16:37:20 -0600 Subject: [PATCH 2/6] Swapped to sovrn built version of open-replicator --- subprojects.gradle | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/subprojects.gradle b/subprojects.gradle index a919b467..3a694a40 100644 --- a/subprojects.gradle +++ b/subprojects.gradle @@ -9,7 +9,7 @@ File getEnvironmentScript() apply from: environmentScript -project.version = "2.0.0" +project.version = "2.0.1-lijit" @@ -53,7 +53,7 @@ ext.externalDependency = [ if (isDefaultEnvironment) { externalDependency['mysqlConnectorJava'] = 'mysql:mysql-connector-java:5.1.14' externalDependency['helixCore'] = 'org.apache.helix:helix-core:0.6.2-incubating' - externalDependency['or'] = 'com.google:open-replicator:1.0.5' + externalDependency['or'] = 'net.sovrn:open-replicator:1.0.6' externalDependency['log4j'] = 'org.slf4j:slf4j-log4j12:1.6.1' } From 89e6fa2e9b5db5d47b2a25822f7c76f50fcbd392 Mon Sep 17 00:00:00 2001 From: Jason Powers Date: Fri, 25 Apr 2014 08:59:44 -0600 Subject: [PATCH 3/6] Removed link to sovrn nexus repo, moved to local init.gradle --- defaultEnvironment.gradle | 4 ---- 1 file changed, 4 deletions(-) diff --git a/defaultEnvironment.gradle b/defaultEnvironment.gradle index b372c732..3a1eef28 100644 --- a/defaultEnvironment.gradle +++ b/defaultEnvironment.gradle @@ -8,10 +8,6 @@ repositories { maven { url "http://maven.restlet.org" } - maven { - url "http://git.15c.lijit.com:1984/nexus/content/groups/public/" - } - // local repo for libs we have to ship with def sandBoxRepo = "${project.rootDir}/sandbox-repo" From a997948c651f85a496fb3f52e6ca608ec8ce6f52 Mon Sep 17 00:00:00 2001 From: Jason Powers Date: Fri, 25 Apr 2014 15:08:23 -0600 Subject: [PATCH 4/6] Removed the requirement to work in a transaction, added code to stop the NPE when an unwatched table comes through --- .../linkedin/databus2/producers/ORListener.java | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/ORListener.java b/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/ORListener.java index 32109ea0..f5990bab 100644 --- a/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/ORListener.java +++ b/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/ORListener.java @@ -389,7 +389,8 @@ private void startSource(String newTableName, long newTableId) if (null == srcId) { - throw new DatabusRuntimeException("Could not find a matching logical source for table Uri (" + _currTableName + ")" ); + _log.info("Could not find a matching logical source for table Uri (" + _currTableName + ")"); + return; } assert(_transaction != null); _perSourceTransaction = new PerSourceTransaction(srcId); @@ -403,14 +404,7 @@ private void startSource(String newTableName, long newTableId) private void endSource() { - if (_perSourceTransaction != null) - { _perSourceTransaction = null; - } - else - { - throw new DatabusRuntimeException("_perSourceTransaction should not be null in endSource()"); - } } private void deleteRows(DeleteRowsEventV2 dre) @@ -467,6 +461,11 @@ private void frameAvroRecord(BinlogEventV4Header bh, List rl, final DbusOpc final long scn = scn(_currFileNum, (int)bh.getPosition()); final boolean isReplicated = false; VersionedSchema vs = _schemaRegistryService.fetchLatestVersionedSchemaBySourceName(_tableUriToSrcNameMap.get(_currTableName)); + + if(vs == null) { + // There's nothing to do for this table, we're not listening to it + return; + } Schema schema = vs.getSchema(); if ( _log.isDebugEnabled()) From dc2e7abd903ef1c7af120179e0813dccafb0c4ef Mon Sep 17 00:00:00 2001 From: Jason Powers Date: Mon, 28 Apr 2014 10:49:51 -0600 Subject: [PATCH 5/6] Changed version number to be unique to this fork --- subprojects.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/subprojects.gradle b/subprojects.gradle index 3a694a40..3fc7c10d 100644 --- a/subprojects.gradle +++ b/subprojects.gradle @@ -9,7 +9,7 @@ File getEnvironmentScript() apply from: environmentScript -project.version = "2.0.1-lijit" +project.version = "2.0.2-sovrn" From 061475e59c9f612347cd42fe979d83ed94b72ee5 Mon Sep 17 00:00:00 2001 From: Jason Powers Date: Mon, 28 Apr 2014 10:50:18 -0600 Subject: [PATCH 6/6] Initial cut of code that will generate the current SCN if its not given one at startup --- .../OpenReplicatorEventProducer.java | 55 ++++++++++++++++++- 1 file changed, 54 insertions(+), 1 deletion(-) diff --git a/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/OpenReplicatorEventProducer.java b/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/OpenReplicatorEventProducer.java index 91946d7c..d6696dbf 100644 --- a/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/OpenReplicatorEventProducer.java +++ b/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/OpenReplicatorEventProducer.java @@ -21,6 +21,10 @@ import java.lang.management.ManagementFactory; import java.net.URI; import java.net.URISyntaxException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -297,11 +301,60 @@ public synchronized void start(long sinceSCN) } } } + + // If we get to here and the SCN is still 0 or -1 + // go get the current position from the DB + if(sinceSCNToUse <= 0) { + sinceSCNToUse = getCurrentMaxSCN(); + } _producerThread = new EventProducerThread(_physicalSourceName, sinceSCNToUse); _producerThread.start(); } - public class EventProducerThread extends DatabusThreadBase implements TransactionProcessor + /** + * Gets the current Max SCN from the DB. Used when there isn't one specified + * @return + */ + private long getCurrentMaxSCN() { + Connection connection = null; + try { + Class.forName("com.mysql.jdbc.Driver").newInstance(); + connection = DriverManager.getConnection("jdbc:mysql://"+ _or.getHost() + ":" + _or.getPort() + "/?" + + "user="+ _or.getUser() + "&password=" + _or.getPassword()); + ResultSet rs = connection.prepareStatement("show master status").executeQuery(); + String filename = null; + int position = 4; + if(rs.next()) + { + filename = rs.getString(1); + position = rs.getInt(2); + } + rs.close(); + int fileId = Integer.parseInt(filename.split("\\.")[1]); + return ORListener.scn(fileId, position); + + } + catch (Exception e) + { + _log.error("Unable to load mysql driver to get SCN:" + e.getMessage(), e); + return 0; + } + finally + { + if (connection != null) + { + try { + connection.close(); + } + catch (SQLException e) + { + _log.error("Unable to close mysql connection" + e.getMessage(), e); + } + } + } +} + +public class EventProducerThread extends DatabusThreadBase implements TransactionProcessor { // The scn with which the event buffer is started private final AtomicLong _startPrevScn = new AtomicLong(-1);