Skip to content

Commit

Permalink
feat: refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
MiguelAHM committed Dec 5, 2024
1 parent 4690f1e commit cce80a5
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

import static org.apache.commons.codec.binary.Hex.encodeHexString;

public abstract class AbstractMirrorImporter implements MirrorImporter {
public abstract class AbstractMirrorImporter {

final Nrtm4ClientInfoRepository nrtm4ClientInfoRepository;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.function.Consumer;

@Service
@Conditional(Nrtm4ClientCondition.class)
Expand All @@ -34,19 +33,16 @@ public class DeltaMirrorImporter extends AbstractMirrorImporter {

private final NrtmRestClient nrtmRestClient;

public static final String RECORD_SEPARATOR = "\u001E";

public DeltaMirrorImporter(final NrtmRestClient nrtmRestClient,
final Nrtm4ClientRepository nrtm4ClientRepository,
final Nrtm4ClientInfoRepository nrtm4ClientInfoRepository) {
super(nrtm4ClientInfoRepository, nrtm4ClientRepository);
this.nrtmRestClient = nrtmRestClient;
}

@Override
public void doImport(final String source, final UpdateNotificationFileResponse updateNotificationFile){
final List<UpdateNotificationFileResponse.NrtmFileLink> deltas = getNewDeltas(source, updateNotificationFile);

public void doImport(final String source,
final String sessionId,
final List<UpdateNotificationFileResponse.NrtmFileLink> deltas){
if (deltas.isEmpty()) {
LOGGER.info("No new deltas to be processed");
return;
Expand All @@ -66,30 +62,19 @@ public void doImport(final String source, final UpdateNotificationFileResponse u
return;
}

processPayload(deltaFilePayload,
firstRecord -> {
final Metadata metadata = getMetadata(firstRecord);
if (!metadata.sessionId().equals(updateNotificationFile.getSessionID())){
LOGGER.error("The session {} is not the same in the UNF and snapshot {}", metadata.sessionId(), updateNotificationFile.getSessionID());
truncateTables();
throw new IllegalArgumentException("The session is not the same in the UNF and snapshot");
}
nrtm4ClientInfoRepository.saveDeltaFileVersion(source, metadata.version, metadata.sessionId());
}
);

processPayload(deltaFilePayload, sessionId, source);
});
}

private void processPayload(final byte[] deltaFilePayload, final Consumer<String> firstRecordProcessor) {
private void processPayload(final byte[] deltaFilePayload, final String sessionId, final String source) {
ByteBuffer buffer = ByteBuffer.wrap(deltaFilePayload);
InputStream inputStream = new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.remaining());
try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
String record;
boolean isFirstRecord = true;
while ((record = reader.readLine()) != null) {
if (isFirstRecord){
firstRecordProcessor.accept(record);
processFirstDeltaRecord(record, sessionId, source);
isFirstRecord = false;
continue;
}
Expand Down Expand Up @@ -147,19 +132,6 @@ private boolean serialExist(final RpslObjectUpdateInfo rpslObjectUpdateInfo) {
return false;
}

private List<UpdateNotificationFileResponse.NrtmFileLink> getNewDeltas(String source, UpdateNotificationFileResponse updateNotificationFile) {
final NrtmClientVersionInfo nrtmClientVersionInfo = nrtm4ClientInfoRepository.getNrtmLastVersionInfoForDeltasPerSource(source);

if (nrtmClientVersionInfo == null){
return updateNotificationFile.getDeltas();
}

return updateNotificationFile.getDeltas()
.stream()
.filter(delta -> delta.getVersion() > nrtmClientVersionInfo.version())
.toList();
}


private void processDeltaRecord(final String records) {
final JSONObject jsonObject = new JSONObject(records);
Expand All @@ -176,11 +148,21 @@ private void processDeltaRecord(final String records) {
deltaPrimaryKey));
}

private void processFirstDeltaRecord(final String firstRecord, final String sessionId, final String source){
final Metadata metadata = getMetadata(firstRecord);
if (!metadata.sessionId().equals(sessionId)){
LOGGER.error("The session {} is not the same in the UNF and snapshot {}", metadata.sessionId(), sessionId);
truncateTables();
throw new IllegalArgumentException("The session is not the same in the UNF and snapshot");
}
nrtm4ClientInfoRepository.saveDeltaFileVersion(source, metadata.version, metadata.sessionId());
}

private static Metadata getMetadata(final String records) {
final JSONObject jsonObject = new JSONObject(records);
final int deltatVersion = jsonObject.getInt("version");
final int deltaVersion = jsonObject.getInt("version");
final String deltaSessionId = jsonObject.getString("session_id");
return new Metadata(deltatVersion, deltaSessionId);
return new Metadata(deltaVersion, deltaSessionId);
}

private record Metadata(int version, String sessionId) {}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@ public SnapshotMirrorImporter(final NrtmRestClient nrtmRestClient,

}

@Override
public void doImport(final String source, final UpdateNotificationFileResponse updateNotificationFile){

public void doImport(final String source,
final String sessionId,
final UpdateNotificationFileResponse.NrtmFileLink snapshot){

final Stopwatch stopwatch = Stopwatch.createStarted();
final UpdateNotificationFileResponse.NrtmFileLink snapshot = updateNotificationFile.getSnapshot();

if (snapshot == null){
LOGGER.error("Snapshot cannot be null in the notification file");
Expand All @@ -67,7 +69,7 @@ public void doImport(final String source, final UpdateNotificationFileResponse u
try {
GzipDecompressor.decompressRecords(
payload,
firstRecord -> processMetadata(source, updateNotificationFile, firstRecord),
firstRecord -> processMetadata(source, sessionId, firstRecord),
recordBatches -> persistBatches(recordBatches, processedCount)
);
} catch (IllegalArgumentException ex){
Expand Down Expand Up @@ -142,12 +144,12 @@ public static RpslObject getPlaceholderPersonObject() {
}


private void processMetadata(final String source, final UpdateNotificationFileResponse updateNotificationFile,
private void processMetadata(final String source, final String updateNotificationSessionId,
final String firstRecord) throws IllegalArgumentException {
final JSONObject jsonObject = new JSONObject(firstRecord);
final int version = jsonObject.getInt("version");
final String sessionId = jsonObject.getString("session_id");
if (!sessionId.equals(updateNotificationFile.getSessionID())) {
if (!sessionId.equals(updateNotificationSessionId)) {
LOGGER.error("The session is not the same in the UNF and snapshot");
truncateTables();
throw new IllegalArgumentException("The session is not the same in the UNF and snapshot");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,14 +121,29 @@ public void processFile(){

if (nrtmClientLastVersionInfo == null){
LOGGER.info("There is no existing Snapshot for the source {}", source);
snapshotImporter.doImport(source, updateNotificationFile);
snapshotImporter.doImport(source, updateNotificationFile.getSessionID(), updateNotificationFile.getSnapshot());
}

deltaImporter.doImport(source, updateNotificationFile);
final List<UpdateNotificationFileResponse.NrtmFileLink> newDeltas = getNewDeltasFromNotificationFile(source, updateNotificationFile);
deltaImporter.doImport(source, updateNotificationFile.getSessionID(), newDeltas);
});

}

private List<UpdateNotificationFileResponse.NrtmFileLink> getNewDeltasFromNotificationFile(final String source,
final UpdateNotificationFileResponse updateNotificationFile) {
final NrtmClientVersionInfo nrtmClientVersionInfo = nrtm4ClientMirrorDao.getNrtmLastVersionInfoForDeltasPerSource(source);

if (nrtmClientVersionInfo == null){
return updateNotificationFile.getDeltas();
}

return updateNotificationFile.getDeltas()
.stream()
.filter(delta -> delta.getVersion() > nrtmClientVersionInfo.version())
.toList();
}

@Nullable
private UpdateNotificationFileResponse getUpdateNotificationFileResponse(final JWSObject jwsObjectParsed) {
try {
Expand Down

0 comments on commit cce80a5

Please sign in to comment.