Skip to content

Commit

Permalink
debug scripts/kafka_consumer_iterate_XPD_v2.py
Browse files Browse the repository at this point in the history
  • Loading branch information
XPD_Operator committed Aug 15, 2024
1 parent fc16ef1 commit 588fee4
Show file tree
Hide file tree
Showing 18 changed files with 21 additions and 7 deletions.
1 change: 1 addition & 0 deletions scripts/.~lock.inputs_qserver_kafka_v2.xlsx#
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
,xf28id2,xf28id2-ws3.nsls2.bnl.local,15.08.2024 18:56,file:///home/xf28id2/.config/libreoffice/4;
12 changes: 10 additions & 2 deletions scripts/_LDRD_Kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,13 @@ def save_kafka_dict(self, home_path, reset_uid_catalog=True):

date, ttime = de._readable_time(self.metadata_dic['time'])
json_fn = f'{date}-{ttime}_{self.uid[0:8]}.json'

## Make directory for home_path folder
try:
os.mkdir(home_path)
except FileExistsError:
pass

json_path = os.path.join(home_path, json_fn)

key_to_save = [
Expand All @@ -171,7 +178,7 @@ def save_kafka_dict(self, home_path, reset_uid_catalog=True):



## Reset attributes of keys in _kafka_process() to empty lists for next event
## Reset attributes of key in keys to empty lists for next event
def reset_kafka_process(self, keys):
for key in keys:
setattr(self, key, [])
Expand Down Expand Up @@ -362,7 +369,7 @@ def macro_02_get_uid(self):



def macro_03_stop_queue_uid(sefl, RM):
def macro_03_stop_queue_uid(self, RM, message):
"""macro to stop queue and get raw data uid, used in kafka consumer
while taking a Uv-Vis, no X-ray data but still do analysis of pdfstream
Expand All @@ -374,6 +381,7 @@ def macro_03_stop_queue_uid(sefl, RM):
Args:
RM (REManagerAPI): Run Engine Manager API.
message (dict): message in RE document
"""
inst1 = BInst("queue_stop")
RM.item_add(inst1, pos='front')
Expand Down
6 changes: 3 additions & 3 deletions scripts/_synthesis_queue_RM.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,9 @@ def synthesis_queue_xlsx(parameter_obj):


## 4.0 Configure area detector in Qserver
if det1 == 'pe1c':
if det1 == 'pe1c' or det1 == 'pe2c':
scanplan = BPlan('configure_area_det',
det='pe1c',
det=det1,
exposure=1,
acq_time=det1_frame_rate)
RM.item_add(scanplan, pos=pos)
Expand Down Expand Up @@ -193,7 +193,7 @@ def synthesis_queue_xlsx(parameter_obj):
# scanplan = BPlan('print_glbl_qserver')
# RM.item_add(scanplan, pos=pos)

if det1 == 'pe1c':
if det1 == 'pe1c' or det1 == 'pe2c':
## 6.1 Configure area detector in Qserver
scanplan = BPlan('configure_area_det',
det=det1,
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Binary file modified scripts/inputs_qserver_kafka_v2.xlsx
Binary file not shown.
9 changes: 7 additions & 2 deletions scripts/kafka_consumer_iterate_XPD_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@
plt.rcParams["figure.raise_window"] = False

# xlsx_fn = '/home/xf28id2/Documents/ChengHung/inputs_qserver_kafka_v2.xlsx'
xlsx_fn = '/home/xf28id2/.ipython/profile_collection/scripts/inputs_qserver_kafka_v2.xlsx'
# xlsx_fn = '/home/xf28id2/.ipython/profile_collection/scripts/inputs_qserver_kafka_v2.xlsx'
xlsx_fn = '/home/xf28id2/.ipython/profile_collection_ldrd20-31/scripts/inputs_qserver_kafka_v2.xlsx'

## Input varaibales for Qserver, reading from xlsx_fn by given sheet name
qserver_process = LK.xlsx_to_inputs(LK._qserver_inputs(), xlsx_fn=xlsx_fn, sheet_name='qserver_test')
Expand Down Expand Up @@ -238,7 +239,7 @@ def print_message(consumer, doctype, doc):
print(f"\n\n\n{datetime.datetime.now().isoformat()} documents {name}\n"
f"contents: {pprint.pformat(message)}")

kafka_process.macro_03_stop_queue_uid(RM)
kafka_process.macro_03_stop_queue_uid(RM, message)


############## (name == 'stop') and (type(kafka_process.uid) is str) ##############
Expand Down Expand Up @@ -414,7 +415,11 @@ def print_message(consumer, doctype, doc):
#####################################################################################
kafka_process.macro_17_add_queue(stream_name, qserver_process, RM)


if (name == 'stop') and ('fluorescence' in kafka_process.stream_list):
kafka_process.save_kafka_dict('/home/xf28id2/Documents/ChengHung/kafka_dict_log')


kafka_config = _read_bluesky_kafka_config_file(config_file_path="/etc/bluesky/kafka.yml")

# this consumer should not be in a group with other consumers
Expand Down
File renamed without changes.

0 comments on commit 588fee4

Please sign in to comment.