Skip to content

Commit

Permalink
Run pre-commit hooks for the first time
Browse files Browse the repository at this point in the history
  • Loading branch information
lpsinger committed Aug 2, 2024
1 parent 83282b8 commit 9f19710
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 40 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ consumer = Consumer(client_id='fill me in',

A key feature of kafka consumer clients is the ability to perform persistent tracking of which messages have been read. This allows clients to recover missed messages after a restart by beginning at the earliest unread message rather than the next available message from the stream. In order to enable this feature, you will need to set a client Group ID using the configuration dictionary argument for the Consumer class as well as change the auto offset reset option to the ‘earliest’ setting. Once this is done, every new client with the given Group ID will begin reading the specified topic at the earliest unread message. When doing this, it is recommended to turn OFF the auto commit feature because it can lose track of the last read message if the client crashes before the auto commit interval (5 seconds by default) occurs. Manually committing messages (i.e. storing the state of the last read message) once they are read is the most robust method for tracking the last read message.

Example code:
Example code:
```python3
from gcn_kafka import Consumer

Expand Down
4 changes: 2 additions & 2 deletions gcn_kafka/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# SPDX-License-Identifier: CC0-1.0

from .core import Consumer, Producer, AdminClient
from ._version import version as __version__ # noqa: F401
from .core import AdminClient, Consumer, Producer
from .env import config_from_env
from ._version import version as __version__

__all__ = ("config_from_env", "Consumer", "Producer", "AdminClient")
6 changes: 3 additions & 3 deletions gcn_kafka/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ def get_config(mode, config, **kwargs):
if client_secret:
config.setdefault("sasl.oauthbearer.client.secret", client_secret)
config.setdefault(
"sasl.oauthbearer.token.endpoint.url",
f"https://auth.{domain}/oauth2/token")
"sasl.oauthbearer.token.endpoint.url", f"https://auth.{domain}/oauth2/token"
)

if mode == "consumer" and not config.get("group.id"):
config["group.id"] = str(uuid4())

if mode == "producer":
config.setdefault('compression.type', 'zstd')
config.setdefault("compression.type", "zstd")

set_oauth_cb(config)
return config
Expand Down
10 changes: 6 additions & 4 deletions gcn_kafka/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@
import re
from typing import Mapping, Optional

env_key_splitter = re.compile(r'_+')
replacement_dict = {'_': '.', '__': '-', '___': '_'}
env_key_splitter = re.compile(r"_+")
replacement_dict = {"_": ".", "__": "-", "___": "_"}


# Adapted from https://peps.python.org/pep-0616/
# # FIXME: Remove after dropping support for Python 3.8
def removeprefix(self: str, prefix: str) -> str:
if self.startswith(prefix):
return self[len(prefix):]
return self[len(prefix) :]
else:
return self[:]

Expand All @@ -22,7 +22,9 @@ def replacement(match: re.Match) -> str:
return replacement_dict.get(text) or text


def config_from_env(env: Optional[Mapping[str, str]] = None, prefix: str = 'KAFKA_') -> Mapping[str, str]:
def config_from_env(
env: Optional[Mapping[str, str]] = None, prefix: str = "KAFKA_"
) -> Mapping[str, str]:
"""Construct a Kafka client configuration dictionary from env variables.
This uses the same rules as
https://docs.confluent.io/platform/current/installation/docker/config-reference.html
Expand Down
24 changes: 10 additions & 14 deletions gcn_kafka/test/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,32 +3,28 @@

def test_update_config_no_overwrite():
config = {
'client_id':'qwertyuiopasdfghjklzxcvbnm',
'client_secret':'qwertyuiopljhgfdsazxcvbnmlkjhgfdsaertyuio',
'domain':'test.test.test'
"client_id": "qwertyuiopasdfghjklzxcvbnm",
"client_secret": "qwertyuiopljhgfdsazxcvbnmlkjhgfdsaertyuio",
"domain": "test.test.test",
}

newConfig = update_config(
config,
client_id=None,
client_secret=None
)
newConfig = update_config(config, client_id=None, client_secret=None)

assert newConfig == config


def test_update_config_with_overwrite():
config = {
'client_id':'qwertyuiopasdfghjklzxcvbnm',
'client_secret':'qwertyuiopljhgfdsazxcvbnmlkjhgfdsaertyuio',
'domain':'test.test.test'
"client_id": "qwertyuiopasdfghjklzxcvbnm",
"client_secret": "qwertyuiopljhgfdsazxcvbnmlkjhgfdsaertyuio",
"domain": "test.test.test",
}

newConfig = update_config(
config,
client_id="client_id update Success",
client_secret="client_secret update Success"
client_secret="client_secret update Success",
)

assert newConfig['client_id'] == "client_id update Success"
assert newConfig['client_secret'] == "client_secret update Success"
assert newConfig["client_id"] == "client_id update Success"
assert newConfig["client_secret"] == "client_secret update Success"
13 changes: 6 additions & 7 deletions gcn_kafka/test/test_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,12 @@


def test_config_from_env(monkeypatch):
env = {'FOO_BAR_BAT__BAZ___': '123',
'XYZZ_BAR_BAT__BAZ___': '456'}
env = {"FOO_BAR_BAT__BAZ___": "123", "XYZZ_BAR_BAT__BAZ___": "456"}

config = config_from_env(env, 'FOO_')
assert config == {'bar.bat-baz_': '123'}
config = config_from_env(env, "FOO_")
assert config == {"bar.bat-baz_": "123"}

monkeypatch.setattr(os, 'environ', env)
monkeypatch.setattr(os, "environ", env)

config = config_from_env(prefix='FOO_')
assert config == {'bar.bat-baz_': '123'}
config = config_from_env(prefix="FOO_")
assert config == {"bar.bat-baz_": "123"}
20 changes: 11 additions & 9 deletions gcn_kafka/test/test_oidc.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,23 @@ def test_no_oidc():

def test_oidc(monkeypatch):
mock_session_class = MagicMock()
monkeypatch.setattr(oidc, 'OAuth2Session', mock_session_class)
monkeypatch.setattr(oidc, "OAuth2Session", mock_session_class)

config = {
'sasl.oauthbearer.method': 'oidc',
'sasl.oauthbearer.client.id': 'client_id',
'sasl.oauthbearer.client.secret': 'client_secret',
'sasl.oauthbearer.scope': 'scope',
'sasl.oauthbearer.token.endpoint.url': 'token_endpoint'
"sasl.oauthbearer.method": "oidc",
"sasl.oauthbearer.client.id": "client_id",
"sasl.oauthbearer.client.secret": "client_secret",
"sasl.oauthbearer.scope": "scope",
"sasl.oauthbearer.token.endpoint.url": "token_endpoint",
}
oidc.set_oauth_cb(config)

oauth_cb = config.pop('oauth_cb')
oauth_cb = config.pop("oauth_cb")
assert config == {}
mock_session_class.assert_called_once_with(
'client_id', 'client_secret', scope='scope')
"client_id", "client_secret", scope="scope"
)
oauth_cb()
mock_session_class.return_value.fetch_token.assert_called_once_with(
"token_endpoint", grant_type="client_credentials")
"token_endpoint", grant_type="client_credentials"
)

0 comments on commit 9f19710

Please sign in to comment.