diff --git a/etc/config.json b/etc/config.json index 0e2e6d9..e102f67 100644 --- a/etc/config.json +++ b/etc/config.json @@ -28,6 +28,7 @@ "third-pod_default_third-pod-one-.*", "third-pod_default_third-pod-two-.*" ] + "maxLogReadingThreads": 2 }, "relp": { "target": "127.0.0.1", @@ -35,7 +36,6 @@ "connectionTimeout": 5000, "readTimeout": 5000, "writeTimeout": 5000, - "reconnectInterval": 5000, - "outputThreads": 4 + "reconnectInterval": 5000 } } diff --git a/example/combined.yaml b/example/combined.yaml index ff68813..5ae4582 100644 --- a/example/combined.yaml +++ b/example/combined.yaml @@ -65,7 +65,8 @@ data: "third-pod_default_third-pod-one-.*", "third-pod_default_third-pod-two-.*", "fifth-pod_default_.*" - ] + ], + "maxLogReadingThreads": 3 }, "relp": { "target": "receiver.receiver.default", @@ -73,8 +74,7 @@ data: "connectionTimeout": 5000, "readTimeout": 5000, "writeTimeout": 5000, - "reconnectInterval": 5000, - "outputThreads": 5 + "reconnectInterval": 5000 } } log4j2.xml: | @@ -105,7 +105,7 @@ data: kind: ConfigMap metadata: - name: app-config-58dh7f727h + name: app-config-8dtf4m92md --- apiVersion: v1 data: @@ -271,7 +271,7 @@ spec: terminationGracePeriodSeconds: 0 volumes: - configMap: - name: app-config-58dh7f727h + name: app-config-8dtf4m92md name: app-config - hostPath: path: /var/log/containers diff --git a/example/config/k8s_01/config.json b/example/config/k8s_01/config.json index c5e1afd..31a4f48 100644 --- a/example/config/k8s_01/config.json +++ b/example/config/k8s_01/config.json @@ -28,7 +28,8 @@ "third-pod_default_third-pod-one-.*", "third-pod_default_third-pod-two-.*", "fifth-pod_default_.*" - ] + ], + "maxLogReadingThreads": 3 }, "relp": { "target": "receiver.receiver.default", @@ -36,7 +37,6 @@ "connectionTimeout": 5000, "readTimeout": 5000, "writeTimeout": 5000, - "reconnectInterval": 5000, - "outputThreads": 5 + "reconnectInterval": 5000 } } diff --git a/pom.xml b/pom.xml index f162b61..3a034d1 100644 --- a/pom.xml +++ b/pom.xml @@ -45,7 +45,7 @@ com.teragrep rlo_13 - 1.1.0 + 1.2.0 diff --git a/src/main/java/com/teragrep/k8s_01/KubernetesLogReader.java b/src/main/java/com/teragrep/k8s_01/KubernetesLogReader.java index d572cf8..790b075 100644 --- a/src/main/java/com/teragrep/k8s_01/KubernetesLogReader.java +++ b/src/main/java/com/teragrep/k8s_01/KubernetesLogReader.java @@ -92,14 +92,22 @@ public static void main(String[] args) throws IOException { PrometheusMetrics prometheusMetrics = new PrometheusMetrics(appConfig.getMetrics().getPort()); // Pool of Relp output threads to be shared by every consumer - BlockingQueue relpOutputPool = new LinkedBlockingDeque<>(appConfig.getRelp().getOutputThreads()); + int logFileCount = appConfig.getKubernetes().getLogfiles().length; + int outputThreads = appConfig.getKubernetes().getMaxLogReadingThreads() * logFileCount; + LOGGER.info( + "Found {} monitored logfile definitions, reading them with maximum of {} threads each.", + logFileCount, + appConfig.getKubernetes().getMaxLogReadingThreads() + ); + + BlockingQueue relpOutputPool = new LinkedBlockingDeque<>(outputThreads); LOGGER.info( "Starting {} Relp threads towards {}:{}", - appConfig.getRelp().getOutputThreads(), + outputThreads, appConfig.getRelp().getTarget(), appConfig.getRelp().getPort() ); - for(int i=1; i <= appConfig.getRelp().getOutputThreads(); i++) { + for(int i=1; i <= outputThreads; i++) { try { LOGGER.debug( "Adding RelpOutput thread #{}", @@ -135,11 +143,11 @@ public static void main(String[] args) throws IOException { // Graceful shutdown so Relp sessions are gracefully terminated Runtime.getRuntime().addShutdownHook(new Thread(() -> { LOGGER.info("Shutting down."); - for(int i=1; i <= appConfig.getRelp().getOutputThreads(); i++) { + for(int i=1; i <= outputThreads; i++) { LOGGER.info( "Disconnecting relp thread #{}/{}", i, - appConfig.getRelp().getOutputThreads() + outputThreads ); RelpOutput output; try { @@ -169,7 +177,8 @@ public static void main(String[] args) throws IOException { Pattern.compile(logfile), statefulFileReader, 500, - TimeUnit.MILLISECONDS + TimeUnit.MILLISECONDS, + appConfig.getKubernetes().getMaxLogReadingThreads() ); dew.watch(); } catch (IOException | InterruptedException e) { diff --git a/src/main/java/com/teragrep/k8s_01/config/AppConfigKubernetes.java b/src/main/java/com/teragrep/k8s_01/config/AppConfigKubernetes.java index b7843aa..cc75b7e 100644 --- a/src/main/java/com/teragrep/k8s_01/config/AppConfigKubernetes.java +++ b/src/main/java/com/teragrep/k8s_01/config/AppConfigKubernetes.java @@ -42,6 +42,7 @@ public void handleOverrides() { private String[] logfiles; private String url; private String timezone; + private Integer maxLogReadingThreads; public Integer getCacheExpireInterval() { return cacheExpireInterval; @@ -67,6 +68,10 @@ public Integer getCacheMaxEntries() { } public String getTimezone() { return timezone; } + public Integer getMaxLogReadingThreads() { + return maxLogReadingThreads; + } + @Override public String toString() { return new Gson().toJson(this); @@ -104,5 +109,12 @@ public void validate() throws InvalidConfigurationException { if(timezone == null) { throw new InvalidConfigurationException("timezone not found or is null in kubernetes config object"); } + + if(maxLogReadingThreads == null) { + throw new InvalidConfigurationException("maxLogReadingThreads not found or is null in kubernetes config object"); + } + if(maxLogReadingThreads <= 0) { + throw new InvalidConfigurationException("maxLogReadingThreads is invalid, expected >0"); + } } } diff --git a/src/main/java/com/teragrep/k8s_01/config/AppConfigRelp.java b/src/main/java/com/teragrep/k8s_01/config/AppConfigRelp.java index 5e0a850..c631715 100644 --- a/src/main/java/com/teragrep/k8s_01/config/AppConfigRelp.java +++ b/src/main/java/com/teragrep/k8s_01/config/AppConfigRelp.java @@ -60,7 +60,6 @@ public void handleOverrides() throws InvalidConfigurationException { private Integer readTimeout; private Integer writeTimeout; private Integer reconnectInterval; - private Integer outputThreads; public String getTarget() { return target; @@ -86,10 +85,6 @@ public Integer getReconnectInterval() { return reconnectInterval; } - public Integer getOutputThreads() { - return outputThreads; - } - @Override public String toString() { return new Gson().toJson(this); @@ -128,12 +123,5 @@ public void validate() throws InvalidConfigurationException { if(writeTimeout < 0) { throw new InvalidConfigurationException("Relp write timeout is invalid, expected positive integer"); } - - if(outputThreads == null) { - throw new InvalidConfigurationException("outputThreads not found or is null in relp config object"); - } - if(outputThreads <= 0) { - throw new InvalidConfigurationException("Relp output threads is invalid, expected >0"); - } } }