Skip to content

Commit

Permalink
Merge pull request #6 from nmervaillie/neo4j-4-compatibility
Browse files Browse the repository at this point in the history
Neo4j 4 compatibility
  • Loading branch information
mariussturm authored Feb 25, 2021
2 parents 8a76a2e + 4af1c1c commit 1cf25dc
Show file tree
Hide file tree
Showing 13 changed files with 600 additions and 150 deletions.
30 changes: 30 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
# Graylog Neo4jOutput Plugin (Experimental)

**Required Graylog version:** 2.0 and later
**Required Neo4j version:** 3.5 and later

Breaking changes between version 2.3 and 4.0
--------------------------------------------

* 4.0 is only compatible with Neo4j 3.5+
* The HTTP url format changed (they are different between Neo4j 3.5 and Neo4j 4.x)
* The query parameters now uses Neo4j standard format `$paramName`

Limitations:
* does not support neo4j native types in query parameters (number, dates, booleans).
The parameters have to be converted into the correct type if needed using cypher functions.
* no multiple database support

Installation
------------
Expand All @@ -21,3 +34,20 @@ This project is using Maven and requires Java 8 or higher.
* Optional: Run `mvn jdeb:jdeb` and `mvn rpm:rpm` to create a DEB and RPM package respectively.
* Copy generated JAR file in target directory to your Graylog plugin directory.
* Restart the Graylog.

Testing in a graylog instance
-----------------------------

* Build the plugin jar using `mvn package -DskipTests`
* Start the graylog and databases with `docker-compose up`
* Configure the plugin. See `test.http` or do it manually:
- connect to http://localhost:9000/system/outputs
- create a neo4j output
- connect to http://localhost:9000/streams/000000000000000000000001/outputs
- assign the neo4j output to the All message stream
- create a new GELF HTTP input here http://localhost:9000/system/inputs
* Send some test data

`curl -XPOST http://localhost:12201/gelf -p0 -d '{"short_message":"Hello there", "host":"example.org", "facility":"test", "source":"1.2.3.4", "user_id":"foo"}'`

* Check the nodes are created with correct properties in Neo4j
56 changes: 56 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
version: '2'
services:
# MongoDB: https://hub.docker.com/_/mongo/
mongodb:
image: mongo:3
neo4j:
image: 'neo4j:4.2.2'
environment:
- NEO4J_AUTH=neo4j/password
ports:
- 7474:7474
- 7687:7687
# Elasticsearch: https://www.elastic.co/guide/en/elasticsearch/reference/6.x/docker.html
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch-oss:6.5.4
environment:
- http.host=0.0.0.0
- transport.host=localhost
- network.host=0.0.0.0
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ulimits:
memlock:
soft: -1
hard: -1
mem_limit: 1g
# Graylog: https://hub.docker.com/r/graylog/graylog/
graylog:
image: graylog/graylog:2.5
environment:
# CHANGE ME (must be at least 16 characters)!
- GRAYLOG_PASSWORD_SECRET=somepasswordpepper
# Password: admin
- GRAYLOG_ROOT_PASSWORD_SHA2=8c6976e5b5410415bde908bd4dee15dfb167a9c873fc4bb8a81f6f2ab448a918
- GRAYLOG_WEB_ENDPOINT_URI=http://127.0.0.1:9000/api
links:
- mongodb:mongo
- elasticsearch
- neo4j
depends_on:
- mongodb
- elasticsearch
- neo4j
ports:
# Graylog web interface and REST API
- 9000:9000
# Syslog TCP
- 514:514
# Syslog UDP
- 514:514/udp
# GELF TCP
- 12201:12201
# GELF UDP
- 12201:12201/udp
volumes:
# Mount local plugin file into Docker container
- ./target/plugin-output-neo4j-4.0.0-SNAPSHOT.jar:/usr/share/graylog/plugin/plugin-output-neo4j-4.0.0.jar
27 changes: 22 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>org.graylog.plugins</groupId>
<artifactId>plugin-output-neo4j</artifactId>
<version>2.3.0-SNAPSHOT</version>
<version>4.0.0-SNAPSHOT</version>
<packaging>jar</packaging>

