forked from pravega/pravega-samples
-
Notifications
You must be signed in to change notification settings - Fork 0
/
SimpleReader.java
89 lines (77 loc) · 3.05 KB
/
SimpleReader.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
/*
* Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*/
package io.pravega.example.noop;
import io.pravega.client.ClientFactory;
import io.pravega.client.admin.ReaderGroupManager;
import io.pravega.client.stream.EventRead;
import io.pravega.client.stream.EventStreamReader;
import io.pravega.client.stream.ReaderConfig;
import io.pravega.client.stream.ReaderGroupConfig;
import io.pravega.client.stream.ReinitializationRequiredException;
import io.pravega.client.stream.Sequence;
import io.pravega.client.stream.Serializer;
import io.pravega.client.stream.Stream;
import java.net.URI;
import java.util.Collections;
import java.util.UUID;
import java.util.function.Consumer;
public class SimpleReader<T> implements Runnable {
private static final int READER_TIMEOUT_MS = 200000;
private String scope;
private String streamName;
private URI controllerURI;
private Serializer<T> serializer;
private Consumer<T> onNext;
private volatile boolean running;
private Consumer<Throwable> onError = (Throwable throwable) -> throwable.printStackTrace();
public SimpleReader(String scope, String streamName, URI controllerURI, Serializer<T> serializer, Consumer<T> onNext) {
this.scope = scope;
this.streamName = streamName;
this.controllerURI = controllerURI;
this.serializer = serializer;
this.onNext = onNext;
}
public void setOnError(Consumer<Throwable> onError) {
this.onError = onError;
}
public boolean isRunning() {
return running;
}
public void setRunning(boolean running) {
this.running = running;
}
public void run() {
setRunning(true);
final String readerGroup = UUID.randomUUID().toString().replace("-", "");
final ReaderGroupConfig readerGroupConfig = ReaderGroupConfig.builder()
.stream(Stream.of(scope, streamName))
.build();
try (ReaderGroupManager readerGroupManager = ReaderGroupManager.withScope(scope, controllerURI)) {
readerGroupManager.createReaderGroup(readerGroup, readerGroupConfig);
}
try (ClientFactory clientFactory = ClientFactory.withScope(scope, controllerURI);
EventStreamReader<T> reader = clientFactory.createReader("reader",
readerGroup, serializer, ReaderConfig.builder().build())) {
while (isRunning()) {
try {
EventRead<T> event = reader.readNextEvent(READER_TIMEOUT_MS);
T eventData = event.getEvent();
if (eventData != null) {
onNext.accept(event.getEvent());
}
}
catch (ReinitializationRequiredException e) {
onError.accept(e);
}
}
}
}
}