Skip to content

Commit

Permalink
feat: Create initial Coordinator V2 service (#444)
Browse files Browse the repository at this point in the history
This PR adds the initial Coordinator V2 service, which acts as the main
driver for the Control Plane.

## Coordinator Overview
Coordinator V2 will exist as a new standalone service with it's primary
goal being ensuring the current registry configuration is mapped to the
system. It's core logic is just an infinite loop which reads the
registry, and sends necessary requests to the Block Streamer and Runner
services to synchronise that config.

## Block Streamer Changes
Some changes have been made to Block Streamer to enable the above:
1. `version` and `redis_stream` have been added to the proto so that
Coordinator can configure them.
2. Support for `ActionFunctionCall` has been added - Initially I thought
only `ActionAny` allowed, but the current registry has
`ActionFunctionCall` and therefore needs to be supported.
3. `last_published_block` is now written to Redis to enable "Start from
interruption"

## What's not been done
I wanted to limit the scope of this PR as it was starting to get big,
I'll address these tasks in follow up PRs:
- Provisioning - Coordinator should check the status of provisioning,
and act when the state isn't as expected. This can be done after
#426 is implemented.
- Retry recoverable errors - Any error will be propagated cause the
error to exit, this includes connection errors to the Block Streamer and
Runner services. As these are very likely to occur (across restarts) we
should retry these errors.
- Environment configuration - There are many hard coded values
(endpoints, registry contract, etc.) which should be configurable via
the environment.
- Logging and Metrics
  • Loading branch information
morgsmccauley committed Jan 11, 2024
1 parent 7655aef commit b76fac0
Show file tree
Hide file tree
Showing 15 changed files with 5,797 additions and 92 deletions.
8 changes: 8 additions & 0 deletions .github/workflows/block-streamer-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ jobs:
- uses: actions/checkout@v3
- name: Install Protoc
uses: arduino/setup-protoc@v2
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Check
working-directory: ./block-streamer
run: cargo check
Expand All @@ -29,6 +31,8 @@ jobs:
- uses: actions/checkout@v3
- name: Install Protoc
uses: arduino/setup-protoc@v2
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Test
working-directory: ./block-streamer
run: cargo test
Expand All @@ -41,6 +45,8 @@ jobs:
uses: actions/checkout@v3
- name: Install Protoc
uses: arduino/setup-protoc@v2
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
Expand All @@ -60,6 +66,8 @@ jobs:
uses: actions/checkout@v3
- name: Install Protoc
uses: arduino/setup-protoc@v2
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
Expand Down
81 changes: 81 additions & 0 deletions .github/workflows/coordinator-ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
name: Coordinator

on:
push:
branches: [ main ]
paths:
- "coordinator/**"
pull_request:
paths:
- "coordinator/**"

env:
CARGO_TERM_COLOR: always

jobs:
check:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Install Protoc
uses: arduino/setup-protoc@v2
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Check
working-directory: ./coordinator
run: cargo check

test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Install Protoc
uses: arduino/setup-protoc@v2
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Test
working-directory: ./coordinator
run: cargo test


format:
runs-on: ubuntu-20.04
steps:
- name: Checkout repository
uses: actions/checkout@v3
- name: Install Protoc
uses: arduino/setup-protoc@v2
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: 1.70.0
override: true
profile: minimal
components: rustfmt
- name: Check formatting
working-directory: ./coordinator
run: |
cargo fmt -- --check
clippy:
runs-on: ubuntu-20.04
steps:
- name: Checkout repository
uses: actions/checkout@v3
- name: Install Protoc
uses: arduino/setup-protoc@v2
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Install Rust
uses: actions-rs/toolchain@v1
with:
toolchain: 1.70.0
override: true
profile: minimal
components: clippy
- name: Clippy check
working-directory: ./coordinator
run: |
cargo clippy
2 changes: 1 addition & 1 deletion block-streamer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ wildmatch = "2.1.1"

registry-types = { path = "../registry/types", features = ["near-primitives"] }

near-lake-framework = "0.7.4"
near-lake-framework = "=0.7.4"

[build-dependencies]
tonic-build = "0.10"
Expand Down
2 changes: 2 additions & 0 deletions block-streamer/examples/start_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
start_block_height: 106700000,
account_id: "morgs.near".to_string(),
function_name: "test".to_string(),
version: 0,
redis_stream: "morgs.near/test:block_stream".to_string(),
rule: Some(Rule::ActionAnyRule(ActionAnyRule {
affected_account_id: "social.near".to_string(),
status: Status::Success.into(),
Expand Down
69 changes: 38 additions & 31 deletions block-streamer/proto/block_streamer.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,48 +2,52 @@ syntax = "proto3";

package blockstreamer;

// The BlockStreamer service provides RPCs to manage BlockStream instances.
// The BlockStreamer service provides RPCs to manage BlockStream instances
service BlockStreamer {
// Starts a new BlockStream process.
// Starts a new BlockStream process
rpc StartStream (StartStreamRequest) returns (StartStreamResponse);

// Stops an existing BlockStream process.
// Stops an existing BlockStream process
rpc StopStream (StopStreamRequest) returns (StopStreamResponse);

// Lists all current BlockStream processes.
// Lists all current BlockStream processes
rpc ListStreams (ListStreamsRequest) returns (ListStreamsResponse);
}

// Request message for starting a BlockStream.
// Request message for starting a BlockStream
message StartStreamRequest {
// Which block height to start from.
// Which block height to start from
uint64 start_block_height = 1;
// The account ID which the indexer is defined under
// Account ID which the indexer is defined under
string account_id = 2;
// The name of the indexer
// Name of the indexer
string function_name = 3;
// The filter rule to apply to incoming blocks
// Block height corresponding to the created/updated height of the indexer
uint64 version = 4;
// Key of Redis Stream to publish blocks to
string redis_stream = 5;
// Filter rule to apply to incoming blocks
oneof rule {
ActionAnyRule action_any_rule = 4;
ActionFunctionCallRule action_function_call_rule = 5;
ActionAnyRule action_any_rule = 6;
ActionFunctionCallRule action_function_call_rule = 7;
}
}

// Match any action against the specified account
message ActionAnyRule {
// The account ID pattern to match against
// Account ID pattern to match against
string affected_account_id = 1;
// The status of the action to match against
// Status of the action to match against
Status status = 2;
}

// Match a specific function call against the specified account
message ActionFunctionCallRule {
// The account ID pattern to match against
// Account ID pattern to match against
string affected_account_id = 1;
// The function name to match against
// Function name to match against
string function_name = 2;
// The status of the action to match against
// Status of the action to match against
Status status = 3;
}

Expand All @@ -54,40 +58,43 @@ enum Status {
STATUS_ANY = 3;
}

// Response message for starting a BlockStream.
// Response message for starting a BlockStream
message StartStreamResponse {
// ID or handle of the started BlockStream.
// ID or handle of the started BlockStream
string stream_id = 1;
}

// Request message for stopping a BlockStream.
// Request message for stopping a BlockStream
message StopStreamRequest {
// ID or handle of the BlockStream to stop.
// ID or handle of the BlockStream to stop
string stream_id = 1;
}

// Response message for stopping a BlockStream.
// Response message for stopping a BlockStream
message StopStreamResponse {
// Confirmation message or status.
// Confirmation message or status
string status = 1;
}

// Request message for listing BlockStreams.
// Request message for listing BlockStreams
message ListStreamsRequest {
// Optional filters or parameters for listing streams.
// Optional filters or parameters for listing streams
}

// Response message for listing BlockStreams.
// Response message for listing BlockStreams
message ListStreamsResponse {
// List of active BlockStreams.
// List of active BlockStreams
repeated StreamInfo streams = 1;
}

// Information about a single BlockStream instance.
// Information about a single BlockStream instance
message StreamInfo {
// ID or handle of the BlockStream
string stream_id = 1;
int64 start_block_height = 2;
string indexer_name = 3;
string chain_id = 4;
string status = 5;
// Account ID of the indexer
string account_id = 3;
// Function name of the indexer
string function_name = 4;
// Block height corresponding to the created/updated height of the indexer
uint64 version = 5;
}
Loading

0 comments on commit b76fac0

Please sign in to comment.