Skip to content

Commit

Permalink
add thread safety for SyslogBridge (#55)
Browse files Browse the repository at this point in the history
* add thread safety for SyslogBridge

* remove relp connection closure from EventDataConsumer

* fix tls usage

* fix tests

* fix tests, reuse metricRegistry
  • Loading branch information
kortemik authored Jan 21, 2025
1 parent efb550b commit 8f0c524
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 84 deletions.
3 changes: 3 additions & 0 deletions src/main/java/com/teragrep/aer_02/DefaultOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Implementation of an shareable output. Required to be thread-safe.
*/
final class DefaultOutput implements Output {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultOutput.class);
Expand Down
30 changes: 1 addition & 29 deletions src/main/java/com/teragrep/aer_02/EventDataConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,12 @@

import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
import com.teragrep.aer_02.config.RelpConnectionConfig;
import com.teragrep.aer_02.config.SyslogConfig;
import com.teragrep.aer_02.config.source.Sourceable;
import com.teragrep.rlo_14.Facility;
import com.teragrep.rlo_14.SDElement;
import com.teragrep.rlo_14.Severity;
import com.teragrep.rlo_14.SyslogMessage;
import com.teragrep.rlp_01.client.SSLContextSupplier;

import java.nio.charset.StandardCharsets;
import java.time.Instant;
Expand All @@ -64,35 +62,14 @@

import static com.codahale.metrics.MetricRegistry.name;

final class EventDataConsumer implements AutoCloseable {
final class EventDataConsumer {

// Note: Checkpointing is handled automatically.
private final Output output;
private final String realHostName;
private final SyslogConfig syslogConfig;
private final MetricRegistry metricRegistry;

EventDataConsumer(
final Sourceable configSource,
final String hostname,
final MetricRegistry metricRegistry,
final SSLContextSupplier sslContextSupplier
) {
this(
configSource,
new DefaultOutput("defaultOutput", new RelpConnectionConfig(configSource), metricRegistry, sslContextSupplier), hostname, metricRegistry
);
}

EventDataConsumer(final Sourceable configSource, final String hostname, final MetricRegistry metricRegistry) {
this(
configSource,
new DefaultOutput("defaultOutput", new RelpConnectionConfig(configSource), metricRegistry),
hostname,
metricRegistry
);
}

EventDataConsumer(
final Sourceable configSource,
final Output output,
Expand Down Expand Up @@ -166,9 +143,4 @@ public Long getValue() {

output.accept(syslogMessage.toRfc5424SyslogMessage().getBytes(StandardCharsets.UTF_8));
}

@Override
public void close() {
output.close();
}
}
68 changes: 42 additions & 26 deletions src/main/java/com/teragrep/aer_02/SyslogBridge.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import com.codahale.metrics.MetricRegistry;
import com.microsoft.azure.functions.*;
import com.microsoft.azure.functions.annotation.*;
import com.teragrep.aer_02.config.RelpConnectionConfig;
import com.teragrep.aer_02.config.source.EnvironmentSource;
import com.teragrep.aer_02.config.source.Sourceable;
import com.teragrep.aer_02.json.JsonRecords;
Expand All @@ -65,12 +66,14 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class SyslogBridge {

private EventDataConsumer consumer = null;
private Report report = null;
private MetricRegistry metricRegistry = null;
private final Lock initLock = new ReentrantLock();
private final MetricRegistry metricRegistry = new MetricRegistry();;
private DefaultOutput defaultOutput = null;
private boolean initialized = false;

@FunctionName("metrics")
Expand Down Expand Up @@ -121,37 +124,50 @@ public void eventHubTriggerToSyslog(
context.getLogger().fine("eventHubTriggerToSyslog triggered");
context.getLogger().fine("Got events: " + events.length);

if (!initialized) {
metricRegistry = new MetricRegistry();
report = new JmxReport(
new Slf4jReport(new PrometheusReport(new DropwizardExports(metricRegistry)), metricRegistry),
metricRegistry
);
report.start();
final Sourceable configSource = new EnvironmentSource();
final String hostname = new Hostname("localhost").hostname();

final Sourceable configSource = new EnvironmentSource();
final String hostname = new Hostname("localhost").hostname();
try {
initLock.lock();
if (!initialized) {
final Report report = new JmxReport(
new Slf4jReport(new PrometheusReport(new DropwizardExports(metricRegistry)), metricRegistry),
metricRegistry
);
report.start();

if (configSource.source("relp.tls.mode", "none").equals("keyVault")) {
consumer = new EventDataConsumer(configSource, hostname, metricRegistry, new AzureSSLContextSupplier());
}
else {
consumer = new EventDataConsumer(configSource, hostname, metricRegistry);
}
if (configSource.source("relp.tls.mode", "none").equals("keyVault")) {

final Thread shutdownHook = new Thread(() -> {
if (consumer != null) {
consumer.close();
defaultOutput = new DefaultOutput(
"defaultOutput",
new RelpConnectionConfig(configSource),
metricRegistry,
new AzureSSLContextSupplier()
);
}
if (report != null) {
report.close();
else {
defaultOutput = new DefaultOutput(
"defaultOutput",
new RelpConnectionConfig(configSource),
metricRegistry
);
}
});

Runtime.getRuntime().addShutdownHook(shutdownHook);
final Thread shutdownHook = new Thread(() -> {
defaultOutput.close();
report.close();
});

initialized = true;
Runtime.getRuntime().addShutdownHook(shutdownHook);

initialized = true;
}
}
finally {
initLock.unlock();
}

EventDataConsumer consumer = new EventDataConsumer(configSource, defaultOutput, hostname, metricRegistry);

for (int index = 0; index < events.length; index++) {
if (events[index] != null) {
Expand Down
2 changes: 0 additions & 2 deletions src/test/java/com/teragrep/aer_02/EventDataConsumerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,6 @@ public void testLatencyMetric() {
.accept("event", partitionContext, ZonedDateTime.now().minusSeconds(10), String.valueOf(i), props, systemProps);
}

Assertions.assertDoesNotThrow(eventDataConsumer::close);

// 5 records for each partition
Gauge<Long> gauge1 = metricRegistry.gauge(name(EventDataConsumer.class, "latency-seconds", "0"));
Gauge<Long> gauge2 = metricRegistry.gauge(name(EventDataConsumer.class, "latency-seconds", "1"));
Expand Down
63 changes: 36 additions & 27 deletions src/test/java/com/teragrep/aer_02/EventDataConsumerTlsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@
package com.teragrep.aer_02;

import com.codahale.metrics.MetricRegistry;
import com.teragrep.aer_02.config.RelpConnectionConfig;
import com.teragrep.aer_02.config.source.EnvironmentSource;
import com.teragrep.aer_02.config.source.Sourceable;
import com.teragrep.aer_02.fakes.PartitionContextFake;
import com.teragrep.aer_02.fakes.SystemPropsFake;
import com.teragrep.net_01.channel.socket.TLSFactory;
Expand Down Expand Up @@ -145,35 +147,43 @@ void tearDown() {

@Test
void testSyslogBridgeTls() {
final EventDataConsumer edc = new EventDataConsumer(
new EnvironmentSource(),
"localhost",
new MetricRegistry(),
new SSLContextSupplier() {

@Override
public SSLContext get() {
SSLContext rv;
try {
rv = InternalSSLContextFactory
.authenticatedContext(
"src/test/resources/keystore-client.jks",
"src/test/resources/truststore.jks", "changeit", "changeit", "TLSv1.3"
);
}
catch (GeneralSecurityException | IOException e) {
throw new RuntimeException(e);
}
return rv;
}

@Override
public boolean isStub() {
return false;
}
SSLContextSupplier sslContextSupplier = new SSLContextSupplier() {

@Override
public SSLContext get() {
SSLContext rv;
try {
rv = InternalSSLContextFactory
.authenticatedContext(
"src/test/resources/keystore-client.jks", "src/test/resources/truststore.jks",
"changeit", "changeit", "TLSv1.3"
);
}
catch (GeneralSecurityException | IOException e) {
throw new RuntimeException(e);
}
return rv;
}

@Override
public boolean isStub() {
return false;
}
};

MetricRegistry metricRegistry = new MetricRegistry();

Sourceable configSource = new EnvironmentSource();

DefaultOutput defaultOutput = new DefaultOutput(
"defaultOutput",
new RelpConnectionConfig(configSource),
metricRegistry,
sslContextSupplier
);

final EventDataConsumer edc = new EventDataConsumer(configSource, defaultOutput, "localhost", metricRegistry);

// Fake data
PartitionContextFake pcf = new PartitionContextFake("eventhub.123", "test1", "$Default", "0");
Map<String, Object> props = new HashMap<>();
Expand Down Expand Up @@ -206,7 +216,6 @@ public boolean isStub() {
}

Assertions.assertEquals(3, loops);
Assertions.assertDoesNotThrow(edc::close);
}

private static class InternalSSLContextFactory {
Expand Down

0 comments on commit 8f0c524

Please sign in to comment.