Skip to content

Commit

Permalink
Supporting in memory queue (#27)
Browse files Browse the repository at this point in the history
* refactor http request as a stand alone class

* fixes for pr

* fix PR

* changing import *

* fixing spaces

* adding queue impl

* add sender builder

* m

* g

* adding default queue function

* get logs buffer interface as parameter

* changing tests to abstract to fit both queues

* adding long run test for in memory q

* optimizing imports and editing README

* adding a thread to check for disk space making the disk queue fully async

* update release notes

* adding null checks

* deleting author + fixing tests

* fixing long run tests log message

* fix print Successfully sent bulk... when error happend and shouldRetry returns false

* update readme

* refactor http request as a stand alone class

* fixes for pr

* fix PR

* changing import *

* adding queue impl

* fixing spaces

* add sender builder

* m

* g

* adding default queue function

* get logs buffer interface as parameter

* changing tests to abstract to fit both queues

* adding long run test for in memory q

* optimizing imports and editing README

* adding a thread to check for disk space making the disk queue fully async

* update release notes

* adding null checks

* deleting author + fixing tests

* fixing long run tests log message

* fix print Successfully sent bulk... when error happend and shouldRetry returns false

* update readme

* after PR - change style, selling, add constants, and renaming in memory class
  • Loading branch information
idohalevi authored Sep 14, 2018
1 parent e516770 commit 4fc9123
Show file tree
Hide file tree
Showing 17 changed files with 797 additions and 265 deletions.
67 changes: 67 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,72 @@ This appender uses [BigQueue](https://github.com/bulldog2011/bigqueue) implement
| **connectTimeout** | *10 * 1000* | The connection timeout during log shipment |
| **debug** | *false* | Print some debug messages to stdout to help to diagnose issues |
| **compressRequests** | *false* | Boolean. `true` if logs are compressed in gzip format before sending. `false` if logs are sent uncompressed. |
| **gcPersistedQueueFilesIntervalSeconds** | *30* | How often the disk queue should clean sent logs from disk |
| **bufferThreshold** | *1024 * 1024 * 100* | The amount of memory disk we are allowed to use for the memory queue |
| **checkDiskSpaceInterval** | *1000* | How often the should disk queue check for space (in milliseconds) |




### Code Example
From version 1.0.15 we use a builder to get Logz.io sender
```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.gson.JsonObject;

public class LogzioSenderExample {

public static void main(String[] args) {

HttpsRequestConfiguration conf = HttpsRequestConfiguration
.builder()
.setCompressRequests(true)
.setConnectTimeout(10*1000)
.setSocketTimeout(10*1000)
.setLogzioListenerUrl("https://listener.logz.io:8071")
.setLogzioType("javaSenderType")
.setLogzioToken("123456789")
.build();

// Use one of the following implementations
// 1) disk queue example
LogzioSender logzioSender = LogzioSender
.builder()
.setDebug(false)
.setTasksExecutor(Executors.newScheduledThreadPool(3))
.setDrainTimeout(drainTimeout)
.setReporter(new LogzioStatusReporter(){/*implement simple interface for logging sender logging */})
.setHttpsRequestConfiguration(httpsRequestConfiguration)
.WithDiskMemoryQueue()
.setBufferDir(bufferDir)
.setFsPercentThreshold(fsPercentThreshold)
.setCheckDiskSpaceInterval(1000)
.setGcPersistedQueueFilesIntervalSeconds(30)
.EndDiskQueue()
.build();

// 2) in memory queue example
LogzioSender logzioSender = LogzioSender
.builder()
.setDebug(false)
.setTasksExecutor(Executors.newScheduledThreadPool(3))
.setDrainTimeout(drainTimeout)
.setReporter(new LogzioStatusReporter(){/*implement simple interface for logging sender logging */})
.setHttpsRequestConfiguration(conf)
.WithInMemoryLogsBuffer()
.setBufferThreshold(1024 * 1024 * 100) //100MB
.EndInMemoryLogsBuffer()
.build();

sender.start();
JsonObject jsonMessage = createLogMessage(); // create JsonObject to send to logz.io
sender.send(jsonMessage);
}
}
```

Until version 1.0.14
```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -53,6 +116,10 @@ public class LogzioSenderExample {


### Release notes
- 1.0.12 - 1.0.15
- separating https request from the sender
- add implementation for in memory queue
- add a builder for sender, http configuration, and buffers implementation
- 1.0.11 fix shaded
- 1.0.10 add gzip compression
- 1.0.9 add auto deploy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,20 @@
import com.google.common.base.Throwables;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.eclipse.jetty.http.GzipHttpContent;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.*;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
Expand Down
158 changes: 158 additions & 0 deletions logzio-sender/src/main/java/io/logz/sender/DiskQueue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package io.logz.sender;

import com.bluejeans.common.bigqueue.BigQueue;
import io.logz.sender.exceptions.LogzioParameterErrorException;

import java.io.File;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class DiskQueue implements LogzioLogsBufferInterface{
private final BigQueue logsBuffer;
private final File queueDirectory;
private final boolean dontCheckEnoughDiskSpace;
private final int fsPercentThreshold;
private final SenderStatusReporter reporter;
private final ScheduledExecutorService diskSpaceTasks;
private volatile boolean isEnoughSpace;

private DiskQueue(File bufferDir, boolean dontCheckEnoughDiskSpace, int fsPercentThreshold,
int gcPersistedQueueFilesIntervalSeconds, SenderStatusReporter reporter,
int checkDiskSpaceInterval, ScheduledExecutorService diskSpaceTasks)
throws LogzioParameterErrorException {

this.reporter = reporter;
queueDirectory = bufferDir;
validateParameters();
// divide bufferDir to dir and queue name
String dir = bufferDir.getAbsoluteFile().getParent();
String queueNameDir = bufferDir.getName();
if (dir == null || queueNameDir.isEmpty() ) {
throw new LogzioParameterErrorException("bufferDir", " value is empty: " + bufferDir.getAbsolutePath());
}
logsBuffer = new BigQueue(dir, queueNameDir);
this.dontCheckEnoughDiskSpace = dontCheckEnoughDiskSpace;
this.fsPercentThreshold = fsPercentThreshold;
this.diskSpaceTasks = diskSpaceTasks;
this.isEnoughSpace = true;
diskSpaceTasks.scheduleWithFixedDelay(this::gcBigQueue, 0, gcPersistedQueueFilesIntervalSeconds, TimeUnit.SECONDS);
diskSpaceTasks.scheduleWithFixedDelay(this::isEnoughSpace, 0, checkDiskSpaceInterval, TimeUnit.MILLISECONDS);
}

private void validateParameters() throws LogzioParameterErrorException {
if (queueDirectory == null) {
throw new LogzioParameterErrorException("bufferDir", "value is null.");
}
if (reporter == null) {
throw new LogzioParameterErrorException("reporter", "value is null.");
}
}

@Override
public void enqueue(byte[] log) {
if (isEnoughSpace) {
logsBuffer.enqueue(log);
}
}

@Override
public byte[] dequeue() {
return logsBuffer.dequeue();
}

@Override
public boolean isEmpty() {
return logsBuffer.isEmpty();
}

private void isEnoughSpace() {
if (dontCheckEnoughDiskSpace) {
return;
}
int actualUsedFsPercent = 100 - ((int) (((double) queueDirectory.getUsableSpace() / queueDirectory.getTotalSpace()) * 100));
if (actualUsedFsPercent >= fsPercentThreshold) {
reporter.warning(String.format("Logz.io: Dropping logs, as FS used space on %s is %d percent, and the drop threshold is %d percent",
queueDirectory.getAbsolutePath(), actualUsedFsPercent, fsPercentThreshold));
isEnoughSpace = false;
} else {
isEnoughSpace = true;
}
}

private void gcBigQueue() {
try {
logsBuffer.gc();
} catch (Throwable e) {
// We cant throw anything out, or the task will stop, so just swallow all
reporter.error("Uncaught error from BigQueue.gc()", e);
}
}

@Override
public void close() {
gcBigQueue();
}

public static class Builder {
private boolean dontCheckEnoughDiskSpace = false;
private int fsPercentThreshold = 98;
private int gcPersistedQueueFilesIntervalSeconds = 30;
private int checkDiskSpaceInterval = 1000;
private File bufferDir;
private SenderStatusReporter reporter;
private ScheduledExecutorService diskSpaceTasks;
private LogzioSender.Builder context;

Builder(LogzioSender.Builder context, ScheduledExecutorService diskSpaceTasks) {
this.context = context;
this.diskSpaceTasks = diskSpaceTasks;
}

public Builder setFsPercentThreshold(int fsPercentThreshold) {
this.fsPercentThreshold = fsPercentThreshold;
if (fsPercentThreshold == -1) {
dontCheckEnoughDiskSpace = true;
}
return this;
}

public Builder setGcPersistedQueueFilesIntervalSeconds(int gcPersistedQueueFilesIntervalSeconds) {
this.gcPersistedQueueFilesIntervalSeconds = gcPersistedQueueFilesIntervalSeconds;
return this;
}

public Builder setCheckDiskSpaceInterval(int checkDiskSpaceInterval) {
this.checkDiskSpaceInterval = checkDiskSpaceInterval;
return this;
}

public Builder setBufferDir(File bufferDir) {
this.bufferDir = bufferDir;
return this;
}

Builder setReporter(SenderStatusReporter reporter) {
this.reporter = reporter;
return this;
}

Builder setDiskSpaceTasks(ScheduledExecutorService diskSpaceTasks) {
this.diskSpaceTasks = diskSpaceTasks;
return this;
}

public LogzioSender.Builder EndDiskQueue() {
context.setDiskQueueBuilder(this);
return context;
}

DiskQueue build() throws LogzioParameterErrorException {
return new DiskQueue(bufferDir, dontCheckEnoughDiskSpace, fsPercentThreshold,
gcPersistedQueueFilesIntervalSeconds, reporter, checkDiskSpaceInterval, diskSpaceTasks);
}
}

public static Builder builder(LogzioSender.Builder context, ScheduledExecutorService diskSpaceTasks){
return new Builder(context, diskSpaceTasks);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@

import java.net.MalformedURLException;
import java.net.URL;
import java.util.logging.Logger;

import static java.util.Objects.requireNonNull;

public class HttpsRequestConfiguration {
private final int initialWaitBeforeRetryMS;
Expand All @@ -17,6 +14,7 @@ public class HttpsRequestConfiguration {
private final String logzioToken;
private final String logzioType;
private final URL logzioListenerUrl;
private final boolean compressRequests;

public int getInitialWaitBeforeRetryMS() {
return initialWaitBeforeRetryMS;
Expand Down Expand Up @@ -54,22 +52,39 @@ public boolean isCompressRequests() {
return compressRequests;
}

private final boolean compressRequests;

private URL createURL(String url) throws MalformedURLException {
if (url == null || url.isEmpty()) {
throw new MalformedURLException();
}
return logzioType == null ?
new URL(url + "/?token=" + logzioToken) :
new URL(url + "/?token=" + logzioToken + "&type=" + logzioType);
}

private HttpsRequestConfiguration(String logzioToken,
int maxRetriesAttempts, int initialWaitBeforeRetryMS, int socketTimeout,
int connectTimeout, String requestMethod, URL logzioListenerUrl,
int connectTimeout, String requestMethod, String logzioListenerUrl,
boolean compressRequests, String logzioType) throws LogzioParameterErrorException {
this.maxRetriesAttempts = maxRetriesAttempts;
this.initialWaitBeforeRetryMS = initialWaitBeforeRetryMS;
this.socketTimeout = socketTimeout;
this.connectTimeout = connectTimeout;
this.requestMethod = requestMethod;
this.logzioListenerUrl = logzioListenerUrl;

if (logzioToken == null || logzioToken.isEmpty()) {
throw new LogzioParameterErrorException("logzioToken = " + logzioToken, "logzioToken can't be empty string or null ");
}

this.logzioToken = logzioToken;
this.compressRequests = compressRequests;
this.logzioType = logzioType;

try {
this.logzioListenerUrl = createURL(logzioListenerUrl);
} catch (MalformedURLException e){
throw new LogzioParameterErrorException("logzioUrl=" + logzioListenerUrl + " token=" + logzioToken
+ " type=" + logzioType, "For some reason could not initialize URL. Cant recover.." + e);
}
}

public static Builder builder() { return new Builder(); }
Expand All @@ -84,7 +99,6 @@ public static class Builder {
private String logzioListenerUrl = "https://listener.logz.io:8071";
private String logzioToken;
private boolean compressRequests = false;
private static final Logger LOGGER = Logger.getLogger( Builder.class.getName() );

public Builder setLogzioToken(String logzioToken){
this.logzioToken = logzioToken;
Expand Down Expand Up @@ -121,9 +135,6 @@ public Builder setLogzioType(String logzioType){
}

public Builder setLogzioListenerUrl(String logzioListenerUrl){
if (logzioListenerUrl.equals(this.logzioListenerUrl)) {
return this;
}
this.logzioListenerUrl = logzioListenerUrl;
return this;
}
Expand All @@ -133,29 +144,15 @@ public Builder setCompressRequests(boolean compressRequests) {
return this;
}

private URL createURL(String url) throws MalformedURLException {
return logzioType == null ?
new URL(url + "/?token=" + logzioToken) :
new URL(url + "/?token=" + logzioToken + "&type=" + logzioType);
}

public HttpsRequestConfiguration build() throws LogzioParameterErrorException {
URL url;
try {
url = createURL(logzioListenerUrl);
} catch (MalformedURLException e){
LOGGER.severe("Can't connect to Logzio: " + e.getMessage());
throw new LogzioParameterErrorException("logzioUrl=" + logzioListenerUrl + " token=" + logzioToken
+ " type=" + logzioType, "For some reason could not initialize URL. Cant recover.." + e);
}
return new HttpsRequestConfiguration(
requireNonNull(logzioToken, "logzioToken can't be null"),
logzioToken,
maxRetriesAttempts,
initialWaitBeforeRetryMS,
socketTimeout,
connectTimeout,
requestMethod,
url,
logzioListenerUrl,
compressRequests,
logzioType);
}
Expand Down
Loading

0 comments on commit 4fc9123

Please sign in to comment.