Skip to content

Commit

Permalink
websockets support
Browse files Browse the repository at this point in the history
  • Loading branch information
worryg0d committed Mar 21, 2024
1 parent 476725e commit db37df0
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 31 deletions.
29 changes: 28 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,32 @@ client.on('log', (level, loggerName, message, furtherInfo) => {
The `level` being passed to the listener can be `verbose`, `info`, `warning` or `error`. Visit the [logging
documentation][doc-logging] for more information.

## WebSockets

You can use websocket as transport. But Cassandra doesn't support this protocol
so some proxy should be deployed in front of Cassandra, which can handle this transport protocol.

```javascript
const client = new cassandra.Client({
transport: 'WebSocket',
contactPoints: [
// some proxies that support websocket transport
'127.0.0.1:9043',
'localhost:9044'
],
webSocketOptions: {
// some client websocket options
protocolVersion: 13,
...
}
});
```

You can configure your websocket client with `webSocketOptions`.
To properly configure it follow [websocket/ws doc][ws-doc].

You also can use websockets over SSL by passing `transport: 'SecureWebSocket'`.

## Compatibility

The driver supports all versions of Node.js, Cassandra, and DSE that are not EOL at the time of release. Only LTS eligible branches (i.e. even numbered releases) are supported for Node.js. See the [project documentation] for more information about the Node.js release cycle.
Expand Down Expand Up @@ -296,4 +322,5 @@ Unless required by applicable law or agreed to in writing, software distributed
[streams2]: https://nodejs.org/api/stream.html#stream_class_stream_readable
[cql-udt]: https://cassandra.apache.org/doc/latest/cql/types.html#udts
[dse]: https://www.datastax.com/products/datastax-enterprise
[astra]: https://www.datastax.com/products/datastax-astra
[astra]: https://www.datastax.com/products/datastax-astra
[ws-doc]: https://github.com/websockets/ws/blob/master/doc/ws.md#new-websocketaddress-protocols-options
6 changes: 6 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import { metrics } from './lib/metrics';
import { tracker } from './lib/tracker';
import { metadata } from './lib/metadata';
import { datastax } from './lib/datastax/';
import { ClientRequestArgs } from 'http';
import Long = types.Long;
import Uuid = types.Uuid;
import graph = datastax.graph;
Expand Down Expand Up @@ -191,7 +192,11 @@ export interface ExecutionOptions {
setHints(hints: string[]): void;
}

export type WebSocketClientOptions = (ClientOptions | ClientRequestArgs)
& {protocols?: string | string[] | undefined};

export interface ClientOptions {
transport?: 'SecureWebSocket' | 'WebSocket' | undefined
contactPoints?: string[];
localDataCenter?: string;
keyspace?: string;
Expand Down Expand Up @@ -253,6 +258,7 @@ export interface ClientOptions {
tcpNoDelay?: boolean;
};
sslOptions?: tls.ConnectionOptions;
webSocketOptions?: WebSocketClientOptions;
}

export interface QueryOptions {
Expand Down
81 changes: 61 additions & 20 deletions lib/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const StreamIdStack = require('./stream-id-stack');
const OperationState = require('./operation-state');
const promiseUtils = require('./promise-utils');
const { ExecutionOptions } = require('./execution-options');
const { WebSocketWrapper } = require('./websocket');

/**
* Represents a connection to a Cassandra node
Expand Down Expand Up @@ -171,30 +172,70 @@ class Connection extends events.EventEmitter {
const self = this;
this.log('info', `Connecting to ${this.endpointFriendlyName}`);

if (!this.options.sslOptions) {
this.netClient = new net.Socket({ highWaterMark: this.options.socketOptions.coalescingThreshold });
this.netClient.connect(this.port, this.address, function connectCallback() {
self.log('verbose', `Socket connected to ${self.endpointFriendlyName}`);
self.bindSocketListeners();
self.startup(callback);
});
}
else {
// Use TLS
const sslOptions = utils.extend({ rejectUnauthorized: false }, this.options.sslOptions);
if (this.options.transport) {
if (this.options.transport.toLowerCase() === 'securewebsocket') {
// Use secure WebSocket
const options = utils.extend({ rejectUnauthorized: false, transport: this.options.transport },
this.options.webSocketOptions);

if (!options.protocols) {
options.protocols = ['cql'];
}

this.netClient = new WebSocketWrapper(options);

this.netClient.connect(this.port, this.address, function connectCallback() {
self.log('verbose', `Secure WebSocket to ${self.endpointFriendlyName}`);
self.bindSocketListeners();
self.startup(callback);
});
} else {
// Use WebSocket
const options = utils.extend({
transport: this.options.transport,
highWaterMark: this.options.socketOptions.coalescingThreshold,
handshakeTimeout: this.options.socketOptions.connectTimeout,
}, this.options.webSocketOptions);

if (!options.protocols) {
options.protocols = ['cql'];
}

if (this.options.sni) {
sslOptions.servername = this._serverName;
this.netClient = new WebSocketWrapper(options);

this.netClient.connect(this.port, this.address, function connectCallback() {
self.log('verbose', `WebSocket connected to ${self.endpointFriendlyName}`);
self.bindSocketListeners();
self.startup(callback);
});
}
} else {
// Use Socket
if (!this.options.sslOptions) {
this.netClient = new net.Socket({ highWaterMark: this.options.socketOptions.coalescingThreshold });

this.netClient.connect(this.port, this.address, function connectCallback() {
self.log('verbose', `Socket connected to ${self.endpointFriendlyName}`);
self.bindSocketListeners();
self.startup(callback);
});
} else {
// Use Socket with TLS
const sslOptions = utils.extend({ rejectUnauthorized: false }, this.options.sslOptions);

this.netClient = tls.connect(this.port, this.address, sslOptions, function tlsConnectCallback() {
self.log('verbose', `Secure socket connected to ${self.endpointFriendlyName} with protocol ${self.netClient.getProtocol()}`);
self.bindSocketListeners();
self.startup(callback);
});
if (this.options.sni) {
sslOptions.servername = this._serverName;
}

// TLSSocket will validate for values from 512 to 16K (depending on the SSL protocol version)
this.netClient.setMaxSendFragment(this.options.socketOptions.coalescingThreshold);
this.netClient = tls.connect(this.port, this.address, sslOptions, function tlsConnectCallback() {
self.log('verbose', `Secure socket connected to ${self.endpointFriendlyName} with protocol ${self.netClient.getProtocol()}`);
self.bindSocketListeners();
self.startup(callback);
});

// TLSSocket will validate for values from 512 to 16K (depending on the SSL protocol version)
this.netClient.setMaxSendFragment(this.options.socketOptions.coalescingThreshold);
}
}

this.netClient.once('error', function socketError(err) {
Expand Down
91 changes: 91 additions & 0 deletions lib/websocket.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

'use strict';

const { EventEmitter } = require('events');
const { WebSocket } = require('ws');

/**
* WebSocketWrapper is a wrapper on the `ws.Websocket` which implements
* `net.Socket` interface to be used by the `cassandra.Connection`
*/
class WebSocketWrapper extends EventEmitter {
/**
* Creates a websocket wrapper instance. To connect use `connect` method
* @param {object} options client options for a websocket
*/
constructor(options) {
super();
this.options = options;
}

/**
* Creates an instance of a websocket and connects
* @param {String} port
* @param {String} address
* @param {() => void} connectionCallback is called when connection is successfully established
* @returns {WebSocketWrapper} wrapper itself
*/
connect(port, address, connectionCallback) {
const schema = this.options.transport.toLowerCase() === 'securewebsocket' ? 'wss' : 'ws';

this.ws = new WebSocket(schema+'://'+address+':'+port, this.options.protocols, this.options);

if (connectionCallback) {
this.ws.on('open', connectionCallback);
}

const stream = WebSocket.createWebSocketStream(this.ws, this.options);

stream.on('error', err => {
this.emit('error', err);
});
stream.on('drain', () => {
this.emit('drain');
});
stream.on('close', () => {
this.emit('close');
});
stream.on('end', () => {
this.emit('end');
});

this.write = stream.write.bind(stream);
this.pipe = stream.pipe.bind(stream);
this.end = stream.end.bind(stream);
this.destroy = stream.destroy.bind(stream);

return this;
}

/**
* It is not implemented because `ws` lib doesn't provide API to work with
*/
setTimeout() {}

/**
* It is not implemented because `ws` lib doesn't provide API to work with
*/
setKeepAlive() {}

/**
* It is not implemented because `ws` lib doesn't provide API to work with
*/
setNoDelay() {}
}

module.exports.WebSocketWrapper = WebSocketWrapper;
39 changes: 30 additions & 9 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
"@types/long": "~5.0.0",
"@types/node": ">=8",
"adm-zip": "~0.5.10",
"long": "~5.2.3"
"long": "~5.2.3",
"ws": "^8.16.0"
},
"devDependencies": {
"chai": "~4.3.8",
Expand Down

0 comments on commit db37df0

Please sign in to comment.