From 795e88e77f960c0e05e1675412a78b29802e2647 Mon Sep 17 00:00:00 2001 From: Jan Ypma Date: Mon, 10 Oct 2016 14:57:25 +0200 Subject: [PATCH] 0.0.15: Add docs and extend reference config for replication settings --- README.md | 3 +- build.sbt | 2 +- .../reaktive/replication/EventClassifier.java | 9 +++++- .../reaktive/replication/Replication.java | 17 +++++++--- .../src/main/resources/reference.conf | 32 +++++++++++++++++++ 5 files changed, 55 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 8b52be37..039f7829 100644 --- a/README.md +++ b/README.md @@ -31,13 +31,14 @@ If you use SBT, you can use this library by adding the following: resolvers += Resolver.bintrayRepo("jypma", "maven") libraryDependencies ++= { - val version = "0.0.14" + val version = "0.0.15" Seq( "com.tradeshift" % "ts-reaktive-actors" % version, "com.tradeshift" %% "ts-reaktive-akka" % version, "com.tradeshift" % "ts-reaktive-cassandra" % version, "com.tradeshift" % "ts-reaktive-marshal" % version, "com.tradeshift" % "ts-reaktive-marshal-akka" % version, + "com.tradeshift" % "ts-reaktive-replication" % version, "com.tradeshift" % "ts-reaktive-ssl" % version, "com.tradeshift" % "ts-reaktive-testkit" % version % "test", "com.tradeshift" % "ts-reaktive-testkit-assertj" % version % "test", diff --git a/build.sbt b/build.sbt index 12a21467..1920c6ce 100644 --- a/build.sbt +++ b/build.sbt @@ -5,7 +5,7 @@ import sbtprotobuf.{ProtobufPlugin=>PB} lazy val projectSettings = PB.protobufSettings ++ Seq( licenses := Seq(("MIT", url("http://opensource.org/licenses/MIT"))), organization := "com.tradeshift", - version := "0.0.14", + version := "0.0.15", scalaVersion := "2.11.8", publishMavenStyle := true, javacOptions ++= Seq("-source", "1.8"), diff --git a/ts-reaktive-replication/src/main/java/com/tradeshift/reaktive/replication/EventClassifier.java b/ts-reaktive-replication/src/main/java/com/tradeshift/reaktive/replication/EventClassifier.java index 64973dc2..0ac32ef4 100644 --- a/ts-reaktive-replication/src/main/java/com/tradeshift/reaktive/replication/EventClassifier.java +++ b/ts-reaktive-replication/src/main/java/com/tradeshift/reaktive/replication/EventClassifier.java @@ -7,9 +7,16 @@ */ public interface EventClassifier { /** - * Returns the names of any additional data centers that should get access to the + * Returns the names of any additional data centers that should get access to the * persistence ID of the given event, after the event has been applied, or Seq.empty() * if the data centers should remain unchanged. + * + * This must be implemented as a pure function, relying only on the event itself. In other words, + * the event must fully imply to which datacenters replication is to be done. This is to guarantee + * full idempotency when replaying. + * + * You can typically achieve this by having an explicit "MadeVisibleToNewDataCenter" event, or explicit + * "alsoNowVisibleToDataCenter" fields on other events. */ public Seq getDataCenterNames(E event); } diff --git a/ts-reaktive-replication/src/main/java/com/tradeshift/reaktive/replication/Replication.java b/ts-reaktive-replication/src/main/java/com/tradeshift/reaktive/replication/Replication.java index d0f5d0c9..89f35389 100644 --- a/ts-reaktive-replication/src/main/java/com/tradeshift/reaktive/replication/Replication.java +++ b/ts-reaktive-replication/src/main/java/com/tradeshift/reaktive/replication/Replication.java @@ -37,7 +37,7 @@ public static Replication get(ActorSystem system) { return ReplicationId.INSTANCE.get(system); } - private final Map> started = new HashMap>(); + private final Map> started = new HashMap<>(); private final ActorSystem system; private final Config config; @@ -64,7 +64,7 @@ public EventClassifier getEventClassifier(Class eventType) { } public String getEventTag(Class eventType) { - return AbstractStatefulPersistentActor.getEventTag(config, eventType); + return AbstractStatefulPersistentActor.getEventTag(config, eventType); } public CompletionStage start(Class eventType, ActorRef shardRegion) { @@ -74,6 +74,13 @@ public CompletionStage start(Class eventType, ActorRef shardRegion) { synchronized(started) { if (started.containsKey(eventTag)) return started.get(eventTag); + try { + getEventClassifier(eventType); + } catch (RuntimeException x) { + throw new IllegalArgumentException("You must configure ts-reaktive.replication.event-classifiers.\"" + + eventType.getClass() + "\" with an EventClassifier implementation."); + } + ActorMaterializer materializer = ActorMaterializer.create(system); Config tagConfig = config.hasPath(eventTag) ? config.getConfig(eventTag).withFallback(config) : config; EventEnvelopeSerializer serializer = new EventEnvelopeSerializer(system); @@ -82,7 +89,7 @@ public CompletionStage start(Class eventType, ActorRef shardRegion) { String name = e.getKey(); Config remote = ((ConfigObject) e.getValue()).toConfig(); // FIXME add config options for ConnectionContext - return new WebSocketDataCenterClient(system, ConnectionContext.noEncryption(), name, remote.getString("url"), serializer); + return new WebSocketDataCenterClient(system, ConnectionContext.noEncryption(), name, remote.getString("url"), serializer); }); DataCenterRepository dataCenterRepository = new DataCenterRepository() { @@ -101,7 +108,7 @@ public String getLocalName() { VisibilityRepository visibilityRepo = new VisibilityRepository(session); ReadJournal journal = PersistenceQuery.get(system).getReadJournalFor(ReadJournal.class, config.getString("read-journal-plugin-id")); - DataCenterForwarder.startAll(system, materializer, dataCenterRepository, visibilityRepo, eventType, + DataCenterForwarder.startAll(system, materializer, dataCenterRepository, visibilityRepo, eventType, (EventsByTagQuery)journal, (CurrentEventsByPersistenceIdQuery) journal); WebSocketDataCenterServer server = new WebSocketDataCenterServer(config.getConfig("server"), shardRegion); @@ -109,7 +116,7 @@ public String getLocalName() { final String host = tagConfig.getString("local-datacenter.host"); log.debug("Binding to {}:{}", host, port); - CompletionStage bind = Http.get(system).bindAndHandle(server.route().flow(system, materializer), + CompletionStage bind = Http.get(system).bindAndHandle(server.route().flow(system, materializer), ConnectHttp.toHost(host, port), materializer); // The returned future completes when both the HTTP binding is ready, and the cassandra visibility session has initialized. diff --git a/ts-reaktive-replication/src/main/resources/reference.conf b/ts-reaktive-replication/src/main/resources/reference.conf index 1c2f5a5c..fb5f8407 100644 --- a/ts-reaktive-replication/src/main/resources/reference.conf +++ b/ts-reaktive-replication/src/main/resources/reference.conf @@ -7,6 +7,10 @@ ts-reaktive { # Number of aggregates to broadcast to new data centers in parallel parallellism = 8 + + # Akka persistence plugin ID for the read journal to use. It must support EventsByTagQuery and CurrentEventsByPersistenceIdQuery. + # The default here is for the akka cassandra plugin. + read-journal-plugin-id = "cassandra-query-journal" server { # Time to wait for acknowledgement from a persistent actor (and journal) after posting an event for storing @@ -32,5 +36,33 @@ ts-reaktive { # Replication factor list for data centers, e.g. ["dc1:3", "dc2:2"]. Is only used when replication-strategy is NetworkTopologyStrategy. data-center-replication-factors = [] } + + local-datacenter { + # Host to bind the server to + host = "127.0.0.1" + + # TCP port to bind the server to + port = 8623 + + # Name for this datacenter. This MUST be changed to a unique name per datacenter in production. + name = "local" + } + + remote-datacenters { + # The actual application must have sub-config sections here, one for each remote data center, with the following entries: + # + # example-datacenter { + # # The URL to the replication server of the remote datacenter (must match its local-datacenter setting) + # url = "ws://server/path" + # } + # + } + + event-classifiers { + # For each event type you want to replicate, you need to implement EventClassifier (with a no-args constructor) + # and mention the fully-qualified class name here. + # + # "com.package.MyEvent" = "com.mypackage.MyEventClassifier" + } } } \ No newline at end of file