From ff10541f3cff16d62cc26deb75ae9c48f7237fb3 Mon Sep 17 00:00:00 2001 From: yangzhichao Date: Thu, 11 Aug 2016 10:37:07 +0800 Subject: [PATCH] add int unsigned col type support --- .../databus/util/AvroPrimitiveTypes.java | 12 +++++- .../linkedin/databus/util/SimpleTypeInfo.java | 23 +++++++++- .../databus2/producers/ORListener.java | 42 ++++++++++++++++--- 3 files changed, 69 insertions(+), 8 deletions(-) diff --git a/databus-util-cmdline/databus-util-cmdline-impl/src/main/java/com/linkedin/databus/util/AvroPrimitiveTypes.java b/databus-util-cmdline/databus-util-cmdline-impl/src/main/java/com/linkedin/databus/util/AvroPrimitiveTypes.java index ad54fe3c..43a0a753 100644 --- a/databus-util-cmdline/databus-util-cmdline-impl/src/main/java/com/linkedin/databus/util/AvroPrimitiveTypes.java +++ b/databus-util-cmdline/databus-util-cmdline-impl/src/main/java/com/linkedin/databus/util/AvroPrimitiveTypes.java @@ -44,7 +44,17 @@ public enum AvroPrimitiveTypes BLOB("bytes"), ARRAY("array"), TABLE("record"), - XMLTYPE("string"); + XMLTYPE("string"), + TINYINT("int"), + TINYINT_UNSIGNED("int"), + SMALLINT("int"), + SMALLINT_UNSIGNED("int"), + MEDIUMINT("int"), + MEDIUMINT_UNSIGNED("int"), + INT("int"), + INT_UNSIGNED("int"), + BIGINT("long"), + BIGINT_UNSIGNED("long"); private final String _avroType; private AvroPrimitiveTypes(String avroType) diff --git a/databus-util-cmdline/databus-util-cmdline-impl/src/main/java/com/linkedin/databus/util/SimpleTypeInfo.java b/databus-util-cmdline/databus-util-cmdline-impl/src/main/java/com/linkedin/databus/util/SimpleTypeInfo.java index 51f858c2..2fd8e992 100644 --- a/databus-util-cmdline/databus-util-cmdline-impl/src/main/java/com/linkedin/databus/util/SimpleTypeInfo.java +++ b/databus-util-cmdline/databus-util-cmdline-impl/src/main/java/com/linkedin/databus/util/SimpleTypeInfo.java @@ -30,7 +30,28 @@ public class SimpleTypeInfo public SimpleTypeInfo(String name, int precision, int scale) { - if(name.equals("NUMBER")) + boolean unsigned = false; + if (name.contains(" ")) + { + String nameArr[] = name.split(" "); + if (nameArr[1].trim().toLowerCase().equals("unsigned")) + { + unsigned = true; + } + name = nameArr[0]; + } + if (name.toLowerCase().equals("integer") || name.toLowerCase().endsWith("int")) + { + if ((precision > 9) || (precision == 0)) + { + _type = unsigned ? AvroPrimitiveTypes.BIGINT_UNSIGNED : AvroPrimitiveTypes.BIGINT; + } + else + { + _type = AvroPrimitiveTypes.valueOf((name + (unsigned ? "_unsigned" : "")).toUpperCase()); + } + } + else if(name.equals("NUMBER")) { if(scale > 0) { diff --git a/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/ORListener.java b/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/ORListener.java index c6f33d7b..6c0db42d 100644 --- a/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/ORListener.java +++ b/databus2-relay/databus2-event-producer-or/src/main/java/com/linkedin/databus2/producers/ORListener.java @@ -1,5 +1,6 @@ package com.linkedin.databus2.producers; +import java.math.BigInteger; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.sql.Time; @@ -16,6 +17,7 @@ import java.util.concurrent.TimeUnit; import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.log4j.Logger; @@ -138,6 +140,13 @@ public interface TransactionProcessor /** Milli sec timeout for _binlogEventQueue operation **/ private long _queueTimeoutMs = 100L; + + /** correct unsigned int type*/ + public static final int TINYINT_MAX_VALUE = 256; + public static final int SMALLINT_MAX_VALUE = 65536; + public static final int MEDIUMINT_MAX_VALUE = 16777216; + public static final long INTEGER_MAX_VALUE = 4294967296L; + public static final BigInteger BIGINT_MAX_VALUE = new BigInteger("18446744073709551616"); public ORListener(String name, int currentFileNumber, @@ -444,7 +453,7 @@ private void insertFieldIntoRecord( try { if (! isFieldNull) - fieldValueObj = orToAvroType(fieldValue); + fieldValueObj = orToAvroType(fieldValue, avroField); else fieldValueObj = null; @@ -461,8 +470,9 @@ private void insertFieldIntoRecord( /** * Given a OR Column, it returns a corresponding Java object that can be inserted into * AVRO record + * @param avroField */ - private Object orToAvroType(Column s) + private Object orToAvroType(Column s, Field avroField) throws DatabusException { if (s instanceof BitColumn) @@ -523,19 +533,31 @@ else if (s instanceof Int24Column) { Int24Column ic = (Int24Column) s; Integer i = ic.getValue(); + if (i < 0 && SchemaHelper.getMetaField(avroField, "dbFieldType").contains("UNSIGNED")) + { + i += ORListener.MEDIUMINT_MAX_VALUE; + } return i; } else if (s instanceof LongColumn) { LongColumn lc = (LongColumn) s; - Integer i = lc.getValue(); - return i; + Long l = lc.getValue().longValue(); + if (l < 0 && SchemaHelper.getMetaField(avroField, "dbFieldType").contains("UNSIGNED")) + { + l += ORListener.INTEGER_MAX_VALUE; + } + return l; } else if (s instanceof LongLongColumn) { LongLongColumn llc = (LongLongColumn) s; - Long l = llc.getValue(); - return l; + BigInteger b = new BigInteger(llc.getValue()+""); + if (b.compareTo(BigInteger.ZERO) < 0 && SchemaHelper.getMetaField(avroField, "dbFieldType").contains("UNSIGNED")) + { + b = b.add(ORListener.BIGINT_MAX_VALUE); + } + return b; } else if (s instanceof NullColumn) { @@ -551,6 +573,10 @@ else if (s instanceof ShortColumn) { ShortColumn sc = (ShortColumn) s; Integer i = sc.getValue(); + if (i < 0 && SchemaHelper.getMetaField(avroField, "dbFieldType").contains("UNSIGNED")) + { + i = i + ORListener.SMALLINT_MAX_VALUE; + } return i; } else if (s instanceof StringColumn) @@ -587,6 +613,10 @@ else if (s instanceof TinyColumn) { TinyColumn tc = (TinyColumn) s; Integer i = tc.getValue(); + if (i < 0 && SchemaHelper.getMetaField(avroField, "dbFieldType").contains("UNSIGNED")) + { + i = i + ORListener.TINYINT_MAX_VALUE; + } return i; } else if (s instanceof YearColumn)