Skip to content

Commit

Permalink
Merge branch 'master' into feature/transfer-reliability-test
Browse files Browse the repository at this point in the history
# Conflicts:
#	tests/integration/multinodes.nim
#	tests/integration/nodeprocess.nim
  • Loading branch information
benbierens committed Dec 16, 2024
2 parents b895c41 + bef1160 commit 0d6fcee
Show file tree
Hide file tree
Showing 28 changed files with 1,106 additions and 186 deletions.
35 changes: 17 additions & 18 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ env:
cache_nonce: 0 # Allows for easily busting actions/cache caches
nim_version: pinned


concurrency:
group: ${{ github.workflow }}-${{ github.ref || github.run_id }}
cancel-in-progress: true
Expand All @@ -23,23 +22,23 @@ jobs:
matrix: ${{ steps.matrix.outputs.matrix }}
cache_nonce: ${{ env.cache_nonce }}
steps:
- name: Compute matrix
id: matrix
uses: fabiocaccamo/create-matrix-action@v4
with:
matrix: |
os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {unittest}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {contract}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {integration}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {tools}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {macos}, cpu {amd64}, builder {macos-13}, tests {unittest}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {macos}, cpu {amd64}, builder {macos-13}, tests {contract}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {macos}, cpu {amd64}, builder {macos-13}, tests {integration}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {macos}, cpu {amd64}, builder {macos-13}, tests {tools}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {windows}, cpu {amd64}, builder {windows-latest}, tests {unittest}, nim_version {${{ env.nim_version }}}, shell {msys2}
os {windows}, cpu {amd64}, builder {windows-latest}, tests {contract}, nim_version {${{ env.nim_version }}}, shell {msys2}
os {windows}, cpu {amd64}, builder {windows-latest}, tests {integration}, nim_version {${{ env.nim_version }}}, shell {msys2}
os {windows}, cpu {amd64}, builder {windows-latest}, tests {tools}, nim_version {${{ env.nim_version }}}, shell {msys2}
- name: Compute matrix
id: matrix
uses: fabiocaccamo/create-matrix-action@v4
with:
matrix: |
os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {unittest}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {contract}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {integration}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {tools}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {macos}, cpu {amd64}, builder {macos-13}, tests {unittest}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {macos}, cpu {amd64}, builder {macos-13}, tests {contract}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {macos}, cpu {amd64}, builder {macos-13}, tests {integration}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {macos}, cpu {amd64}, builder {macos-13}, tests {tools}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {windows}, cpu {amd64}, builder {windows-latest}, tests {unittest}, nim_version {${{ env.nim_version }}}, shell {msys2}
os {windows}, cpu {amd64}, builder {windows-latest}, tests {contract}, nim_version {${{ env.nim_version }}}, shell {msys2}
os {windows}, cpu {amd64}, builder {windows-latest}, tests {integration}, nim_version {${{ env.nim_version }}}, shell {msys2}
os {windows}, cpu {amd64}, builder {windows-latest}, tests {tools}, nim_version {${{ env.nim_version }}}, shell {msys2}
build:
needs: matrix
Expand Down
5 changes: 4 additions & 1 deletion .github/workflows/nim-matrix.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ jobs:
uses: fabiocaccamo/create-matrix-action@v4
with:
matrix: |
os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {all}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {unittest}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {contract}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {integration}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
os {linux}, cpu {amd64}, builder {ubuntu-20.04}, tests {tools}, nim_version {${{ env.nim_version }}}, shell {bash --noprofile --norc -e -o pipefail}
build:
needs: matrix
Expand Down
3 changes: 3 additions & 0 deletions build.nims
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ task testContracts, "Build & run Codex Contract tests":
task testIntegration, "Run integration tests":
buildBinary "codex", params = "-d:chronicles_runtime_filtering -d:chronicles_log_level=TRACE -d:codex_enable_proof_failures=true"
test "testIntegration"
# use params to enable logging from the integration test executable
# test "testIntegration", params = "-d:chronicles_sinks=textlines[notimestamps,stdout],textlines[dynamic] " &
# "-d:chronicles_enabled_topics:integration:TRACE"

