Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

first stab at snapshotting #18

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,24 @@
*/
package io.debezium.connector.db2as400;

import java.util.Optional;

import io.debezium.jdbc.MainConnectionProvidingConnectionFactory;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.ChangeEventSourceFactory;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.snapshot.SnapshotterService;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Clock;
import io.debezium.util.Strings;

public class As400ChangeEventSourceFactory implements ChangeEventSourceFactory<As400Partition, As400OffsetContext> {

Expand Down Expand Up @@ -59,4 +66,27 @@ public StreamingChangeEventSource<As400Partition, As400OffsetContext> getStreami
return new As400StreamingChangeEventSource(configuration, rpcConnection, jdbcConnectionFactory.mainConnection(),
dispatcher, errorHandler, clock, schema);
}

@Override
public Optional<IncrementalSnapshotChangeEventSource<As400Partition, ? extends DataCollectionId>> getIncrementalSnapshotChangeEventSource(As400OffsetContext offsetContext,
SnapshotProgressListener<As400Partition> snapshotProgressListener,
DataChangeEventListener<As400Partition> dataChangeEventListener,
NotificationService<As400Partition, As400OffsetContext> notificationService) {
// If no data collection id is provided, don't return an instance as the implementation requires
// that a signal data collection id be provided to work.
if (Strings.isNullOrEmpty(configuration.getSignalingDataCollectionId())) {
return Optional.empty();
}
final SignalBasedIncrementalSnapshotChangeEventSource<As400Partition, TableId> incrementalSnapshotChangeEventSource = new SignalBasedIncrementalSnapshotChangeEventSource<>(
configuration,
jdbcConnectionFactory.mainConnection(),
dispatcher,
schema,
clock,
snapshotProgressListener,
dataChangeEventListener,
notificationService);
return Optional.of(incrementalSnapshotChangeEventSource);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@
import io.debezium.connector.SnapshotRecord;
import io.debezium.ibmi.db2.journal.retrieve.JournalProcessedPosition;
import io.debezium.ibmi.db2.journal.retrieve.JournalReceiver;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotContext;
import io.debezium.pipeline.source.snapshot.incremental.SignalBasedIncrementalSnapshotContext;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.spi.schema.DataCollectionId;

public class As400OffsetContext implements OffsetContext {
Expand Down Expand Up @@ -50,6 +53,7 @@ public class As400OffsetContext implements OffsetContext {
private final String inclueTables;
private boolean hasNewTables = false;
private volatile boolean snapshotComplete = false;
private final IncrementalSnapshotContext<TableId> incrementalSnapshotContext = new SignalBasedIncrementalSnapshotContext<>();

public As400OffsetContext(As400ConnectorConfig connectorConfig) {
super();
Expand All @@ -70,7 +74,7 @@ public As400OffsetContext(As400ConnectorConfig connectorConfig, JournalProcessed
}

public As400OffsetContext(As400ConnectorConfig connectorConfig, JournalProcessedPosition position, String includeTables,
boolean snapshotComplete) {
boolean snapshotComplete, IncrementalSnapshotContext<TableId> incrementalSnapshotContext) {
super();
partition = Collections.singletonMap(SERVER_PARTITION_KEY, connectorConfig.getLogicalName());
this.position = position;
Expand Down Expand Up @@ -209,7 +213,7 @@ public As400OffsetContext load(Map<String, ?> map) {
Instant time = (TimeStr == null) ? Instant.ofEpochSecond(0) : Instant.ofEpochSecond(Long.parseLong(TimeStr));
position = new JournalProcessedPosition(offset, new JournalReceiver(receiver, receiverLibrary), time, processed);
}
return new As400OffsetContext(connectorConfig, position, inclueTables, snapshotComplete);
return new As400OffsetContext(connectorConfig, position, inclueTables, snapshotComplete, SignalBasedIncrementalSnapshotContext.load(map));
}
}

Expand All @@ -230,4 +234,9 @@ public String toString() {
public void markSnapshotRecord(SnapshotRecord record) {
sourceInfo.setSnapshot(record);
}

@Override
public IncrementalSnapshotContext<?> getIncrementalSnapshotContext() {
return incrementalSnapshotContext;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import io.debezium.DebeziumException;
import io.debezium.connector.db2as400.As400RpcConnection.BlockingReceiverConsumer;
import io.debezium.connector.db2as400.As400RpcConnection.RpcException;
import io.debezium.data.Envelope.Operation;
import io.debezium.ibmi.db2.journal.retrieve.JournalEntryType;
import io.debezium.ibmi.db2.journal.retrieve.JournalProcessedPosition;
Expand Down Expand Up @@ -336,4 +337,16 @@ private BlockingReceiverConsumer processJournalEntries(As400Partition partition,
private boolean ignore(JournalEntryType journalCode) {
return journalCode == JournalEntryType.OPEN || journalCode == JournalEntryType.CLOSE;
}

@Override
public As400OffsetContext getOffsetContext() {
// TODO should this be the last process position?
try {
return new As400OffsetContext(connectorConfig, new JournalProcessedPosition(dataConnection.getCurrentPosition(), null, true));
}
catch (RpcException e) {
log.error("failed to retrieve journal position", e);
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public boolean retrieveJournal(JournalProcessedPosition previousPosition, final
this.header = new FirstHeader(0, 0, 0, OffsetStatus.NOT_CALLED,
new JournalProcessedPosition(range.end(), Instant.EPOCH, true));

log.debug("start equals end - range {}", range);
log.trace("start equals end - range {}", range);
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ public AS400JDBCConnectionForcedCcsid() {
@Override
public ConvTable getConverter(int ccsid) throws SQLException {
if (this.fromCcsid != null && fromCcsid.intValue() == ccsid && toCcsid != null) {
log.fine(() -> String.format("requested ccsid %d using replacement ccsid %d\n\t%s", ccsid, toCcsid,
getStack()));
return super.getConverter(this.toCcsid);
}
log.fine(() -> String.format("requested ccsid %d using parent converter\n\t%s", ccsid, getStack()));
Expand Down
Loading