Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial support for GELF HTTP transport #37

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -93,18 +93,25 @@
<gpg.keyname>B1606F22</gpg.keyname>

<slf4j.version>1.7.25</slf4j.version>
<netty.version>4.1.42.Final</netty.version>
<jackson.version>2.8.11</jackson.version>
</properties>

<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>4.1.42.Final</version>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.8.11</version>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
Expand All @@ -129,6 +136,12 @@
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/org/graylog2/gelfclient/GelfConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@

import java.io.File;
import java.net.InetSocketAddress;
import java.net.URI;

/**
* The configuration used by a {@link org.graylog2.gelfclient.transport.GelfTransport}.
*/
public class GelfConfiguration {
private static final int DEFAULT_PORT = 12201;
private static final String DEFAULT_HOSTNAME = "127.0.0.1";

private final String hostname;
private final int port;
private URI uri = URI.create("http://127.0.0.1:12201/gelf");
private GelfTransports transport = GelfTransports.TCP;
private Compression compression = Compression.GZIP;
private int queueSize = 512;
Expand Down Expand Up @@ -114,6 +117,15 @@ public InetSocketAddress getRemoteAddress() {
return new InetSocketAddress(hostname, port);
}

public URI getUri() {
return uri;
}

public GelfConfiguration uri(URI uri) {
this.uri = uri;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The uri is set but it's not used in #getRemoteAddress(). We should probably check if the uri field is set and use it as an override for the hostname and port fields.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The getUri() and uri() methods are also missing javadoc.

return this;
}

/**
* Get the transport protocol used with the GELF server.
*
Expand Down
7 changes: 6 additions & 1 deletion src/main/java/org/graylog2/gelfclient/GelfTransports.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.graylog2.gelfclient;

import org.graylog2.gelfclient.transport.GelfHttpTransport;
import org.graylog2.gelfclient.transport.GelfTcpTransport;
import org.graylog2.gelfclient.transport.GelfTransport;
import org.graylog2.gelfclient.transport.GelfUdpTransport;
Expand All @@ -25,7 +26,8 @@
*/
public enum GelfTransports {
TCP,
UDP;
UDP,
HTTP;

/**
* Creates a {@link GelfTransport} from the given protocol and configuration.
Expand All @@ -44,6 +46,9 @@ public static GelfTransport create(final GelfTransports transport, final GelfCon
case UDP:
gelfTransport = new GelfUdpTransport(config);
break;
case HTTP:
gelfTransport = new GelfHttpTransport(config);
break;
default:
throw new IllegalArgumentException("Unsupported GELF transport: " + transport);
}
Expand Down
44 changes: 44 additions & 0 deletions src/main/java/org/graylog2/gelfclient/encoder/GelfHttpEncoder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package org.graylog2.gelfclient.encoder;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
import java.util.List;

public class GelfHttpEncoder extends MessageToMessageEncoder<ByteBuf> {
private static final Logger LOG = LoggerFactory.getLogger(GelfHttpEncoder.class);

private final URI uri;

public GelfHttpEncoder(URI uri) {
this.uri = uri;
}

@Override
protected void encode(ChannelHandlerContext channelHandlerContext, ByteBuf msg, List<Object> list) throws Exception {
final FullHttpRequest request = new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1, HttpMethod.POST, uri.getRawPath(), msg.retain());
request.headers().set(HttpHeaderNames.HOST, uri.getHost());
request.headers().set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON);
request.headers().set(HttpHeaderNames.CONTENT_LENGTH, msg.readableBytes());
request.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);

list.add(request);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
LOG.error("Error while encoding HTTP request", cause);
ctx.close();
}
}
135 changes: 135 additions & 0 deletions src/main/java/org/graylog2/gelfclient/transport/GelfHttpTransport.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright 2018 Graylog, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.graylog2.gelfclient.transport;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import org.graylog2.gelfclient.GelfConfiguration;
import org.graylog2.gelfclient.encoder.GelfHttpEncoder;
import org.graylog2.gelfclient.encoder.GelfMessageJsonEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A {@link GelfTransport} implementation that uses HTTP(S) to send GELF messages.
* <p>This class is thread-safe.</p>
*/
public class GelfHttpTransport extends AbstractGelfTransport {
private static final Logger LOG = LoggerFactory.getLogger(GelfHttpTransport.class);

/**
* Creates a new TCP GELF transport.
*
* @param config the GELF client configuration
*/
public GelfHttpTransport(GelfConfiguration config) {
super(config);
}

@Override
protected void createBootstrap(final EventLoopGroup workerGroup) {
final Bootstrap bootstrap = new Bootstrap();
final GelfSenderThread senderThread = new GelfSenderThread(queue, config.getMaxInflightSends());
senderThreadReference.set(senderThread);

bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getConnectTimeout())
.option(ChannelOption.TCP_NODELAY, config.isTcpNoDelay())
.option(ChannelOption.SO_KEEPALIVE, config.isTcpKeepAlive())
.remoteAddress(config.getRemoteAddress())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not taking the GelfConfiguration#getUri() configuration option into account. (see comment in GelfConfiguration.java)

.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
if (config.isTlsEnabled()) {
LOG.debug("TLS enabled.");
final SslContext sslContext;

if (!config.isTlsCertVerificationEnabled()) {
// If the cert should not be verified just use an insecure trust manager.
LOG.debug("TLS certificate verification disabled!");
sslContext = SslContextBuilder.forClient()
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.build();
} else if (config.getTlsTrustCertChainFile() != null) {
// If a cert chain file is set, use it.
LOG.debug("TLS certificate chain file: {}", config.getTlsTrustCertChainFile());
sslContext = SslContextBuilder.forClient()
.trustManager(config.getTlsTrustCertChainFile())
.build();
} else {
// Otherwise use the JVM default cert chain.
sslContext = SslContextBuilder.forClient().build();
}

ch.pipeline().addLast(sslContext.newHandler(ch.alloc()));
}

ch.pipeline().addLast(new HttpClientCodec());
ch.pipeline().addLast(new HttpContentDecompressor());
ch.pipeline().addLast(new GelfHttpEncoder(config.getUri()));
ch.pipeline().addLast(new GelfMessageJsonEncoder());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
senderThread.start(ctx.channel());
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
LOG.info("Channel disconnected!");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see lots of these errors when running against a real HTTP server and it also drops messages.

[gelfHttpTransport-1-1] INFO org.graylog2.gelfclient.transport.GelfHttpTransport - Channel disconnected!
[gelfHttpTransport-1-1] ERROR org.graylog2.gelfclient.encoder.GelfHttpEncoder - Error while encoding HTTP request
java.io.IOException: Connection reset by peer
	at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
	at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:247)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1147)
	at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:347)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
	at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
