Merge pull request #129 from Tradeshift/worker_stats
Update kamon stats for multiple workers, and implement reset
jypma authored Jul 18, 2019
2 parents 743b651 + 807cbd9 commit a1199cc
Showing 5 changed files with 161 additions and 40 deletions.
MaterializerActor.java
@@ -66,13 +66,14 @@ public abstract class MaterializerActor<E> extends AbstractPersistentActor {
private final FiniteDuration updateAccuracy;
private final FiniteDuration restartDelay;
private final int batchSize;
private final int updateSize;
private final int maxEventsPerTimestamp;
private final int maxWorkerCount;
private final Duration updateOffsetInterval;
private final AtomicReference<Instant> reimportProgress = new AtomicReference<>();
private final ActorMaterializer materializer;

private MaterializerWorkers workers;
private volatile MaterializerWorkers workers;
private Option<KillSwitch> ongoingReimport = Option.none();
private Map<UUID,AtomicLong> workerEndTimestamps = HashMap.empty();

@@ -83,6 +84,7 @@ protected MaterializerActor() {
updateAccuracy = FiniteDuration.create(config.getDuration("update-accuracy", SECONDS), SECONDS);
restartDelay = FiniteDuration.create(config.getDuration("restart-delay", SECONDS), SECONDS);
batchSize = config.getInt("batch-size");
updateSize = config.getInt("update-size");
maxEventsPerTimestamp = config.getInt("max-events-per-timestamp");
maxWorkerCount = config.getInt("max-worker-count");
updateOffsetInterval = config.getDuration("update-offset-interval");
@@ -111,8 +113,7 @@ public Receive createReceive() {
reimport(msg.entityIds);
})
.match(QueryProgress.class, msg -> {
Option<Instant> reimportP = Option.of(reimportProgress.get());
sender().tell(new Progress(reimportP, workers), self());
sendProgress();
})
.matchEquals("reimportComplete", msg -> {
log.info("Re-import completed.");
@@ -131,7 +132,7 @@ public Receive createReceive() {
applyEvent(evt);
context().system().scheduler().scheduleOnce(
updateAccuracy, sender(), "ack", context().dispatcher(), self());
if (lastSequenceNr() > 1) {
if ((lastSequenceNr() > 1) && ((lastSequenceNr() % 25) == 0)) {
deleteMessages(lastSequenceNr() - 1);
}
});
@@ -141,7 +142,7 @@ public Receive createReceive() {
})
.match(DeleteMessagesFailure.class, msg -> {
log.error(msg.cause(), "Delete messages failed at offsets " + workers
+ ", rethrowing", msg.cause());
+ ", rethrowing");
throw (Exception) msg.cause();
})
.match(WorkerFailure.class, failure -> {
@@ -153,25 +154,42 @@ public Receive createReceive() {
log.debug("Completed {}, now offset is: {}", msg.worker, workers);
onWorkerStopped(msg.worker);
})
.matchEquals("reset", msg -> {
/* replaced by creating extra workers */
log.warning("Received legacy 'reset' message");
.matchEquals("reset", msg -> { // for compatibility
reset();
})
.match(Reset.class, msg -> {
reset();
})
.build();
}

private void sendProgress() {
Option<Instant> reimportP = Option.of(reimportProgress.get());
sender().tell(new Progress(reimportP, workers), self());
}

private void createWorker(Instant timestamp, Option<Instant> endTimestamp) {
if (workers.getIds().size() >= maxWorkerCount) {
log.warning("Ignoring request to start extra worker at {}, because maximum of {} is already reached.",
timestamp, workers.getIds().size());
return;
}

persist(workers.startWorker(timestamp, endTimestamp), evt -> {
persistAndApply(workers.startWorker(timestamp, endTimestamp));
}

private void reset() {
persistAndApply(workers.reset());
}

private void persistAndApply(MaterializerActorEvent evt) {
persist(evt, e -> {
applyEvent(evt);

// Start the new worker:
// The first worker will have been stopped and we have a new one at the epoch. Start it.
workers.getIds().removeAll(workerEndTimestamps.keySet()).forEach(this::materializeEvents);

sendProgress();
});
}

@@ -268,14 +286,14 @@ private void materializeEvents(UUID worker) {
maxEventsPerTimestamp, rollback))

// Allow multiple timestamps to be processed simultaneously
.groupedWeightedWithin(maxEventsPerTimestamp, seq -> (long) seq.size(), Duration.ofSeconds(1))
.groupedWeightedWithin(updateSize, seq -> (long) seq.size(), Duration.ofSeconds(1))

// Process them, and emit a single timestamp at t
.mapAsync(1, listOfSeq ->
// re-group into batchSize, each one no longer necessarily within one timestamp
Source.from(Vector.ofAll(listOfSeq).flatMap(seq -> seq))
.grouped(batchSize)
.mapAsync(1, envelopeList -> materialize(envelopeList))
.mapAsync(1, envelopeList -> materialize(workers.getIds().indexOf(worker), envelopeList))
.runWith(Sink.ignore(), materializer)
.thenApply(done -> timestampOf(listOfSeq.get(listOfSeq.size() - 1).last()).toEpochMilli())
)
@@ -310,7 +328,7 @@ private void reimport(Set<String> entityIds) {
.conflate((t1, t2) -> (t1.isAfter(t2)) ? t1 : t2)
.throttle(1, Duration.ofSeconds(1))
.map(t -> {
metrics.getReimportRemaining().record(ChronoUnit.MILLIS.between(t, maxTimestamp));
metrics.getReimportRemaining().set(ChronoUnit.MILLIS.between(t, maxTimestamp));
return Done.getInstance();
})
.toMat(Sink.onComplete(result -> self.tell("reimportComplete", self())), Keep.left())
@@ -340,8 +358,13 @@ private void cancelReimport() {

private void recordOffsetMetric() {
Seq<UUID> ids = workers.getIds();
for (int i = 0; i < workers.getIds().size(); i++) {
metrics.getOffset(i).record(Duration.between(workers.getTimestamp(ids.apply(i)), Instant.now()).toMillis());
for (int i = 0; i < ids.size(); i++) {
Instant offset = workers.getTimestamp(ids.apply(i));
metrics.getOffset(i).set(offset.toEpochMilli());
metrics.getDelay(i).set(Duration.between(offset, Instant.now()).toMillis());
for (Instant end: workers.getEndTimestamp(workers.getIds().apply(i))) {
metrics.getRemaining(i).set(Duration.between(offset, end).toMillis());
}
}
}

@@ -365,24 +388,30 @@ protected String getConcurrencyKey(E envelope) {
/**
* Materialize the given envelopes in parallel, as far as their entityIds allow it.
*/
private CompletionStage<Done> materialize(java.util.List<E> envelopes) {
private CompletionStage<Done> materialize(int workerIndex, java.util.List<E> envelopes) {
long start = System.nanoTime();
return CompletableFutures.sequence(
Vector.ofAll(envelopes)
.groupBy(this::getConcurrencyKey)
.values()
.map(this::persistSequential)
.map(es -> persistSequential(workerIndex, es))
.map(c -> c.toCompletableFuture())
).thenApply(seqOfDone -> Done.getInstance());
).thenApply(seqOfDone -> {
long dur = (System.nanoTime() - start) / 1000;
log.debug("Worker {} materialized {} events in {}ms", workerIndex, envelopes.size(),
dur / 1000.0);
return Done.getInstance();
});
}

private CompletionStage<Done> persistSequential(Seq<E> seq) {
private CompletionStage<Done> persistSequential(int workerIndex, Seq<E> seq) {
if (seq.isEmpty()) {
return done;
} else {
return materialize(seq.head())
.thenCompose(done -> {
metrics.getEvents().increment();
return persistSequential(seq.tail());
metrics.getEvents(workerIndex).increment();
return persistSequential(workerIndex, seq.tail());
});
}
}
@@ -429,6 +458,25 @@ public static class CancelReimport implements Serializable {
private CancelReimport() {}
}

/**
* Message can be sent to "reset" this materializer, causing it to rematerialize everything.
*
* All existing workers will stay at the timestamps that they are, except for the oldest worker,
* which gets reset to the epoch. In addition, any end timestamps workers have are reset to
* the start timestamp of the next worker.
*
* This way, we queue up rematerialization of all timestamps, while retaining the same number
* of workers, and minimizing disruptions in ongoing time sequences.
*
* A Progress message is sent back as reply.
*/
public static class Reset implements Serializable {
private static final long serialVersionUID = 1L;

public static final Reset instance = new Reset();
private Reset() {}
}

/**
* Message that can be sent to this actor to query its current progress.
*
@@ -494,6 +542,8 @@ public Option<Instant> getEndTimestamp() {
*
* If the maximum number of workers has been reached, or if the timestamp is too close to an
* existing worker, the request is ignored.
*
* A Progress message is sent back as reply.
*/
public static class CreateWorker implements Serializable {
private static final long serialVersionUID = 1L;
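For callers, the new Reset message is a plain request/reply interaction: the actor persists the reset event via persistAndApply() and answers with a Progress message. Below is a minimal caller-side sketch, not part of this commit; it assumes Akka's java.time ask overload, leaves out the package import for MaterializerActor, and types the reply loosely as Object since Progress's full definition is outside this diff.

// Hypothetical caller-side sketch (not part of this commit).
import java.time.Duration;
import java.util.concurrent.CompletionStage;

import akka.actor.ActorRef;
import akka.pattern.Patterns;

public class ResetClient {
    // Asks the materializer to rematerialize everything; the reply is the actor's Progress message
    // (see sendProgress() above). The oldest worker restarts at the epoch once the event is applied.
    public static CompletionStage<Object> resetAndAwaitProgress(ActorRef materializer) {
        return Patterns.ask(materializer, MaterializerActor.Reset.instance, Duration.ofSeconds(10));
    }
}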
MaterializerMetrics.java
@@ -5,41 +5,56 @@
import io.vavr.collection.HashMap;
import kamon.Kamon;
import kamon.metric.Counter;
import kamon.metric.Histogram;
import kamon.metric.HistogramMetric;
import kamon.metric.CounterMetric;
import kamon.metric.Gauge;
import kamon.metric.GaugeMetric;
import kamon.metric.MeasurementUnit;

public class MaterializerMetrics {
private final HashMap<String, String> baseTags;
private final Counter events;
private final CounterMetric events;
private final Counter restarts;
private final Histogram reimportRemaining;
private final HistogramMetric offset;
private final Gauge reimportRemaining;
/** The current timestamp for each worker, in milliseconds since the epoch */
private final GaugeMetric offset;
/** The delay between the timestamp of each worker and now(), in milliseconds */
private final GaugeMetric delay;
/** The time remaining for each worker, in milliseconds (if there is an end timestamp) */
private final GaugeMetric remaining;

public MaterializerMetrics(String name) {
baseTags = HashMap.of("journal-materializer", name);
Map<String, String> tags = baseTags.toJavaMap();
this.events = Kamon.counter("journal-materializer.events").refine(tags);
this.events = Kamon.counter("journal-materializer.events");
this.restarts = Kamon.counter("journal-materializer.restarts").refine(tags);
this.reimportRemaining = Kamon.histogram("journal-materializer.reimport-remaining", MeasurementUnit.time().milliseconds()).refine(tags);
this.offset = Kamon.histogram("journal-materializer.offset",
MeasurementUnit.time().milliseconds());
this.reimportRemaining = Kamon.gauge("journal-materializer.reimport-remaining", MeasurementUnit.time().milliseconds()).refine(tags);
this.offset = Kamon.gauge("journal-materializer.offset", MeasurementUnit.time().milliseconds());
this.delay = Kamon.gauge("journal-materializer.delay", MeasurementUnit.time().milliseconds());
this.remaining = Kamon.gauge("journal-materializer.remaining", MeasurementUnit.time().milliseconds());
}


public Counter getEvents() {
return events;
public Counter getEvents(int index) {
return events.refine(baseTags.put("index", String.valueOf(index)).toJavaMap());
}

public Counter getRestarts() {
return restarts;
}

public Histogram getOffset(int index) {
public Gauge getOffset(int index) {
return offset.refine(baseTags.put("index", String.valueOf(index)).toJavaMap());
}

public Histogram getReimportRemaining() {
public Gauge getDelay(int index) {
return delay.refine(baseTags.put("index", String.valueOf(index)).toJavaMap());
}

public Gauge getRemaining(int index) {
return remaining.refine(baseTags.put("index", String.valueOf(index)).toJavaMap());
}

public Gauge getReimportRemaining() {
return reimportRemaining;
}
}
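For context on the Kamon 1.x calls above, here is a standalone sketch, not from the commit, of how the per-worker tags are expected to behave: the gauge is defined once, and refine(tags) resolves a separate instrument per tag combination, so each worker index shows up as its own time series. The "my-view" materializer name is made up.

// Illustrative sketch (not part of this commit) of per-worker gauge refinement.
import io.vavr.collection.HashMap;

import kamon.Kamon;
import kamon.metric.Gauge;
import kamon.metric.GaugeMetric;
import kamon.metric.MeasurementUnit;

public class WorkerGaugeSketch {
    public static void main(String[] args) {
        GaugeMetric offset = Kamon.gauge("journal-materializer.offset", MeasurementUnit.time().milliseconds());

        // Refining with a different "index" tag yields a distinct instrument per worker.
        Gauge worker0 = offset.refine(HashMap.of("journal-materializer", "my-view", "index", "0").toJavaMap());
        Gauge worker1 = offset.refine(HashMap.of("journal-materializer", "my-view", "index", "1").toJavaMap());

        worker0.set(1_563_400_000_000L); // worker 0's current timestamp, in epoch millis
        worker1.set(1_563_400_060_000L); // worker 1 is one minute further along
    }
}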
MaterializerWorkers.java
@@ -3,16 +3,17 @@
import static com.tradeshift.reaktive.protobuf.UUIDs.toJava;
import static com.tradeshift.reaktive.protobuf.UUIDs.toProtobuf;

import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAmount;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import com.tradeshift.reaktive.protobuf.MaterializerActor.MaterializerActorEvent;
import com.tradeshift.reaktive.protobuf.MaterializerActor.MaterializerActorEvent.Worker;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.tradeshift.reaktive.protobuf.UUIDs;

import io.vavr.collection.Seq;
@@ -24,6 +25,8 @@
* timestamp when it's finished.
**/
public class MaterializerWorkers {
private static final Logger log = LoggerFactory.getLogger(MaterializerWorkers.class);

public static MaterializerWorkers empty(TemporalAmount rollback) {
return new MaterializerWorkers(Vector.empty(), rollback);
}
@@ -98,7 +101,8 @@ public MaterializerWorkers applyEvent(MaterializerActorEvent event) {
public MaterializerActorEvent onWorkerProgress(UUID workerId, Instant timestamp) {
int index = workers.map(Worker::getId).indexOf(toProtobuf(workerId));
if (index == -1) {
new IllegalArgumentException("Unknown worker: " + workerId);
log.warn("Progress for unknown worker: {}, ignoring.", workerId);
return unchanged();
}
Worker worker = workers.apply(index);
if (worker.hasEndTimestamp() && timestamp.toEpochMilli() >= worker.getEndTimestamp()) {
@@ -129,12 +133,35 @@ public MaterializerActorEvent onWorkerProgress(UUID workerId, Instant timestamp)
}
}

/**
* Reimport all timestamps, by removing any gaps between workers, and changing the first worker
* to re-start at zero.
*/
public MaterializerActorEvent reset() {
Worker zero = Worker.newBuilder()
.setId(toProtobuf(UUID.randomUUID()))
.setTimestamp(0L)
.build();

if (workers.size() <= 1) {
return toEvent(Vector.of(zero));
} else {
return toEvent(workers.update(0, zero).sliding(2)
.map(pair -> pair.apply(0).toBuilder()
.setEndTimestamp(pair.apply(1).getTimestamp())
.build())
.toVector()
.append(workers.last())
);
}
}

private MaterializerActorEvent unchanged() {
return toEvent(workers);
}

private static MaterializerActorEvent toEvent(Seq<Worker> workers) {
return MaterializerActorEvent.newBuilder() .addAllWorker(workers) .build();
private static MaterializerActorEvent toEvent(Iterable<Worker> workers) {
return MaterializerActorEvent.newBuilder().addAllWorker(workers) .build();
}

/**
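To make the reset() transformation above concrete, here is a hypothetical walk-through, not part of the commit, for two workers A (timestamp 100, end 200) and B (timestamp 250, no end). It uses the same generated protobuf builders as the class; the single-element addWorker method is assumed to exist alongside addAllWorker.

// Hypothetical walk-through (not part of this commit) of what reset() emits for two workers.
import static com.tradeshift.reaktive.protobuf.UUIDs.toProtobuf;

import java.util.UUID;

import com.tradeshift.reaktive.protobuf.MaterializerActor.MaterializerActorEvent;
import com.tradeshift.reaktive.protobuf.MaterializerActor.MaterializerActorEvent.Worker;

public class ResetWalkthrough {
    public static void main(String[] args) {
        // Worker B keeps streaming from where it already was.
        Worker b = Worker.newBuilder()
            .setId(toProtobuf(UUID.randomUUID()))
            .setTimestamp(250L)
            .build();

        // Worker A (timestamp 100, end 200) is replaced by a fresh worker at the epoch whose
        // end timestamp is re-derived from B's start, queueing [0, 250) for rematerialization.
        Worker zero = Worker.newBuilder()
            .setId(toProtobuf(UUID.randomUUID()))
            .setTimestamp(0L)
            .setEndTimestamp(250L)
            .build();

        // The event reset() produces is expected to describe exactly these two workers.
        MaterializerActorEvent expected = MaterializerActorEvent.newBuilder()
            .addWorker(zero)
            .addWorker(b)
            .build();
        System.out.println(expected);
    }
}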
7 changes: 7 additions & 0 deletions ts-reaktive-actors/src/main/resources/reference.conf
@@ -14,6 +14,13 @@ ts-reaktive {
# be imported in parallel.
batch-size = 4

# How many imported events to maximally process before saving and reporting progress.
# If more than this number of events occur with the same timestamp, they'll still be processed
# together.
# This used to default to max-events-per-timestamp, but is lower now to get more accurate
# progress report metrics into Kamon.
update-size = 256

# Delay before restarting the event stream to the backend if it completes or fails
restart-delay = 10 seconds

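The new update-size setting above is the weight limit handed to groupedWeightedWithin in MaterializerActor.materializeEvents. A standalone sketch of that operator, not from the commit and with made-up element values, showing why a smaller weight limit means more frequent progress updates:

// Illustrative sketch (not part of this commit): a batch is emitted once the accumulated
// weight reaches updateSize or the one-second window closes, whichever comes first.
import java.time.Duration;
import java.util.Arrays;

import akka.actor.ActorSystem;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Source;

public class UpdateSizeSketch {
    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("update-size-sketch");
        ActorMaterializer materializer = ActorMaterializer.create(system);
        int updateSize = 256; // same default as the reference.conf entry above

        // Each element stands in for one Seq of events sharing a timestamp; its weight is its
        // size, mirroring the `seq -> (long) seq.size()` cost function in materializeEvents.
        Source.from(Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5), Arrays.asList(6)))
            .groupedWeightedWithin(updateSize, seq -> (long) seq.size(), Duration.ofSeconds(1))
            .runForeach(batch -> System.out.println("progress update after " + batch.size() + " timestamps"), materializer)
            .thenRun(system::terminate);
    }
}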