DataCarrier is a lightweight, embeddable, high-throughput, publish-subscribe in-memory message queue (MQ).
This project has been donated to the Apache Software Foundation as part of Apache SkyWalking.
Sky-Walking APM uses DataCarrier in the sky-walking agent, and one implementation version is used in the OneAPM Ai commercial edition. See the skywalking version.
- Publish-subscribe in-memory MQ. Supports multiple producers and consumers.
- Lightweight and embeddable. A mini Java library, less than 20k, with no other dependencies.
- High throughput. Used in Sky-Walking APM.
- Easy to use. Simple API.
- Requires only JDK 1.6.
- Download the latest version
- With Maven, Gradle, Ivy, SBT, etc., configure the JCenter repository
- Maven
<?xml version="1.0" encoding="UTF-8" ?>
<settings xsi:schemaLocation='http://maven.apache.org/SETTINGS/1.0.0 http://maven.apache.org/xsd/settings-1.0.0.xsd'
          xmlns='http://maven.apache.org/SETTINGS/1.0.0' xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'>
  <profiles>
    <profile>
      <repositories>
        <repository>
          <snapshots>
            <enabled>false</enabled>
          </snapshots>
          <id>central</id>
          <name>bintray</name>
          <url>http://jcenter.bintray.com</url>
        </repository>
      </repositories>
      <pluginRepositories>
        <pluginRepository>
          <snapshots>
            <enabled>false</enabled>
          </snapshots>
          <id>central</id>
          <name>bintray-plugins</name>
          <url>http://jcenter.bintray.com</url>
        </pluginRepository>
      </pluginRepositories>
      <id>bintray</id>
    </profile>
  </profiles>
  <activeProfiles>
    <activeProfile>bintray</activeProfile>
  </activeProfiles>
</settings>
<dependency>
  <groupId>com.a.eye</groupId>
  <artifactId>data-carrier</artifactId>
  <version>x.x</version>
</dependency>
- create a new DataCarrier instance
/**
* channelSize = 5, bufferSize per channel = 5000
*/
DataCarrier<SampleData> carrier = new DataCarrier<SampleData>(5, 5000);
- set message buffer strategy (optional)
/**
 * Default is BLOCKING.
 * BLOCKING: wait until the value can be written to the buffer, then return.
 * OVERRIDE: always write the value to the buffer, overwriting existing data; always returns true.
 * IF_POSSIBLE: try to write the value to the buffer; returns true only if the write succeeded.
 */
carrier.setBufferStrategy(BufferStrategy.IF_POSSIBLE);
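To make the difference between the three strategies concrete, here is a minimal sketch of how a fixed-size buffer might react when the target slot is already occupied. `StrategySketch`, its `Strategy` enum, and the `save`/`take` methods are hypothetical stand-ins written for this illustration; they are not DataCarrier's classes, and the library's actual implementation may differ.

```java
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.LockSupport;

// Hypothetical stand-in for one buffer; NOT the DataCarrier implementation.
final class StrategySketch {
    enum Strategy { BLOCKING, OVERRIDE, IF_POSSIBLE }

    private final AtomicReferenceArray<Object> slots;
    private int next = 0; // single producer assumed, to keep the sketch short

    StrategySketch(int size) {
        slots = new AtomicReferenceArray<Object>(size);
    }

    // Writes data into the next slot in ring order, following the strategy.
    boolean save(Object data, Strategy strategy) {
        int i = next;
        next = (next + 1) % slots.length();
        switch (strategy) {
            case OVERRIDE:
                slots.set(i, data); // overwrite whatever is in the slot
                return true;
            case IF_POSSIBLE:
                // succeed only if the slot is empty, otherwise give up
                return slots.compareAndSet(i, null, data);
            default: // BLOCKING
                while (!slots.compareAndSet(i, null, data)) {
                    LockSupport.parkNanos(1000000L); // wait for a consumer to free the slot
                }
                return true;
        }
    }

    // Consumer side: removes and returns the value in slot i (null if empty).
    Object take(int i) {
        return slots.getAndSet(i, null);
    }

    public static void main(String[] args) {
        StrategySketch buffer = new StrategySketch(1);
        System.out.println(buffer.save("a", Strategy.IF_POSSIBLE)); // slot was empty
        System.out.println(buffer.save("b", Strategy.IF_POSSIBLE)); // slot is occupied
        System.out.println(buffer.save("c", Strategy.OVERRIDE));    // replaces "a"
        System.out.println(buffer.take(0));
    }
}
```

In this sketch only IF_POSSIBLE can return false, so a producer using it should check the return value and decide whether to drop or retry the message.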
- set partitioner (optional)
/**
 * Default is SimpleRollingPartitioner.
 * Provided: ProducerThreadPartitioner, SimpleRollingPartitioner.
 * You can create your own partitioner by implementing the IDataPartitioner interface.
 */
carrier.setPartitioner(new ProducerThreadPartitioner<SampleData>());
See the partitioner implementations.
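As a rough illustration of what the two provided partitioners do, the sketch below picks a channel index either in round-robin order or from the producing thread's id. The `Partitioner` interface and its `partition(int total, T data)` method are assumptions made for this example; check the real `IDataPartitioner` interface for the exact signature before implementing your own.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class PartitionerSketch {
    // Hypothetical interface; the real IDataPartitioner may declare different methods.
    interface Partitioner<T> {
        // Returns a channel index in the range [0, total).
        int partition(int total, T data);
    }

    // Round-robin: each message goes to the next channel in turn.
    static final class Rolling<T> implements Partitioner<T> {
        private final AtomicInteger counter = new AtomicInteger();

        public int partition(int total, T data) {
            // mask the sign bit so the index stays non-negative after overflow
            return (counter.getAndIncrement() & Integer.MAX_VALUE) % total;
        }
    }

    // Thread affinity: the same producer thread always maps to the same channel.
    static final class ByThread<T> implements Partitioner<T> {
        public int partition(int total, T data) {
            return (int) (Thread.currentThread().getId() % total);
        }
    }

    public static void main(String[] args) {
        Partitioner<String> rolling = new Rolling<String>();
        for (int i = 0; i < 4; i++) {
            System.out.println(rolling.partition(3, "msg")); // 0, 1, 2, 0
        }
        Partitioner<String> byThread = new ByThread<String>();
        System.out.println(byThread.partition(5, "msg") == byThread.partition(5, "other"));
    }
}
```

Round-robin spreads load evenly across channels, while thread affinity keeps each producer's messages in one channel, which can reduce contention between producer threads.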
- set consumers and start consuming data
/**
 * Set consumers for this carrier.
 * Consumers begin to run when {@link DataCarrier<T>#produce(T)} begins to work.
 *
 * @param consumerClass class of consumer
 * @param num number of consumer threads
 */
carrier.consume(SampleConsumer.class, 10);
or
/**
 * Set consumers for this carrier.
 * Consumers begin to run when {@link DataCarrier<T>#produce(T)} begins to work.
 *
 * @param consumer a single consumer instance, shared by all consumer threads.
 * @param num number of consumer threads
 * @return
 */
carrier.consume(consumer, 10);
- create a consumer (sample)
import java.util.List;

public class SampleConsumer implements IConsumer<SampleData> {
    public int i = 1;

    @Override
    public void init() {
    }

    // Called with a batch of messages taken from the buffer.
    @Override
    public void consume(List<SampleData> data) {
        for (SampleData one : data) {
            one.setIntValue(this.hashCode());
            ConsumerTest.buffer.offer(one);
        }
    }

    // Called when consume(...) fails for the given batch.
    @Override
    public void onError(List<SampleData> data, Throwable t) {
    }

    @Override
    public void onExit() {
    }
}
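The callback shape of the sample consumer (a batch passed to `consume`, failures passed to `onError`) suggests a drain-and-dispatch loop on the consumer side. The sketch below shows one single-threaded pass of such a loop under that assumption. `ConsumeLoopSketch`, its `Consumer` interface, and `drainOnce` are hypothetical names invented for this example; the real consumer threads in DataCarrier may be structured differently.

```java
import java.util.ArrayList;
import java.util.List;

final class ConsumeLoopSketch {
    // Hypothetical mirror of two of the IConsumer callbacks; not the library interface.
    interface Consumer<T> {
        void consume(List<T> data);

        void onError(List<T> data, Throwable t);
    }

    // Collects every non-null slot into one batch, frees the slots,
    // and hands the batch to the consumer. Returns the batch size.
    static <T> int drainOnce(T[] slots, Consumer<T> consumer) {
        List<T> batch = new ArrayList<T>();
        for (int i = 0; i < slots.length; i++) {
            if (slots[i] != null) {
                batch.add(slots[i]);
                slots[i] = null; // free the slot for producers
            }
        }
        if (batch.isEmpty()) {
            return 0;
        }
        try {
            consumer.consume(batch);
        } catch (Throwable t) {
            consumer.onError(batch, t); // report the failure; the caller can keep looping
        }
        return batch.size();
    }

    public static void main(String[] args) {
        String[] slots = {"a", null, "b"};
        int delivered = drainOnce(slots, new Consumer<String>() {
            public void consume(List<String> data) {
                System.out.println("batch: " + data);
            }

            public void onError(List<String> data, Throwable t) {
                System.out.println("failed: " + t);
            }
        });
        System.out.println("delivered: " + delivered);
    }
}
```

Delivering messages in batches means a consumer should be written to handle a list of any size, including a list with a single element.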
- produce messages as needed (sample)
for (int i = 0; i < 200; i++) {
    carrier.produce(new SampleData());
}