Neo4j sink connector
Write data to Neo4j.
neo4j-java-driver
version 4.4.9
name | type | required | default value |
---|---|---|---|
uri | String | Yes | - |
username | String | No | - |
password | String | No | - |
max_batch_size | Integer | No | - |
write_mode | String | No | OneByOne |
bearer_token | String | No | - |
kerberos_ticket | String | No | - |
database | String | Yes | - |
query | String | Yes | - |
queryParamPosition | Object | Yes | - |
max_transaction_retry_time | Long | No | 30 |
max_connection_timeout | Long | No | 30 |
common-options | config | no | - |
The URI of the Neo4j database. Refer to a case: neo4j://localhost:7687
username of the Neo4j
password of the Neo4j. required if username
is provided
max_batch_size refers to the maximum number of data entries that can be written in a single transaction when writing to a database.
The default value is oneByOne, or set it to "Batch" if you want to have the ability to write in batches
unwind $ttt as row create (n:Label) set n.name = row.name,n.age = rw.age
"ttt" represents a batch of data.,"ttt" can be any arbitrary string as long as it matches the configured "batch_data_variable".
base64 encoded bearer token of the Neo4j. for Auth.
base64 encoded kerberos ticket of the Neo4j. for Auth.
database name.
Query statement. contain parameter placeholders that are substituted with the corresponding values at runtime
position mapping information for query parameters.
key name is parameter placeholder name.
associated value is position of field in input data row.
maximum transaction retry time(seconds). transaction fail if exceeded
The maximum amount of time to wait for a TCP connection to be established (seconds)
Sink plugin common parameters, please refer to Sink Common Options for details
sink {
Neo4j {
uri = "neo4j://localhost:7687"
username = "neo4j"
password = "1234"
database = "neo4j"
max_transaction_retry_time = 10
max_connection_timeout = 10
query = "CREATE (a:Person {name: $name, age: $age})"
queryParamPosition = {
name = 0
age = 1
}
}
}
The unwind keyword provided by cypher supports batch writing, and the default variable for a batch of data is batch. If you write a batch write statement, then you should declare cypher:unwind $batch as row to do someting
sink {
Neo4j {
uri = "bolt://localhost:7687"
username = "neo4j"
password = "neo4j"
database = "neo4j"
max_batch_size = 1000
write_mode = "BATCH"
max_transaction_retry_time = 3
max_connection_timeout = 10
query = "unwind $batch as row create(n:MyLabel) set n.name = row.name,n.age = row.age"
}
}
- Add Neo4j Sink Connector
- Sink supports batch write