Skip to content

Commit

Permalink
Merge branch 'release/0.5.1'
Browse files Browse the repository at this point in the history
  • Loading branch information
ianoc committed Nov 26, 2014
2 parents 8d47172 + 8432ace commit fa8de69
Show file tree
Hide file tree
Showing 8 changed files with 132 additions and 42 deletions.
7 changes: 7 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# chill #

### 0.5.1
* reverse Config contains logic: https://github.com/twitter/chill/pull/205
* fix setConf: https://github.com/twitter/chill/pull/204
* fix default constructor for kryo serialization: https://github.com/twitter/chill/pull/203
* Switched Chill Avro to use ClassTags, added Java unit tests: https://github.com/twitter/chill/pull/200
* Enable cross compilation for chill-avro: https://github.com/twitter/chill/pull/202

### 0.5.0
* Move to211: https://github.com/twitter/chill/pull/197
* Make 2.10.4 the default, move to scalatest: https://github.com/twitter/chill/pull/196
Expand Down
16 changes: 1 addition & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,21 +147,7 @@ Discussion occurs primarily on the [Chill mailing list](https://groups.google.co

## Maven

Chill modules are available on Maven Central. The current groupid and version for all modules is, respectively, `"com.twitter"` and `0.3.6`.

Current published artifacts are

* `chill-java`
* `chill-storm`
* `chill-hadoop`
* `chill_2.9.3`
* `chill_2.10`
* `chill-bijection_2.9.3`
* `chill-bijection_2.10`
* `chill-akka_2.9.3`
* `chill-akka_2.10`

The suffix denotes the scala version.
Chill modules are available on Maven Central. The current groupid and version for all modules is, respectively, `"com.twitter"` and `0.5.1` and each scala project is published for `2.10` and `2.11`. Search [search.maven.org](http://search.maven.org/#search%7Cga%7C1%7Cchill) when in doubt.

## Authors

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,38 +14,39 @@ limitations under the License.
*/
package com.twitter.chill.avro

import org.apache.avro.specific.SpecificRecordBase
import com.twitter.chill.{ InjectiveSerializer, KSerializer }
import com.twitter.bijection.Injection
import com.twitter.bijection.avro.{ GenericAvroCodecs, SpecificAvroCodecs }
import com.twitter.chill.{ InjectiveSerializer, KSerializer }
import org.apache.avro.Schema
import com.twitter.bijection.Injection
import org.apache.avro.generic.GenericData.Record
import org.apache.avro.generic.GenericRecord
import org.apache.avro.specific.SpecificRecordBase

import scala.reflect.ClassTag

/**
* @author Mansur Ashraf
* @since 2/9/14.
*/
object AvroSerializer {

def SpecificRecordSerializer[T <: SpecificRecordBase: Manifest]: KSerializer[T] = {
def SpecificRecordSerializer[T <: SpecificRecordBase: ClassTag]: KSerializer[T] = {
implicit val inj = SpecificAvroCodecs[T]
InjectiveSerializer.asKryo
}

def SpecificRecordBinarySerializer[T <: SpecificRecordBase: Manifest]: KSerializer[T] = {
def SpecificRecordBinarySerializer[T <: SpecificRecordBase: ClassTag]: KSerializer[T] = {
implicit val inj = SpecificAvroCodecs.toBinary[T]
InjectiveSerializer.asKryo
}

def SpecificRecordJsonSerializer[T <: SpecificRecordBase: Manifest](schema: Schema): KSerializer[T] = {
def SpecificRecordJsonSerializer[T <: SpecificRecordBase: ClassTag](schema: Schema): KSerializer[T] = {
import com.twitter.bijection.StringCodec.utf8
implicit val inj = SpecificAvroCodecs.toJson[T](schema)
implicit val avroToArray = Injection.connect[T, String, Array[Byte]]
InjectiveSerializer.asKryo
}

def GenericRecordSerializer[T <: GenericRecord: Manifest](schema: Schema = null): KSerializer[T] = {
def GenericRecordSerializer[T <: GenericRecord: ClassTag](schema: Schema = null): KSerializer[T] = {
implicit val inj = GenericAvroCodecs[T](schema)
InjectiveSerializer.asKryo
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package com.twitter.chill.avro;

import avro.FiscalRecord;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.twitter.chill.KryoInstantiator;
import com.twitter.chill.KryoPool;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;
import org.junit.Before;
import org.junit.Test;
import org.objenesis.strategy.StdInstantiatorStrategy;
import scala.reflect.ClassTag;

import static org.junit.Assert.assertEquals;

public class AvroSerializerJavaTest {

private Schema schema;
private GenericData.Record user;
private FiscalRecord fiscalRecord;

@Before
public void setUp() throws Exception {

schema = SchemaBuilder
.record("person")
.fields()
.name("name").type().stringType().noDefault()
.name("ID").type().intType().noDefault()
.endRecord();


user = new GenericRecordBuilder(schema)
.set("name", "Jeff")
.set("ID", 1)
.build();


fiscalRecord = FiscalRecord.newBuilder().setCalendarDate("2012-01-01").setFiscalWeek(1).setFiscalYear(2012).build();

}

public <T> KryoPool getKryo(final ClassTag<T> tag, final Serializer<T> serializer){
KryoInstantiator kryoInstantiator = new KryoInstantiator() {
public Kryo newKryo() {
Kryo k =super.newKryo();
k.setInstantiatorStrategy(new StdInstantiatorStrategy());
k.register(tag.runtimeClass(), serializer);
return k;
}
};

return KryoPool.withByteArrayOutputStream(1, kryoInstantiator);
}
@Test
public void testSpecificRecordSerializer() throws Exception {
ClassTag<FiscalRecord> tag = getClassTag(FiscalRecord.class);
KryoPool kryo = getKryo(tag, AvroSerializer$.MODULE$.SpecificRecordSerializer(tag));
byte[] bytes = kryo.toBytesWithClass(fiscalRecord);
FiscalRecord result = (FiscalRecord) kryo.fromBytes(bytes);
assertEquals(fiscalRecord,result);
}

@Test
public void SpecificRecordBinarySerializer() throws Exception {
ClassTag<FiscalRecord> tag = getClassTag(FiscalRecord.class);
KryoPool kryo = getKryo(tag, AvroSerializer$.MODULE$.SpecificRecordBinarySerializer(tag));
byte[] bytes = kryo.toBytesWithClass(fiscalRecord);
FiscalRecord result = (FiscalRecord) kryo.fromBytes(bytes);
assertEquals(fiscalRecord,result);
}


@Test
public void testGenericRecord() throws Exception {
ClassTag<GenericData.Record> tag = getClassTag(GenericData.Record.class);
KryoPool kryo = getKryo(tag, AvroSerializer$.MODULE$.GenericRecordSerializer(schema,tag));
byte[] userBytes = kryo.toBytesWithClass(user);
GenericData.Record userResult = (GenericData.Record) kryo.fromBytes(userBytes);
assertEquals(userResult.get("name").toString(),"Jeff");
assertEquals(userResult.get("ID"),1);
assertEquals(user.toString(), userResult.toString());

}

private <T> ClassTag<T> getClassTag(Class<T> klass) {
return scala.reflect.ClassTag$.MODULE$.apply(klass);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ import org.apache.avro.generic.GenericRecordBuilder
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.GenericData.Record

import scala.reflect.ClassTag

/**
* @author Mansur Ashraf
* @since 2/9/14.
*/
object AvroSerializerSpec extends WordSpec with Matchers {
class AvroSerializerSpec extends WordSpec with Matchers {

def getKryo[T: Manifest](k: KSerializer[T]) = {
def getKryo[T: ClassTag](k: KSerializer[T]) = {
val inst = {
() => (new ScalaKryoInstantiator).newKryo.forClass(k)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class KryoSerialization extends Configured implements Serialization<Objec
* It will first call this, then setConf.
*/
public KryoSerialization() {
this(new Configuration());
super();
}

/**
Expand All @@ -65,15 +65,18 @@ public KryoSerialization( Configuration conf ) {

@Override
public void setConf(Configuration conf) {
try {
KryoInstantiator kryoInst = new ConfiguredInstantiator(new HadoopConfig(conf));
testKryo = kryoInst.newKryo();
kryoPool = KryoPool.withByteArrayOutputStream(MAX_CACHED_KRYO, kryoInst);
}
catch(ConfigurationException cx) {
// This interface can't throw
throw new RuntimeException(cx);
}
// null check is to handle when calling the defaul constructor, in Configured, it calls super which calls setConf with a null Configuration
if (conf != null) {
try {
KryoInstantiator kryoInst = new ConfiguredInstantiator(new HadoopConfig(conf));
testKryo = kryoInst.newKryo();
kryoPool = KryoPool.withByteArrayOutputStream(MAX_CACHED_KRYO, kryoInst);
}
catch(ConfigurationException cx) {
// This interface can't throw
throw new RuntimeException(cx);
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public String getOrElse(String key, String def) {
}

public boolean contains(String key) {
return get(key) == null;
return get(key) != null;
}
public Boolean getBoolean(String key) {
String bval = get(key);
Expand Down
11 changes: 5 additions & 6 deletions project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ object ChillBuild extends Build {

val sharedSettings = Project.defaultSettings ++ mimaDefaultSettings ++ scalariformSettings ++ Seq(

version := "0.5.0",
version := "0.5.1",
organization := "com.twitter",
scalaVersion := "2.10.4",
crossScalaVersions := Seq("2.10.4", "2.11.2"),
Expand Down Expand Up @@ -125,8 +125,8 @@ object ChillBuild extends Build {
Some(subProj)
.filterNot(unreleasedModules.contains(_))
.map { s =>
val suffix = if (javaOnly.contains(s)) "" else "_2.9.3"
"com.twitter" % ("chill-" + s + suffix) % "0.3.6"
val suffix = if (javaOnly.contains(s)) "" else "_2.10"
"com.twitter" % ("chill-" + s + suffix) % "0.5.1"
}

def module(name: String) = {
Expand Down Expand Up @@ -225,10 +225,9 @@ object ChillBuild extends Build {
).dependsOn(chillJava)

lazy val chillAvro = module("avro").settings(
crossPaths := false,
autoScalaLibrary := false,
libraryDependencies ++= Seq(
"com.twitter" %% "bijection-avro" % "0.7.0"
"com.twitter" %% "bijection-avro" % "0.7.0",
"junit" % "junit" % "4.5" % "test"
)
).dependsOn(chill,chillJava, chillBijection)

Expand Down

0 comments on commit fa8de69

Please sign in to comment.