From 3591a82a90ea07bec05feeca65fccf7271799b87 Mon Sep 17 00:00:00 2001 From: yjhmelody Date: Sat, 24 Aug 2024 22:20:00 +0800 Subject: [PATCH] chore: reduce duplicated code for hdfs --- core/src/services/hdfs/backend.rs | 120 +++--------------------------- 1 file changed, 10 insertions(+), 110 deletions(-) diff --git a/core/src/services/hdfs/backend.rs b/core/src/services/hdfs/backend.rs index e4aba77ec48a..ded7d8e41326 100644 --- a/core/src/services/hdfs/backend.rs +++ b/core/src/services/hdfs/backend.rs @@ -237,31 +237,12 @@ impl Access for HdfsBackend { am.into() } - async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result { - let p = build_rooted_abs_path(&self.root, path); - - self.client.create_dir(&p).map_err(new_std_io_error)?; - - Ok(RpCreateDir::default()) + async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result { + self.blocking_create_dir(path, args) } - async fn stat(&self, path: &str, _: OpStat) -> Result { - let p = build_rooted_abs_path(&self.root, path); - - let meta = self.client.metadata(&p).map_err(new_std_io_error)?; - - let mode = if meta.is_dir() { - EntryMode::DIR - } else if meta.is_file() { - EntryMode::FILE - } else { - EntryMode::Unknown - }; - let mut m = Metadata::new(mode); - m.set_content_length(meta.len()); - m.set_last_modified(meta.modified().into()); - - Ok(RpStat::new(m)) + async fn stat(&self, path: &str, args: OpStat) -> Result { + self.blocking_stat(path, args) } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { @@ -345,97 +326,16 @@ impl Access for HdfsBackend { )) } - async fn delete(&self, path: &str, _: OpDelete) -> Result { - let p = build_rooted_abs_path(&self.root, path); - - let meta = self.client.metadata(&p); - - if let Err(err) = meta { - return if err.kind() == io::ErrorKind::NotFound { - Ok(RpDelete::default()) - } else { - Err(new_std_io_error(err)) - }; - } - - // Safety: Err branch has been checked, it's OK to unwrap. - let meta = meta.ok().unwrap(); - - let result = if meta.is_dir() { - self.client.remove_dir(&p) - } else { - self.client.remove_file(&p) - }; - - result.map_err(new_std_io_error)?; - - Ok(RpDelete::default()) + async fn delete(&self, path: &str, args: OpDelete) -> Result { + self.blocking_delete(path, args) } - async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> { - let p = build_rooted_abs_path(&self.root, path); - - let f = match self.client.read_dir(&p) { - Ok(f) => f, - Err(e) => { - return if e.kind() == io::ErrorKind::NotFound { - Ok((RpList::default(), None)) - } else { - Err(new_std_io_error(e)) - } - } - }; - - let rd = HdfsLister::new(&self.root, f); - - Ok((RpList::default(), Some(rd))) + async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> { + self.blocking_list(path, args) } - async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result { - let from_path = build_rooted_abs_path(&self.root, from); - self.client.metadata(&from_path).map_err(new_std_io_error)?; - - let to_path = build_rooted_abs_path(&self.root, to); - let result = self.client.metadata(&to_path); - match result { - Err(err) => { - // Early return if other error happened. - if err.kind() != io::ErrorKind::NotFound { - return Err(new_std_io_error(err)); - } - - let parent = PathBuf::from(&to_path) - .parent() - .ok_or_else(|| { - Error::new( - ErrorKind::Unexpected, - "path should have parent but not, it must be malformed", - ) - .with_context("input", &to_path) - })? - .to_path_buf(); - - self.client - .create_dir(&parent.to_string_lossy()) - .map_err(new_std_io_error)?; - } - Ok(metadata) => { - if metadata.is_file() { - self.client - .remove_file(&to_path) - .map_err(new_std_io_error)?; - } else { - return Err(Error::new(ErrorKind::IsADirectory, "path should be a file") - .with_context("input", &to_path)); - } - } - } - - self.client - .rename_file(&from_path, &to_path) - .map_err(new_std_io_error)?; - - Ok(RpRename::new()) + async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result { + self.blocking_rename(from, to, args) } fn blocking_create_dir(&self, path: &str, _: OpCreateDir) -> Result {