diff --git a/.env.example b/.env.example index 2a5b0344876b..08b3c255eaa9 100644 --- a/.env.example +++ b/.env.example @@ -164,3 +164,9 @@ OPENDAL_GDRIVE_ACCESS_TOKEN= OPENDAL_GDRIVE_REFRESH_TOKEN= OPENDAL_GDRIVE_CLIENT_ID= OPENDAL_GDRIVE_CLIENT_SECRET= +# sqlite +OPENDAL_SQLITE_TEST=on +OPENDAL_SQLITE_CONNECTION_STRING=file:///tmp/opendal/test.db +OPENDAL_SQLITE_TABLE=data +OPENDAL_SQLITE_KEY_FIELD=key +OPENDAL_SQLITE_VALUE_FIELD=data diff --git a/Cargo.lock b/Cargo.lock index b936172a39fe..601e740cdb81 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4047,6 +4047,7 @@ dependencies = [ "prometheus-client", "prost", "quick-xml 0.30.0", + "r2d2", "rand 0.8.5", "redb", "redis", @@ -5252,6 +5253,17 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "r2d2" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51de85fb3fb6524929c8a2eb85e6b6d363de4e8c48f9e2c2eac4944abc181c93" +dependencies = [ + "log", + "parking_lot 0.12.1", + "scheduled-thread-pool", +] + [[package]] name = "radium" version = "0.7.0" diff --git a/core/Cargo.toml b/core/Cargo.toml index f1c568c516b9..5b1e7b5b53ad 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -141,6 +141,7 @@ services-memcached = ["dep:bb8"] services-memory = [] services-mini-moka = ["dep:mini-moka"] services-moka = ["dep:moka"] +services-mysql = ["dep:mysql_async"] services-obs = [ "dep:reqsign", "reqsign?/services-huaweicloud", @@ -166,6 +167,7 @@ services-s3 = [ ] services-sftp = ["dep:openssh", "dep:openssh-sftp-client", "dep:dirs"] services-sled = ["dep:sled"] +services-sqlite = ["dep:rusqlite", "dep:r2d2"] services-supabase = [] services-tikv = ["tikv-client"] services-vercel-artifacts = [] @@ -176,8 +178,6 @@ services-wasabi = [ ] services-webdav = [] services-webhdfs = [] -services-mysql = ["dep:mysql_async"] -services-sqlite = ["dep:rusqlite"] [lib] bench = false @@ -206,6 +206,7 @@ await-tree = { version = "0.1.1", optional = true } backon = "0.4.1" base64 = "0.21" bb8 = { version = "0.8", optional = true } +bb8-postgres = { version = "0.8.1", optional = true } bytes = "1.4" cacache = { version = "11.6", default-features = false, features = [ "tokio-runtime", @@ -235,6 +236,7 @@ metrics = { version = "0.20", optional = true } mini-moka = { version = "0.10", optional = true } minitrace = { version = "0.5", optional = true } moka = { version = "0.10", optional = true, features = ["future"] } +mysql_async = { version = "0.32.2", optional = true } once_cell = "1" openssh = { version = "0.9.9", optional = true } openssh-sftp-client = { version = "0.13.9", optional = true, features = [ @@ -250,6 +252,7 @@ prometheus = { version = "0.13", features = ["process"], optional = true } prometheus-client = { version = "0.21.2", optional = true } prost = { version = "0.11", optional = true } quick-xml = { version = "0.30", features = ["serialize", "overlapped-lists"] } +r2d2 = { version = "0.8", optional = true } rand = { version = "0.8", optional = true } redb = { version = "1.1.0", optional = true } redis = { version = "0.23.1", features = [ @@ -262,6 +265,7 @@ reqwest = { version = "0.11.18", features = [ "stream", ], default-features = false } rocksdb = { version = "0.21.0", default-features = false, optional = true } +rusqlite = { version = "0.29.0", optional = true, features = ["bundled"] } serde = { version = "1", features = ["derive"] } serde_json = "1" sha2 = { version = "0.10", optional = true } @@ -275,9 +279,6 @@ tokio = "1.27" tokio-postgres = { version = "0.7.8", optional = true } tracing = { version = "0.1", optional = true } uuid = { version = "1", features = ["serde", "v4"] } -mysql_async = { version = "0.32.2", optional = true } -bb8-postgres = { version = "0.8.1", optional = true } -rusqlite = { version = "0.29.0", optional = true, features = ["bundled"] } [dev-dependencies] criterion = { version = "0.4", features = ["async", "async_tokio"] } diff --git a/core/src/raw/mod.rs b/core/src/raw/mod.rs index 34638b9ff9aa..313c5d04f629 100644 --- a/core/src/raw/mod.rs +++ b/core/src/raw/mod.rs @@ -56,6 +56,9 @@ pub use serde_util::*; mod chrono_util; pub use chrono_util::*; +mod tokio_util; +pub use tokio_util::*; + // Expose as a pub mod to avoid confusing. pub mod adapters; pub mod oio; diff --git a/core/src/raw/tokio_util.rs b/core/src/raw/tokio_util.rs new file mode 100644 index 000000000000..e68be2cf2882 --- /dev/null +++ b/core/src/raw/tokio_util.rs @@ -0,0 +1,23 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::*; + +/// Parse tokio error into opendal::Error. +pub fn new_task_join_error(e: tokio::task::JoinError) -> Error { + Error::new(ErrorKind::Unexpected, "tokio task join failed").set_source(e) +} diff --git a/core/src/services/sqlite/backend.rs b/core/src/services/sqlite/backend.rs index 9aac1c44d99a..24dbc1674967 100644 --- a/core/src/services/sqlite/backend.rs +++ b/core/src/services/sqlite/backend.rs @@ -155,8 +155,13 @@ impl Builder for SqliteBuilder { .unwrap_or_else(|| "/".to_string()) .as_str(), ); + let mgr = SqliteConnectionManager { connection_string }; + let pool = r2d2::Pool::new(mgr).map_err(|err| { + Error::new(ErrorKind::Unexpected, "sqlite pool init failed").set_source(err) + })?; + Ok(SqliteBackend::new(Adapter { - connection_string, + pool, table, key_field, value_field, @@ -165,11 +170,33 @@ impl Builder for SqliteBuilder { } } +struct SqliteConnectionManager { + connection_string: String, +} + +impl r2d2::ManageConnection for SqliteConnectionManager { + type Connection = Connection; + type Error = Error; + + fn connect(&self) -> Result { + Connection::open(&self.connection_string) + .map_err(|err| Error::new(ErrorKind::Unexpected, "sqlite open error").set_source(err)) + } + + fn is_valid(&self, conn: &mut Connection) -> Result<()> { + conn.execute_batch("").map_err(parse_rusqlite_error) + } + + fn has_broken(&self, _: &mut Connection) -> bool { + false + } +} + pub type SqliteBackend = kv::Backend; #[derive(Clone)] pub struct Adapter { - connection_string: String, + pool: r2d2::Pool, table: String, key_field: String, @@ -179,7 +206,6 @@ pub struct Adapter { impl Debug for Adapter { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let mut ds = f.debug_struct("SqliteAdapter"); - ds.field("connection_string", &self.connection_string); ds.field("table", &self.table); ds.field("key_field", &self.key_field); ds.field("value_field", &self.value_field); @@ -198,93 +224,85 @@ impl kv::Adapter for Adapter { write: true, create_dir: true, delete: true, + blocking: true, ..Default::default() }, ) } async fn get(&self, path: &str) -> Result>> { - let cloned_path = path.to_string(); - let cloned_self = self.clone(); + let this = self.clone(); + let path = path.to_string(); - task::spawn_blocking(move || cloned_self.blocking_get(cloned_path.as_str())) + task::spawn_blocking(move || this.blocking_get(&path)) .await - .map_err(Error::from) - .and_then(|inner_result| inner_result) + .map_err(new_task_join_error)? } fn blocking_get(&self, path: &str) -> Result>> { + let conn = self.pool.get().map_err(parse_r2d2_error)?; + let query = format!( "SELECT {} FROM {} WHERE `{}` = $1 LIMIT 1", self.value_field, self.table, self.key_field ); - let conn = Connection::open(self.connection_string.clone()).map_err(Error::from)?; - let mut statement = conn.prepare(&query).map_err(Error::from)?; + let mut statement = conn.prepare(&query).map_err(parse_rusqlite_error)?; let result = statement.query_row([path], |row| row.get(0)); match result { Ok(v) => Ok(Some(v)), Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None), - Err(err) => Err(Error::from(err)), + Err(err) => Err(parse_rusqlite_error(err)), } } async fn set(&self, path: &str, value: &[u8]) -> Result<()> { - let cloned_path = path.to_string(); - let cloned_value = value.to_vec(); - let cloned_self = self.clone(); + let this = self.clone(); + let path = path.to_string(); + // FIXME: can we avoid this copy? + let value = value.to_vec(); - task::spawn_blocking(move || cloned_self.blocking_set(cloned_path.as_str(), &cloned_value)) + task::spawn_blocking(move || this.blocking_set(&path, &value)) .await - .map_err(Error::from) - .and_then(|inner_result| inner_result) + .map_err(new_task_join_error)? } fn blocking_set(&self, path: &str, value: &[u8]) -> Result<()> { + let conn = self.pool.get().map_err(parse_r2d2_error)?; + let query = format!( "INSERT OR REPLACE INTO `{}` (`{}`, `{}`) VALUES ($1, $2)", self.table, self.key_field, self.value_field ); - let conn = Connection::open(self.connection_string.clone()).map_err(Error::from)?; - let mut statement = conn.prepare(&query).map_err(Error::from)?; + let mut statement = conn.prepare(&query).map_err(parse_rusqlite_error)?; statement .execute(params![path, value]) - .map_err(Error::from)?; + .map_err(parse_rusqlite_error)?; Ok(()) } async fn delete(&self, path: &str) -> Result<()> { - let cloned_path = path.to_string(); - let cloned_self = self.clone(); + let this = self.clone(); + let path = path.to_string(); - task::spawn_blocking(move || cloned_self.blocking_delete(cloned_path.as_str())) + task::spawn_blocking(move || this.blocking_delete(&path)) .await - .map_err(Error::from) - .and_then(|inner_result| inner_result) + .map_err(new_task_join_error)? } fn blocking_delete(&self, path: &str) -> Result<()> { - let conn = Connection::open(self.connection_string.clone()).map_err(|err| { - Error::new(ErrorKind::Unexpected, "Sqlite open error").set_source(err) - })?; + let conn = self.pool.get().map_err(parse_r2d2_error)?; + let query = format!("DELETE FROM {} WHERE `{}` = $1", self.table, self.key_field); - let mut statement = conn.prepare(&query).map_err(Error::from)?; - statement.execute([path]).map_err(Error::from)?; + let mut statement = conn.prepare(&query).map_err(parse_rusqlite_error)?; + statement.execute([path]).map_err(parse_rusqlite_error)?; Ok(()) } } -impl From for Error { - fn from(value: rusqlite::Error) -> Error { - Error::new(ErrorKind::Unexpected, "unhandled error from sqlite").set_source(value) - } +fn parse_rusqlite_error(err: rusqlite::Error) -> Error { + Error::new(ErrorKind::Unexpected, "unhandled error from sqlite").set_source(err) } -impl From for Error { - fn from(value: task::JoinError) -> Error { - Error::new( - ErrorKind::Unexpected, - "unhandled error from sqlite when spawning task", - ) - .set_source(value) - } +fn parse_r2d2_error(err: r2d2::Error) -> Error { + Error::new(ErrorKind::Unexpected, "unhandled error from r2d2").set_source(err) } diff --git a/core/tests/behavior/main.rs b/core/tests/behavior/main.rs index 695eefe2ce28..f6a5e2df43ba 100644 --- a/core/tests/behavior/main.rs +++ b/core/tests/behavior/main.rs @@ -104,7 +104,7 @@ fn main() -> anyhow::Result<()> { tests.extend(behavior_test::()); #[cfg(feature = "services-azblob")] tests.extend(behavior_test::()); - #[cfg(feature = "services-Azdls")] + #[cfg(feature = "services-azdls")] tests.extend(behavior_test::()); #[cfg(feature = "services-cacache")] tests.extend(behavior_test::());