diff --git a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTask.scala b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTask.scala index ffbed9239..5f1e9648f 100644 --- a/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTask.scala +++ b/kafka-connect-http/src/main/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTask.scala @@ -85,8 +85,9 @@ class HttpSinkTask extends SinkTask with LazyLogging with JarManifestProvided { logger.debug(s"[$sinkName] put call with ${records.size()} records") - val storedErrors = errorRef.get.unsafeRunSync() - val nonEmptyRecords = NonEmptySeq.fromSeq(records.asScala.toSeq) + val storedErrors = errorRef.get.unsafeRunSync() + //Filter out null records since there are users who are sending null records + val nonEmptyRecords = NonEmptySeq.fromSeq(records.asScala.toSeq.filter(_ != null)) (storedErrors, nonEmptyRecords) match { case (errors, _) if errors.nonEmpty => handleStoredErrors(errors) diff --git a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTaskTest.scala b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTaskTest.scala index 0a1ffb807..a4ff0ce61 100644 --- a/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTaskTest.scala +++ b/kafka-connect-http/src/test/scala/io/lenses/streamreactor/connect/http/sink/HttpSinkTaskTest.scala @@ -15,12 +15,14 @@ */ package io.lenses.streamreactor.connect.http.sink +import org.apache.kafka.connect.sink.SinkRecord import org.scalatest.BeforeAndAfterEach import org.scalatest.EitherValues import org.scalatest.funsuite.AnyFunSuiteLike import org.scalatest.matchers.should.Matchers import java.util +import scala.jdk.CollectionConverters.SeqHasAsJava class HttpSinkTaskTest extends AnyFunSuiteLike with Matchers with EitherValues with BeforeAndAfterEach { @@ -33,4 +35,10 @@ class HttpSinkTaskTest extends AnyFunSuiteLike with Matchers with EitherValues w } } + test("put method should handle null records collection") { + + noException should be thrownBy { + httpSinkTask.put(List[SinkRecord](null).asJava) + } + } }