[gelfHttpTransport-1-1] INFO org.graylog2.gelfclient.transport.GelfHttpTransport - Channel disconnected!
[gelfHttpTransport-1-1] INFO org.graylog2.gelfclient.transport.GelfHttpTransport - Channel disconnected!
[gelfHttpTransport-1-1] ERROR org.graylog2.gelfclient.encoder.GelfHttpEncoder - Error while encoding HTTP request
java.io.IOException: Connection reset by peer
	at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
	at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:247)
	at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1147)
	at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:347)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:700)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:635)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:552)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:514)
	at io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1044)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
[gelfHttpTransport-1-1] INFO org.graylog2.gelfclient.transport.GelfHttpTransport - Channel disconnected!
[gelfHttpTransport-1-1] INFO org.graylog2.gelfclient.transport.GelfHttpTransport - Channel disconnected!

senderThread.stop();
scheduleReconnect(ctx.channel().eventLoop());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
LOG.error("Exception caught", cause);
}
});
}
});

if (config.getSendBufferSize() != -1) {
bootstrap.option(ChannelOption.SO_SNDBUF, config.getSendBufferSize());
}

bootstrap.connect().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
LOG.debug("Connected!");
} else {
LOG.error("Connection failed: {}", future.cause().getMessage());
scheduleReconnect(future.channel().eventLoop());
}
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright 2018 Graylog, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.graylog2.gelfclient.encoder;

import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.EncoderException;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import org.testng.annotations.Test;

import java.net.URI;
import java.nio.charset.StandardCharsets;

import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertTrue;

public class GelfHttpEncoderTest {

@Test(expectedExceptions = EncoderException.class)
public void testExceptionIsPassedThrough() throws Exception {
final EmbeddedChannel channel = new EmbeddedChannel(new GelfHttpEncoder(null));
channel.writeOutbound(Unpooled.EMPTY_BUFFER);
}

@Test
public void testEncode() throws Exception {
final URI uri = URI.create("http://example.org:8080/gelf");
final EmbeddedChannel channel = new EmbeddedChannel(new GelfHttpEncoder(uri));
assertTrue(channel.writeOutbound(Unpooled.copiedBuffer("{}", StandardCharsets.UTF_8)));
assertTrue(channel.finish());

final FullHttpRequest request = channel.readOutbound();
assertEquals(HttpMethod.POST, request.method());
assertEquals("/gelf", request.uri());
assertEquals("application/json", request.headers().get(HttpHeaderNames.CONTENT_TYPE));
assertEquals("2", request.headers().get(HttpHeaderNames.CONTENT_LENGTH));

final byte[] bytes = ByteBufUtil.getBytes(request.content());
assertEquals(new byte[]{'{', '}'}, bytes);
}
}
Loading