Skip to content

Commit

Permalink
Provide way to set cluster consistency (#8)
Browse files Browse the repository at this point in the history
* Provide way to set cluster consistency

rdar://102086631

* Run swiftformat

* Change swiftformat rule to put access control keyword on declaration rather than extension

* Clean up configuration init

* Document default consistency levels
  • Loading branch information
yim-lee authored Nov 9, 2022
1 parent a39d71d commit f42a51d
Show file tree
Hide file tree
Showing 11 changed files with 226 additions and 238 deletions.
1 change: 1 addition & 0 deletions .swiftformat
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

--ifdef no-indent
--indent 4
--extensionacl on-declarations
--patternlet inline
--self insert
--stripunusedargs closure-only
Expand Down
20 changes: 10 additions & 10 deletions Sources/CassandraClient/CassandraClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public class CassandraClient: CassandraSession {
/// - logger: If `nil`, the client's default `Logger` is used.
///
/// - Returns: The resulting ``Rows``.
public func execute(statement: Statement, on eventLoop: EventLoop?, logger: Logger? = nil) -> EventLoopFuture<Rows> {
public func execute(statement: Statement, on eventLoop: EventLoop?, logger: Logger? = .none) -> EventLoopFuture<Rows> {
self.defaultSession.execute(statement: statement, on: eventLoop, logger: logger)
}

Expand All @@ -105,7 +105,7 @@ public class CassandraClient: CassandraSession {
/// - logger: If `nil`, the client's default `Logger` is used.
///
/// - Returns: The ``PaginatedRows``.
public func execute(statement: Statement, pageSize: Int32, on eventLoop: EventLoop?, logger: Logger? = nil) -> EventLoopFuture<PaginatedRows> {
public func execute(statement: Statement, pageSize: Int32, on eventLoop: EventLoop?, logger: Logger? = .none) -> EventLoopFuture<PaginatedRows> {
self.defaultSession.execute(statement: statement, pageSize: pageSize, on: eventLoop, logger: logger)
}

Expand All @@ -116,7 +116,7 @@ public class CassandraClient: CassandraSession {
/// - logger: If `nil`, the client's default `Logger` is used.
///
/// - Returns: The newly created session.
public func makeSession(keyspace: String?, logger: Logger? = nil) -> CassandraSession {
public func makeSession(keyspace: String?, logger: Logger? = .none) -> CassandraSession {
var configuration = self.configuration
configuration.keyspace = keyspace
let logger = logger ?? self.logger
Expand All @@ -129,7 +129,7 @@ public class CassandraClient: CassandraSession {
/// - keyspace: If `nil`, the client's default keyspace is used.
/// - logger: If `nil`, the client's default `Logger` is used.
/// - handler: The closure to invoke, passing in the newly created session.
public func withSession(keyspace: String?, logger: Logger? = nil, handler: (CassandraSession) throws -> Void) rethrows {
public func withSession(keyspace: String?, logger: Logger? = .none, handler: (CassandraSession) throws -> Void) rethrows {
let session = self.makeSession(keyspace: keyspace, logger: logger)
defer {
do {
Expand All @@ -149,7 +149,7 @@ public class CassandraClient: CassandraSession {
/// - handler: The closure to invoke, passing in the newly created session.
///
/// - Returns: The resulting `EventLoopFuture` of the closure.
public func withSession<T>(keyspace: String?, logger: Logger? = nil, handler: (CassandraSession) -> EventLoopFuture<T>) -> EventLoopFuture<T> {
public func withSession<T>(keyspace: String?, logger: Logger? = .none, handler: (CassandraSession) -> EventLoopFuture<T>) -> EventLoopFuture<T> {
let session = self.makeSession(keyspace: keyspace, logger: logger)
return handler(session).always { _ in
do {
Expand All @@ -175,7 +175,7 @@ public class CassandraClient: CassandraSession {
}

#if compiler(>=5.5) && canImport(_Concurrency)
public extension CassandraClient {
extension CassandraClient {
/// Execute a ``Statement`` using the default ``CassandraSession``.
///
/// **All** rows are returned.
Expand All @@ -186,7 +186,7 @@ public extension CassandraClient {
///
/// - Returns: The resulting ``Rows``.
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
func execute(statement: Statement, logger: Logger? = nil) async throws -> Rows {
public func execute(statement: Statement, logger: Logger? = .none) async throws -> Rows {
try await self.defaultSession.execute(statement: statement, logger: logger)
}

Expand All @@ -201,7 +201,7 @@ public extension CassandraClient {
///
/// - Returns: The ``PaginatedRows``.
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
func execute(statement: Statement, pageSize: Int32, logger: Logger? = nil) async throws -> PaginatedRows {
public func execute(statement: Statement, pageSize: Int32, logger: Logger? = .none) async throws -> PaginatedRows {
try await self.defaultSession.execute(statement: statement, pageSize: pageSize, logger: logger)
}

Expand All @@ -212,7 +212,7 @@ public extension CassandraClient {
/// - logger: If `nil`, the client's default `Logger` is used.
/// - closure: The closure to invoke, passing in the newly created session.
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
func withSession(keyspace: String?, logger: Logger? = nil, closure: (CassandraSession) async throws -> Void) async throws {
public func withSession(keyspace: String?, logger: Logger? = .none, closure: (CassandraSession) async throws -> Void) async throws {
let session = self.makeSession(keyspace: keyspace, logger: logger)
defer {
do {
Expand All @@ -233,7 +233,7 @@ public extension CassandraClient {
///
/// - Returns: The result of the closure.
@available(macOS 12, iOS 15, tvOS 15, watchOS 8, *)
func withSession<T>(keyspace: String?, logger: Logger? = nil, handler: (CassandraSession) async throws -> T) async throws -> T {
public func withSession<T>(keyspace: String?, logger: Logger? = .none, handler: (CassandraSession) async throws -> T) async throws -> T {
let session = self.makeSession(keyspace: keyspace, logger: logger)
defer {
do {
Expand Down
93 changes: 31 additions & 62 deletions Sources/CassandraClient/Configuration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@
import NIO

// TODO: add more config option per C++ cluster impl
public extension CassandraClient {
extension CassandraClient {
/// Configuration for the ``CassandraClient``.
struct Configuration: CustomStringConvertible {
public struct Configuration: CustomStringConvertible {
public typealias ContactPoints = [String]

/// Provides the initial `ContactPoints` of the Cassandra cluster.
/// This can be a subset since each Cassandra instance is capable of discovering its peers.
public var contactPointsProvider: (@escaping (Result<ContactPoints, Swift.Error>) -> Void) -> Void

public var port: Int32
public var protocolVersion: ProtocolVersion
public var username: String?
Expand All @@ -37,7 +38,7 @@ public extension CassandraClient {
public var coreConnectionsPerHost: UInt32?
public var tcpNodelay: Bool?
public var tcpKeepalive: Bool?
public var tcpKeepaliveDelaySeconds: UInt32
public var tcpKeepaliveDelaySeconds: UInt32 = 0
public var connectionHeartbeatInterval: UInt32?
public var connectionIdleTimeout: UInt32?
public var schema: Bool?
Expand All @@ -47,6 +48,9 @@ public extension CassandraClient {
public var prepareStrategy: PrepareStrategy?
public var compact: Bool?

/// Sets the cluster's consistency level. Default is `.localOne`.
public var consistency: CassandraClient.Consistency?

public enum SpeculativeExecutionPolicy {
case constant(delayInMillseconds: Int64, maxExecutions: Int32)
case disabled
Expand All @@ -68,51 +72,11 @@ public extension CassandraClient {
public init(
contactPointsProvider: @escaping (@escaping (Result<ContactPoints, Swift.Error>) -> Void) -> Void,
port: Int32,
protocolVersion: ProtocolVersion,
username: String? = nil,
password: String? = nil,
ssl: SSL? = nil,
keyspace: String? = nil,
numIOThreads: UInt32? = nil,
connectTimeoutMillis: UInt32? = nil,
requestTimeoutMillis: UInt32? = nil,
resolveTimeoutMillis: UInt32? = nil,
coreConnectionsPerHost: UInt32? = nil,
tcpNodelay: Bool? = nil,
tcpKeepalive: Bool? = nil,
tcpKeepaliveDelaySeconds: UInt32 = 0,
connectionHeartbeatInterval: UInt32? = nil,
connectionIdleTimeout: UInt32? = nil,
schema: Bool? = nil,
hostnameResolution: Bool? = nil,
randomizedContactPoints: Bool? = nil,
speculativeExecutionPolicy: SpeculativeExecutionPolicy? = nil,
prepareStrategy: PrepareStrategy? = nil,
compact: Bool? = nil
protocolVersion: ProtocolVersion
) {
self.contactPointsProvider = contactPointsProvider
self.port = port
self.protocolVersion = protocolVersion
self.username = username
self.password = password
self.ssl = ssl
self.keyspace = keyspace
self.numIOThreads = numIOThreads
self.connectTimeoutMillis = connectTimeoutMillis
self.requestTimeoutMillis = requestTimeoutMillis
self.resolveTimeoutMillis = resolveTimeoutMillis
self.coreConnectionsPerHost = coreConnectionsPerHost
self.tcpNodelay = tcpNodelay
self.tcpKeepalive = tcpKeepalive
self.tcpKeepaliveDelaySeconds = tcpKeepaliveDelaySeconds
self.connectionHeartbeatInterval = connectionHeartbeatInterval
self.connectionIdleTimeout = connectionIdleTimeout
self.schema = schema
self.hostnameResolution = hostnameResolution
self.randomizedContactPoints = randomizedContactPoints
self.speculativeExecutionPolicy = speculativeExecutionPolicy
self.prepareStrategy = prepareStrategy
self.compact = compact
}

internal func makeCluster(on eventLoop: EventLoop) -> EventLoopFuture<Cluster> {
Expand Down Expand Up @@ -167,40 +131,40 @@ public extension CassandraClient {
if let ssl = self.ssl {
try cluster.setSSL(try ssl.makeSSLContext())
}
if let value = numIOThreads {
if let value = self.numIOThreads {
try cluster.setNumThreadsIO(value)
}
if let value = connectTimeoutMillis {
if let value = self.connectTimeoutMillis {
try cluster.setConnectTimeout(value)
}
if let value = requestTimeoutMillis {
if let value = self.requestTimeoutMillis {
try cluster.setRequestTimeout(value)
}
if let value = resolveTimeoutMillis {
if let value = self.resolveTimeoutMillis {
try cluster.setResolveTimeout(value)
}
if let value = coreConnectionsPerHost {
if let value = self.coreConnectionsPerHost {
try cluster.setCoreConnectionsPerHost(value)
}
if let value = tcpNodelay {
if let value = self.tcpNodelay {
try cluster.setTcpNodelay(value)
}
if let value = tcpKeepalive {
if let value = self.tcpKeepalive {
try cluster.setTcpKeepalive(value, delayInSeconds: self.tcpKeepaliveDelaySeconds)
}
if let value = connectionHeartbeatInterval {
if let value = self.connectionHeartbeatInterval {
try cluster.setConnectionHeartbeatInterval(value)
}
if let value = connectionIdleTimeout {
if let value = self.connectionIdleTimeout {
try cluster.setConnectionIdleTimeout(value)
}
if let value = schema {
if let value = self.schema {
try cluster.setUseSchema(value)
}
if let value = hostnameResolution {
if let value = self.hostnameResolution {
try cluster.setUseHostnameResolution(value)
}
if let value = randomizedContactPoints {
if let value = self.randomizedContactPoints {
try cluster.setUseRandomizedContactPoints(value)
}
switch self.speculativeExecutionPolicy {
Expand All @@ -219,9 +183,12 @@ public extension CassandraClient {
case .none:
break
}
if let value = compact {
if let value = self.compact {
try cluster.setNoCompact(!value)
}
if let value = self.consistency {
try cluster.setConsistency(value.cassConsistency)
}

return cluster
}
Expand Down Expand Up @@ -338,6 +305,10 @@ internal final class Cluster {
try self.checkResult { cass_cluster_set_no_compact(self.rawPointer, enabled ? cass_true : cass_false) }
}

func setConsistency(_ consistency: CassConsistency) throws {
try self.checkResult { cass_cluster_set_consistency(self.rawPointer, consistency) }
}

func setSSL(_ ssl: SSLContext) throws {
cass_cluster_set_ssl(self.rawPointer, ssl.rawPointer)
}
Expand All @@ -352,8 +323,8 @@ internal final class Cluster {

// MARK: - SSL

public extension CassandraClient.Configuration {
struct SSL {
extension CassandraClient.Configuration {
public struct SSL {
public var trustedCertificates: [String]?
public var verifyFlag: VerifyFlag?
public var cert: String?
Expand All @@ -373,9 +344,7 @@ public extension CassandraClient.Configuration {
case peerIdentityDNS
}

public init(trustedCertificates: [String]?) {
self.trustedCertificates = trustedCertificates
}
public init() {}

func makeSSLContext() throws -> SSLContext {
let sslContext = SSLContext()
Expand Down
59 changes: 59 additions & 0 deletions Sources/CassandraClient/Consistency.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift Cassandra Client open source project
//
// Copyright (c) 2022 Apple Inc. and the Swift Cassandra Client project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of Swift Cassandra Client project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//

@_implementationOnly import CDataStaxDriver

extension CassandraClient {
/// Consistency levels
public enum Consistency {
case any
case one
case two
case three
case quorum
case all
case localQuorum
case eachQuorum
case serial
case localSerial
case localOne

var cassConsistency: CassConsistency {
switch self {
case .any:
return CASS_CONSISTENCY_ANY
case .one:
return CASS_CONSISTENCY_ONE
case .two:
return CASS_CONSISTENCY_TWO
case .three:
return CASS_CONSISTENCY_THREE
case .quorum:
return CASS_CONSISTENCY_QUORUM
case .all:
return CASS_CONSISTENCY_ALL
case .localQuorum:
return CASS_CONSISTENCY_LOCAL_QUORUM
case .eachQuorum:
return CASS_CONSISTENCY_EACH_QUORUM
case .serial:
return CASS_CONSISTENCY_SERIAL
case .localSerial:
return CASS_CONSISTENCY_LOCAL_SERIAL
case .localOne:
return CASS_CONSISTENCY_LOCAL_ONE
}
}
}
}
Loading

0 comments on commit f42a51d

Please sign in to comment.