Skip to content

Commit

Permalink
Merge pull request #96 from logzio/feature/inject-otel-context
Browse files Browse the repository at this point in the history
[Feature] iInject otel context
  • Loading branch information
yotamloe authored Dec 30, 2024
2 parents cd7d0df + 0189748 commit 8004362
Show file tree
Hide file tree
Showing 8 changed files with 176 additions and 52 deletions.
51 changes: 27 additions & 24 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,36 +21,37 @@ This appender uses [BigQueue](https://github.com/bulldog2011/bigqueue) implement
If you use Gradle, add the dependency to your project as follows:

```java
implementation 'io.logz.sender:logzio-java-sender:2.0.0'
implementation 'io.logz.sender:logzio-java-sender:${logzio-sender-version}'
```

### Parameters
| Parameter | Default | Explained |
| ------------------ | ------------------------------------ | ----- |
| **token** | *None* | Your Logz.io token, which can be found under "settings" in your account.
| **logzioType** | *java* | The [log type](http://support.logz.io/support/solutions/articles/6000103063-what-is-type-) for that sender |
| **drainTimeoutSec** | *5* | How often the sender should drain the queue (in seconds) |
| **logzioUrl** | *https://listener.logz.io:8071* | Logz.io URL, that can be found under "Log Shipping -> Libraries" in your account.
| **socketTimeout** | *10 * 1000* | The socket timeout during log shipment |
| **connectTimeout** | *10 * 1000* | The connection timeout during log shipment |
| **debug** | *false* | Print some debug messages to stdout to help to diagnose issues |
| **compressRequests** | *false* | Boolean. `true` if logs are compressed in gzip format before sending. `false` if logs are sent uncompressed. |
| **exceedMaxSizeAction** | `cut` | String. `cut` to truncate the message field or `drop` to drop log that exceed the allowed maximum size for logzio. If the log size exceeding the maximum size allowed after truncating the message field, the log will be dropped.|
| Parameter | Default | Explained |
|------------------------------|---------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| **token** | *None* | Your Logz.io token, which can be found under "settings" in your account. |
| **logzioType** | *java* | The [log type](http://support.logz.io/support/solutions/articles/6000103063-what-is-type-) for that sender |
| **drainTimeoutSec** | *5* | How often the sender should drain the queue (in seconds) |
| **logzioUrl** | *https://listener.logz.io:8071* | Logz.io URL, that can be found under "Log Shipping -> Libraries" in your account. |
| **socketTimeout** | *10 * 1000* | The socket timeout during log shipment |
| **connectTimeout** | *10 * 1000* | The connection timeout during log shipment |
| **debug** | *false* | Print some debug messages to stdout to help to diagnose issues |
| **compressRequests** | *false* | Boolean. `true` if logs are compressed in gzip format before sending. `false` if logs are sent uncompressed. |
| **exceedMaxSizeAction** | `cut` | String. `cut` to truncate the message field or `drop` to drop log that exceed the allowed maximum size for logzio. If the log size exceeding the maximum size allowed after truncating the message field, the log will be dropped. |
| **withOpentelemetryContext** | `true` | Boolean. Add trace_id, span_id, service_name fields to logs when opentelemetry context is available. |

#### Parameters for in-memory queue
| Parameter | Default | Explained |
| ------------------ | ------------------------------------ | ----- |
| **inMemoryQueueCapacityInBytes** | *1024 * 1024 * 100* | The amount of memory(bytes) we are allowed to use for the memory queue. If the value is -1 the sender will not limit the queue size.|
| **logsCountLimit** | *-1* | The number of logs in the memory queue before dropping new logs. Default value is -1 (the sender will not limit the queue by logs count)|
| Parameter | Default | Explained |
|----------------------------------|---------------------|------------------------------------------------------------------------------------------------------------------------------------------|
| **inMemoryQueueCapacityInBytes** | *1024 * 1024 * 100* | The amount of memory(bytes) we are allowed to use for the memory queue. If the value is -1 the sender will not limit the queue size. |
| **logsCountLimit** | *-1* | The number of logs in the memory queue before dropping new logs. Default value is -1 (the sender will not limit the queue by logs count) |


#### Parameters for disk queue
| Parameter | Default | Explained |
| ------------------ | ------------------------------------ | ----- |
| **queueDir** | *None* | Where the sender should store the queue. It should be at least one folder in path.|
| **fileSystemFullPercentThreshold** | *98* | The percent of used file system space at which the sender will stop queueing. When we will reach that percentage, the file system in which the queue is stored will drop all new logs until the percentage of used space drops below that threshold. Set to -1 to never stop processing new logs |
| **gcPersistedQueueFilesIntervalSeconds** | *30* | How often the disk queue should clean sent logs from disk |
| **checkDiskSpaceInterval** | *1000* | How often the should disk queue check for space (in milliseconds) |
| Parameter | Default | Explained |
|------------------------------------------|---------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| **queueDir** | *None* | Where the sender should store the queue. It should be at least one folder in path. |
| **fileSystemFullPercentThreshold** | *98* | The percent of used file system space at which the sender will stop queueing. When we will reach that percentage, the file system in which the queue is stored will drop all new logs until the percentage of used space drops below that threshold. Set to -1 to never stop processing new logs |
| **gcPersistedQueueFilesIntervalSeconds** | *30* | How often the disk queue should clean sent logs from disk |
| **checkDiskSpaceInterval** | *1000* | How often the should disk queue check for space (in milliseconds) |



Expand Down Expand Up @@ -135,8 +136,10 @@ public class LogzioSenderExample {


## Release notes
- 2.1.0
- Upgrade packages versions
- 2.2.0
- Add `WithOpentelemetryContext` parameter to add `trace_id`, `span_id`, `service_name` fields to logs when opentelemetry context is available.
- 2.1.0
- Upgrade packages versions
- 2.0.1
- Add `User-Agent` header with logz.io information
- 2.0.0 - **THIS IS A SNAPSHOT RELEASE - SUPPORTED WITH JDK 11 AND ABOVE**
Expand Down
2 changes: 1 addition & 1 deletion logzio-sender-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>logzio-java-sender</artifactId>
<groupId>io.logz.sender</groupId>
<version>2.1.0</version>
<version>2.2.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
17 changes: 16 additions & 1 deletion logzio-sender/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>logzio-java-sender</artifactId>
<groupId>io.logz.sender</groupId>
<version>2.1.0</version>
<version>2.2.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down Expand Up @@ -84,6 +84,21 @@
</build>

<dependencies>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-semconv</artifactId>
<version>1.26.0-alpha</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.ikasan</groupId>
<artifactId>bigqueue</artifactId>
Expand Down
39 changes: 34 additions & 5 deletions logzio-sender/src/main/java/io/logz/sender/LogzioSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.sdk.resources.Resource;

public class LogzioSender {
private static final int MAX_SIZE_IN_BYTES = 3 * 1024 * 1024; // 3 MB
Expand All @@ -37,10 +41,11 @@ public class LogzioSender {
private ScheduledExecutorService tasksExecutor;
private final AtomicBoolean drainRunning = new AtomicBoolean(false);
private final HttpsSyncSender httpsSyncSender;
private final boolean withOpentelemetryContext;

private LogzioSender(HttpsRequestConfiguration httpsRequestConfiguration, int drainTimeout, boolean debug,
SenderStatusReporter reporter, ScheduledExecutorService tasksExecutor,
LogsQueue logsQueue, String exceedMaxSizeAction) throws LogzioParameterErrorException {
LogsQueue logsQueue, String exceedMaxSizeAction, boolean withOpentelemetryContext) throws LogzioParameterErrorException {

if (logsQueue == null || reporter == null || httpsRequestConfiguration == null) {
throw new LogzioParameterErrorException("logsQueue=" + logsQueue + " reporter=" + reporter
Expand All @@ -55,6 +60,7 @@ private LogzioSender(HttpsRequestConfiguration httpsRequestConfiguration, int dr
this.reporter = reporter;
httpsSyncSender = new HttpsSyncSender(httpsRequestConfiguration, reporter);
this.tasksExecutor = tasksExecutor;
this.withOpentelemetryContext = withOpentelemetryContext;
debug("Created new LogzioSender class");
}

Expand All @@ -67,7 +73,7 @@ private String validateAndGetExceedMaxSizeAction(String exceedMaxSizeAction) thr
}

private static LogzioSender getLogzioSender(HttpsRequestConfiguration httpsRequestConfiguration, int drainTimeout, boolean debug, SenderStatusReporter reporter,
ScheduledExecutorService tasksExecutor, LogsQueue logsQueue, String exceedMaxSizeAction)
ScheduledExecutorService tasksExecutor, LogsQueue logsQueue, String exceedMaxSizeAction, boolean withOpentelemetryContext)
throws LogzioParameterErrorException {
String tokenHash = Hashing.sha256()
.hashString(httpsRequestConfiguration.getLogzioToken(), StandardCharsets.UTF_8)
Expand All @@ -85,7 +91,7 @@ private static LogzioSender getLogzioSender(HttpsRequestConfiguration httpsReque
}

LogzioSender logzioSender = new LogzioSender(httpsRequestConfiguration, drainTimeout, debug, reporter,
tasksExecutor, logsQueue, exceedMaxSizeAction);
tasksExecutor, logsQueue, exceedMaxSizeAction, withOpentelemetryContext);
logzioSenderInstances.put(tokenAndTypePair, logzioSender);
return logzioSender;
} else {
Expand All @@ -102,6 +108,20 @@ private static LogzioSender getLogzioSender(HttpsRequestConfiguration httpsReque
}
}

private void addOpenTelemetryContext(JsonObject jsonMessage) {
Span currentSpan = Span.current();
if (currentSpan != null) {
SpanContext spanContext = currentSpan.getSpanContext();
if (spanContext.isValid()) {
jsonMessage.addProperty("trace_id", spanContext.getTraceId());
jsonMessage.addProperty("span_id", spanContext.getSpanId());
Resource resource = Resource.getDefault();
Attributes attributes = resource.getAttributes();
String serviceName = attributes.get(io.opentelemetry.semconv.resource.attributes.ResourceAttributes.SERVICE_NAME);
jsonMessage.addProperty("service_name", serviceName);
}
}
}
public void start() {
tasksExecutor.scheduleWithFixedDelay(this::drainQueueAndSend, 0, drainTimeout, TimeUnit.SECONDS);
}
Expand Down Expand Up @@ -145,7 +165,9 @@ public void clearQueue() throws IOException {
}

public void send(JsonObject jsonMessage) {

if (this.withOpentelemetryContext) {
addOpenTelemetryContext(jsonMessage);
}
// check for oversized message
int jsonByteLength = jsonMessage.toString().getBytes(StandardCharsets.UTF_8).length;
String jsonMessageField = jsonMessage.get("message").getAsString();
Expand Down Expand Up @@ -280,6 +302,12 @@ public static class Builder {
private DiskQueue.Builder diskQueueBuilder;
private HttpsRequestConfiguration httpsRequestConfiguration;
private String exceedMaxSizeAction = "cut";
private boolean withOpentelemetryContext = true;

public Builder setWithOpentelemetryContext(boolean withOpentelemetryContext) {
this.withOpentelemetryContext = withOpentelemetryContext;
return this;
}

public Builder setExceedMaxSizeAction(String exceedMaxSizeAction) {
this.exceedMaxSizeAction = exceedMaxSizeAction;
Expand Down Expand Up @@ -342,7 +370,8 @@ public LogzioSender build() throws LogzioParameterErrorException, IOException {
reporter,
tasksExecutor,
getLogsQueue(),
exceedMaxSizeAction
exceedMaxSizeAction,
withOpentelemetryContext
);
}

Expand Down
8 changes: 4 additions & 4 deletions logzio-sender/src/test/java/io/logz/sender/DiskQueueTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ public class DiskQueueTest extends LogzioSenderTest {
protected Builder getLogzioSenderBuilder(String token, String type, Integer drainTimeout,
Integer socketTimeout, Integer serverTimeout,
ScheduledExecutorService tasks,
boolean compressRequests) throws LogzioParameterErrorException {
boolean compressRequests, boolean withOpentelemetryContext) throws LogzioParameterErrorException {
Builder logzioSenderBuilder = super.getLogzioSenderBuilder(token, type, drainTimeout,
socketTimeout, serverTimeout, tasks, compressRequests);
socketTimeout, serverTimeout, tasks, compressRequests, withOpentelemetryContext);

if (queueDir == null) {
queueDir = TestEnvironment.createTempDirectory();
Expand Down Expand Up @@ -70,7 +70,7 @@ public void testSenderCantWriteToEmptyDirectory() {
try {
setQueueDir(tempDirectory);
Builder testSenderBuilder = getLogzioSenderBuilder(token, type, drainTimeout, 10 * 1000,
10 * 1000, tasks, false);
10 * 1000, tasks, false, false);
LogzioSender testSender = createLogzioSender(testSenderBuilder);
throw new LogzioParameterErrorException("Should not reach here", "fail");
} catch (LogzioParameterErrorException | IOException e) {
Expand All @@ -95,7 +95,7 @@ public void testSenderCreatesDirectoryWhichDoesNotExists() throws Exception {
assertFalse(queueDir.exists());
setQueueDir(queueDir);
Builder testSenderBuilder = getLogzioSenderBuilder(token, type, drainTimeout,
10 * 1000, 10 * 1000, tasks, false);
10 * 1000, 10 * 1000, tasks, false, false);
LogzioSender testSender = createLogzioSender(testSenderBuilder);
testSender.send(createJsonMessage(loggerName, message1));
assertTrue(queueDir.exists());
Expand Down
Loading

0 comments on commit 8004362

Please sign in to comment.