Skip to content

Commit

Permalink
[improve][client] Adding support for token supplier for the Authentic…
Browse files Browse the repository at this point in the history
…ationToken (#395)

* Adding support for token supplier for the AuthenticationToken

* Add producer example

* Add test

* Add jest test

* Fix authentification destructor - release token supplier callback

* Add example for simple token auth; Improve example for token supplier auth; Clean code
  • Loading branch information
Bouk250 authored Oct 20, 2024
1 parent 511f707 commit 530420d
Show file tree
Hide file tree
Showing 10 changed files with 370 additions and 7 deletions.
50 changes: 50 additions & 0 deletions examples/consummer_token_auth.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

const { exit } = require('process');
const Pulsar = require('../');
console.log("Starting consumer");
(async () => {

const auth = new Pulsar.AuthenticationToken({token: "eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY"});

// Create a client
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
authentication: auth,
});

// Create a consumer
const consumer = await client.subscribe({
topic: 'persistent://public/default/my-topic',
subscription: 'sub1',
subscriptionType: 'Shared',
ackTimeoutMs: 10000,
});

// Receive messages
for (let i = 0; i < 10; i += 1) {
const msg = await consumer.receive();
console.log(msg.getData().toString());
consumer.acknowledge(msg);
}

await consumer.close();
await client.close();
})();
56 changes: 56 additions & 0 deletions examples/consummer_token_auth_supplier.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

const { exit } = require('process');
const Pulsar = require('..');
console.log("Starting consumer");

async function getToken() {
console.log("Get token");
return "eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY";
}

(async () => {

const auth = new Pulsar.AuthenticationToken({token: getToken});

// Create a client
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
authentication: auth,
});

// Create a consumer
const consumer = await client.subscribe({
topic: 'persistent://public/default/my-topic',
subscription: 'sub1',
subscriptionType: 'Shared',
ackTimeoutMs: 10000,
});

// Receive messages
for (let i = 0; i < 10; i += 1) {
const msg = await consumer.receive();
console.log(msg.getData().toString());
consumer.acknowledge(msg);
}

await consumer.close();
await client.close();
})();
48 changes: 48 additions & 0 deletions examples/producer_token_auth.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

const Pulsar = require('../');

(async () => {
const auth = new Pulsar.AuthenticationToken({token: "eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY"});

// Create a client
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
authentication: auth,
});

// Create a producer
const producer = await client.createProducer({
topic: 'persistent://public/default/my-topic',
});

// Send messages
for (let i = 0; i < 10; i += 1) {
const msg = `my-message-${i}`;
producer.send({
data: Buffer.from(msg),
});
console.log(`Sent message: ${msg}`);
}
await producer.flush();

await producer.close();
await client.close();
})();
53 changes: 53 additions & 0 deletions examples/producer_token_auth_supplier.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

const Pulsar = require('..');

async function getToken() {
console.log("Get token");
return "eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJKb2UifQ.ipevRNuRP6HflG8cFKnmUPtypruRC4fb1DWtoLL62SY";
}

(async () => {
const auth = new Pulsar.AuthenticationToken({token: getToken});

// Create a client
const client = new Pulsar.Client({
serviceUrl: 'pulsar://localhost:6650',
authentication: auth,
});

// Create a producer
const producer = await client.createProducer({
topic: 'persistent://public/default/my-topic',
});

// Send messages
for (let i = 0; i < 10; i += 1) {
const msg = `my-message-${i}`;
producer.send({
data: Buffer.from(msg),
});
console.log(`Sent message: ${msg}`);
}
await producer.flush();

await producer.close();
await client.close();
})();
2 changes: 1 addition & 1 deletion index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ export class AthenzX509Config {
}

export class AuthenticationToken {
constructor(params: { token: string });
constructor(params: { token: string | (() => string) | (() => Promise<string>) });
}

