diff --git a/src/main/java/com/teragrep/aer_02/DefaultOutput.java b/src/main/java/com/teragrep/aer_02/DefaultOutput.java index 0c7b90c..adc9e9c 100644 --- a/src/main/java/com/teragrep/aer_02/DefaultOutput.java +++ b/src/main/java/com/teragrep/aer_02/DefaultOutput.java @@ -54,6 +54,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * Implementation of an shareable output. Required to be thread-safe. + */ final class DefaultOutput implements Output { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultOutput.class); diff --git a/src/main/java/com/teragrep/aer_02/EventDataConsumer.java b/src/main/java/com/teragrep/aer_02/EventDataConsumer.java index 6c47550..59e99a6 100644 --- a/src/main/java/com/teragrep/aer_02/EventDataConsumer.java +++ b/src/main/java/com/teragrep/aer_02/EventDataConsumer.java @@ -47,14 +47,12 @@ import com.codahale.metrics.Gauge; import com.codahale.metrics.MetricRegistry; -import com.teragrep.aer_02.config.RelpConnectionConfig; import com.teragrep.aer_02.config.SyslogConfig; import com.teragrep.aer_02.config.source.Sourceable; import com.teragrep.rlo_14.Facility; import com.teragrep.rlo_14.SDElement; import com.teragrep.rlo_14.Severity; import com.teragrep.rlo_14.SyslogMessage; -import com.teragrep.rlp_01.client.SSLContextSupplier; import java.nio.charset.StandardCharsets; import java.time.Instant; @@ -64,7 +62,7 @@ import static com.codahale.metrics.MetricRegistry.name; -final class EventDataConsumer implements AutoCloseable { +final class EventDataConsumer { // Note: Checkpointing is handled automatically. private final Output output; @@ -72,27 +70,6 @@ final class EventDataConsumer implements AutoCloseable { private final SyslogConfig syslogConfig; private final MetricRegistry metricRegistry; - EventDataConsumer( - final Sourceable configSource, - final String hostname, - final MetricRegistry metricRegistry, - final SSLContextSupplier sslContextSupplier - ) { - this( - configSource, - new DefaultOutput("defaultOutput", new RelpConnectionConfig(configSource), metricRegistry, sslContextSupplier), hostname, metricRegistry - ); - } - - EventDataConsumer(final Sourceable configSource, final String hostname, final MetricRegistry metricRegistry) { - this( - configSource, - new DefaultOutput("defaultOutput", new RelpConnectionConfig(configSource), metricRegistry), - hostname, - metricRegistry - ); - } - EventDataConsumer( final Sourceable configSource, final Output output, @@ -166,9 +143,4 @@ public Long getValue() { output.accept(syslogMessage.toRfc5424SyslogMessage().getBytes(StandardCharsets.UTF_8)); } - - @Override - public void close() { - output.close(); - } } diff --git a/src/main/java/com/teragrep/aer_02/SyslogBridge.java b/src/main/java/com/teragrep/aer_02/SyslogBridge.java index 6b02c75..0bcf8fb 100644 --- a/src/main/java/com/teragrep/aer_02/SyslogBridge.java +++ b/src/main/java/com/teragrep/aer_02/SyslogBridge.java @@ -48,6 +48,7 @@ import com.codahale.metrics.MetricRegistry; import com.microsoft.azure.functions.*; import com.microsoft.azure.functions.annotation.*; +import com.teragrep.aer_02.config.RelpConnectionConfig; import com.teragrep.aer_02.config.source.EnvironmentSource; import com.teragrep.aer_02.config.source.Sourceable; import com.teragrep.aer_02.json.JsonRecords; @@ -65,12 +66,14 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; public class SyslogBridge { - private EventDataConsumer consumer = null; - private Report report = null; - private MetricRegistry metricRegistry = null; + private final Lock initLock = new ReentrantLock(); + private final MetricRegistry metricRegistry = new MetricRegistry();; + private DefaultOutput defaultOutput = null; private boolean initialized = false; @FunctionName("metrics") @@ -121,37 +124,50 @@ public void eventHubTriggerToSyslog( context.getLogger().fine("eventHubTriggerToSyslog triggered"); context.getLogger().fine("Got events: " + events.length); - if (!initialized) { - metricRegistry = new MetricRegistry(); - report = new JmxReport( - new Slf4jReport(new PrometheusReport(new DropwizardExports(metricRegistry)), metricRegistry), - metricRegistry - ); - report.start(); + final Sourceable configSource = new EnvironmentSource(); + final String hostname = new Hostname("localhost").hostname(); - final Sourceable configSource = new EnvironmentSource(); - final String hostname = new Hostname("localhost").hostname(); + try { + initLock.lock(); + if (!initialized) { + final Report report = new JmxReport( + new Slf4jReport(new PrometheusReport(new DropwizardExports(metricRegistry)), metricRegistry), + metricRegistry + ); + report.start(); - if (configSource.source("relp.tls.mode", "none").equals("keyVault")) { - consumer = new EventDataConsumer(configSource, hostname, metricRegistry, new AzureSSLContextSupplier()); - } - else { - consumer = new EventDataConsumer(configSource, hostname, metricRegistry); - } + if (configSource.source("relp.tls.mode", "none").equals("keyVault")) { - final Thread shutdownHook = new Thread(() -> { - if (consumer != null) { - consumer.close(); + defaultOutput = new DefaultOutput( + "defaultOutput", + new RelpConnectionConfig(configSource), + metricRegistry, + new AzureSSLContextSupplier() + ); } - if (report != null) { - report.close(); + else { + defaultOutput = new DefaultOutput( + "defaultOutput", + new RelpConnectionConfig(configSource), + metricRegistry + ); } - }); - Runtime.getRuntime().addShutdownHook(shutdownHook); + final Thread shutdownHook = new Thread(() -> { + defaultOutput.close(); + report.close(); + }); - initialized = true; + Runtime.getRuntime().addShutdownHook(shutdownHook); + + initialized = true; + } } + finally { + initLock.unlock(); + } + + EventDataConsumer consumer = new EventDataConsumer(configSource, defaultOutput, hostname, metricRegistry); for (int index = 0; index < events.length; index++) { if (events[index] != null) { diff --git a/src/test/java/com/teragrep/aer_02/EventDataConsumerTest.java b/src/test/java/com/teragrep/aer_02/EventDataConsumerTest.java index 8955e70..1d0f3ad 100644 --- a/src/test/java/com/teragrep/aer_02/EventDataConsumerTest.java +++ b/src/test/java/com/teragrep/aer_02/EventDataConsumerTest.java @@ -92,8 +92,6 @@ public void testLatencyMetric() { .accept("event", partitionContext, ZonedDateTime.now().minusSeconds(10), String.valueOf(i), props, systemProps); } - Assertions.assertDoesNotThrow(eventDataConsumer::close); - // 5 records for each partition Gauge gauge1 = metricRegistry.gauge(name(EventDataConsumer.class, "latency-seconds", "0")); Gauge gauge2 = metricRegistry.gauge(name(EventDataConsumer.class, "latency-seconds", "1")); diff --git a/src/test/java/com/teragrep/aer_02/EventDataConsumerTlsTest.java b/src/test/java/com/teragrep/aer_02/EventDataConsumerTlsTest.java index fa8c62a..84702e3 100644 --- a/src/test/java/com/teragrep/aer_02/EventDataConsumerTlsTest.java +++ b/src/test/java/com/teragrep/aer_02/EventDataConsumerTlsTest.java @@ -46,7 +46,9 @@ package com.teragrep.aer_02; import com.codahale.metrics.MetricRegistry; +import com.teragrep.aer_02.config.RelpConnectionConfig; import com.teragrep.aer_02.config.source.EnvironmentSource; +import com.teragrep.aer_02.config.source.Sourceable; import com.teragrep.aer_02.fakes.PartitionContextFake; import com.teragrep.aer_02.fakes.SystemPropsFake; import com.teragrep.net_01.channel.socket.TLSFactory; @@ -145,35 +147,43 @@ void tearDown() { @Test void testSyslogBridgeTls() { - final EventDataConsumer edc = new EventDataConsumer( - new EnvironmentSource(), - "localhost", - new MetricRegistry(), - new SSLContextSupplier() { - - @Override - public SSLContext get() { - SSLContext rv; - try { - rv = InternalSSLContextFactory - .authenticatedContext( - "src/test/resources/keystore-client.jks", - "src/test/resources/truststore.jks", "changeit", "changeit", "TLSv1.3" - ); - } - catch (GeneralSecurityException | IOException e) { - throw new RuntimeException(e); - } - return rv; - } - - @Override - public boolean isStub() { - return false; - } + SSLContextSupplier sslContextSupplier = new SSLContextSupplier() { + + @Override + public SSLContext get() { + SSLContext rv; + try { + rv = InternalSSLContextFactory + .authenticatedContext( + "src/test/resources/keystore-client.jks", "src/test/resources/truststore.jks", + "changeit", "changeit", "TLSv1.3" + ); + } + catch (GeneralSecurityException | IOException e) { + throw new RuntimeException(e); } + return rv; + } + + @Override + public boolean isStub() { + return false; + } + }; + + MetricRegistry metricRegistry = new MetricRegistry(); + + Sourceable configSource = new EnvironmentSource(); + + DefaultOutput defaultOutput = new DefaultOutput( + "defaultOutput", + new RelpConnectionConfig(configSource), + metricRegistry, + sslContextSupplier ); + final EventDataConsumer edc = new EventDataConsumer(configSource, defaultOutput, "localhost", metricRegistry); + // Fake data PartitionContextFake pcf = new PartitionContextFake("eventhub.123", "test1", "$Default", "0"); Map props = new HashMap<>(); @@ -206,7 +216,6 @@ public boolean isStub() { } Assertions.assertEquals(3, loops); - Assertions.assertDoesNotThrow(edc::close); } private static class InternalSSLContextFactory {