From a2f00089637f810d2cdf7c2d08a07e63a42f3150 Mon Sep 17 00:00:00 2001 From: Eren Avsarogullari Date: Mon, 12 Feb 2018 18:10:20 +0000 Subject: [PATCH] Add Sink Trait Support (#2726) --- .../twitter/heron/streamlet/scala/Sink.scala | 32 +++++++++ .../heron/streamlet/scala/Streamlet.scala | 2 +- .../heron/streamlet/scala/SinkTest.scala | 65 +++++++++++++++++++ .../streamlet/scala/common/TestContext.scala | 4 +- 4 files changed, 99 insertions(+), 4 deletions(-) create mode 100644 heron/api/src/scala/com/twitter/heron/streamlet/scala/Sink.scala create mode 100644 heron/api/tests/scala/com/twitter/heron/streamlet/scala/SinkTest.scala diff --git a/heron/api/src/scala/com/twitter/heron/streamlet/scala/Sink.scala b/heron/api/src/scala/com/twitter/heron/streamlet/scala/Sink.scala new file mode 100644 index 00000000000..45be8c35562 --- /dev/null +++ b/heron/api/src/scala/com/twitter/heron/streamlet/scala/Sink.scala @@ -0,0 +1,32 @@ +// Copyright 2018 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.streamlet.scala + +import java.io.Serializable +import com.twitter.heron.streamlet.Context + +/** + * Sink is how Streamlet's end. The put method invocation consumes the tuple into say + * external database/cache, etc. setup/cleanup is where the sink can do any one time setup work, + * like establishing/closing connection to sources, etc. + */ +trait Sink[T] extends Serializable { + + def setup(context: Context): Unit + + def put(tuple: T): Unit + + def cleanup(): Unit + +} diff --git a/heron/api/src/scala/com/twitter/heron/streamlet/scala/Streamlet.scala b/heron/api/src/scala/com/twitter/heron/streamlet/scala/Streamlet.scala index e6bb0237553..00a26284bc6 100644 --- a/heron/api/src/scala/com/twitter/heron/streamlet/scala/Streamlet.scala +++ b/heron/api/src/scala/com/twitter/heron/streamlet/scala/Streamlet.scala @@ -16,7 +16,7 @@ package com.twitter.heron.streamlet.scala import com.twitter.heron.streamlet.{KeyValue, KeyedWindow} // TODO: This Java Streamlet API references will be changed with Scala versions when they are ready -import com.twitter.heron.streamlet.{JoinType, SerializableTransformer, Sink, WindowConfig} +import com.twitter.heron.streamlet.{JoinType, SerializableTransformer, WindowConfig} /** * A Streamlet is a (potentially unbounded) ordered collection of tuples. diff --git a/heron/api/tests/scala/com/twitter/heron/streamlet/scala/SinkTest.scala b/heron/api/tests/scala/com/twitter/heron/streamlet/scala/SinkTest.scala new file mode 100644 index 00000000000..6245e33fc0c --- /dev/null +++ b/heron/api/tests/scala/com/twitter/heron/streamlet/scala/SinkTest.scala @@ -0,0 +1,65 @@ +// Copyright 2018 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.streamlet.scala + +import scala.collection.mutable.ListBuffer + +import com.twitter.heron.streamlet.Context +import com.twitter.heron.streamlet.scala.common.{BaseFunSuite, TestContext} + +/** + * Sink is how Streamlet's end. This class covers unit tests of Sink functionality + * by creating Test Sink Implementation + */ +class SinkTest extends BaseFunSuite { + + test("Sink should support setup") { + val list = ListBuffer[Int]() + val sink = new TestSink(list) + sink.setup(new TestContext()) + assert(list == List(1, 2)) + } + + test("Sink should put data") { + val list = ListBuffer[Int]() + val sink = new TestSink(list) + sink.setup(new TestContext()) + sink.put(3) + assert(list == List(1, 2, 3)) + } + + test("Sink should support cleanup") { + val list = ListBuffer[Int]() + val sink = new TestSink(list) + sink.setup(new TestContext()) + sink.put(3) + sink.cleanup() + assert(list.isEmpty) + } + + private class TestSink(numbers: ListBuffer[Int]) extends Sink[Int] { + + override def setup(context: Context): Unit = { + numbers += (1, 2) + } + + override def put(tuple: Int): Unit = { + numbers += tuple + } + + override def cleanup(): Unit = numbers.clear() + + } + +} diff --git a/heron/api/tests/scala/com/twitter/heron/streamlet/scala/common/TestContext.scala b/heron/api/tests/scala/com/twitter/heron/streamlet/scala/common/TestContext.scala index 9b020e59849..bf0832c60ff 100644 --- a/heron/api/tests/scala/com/twitter/heron/streamlet/scala/common/TestContext.scala +++ b/heron/api/tests/scala/com/twitter/heron/streamlet/scala/common/TestContext.scala @@ -44,9 +44,7 @@ private[scala] class TestContext extends Context { collectionInterval: Int, metricFn: Supplier[T]): Unit = {} - override def getState = - new TestState[io.Serializable, io.Serializable]() + override def getState = new TestState[io.Serializable, io.Serializable]() } -