Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run tests on GH actions + release speed improvements #28

Merged
merged 17 commits into from
Aug 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
on:
push:
branches: [ main ]
pull_request:
branches:
- main

name: CI
env:
RUSTFLAGS: -D warnings
CARGO_TERM_COLOR: always

jobs:
build:
name: Test
strategy:
fail-fast: false
matrix:
include:
- target: x86_64-apple-darwin
os: macos-latest
- target: x86_64-unknown-linux-gnu
os: ubuntu-latest

runs-on: ${{ matrix.os }}

steps:
- uses: actions/checkout@v3

- uses: rui314/setup-mold@v1

- name: Install Rust stable
uses: dtolnay/rust-toolchain@stable
with:
target: ${{ matrix.target }}

- name: Install cargo-nextest
uses: taiki-e/install-action@v2
with:
tool: cargo-nextest

- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true

- name: Test with latest nextest release
run: cargo nextest run --workspace --target ${{ matrix.target }}
16 changes: 13 additions & 3 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ on:
tags:
- v[0-9]+.*

env:
RUSTFLAGS: -D warnings
CARGO_TERM_COLOR: always

jobs:
create-release:
runs-on: ubuntu-latest
Expand All @@ -32,12 +36,18 @@ jobs:
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v3
- name: Install cross-compilation tools
uses: taiki-e/setup-cross-toolchain-action@v1

- uses: rui314/setup-mold@v1

- uses: taiki-e/setup-cross-toolchain-action@v1
with:
target: ${{ matrix.target }}
if: startsWith(matrix.os, 'ubuntu')
- uses: rui314/setup-mold@v1

- uses: Swatinem/rust-cache@v2
with:
cache-on-failure: true

- uses: taiki-e/upload-rust-binary-action@v1
with:
bin: corrosion
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion crates/corro-agent/src/api/peer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::cmp;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::ops::RangeInclusive;
use std::sync::Arc;

Expand Down Expand Up @@ -273,7 +274,12 @@ async fn build_quinn_client_config(config: &GossipConfig) -> eyre::Result<quinn:

pub async fn gossip_client_endpoint(config: &GossipConfig) -> eyre::Result<quinn::Endpoint> {
let client_config = build_quinn_client_config(config).await?;
let mut client = quinn::Endpoint::client("[::]:0".parse()?)?;

let client_bind_addr = match config.bind_addr {
SocketAddr::V4(_) => "0.0.0.0:0".parse()?,
SocketAddr::V6(_) => "[::]:0".parse()?,
};
let mut client = quinn::Endpoint::client(client_bind_addr)?;

client.set_default_client_config(client_config);
Ok(client)
Expand Down
27 changes: 27 additions & 0 deletions crates/corro-api-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,33 @@ impl ColumnType {
_ => return None,
})
}

pub fn from_str(s: &str) -> Option<Self> {
Some(match s {
"INTEGER" => Self::Integer,
"REAL" => Self::Float,
"TEXT" => Self::Text,
"BLOB" => Self::Blob,
_ => return None,
})
}
}

impl FromSql for ColumnType {
fn column_result(value: ValueRef<'_>) -> rusqlite::types::FromSqlResult<Self> {
match value {
ValueRef::Text(s) => Ok(match String::from_utf8_lossy(s).as_ref() {
"INTEGER" => Self::Integer,
"REAL" => Self::Float,
"TEXT" => Self::Text,
"BLOB" => Self::Blob,
_ => {
return Err(FromSqlError::InvalidType);
}
}),
_ => Err(FromSqlError::InvalidType),
}
}
}

