Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for redis streams #51

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,18 @@ Redis database to connect to.



##### `redis.data.type`

Redis data type to use (`Sets`, `Streams`).

*Importance:* Medium

*Type:* String

*Default Value:* Sets



##### `redis.operation.timeout.ms`

The amount of time in milliseconds before an operation is marked as timed out.
Expand Down
8 changes: 8 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,14 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>9</source>
<target>9</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class RedisConnectorConfig extends AbstractConfig {
public static final String CLIENT_MODE_CONFIG = "redis.client.mode";
static final String CLIENT_MODE_DOC = "The client mode to use when interacting with the Redis " +
"cluster.";
public static final String DATA_TYPE_CONFIG = "redis.data.type";
static final String DATA_TYPE_DOC = "Data type to use when storing data in redis. Currently supported data types are SETS and STREAMS";
public static final String AUTO_RECONNECT_ENABLED_CONFIG = "redis.auto.reconnect.enabled";
static final String AUTO_RECONNECT_ENABLED_DOC = "Flag to determine if the Redis client should " +
"automatically reconnect.";
Expand Down Expand Up @@ -77,6 +79,7 @@ class RedisConnectorConfig extends AbstractConfig {
public final static String CONNECTION_RETRY_DELAY_MS_DOC = "The amount of milliseconds to wait between redis connection attempts.";

public final ClientMode clientMode;
public final DataType dataType;
public final List<HostAndPort> hosts;

public final String password;
Expand Down Expand Up @@ -105,6 +108,7 @@ public RedisConnectorConfig(ConfigDef config, Map<?, ?> originals) {
this.password = getPassword(PASSWORD_CONFIG).value();
this.database = getInt(DATABASE_CONFIG);
this.clientMode = ConfigUtils.getEnum(ClientMode.class, this, CLIENT_MODE_CONFIG);
this.dataType = ConfigUtils.getEnum(DataType.class, this, DATA_TYPE_CONFIG);
this.autoReconnectEnabled = getBoolean(AUTO_RECONNECT_ENABLED_CONFIG);
this.requestQueueSize = getInt(REQUEST_QUEUE_SIZE_CONFIG);
this.keepAliveEnabled = getBoolean(SOCKET_KEEP_ALIVE_CONFIG);
Expand Down Expand Up @@ -138,6 +142,13 @@ public static ConfigDef config() {
.validator(ValidEnum.of(ClientMode.class))
.importance(ConfigDef.Importance.MEDIUM)
.build()
).define(
ConfigKeyBuilder.of(DATA_TYPE_CONFIG, ConfigDef.Type.STRING)
.documentation(DATA_TYPE_DOC)
.defaultValue(DataType.Sets.toString())
.validator(ValidEnum.of(DataType.class))
.importance(ConfigDef.Importance.MEDIUM)
.build()
).define(
ConfigKeyBuilder.of(SSL_CONFIG, ConfigDef.Type.BOOLEAN)
.documentation(SSL_DOC)
Expand Down Expand Up @@ -257,6 +268,11 @@ public enum ClientMode {
Cluster
}

public enum DataType {
Streams,
Sets
}

public enum RedisSslProvider {
OPENSSL,
JDK
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public void put(Collection<SinkRecord> records) {
if (null == value) {
currentOperationType = SinkOperation.Type.DELETE;
} else {
currentOperationType = SinkOperation.Type.SET;
currentOperationType = SinkOperation.defaultPutType(config);
}

if (currentOperationType != operation.type) {
Expand Down Expand Up @@ -200,7 +200,7 @@ public void put(Collection<SinkRecord> records) {

@Override
public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
SinkOperation operation = SinkOperation.create(SinkOperation.Type.SET, this.config, currentOffsets.size());
SinkOperation operation = SinkOperation.create(SinkOperation.defaultPutType(config), this.config, currentOffsets.size());

List<SinkOffsetState> states = currentOffsets
.entrySet().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.List;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

abstract class SinkOperation {
private static final Logger log = LoggerFactory.getLogger(SinkOperation.class);
Expand All @@ -40,6 +43,7 @@ abstract class SinkOperation {
}

public enum Type {
ADD,
SET,
DELETE,
NONE
Expand All @@ -61,25 +65,40 @@ protected void wait(RedisFuture<?> future) throws InterruptedException {
}
}

public static SinkOperation.Type defaultPutType(RedisConnectorConfig config) {
switch (config.dataType) {
case Streams:
return Type.ADD;
case Sets:
return Type.SET;
default:
return null;
}
}

public static SinkOperation create(Type type, RedisSinkConnectorConfig config, int size) {
SinkOperation result;

switch (type) {
case SET:
result = new SetOperation(config, size);
break;
case DELETE:
result = new DeleteOperation(config, size);
case ADD:
result = new AddOperation(config, size);
break;
case DELETE:
if (config.dataType == RedisConnectorConfig.DataType.Sets) {
result = new DeleteOperation(config, size);
break;
}
default:
throw new IllegalStateException(
String.format("%s is not a supported operation.", type)
String.format("%s is not a supported operation.", type)
);
}

return result;
}


static class NoneOperation extends SinkOperation {
NoneOperation(RedisSinkConnectorConfig config) {
super(Type.NONE, config);
Expand All @@ -103,6 +122,33 @@ public int size() {
}
}

static class AddOperation extends SinkOperation {
final Map<byte[], List<byte[]>> entries;

AddOperation(RedisSinkConnectorConfig config, int size) {
super(Type.SET, config);
this.entries = new HashMap<>(size);
}

@Override
public void add(byte[] key, byte[] value) {
List<byte[]> existingEntries = entries.getOrDefault(key, new ArrayList<>());
existingEntries.add(value);
entries.put(key, existingEntries);
}

public void execute(RedisClusterAsyncCommands<byte[], byte[]> asyncCommands) throws InterruptedException {
log.debug("execute() - Calling xadd with {} value(s)", this.entries.size());
Stream<RedisFuture<?>> futures = entries.entrySet().stream().map(entry -> asyncCommands.xadd(entry.getKey(), entry.getValue().toArray()));
for (RedisFuture<?> future : futures.collect(Collectors.toList())) wait(future);
}

@Override
public int size() {
return entries.size();
}
}

static class SetOperation extends SinkOperation {
final Map<byte[], byte[]> sets;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ public void before() throws InterruptedException {
when(setFuture.await(anyLong(), any(TimeUnit.class))).thenReturn(true);
RedisFuture<Long> deleteFuture = mock(RedisFuture.class);
when(deleteFuture.await(anyLong(), any(TimeUnit.class))).thenReturn(true);
RedisFuture<String> addFuture = mock(RedisFuture.class);
when(addFuture.await(anyLong(), any(TimeUnit.class))).thenReturn(true);
when(asyncCommands.xadd(any(byte[].class), any(byte[].class))).thenReturn(addFuture);
when(asyncCommands.mset(anyMap())).thenReturn(setFuture);
when(asyncCommands.del(any())).thenReturn(deleteFuture);
task.config = new RedisSinkConnectorConfig(
Expand Down Expand Up @@ -156,4 +159,25 @@ public void put() throws InterruptedException {
inOrder.verify(asyncCommands, times(2)).mset(anyMap());
}

@Test
public void add() throws InterruptedException {
task.config = new RedisSinkConnectorConfig(
ImmutableMap.of(RedisSinkConnectorConfig.DATA_TYPE_CONFIG, "Streams")
);
List<SinkRecord> records = Arrays.asList(
record("add1", "456"),
record("add2", "123"),
record("add3", "555"),
record("set2", "666")
);

task.put(records);

InOrder inOrder = Mockito.inOrder(asyncCommands);
inOrder.verify(asyncCommands, times(4)).xadd(any(byte[].class), any(byte[].class));

task.flush(ImmutableMap.of(new TopicPartition(lastRecord.topic(), lastRecord.kafkaPartition()), new OffsetAndMetadata(lastRecord.kafkaOffset())));
inOrder.verify(asyncCommands, times(1)).xadd(any(byte[].class), any(byte[].class));
}

}