diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..08891d8 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,8 @@ +root = true + +[*] +indent_style = space +indent_size = 4 +end_of_line = lf +insert_final_newline = true +trim_trailing_whitespace = true \ No newline at end of file diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index 00c0baf..af6641d 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -11,7 +11,6 @@ jobs: with: license_header_check_project_name: "Swift Cassandra Client" api_breakage_check_enabled: false - format_check_enabled: false unit-tests: name: Unit Tests diff --git a/.swift-format b/.swift-format new file mode 100644 index 0000000..7fa06fb --- /dev/null +++ b/.swift-format @@ -0,0 +1,62 @@ +{ + "version" : 1, + "indentation" : { + "spaces" : 4 + }, + "tabWidth" : 4, + "fileScopedDeclarationPrivacy" : { + "accessLevel" : "private" + }, + "spacesAroundRangeFormationOperators" : false, + "indentConditionalCompilationBlocks" : false, + "indentSwitchCaseLabels" : false, + "lineBreakAroundMultilineExpressionChainComponents" : false, + "lineBreakBeforeControlFlowKeywords" : false, + "lineBreakBeforeEachArgument" : true, + "lineBreakBeforeEachGenericRequirement" : true, + "lineLength" : 120, + "maximumBlankLines" : 1, + "respectsExistingLineBreaks" : true, + "prioritizeKeepingFunctionOutputTogether" : true, + "rules" : { + "AllPublicDeclarationsHaveDocumentation" : false, + "AlwaysUseLiteralForEmptyCollectionInit" : false, + "AlwaysUseLowerCamelCase" : false, + "AmbiguousTrailingClosureOverload" : true, + "BeginDocumentationCommentWithOneLineSummary" : false, + "DoNotUseSemicolons" : true, + "DontRepeatTypeInStaticProperties" : true, + "FileScopedDeclarationPrivacy" : true, + "FullyIndirectEnum" : true, + "GroupNumericLiterals" : true, + "IdentifiersMustBeASCII" : true, + "NeverForceUnwrap" : false, + "NeverUseForceTry" : false, + "NeverUseImplicitlyUnwrappedOptionals" : false, + "NoAccessLevelOnExtensionDeclaration" : true, + "NoAssignmentInExpressions" : true, + "NoBlockComments" : true, + "NoCasesWithOnlyFallthrough" : true, + "NoEmptyTrailingClosureParentheses" : true, + "NoLabelsInCasePatterns" : true, + "NoLeadingUnderscores" : false, + "NoParensAroundConditions" : true, + "NoVoidReturnOnFunctionSignature" : true, + "OmitExplicitReturns" : true, + "OneCasePerLine" : true, + "OneVariableDeclarationPerLine" : true, + "OnlyOneTrailingClosureArgument" : true, + "OrderedImports" : true, + "ReplaceForEachWithForLoop" : true, + "ReturnVoidInsteadOfEmptyTuple" : true, + "UseEarlyExits" : false, + "UseExplicitNilCheckInConditions" : false, + "UseLetInEveryBoundCaseVariable" : false, + "UseShorthandTypeNames" : true, + "UseSingleLinePropertyGetter" : false, + "UseSynthesizedInitializer" : false, + "UseTripleSlashForDocumentationComments" : true, + "UseWhereClausesInForLoops" : false, + "ValidateDocumentationComments" : false + } +} diff --git a/.swiftformat b/.swiftformat deleted file mode 100644 index 67c4de3..0000000 --- a/.swiftformat +++ /dev/null @@ -1,21 +0,0 @@ ---swiftversion 5.2 - -# file options - ---exclude .build,**/CDataStaxDriver,**/Clibuv - -# format options - ---ifdef no-indent ---indent 4 ---extensionacl on-declarations ---patternlet inline ---self insert ---stripunusedargs closure-only ---wraparguments before-first - -# rules - ---disable blankLinesAroundMark ---disable wrapMultilineStatementBraces - diff --git a/Package.swift b/Package.swift index b8c1e5f..8e9b032 100644 --- a/Package.swift +++ b/Package.swift @@ -1,7 +1,9 @@ // swift-tools-version:5.6 -import class Foundation.FileManager + import PackageDescription +import class Foundation.FileManager + // Compute libuv sources to exclude var libuvExclude = [ "./libuv/src/unix/aix-common.c", @@ -58,7 +60,8 @@ var datastaxExclude = [ "./datastax-cpp-driver/src/wktgen.sh", "./datastax-cpp-driver/src/gssapi", "./datastax-cpp-driver/src/ssl/ssl_no_impl.cpp", - "./datastax-cpp-driver/src/ssl/ssl_openssl_impl.cpp", // See ./custom/src/ssl/ssl_openssl_impl.cpp + // See ./custom/src/ssl/ssl_openssl_impl.cpp + "./datastax-cpp-driver/src/ssl/ssl_openssl_impl.cpp", "./datastax-cpp-driver/src/third_party/curl/CMakeLists.txt", "./datastax-cpp-driver/src/third_party/curl/COPYING", "./datastax-cpp-driver/src/third_party/hdr_histogram/CMakeLists.txt", @@ -82,7 +85,9 @@ var datastaxExclude = [ ] do { - if !(try FileManager.default.contentsOfDirectory(atPath: "./Sources/CDataStaxDriver/datastax-cpp-driver/src/third_party/sparsehash/CMakeFiles").isEmpty) { + if !(try FileManager.default.contentsOfDirectory( + atPath: "./Sources/CDataStaxDriver/datastax-cpp-driver/src/third_party/sparsehash/CMakeFiles" + ).isEmpty) { datastaxExclude.append("./datastax-cpp-driver/src/third_party/sparsehash/CMakeFiles/") } } catch { @@ -92,7 +97,7 @@ do { let package = Package( name: "swift-cassandra-client", products: [ - .library(name: "CassandraClient", targets: ["CassandraClient"]), + .library(name: "CassandraClient", targets: ["CassandraClient"]) ], dependencies: [ .package(url: "https://github.com/apple/swift-nio", .upToNextMajor(from: "2.41.1")), @@ -121,7 +126,7 @@ let package = Package( ], cSettings: [ .headerSearchPath("./libuv/src"), - .define("_GNU_SOURCE", to: "1"), // required to fix "undefined CPU_COUNT" error + .define("_GNU_SOURCE", to: "1"), // required to fix "undefined CPU_COUNT" error ] ), @@ -138,7 +143,9 @@ let package = Package( ], publicHeadersPath: "./datastax-cpp-driver/include", cxxSettings: [ - .define("HAVE_TIMERFD", .when(platforms: [.linux])), // this is available on all modern Linux systems, and is needed for efficient MicroTimer implementation. Otherwise busy waits are used. + // This is available on all modern Linux systems, and is needed for efficient + // MicroTimer implementation. Otherwise busy waits are used. + .define("HAVE_TIMERFD", .when(platforms: [.linux])), .headerSearchPath("./custom/include"), .headerSearchPath("./extras"), .headerSearchPath("./datastax-cpp-driver/src"), @@ -147,13 +154,16 @@ let package = Package( ] ), - .target(name: "CassandraClient", dependencies: [ - "CDataStaxDriver", - .product(name: "NIO", package: "swift-nio"), - .product(name: "NIOCore", package: "swift-nio"), - .product(name: "Atomics", package: "swift-atomics"), - .product(name: "Logging", package: "swift-log"), - ]), + .target( + name: "CassandraClient", + dependencies: [ + "CDataStaxDriver", + .product(name: "NIO", package: "swift-nio"), + .product(name: "NIOCore", package: "swift-nio"), + .product(name: "Atomics", package: "swift-atomics"), + .product(name: "Logging", package: "swift-log"), + ] + ), .testTarget(name: "CassandraClientTests", dependencies: ["CassandraClient"]), ], diff --git a/Package@swift-5.4.swift b/Package@swift-5.4.swift index fde3b30..65b8c17 100644 --- a/Package@swift-5.4.swift +++ b/Package@swift-5.4.swift @@ -1,7 +1,9 @@ // swift-tools-version:5.4 -import class Foundation.FileManager + import PackageDescription +import class Foundation.FileManager + // Compute libuv sources to exclude var libuvExclude = [ "./libuv/src/unix/aix-common.c", @@ -58,7 +60,8 @@ var datastaxExclude = [ "./datastax-cpp-driver/src/wktgen.sh", "./datastax-cpp-driver/src/gssapi", "./datastax-cpp-driver/src/ssl/ssl_no_impl.cpp", - "./datastax-cpp-driver/src/ssl/ssl_openssl_impl.cpp", // See ./custom/src/ssl/ssl_openssl_impl.cpp + // See ./custom/src/ssl/ssl_openssl_impl.cpp + "./datastax-cpp-driver/src/ssl/ssl_openssl_impl.cpp", "./datastax-cpp-driver/src/third_party/curl/CMakeLists.txt", "./datastax-cpp-driver/src/third_party/curl/COPYING", "./datastax-cpp-driver/src/third_party/hdr_histogram/CMakeLists.txt", @@ -82,7 +85,9 @@ var datastaxExclude = [ ] do { - if !(try FileManager.default.contentsOfDirectory(atPath: "./Sources/CDataStaxDriver/datastax-cpp-driver/src/third_party/sparsehash/CMakeFiles").isEmpty) { + if !(try FileManager.default.contentsOfDirectory( + atPath: "./Sources/CDataStaxDriver/datastax-cpp-driver/src/third_party/sparsehash/CMakeFiles" + ).isEmpty) { datastaxExclude.append("./datastax-cpp-driver/src/third_party/sparsehash/CMakeFiles/") } } catch { @@ -92,7 +97,7 @@ do { let package = Package( name: "swift-cassandra-client", products: [ - .library(name: "CassandraClient", targets: ["CassandraClient"]), + .library(name: "CassandraClient", targets: ["CassandraClient"]) ], dependencies: [ .package(url: "https://github.com/apple/swift-nio", .upToNextMinor(from: "2.41.1")), @@ -121,7 +126,7 @@ let package = Package( ], cSettings: [ .headerSearchPath("./libuv/src"), - .define("_GNU_SOURCE", to: "1"), // required to fix "undefined CPU_COUNT" error + .define("_GNU_SOURCE", to: "1"), // required to fix "undefined CPU_COUNT" error ] ), @@ -146,13 +151,16 @@ let package = Package( ] ), - .target(name: "CassandraClient", dependencies: [ - "CDataStaxDriver", - .product(name: "NIO", package: "swift-nio"), - .product(name: "NIOCore", package: "swift-nio"), - .product(name: "Atomics", package: "swift-atomics"), - .product(name: "Logging", package: "swift-log"), - ]), + .target( + name: "CassandraClient", + dependencies: [ + "CDataStaxDriver", + .product(name: "NIO", package: "swift-nio"), + .product(name: "NIOCore", package: "swift-nio"), + .product(name: "Atomics", package: "swift-atomics"), + .product(name: "Logging", package: "swift-log"), + ] + ), .testTarget(name: "CassandraClientTests", dependencies: ["CassandraClient"]), ], diff --git a/Package@swift-5.5.swift b/Package@swift-5.5.swift index fde3b30..65b8c17 100644 --- a/Package@swift-5.5.swift +++ b/Package@swift-5.5.swift @@ -1,7 +1,9 @@ // swift-tools-version:5.4 -import class Foundation.FileManager + import PackageDescription +import class Foundation.FileManager + // Compute libuv sources to exclude var libuvExclude = [ "./libuv/src/unix/aix-common.c", @@ -58,7 +60,8 @@ var datastaxExclude = [ "./datastax-cpp-driver/src/wktgen.sh", "./datastax-cpp-driver/src/gssapi", "./datastax-cpp-driver/src/ssl/ssl_no_impl.cpp", - "./datastax-cpp-driver/src/ssl/ssl_openssl_impl.cpp", // See ./custom/src/ssl/ssl_openssl_impl.cpp + // See ./custom/src/ssl/ssl_openssl_impl.cpp + "./datastax-cpp-driver/src/ssl/ssl_openssl_impl.cpp", "./datastax-cpp-driver/src/third_party/curl/CMakeLists.txt", "./datastax-cpp-driver/src/third_party/curl/COPYING", "./datastax-cpp-driver/src/third_party/hdr_histogram/CMakeLists.txt", @@ -82,7 +85,9 @@ var datastaxExclude = [ ] do { - if !(try FileManager.default.contentsOfDirectory(atPath: "./Sources/CDataStaxDriver/datastax-cpp-driver/src/third_party/sparsehash/CMakeFiles").isEmpty) { + if !(try FileManager.default.contentsOfDirectory( + atPath: "./Sources/CDataStaxDriver/datastax-cpp-driver/src/third_party/sparsehash/CMakeFiles" + ).isEmpty) { datastaxExclude.append("./datastax-cpp-driver/src/third_party/sparsehash/CMakeFiles/") } } catch { @@ -92,7 +97,7 @@ do { let package = Package( name: "swift-cassandra-client", products: [ - .library(name: "CassandraClient", targets: ["CassandraClient"]), + .library(name: "CassandraClient", targets: ["CassandraClient"]) ], dependencies: [ .package(url: "https://github.com/apple/swift-nio", .upToNextMinor(from: "2.41.1")), @@ -121,7 +126,7 @@ let package = Package( ], cSettings: [ .headerSearchPath("./libuv/src"), - .define("_GNU_SOURCE", to: "1"), // required to fix "undefined CPU_COUNT" error + .define("_GNU_SOURCE", to: "1"), // required to fix "undefined CPU_COUNT" error ] ), @@ -146,13 +151,16 @@ let package = Package( ] ), - .target(name: "CassandraClient", dependencies: [ - "CDataStaxDriver", - .product(name: "NIO", package: "swift-nio"), - .product(name: "NIOCore", package: "swift-nio"), - .product(name: "Atomics", package: "swift-atomics"), - .product(name: "Logging", package: "swift-log"), - ]), + .target( + name: "CassandraClient", + dependencies: [ + "CDataStaxDriver", + .product(name: "NIO", package: "swift-nio"), + .product(name: "NIOCore", package: "swift-nio"), + .product(name: "Atomics", package: "swift-atomics"), + .product(name: "Logging", package: "swift-log"), + ] + ), .testTarget(name: "CassandraClientTests", dependencies: ["CassandraClient"]), ], diff --git a/Sources/CassandraClient/CassandraClient.swift b/Sources/CassandraClient/CassandraClient.swift index eb43e26..667397d 100644 --- a/Sources/CassandraClient/CassandraClient.swift +++ b/Sources/CassandraClient/CassandraClient.swift @@ -37,27 +37,42 @@ public class CassandraClient: CassandraSession { /// - eventLoopGroupProvider: The ``EventLoopGroupProvider`` to use, uses ``EventLoopGroupProvider/createNew`` strategy by default. /// - configuration: The client's ``Configuration``. /// - logger: The client's default `Logger`. - public init(eventLoopGroupProvider: EventLoopGroupProvider = .createNew, configuration: Configuration, logger: Logger? = nil) { + public init( + eventLoopGroupProvider: EventLoopGroupProvider = .createNew, + configuration: Configuration, + logger: Logger? = nil + ) { self.configuration = configuration self.logger = logger ?? Logger(label: "com.apple.cassandra") switch eventLoopGroupProvider { case .createNew: - self.eventLoopGroupContainer = (value: MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount), managed: true) + self.eventLoopGroupContainer = ( + value: MultiThreadedEventLoopGroup(numberOfThreads: System.coreCount), managed: true + ) case .shared(let eventLoopGroup): self.eventLoopGroupContainer = (value: eventLoopGroup, managed: false) } - self.defaultSession = Session(configuration: self.configuration, logger: self.logger, eventLoopGroupContainer: self.eventLoopGroupContainer) + self.defaultSession = Session( + configuration: self.configuration, + logger: self.logger, + eventLoopGroupContainer: self.eventLoopGroupContainer + ) } deinit { - precondition(self.isShutdown.load(ordering: .relaxed), "Client not shut down before the deinit. Please call client.shutdown() when no longer needed.") + precondition( + self.isShutdown.load(ordering: .relaxed), + "Client not shut down before the deinit. Please call client.shutdown() when no longer needed." + ) } /// Shutdown the client. /// /// - Note: It is required to call this method before terminating the program. `CassandraClient` will assert it was cleanly shut down as part of its deinitializer. public func shutdown() throws { - if !self.isShutdown.compareExchange(expected: false, desired: true, ordering: .relaxed).exchanged { + if !self.isShutdown.compareExchange(expected: false, desired: true, ordering: .relaxed) + .exchanged + { return } @@ -90,7 +105,13 @@ public class CassandraClient: CassandraSession { /// - logger: If `nil`, the client's default `Logger` is used. /// /// - Returns: The resulting ``Rows``. - public func execute(statement: Statement, on eventLoop: EventLoop?, logger: Logger? = .none) -> EventLoopFuture { + public func execute( + statement: Statement, + on eventLoop: EventLoop?, + logger: Logger? = .none + ) + -> EventLoopFuture + { self.defaultSession.execute(statement: statement, on: eventLoop, logger: logger) } @@ -105,8 +126,18 @@ public class CassandraClient: CassandraSession { /// - logger: If `nil`, the client's default `Logger` is used. /// /// - Returns: The ``PaginatedRows``. - public func execute(statement: Statement, pageSize: Int32, on eventLoop: EventLoop?, logger: Logger? = .none) -> EventLoopFuture { - self.defaultSession.execute(statement: statement, pageSize: pageSize, on: eventLoop, logger: logger) + public func execute( + statement: Statement, + pageSize: Int32, + on eventLoop: EventLoop?, + logger: Logger? = .none + ) -> EventLoopFuture { + self.defaultSession.execute( + statement: statement, + pageSize: pageSize, + on: eventLoop, + logger: logger + ) } /// Create a new ``CassandraSession`` that can be used to perform queries on the given or configured keyspace. @@ -120,7 +151,11 @@ public class CassandraClient: CassandraSession { var configuration = self.configuration configuration.keyspace = keyspace let logger = logger ?? self.logger - return Session(configuration: configuration, logger: logger, eventLoopGroupContainer: self.eventLoopGroupContainer) + return Session( + configuration: configuration, + logger: logger, + eventLoopGroupContainer: self.eventLoopGroupContainer + ) } /// Create a new ``CassandraSession`` for the given or configured keyspace then invoke the closure. @@ -129,7 +164,11 @@ public class CassandraClient: CassandraSession { /// - keyspace: If `nil`, the client's default keyspace is used. /// - logger: If `nil`, the client's default `Logger` is used. /// - handler: The closure to invoke, passing in the newly created session. - public func withSession(keyspace: String?, logger: Logger? = .none, handler: (CassandraSession) throws -> Void) rethrows { + public func withSession( + keyspace: String?, + logger: Logger? = .none, + handler: (CassandraSession) throws -> Void + ) rethrows { let session = self.makeSession(keyspace: keyspace, logger: logger) defer { do { @@ -149,7 +188,11 @@ public class CassandraClient: CassandraSession { /// - handler: The closure to invoke, passing in the newly created session. /// /// - Returns: The resulting `EventLoopFuture` of the closure. - public func withSession(keyspace: String?, logger: Logger? = .none, handler: (CassandraSession) -> EventLoopFuture) -> EventLoopFuture { + public func withSession( + keyspace: String?, + logger: Logger? = .none, + handler: (CassandraSession) -> EventLoopFuture + ) -> EventLoopFuture { let session = self.makeSession(keyspace: keyspace, logger: logger) return handler(session).always { _ in do { @@ -201,8 +244,18 @@ extension CassandraClient { /// /// - Returns: The ``PaginatedRows``. @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - public func execute(statement: Statement, pageSize: Int32, logger: Logger? = .none) async throws -> PaginatedRows { - try await self.defaultSession.execute(statement: statement, pageSize: pageSize, logger: logger) + public func execute( + statement: Statement, + pageSize: Int32, + logger: Logger? = .none + ) async throws + -> PaginatedRows + { + try await self.defaultSession.execute( + statement: statement, + pageSize: pageSize, + logger: logger + ) } /// Create a new ``CassandraSession`` for the given or configured keyspace then invoke the closure. @@ -212,7 +265,11 @@ extension CassandraClient { /// - logger: If `nil`, the client's default `Logger` is used. /// - closure: The closure to invoke, passing in the newly created session. @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - public func withSession(keyspace: String?, logger: Logger? = .none, closure: (CassandraSession) async throws -> Void) async throws { + public func withSession( + keyspace: String?, + logger: Logger? = .none, + closure: (CassandraSession) async throws -> Void + ) async throws { let session = self.makeSession(keyspace: keyspace, logger: logger) defer { do { @@ -233,7 +290,11 @@ extension CassandraClient { /// /// - Returns: The result of the closure. @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - public func withSession(keyspace: String?, logger: Logger? = .none, handler: (CassandraSession) async throws -> T) async throws -> T { + public func withSession( + keyspace: String?, + logger: Logger? = .none, + handler: (CassandraSession) async throws -> T + ) async throws -> T { let session = self.makeSession(keyspace: keyspace, logger: logger) defer { do { diff --git a/Sources/CassandraClient/Configuration.swift b/Sources/CassandraClient/Configuration.swift index f58f01c..051bfc4 100644 --- a/Sources/CassandraClient/Configuration.swift +++ b/Sources/CassandraClient/Configuration.swift @@ -70,7 +70,8 @@ extension CassandraClient { } public init( - contactPointsProvider: @escaping (@escaping (Result) -> Void) -> Void, + contactPointsProvider: @escaping (@escaping (Result) -> Void) -> + Void, port: Int32, protocolVersion: ProtocolVersion ) { @@ -121,7 +122,9 @@ extension CassandraClient { private func makeCluster(contactPoints: ContactPoints) throws -> Cluster { let cluster = Cluster() - try contactPoints.forEach { try cluster.addContactPoint($0) } + for contactPoint in contactPoints { + try cluster.addContactPoint(contactPoint) + } try cluster.setPort(self.port) try cluster.setProtocolVersion(self.protocolVersion.rawValue) @@ -169,7 +172,10 @@ extension CassandraClient { } switch self.speculativeExecutionPolicy { case .constant(let delayInMillseconds, let maxExecutions): - try cluster.setConstantSpeculativeExecutionPolicy(delayInMillseconds: delayInMillseconds, maxExecutions: maxExecutions) + try cluster.setConstantSpeculativeExecutionPolicy( + delayInMillseconds: delayInMillseconds, + maxExecutions: maxExecutions + ) case .disabled: try cluster.setNoSpeculativeExecutionPolicy() case .none: @@ -254,7 +260,9 @@ internal final class Cluster { } func setCoreConnectionsPerHost(_ numberOfConnection: UInt32) throws { - try self.checkResult { cass_cluster_set_core_connections_per_host(self.rawPointer, numberOfConnection) } + try self.checkResult { + cass_cluster_set_core_connections_per_host(self.rawPointer, numberOfConnection) + } } func setTcpNodelay(_ enabled: Bool) throws { @@ -262,7 +270,11 @@ internal final class Cluster { } func setTcpKeepalive(_ enabled: Bool, delayInSeconds: UInt32) throws { - cass_cluster_set_tcp_keepalive(self.rawPointer, enabled ? cass_true : cass_false, delayInSeconds) + cass_cluster_set_tcp_keepalive( + self.rawPointer, + enabled ? cass_true : cass_false, + delayInSeconds + ) } func setConnectionHeartbeatInterval(_ seconds: UInt32) throws { @@ -278,15 +290,28 @@ internal final class Cluster { } func setUseHostnameResolution(_ enabled: Bool) throws { - try self.checkResult { cass_cluster_set_use_hostname_resolution(self.rawPointer, enabled ? cass_true : cass_false) } + try self.checkResult { + cass_cluster_set_use_hostname_resolution(self.rawPointer, enabled ? cass_true : cass_false) + } } func setUseRandomizedContactPoints(_ enabled: Bool) throws { - try self.checkResult { cass_cluster_set_use_randomized_contact_points(self.rawPointer, enabled ? cass_true : cass_false) } + try self.checkResult { + cass_cluster_set_use_randomized_contact_points( + self.rawPointer, + enabled ? cass_true : cass_false + ) + } } func setConstantSpeculativeExecutionPolicy(delayInMillseconds: Int64, maxExecutions: Int32) throws { - try self.checkResult { cass_cluster_set_constant_speculative_execution_policy(self.rawPointer, cass_int64_t(delayInMillseconds), maxExecutions) } + try self.checkResult { + cass_cluster_set_constant_speculative_execution_policy( + self.rawPointer, + cass_int64_t(delayInMillseconds), + maxExecutions + ) + } } func setNoSpeculativeExecutionPolicy() throws { @@ -294,15 +319,21 @@ internal final class Cluster { } func setPrepareOnAllHosts(_ enabled: Bool) throws { - try self.checkResult { cass_cluster_set_prepare_on_all_hosts(self.rawPointer, enabled ? cass_true : cass_false) } + try self.checkResult { + cass_cluster_set_prepare_on_all_hosts(self.rawPointer, enabled ? cass_true : cass_false) + } } func setPrepareOnUpOrAddHost(_ enabled: Bool) throws { - try self.checkResult { cass_cluster_set_prepare_on_up_or_add_host(self.rawPointer, enabled ? cass_true : cass_false) } + try self.checkResult { + cass_cluster_set_prepare_on_up_or_add_host(self.rawPointer, enabled ? cass_true : cass_false) + } } func setNoCompact(_ enabled: Bool) throws { - try self.checkResult { cass_cluster_set_no_compact(self.rawPointer, enabled ? cass_true : cass_false) } + try self.checkResult { + cass_cluster_set_no_compact(self.rawPointer, enabled ? cass_true : cass_false) + } } func setConsistency(_ consistency: CassConsistency) throws { @@ -368,7 +399,7 @@ extension CassandraClient.Configuration { case .peerIdentityDNS: sslContext.setVerifyFlags(CASS_SSL_VERIFY_PEER_IDENTITY_DNS) case .default: - () // use DataStax driver's default + () // use DataStax driver's default } if let cert = self.cert { diff --git a/Sources/CassandraClient/Data+PaginatedRows.swift b/Sources/CassandraClient/Data+PaginatedRows.swift index 277658d..11584c1 100644 --- a/Sources/CassandraClient/Data+PaginatedRows.swift +++ b/Sources/CassandraClient/Data+PaginatedRows.swift @@ -44,7 +44,11 @@ extension CassandraClient { return eventLoop.makeFailedFuture(CassandraClient.Error.rowsExhausted) } - let future = self.session.execute(statement: self.statement, on: eventLoop, logger: self.logger) + let future = self.session.execute( + statement: self.statement, + on: eventLoop, + logger: self.logger + ) future.whenComplete { result in switch result { case .success(let rows): @@ -62,7 +66,10 @@ extension CassandraClient { /// Iterates through all rows in all pages and invokes the given closure on each. @available(*, deprecated, message: "Use Swift Concurrency and AsyncSequence APIs instead.") public func forEach(_ body: @escaping (Row) throws -> Void) -> EventLoopFuture { - precondition(self.hasMorePages, "Only one of 'forEach' or 'map' can be called once per PaginatedRows") + precondition( + self.hasMorePages, + "Only one of 'forEach' or 'map' can be called once per PaginatedRows" + ) guard let eventLoop = self.eventLoop else { preconditionFailure("EventLoop must not be nil") @@ -88,7 +95,10 @@ extension CassandraClient { /// Iterates through all rows in all pages and applies `transform` on each. @available(*, deprecated, message: "Use Swift Concurrency and AsyncSequence APIs instead.") public func map(_ transform: @escaping (Row) throws -> T) -> EventLoopFuture<[T]> { - precondition(self.hasMorePages, "Only one of 'forEach' or 'map' can be called once per PaginatedRows") + precondition( + self.hasMorePages, + "Only one of 'forEach' or 'map' can be called once per PaginatedRows" + ) guard let eventLoop = self.eventLoop else { preconditionFailure("EventLoop must not be nil in EventLoop based APIs") @@ -144,7 +154,10 @@ extension CassandraClient { @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) @available(*, deprecated, message: "Use AsyncSequence APIs instead.") public func forEach(_ body: @escaping (Row) throws -> Void) async throws { - precondition(self.hasMorePages, "Only one of 'forEach' or 'map' can be called once per PaginatedRows") + precondition( + self.hasMorePages, + "Only one of 'forEach' or 'map' can be called once per PaginatedRows" + ) func _forEach() async throws { let rows = try await nextPage() @@ -162,7 +175,10 @@ extension CassandraClient { @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) @available(*, deprecated, message: "Use AsyncSequence APIs instead.") public func map(_ transform: @escaping (Row) throws -> T) async throws -> [T] { - precondition(self.hasMorePages, "Only one of 'forEach' or 'map' can be called once per PaginatedRows") + precondition( + self.hasMorePages, + "Only one of 'forEach' or 'map' can be called once per PaginatedRows" + ) func _map(_ accumulated: [T]) async throws -> [T] { let rows = try await nextPage() diff --git a/Sources/CassandraClient/Data.swift b/Sources/CassandraClient/Data.swift index 1ff3a19..19a295f 100644 --- a/Sources/CassandraClient/Data.swift +++ b/Sources/CassandraClient/Data.swift @@ -162,10 +162,6 @@ extension CassandraClient { public struct OpaquePagingStateToken: PagingStateToken { let token: [UInt8] - internal init(token: [UInt8]) { - self.token = token - } - public func withUnsafeBytes(_ body: (UnsafeRawBufferPointer) throws -> R) rethrows -> R { try self.token.withUnsafeBytes(body) } @@ -499,9 +495,13 @@ extension CassUuid { timeAndVersion.append(uuid.1) timeAndVersion.append(uuid.2) timeAndVersion.append(uuid.3) - time_and_version = cass_uint64_t(UInt64(bigEndian: timeAndVersion.withUnsafeBufferPointer { - ($0.baseAddress!.withMemoryRebound(to: UInt64.self, capacity: 1) { $0 }) - }.pointee)) + time_and_version = cass_uint64_t( + UInt64( + bigEndian: timeAndVersion.withUnsafeBufferPointer { + ($0.baseAddress!.withMemoryRebound(to: UInt64.self, capacity: 1) { $0 }) + }.pointee + ) + ) var clockSeqAndNode = [UInt8]() clockSeqAndNode.append(uuid.8) @@ -512,9 +512,13 @@ extension CassUuid { clockSeqAndNode.append(uuid.13) clockSeqAndNode.append(uuid.14) clockSeqAndNode.append(uuid.15) - clock_seq_and_node = cass_uint64_t(UInt64(bigEndian: clockSeqAndNode.withUnsafeBufferPointer { - ($0.baseAddress!.withMemoryRebound(to: UInt64.self, capacity: 1) { $0 }) - }.pointee)) + clock_seq_and_node = cass_uint64_t( + UInt64( + bigEndian: clockSeqAndNode.withUnsafeBufferPointer { + ($0.baseAddress!.withMemoryRebound(to: UInt64.self, capacity: 1) { $0 }) + }.pointee + ) + ) } internal func uuid() -> Foundation.UUID { diff --git a/Sources/CassandraClient/Decoding.swift b/Sources/CassandraClient/Decoding.swift index d46f2fb..bb98954 100644 --- a/Sources/CassandraClient/Decoding.swift +++ b/Sources/CassandraClient/Decoding.swift @@ -12,7 +12,7 @@ // //===----------------------------------------------------------------------===// -import Foundation // for date and uuid +import Foundation // for date and uuid extension CassandraClient { internal struct RowDecoder: Decoder { @@ -66,42 +66,54 @@ extension CassandraClient { public func decode(_: Bool.Type, forKey key: Key) throws -> Bool { guard let value: Bool = row.column(key.stringValue) else { - throw DecodingError.typeMismatch("value for \(key.stringValue) not found or of incorrect data type.") + throw DecodingError.typeMismatch( + "value for \(key.stringValue) not found or of incorrect data type." + ) } return value } public func decode(_: Int.Type, forKey key: Key) throws -> Int { guard let value: Int32 = row.column(key.stringValue) else { - throw DecodingError.typeMismatch("value for \(key.stringValue) not found or of incorrect data type.") + throw DecodingError.typeMismatch( + "value for \(key.stringValue) not found or of incorrect data type." + ) } - return Int(value) // will always fit since storage is 32 + return Int(value) // will always fit since storage is 32 } public func decode(_: Int8.Type, forKey key: Key) throws -> Int8 { guard let value: Int8 = row.column(key.stringValue) else { - throw DecodingError.typeMismatch("value for \(key.stringValue) not found or of incorrect data type.") + throw DecodingError.typeMismatch( + "value for \(key.stringValue) not found or of incorrect data type." + ) } return value } public func decode(_: Int16.Type, forKey key: Key) throws -> Int16 { guard let value: Int16 = row.column(key.stringValue) else { - throw DecodingError.typeMismatch("value for \(key.stringValue) not found or of incorrect data type.") + throw DecodingError.typeMismatch( + "value for \(key.stringValue) not found or of incorrect data type." + ) } return value } public func decode(_: Int32.Type, forKey key: Key) throws -> Int32 { guard let value: Int32 = row.column(key.stringValue) else { - throw DecodingError.typeMismatch("value for \(key.stringValue) not found or of incorrect data type.") + throw DecodingError.typeMismatch( + "value for \(key.stringValue) not found or of incorrect data type." + ) } return value } public func decode(_: Int64.Type, forKey key: Key) throws -> Int64 { guard let value: Int64 = row.column(key.stringValue) else { - throw DecodingError.typeMismatch("value for \(key.stringValue) not found or of incorrect data type.") + throw DecodingError.typeMismatch( + "value for \(key.stringValue) not found or of incorrect data type." + ) } return value } @@ -128,21 +140,27 @@ extension CassandraClient { public func decode(_: Float.Type, forKey key: Key) throws -> Float { guard let value: Float32 = row.column(key.stringValue) else { - throw DecodingError.typeMismatch("value for \(key.stringValue) not found or of incorrect data type.") + throw DecodingError.typeMismatch( + "value for \(key.stringValue) not found or of incorrect data type." + ) } return value } public func decode(_: Double.Type, forKey key: Key) throws -> Double { guard let value: Double = row.column(key.stringValue) else { - throw DecodingError.typeMismatch("value for \(key.stringValue) not found or of incorrect data type.") + throw DecodingError.typeMismatch( + "value for \(key.stringValue) not found or of incorrect data type." + ) } return value } public func decode(_: String.Type, forKey key: Key) throws -> String { guard let value: String = row.column(key.stringValue) else { - throw DecodingError.typeMismatch("value for \(key.stringValue) not found or of incorrect data type.") + throw DecodingError.typeMismatch( + "value for \(key.stringValue) not found or of incorrect data type." + ) } return value } @@ -151,57 +169,79 @@ extension CassandraClient { public func decode(_ type: T.Type, forKey key: Key) throws -> T { if type == [UInt8].self { guard let value: [UInt8] = row.column(key.stringValue) else { - throw DecodingError.typeMismatch("value for \(key.stringValue) not found or of incorrect data type.") + throw DecodingError.typeMismatch( + "value for \(key.stringValue) not found or of incorrect data type." + ) } return value as! T } else if type == Foundation.Date.self { guard let value: Int64 = row.column(key.stringValue)?.timestamp else { - throw DecodingError.typeMismatch("value for \(key.stringValue) not found or of incorrect data type.") + throw DecodingError.typeMismatch( + "value for \(key.stringValue) not found or of incorrect data type." + ) } return Foundation.Date(timeIntervalSince1970: Double(value) / 1000) as! T } else if type == Foundation.UUID.self { guard let value: Foundation.UUID = row.column(key.stringValue)?.uuid else { - throw DecodingError.typeMismatch("value for \(key.stringValue) not found or of incorrect data type.") + throw DecodingError.typeMismatch( + "value for \(key.stringValue) not found or of incorrect data type." + ) } return value as! T } else if type == TimeBasedUUID.self { guard let value: TimeBasedUUID = row.column(key.stringValue)?.timeuuid else { - throw DecodingError.typeMismatch("value for \(key.stringValue) not found or of incorrect data type.") + throw DecodingError.typeMismatch( + "value for \(key.stringValue) not found or of incorrect data type." + ) } return value as! T } else if type == [Int8].self { guard let value: [Int8] = row.column(key.stringValue) else { - throw DecodingError.typeMismatch("value for \(key.stringValue) not found or of incorrect data type.") + throw DecodingError.typeMismatch( + "value for \(key.stringValue) not found or of incorrect data type." + ) } return value as! T } else if type == [Int16].self { guard let value: [Int16] = row.column(key.stringValue) else { - throw DecodingError.typeMismatch("value for \(key.stringValue) not found or of incorrect data type.") + throw DecodingError.typeMismatch( + "value for \(key.stringValue) not found or of incorrect data type." + ) } return value as! T } else if type == [Int32].self { guard let value: [Int32] = row.column(key.stringValue) else { - throw DecodingError.typeMismatch("value for \(key.stringValue) not found or of incorrect data type.") + throw DecodingError.typeMismatch( + "value for \(key.stringValue) not found or of incorrect data type." + ) } return value as! T } else if type == [Int64].self { guard let value: [Int64] = row.column(key.stringValue) else { - throw DecodingError.typeMismatch("value for \(key.stringValue) not found or of incorrect data type.") + throw DecodingError.typeMismatch( + "value for \(key.stringValue) not found or of incorrect data type." + ) } return value as! T } else if type == [Float32].self { guard let value: [Float32] = row.column(key.stringValue) else { - throw DecodingError.typeMismatch("value for \(key.stringValue) not found or of incorrect data type.") + throw DecodingError.typeMismatch( + "value for \(key.stringValue) not found or of incorrect data type." + ) } return value as! T } else if type == [Double].self { guard let value: [Double] = row.column(key.stringValue) else { - throw DecodingError.typeMismatch("value for \(key.stringValue) not found or of incorrect data type.") + throw DecodingError.typeMismatch( + "value for \(key.stringValue) not found or of incorrect data type." + ) } return value as! T } else if type == [String].self { guard let value: [String] = row.column(key.stringValue) else { - throw DecodingError.typeMismatch("value for \(key.stringValue) not found or of incorrect data type.") + throw DecodingError.typeMismatch( + "value for \(key.stringValue) not found or of incorrect data type." + ) } return value as! T } else { @@ -209,7 +249,12 @@ extension CassandraClient { } } - public func nestedContainer(keyedBy _: NestedKey.Type, forKey _: Key) throws -> KeyedDecodingContainer { + public func nestedContainer( + keyedBy _: NestedKey.Type, + forKey _: Key + ) throws + -> KeyedDecodingContainer + { throw DecodingError.notSupported() } diff --git a/Sources/CassandraClient/Metrics.swift b/Sources/CassandraClient/Metrics.swift index 0a916ee..11213be 100644 --- a/Sources/CassandraClient/Metrics.swift +++ b/Sources/CassandraClient/Metrics.swift @@ -84,7 +84,9 @@ public struct CassandraMetrics: Codable { self.requestsFifteenMinuteRate = metrics.requests.fifteen_minute_rate self.statsTotalConnections = UInt(metrics.stats.total_connections) self.statsAvailableConnections = UInt(metrics.stats.available_connections) - self.statsExceededPendingRequestsWaterMark = UInt(metrics.stats.exceeded_pending_requests_water_mark) + self.statsExceededPendingRequestsWaterMark = UInt( + metrics.stats.exceeded_pending_requests_water_mark + ) self.statsExceededWriteBytesWaterMark = UInt(metrics.stats.exceeded_write_bytes_water_mark) self.errorsConnectionTimeouts = UInt(metrics.errors.connection_timeouts) self.errorsPendingRequestTimeouts = UInt(metrics.errors.pending_request_timeouts) diff --git a/Sources/CassandraClient/Session.swift b/Sources/CassandraClient/Session.swift index 51fa7cf..d18fa9a 100644 --- a/Sources/CassandraClient/Session.swift +++ b/Sources/CassandraClient/Session.swift @@ -17,7 +17,7 @@ import Dispatch import Logging import NIO import NIOConcurrencyHelpers -import NIOCore // for async-await bridge +import NIOCore // for async-await bridge /// API for executing statements against Cassandra. public protocol CassandraSession { @@ -33,7 +33,12 @@ public protocol CassandraSession { /// - logger: The `Logger` to use. Optional. /// /// - Returns: The resulting ``CassandraClient/Rows``. - func execute(statement: CassandraClient.Statement, on eventLoop: EventLoop?, logger: Logger?) -> EventLoopFuture + func execute( + statement: CassandraClient.Statement, + on eventLoop: EventLoop?, + logger: Logger? + ) + -> EventLoopFuture /// Execute a prepared statement. /// @@ -46,7 +51,12 @@ public protocol CassandraSession { /// - logger: The `Logger` to use. Optional. /// /// - Returns: The resulting ``CassandraClient/PaginatedRows``. - func execute(statement: CassandraClient.Statement, pageSize: Int32, on eventLoop: EventLoop?, logger: Logger?) -> EventLoopFuture + func execute( + statement: CassandraClient.Statement, + pageSize: Int32, + on eventLoop: EventLoop?, + logger: Logger? + ) -> EventLoopFuture #if compiler(>=5.5) && canImport(_Concurrency) /// Execute a prepared statement. @@ -59,7 +69,11 @@ public protocol CassandraSession { /// /// - Returns: The resulting ``CassandraClient/Rows``. @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func execute(statement: CassandraClient.Statement, logger: Logger?) async throws -> CassandraClient.Rows + func execute( + statement: CassandraClient.Statement, + logger: Logger? + ) async throws + -> CassandraClient.Rows /// Execute a prepared statement. /// @@ -72,7 +86,12 @@ public protocol CassandraSession { /// /// - Returns: The resulting ``CassandraClient/PaginatedRows``. @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func execute(statement: CassandraClient.Statement, pageSize: Int32, logger: Logger?) async throws -> CassandraClient.PaginatedRows + func execute( + statement: CassandraClient.Statement, + pageSize: Int32, + logger: Logger? + ) + async throws -> CassandraClient.PaginatedRows #endif /// Terminate the session and free resources. @@ -92,7 +111,12 @@ extension CassandraSession { /// - logger: The `Logger` to use. Optional. /// /// - Returns: The resulting ``CassandraClient/Rows``. - internal func execute(statement: CassandraClient.Statement, logger: Logger? = .none) -> EventLoopFuture { + internal func execute( + statement: CassandraClient.Statement, + logger: Logger? = .none + ) + -> EventLoopFuture + { self.execute(statement: statement, on: nil, logger: logger) } } @@ -122,7 +146,8 @@ extension CassandraSession { logger: Logger? = .none, transform: @escaping (CassandraClient.Row) -> T? ) -> EventLoopFuture<[T]> { - self.query(query, parameters: parameters, options: options, on: eventLoop, logger: logger).map { rows in + self.query(query, parameters: parameters, options: options, on: eventLoop, logger: logger).map { + rows in rows.compactMap(transform) } } @@ -137,11 +162,12 @@ extension CassandraSession { on eventLoop: EventLoop? = .none, logger: Logger? = .none ) -> EventLoopFuture<[T]> { - self.query(query, parameters: parameters, options: options, on: eventLoop, logger: logger).flatMapThrowing { rows in - try rows.map { row in - try T(from: CassandraClient.RowDecoder(row: row)) + self.query(query, parameters: parameters, options: options, on: eventLoop, logger: logger) + .flatMapThrowing { rows in + try rows.map { row in + try T(from: CassandraClient.RowDecoder(row: row)) + } } - } } /// Query large data-sets where using an interator helps control memory usage. @@ -159,7 +185,11 @@ extension CassandraSession { logger: Logger? = .none ) -> EventLoopFuture { do { - let statement = try CassandraClient.Statement(query: query, parameters: parameters, options: options) + let statement = try CassandraClient.Statement( + query: query, + parameters: parameters, + options: options + ) return self.execute(statement: statement, on: eventLoop, logger: logger) } catch { let eventLoop = eventLoop ?? eventLoopGroup.next() @@ -179,7 +209,11 @@ extension CassandraSession { logger: Logger? = .none ) -> EventLoopFuture { do { - let statement = try CassandraClient.Statement(query: query, parameters: parameters, options: options) + let statement = try CassandraClient.Statement( + query: query, + parameters: parameters, + options: options + ) return self.execute(statement: statement, pageSize: pageSize, on: eventLoop, logger: logger) } catch { let eventLoop = eventLoop ?? eventLoopGroup.next() @@ -212,7 +246,11 @@ extension CassandraClient { case disconnected } - internal init(configuration: Configuration, logger: Logger, eventLoopGroupContainer: EventLoopGroupConainer) { + internal init( + configuration: Configuration, + logger: Logger, + eventLoopGroupContainer: EventLoopGroupConainer + ) { self.configuration = configuration self.logger = logger self.eventLoopGroupContainer = eventLoopGroupContainer @@ -221,7 +259,9 @@ extension CassandraClient { deinit { guard case .disconnected = (self.lock.withLock { self.state }) else { - preconditionFailure("Session not shut down before the deinit. Please call session.shutdown() when no longer needed.") + preconditionFailure( + "Session not shut down before the deinit. Please call session.shutdown() when no longer needed." + ) } cass_session_free(self.rawPointer) } @@ -240,7 +280,13 @@ extension CassandraClient { } } - func execute(statement: Statement, on eventLoop: EventLoop?, logger: Logger? = .none) -> EventLoopFuture { + func execute( + statement: Statement, + on eventLoop: EventLoop?, + logger: Logger? = .none + ) + -> EventLoopFuture + { let eventLoop = eventLoop ?? self.eventLoopGroup.next() let logger = logger ?? self.logger @@ -293,7 +339,12 @@ extension CassandraClient { } } - func execute(statement: Statement, pageSize: Int32, on eventLoop: EventLoop?, logger: Logger? = .none) -> EventLoopFuture { + func execute( + statement: Statement, + pageSize: Int32, + on eventLoop: EventLoop?, + logger: Logger? = .none + ) -> EventLoopFuture { let eventLoop = eventLoop ?? self.eventLoopGroup.next() do { @@ -302,7 +353,9 @@ extension CassandraClient { return eventLoop.makeFailedFuture(error) } - return eventLoop.makeSucceededFuture(PaginatedRows(session: self, statement: statement, on: eventLoop, logger: logger)) + return eventLoop.makeSucceededFuture( + PaginatedRows(session: self, statement: statement, on: eventLoop, logger: logger) + ) } private func connect(on eventLoop: EventLoop, logger: Logger) -> EventLoopFuture { @@ -391,7 +444,12 @@ extension CassandraSession { logger: Logger? = .none, transform: @escaping (CassandraClient.Row) -> T? ) async throws -> [T] { - let rows = try await self.query(query, parameters: parameters, options: options, logger: logger) + let rows = try await self.query( + query, + parameters: parameters, + options: options, + logger: logger + ) return rows.compactMap(transform) } @@ -403,7 +461,12 @@ extension CassandraSession { options: CassandraClient.Statement.Options = .init(), logger: Logger? = .none ) async throws -> [T] { - let rows = try await self.query(query, parameters: parameters, options: options, logger: logger) + let rows = try await self.query( + query, + parameters: parameters, + options: options, + logger: logger + ) return try rows.map { row in try T(from: CassandraClient.RowDecoder(row: row)) } @@ -421,7 +484,11 @@ extension CassandraSession { options: CassandraClient.Statement.Options = .init(), logger: Logger? = .none ) async throws -> CassandraClient.Rows { - let statement = try CassandraClient.Statement(query: query, parameters: parameters, options: options) + let statement = try CassandraClient.Statement( + query: query, + parameters: parameters, + options: options + ) return try await self.execute(statement: statement, logger: logger) } @@ -434,14 +501,23 @@ extension CassandraSession { options: CassandraClient.Statement.Options = .init(), logger: Logger? = .none ) async throws -> CassandraClient.PaginatedRows { - let statement = try CassandraClient.Statement(query: query, parameters: parameters, options: options) + let statement = try CassandraClient.Statement( + query: query, + parameters: parameters, + options: options + ) return try await self.execute(statement: statement, pageSize: pageSize, logger: logger) } } extension CassandraClient.Session { @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func execute(statement: CassandraClient.Statement, logger: Logger? = .none) async throws -> CassandraClient.Rows { + func execute( + statement: CassandraClient.Statement, + logger: Logger? = .none + ) async throws + -> CassandraClient.Rows + { let logger = logger ?? self.logger lock.lock() @@ -485,7 +561,13 @@ extension CassandraClient.Session { } @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func execute(statement: CassandraClient.Statement, pageSize: Int32, logger: Logger? = .none) async throws -> CassandraClient.PaginatedRows { + func execute( + statement: CassandraClient.Statement, + pageSize: Int32, + logger: Logger? = .none + ) + async throws -> CassandraClient.PaginatedRows + { try statement.setPagingSize(pageSize) return CassandraClient.PaginatedRows(session: self, statement: statement, logger: logger) } @@ -529,13 +611,15 @@ private func callAndReleaseUnmanagedClosure(_ opaque: UnsafeRawPointer) { closure.value() } -private func futureSetCallback(_ future: OpaquePointer, completion: @escaping (Result) -> Void) { +private func futureSetCallback( + _ future: OpaquePointer, + completion: @escaping (Result) -> Void +) { let closure = unmanagedRetainedClosure { DispatchQueue.global().async { let resultCode = cass_future_error_code(future) - let result: Result = resultCode == CASS_OK ? - .success(()) : - .failure(CassandraClient.Error(future)) + let result: Result = + resultCode == CASS_OK ? .success(()) : .failure(CassandraClient.Error(future)) cass_future_free(future) completion(result) } @@ -543,13 +627,16 @@ private func futureSetCallback(_ future: OpaquePointer, completion: @escaping (R cass_future_set_callback(future, { _, data in callAndReleaseUnmanagedClosure(data!) }, closure) } -private func futureSetResultCallback(_ future: OpaquePointer, completion: @escaping (Result) -> Void) { +private func futureSetResultCallback( + _ future: OpaquePointer, + completion: @escaping (Result) -> Void +) { let closure = unmanagedRetainedClosure { DispatchQueue.global().async { let resultCode = cass_future_error_code(future) - let result: Result = resultCode == CASS_OK ? - .success(CassandraClient.Rows(future)) : - .failure(CassandraClient.Error(future)) + let result: Result = + resultCode == CASS_OK + ? .success(CassandraClient.Rows(future)) : .failure(CassandraClient.Error(future)) cass_future_free(future) completion(result) } diff --git a/Sources/CassandraClient/Statement.swift b/Sources/CassandraClient/Statement.swift index caac506..df32ab6 100644 --- a/Sources/CassandraClient/Statement.swift +++ b/Sources/CassandraClient/Statement.swift @@ -13,7 +13,7 @@ //===----------------------------------------------------------------------===// @_implementationOnly import CDataStaxDriver -import Foundation // for date and uuid +import Foundation // for date and uuid extension CassandraClient { /// A prepared statement to run in a Cassandra database. @@ -48,7 +48,11 @@ extension CassandraClient { case .double(let value): result = cass_statement_bind_double(self.rawPointer, index, value) case .bool(let value): - result = cass_statement_bind_bool(self.rawPointer, index, value ? cass_bool_t(1) : cass_bool_t(0)) + result = cass_statement_bind_bool( + self.rawPointer, + index, + value ? cass_bool_t(1) : cass_bool_t(0) + ) case .string(let value): result = cass_statement_bind_string(self.rawPointer, index, value) case .uuid(let value): @@ -70,7 +74,12 @@ extension CassandraClient { cass_statement_bind_bytes(this.rawPointer, index, buffer.baseAddress, buffer.count) } case .bytesUnsafe(let buffer): - result = cass_statement_bind_bytes(self.rawPointer, index, buffer.baseAddress, buffer.count) + result = cass_statement_bind_bytes( + self.rawPointer, + index, + buffer.baseAddress, + buffer.count + ) case .int8Array(let array): result = try self.bindArray(array, at: index) case .int16Array(let array): @@ -93,7 +102,9 @@ extension CassandraClient { } if let consistency = options.consistency { - try checkResult { cass_statement_set_consistency(self.rawPointer, consistency.cassConsistency) } + try checkResult { + cass_statement_set_consistency(self.rawPointer, consistency.cassConsistency) + } } if let requestTimeout = options.requestTimeout { @@ -103,7 +114,7 @@ extension CassandraClient { private func bindArray(_ array: [T], at index: Int) throws -> CassError { let collection = cass_collection_new(CASS_COLLECTION_TYPE_LIST, array.count) - try array.forEach { element in + for element in array { let appendResult: CassError switch element { case let value as Int8: @@ -147,7 +158,11 @@ extension CassandraClient { try checkResult { pagingStateToken.withUnsafeBytes { let buffer = $0.bindMemory(to: CChar.self) - return cass_statement_set_paging_state_token(self.rawPointer, buffer.baseAddress, buffer.count) + return cass_statement_set_paging_state_token( + self.rawPointer, + buffer.baseAddress, + buffer.count + ) } } } diff --git a/Tests/CassandraClientTests/CassandraClientTests.swift b/Tests/CassandraClientTests/CassandraClientTests.swift index 7f8f933..1b89f0d 100644 --- a/Tests/CassandraClientTests/CassandraClientTests.swift +++ b/Tests/CassandraClientTests/CassandraClientTests.swift @@ -12,12 +12,13 @@ // //===----------------------------------------------------------------------===// -@testable import CassandraClient import Foundation import Logging import NIO import XCTest +@testable import CassandraClient + final class Tests: XCTestCase { var cassandraClient: CassandraClient! var configuration: CassandraClient.Configuration! @@ -28,7 +29,9 @@ final class Tests: XCTestCase { let env = ProcessInfo.processInfo.environment let keyspace = env["CASSANDRA_KEYSPACE"] ?? "test" self.configuration = CassandraClient.Configuration( - contactPointsProvider: { callback in callback(.success([env["CASSANDRA_HOST"] ?? "127.0.0.1"])) }, + contactPointsProvider: { callback in + callback(.success([env["CASSANDRA_HOST"] ?? "127.0.0.1"])) + }, port: env["CASSANDRA_CQL_PORT"].flatMap(Int32.init) ?? 9042, protocolVersion: .v3 ) @@ -42,18 +45,22 @@ final class Tests: XCTestCase { // client for the tests self.cassandraClient = CassandraClient(configuration: self.configuration, logger: logger) // keyspace for the tests - XCTAssertNoThrow(try self.cassandraClient.withSession(keyspace: .none) { session in - try session - .run("create keyspace if not exists \(keyspace) with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }") - .wait() - }) + XCTAssertNoThrow( + try self.cassandraClient.withSession(keyspace: .none) { session in + try session + .run( + "create keyspace if not exists \(keyspace) with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }" + ) + .wait() + } + ) } override func tearDown() { super.tearDown() XCTAssertNoThrow(try self.cassandraClient.shutdown()) - self.cassandraClient = nil // FIXME: for tsan + self.cassandraClient = nil // FIXME: for tsan } func testSession() { @@ -63,8 +70,8 @@ final class Tests: XCTestCase { let tableName = "test_\(DispatchTime.now().uptimeNanoseconds)" XCTAssertNoThrow(try session.run("create table \(tableName) (data bigint primary key);").wait()) - let count = Int.random(in: 10 ... 100) - try! (0 ..< count).forEach { index in + let count = Int.random(in: 10...100) + for index in 0..]() - (0 ..< count).forEach { index in + for index in 0..]() - (0 ..< count).forEach { index in + for index in 0..=5.5) && canImport(_Concurrency)) try XCTSkipIf(true) #else - runAsyncAndWaitFor({ - let tableName = "test_\(DispatchTime.now().uptimeNanoseconds)" - try await self.cassandraClient.run("create table \(tableName) (id int primary key, data text);") + runAsyncAndWaitFor( + { + let tableName = "test_\(DispatchTime.now().uptimeNanoseconds)" + try await self.cassandraClient.run( + "create table \(tableName) (id int primary key, data text);" + ) - let count = Int.random(in: 1000 ... 2000) - await withThrowingTaskGroup(of: Void.self) { group in - let options = CassandraClient.Statement.Options(consistency: .localQuorum) - (0 ..< count).forEach { index in - group.addTask { - try await self.cassandraClient.run( - "insert into \(tableName) (id, data) values (?, ?);", - parameters: [.int32(Int32(index)), .string(UUID().uuidString)], - options: options - ) + let count = Int.random(in: 1000...2000) + await withThrowingTaskGroup(of: Void.self) { group in + let options = CassandraClient.Statement.Options(consistency: .localQuorum) + for index in 0..]() - (0 ..< count).forEach { index in + for index in 0..=5.5) && canImport(_Concurrency)) try XCTSkipIf(true) #else - runAsyncAndWaitFor({ - let tableName = "test_\(DispatchTime.now().uptimeNanoseconds)" - try await self.cassandraClient.run("create table \(tableName) (id int primary key, data text);") + runAsyncAndWaitFor( + { + let tableName = "test_\(DispatchTime.now().uptimeNanoseconds)" + try await self.cassandraClient.run( + "create table \(tableName) (id int primary key, data text);" + ) - let count = Int.random(in: 1000 ... 2000) - await withThrowingTaskGroup(of: Void.self) { group in - (0 ..< count).forEach { index in - group.addTask { - try await self.cassandraClient.run( - "insert into \(tableName) (id, data) values (?, ?);", - parameters: [.int32(Int32(index)), .string(UUID().uuidString)] - ) + let count = Int.random(in: 1000...2000) + await withThrowingTaskGroup(of: Void.self) { group in + for index in 0..]() - (0 ..< count).forEach { index in + for index in 0.., - col15 list, - col16 list, - col17 list, - col18 list, - col19 list, - col20 list - ); - """ - ).wait()) + XCTAssertNoThrow( + try self.cassandraClient.run( + """ + create table \(tableName) + ( + col1 tinyint primary key, + col2 smallint, + col3 int, + col4 bigint, + col5 float, + col6 double, + col7 text, + col8 timestamp, + col9 uuid, + col10 timeuuid, + col11 blob, + col12 boolean, + col13 text, + col14 list, + col15 list, + col16 list, + col17 list, + col18 list, + col19 list, + col20 list + ); + """ + ).wait() + ) var futures = [EventLoopFuture]() - data.forEach { model in - let parameters: [CassandraClient.Statement.Value] = [.int8(model.col1), - .int16(model.col2), - .int32(model.col3), - .int64(model.col4), - .float32(model.col5), - .double(model.col6), - .string(model.col7), - .date(model.col8), - .uuid(model.col9), - .timeuuid(model.col10), - .bytes(model.col11), - .bool(model.col12), - .null, - .int8Array(model.col14), - .int16Array(model.col15), - .int32Array(model.col16), - .int64Array(model.col17), - .float32Array(model.col18), - .doubleArray(model.col19), - .stringArray(model.col20)] + for model in data { + let parameters: [CassandraClient.Statement.Value] = [ + .int8(model.col1), + .int16(model.col2), + .int32(model.col3), + .int64(model.col4), + .float32(model.col5), + .double(model.col6), + .string(model.col7), + .date(model.col8), + .uuid(model.col9), + .timeuuid(model.col10), + .bytes(model.col11), + .bool(model.col12), + .null, + .int8Array(model.col14), + .int16Array(model.col15), + .int32Array(model.col16), + .int64Array(model.col17), + .float32Array(model.col18), + .doubleArray(model.col19), + .stringArray(model.col20), + ] futures.append( self.cassandraClient.run( """ @@ -529,9 +616,10 @@ final class Tests: XCTestCase { defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) } XCTAssertNoThrow(try EventLoopFuture.andAllSucceed(futures, on: eventLoopGroup.next()).wait()) - let result: [Model] = try! self.cassandraClient.query("select * from \(tableName);").wait().sorted { a, b in a.col1 < b.col1 } + let result: [Model] = try! self.cassandraClient.query("select * from \(tableName);").wait() + .sorted { a, b in a.col1 < b.col1 } XCTAssertEqual(result.count, data.count, "result count should match") - result.enumerated().forEach { index, item in + for (index, item) in result.enumerated() { XCTAssertEqual(item.col1, data[index].col1, "results should match") XCTAssertEqual(item.col2, data[index].col2, "results should match") XCTAssertEqual(item.col3, data[index].col3, "results should match") @@ -539,7 +627,12 @@ final class Tests: XCTestCase { XCTAssertEqual(item.col5, data[index].col5, "results should match") XCTAssertEqual(item.col6, data[index].col6, "results should match") XCTAssertEqual(item.col7, data[index].col7, "results should match") - XCTAssertEqual(item.col8.timeIntervalSince1970, data[index].col8.timeIntervalSince1970, accuracy: 1, "results should match") + XCTAssertEqual( + item.col8.timeIntervalSince1970, + data[index].col8.timeIntervalSince1970, + accuracy: 1, + "results should match" + ) XCTAssertEqual(item.col9, data[index].col9, "results should match") XCTAssertEqual(item.col10, data[index].col10, "results should match") XCTAssertEqual(item.col11, data[index].col11, "results should match") @@ -583,91 +676,107 @@ final class Tests: XCTestCase { // IP: address string in IPv4 or IPv6 format func testDataTypes() { let tableName = "test_\(DispatchTime.now().uptimeNanoseconds)" - XCTAssertNoThrow(try self.cassandraClient.run(""" - create table \(tableName) ( - col1 tinyint primary key, - col2 smallint, - col3 int, - col4 bigint, - col5 varint, - col6 decimal, - col7 float, - col8 double, - col9 text, - col10 date, - col11 timestamp, - col12 uuid, - col13 timeuuid, - col14 blob, - col15 boolean, - col16 text, - col17 list, - col18 list, - col19 list, - col20 list, - col21 list, - col22 list, - col23 list, + XCTAssertNoThrow( + try self.cassandraClient.run( + """ + create table \(tableName) ( + col1 tinyint primary key, + col2 smallint, + col3 int, + col4 bigint, + col5 varint, + col6 decimal, + col7 float, + col8 double, + col9 text, + col10 date, + col11 timestamp, + col12 uuid, + col13 timeuuid, + col14 blob, + col15 boolean, + col16 text, + col17 list, + col18 list, + col19 list, + col20 list, + col21 list, + col22 list, + col23 list, + ) + """ + ).wait() ) - """).wait()) - for index in Int8(0) ..< Int8(10) { - let int16 = Int16.random(in: Int16.min ... Int16.max) - let int32 = Int32.random(in: Int32.min ... Int32.max) - let int64 = Int64.random(in: Int64.min ... Int64.max) + for index in Int8(0).. - .int16Array(int16List), // list - .int32Array(int32List), // list - .int64Array(int64List), // list - .float32Array(float32List), // list - .doubleArray(doubleList), // list - .stringArray(textList), // list + .int8Array(int8List), // list + .int16Array(int16List), // list + .int32Array(int32List), // list + .int64Array(int64List), // list + .float32Array(float32List), // list + .doubleArray(doubleList), // list + .stringArray(textList), // list ] - XCTAssertNoThrow(try self.cassandraClient.run(""" - insert into \(tableName) - (col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13, col14, col15, col16, col17, col18, col19, col20, col21, col22, col23) - values - (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - """, parameters: parameters).wait()) + XCTAssertNoThrow( + try self.cassandraClient.run( + """ + insert into \(tableName) + (col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13, col14, col15, col16, col17, col18, col19, col20, col21, col22, col23) + values + (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + parameters: parameters + ).wait() + ) let result = try! self.cassandraClient.query("select * from \(tableName);").wait() XCTAssertEqual(Int8(result.count), index + 1, "expected exactly one result") @@ -681,8 +790,18 @@ final class Tests: XCTestCase { XCTAssertEqual(row.column("col7"), float32, "expected value to match") XCTAssertEqual(row.column("col8"), double, "expected value to match") XCTAssertEqual(row.column("col9"), text, "expected value to match") - XCTAssertEqual(row.column("col10")?.date.map { Double($0 * 86400) } ?? 0.0, now, accuracy: 100_000, "expected value to match") - XCTAssertEqual(row.column("col11")?.timestamp.map { Double($0 / 1000) } ?? 0.0, now, accuracy: 1, "expected value to match") + XCTAssertEqual( + row.column("col10")?.date.map { Double($0 * 86400) } ?? 0.0, + now, + accuracy: 100_000, + "expected value to match" + ) + XCTAssertEqual( + row.column("col11")?.timestamp.map { Double($0 / 1000) } ?? 0.0, + now, + accuracy: 1, + "expected value to match" + ) XCTAssertEqual(row.column("col12"), uuid, "expected value to match") XCTAssertEqual(row.column("col13"), timeuuid, "expected value to match") XCTAssertEqual(row.column("col14"), blob, "expected value to match") @@ -700,7 +819,10 @@ final class Tests: XCTestCase { func testErrorMapping() { XCTAssertThrowsError(try self.cassandraClient.run("boom!").wait()) { error in - XCTAssertEqual(error as? CassandraClient.Error, .syntaxError("line 1:0 no viable alternative at input \'boom\' ([boom]...)")) + XCTAssertEqual( + error as? CassandraClient.Error, + .syntaxError("line 1:0 no viable alternative at input \'boom\' ([boom]...)") + ) } } @@ -709,7 +831,7 @@ final class Tests: XCTestCase { var buffer = [UInt8]() var generator = SystemRandomNumberGenerator() for index in stride(from: 0, to: size, by: 8) { - let int64 = Int64.random(in: Int64.min ... Int64.max, using: &generator) + let int64 = Int64.random(in: Int64.min...Int64.max, using: &generator) let bytes = withUnsafeBytes(of: int64.bigEndian) { Array($0) } if index + bytes.count > size { buffer += bytes.dropLast(index + bytes.count - size) @@ -725,7 +847,10 @@ final class Tests: XCTestCase { extension XCTestCase { // TODO: remove once XCTest supports async functions @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func runAsyncAndWaitFor(_ closure: @escaping @Sendable () async throws -> Void, _ timeout: TimeInterval = 3.0) { + func runAsyncAndWaitFor( + _ closure: @escaping @Sendable () async throws -> Void, + _ timeout: TimeInterval = 3.0 + ) { let finished = expectation(description: "finished") Task.detached { try await closure()