#[derive(Debug, Default, Clone, Serialize, Deserialize, PartialEq)]
Expand Down
5 changes: 0 additions & 5 deletions crates/corro-types/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,10 +273,5 @@ pub enum LogFormat {
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub struct ConsulConfig {
#[serde(default)]
pub extra_services_columns: Vec<String>,
#[serde(default)]
pub extra_statements: Vec<String>,

pub client: consul_client::Config,
}
1 change: 1 addition & 0 deletions crates/corrosion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ corro-admin = { path = "../corro-admin" }
corro-agent = { path = "../corro-agent" }
corro-client = { path = "../corro-client" }
corro-types = { path = "../corro-types" }
corro-api-types = { path = "../corro-api-types" }
crc32fast = { workspace = true }
eyre = { workspace = true }
fallible-iterator = { workspace = true }
Expand Down
129 changes: 74 additions & 55 deletions crates/corrosion/src/command/consul/sync.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use consul_client::{AgentCheck, AgentService, Client};
use corro_api_types::ColumnType;
use corro_client::CorrosionClient;
use corro_types::{api::Statement, config::ConsulConfig};
use metrics::{histogram, increment_counter};
Expand All @@ -21,8 +22,6 @@ pub async fn run<P: AsRef<Path>>(
api_addr: SocketAddr,
db_path: P,
) -> eyre::Result<()> {
tracing_subscriber::fmt::init();

let (mut tripwire, tripwire_worker) = tripwire::Tripwire::new_signals();

let node: &'static str = Box::leak(
Expand All @@ -37,9 +36,7 @@ pub async fn run<P: AsRef<Path>>(

info!("Setting up corrosion for consul sync");
setup(
&corrosion,
config.extra_services_columns.as_slice(),
config.extra_statements.as_slice(),
&corrosion
)
.await?;

Expand Down Expand Up @@ -120,12 +117,9 @@ pub async fn run<P: AsRef<Path>>(

async fn setup(
corrosion: &CorrosionClient,
extra_services_columns: &[String],
extra_statements: &[String],
) -> eyre::Result<()> {
let mut conn = corrosion.pool().get().await?;
{
let mut conn = corrosion.pool().get().await?;

let tx = conn.transaction()?;

info!("Creating internal tables");
Expand All @@ -145,51 +139,51 @@ async fn setup(
tx.commit()?;
}
info!("Ensuring schema...");
corrosion
.schema(&build_schema(extra_services_columns, extra_statements))
.await?;
Ok(())
}

fn build_schema(extra_services_columns: &[String], extra_statements: &[String]) -> Vec<Statement> {
let extra = extra_services_columns.join(",");
let mut statements = vec![
Statement::Simple(format!("CREATE TABLE consul_services (
node TEXT NOT NULL,
id TEXT NOT NULL,
name TEXT NOT NULL DEFAULT '',
tags TEXT NOT NULL DEFAULT '[]',
meta TEXT NOT NULL DEFAULT '{{}}',
port INTEGER NOT NULL DEFAULT 0,
address TEXT NOT NULL DEFAULT '',
updated_at INTEGER NOT NULL DEFAULT 0,

{}

PRIMARY KEY (node, id)
) WITHOUT ROWID;", if extra.is_empty() { String::new() } else {format!("{extra},")})),
Statement::Simple("CREATE INDEX consul_services_node_id_updated_at ON consul_services (node, id, updated_at);".to_string()),

Statement::Simple("CREATE TABLE consul_checks (
node TEXT NOT NULL,
id TEXT NOT NULL,
service_id TEXT NOT NULL DEFAULT '',
service_name TEXT NOT NULL DEFAULT '',
name TEXT NOT NULL DEFAULT '',
status TEXT NOT NULL DEFAULT '',
output TEXT NOT NULL DEFAULT '',
updated_at INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (node, id)
) WITHOUT ROWID;".to_string()),
Statement::Simple("CREATE INDEX consul_checks_node_id_updated_at ON consul_checks (node, id, updated_at);".to_string()),
Statement::Simple("CREATE INDEX consul_checks_node_service_id ON consul_checks (node, service_id);".to_string()),
struct ColumnInfo {
name: String,
kind: corro_api_types::ColumnType
}

let col_infos: Vec<ColumnInfo> = conn.prepare("PRAGMA table_info(consul_services)")?.query_map([], |row| Ok(ColumnInfo { name: row.get(1)?, kind: row.get(2)? })).map_err(|e| eyre::eyre!("could not query consul_services' table_info: {e}"))?.collect::<Result<Vec<_>, _>>()?;

let expected_cols = [
("node", ColumnType::Text),
("id", ColumnType::Text),
("name", ColumnType::Text),
("tags", ColumnType::Text),
("meta", ColumnType::Text),
("port", ColumnType::Integer),
("address", ColumnType::Text),
("updated_at", ColumnType::Integer),
];

for s in extra_statements {
statements.push(Statement::Simple(s.clone()));
for (name, kind) in expected_cols {
if col_infos.iter().find(|info| info.name == name && info.kind == kind ).is_none() {
eyre::bail!("expected a column consul_services.{name} w/ type {kind:?}");
}
}

statements
let col_infos: Vec<ColumnInfo> = conn.prepare("PRAGMA table_info(consul_checks)")?.query_map([], |row| Ok(ColumnInfo { name: row.get(1)?, kind: row.get(2)? })).map_err(|e| eyre::eyre!("could not query consul_checks' table_info: {e}"))?.collect::<Result<Vec<_>, _>>()?;

let expected_cols = [
("node", ColumnType::Text),
("id", ColumnType::Text),
("service_id", ColumnType::Text),
("service_name", ColumnType::Text),
("name", ColumnType::Text),
("status", ColumnType::Text),
("output", ColumnType::Text),
("updated_at", ColumnType::Integer),
];

for (name, kind) in expected_cols {
if col_infos.iter().find(|info| info.name == name && info.kind == kind ).is_none() {
eyre::bail!("expected a column consul_checks.{name} w/ type {kind:?}");
}
}

Ok(())
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -584,10 +578,39 @@ mod tests {
_ = tracing_subscriber::fmt::try_init();
let (tripwire, tripwire_worker, tripwire_tx) = Tripwire::new_simple();

let ta1 = launch_test_agent(|conf| conf.build(), tripwire.clone()).await?;
let tmpdir = tempfile::TempDir::new()?;
tokio::fs::write(tmpdir.path().join("consul.sql"), b"
CREATE TABLE consul_services (
node TEXT NOT NULL,
id TEXT NOT NULL,
name TEXT NOT NULL DEFAULT '',
tags TEXT NOT NULL DEFAULT '[]',
meta TEXT NOT NULL DEFAULT '{}',
port INTEGER NOT NULL DEFAULT 0,
address TEXT NOT NULL DEFAULT '',
updated_at INTEGER NOT NULL DEFAULT 0,
app_id INTEGER AS (CAST(JSON_EXTRACT(meta, '$.app_id') AS INTEGER)),

PRIMARY KEY (node, id)
);

CREATE TABLE consul_checks (
node TEXT NOT NULL,
id TEXT NOT NULL,
service_id TEXT NOT NULL DEFAULT '',
service_name TEXT NOT NULL DEFAULT '',
name TEXT NOT NULL DEFAULT '',
status TEXT NOT NULL DEFAULT '',
output TEXT NOT NULL DEFAULT '',
updated_at INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (node, id)
);
").await?;

let ta1 = launch_test_agent(|conf| conf.add_schema_path(tmpdir.path().display().to_string()).build(), tripwire.clone()).await?;
let ta2 = launch_test_agent(
|conf| {
conf.bootstrap(vec![ta1.agent.gossip_addr().to_string()])
conf.bootstrap(vec![ta1.agent.gossip_addr().to_string()]).add_schema_path(tmpdir.path().display().to_string())
.build()
},
tripwire.clone(),
Expand All @@ -598,8 +621,6 @@ mod tests {

setup(
&ta1_client,
&["app_id INTEGER AS (CAST (JSON_EXTRACT (meta, '$.app_id') AS INTEGER))".into()],
&[],
)
.await?;

Expand Down Expand Up @@ -655,8 +676,6 @@ mod tests {

setup(
&ta2_client,
&["app_id INTEGER AS (CAST (JSON_EXTRACT (meta, '$.app_id') AS INTEGER))".into()],
&[],
)
.await?;

Expand Down
Loading