Skip to content

Commit

Permalink
add int unsigned col type support
Browse files Browse the repository at this point in the history
  • Loading branch information
yangzhichao committed Aug 11, 2016
1 parent 237cb1c commit ff10541
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,17 @@ public enum AvroPrimitiveTypes
BLOB("bytes"),
ARRAY("array"),
TABLE("record"),
XMLTYPE("string");
XMLTYPE("string"),
TINYINT("int"),
TINYINT_UNSIGNED("int"),
SMALLINT("int"),
SMALLINT_UNSIGNED("int"),
MEDIUMINT("int"),
MEDIUMINT_UNSIGNED("int"),
INT("int"),
INT_UNSIGNED("int"),
BIGINT("long"),
BIGINT_UNSIGNED("long");

private final String _avroType;
private AvroPrimitiveTypes(String avroType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,28 @@ public class SimpleTypeInfo

public SimpleTypeInfo(String name, int precision, int scale)
{
if(name.equals("NUMBER"))
boolean unsigned = false;
if (name.contains(" "))
{
String nameArr[] = name.split(" ");
if (nameArr[1].trim().toLowerCase().equals("unsigned"))
{
unsigned = true;
}
name = nameArr[0];
}
if (name.toLowerCase().equals("integer") || name.toLowerCase().endsWith("int"))
{
if ((precision > 9) || (precision == 0))
{
_type = unsigned ? AvroPrimitiveTypes.BIGINT_UNSIGNED : AvroPrimitiveTypes.BIGINT;
}
else
{
_type = AvroPrimitiveTypes.valueOf((name + (unsigned ? "_unsigned" : "")).toUpperCase());
}
}
else if(name.equals("NUMBER"))
{
if(scale > 0)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.linkedin.databus2.producers;

import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.sql.Time;
Expand All @@ -16,6 +17,7 @@
import java.util.concurrent.TimeUnit;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.log4j.Logger;
Expand Down Expand Up @@ -138,6 +140,13 @@ public interface TransactionProcessor

/** Milli sec timeout for _binlogEventQueue operation **/
private long _queueTimeoutMs = 100L;

/** correct unsigned int type*/
public static final int TINYINT_MAX_VALUE = 256;
public static final int SMALLINT_MAX_VALUE = 65536;
public static final int MEDIUMINT_MAX_VALUE = 16777216;
public static final long INTEGER_MAX_VALUE = 4294967296L;
public static final BigInteger BIGINT_MAX_VALUE = new BigInteger("18446744073709551616");

public ORListener(String name,
int currentFileNumber,
Expand Down Expand Up @@ -444,7 +453,7 @@ private void insertFieldIntoRecord(
try
{
if (! isFieldNull)
fieldValueObj = orToAvroType(fieldValue);
fieldValueObj = orToAvroType(fieldValue, avroField);
else
fieldValueObj = null;

Expand All @@ -461,8 +470,9 @@ private void insertFieldIntoRecord(
/**
* Given a OR Column, it returns a corresponding Java object that can be inserted into
* AVRO record
* @param avroField
*/
private Object orToAvroType(Column s)
private Object orToAvroType(Column s, Field avroField)
throws DatabusException
{
if (s instanceof BitColumn)
Expand Down Expand Up @@ -523,19 +533,31 @@ else if (s instanceof Int24Column)
{
Int24Column ic = (Int24Column) s;
Integer i = ic.getValue();
if (i < 0 && SchemaHelper.getMetaField(avroField, "dbFieldType").contains("UNSIGNED"))
{
i += ORListener.MEDIUMINT_MAX_VALUE;
}
return i;
}
else if (s instanceof LongColumn)
{
LongColumn lc = (LongColumn) s;
Integer i = lc.getValue();
return i;
Long l = lc.getValue().longValue();
if (l < 0 && SchemaHelper.getMetaField(avroField, "dbFieldType").contains("UNSIGNED"))
{
l += ORListener.INTEGER_MAX_VALUE;
}
return l;
}
else if (s instanceof LongLongColumn)
{
LongLongColumn llc = (LongLongColumn) s;
Long l = llc.getValue();
return l;
BigInteger b = new BigInteger(llc.getValue()+"");
if (b.compareTo(BigInteger.ZERO) < 0 && SchemaHelper.getMetaField(avroField, "dbFieldType").contains("UNSIGNED"))
{
b = b.add(ORListener.BIGINT_MAX_VALUE);
}
return b;
}
else if (s instanceof NullColumn)
{
Expand All @@ -551,6 +573,10 @@ else if (s instanceof ShortColumn)
{
ShortColumn sc = (ShortColumn) s;
Integer i = sc.getValue();
if (i < 0 && SchemaHelper.getMetaField(avroField, "dbFieldType").contains("UNSIGNED"))
{
i = i + ORListener.SMALLINT_MAX_VALUE;
}
return i;
}
else if (s instanceof StringColumn)
Expand Down Expand Up @@ -587,6 +613,10 @@ else if (s instanceof TinyColumn)
{
TinyColumn tc = (TinyColumn) s;
Integer i = tc.getValue();
if (i < 0 && SchemaHelper.getMetaField(avroField, "dbFieldType").contains("UNSIGNED"))
{
i = i + ORListener.TINYINT_MAX_VALUE;
}
return i;
}
else if (s instanceof YearColumn)
Expand Down

0 comments on commit ff10541

Please sign in to comment.