Skip to content

Commit

Permalink
Make scheduling delays more idiomatic.
Browse files Browse the repository at this point in the history
  • Loading branch information
jdswinbank committed Jul 8, 2019
1 parent 40c4538 commit 9a8170f
Showing 1 changed file with 17 additions and 48 deletions.
65 changes: 17 additions & 48 deletions bin/sendAlertStream.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,52 +25,13 @@
content.
"""

from __future__ import print_function
import argparse
import glob
import time
import asyncio
from lsst.alert.stream import alertProducer
from lsst.alert.packet import retrieve_alerts


@asyncio.coroutine
def delay(wait_sec, function, *args):
"""Sleep for a given time before calling a function.
Parameters
----------
wait_sec
Time in seconds to sleep before calling `function`.
function
Function to return after sleeping.
"""
yield from asyncio.sleep(wait_sec)
return function(*args)


@asyncio.coroutine
def schedule_delays(eventloop, function, argslist, interval=39):
"""Schedule delayed calls of functions at a repeating interval.
Parameters
----------
eventloop
Event loop returned by asyncio.get_event_loop().
function
Function to be scheduled.
argslist
List of inputs for function to loop over.
interval
Time in seconds between calls.
"""
counter = 1
for arg in argslist:
wait_time = interval - (time.time() % interval)
yield from asyncio.ensure_future(delay(wait_time, function, arg))
print('visits finished: {} \t time: {}'.format(counter, time.time()))
counter += 1
eventloop.stop()


def main():
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('broker', type=str,
Expand All @@ -89,23 +50,31 @@ def main():
files.sort()

def send_visit(f):
print('visit:', f[15:20], '\ttime:', time.time())
start_time = time.time()
print('visit:', f[15:20], '\ttime:', start_time)
# Load alert contents
with open(f, mode='rb') as file_data:
# TODO replace Avro files with visits having better S/N cut
# for now, limit to first 10,000 alerts (current have ~70,000)
schema, alert_packets = retrieve_alerts(file_data)
alert_count = 0
for record in alert_packets:
if alert_count < 10000:
streamProducer.send(schema, record)
alert_count += 1
else:
break
ALERTS_TO_SEND = 10000
for alert_count, record in enumerate(alert_packets):
if alert_count < ALERTS_TO_SEND:
streamProducer.send(schema, record)
else:
break
streamProducer.flush()
print(f"Sent {alert_count} alerts in {time.time() - start_time}s.")

# Schedule visits to be send every `interval` seconds.
loop = asyncio.get_event_loop()
asyncio.ensure_future(schedule_delays(loop, send_visit, files))
interval = 39 # Seconds between visits
for delay, filename in zip(range(0, interval * len(files), interval),
files):
loop.call_later(delay, send_visit, filename)

# Shut down the event loop after the last visit has been sent.
loop.call_later(delay, loop.stop)
loop.run_forever()
loop.close()

Expand Down

0 comments on commit 9a8170f

Please sign in to comment.