NIOFileSystem: BufferedReader isn't really composable #3011

Open
weissi opened this issue Dec 1, 2024 · 3 comments

@weissi
Member

weissi commented Dec 1, 2024

Expected behavior

BufferedReader is probably the best abstraction in NIOFileSystem for streaming large amounts of data. A real-world example is JSON Lines (JSONL).

I'd expect to be able to write

try await FileSystem.shared.withFileHandle(forReadingAt: "many-gigabytes.jsonl") { file in
    var reader = file.bufferedReader(capacity: .mebibytes(4))
    for try await line in reader.lines {
        let thing = try JSONDecoder().decode(Thing.self, from: line)
        try await process(thing)
    }
}

And for more complex schemes that aren't just newlines, I'd like to be able to write something composable on BufferedReader.

Actual behavior

Right now I use

try await FileSystem.shared.withFileHandle(forReadingAt: "many-gigabytes.jsonl") { file in
    var reader = file.bufferedReader(capacity: .mebibytes(4))
    while true {
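        // Read up to (but not including) the next newline byte.
        // Note: UInt8(ascii:) is required; UInt8("\n") parses the string as a number and yields nil.
        let (line, seenEOF) = try await reader.read(while: { $0 != UInt8(ascii: "\n") })
        guard !seenEOF else {
            // A final line without a trailing newline is dropped here.
            break
        }
        (_, _) = try await reader.read(while: { $0 == UInt8(ascii: "\n") }) // skip new lines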
        let thing = try JSONDecoder().decode(Thing.self, from: line)
        try await process(thing)
    }
}

But this is

  • quite ugly
    • while true + guard !seenEOF
    • (_, _) =
  • not actually correct, because I can only easily test for one byte at a time (UInt8(ascii: "\n")). If I wanted to support both \r\n and \n, this would get a lot messier
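
To illustrate the \r\n versus \n point, here is a minimal sketch of a chunk-fed splitter that accepts both terminators, including the case where \r and \n arrive in different chunks. `LineSplitter`, `feed` and `finish` are hypothetical names for illustration only; nothing here is part of NIOFileSystem, and it works on plain `[UInt8]` rather than `ByteBuffer` to stay self-contained.

```swift
/// Hypothetical helper (not part of NIOFileSystem): accumulates bytes from
/// arbitrary chunks and emits complete lines, accepting "\n" and "\r\n".
struct LineSplitter {
    private var pending: [UInt8] = []

    /// Feed one chunk; returns every line completed by it, terminators removed.
    mutating func feed(_ chunk: [UInt8]) -> [[UInt8]] {
        var lines: [[UInt8]] = []
        for byte in chunk {
            if byte == UInt8(ascii: "\n") {
                // Drop a "\r" that directly precedes the "\n", even if it
                // arrived at the end of the previous chunk.
                if pending.last == UInt8(ascii: "\r") {
                    pending.removeLast()
                }
                lines.append(pending)
                pending.removeAll(keepingCapacity: true)
            } else {
                pending.append(byte)
            }
        }
        return lines
    }

    /// Call once at EOF; returns a final unterminated line, if any.
    mutating func finish() -> [UInt8]? {
        guard !pending.isEmpty else { return nil }
        defer { pending.removeAll() }
        return pending
    }
}

// Usage: the "\r\n" pair is split across two chunks.
var splitter = LineSplitter()
let fromFirstChunk = splitter.feed(Array("{\"a\":1}\r".utf8))
let fromSecondChunk = splitter.feed(Array("\n{\"b\":2}\n{\"c\"".utf8))
let trailing = splitter.finish()
```

The state that has to survive between chunks (`pending`) is exactly what is awkward to express with a single-byte predicate passed to `read(while:)`.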

SwiftNIO version/commit hash

2.77.0


FWIW, I also have another thing which allows me to use

try await FileSystem.shared.withFileHandle(forReadingAt: "many-gigabytes.jsonl") { file in
    for try await line in file.lines {
        let thing = try JSONDecoder().decode(Thing.self, from: line)
        try await process(thing)
    }
}

but it's very complex:

import NIO
import _NIOFileSystem

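/// Splits an `AsyncSequence` of `ByteBuffer` chunks into newline-delimited lines.
/// Only "\n" is recognised as a terminator; a line longer than
/// `maximumAllowableBufferSize` is delivered in pieces without a terminator.
public struct AsyncByteBufferLineSequence<Base: Sendable>: AsyncSequence & Sendable
where Base: AsyncSequence, Base.Element == ByteBuffer {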
    public typealias Element = ByteBuffer
    private let underlying: Base
    private let dropTerminator: Bool
    private let maximumAllowableBufferSize: Int
    private let dropLastChunkIfNoNewline: Bool

    public struct AsyncIterator: AsyncIteratorProtocol {
        public typealias Element = ByteBuffer
        private var underlying: Base.AsyncIterator
        private let dropTerminator: Bool
        private let maximumAllowableBufferSize: Int
        private let dropLastChunkIfNoNewline: Bool
        private var buffer = Buffer()

        struct Buffer {
            private var buffer: [ByteBuffer] = []
            internal private(set) var byteCount: Int = 0

            mutating func append(_ buffer: ByteBuffer) {
                self.buffer.append(buffer)
                self.byteCount += buffer.readableBytes
            }

            func allButLast() -> ArraySlice<ByteBuffer> {
                return self.buffer.dropLast()
            }

            var byteCountButLast: Int {
                return self.byteCount - (self.buffer.last?.readableBytes ?? 0)
            }

            var lastChunkView: ByteBufferView? {
                return self.buffer.last?.readableBytesView
            }

            mutating func concatenateEverything(upToLastChunkLengthToConsume lastLength: Int) -> ByteBuffer {
                var output = ByteBuffer()
                output.reserveCapacity(lastLength + self.byteCountButLast)

                var writtenBytes = 0
                for buffer in self.buffer.dropLast() {
                    writtenBytes += output.writeImmutableBuffer(buffer)
                }
                writtenBytes += output.writeImmutableBuffer(
                    self.buffer[self.buffer.endIndex - 1].readSlice(length: lastLength)!
                )
                if self.buffer.last!.readableBytes > 0 {
                    if self.buffer.count > 1 {
                        self.buffer.swapAt(0, self.buffer.endIndex - 1)
                    }
                    self.buffer.removeLast(self.buffer.count - 1)
                } else {
                    self.buffer = []
                }

                self.byteCount -= writtenBytes
                assert(self.byteCount >= 0)
                return output
            }
        }

        internal init(
            underlying: Base.AsyncIterator,
            dropTerminator: Bool,
            maximumAllowableBufferSize: Int,
            dropLastChunkIfNoNewline: Bool
        ) {
            self.underlying = underlying
            self.dropTerminator = dropTerminator
            self.maximumAllowableBufferSize = maximumAllowableBufferSize
            self.dropLastChunkIfNoNewline = dropLastChunkIfNoNewline
        }

        private mutating func deliverUpTo(
            view: ByteBufferView,
            index: ByteBufferView.Index,
            expectNewline: Bool
        ) -> ByteBuffer {
            let howMany = view.startIndex.distance(to: index) + (expectNewline ? 1 : 0)

            var output = self.buffer.concatenateEverything(upToLastChunkLengthToConsume: howMany)
            if expectNewline {
                assert(output.readableBytesView.last == UInt8(ascii: "\n"))
                assert(
                    output.readableBytesView.firstIndex(of: UInt8(ascii: "\n"))
                        == output.readableBytesView.index(before: output.readableBytesView.endIndex))
            } else {
                assert(output.readableBytesView.last != UInt8(ascii: "\n"))
                assert(!output.readableBytesView.contains(UInt8(ascii: "\n")))
            }
            if self.dropTerminator && expectNewline {
                output.moveWriterIndex(to: output.writerIndex - 1)
            }

            return output
        }

        public mutating func next() async throws -> Element? {
            while true {
                if let view = self.buffer.lastChunkView {
                    if let newlineIndex = view.firstIndex(of: UInt8(ascii: "\n")) {
                        return self.deliverUpTo(
                            view: view,
                            index: newlineIndex,
                            expectNewline: true
                        )
                    }

                    if self.buffer.byteCount > self.maximumAllowableBufferSize {
                        return self.deliverUpTo(
                            view: view,
                            index: view.endIndex,
                            expectNewline: false
                        )
                    }
                }

                if let nextBuffer = try await self.underlying.next() {
                    self.buffer.append(nextBuffer)
                } else {
                    if !self.dropLastChunkIfNoNewline, let view = self.buffer.lastChunkView, !view.isEmpty {
                        return self.deliverUpTo(
                            view: view,
                            index: view.endIndex,
                            expectNewline: false
                        )
                    } else {
                        return nil
                    }
                }
            }
        }
    }

    public init(
        _ underlying: Base, dropTerminator: Bool,
        maximumAllowableBufferSize: Int,
        dropLastChunkIfNoNewline: Bool
    ) {
        self.underlying = underlying
        self.dropTerminator = dropTerminator
        self.maximumAllowableBufferSize = maximumAllowableBufferSize
        self.dropLastChunkIfNoNewline = dropLastChunkIfNoNewline
    }

    public func makeAsyncIterator() -> AsyncIterator {
        return AsyncIterator(
            underlying: self.underlying.makeAsyncIterator(),
            dropTerminator: self.dropTerminator,
            maximumAllowableBufferSize: self.maximumAllowableBufferSize,
            dropLastChunkIfNoNewline: self.dropLastChunkIfNoNewline
        )
    }
}

extension ReadFileHandle {
    public var lines: AsyncByteBufferLineSequence<FileChunks> {
        return AsyncByteBufferLineSequence(
            self.readChunks(),
            dropTerminator: true,
            maximumAllowableBufferSize: 8 * 1024 * 1024,
            dropLastChunkIfNoNewline: false
        )
    }
}
@glbrntt
Contributor

glbrntt commented Dec 2, 2024

What's the concrete ask here Johannes, a file.lines() AsyncSequence? Or a line reader on BufferedReader?

@weissi
Member Author

weissi commented Dec 2, 2024

> What's the concrete ask here Johannes, a file.lines() AsyncSequence? Or a line reader on BufferedReader?

Both: a lines sequence, and general composability of BufferedReader so that I can write things like lines more easily.

@FranzBusch
Member

I understand your ask here. IMO we should start by providing an AsyncSequence-based API for now, since that is the standard API for asynchronous streaming at this point.

I personally feel like a general protocol AsyncReader and protocol AsyncWriter is something very useful that we might want to introduce at the NIO level. Though I personally feel like they are standard library concepts.
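
As a rough sketch of what such a reader abstraction could make possible: the protocol below, its single requirement, and the `ArrayReader` and `readLine()` names are all hypothetical. Neither SwiftNIO nor the standard library defines them, and a real design would presumably work on `ByteBuffer` and buffer internally rather than reading one byte at a time.

```swift
/// Hypothetical protocol; neither SwiftNIO nor the standard library defines it.
protocol AsyncReader {
    /// Reads up to `maxBytes` bytes; an empty array signals end of input.
    mutating func read(maxBytes: Int) async throws -> [UInt8]
}

/// In-memory reader, for illustration only.
struct ArrayReader: AsyncReader {
    private var remaining: ArraySlice<UInt8>

    init(_ bytes: [UInt8]) {
        self.remaining = bytes[...]
    }

    mutating func read(maxBytes: Int) async throws -> [UInt8] {
        let chunk = remaining.prefix(maxBytes)
        remaining = remaining.dropFirst(chunk.count)
        return Array(chunk)
    }
}

extension AsyncReader {
    /// A combinator written only against the protocol: reads one
    /// "\n"-terminated line, or the unterminated remainder at end of input.
    /// Returns nil once the input is exhausted.
    mutating func readLine() async throws -> [UInt8]? {
        var line: [UInt8] = []
        while true {
            // One byte per call keeps the sketch short; it is not efficient.
            let bytes = try await self.read(maxBytes: 1)
            guard let byte = bytes.first else {
                return line.isEmpty ? nil : line
            }
            if byte == UInt8(ascii: "\n") {
                return line
            }
            line.append(byte)
        }
    }
}
```

Because `readLine()` is defined on the protocol rather than on a concrete type, any conforming reader (a file, a socket, an in-memory array) would get it for free, which is the kind of composability asked for above.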

3 participants