<name>${project.artifactId}</name>
Expand All @@ -15,9 +15,9 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<graylog2.version>2.3.0</graylog2.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<graylog2.version>2.5.1</graylog2.version>
<graylog2.plugin-dir>/usr/share/graylog-server/plugin</graylog2.plugin-dir>
</properties>

Expand All @@ -27,6 +27,12 @@
<artifactId>graylog2-server</artifactId>
<version>${graylog2.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.core</groupId>
Expand All @@ -51,12 +57,23 @@
<dependency>
<groupId>org.neo4j.driver</groupId>
<artifactId>neo4j-java-driver</artifactId>
<version>1.4.3</version>
<!-- Compatible with all supported Neo4j version (3.5+ atm) -->
<version>4.2.1</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<version>5.7.0</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin> <!-- Required to get the tests running -->
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.0</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
Expand Down
34 changes: 17 additions & 17 deletions src/main/java/org/graylog/plugins/outputs/neo4j/Neo4jOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.google.inject.assistedinject.Assisted;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.graylog.plugins.outputs.neo4j.transport.INeo4jTransport;
import org.graylog.plugins.outputs.neo4j.transport.Neo4jTransports;
import org.graylog2.plugin.Message;
Expand All @@ -17,16 +21,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;

public class Neo4jOutput implements MessageOutput {
private static final Logger LOG = LoggerFactory.getLogger(Neo4jOutput.class);

private Configuration configuration;
private final AtomicBoolean isRunning = new AtomicBoolean(false);
private INeo4jTransport transport;
private final INeo4jTransport transport;

public static final String CK_PROTOCOL = "neo4j_protocol";
public static final String CK_NEO4J_URL = "neo4j_url";
Expand All @@ -37,9 +36,8 @@ public class Neo4jOutput implements MessageOutput {

@Inject
public Neo4jOutput(@Assisted Stream stream, @Assisted Configuration config) throws MessageOutputConfigurationException {
configuration = config;
final Neo4jTransports transportSelection;
switch (configuration.getString(CK_PROTOCOL).toUpperCase(Locale.ENGLISH)) {
Neo4jTransports transportSelection;
switch (config.getString(CK_PROTOCOL).toUpperCase(Locale.ENGLISH)) {
case "BOLT":
transportSelection = Neo4jTransports.BOLT;
break;
Expand All @@ -48,11 +46,11 @@ public Neo4jOutput(@Assisted Stream stream, @Assisted Configuration config) thro
break;

default:
throw new MessageOutputConfigurationException("Unknown protocol " + configuration.getString(CK_PROTOCOL));
throw new MessageOutputConfigurationException("Unknown protocol " + config.getString(CK_PROTOCOL));
}

try {
transport = Neo4jTransports.create(transportSelection, configuration);
transport = Neo4jTransports.create(transportSelection, config);
} catch (Exception e) {
final String error = "Error initializing " + INeo4jTransport.class;
LOG.error(error, e);
Expand Down Expand Up @@ -114,8 +112,10 @@ public ConfigurationRequest getRequestedConfiguration() {
ConfigurationField.Optional.NOT_OPTIONAL));

configurationRequest.addField(new TextField(
CK_NEO4J_URL, "Neo4j URL", "http://localhost:7474/db/data",
"default for Bolt: bolt://localhost:7687/",
CK_NEO4J_URL, "Neo4j URL", "http://localhost:7474/db/neo4j/tx/commit",
"HTTP URL format for Neo4j 3.5.x: http://localhost:7474/db/data/transaction/commit\n"
+ "HTTP URL format for Neo4j 4.x: http://localhost:7474/db/<dbName>/tx/commit\n"
+ "Bolt URL format: bolt://localhost:7687/",
ConfigurationField.Optional.NOT_OPTIONAL)
);

Expand All @@ -127,11 +127,11 @@ public ConfigurationRequest getRequestedConfiguration() {

configurationRequest.addField(new TextField(
CK_NEO4J_QUERY, "Cypher query",
"MERGE (source:HOST { address: '${source}' })\n" +
"MERGE (user_id:USER { user_id: '${user_id}'})\nMERGE (source)-[:CONNECT]->(user_id)",
"MERGE (source:HOST { address: $source })\n" +
"MERGE (user_id:USER { user_id: $user_id})\nMERGE (source)-[:CONNECT]->(user_id)",
"Query will be executed on every log message.\n" +
"HTTP Mode: Use template substitutions to access message fields: ${took_ms}\n" +
"Bolt Mode: Use curly brackets only to access message fields: {took_ms}",
"Queries use template substitutions to access message fields using default neo4j parameter syntax: $parameterName.\n" +
"All parameters are sent to the database as strings.",
ConfigurationField.Optional.NOT_OPTIONAL, TextField.Attribute.TEXTAREA)
);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package org.graylog.plugins.outputs.neo4j;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Neo4jStatement {

private static final Logger LOG = LoggerFactory.getLogger(Neo4jStatement.class);
private static final Pattern CYPHER_PARAMETER_REGEX = Pattern.compile("\\$([a-zA-Z0-9_]+)");

private final String statement;
private final Collection<String> parameterNames;

public Neo4jStatement(String statement) {
this.statement = statement;
Matcher m = CYPHER_PARAMETER_REGEX.matcher(statement);
parameterNames = Collections.unmodifiableSet(extractParameterNames(m));
}

public String sanitizedQuery() {
return statement.replace("\n", " ")
.replace("\r", " ");
}

public Collection<String> parameterNames() {
return parameterNames;
}

private Set<String> extractParameterNames(Matcher m) {
Set<String> params = new HashSet<>();
while (m.find()) {
params.add(m.group(1));
LOG.debug("Found field in cypher statement: " + m.group(1));
}
LOG.info("Identified " + params.size() + " fields in graph create query.");
return params;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package org.graylog.plugins.outputs.neo4j.transport;

import static java.util.stream.Collectors.toMap;

import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import org.graylog.plugins.outputs.neo4j.Neo4jStatement;
import org.graylog2.plugin.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractNeo4jTransport implements INeo4jTransport {

private static final Logger LOG = LoggerFactory.getLogger(AbstractNeo4jTransport.class);

private final Neo4jStatement statement;

public AbstractNeo4jTransport(Neo4jStatement statement) {
this.statement = statement;
}


@Override
public void send(Message message) {

Collection<String> missingFieldNames = statement.parameterNames().stream()
.filter(it -> !message.hasField(it))
.collect(Collectors.toSet());
if (!missingFieldNames.isEmpty()) {
LOG.warn("Unable to execute query because of missing parameters in context : {}", missingFieldNames);
return;
}

Map<String, Object> convertedFields = message.getFields().entrySet().stream()
.filter(it -> statement.parameterNames().contains(it.getKey()))
.collect(toMap(Entry::getKey, it -> String.valueOf(it.getValue())));

postQuery(statement.sanitizedQuery(), convertedFields);
}

protected abstract void postQuery(String queryString, Map<String, Object> parameters);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public interface INeo4jTransport {
* @param message message to send to the remote host
* @throws InterruptedException
*/
public void send(Message message) throws InterruptedException;
void send(Message message) throws InterruptedException;

/**
* Tries to send the given message to the remote host. It does <strong>not block</strong> if there is not enough
Expand All @@ -24,10 +24,10 @@ public interface INeo4jTransport {
* @param message message to send to the remote host
* @return true if the message could be dispatched, false otherwise
*/
public boolean trySend(Message message);
boolean trySend(Message message);

/**
* Stops the transport. Should be used to gracefully shutdown the backend.
*/
public void stop();
void stop();
}
Loading

0 comments on commit 1cf25dc

Please sign in to comment.