
Commit

test macros in bluesky
XPD Operator committed Aug 12, 2024
1 parent 9f3c89a commit a59b0a9
Showing 6 changed files with 317 additions and 491 deletions.
203 changes: 107 additions & 96 deletions scripts/_LDRD_Kafka.py
@@ -3,21 +3,20 @@
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import importlib

import _data_export as de
from _plot_helper import plot_uvvis
import _data_analysis as da
# import _synthesis_queue_RM as sq
import _pdf_calculator as pc
import _get_pdf as gp
from scipy import integrate

import torch
# from prepare_agent_pdf import build_agen
from diffpy.pdfgetx import PDFConfig
# from tiled.client import from_uri
# from diffpy.pdfgetx import PDFConfig
# import _get_pdf as gp
from tiled.client import from_uri

# from bluesky_queueserver_api.zmq import REManagerAPI
from bluesky_queueserver_api.zmq import REManagerAPI
from bluesky_queueserver_api import BPlan, BInst
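This commit activates the queue-server and tiled imports that were previously commented out. A minimal sketch of how they are typically wired together; the addresses, plan, and URI below are placeholders, not values from this repo:

```python
from bluesky_queueserver_api import BPlan
from bluesky_queueserver_api.zmq import REManagerAPI
from tiled.client import from_uri

# Placeholder addresses; the real values come from the xlsx inputs
# (zmq_control_addr / zmq_info_addr / sandbox_uri).
RM = REManagerAPI(zmq_control_addr='tcp://localhost:60615',
                  zmq_info_addr='tcp://localhost:60625')
RM.item_add(BPlan('count', ['det'], num=3))  # queue a hypothetical plan
RM.queue_start()                             # start executing the queue

# Turn a URI string into a tiled client, as __init__ does below.
sandbox_tiled_client = from_uri('http://localhost:8000')
```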


@@ -31,7 +30,7 @@ def _qserver_inputs():
'sample',
'wait_dilute', 'mixer', 'wash_tube', 'resident_t_ratio',
'rate_unit', 'uvvis_config', 'perkin_config',
'set_target_list', 'infuse_rates',
'auto_set_target_list', 'set_target_list', 'infuse_rates',
]

return qserver_list
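The new `auto_set_target_list` key joins the parameters read from the xlsx sheet. For orientation, a plausible minimal sketch of how `dic_to_inputs` maps such keys onto attributes; the actual implementation lives earlier in this file and is not shown in this diff:

```python
class dic_to_inputs:
    """Plausible sketch only: expose each xlsx parameter as an attribute.
    Every attribute defaults to a list, matching the comment in
    xlsx_to_inputs below."""
    def __init__(self, parameters_dict, parameters_list):
        for key in parameters_list:
            setattr(self, key, parameters_dict.get(key, []))
```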
@@ -87,54 +86,60 @@ def __init__(self, parameters_dict, parameters_list):


class xlsx_to_inputs():
def __init__(self, parameters_list, xlsx_fn, sheet_name='inputs'):
def __init__(self, parameters_list, xlsx_fn, sheet_name='inputs', is_kafka=False):
self.parameters_list = parameters_list
self.from_xlsx = xlsx_fn
self.sheet_name = sheet_name

## set attributes of keys in _kafka_process() for processing data
for key in _kafka_process():
setattr(self, key, [])

## set attributes of keys in self.parameters_list for input parameters
self.print_dic = de._read_input_xlsx(self.from_xlsx, sheet_name=self.sheet_name)
## Every attribute in self.inputs defaults to a list!!!
self.inputs = dic_to_inputs(self.print_dic, self.parameters_list)

try:
## Assign Agent to self.agent
self.agent = build_agen(
peak_target=self.inputs.peak_target[0],
agent_data_path=self.inputs.agent_data_path[0])

## self.inputs.sandbox_uri[0] is just the uri of sandbox
## so, turn uri into client and assign it to self.sandbox_tiled_client
if type(self.inputs.sandbox_uri[0]) is str:
self.sandbox_tiled_client = from_uri(self.inputs.sandbox_uri[0])


## Use glob.glob to find the complete path of cfg_fn, bkg_fn, iq_fn, cif_fn, gr_fn
# fn_TBD = ['cfg_fn', 'bkg_fn', 'iq_fn', 'cif_fn', 'gr_fn']
for fn in self.inputs.fn_TBD:

path = getattr(self.inputs, fn)[0]
if path in self.parameters_list:
path = getattr(self.inputs, path)[0]

ff = getattr(self.inputs, fn)[1]

fn_glob = glob.glob(os.path.join(path, ff))

for i in fn_glob:
getattr(self.inputs, fn).append(i)

## Making rate_label_dic by rate_label_dic_key and rate_label_dic_value
self.rate_label_dic = {}
for key, value in zip(self.inputs.rate_label_dic_key, rate_label_dic_value):
self.rate_label_dic.update({key: value})

except AttributeError:
pass
if is_kafka:
## set attributes of keys in _kafka_process() for processing data
for key in _kafka_process():
setattr(self, key, [])

try:
## Assign Agent to self.agent
self.agent
print('\n***** Start to initialize blop agent ***** \n')
build_agent = importlib.import_module("prepare_agent_pdf").build_agent
self.agent = build_agent(
peak_target=self.inputs.peak_target[0],
agent_data_path=self.inputs.agent_data_path[0])
print(f'\n***** Initialized blop agent at {self.agent} ***** \n')

## self.inputs.sandbox_uri[0] is just the uri of sandbox
## so, turn uri into client and assign it to self.sandbox_tiled_client
if type(self.inputs.sandbox_uri[0]) is str:
self.sandbox_tiled_client = from_uri(self.inputs.sandbox_uri[0])


## Use glob.glob to find the complete path of cfg_fn, bkg_fn, iq_fn, cif_fn, gr_fn
# fn_TBD = ['cfg_fn', 'bkg_fn', 'iq_fn', 'cif_fn', 'gr_fn']
for fn in self.inputs.fn_TBD:

path = getattr(self.inputs, fn)[0]
if path in self.parameters_list:
path = getattr(self.inputs, path)[0]

ff = getattr(self.inputs, fn)[1]

fn_glob = glob.glob(os.path.join(path, ff))

for i in fn_glob:
getattr(self.inputs, fn).append(i)

## Making rate_label_dic by rate_label_dic_key and rate_label_dic_value
self.rate_label_dic = {}
for key, value in zip(self.inputs.rate_label_dic_key, self.inputs.rate_label_dic_value):
self.rate_label_dic.update({key: value})

except AttributeError:
pass
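The reworked `__init__` gates all Kafka-side setup behind the new `is_kafka` flag and defers the agent import to `importlib.import_module`, so plain xlsx parsing no longer pays for torch/blop at import time. The same deferred-import pattern reappears below for `_synthesis_queue_RM`. A minimal sketch, assuming `prepare_agent_pdf` is importable from the working directory:

```python
import importlib

def lazy_build_agent(peak_target, agent_data_path):
    # Resolve the heavy module at call time rather than import time.
    build_agent = importlib.import_module('prepare_agent_pdf').build_agent
    return build_agent(peak_target=peak_target,
                       agent_data_path=agent_data_path)

# agent = lazy_build_agent(520, '/path/to/agent_data')  # hypothetical values
```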


def auto_rate_list(self, pump_list, new_points, fix_Br_ratio):
@@ -163,7 +168,7 @@ def auto_rate_list(self, pump_list, new_points, fix_Br_ratio):



def macro_agent(self, qserver_process, RM, check_target=False):
def macro_agent(self, qserver_process, RM, check_target=False, use_OAm=False, is_1st=False):
"""macro to build agent, make optimization, and update agent_data
This macro will
@@ -189,10 +194,14 @@ def macro_agent(self, qserver_process, RM, check_target=False):
peak_target = self.inputs.peak_target[0]
peak_tolerance = self.inputs.peak_target[1]

self.agent = build_agen(
peak_target = peak_target,
agent_data_path = self.inputs.agent_data_path[0],
use_OAm = True)
if is_1st:
pass
else:
build_agent = importlib.import_module("prepare_agent_pdf").build_agent
self.agent = build_agent(
peak_target = peak_target,
agent_data_path = self.inputs.agent_data_path[0],
use_OAm = use_OAm)

if len(self.agent.table) < 2:
acq_func = "qr"
@@ -210,7 +219,7 @@
if i in new_points['points'].keys():
res_values.append(new_points['points'][i][0])
x_tensor = torch.tensor(res_values)
posterior = agent.posterior(x_tensor)
posterior = self.agent.posterior(x_tensor)
post_mean = posterior.mean.tolist()[0]
post_stddev = posterior.stddev.tolist()[0]
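The posterior bookkeeping above can be exercised in isolation. A sketch assuming the agent exposes a botorch-style `posterior(X)` returning an object with `.mean` and `.stddev` tensors:

```python
import torch

def summarize_posterior(agent, new_points, keys):
    # One value per input dimension, in the order the agent expects.
    res_values = [new_points['points'][k][0] for k in keys
                  if k in new_points['points']]
    x_tensor = torch.tensor(res_values)
    # Query the surrogate model at this single point.
    posterior = agent.posterior(x_tensor)
    return posterior.mean.tolist()[0], posterior.stddev.tolist()[0]
```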

@@ -226,15 +235,15 @@
self.agent_data.update({'agent_target': agent_target})
self.agent_data.update({'posterior_mean': post_mean})
self.agent_data.update({'posterior_stddev': post_stddev})

peak_diff = abs(self.PL_fitting['peak_emission'] - peak_target)
meet_target = (peak_diff <= peak_tolerance)

if check_target and meet_target:
peak_diff = abs(self.PL_fitting['peak_emission'] - peak_target)
meet_target = (peak_diff <= peak_tolerance)
print(f'\nTarget peak: {self.inputs.peak_target[0]} nm vs. Current peak: {self.PL_fitting["peak_emission"]} nm\n')
print(f'\nReach the target, stop iteration, stop all pumps, and wash the loop.\n')

### Stop all infusing pumps and wash loop
sq = importlib.import_module("_synthesis_queue_RM")
sq.wash_tube_queue2(qin.pump_list, qin.wash_tube, 'ul/min',
zmq_control_addr=qin.zmq_control_addr[0],
zmq_info_addr=qin.zmq_info_addr[0])
@@ -309,7 +318,7 @@ def macro_02_get_uid(self):
stream_list = self.tiled_client[self.uid].metadata['summary']['stream_names']
## Reset self.stream_list to an empty list
self.stream_list = []
for stream_name in syringe_list:
for stream_name in stream_list:
self.stream_list.append(stream_name)


@@ -336,7 +345,7 @@ def macro_03_stop_queue_uid(sefl, RM):
stream_list = list(message['num_events'].keys())
## Reset self.stream_list to an empty list
self.stream_list = []
for stream_name in syringe_list:
for stream_name in stream_list:
self.stream_list.append(stream_name)


@@ -370,46 +379,46 @@ def macro_04_dummy_pdf(sefl):



def macro_05_iq_to_gr(self, beamline_acronym):
"""macro to condcut data reduction from I(Q) to g(r), used in kafka consumer
# def macro_05_iq_to_gr(self, beamline_acronym):
# """macro to condcut data reduction from I(Q) to g(r), used in kafka consumer

This macro will
1. Generate a filename for g(r) data by using metadata of stream_name == fluorescence
2. Read pdf config file from self.inputs.cfg_fn[-1]
3. Read pdf background file from self.inputs.bkg_fn[-1]
4. Generate s(q), f(q), g(r) data by gp.transform_bkg() and save in self.inputs.iq_to_gr_path[0]
5. Read saved g(r) into pd.DataFrame and save again to remove the headers
6. Update g(r) data path and data frame to self.gr_data
self.gr_data[0]: gr_data (path)
self.gr_data[1]: gr_df
Args:
beamline_acronym (str): catalog name for tiled to access data
"""
# Grab metadata from stream_name = fluorescence for naming gr file
fn_uid = de._fn_generator(self.uid, beamline_acronym=beamline_acronym)
gr_fn = f'{fn_uid}_scattering.gr'

### dummy test, e.g., CsPbBr2
if self.inputs.dummy_pdf[0]:
gr_fn = f'{self.inputs.iq_fn[-1][:-4]}.gr'

# Build pdf config file from scratch
pdfconfig = PDFConfig()
pdfconfig.readConfig(self.inputs.cfg_fn[-1])
pdfconfig.backgroundfiles = self.inputs.bkg_fn[-1]
sqfqgr_path = gp.transform_bkg(pdfconfig, self.iq_data['array'], output_dir=self.inputs.iq_to_gr_path[0],
plot_setting={'marker':'.','color':'green'}, test=True,
gr_fn=gr_fn)
gr_data = sqfqgr_path['gr']

## Remove headers by reading gr_data into pd.Dataframe and save again
gr_df = pd.read_csv(gr_data, skiprows=26, names=['r', 'g(r)'], sep =' ')
gr_df.to_csv(gr_data, index=False, header=False, sep =' ')

self.gr_data = []
self.gr_data.append(gr_data)
self.gr_data.append(gr_df)
# This macro will
# 1. Generate a filename for g(r) data by using metadata of stream_name == fluorescence
# 2. Read pdf config file from self.inputs.cfg_fn[-1]
# 3. Read pdf background file from self.inputs.bkg_fn[-1]
# 4. Generate s(q), f(q), g(r) data by gp.transform_bkg() and save in self.inputs.iq_to_gr_path[0]
# 5. Read saved g(r) into pd.DataFrame and save again to remove the headers
# 6. Update g(r) data path and data frame to self.gr_data
# self.gr_data[0]: gr_data (path)
# self.gr_data[1]: gr_df

# Args:
# beamline_acronym (str): catalog name for tiled to access data
# """
# # Grab metadata from stream_name = fluorescence for naming gr file
# fn_uid = de._fn_generator(self.uid, beamline_acronym=beamline_acronym)
# gr_fn = f'{fn_uid}_scattering.gr'

# ### dummy test, e.g., CsPbBr2
# if self.inputs.dummy_pdf[0]:
# gr_fn = f'{self.inputs.iq_fn[-1][:-4]}.gr'

# # Build pdf config file from scratch
# pdfconfig = PDFConfig()
# pdfconfig.readConfig(self.inputs.cfg_fn[-1])
# pdfconfig.backgroundfiles = self.inputs.bkg_fn[-1]
# sqfqgr_path = gp.transform_bkg(pdfconfig, self.iq_data['array'], output_dir=self.inputs.iq_to_gr_path[0],
# plot_setting={'marker':'.','color':'green'}, test=True,
# gr_fn=gr_fn)
# gr_data = sqfqgr_path['gr']

# ## Remove headers by reading gr_data into pd.Dataframe and save again
# gr_df = pd.read_csv(gr_data, skiprows=26, names=['r', 'g(r)'], sep =' ')
# gr_df.to_csv(gr_data, index=False, header=False, sep =' ')

# self.gr_data = []
# self.gr_data.append(gr_data)
# self.gr_data.append(gr_df)
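The entire I(Q) → g(r) macro is disabled in this commit. For reference, its header-stripping step can be reproduced standalone; a sketch assuming a `.gr` file whose first 26 lines are pdfgetx metadata:

```python
import pandas as pd

gr_data = 'example_scattering.gr'  # hypothetical path
# Skip the 26-line header, keep the two data columns, and rewrite the file
# headerless so downstream fitting code sees bare (r, g(r)) pairs.
gr_df = pd.read_csv(gr_data, skiprows=26, names=['r', 'g(r)'], sep=' ')
gr_df.to_csv(gr_data, index=False, header=False, sep=' ')
```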



@@ -617,7 +626,7 @@ def macro_10_good_bad(self, stream_name):
self.qepro_dic,
self.metadata_dic,
key_height=self.inputs.key_height[0],
distance=self.inputs..distance[0],
distance=self.inputs.distance[0],
height=self.inputs.height[0],
dummy_test=self.inputs.dummy_kafka[0],
percent_range=[30, 100])
@@ -912,6 +921,7 @@ def macro_17_add_queue(self, stream_name, qserver_process, RM):
print('*** qserver aborted due to too many bad scans, please check setup ***\n')

### Stop all infusing pumps and wash loop
sq = importlib.import_module("_synthesis_queue_RM")
sq.wash_tube_queue2(qin.pump_list, qin.wash_tube, 'ul/min',
zmq_control_addr=qin.zmq_control_addr[0],
zmq_info_addr=qin.zmq_info_addr[0])
Expand Down Expand Up @@ -959,6 +969,7 @@ def macro_17_add_queue(self, stream_name, qserver_process, RM):

qin.sample = de._auto_name_sample(rate_list, prefix=qin.prefix[1:])
qin.infuse_rates = rate_list
sq = importlib.import_module("_synthesis_queue_RM")
sq.synthesis_queue_xlsx(qserver_process)

else:
33 changes: 22 additions & 11 deletions scripts/_synthesis_queue_RM.py
@@ -23,6 +23,7 @@ def synthesis_queue_xlsx(parameter_obj):

syringe_list = qsp.syringe_list
pump_list = qsp.pump_list
auto_set_target_list = qsp.auto_set_target_list[0]
set_target_list = qsp.set_target_list
target_vol_list = qsp.target_vol_list
rate_list = qsp.infuse_rates
@@ -60,12 +61,16 @@ def synthesis_queue_xlsx(parameter_obj):
else:
rate_list = rate_list.tolist()

set_target_list = np.asarray(set_target_list, dtype=np.int8)
if len(set_target_list.shape) == 1:
set_target_list = set_target_list.reshape(1, set_target_list.shape[0])
set_target_list = set_target_list.tolist()
if auto_set_target_list:
set_target_list = np.zeros([len(rate_list), len(pump_list)]).tolist()

else:
set_target_list = set_target_list.tolist()
set_target_list = np.asarray(set_target_list, dtype=np.int8)
if len(set_target_list.shape) == 1:
set_target_list = set_target_list.reshape(1, set_target_list.shape[0])
set_target_list = set_target_list.tolist()
else:
set_target_list = set_target_list.tolist()
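The new `auto_set_target_list` switch short-circuits the target handling: auto mode zeroes the targets with one row per rate entry, otherwise the user-supplied list is normalized to a 2-D shape. A runnable sketch with made-up pump and rate values:

```python
import numpy as np

pump_list = ['pump_OAm', 'pump_Br', 'pump_Pb']  # made-up names
rate_list = [[10, 20, 30], [15, 25, 35]]        # two rows of infuse rates

auto_set_target_list = False
if auto_set_target_list:
    # Auto mode: all-zero targets, shaped (num_rate_rows, num_pumps).
    set_target_list = np.zeros([len(rate_list), len(pump_list)]).tolist()
else:
    set_target_list = np.asarray([0, 1, 0], dtype=np.int8)  # one row supplied
    if len(set_target_list.shape) == 1:
        # Promote a single row to shape (1, num_pumps).
        set_target_list = set_target_list.reshape(1, set_target_list.shape[0])
    set_target_list = set_target_list.tolist()

print(set_target_list)  # [[0, 1, 0]]
```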


# 0. stop infusing for all pumps
@@ -238,7 +243,8 @@ def synthesis_queue_xlsx(parameter_obj):
## Arrange tasks of for PQDs synthesis
def synthesis_queue(
syringe_list,
pump_list,
pump_list,
auto_set_target_list,
set_target_list,
target_vol_list,
rate_list,
@@ -275,12 +281,17 @@ def synthesis_queue(
else:
rate_list = rate_list.tolist()

set_target_list = np.asarray(set_target_list, dtype=np.int8)
if len(set_target_list.shape) == 1:
set_target_list = set_target_list.reshape(1, set_target_list.shape[0])
set_target_list = set_target_list.tolist()

if auto_set_target_list[0]:
set_target_list = np.zeros([len(rate_list), len(pump_list)]).tolist()

else:
set_target_list = set_target_list.tolist()
set_target_list = np.asarray(set_target_list, dtype=np.int8)
if len(set_target_list.shape) == 1:
set_target_list = set_target_list.reshape(1, set_target_list.shape[0])
set_target_list = set_target_list.tolist()
else:
set_target_list = set_target_list.tolist()


# 0. stop infusing for all pumps
Binary file modified scripts/inputs_qserver_kafka_v2.xlsx
Binary file not shown.
