-
-
Notifications
You must be signed in to change notification settings - Fork 435
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Have the dead letter also take events to avoid race conditions (#2267)
* Have the dead letter also take events to avoid race conditions * Ensure we take the event * Tests for event taken * Rename
- Loading branch information
Showing
5 changed files
with
172 additions
and
23 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
#!/usr/bin/env python3 | ||
import argparse | ||
import time | ||
|
||
from sqlalchemy import func | ||
|
||
from app.events.event_dispatcher import EventDispatcher | ||
from app.events.generated.event_pb2 import UserPlanChanged, EventContent | ||
from app.models import PartnerUser | ||
from app.db import Session | ||
|
||
parser = argparse.ArgumentParser( | ||
prog="Backfill alias", description="Update alias notes and backfill flag" | ||
) | ||
parser.add_argument( | ||
"-s", "--start_pu_id", default=0, type=int, help="Initial partner_user_id" | ||
) | ||
parser.add_argument( | ||
"-e", "--end_pu_id", default=0, type=int, help="Last partner_user_id" | ||
) | ||
|
||
args = parser.parse_args() | ||
pu_id_start = args.start_pu_id | ||
max_pu_id = args.end_pu_id | ||
if max_pu_id == 0: | ||
max_pu_id = Session.query(func.max(PartnerUser.id)).scalar() | ||
|
||
print(f"Checking partner user {pu_id_start} to {max_pu_id}") | ||
step = 100 | ||
updated = 0 | ||
start_time = time.time() | ||
with_premium = 0 | ||
for batch_start in range(pu_id_start, max_pu_id, step): | ||
partner_users = ( | ||
Session.query(PartnerUser).filter( | ||
PartnerUser.id >= batch_start, PartnerUser.id < batch_start + step | ||
) | ||
).all() | ||
for partner_user in partner_users: | ||
subscription_end = partner_user.user.get_active_subscription_end( | ||
include_partner_subscription=False | ||
) | ||
end_timestamp = None | ||
if subscription_end: | ||
with_premium += 1 | ||
end_timestamp = subscription_end.timestamp | ||
event = UserPlanChanged(plan_end_time=end_timestamp) | ||
EventDispatcher.send_event( | ||
partner_user.user, EventContent(user_plan_change=event) | ||
) | ||
Session.flush() | ||
updated += 1 | ||
Session.commit() | ||
elapsed = time.time() - start_time | ||
last_batch_id = batch_start + step | ||
time_per_alias = elapsed / (last_batch_id) | ||
remaining = max_pu_id - last_batch_id | ||
time_remaining = remaining / time_per_alias | ||
hours_remaining = time_remaining / 60.0 | ||
print( | ||
f"\PartnerUser {batch_start}/{max_pu_id} {updated} {hours_remaining:.2f} mins remaining" | ||
) | ||
print(f"With SL premium {with_premium}") |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
import arrow | ||
|
||
from app.db import Session | ||
from app.models import SyncEvent | ||
from events.event_source import DeadLetterEventSource, _DEAD_LETTER_THRESHOLD_MINUTES | ||
|
||
|
||
class EventCounter: | ||
def __init__(self): | ||
self.processed_events = 0 | ||
|
||
def on_event(self, event: SyncEvent): | ||
self.processed_events += 1 | ||
|
||
|
||
def setup_function(func): | ||
Session.query(SyncEvent).delete() | ||
|
||
|
||
def test_dead_letter_does_not_take_untaken_events(): | ||
source = DeadLetterEventSource(1) | ||
counter = EventCounter() | ||
threshold_time = arrow.utcnow().shift(minutes=-(_DEAD_LETTER_THRESHOLD_MINUTES) + 1) | ||
SyncEvent.create( | ||
content="test".encode("utf-8"), created_at=threshold_time, flush=True | ||
) | ||
SyncEvent.create( | ||
content="test".encode("utf-8"), taken_time=threshold_time, flush=True | ||
) | ||
events_processed = source.execute_loop(on_event=counter.on_event) | ||
assert len(events_processed) == 0 | ||
assert counter.processed_events == 0 | ||
|
||
|
||
def test_dead_letter_takes_untaken_events_created_older_than_threshold(): | ||
source = DeadLetterEventSource(1) | ||
counter = EventCounter() | ||
old_create = arrow.utcnow().shift(minutes=-_DEAD_LETTER_THRESHOLD_MINUTES - 1) | ||
SyncEvent.create(content="test".encode("utf-8"), created_at=old_create, flush=True) | ||
events_processed = source.execute_loop(on_event=counter.on_event) | ||
assert len(events_processed) == 1 | ||
assert events_processed[0].taken_time > old_create | ||
assert counter.processed_events == 1 | ||
|
||
|
||
def test_dead_letter_takes_taken_events_created_older_than_threshold(): | ||
source = DeadLetterEventSource(1) | ||
counter = EventCounter() | ||
old_taken = arrow.utcnow().shift(minutes=-_DEAD_LETTER_THRESHOLD_MINUTES - 1) | ||
SyncEvent.create(content="test".encode("utf-8"), taken_time=old_taken, flush=True) | ||
events_processed = source.execute_loop(on_event=counter.on_event) | ||
assert len(events_processed) == 1 | ||
assert events_processed[0].taken_time > old_taken | ||
assert counter.processed_events == 1 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters