Skip to content

Commit

Permalink
Add multi-tenant support for OAuth authentication (streamnative#1728)
Browse files Browse the repository at this point in the history
### Motivation

Currently, KoP stores all group and offset metadata in one topic,
`public/__kafka/__consumer_offsets`. It's not easy to extend and might
encounter performance issues for large amount of consumers.

> in the long run as consumers keep increasing as all these consumers
share the same topic would there be slowness in committing the offsets
etc.

To solve this issue, we can specify the tenant in `PLAIN`
authentication's username, for example:

```
required username="public/default" password="token:xxx";
```

But when using `OAuth` authentication, there is no way to specify the
tenant.

In this PR, we will introduce a way to specify tenants on OAuth
authentication, and we will add a new property in
`credentials_file.json`. For example:

```json
{
  "client_id":"Xd23RHsUnvUlP7wchjNYOaIfazgeHd9x",
  "client_secret":"rT7ps7WY8uhdVuBTKWZkttwLdQotmdEliaM5rLfmgNibvqziZ-g07ZH52N_poGAb",
  "tenant": "my_tenant"
}
```


**Internal design**

The tenant will be encoded to a token sent by the client, the token
format will be `{tenant} __with_tenant__{token}`, since the token only
allows to `(?<token>[-_\.a-zA-Z0-9]+)`, so here used `__with_tenant__ `
as the delimiter. On the KoP server side, it will try to extract the
tenant and token, the tenant will be used as KoP metadata tenant.

### Modifications

Add multi-tenant support for OAuth authentication.
  • Loading branch information
Demogorgon314 authored and eolivelli committed Feb 28, 2023
1 parent 6fe20fb commit 3d29dff
Show file tree
Hide file tree
Showing 18 changed files with 467 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ public Object getNegotiatedProperty(String propName) {
return tokenForNegotiatedProperty.authDataSource();
}
if (USER_NAME_PROP.equals(propName)) {
if (tokenForNegotiatedProperty.tenant() != null) {
return tokenForNegotiatedProperty.tenant();
}
return defaultKafkaMetadataTenant;
}
return NEGOTIATED_PROPERTY_KEY_TOKEN.equals(propName) ? tokenForNegotiatedProperty : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,10 @@ public interface KopOAuthBearerToken extends OAuthBearerToken {
* Pass the auth data to oauth server.
*/
AuthenticationDataSource authDataSource();

/**
* Pass the tenant to oauth server if credentials set.
*/
String tenant();
}

Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
public class KopOAuthBearerUnsecuredJws extends OAuthBearerUnsecuredJws implements KopOAuthBearerToken {

private final AuthenticationDataCommand authData;

private final String tenant;
/**
* Constructor with the given principal and scope claim names.
*
Expand All @@ -41,14 +43,21 @@ public class KopOAuthBearerUnsecuredJws extends OAuthBearerUnsecuredJws implemen
* after decoding; or the mandatory '{@code alg}' header value is
* not "{@code none}")
*/
public KopOAuthBearerUnsecuredJws(String compactSerialization, String principalClaimName, String scopeClaimName)
public KopOAuthBearerUnsecuredJws(String compactSerialization, String tenant, String principalClaimName,
String scopeClaimName)
throws OAuthBearerIllegalTokenException {
super(compactSerialization, principalClaimName, scopeClaimName);
this.authData = new AuthenticationDataCommand(compactSerialization);
this.tenant = tenant;
}

@Override
public AuthenticationDataSource authDataSource() {
return authData;
}

@Override
public String tenant() {
return tenant;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerConfigException;
Expand Down Expand Up @@ -115,7 +116,11 @@ private void handleCallback(KopOAuthBearerValidatorCallback callback) {
String scopeClaimName = scopeClaimName();
List<String> requiredScope = requiredScope();
int allowableClockSkewMs = allowableClockSkewMs();
KopOAuthBearerUnsecuredJws unsecuredJwt = new KopOAuthBearerUnsecuredJws(tokenValue, principalClaimName,
// Extract real token.
Pair<String, String> tokenAndTenant = OAuthTokenDecoder.decode(tokenValue);
final String token = tokenAndTenant.getLeft();
final String tenant = tokenAndTenant.getRight();
KopOAuthBearerUnsecuredJws unsecuredJwt = new KopOAuthBearerUnsecuredJws(token, tenant, principalClaimName,
scopeClaimName);
long now = time.milliseconds();
OAuthBearerValidationUtils
Expand Down Expand Up @@ -175,4 +180,4 @@ private String option(String key) {
}
return moduleOptions.get(Objects.requireNonNull(key));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.kop.security.oauth;

import lombok.NonNull;
import org.apache.commons.lang3.tuple.Pair;

public class OAuthTokenDecoder {

protected static final String DELIMITER = "__with_tenant_";

/**
* Decode the raw token to token and tenant.
*
* @param rawToken Raw token, it contains token and tenant. Format: Tenant + "__with_tenant_" + Token.
* @return Token and tenant pair. Left is token, right is tenant.
*/
public static Pair<String, String> decode(@NonNull String rawToken) {
final String token;
final String tenant;
// Extract real token.
int idx = rawToken.indexOf(DELIMITER);
if (idx != -1) {
token = rawToken.substring(idx + DELIMITER.length());
tenant = rawToken.substring(0, idx);
} else {
token = rawToken;
tenant = null;
}
return Pair.of(token, tenant);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.streamnative.pulsar.handlers.kop.security.oauth;

import com.google.common.annotations.VisibleForTesting;
import io.streamnative.pulsar.handlers.kop.security.SaslAuthenticator;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
Expand All @@ -25,6 +26,7 @@
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
import org.apache.kafka.common.security.oauthbearer.internals.unsecured.OAuthBearerIllegalTokenException;
Expand All @@ -41,9 +43,19 @@
@Slf4j
public class OauthValidatorCallbackHandler implements AuthenticateCallbackHandler {

private static final String DELIMITER = "__with_tenant_";

private ServerConfig config = null;
private AuthenticationService authenticationService;

public OauthValidatorCallbackHandler() {}

@VisibleForTesting
protected OauthValidatorCallbackHandler(ServerConfig config, AuthenticationService authenticationService) {
this.config = config;
this.authenticationService = authenticationService;
}

@Override
public void configure(Map<String, ?> configs, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
if (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism)) {
Expand Down Expand Up @@ -92,7 +104,8 @@ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallback
}
}

private void handleCallback(KopOAuthBearerValidatorCallback callback) {
@VisibleForTesting
protected void handleCallback(KopOAuthBearerValidatorCallback callback) {
if (callback.tokenValue() == null) {
throw new IllegalArgumentException("Callback has null token value!");
}
Expand All @@ -105,7 +118,13 @@ private void handleCallback(KopOAuthBearerValidatorCallback callback) {
throw new IllegalStateException("No AuthenticationProvider found for method " + config.getValidateMethod());
}

final String token = callback.tokenValue();
final String tokenWithTenant = callback.tokenValue();

// Extract real token.
Pair<String, String> tokenAndTenant = OAuthTokenDecoder.decode(tokenWithTenant);
final String token = tokenAndTenant.getLeft();
final String tenant = tokenAndTenant.getRight();

try {
final AuthenticationState authState = authenticationProvider.newAuthState(
AuthData.of(token.getBytes(StandardCharsets.UTF_8)), null, null);
Expand Down Expand Up @@ -138,6 +157,11 @@ public AuthenticationDataSource authDataSource() {
return authDataSource;
}

@Override
public String tenant() {
return tenant;
}

@Override
public Long startTimeMs() {
// TODO: convert "iat" claim to ms.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.pulsar.handlers.kop.security.oauth;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;

import org.apache.commons.lang3.tuple.Pair;
import org.testng.annotations.Test;

/**
* Test {@link OAuthTokenDecoder}.
*/
public class OAuthTokenDecoderTest {

@Test
public void testDecode() {
Pair<String, String> tokenAndTenant = OAuthTokenDecoder.decode("my-token");
assertEquals(tokenAndTenant.getLeft(), "my-token");
assertNull(tokenAndTenant.getRight());
tokenAndTenant = OAuthTokenDecoder.decode("my-tenant" + OAuthTokenDecoder.DELIMITER + "my-token");
assertEquals(tokenAndTenant.getLeft(), "my-token");
assertEquals(tokenAndTenant.getRight(), "my-tenant");
}
}
6 changes: 6 additions & 0 deletions oauth-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.streamnative.pulsar.handlers</groupId>
<artifactId>test-listener</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ public ClientCredentialsFlow(ClientConfig clientConfig) {
.build());
}

@VisibleForTesting
protected ClientCredentialsFlow(ClientConfig clientConfig, AsyncHttpClient httpClient) {
this.clientConfig = clientConfig;
this.httpClient = httpClient;
}

public OAuthBearerTokenImpl authenticate() throws IOException {
final String tokenEndPoint = findAuthorizationServer().getTokenEndPoint();
final ClientInfo clientInfo = loadPrivateKey();
Expand All @@ -76,7 +82,13 @@ public OAuthBearerTokenImpl authenticate() throws IOException {
.get();
switch (response.getStatusCode()) {
case 200:
return TOKEN_RESULT_READER.readValue(response.getResponseBodyAsBytes());
OAuthBearerTokenImpl token = TOKEN_RESULT_READER.readValue(response.getResponseBodyAsBytes());
String tenant = clientInfo.getTenant();
// Add tenant for multi-tenant.
if (tenant != null) {
token.setTenant(tenant);
}
return token;
case 400: // Bad request
case 401: // Unauthorized
throw new IOException(OBJECT_MAPPER.writeValueAsString(
Expand Down Expand Up @@ -153,6 +165,9 @@ public static class ClientInfo {

@JsonProperty("client_secret")
private String secret;

@JsonProperty("tenant")
private String tenant;
}

@Getter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
@JsonIgnoreProperties(ignoreUnknown = true)
public class OAuthBearerTokenImpl implements OAuthBearerToken {

protected static final String DELIMITER = "__with_tenant_";

@JsonProperty("access_token")
private String accessToken;

Expand All @@ -40,6 +42,10 @@ public String value() {
return accessToken;
}

public void setTenant(String tenant) {
this.accessToken = tenant + DELIMITER + accessToken;
}

@Override
public Set<String> scope() {
return (scope != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,19 @@
*/
package io.streamnative.pulsar.handlers.kop.security.oauth;

import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;

import java.io.IOException;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;
import org.asynchttpclient.ListenableFuture;
import org.asynchttpclient.Response;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand All @@ -40,5 +51,41 @@ public void testLoadPrivateKey() throws Exception {
final ClientCredentialsFlow.ClientInfo clientInfo = flow.loadPrivateKey();
Assert.assertEquals(clientInfo.getId(), "my-id");
Assert.assertEquals(clientInfo.getSecret(), "my-secret");
Assert.assertEquals(clientInfo.getTenant(), "my-tenant");
}

@Test
public void testTenantToken() throws ExecutionException, InterruptedException, IOException {
AsyncHttpClient mockHttpClient = mock(AsyncHttpClient.class);
final ClientCredentialsFlow flow = spy(new ClientCredentialsFlow(ClientConfigHelper.create(
"http://localhost:4444",
Objects.requireNonNull(
getClass().getClassLoader().getResource("private_key.json")).toString()
), mockHttpClient));

ClientCredentialsFlow.Metadata mockMetadata = mock(ClientCredentialsFlow.Metadata.class);
doReturn("mockTokenEndPoint").when(mockMetadata).getTokenEndPoint();

doReturn(mockMetadata).when(flow).findAuthorizationServer();

BoundRequestBuilder mockBuilder = mock(BoundRequestBuilder.class);
ListenableFuture<Response> mockFuture = mock(ListenableFuture.class);
Response mockResponse = mock(Response.class);
doReturn(200).when(mockResponse).getStatusCode();
String responseString = "{\n"
+ " \"access_token\":\"my-token\",\n"
+ " \"expires_in\":42,\n"
+ " \"scope\":\"test\"\n"
+ "}";
doReturn(responseString.getBytes()).when(mockResponse).getResponseBodyAsBytes();
doReturn(mockResponse).when(mockFuture).get();
doReturn(mockFuture).when(mockBuilder).execute();
doReturn(mockBuilder).when(mockHttpClient).preparePost(anyString());
doReturn(mockBuilder).when(mockBuilder).setHeader(anyString(), anyString());
doReturn(mockBuilder).when(mockBuilder).setBody(anyString());

OAuthBearerTokenImpl token = flow.authenticate();
Assert.assertEquals(token.value(), "my-tenant" + OAuthBearerTokenImpl.DELIMITER + "my-token");
Assert.assertEquals(token.scope(), Collections.singleton("test"));
}
}
3 changes: 2 additions & 1 deletion oauth-client/src/test/resources/private_key.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{
"client_id": "my-id",
"client_secret": "my-secret"
"client_secret": "my-secret",
"tenant": "my-tenant"
}
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,11 @@ public AuthenticationDataSource authDataSource() {
return new AuthenticationDataCommand(validationCallback.tokenValue());
}

@Override
public String tenant() {
return null;
}

@Override
public Long startTimeMs() {
return null;
Expand Down
Loading

0 comments on commit 3d29dff

Please sign in to comment.