Skip to content

Commit

Permalink
feat: integration test added
Browse files Browse the repository at this point in the history
  • Loading branch information
erayarslan committed Dec 24, 2023
1 parent 8b65f14 commit 471449f
Show file tree
Hide file tree
Showing 16 changed files with 561 additions and 161 deletions.
91 changes: 44 additions & 47 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
@@ -1,47 +1,44 @@
name: Build

on:
push:
branches:
- master

jobs:
build:
name: Build
runs-on: ${{ matrix.os }}
strategy:
matrix:
build: [ linux, macos, windows ]
include:
- build: linux
os: ubuntu-latest
rust: stable
- build: macos
os: macos-latest
rust: stable
- build: windows
os: windows-latest
rust: stable

steps:
- uses: actions/checkout@v3

- name: Install Rust ${{ matrix.rust }}
uses: dtolnay/rust-toolchain@stable
with:
toolchain: ${{ matrix.rust }}

- uses: Swatinem/rust-cache@v2

- name: Build
run: |
cargo build --verbose
rustfmt:
name: Rustfmt
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- name: Install Rust
run: rustup update stable && rustup default stable && rustup component add rustfmt
- run: cargo fmt -- --check
name: Build

on:
push:
branches: [master]
pull_request:
branches: [master]
workflow_dispatch:

jobs:
build:
name: Build
runs-on: ${{ matrix.os }}
strategy:
matrix:
build: [ linux, macos, windows ]
include:
- build: linux
os: ubuntu-latest
rust: stable
- build: macos
os: macos-latest
rust: stable
- build: windows
os: windows-latest
rust: stable

steps:
- uses: actions/checkout@v3

- name: Install Rust ${{ matrix.rust }}
uses: dtolnay/rust-toolchain@stable
with:
toolchain: ${{ matrix.rust }}

- name: Rustfmt
run: |
cargo fmt -- --check
- uses: Swatinem/rust-cache@v2

- name: Build
run: |
cargo build --verbose
42 changes: 42 additions & 0 deletions .github/workflows/dcp.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
name: DCP

on:
push:
branches: [master]
pull_request:
branches: [master]
workflow_dispatch:

jobs:
build:
name: dcp
runs-on: ${{ matrix.os }}
strategy:
matrix:
version: [ "5.0.1", "5.1.3", "5.5.6", "6.0.5", "6.5.2", "6.6.6", "7.0.5", "7.1.6", "7.2.3" ]
build: [ linux ]
include:
- build: linux
os: ubuntu-latest
rust: stable

steps:
- uses: actions/checkout@v3

- name: Install Rust ${{ matrix.rust }}
uses: dtolnay/rust-toolchain@stable
with:
toolchain: ${{ matrix.rust }}

- name: Rustfmt
run: |
cargo fmt -- --check
- uses: Swatinem/rust-cache@v2

- name: Test DCP
run: |
cargo test --package dcp-rs --test integration_test -- --exact --nocapture
env:
RUST_LOG: debug
CB_VERSION: ${{ matrix.version }}
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
[workspace]
members = ["dcp"]
members = ["dcp"]
resolver = "2"
22 changes: 9 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,33 +9,29 @@ This repository contains Rust implementation of a Couchbase Database Change Prot
### Example

