Skip to content

Commit

Permalink
Remove explicit relp output threads setting, add threads per logdefin…
Browse files Browse the repository at this point in the history
…ition and calculate it on the fly (#71)
  • Loading branch information
StrongestNumber9 authored Jul 26, 2023
1 parent 844f16d commit a84648e
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 29 deletions.
4 changes: 2 additions & 2 deletions etc/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@
"third-pod_default_third-pod-one-.*",
"third-pod_default_third-pod-two-.*"
]
"maxLogReadingThreads": 2
},
"relp": {
"target": "127.0.0.1",
"port": 1601,
"connectionTimeout": 5000,
"readTimeout": 5000,
"writeTimeout": 5000,
"reconnectInterval": 5000,
"outputThreads": 4
"reconnectInterval": 5000
}
}
10 changes: 5 additions & 5 deletions example/combined.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,16 @@ data:
"third-pod_default_third-pod-one-.*",
"third-pod_default_third-pod-two-.*",
"fifth-pod_default_.*"
]
],
"maxLogReadingThreads": 3
},
"relp": {
"target": "receiver.receiver.default",
"port": 1601,
"connectionTimeout": 5000,
"readTimeout": 5000,
"writeTimeout": 5000,
"reconnectInterval": 5000,
"outputThreads": 5
"reconnectInterval": 5000
}
}
log4j2.xml: |
Expand Down Expand Up @@ -105,7 +105,7 @@ data:
</Configuration>
kind: ConfigMap
metadata:
name: app-config-58dh7f727h
name: app-config-8dtf4m92md
---
apiVersion: v1
data:
Expand Down Expand Up @@ -271,7 +271,7 @@ spec:
terminationGracePeriodSeconds: 0
volumes:
- configMap:
name: app-config-58dh7f727h
name: app-config-8dtf4m92md
name: app-config
- hostPath:
path: /var/log/containers
Expand Down
6 changes: 3 additions & 3 deletions example/config/k8s_01/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@
"third-pod_default_third-pod-one-.*",
"third-pod_default_third-pod-two-.*",
"fifth-pod_default_.*"
]
],
"maxLogReadingThreads": 3
},
"relp": {
"target": "receiver.receiver.default",
"port": 1601,
"connectionTimeout": 5000,
"readTimeout": 5000,
"writeTimeout": 5000,
"reconnectInterval": 5000,
"outputThreads": 5
"reconnectInterval": 5000
}
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
<dependency>
<groupId>com.teragrep</groupId>
<artifactId>rlo_13</artifactId>
<version>1.1.0</version>
<version>1.2.0</version>
</dependency>

<!-- RELP sending library -->
Expand Down
21 changes: 15 additions & 6 deletions src/main/java/com/teragrep/k8s_01/KubernetesLogReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,22 @@ public static void main(String[] args) throws IOException {
PrometheusMetrics prometheusMetrics = new PrometheusMetrics(appConfig.getMetrics().getPort());

// Pool of Relp output threads to be shared by every consumer
BlockingQueue<RelpOutput> relpOutputPool = new LinkedBlockingDeque<>(appConfig.getRelp().getOutputThreads());
int logFileCount = appConfig.getKubernetes().getLogfiles().length;
int outputThreads = appConfig.getKubernetes().getMaxLogReadingThreads() * logFileCount;
LOGGER.info(
"Found {} monitored logfile definitions, reading them with maximum of {} threads each.",
logFileCount,
appConfig.getKubernetes().getMaxLogReadingThreads()
);

BlockingQueue<RelpOutput> relpOutputPool = new LinkedBlockingDeque<>(outputThreads);
LOGGER.info(
"Starting {} Relp threads towards {}:{}",
appConfig.getRelp().getOutputThreads(),
outputThreads,
appConfig.getRelp().getTarget(),
appConfig.getRelp().getPort()
);
for(int i=1; i <= appConfig.getRelp().getOutputThreads(); i++) {
for(int i=1; i <= outputThreads; i++) {
try {
LOGGER.debug(
"Adding RelpOutput thread #{}",
Expand Down Expand Up @@ -135,11 +143,11 @@ public static void main(String[] args) throws IOException {
// Graceful shutdown so Relp sessions are gracefully terminated
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
LOGGER.info("Shutting down.");
for(int i=1; i <= appConfig.getRelp().getOutputThreads(); i++) {
for(int i=1; i <= outputThreads; i++) {
LOGGER.info(
"Disconnecting relp thread #{}/{}",
i,
appConfig.getRelp().getOutputThreads()
outputThreads
);
RelpOutput output;
try {
Expand Down Expand Up @@ -169,7 +177,8 @@ public static void main(String[] args) throws IOException {
Pattern.compile(logfile),
statefulFileReader,
500,
TimeUnit.MILLISECONDS
TimeUnit.MILLISECONDS,
appConfig.getKubernetes().getMaxLogReadingThreads()
);
dew.watch();
} catch (IOException | InterruptedException e) {
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/com/teragrep/k8s_01/config/AppConfigKubernetes.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public void handleOverrides() {
private String[] logfiles;
private String url;
private String timezone;
private Integer maxLogReadingThreads;

public Integer getCacheExpireInterval() {
return cacheExpireInterval;
Expand All @@ -67,6 +68,10 @@ public Integer getCacheMaxEntries() {
}
public String getTimezone() { return timezone; }

public Integer getMaxLogReadingThreads() {
return maxLogReadingThreads;
}

@Override
public String toString() {
return new Gson().toJson(this);
Expand Down Expand Up @@ -104,5 +109,12 @@ public void validate() throws InvalidConfigurationException {
if(timezone == null) {
throw new InvalidConfigurationException("timezone not found or is null in kubernetes config object");
}

if(maxLogReadingThreads == null) {
throw new InvalidConfigurationException("maxLogReadingThreads not found or is null in kubernetes config object");
}
if(maxLogReadingThreads <= 0) {
throw new InvalidConfigurationException("maxLogReadingThreads is invalid, expected >0");
}
}
}
12 changes: 0 additions & 12 deletions src/main/java/com/teragrep/k8s_01/config/AppConfigRelp.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ public void handleOverrides() throws InvalidConfigurationException {
private Integer readTimeout;
private Integer writeTimeout;
private Integer reconnectInterval;
private Integer outputThreads;

public String getTarget() {
return target;
Expand All @@ -86,10 +85,6 @@ public Integer getReconnectInterval() {
return reconnectInterval;
}

public Integer getOutputThreads() {
return outputThreads;
}

@Override
public String toString() {
return new Gson().toJson(this);
Expand Down Expand Up @@ -128,12 +123,5 @@ public void validate() throws InvalidConfigurationException {
if(writeTimeout < 0) {
throw new InvalidConfigurationException("Relp write timeout is invalid, expected positive integer");
}

if(outputThreads == null) {
throw new InvalidConfigurationException("outputThreads not found or is null in relp config object");
}
if(outputThreads <= 0) {
throw new InvalidConfigurationException("Relp output threads is invalid, expected >0");
}
}
}

0 comments on commit a84648e

Please sign in to comment.