diff --git a/bin/sendAlertStream.py b/bin/sendAlertStream.py index 781286a..903e297 100644 --- a/bin/sendAlertStream.py +++ b/bin/sendAlertStream.py @@ -25,7 +25,6 @@ content. """ -from __future__ import print_function import argparse import glob import time @@ -33,44 +32,6 @@ from lsst.alert.stream import alertProducer from lsst.alert.packet import retrieve_alerts - -@asyncio.coroutine -def delay(wait_sec, function, *args): - """Sleep for a given time before calling a function. - Parameters - ---------- - wait_sec - Time in seconds to sleep before calling `function`. - function - Function to return after sleeping. - """ - yield from asyncio.sleep(wait_sec) - return function(*args) - - -@asyncio.coroutine -def schedule_delays(eventloop, function, argslist, interval=39): - """Schedule delayed calls of functions at a repeating interval. - Parameters - ---------- - eventloop - Event loop returned by asyncio.get_event_loop(). - function - Function to be scheduled. - argslist - List of inputs for function to loop over. - interval - Time in seconds between calls. - """ - counter = 1 - for arg in argslist: - wait_time = interval - (time.time() % interval) - yield from asyncio.ensure_future(delay(wait_time, function, arg)) - print('visits finished: {} \t time: {}'.format(counter, time.time())) - counter += 1 - eventloop.stop() - - def main(): parser = argparse.ArgumentParser(description=__doc__) parser.add_argument('broker', type=str, @@ -89,23 +50,31 @@ def main(): files.sort() def send_visit(f): - print('visit:', f[15:20], '\ttime:', time.time()) + start_time = time.time() + print('visit:', f[15:20], '\ttime:', start_time) # Load alert contents with open(f, mode='rb') as file_data: # TODO replace Avro files with visits having better S/N cut # for now, limit to first 10,000 alerts (current have ~70,000) schema, alert_packets = retrieve_alerts(file_data) - alert_count = 0 - for record in alert_packets: - if alert_count < 10000: - streamProducer.send(schema, record) - alert_count += 1 - else: - break + ALERTS_TO_SEND = 10000 + for alert_count, record in enumerate(alert_packets): + if alert_count < ALERTS_TO_SEND: + streamProducer.send(schema, record) + else: + break streamProducer.flush() + print(f"Sent {alert_count} alerts in {time.time() - start_time}s.") + # Schedule visits to be send every `interval` seconds. loop = asyncio.get_event_loop() - asyncio.ensure_future(schedule_delays(loop, send_visit, files)) + interval = 39 # Seconds between visits + for delay, filename in zip(range(0, interval * len(files), interval), + files): + loop.call_later(delay, send_visit, filename) + + # Shut down the event loop after the last visit has been sent. + loop.call_later(delay, loop.stop) loop.run_forever() loop.close()