Skip to content

Commit

Permalink
0.0.15: Add docs and extend reference config for replication settings
Browse files Browse the repository at this point in the history
  • Loading branch information
jypma committed Oct 10, 2016
1 parent 09c2dec commit 795e88e
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 8 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,14 @@ If you use SBT, you can use this library by adding the following:
resolvers += Resolver.bintrayRepo("jypma", "maven")

libraryDependencies ++= {
val version = "0.0.14"
val version = "0.0.15"
Seq(
"com.tradeshift" % "ts-reaktive-actors" % version,
"com.tradeshift" %% "ts-reaktive-akka" % version,
"com.tradeshift" % "ts-reaktive-cassandra" % version,
"com.tradeshift" % "ts-reaktive-marshal" % version,
"com.tradeshift" % "ts-reaktive-marshal-akka" % version,
"com.tradeshift" % "ts-reaktive-replication" % version,
"com.tradeshift" % "ts-reaktive-ssl" % version,
"com.tradeshift" % "ts-reaktive-testkit" % version % "test",
"com.tradeshift" % "ts-reaktive-testkit-assertj" % version % "test",
Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import sbtprotobuf.{ProtobufPlugin=>PB}
lazy val projectSettings = PB.protobufSettings ++ Seq(
licenses := Seq(("MIT", url("http://opensource.org/licenses/MIT"))),
organization := "com.tradeshift",
version := "0.0.14",
version := "0.0.15",
scalaVersion := "2.11.8",
publishMavenStyle := true,
javacOptions ++= Seq("-source", "1.8"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,16 @@
*/
public interface EventClassifier<E> {
/**
* Returns the names of any additional data centers that should get access to the
* Returns the names of any additional data centers that should get access to the
* persistence ID of the given event, after the event has been applied, or Seq.empty()
* if the data centers should remain unchanged.
*
* This must be implemented as a pure function, relying only on the event itself. In other words,
* the event must fully imply to which datacenters replication is to be done. This is to guarantee
* full idempotency when replaying.
*
* You can typically achieve this by having an explicit "MadeVisibleToNewDataCenter" event, or explicit
* "alsoNowVisibleToDataCenter" fields on other events.
*/
public Seq<String> getDataCenterNames(E event);
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public static Replication get(ActorSystem system) {
return ReplicationId.INSTANCE.get(system);
}

private final Map<String,CompletionStage<Void>> started = new HashMap<String,CompletionStage<Void>>();
private final Map<String,CompletionStage<Void>> started = new HashMap<>();
private final ActorSystem system;
private final Config config;

Expand All @@ -64,7 +64,7 @@ public <E> EventClassifier<E> getEventClassifier(Class<E> eventType) {
}

public <E> String getEventTag(Class<E> eventType) {
return AbstractStatefulPersistentActor.getEventTag(config, eventType);
return AbstractStatefulPersistentActor.getEventTag(config, eventType);
}

public CompletionStage<Void> start(Class<?> eventType, ActorRef shardRegion) {
Expand All @@ -74,6 +74,13 @@ public CompletionStage<Void> start(Class<?> eventType, ActorRef shardRegion) {
synchronized(started) {
if (started.containsKey(eventTag)) return started.get(eventTag);

try {
getEventClassifier(eventType);
} catch (RuntimeException x) {
throw new IllegalArgumentException("You must configure ts-reaktive.replication.event-classifiers.\"" +
eventType.getClass() + "\" with an EventClassifier implementation.");
}

ActorMaterializer materializer = ActorMaterializer.create(system);
Config tagConfig = config.hasPath(eventTag) ? config.getConfig(eventTag).withFallback(config) : config;
EventEnvelopeSerializer serializer = new EventEnvelopeSerializer(system);
Expand All @@ -82,7 +89,7 @@ public CompletionStage<Void> start(Class<?> eventType, ActorRef shardRegion) {
String name = e.getKey();
Config remote = ((ConfigObject) e.getValue()).toConfig();
// FIXME add config options for ConnectionContext
return new WebSocketDataCenterClient(system, ConnectionContext.noEncryption(), name, remote.getString("url"), serializer);
return new WebSocketDataCenterClient(system, ConnectionContext.noEncryption(), name, remote.getString("url"), serializer);
});

DataCenterRepository dataCenterRepository = new DataCenterRepository() {
Expand All @@ -101,15 +108,15 @@ public String getLocalName() {
VisibilityRepository visibilityRepo = new VisibilityRepository(session);
ReadJournal journal = PersistenceQuery.get(system).getReadJournalFor(ReadJournal.class, config.getString("read-journal-plugin-id"));

DataCenterForwarder.startAll(system, materializer, dataCenterRepository, visibilityRepo, eventType,
DataCenterForwarder.startAll(system, materializer, dataCenterRepository, visibilityRepo, eventType,
(EventsByTagQuery)journal, (CurrentEventsByPersistenceIdQuery) journal);

WebSocketDataCenterServer server = new WebSocketDataCenterServer(config.getConfig("server"), shardRegion);
final int port = tagConfig.getInt("local-datacenter.port");
final String host = tagConfig.getString("local-datacenter.host");
log.debug("Binding to {}:{}", host, port);

CompletionStage<ServerBinding> bind = Http.get(system).bindAndHandle(server.route().flow(system, materializer),
CompletionStage<ServerBinding> bind = Http.get(system).bindAndHandle(server.route().flow(system, materializer),
ConnectHttp.toHost(host, port), materializer);

// The returned future completes when both the HTTP binding is ready, and the cassandra visibility session has initialized.
Expand Down
32 changes: 32 additions & 0 deletions ts-reaktive-replication/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ ts-reaktive {

# Number of aggregates to broadcast to new data centers in parallel
parallellism = 8

# Akka persistence plugin ID for the read journal to use. It must support EventsByTagQuery and CurrentEventsByPersistenceIdQuery.
# The default here is for the akka cassandra plugin.
read-journal-plugin-id = "cassandra-query-journal"

server {
# Time to wait for acknowledgement from a persistent actor (and journal) after posting an event for storing
Expand All @@ -32,5 +36,33 @@ ts-reaktive {
# Replication factor list for data centers, e.g. ["dc1:3", "dc2:2"]. Is only used when replication-strategy is NetworkTopologyStrategy.
data-center-replication-factors = []
}

local-datacenter {
# Host to bind the server to
host = "127.0.0.1"

# TCP port to bind the server to
port = 8623

# Name for this datacenter. This MUST be changed to a unique name per datacenter in production.
name = "local"
}

remote-datacenters {
# The actual application must have sub-config sections here, one for each remote data center, with the following entries:
#
# example-datacenter {
# # The URL to the replication server of the remote datacenter (must match its local-datacenter setting)
# url = "ws://server/path"
# }
#
}

event-classifiers {
# For each event type you want to replicate, you need to implement EventClassifier (with a no-args constructor)
# and mention the fully-qualified class name here.
#
# "com.package.MyEvent" = "com.mypackage.MyEventClassifier"
}
}
}

0 comments on commit 795e88e

Please sign in to comment.