Skip to content

Commit

Permalink
Add some log messages to improve traceability
Browse files Browse the repository at this point in the history
  • Loading branch information
carlesarnal committed May 14, 2024
1 parent d01e086 commit 332252b
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,8 @@ private void autoCreateTopics() {
}

/**
* Start the KSQL Kafka consumer thread which is responsible for subscribing to the kafka topic,
* consuming JournalRecord entries found on that topic, and applying those journal entries to
* the internal data model.
* Consume the snapshots topic, looking for the most recent snapshots in the topic. Once found, it restores the internal h2 database using the snapshot's content.
* WARNING: This has the limitation of processing the first 500 snapshots, which should be enough for most deployments.
*/
private String consumeSnapshotsTopic(KafkaConsumer<String, String> snapshotsConsumer) {
// Subscribe to the snapshots topic
Expand All @@ -211,7 +210,7 @@ private String consumeSnapshotsTopic(KafkaConsumer<String, String> snapshotsCons
try {
String path = snapshotFound.value();
if (null != path && !path.isBlank() && Files.exists(Path.of(snapshotFound.value()))) {
log.info("Snapshot with path {} found.", snapshotFound.value());
log.debug("Snapshot with path {} found.", snapshotFound.value());
snapshotRecordKey = snapshotFound.key();
mostRecentSnapshotPath = Path.of(snapshotFound.value());
}
Expand All @@ -229,7 +228,6 @@ private String consumeSnapshotsTopic(KafkaConsumer<String, String> snapshotsCons
}

snapshotsConsumer.commitSync();

return snapshotRecordKey;
}

Expand All @@ -248,9 +246,6 @@ private void startConsumerThread(final KafkaConsumer<KafkaSqlMessageKey, KafkaSq
Runnable runner = () -> {
try (consumer) {
log.info("Subscribing to {}", configuration.topic());

//TODO use the snapshot record metadata to put the journal consumer into the appropiate offset so it does not consume unneeded messages

// Subscribe to the journal topic
Collection<String> topics = Collections.singleton(configuration.topic());
consumer.subscribe(topics);
Expand Down Expand Up @@ -888,15 +883,13 @@ public String triggerSnapshotCreation() throws RegistryStorageException {
Path path = Path.of(configuration.snapshotLocation(), snapshotId + ".sql");
var message = new CreateSnapshot1Message(path.toString(), snapshotId);
this.lastTriggeredSnapshot = snapshotId;
log.debug("Snapshot with id {} triggered.", snapshotId);
var uuid = ConcurrentUtil.get(submitter.submitMessage(message));
String snapshotLocation = (String) coordinator.waitForResponse(uuid);

//Then we send a new message to the snapshots topic, using the snapshot id as the key of the snapshot message.
ProducerRecord<String, String> record = new ProducerRecord<>(configuration.snapshotsTopic(), 0, snapshotId, snapshotLocation,
Collections.emptyList());

RecordMetadata recordMetadata = ConcurrentUtil.get(snapshotsProducer.apply(record));

return snapshotLocation;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3492,6 +3492,7 @@ public String triggerSnapshotCreation() throws RegistryStorageException {

@Override
public String createSnapshot(String location) throws RegistryStorageException {
log.debug("Creating internal database snapshot to location {}.", location);
handles.withHandleNoException(handle -> {
handle.createQuery(sqlStatements.createDataSnapshot())
.bind(0, location).mapTo(Integer.class);
Expand Down

0 comments on commit 332252b

Please sign in to comment.