forked from pravega/pravega-samples
-
Notifications
You must be signed in to change notification settings - Fork 0
/
HelloWorldWriter.java
105 lines (89 loc) · 4.64 KB
/
HelloWorldWriter.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
/*
* Copyright (c) 2017 Dell Inc., or its subsidiaries. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*/
package io.pravega.example.gettingstarted;
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import io.pravega.client.ClientFactory;
import io.pravega.client.admin.StreamManager;
import io.pravega.client.stream.EventStreamWriter;
import io.pravega.client.stream.EventWriterConfig;
import io.pravega.client.stream.ScalingPolicy;
import io.pravega.client.stream.StreamConfiguration;
import io.pravega.client.stream.impl.JavaSerializer;
/**
* A simple example app that uses a Pravega Writer to write to a given scope and stream.
*/
public class HelloWorldWriter {
public final String scope;
public final String streamName;
public final URI controllerURI;
public HelloWorldWriter(String scope, String streamName, URI controllerURI) {
this.scope = scope;
this.streamName = streamName;
this.controllerURI = controllerURI;
}
public void run(String routingKey, String message) {
StreamManager streamManager = StreamManager.create(controllerURI);
final boolean scopeIsNew = streamManager.createScope(scope);
StreamConfiguration streamConfig = StreamConfiguration.builder()
.scalingPolicy(ScalingPolicy.fixed(1))
.build();
final boolean streamIsNew = streamManager.createStream(scope, streamName, streamConfig);
try (ClientFactory clientFactory = ClientFactory.withScope(scope, controllerURI);
EventStreamWriter<String> writer = clientFactory.createEventWriter(streamName,
new JavaSerializer<String>(),
EventWriterConfig.builder().build())) {
System.out.format("Writing message: '%s' with routing-key: '%s' to stream '%s / %s'%n",
message, routingKey, scope, streamName);
final CompletableFuture writeFuture = writer.writeEvent(routingKey, message);
}
}
public static void main(String[] args) {
Options options = getOptions();
CommandLine cmd = null;
try {
cmd = parseCommandLineArgs(options, args);
} catch (ParseException e) {
System.out.format("%s.%n", e.getMessage());
final HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("HelloWorldWriter", options);
System.exit(1);
}
final String scope = cmd.getOptionValue("scope") == null ? Constants.DEFAULT_SCOPE : cmd.getOptionValue("scope");
final String streamName = cmd.getOptionValue("name") == null ? Constants.DEFAULT_STREAM_NAME : cmd.getOptionValue("name");
final String uriString = cmd.getOptionValue("uri") == null ? Constants.DEFAULT_CONTROLLER_URI : cmd.getOptionValue("uri");
final URI controllerURI = URI.create(uriString);
HelloWorldWriter hww = new HelloWorldWriter(scope, streamName, controllerURI);
final String routingKey = cmd.getOptionValue("routingKey") == null ? Constants.DEFAULT_ROUTING_KEY : cmd.getOptionValue("routingKey");
final String message = cmd.getOptionValue("message") == null ? Constants.DEFAULT_MESSAGE : cmd.getOptionValue("message");
hww.run(routingKey, message);
}
private static Options getOptions() {
final Options options = new Options();
options.addOption("s", "scope", true, "The scope name of the stream to read from.");
options.addOption("n", "name", true, "The name of the stream to read from.");
options.addOption("u", "uri", true, "The URI to the controller in the form tcp://host:port");
options.addOption("r", "routingKey", true, "The routing key of the message to write.");
options.addOption("m", "message", true, "The message to write.");
return options;
}
private static CommandLine parseCommandLineArgs(Options options, String[] args) throws ParseException {
CommandLineParser parser = new DefaultParser();
CommandLine cmd = parser.parse(options, args);
return cmd;
}
}