Skip to content

Commit

Permalink
prepare 4.0.0 release (#80)
Browse files Browse the repository at this point in the history
* add time threshold for backoff reset

* allow endpoint to be specified as either URI or HttpUrl

* add @SInCE

* add interface for customizing requests

* javadoc fixes

* add changelog for past releases

* remove JSR305

* replace SSL-specific config method with general-purpose HTTP config method

* make helper method static

* add end-to-end EventSource tests

* spacing

* omit default header value if there's a custom value

* avoid trailing period in logger name

* add 1.x branch

* update to OkHttp 4.x and Java 8

* javadoc fixes

* remove EventSource setters, + test improvements

* update Gradle release

* enable Github Pages

* skip tests in release

* add ability to force a stream restart; improve tests so we can test this

* revert whitespace change

* bump OkHttp version to latest Java 7-compatible version

* add ability to force a stream restart; improve tests so we can test this (#29)

* update to okhttp 4.5.0

* longer timeout for cleaner shutdown of test servers

* fix Gradle scopes

* allow setting specific thread priority

* remove misleading logging & unnecessary backoff, improve tests (#34)

* known issue with onClose() - add comment, disable test assertions

* allow caller to specify a custom logger instead of SLF4J (#32)

* add method for changing base name of SLF4J logger

* enable coverage reports in CI, improve CI to test all supported Java versions

* rm inapplicable CI copy-paste

* another CI fix

* add checkstyle config

* fix jitter calculation when upper bound is a power of 2

* misc coverage + test improvements, add CI enforcement of coverage (#39)

* fix shutdown state logic, simplify code paths (#40)

* Fix Java 7 compatibility.

* add OpenJDK 7 build + fix test race condition + javadoc fix (#42)

* update Gradle to 6.8.3

* Kotlinize build script

* fix logic for shutting down after an unrecoverable error

* use newer HTTP test helpers

* use Releaser v2 config + newer CI images (#47)

* use new stream-reading implementation to support CR-only line endings

* make buffer size configurable

* rm usage that's not allowed in Java 8

* add Guava test dependency

* add code coverage ovverride

* implement contract tests (#48)

* use Gradle 7

* Bounded queues for the EventHandler thread (#58)

* Bounded queue for the EventHandler thread

The unbounded queue fronting the 'event' thread can cause trouble when the
EventHandler is unable to keep up with the workload. This can lead to heap
exhaustion, GC issues and failure modes that are generally considered "bad".

Band-aid over this with a semaphore to limit the number of tasks in the queue.
The semaphore is opt-in and disabled by default to avoid any nasty surprises
for folks upgrading.

Also add 'EventSource.awaitClosed()' to allow users to wait for underlying
thread pools to completely shut down. We can't know if it's safe to clean
up resources used by the EventHandler thread if we can't be certain that it
has completely terminated.

* Address checkstyle griping in StubServer

* Fix JavaDoc issue

* Tighten up exception handling

Co-authored-by: Eli Bishop <[email protected]>

* update @SInCE

* test Java 17 in CI (#51)

* improve tests for AsyncEventHandler and EventSource.awaitClosed (#52)

* add streaming data mode for very large events (#53)

* add option to ensure that expected fields are always read

* add Gradle option to suppress kotlin-stdlib in our pom

* update okhttp to 4.9.3

* use LaunchDarkly logging facade

* rm unused

* misc fixes

* improve javadoc links

* remove SLF4J dependency, use only com.launchdarkly.logging

* update com.launchdarkly.logging version

* consistently use placeholders instead of concatenation in log output

* update release metadata

* use SecureRandom instead of Random, just to make scanners happier

* use SecureRandom instead of Random, just to make scanners happier

* use SecureRandom instead of Random, just to make scanners happier

* fix release metadata

* remove usage of Duration for Android compatibility

* new synchronous EventSource implementation (#64)

* add async wrapper to emulate old EventSource (#65)

Co-authored-by: Eli Bishop <[email protected]>
Co-authored-by: LaunchDarklyCI <[email protected]>
Co-authored-by: Gavin Whelan <[email protected]>
Co-authored-by: LaunchDarklyReleaseBot <[email protected]>
Co-authored-by: Tom Lee <[email protected]>
  • Loading branch information
6 people authored Dec 20, 2022
1 parent 0110f1b commit d3dba83
Show file tree
Hide file tree
Showing 60 changed files with 6,773 additions and 2,904 deletions.
22 changes: 12 additions & 10 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ dependencies {
api("com.launchdarkly:launchdarkly-logging:${Versions.launchdarklyLogging}")
api("com.squareup.okhttp3:okhttp:${Versions.okhttp}")
testImplementation("org.mockito:mockito-core:1.10.19")
testImplementation("com.launchdarkly:test-helpers:1.0.0")
testImplementation("com.launchdarkly:test-helpers:2.0.1")
testImplementation("com.google.guava:guava:30.1-jre")
testImplementation("junit:junit:4.12")
testImplementation("org.hamcrest:hamcrest-all:1.3")
Expand Down Expand Up @@ -122,24 +122,26 @@ tasks.jacocoTestCoverageVerification.configure {
violationRules {
val knownMissedLinesForMethods = mapOf(
// The key for each of these items is the complete method signature minus the "com.launchdarkly.eventsource." prefix.
"AsyncEventHandler.acquire()" to 2,
"AsyncEventHandler.execute(java.lang.Runnable)" to 3,
"EventParser.IncrementalMessageDataInputStream.read()" to 7,
"EventParser.IncrementalMessageDataInputStream.read(byte[])" to 1,
"EventParser.IncrementalMessageDataInputStream.read(byte[], int, int)" to 2,
"EventParser.IncrementalMessageDataInputStream.canGetNextChunk()" to 3,
"EventSource.awaitClosed(long, java.util.concurrent.TimeUnit)" to 2,
"EventSource.awaitClosed(java.time.Duration)" to 1,
"EventSource.handleSuccessfulResponse(okhttp3.Response)" to 2,
"EventSource.maybeReconnectDelay(int, long)" to 2,
"EventSource.run()" to 3,
"EventSource.Builder.createInitialClientBuilder()" to 1,
"EventSource.Builder.defaultTrustManager()" to 2,
"Helpers.utf8ByteArrayOutputStreamToString(java.io.ByteArrayOutputStream)" to 2,
"HttpConnectStrategy.defaultTrustManager()" to 2,
"HttpConnectStrategy.Client.awaitClosed(long)" to 1,
"HttpConnectStrategy.Client.createHttpClient()" to 2,
"MessageEvent.getData()" to 2,
"ModernTLSSocketFactory.createSocket(java.lang.String, int)" to 1,
"ModernTLSSocketFactory.createSocket(java.lang.String, int, java.net.InetAddress, int)" to 1,
"ModernTLSSocketFactory.createSocket(java.net.InetAddress, int)" to 1,
"ModernTLSSocketFactory.createSocket(java.net.InetAddress, int, java.net.InetAddress, int)" to 1,
"ModernTLSSocketFactory.createSocket(java.net.Socket, java.lang.String, int, boolean)" to 1,
"ModernTLSSocketFactory.getDefaultCipherSuites()" to 1,
"ModernTLSSocketFactory.getSupportedCipherSuites()" to 1
)
"ModernTLSSocketFactory.getSupportedCipherSuites()" to 1,
"background.BackgroundEventSource.dispatchEvent(com.launchdarkly.eventsource.StreamEvent)" to 3
)

knownMissedLinesForMethods.forEach { (signature, maxMissedLines) ->
if (maxMissedLines > 0) { // < 0 means skip entire method
Expand Down
78 changes: 46 additions & 32 deletions contract-tests/service/src/main/java/ssetest/StreamEntity.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
import okhttp3.*;
import ssetest.Representations.*;

public class StreamEntity implements EventHandler {
public class StreamEntity {
private final TestService owner;
private final String id;
private final EventSource stream;
private final EventSource eventSource;
private final StreamOptions options;
private final AtomicInteger callbackMessageCounter = new AtomicInteger(0);
private final LDLogger logger;
Expand All @@ -25,74 +25,88 @@ public StreamEntity(TestService owner, String id, StreamOptions options, LDLogAd
this.logger = LDLogger.withAdapter(logAdapter, options.tag);
logger.info("Opening stream to {}", options.streamUrl);

EventSource.Builder eb = new EventSource.Builder(this, URI.create(options.streamUrl))
.logger(logger.subLogger("stream"));
HttpConnectStrategy connectStrategy = ConnectStrategy.http(URI.create(options.streamUrl));
if (options.readTimeoutMs != null) {
connectStrategy = connectStrategy.readTimeout((long)options.readTimeoutMs, null);
}
if (options.method != null) {
connectStrategy = connectStrategy.methodAndBody(options.method,
options.body == null ? null :
RequestBody.create(options.body,
MediaType.parse(options.headers.get("content-type") == null ?
"text/plain; charset=utf-8" : options.headers.get("content-type")))
);
}
if (options.headers != null) {
Headers.Builder hb = new Headers.Builder();
for (String name: options.headers.keySet()) {
hb.add(name, options.headers.get(name));
connectStrategy = connectStrategy.header(name, options.headers.get(name));
}
eb.headers(hb.build());
}

EventSource.Builder eb = new EventSource.Builder(connectStrategy)
.errorStrategy(ErrorStrategy.alwaysContinue())
.logger(logger.subLogger("stream"));
if (options.initialDelayMs != null) {
eb.reconnectTime(options.initialDelayMs, null);
}
if (options.readTimeoutMs != null) {
eb.readTimeout(options.readTimeoutMs, null);
eb.retryDelay((long)options.initialDelayMs, null);
}
if (options.lastEventId != null) {
eb.lastEventId(options.lastEventId);
}
if (options.method != null) {
eb.method(options.method);
}
if (options.body != null) {
String contentType = options.headers == null ? null : options.headers.get("content-type");
eb.body(RequestBody.create(options.body,
MediaType.parse(contentType == null ? "text/plain; charset=utf-8" : contentType)));
}
this.stream = eb.build();

this.stream.start();

this.eventSource = eb.build();
new Thread(() -> {
for (StreamEvent event: this.eventSource.anyEvents()) {
handleEvent(event);
}
}).start();
}

public boolean doCommand(String command) {
logger.info("Test harness sent command: {}", command);
if (command.equals("restart")) {
stream.restart();
eventSource.interrupt();
return true;
}
return false;
}

public void close() {
closed = true;
stream.close();
eventSource.close();
owner.forgetStream(id);
logger.info("Test ended");
}

public void onOpen() {}

public void onClosed() {}
private void handleEvent(StreamEvent event) {
if (event instanceof MessageEvent) {
onMessage((MessageEvent)event);
} else if (event instanceof CommentEvent) {
onComment(((CommentEvent)event).getText());
} else if (event instanceof FaultEvent) {
onError(((FaultEvent)event).getCause());
}
}

public void onMessage(String name, MessageEvent e) {
logger.info("Received event from stream ({})", name);
private void onMessage(MessageEvent e) {
logger.info("Received event from stream ({})", e.getEventName());
Message m = new Message("event");
m.event = new EventMessage();
m.event.type = name;
m.event.type = e.getEventName();
m.event.data = e.getData();
m.event.id = e.getLastEventId();
writeMessage(m);
}

public void onComment(String comment) {
private void onComment(String comment) {
Message m = new Message("comment");
m.comment = comment;
writeMessage(m);
}

public void onError(Throwable t) {
private void onError(Throwable t) {
if (t instanceof StreamClosedByCallerException) {
return; // the SSE contract tests don't want to see this non-error
}
logger.info("Received error from stream: {}", t.toString());
Message m = new Message("error");
m.error = t.toString();
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=3.0.0
version=4.0.0-SNAPSHOT
ossrhUsername=
ossrhPassword=

Expand Down
126 changes: 0 additions & 126 deletions src/main/java/com/launchdarkly/eventsource/AsyncEventHandler.java

This file was deleted.

53 changes: 53 additions & 0 deletions src/main/java/com/launchdarkly/eventsource/CommentEvent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.launchdarkly.eventsource;

import java.util.Objects;

/**
* Describes a comment line received from the stream.
* <p>
* An SSE comment is a line that starts with a colon. There is no defined meaning for this
* in the SSE specification, and most clients ignore it. It may be used to provide a
* periodic heartbeat from the server to keep connections from timing out.
*
* @since 4.0.0
*/
public final class CommentEvent implements StreamEvent {
private final String text;

/**
* Creates an instance.
*
* @param text the comment text, not including the leading colon
*/
public CommentEvent(String text) {
this.text = text;
}

/**
* Returns the comment text, not including the leading colon.
*
* @return the text
*/
public String getText() {
return text;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

CommentEvent that = (CommentEvent) o;
return Objects.equals(text, that.text);
}

@Override
public int hashCode() {
return Objects.hash(text);
}

@Override
public String toString() {
return "CommentEvent(" + text + ")";
}
}
Loading

0 comments on commit d3dba83

Please sign in to comment.