Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

Commit

Permalink
Add Sink Trait Support (#2726)
Browse files Browse the repository at this point in the history
  • Loading branch information
erenavsarogullari authored and kramasamy committed Feb 12, 2018
1 parent 152f47e commit a2f0008
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 4 deletions.
32 changes: 32 additions & 0 deletions heron/api/src/scala/com/twitter/heron/streamlet/scala/Sink.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2018 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.twitter.heron.streamlet.scala

import java.io.Serializable
import com.twitter.heron.streamlet.Context

/**
* Sink is how Streamlet's end. The put method invocation consumes the tuple into say
* external database/cache, etc. setup/cleanup is where the sink can do any one time setup work,
* like establishing/closing connection to sources, etc.
*/
trait Sink[T] extends Serializable {

def setup(context: Context): Unit

def put(tuple: T): Unit

def cleanup(): Unit

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ package com.twitter.heron.streamlet.scala
import com.twitter.heron.streamlet.{KeyValue, KeyedWindow}

// TODO: This Java Streamlet API references will be changed with Scala versions when they are ready
import com.twitter.heron.streamlet.{JoinType, SerializableTransformer, Sink, WindowConfig}
import com.twitter.heron.streamlet.{JoinType, SerializableTransformer, WindowConfig}

/**
* A Streamlet is a (potentially unbounded) ordered collection of tuples.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2018 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.twitter.heron.streamlet.scala

import scala.collection.mutable.ListBuffer

import com.twitter.heron.streamlet.Context
import com.twitter.heron.streamlet.scala.common.{BaseFunSuite, TestContext}

/**
* Sink is how Streamlet's end. This class covers unit tests of Sink functionality
* by creating Test Sink Implementation
*/
class SinkTest extends BaseFunSuite {

test("Sink should support setup") {
val list = ListBuffer[Int]()
val sink = new TestSink(list)
sink.setup(new TestContext())
assert(list == List(1, 2))
}

test("Sink should put data") {
val list = ListBuffer[Int]()
val sink = new TestSink(list)
sink.setup(new TestContext())
sink.put(3)
assert(list == List(1, 2, 3))
}

test("Sink should support cleanup") {
val list = ListBuffer[Int]()
val sink = new TestSink(list)
sink.setup(new TestContext())
sink.put(3)
sink.cleanup()
assert(list.isEmpty)
}

private class TestSink(numbers: ListBuffer[Int]) extends Sink[Int] {

override def setup(context: Context): Unit = {
numbers += (1, 2)
}

override def put(tuple: Int): Unit = {
numbers += tuple
}

override def cleanup(): Unit = numbers.clear()

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@ private[scala] class TestContext extends Context {
collectionInterval: Int,
metricFn: Supplier[T]): Unit = {}

override def getState =
new TestState[io.Serializable, io.Serializable]()
override def getState = new TestState[io.Serializable, io.Serializable]()

}


0 comments on commit a2f0008

Please sign in to comment.