Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: CN 2 BN protocol communications #518

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import com.hedera.block.server.config.ConfigInjectionModule;
import com.hedera.block.server.health.HealthInjectionModule;
import com.hedera.block.server.manager.BlockManagerInjectionModule;
import com.hedera.block.server.mediator.MediatorInjectionModule;
import com.hedera.block.server.metrics.MetricsInjectionModule;
import com.hedera.block.server.notifier.NotifierInjectionModule;
Expand All @@ -29,6 +30,7 @@
MetricsInjectionModule.class,
PbjInjectionModule.class,
VerificationInjectionModule.class,
BlockManagerInjectionModule.class
})
public interface BlockNodeAppInjectionComponent {
/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// SPDX-License-Identifier: Apache-2.0
package com.hedera.block.server.manager;

import com.hedera.pbj.runtime.io.buffer.Bytes;

/**
* POJO that holds information about a block while in process.
*/
public class BlockInfo {

private final long blockNumber;
private Bytes blockHash;
private final BlockStatus blockStatus;

public BlockInfo(long blockNumber) {
this.blockNumber = blockNumber;
this.blockStatus = new BlockStatus();
}

public long getBlockNumber() {
return blockNumber;
}

public Bytes getBlockHash() {
return blockHash;
}

public BlockStatus getBlockStatus() {
return blockStatus;
}

public void setBlockHash(Bytes blockHash) {
this.blockHash = blockHash;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// SPDX-License-Identifier: Apache-2.0
package com.hedera.block.server.manager;

import com.hedera.pbj.runtime.io.buffer.Bytes;

/**
* A simplified BlockManager interface that does not have "blockReceived" nor "blockStateUpdated".
*/
public interface BlockManager {

/**
* Called when we receive a "persistence" event for the given blockNumber.
*/
void blockPersisted(long blockNumber);

/**
* Called when we receive a "verified" event for the given blockNumber,
* with the newly computed blockHash.
*/
void blockVerified(long blockNumber, Bytes blockHash);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// SPDX-License-Identifier: Apache-2.0
package com.hedera.block.server.manager;

import com.hedera.block.server.notifier.Notifier;
import com.hedera.pbj.runtime.io.buffer.Bytes;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.inject.Inject;

/**
* A simplified BlockManager that:
* Creates BlockInfo entries on demand when blockPersisted or blockVerified arrives.
* If either skipPersistence or skipVerification is true, ignores all events entirely (no ACKs).
* Acks blocks only in strictly increasing order
* the ACK is delayed until it is that block's turn.
* consecutive ACKs for all blocks that are both persisted and verified.
*/
public class BlockManagerImpl implements BlockManager {

private final Map<Long, BlockInfo> blockInfoMap = new ConcurrentHashMap<>();
private volatile long lastAcknowledgedBlockNumber = -1;
private final Notifier notifier;
private final boolean skipAcknowledgement;

/**
* Constructor. If either skipPersistence or skipVerification is true,
* we ignore all events (no ACKs ever sent).
*/
@Inject
public BlockManagerImpl(@NonNull Notifier notifier, boolean skipAcknowledgement) {
this.notifier = notifier;
this.skipAcknowledgement = skipAcknowledgement;
}

@Override
public void blockPersisted(long blockNumber) {
if (skipAcknowledgement) {
return;
}

BlockInfo info = blockInfoMap.computeIfAbsent(blockNumber, BlockInfo::new);
info.getBlockStatus().setPersisted();

attemptAcks();
}

@Override
public void blockVerified(long blockNumber, Bytes blockHash) {
if (skipAcknowledgement) {
return;
}

BlockInfo info = blockInfoMap.computeIfAbsent(blockNumber, BlockInfo::new);
info.setBlockHash(blockHash);
info.getBlockStatus().setVerified();

attemptAcks();
}

private void attemptAcks() {
// Temporarily if lastAcknowledgedBlockNumber is -1, we get the first block in the map
if (lastAcknowledgedBlockNumber == -1) {
lastAcknowledgedBlockNumber = blockInfoMap.keySet().stream().min(Long::compareTo).orElse(-1L);
}

// Keep ACK-ing starting from the next block in sequence
while (true) {
long nextBlock = lastAcknowledgedBlockNumber + 1;
BlockInfo info = blockInfoMap.get(nextBlock);

if (info == null) {
// We have no info for the next expected block yet.
// => We can't ACK the "next" block. Stop.
break;
}

// Check if this block is fully ready
if (!info.getBlockStatus().isPersisted() ||
!info.getBlockStatus().isVerified()) {
// Not fully ready. Stop.
break;
}

// Attempt to mark ACK sent (CAS-protected to avoid duplicates)
if (info.getBlockStatus().markAckSentIfNotAlready()) {
// We "won" the race; we do the actual ACK
notifier.sendAck(nextBlock, info.getBlockHash(), false);

// Update last acknowledged
lastAcknowledgedBlockNumber = nextBlock;

// Remove from map if desired (so we don't waste memory)
blockInfoMap.remove(nextBlock);
}

// Loop again in case the next block is also ready.
// This can ACK multiple consecutive blocks if they are all
// persisted & verified in order.
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// SPDX-License-Identifier: Apache-2.0
package com.hedera.block.server.manager;

import com.hedera.block.server.notifier.Notifier;
import com.hedera.block.server.persistence.storage.PersistenceStorageConfig;
import com.hedera.block.server.verification.VerificationConfig;
import dagger.Module;
import dagger.Provides;
import edu.umd.cs.findbugs.annotations.NonNull;
import javax.inject.Singleton;

@Module
public interface BlockManagerInjectionModule {

@Provides
@Singleton
static BlockManager provideBlockManager(
@NonNull final Notifier notifier,
@NonNull final PersistenceStorageConfig persistenceStorageConfig,
@NonNull final VerificationConfig verificationConfig) {

boolean skipPersistence = persistenceStorageConfig.type().equals(PersistenceStorageConfig.StorageType.NO_OP);
boolean skipVerification = verificationConfig.type().equals(VerificationConfig.VerificationServiceType.NO_OP);

return new BlockManagerImpl(notifier, skipPersistence | skipVerification);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// SPDX-License-Identifier: Apache-2.0
package com.hedera.block.server.manager;

import java.util.concurrent.atomic.AtomicBoolean;

/**
* A simple block status object that:
* - Uses volatile booleans for 'persisted' and 'verified' (set once, from false to true).
* - Uses an AtomicBoolean 'ackSent' for lock-free compare-and-set if a block has been ACKed.
*/
public class BlockStatus {

private volatile boolean persisted = false;
private volatile boolean verified = false;

/** Flag that tracks whether this block has been ACKed. */
private final AtomicBoolean ackSent = new AtomicBoolean(false);

/**
* Marks this block as persisted.
* This is a "set once" transition from false -> true (idempotent if called again).
*/
public void setPersisted() {
persisted = true;
}

/**
* Marks this block as verified.
* This is a "set once" transition from false -> true (idempotent if called again).
*/
public void setVerified() {
verified = true;
}

/**
* Atomically marks this block as ACKed if not already done.
*
* @return true if this call successfully set 'ackSent' from false -> true,
* false if 'ackSent' was already true.
*/
public boolean markAckSentIfNotAlready() {
return ackSent.compareAndSet(false, true);
}

/**
* @return true if persisted = true
*/
public boolean isPersisted() {
return persisted;
}

/**
* @return true if verified = true
*/
public boolean isVerified() {
return verified;
}

/**
* @return true if 'ackSent' has already been set
*/
public boolean isAckSent() {
return ackSent.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
import com.hedera.block.server.mediator.StreamMediator;
import com.hedera.hapi.block.BlockItemUnparsed;
import com.hedera.hapi.block.PublishStreamResponse;
import com.hedera.pbj.runtime.io.buffer.Bytes;
import java.util.List;

/**
* Use this interface to combine the contract for streaming block items with the contract to be
* notified of critical system events.
*/
public interface Notifier extends StreamMediator<List<BlockItemUnparsed>, PublishStreamResponse>, Notifiable {}
public interface Notifier extends StreamMediator<List<BlockItemUnparsed>, PublishStreamResponse>, Notifiable {
void sendAck(long blockNumber, Bytes blockHash, boolean isDuplicated);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,20 @@
import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Counter.SuccessfulPubStreamResp;
import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Gauge.NotifierRingBufferRemainingCapacity;
import static com.hedera.block.server.metrics.BlockNodeMetricTypes.Gauge.Producers;
import static com.hedera.block.server.producer.Util.getFakeHash;
import static java.lang.System.Logger.Level.ERROR;

import com.hedera.block.server.config.BlockNodeContext;
import com.hedera.block.server.mediator.SubscriptionHandlerBase;
import com.hedera.block.server.metrics.MetricsService;
import com.hedera.block.server.service.ServiceStatus;
import com.hedera.hapi.block.Acknowledgement;
import com.hedera.hapi.block.BlockAcknowledgement;
import com.hedera.hapi.block.BlockItemUnparsed;
import com.hedera.hapi.block.EndOfStream;
import com.hedera.hapi.block.ItemAcknowledgement;
import com.hedera.hapi.block.PublishStreamResponse;
import com.hedera.hapi.block.PublishStreamResponseCode;
import com.hedera.pbj.runtime.io.buffer.Bytes;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import javax.inject.Inject;
Expand Down Expand Up @@ -94,34 +92,7 @@ public void notifyUnrecoverableError() {
* upstream producers
*/
@Override
public void publish(@NonNull List<BlockItemUnparsed> blockItems) {

try {
if (serviceStatus.isRunning()) {
// Publish the block item to the subscribers
final var publishStreamResponse = PublishStreamResponse.newBuilder()
.acknowledgement(buildAck(blockItems))
.build();
ringBuffer.publishEvent((event, sequence) -> event.set(publishStreamResponse));

metricsService.get(NotifierRingBufferRemainingCapacity).set(ringBuffer.remainingCapacity());
metricsService.get(SuccessfulPubStreamResp).increment();
} else {
LOGGER.log(ERROR, "Notifier is not running.");
}

} catch (NoSuchAlgorithmException e) {

// Stop the server
serviceStatus.stopRunning(getClass().getName());

final var errorResponse = buildErrorStreamResponse();
LOGGER.log(ERROR, "Error calculating hash: ", e);

// Send an error response to all the producers.
ringBuffer.publishEvent((event, sequence) -> event.set(errorResponse));
}
}
public void publish(@NonNull List<BlockItemUnparsed> blockItems) {}

/**
* Builds an error stream response.
Expand All @@ -137,20 +108,31 @@ static PublishStreamResponse buildErrorStreamResponse() {
return PublishStreamResponse.newBuilder().status(endOfStream).build();
}

/**
* Protected method meant for testing. Builds an Acknowledgement for the block item.
*
* @param blockItems the block items to build the Acknowledgement for
* @return the Acknowledgement for the block item
* @throws NoSuchAlgorithmException if the hash algorithm is not supported
*/
@NonNull
Acknowledgement buildAck(@NonNull final List<BlockItemUnparsed> blockItems) throws NoSuchAlgorithmException {
final ItemAcknowledgement itemAck = ItemAcknowledgement.newBuilder()
// TODO: Replace this with a real hash generator
.itemsHash(Bytes.wrap(getFakeHash(blockItems)))
Acknowledgement buildAck(
@NonNull final Bytes blockHash, @NonNull final long blockNumber, @NonNull boolean alreadyExists) {
final BlockAcknowledgement blockAcknowledgement = BlockAcknowledgement.newBuilder()
.blockRootHash(blockHash)
.blockNumber(blockNumber)
.blockAlreadyExists(alreadyExists)
.build();

return Acknowledgement.newBuilder().itemAck(itemAck).build();
return Acknowledgement.newBuilder().blockAck(blockAcknowledgement).build();
}

public void sendAck(long blockNumber, Bytes blockHash, boolean duplicated) {
if (serviceStatus.isRunning()) {
// Publish the block item to the subscribers
final var publishStreamResponse = PublishStreamResponse.newBuilder()
.acknowledgement(buildAck(blockHash, blockNumber, duplicated))
.build();

ringBuffer.publishEvent((event, sequence) -> event.set(publishStreamResponse));

metricsService.get(NotifierRingBufferRemainingCapacity).set(ringBuffer.remainingCapacity());
metricsService.get(SuccessfulPubStreamResp).increment();
} else {
LOGGER.log(ERROR, "Service is not running. Notifier skipping sendAck");
}
}
}
Loading
Loading