task build, "build codex binary":
codexTask()
Expand Down
51 changes: 26 additions & 25 deletions codex/blockexchange/engine/advertiser.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import ../protobuf/presence
import ../peers

import ../../utils
import ../../utils/exceptions
import ../../utils/trackedfutures
import ../../discovery
import ../../stores/blockstore
import ../../logutils
Expand All @@ -42,7 +44,7 @@ type

advertiseLocalStoreLoop*: Future[void] # Advertise loop task handle
advertiseQueue*: AsyncQueue[Cid] # Advertise queue
advertiseTasks*: seq[Future[void]] # Advertise tasks
trackedFutures*: TrackedFutures # Advertise tasks futures

advertiseLocalStoreLoopSleep: Duration # Advertise loop sleep
inFlightAdvReqs*: Table[Cid, Future[void]] # Inflight advertise requests
Expand Down Expand Up @@ -70,20 +72,26 @@ proc advertiseBlock(b: Advertiser, cid: Cid) {.async.} =
await b.addCidToQueue(cid)
await b.addCidToQueue(manifest.treeCid)

proc advertiseLocalStoreLoop(b: Advertiser) {.async.} =
proc advertiseLocalStoreLoop(b: Advertiser) {.async: (raises: []).} =
while b.advertiserRunning:
if cids =? await b.localStore.listBlocks(blockType = BlockType.Manifest):
trace "Advertiser begins iterating blocks..."
for c in cids:
if cid =? await c:
await b.advertiseBlock(cid)
trace "Advertiser iterating blocks finished."
try:
if cids =? await b.localStore.listBlocks(blockType = BlockType.Manifest):
trace "Advertiser begins iterating blocks..."
for c in cids:
if cid =? await c:
await b.advertiseBlock(cid)
trace "Advertiser iterating blocks finished."

await sleepAsync(b.advertiseLocalStoreLoopSleep)

await sleepAsync(b.advertiseLocalStoreLoopSleep)
except CancelledError:
break # do not propagate as advertiseLocalStoreLoop was asyncSpawned
except CatchableError as e:
error "failed to advertise blocks in local store", error = e.msgDetail

info "Exiting advertise task loop"

proc processQueueLoop(b: Advertiser) {.async.} =
proc processQueueLoop(b: Advertiser) {.async: (raises: []).} =
while b.advertiserRunning:
try:
let
Expand Down Expand Up @@ -129,9 +137,11 @@ proc start*(b: Advertiser) {.async.} =

b.advertiserRunning = true
for i in 0..<b.concurrentAdvReqs:
b.advertiseTasks.add(processQueueLoop(b))
let fut = b.processQueueLoop().track(b)
asyncSpawn fut

b.advertiseLocalStoreLoop = advertiseLocalStoreLoop(b)
b.advertiseLocalStoreLoop = advertiseLocalStoreLoop(b).track(b)
asyncSpawn b.advertiseLocalStoreLoop

proc stop*(b: Advertiser) {.async.} =
## Stop the advertiser
Expand All @@ -145,19 +155,9 @@ proc stop*(b: Advertiser) {.async.} =
b.advertiserRunning = false
# Stop incoming tasks from callback and localStore loop
b.localStore.onBlockStored = CidCallback.none
if not b.advertiseLocalStoreLoop.isNil and not b.advertiseLocalStoreLoop.finished:
trace "Awaiting advertise loop to stop"
await b.advertiseLocalStoreLoop.cancelAndWait()
trace "Advertise loop stopped"

# Clear up remaining tasks
for task in b.advertiseTasks:
if not task.finished:
trace "Awaiting advertise task to stop"
await task.cancelAndWait()
trace "Advertise task stopped"

trace "Advertiser stopped"
trace "Stopping advertise loop and tasks"
await b.trackedFutures.cancelTracked()
trace "Advertiser loop and tasks stopped"

