Skip to content

Commit

Permalink
Add streaming to recorder
Browse files Browse the repository at this point in the history
  • Loading branch information
Joseph McKinsey committed Feb 28, 2024
1 parent 936bdb4 commit 04e2a8f
Showing 1 changed file with 9 additions and 3 deletions.
12 changes: 9 additions & 3 deletions recorder/record_subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,11 @@ def run(self):
start = True
granted_time = h.helicsFederateRequestTime(self.vfed, h.HELICS_TIME_MAXTIME)

with pa.OSFile(self.feather_filename, "wb") as sink:
with pa.OSFile(self.feather_filename, "wb") as sink, pa.OSFile(
self.feather_filename + ".stream", "wb"
) as streamsink:
writer = None
streamwriter = None
while granted_time < h.HELICS_TIME_MAXTIME:
logger.info("start time: " + str(datetime.now()))
logger.debug(granted_time)
Expand All @@ -83,10 +86,12 @@ def run(self):
schema_elements.append(("time", pa.string()))
schema = pa.schema(schema_elements)
writer = pa.ipc.new_file(sink, schema)
streamwriter = pa.ipc.new_stream(streamsink, schema)
start = False
cnt = 0

writer.write_batch(pa.RecordBatch.from_pylist([measurement_dict]))
record_batch = pa.RecordBatch.from_pylist([measurement_dict])
writer.write_batch(record_batch)
streamwriter.write_batch(record_batch)

granted_time = h.helicsFederateRequestTime(
self.vfed, h.HELICS_TIME_MAXTIME
Expand All @@ -95,6 +100,7 @@ def run(self):

if writer is not None:
writer.close()
streamwriter.close()
data = pd.read_feather(self.feather_filename)
data.to_csv(self.csv_filename, header=True, index=False)
self.destroy()
Expand Down

0 comments on commit 04e2a8f

Please sign in to comment.