Parallelize topics - consumer groups #267

Merged · 4 commits · Jan 12, 2024
Changes from 1 commit
2 changes: 2 additions & 0 deletions config.defaults.yaml
@@ -39,6 +39,8 @@ kowalski:
     bootstrap.test.servers: "localhost:9092"
     zookeeper.test: "localhost:2181"
     path: "kafka_2.13-3.4.1"
+    processes_per_topic:
+      ZTF: 1
 
   database:
     max_pool_size: 200
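A note on how this knob is consumed (see the broker diff below): the lookup uses chained .get() calls with a fallback of 1, so configs without the new key keep the previous single-listener behavior. A minimal sketch of that lookup, with an illustrative value of 4:

config = {"kafka": {"processes_per_topic": {"ZTF": 4}}}

# Both a missing "processes_per_topic" block and a missing "ZTF" entry
# fall back to a single listener process per topic.
processes_per_topic = (
    config["kafka"].get("processes_per_topic", {}).get("ZTF", 1)
)
assert processes_per_topic == 4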
43 changes: 28 additions & 15 deletions kowalski/alert_brokers/alert_broker_ztf.py
@@ -317,8 +317,6 @@ def make_filter_templates(self, active_filters: Sequence):
                 self.verbose > 1,
             ):
                 response = self.api_skyportal_get_group(active_filter["group_id"])
-                if self.verbose > 1:
-                    log(response.json())
                 if response.json()["status"] == "success":
                     group_name = (
                         response.json()["data"]["nickname"]
@@ -861,6 +859,7 @@ def watchdog(obs_date: str = None, test: bool = False):
                 for t in topics
                 if (datestr in t)
                 and ("programid" in t)
+                and ("ztf" in t)
                 and ("zuds" not in t)
                 and ("pgir" not in t)
             ]
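The new ("ztf" in t) guard limits the ZTF broker to its own topics now that the existing "zuds"/"pgir" exclusions are no longer sufficient on their own. With hypothetical topic names for one night:

datestr = "20240112"
topics = [
    "ztf_20240112_programid1",       # selected
    "pgir_20240112_programid1",      # other instrument, excluded
    "ztf_20240112_programid1_zuds",  # ZUDS stream, excluded
]
selected = [
    t
    for t in topics
    if (datestr in t)
    and ("programid" in t)
    and ("ztf" in t)
    and ("zuds" not in t)
    and ("pgir" not in t)
]
assert selected == ["ztf_20240112_programid1"]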
@@ -878,23 +877,35 @@
                         bootstrap_servers = config["kafka"]["bootstrap.test.servers"]
                     group = config["kafka"]["group"]
 
-                    topics_on_watch[t] = multiprocessing.Process(
-                        target=topic_listener,
-                        args=(t, bootstrap_servers, offset_reset, group, test),
+                    processes_per_topic = (
+                        config["kafka"].get("processes_per_topic", {}).get("ZTF", 1)
                     )
-                    topics_on_watch[t].daemon = True
-                    log(f"set daemon to true {topics_on_watch}")
-                    topics_on_watch[t].start()
+                    topics_on_watch[t] = []
+                    for _ in range(processes_per_topic):
+                        topics_on_watch[t].append(
+                            multiprocessing.Process(
+                                target=topic_listener,
+                                args=(t, bootstrap_servers, offset_reset, group, test),
+                            )
+                        )
+
+                    for i in range(processes_per_topic):
+                        topics_on_watch[t][i].daemon = True
+                        log(f"set daemon to true {topics_on_watch}")
+                        topics_on_watch[t][i].start()
 
                 else:
                     log(f"Performing thread health check for {t}")
                     try:
-                        if not topics_on_watch[t].is_alive():
-                            log(f"Thread {t} died, removing")
-                            # topics_on_watch[t].terminate()
-                            topics_on_watch.pop(t, None)
-                        else:
-                            log(f"Thread {t} appears normal")
+                        for i in range(len(topics_on_watch[t])):
+                            if not topics_on_watch[t][i].is_alive():
+                                log(f"Thread {i} topic {t} died, removing")
+                                topics_on_watch[t].pop(i)
+                            else:
+                                log(f"Thread {i} topic {t} appears normal")
+                        if len(topics_on_watch[t]) == 0:
+                            log(f"Topic {t} has no threads left, removing")
+                            topics_on_watch.pop(t)
                     except Exception as _e:
                         log(f"Failed to perform health check: {_e}")
                         pass
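A reading note on the new health check: calling topics_on_watch[t].pop(i) inside for i in range(len(topics_on_watch[t])) shifts later elements left, so the process right after a dead one is skipped on that pass, and the loop can raise an IndexError once the list has shrunk; the surrounding try/except masks this, and the next watchdog pass retries. A rebuild-style alternative (a sketch, not the PR's code):

# Sketch only: keep live processes, drop the topic when none remain.
alive = [p for p in topics_on_watch[t] if p.is_alive()]
if alive:
    topics_on_watch[t] = alive
else:
    log(f"Topic {t} has no threads left, removing")
    topics_on_watch.pop(t)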
@@ -903,7 +914,9 @@
                 time.sleep(120)
                 # when testing, wait for topic listeners to pull all the data, then break
                 for t in topics_on_watch:
-                    topics_on_watch[t].kill()
+                    for i in range(len(topics_on_watch[t])):
+                        topics_on_watch[t][i].kill()
+                        log(f"Test mode: Killed thread {i} - topic {t} after 120s")
                 break
 
         except Exception as e:
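Why same-group processes parallelize a topic: Kafka's consumer-group protocol assigns each partition of a subscribed topic to exactly one member of the group, so N listener processes sharing the group id split the partitions among themselves. A minimal confluent-kafka sketch of the pattern each spawned process follows (topic_listener internals are not part of this diff, so the consumer settings, topic name, and handle_alert handler here are illustrative assumptions):

from confluent_kafka import Consumer

consumer = Consumer(
    {
        "bootstrap.servers": "localhost:9092",
        "group.id": "kowalski",  # shared by all processes spawned for the topic
        "auto.offset.reset": "earliest",
    }
)
consumer.subscribe(["ztf_20240112_programid1"])  # hypothetical topic name

while True:
    msg = consumer.poll(timeout=1.0)
    if msg is None:
        continue
    if msg.error():
        continue  # a real listener would log and handle the error
    handle_alert(msg.value())  # placeholder for the broker's alert processing

One consequence worth noting: group members beyond the topic's partition count receive no assignment, so raising processes_per_topic only helps up to the number of partitions per topic.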