From 890181694038d5668929d6b2f08c18d1f8c77449 Mon Sep 17 00:00:00 2001 From: Yo Eight Date: Mon, 1 Jul 2024 21:05:06 -0400 Subject: [PATCH] Support GHA. (#5) --- .github/scripts/setup.ps1 | 38 +++++++++ .github/workflows/pr.yml | 81 +++++++++++++++++++ .run/clippy.run.xml | 19 +++++ geth-client/src/next/driver.rs | 12 +-- geth-common/src/lib.rs | 12 +-- geth-consensus/src/state_machine.rs | 6 +- geth-domain/build.rs | 2 +- geth-domain/src/append_propose.rs | 6 +- geth-domain/src/binary.rs | 2 +- geth-domain/src/index/lsm.rs | 6 +- geth-domain/src/index/mem_table.rs | 10 ++- geth-domain/src/iter/events.rs | 4 +- geth-domain/src/lib.rs | 4 +- geth-mikoshi/src/storage/fs.rs | 9 ++- geth-mikoshi/src/storage/in_mem.rs | 14 +++- geth-mikoshi/src/wal/chunks/chunk.rs | 12 +-- geth-mikoshi/src/wal/chunks/mod.rs | 10 +-- geth-node/src/names.rs | 10 +-- geth-node/src/process/storage/reader.rs | 4 +- geth-node/src/process/storage/writer.rs | 8 +- .../src/process/subscriptions/programmable.rs | 61 +++++++------- geth-node/src/services/mod.rs | 6 +- geth-repl/src/main.rs | 2 +- 23 files changed, 232 insertions(+), 106 deletions(-) create mode 100644 .github/scripts/setup.ps1 create mode 100644 .github/workflows/pr.yml create mode 100644 .run/clippy.run.xml diff --git a/.github/scripts/setup.ps1 b/.github/scripts/setup.ps1 new file mode 100644 index 0000000..bf85d83 --- /dev/null +++ b/.github/scripts/setup.ps1 @@ -0,0 +1,38 @@ +param ( + [Parameter(Mandatory = $true)] + [string]$Runner +) + +$global:ProtoCompilerVersion = "27.2" + +function Get-Link +{ + $baseUrl = "https://github.com/protocolbuffers/protobuf/releases/download/v$global:ProtoCompilerVersion/protoc" + switch ($Runner) + { + "ubuntu-latest" { + return "$baseUrl-$global:ProtoCompilerVersion-linux-x86_64.zip" + } + + "windows-latest" { + return "$baseUrl-$global:ProtoCompilerVersion-win64.zip" + } + + "macos-latest" { + return "$baseUrl-$global:ProtoCompilerVersion-osx-universal_binary.zip" + } + + default { + throw "unsupported runner '{$Runner}'" + } + } +} + +$link = Get-Link +$filename = "protobuf-compiler" +Invoke-WebRequest -Uri $link -OutFile "$filename.zip" +[System.IO.Compression.ZipFile]::ExtractToDirectory("$filename.zip", $filename) + +$protobufBinDir = (Get-Item -Path "protobuf-compiler/bin").FullName + +"protoc_bin=$protobufBinDir" | Out-File -FilePath $env:GITHUB_OUTPUT -Append \ No newline at end of file diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml new file mode 100644 index 0000000..7fec6fa --- /dev/null +++ b/.github/workflows/pr.yml @@ -0,0 +1,81 @@ +name: Rust + +on: + push: + branches: + - "master" + + pull_request: + branches: + - "master" + + schedule: + - cron: '0 3 * * 0' # Every sunday at 3am UTC. + +env: + CARGO_TERM_COLOR: always + +jobs: + build: + continue-on-error: true + strategy: + fail-fast: false + matrix: + os: + - ubuntu-latest + - windows-latest + - macos-latest + + runs-on: ${{ matrix.os }} + + steps: + - uses: actions/checkout@v4 + + - name: Download and configure dependencies + id: configure + shell: pwsh + run: .github/scripts/setup.ps1 -Runner ${{ matrix.os }} + + - name: Update system path + shell: pwsh + run: | + "${{ steps.configure.outputs.protoc_bin }}" | Out-File -FilePath $env:GITHUB_PATH -Append + + - name: Build + run: cargo check --all-targets + + - name: Run tests + run: cargo test --all-targets + + linting: + needs: build + strategy: + fail-fast: false + matrix: + os: + - ubuntu-latest + + runs-on: ${{ matrix.os }} + steps: + - uses: actions/checkout@v4 + + - name: Download and configure dependencies + id: configure + shell: pwsh + run: .github/scripts/setup.ps1 -Runner ${{ matrix.os }} + + - name: Update system path + shell: pwsh + run: | + "${{ steps.configure.outputs.protoc_bin }}" | Out-File -FilePath $env:GITHUB_PATH -Append + + - name: Install tools + run: | + rustup component add clippy + rustup component add rustfmt + + - name: Lint + run: cargo clippy --all-features -- -D warnings + + - name: Format + run: cargo fmt -- --check \ No newline at end of file diff --git a/.run/clippy.run.xml b/.run/clippy.run.xml new file mode 100644 index 0000000..843817d --- /dev/null +++ b/.run/clippy.run.xml @@ -0,0 +1,19 @@ + + + + \ No newline at end of file diff --git a/geth-client/src/next/driver.rs b/geth-client/src/next/driver.rs index e419f88..801ff1a 100644 --- a/geth-client/src/next/driver.rs +++ b/geth-client/src/next/driver.rs @@ -2,10 +2,10 @@ use std::collections::HashMap; use uuid::Uuid; -use geth_common::{EndPoint, Operation, OperationIn, OperationOut}; use geth_common::generated::next::protocol; +use geth_common::{EndPoint, Operation, OperationIn, OperationOut}; -use crate::next::{Command, connect_to_node, Connection, Mailbox}; +use crate::next::{connect_to_node, Command, Connection, Mailbox}; pub struct Driver { endpoint: EndPoint, @@ -45,13 +45,7 @@ impl Driver { let correlation = command.operation_in.correlation; let operation_in: protocol::OperationIn = command.operation_in.clone().into(); - if self - .connection - .as_ref() - .unwrap() - .send(operation_in.into()) - .is_ok() - { + if self.connection.as_ref().unwrap().send(operation_in).is_ok() { self.registry.insert(correlation, command); return Ok(()); } diff --git a/geth-common/src/lib.rs b/geth-common/src/lib.rs index 980aaff..07956cf 100644 --- a/geth-common/src/lib.rs +++ b/geth-common/src/lib.rs @@ -130,16 +130,8 @@ pub struct OperationOut { impl OperationOut { pub fn is_streaming(&self) -> bool { match &self.reply { - Reply::SubscriptionEvent(event) => match event { - SubscriptionEvent::Unsubscribed(_) => false, - _ => true, - }, - - Reply::StreamRead(event) => match event { - StreamRead::EventAppeared(_) => true, - _ => false, - }, - + Reply::SubscriptionEvent(event) => !matches!(event, SubscriptionEvent::Unsubscribed(_)), + Reply::StreamRead(event) => matches!(event, StreamRead::EventAppeared(_)), _ => false, } } diff --git a/geth-consensus/src/state_machine.rs b/geth-consensus/src/state_machine.rs index 65dccab..baf97d2 100644 --- a/geth-consensus/src/state_machine.rs +++ b/geth-consensus/src/state_machine.rs @@ -94,11 +94,7 @@ where granted = true; } } else { - let last_entry_id = if let Some(last) = storage.last_entry() { - last - } else { - EntryId::default() - }; + let last_entry_id = storage.last_entry().unwrap_or_default(); granted = self.voted_for == Some(args.candidate_id.clone()) && last_entry_id.index <= args.last_log_index diff --git a/geth-domain/build.rs b/geth-domain/build.rs index 6caee2f..ec2fe40 100644 --- a/geth-domain/build.rs +++ b/geth-domain/build.rs @@ -2,7 +2,7 @@ use std::io; fn main() -> io::Result<()> { prost_build::Config::new() - .bytes(&[".RecordedEvent.data", ".RecordedEvent.metadata"]) + .bytes([".RecordedEvent.data", ".RecordedEvent.metadata"]) .format(true) .default_package_filename("model") .compile_protos(&["protos/events.proto"], &["protos/"])?; diff --git a/geth-domain/src/append_propose.rs b/geth-domain/src/append_propose.rs index ccdb83e..cb1430d 100644 --- a/geth-domain/src/append_propose.rs +++ b/geth-domain/src/append_propose.rs @@ -41,7 +41,7 @@ where fn next(&mut self) -> Option { let propose = self.events.next()?; - let event = crate::binary::events::RecordedEvent { + let event = crate::binary::models::RecordedEvent { id: propose.id.into(), revision: self.revision, stream_name: self.stream_name.clone(), @@ -52,8 +52,8 @@ where }; self.revision += 1; - let event = crate::binary::events::Events { - event: Some(crate::binary::events::Event::RecordedEvent(event)), + let event = crate::binary::models::Events { + event: Some(crate::binary::models::Event::RecordedEvent(event)), }; event.encode(&mut self.buffer).unwrap(); diff --git a/geth-domain/src/binary.rs b/geth-domain/src/binary.rs index 54e3237..08b8d00 100644 --- a/geth-domain/src/binary.rs +++ b/geth-domain/src/binary.rs @@ -1,4 +1,4 @@ -pub mod events { +pub mod models { use uuid::Uuid; pub use events::Event; diff --git a/geth-domain/src/index/lsm.rs b/geth-domain/src/index/lsm.rs index 46f9ff7..e190f48 100644 --- a/geth-domain/src/index/lsm.rs +++ b/geth-domain/src/index/lsm.rs @@ -11,7 +11,7 @@ use geth_mikoshi::hashing::mikoshi_hash; use geth_mikoshi::storage::{FileId, Storage}; use geth_mikoshi::wal::{WALRef, WriteAheadLog}; -use crate::binary::events::Event; +use crate::binary::models::Event; use crate::index::block::BlockEntry; use crate::index::mem_table::MemTable; use crate::index::ss_table::SsTable; @@ -22,7 +22,7 @@ pub const LSM_DEFAULT_MEM_TABLE_SIZE: usize = 4_096; pub const LSM_BASE_SSTABLE_BLOCK_COUNT: usize = 4; pub fn sst_table_block_count_limit(level: u8) -> usize { - 2 ^ (level as usize) * LSM_BASE_SSTABLE_BLOCK_COUNT + (2 ^ (level as usize)) * LSM_BASE_SSTABLE_BLOCK_COUNT } #[derive(Debug, Clone, Copy)] @@ -180,7 +180,7 @@ where pub fn ss_table_first(&self) -> Option> { let state = self.state.read().unwrap(); let ts = state.levels.get(&0)?; - ts.get(0).cloned() + ts.front().cloned() } pub fn put_values(&self, values: V) -> io::Result<()> diff --git a/geth-domain/src/index/mem_table.rs b/geth-domain/src/index/mem_table.rs index eb434c5..afaef4e 100644 --- a/geth-domain/src/index/mem_table.rs +++ b/geth-domain/src/index/mem_table.rs @@ -1,9 +1,11 @@ -use crate::index::block::BlockEntry; +use std::cmp::Ordering; +use std::collections::BTreeMap; use bytes::{Buf, BufMut, BytesMut}; + use geth_common::{Direction, Revision}; -use std::cmp::Ordering; -use std::collections::BTreeMap; + +use crate::index::block::BlockEntry; pub const MEM_TABLE_ENTRY_SIZE: usize = 16; @@ -39,7 +41,7 @@ impl MemTable { } pub fn scan(&self, key: u64, direction: Direction, start: Revision, count: usize) -> Scan { - let buffer = self.inner.get(&key).map(|s| s.clone()).unwrap_or_default(); + let buffer = self.inner.get(&key).cloned().unwrap_or_default(); Scan::new(key, buffer, direction, start, count) } diff --git a/geth-domain/src/iter/events.rs b/geth-domain/src/iter/events.rs index 63f73ff..7bcb0d2 100644 --- a/geth-domain/src/iter/events.rs +++ b/geth-domain/src/iter/events.rs @@ -4,7 +4,7 @@ use geth_common::IteratorIO; use geth_mikoshi::wal::entries::EntryIter; use geth_mikoshi::wal::WriteAheadLog; -use crate::binary::events::Event; +use crate::binary::models::Event; use crate::parse_event; pub struct EventIter { @@ -28,6 +28,8 @@ impl IteratorIO for EventIter { if let Event::RecordedEvent(event) = event.event.unwrap() { return Ok(Some((item.position, crate::RecordedEvent::from(event)))); } + + continue; } return Ok(None); diff --git a/geth-domain/src/lib.rs b/geth-domain/src/lib.rs index 5caa115..d440f37 100644 --- a/geth-domain/src/lib.rs +++ b/geth-domain/src/lib.rs @@ -8,7 +8,7 @@ use uuid::Uuid; pub use index::{Lsm, LsmSettings}; pub use crate::append_propose::AppendProposes; -use crate::binary::events::Events; +use crate::binary::models::Events; mod append_propose; pub mod binary; @@ -39,7 +39,7 @@ pub struct RecordedEvent { } impl RecordedEvent { - pub fn from(inner: binary::events::RecordedEvent) -> RecordedEvent { + pub fn from(inner: binary::models::RecordedEvent) -> RecordedEvent { Self { id: inner.id.into(), revision: inner.revision, diff --git a/geth-mikoshi/src/storage/fs.rs b/geth-mikoshi/src/storage/fs.rs index e3bc879..0699ff9 100644 --- a/geth-mikoshi/src/storage/fs.rs +++ b/geth-mikoshi/src/storage/fs.rs @@ -1,6 +1,3 @@ -use crate::constants::CHUNK_SIZE; -use crate::storage::{FileCategory, FileId, Storage}; -use bytes::{Bytes, BytesMut}; use std::collections::HashMap; use std::fs::{read_dir, File, OpenOptions}; use std::io::{self, ErrorKind}; @@ -11,6 +8,11 @@ use std::os::windows::fs::FileExt; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; +use bytes::{Bytes, BytesMut}; + +use crate::constants::CHUNK_SIZE; +use crate::storage::{FileCategory, FileId, Storage}; + #[derive(Clone)] pub struct FileSystemStorage { root: PathBuf, @@ -55,6 +57,7 @@ impl FileSystemStorage { .write(true) .read(true) .create(true) + .truncate(false) .open(path)?; Ok(file) diff --git a/geth-mikoshi/src/storage/in_mem.rs b/geth-mikoshi/src/storage/in_mem.rs index bcad549..69c5793 100644 --- a/geth-mikoshi/src/storage/in_mem.rs +++ b/geth-mikoshi/src/storage/in_mem.rs @@ -1,10 +1,12 @@ -use crate::constants::CHUNK_SIZE; -use crate::storage::{FileCategory, FileId, Storage}; -use bytes::{Buf, BufMut, Bytes, BytesMut}; use std::collections::HashMap; use std::io; use std::sync::{Arc, Mutex}; +use bytes::{Buf, BufMut, Bytes, BytesMut}; + +use crate::constants::CHUNK_SIZE; +use crate::storage::{FileCategory, FileId, Storage}; + #[derive(Clone)] pub struct InMemoryStorage { inner: Arc>>, @@ -12,6 +14,12 @@ pub struct InMemoryStorage { impl InMemoryStorage { pub fn new() -> Self { + InMemoryStorage::default() + } +} + +impl Default for InMemoryStorage { + fn default() -> Self { Self { inner: Arc::new(Mutex::new(Default::default())), } diff --git a/geth-mikoshi/src/wal/chunks/chunk.rs b/geth-mikoshi/src/wal/chunks/chunk.rs index fd5a5a4..64018e8 100644 --- a/geth-mikoshi/src/wal/chunks/chunk.rs +++ b/geth-mikoshi/src/wal/chunks/chunk.rs @@ -1,11 +1,13 @@ +use std::cmp::Ordering; + +use nom::bytes::complete::{tag, take_till1}; +use nom::IResult; +use uuid::Uuid; + use crate::constants::{CHUNK_HEADER_SIZE, CHUNK_SIZE}; use crate::storage::FileId; use crate::wal::chunks::footer::ChunkFooter; use crate::wal::chunks::header::ChunkHeader; -use nom::bytes::complete::{tag, take_till1}; -use nom::IResult; -use std::cmp::Ordering; -use uuid::Uuid; #[derive(Debug, Copy, Clone)] pub struct ChunkInfo { @@ -21,7 +23,7 @@ impl PartialEq for ChunkInfo { impl PartialOrd for ChunkInfo { fn partial_cmp(&self, other: &Self) -> Option { - self.seq_num.partial_cmp(&other.seq_num) + Some(self.cmp(other)) } } diff --git a/geth-mikoshi/src/wal/chunks/mod.rs b/geth-mikoshi/src/wal/chunks/mod.rs index 1d90707..2a00794 100644 --- a/geth-mikoshi/src/wal/chunks/mod.rs +++ b/geth-mikoshi/src/wal/chunks/mod.rs @@ -55,13 +55,9 @@ impl ChunkBasedWAL { } fn find_chunk(&self, logical_position: u64) -> Option<&Chunk> { - for chunk in &self.chunks { - if chunk.contains_log_position(logical_position) { - return Some(chunk); - } - } - - None + self.chunks + .iter() + .find(|&chunk| chunk.contains_log_position(logical_position)) } } diff --git a/geth-node/src/names.rs b/geth-node/src/names.rs index 0028699..a52142d 100644 --- a/geth-node/src/names.rs +++ b/geth-node/src/names.rs @@ -1,11 +1,11 @@ #![allow(dead_code)] pub mod streams { - pub static ALL: &'static str = "$all"; - pub static GLOBALS: &'static str = "$globals"; - pub static SYSTEM: &'static str = "$system"; + pub static ALL: &str = "$all"; + pub static GLOBALS: &str = "$globals"; + pub static SYSTEM: &str = "$system"; } pub mod types { - pub static STREAM_DELETED: &'static str = "$stream-deleted"; - pub static EVENTS_WRITTEN: &'static str = "$events-written"; + pub static STREAM_DELETED: &str = "$stream-deleted"; + pub static EVENTS_WRITTEN: &str = "$events-written"; } diff --git a/geth-node/src/process/storage/reader.rs b/geth-node/src/process/storage/reader.rs index a7aa040..24dab05 100644 --- a/geth-node/src/process/storage/reader.rs +++ b/geth-node/src/process/storage/reader.rs @@ -5,7 +5,7 @@ use tokio::task::spawn_blocking; use uuid::Uuid; use geth_common::{IteratorIO, Position, Record}; -use geth_domain::binary::events::Event; +use geth_domain::binary::models::Event; use geth_domain::RecordedEvent; use geth_mikoshi::wal::{WALRef, WriteAheadLog}; use geth_mikoshi::{storage::Storage, MikoshiStream}; @@ -99,7 +99,7 @@ where r#type: event.class, stream_name: event.stream_name, revision: event.revision, - data: event.data.into(), + data: event.data, position: Position(record.position), }; diff --git a/geth-node/src/process/storage/writer.rs b/geth-node/src/process/storage/writer.rs index c636dad..215432c 100644 --- a/geth-node/src/process/storage/writer.rs +++ b/geth-node/src/process/storage/writer.rs @@ -101,14 +101,14 @@ where } let created = Utc::now().timestamp(); - let event = geth_domain::binary::events::StreamDeleted { + let event = geth_domain::binary::models::StreamDeleted { stream_name: params.stream_name.clone(), revision: current_revision.next_revision(), created, }; - let event = geth_domain::binary::events::Events { - event: Some(geth_domain::binary::events::Event::StreamDeleted(event)), + let event = geth_domain::binary::models::Events { + event: Some(geth_domain::binary::models::Event::StreamDeleted(event)), }; event.encode(&mut self.buffer).unwrap(); @@ -151,7 +151,7 @@ where }; thread::spawn(move || { - while let Some(msg) = mailbox.recv().ok() { + while let Ok(msg) = mailbox.recv() { match msg { Msg::Append(params, resp) => { let result = internal.append(params); diff --git a/geth-node/src/process/subscriptions/programmable.rs b/geth-node/src/process/subscriptions/programmable.rs index 163a3d5..7d20776 100644 --- a/geth-node/src/process/subscriptions/programmable.rs +++ b/geth-node/src/process/subscriptions/programmable.rs @@ -95,39 +95,34 @@ impl PyroValue for EventRecord { fn serialize(self) -> eyre::Result { let record = self.0; - let mut props = Vec::new(); - - props.push(Prop { - label: Some("id".to_string()), - val: RuntimeValue::string(record.id.to_string()), - }); - - props.push(Prop { - label: Some("stream_name".to_string()), - val: RuntimeValue::string(record.stream_name), - }); - - props.push(Prop { - label: Some("event_type".to_string()), - val: RuntimeValue::string(record.r#type), - }); - - props.push(Prop { - label: Some("event_revision".to_string()), - val: RuntimeValue::Literal(Literal::Integer(record.revision as i64)), - }); - - props.push(Prop { - label: Some("position".to_string()), - val: RuntimeValue::Literal(Literal::Integer(record.position.raw() as i64)), - }); - - let value = serde_json::from_slice::(record.data.as_ref())?; - - props.push(Prop { - label: Some("payload".to_string()), - val: from_json_to_pyro_runtime_value(value)?, - }); + let props = vec![ + Prop { + label: Some("id".to_string()), + val: RuntimeValue::string(record.id.to_string()), + }, + Prop { + label: Some("stream_name".to_string()), + val: RuntimeValue::string(record.stream_name), + }, + Prop { + label: Some("event_type".to_string()), + val: RuntimeValue::string(record.r#type), + }, + Prop { + label: Some("event_revision".to_string()), + val: RuntimeValue::Literal(Literal::Integer(record.revision as i64)), + }, + Prop { + label: Some("position".to_string()), + val: RuntimeValue::Literal(Literal::Integer(record.position.raw() as i64)), + }, + Prop { + label: Some("payload".to_string()), + val: from_json_to_pyro_runtime_value(serde_json::from_slice::( + record.data.as_ref(), + )?)?, + }, + ]; Ok(RuntimeValue::Record(pyro_core::ast::Record { props })) } diff --git a/geth-node/src/services/mod.rs b/geth-node/src/services/mod.rs index 6830538..466b6d2 100644 --- a/geth-node/src/services/mod.rs +++ b/geth-node/src/services/mod.rs @@ -46,12 +46,10 @@ where C: Client + Send + 'static, S: Storage + Sync + Send + 'static, { - let mut inner = vec![]; - - inner.push(Service { + let inner = vec![Service { name: "indexing", handle: tokio::spawn(index::indexing(client, index, sub_client)), - }); + }]; Services { inner } } diff --git a/geth-repl/src/main.rs b/geth-repl/src/main.rs index 7c33ab9..16dc23c 100644 --- a/geth-repl/src/main.rs +++ b/geth-repl/src/main.rs @@ -17,7 +17,7 @@ use geth_common::{ EndPoint, ExpectedRevision, GetProgramError, IteratorIO, Position, ProgramObtained, Propose, Record, Revision, SubscriptionEvent, WriteResult, }; -use geth_domain::binary::events::Event; +use geth_domain::binary::models::Event; use geth_domain::{parse_event, parse_event_io, AppendProposes, Lsm, LsmSettings, RecordedEvent}; use geth_mikoshi::hashing::mikoshi_hash; use geth_mikoshi::storage::{FileSystemStorage, Storage};