Connector is used to read from a RabbitMQ Queue or Topic.
Importance: High
Type: String
Kafka topic to write the messages to.
Importance: High
Type: List
rabbitmq.queue
Importance: High
Type: String
Default Value: localhost
The RabbitMQ host to connect to. See ConnectionFactory.setHost(java.lang.String) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setHost-java.lang.String->
_
Importance: High
Type: String
Default Value: guest
The password to authenticate to RabbitMQ with. See ConnectionFactory.setPassword(java.lang.String) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setPassword-java.lang.String->
_
Importance: High
Type: String
Default Value: guest
The username to authenticate to RabbitMQ with. See ConnectionFactory.setUsername(java.lang.String) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setUsername-java.lang.String->
_
Importance: High
Type: String
Default Value: /
The virtual host to use when connecting to the broker. See ConnectionFactory.setVirtualHost(java.lang.String) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setVirtualHost-java.lang.String->
_
Importance: Medium
Type: Int
Default Value: 5672
The RabbitMQ port to connect to. See ConnectionFactory.setPort(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setPort-int->
_
Importance: Medium
Type: Int
Default Value: 0
Maximum number of messages that the server will deliver, 0 if unlimited. See Channel.basicQos(int, boolean) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Channel.html#basicQos-int-boolean->
_
Importance: Medium
Type: Boolean
Default Value: false
True if the settings should be applied to the entire channel rather than each consumer. See Channel.basicQos(int, boolean) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Channel.html#basicQos-int-boolean->
_
Importance: Low
Type: Boolean
Default Value: true
Enables or disables automatic connection recovery. See ConnectionFactory.setAutomaticRecoveryEnabled(boolean) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setAutomaticRecoveryEnabled-boolean->
_
Importance: Low
Type: Int
Default Value: 60000
Connection TCP establishment timeout in milliseconds. zero for infinite. See ConnectionFactory.setConnectionTimeout(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setConnectionTimeout-int->
_
Importance: Low
Type: Int
Default Value: 10000
The AMQP0-9-1 protocol handshake timeout, in milliseconds. See ConnectionFactory.setHandshakeTimeout(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setHandshakeTimeout-int->
_
Importance: Low
Type: Int
Default Value: 10000
See ConnectionFactory.setNetworkRecoveryInterval(long) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setNetworkRecoveryInterval-long->
_
Importance: Low
Type: Int
Default Value: 0
Initially requested maximum channel number. Zero for unlimited. See ConnectionFactory.setRequestedChannelMax(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setRequestedChannelMax-int->
_
Importance: Low
Type: Int
Default Value: 0
Initially requested maximum frame size, in octets. Zero for unlimited. See ConnectionFactory.setRequestedFrameMax(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setRequestedFrameMax-int->
_
Importance: Low
Type: Int
Default Value: 60
Set the requested heartbeat timeout. Heartbeat frames will be sent at about 1/2 the timeout interval. If server heartbeat timeout is configured to a non-zero value, this method can only be used to lower the value; otherwise any value provided by the client will be used. See ConnectionFactory.setRequestedHeartbeat(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setRequestedHeartbeat-int->
_
Importance: Low
Type: Int
Default Value: 10000
Set the shutdown timeout. This is the amount of time that Consumer implementations have to continue working through deliveries (and other Consumer callbacks) after the connection has closed but before the ConsumerWorkService is torn down. If consumers exceed this timeout then any remaining queued deliveries (and other Consumer callbacks, including the Consumer's handleShutdownSignal() invocation) will be lost. See ConnectionFactory.setShutdownTimeout(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setShutdownTimeout-int->
_
Importance: Low
Type: Boolean
Default Value: true
Enables or disables topology recovery. See ConnectionFactory.setTopologyRecoveryEnabled(boolean) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setTopologyRecoveryEnabled-boolean->
_
This configuration is used typically along with standalone mode.
name=RabbitMQSourceConnector1
connector.class=com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector
tasks.max=1
kafka.topic=< Required Configuration >
rabbitmq.queue=< Required Configuration >
This configuration is used typically along with distributed mode.
Write the following json to connector.json
, configure all of the required values, and use the command below to
post the configuration to one the distributed connect worker(s).
{
"config" : {
"name" : "RabbitMQSourceConnector1",
"connector.class" : "com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSourceConnector",
"tasks.max" : "1",
"kafka.topic" : "< Required Configuration >",
"rabbitmq.queue" : "< Required Configuration >"
}
}
Use curl to post the configuration to one of the Kafka Connect Workers. Change http://localhost:8083/
the the endpoint of
one of your Kafka Connect worker(s).
Create a new instance.
curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
Update an existing instance.
curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/TestSinkConnector1/config
Connector is used to read data from a Kafka topic and publish it on a RabbitMQ exchange and routing key pair.
Importance: High
Type: String
exchange to publish the messages on.
Importance: High
Type: String
routing key used for publishing the messages.
Importance: High
Type: String
Kafka topic to read the messages from.
Importance: High
Type: String
Default Value: localhost
The RabbitMQ host to connect to. See ConnectionFactory.setHost(java.lang.String) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setHost-java.lang.String->
_
Importance: High
Type: String
Default Value: guest
The password to authenticate to RabbitMQ with. See ConnectionFactory.setPassword(java.lang.String) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setPassword-java.lang.String->
_
Importance: High
Type: String
Default Value: guest
The username to authenticate to RabbitMQ with. See ConnectionFactory.setUsername(java.lang.String) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setUsername-java.lang.String->
_
Importance: High
Type: String
Default Value: /
The virtual host to use when connecting to the broker. See ConnectionFactory.setVirtualHost(java.lang.String) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setVirtualHost-java.lang.String->
_
Importance: Medium
Type: Int
Default Value: 5672
The RabbitMQ port to connect to. See ConnectionFactory.setPort(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setPort-int->
_
Importance: Low
Type: Boolean
Default Value: true
Enables or disables automatic connection recovery. See ConnectionFactory.setAutomaticRecoveryEnabled(boolean) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setAutomaticRecoveryEnabled-boolean->
_
Importance: Low
Type: Int
Default Value: 60000
Connection TCP establishment timeout in milliseconds. zero for infinite. See ConnectionFactory.setConnectionTimeout(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setConnectionTimeout-int->
_
Importance: Low
Type: Int
Default Value: 10000
The AMQP0-9-1 protocol handshake timeout, in milliseconds. See ConnectionFactory.setHandshakeTimeout(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setHandshakeTimeout-int->
_
Importance: Low
Type: Int
Default Value: 10000
See ConnectionFactory.setNetworkRecoveryInterval(long) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setNetworkRecoveryInterval-long->
_
Importance: Low
Type: Int
Default Value: 0
Initially requested maximum channel number. Zero for unlimited. See ConnectionFactory.setRequestedChannelMax(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setRequestedChannelMax-int->
_
Importance: Low
Type: Int
Default Value: 0
Initially requested maximum frame size, in octets. Zero for unlimited. See ConnectionFactory.setRequestedFrameMax(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setRequestedFrameMax-int->
_
Importance: Low
Type: Int
Default Value: 60
Set the requested heartbeat timeout. Heartbeat frames will be sent at about 1/2 the timeout interval. If server heartbeat timeout is configured to a non-zero value, this method can only be used to lower the value; otherwise any value provided by the client will be used. See ConnectionFactory.setRequestedHeartbeat(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setRequestedHeartbeat-int->
_
Importance: Low
Type: Int
Default Value: 10000
Set the shutdown timeout. This is the amount of time that Consumer implementations have to continue working through deliveries (and other Consumer callbacks) after the connection has closed but before the ConsumerWorkService is torn down. If consumers exceed this timeout then any remaining queued deliveries (and other Consumer callbacks, including the Consumer's handleShutdownSignal() invocation) will be lost. See ConnectionFactory.setShutdownTimeout(int) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setShutdownTimeout-int->
_
Importance: Low
Type: Boolean
Default Value: true
Enables or disables topology recovery. See ConnectionFactory.setTopologyRecoveryEnabled(boolean) <https://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/ConnectionFactory.html#setTopologyRecoveryEnabled-boolean->
_
This configuration is used typically along with standalone mode.
name=RabbitMQSinkConnector1
connector.class=com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSinkConnector
tasks.max=1
topics=< Required Configuration >
rabbitmq.exchange=< Required Configuration >
rabbitmq.routing.key=< Required Configuration >
topics=< Required Configuration >
This configuration is used typically along with distributed mode.
Write the following json to connector.json
, configure all of the required values, and use the command below to
post the configuration to one the distributed connect worker(s).
{
"config" : {
"name" : "RabbitMQSinkConnector1",
"connector.class" : "com.github.jcustenborder.kafka.connect.rabbitmq.RabbitMQSinkConnector",
"tasks.max" : "1",
"topics" : "< Required Configuration >",
"rabbitmq.exchange" : "< Required Configuration >",
"rabbitmq.routing.key" : "< Required Configuration >"
}
}
Use curl to post the configuration to one of the Kafka Connect Workers. Change http://localhost:8083/
the the endpoint of
one of your Kafka Connect worker(s).
Create a new instance.
curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
Update an existing instance.
curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/TestSinkConnector1/config
This transformation is used to extract a header from the message and use it as a key.
Importance: High
Type: String
Header name.
This configuration is used typically along with standalone mode.
name=Connector1
connector.class=org.apache.kafka.some.SourceConnector
tasks.max=1
transforms=tran
transforms.tran.type=com.github.jcustenborder.kafka.connect.rabbitmq.ExtractHeader$Key
transforms.tran.header.name=< Required Configuration >
This configuration is used typically along with distributed mode.
Write the following json to connector.json
, configure all of the required values, and use the command below to
post the configuration to one the distributed connect worker(s).
{
"name" : "Connector1",
"connector.class" : "org.apache.kafka.some.SourceConnector",
"transforms" : "tran",
"transforms.tran.type" : "com.github.jcustenborder.kafka.connect.rabbitmq.ExtractHeader$Key",
"transforms.tran.header.name" : "< Required Configuration >"
}
Use curl to post the configuration to one of the Kafka Connect Workers. Change http://localhost:8083/
the the endpoint of
one of your Kafka Connect worker(s).
Create a new instance.
curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
Update an existing instance.
curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/TestSinkConnector1/config
This transformation is used to extract a header from the message and use it as a value.
Importance: High
Type: String
Header name.
This configuration is used typically along with standalone mode.
name=Connector1
connector.class=org.apache.kafka.some.SourceConnector
tasks.max=1
transforms=tran
transforms.tran.type=com.github.jcustenborder.kafka.connect.rabbitmq.ExtractHeader$Value
transforms.tran.header.name=< Required Configuration >
This configuration is used typically along with distributed mode.
Write the following json to connector.json
, configure all of the required values, and use the command below to
post the configuration to one the distributed connect worker(s).
{
"name" : "Connector1",
"connector.class" : "org.apache.kafka.some.SourceConnector",
"transforms" : "tran",
"transforms.tran.type" : "com.github.jcustenborder.kafka.connect.rabbitmq.ExtractHeader$Value",
"transforms.tran.header.name" : "< Required Configuration >"
}
Use curl to post the configuration to one of the Kafka Connect Workers. Change http://localhost:8083/
the the endpoint of
one of your Kafka Connect worker(s).
Create a new instance.
curl -s -X POST -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors
Update an existing instance.
curl -s -X PUT -H 'Content-Type: application/json' --data @connector.json http://localhost:8083/connectors/TestSinkConnector1/config