From f42a51dd9fc594d93430aad76962ce84ab8e9653 Mon Sep 17 00:00:00 2001 From: Yim Lee Date: Wed, 9 Nov 2022 15:33:35 -0800 Subject: [PATCH] Provide way to set cluster consistency (#8) * Provide way to set cluster consistency rdar://102086631 * Run swiftformat * Change swiftformat rule to put access control keyword on declaration rather than extension * Clean up configuration init * Document default consistency levels --- .swiftformat | 1 + Sources/CassandraClient/CassandraClient.swift | 20 +-- Sources/CassandraClient/Configuration.swift | 93 ++++------- Sources/CassandraClient/Consistency.swift | 59 +++++++ Sources/CassandraClient/Data.swift | 144 +++++++++--------- Sources/CassandraClient/Decoding.swift | 4 +- Sources/CassandraClient/Errors.swift | 10 +- Sources/CassandraClient/Session.swift | 70 ++++----- Sources/CassandraClient/Statement.swift | 53 +------ .../CassandraClientTests.swift | 8 +- docker/docker-compose-dev.yaml | 2 +- 11 files changed, 226 insertions(+), 238 deletions(-) create mode 100644 Sources/CassandraClient/Consistency.swift diff --git a/.swiftformat b/.swiftformat index e54a244..67c4de3 100644 --- a/.swiftformat +++ b/.swiftformat @@ -8,6 +8,7 @@ --ifdef no-indent --indent 4 +--extensionacl on-declarations --patternlet inline --self insert --stripunusedargs closure-only diff --git a/Sources/CassandraClient/CassandraClient.swift b/Sources/CassandraClient/CassandraClient.swift index 7371f99..eb43e26 100644 --- a/Sources/CassandraClient/CassandraClient.swift +++ b/Sources/CassandraClient/CassandraClient.swift @@ -90,7 +90,7 @@ public class CassandraClient: CassandraSession { /// - logger: If `nil`, the client's default `Logger` is used. /// /// - Returns: The resulting ``Rows``. - public func execute(statement: Statement, on eventLoop: EventLoop?, logger: Logger? = nil) -> EventLoopFuture { + public func execute(statement: Statement, on eventLoop: EventLoop?, logger: Logger? = .none) -> EventLoopFuture { self.defaultSession.execute(statement: statement, on: eventLoop, logger: logger) } @@ -105,7 +105,7 @@ public class CassandraClient: CassandraSession { /// - logger: If `nil`, the client's default `Logger` is used. /// /// - Returns: The ``PaginatedRows``. - public func execute(statement: Statement, pageSize: Int32, on eventLoop: EventLoop?, logger: Logger? = nil) -> EventLoopFuture { + public func execute(statement: Statement, pageSize: Int32, on eventLoop: EventLoop?, logger: Logger? = .none) -> EventLoopFuture { self.defaultSession.execute(statement: statement, pageSize: pageSize, on: eventLoop, logger: logger) } @@ -116,7 +116,7 @@ public class CassandraClient: CassandraSession { /// - logger: If `nil`, the client's default `Logger` is used. /// /// - Returns: The newly created session. - public func makeSession(keyspace: String?, logger: Logger? = nil) -> CassandraSession { + public func makeSession(keyspace: String?, logger: Logger? = .none) -> CassandraSession { var configuration = self.configuration configuration.keyspace = keyspace let logger = logger ?? self.logger @@ -129,7 +129,7 @@ public class CassandraClient: CassandraSession { /// - keyspace: If `nil`, the client's default keyspace is used. /// - logger: If `nil`, the client's default `Logger` is used. /// - handler: The closure to invoke, passing in the newly created session. - public func withSession(keyspace: String?, logger: Logger? = nil, handler: (CassandraSession) throws -> Void) rethrows { + public func withSession(keyspace: String?, logger: Logger? = .none, handler: (CassandraSession) throws -> Void) rethrows { let session = self.makeSession(keyspace: keyspace, logger: logger) defer { do { @@ -149,7 +149,7 @@ public class CassandraClient: CassandraSession { /// - handler: The closure to invoke, passing in the newly created session. /// /// - Returns: The resulting `EventLoopFuture` of the closure. - public func withSession(keyspace: String?, logger: Logger? = nil, handler: (CassandraSession) -> EventLoopFuture) -> EventLoopFuture { + public func withSession(keyspace: String?, logger: Logger? = .none, handler: (CassandraSession) -> EventLoopFuture) -> EventLoopFuture { let session = self.makeSession(keyspace: keyspace, logger: logger) return handler(session).always { _ in do { @@ -175,7 +175,7 @@ public class CassandraClient: CassandraSession { } #if compiler(>=5.5) && canImport(_Concurrency) -public extension CassandraClient { +extension CassandraClient { /// Execute a ``Statement`` using the default ``CassandraSession``. /// /// **All** rows are returned. @@ -186,7 +186,7 @@ public extension CassandraClient { /// /// - Returns: The resulting ``Rows``. @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func execute(statement: Statement, logger: Logger? = nil) async throws -> Rows { + public func execute(statement: Statement, logger: Logger? = .none) async throws -> Rows { try await self.defaultSession.execute(statement: statement, logger: logger) } @@ -201,7 +201,7 @@ public extension CassandraClient { /// /// - Returns: The ``PaginatedRows``. @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func execute(statement: Statement, pageSize: Int32, logger: Logger? = nil) async throws -> PaginatedRows { + public func execute(statement: Statement, pageSize: Int32, logger: Logger? = .none) async throws -> PaginatedRows { try await self.defaultSession.execute(statement: statement, pageSize: pageSize, logger: logger) } @@ -212,7 +212,7 @@ public extension CassandraClient { /// - logger: If `nil`, the client's default `Logger` is used. /// - closure: The closure to invoke, passing in the newly created session. @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func withSession(keyspace: String?, logger: Logger? = nil, closure: (CassandraSession) async throws -> Void) async throws { + public func withSession(keyspace: String?, logger: Logger? = .none, closure: (CassandraSession) async throws -> Void) async throws { let session = self.makeSession(keyspace: keyspace, logger: logger) defer { do { @@ -233,7 +233,7 @@ public extension CassandraClient { /// /// - Returns: The result of the closure. @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func withSession(keyspace: String?, logger: Logger? = nil, handler: (CassandraSession) async throws -> T) async throws -> T { + public func withSession(keyspace: String?, logger: Logger? = .none, handler: (CassandraSession) async throws -> T) async throws -> T { let session = self.makeSession(keyspace: keyspace, logger: logger) defer { do { diff --git a/Sources/CassandraClient/Configuration.swift b/Sources/CassandraClient/Configuration.swift index 16e56f7..147312c 100644 --- a/Sources/CassandraClient/Configuration.swift +++ b/Sources/CassandraClient/Configuration.swift @@ -16,14 +16,15 @@ import NIO // TODO: add more config option per C++ cluster impl -public extension CassandraClient { +extension CassandraClient { /// Configuration for the ``CassandraClient``. - struct Configuration: CustomStringConvertible { + public struct Configuration: CustomStringConvertible { public typealias ContactPoints = [String] /// Provides the initial `ContactPoints` of the Cassandra cluster. /// This can be a subset since each Cassandra instance is capable of discovering its peers. public var contactPointsProvider: (@escaping (Result) -> Void) -> Void + public var port: Int32 public var protocolVersion: ProtocolVersion public var username: String? @@ -37,7 +38,7 @@ public extension CassandraClient { public var coreConnectionsPerHost: UInt32? public var tcpNodelay: Bool? public var tcpKeepalive: Bool? - public var tcpKeepaliveDelaySeconds: UInt32 + public var tcpKeepaliveDelaySeconds: UInt32 = 0 public var connectionHeartbeatInterval: UInt32? public var connectionIdleTimeout: UInt32? public var schema: Bool? @@ -47,6 +48,9 @@ public extension CassandraClient { public var prepareStrategy: PrepareStrategy? public var compact: Bool? + /// Sets the cluster's consistency level. Default is `.localOne`. + public var consistency: CassandraClient.Consistency? + public enum SpeculativeExecutionPolicy { case constant(delayInMillseconds: Int64, maxExecutions: Int32) case disabled @@ -68,51 +72,11 @@ public extension CassandraClient { public init( contactPointsProvider: @escaping (@escaping (Result) -> Void) -> Void, port: Int32, - protocolVersion: ProtocolVersion, - username: String? = nil, - password: String? = nil, - ssl: SSL? = nil, - keyspace: String? = nil, - numIOThreads: UInt32? = nil, - connectTimeoutMillis: UInt32? = nil, - requestTimeoutMillis: UInt32? = nil, - resolveTimeoutMillis: UInt32? = nil, - coreConnectionsPerHost: UInt32? = nil, - tcpNodelay: Bool? = nil, - tcpKeepalive: Bool? = nil, - tcpKeepaliveDelaySeconds: UInt32 = 0, - connectionHeartbeatInterval: UInt32? = nil, - connectionIdleTimeout: UInt32? = nil, - schema: Bool? = nil, - hostnameResolution: Bool? = nil, - randomizedContactPoints: Bool? = nil, - speculativeExecutionPolicy: SpeculativeExecutionPolicy? = nil, - prepareStrategy: PrepareStrategy? = nil, - compact: Bool? = nil + protocolVersion: ProtocolVersion ) { self.contactPointsProvider = contactPointsProvider self.port = port self.protocolVersion = protocolVersion - self.username = username - self.password = password - self.ssl = ssl - self.keyspace = keyspace - self.numIOThreads = numIOThreads - self.connectTimeoutMillis = connectTimeoutMillis - self.requestTimeoutMillis = requestTimeoutMillis - self.resolveTimeoutMillis = resolveTimeoutMillis - self.coreConnectionsPerHost = coreConnectionsPerHost - self.tcpNodelay = tcpNodelay - self.tcpKeepalive = tcpKeepalive - self.tcpKeepaliveDelaySeconds = tcpKeepaliveDelaySeconds - self.connectionHeartbeatInterval = connectionHeartbeatInterval - self.connectionIdleTimeout = connectionIdleTimeout - self.schema = schema - self.hostnameResolution = hostnameResolution - self.randomizedContactPoints = randomizedContactPoints - self.speculativeExecutionPolicy = speculativeExecutionPolicy - self.prepareStrategy = prepareStrategy - self.compact = compact } internal func makeCluster(on eventLoop: EventLoop) -> EventLoopFuture { @@ -167,40 +131,40 @@ public extension CassandraClient { if let ssl = self.ssl { try cluster.setSSL(try ssl.makeSSLContext()) } - if let value = numIOThreads { + if let value = self.numIOThreads { try cluster.setNumThreadsIO(value) } - if let value = connectTimeoutMillis { + if let value = self.connectTimeoutMillis { try cluster.setConnectTimeout(value) } - if let value = requestTimeoutMillis { + if let value = self.requestTimeoutMillis { try cluster.setRequestTimeout(value) } - if let value = resolveTimeoutMillis { + if let value = self.resolveTimeoutMillis { try cluster.setResolveTimeout(value) } - if let value = coreConnectionsPerHost { + if let value = self.coreConnectionsPerHost { try cluster.setCoreConnectionsPerHost(value) } - if let value = tcpNodelay { + if let value = self.tcpNodelay { try cluster.setTcpNodelay(value) } - if let value = tcpKeepalive { + if let value = self.tcpKeepalive { try cluster.setTcpKeepalive(value, delayInSeconds: self.tcpKeepaliveDelaySeconds) } - if let value = connectionHeartbeatInterval { + if let value = self.connectionHeartbeatInterval { try cluster.setConnectionHeartbeatInterval(value) } - if let value = connectionIdleTimeout { + if let value = self.connectionIdleTimeout { try cluster.setConnectionIdleTimeout(value) } - if let value = schema { + if let value = self.schema { try cluster.setUseSchema(value) } - if let value = hostnameResolution { + if let value = self.hostnameResolution { try cluster.setUseHostnameResolution(value) } - if let value = randomizedContactPoints { + if let value = self.randomizedContactPoints { try cluster.setUseRandomizedContactPoints(value) } switch self.speculativeExecutionPolicy { @@ -219,9 +183,12 @@ public extension CassandraClient { case .none: break } - if let value = compact { + if let value = self.compact { try cluster.setNoCompact(!value) } + if let value = self.consistency { + try cluster.setConsistency(value.cassConsistency) + } return cluster } @@ -338,6 +305,10 @@ internal final class Cluster { try self.checkResult { cass_cluster_set_no_compact(self.rawPointer, enabled ? cass_true : cass_false) } } + func setConsistency(_ consistency: CassConsistency) throws { + try self.checkResult { cass_cluster_set_consistency(self.rawPointer, consistency) } + } + func setSSL(_ ssl: SSLContext) throws { cass_cluster_set_ssl(self.rawPointer, ssl.rawPointer) } @@ -352,8 +323,8 @@ internal final class Cluster { // MARK: - SSL -public extension CassandraClient.Configuration { - struct SSL { +extension CassandraClient.Configuration { + public struct SSL { public var trustedCertificates: [String]? public var verifyFlag: VerifyFlag? public var cert: String? @@ -373,9 +344,7 @@ public extension CassandraClient.Configuration { case peerIdentityDNS } - public init(trustedCertificates: [String]?) { - self.trustedCertificates = trustedCertificates - } + public init() {} func makeSSLContext() throws -> SSLContext { let sslContext = SSLContext() diff --git a/Sources/CassandraClient/Consistency.swift b/Sources/CassandraClient/Consistency.swift new file mode 100644 index 0000000..3221953 --- /dev/null +++ b/Sources/CassandraClient/Consistency.swift @@ -0,0 +1,59 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift Cassandra Client open source project +// +// Copyright (c) 2022 Apple Inc. and the Swift Cassandra Client project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of Swift Cassandra Client project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +@_implementationOnly import CDataStaxDriver + +extension CassandraClient { + /// Consistency levels + public enum Consistency { + case any + case one + case two + case three + case quorum + case all + case localQuorum + case eachQuorum + case serial + case localSerial + case localOne + + var cassConsistency: CassConsistency { + switch self { + case .any: + return CASS_CONSISTENCY_ANY + case .one: + return CASS_CONSISTENCY_ONE + case .two: + return CASS_CONSISTENCY_TWO + case .three: + return CASS_CONSISTENCY_THREE + case .quorum: + return CASS_CONSISTENCY_QUORUM + case .all: + return CASS_CONSISTENCY_ALL + case .localQuorum: + return CASS_CONSISTENCY_LOCAL_QUORUM + case .eachQuorum: + return CASS_CONSISTENCY_EACH_QUORUM + case .serial: + return CASS_CONSISTENCY_SERIAL + case .localSerial: + return CASS_CONSISTENCY_LOCAL_SERIAL + case .localOne: + return CASS_CONSISTENCY_LOCAL_ONE + } + } + } +} diff --git a/Sources/CassandraClient/Data.swift b/Sources/CassandraClient/Data.swift index 803b95e..d00a546 100644 --- a/Sources/CassandraClient/Data.swift +++ b/Sources/CassandraClient/Data.swift @@ -17,9 +17,9 @@ import Foundation import Logging import NIO -public extension CassandraClient { +extension CassandraClient { /// Resulting row(s) of a Cassandra query. Data are returned all at once. - final class Rows: Sequence { + public final class Rows: Sequence { internal let rawPointer: OpaquePointer internal init(_ resultFutureRawPointer: OpaquePointer) { @@ -74,7 +74,7 @@ public extension CassandraClient { } /// Resulting row(s) of a Cassandra query. Data are paginated. - final class PaginatedRows { + public final class PaginatedRows { let session: Session let statement: Statement let eventLoop: EventLoop? @@ -232,7 +232,7 @@ public extension CassandraClient { } /// A resulting row of a Cassandra query. - struct Row { + public struct Row { internal let rawPointer: OpaquePointer // Used to make sure the iterator isn't freed while a reference to one // of its rows still exists @@ -257,7 +257,7 @@ public extension CassandraClient { /// A column in a resulting ``Row`` of a Cassandra query. /// /// Note that the value is only good as long as the iterator it came from hasn't been advanced. - struct Column { + public struct Column { let rawPointer: OpaquePointer // Used to make sure the row isn't freed while a reference to one // of its columns still exists @@ -287,9 +287,9 @@ public extension CassandraClient { // MARK: - Int8 -public extension CassandraClient.Column { +extension CassandraClient.Column { /// Get column value as `Int8`. - var int8: Int8? { + public var int8: Int8? { var value: Int8 = 0 let error = cass_value_get_int8(rawPointer, &value) if error == CASS_OK { @@ -300,184 +300,184 @@ public extension CassandraClient.Column { } } -public extension CassandraClient.Row { +extension CassandraClient.Row { /// Get column value as `Int8`. - func column(_ name: String) -> Int8? { + public func column(_ name: String) -> Int8? { self.column(name)?.int8 } /// Get column value as `Int8`. - func column(_ index: Int) -> Int8? { + public func column(_ index: Int) -> Int8? { self.column(index)?.int8 } } // MARK: - Int16 -public extension CassandraClient.Column { +extension CassandraClient.Column { /// Get column value as `Int16`. - var int16: Int16? { + public var int16: Int16? { var value: Int16 = 0 let error = cass_value_get_int16(rawPointer, &value) return error == CASS_OK ? value : nil } } -public extension CassandraClient.Row { +extension CassandraClient.Row { /// Get column value as `Int16`. - func column(_ name: String) -> Int16? { + public func column(_ name: String) -> Int16? { self.column(name)?.int16 } /// Get column value as `Int16`. - func column(_ index: Int) -> Int16? { + public func column(_ index: Int) -> Int16? { self.column(index)?.int16 } } // MARK: - Int32 -public extension CassandraClient.Column { +extension CassandraClient.Column { /// Get column value as `Int32`. - var int32: Int32? { + public var int32: Int32? { var value: Int32 = 0 let error = cass_value_get_int32(rawPointer, &value) return error == CASS_OK ? value : nil } } -public extension CassandraClient.Row { +extension CassandraClient.Row { /// Get column value as `Int32`. - func column(_ name: String) -> Int32? { + public func column(_ name: String) -> Int32? { self.column(name)?.int32 } /// Get column value as `Int32`. - func column(_ index: Int) -> Int32? { + public func column(_ index: Int) -> Int32? { self.column(index)?.int32 } } // MARK: - UInt32 -public extension CassandraClient.Column { +extension CassandraClient.Column { /// Get column value as `UInt32`. - var uint32: UInt32? { + public var uint32: UInt32? { var value: UInt32 = 0 let error = cass_value_get_uint32(rawPointer, &value) return error == CASS_OK ? value : nil } } -public extension CassandraClient.Row { +extension CassandraClient.Row { /// Get column value as `UInt32`. - func column(_ name: String) -> UInt32? { + public func column(_ name: String) -> UInt32? { self.column(name)?.uint32 } /// Get column value as `UInt32`. - func column(_ index: Int) -> UInt32? { + public func column(_ index: Int) -> UInt32? { self.column(index)?.uint32 } } // MARK: - Int64 -public extension CassandraClient.Column { +extension CassandraClient.Column { /// Get column value as `Int64`. - var int64: Int64? { + public var int64: Int64? { var value: cass_int64_t = 0 let error = cass_value_get_int64(rawPointer, &value) return error == CASS_OK ? Int64(value) : nil } } -public extension CassandraClient.Row { +extension CassandraClient.Row { /// Get column value as `Int64`. - func column(_ name: String) -> Int64? { + public func column(_ name: String) -> Int64? { self.column(name)?.int64 } /// Get column value as `Int64`. - func column(_ index: Int) -> Int64? { + public func column(_ index: Int) -> Int64? { self.column(index)?.int64 } } // MARK: - Float32 -public extension CassandraClient.Column { +extension CassandraClient.Column { /// Get column value as `Float32`. - var float32: Float32? { + public var float32: Float32? { var value: Float32 = 0 let error = cass_value_get_float(rawPointer, &value) return error == CASS_OK ? value : nil } } -public extension CassandraClient.Row { +extension CassandraClient.Row { /// Get column value as `Float32`. - func column(_ name: String) -> Float32? { + public func column(_ name: String) -> Float32? { self.column(name)?.float32 } /// Get column value as `Float32`. - func column(_ index: Int) -> Float32? { + public func column(_ index: Int) -> Float32? { self.column(index)?.float32 } } // MARK: - Double -public extension CassandraClient.Column { +extension CassandraClient.Column { /// Get column value as `Double`. - var double: Double? { + public var double: Double? { var value: Double = 0 let error = cass_value_get_double(rawPointer, &value) return error == CASS_OK ? value : nil } } -public extension CassandraClient.Row { +extension CassandraClient.Row { /// Get column value as `Double`. - func column(_ name: String) -> Double? { + public func column(_ name: String) -> Double? { self.column(name)?.double } /// Get column value as `Double`. - func column(_ index: Int) -> Double? { + public func column(_ index: Int) -> Double? { self.column(index)?.double } } // MARK: - Bool -public extension CassandraClient.Column { +extension CassandraClient.Column { /// Get column value as `Bool`. - var bool: Bool? { + public var bool: Bool? { var value = cass_bool_t(0) let error = cass_value_get_bool(rawPointer, &value) return error == CASS_OK ? value == cass_true : nil } } -public extension CassandraClient.Row { +extension CassandraClient.Row { /// Get column value as `Bool`. - func column(_ name: String) -> Bool? { + public func column(_ name: String) -> Bool? { self.column(name)?.bool } /// Get column value as `Bool`. - func column(_ index: Int) -> Bool? { + public func column(_ index: Int) -> Bool? { self.column(index)?.bool } } // MARK: - String -public extension CassandraClient.Column { +extension CassandraClient.Column { /// Get column value as `String`. - var string: String? { + public var string: String? { var value: UnsafePointer? var valueSize = 0 let error = cass_value_get_string(rawPointer, &value, &valueSize) @@ -491,14 +491,14 @@ public extension CassandraClient.Column { } } -public extension CassandraClient.Row { +extension CassandraClient.Row { /// Get column value as `String`. - func column(_ name: String) -> String? { + public func column(_ name: String) -> String? { self.column(name)?.string } /// Get column value as `String`. - func column(_ index: Int) -> String? { + public func column(_ index: Int) -> String? { self.column(index)?.string } } @@ -553,9 +553,9 @@ public struct TimeBasedUUID: Codable, Hashable, Equatable, CustomStringConvertib } } -public extension CassandraClient.Column { +extension CassandraClient.Column { /// Get column value as `UUID`. - var uuid: Foundation.UUID? { + public var uuid: Foundation.UUID? { var value = CassUuid() let error = cass_value_get_uuid(rawPointer, &value) guard error == CASS_OK else { @@ -565,7 +565,7 @@ public extension CassandraClient.Column { } /// Get column value as ``TimeBasedUUID``. - var timeuuid: TimeBasedUUID? { + public var timeuuid: TimeBasedUUID? { var value = CassUuid() let error = cass_value_get_uuid(rawPointer, &value) guard error == CASS_OK else { @@ -575,30 +575,30 @@ public extension CassandraClient.Column { } } -public extension CassandraClient.Row { +extension CassandraClient.Row { /// Get column value as `UUID`. - func column(_ name: String) -> Foundation.UUID? { + public func column(_ name: String) -> Foundation.UUID? { self.column(name)?.uuid } /// Get column value as `UUID`. - func column(_ index: Int) -> Foundation.UUID? { + public func column(_ index: Int) -> Foundation.UUID? { self.column(index)?.uuid } /// Get column value as ``TimeBasedUUID``. - func column(_ name: String) -> TimeBasedUUID? { + public func column(_ name: String) -> TimeBasedUUID? { self.column(name)?.timeuuid } /// Get column value as ``TimeBasedUUID``. - func column(_ index: Int) -> TimeBasedUUID? { + public func column(_ index: Int) -> TimeBasedUUID? { self.column(index)?.timeuuid } } -internal extension CassUuid { - init(_ uuid: uuid_t) { +extension CassUuid { + internal init(_ uuid: uuid_t) { self.init() var timeAndVersion = [UInt8]() @@ -628,7 +628,7 @@ internal extension CassUuid { }.pointee)) } - func uuid() -> Foundation.UUID { + internal func uuid() -> Foundation.UUID { var buffer: uuid_t let timeAndVersion = withUnsafeBytes(of: time_and_version.bigEndian) { Array($0) } @@ -657,9 +657,9 @@ internal extension CassUuid { // MARK: - Date -public extension CassandraClient.Column { +extension CassandraClient.Column { /// Get column date value as `UInt32`. - var date: UInt32? { + public var date: UInt32? { var value: UInt32 = 0 let error = cass_value_get_uint32(rawPointer, &value) return error == CASS_OK ? value : nil @@ -668,18 +668,18 @@ public extension CassandraClient.Column { // MARK: - Timestamp -public extension CassandraClient.Column { +extension CassandraClient.Column { /// Get column timestamp value as `Int64`. - var timestamp: Int64? { + public var timestamp: Int64? { self.int64 } } // MARK: - Bytes -public extension CassandraClient.Column { +extension CassandraClient.Column { /// Get column value as `[UInt8]`. - var bytes: [UInt8]? { + public var bytes: [UInt8]? { var value: UnsafePointer? var size = 0 let error = cass_value_get_bytes(rawPointer, &value, &size) @@ -687,23 +687,23 @@ public extension CassandraClient.Column { } } -public extension CassandraClient.Row { +extension CassandraClient.Row { /// Get column value as `[UInt8]`. - func column(_ name: String) -> [UInt8]? { + public func column(_ name: String) -> [UInt8]? { self.column(name)?.bytes } /// Get column value as `[UInt8]`. - func column(_ index: Int) -> [UInt8]? { + public func column(_ index: Int) -> [UInt8]? { self.column(index)?.bytes } } // MARK: - Unsafe bytes -public extension CassandraClient.Column { +extension CassandraClient.Column { /// Get column value as buffer pointer and pass it to the given closure. - func withUnsafeBuffer(closure: (UnsafeBufferPointer?) throws -> R) rethrows -> R { + public func withUnsafeBuffer(closure: (UnsafeBufferPointer?) throws -> R) rethrows -> R { var value: UnsafePointer? var valueSize = Int() let error = cass_value_get_bytes(rawPointer, &value, &valueSize) diff --git a/Sources/CassandraClient/Decoding.swift b/Sources/CassandraClient/Decoding.swift index a99c946..0f265fa 100644 --- a/Sources/CassandraClient/Decoding.swift +++ b/Sources/CassandraClient/Decoding.swift @@ -14,8 +14,8 @@ import Foundation // for date and uuid -internal extension CassandraClient { - struct RowDecoder: Decoder { +extension CassandraClient { + internal struct RowDecoder: Decoder { private let row: Row var codingPath = [CodingKey]() diff --git a/Sources/CassandraClient/Errors.swift b/Sources/CassandraClient/Errors.swift index 5d978ed..11754da 100644 --- a/Sources/CassandraClient/Errors.swift +++ b/Sources/CassandraClient/Errors.swift @@ -14,9 +14,9 @@ @_implementationOnly import CDataStaxDriver -public extension CassandraClient { +extension CassandraClient { /// Possible ``CassandraClient`` errors. - struct Error: Swift.Error, Equatable, CustomStringConvertible { + public struct Error: Swift.Error, Equatable, CustomStringConvertible { private enum Code: Equatable { case rowsExhausted case disconnected @@ -58,7 +58,7 @@ public extension CassandraClient { self.init(errorCode, message: message) } - init(_ error: CassError, message: String? = nil) { + init(_ error: CassError, message: String? = .none) { let message = message ?? "" switch error { case CASS_ERROR_SERVER_SERVER_ERROR: @@ -343,8 +343,8 @@ public extension CassandraClient { } } -public extension CassandraClient { - struct ConfigurationError: Swift.Error, CustomStringConvertible { +extension CassandraClient { + public struct ConfigurationError: Swift.Error, CustomStringConvertible { public let message: String public var description: String { diff --git a/Sources/CassandraClient/Session.swift b/Sources/CassandraClient/Session.swift index 2e40a49..51fa7cf 100644 --- a/Sources/CassandraClient/Session.swift +++ b/Sources/CassandraClient/Session.swift @@ -82,7 +82,7 @@ public protocol CassandraSession { func getMetrics() -> CassandraMetrics } -internal extension CassandraSession { +extension CassandraSession { /// Execute a prepared statement. /// /// **All** rows are returned. @@ -92,21 +92,21 @@ internal extension CassandraSession { /// - logger: The `Logger` to use. Optional. /// /// - Returns: The resulting ``CassandraClient/Rows``. - func execute(statement: CassandraClient.Statement, logger: Logger? = nil) -> EventLoopFuture { + internal func execute(statement: CassandraClient.Statement, logger: Logger? = .none) -> EventLoopFuture { self.execute(statement: statement, on: nil, logger: logger) } } -public extension CassandraSession { +extension CassandraSession { /// Run insert / update / delete or DDL command where no result is expected. /// /// If `eventLoop` is `nil`, a new one will get created through the `EventLoopGroup` provided during initialization. - func run( + public func run( _ command: String, parameters: [CassandraClient.Statement.Value] = [], options: CassandraClient.Statement.Options = .init(), - on eventLoop: EventLoop? = nil, - logger: Logger? = nil + on eventLoop: EventLoop? = .none, + logger: Logger? = .none ) -> EventLoopFuture { self.query(command, parameters: parameters, options: options, on: eventLoop, logger: logger).map { _ in () } } @@ -114,12 +114,12 @@ public extension CassandraSession { /// Query small data-sets that fit into memory. Only use this when it is safe to buffer the entire data-set into memory. /// /// If `eventLoop` is `nil`, a new one will get created through the `EventLoopGroup` provided during initialization. - func query( + public func query( _ query: String, parameters: [CassandraClient.Statement.Value] = [], options: CassandraClient.Statement.Options = .init(), - on eventLoop: EventLoop? = nil, - logger: Logger? = nil, + on eventLoop: EventLoop? = .none, + logger: Logger? = .none, transform: @escaping (CassandraClient.Row) -> T? ) -> EventLoopFuture<[T]> { self.query(query, parameters: parameters, options: options, on: eventLoop, logger: logger).map { rows in @@ -130,12 +130,12 @@ public extension CassandraSession { /// Query small data-sets that fit into memory. Only use this when it's safe to buffer the entire data-set into memory. /// /// If `eventLoop` is `nil`, a new one will get created through the `EventLoopGroup` provided during initialization. - func query( + public func query( _ query: String, parameters: [CassandraClient.Statement.Value] = [], options: CassandraClient.Statement.Options = .init(), - on eventLoop: EventLoop? = nil, - logger: Logger? = nil + on eventLoop: EventLoop? = .none, + logger: Logger? = .none ) -> EventLoopFuture<[T]> { self.query(query, parameters: parameters, options: options, on: eventLoop, logger: logger).flatMapThrowing { rows in try rows.map { row in @@ -151,12 +151,12 @@ public extension CassandraSession { /// - Important: /// - Advancing the iterator invalidates values retrieved by the previous iteration. /// - Attempting to wrap the ``CassandraClient/Rows`` sequence in a list will not work, use the transformer variant instead. - func query( + public func query( _ query: String, parameters: [CassandraClient.Statement.Value] = [], options: CassandraClient.Statement.Options = .init(), - on eventLoop: EventLoop? = nil, - logger: Logger? = nil + on eventLoop: EventLoop? = .none, + logger: Logger? = .none ) -> EventLoopFuture { do { let statement = try CassandraClient.Statement(query: query, parameters: parameters, options: options) @@ -170,13 +170,13 @@ public extension CassandraSession { /// Query large data-sets where the number of rows fetched at a time is limited by `pageSize`. /// /// If `eventLoop` is `nil`, a new one will get created through the `EventLoopGroup` provided during initialization. - func query( + public func query( _ query: String, parameters: [CassandraClient.Statement.Value] = [], pageSize: Int32, options: CassandraClient.Statement.Options = .init(), - on eventLoop: EventLoop? = nil, - logger: Logger? = nil + on eventLoop: EventLoop? = .none, + logger: Logger? = .none ) -> EventLoopFuture { do { let statement = try CassandraClient.Statement(query: query, parameters: parameters, options: options) @@ -188,8 +188,8 @@ public extension CassandraSession { } } -internal extension CassandraClient { - final class Session: CassandraSession { +extension CassandraClient { + internal final class Session: CassandraSession { private let eventLoopGroupContainer: EventLoopGroupConainer public var eventLoopGroup: EventLoopGroup { self.eventLoopGroupContainer.value @@ -240,7 +240,7 @@ internal extension CassandraClient { } } - func execute(statement: Statement, on eventLoop: EventLoop?, logger: Logger? = nil) -> EventLoopFuture { + func execute(statement: Statement, on eventLoop: EventLoop?, logger: Logger? = .none) -> EventLoopFuture { let eventLoop = eventLoop ?? self.eventLoopGroup.next() let logger = logger ?? self.logger @@ -293,7 +293,7 @@ internal extension CassandraClient { } } - func execute(statement: Statement, pageSize: Int32, on eventLoop: EventLoop?, logger: Logger? = nil) -> EventLoopFuture { + func execute(statement: Statement, pageSize: Int32, on eventLoop: EventLoop?, logger: Logger? = .none) -> EventLoopFuture { let eventLoop = eventLoop ?? self.eventLoopGroup.next() do { @@ -370,25 +370,25 @@ internal extension CassandraClient { // MARK: - Cassandra session with async-await support #if compiler(>=5.5) && canImport(_Concurrency) -public extension CassandraSession { +extension CassandraSession { /// Run insert / update / delete or DDL commands where no result is expected @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func run( + public func run( _ command: String, parameters: [CassandraClient.Statement.Value] = [], options: CassandraClient.Statement.Options = .init(), - logger: Logger? = nil + logger: Logger? = .none ) async throws { _ = try await self.query(command, parameters: parameters, options: options, logger: logger) } /// Query small data-sets that fit into memory. Only use this when it's safe to buffer the entire data-set into memory. @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func query( + public func query( _ query: String, parameters: [CassandraClient.Statement.Value] = [], options: CassandraClient.Statement.Options = .init(), - logger: Logger? = nil, + logger: Logger? = .none, transform: @escaping (CassandraClient.Row) -> T? ) async throws -> [T] { let rows = try await self.query(query, parameters: parameters, options: options, logger: logger) @@ -397,11 +397,11 @@ public extension CassandraSession { /// Query small data-sets that fit into memory. Only use this when it's safe to buffer the entire data-set into memory. @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func query( + public func query( _ query: String, parameters: [CassandraClient.Statement.Value] = [], options: CassandraClient.Statement.Options = .init(), - logger: Logger? = nil + logger: Logger? = .none ) async throws -> [T] { let rows = try await self.query(query, parameters: parameters, options: options, logger: logger) return try rows.map { row in @@ -415,11 +415,11 @@ public extension CassandraSession { /// - Advancing the iterator invalidates values retrieved by the previous iteration. /// - Attempting to wrap the ``CassandraClient/Rows`` sequence in a list will not work, use the transformer variant instead. @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func query( + public func query( _ query: String, parameters: [CassandraClient.Statement.Value] = [], options: CassandraClient.Statement.Options = .init(), - logger: Logger? = nil + logger: Logger? = .none ) async throws -> CassandraClient.Rows { let statement = try CassandraClient.Statement(query: query, parameters: parameters, options: options) return try await self.execute(statement: statement, logger: logger) @@ -427,12 +427,12 @@ public extension CassandraSession { /// Query large data-sets where the number of rows fetched at a time is limited by `pageSize`. @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func query( + public func query( _ query: String, parameters: [CassandraClient.Statement.Value] = [], pageSize: Int32, options: CassandraClient.Statement.Options = .init(), - logger: Logger? = nil + logger: Logger? = .none ) async throws -> CassandraClient.PaginatedRows { let statement = try CassandraClient.Statement(query: query, parameters: parameters, options: options) return try await self.execute(statement: statement, pageSize: pageSize, logger: logger) @@ -441,7 +441,7 @@ public extension CassandraSession { extension CassandraClient.Session { @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func execute(statement: CassandraClient.Statement, logger: Logger? = nil) async throws -> CassandraClient.Rows { + func execute(statement: CassandraClient.Statement, logger: Logger? = .none) async throws -> CassandraClient.Rows { let logger = logger ?? self.logger lock.lock() @@ -485,7 +485,7 @@ extension CassandraClient.Session { } @available(macOS 12, iOS 15, tvOS 15, watchOS 8, *) - func execute(statement: CassandraClient.Statement, pageSize: Int32, logger: Logger? = nil) async throws -> CassandraClient.PaginatedRows { + func execute(statement: CassandraClient.Statement, pageSize: Int32, logger: Logger? = .none) async throws -> CassandraClient.PaginatedRows { try statement.setPagingSize(pageSize) return CassandraClient.PaginatedRows(session: self, statement: statement, logger: logger) } diff --git a/Sources/CassandraClient/Statement.swift b/Sources/CassandraClient/Statement.swift index 8048e23..60dd326 100644 --- a/Sources/CassandraClient/Statement.swift +++ b/Sources/CassandraClient/Statement.swift @@ -15,9 +15,9 @@ @_implementationOnly import CDataStaxDriver import Foundation // for date and uuid -public extension CassandraClient { +extension CassandraClient { /// A prepared statement to run in a Cassandra database. - final class Statement: CustomStringConvertible { + public final class Statement: CustomStringConvertible { internal let query: String internal let parameters: [Value] internal let options: Options @@ -79,33 +79,7 @@ public extension CassandraClient { } if let consistency = options.consistency { - let cassConsistency: CassConsistency - switch consistency { - case .any: - cassConsistency = CASS_CONSISTENCY_ANY - case .one: - cassConsistency = CASS_CONSISTENCY_ONE - case .two: - cassConsistency = CASS_CONSISTENCY_TWO - case .three: - cassConsistency = CASS_CONSISTENCY_THREE - case .quorum: - cassConsistency = CASS_CONSISTENCY_QUORUM - case .all: - cassConsistency = CASS_CONSISTENCY_ALL - case .localQuorum: - cassConsistency = CASS_CONSISTENCY_LOCAL_QUORUM - case .eachQuorum: - cassConsistency = CASS_CONSISTENCY_EACH_QUORUM - case .serial: - cassConsistency = CASS_CONSISTENCY_SERIAL - case .localSerial: - cassConsistency = CASS_CONSISTENCY_LOCAL_SERIAL - case .localOne: - cassConsistency = CASS_CONSISTENCY_LOCAL_ONE - } - - try checkResult { cass_statement_set_consistency(self.rawPointer, cassConsistency) } + try checkResult { cass_statement_set_consistency(self.rawPointer, consistency.cassConsistency) } } } @@ -141,26 +115,11 @@ public extension CassandraClient { case bytesUnsafe(UnsafeBufferPointer) } - /// Consistency levels - public enum Consistency { - case any - case one - case two - case three - case quorum - case all - case localQuorum - case eachQuorum - case serial - case localSerial - case localOne - } - public struct Options: CustomStringConvertible { - /// Desired consistency level - public var consistency: Consistency? + /// Sets the statement's consistency level. Default is `.localOne`. + public var consistency: CassandraClient.Consistency? - public init(consistency: Consistency? = nil) { + public init(consistency: CassandraClient.Consistency? = .none) { self.consistency = consistency } diff --git a/Tests/CassandraClientTests/CassandraClientTests.swift b/Tests/CassandraClientTests/CassandraClientTests.swift index e81d450..4bcce76 100644 --- a/Tests/CassandraClientTests/CassandraClientTests.swift +++ b/Tests/CassandraClientTests/CassandraClientTests.swift @@ -30,11 +30,11 @@ final class Tests: XCTestCase { self.configuration = CassandraClient.Configuration( contactPointsProvider: { callback in callback(.success([env["CASSANDRA_HOST"] ?? "127.0.0.1"])) }, port: env["CASSANDRA_CQL_PORT"].flatMap(Int32.init) ?? 9042, - protocolVersion: .v3, - username: env["CASSANDRA_USER"], - password: env["CASSANDRA_PASSWORD"], - keyspace: keyspace + protocolVersion: .v3 ) + self.configuration.username = env["CASSANDRA_USER"] + self.configuration.password = env["CASSANDRA_PASSWORD"] + self.configuration.keyspace = keyspace var logger = Logger(label: "test") logger.logLevel = .debug diff --git a/docker/docker-compose-dev.yaml b/docker/docker-compose-dev.yaml index 3771201..b3f1287 100644 --- a/docker/docker-compose-dev.yaml +++ b/docker/docker-compose-dev.yaml @@ -1,6 +1,6 @@ # this file is not designed to be run directly # instead, use the docker-compose.. files -# eg docker-compose -f docker/docker-compose.yaml -f docker/docker-compose.2004.56.yaml run test +# eg docker-compose -f docker/docker-compose-dev.yaml -f docker/docker-compose.2004.56.yaml run test version: "3" services: