Partitioned Distributed Sink encounter NullPointerException #1796

Open
GitHub-Yann opened this issue Oct 18, 2022 · 0 comments
GitHub-Yann commented Oct 18, 2022

Description:
Hello,
I am new to Siddhi.
When I follow the "Distributed Sink" section of the "Query Guide", I can't get the demo to run successfully.

(1) Is my usage wrong?

Affected Siddhi Version:
siddhi-tooling-5.1.0

OS, DB, other environment details and versions:
CentOS Linux release 7.5.1804

Steps to reproduce:
I have a service that exposes two endpoints:
(1) /api/demo/test/post
(2) /api/demo/test/post2
Both expect a User entity.

Below is my Siddhi app definition:

@App:name("HelloWorldPartitionApp")

@source(type = 'http', receiver.url = "http://0.0.0.0:8008/cargo123", @map(type = 'json'))
define stream CargoStream1 (name string,age int);

@sink(type='http',method='POST',@map(type='json',validate.json='true',@payload("""{"userName":"{{weight}}","age":"{{age}}","mobile":"{{totalWeight}}"}""")),
    @distribution(strategy='partitioned', partitionKey='weight',
        @destination(publisher.url='http://192.168.1.39:9911/api/demo/test/post'),
        @destination(publisher.url='http://192.168.1.39:9911/api/demo/test/post2')))
define stream OutputStream1(weight string,age int, totalWeight long);

@info(name='HelloWorldPartitionQuery')
from CargoStream1
select name as weight, age, sum(age) as totalWeight
insert into OutputStream1;

My request body for cargo123:

{
    "name": "Tom",
    "age": 28
}

Related Issues:

[2022-10-18_15-40-38_886] ERROR {io.siddhi.core.stream.StreamJunction} - Error in 'HelloWorldPartitionApp' after consuming events from Stream 'OutputStream1', null. Hence, dropping event 'StreamEvent{ timestamp=1666078838886, beforeWindowData=null, onAfterWindowData=null, outputData=[Tom, 28, 28], type=CURRENT, next=null}' (Encoded) 

java.lang.NullPointerException
	at io.siddhi.extension.io.http.sink.HttpSink.sendRequest(HttpSink.java:841)
	at io.siddhi.extension.io.http.sink.HttpSink.publish(HttpSink.java:618)
	at io.siddhi.core.util.transport.SingleClientDistributedSink.publish(SingleClientDistributedSink.java:61)
	at io.siddhi.core.stream.output.sink.distributed.DistributedTransport.publish(DistributedTransport.java:125)
	at io.siddhi.core.stream.output.sink.Sink.publish(Sink.java:182)
	at io.siddhi.extension.map.json.sinkmapper.JsonSinkMapper.mapAndSend(JsonSinkMapper.java:211)
	at io.siddhi.core.stream.output.sink.SinkMapper.mapAndSend(SinkMapper.java:180)
	at io.siddhi.core.stream.output.sink.SinkCallback.receive(SinkCallback.java:55)
	at io.siddhi.core.stream.output.StreamCallback.receive(StreamCallback.java:100)
	at io.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:176)
	at io.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:465)
	at io.siddhi.core.query.output.callback.InsertIntoStreamCallback.send(InsertIntoStreamCallback.java:56)
	at io.siddhi.core.query.output.ratelimit.OutputRateLimiter.sendToCallBacks(OutputRateLimiter.java:104)
	at io.siddhi.core.query.output.ratelimit.PassThroughOutputRateLimiter.process(PassThroughOutputRateLimiter.java:44)
	at io.siddhi.core.query.selector.QuerySelector.process(QuerySelector.java:97)
	at io.siddhi.core.query.input.ProcessStreamReceiver.processAndClear(ProcessStreamReceiver.java:183)
	at io.siddhi.core.query.input.ProcessStreamReceiver.process(ProcessStreamReceiver.java:90)
	at io.siddhi.core.query.input.ProcessStreamReceiver.receive(ProcessStreamReceiver.java:128)
	at io.siddhi.core.stream.StreamJunction.sendEvent(StreamJunction.java:199)
	at io.siddhi.core.stream.StreamJunction$Publisher.send(StreamJunction.java:474)
	at io.siddhi.core.stream.input.InputDistributor.send(InputDistributor.java:34)
	at io.siddhi.core.stream.input.InputEntryValve.send(InputEntryValve.java:45)
	at io.siddhi.core.stream.input.InputHandler.send(InputHandler.java:78)
	at io.siddhi.core.stream.input.source.PassThroughSourceHandler.sendEvent(PassThroughSourceHandler.java:35)
	at io.siddhi.core.stream.input.source.InputEventHandler.sendEvent(InputEventHandler.java:81)
	at io.siddhi.extension.map.json.sourcemapper.JsonSourceMapper.mapAndProcess(JsonSourceMapper.java:234)
	at io.siddhi.core.stream.input.source.SourceMapper.onEvent(SourceMapper.java:152)
	at io.siddhi.core.stream.input.source.SourceMapper.onEvent(SourceMapper.java:118)
	at io.siddhi.extension.io.http.source.HttpWorkerThread.run(HttpWorkerThread.java:62)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
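
The trace above passes through DistributedTransport and SingleClientDistributedSink before failing in HttpSink.sendRequest. One way to narrow this down, offered as a sketch and not a confirmed diagnosis, is to run the same query against a plain (non-distributed) http sink and see whether the NullPointerException still occurs. The app and query names below are hypothetical, and the variant assumes that setting publisher.url directly on the @sink annotation is the only change needed:

```
@App:name("HelloWorldSingleSinkApp")

-- Same source as the original app. Stop the original app first,
-- or change the port, so the two receivers do not clash.
@source(type = 'http', receiver.url = "http://0.0.0.0:8008/cargo123", @map(type = 'json'))
define stream CargoStream1 (name string, age int);

-- Same payload template as the original app, but publisher.url is set
-- on the sink itself and there is no @distribution block.
@sink(type='http', publisher.url='http://192.168.1.39:9911/api/demo/test/post', method='POST',
    @map(type='json', validate.json='true', @payload("""{"userName":"{{weight}}","age":"{{age}}","mobile":"{{totalWeight}}"}""")))
define stream OutputStream1 (weight string, age int, totalWeight long);

@info(name='HelloWorldSingleSinkQuery')
from CargoStream1
select name as weight, age, sum(age) as totalWeight
insert into OutputStream1;
```

If this variant publishes successfully with the same request body, the failure is more likely tied to how the http sink is initialized under @distribution than to the payload or the query.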