```rust
use dcp_rs::{Dcp, Config, DcpConfig, GroupConfig, ListenerCallback};
use dcp_rs::{Config, Dcp, DcpConfig, GroupConfig};

fn main() -> Result<(), std::io::Error> {
let config = Config {
hosts: vec![
"localhost:11210".to_string()
],
hosts: vec!["localhost:11210".to_string()],
username: "user".to_string(),
password: "123456".to_string(),
bucket: "dcp-test".to_string(),
scope_name: "_default".to_string(),
collection_names: vec![
"_default".to_string()
],
collection_names: vec!["_default".to_string()],
dcp: DcpConfig {
group: GroupConfig {
name: "group_name".to_string()
}
name: "group_name".to_string(),
},
},
};

let listener: ListenerCallback = |event| {
let dcp = Dcp::new(config)?;
dcp.add_listener(Box::new(|event| {
println!("event: {}", event);
};

let dcp = Dcp::new(config, listener);

Ok(())
}));
dcp.start()?;

return Ok(());
Expand Down
14 changes: 10 additions & 4 deletions dcp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,19 @@ keywords = ["dcp", "couchbase", "client"]
categories = ["database"]
description = "Couchbase DCP client library"
edition = "2021"
exclude = [
"tests/data/*",
"tests/scripts/*",
]
readme = "../README.md"

[dependencies]
byteorder = "1.5.0"
lazy_static = "1.4.0"
rand = "0.8.5"
log = "0.4.20"

[[test]]
name = "test"
path = "integration/main.rs"
harness = false
[dev-dependencies]
testcontainers = "0.15.0"
env_logger = { version = "0.10.1", features = [] }
chrono = { version = "0.4.31", features = [] }
10 changes: 5 additions & 5 deletions dcp/examples/dcp.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use dcp_rs::{Config, Dcp, DcpConfig, GroupConfig, ListenerCallback};
use dcp_rs::{Config, Dcp, DcpConfig, GroupConfig};

fn main() -> Result<(), std::io::Error> {
let config = Config {
Expand All @@ -15,12 +15,12 @@ fn main() -> Result<(), std::io::Error> {
},
};

let listener: ListenerCallback = |event| {
let dcp = Dcp::new(config)?;
dcp.add_listener(Box::new(|event| {
println!("event: {}", event);
};

let dcp = Dcp::new(config, listener);

Ok(())
}));
dcp.start()?;

return Ok(());
Expand Down
3 changes: 0 additions & 3 deletions dcp/integration/main.rs

This file was deleted.

55 changes: 48 additions & 7 deletions dcp/src/io/client.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,36 @@
use crate::dcp_io::consts::{ListenerCallback, PacketCallback};
use crate::dcp_io::consts::PacketCallback;
use crate::dcp_io::packet::Packet;
use std::collections::HashMap;
use std::io;
use std::io::{BufReader, Write};
use std::net::TcpStream;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

pub struct Client {
tcp_stream: Mutex<TcpStream>,
opaque_map: Mutex<HashMap<u32, PacketCallback>>,
running: AtomicBool,
callback: Mutex<Option<PacketCallback>>,
finished: AtomicBool,
}

impl Client {
pub fn new(tcp_stream: TcpStream) -> Self {
Client {
tcp_stream: Mutex::new(tcp_stream),
opaque_map: Mutex::new(HashMap::new()),
running: AtomicBool::new(false),
callback: Mutex::new(None),
finished: AtomicBool::new(false),
}
}

pub fn add_listener(&self, callback: PacketCallback) {
let mut listener = self.callback.lock().unwrap();
*listener = Some(callback)
}

pub(crate) fn send(&self, packet: &Packet) -> io::Result<()> {
let buffer = packet.to_buffer()?;
self.tcp_stream.lock().unwrap().write_all(&buffer)?;
Expand All @@ -37,27 +49,56 @@ impl Client {
Ok(())
}

pub fn listen(&self, callback: ListenerCallback) -> io::Result<()> {
pub fn start(&self) -> io::Result<()> {
let tcp_stream = self.tcp_stream.lock().unwrap().try_clone()?;
let mut reader: BufReader<&TcpStream> = BufReader::new(&tcp_stream);

loop {
let packet = Packet::from_buffer(&mut reader)?;
self.running.store(true, Ordering::Relaxed);
while self.running.load(Ordering::Relaxed) {
let packet = Packet::from_buffer(&mut reader);
if packet.is_err() {
self.running.store(false, Ordering::Relaxed);
return Err(io::Error::new(
io::ErrorKind::Other,
"failed to read packet",
));
}

let packet = packet.unwrap();
log::debug!("packet command: {}", packet.command);

if packet.status != 0 {
self.running.store(false, Ordering::Relaxed);
return Err(io::Error::new(io::ErrorKind::Other, "got error response"));
}

if self.opaque_map.lock().unwrap().contains_key(&packet.opaque) {
let mut opaque_map = self.opaque_map.lock().unwrap();
let callback = opaque_map.remove(&packet.opaque).unwrap();
let mut opaque_callback = opaque_map.remove(&packet.opaque).unwrap();

callback(&packet);
opaque_callback(&packet).expect("opaque callback got error");
}

if packet.command == 0x57 || packet.command == 0x58 || packet.command == 0x59 {
callback(&packet);
let mut listener = self.callback.lock().unwrap();
if let Some(callback) = &mut *listener {
callback(&packet).expect("listener got error");
}
}
}

self.finished.store(true, Ordering::Relaxed);

Ok(())
}

pub fn stop(&self) {
if !self.running.load(Ordering::Relaxed) {
return;
}
self.running.store(false, Ordering::Relaxed);
while !self.finished.load(Ordering::Relaxed) {
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
}
6 changes: 3 additions & 3 deletions dcp/src/io/consts.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use crate::dcp_io::packet::Packet;
use lazy_static::lazy_static;
use std::collections::HashMap;
use std::io;
use std::sync::Mutex;

pub type CmdMagic = u8;
pub type CmdCode = u8;
pub type StatusCode = u16;
pub type PacketCallback = fn(&Packet);
pub type ListenerCallback = fn(&Packet);

pub type PacketCallbackResult<T = ()> = io::Result<T>;
pub type PacketCallback = Box<dyn FnMut(&Packet) -> PacketCallbackResult + Send + Sync + 'static>;
pub static CMD_MAGIC_REQ: CmdMagic = 0x80;
pub static CMD_MAGIC_RES: CmdMagic = 0x81;
pub static CMD_MAGIC_SERVER_REQ: CmdMagic = 0x82;
Expand Down
Loading

0 comments on commit 471449f

Please sign in to comment.