proc new*(
T: type Advertiser,
Expand All @@ -173,5 +173,6 @@ proc new*(
discovery: discovery,
concurrentAdvReqs: concurrentAdvReqs,
advertiseQueue: newAsyncQueue[Cid](concurrentAdvReqs),
trackedFutures: TrackedFutures.new(),
inFlightAdvReqs: initTable[Cid, Future[void]](),
advertiseLocalStoreLoopSleep: advertiseLocalStoreLoopSleep)
44 changes: 24 additions & 20 deletions codex/blockexchange/engine/discovery.nim
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import ../network
import ../peers

import ../../utils
import ../../utils/trackedfutures
import ../../discovery
import ../../stores/blockstore
import ../../logutils
Expand Down Expand Up @@ -50,12 +51,12 @@ type
concurrentDiscReqs: int # Concurrent discovery requests
discoveryLoop*: Future[void] # Discovery loop task handle
discoveryQueue*: AsyncQueue[Cid] # Discovery queue
discoveryTasks*: seq[Future[void]] # Discovery tasks
trackedFutures*: TrackedFutures # Tracked Discovery tasks futures
minPeersPerBlock*: int # Max number of peers with block
discoveryLoopSleep: Duration # Discovery loop sleep
inFlightDiscReqs*: Table[Cid, Future[seq[SignedPeerRecord]]] # Inflight discovery requests

proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} =
proc discoveryQueueLoop(b: DiscoveryEngine) {.async: (raises: []).} =
while b.discEngineRunning:
for cid in toSeq(b.pendingBlocks.wantListBlockCids):
try:
Expand All @@ -66,13 +67,15 @@ proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} =
except CatchableError as exc:
warn "Exception in discovery loop", exc = exc.msg

logScope:
sleep = b.discoveryLoopSleep
wanted = b.pendingBlocks.len

await sleepAsync(b.discoveryLoopSleep)
try:
logScope:
sleep = b.discoveryLoopSleep
wanted = b.pendingBlocks.len
await sleepAsync(b.discoveryLoopSleep)
except CancelledError:
discard # do not propagate as discoveryQueueLoop was asyncSpawned

proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} =
proc discoveryTaskLoop(b: DiscoveryEngine) {.async: (raises: []).} =
## Run discovery tasks
##

Expand Down Expand Up @@ -116,6 +119,11 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} =
return
except CatchableError as exc:
warn "Exception in discovery task runner", exc = exc.msg
except Exception as e:
# Raised by b.discovery.removeProvider somehow...
# This should not be catchable, and we should never get here. Therefore,
# raise a Defect.
raiseAssert "Exception when removing provider"

info "Exiting discovery task runner"

Expand All @@ -139,9 +147,11 @@ proc start*(b: DiscoveryEngine) {.async.} =

b.discEngineRunning = true
for i in 0..<b.concurrentDiscReqs:
b.discoveryTasks.add(discoveryTaskLoop(b))
let fut = b.discoveryTaskLoop().track(b)
asyncSpawn fut

b.discoveryLoop = discoveryQueueLoop(b)
b.discoveryLoop = b.discoveryQueueLoop().track(b)
asyncSpawn b.discoveryLoop

proc stop*(b: DiscoveryEngine) {.async.} =
## Stop the discovery engine
Expand All @@ -153,16 +163,9 @@ proc stop*(b: DiscoveryEngine) {.async.} =
return

b.discEngineRunning = false
for task in b.discoveryTasks:
if not task.finished:
trace "Awaiting discovery task to stop"
await task.cancelAndWait()
trace "Discovery task stopped"

if not b.discoveryLoop.isNil and not b.discoveryLoop.finished:
trace "Awaiting discovery loop to stop"
await b.discoveryLoop.cancelAndWait()
trace "Discovery loop stopped"
trace "Stopping discovery loop and tasks"
await b.trackedFutures.cancelTracked()
trace "Discovery loop and tasks stopped"

trace "Discovery engine stopped"

