diff --git a/doc/SEMANTICS.md b/doc/SEMANTICS.md
index 490e72b6c..6a2e7d583 100644
--- a/doc/SEMANTICS.md
+++ b/doc/SEMANTICS.md
@@ -168,7 +168,7 @@ the following behavior:
 
 ### Directory operations
 
-Basic read-only directory operations (`opendir`, `readdir`, `closedir`) are supported.
+Basic read-only directory operations (`opendir`, `readdir`, `closedir`) are supported. However, seeking (`lseek`) on directory handles is not supported.
 
 Creating directories (`mkdir`) is supported, with the following behavior:
 
diff --git a/mountpoint-s3/CHANGELOG.md b/mountpoint-s3/CHANGELOG.md
index b338e0d8f..91b7e663f 100644
--- a/mountpoint-s3/CHANGELOG.md
+++ b/mountpoint-s3/CHANGELOG.md
@@ -1,5 +1,11 @@
 ## Unreleased
 
+### Breaking changes
+* No breaking changes.
+
+### Other changes
+* Some applications that read directory entries out of order (for example, [PHP](https://github.com/awslabs/mountpoint-s3/issues/477)) will now work correctly. ([#581](https://github.com/awslabs/mountpoint-s3/pull/581))
+
 ## v1.1.0 (October 23, 2023)
 
 ### Breaking changes
diff --git a/mountpoint-s3/src/fs.rs b/mountpoint-s3/src/fs.rs
index f2ac0da5b..0cfbea909 100644
--- a/mountpoint-s3/src/fs.rs
+++ b/mountpoint-s3/src/fs.rs
@@ -3,7 +3,7 @@ use futures::task::Spawn;
 use nix::unistd::{getgid, getuid};
 
 use std::collections::HashMap;
-use std::ffi::OsStr;
+use std::ffi::{OsStr, OsString};
 use std::str::FromStr;
 use std::time::{Duration, UNIX_EPOCH};
 use time::OffsetDateTime;
@@ -35,6 +35,7 @@ struct DirHandle {
     ino: InodeNo,
     handle: ReaddirHandle,
     offset: AtomicI64,
+    last_response: AsyncMutex<Option<(i64, Vec<DirectoryEntry>)>>,
 }
 
 impl DirHandle {
@@ -402,15 +403,18 @@ pub struct Opened {
 pub trait DirectoryReplier {
     /// Add a new dentry to the reply. Returns true if the buffer was full and so the entry was not
     /// added.
-    fn add<T: AsRef<OsStr>>(
-        &mut self,
-        ino: u64,
-        offset: i64,
-        name: T,
-        attr: FileAttr,
-        generation: u64,
-        ttl: Duration,
-    ) -> bool;
+    fn add(&mut self, entry: DirectoryEntry) -> bool;
+}
+
+#[derive(Debug, Clone)]
+pub struct DirectoryEntry {
+    pub ino: u64,
+    pub offset: i64,
+    pub name: OsString,
+    pub attr: FileAttr,
+    pub generation: u64,
+    pub ttl: Duration,
+    lookup: LookedUp,
 }
 
 /// Reply to a `read` call. This is funky because we want the reply to happen with only a borrow of
@@ -717,6 +721,7 @@ where
             ino: parent,
             handle: inode_handle,
             offset: AtomicI64::new(0),
+            last_response: AsyncMutex::new(None),
         };
 
         let mut dir_handles = self.dir_handles.write().await;
@@ -764,19 +769,77 @@
         };
 
         if offset != dir_handle.offset() {
+            // POSIX allows seeking an open directory. That's a pain for us since we are streaming
+            // the directory entries and don't want to keep them all in memory. But one common case
+            // we've seen (https://github.com/awslabs/mountpoint-s3/issues/477) is applications that
+            // request offset 0 twice in a row. So we remember the last response and, if repeated,
+            // we return it again.
+            let last_response = dir_handle.last_response.lock().await;
+            if let Some((last_offset, entries)) = last_response.as_ref() {
+                if offset == *last_offset {
+                    trace!(offset, "repeating readdir response");
+                    for entry in entries {
+                        if reply.add(entry.clone()) {
+                            break;
+                        }
+                        // We are returning this result a second time, so the contract is that we
+                        // must remember it again, except that readdirplus specifies that the
+                        // lookup counts for . and .. are never incremented.
+                        if is_readdirplus && entry.name != "." && entry.name != ".." {
+                            dir_handle.handle.remember(&entry.lookup);
+                        }
+                    }
+                    return Ok(reply);
+                }
+            }
             return Err(err!(
                 libc::EINVAL,
-                "offset mismatch, expected={}, actual={}",
+                "out-of-order readdir, expected={}, actual={}",
                 dir_handle.offset(),
                 offset
             ));
         }
 
+        /// Wrap a replier to duplicate the entries and store them in `dir_handle.last_response` so
+        /// we can re-use them if the directory handle rewinds.
+        struct Reply<R: DirectoryReplier> {
+            reply: R,
+            entries: Vec<DirectoryEntry>,
+        }
+
+        impl<R: DirectoryReplier> Reply<R> {
+            async fn finish(self, offset: i64, dir_handle: &DirHandle) -> R {
+                *dir_handle.last_response.lock().await = Some((offset, self.entries));
+                self.reply
+            }
+        }
+
+        impl<R: DirectoryReplier> DirectoryReplier for Reply<R> {
+            fn add(&mut self, entry: DirectoryEntry) -> bool {
+                let result = self.reply.add(entry.clone());
+                if !result {
+                    self.entries.push(entry);
+                }
+                result
+            }
+        }
+
+        let mut reply = Reply { reply, entries: vec![] };
+
         if dir_handle.offset() < 1 {
             let lookup = self.superblock.getattr(&self.client, parent, false).await?;
             let attr = self.make_attr(&lookup);
-            if reply.add(parent, dir_handle.offset() + 1, ".", attr, 0u64, lookup.validity()) {
-                return Ok(reply);
+            let entry = DirectoryEntry {
+                ino: parent,
+                offset: dir_handle.offset() + 1,
+                name: ".".into(),
+                attr,
+                generation: 0,
+                ttl: lookup.validity(),
+                lookup,
+            };
+            if reply.add(entry) {
+                return Ok(reply.finish(offset, &dir_handle).await);
             }
             dir_handle.next_offset();
         }
@@ -786,36 +849,41 @@ where
                 .getattr(&self.client, dir_handle.handle.parent(), false)
                 .await?;
             let attr = self.make_attr(&lookup);
-            if reply.add(
-                dir_handle.handle.parent(),
-                dir_handle.offset() + 1,
-                "..",
+            let entry = DirectoryEntry {
+                ino: dir_handle.handle.parent(),
+                offset: dir_handle.offset() + 1,
+                name: "..".into(),
                 attr,
-                0u64,
-                lookup.validity(),
-            ) {
-                return Ok(reply);
+                generation: 0,
+                ttl: lookup.validity(),
+                lookup,
+            };
+            if reply.add(entry) {
+                return Ok(reply.finish(offset, &dir_handle).await);
             }
             dir_handle.next_offset();
         }
 
         loop {
             let next = match dir_handle.handle.next(&self.client).await? {
-                None => return Ok(reply),
+                None => return Ok(reply.finish(offset, &dir_handle).await),
                 Some(next) => next,
             };
 
             let attr = self.make_attr(&next);
-            if reply.add(
-                attr.ino,
-                dir_handle.offset() + 1,
-                next.inode.name(),
+            let entry = DirectoryEntry {
+                ino: attr.ino,
+                offset: dir_handle.offset() + 1,
+                name: next.inode.name().into(),
                 attr,
-                0u64,
-                next.validity(),
-            ) {
+                generation: 0,
+                ttl: next.validity(),
+                lookup: next.clone(),
+            };
+
+            if reply.add(entry) {
                 dir_handle.handle.readd(next);
-                return Ok(reply);
+                return Ok(reply.finish(offset, &dir_handle).await);
             }
             if is_readdirplus {
                 dir_handle.handle.remember(&next);
diff --git a/mountpoint-s3/src/fuse.rs b/mountpoint-s3/src/fuse.rs
index 9d21482a4..f65fa6ab5 100644
--- a/mountpoint-s3/src/fuse.rs
+++ b/mountpoint-s3/src/fuse.rs
@@ -4,17 +4,19 @@ use futures::executor::block_on;
 use futures::task::Spawn;
 use std::ffi::OsStr;
 use std::path::Path;
-use std::time::{Duration, SystemTime};
+use std::time::SystemTime;
 use time::OffsetDateTime;
 use tracing::{instrument, Instrument};
 
-use crate::fs::{self, DirectoryReplier, InodeNo, ReadReplier, S3Filesystem, S3FilesystemConfig, ToErrno};
+use crate::fs::{
+    self, DirectoryEntry, DirectoryReplier, InodeNo, ReadReplier, S3Filesystem, S3FilesystemConfig, ToErrno,
+};
 use crate::prefix::Prefix;
 
 #[cfg(target_os = "macos")]
 use fuser::ReplyXTimes;
 use fuser::{
-    FileAttr, Filesystem, KernelConfig, ReplyAttr, ReplyBmap, ReplyCreate, ReplyData, ReplyEmpty, ReplyEntry,
-    ReplyIoctl, ReplyLock, ReplyLseek, ReplyOpen, ReplyWrite, ReplyXattr, Request, TimeOrNow,
+    Filesystem, KernelConfig, ReplyAttr, ReplyBmap, ReplyCreate, ReplyData, ReplyEmpty, ReplyEntry, ReplyIoctl,
+    ReplyLock, ReplyLseek, ReplyOpen, ReplyWrite, ReplyXattr, Request, TimeOrNow,
 };
 use mountpoint_s3_client::ObjectClient;
@@ -168,16 +170,8 @@ where
 }
 
 impl<'a> DirectoryReplier for ReplyDirectory<'a> {
-    fn add<T: AsRef<OsStr>>(
-        &mut self,
-        ino: InodeNo,
-        offset: i64,
-        name: T,
-        attr: FileAttr,
-        _generation: u64,
-        _ttl: Duration,
-    ) -> bool {
-        let result = self.inner.add(ino, offset, attr.kind, name);
+    fn add(&mut self, entry: DirectoryEntry) -> bool {
+        let result = self.inner.add(entry.ino, entry.offset, entry.attr.kind, entry.name);
         if !result {
             *self.count += 1;
         }
@@ -215,16 +209,15 @@ where
 }
 
 impl<'a> DirectoryReplier for ReplyDirectoryPlus<'a> {
-    fn add<T: AsRef<OsStr>>(
-        &mut self,
-        ino: u64,
-        offset: i64,
-        name: T,
-        attr: FileAttr,
-        generation: u64,
-        ttl: Duration,
-    ) -> bool {
-        let result = self.inner.add(ino, offset, name, &ttl, &attr, generation);
+    fn add(&mut self, entry: DirectoryEntry) -> bool {
+        let result = self.inner.add(
+            entry.ino,
+            entry.offset,
+            entry.name,
+            &entry.ttl,
+            &entry.attr,
+            entry.generation,
+        );
         if !result {
             *self.count += 1;
         }
diff --git a/mountpoint-s3/src/inode.rs b/mountpoint-s3/src/inode.rs
index 53a381388..ae8d7e745 100644
--- a/mountpoint-s3/src/inode.rs
+++ b/mountpoint-s3/src/inode.rs
@@ -1167,7 +1167,8 @@ impl Inode {
     pub fn dec_lookup_count(&self, n: u64) -> u64 {
         let mut state = self.inner.sync.write().unwrap();
         let lookup_count = &mut state.lookup_count;
-        *lookup_count -= n;
+        debug_assert!(n <= *lookup_count, "lookup count cannot go negative");
+        *lookup_count = lookup_count.saturating_sub(n);
         trace!(new_lookup_count = lookup_count, "decremented lookup count");
         *lookup_count
     }
diff --git a/mountpoint-s3/tests/common/mod.rs b/mountpoint-s3/tests/common/mod.rs
index 72b98fcd2..611bddfb3 100644
--- a/mountpoint-s3/tests/common/mod.rs
+++ b/mountpoint-s3/tests/common/mod.rs
@@ -2,7 +2,7 @@ use aws_sdk_s3::config::Region;
 use aws_sdk_s3::primitives::ByteStream;
 use fuser::{FileAttr, FileType};
 use futures::executor::ThreadPool;
-use mountpoint_s3::fs::{self, DirectoryReplier, ReadReplier, ToErrno};
+use mountpoint_s3::fs::{self, DirectoryEntry, DirectoryReplier, ReadReplier, ToErrno};
 use mountpoint_s3::prefix::Prefix;
 use mountpoint_s3::{S3Filesystem, S3FilesystemConfig};
 use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig};
@@ -11,10 +11,8 @@ use mountpoint_s3_crt::common::rust_log_adapter::RustLogAdapter;
 use rand::rngs::OsRng;
 use rand::RngCore;
 use std::collections::VecDeque;
-use std::ffi::{OsStr, OsString};
 use std::future::Future;
 use std::sync::Arc;
-use std::time::Duration;
 
 pub fn make_test_filesystem(
     bucket: &str,
@@ -105,14 +103,6 @@ pub fn assert_attr(attr: FileAttr, ftype: FileType, size: u64, uid: u32, gid: u3
     assert_eq!(attr.perm, perm);
 }
 
-#[derive(Debug, Clone)]
-pub struct DirectoryEntry {
-    pub ino: u64,
-    pub offset: i64,
-    pub attr: FileAttr,
-    pub name: OsString,
-}
-
 #[derive(Debug, Default)]
 pub struct DirectoryReply {
     readdir_limit: usize,
     pub entries: VecDeque<DirectoryEntry>,
 }
@@ -120,24 +110,11 @@ pub struct DirectoryReply {
 }
 
 impl DirectoryReplier for &mut DirectoryReply {
-    fn add<T: AsRef<OsStr>>(
-        &mut self,
-        ino: u64,
-        offset: i64,
-        name: T,
-        attr: FileAttr,
-        _generation: u64,
-        _ttl: Duration,
-    ) -> bool {
+    fn add(&mut self, entry: DirectoryEntry) -> bool {
         if self.readdir_limit > 0 && !self.entries.is_empty() && self.entries.len() % self.readdir_limit == 0 {
             true
         } else {
-            self.entries.push_back(DirectoryEntry {
-                ino,
-                offset,
-                attr,
-                name: name.as_ref().to_os_string(),
-            });
+            self.entries.push_back(entry);
             false
         }
     }
diff --git a/mountpoint-s3/tests/fs.rs b/mountpoint-s3/tests/fs.rs
index 307157dbf..8345d756e 100644
--- a/mountpoint-s3/tests/fs.rs
+++ b/mountpoint-s3/tests/fs.rs
@@ -21,7 +21,7 @@ use std::time::{Duration, SystemTime};
 use test_case::test_case;
 
 mod common;
-use common::{assert_attr, make_test_filesystem, make_test_filesystem_with_client, ReadReply};
+use common::{assert_attr, make_test_filesystem, make_test_filesystem_with_client, DirectoryReply, ReadReply};
 
 #[test_case(""; "unprefixed")]
 #[test_case("test_prefix/"; "prefixed")]
@@ -1232,3 +1232,94 @@ async fn test_flexible_retrieval_objects() {
         }
     }
 }
+
+#[tokio::test]
+async fn test_readdir_rewind() {
+    let (client, fs) = make_test_filesystem("test_readdir_rewind", &Default::default(), Default::default());
+
+    for i in 0..10 {
+        client.add_object(&format!("foo{i}"), b"foo".into());
+    }
+
+    let dir_handle = fs.opendir(FUSE_ROOT_INODE, 0).await.unwrap().fh;
+
+    let mut reply = DirectoryReply::new(5);
+    let _ = fs
+        .readdirplus(FUSE_ROOT_INODE, dir_handle, 0, &mut reply)
+        .await
+        .unwrap();
+    let entries = reply
+        .entries
+        .iter()
+        .map(|e| (e.ino, e.name.clone()))
+        .collect::<Vec<_>>();
+    assert_eq!(entries.len(), 5);
+
+    // Trying to read out of order should fail (only the previous or next offsets are valid)
+    assert!(reply.entries.back().unwrap().offset > 1);
+    fs.readdirplus(FUSE_ROOT_INODE, dir_handle, 1, &mut Default::default())
+        .await
+        .expect_err("out of order");
+
+    // Requesting the same buffer size should work fine
+    let mut new_reply = DirectoryReply::new(5);
+    let _ = fs
+        .readdirplus(FUSE_ROOT_INODE, dir_handle, 0, &mut new_reply)
+        .await
+        .unwrap();
+    let new_entries = new_reply
+        .entries
+        .iter()
+        .map(|e| (e.ino, e.name.clone()))
+        .collect::<Vec<_>>();
+    assert_eq!(entries, new_entries);
+
+    // Requesting a smaller buffer works fine and returns a prefix
+    let mut new_reply = DirectoryReply::new(3);
+    let _ = fs
+        .readdirplus(FUSE_ROOT_INODE, dir_handle, 0, &mut new_reply)
+        .await
+        .unwrap();
+    let new_entries = new_reply
+        .entries
+        .iter()
+        .map(|e| (e.ino, e.name.clone()))
+        .collect::<Vec<_>>();
+    assert_eq!(&entries[..3], new_entries);
+
+    // Requesting a larger buffer works fine, but only partially fills (which is allowed)
+    let mut new_reply = DirectoryReply::new(10);
+    let _ = fs
+        .readdirplus(FUSE_ROOT_INODE, dir_handle, 0, &mut new_reply)
+        .await
+        .unwrap();
+    let new_entries = new_reply
+        .entries
+        .iter()
+        .map(|e| (e.ino, e.name.clone()))
+        .collect::<Vec<_>>();
+    assert_eq!(entries, new_entries);
+
+    // And we can resume the stream from the end of the first request
+    let mut next_page = DirectoryReply::new(0);
+    let _ = fs
+        .readdirplus(
+            FUSE_ROOT_INODE,
+            dir_handle,
+            reply.entries.back().unwrap().offset,
+            &mut next_page,
+        )
+        .await
+        .unwrap();
+    assert_eq!(next_page.entries.len(), 7); // 10 directory entries + . + .. = 12, minus the 5 we already saw
+    assert_eq!(next_page.entries.front().unwrap().name, "foo3");
+
+    for entry in reply.entries {
+        // We know we're in the root dir, so the . and .. entries will both be FUSE_ROOT_INODE
+        if entry.ino != FUSE_ROOT_INODE {
+            // Each inode in this list should have been remembered at least twice across the repeated
+            // `readdirplus` calls. Forget will panic if this makes the lookup count underflow.
+            fs.forget(entry.ino, 2).await;
+        }
+    }
+}
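
For reviewers who want the rewind-caching contract in isolation, here is a minimal standalone sketch of the idea implemented in `readdir` above. It is not part of the patch: `Entry`, `Stream`, `Cache`, and the free function `readdir` below are hypothetical stand-ins for `DirectoryEntry`, `ReaddirHandle`, `DirHandle::last_response`, and `S3Filesystem::readdir`. A repeated offset replays the cached page; any other stale offset is still rejected, matching the `out-of-order readdir` error in the patch.

// Standalone sketch only: a single-page rewind cache for a streaming directory listing.
#[derive(Debug, Clone, PartialEq)]
struct Entry {
    offset: i64,
    name: String,
}

/// Streams entries exactly once (like ReaddirHandle), so rewinding is not free.
struct Stream {
    entries: Vec<Entry>,
    pos: usize,
}

impl Stream {
    fn next_entry(&mut self) -> Option<Entry> {
        let entry = self.entries.get(self.pos).cloned();
        if entry.is_some() {
            self.pos += 1;
        }
        entry
    }
}

/// Remembers the last (offset, page) returned, like DirHandle::last_response.
#[derive(Default)]
struct Cache {
    last: Option<(i64, Vec<Entry>)>,
}

fn readdir(stream: &mut Stream, cache: &mut Cache, offset: i64, max: usize) -> Result<Vec<Entry>, &'static str> {
    // If the caller repeats the offset of the previous response (e.g. a rewind back to 0),
    // replay the cached page instead of touching the one-shot stream.
    if let Some((last_offset, page)) = &cache.last {
        if *last_offset == offset {
            return Ok(page.iter().take(max).cloned().collect());
        }
    }
    // Otherwise only the next offset in the stream is valid.
    if offset != stream.pos as i64 {
        return Err("out-of-order readdir");
    }
    let mut page = Vec::new();
    while page.len() < max {
        match stream.next_entry() {
            Some(entry) => page.push(entry),
            None => break,
        }
    }
    cache.last = Some((offset, page.clone()));
    Ok(page)
}

fn main() {
    let mut stream = Stream {
        entries: (0..4i64).map(|i| Entry { offset: i + 1, name: format!("foo{i}") }).collect(),
        pos: 0,
    };
    let mut cache = Cache::default();

    let first = readdir(&mut stream, &mut cache, 0, 2).unwrap();
    // Repeating offset 0, as some applications do, returns the same page again.
    assert_eq!(readdir(&mut stream, &mut cache, 0, 2).unwrap(), first);
    // Other stale offsets are still rejected.
    assert!(readdir(&mut stream, &mut cache, 1, 2).is_err());
    // And the stream can be resumed from where the first page ended.
    assert_eq!(readdir(&mut stream, &mut cache, 2, 2).unwrap().len(), 2);
    println!("rewind cache behaves as expected");
}

Keeping only the most recent page, rather than the whole listing, bounds memory use per open directory handle to roughly one kernel readdir buffer, which is the trade-off the in-code comment about streaming directory entries describes.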