diff --git a/ts-reaktive-actors/src/main/java/com/tradeshift/reaktive/materialize/MaterializerActor.java b/ts-reaktive-actors/src/main/java/com/tradeshift/reaktive/materialize/MaterializerActor.java
index 05b21faa..e99e4f4a 100644
--- a/ts-reaktive-actors/src/main/java/com/tradeshift/reaktive/materialize/MaterializerActor.java
+++ b/ts-reaktive-actors/src/main/java/com/tradeshift/reaktive/materialize/MaterializerActor.java
@@ -66,13 +66,14 @@ public abstract class MaterializerActor extends AbstractPersistentActor {
     private final FiniteDuration updateAccuracy;
     private final FiniteDuration restartDelay;
     private final int batchSize;
+    private final int updateSize;
     private final int maxEventsPerTimestamp;
     private final int maxWorkerCount;
     private final Duration updateOffsetInterval;
     private final AtomicReference reimportProgress = new AtomicReference<>();
     private final ActorMaterializer materializer;
 
-    private MaterializerWorkers workers;
+    private volatile MaterializerWorkers workers;
     private Option ongoingReimport = Option.none();
     private Map workerEndTimestamps = HashMap.empty();
@@ -83,6 +84,7 @@ protected MaterializerActor() {
         updateAccuracy = FiniteDuration.create(config.getDuration("update-accuracy", SECONDS), SECONDS);
         restartDelay = FiniteDuration.create(config.getDuration("restart-delay", SECONDS), SECONDS);
         batchSize = config.getInt("batch-size");
+        updateSize = config.getInt("update-size");
         maxEventsPerTimestamp = config.getInt("max-events-per-timestamp");
         maxWorkerCount = config.getInt("max-worker-count");
         updateOffsetInterval = config.getDuration("update-offset-interval");
@@ -111,8 +113,7 @@ public Receive createReceive() {
                 reimport(msg.entityIds);
             })
             .match(QueryProgress.class, msg -> {
-                Option reimportP = Option.of(reimportProgress.get());
-                sender().tell(new Progress(reimportP, workers), self());
+                sendProgress();
             })
             .matchEquals("reimportComplete", msg -> {
                 log.info("Re-import completed.");
@@ -131,7 +132,7 @@ public Receive createReceive() {
                 applyEvent(evt);
                 context().system().scheduler().scheduleOnce(
                     updateAccuracy, sender(), "ack", context().dispatcher(), self());
-                if (lastSequenceNr() > 1) {
+                if ((lastSequenceNr() > 1) && ((lastSequenceNr() % 25) == 0)) {
                     deleteMessages(lastSequenceNr() - 1);
                 }
             });
@@ -141,7 +142,7 @@ public Receive createReceive() {
             })
             .match(DeleteMessagesFailure.class, msg -> {
                 log.error(msg.cause(), "Delete messages failed at offsets " + workers
-                    + ", rethrowing", msg.cause());
+                    + ", rethrowing");
                 throw (Exception) msg.cause();
             })
             .match(WorkerFailure.class, failure -> {
@@ -153,13 +154,20 @@ public Receive createReceive() {
                 log.debug("Completed {}, now offset is: {}", msg.worker, workers);
                 onWorkerStopped(msg.worker);
             })
-            .matchEquals("reset", msg -> {
-                /* replaced by creating extra workers */
-                log.warning("Received legacy 'reset' message");
+            .matchEquals("reset", msg -> { // for compatibility
+                reset();
+            })
+            .match(Reset.class, msg -> {
+                reset();
             })
             .build();
     }
 
+    private void sendProgress() {
+        Option reimportP = Option.of(reimportProgress.get());
+        sender().tell(new Progress(reimportP, workers), self());
+    }
+
     private void createWorker(Instant timestamp, Option endTimestamp) {
         if (workers.getIds().size() >= maxWorkerCount) {
             log.warning("Ignoring request to start extra worker at {}, because maximum of {} is already reached.",
@@ -167,11 +175,21 @@ private void createWorker(Instant timestamp, Option endTimestamp) {
             return;
         }
 
-        persist(workers.startWorker(timestamp, endTimestamp), evt -> {
+        persistAndApply(workers.startWorker(timestamp, endTimestamp));
+    }
+
+    private void reset() {
+        persistAndApply(workers.reset());
+    }
+
+    private void persistAndApply(MaterializerActorEvent evt) {
+        persist(evt, e -> {
             applyEvent(evt);
-            // Start the new worker:
+            // The first worker will have been stopped and we have a new one at the epoch. Start it.
             workers.getIds().removeAll(workerEndTimestamps.keySet()).forEach(this::materializeEvents);
+
+            sendProgress();
         });
     }
@@ -268,14 +286,14 @@ private void materializeEvents(UUID worker) {
                 maxEventsPerTimestamp, rollback))
 
             // Allow multiple timestamps to be processed simultaneously
-            .groupedWeightedWithin(maxEventsPerTimestamp, seq -> (long) seq.size(), Duration.ofSeconds(1))
+            .groupedWeightedWithin(updateSize, seq -> (long) seq.size(), Duration.ofSeconds(1))
 
             // Process them, and emit a single timestamp at t
             .mapAsync(1, listOfSeq ->
                 // re-group into batchSize, each one no longer necessarily within one timestamp
                 Source.from(Vector.ofAll(listOfSeq).flatMap(seq -> seq))
                     .grouped(batchSize)
-                    .mapAsync(1, envelopeList -> materialize(envelopeList))
+                    .mapAsync(1, envelopeList -> materialize(workers.getIds().indexOf(worker), envelopeList))
                     .runWith(Sink.ignore(), materializer)
                     .thenApply(done -> timestampOf(listOfSeq.get(listOfSeq.size() - 1).last()).toEpochMilli())
             )
@@ -310,7 +328,7 @@ private void reimport(Set entityIds) {
             .conflate((t1, t2) -> (t1.isAfter(t2)) ? t1 : t2)
             .throttle(1, Duration.ofSeconds(1))
             .map(t -> {
-                metrics.getReimportRemaining().record(ChronoUnit.MILLIS.between(t, maxTimestamp));
+                metrics.getReimportRemaining().set(ChronoUnit.MILLIS.between(t, maxTimestamp));
                 return Done.getInstance();
             })
             .toMat(Sink.onComplete(result -> self.tell("reimportComplete", self())), Keep.left())
@@ -340,8 +358,13 @@ private void cancelReimport() {
 
     private void recordOffsetMetric() {
         Seq ids = workers.getIds();
-        for (int i = 0; i < workers.getIds().size(); i++) {
-            metrics.getOffset(i).record(Duration.between(workers.getTimestamp(ids.apply(i)), Instant.now()).toMillis());
+        for (int i = 0; i < ids.size(); i++) {
+            Instant offset = workers.getTimestamp(ids.apply(i));
+            metrics.getOffset(i).set(offset.toEpochMilli());
+            metrics.getDelay(i).set(Duration.between(offset, Instant.now()).toMillis());
+            for (Instant end: workers.getEndTimestamp(workers.getIds().apply(i))) {
+                metrics.getRemaining(i).set(Duration.between(offset, end).toMillis());
+            }
         }
     }
 
@@ -365,24 +388,30 @@ protected String getConcurrencyKey(E envelope) {
     /**
      * Materialize the given envelopes in parallel, as far as their entityIds allow it.
      */
-    private CompletionStage materialize(java.util.List envelopes) {
+    private CompletionStage materialize(int workerIndex, java.util.List envelopes) {
+        long start = System.nanoTime();
         return CompletableFutures.sequence(
             Vector.ofAll(envelopes)
             .groupBy(this::getConcurrencyKey)
             .values()
-            .map(this::persistSequential)
+            .map(es -> persistSequential(workerIndex, es))
             .map(c -> c.toCompletableFuture())
-        ).thenApply(seqOfDone -> Done.getInstance());
+        ).thenApply(seqOfDone -> {
+            long dur = (System.nanoTime() - start) / 1000;
+            log.debug("Worker {} materialized {} events in {}ms", workerIndex, envelopes.size(),
+                dur / 1000.0);
+            return Done.getInstance();
+        });
     }
 
-    private CompletionStage persistSequential(Seq seq) {
+    private CompletionStage persistSequential(int workerIndex, Seq seq) {
         if (seq.isEmpty()) {
             return done;
         } else {
             return materialize(seq.head())
                 .thenCompose(done -> {
-                    metrics.getEvents().increment();
-                    return persistSequential(seq.tail());
+                    metrics.getEvents(workerIndex).increment();
+                    return persistSequential(workerIndex, seq.tail());
                 });
         }
     }
@@ -429,6 +458,25 @@ public static class CancelReimport implements Serializable {
         private CancelReimport() {}
     }
 
+    /**
+     * Message that can be sent to "reset" this materializer, causing it to rematerialize everything.
+     *
+     * All existing workers will stay at the timestamps that they are, except for the oldest worker,
+     * which gets reset to the epoch. In addition, any end timestamps workers have are reset to
+     * the start timestamp of the next worker.
+     *
+     * This way, we queue up rematerialization of all timestamps, while retaining the same number
+     * of workers, and minimizing disruptions in ongoing time sequences.
+     *
+     * A Progress message is sent back as reply.
+     */
+    public static class Reset implements Serializable {
+        private static final long serialVersionUID = 1L;
+
+        public static final Reset instance = new Reset();
+        private Reset() {}
+    }
+
     /**
      * Message that can be sent to this actor to query its current progress.
      *
@@ -494,6 +542,8 @@ public Option getEndTimestamp() {
      *
      * If the maximum number of workers has been reached, or if the timestamp is too close to an
      * existing worker, the request is ignored.
+     *
+     * A Progress message is sent back as reply.
      */
     public static class CreateWorker implements Serializable {
         private static final long serialVersionUID = 1L;
diff --git a/ts-reaktive-actors/src/main/java/com/tradeshift/reaktive/materialize/MaterializerMetrics.java b/ts-reaktive-actors/src/main/java/com/tradeshift/reaktive/materialize/MaterializerMetrics.java
index 7bfe54db..ea539075 100644
--- a/ts-reaktive-actors/src/main/java/com/tradeshift/reaktive/materialize/MaterializerMetrics.java
+++ b/ts-reaktive-actors/src/main/java/com/tradeshift/reaktive/materialize/MaterializerMetrics.java
@@ -5,41 +5,56 @@
 import io.vavr.collection.HashMap;
 import kamon.Kamon;
 import kamon.metric.Counter;
-import kamon.metric.Histogram;
-import kamon.metric.HistogramMetric;
+import kamon.metric.CounterMetric;
+import kamon.metric.Gauge;
+import kamon.metric.GaugeMetric;
 import kamon.metric.MeasurementUnit;
 
 public class MaterializerMetrics {
     private final HashMap baseTags;
-    private final Counter events;
+    private final CounterMetric events;
     private final Counter restarts;
-    private final Histogram reimportRemaining;
-    private final HistogramMetric offset;
+    private final Gauge reimportRemaining;
+    /** The current timestamp for each worker, in milliseconds since the epoch */
+    private final GaugeMetric offset;
+    /** The delay between the timestamp of each worker and now(), in milliseconds */
+    private final GaugeMetric delay;
+    /** The time remaining for each worker, in milliseconds (if there is an end timestamp) */
+    private final GaugeMetric remaining;
 
     public MaterializerMetrics(String name) {
         baseTags = HashMap.of("journal-materializer", name);
         Map tags = baseTags.toJavaMap();
-        this.events = Kamon.counter("journal-materializer.events").refine(tags);
+        this.events = Kamon.counter("journal-materializer.events");
         this.restarts = Kamon.counter("journal-materializer.restarts").refine(tags);
-        this.reimportRemaining = Kamon.histogram("journal-materializer.reimport-remaining", MeasurementUnit.time().milliseconds()).refine(tags);
-        this.offset = Kamon.histogram("journal-materializer.offset",
-            MeasurementUnit.time().milliseconds());
+        this.reimportRemaining = Kamon.gauge("journal-materializer.reimport-remaining", MeasurementUnit.time().milliseconds()).refine(tags);
+        this.offset = Kamon.gauge("journal-materializer.offset", MeasurementUnit.time().milliseconds());
+        this.delay = Kamon.gauge("journal-materializer.delay", MeasurementUnit.time().milliseconds());
+        this.remaining = Kamon.gauge("journal-materializer.remaining", MeasurementUnit.time().milliseconds());
     }
 
-    public Counter getEvents() {
-        return events;
+    public Counter getEvents(int index) {
+        return events.refine(baseTags.put("index", String.valueOf(index)).toJavaMap());
     }
 
     public Counter getRestarts() {
         return restarts;
     }
 
-    public Histogram getOffset(int index) {
+    public Gauge getOffset(int index) {
         return offset.refine(baseTags.put("index", String.valueOf(index)).toJavaMap());
     }
 
-    public Histogram getReimportRemaining() {
+    public Gauge getDelay(int index) {
+        return delay.refine(baseTags.put("index", String.valueOf(index)).toJavaMap());
+    }
+
+    public Gauge getRemaining(int index) {
+        return remaining.refine(baseTags.put("index", String.valueOf(index)).toJavaMap());
+    }
+
+    public Gauge getReimportRemaining() {
         return reimportRemaining;
     }
 }
diff --git a/ts-reaktive-actors/src/main/java/com/tradeshift/reaktive/materialize/MaterializerWorkers.java b/ts-reaktive-actors/src/main/java/com/tradeshift/reaktive/materialize/MaterializerWorkers.java
index 94c2142d..a2d674a7 100644
--- a/ts-reaktive-actors/src/main/java/com/tradeshift/reaktive/materialize/MaterializerWorkers.java
+++ b/ts-reaktive-actors/src/main/java/com/tradeshift/reaktive/materialize/MaterializerWorkers.java
@@ -3,16 +3,17 @@
 import static com.tradeshift.reaktive.protobuf.UUIDs.toJava;
 import static com.tradeshift.reaktive.protobuf.UUIDs.toProtobuf;
 
-import java.time.Duration;
 import java.time.Instant;
-import java.time.temporal.ChronoUnit;
 import java.time.temporal.TemporalAmount;
 import java.util.UUID;
-import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
 import com.tradeshift.reaktive.protobuf.MaterializerActor.MaterializerActorEvent;
 import com.tradeshift.reaktive.protobuf.MaterializerActor.MaterializerActorEvent.Worker;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.tradeshift.reaktive.protobuf.UUIDs;
 
 import io.vavr.collection.Seq;
@@ -24,6 +25,8 @@
  * timestamp when it's finished.
  **/
 public class MaterializerWorkers {
+    private static final Logger log = LoggerFactory.getLogger(MaterializerWorkers.class);
+
     public static MaterializerWorkers empty(TemporalAmount rollback) {
         return new MaterializerWorkers(Vector.empty(), rollback);
     }
@@ -98,7 +101,8 @@ public MaterializerWorkers applyEvent(MaterializerActorEvent event) {
     public MaterializerActorEvent onWorkerProgress(UUID workerId, Instant timestamp) {
         int index = workers.map(Worker::getId).indexOf(toProtobuf(workerId));
         if (index == -1) {
-            new IllegalArgumentException("Unknown worker: " + workerId);
+            log.warn("Progress for unknown worker: {}, ignoring.", workerId);
+            return unchanged();
         }
         Worker worker = workers.apply(index);
         if (worker.hasEndTimestamp() && timestamp.toEpochMilli() >= worker.getEndTimestamp()) {
@@ -129,12 +133,35 @@ public MaterializerActorEvent onWorkerProgress(UUID workerId, Instant timestamp)
         }
     }
 
+    /**
+     * Reimport all timestamps, by removing any gaps between workers and changing the first worker
+     * to re-start at zero.
+     */
+    public MaterializerActorEvent reset() {
+        Worker zero = Worker.newBuilder()
+            .setId(toProtobuf(UUID.randomUUID()))
+            .setTimestamp(0L)
+            .build();
+
+        if (workers.size() <= 1) {
+            return toEvent(Vector.of(zero));
+        } else {
+            return toEvent(workers.update(0, zero).sliding(2)
+                .map(pair -> pair.apply(0).toBuilder()
+                    .setEndTimestamp(pair.apply(1).getTimestamp())
+                    .build())
+                .toVector()
+                .append(workers.last())
+            );
+        }
+    }
+
     private MaterializerActorEvent unchanged() {
         return toEvent(workers);
     }
 
-    private static MaterializerActorEvent toEvent(Seq workers) {
-        return MaterializerActorEvent.newBuilder() .addAllWorker(workers) .build();
+    private static MaterializerActorEvent toEvent(Iterable workers) {
+        return MaterializerActorEvent.newBuilder().addAllWorker(workers).build();
     }
 
     /**
diff --git a/ts-reaktive-actors/src/main/resources/reference.conf b/ts-reaktive-actors/src/main/resources/reference.conf
index 48aeb97f..5523f78a 100644
--- a/ts-reaktive-actors/src/main/resources/reference.conf
+++ b/ts-reaktive-actors/src/main/resources/reference.conf
@@ -14,6 +14,13 @@ ts-reaktive {
     # be imported in parallel.
    batch-size = 4
 
+    # The maximum number of imported events to process before saving and reporting progress.
+    # If more than this number of events occur with the same timestamp, they'll still be processed
+    # together.
+    # This used to default to max-events-per-timestamp, but is lower now to get more accurate
+    # progress report metrics into Kamon.
+    update-size = 256
+
     # Delay before restarting the event stream to the backend if it completes or fails
     restart-delay = 10 seconds
 
diff --git a/ts-reaktive-actors/src/test/java/com/tradeshift/reaktive/materialize/MaterializerWorkersTheories.java b/ts-reaktive-actors/src/test/java/com/tradeshift/reaktive/materialize/MaterializerWorkersTheories.java
index ed85f6e8..3a080c0e 100644
--- a/ts-reaktive-actors/src/test/java/com/tradeshift/reaktive/materialize/MaterializerWorkersTheories.java
+++ b/ts-reaktive-actors/src/test/java/com/tradeshift/reaktive/materialize/MaterializerWorkersTheories.java
@@ -114,6 +114,28 @@ public class MaterializerWorkersTheories {
                 });
             });
         });
+
+        describe("MaterializerWorkers.reset()", () -> {
+            it("should never have gaps in its result", () -> {
+                qt()
+                    .forAll(Generators.materializerWorkers())
+                    .checkAssert(w -> {
+                        MaterializerActorEvent event = w.reset();
+                        for (int i = 1; i < event.getWorkerCount(); i++) {
+                            assertThat(event.getWorker(i-1).getEndTimestamp())
+                                .describedAs("Worker %d's end timestamp, in event %s", i, event)
+                                .isEqualTo(event.getWorker(i).getTimestamp());
+
+                            assertThat(event.getWorker(i).getTimestamp())
+                                .isEqualTo(w.getTimestamp(w.getIds().apply(i)).toEpochMilli());
+                        }
+
+                        MaterializerWorkers p = w.applyEvent(event);
+                        assertSane(p);
+                    });
+            });
+
+        });
     }
 
     private Condition workerForTime(Instant time) {
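
For context on how the new messages fit together, here is a minimal usage sketch (not part of the patch): an actor that asks a running materializer to rematerialize everything and logs the Progress reply. The ResetTrigger actor, its constructor wiring and the log messages are hypothetical; only the Reset and Progress messages and the reply-to-sender behaviour come from the change above, and the sketch assumes both message classes are visible to client code.

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.event.Logging;
import akka.event.LoggingAdapter;

import com.tradeshift.reaktive.materialize.MaterializerActor;

// Hypothetical helper, for illustration only: sends Reset once at startup and logs the reply.
public class ResetTrigger extends AbstractActor {
    private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
    private final ActorRef materializer; // assumed: reference to a concrete MaterializerActor subclass

    public ResetTrigger(ActorRef materializer) {
        this.materializer = materializer;
    }

    @Override
    public void preStart() {
        // Reset keeps every worker where it is except the oldest, which restarts at the epoch;
        // the materializer answers with a Progress message (see the Reset javadoc above).
        materializer.tell(MaterializerActor.Reset.instance, self());
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
            .match(MaterializerActor.Progress.class, progress ->
                log.info("Materializer progress after reset: {}", progress))
            .build();
    }
}

Wiring it up would then be something like system.actorOf(Props.create(ResetTrigger.class, materializerRef)). Sending the plain string "reset" still works for older clients, per the compatibility branch in createReceive().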