Skip to content

Commit

Permalink
feat: refactor tcp stream
Browse files Browse the repository at this point in the history
  • Loading branch information
erayarslan committed Dec 31, 2023
1 parent 471449f commit 42b18a7
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 10 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/dcp.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:

- name: Test DCP
run: |
cargo test --package dcp-rs --test integration_test -- --exact --nocapture
cargo test --release --package dcp-rs --test integration_test -- --exact --nocapture
env:
RUST_LOG: debug
CB_VERSION: ${{ matrix.version }}
6 changes: 2 additions & 4 deletions dcp/src/io/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::dcp_io::consts::PacketCallback;
use crate::dcp_io::packet::Packet;
use std::collections::HashMap;
use std::io;
use std::io::{BufReader, Write};
use std::io::Write;
use std::net::TcpStream;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;
Expand Down Expand Up @@ -51,11 +51,9 @@ impl Client {

pub fn start(&self) -> io::Result<()> {
let tcp_stream = self.tcp_stream.lock().unwrap().try_clone()?;
let mut reader: BufReader<&TcpStream> = BufReader::new(&tcp_stream);

self.running.store(true, Ordering::Relaxed);
while self.running.load(Ordering::Relaxed) {
let packet = Packet::from_buffer(&mut reader);
let packet = Packet::from_buffer(&tcp_stream);
if packet.is_err() {
self.running.store(false, Ordering::Relaxed);
return Err(io::Error::new(
Expand Down
10 changes: 5 additions & 5 deletions dcp/src/io/packet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::dcp_io::utils::{
use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
use std::fmt::{Display, Formatter};
use std::io;
use std::io::{BufReader, Read, Write};
use std::io::{Read, Write};
use std::net::TcpStream;

pub struct BarrierFrame {}
Expand Down Expand Up @@ -130,7 +130,7 @@ impl Default for Packet {
}

impl Packet {
pub fn from_buffer(reader: &mut BufReader<&TcpStream>) -> io::Result<Self> {
pub fn from_buffer(mut reader: &TcpStream) -> io::Result<Self> {
let mut packet: Packet = Default::default();

let mut header_buf = [0u8; 24];
Expand Down Expand Up @@ -158,11 +158,11 @@ impl Packet {

packet.command = header_buf[1];
packet.datatype = header_buf[5];
packet.opaque = BigEndian::read_u32(&mut header_buf[12..]);
packet.cas = BigEndian::read_u64(&mut header_buf[16..]);
packet.opaque = BigEndian::read_u32(&header_buf[12..]);
packet.cas = BigEndian::read_u64(&header_buf[16..]);

let ext_len = header_buf[4];
let mut key_len = BigEndian::read_u16(&mut header_buf[2..]) as i32;
let mut key_len = BigEndian::read_u16(&header_buf[2..]) as i32;
let mut frames_len: usize = 0;

if pkt_magic == CMD_MAGIC_REQ_EXT || pkt_magic == CMD_MAGIC_RES_EXT {
Expand Down

0 comments on commit 42b18a7

Please sign in to comment.