From 530420d74dffdebb53ac9f64e9278978a66d6887 Mon Sep 17 00:00:00 2001 From: Ariel Boukris <42237792+Bouk250@users.noreply.github.com> Date: Sun, 20 Oct 2024 06:17:40 +0300 Subject: [PATCH] [improve][client] Adding support for token supplier for the AuthenticationToken (#395) * Adding support for token supplier for the AuthenticationToken * Add producer example * Add test * Add jest test * Fix authentification destructor - release token supplier callback * Add example for simple token auth; Improve example for token supplier auth; Clean code --- examples/consummer_token_auth.js | 50 +++++++++++++ examples/consummer_token_auth_supplier.js | 56 +++++++++++++++ examples/producer_token_auth.js | 48 +++++++++++++ examples/producer_token_auth_supplier.js | 53 ++++++++++++++ index.d.ts | 2 +- src/Authentication.cc | 86 +++++++++++++++++++++-- src/Authentication.h | 3 + src/TokenSupplier.h | 31 ++++++++ tests/end_to_end.test.js | 44 ++++++++++++ tstest.ts | 4 +- 10 files changed, 370 insertions(+), 7 deletions(-) create mode 100644 examples/consummer_token_auth.js create mode 100644 examples/consummer_token_auth_supplier.js create mode 100644 examples/producer_token_auth.js create mode 100644 examples/producer_token_auth_supplier.js create mode 100644 src/TokenSupplier.h diff --git a/examples/consummer_token_auth.js b/examples/consummer_token_auth.js new file mode 100644 index 00000000..e6a90197 --- /dev/null +++ b/examples/consummer_token_auth.js @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +const { exit } = require('process'); +const Pulsar = require('../'); +console.log("Starting consumer"); +(async () => { + + const auth = new Pulsar.AuthenticationToken({token: "eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY"}); + + // Create a client + const client = new Pulsar.Client({ + serviceUrl: 'pulsar://localhost:6650', + authentication: auth, + }); + + // Create a consumer + const consumer = await client.subscribe({ + topic: 'persistent://public/default/my-topic', + subscription: 'sub1', + subscriptionType: 'Shared', + ackTimeoutMs: 10000, + }); + + // Receive messages + for (let i = 0; i < 10; i += 1) { + const msg = await consumer.receive(); + console.log(msg.getData().toString()); + consumer.acknowledge(msg); + } + + await consumer.close(); + await client.close(); +})(); diff --git a/examples/consummer_token_auth_supplier.js b/examples/consummer_token_auth_supplier.js new file mode 100644 index 00000000..a85b6a1b --- /dev/null +++ b/examples/consummer_token_auth_supplier.js @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +const { exit } = require('process'); +const Pulsar = require('..'); +console.log("Starting consumer"); + +async function getToken() { + console.log("Get token"); + return "eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY"; +} + +(async () => { + + const auth = new Pulsar.AuthenticationToken({token: getToken}); + + // Create a client + const client = new Pulsar.Client({ + serviceUrl: 'pulsar://localhost:6650', + authentication: auth, + }); + + // Create a consumer + const consumer = await client.subscribe({ + topic: 'persistent://public/default/my-topic', + subscription: 'sub1', + subscriptionType: 'Shared', + ackTimeoutMs: 10000, + }); + + // Receive messages + for (let i = 0; i < 10; i += 1) { + const msg = await consumer.receive(); + console.log(msg.getData().toString()); + consumer.acknowledge(msg); + } + + await consumer.close(); + await client.close(); +})(); diff --git a/examples/producer_token_auth.js b/examples/producer_token_auth.js new file mode 100644 index 00000000..f0af460a --- /dev/null +++ b/examples/producer_token_auth.js @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +const Pulsar = require('../'); + +(async () => { + const auth = new Pulsar.AuthenticationToken({token: "eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY"}); + + // Create a client + const client = new Pulsar.Client({ + serviceUrl: 'pulsar://localhost:6650', + authentication: auth, + }); + + // Create a producer + const producer = await client.createProducer({ + topic: 'persistent://public/default/my-topic', + }); + + // Send messages + for (let i = 0; i < 10; i += 1) { + const msg = `my-message-${i}`; + producer.send({ + data: Buffer.from(msg), + }); + console.log(`Sent message: ${msg}`); + } + await producer.flush(); + + await producer.close(); + await client.close(); +})(); diff --git a/examples/producer_token_auth_supplier.js b/examples/producer_token_auth_supplier.js new file mode 100644 index 00000000..6b121e30 --- /dev/null +++ b/examples/producer_token_auth_supplier.js @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +const Pulsar = require('..'); + +async function getToken() { + console.log("Get token"); + return "eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY"; +} + +(async () => { + const auth = new Pulsar.AuthenticationToken({token: getToken}); + + // Create a client + const client = new Pulsar.Client({ + serviceUrl: 'pulsar://localhost:6650', + authentication: auth, + }); + + // Create a producer + const producer = await client.createProducer({ + topic: 'persistent://public/default/my-topic', + }); + + // Send messages + for (let i = 0; i < 10; i += 1) { + const msg = `my-message-${i}`; + producer.send({ + data: Buffer.from(msg), + }); + console.log(`Sent message: ${msg}`); + } + await producer.flush(); + + await producer.close(); + await client.close(); +})(); diff --git a/index.d.ts b/index.d.ts index 7c2ae814..804aa641 100644 --- a/index.d.ts +++ b/index.d.ts @@ -225,7 +225,7 @@ export class AthenzX509Config { } export class AuthenticationToken { - constructor(params: { token: string }); + constructor(params: { token: string | (() => string) | (() => Promise) }); } export class AuthenticationOauth2 { diff --git a/src/Authentication.cc b/src/Authentication.cc index 0b4dd7eb..226e74f7 100644 --- a/src/Authentication.cc +++ b/src/Authentication.cc @@ -18,6 +18,7 @@ */ #include "Authentication.h" +#include static const std::string PARAM_TLS_CERT = "certificatePath"; static const std::string PARAM_TLS_KEY = "privateKeyPath"; @@ -25,6 +26,62 @@ static const std::string PARAM_TOKEN = "token"; static const std::string PARAM_USERNAME = "username"; static const std::string PARAM_PASSWORD = "password"; +void FinalizeTokenSupplierCallback(Napi::Env env, TokenSupplierCallback *cb, void *) { delete cb; } + +struct TokenSupplierProxyData { + std::function callback; + std::string token; + + TokenSupplierProxyData(std::function callback) : callback(callback), token(std::string()) {} +}; + +void TokenSupplierProxy(Napi::Env env, Napi::Function jsCallback, TokenSupplierProxyData *data) { + Napi::Value ret = jsCallback.Call({}); + if (ret.IsPromise()) { + Napi::Promise promise = ret.As(); + Napi::Value thenValue = promise.Get("then"); + if (thenValue.IsFunction()) { + Napi::Function then = thenValue.As(); + Napi::Function callback = Napi::Function::New(env, [data](const Napi::CallbackInfo &info) { + Napi::Value value = info[0]; + if (value.IsString()) { + data->token = value.ToString().Utf8Value(); + } + data->callback(); + }); + then.Call(promise, {callback}); + return; + } + } + if (ret.IsString()) { + data->token = ret.ToString().Utf8Value(); + } + data->callback(); +} + +char *TokenSupplier(void *ctx) { + TokenSupplierCallback *tokenSupplierCallback = (TokenSupplierCallback *)ctx; + if (tokenSupplierCallback->callback.Acquire() != napi_ok) { + char *empty = (char *)malloc(0); + return empty; + } + + std::promise promise; + std::future future = promise.get_future(); + + std::unique_ptr dataPtr( + new TokenSupplierProxyData([&promise]() { promise.set_value(); })); + + tokenSupplierCallback->callback.BlockingCall(dataPtr.get(), TokenSupplierProxy); + tokenSupplierCallback->callback.Release(); + + future.wait(); + + char *token = (char *)malloc(dataPtr->token.size()); + strcpy(token, dataPtr->token.c_str()); + return token; +} + Napi::FunctionReference Authentication::constructor; Napi::Object Authentication::Init(Napi::Env env, Napi::Object exports) { @@ -69,12 +126,28 @@ Authentication::Authentication(const Napi::CallbackInfo &info) pulsar_authentication_tls_create(obj.Get(PARAM_TLS_CERT).ToString().Utf8Value().c_str(), obj.Get(PARAM_TLS_KEY).ToString().Utf8Value().c_str()); } else if (authMethod == "token") { - if (!obj.Has(PARAM_TOKEN) || !obj.Get(PARAM_TOKEN).IsString()) { - Napi::Error::New(env, "Missing required parameter").ThrowAsJavaScriptException(); - return; + if (obj.Has(PARAM_TOKEN)) { + if (obj.Get(PARAM_TOKEN).IsString()) { + this->cAuthentication = + pulsar_authentication_token_create(obj.Get(PARAM_TOKEN).ToString().Utf8Value().c_str()); + return; + } + + if (obj.Get(PARAM_TOKEN).IsFunction()) { + this->tokenSupplier = new TokenSupplierCallback(); + Napi::ThreadSafeFunction callback = Napi::ThreadSafeFunction::New( + obj.Env(), obj.Get(PARAM_TOKEN).As(), "Token Supplier Callback", 1, 1, + (void *)NULL, FinalizeTokenSupplierCallback, this->tokenSupplier); + this->tokenSupplier->callback = std::move(callback); + + this->cAuthentication = + pulsar_authentication_token_create_with_supplier(&TokenSupplier, this->tokenSupplier); + return; + } } - this->cAuthentication = - pulsar_authentication_token_create(obj.Get(PARAM_TOKEN).ToString().Utf8Value().c_str()); + + Napi::Error::New(env, "Missing required parameter").ThrowAsJavaScriptException(); + return; } else if (authMethod == "basic") { if (!obj.Has(PARAM_USERNAME) || !obj.Get(PARAM_USERNAME).IsString() || !obj.Has(PARAM_PASSWORD) || !obj.Get(PARAM_PASSWORD).IsString()) { @@ -105,6 +178,9 @@ Authentication::Authentication(const Napi::CallbackInfo &info) } Authentication::~Authentication() { + if (this->tokenSupplier != nullptr) { + this->tokenSupplier->callback.Release(); + } if (this->cAuthentication != nullptr) { pulsar_authentication_free(this->cAuthentication); } diff --git a/src/Authentication.h b/src/Authentication.h index 3666bd8f..2a507dd3 100644 --- a/src/Authentication.h +++ b/src/Authentication.h @@ -22,6 +22,7 @@ #include #include +#include "TokenSupplier.h" class Authentication : public Napi::ObjectWrap { public: @@ -33,6 +34,8 @@ class Authentication : public Napi::ObjectWrap { private: static Napi::FunctionReference constructor; pulsar_authentication_t *cAuthentication; + + TokenSupplierCallback *tokenSupplier; }; #endif diff --git a/src/TokenSupplier.h b/src/TokenSupplier.h new file mode 100644 index 00000000..61132a55 --- /dev/null +++ b/src/TokenSupplier.h @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef PULSAR_CLIENT_NODE_TOKENSUPPLIER_H +#define PULSAR_CLIENT_NODE_TOKENSUPPLIER_H + +#include + +struct TokenSupplierCallback { + Napi::ThreadSafeFunction callback; + + TokenSupplierCallback() {} +}; + +#endif // PULSAR_CLIENT_NODE_TOKENSUPPLIER_H diff --git a/tests/end_to_end.test.js b/tests/end_to_end.test.js index 1a551def..dbdf8bd6 100644 --- a/tests/end_to_end.test.js +++ b/tests/end_to_end.test.js @@ -1284,5 +1284,49 @@ const Pulsar = require('../index'); await consumer.close(); await client.close(); }); + + test('AuthenticationToken token supplier', async () => { + const mockTokenSupplier = jest.fn().mockReturnValue('token'); + const auth = new Pulsar.AuthenticationToken({ + token: mockTokenSupplier, + }); + const client = new Pulsar.Client({ + serviceUrl: 'pulsar://localhost:6650', + authentication: auth, + }); + + // A producer/consumer is needed to triger the callback function + const topic = 'persistent://public/default/token-auth'; + const producer = await client.createProducer({ + topic, + }); + expect(producer).not.toBeNull(); + expect(mockTokenSupplier).toHaveBeenCalledTimes(1); + + await producer.close(); + await client.close(); + }); + + test('AuthenticationToken async token supplier', async () => { + const mockTokenSupplier = jest.fn().mockResolvedValue('token'); + const auth = new Pulsar.AuthenticationToken({ + token: mockTokenSupplier, + }); + const client = new Pulsar.Client({ + serviceUrl: 'pulsar://localhost:6650', + authentication: auth, + }); + + // A producer/consumer is needed to triger the callback function + const topic = 'persistent://public/default/token-auth'; + const producer = await client.createProducer({ + topic, + }); + expect(producer).not.toBeNull(); + expect(mockTokenSupplier).toHaveBeenCalledTimes(1); + + await producer.close(); + await client.close(); + }); }); })(); diff --git a/tstest.ts b/tstest.ts index 611f4f22..1ca89b73 100644 --- a/tstest.ts +++ b/tstest.ts @@ -67,7 +67,9 @@ import Pulsar = require('./index'); }); const authToken: Pulsar.AuthenticationToken = new Pulsar.AuthenticationToken({ - token: 'foobar', + token: async () => { + return 'foobar'; + }, }); const authBasic: Pulsar.AuthenticationBasic = new Pulsar.AuthenticationBasic({