Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] iInject otel context #96

Merged
merged 15 commits into from
Dec 30, 2024
51 changes: 27 additions & 24 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,36 +21,37 @@ This appender uses [BigQueue](https://github.com/bulldog2011/bigqueue) implement
If you use Gradle, add the dependency to your project as follows:

```java
implementation 'io.logz.sender:logzio-java-sender:2.0.0'
implementation 'io.logz.sender:logzio-java-sender:${logzio-sender-version}'
```

### Parameters
| Parameter | Default | Explained |
| ------------------ | ------------------------------------ | ----- |
| **token** | *None* | Your Logz.io token, which can be found under "settings" in your account.
| **logzioType** | *java* | The [log type](http://support.logz.io/support/solutions/articles/6000103063-what-is-type-) for that sender |
| **drainTimeoutSec** | *5* | How often the sender should drain the queue (in seconds) |
| **logzioUrl** | *https://listener.logz.io:8071* | Logz.io URL, that can be found under "Log Shipping -> Libraries" in your account.
| **socketTimeout** | *10 * 1000* | The socket timeout during log shipment |
| **connectTimeout** | *10 * 1000* | The connection timeout during log shipment |
| **debug** | *false* | Print some debug messages to stdout to help to diagnose issues |
| **compressRequests** | *false* | Boolean. `true` if logs are compressed in gzip format before sending. `false` if logs are sent uncompressed. |
| **exceedMaxSizeAction** | `cut` | String. `cut` to truncate the message field or `drop` to drop log that exceed the allowed maximum size for logzio. If the log size exceeding the maximum size allowed after truncating the message field, the log will be dropped.|
| Parameter | Default | Explained |
|------------------------------|---------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| **token** | *None* | Your Logz.io token, which can be found under "settings" in your account. |
| **logzioType** | *java* | The [log type](http://support.logz.io/support/solutions/articles/6000103063-what-is-type-) for that sender |
| **drainTimeoutSec** | *5* | How often the sender should drain the queue (in seconds) |
| **logzioUrl** | *https://listener.logz.io:8071* | Logz.io URL, that can be found under "Log Shipping -> Libraries" in your account. |
| **socketTimeout** | *10 * 1000* | The socket timeout during log shipment |
| **connectTimeout** | *10 * 1000* | The connection timeout during log shipment |
| **debug** | *false* | Print some debug messages to stdout to help to diagnose issues |
| **compressRequests** | *false* | Boolean. `true` if logs are compressed in gzip format before sending. `false` if logs are sent uncompressed. |
| **exceedMaxSizeAction** | `cut` | String. `cut` to truncate the message field or `drop` to drop log that exceed the allowed maximum size for logzio. If the log size exceeding the maximum size allowed after truncating the message field, the log will be dropped. |
| **withOpentelemetryContext** | `true` | Boolean. Add trace_id, span_id, service_name fields to logs when opentelemetry context is available. |

#### Parameters for in-memory queue
| Parameter | Default | Explained |
| ------------------ | ------------------------------------ | ----- |
| **inMemoryQueueCapacityInBytes** | *1024 * 1024 * 100* | The amount of memory(bytes) we are allowed to use for the memory queue. If the value is -1 the sender will not limit the queue size.|
| **logsCountLimit** | *-1* | The number of logs in the memory queue before dropping new logs. Default value is -1 (the sender will not limit the queue by logs count)|
| Parameter | Default | Explained |
|----------------------------------|---------------------|------------------------------------------------------------------------------------------------------------------------------------------|
| **inMemoryQueueCapacityInBytes** | *1024 * 1024 * 100* | The amount of memory(bytes) we are allowed to use for the memory queue. If the value is -1 the sender will not limit the queue size. |
| **logsCountLimit** | *-1* | The number of logs in the memory queue before dropping new logs. Default value is -1 (the sender will not limit the queue by logs count) |


#### Parameters for disk queue
| Parameter | Default | Explained |
| ------------------ | ------------------------------------ | ----- |
| **queueDir** | *None* | Where the sender should store the queue. It should be at least one folder in path.|
| **fileSystemFullPercentThreshold** | *98* | The percent of used file system space at which the sender will stop queueing. When we will reach that percentage, the file system in which the queue is stored will drop all new logs until the percentage of used space drops below that threshold. Set to -1 to never stop processing new logs |
| **gcPersistedQueueFilesIntervalSeconds** | *30* | How often the disk queue should clean sent logs from disk |
| **checkDiskSpaceInterval** | *1000* | How often the should disk queue check for space (in milliseconds) |
| Parameter | Default | Explained |
|------------------------------------------|---------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| **queueDir** | *None* | Where the sender should store the queue. It should be at least one folder in path. |
| **fileSystemFullPercentThreshold** | *98* | The percent of used file system space at which the sender will stop queueing. When we will reach that percentage, the file system in which the queue is stored will drop all new logs until the percentage of used space drops below that threshold. Set to -1 to never stop processing new logs |
| **gcPersistedQueueFilesIntervalSeconds** | *30* | How often the disk queue should clean sent logs from disk |
| **checkDiskSpaceInterval** | *1000* | How often the should disk queue check for space (in milliseconds) |



Expand Down Expand Up @@ -135,8 +136,10 @@ public class LogzioSenderExample {


## Release notes
- 2.1.0
- Upgrade packages versions
- 2.2.0
- Add `WithOpentelemetryContext` parameter to add `trace_id`, `span_id`, `service_name` fields to logs when opentelemetry context is available.
- 2.1.0
- Upgrade packages versions
- 2.0.1
- Add `User-Agent` header with logz.io information
- 2.0.0 - **THIS IS A SNAPSHOT RELEASE - SUPPORTED WITH JDK 11 AND ABOVE**
Expand Down
15 changes: 15 additions & 0 deletions logzio-sender/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,21 @@
</build>

<dependencies>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-api</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-semconv</artifactId>
<version>1.26.0-alpha</version>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
<version>1.10.0</version>
</dependency>
<dependency>
<groupId>org.ikasan</groupId>
<artifactId>bigqueue</artifactId>
Expand Down
39 changes: 34 additions & 5 deletions logzio-sender/src/main/java/io/logz/sender/LogzioSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.sdk.resources.Resource;

public class LogzioSender {
private static final int MAX_SIZE_IN_BYTES = 3 * 1024 * 1024; // 3 MB
Expand All @@ -37,10 +41,11 @@ public class LogzioSender {
private ScheduledExecutorService tasksExecutor;
private final AtomicBoolean drainRunning = new AtomicBoolean(false);
private final HttpsSyncSender httpsSyncSender;
private final boolean withOpentelemetryContext;

private LogzioSender(HttpsRequestConfiguration httpsRequestConfiguration, int drainTimeout, boolean debug,
SenderStatusReporter reporter, ScheduledExecutorService tasksExecutor,
LogsQueue logsQueue, String exceedMaxSizeAction) throws LogzioParameterErrorException {
LogsQueue logsQueue, String exceedMaxSizeAction, boolean withOpentelemetryContext) throws LogzioParameterErrorException {

if (logsQueue == null || reporter == null || httpsRequestConfiguration == null) {
throw new LogzioParameterErrorException("logsQueue=" + logsQueue + " reporter=" + reporter
Expand All @@ -55,6 +60,7 @@ private LogzioSender(HttpsRequestConfiguration httpsRequestConfiguration, int dr
this.reporter = reporter;
httpsSyncSender = new HttpsSyncSender(httpsRequestConfiguration, reporter);
this.tasksExecutor = tasksExecutor;
this.withOpentelemetryContext = withOpentelemetryContext;
debug("Created new LogzioSender class");
}

Expand All @@ -67,7 +73,7 @@ private String validateAndGetExceedMaxSizeAction(String exceedMaxSizeAction) thr
}

private static LogzioSender getLogzioSender(HttpsRequestConfiguration httpsRequestConfiguration, int drainTimeout, boolean debug, SenderStatusReporter reporter,
ScheduledExecutorService tasksExecutor, LogsQueue logsQueue, String exceedMaxSizeAction)
ScheduledExecutorService tasksExecutor, LogsQueue logsQueue, String exceedMaxSizeAction, boolean withOpentelemetryContext)
throws LogzioParameterErrorException {
String tokenHash = Hashing.sha256()
.hashString(httpsRequestConfiguration.getLogzioToken(), StandardCharsets.UTF_8)
Expand All @@ -85,7 +91,7 @@ private static LogzioSender getLogzioSender(HttpsRequestConfiguration httpsReque
}

LogzioSender logzioSender = new LogzioSender(httpsRequestConfiguration, drainTimeout, debug, reporter,
tasksExecutor, logsQueue, exceedMaxSizeAction);
tasksExecutor, logsQueue, exceedMaxSizeAction, withOpentelemetryContext);
logzioSenderInstances.put(tokenAndTypePair, logzioSender);
return logzioSender;
} else {
Expand All @@ -102,6 +108,20 @@ private static LogzioSender getLogzioSender(HttpsRequestConfiguration httpsReque
}
}

private void addOpenTelemetryContext(JsonObject jsonMessage) {
Span currentSpan = Span.current();
if (currentSpan != null) {
SpanContext spanContext = currentSpan.getSpanContext();
if (spanContext.isValid()) {
jsonMessage.addProperty("trace_id", spanContext.getTraceId());
jsonMessage.addProperty("span_id", spanContext.getSpanId());
Resource resource = Resource.getDefault();
Attributes attributes = resource.getAttributes();
String serviceName = attributes.get(io.opentelemetry.semconv.resource.attributes.ResourceAttributes.SERVICE_NAME);
jsonMessage.addProperty("service_name", serviceName);
}
}
}
public void start() {
tasksExecutor.scheduleWithFixedDelay(this::drainQueueAndSend, 0, drainTimeout, TimeUnit.SECONDS);
}
Expand Down Expand Up @@ -145,7 +165,9 @@ public void clearQueue() throws IOException {
}

public void send(JsonObject jsonMessage) {

if (this.withOpentelemetryContext) {
addOpenTelemetryContext(jsonMessage);
}
// check for oversized message
int jsonByteLength = jsonMessage.toString().getBytes(StandardCharsets.UTF_8).length;
String jsonMessageField = jsonMessage.get("message").getAsString();
Expand Down Expand Up @@ -280,6 +302,12 @@ public static class Builder {
private DiskQueue.Builder diskQueueBuilder;
private HttpsRequestConfiguration httpsRequestConfiguration;
private String exceedMaxSizeAction = "cut";
private boolean withOpentelemetryContext = true;

public Builder setWithOpentelemetryContext(boolean withOpentelemetryContext) {
this.withOpentelemetryContext = withOpentelemetryContext;
return this;
}

public Builder setExceedMaxSizeAction(String exceedMaxSizeAction) {
this.exceedMaxSizeAction = exceedMaxSizeAction;
Expand Down Expand Up @@ -342,7 +370,8 @@ public LogzioSender build() throws LogzioParameterErrorException, IOException {
reporter,
tasksExecutor,
getLogsQueue(),
exceedMaxSizeAction
exceedMaxSizeAction,
withOpentelemetryContext
);
}

Expand Down
8 changes: 4 additions & 4 deletions logzio-sender/src/test/java/io/logz/sender/DiskQueueTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ public class DiskQueueTest extends LogzioSenderTest {
protected Builder getLogzioSenderBuilder(String token, String type, Integer drainTimeout,
Integer socketTimeout, Integer serverTimeout,
ScheduledExecutorService tasks,
boolean compressRequests) throws LogzioParameterErrorException {
boolean compressRequests, boolean withOpentelemetryContext) throws LogzioParameterErrorException {
Builder logzioSenderBuilder = super.getLogzioSenderBuilder(token, type, drainTimeout,
socketTimeout, serverTimeout, tasks, compressRequests);
socketTimeout, serverTimeout, tasks, compressRequests, withOpentelemetryContext);

if (queueDir == null) {
queueDir = TestEnvironment.createTempDirectory();
Expand Down Expand Up @@ -70,7 +70,7 @@ public void testSenderCantWriteToEmptyDirectory() {
try {
setQueueDir(tempDirectory);
Builder testSenderBuilder = getLogzioSenderBuilder(token, type, drainTimeout, 10 * 1000,
10 * 1000, tasks, false);
10 * 1000, tasks, false, false);
LogzioSender testSender = createLogzioSender(testSenderBuilder);
throw new LogzioParameterErrorException("Should not reach here", "fail");
} catch (LogzioParameterErrorException | IOException e) {
Expand All @@ -95,7 +95,7 @@ public void testSenderCreatesDirectoryWhichDoesNotExists() throws Exception {
assertFalse(queueDir.exists());
setQueueDir(queueDir);
Builder testSenderBuilder = getLogzioSenderBuilder(token, type, drainTimeout,
10 * 1000, 10 * 1000, tasks, false);
10 * 1000, 10 * 1000, tasks, false, false);
LogzioSender testSender = createLogzioSender(testSenderBuilder);
testSender.send(createJsonMessage(loggerName, message1));
assertTrue(queueDir.exists());
Expand Down
Loading
Loading