export class AuthenticationOauth2 {
Expand Down
86 changes: 81 additions & 5 deletions src/Authentication.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,70 @@
*/

#include "Authentication.h"
#include <future>

static const std::string PARAM_TLS_CERT = "certificatePath";
static const std::string PARAM_TLS_KEY = "privateKeyPath";
static const std::string PARAM_TOKEN = "token";
static const std::string PARAM_USERNAME = "username";
static const std::string PARAM_PASSWORD = "password";

void FinalizeTokenSupplierCallback(Napi::Env env, TokenSupplierCallback *cb, void *) { delete cb; }

struct TokenSupplierProxyData {
std::function<void(void)> callback;
std::string token;

TokenSupplierProxyData(std::function<void(void)> callback) : callback(callback), token(std::string()) {}
};

void TokenSupplierProxy(Napi::Env env, Napi::Function jsCallback, TokenSupplierProxyData *data) {
Napi::Value ret = jsCallback.Call({});
if (ret.IsPromise()) {
Napi::Promise promise = ret.As<Napi::Promise>();
Napi::Value thenValue = promise.Get("then");
if (thenValue.IsFunction()) {
Napi::Function then = thenValue.As<Napi::Function>();
Napi::Function callback = Napi::Function::New(env, [data](const Napi::CallbackInfo &info) {
Napi::Value value = info[0];
if (value.IsString()) {
data->token = value.ToString().Utf8Value();
}
data->callback();
});
then.Call(promise, {callback});
return;
}
}
if (ret.IsString()) {
data->token = ret.ToString().Utf8Value();
}
data->callback();
}

char *TokenSupplier(void *ctx) {
TokenSupplierCallback *tokenSupplierCallback = (TokenSupplierCallback *)ctx;
if (tokenSupplierCallback->callback.Acquire() != napi_ok) {
char *empty = (char *)malloc(0);
return empty;
}

std::promise<void> promise;
std::future<void> future = promise.get_future();

std::unique_ptr<TokenSupplierProxyData> dataPtr(
new TokenSupplierProxyData([&promise]() { promise.set_value(); }));

tokenSupplierCallback->callback.BlockingCall(dataPtr.get(), TokenSupplierProxy);
tokenSupplierCallback->callback.Release();

future.wait();

char *token = (char *)malloc(dataPtr->token.size());
strcpy(token, dataPtr->token.c_str());
return token;
}

Napi::FunctionReference Authentication::constructor;

Napi::Object Authentication::Init(Napi::Env env, Napi::Object exports) {
Expand Down Expand Up @@ -69,12 +126,28 @@ Authentication::Authentication(const Napi::CallbackInfo &info)
pulsar_authentication_tls_create(obj.Get(PARAM_TLS_CERT).ToString().Utf8Value().c_str(),
obj.Get(PARAM_TLS_KEY).ToString().Utf8Value().c_str());
} else if (authMethod == "token") {
if (!obj.Has(PARAM_TOKEN) || !obj.Get(PARAM_TOKEN).IsString()) {
Napi::Error::New(env, "Missing required parameter").ThrowAsJavaScriptException();
return;
if (obj.Has(PARAM_TOKEN)) {
if (obj.Get(PARAM_TOKEN).IsString()) {
this->cAuthentication =
pulsar_authentication_token_create(obj.Get(PARAM_TOKEN).ToString().Utf8Value().c_str());
return;
}

if (obj.Get(PARAM_TOKEN).IsFunction()) {
this->tokenSupplier = new TokenSupplierCallback();
Napi::ThreadSafeFunction callback = Napi::ThreadSafeFunction::New(
obj.Env(), obj.Get(PARAM_TOKEN).As<Napi::Function>(), "Token Supplier Callback", 1, 1,
(void *)NULL, FinalizeTokenSupplierCallback, this->tokenSupplier);
this->tokenSupplier->callback = std::move(callback);

this->cAuthentication =
pulsar_authentication_token_create_with_supplier(&TokenSupplier, this->tokenSupplier);
return;
}
}
this->cAuthentication =
pulsar_authentication_token_create(obj.Get(PARAM_TOKEN).ToString().Utf8Value().c_str());

Napi::Error::New(env, "Missing required parameter").ThrowAsJavaScriptException();
return;
} else if (authMethod == "basic") {
if (!obj.Has(PARAM_USERNAME) || !obj.Get(PARAM_USERNAME).IsString() || !obj.Has(PARAM_PASSWORD) ||
!obj.Get(PARAM_PASSWORD).IsString()) {
Expand Down Expand Up @@ -105,6 +178,9 @@ Authentication::Authentication(const Napi::CallbackInfo &info)
}

Authentication::~Authentication() {
if (this->tokenSupplier != nullptr) {
this->tokenSupplier->callback.Release();
}
if (this->cAuthentication != nullptr) {
pulsar_authentication_free(this->cAuthentication);
}
Expand Down
3 changes: 3 additions & 0 deletions src/Authentication.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <napi.h>
#include <pulsar/c/authentication.h>
#include "TokenSupplier.h"

class Authentication : public Napi::ObjectWrap<Authentication> {
public:
Expand All @@ -33,6 +34,8 @@ class Authentication : public Napi::ObjectWrap<Authentication> {
private:
static Napi::FunctionReference constructor;
pulsar_authentication_t *cAuthentication;

TokenSupplierCallback *tokenSupplier;
};

#endif
31 changes: 31 additions & 0 deletions src/TokenSupplier.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#ifndef PULSAR_CLIENT_NODE_TOKENSUPPLIER_H
#define PULSAR_CLIENT_NODE_TOKENSUPPLIER_H

#include <napi.h>

struct TokenSupplierCallback {
Napi::ThreadSafeFunction callback;

TokenSupplierCallback() {}
};

#endif // PULSAR_CLIENT_NODE_TOKENSUPPLIER_H
Loading

0 comments on commit 530420d

Please sign in to comment.