From 64362a012d70c9a83687c39e4fc3825fca9d105a Mon Sep 17 00:00:00 2001 From: tison Date: Wed, 24 Jan 2024 00:51:22 +0800 Subject: [PATCH] replace with pollster::block_on Signed-off-by: tison --- Cargo.lock | 7 ++++ core/Cargo.toml | 1 + core/src/layers/blocking.rs | 81 ++++++++++--------------------------- 3 files changed, 30 insertions(+), 59 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 03cb8b4e56ab..f6ad09d12a3e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4649,6 +4649,7 @@ dependencies = [ "opentelemetry", "percent-encoding", "persy", + "pollster", "pretty_assertions", "prometheus", "prometheus-client", @@ -5272,6 +5273,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "pollster" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22686f4785f02a4fcc856d3b3bb19bf6c8160d103f7a99cc258bddd0251dc7f2" + [[package]] name = "portable-atomic" version = "0.3.20" diff --git a/core/Cargo.toml b/core/Cargo.toml index 9eab246585aa..2d9c3047a23f 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -251,6 +251,7 @@ reqwest = { version = "0.11.18", features = [ serde = { version = "1", features = ["derive"] } serde_json = "1" tokio = { version = "1.27", features = ["sync"] } +pollster = "0.3.0" uuid = { version = "1", features = ["serde", "v4"] } # Test only dependencies diff --git a/core/src/layers/blocking.rs b/core/src/layers/blocking.rs index bf2c0e540a26..9d60567837e1 100644 --- a/core/src/layers/blocking.rs +++ b/core/src/layers/blocking.rs @@ -19,7 +19,6 @@ use async_trait::async_trait; use bytes; use bytes::Bytes; use futures::future::poll_fn; -use tokio::runtime::Handle; use crate::raw::oio::ReadExt; use crate::raw::*; @@ -138,17 +137,12 @@ use crate::*; /// } /// ``` #[derive(Debug, Clone)] -pub struct BlockingLayer { - handle: Handle, -} +pub struct BlockingLayer; impl BlockingLayer { /// Create a new `BlockingLayer` with the current runtime's handle pub fn create() -> Result { - Ok(Self { - handle: Handle::try_current() - .map_err(|_| Error::new(ErrorKind::Unexpected, "failed to get current handle"))?, - }) + Ok(BlockingLayer) } } @@ -156,18 +150,13 @@ impl Layer for BlockingLayer { type LayeredAccessor = BlockingAccessor; fn layer(&self, inner: A) -> Self::LayeredAccessor { - BlockingAccessor { - inner, - handle: self.handle.clone(), - } + BlockingAccessor { inner } } } #[derive(Clone, Debug)] pub struct BlockingAccessor { inner: A, - - handle: Handle, } #[cfg_attr(not(target_arch = "wasm32"), async_trait)] @@ -232,129 +221,103 @@ impl LayeredAccessor for BlockingAccessor { } fn blocking_create_dir(&self, path: &str, args: OpCreateDir) -> Result { - self.handle.block_on(self.inner.create_dir(path, args)) + pollster::block_on(self.inner.create_dir(path, args)) } fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> { - self.handle.block_on(async { + pollster::block_on(async { let (rp, reader) = self.inner.read(path, args).await?; - let blocking_reader = Self::BlockingReader::new(self.handle.clone(), reader); + let blocking_reader = Self::BlockingReader::new(reader); Ok((rp, blocking_reader)) }) } fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> { - self.handle.block_on(async { + pollster::block_on(async { let (rp, writer) = self.inner.write(path, args).await?; - let blocking_writer = Self::BlockingWriter::new(self.handle.clone(), writer); + let blocking_writer = Self::BlockingWriter::new(writer); Ok((rp, blocking_writer)) }) } fn blocking_copy(&self, from: &str, to: &str, args: OpCopy) -> Result { - self.handle.block_on(self.inner.copy(from, to, args)) + pollster::block_on(self.inner.copy(from, to, args)) } fn blocking_rename(&self, from: &str, to: &str, args: OpRename) -> Result { - self.handle.block_on(self.inner.rename(from, to, args)) + pollster::block_on(self.inner.rename(from, to, args)) } fn blocking_stat(&self, path: &str, args: OpStat) -> Result { - self.handle.block_on(self.inner.stat(path, args)) + pollster::block_on(self.inner.stat(path, args)) } fn blocking_delete(&self, path: &str, args: OpDelete) -> Result { - self.handle.block_on(self.inner.delete(path, args)) + pollster::block_on(self.inner.delete(path, args)) } fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> { - self.handle.block_on(async { + pollster::block_on(async { let (rp, lister) = self.inner.list(path, args).await?; - let blocking_lister = Self::BlockingLister::new(self.handle.clone(), lister); + let blocking_lister = Self::BlockingLister::new(lister); Ok((rp, blocking_lister)) }) } } pub struct BlockingWrapper { - handle: Handle, inner: I, } impl BlockingWrapper { - fn new(handle: Handle, inner: I) -> Self { - Self { handle, inner } + fn new(inner: I) -> Self { + Self { inner } } } impl oio::BlockingRead for BlockingWrapper { fn read(&mut self, buf: &mut [u8]) -> Result { - self.handle.block_on(self.inner.read(buf)) + pollster::block_on(self.inner.read(buf)) } fn seek(&mut self, pos: std::io::SeekFrom) -> Result { - self.handle.block_on(self.inner.seek(pos)) + pollster::block_on(self.inner.seek(pos)) } fn next(&mut self) -> Option> { - self.handle.block_on(self.inner.next()) + pollster::block_on(self.inner.next()) } } impl oio::BlockingWrite for BlockingWrapper { fn write(&mut self, bs: &dyn oio::WriteBuf) -> Result { - self.handle - .block_on(poll_fn(|cx| self.inner.poll_write(cx, bs))) + pollster::block_on(poll_fn(|cx| self.inner.poll_write(cx, bs))) } fn close(&mut self) -> Result<()> { - self.handle - .block_on(poll_fn(|cx| self.inner.poll_close(cx))) + pollster::block_on(poll_fn(|cx| self.inner.poll_close(cx))) } } impl oio::BlockingList for BlockingWrapper { fn next(&mut self) -> Result> { - self.handle.block_on(poll_fn(|cx| self.inner.poll_next(cx))) + pollster::block_on(poll_fn(|cx| self.inner.poll_next(cx))) } } #[cfg(test)] mod tests { - use once_cell::sync::Lazy; - use super::*; - use crate::types::Result; - - static RUNTIME: Lazy = Lazy::new(|| { - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .build() - .unwrap() - }); - - fn create_blocking_layer() -> Result { - let _guard = RUNTIME.enter(); - BlockingLayer::create() - } #[test] fn test_blocking_layer_in_blocking_context() { - // create in a blocking context should fail let layer = BlockingLayer::create(); - assert!(layer.is_err()); - - // create in an async context and drop in a blocking context - let layer = create_blocking_layer(); assert!(layer.is_ok()) } #[test] fn test_blocking_layer_in_async_context() { - // create and drop in an async context - let _guard = RUNTIME.enter(); - let layer = BlockingLayer::create(); assert!(layer.is_ok()); }