-
Notifications
You must be signed in to change notification settings - Fork 6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conform to PooledConnection #43
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,16 +11,27 @@ | |
// SPDX-License-Identifier: Apache-2.0 | ||
// | ||
//===----------------------------------------------------------------------===// | ||
@_spi(AsyncChannel) | ||
|
||
import _ConnectionPoolModule | ||
import Atomics | ||
import NIOCore | ||
import NIOPosix | ||
import ServiceLifecycle | ||
|
||
/// An actor to create a connection to a Memcache server. | ||
/// | ||
/// This actor can be used to send commands to the server. | ||
public actor MemcacheConnection: Service { | ||
public actor MemcacheConnection: Service, PooledConnection { | ||
public typealias ID = Int | ||
public let id: ID | ||
private static var nextID: ManagedAtomic<Int> = ManagedAtomic(0) | ||
|
||
private let closePromise: EventLoopPromise<Void> | ||
|
||
public var closeFuture: EventLoopFuture<Void> { | ||
return self.closePromise.futureResult | ||
} | ||
|
||
private typealias StreamElement = (MemcacheRequest, CheckedContinuation<MemcacheResponse, Error>) | ||
private let host: String | ||
private let port: Int | ||
|
@@ -56,23 +67,63 @@ public actor MemcacheConnection: Service { | |
|
||
private var state: State | ||
|
||
/// Initialize a new MemcacheConnection. | ||
/// Initialize a new MemcacheConnection, with an option to specify an ID. | ||
/// If no ID is provided, a default value is used. | ||
/// | ||
/// - Parameters: | ||
/// - host: The host address of the Memcache server. | ||
/// - port: The port number of the Memcache server. | ||
/// - eventLoopGroup: The event loop group to use for this connection. | ||
public init(host: String, port: Int, eventLoopGroup: EventLoopGroup) { | ||
/// - id: The unique identifier for the connection (optional). | ||
public init(host: String, port: Int, id: ID? = nil, eventLoopGroup: EventLoopGroup) { | ||
self.host = host | ||
self.port = port | ||
self.id = id ?? MemcacheConnection.nextID.wrappingIncrementThenLoad(ordering: .sequentiallyConsistent) | ||
let (stream, continuation) = AsyncStream<StreamElement>.makeStream() | ||
let bufferAllocator = ByteBufferAllocator() | ||
self.state = .initial( | ||
eventLoopGroup: eventLoopGroup, | ||
bufferAllocator: bufferAllocator, | ||
requestStream: stream, | ||
requestContinuation: continuation | ||
) | ||
self.closePromise = eventLoopGroup.next().makePromise(of: Void.self) | ||
self.state = .initial(eventLoopGroup: eventLoopGroup, bufferAllocator: bufferAllocator, requestStream: stream, requestContinuation: continuation) | ||
} | ||
|
||
deinit { | ||
// Fulfill the promise if it has not been fulfilled yet | ||
closePromise.fail(MemcacheError(code: .connectionShutdown, | ||
message: "MemcacheConnection deinitialized without closing", | ||
cause: nil, | ||
location: .here())) | ||
} | ||
|
||
/// Closes the connection. This method is responsible for properly shutting down | ||
/// and cleaning up resources associated with the connection. | ||
public nonisolated func close() { | ||
Task { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @fabianfett just wanted to ping you here to show you the problems that we hit when adopting There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here is also a few notes I had in context to this. One reason I have a Xcode by default will give you a fix prompting you to from my understanding There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it is important that close is not async, because the whole point of We can see this as an issue of the Also I want to make explicit, that the I think I come back to my favorite point about structured concurrency:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @FranzBusch I think the issue is not really the actor here, but the question which concurrency structure do you follow. All our approaches are structured. :) |
||
await self.closeConnection() | ||
} | ||
} | ||
|
||
private func closeConnection() async { | ||
switch self.state { | ||
case .running(_, let channel, _, _): | ||
channel.channel.close().cascade(to: self.closePromise) | ||
default: | ||
self.closePromise.succeed(()) | ||
} | ||
self.state = .finished | ||
} | ||
|
||
/// Registers a closure to be called when the connection is closed. | ||
/// This is useful for performing cleanup or notification tasks. | ||
public nonisolated func onClose(_ closure: @escaping ((any Error)?) -> Void) { | ||
Task { | ||
await self.closeFuture.whenComplete { result in | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if the closeFuture is marked as |
||
switch result { | ||
case .success: | ||
closure(nil) | ||
case .failure(let error): | ||
closure(error) | ||
} | ||
} | ||
} | ||
} | ||
|
||
/// Runs the Memcache connection. | ||
|
@@ -95,7 +146,7 @@ public actor MemcacheConnection: Service { | |
return channel.eventLoop.makeCompletedFuture { | ||
try channel.pipeline.syncOperations.addHandler(MessageToByteHandler(MemcacheRequestEncoder())) | ||
try channel.pipeline.syncOperations.addHandler(ByteToMessageHandler(MemcacheResponseDecoder())) | ||
return try NIOAsyncChannel<MemcacheResponse, MemcacheRequest>(synchronouslyWrapping: channel) | ||
return try NIOAsyncChannel<MemcacheResponse, MemcacheRequest>(wrappingChannelSynchronously: channel) | ||
} | ||
}.get() | ||
|
||
|
@@ -106,39 +157,41 @@ public actor MemcacheConnection: Service { | |
requestContinuation: continuation | ||
) | ||
|
||
var iterator = channel.inboundStream.makeAsyncIterator() | ||
switch self.state { | ||
case .running(_, let channel, let requestStream, let requestContinuation): | ||
for await (request, continuation) in requestStream { | ||
do { | ||
try await channel.outboundWriter.write(request) | ||
let responseBuffer = try await iterator.next() | ||
|
||
if let response = responseBuffer { | ||
continuation.resume(returning: response) | ||
} else { | ||
self.state = .finished | ||
requestContinuation.finish() | ||
continuation.resume(throwing: MemcacheError( | ||
code: .connectionShutdown, | ||
message: "The connection to the Memcache server was unexpectedly closed.", | ||
cause: nil, | ||
location: .here() | ||
)) | ||
} | ||
} catch { | ||
switch self.state { | ||
case .running: | ||
self.state = .finished | ||
requestContinuation.finish() | ||
continuation.resume(throwing: MemcacheError( | ||
code: .connectionShutdown, | ||
message: "The connection to the Memcache server has shut down while processing a request.", | ||
cause: error, | ||
location: .here() | ||
)) | ||
case .initial, .finished: | ||
break | ||
try await channel.executeThenClose { inbound, outbound in | ||
var inboundIterator = inbound.makeAsyncIterator() | ||
for await (request, continuation) in requestStream { | ||
do { | ||
try await outbound.write(request) | ||
let responseBuffer = try await inboundIterator.next() | ||
|
||
if let response = responseBuffer { | ||
continuation.resume(returning: response) | ||
} else { | ||
self.state = .finished | ||
requestContinuation.finish() | ||
continuation.resume(throwing: MemcacheError( | ||
code: .connectionShutdown, | ||
message: "The connection to the Memcache server was unexpectedly closed.", | ||
cause: nil, | ||
location: .here() | ||
)) | ||
} | ||
} catch { | ||
switch self.state { | ||
case .running: | ||
self.state = .finished | ||
requestContinuation.finish() | ||
continuation.resume(throwing: MemcacheError( | ||
code: .connectionShutdown, | ||
message: "The connection to the Memcache server has shut down while processing a request.", | ||
cause: error, | ||
location: .here() | ||
)) | ||
case .initial, .finished: | ||
break | ||
} | ||
} | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this can be marked as nonisolated.