diff --git a/README.md b/README.md index 480cca5d..697047f5 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ [![Join the chat at https://gitter.im/linkedin/databus](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/linkedin/databus?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) -In Internet architectures, data systems are typically categorized into source-of-truth systems that serve as primary stores for the user-generated writes, and derived data stores or indexes which serve reads and other complex queries. The data in these secondary stores is often derived from the primary data through custom transformations, sometimes involving complex processing driven by business logic. Similarly data in caching tiers is derived from reads against the primary data store, but needs to get invalidated or refreshed when the primary data gets mutated. A fundamental requirement emerging from these kinds of data architectures is the need to reliably capture, flow and process primary data changes. +In Internet architectures, data systems are typically categorized into source-of-truth systems that serve as primary stores for the user-generated writes, and derived data stores or indexes which serve reads and other complex queries. The data in these secondary stores is often derived from the primary data through custom transformations, sometimes involving complex processing driven by business logic. Similarly, data in caching tiers is derived from reads against the primary data store, but needs to get invalidated or refreshed when the primary data gets mutated. A fundamental requirement emerging from these kinds of data architectures is the need to reliably capture, flow and process primary data changes. We have built Databus, a source-agnostic distributed change data capture system, which is an integral part of LinkedIn's data processing pipeline. The Databus transport layer provides latencies in the low milliseconds and handles throughput of thousands of events per second per server while supporting infinite look back capabilities and rich subscription functionality. 
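To make the derived-store pattern described in the README paragraph concrete, here is a minimal, hypothetical sketch of a caching tier that consumes primary-store change events; the class and method names are illustrative and not part of Databus:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical change event: the key of the mutated row plus its commit sequence number (SCN).
    class ChangeEvent {
      final long key;
      final long scn;
      ChangeEvent(long key, long scn) { this.key = key; this.scn = scn; }
    }

    // Derived cache tier: an entry is dropped whenever the primary store reports a mutation,
    // so the next read repopulates it from the source of truth.
    class CacheInvalidatingConsumer {
      private final Map<Long, String> cache = new ConcurrentHashMap<Long, String>();

      void onDataEvent(ChangeEvent event) {
        cache.remove(event.key);               // invalidate the stale derived value
      }

      String read(long key) {
        String value = cache.get(key);
        if (value == null) {
          value = loadFromPrimaryStore(key);   // repopulate from the source of truth
          cache.put(key, value);
        }
        return value;
      }

      private String loadFromPrimaryStore(long key) {
        return "row-" + key;                   // stand-in for a real primary-store read
      }
    }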
diff --git a/databus-client/databus-client-http/src/main/java/com/linkedin/databus/client/netty/NettyHttpDatabusRelayConnection.java b/databus-client/databus-client-http/src/main/java/com/linkedin/databus/client/netty/NettyHttpDatabusRelayConnection.java index 7cfa6cb0..04514daf 100644 --- a/databus-client/databus-client-http/src/main/java/com/linkedin/databus/client/netty/NettyHttpDatabusRelayConnection.java +++ b/databus-client/databus-client-http/src/main/java/com/linkedin/databus/client/netty/NettyHttpDatabusRelayConnection.java @@ -23,11 +23,13 @@ import java.io.InputStream; import java.nio.channels.Channels; import java.nio.channels.ClosedChannelException; +import java.nio.charset.Charset; import java.util.Formatter; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.commons.io.IOUtils; import org.apache.log4j.Logger; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.type.TypeReference; @@ -51,6 +53,7 @@ import com.linkedin.databus.core.DbusPrettyLogUtils; import com.linkedin.databus.core.async.ActorMessageQueue; import com.linkedin.databus.core.data_model.PhysicalPartition; +import com.linkedin.databus.core.util.CompressUtil; import com.linkedin.databus.core.util.IdNamePair; import com.linkedin.databus.core.util.Range; import com.linkedin.databus2.core.container.DatabusHttpHeaders; @@ -67,6 +70,7 @@ public class NettyHttpDatabusRelayConnection { public static final String MODULE = NettyHttpDatabusRelayConnection.class.getName(); public static final Logger LOG = Logger.getLogger(MODULE); + public static final boolean needCompress = true; private static enum State { @@ -239,7 +243,7 @@ private String createRegisterUrl() uriString.append("&sources=") .append(_sourcesSubsList); } - + uriString.append("&").append(DatabusHttpHeaders.PROTOCOL_COMPRESS_PARAM).append("=").append(needCompress); final String url = uriString.toString(); return url; } @@ -710,6 +714,19 @@ public void finishResponse() throws Exception else { InputStream bodyStream = Channels.newInputStream(_decorated); + String bodyStr = IOUtils.toString(bodyStream,Charset.defaultCharset().name()); + IOUtils.closeQuietly(bodyStream); + if (NettyHttpDatabusRelayConnection.needCompress) + { + try + { + bodyStr = CompressUtil.uncompress(bodyStr); + } + catch (Exception e)//failed because the steam may be not compressed + { + } + } + ObjectMapper mapper = new ObjectMapper(); int registerResponseVersion = 3; // either 2 or 3 would suffice here; we care only about 4 @@ -734,7 +751,7 @@ public void finishResponse() throws Exception if (registerResponseVersion == 4) // DDSDBUS-2009 { HashMap> responseMap = - mapper.readValue(bodyStream, new TypeReference>>() {}); + mapper.readValue(bodyStr, new TypeReference>>() {}); // Look for mandatory SOURCE_SCHEMAS_KEY. 
Map> sourcesSchemasMap = RegisterResponseEntry.createFromResponse(responseMap, @@ -760,7 +777,7 @@ public void finishResponse() throws Exception else // version 2 or 3 { List schemasList = - mapper.readValue(bodyStream, new TypeReference>() {}); + mapper.readValue(bodyStr, new TypeReference>() {}); Map> sourcesSchemasMap = RegisterResponseEntry.convertSchemaListToMap(schemasList); diff --git a/databus-core/databus-core-container/src/main/java/com/linkedin/databus2/core/container/DatabusHttpHeaders.java b/databus-core/databus-core-container/src/main/java/com/linkedin/databus2/core/container/DatabusHttpHeaders.java index 926cb305..05ff528c 100644 --- a/databus-core/databus-core-container/src/main/java/com/linkedin/databus2/core/container/DatabusHttpHeaders.java +++ b/databus-core/databus-core-container/src/main/java/com/linkedin/databus2/core/container/DatabusHttpHeaders.java @@ -51,6 +51,7 @@ public class DatabusHttpHeaders /** protocol version param name for /register request */ public static final String PROTOCOL_VERSION_PARAM = "protocolVersion"; + public static final String PROTOCOL_COMPRESS_PARAM = "compress"; /** max event version - max DbusEvent version client can understand */ public static final String MAX_EVENT_VERSION = "maxev"; diff --git a/databus-core/databus-core-impl/build.gradle b/databus-core/databus-core-impl/build.gradle index 9f14e788..d2c17b93 100644 --- a/databus-core/databus-core-impl/build.gradle +++ b/databus-core/databus-core-impl/build.gradle @@ -16,6 +16,8 @@ dependencies { compile externalDependency.json compile externalDependency.log4j compile externalDependency.netty + compile externalDependency.c3p0 + compile externalDependency.guava testCompile externalDependency.testng testCompile externalDependency.easymock diff --git a/databus-core/databus-core-impl/src/main/java/com/linkedin/databus/core/util/CompressUtil.java b/databus-core/databus-core-impl/src/main/java/com/linkedin/databus/core/util/CompressUtil.java new file mode 100644 index 00000000..d6862c03 --- /dev/null +++ b/databus-core/databus-core-impl/src/main/java/com/linkedin/databus/core/util/CompressUtil.java @@ -0,0 +1,37 @@ +package com.linkedin.databus.core.util; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; + +import com.google.common.io.BaseEncoding; + +public class CompressUtil +{ + public static String compress(String str) throws IOException + { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + GZIPOutputStream gzip = new GZIPOutputStream(out); + gzip.write(str.getBytes(Charset.defaultCharset())); + gzip.close(); + return BaseEncoding.base64().encode(out.toByteArray()); + } + + public static String uncompress(String str) throws IOException + { + byte[] encodeByteArr = BaseEncoding.base64().decode(str); + ByteArrayOutputStream out = new ByteArrayOutputStream(); + ByteArrayInputStream in = new ByteArrayInputStream(encodeByteArr); + GZIPInputStream gunzip = new GZIPInputStream(in); + byte[] buffer = new byte[256]; + int n; + while ((n = gunzip.read(buffer)) >= 0) + { + out.write(buffer, 0, n); + } + return out.toString(Charset.defaultCharset().name()); + } +} diff --git a/databus-core/databus-core-impl/src/main/java/com/linkedin/databus2/core/seq/MaxSCNReaderWriterConfig.java b/databus-core/databus-core-impl/src/main/java/com/linkedin/databus2/core/seq/MaxSCNReaderWriterConfig.java index b3093f62..3dc5fc3a 100644 
--- a/databus-core/databus-core-impl/src/main/java/com/linkedin/databus2/core/seq/MaxSCNReaderWriterConfig.java +++ b/databus-core/databus-core-impl/src/main/java/com/linkedin/databus2/core/seq/MaxSCNReaderWriterConfig.java @@ -27,6 +27,7 @@ public class MaxSCNReaderWriterConfig implements ConfigBuilder + { + private String jdbcUrl= "jdbc:mysql://localhost:3306/databus"; + + private String scnTable = "databus_scn_store"; + + private String driverClass = "com.mysql.jdbc.Driver"; + + private String dbUser = "root"; + + private String dbPassword = ""; + + private Long flushItvl = 1L; + + private Long initVal = 0L; + + private String upsertSCNQuery = "insert into databus_scn_store (max_scn) values (?)"; + + private String getSCNQuery = "select max_scn from databus_scn_store order by updated_at desc limit 1"; + + private String scnColumnName = "max_scn"; + + public String getScnColumnName() { + return scnColumnName; + } + + public void setScnColumnName(String scnColumnName) { + this.scnColumnName = scnColumnName; + } + + public Long getInitVal() { + return initVal; + } + + public void setInitVal(Long initVal) { + this.initVal = initVal; + } + + public String getUpsertSCNQuery() { + return upsertSCNQuery; + } + + public void setUpsertSCNQuery(String upsertSCNQuery) { + this.upsertSCNQuery = upsertSCNQuery; + } + + public String getGetSCNQuery() { + return getSCNQuery; + } + + public void setGetSCNQuery(String getSCNQuery) { + this.getSCNQuery = getSCNQuery; + } + + public Long getFlushItvl() { + return flushItvl; + } + + public void setFlushItvl(Long flushItvl) { + this.flushItvl = flushItvl; + } + + public String getDbUser() { + return dbUser; + } + + public void setDbUser(String dbUser) { + this.dbUser = dbUser; + } + + public String getDbPassword() { + return dbPassword; + } + + public void setDbPassword(String dbPassword) { + this.dbPassword = dbPassword; + } + + public String getJdbcUrl() { + return jdbcUrl; + } + + public void setJdbcUrl(String jdbcUrl) { + this.jdbcUrl = jdbcUrl; + } + + public String getScnTable() { + return scnTable; + } + + public void setScnTable(String scnTable) { + this.scnTable = scnTable; + } + + public String getDriverClass() { + return driverClass; + } + + public void setDriverClass(String driverClass) { + this.driverClass = driverClass; + } + + @Override + public StaticConfig build() throws InvalidConfigException { + //TODO : verify + return new StaticConfig(jdbcUrl,scnTable, driverClass, dbUser, dbPassword, flushItvl, initVal , + upsertSCNQuery, getSCNQuery ,scnColumnName); + } + } + public static class StaticConfig{ + + private String driverClass = "com.mysql.jdbc.Driver"; + + private String jdbcUrl; + + private String scnTable; + + private String dbUser; + + private String dbPassword; + + private Long flushItvl; + + private Long initVal; + + private String upsertSCNQuery; + + private String getSCNQuery; + + private String scnColumnName; + + public String getScnColumnName() { + return scnColumnName; + } + + public Long getInitVal() { + return initVal; + } + + public String getUpsertSCNQuery() { + return upsertSCNQuery; + } + + public String getGetSCNQuery() { + return getSCNQuery; + } + + public Long getFlushItvl() { + return flushItvl; + } + + public String getDriverClass() { + return driverClass; + } + + public String getJdbcUrl() { + return jdbcUrl; + } + + public String getScnTable() { + return scnTable; + } + + public String getDbUser() { + return dbUser; + } + + public String getDbPassword() { + return dbPassword; + } + + public StaticConfig(String 
host, String table, String driverClass, String dbUser, String dbPassword, long flushItvl, long initVal, + String upsertSCNQuery, String getSCNQuery, String scnColumnName){ + this.jdbcUrl = host; + this.scnTable = table; + this.driverClass = driverClass; + this.dbUser = dbUser; + this.dbPassword = dbPassword; + this.flushItvl = flushItvl; + this.initVal = initVal; + this.upsertSCNQuery = upsertSCNQuery; + this.getSCNQuery = getSCNQuery; + this.scnColumnName = scnColumnName; + } + + } + +} + diff --git a/databus-core/databus-core-impl/src/main/java/com/linkedin/databus2/core/seq/MysqlMaxSCNHandlerFactory.java b/databus-core/databus-core-impl/src/main/java/com/linkedin/databus2/core/seq/MysqlMaxSCNHandlerFactory.java new file mode 100644 index 00000000..43a99d79 --- /dev/null +++ b/databus-core/databus-core-impl/src/main/java/com/linkedin/databus2/core/seq/MysqlMaxSCNHandlerFactory.java @@ -0,0 +1,26 @@ +package com.linkedin.databus2.core.seq; + +import com.linkedin.databus2.core.DatabusException; + +/** + * + */ +public class MysqlMaxSCNHandlerFactory implements SequenceNumberHandlerFactory { + private final MysqlMaxSCNHandler.Config _configBuilder; + + public MysqlMaxSCNHandlerFactory(MysqlMaxSCNHandler.Config configBuilder) + { + _configBuilder = configBuilder; + } + + @Override + public MaxSCNReaderWriter createHandler(String id) throws DatabusException { + MysqlMaxSCNHandler maxSCNHandler; + MysqlMaxSCNHandler.StaticConfig config; + synchronized (_configBuilder) { + config = _configBuilder.build(); + maxSCNHandler = MysqlMaxSCNHandler.create(config); + } + return maxSCNHandler; + } +} diff --git a/databus-core/databus-core-schemas/src/main/java/com/linkedin/databus2/schemas/VersionedSchema.java b/databus-core/databus-core-schemas/src/main/java/com/linkedin/databus2/schemas/VersionedSchema.java index 91706ce1..4f72866d 100644 --- a/databus-core/databus-core-schemas/src/main/java/com/linkedin/databus2/schemas/VersionedSchema.java +++ b/databus-core/databus-core-schemas/src/main/java/com/linkedin/databus2/schemas/VersionedSchema.java @@ -19,6 +19,9 @@ */ +import java.util.ArrayList; +import java.util.List; + import org.apache.avro.Schema; /** @@ -30,12 +33,14 @@ public class VersionedSchema private final VersionedSchemaId _id; private final String _origSchemaStr; + private final List _pkFieldList; public VersionedSchema(VersionedSchemaId id, Schema s, String origSchemaStr) { _schema = s; _id = id; _origSchemaStr = origSchemaStr; + _pkFieldList = new ArrayList(); } public VersionedSchema(String baseName, short id, Schema s, String origSchemaStr) @@ -85,6 +90,11 @@ public VersionedSchemaId getId() { return _id; } + + public List getPkFieldList() + { + return _pkFieldList; + } /** * @return The original schema string as registered. Returns null if the original schema string is not available. 
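The CompressUtil class introduced above is a plain gzip-plus-Base64 round trip, used so the /register response body stays a printable string on the wire. A small usage sketch, assuming the class and Guava's BaseEncoding are on the classpath as in this patch:

    import java.io.IOException;
    import com.linkedin.databus.core.util.CompressUtil;

    public class CompressUtilRoundTrip {
      public static void main(String[] args) throws IOException {
        String registerJson = "[{\"version\":2,\"schema\":\"...\"}]";   // stand-in /register payload

        String wire = CompressUtil.compress(registerJson);   // gzip, then Base64-encode
        String back = CompressUtil.uncompress(wire);         // Base64-decode, then gunzip

        if (!registerJson.equals(back)) {
          throw new AssertionError("round trip must be lossless");
        }
      }
    }

On the client side, NettyHttpDatabusRelayConnection always sends compress=true and then attempts to uncompress the body, silently falling back to the raw string when decompression fails, so it still interoperates with relays that ignore the parameter. Both ends rely on Charset.defaultCharset(), so the client and relay JVMs are assumed to share the same default encoding.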
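The Config/StaticConfig builder above and MysqlMaxSCNHandlerFactory add a MySQL-backed store for the producer's max SCN. A wiring sketch, assuming the builder shown is MysqlMaxSCNHandler.Config (as the factory implies) and that MaxSCNReaderWriter exposes saveMaxScn/getMaxScn as used elsewhere in this patch; the connection details and handler id are illustrative:

    import com.linkedin.databus2.core.seq.MaxSCNReaderWriter;
    import com.linkedin.databus2.core.seq.MysqlMaxSCNHandler;
    import com.linkedin.databus2.core.seq.MysqlMaxSCNHandlerFactory;
    import com.linkedin.databus2.core.seq.SequenceNumberHandlerFactory;

    public class MysqlScnStoreWiring {
      public static void main(String[] args) throws Exception {
        MysqlMaxSCNHandler.Config cfg = new MysqlMaxSCNHandler.Config();
        cfg.setJdbcUrl("jdbc:mysql://scn-db.example:3306/databus");   // illustrative host
        cfg.setDbUser("databus");
        cfg.setDbPassword("secret");
        // scnTable, upsertSCNQuery and getSCNQuery keep their databus_scn_store defaults

        SequenceNumberHandlerFactory factory = new MysqlMaxSCNHandlerFactory(cfg);
        MaxSCNReaderWriter scnStore = factory.createHandler("or-relay");

        scnStore.saveMaxScn(42L);
        System.out.println("restart point: " + scnStore.getMaxScn());
      }
    }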
diff --git a/databus-core/databus-core-schemas/src/main/java/com/linkedin/databus2/schemas/utils/SchemaHelper.java b/databus-core/databus-core-schemas/src/main/java/com/linkedin/databus2/schemas/utils/SchemaHelper.java index 8a8eda4f..57cb4032 100644 --- a/databus-core/databus-core-schemas/src/main/java/com/linkedin/databus2/schemas/utils/SchemaHelper.java +++ b/databus-core/databus-core-schemas/src/main/java/com/linkedin/databus2/schemas/utils/SchemaHelper.java @@ -22,10 +22,14 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.Map; import org.apache.avro.Schema; import org.apache.avro.Schema.Field; import org.apache.avro.Schema.Type; +import org.jboss.netty.util.internal.ConcurrentHashMap; + +import com.linkedin.databus2.schemas.VersionedSchema; /** @@ -214,6 +218,39 @@ public static final byte[] getSchemaId(String schema) { return Utils.md5(schema.getBytes(Charset.defaultCharset())); } + + private static Map> orderedMap = new ConcurrentHashMap>(); + /** + * Order the fields present in the schema based on the metaFieldName and comparator being passed. + * + * @param schema Schema containing the fields to be ordered + * @param metaFieldName Meta Field for ordering + * @param comparator comparator for sorting + * @return ordered Field list or null if schema is null + */ + public static List getOrderedFieldsByDBFieldPosition(final VersionedSchema vs) + { + if ( null == vs || null == vs.getSchema()) + return null; + List fieldList = orderedMap.get(vs.getId().toString()); + if(fieldList != null) + { + return fieldList; + } + + Schema schema = vs.getSchema(); + fieldList = getOrderedFieldsByMetaField(schema, "dbFieldPosition", new Comparator() { + @Override + public int compare(String o1, String o2) { + // TODO Auto-generated method stub + int m1 = Integer.parseInt(o1); + int m2 = Integer.parseInt(o2); + return m1-m2; + } + }); + orderedMap.put(vs.getId().toString(), fieldList); + return fieldList; + } /** * Order the fields present in the schema based on the metaFieldName and comparator being passed. 
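getOrderedFieldsByDBFieldPosition, added above, sorts a schema's fields by their numeric dbFieldPosition metadata (the binlog column order) and memoizes the result per VersionedSchemaId, so ORListener no longer re-sorts on every row. A usage sketch; the exact "meta" attribute layout below is illustrative of Databus-generated schemas:

    import java.util.List;
    import org.apache.avro.Schema;
    import com.linkedin.databus2.schemas.VersionedSchema;
    import com.linkedin.databus2.schemas.utils.SchemaHelper;

    public class OrderedFieldsExample {
      public static void main(String[] args) {
        String schemaJson =
            "{\"type\":\"record\",\"name\":\"Member\",\"fields\":["
          + "{\"name\":\"email\",\"type\":[\"null\",\"string\"],\"meta\":\"dbFieldName=EMAIL;dbFieldPosition=1;\"},"
          + "{\"name\":\"id\",\"type\":[\"null\",\"long\"],\"meta\":\"dbFieldName=ID;dbFieldPosition=0;\"}]}";

        Schema schema = Schema.parse(schemaJson);
        VersionedSchema vs = new VersionedSchema("Member", (short) 1, schema, schemaJson);

        // First call sorts by dbFieldPosition and caches the list keyed by vs.getId();
        // later calls for the same schema version return the cached list.
        List<Schema.Field> ordered = SchemaHelper.getOrderedFieldsByDBFieldPosition(vs);
        for (Schema.Field f : ordered) {
          System.out.println(f.name());   // prints "id", then "email"
        }
      }
    }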
diff --git a/databus-util-cmdline/databus-util-cmdline-impl/build.gradle b/databus-util-cmdline/databus-util-cmdline-impl/build.gradle index 05883b40..95ee42a3 100644 --- a/databus-util-cmdline/databus-util-cmdline-impl/build.gradle +++ b/databus-util-cmdline/databus-util-cmdline-impl/build.gradle @@ -9,6 +9,7 @@ dependencies { compile externalDependency.jacksonCoreAsl compile externalDependency.jacksonMapperAsl compile externalDependency.jline + compile externalDependency.mysqlConnectorJava compile project(':databus-core:databus-core-impl') compile project(':databus-core:databus-core-schemas') diff --git a/databus-util-cmdline/databus-util-cmdline-impl/src/main/java/com/linkedin/databus/util/AvroPrimitiveTypes.java b/databus-util-cmdline/databus-util-cmdline-impl/src/main/java/com/linkedin/databus/util/AvroPrimitiveTypes.java index a8403e4d..3e1f50dc 100644 --- a/databus-util-cmdline/databus-util-cmdline-impl/src/main/java/com/linkedin/databus/util/AvroPrimitiveTypes.java +++ b/databus-util-cmdline/databus-util-cmdline-impl/src/main/java/com/linkedin/databus/util/AvroPrimitiveTypes.java @@ -33,6 +33,7 @@ public enum AvroPrimitiveTypes LONG("long"), RAW("bytes"), FLOAT("float"), + DECIMAL("double"), DOUBLE("double"), CLOB("string"), VARCHAR("string"), @@ -40,6 +41,7 @@ public enum AvroPrimitiveTypes NVARCHAR("string"), NVARCHAR2("string"), TIMESTAMP("long"), + DATETIME("long"), CHAR("string"), DATE("long"), BLOB("bytes"), @@ -55,7 +57,8 @@ public enum AvroPrimitiveTypes INT("long"), INT_UNSIGNED("long"), BIGINT("long"), - BIGINT_UNSIGNED("long"); + BIGINT_UNSIGNED("long"), + YEAR("int"); private final String _avroType; private AvroPrimitiveTypes(String avroType) diff --git a/databus-util-cmdline/databus-util-cmdline-impl/src/main/java/com/linkedin/databus/util/FieldToAvro.java b/databus-util-cmdline/databus-util-cmdline-impl/src/main/java/com/linkedin/databus/util/FieldToAvro.java index 78b08731..dd790686 100644 --- a/databus-util-cmdline/databus-util-cmdline-impl/src/main/java/com/linkedin/databus/util/FieldToAvro.java +++ b/databus-util-cmdline/databus-util-cmdline-impl/src/main/java/com/linkedin/databus/util/FieldToAvro.java @@ -202,7 +202,7 @@ private Map simpleTypeToAvro(FieldInfo fieldInfo, SimpleTypeInfo field.put("default", null); // Field type - String[] type = new String[] { "null", typeInfo.getPrimitiveType().getAvroType() }; + String[] type = new String[] {"null", typeInfo.getPrimitiveType().getAvroType()}; field.put("type", type); // Field metadata diff --git a/databus-util-cmdline/databus-util-cmdline-impl/src/main/java/com/linkedin/databus/util/SchemaUtils.java b/databus-util-cmdline/databus-util-cmdline-impl/src/main/java/com/linkedin/databus/util/SchemaUtils.java index 6ee1a2e9..9d8a4649 100644 --- a/databus-util-cmdline/databus-util-cmdline-impl/src/main/java/com/linkedin/databus/util/SchemaUtils.java +++ b/databus-util-cmdline/databus-util-cmdline-impl/src/main/java/com/linkedin/databus/util/SchemaUtils.java @@ -50,7 +50,7 @@ public static String toCamelCase(String columnName) public static String toCamelCase(String columnName, boolean initialCap) { - boolean afterUnderscore = false; +/* boolean afterUnderscore = false; StringBuilder sb = new StringBuilder(columnName.length()); for(int i=0; i < columnName.length(); i++) { @@ -76,7 +76,9 @@ else if(afterUnderscore) sb.replace(0, 1, sb.substring(0,1).toUpperCase()); } - return sb.toString(); + return sb.toString();*/ + //FIXME + return columnName; } public static boolean in(String needle, String... 
haystack) diff --git a/databus-util-cmdline/databus-util-cmdline-impl/src/main/java/com/linkedin/databus/util/TypeInfoFactory.java b/databus-util-cmdline/databus-util-cmdline-impl/src/main/java/com/linkedin/databus/util/TypeInfoFactory.java index 3be4428b..b078b8dc 100644 --- a/databus-util-cmdline/databus-util-cmdline-impl/src/main/java/com/linkedin/databus/util/TypeInfoFactory.java +++ b/databus-util-cmdline/databus-util-cmdline-impl/src/main/java/com/linkedin/databus/util/TypeInfoFactory.java @@ -121,6 +121,10 @@ public TableTypeInfo buildTableType(Connection con, String tableOwner, String ta columnTypeName = columnTypeParts[1]; } + if (columnTypeName.endsWith(" UNSIGNED")) { + columnTypeName = columnTypeName.substring(0, columnTypeName.length() - " UNSIGNED".length()); + } + TypeInfo typeInfo = getTypeInfo(con, columnTypeOwner, columnTypeName, columnPrecision, columnScale,""); FieldInfo field = new FieldInfo(columnName, typeInfo, column - 1); fields.add(field); @@ -208,7 +212,7 @@ public boolean isSimpleType(Connection con, String typeOwner, String typeName) throws SQLException { //For whatever reason, the JDBC driver does not return this as primitive type - if (typeName.equalsIgnoreCase("NVARCHAR") + if (typeName.equalsIgnoreCase("NVARCHAR") || typeName.equalsIgnoreCase("NVARCHAR2") || typeName.contains("XML")) return true; diff --git a/databus2-relay/databus2-event-producer-common/src/main/java/com/linkedin/databus2/producers/ds/Transaction.java b/databus2-relay/databus2-event-producer-common/src/main/java/com/linkedin/databus2/producers/ds/Transaction.java index 95c4fa28..24781917 100644 --- a/databus2-relay/databus2-event-producer-common/src/main/java/com/linkedin/databus2/producers/ds/Transaction.java +++ b/databus2-relay/databus2-event-producer-common/src/main/java/com/linkedin/databus2/producers/ds/Transaction.java @@ -50,6 +50,18 @@ public class Transaction */ private long _txnNanoTimestamp; + private long ignoredSourceScn = -1; + + + public long getIgnoredSourceScn() { + return ignoredSourceScn; + } + + public void setIgnoredSourceScn(long ignoredSourceScn) { + this.ignoredSourceScn = ignoredSourceScn; + } + + public Transaction() { _perSourceTxnEntries = new HashMap(); diff --git a/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/ORListener.java b/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/ORListener.java index 6c0db42d..48ff2639 100644 --- a/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/ORListener.java +++ b/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/ORListener.java @@ -3,11 +3,7 @@ import java.math.BigInteger; import java.nio.ByteBuffer; import java.nio.charset.Charset; -import java.sql.Time; -import java.sql.Timestamp; import java.util.ArrayList; -import java.util.Calendar; -import java.util.Comparator; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -40,25 +36,12 @@ import com.google.code.or.common.glossary.Column; import com.google.code.or.common.glossary.Pair; import com.google.code.or.common.glossary.Row; -import com.google.code.or.common.glossary.column.BitColumn; -import com.google.code.or.common.glossary.column.BlobColumn; -import com.google.code.or.common.glossary.column.DateColumn; -import com.google.code.or.common.glossary.column.DatetimeColumn; -import com.google.code.or.common.glossary.column.DecimalColumn; -import 
com.google.code.or.common.glossary.column.DoubleColumn; -import com.google.code.or.common.glossary.column.EnumColumn; -import com.google.code.or.common.glossary.column.FloatColumn; import com.google.code.or.common.glossary.column.Int24Column; import com.google.code.or.common.glossary.column.LongColumn; import com.google.code.or.common.glossary.column.LongLongColumn; import com.google.code.or.common.glossary.column.NullColumn; -import com.google.code.or.common.glossary.column.SetColumn; import com.google.code.or.common.glossary.column.ShortColumn; -import com.google.code.or.common.glossary.column.StringColumn; -import com.google.code.or.common.glossary.column.TimeColumn; -import com.google.code.or.common.glossary.column.TimestampColumn; import com.google.code.or.common.glossary.column.TinyColumn; -import com.google.code.or.common.glossary.column.YearColumn; import com.linkedin.databus.core.DatabusRuntimeException; import com.linkedin.databus.core.DatabusThreadBase; import com.linkedin.databus.core.DbusOpcode; @@ -68,6 +51,7 @@ import com.linkedin.databus2.producers.ds.PerSourceTransaction; import com.linkedin.databus2.producers.ds.PrimaryKeySchema; import com.linkedin.databus2.producers.ds.Transaction; +import com.linkedin.databus2.producers.util.Or2AvroConvert; import com.linkedin.databus2.schemas.NoSuchSchemaException; import com.linkedin.databus2.schemas.SchemaRegistryService; import com.linkedin.databus2.schemas.VersionedSchema; @@ -134,6 +118,9 @@ public interface TransactionProcessor /** Track all the table map events, cleared when the binlog rotated **/ private final Map _tableMapEvents = new HashMap(); + + /** Transaction into buffer thread */ + private final TransactionWriter _transactionWriter; /** Shared queue to transfer binlog events from OpenReplicator to ORlistener thread **/ private BlockingQueue _binlogEventQueue = null; @@ -148,6 +135,9 @@ public interface TransactionProcessor public static final long INTEGER_MAX_VALUE = 4294967296L; public static final BigInteger BIGINT_MAX_VALUE = new BigInteger("18446744073709551616"); + private String _curSourceName; + private boolean _ignoreSource = false; + public ORListener(String name, int currentFileNumber, Logger log, @@ -169,6 +159,8 @@ public ORListener(String name, _currFileNum = currentFileNumber; _binlogEventQueue = new LinkedBlockingQueue(maxQueueSize); _queueTimeoutMs = queueTimeoutMs; + _transactionWriter = new TransactionWriter(maxQueueSize, queueTimeoutMs, txnProcessor); + _transactionWriter.start(); } @Override @@ -188,8 +180,8 @@ private void processTableMapEvent(TableMapEvent tme) { _tableMapEvents.put(tme.getTableId(), tme); - String newTableName = tme.getDatabaseName().toString().toLowerCase() + "." + tme.getTableName().toString().toLowerCase(); - startSource(newTableName); + _curSourceName = tme.getDatabaseName().toString().toLowerCase() + "." 
+ tme.getTableName().toString().toLowerCase(); + startSource(_curSourceName); } private void startXtion(QueryEvent e) @@ -222,13 +214,15 @@ private void endXtion(AbstractBinlogEventV4 e) _transaction.setSizeInBytes(_currTxnSizeInBytes); _transaction.setTxnNanoTimestamp(_currTxnTimestamp); _transaction.setTxnReadLatencyNanos(txnReadLatency); + + if(_ignoreSource) { + long scn = scn(_currFileNum, (int)e.getHeader().getPosition()); + _transaction.setIgnoredSourceScn(scn); + } + try { - _txnProcessor.onEndTransaction(_transaction); - } catch (DatabusException e3) - { - _log.error("Got exception in the transaction handler ",e3); - throw new DatabusRuntimeException(e3); + _transactionWriter.addTransaction(_transaction); } finally { @@ -256,7 +250,13 @@ private void reset() private void startSource(String newTableName) { Short srcId = _tableUriToSrcIdMap.get(newTableName); + _ignoreSource = null == srcId; + if (_ignoreSource) { + LOG.info("Ignoring source: " + newTableName); + return; + } + LOG.info("Starting source: " + newTableName); assert (_transaction != null); if (_transaction.getPerSourceTransaction(srcId) == null) { @@ -266,16 +266,30 @@ private void startSource(String newTableName) private void deleteRows(DeleteRowsEventV2 dre) { + if (_ignoreSource) { + LOG.info("Ignoring delete rows for " + _curSourceName); + return; + } + LOG.info("DELETE FROM " + _curSourceName); frameAvroRecord(dre.getTableId(), dre.getHeader(), dre.getRows(), DbusOpcode.DELETE); } private void deleteRows(DeleteRowsEvent dre) { + if (_ignoreSource) { + LOG.info("Ignoring delete rows for " + _curSourceName); + return; + } + LOG.info("DELETE FROM " + _curSourceName); frameAvroRecord(dre.getTableId(), dre.getHeader(), dre.getRows(), DbusOpcode.DELETE); } private void updateRows(UpdateRowsEvent ure) { + if (_ignoreSource) { + LOG.info("Ignoring update rows for " + _curSourceName); + return; + } List> lp = ure.getRows(); List lr = new ArrayList(lp.size()); for (Pair pr: lp) @@ -283,11 +297,18 @@ private void updateRows(UpdateRowsEvent ure) Row r = pr.getAfter(); lr.add(r); } - frameAvroRecord(ure.getTableId(), ure.getHeader(), lr, DbusOpcode.UPSERT); + if (lr.size() > 0) { + LOG.info("UPDATE " + _curSourceName + ": " + lr.size()); + frameAvroRecord(ure.getTableId(), ure.getHeader(), lr, DbusOpcode.UPSERT); + } } private void updateRows(UpdateRowsEventV2 ure) { + if (_ignoreSource) { + LOG.info("Ignoring update rows for " + _curSourceName); + return; + } List> lp = ure.getRows(); List lr = new ArrayList(lp.size()); for (Pair pr: lp) @@ -295,16 +316,29 @@ private void updateRows(UpdateRowsEventV2 ure) Row r = pr.getAfter(); lr.add(r); } - frameAvroRecord(ure.getTableId(), ure.getHeader(), lr, DbusOpcode.UPSERT); + if (lr.size() > 0) { + LOG.info("UPDATE " + _curSourceName + ": " + lr.size()); + frameAvroRecord(ure.getTableId(), ure.getHeader(), lr, DbusOpcode.UPSERT); + } } private void insertRows(WriteRowsEvent wre) { + if (_ignoreSource) { + LOG.info("Ignoring insert rows for " + _curSourceName); + return; + } + LOG.info("INSERT INTO " + _curSourceName); frameAvroRecord(wre.getTableId(), wre.getHeader(), wre.getRows(), DbusOpcode.UPSERT); } private void insertRows(WriteRowsEventV2 wre) { + if (_ignoreSource) { + LOG.info("Ignoring insert rows for " + _curSourceName); + return; + } + LOG.info("INSERT INTO " + _curSourceName); frameAvroRecord(wre.getTableId(), wre.getHeader(), wre.getRows(), DbusOpcode.UPSERT); } @@ -327,9 +361,9 @@ private void frameAvroRecord(long tableId, BinlogEventV4Header bh, List rl, { List cl = 
r.getColumns(); GenericRecord gr = new GenericData.Record(schema); - generateAvroEvent(schema, cl, gr); + generateAvroEvent(vs, cl, gr); - List kps = generateKeyPair(cl, schema); + List kps = generateKeyPair(gr, vs); DbChangeEntry db = new DbChangeEntry(scn, timestampInNanos, gr, doc, isReplicated, schema, kps); _transaction.getPerSourceTransaction(_tableUriToSrcIdMap.get(tableName)).mergeDbChangeEntrySet(db); @@ -343,54 +377,56 @@ private void frameAvroRecord(long tableId, BinlogEventV4Header bh, List rl, } } - private List generateKeyPair(List cl, Schema schema) + private List generateKeyPair(GenericRecord gr, VersionedSchema versionedSchema) throws DatabusException { - Object o = null; Schema.Type st = null; - - // Build PrimaryKeySchema - String pkFieldName = SchemaHelper.getMetaField(schema, "pk"); - if(pkFieldName == null) - { - throw new DatabusException("No primary key specified in the schema"); + List pkFieldList = versionedSchema.getPkFieldList(); + if(pkFieldList.isEmpty()) + { + String pkFieldName = SchemaHelper.getMetaField(versionedSchema.getSchema(), "pk"); + if (pkFieldName == null) + { + throw new DatabusException("No primary key specified in the schema"); + } + PrimaryKeySchema pkSchema = new PrimaryKeySchema(pkFieldName); + List fields = versionedSchema.getSchema().getFields(); + for (int i = 0; i < fields.size(); i++) + { + Schema.Field field = fields.get(i); + if (pkSchema.isPartOfPrimaryKey(field)) + { + pkFieldList.add(field); + } + } } - - PrimaryKeySchema pkSchema = new PrimaryKeySchema(pkFieldName); - List fields = schema.getFields(); List kpl = new ArrayList(); - int cnt = 0; - for(Schema.Field field : fields) + for (Field field : pkFieldList) + { + o = gr.get(field.name()); + st = field.schema().getType(); + KeyPair kp = new KeyPair(o, st); + kpl.add(kp); + } + if (kpl == null || kpl.isEmpty()) { - if (pkSchema.isPartOfPrimaryKey(field)) + String pkFieldName = SchemaHelper.getMetaField(versionedSchema.getSchema(), "pk"); + StringBuilder sb = new StringBuilder(); + for (Schema.Field f : versionedSchema.getSchema().getFields()) { - o = cl.get(cnt).getValue(); - st = field.schema().getType(); - KeyPair kp = new KeyPair(o, st); - kpl.add(kp); + sb.append(f.name()).append(","); } - cnt++; + throw new DatabusException("pk is assigned to " + pkFieldName + " but fieldList is " + sb.toString()); } - return kpl; } - private void generateAvroEvent(Schema schema, List cols, GenericRecord record) + private void generateAvroEvent(VersionedSchema vs, List cols, GenericRecord record) throws DatabusException { // Get Ordered list of field by dbFieldPosition - List orderedFields = SchemaHelper.getOrderedFieldsByMetaField(schema, "dbFieldPosition", new Comparator() { - - @Override - public int compare(String o1, String o2) - { - Integer pos1 = Integer.parseInt(o1); - Integer pos2 = Integer.parseInt(o2); - - return pos1.compareTo(pos2); - } - }); + List orderedFields = SchemaHelper.getOrderedFieldsByDBFieldPosition(vs); // Build Map if (orderedFields.size() != cols.size()) @@ -475,160 +511,14 @@ private void insertFieldIntoRecord( private Object orToAvroType(Column s, Field avroField) throws DatabusException { - if (s instanceof BitColumn) - { - // This is in byte order - BitColumn bc = (BitColumn) s; - byte[] ba = bc.getValue(); - ByteBuffer b = ByteBuffer.wrap(ba); - return b; - } - else if (s instanceof BlobColumn) - { - BlobColumn bc = (BlobColumn) s; - byte[] ba = bc.getValue(); - return ByteBuffer.wrap(ba); - } - else if (s instanceof DateColumn) - { - DateColumn dc = 
(DateColumn) s; - Date d = dc.getValue(); - Long l = d.getTime(); - return l; - } - else if (s instanceof DatetimeColumn) - { - DatetimeColumn dc = (DatetimeColumn) s; - Date d = dc.getValue(); - Long t1 = (d.getTime()/1000) * 1000; //Bug in OR for DateTIme and Time data-types. MilliSeconds is not available for these columns but is set with currentMillis() wrongly. - return t1; - } - else if (s instanceof DecimalColumn) - { - DecimalColumn dc = (DecimalColumn) s; - _log.info("dc Value is :" + dc.getValue()); - String s1 = dc.getValue().toString(); // Convert to string for preserving precision - _log.info("Str : " + s1); - return s1; - } - else if (s instanceof DoubleColumn) - { - DoubleColumn dc = (DoubleColumn) s; - Double d = dc.getValue(); - return d; - } - else if (s instanceof EnumColumn) - { - EnumColumn ec = (EnumColumn) s; - Integer i = ec.getValue(); - return i; - } - else if (s instanceof FloatColumn) - { - FloatColumn fc = (FloatColumn) s; - Float f = fc.getValue(); - return f; - } - else if (s instanceof Int24Column) - { - Int24Column ic = (Int24Column) s; - Integer i = ic.getValue(); - if (i < 0 && SchemaHelper.getMetaField(avroField, "dbFieldType").contains("UNSIGNED")) - { - i += ORListener.MEDIUMINT_MAX_VALUE; - } - return i; - } - else if (s instanceof LongColumn) - { - LongColumn lc = (LongColumn) s; - Long l = lc.getValue().longValue(); - if (l < 0 && SchemaHelper.getMetaField(avroField, "dbFieldType").contains("UNSIGNED")) - { - l += ORListener.INTEGER_MAX_VALUE; - } - return l; - } - else if (s instanceof LongLongColumn) - { - LongLongColumn llc = (LongLongColumn) s; - BigInteger b = new BigInteger(llc.getValue()+""); - if (b.compareTo(BigInteger.ZERO) < 0 && SchemaHelper.getMetaField(avroField, "dbFieldType").contains("UNSIGNED")) - { - b = b.add(ORListener.BIGINT_MAX_VALUE); - } - return b; - } - else if (s instanceof NullColumn) - { - return null; - } - else if (s instanceof SetColumn) - { - SetColumn sc = (SetColumn) s; - Long l = sc.getValue(); - return l; - } - else if (s instanceof ShortColumn) - { - ShortColumn sc = (ShortColumn) s; - Integer i = sc.getValue(); - if (i < 0 && SchemaHelper.getMetaField(avroField, "dbFieldType").contains("UNSIGNED")) - { - i = i + ORListener.SMALLINT_MAX_VALUE; - } - return i; - } - else if (s instanceof StringColumn) - { - StringColumn sc = (StringColumn) s; - String str = new String(sc.getValue(), Charset.defaultCharset()); - return str; - } - else if (s instanceof TimeColumn) - { - TimeColumn tc = (TimeColumn) s; - Time t = tc.getValue(); - /** - * There is a bug in OR where instead of using the default year as 1970, it is using 0070. - * This is a temporary measure to resolve it by working around at this layer. 
The value obtained from OR is subtracted from "0070-00-01 00:00:00" - */ - Calendar c = Calendar.getInstance(); - c.set(70, 0, 1, 0, 0, 0); - // round off the milli-seconds as TimeColumn type has only seconds granularity but Calendar implementation - // includes milli-second (System.currentTimeMillis() at the time of instantiation) - long rawVal = (c.getTimeInMillis()/1000) * 1000; - long val2 = (t.getTime()/1000) * 1000; - long offset = val2 - rawVal; - return offset; - } - else if (s instanceof TimestampColumn) - { - TimestampColumn tsc = (TimestampColumn) s; - Timestamp ts = tsc.getValue(); - Long t = ts.getTime(); - return t; - } - else if (s instanceof TinyColumn) - { - TinyColumn tc = (TinyColumn) s; - Integer i = tc.getValue(); - if (i < 0 && SchemaHelper.getMetaField(avroField, "dbFieldType").contains("UNSIGNED")) - { - i = i + ORListener.TINYINT_MAX_VALUE; - } - return i; - } - else if (s instanceof YearColumn) - { - YearColumn yc = (YearColumn) s; - Integer i = yc.getValue(); - return i; - } - else - { - throw new DatabusRuntimeException("Unknown MySQL type in the event" + s.getClass() + " Object = " + s); - } + try + { + return Or2AvroConvert.convert(s, avroField); + } + catch (Exception e) + { + throw new DatabusRuntimeException("Unknown MySQL type in the event" + s.getClass() + " Object = " + s, e); + } } /** @@ -826,4 +716,16 @@ else if (event instanceof TableMapEvent) _log.info("ORListener Thread done"); doShutdownNotify(); } + + public void shutdownAll() + { + if(this.isAlive()) + { + this.shutdown(); + } + if (_transactionWriter != null && _transactionWriter.isAlive()) + { + _transactionWriter.shutdown(); + } + } } diff --git a/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/OpenReplicatorAvroEventFactory.java b/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/OpenReplicatorAvroEventFactory.java index a6fe34d8..45745189 100644 --- a/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/OpenReplicatorAvroEventFactory.java +++ b/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/OpenReplicatorAvroEventFactory.java @@ -143,6 +143,7 @@ protected byte[] serializeEvent(GenericRecord record) catch(RuntimeException ex) { // Avro likes to throw RuntimeExceptions instead of checked exceptions when serialization fails. 
+ _log.error("Exception for record: " + record + " with schema: " + record.getSchema().getFullName()); throw new EventCreationException("Failed to serialize the Avro GenericRecord", ex); } diff --git a/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/OpenReplicatorEventProducer.java b/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/OpenReplicatorEventProducer.java index f06d31d7..fa3675c8 100644 --- a/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/OpenReplicatorEventProducer.java +++ b/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/OpenReplicatorEventProducer.java @@ -40,7 +40,6 @@ import org.apache.log4j.Logger; import com.google.code.or.OpenReplicator; -import com.linkedin.databus.core.DatabusRuntimeException; import com.linkedin.databus.core.DatabusThreadBase; import com.linkedin.databus.core.DbusEventBufferAppendable; import com.linkedin.databus.core.UnsupportedKeyException; @@ -340,7 +339,8 @@ void initOpenReplicator(long scn) _or.setTransport(null); _or.setBinlogParser(null); - _log.info(String.format("Open Replicator starting from %s@%d", binlogFile, offset)); + _log.info("Connecting to OpenReplicator " + _or.getUser() + "@" + _or.getHost() + ":" + _or.getPort() + "/" + + _or.getBinlogFileName() + "#" + _or.getBinlogPosition()); } @Override @@ -352,11 +352,23 @@ public void run() initOpenReplicator(_sinceScn); try { - _or.start(); + boolean started = false; + while (!started) { + try { + _or.start(); + started = true; + } + catch (Exception e) { + _log.error("Failed to start OpenReplicator: " + e); + _log.warn("Sleeping for 1000 ms"); + Thread.sleep(1000); + } + } _orListener.start(); } catch (Exception e) { - throw new DatabusRuntimeException("failed to start open replicator: " + e.getMessage(), e); + _log.error("failed to start open replicator: " + e.getMessage(), e); + return; } long lastConnectMs = System.currentTimeMillis(); @@ -401,10 +413,7 @@ public void run() try { //should stop orListener first to get the final maxScn used for init open replicator. - if (_orListener.isAlive()) - { - _orListener.shutdown(); - } + _orListener.shutdownAll(); long maxScn = _maxSCNReaderWriter.getMaxScn(); _startPrevScn.set(maxScn); initOpenReplicator(maxScn); @@ -450,10 +459,7 @@ public void run() _log.error("failed to stop Open Replicator", e); } } - if (_orListener.isAlive()) - { - _orListener.shutdown(); - } + _orListener.shutdownAll(); _log.info("Event Producer Thread done"); doShutdownNotify(); @@ -466,7 +472,7 @@ public void onEndTransaction(Transaction txn) try { addTxnToBuffer(txn); - _maxSCNReaderWriter.saveMaxScn(txn.getScn()); + _maxSCNReaderWriter.saveMaxScn(txn.getIgnoredSourceScn()!=-1 ? 
txn.getIgnoredSourceScn() : txn.getScn()); } catch (UnsupportedKeyException e) { @@ -502,6 +508,12 @@ private void addTxnToBuffer(Transaction txn) return; } + List sources = txn.getOrderedPerSourceTransactions(); + if (0 == sources.size()) { + _log.info("Ignoring txn: " + txn); + return; + } + EventSourceStatistics globalStats = getSource(GLOBAL_SOURCE_ID).getStatisticsBean(); _eventBuffer.startEvents(); @@ -510,7 +522,7 @@ private void addTxnToBuffer(Transaction txn) long timestamp = txn.getTxnNanoTimestamp(); List summaries = new ArrayList(); - for (PerSourceTransaction t: txn.getOrderedPerSourceTransactions() ) + for (PerSourceTransaction t: sources ) { long startDbUpdatesMs = System.currentTimeMillis(); short sourceId = (short)t.getSrcId(); diff --git a/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/TransactionWriter.java b/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/TransactionWriter.java new file mode 100644 index 00000000..5f6c959c --- /dev/null +++ b/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/TransactionWriter.java @@ -0,0 +1,122 @@ +package com.linkedin.databus2.producers; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.linkedin.databus.core.DatabusRuntimeException; +import com.linkedin.databus.core.DatabusThreadBase; +import com.linkedin.databus2.producers.ORListener.TransactionProcessor; +import com.linkedin.databus2.producers.ds.Transaction; + +public class TransactionWriter extends DatabusThreadBase +{ + private final BlockingQueue transactionQueue; + private final TransactionProcessor txnProcessor; + private final long queueTimeoutMs; + + private final Logger log = LoggerFactory.getLogger(getClass()); + + public TransactionWriter(int maxQueueSize, long queueTimeoutMs, TransactionProcessor txnProcessor) + { + super("transactionWriter"); + this.txnProcessor = txnProcessor; + this.queueTimeoutMs = queueTimeoutMs; + transactionQueue = new LinkedBlockingQueue(maxQueueSize); + } + + public void addTransaction(Transaction transaction) + { + boolean isPut = false; + do + { + try + { + isPut = transactionQueue.offer(transaction, this.queueTimeoutMs, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + _log.error("failed to put transaction to eventQueue,will retry!"); + } + } while (!isPut && !isShutdownRequested()); + } + + @Override + public void run() + { + List transactionList = new ArrayList(); + Transaction transaction = null; + while (!isShutdownRequested()) + { + if (isPauseRequested()) + { + LOG.info("Pause requested for TransactionWriter. Pausing !!"); + signalPause(); + LOG.info("Pausing. 
Waiting for resume command"); + try + { + awaitUnPauseRequest(); + } + catch (InterruptedException e) + { + _log.info("Interrupted !!"); + } + LOG.info("Resuming TransactionWriter !!"); + signalResumed(); + LOG.info("TransactionWriter resumed !!"); + } + transactionList.clear(); + int transactionNum = transactionQueue.drainTo(transactionList); + if (transactionNum == 0) + { + try + { + transaction = transactionQueue.poll(queueTimeoutMs, TimeUnit.MILLISECONDS); + if (transaction != null) + { + transactionList.add(transaction); + transactionNum = transactionList.size(); + } + } + catch (InterruptedException e) + { + log.error("Interrupted when poll from transactionEventQueue!!"); + } + } + if (transactionNum <= 0) + { + continue; + } + for (int i = 0; i < transactionNum; i++) + { + transaction = transactionList.get(i); + if (transaction == null) + { + log.error("received null transaction"); + continue; + } + try + { + txnProcessor.onEndTransaction(transaction); + } + catch (Exception e) + { + log.error("Got exception in the transaction handler ", e); + throw new DatabusRuntimeException("Got exception in the transaction handler ", e); + } + finally + { + + } + } + } + log.info("transactionWriter thread done!"); + doShutdownNotify(); + } + +} diff --git a/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/util/Or2AvroConvert.java b/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/util/Or2AvroConvert.java new file mode 100644 index 00000000..ea939bd4 --- /dev/null +++ b/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/util/Or2AvroConvert.java @@ -0,0 +1,337 @@ +package com.linkedin.databus2.producers.util; + +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + +import org.apache.avro.Schema.Field; + +import com.google.code.or.common.glossary.Column; +import com.google.code.or.common.glossary.column.Int24Column; +import com.google.code.or.common.glossary.column.LongColumn; +import com.google.code.or.common.glossary.column.LongLongColumn; +import com.google.code.or.common.glossary.column.NullColumn; +import com.google.code.or.common.glossary.column.ShortColumn; +import com.google.code.or.common.glossary.column.TinyColumn; +import com.linkedin.databus2.core.DatabusException; +import com.linkedin.databus2.schemas.utils.SchemaHelper; + +public class Or2AvroConvert +{ + private static final int maxCacheSize = 10000; + protected static final Map convertCacheMap = new HashMap(maxCacheSize); + + public static final int TINYINT_MAX_VALUE = 256; + public static final int SMALLINT_MAX_VALUE = 65536; + public static final int MEDIUMINT_MAX_VALUE = 16777216; + public static final long INTEGER_MAX_VALUE = 4294967296L; + public static final BigInteger BIGINT_MAX_VALUE = new BigInteger("18446744073709551616"); + + public static Object convert(Column s, Field avroField) throws DatabusException + { + if (s instanceof NullColumn) + { + return null; + } + Or2AvroBasicConvert converter = fetchConverter(avroField); + return converter.convert(s, avroField); + } + + private static Or2AvroBasicConvert fetchConverter(Field avroField) throws DatabusException + { + Or2AvroBasicConvert converter = convertCacheMap.get(avroField); + if (converter != null) + { + return converter; + } + synchronized (convertCacheMap) + { + converter = convertCacheMap.get(avroField); + if (converter != null) + { + 
return converter; + } + if (convertCacheMap.size() > maxCacheSize) + { + convertCacheMap.clear(); + } + String schemaStr = avroField.schema().toString(); + if (schemaStr.contains("int")) + { + converter = IntegerConverter.fetchInstance(); + } + else if (schemaStr.contains("long")) + { + converter = LongConverter.fetchInstance(); + } + else if (schemaStr.contains("double")) + { + converter = DoubleConverter.fetchInstance(); + } + else if (schemaStr.contains("string")) + { + converter = StringConverter.fetchInstance(); + } + else if (schemaStr.contains("bytes")) + { + converter = BytesConverter.fetchInstance(); + } + else if (schemaStr.contains("float")) + { + converter = FloatConverter.fetchInstance(); + } + if (converter == null) + { + throw new DatabusException("schema is " + schemaStr + ", converter not exist!"); + } + convertCacheMap.put(avroField, converter); + return converter; + } + } + + public static abstract class Or2AvroBasicConvert + { + public abstract Object convert(Column s, Field avroField) throws DatabusException; + } + + public static class IntegerConverter extends Or2AvroBasicConvert + { + private static IntegerConverter converter = null; + + public static IntegerConverter fetchInstance() + { + if (converter == null) + { + converter = new IntegerConverter(); + } + return converter; + } + + @Override + public Object convert(Column s, Field avroField) throws DatabusException + { + Object obj = s.getValue(); + Integer res = null; + if(obj instanceof Number) + { + res = ((Number) obj).intValue(); + } + else + { + throw new DatabusException((obj == null ? "" : (obj.getClass() + " | " + obj)) + "can't be converted into Integer"); + } + if(res.intValue() >= 0) + { + return res.intValue(); + } + return res.intValue() + (int) unsignedOffset(s, avroField); + } + } + + public static class LongConverter extends Or2AvroBasicConvert + { + private static LongConverter converter = null; + + public static LongConverter fetchInstance() + { + if (converter == null) + { + converter = new LongConverter(); + } + return converter; + } + + @Override + public Object convert(Column s, Field avroField) throws DatabusException + { + Object obj = s.getValue(); + Long res = null; + if (obj instanceof Date) + { + return ((Date) obj).getTime(); + } + else if (obj instanceof Number) + { + res = ((Number) obj).longValue(); + } + else + { + throw new DatabusException((obj == null ? "" : (obj.getClass() + " | " + obj)) + " can't be converted into Long"); + } + if (res.longValue() >= 0) + { + return res.longValue(); + } + return res.longValue() + unsignedOffset(s, avroField); + } + } + + public static class DoubleConverter extends Or2AvroBasicConvert + { + private static DoubleConverter converter = null; + + public static DoubleConverter fetchInstance() + { + if (converter == null) + { + converter = new DoubleConverter(); + } + return converter; + } + + @Override + public Object convert(Column s, Field avroField) throws DatabusException + { + Object obj = s.getValue(); + Double res = null; + if (obj instanceof Number) + { + res = ((Number) obj).doubleValue(); + } + else + { + throw new DatabusException((obj == null ? 
"" : (obj.getClass() + " | " + obj)) + " can't be converted into Double"); + } + return res; + } + } + + public static class StringConverter extends Or2AvroBasicConvert + { + private static StringConverter converter = null; + + public static StringConverter fetchInstance() + { + if (converter == null) + { + converter = new StringConverter(); + } + return converter; + } + + @Override + public Object convert(Column s, Field avroField) + { + if (s.getValue() instanceof byte[]) + { + return new String((byte[]) s.getValue(), Charset.defaultCharset()); + } + return s.getValue().toString();//s.getValue will never be null + } + } + + public static class BytesConverter extends Or2AvroBasicConvert + { + private static BytesConverter converter = null; + + public static BytesConverter fetchInstance() + { + if (converter == null) + { + converter = new BytesConverter(); + } + return converter; + } + + @Override + public Object convert(Column s, Field avroField) throws DatabusException + { + if (!(s.getValue() instanceof byte[])) + { + throw new DatabusException( + avroField.name() + " is assigned to be bytes array, but it can't be converted to byte array | " + + avroField.schema().toString()); + } + byte[] byteArr = (byte[]) s.getValue(); + return ByteBuffer.wrap(byteArr); + } + } + + public static class FloatConverter extends Or2AvroBasicConvert + { + private static FloatConverter converter = null; + + public static FloatConverter fetchInstance() + { + if (converter == null) + { + converter = new FloatConverter(); + } + return converter; + } + + @Override + public Object convert(Column s, Field avroField) throws DatabusException + { + Object obj = s.getValue(); + Float res = null; + if (obj instanceof Number) + { + res = ((Number) obj).floatValue(); + } + else + { + throw new DatabusException((obj == null ? 
"" : (obj.getClass() + " | " + obj)) + "can't be converted to Float"); + } + return res; + } + } + + private final static long unsignedOffset(Column s, Field avroField) + { + if (s instanceof Int24Column) + { + Int24Column lc = (Int24Column) s; + int i = lc.getValue().intValue(); + if (i < 0 && SchemaHelper.getMetaField(avroField, "dbFieldType").contains("UNSIGNED")) + { + return MEDIUMINT_MAX_VALUE; + } + return 0L; + } + else if (s instanceof LongColumn) + { + LongColumn lc = (LongColumn) s; + Long i = lc.getValue().longValue(); + if (i < 0 && SchemaHelper.getMetaField(avroField, "dbFieldType").contains("UNSIGNED")) + { + return INTEGER_MAX_VALUE; + } + return 0L; + } + else if (s instanceof LongLongColumn) + { + LongLongColumn llc = (LongLongColumn) s; + Long l = llc.getValue(); + if (l < 0 && SchemaHelper.getMetaField(avroField, "dbFieldType").contains("UNSIGNED")) + { + return BIGINT_MAX_VALUE.longValue(); + } + return 0L; + } + else if (s instanceof ShortColumn) + { + ShortColumn sc = (ShortColumn) s; + Integer i = sc.getValue(); + if (i < 0 && SchemaHelper.getMetaField(avroField, "dbFieldType").contains("UNSIGNED")) + { + return SMALLINT_MAX_VALUE; + } + return 0L; + } + else if (s instanceof TinyColumn) + { + TinyColumn tc = (TinyColumn) s; + Integer i = tc.getValue(); + if (i < 0 && SchemaHelper.getMetaField(avroField, "dbFieldType").contains("UNSIGNED")) + { + return TINYINT_MAX_VALUE; + } + return 0L; + } + return 0; + } +} diff --git a/databus2-relay/databus2-relay-impl/src/main/java/com/linkedin/databus/container/request/RegisterRequestProcessor.java b/databus2-relay/databus2-relay-impl/src/main/java/com/linkedin/databus/container/request/RegisterRequestProcessor.java index 9a0f73ad..17c1f1c0 100644 --- a/databus2-relay/databus2-relay-impl/src/main/java/com/linkedin/databus/container/request/RegisterRequestProcessor.java +++ b/databus2-relay/databus2-relay-impl/src/main/java/com/linkedin/databus/container/request/RegisterRequestProcessor.java @@ -33,6 +33,7 @@ import org.codehaus.jackson.map.ObjectMapper; import com.linkedin.databus.container.netty.HttpRelay; import com.linkedin.databus.core.data_model.LogicalSource; +import com.linkedin.databus.core.util.CompressUtil; import com.linkedin.databus2.core.DatabusException; import com.linkedin.databus2.core.container.ChunkedWritableByteChannel; import com.linkedin.databus2.core.container.DatabusHttpHeaders; @@ -191,9 +192,15 @@ public DatabusRequest process(DatabusRequest request) throws IOException, { mapper.writeValue(out, registeredSources); } + String outStr = out.toString(); + String compress = request.getParams().getProperty(DatabusHttpHeaders.PROTOCOL_COMPRESS_PARAM); + if ("true".equals(compress)) + { + outStr = CompressUtil.compress(outStr); + } ChunkedWritableByteChannel responseContent = request.getResponseContent(); - byte[] resultBytes = out.toString().getBytes(Charset.defaultCharset()); + byte[] resultBytes = outStr.getBytes(Charset.defaultCharset()); responseContent.addMetadata(DatabusHttpHeaders.DBUS_CLIENT_RELAY_PROTOCOL_VERSION_HDR, registerResponseProtocolVersion); responseContent.write(ByteBuffer.wrap(resultBytes)); diff --git a/sandbox-repo/com/google/open-replicator/1.0.7/open-replicator-1.0.7-sources.jar b/sandbox-repo/com/google/open-replicator/1.0.7/open-replicator-1.0.7-sources.jar new file mode 100644 index 00000000..28088c88 Binary files /dev/null and b/sandbox-repo/com/google/open-replicator/1.0.7/open-replicator-1.0.7-sources.jar differ diff --git 
a/sandbox-repo/com/google/open-replicator/1.0.7/open-replicator-1.0.7.jar b/sandbox-repo/com/google/open-replicator/1.0.7/open-replicator-1.0.7.jar new file mode 100644 index 00000000..7101837d Binary files /dev/null and b/sandbox-repo/com/google/open-replicator/1.0.7/open-replicator-1.0.7.jar differ diff --git a/subprojects.gradle b/subprojects.gradle index f236874f..9f08fa69 100644 --- a/subprojects.gradle +++ b/subprojects.gradle @@ -18,7 +18,7 @@ ext.externalDependency = [ 'commonsBeanutils': 'commons-beanutils:commons-beanutils:1.7.0', 'commonsCli': 'commons-cli:commons-cli:1.2', 'commonsCodec': 'commons-codec:commons-codec:1.6', - 'commonsCollections': 'commons-collections:commons-collections:3.2.1', + 'commonsCollections': 'commons-collections:commons-collections:3.2.2', 'commonsIo': 'commons-io:commons-io:1.4', 'commonsLang': 'commons-lang:commons-lang:2.5', 'commonsLogging': 'commons-logging:commons-logging:1.1', @@ -26,7 +26,7 @@ ext.externalDependency = [ 'commonsStats': 'org.apache.commons:commons-math3:3.2', 'easymock': 'org.easymock:easymock:3.1', 'easymockext': 'org.easymock:easymockclassextension:3.1', - 'guava': 'com.google.guava:guava:10.0', + 'guava': 'com.google.guava:guava:22.0', 'hadoopCore': 'org.apache.hadoop:hadoop-core:0.20.2', 'jacksonCoreAsl': 'org.codehaus.jackson:jackson-core-asl:1.8.5', 'jacksonMapperAsl': 'org.codehaus.jackson:jackson-mapper-asl:1.8.5', @@ -47,13 +47,14 @@ ext.externalDependency = [ 'zookeeper': 'org.apache.zookeeper:zookeeper:3.3.3', 'ojdbc6': 'com.oracle:ojdbc6:11.2.0.2.0', 'helixCore': 'org.apache.helix:helix-core:0.6.2.0', - 'or': 'com.linkedin.dds-mysql:open-replicator-impl:1.0.63' + 'or': 'com.linkedin.dds-mysql:open-replicator-impl:1.0.63', + 'c3p0': 'com.mchange:c3p0:0.9.5' ]; if (isDefaultEnvironment) { externalDependency['mysqlConnectorJava'] = 'mysql:mysql-connector-java:5.1.14' externalDependency['helixCore'] = 'org.apache.helix:helix-core:0.6.2-incubating' - externalDependency['or'] = 'com.google:open-replicator:1.0.5' + externalDependency['or'] = 'com.google:open-replicator:1.0.7' externalDependency['log4j'] = 'org.slf4j:slf4j-log4j12:1.6.1' }
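The new TransactionWriter decouples binlog parsing from downstream processing: ORListener.endXtion now only enqueues the finished Transaction, and the writer thread drains the bounded queue and invokes the TransactionProcessor, retrying the offer until it succeeds or shutdown is requested. A minimal wiring sketch; the processor body is illustrative:

    import com.linkedin.databus2.producers.TransactionWriter;
    import com.linkedin.databus2.producers.ORListener.TransactionProcessor;
    import com.linkedin.databus2.producers.ds.Transaction;

    public class TransactionWriterWiring {
      public static void main(String[] args) throws Exception {
        // In the relay this is the code path that appends the transaction to the event buffer.
        TransactionProcessor processor = new TransactionProcessor() {
          @Override
          public void onEndTransaction(Transaction txn) {
            System.out.println("processed txn, ignoredSourceScn=" + txn.getIgnoredSourceScn());
          }
        };

        // Bounded hand-off: the parser thread keeps retrying the offer once 100 transactions are pending.
        TransactionWriter writer = new TransactionWriter(100, 100L /* queue timeout, ms */, processor);
        writer.start();

        writer.addTransaction(new Transaction());   // normally called from ORListener.endXtion()

        Thread.sleep(200);                          // let the writer thread drain the queue
        writer.shutdown();
      }
    }

Transactions whose tables are not configured sources are still enqueued: endXtion records their SCN as ignoredSourceScn, and OpenReplicatorEventProducer.onEndTransaction saves that SCN instead of the regular one, so the persisted restart point keeps advancing even when every event in a binlog transaction is skipped.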
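Or2AvroConvert also centralizes the unsigned-column fix-up that previously lived in ORListener.orToAvroType: open-replicator hands back signed Java values, and when a column's dbFieldType metadata contains UNSIGNED, a negative value is shifted by the type's full range (256, 65536, 2^24, 2^32, or 2^64). The arithmetic for an INT UNSIGNED column, shown standalone:

    public class UnsignedWraparoundExample {
      // Same constant as Or2AvroConvert.INTEGER_MAX_VALUE: 2^32, the full range of MySQL INT UNSIGNED.
      private static final long INTEGER_MAX_VALUE = 4294967296L;

      public static void main(String[] args) {
        // An INT UNSIGNED column holding 4294967200 arrives from the binlog as the signed int -96.
        int fromBinlog = -96;

        long recovered = fromBinlog < 0 ? fromBinlog + INTEGER_MAX_VALUE : fromBinlog;
        System.out.println(recovered);   // prints 4294967200
      }
    }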