Skip to content

Commit

Permalink
Add seedRemote flag to TranslogConfig
Browse files Browse the repository at this point in the history
Signed-off-by: Bhumika Saini <[email protected]>
  • Loading branch information
Bhumika Saini committed Apr 22, 2024
1 parent aa8a905 commit 7939aeb
Show file tree
Hide file tree
Showing 15 changed files with 83 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ public IndexShard(
logger.debug("state: [CREATED]");

this.checkIndexOnStartup = indexSettings.getValue(IndexSettings.INDEX_CHECK_ON_STARTUP);
this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays, nodeId);
this.translogConfig = new TranslogConfig(shardId, shardPath().resolveTranslog(), indexSettings, bigArrays, nodeId, seedRemote);
final String aId = shardRouting.allocationId().getId();
final long primaryTerm = indexSettings.getIndexMetadata().primaryTerm(shardId.id());
this.pendingPrimaryTerm = primaryTerm;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,12 @@ public class RemoteBlobStoreInternalTranslogFactory implements TranslogFactory {

private final RemoteStoreSettings remoteStoreSettings;

private final boolean seedRemote;

public RemoteBlobStoreInternalTranslogFactory(
Supplier<RepositoriesService> repositoriesServiceSupplier,
ThreadPool threadPool,
String repositoryName,
RemoteTranslogTransferTracker remoteTranslogTransferTracker,
RemoteStoreSettings remoteStoreSettings,
boolean seedRemote
RemoteStoreSettings remoteStoreSettings
) {
Repository repository;
try {
Expand All @@ -57,7 +54,6 @@ public RemoteBlobStoreInternalTranslogFactory(
this.threadPool = threadPool;
this.remoteTranslogTransferTracker = remoteTranslogTransferTracker;
this.remoteStoreSettings = remoteStoreSettings;
this.seedRemote = seedRemote;
}

@Override
Expand All @@ -84,8 +80,7 @@ public Translog newTranslog(
threadPool,
startedPrimarySupplier,
remoteTranslogTransferTracker,
remoteStoreSettings,
seedRemote
remoteStoreSettings
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ public class RemoteFsTranslog extends Translog {
private static final int SYNC_PERMIT = 1;
private final Semaphore syncPermit = new Semaphore(SYNC_PERMIT);
private final AtomicBoolean pauseSync = new AtomicBoolean(false);
private final boolean seedRemote;

public RemoteFsTranslog(
TranslogConfig config,
Expand All @@ -104,12 +103,10 @@ public RemoteFsTranslog(
ThreadPool threadPool,
BooleanSupplier startedPrimarySupplier,
RemoteTranslogTransferTracker remoteTranslogTransferTracker,
RemoteStoreSettings remoteStoreSettings,
boolean seedRemote
RemoteStoreSettings remoteStoreSettings
) throws IOException {
super(config, translogUUID, deletionPolicy, globalCheckpointSupplier, primaryTermSupplier, persistedSequenceNumberConsumer);
logger = Loggers.getLogger(getClass(), shardId);
this.seedRemote = seedRemote;
this.startedPrimarySupplier = startedPrimarySupplier;
this.remoteTranslogTransferTracker = remoteTranslogTransferTracker;
fileTransferTracker = new FileTransferTracker(shardId, remoteTranslogTransferTracker);
Expand All @@ -123,7 +120,7 @@ public RemoteFsTranslog(
remoteStoreSettings
);
try {
download(translogTransferManager, location, logger, seedRemote);
download(translogTransferManager, location, logger, config.shouldSeedRemote());
Checkpoint checkpoint = readCheckpoint(location);
logger.info("Downloaded data from remote translog till maxSeqNo = {}", checkpoint.maxSeqNo);
this.readers.addAll(recoverFromFiles(checkpoint));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,25 @@ public final class TranslogConfig {
private final Path translogPath;
private final ByteSizeValue bufferSize;
private final String nodeId;
private final boolean seedRemote;

/**
* Creates a new TranslogConfig instance
* @param shardId the shard ID this translog belongs to
* @param translogPath the path to use for the transaction log files
* @param indexSettings the index settings used to set internal variables
* @param bigArrays a bigArrays instance used for temporarily allocating write operations
* @param seedRemote boolean denoting whether remote store needs to be seeded as part of remote migration
*/
public TranslogConfig(ShardId shardId, Path translogPath, IndexSettings indexSettings, BigArrays bigArrays, String nodeId) {
this(shardId, translogPath, indexSettings, bigArrays, DEFAULT_BUFFER_SIZE, nodeId);
public TranslogConfig(
ShardId shardId,
Path translogPath,
IndexSettings indexSettings,
BigArrays bigArrays,
String nodeId,
boolean seedRemote
) {
this(shardId, translogPath, indexSettings, bigArrays, DEFAULT_BUFFER_SIZE, nodeId, seedRemote);
}

TranslogConfig(
Expand All @@ -77,14 +86,16 @@ public TranslogConfig(ShardId shardId, Path translogPath, IndexSettings indexSet
IndexSettings indexSettings,
BigArrays bigArrays,
ByteSizeValue bufferSize,
String nodeId
String nodeId,
boolean seedRemote
) {
this.bufferSize = bufferSize;
this.indexSettings = indexSettings;
this.shardId = shardId;
this.translogPath = translogPath;
this.bigArrays = bigArrays;
this.nodeId = nodeId;
this.seedRemote = seedRemote;
}

/**
Expand Down Expand Up @@ -125,4 +136,8 @@ public ByteSizeValue getBufferSize() {
public String getNodeId() {
return nodeId;
}

public boolean shouldSeedRemote() {
return seedRemote;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,8 @@ private boolean isTranslogClean(ShardPath shardPath, ClusterState clusterState,
translogPath,
indexSettings,
BigArrays.NON_RECYCLING_INSTANCE,
""
"",
false
);
long primaryTerm = indexSettings.getIndexMetadata().primaryTerm(shardPath.getShardId().id());
// We open translog to check for corruption, do not clean anything.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -526,17 +526,15 @@ private static BiFunction<IndexSettings, ShardRouting, TranslogFactory> getTrans
threadPool,
indexSettings.getRemoteStoreTranslogRepository(),
remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardRouting.shardId()),
remoteStoreSettings,
false
remoteStoreSettings
);
} else if (isRemoteDataAttributePresent(settings) && shardRouting.primary()) {
return new RemoteBlobStoreInternalTranslogFactory(
repositoriesServiceSupplier,
threadPool,
RemoteStoreNodeAttribute.getRemoteStoreTranslogRepo(indexSettings.getNodeSettings()),
remoteStoreStatsTrackerFactory.getRemoteTranslogTransferTracker(shardRouting.shardId()),
remoteStoreSettings,
true
remoteStoreSettings
);
}
return new InternalTranslogFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,7 @@ private IndexService newIndexService(IndexModule module) throws IOException {
threadPool,
indexSettings.getRemoteStoreTranslogRepository(),
new RemoteTranslogTransferTracker(shardRouting.shardId(), 10),
DefaultRemoteStoreSettings.INSTANCE,
false
DefaultRemoteStoreSettings.INSTANCE
);
}
return new InternalTranslogFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4002,7 +4002,7 @@ public void testRecoverFromForeignTranslog() throws IOException {
final Path badTranslogLog = createTempDir();
final String badUUID = Translog.createEmptyTranslog(badTranslogLog, SequenceNumbers.NO_OPS_PERFORMED, shardId, primaryTerm.get());
Translog translog = new LocalTranslog(
new TranslogConfig(shardId, badTranslogLog, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, ""),
new TranslogConfig(shardId, badTranslogLog, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, "", false),
badUUID,
createTranslogDeletionPolicy(INDEX_SETTINGS),
() -> SequenceNumbers.NO_OPS_PERFORMED,
Expand All @@ -4020,7 +4020,8 @@ public void testRecoverFromForeignTranslog() throws IOException {
translog.location(),
config.getIndexSettings(),
BigArrays.NON_RECYCLING_INSTANCE,
""
"",
false
);

EngineConfig brokenConfig = new EngineConfig.Builder().shardId(shardId)
Expand Down Expand Up @@ -7714,7 +7715,8 @@ public void testNotWarmUpSearcherInEngineCtor() throws Exception {
createTempDir(),
config.getTranslogConfig().getIndexSettings(),
config.getTranslogConfig().getBigArrays(),
""
"",
false
);
EngineConfig configWithWarmer = new EngineConfig.Builder().shardId(config.getShardId())
.threadPool(config.getThreadPool())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ public void setupListeners() throws Exception {
createTempDir("translog"),
indexSettings,
BigArrays.NON_RECYCLING_INSTANCE,
""
"",
false
);
Engine.EventListener eventListener = new Engine.EventListener() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void testRecoveryFromTranslog() throws IOException {
LocalCheckpointTracker tracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
try {
translogManager = new InternalTranslogManager(
new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, ""),
new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, "", false),
primaryTerm,
globalCheckpoint::get,
createTranslogDeletionPolicy(INDEX_SETTINGS),
Expand Down Expand Up @@ -68,7 +68,7 @@ public void testRecoveryFromTranslog() throws IOException {
translogManager.syncTranslog();
translogManager.close();
translogManager = new InternalTranslogManager(
new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, ""),
new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, "", false),
primaryTerm,
globalCheckpoint::get,
createTranslogDeletionPolicy(INDEX_SETTINGS),
Expand Down Expand Up @@ -117,7 +117,7 @@ public void testTranslogRollsGeneration() throws IOException {
LocalCheckpointTracker tracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
try {
translogManager = new InternalTranslogManager(
new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, ""),
new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, "", false),
primaryTerm,
globalCheckpoint::get,
createTranslogDeletionPolicy(INDEX_SETTINGS),
Expand Down Expand Up @@ -147,7 +147,7 @@ public void testTranslogRollsGeneration() throws IOException {
translogManager.syncTranslog();
translogManager.close();
translogManager = new InternalTranslogManager(
new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, ""),
new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, "", false),
primaryTerm,
globalCheckpoint::get,
createTranslogDeletionPolicy(INDEX_SETTINGS),
Expand Down Expand Up @@ -182,7 +182,7 @@ public void testTrimOperationsFromTranslog() throws IOException {
LocalCheckpointTracker tracker = new LocalCheckpointTracker(NO_OPS_PERFORMED, NO_OPS_PERFORMED);
try {
translogManager = new InternalTranslogManager(
new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, ""),
new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, "", false),
primaryTerm,
globalCheckpoint::get,
createTranslogDeletionPolicy(INDEX_SETTINGS),
Expand Down Expand Up @@ -214,7 +214,7 @@ public void testTrimOperationsFromTranslog() throws IOException {

translogManager.close();
translogManager = new InternalTranslogManager(
new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, ""),
new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, "", false),
primaryTerm,
globalCheckpoint::get,
createTranslogDeletionPolicy(INDEX_SETTINGS),
Expand Down Expand Up @@ -253,7 +253,7 @@ public void testTranslogSync() throws IOException {
ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null);
AtomicReference<InternalTranslogManager> translogManagerAtomicReference = new AtomicReference<>();
translogManager = new InternalTranslogManager(
new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, ""),
new TranslogConfig(shardId, primaryTranslogDir, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, "", false),
primaryTerm,
globalCheckpoint::get,
createTranslogDeletionPolicy(INDEX_SETTINGS),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ private TranslogConfig getTranslogConfig(final Path path, final Settings setting
);

final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings);
return new TranslogConfig(shardId, path, indexSettings, NON_RECYCLING_INSTANCE, bufferSize, "");
return new TranslogConfig(shardId, path, indexSettings, NON_RECYCLING_INSTANCE, bufferSize, "", false);
}

private Location addToTranslogAndList(Translog translog, List<Translog.Operation> list, Translog.Operation op) throws IOException {
Expand Down Expand Up @@ -1453,7 +1453,8 @@ public void testTranslogWriterCanFlushInAddOrReadCall() throws IOException {
temp.getIndexSettings(),
temp.getBigArrays(),
new ByteSizeValue(1, ByteSizeUnit.KB),
""
"",
false
);

final Set<Long> persistedSeqNos = new HashSet<>();
Expand Down Expand Up @@ -1552,7 +1553,8 @@ public void testTranslogWriterFsyncedWithLocalTranslog() throws IOException {
temp.getIndexSettings(),
temp.getBigArrays(),
new ByteSizeValue(1, ByteSizeUnit.KB),
""
"",
false
);

final Set<Long> persistedSeqNos = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,7 @@ private RemoteFsTranslog create(Path path, BlobStoreRepository repository, Strin
threadPool,
primaryMode::get,
new RemoteTranslogTransferTracker(shardId, 10),
DefaultRemoteStoreSettings.INSTANCE,
false
DefaultRemoteStoreSettings.INSTANCE
);
}

Expand Down Expand Up @@ -225,7 +224,7 @@ private TranslogConfig getTranslogConfig(final Path path, final Settings setting
// To simulate that the node is remote backed
Settings nodeSettings = Settings.builder().put("node.attr.remote_store.translog.repository", "my-repo-1").build();
final IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(shardId.getIndex(), settings, nodeSettings);
return new TranslogConfig(shardId, path, indexSettings, NON_RECYCLING_INSTANCE, bufferSize, "");
return new TranslogConfig(shardId, path, indexSettings, NON_RECYCLING_INSTANCE, bufferSize, "", false);
}

private BlobStoreRepository createRepository() {
Expand Down Expand Up @@ -400,7 +399,8 @@ private TranslogConfig getConfig(int gensToKeep) {
temp.getIndexSettings(),
temp.getBigArrays(),
new ByteSizeValue(1, ByteSizeUnit.KB),
""
"",
false
);
return config;
}
Expand Down Expand Up @@ -463,8 +463,7 @@ public void testExtraGenToKeep() throws Exception {
threadPool,
() -> Boolean.TRUE,
new RemoteTranslogTransferTracker(shardId, 10),
DefaultRemoteStoreSettings.INSTANCE,
false
DefaultRemoteStoreSettings.INSTANCE
) {
@Override
ChannelFactory getChannelFactory() {
Expand Down Expand Up @@ -1514,8 +1513,7 @@ public void testTranslogWriterCanFlushInAddOrReadCall() throws IOException {
threadPool,
() -> Boolean.TRUE,
new RemoteTranslogTransferTracker(shardId, 10),
DefaultRemoteStoreSettings.INSTANCE,
false
DefaultRemoteStoreSettings.INSTANCE
) {
@Override
ChannelFactory getChannelFactory() {
Expand Down Expand Up @@ -1564,7 +1562,8 @@ public void testTranslogWriterFsyncDisabledInRemoteFsTranslog() throws IOExcepti
temp.getIndexSettings(),
temp.getBigArrays(),
new ByteSizeValue(1, ByteSizeUnit.KB),
""
"",
false
);

final Set<Long> persistedSeqNos = new HashSet<>();
Expand Down Expand Up @@ -1624,8 +1623,7 @@ public void force(boolean metaData) throws IOException {
threadPool,
() -> Boolean.TRUE,
new RemoteTranslogTransferTracker(shardId, 10),
DefaultRemoteStoreSettings.INSTANCE,
false
DefaultRemoteStoreSettings.INSTANCE
) {
@Override
ChannelFactory getChannelFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,14 @@ protected Translog createTranslog(LongSupplier primaryTermSupplier) throws IOExc
}

protected Translog createTranslog(Path translogPath, LongSupplier primaryTermSupplier) throws IOException {
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE, "");
TranslogConfig translogConfig = new TranslogConfig(
shardId,
translogPath,
INDEX_SETTINGS,
BigArrays.NON_RECYCLING_INSTANCE,
"",
false
);
String translogUUID = Translog.createEmptyTranslog(
translogPath,
SequenceNumbers.NO_OPS_PERFORMED,
Expand Down
Loading

0 comments on commit 7939aeb

Please sign in to comment.