Expand All @@ -187,6 +190,7 @@ proc new*(
pendingBlocks: pendingBlocks,
concurrentDiscReqs: concurrentDiscReqs,
discoveryQueue: newAsyncQueue[Cid](concurrentDiscReqs),
trackedFutures: TrackedFutures.new(),
inFlightDiscReqs: initTable[Cid, Future[seq[SignedPeerRecord]]](),
discoveryLoopSleep: discoveryLoopSleep,
minPeersPerBlock: minPeersPerBlock)
29 changes: 17 additions & 12 deletions codex/blockexchange/engine/engine.nim
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import pkg/questionable
import ../../stores/blockstore
import ../../blocktype
import ../../utils
import ../../utils/exceptions
import ../../utils/trackedfutures
import ../../merkletree
import ../../logutils
import ../../manifest
Expand Down Expand Up @@ -70,7 +72,7 @@ type
peers*: PeerCtxStore # Peers we're currently actively exchanging with
taskQueue*: AsyncHeapQueue[BlockExcPeerCtx] # Peers we're currently processing tasks for
concurrentTasks: int # Number of concurrent peers we're serving at any given time
blockexcTasks: seq[Future[void]] # Future to control blockexc task
trackedFutures: TrackedFutures # Tracks futures of blockexc tasks
blockexcRunning: bool # Indicates if the blockexc task is running
pendingBlocks*: PendingBlocksManager # Blocks we're awaiting to be resolved
peersPerRequest: int # Max number of peers to request from
Expand All @@ -88,7 +90,7 @@ type
proc scheduleTask(b: BlockExcEngine, task: BlockExcPeerCtx): bool {.gcsafe} =
b.taskQueue.pushOrUpdateNoWait(task).isOk()

proc blockexcTaskRunner(b: BlockExcEngine): Future[void] {.gcsafe.}
proc blockexcTaskRunner(b: BlockExcEngine) {.async: (raises: []).}

proc start*(b: BlockExcEngine) {.async.} =
## Start the blockexc task
Expand All @@ -104,7 +106,8 @@ proc start*(b: BlockExcEngine) {.async.} =

b.blockexcRunning = true
for i in 0..<b.concurrentTasks:
b.blockexcTasks.add(blockexcTaskRunner(b))
let fut = b.blockexcTaskRunner().track(b)
asyncSpawn fut

proc stop*(b: BlockExcEngine) {.async.} =
## Stop the blockexc blockexc
Expand All @@ -119,11 +122,7 @@ proc stop*(b: BlockExcEngine) {.async.} =
return

b.blockexcRunning = false
for task in b.blockexcTasks:
if not task.finished:
trace "Awaiting task to stop"
await task.cancelAndWait()
trace "Task stopped"
await b.trackedFutures.cancelTracked()

trace "NetworkStore stopped"

Expand Down Expand Up @@ -565,16 +564,21 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =

task.peerWants.keepItIf(it.address notin successAddresses)

proc blockexcTaskRunner(b: BlockExcEngine) {.async.} =
proc blockexcTaskRunner(b: BlockExcEngine) {.async: (raises: []).} =
## process tasks
##

trace "Starting blockexc task runner"
while b.blockexcRunning:
let
peerCtx = await b.taskQueue.pop()
try:
let
peerCtx = await b.taskQueue.pop()

await b.taskHandler(peerCtx)
await b.taskHandler(peerCtx)
except CancelledError:
break # do not propagate as blockexcTaskRunner was asyncSpawned
except CatchableError as e:
error "error running block exchange task", error = e.msgDetail

info "Exiting blockexc task runner"

Expand Down Expand Up @@ -603,6 +607,7 @@ proc new*(
network: network,
wallet: wallet,
concurrentTasks: concurrentTasks,
trackedFutures: TrackedFutures.new(),
taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize),
discovery: discovery,
advertiser: advertiser,
Expand Down
2 changes: 2 additions & 0 deletions codex/contracts.nim
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ import contracts/requests
import contracts/marketplace
import contracts/market
import contracts/interactions
import contracts/provider

export requests
export marketplace
export market
export interactions
export provider
Loading

0 comments on commit 0d6fcee

Please sign in to comment.