From c2f596b64358ca6f60ff61462edb09936846191b Mon Sep 17 00:00:00 2001 From: maffettone Date: Fri, 3 Dec 2021 16:12:29 -0500 Subject: [PATCH 1/7] Add initial basic agents for capilary quality experiments --- federation/quality/__init__.py | 1 + federation/quality/base.py | 146 +++++++++++++++++++++++++++++++++ 2 files changed, 147 insertions(+) create mode 100644 federation/quality/__init__.py create mode 100644 federation/quality/base.py diff --git a/federation/quality/__init__.py b/federation/quality/__init__.py new file mode 100644 index 0000000..17efdf2 --- /dev/null +++ b/federation/quality/__init__.py @@ -0,0 +1 @@ +"""Module of agents that determine quality of samples and next action. Designed with brackets of capilaries in mind.""" diff --git a/federation/quality/base.py b/federation/quality/base.py new file mode 100644 index 0000000..ff3cf59 --- /dev/null +++ b/federation/quality/base.py @@ -0,0 +1,146 @@ +from typing import Callable +import numpy as np +from collections import Counter + + +class Agent: + def __init__( + self, n_samples: int, quality_function: Callable[[np.array], int] = None + ): + """ + Agent class that retains a counter of measurements at each sample, + the index of the current sample, and a quality array with the current sample quality + of each sample. + + Quality should be given as a natural number starting from 1 to the trained maximum. + A regular default is to use {1: bad, 2: mediocre, 3:good}. + It is expected that the sample quality can and should improve over time, and will be + updated in the `tell` method as provided by the document stream. + + Parameters + ---------- + n_samples: int + Number of samples in measurement + """ + self.counter = Counter() # Counter of measurements + self.current = None # Current sample + self.n_samples = n_samples + self.cum_sum = dict() + self.quality = np.zeros(self.n_samples) # Current understood quality + if quality_function is None: + self.quality_function = self._default_quality + else: + self.quality_function = quality_function + + @staticmethod + def _default_quality(arr) -> int: + """Uses a proxy for Signal to Noise to break into 3 tiers.""" + SNR = np.max(arr) / np.mean(arr) + if SNR < 2: + return 1 + elif SNR < 3: + return 2 + else: + return 3 + + def tell(self, x=None, y=None): + """ + Tell's based on current sample only + Parameters + ---------- + x: float, array + y: float, array + + Returns + ------- + + """ + if x is not None: + self.current = x + self.counter[self.current] += 1 + if self.current in self.cum_sum: + self.cum_sum[self.current] += y + else: + self.cum_sum[self.current] = y + self.quality[self.current] = self.quality_function(self.cum_sum[self.current]) + + def tell_many(self, xs, ys): + """Useful for reload""" + for x, y in zip(xs, ys): + self.counter[x] += 1 + if self.current in self.cum_sum: + self.cum_sum[x] += y + else: + self.cum_sum[x] = y + for i in range(self.n_samples): + self.quality[i] = self.quality_function(self.cum_sum[i]) + + def ask(self, n): + raise NotImplementedError + + +class SequentialAgent(Agent): + def __init__(self, n_samples): + """ + Sequential agent that just keeps on going. + + Agent parent class retains a counter of measurements at each sample, + the index of the current sample, and a quality array with the current sample quality + of each sample. + + Quality should be given as a natural number starting from 1 to the trained maximum. + A regular default is to use {1: bad, 2: mediocre, 3:good}. + It is expected that the sample quality can and should improve over time, and will be + updated in the `tell` method as provided by the document stream. + + Parameters + ---------- + n_samples: int + Number of samples in measurement + """ + super().__init__(n_samples) + + def ask(self, n): + return (self.current + 1) % self.n_samples + + +class MarkovAgent(Agent): + def __init__(self, n_samples, max_quality, min_quality=1, seed=None): + """ + Stochastic agent that moves preferentially to worse seeds. + Queries a random transition and accepts with a probability of badness divided by range of quality. + + Agent parent class retains a counter of measurements at each sample, + the index of the current sample, and a quality array with the current sample quality + of each sample. + + Quality should be given as a natural number starting from 1 to the trained maximum. + A regular default is to use {1: bad, 2: mediocre, 3:good}. + It is expected that the sample quality can and should improve over time, and will be + updated in the `tell` method as provided by the document stream. + + Parameters + ---------- + n_samples: int + Number of samples in measurement + max_quality: int + Maximum quality value + min_quality: int + Minimum quality value. Should be 1 unless you're doing something strange. + """ + super().__init__(n_samples) + self.max_quality = max_quality + self.min_quality = min_quality + self.rng = np.random.default_rng(seed) + + def ask(self, n): + accept = False + proposal = None + while not accept: + proposal = self.rng.integers(self.n_samples) + if self.rng.random() < (self.max_quality - self.quality[proposal]) / ( + self.max_quality - self.min_quality + ): + accept = True + + return proposal From de72b98b5b3fe44c873f653713885d517f9323d3 Mon Sep 17 00:00:00 2001 From: maffettone Date: Fri, 3 Dec 2021 16:27:59 -0500 Subject: [PATCH 2/7] Add initial basic agents for capilary quality experiments --- federation/plumbing/queue_server.py | 56 +++++++++++++++++++++++++++++ federation/quality/base.py | 2 ++ 2 files changed, 58 insertions(+) create mode 100644 federation/plumbing/queue_server.py diff --git a/federation/plumbing/queue_server.py b/federation/plumbing/queue_server.py new file mode 100644 index 0000000..3bae1f9 --- /dev/null +++ b/federation/plumbing/queue_server.py @@ -0,0 +1,56 @@ +from queue import Queue +from event_model import RunRouter +from bluesky_adaptive.utils import extract_event_page +from bluesky_adaptive.recommendations import NoRecommendation + + +def index_reccomender_factory( + adaptive_object, + sample_index_key, + sample_data_key, + *, + queue=None, + cache_callback=None, +): + if queue is None: + queue = Queue() + + if cache_callback is None: + prelim_callbacks = () + else: + prelim_callbacks = [ + cache_callback, + ] + + def callback(name, doc): + """Assumes the start doc gives you the sample location, + and the event_page gives quality info. The current index is updated at the start + But the Agent quality matrix is only updated at tell.""" + # TODO: Validate the assumptions on formats + # TODO: Update queue signatures from .put to ...? + print(f"callback received {name}") + + if name == "start": + current_index = doc[sample_index_key] + adaptive_object.tell(x=current_index) + + elif name == "event_page": + data = extract_event_page( + [ + sample_data_key, + ], + payload=doc["data"], + ) + adaptive_object.tell(y=data) + + try: + next_point = adaptive_object.ask(1) + except NoRecommendation: + queue.put(None) + else: + queue.put({sample_index_key: next_point}) + else: + print(f"Document {name} is not handled") + + rr = RunRouter([lambda name, doc: ([prelim_callbacks, callback], [])]) + return rr, queue diff --git a/federation/quality/base.py b/federation/quality/base.py index ff3cf59..ee3827a 100644 --- a/federation/quality/base.py +++ b/federation/quality/base.py @@ -57,6 +57,8 @@ def tell(self, x=None, y=None): """ if x is not None: self.current = x + if y is None: + return self.counter[self.current] += 1 if self.current in self.cum_sum: self.cum_sum[self.current] += y From 0c02bbb8d6bffa2e29d110239e1d79fa592a7fc3 Mon Sep 17 00:00:00 2001 From: maffettone Date: Fri, 3 Dec 2021 16:53:34 -0500 Subject: [PATCH 3/7] Only return callback not runrouter --- federation/plumbing/queue_server.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/federation/plumbing/queue_server.py b/federation/plumbing/queue_server.py index 3bae1f9..1589562 100644 --- a/federation/plumbing/queue_server.py +++ b/federation/plumbing/queue_server.py @@ -1,5 +1,4 @@ from queue import Queue -from event_model import RunRouter from bluesky_adaptive.utils import extract_event_page from bluesky_adaptive.recommendations import NoRecommendation @@ -15,13 +14,6 @@ def index_reccomender_factory( if queue is None: queue = Queue() - if cache_callback is None: - prelim_callbacks = () - else: - prelim_callbacks = [ - cache_callback, - ] - def callback(name, doc): """Assumes the start doc gives you the sample location, and the event_page gives quality info. The current index is updated at the start @@ -52,5 +44,4 @@ def callback(name, doc): else: print(f"Document {name} is not handled") - rr = RunRouter([lambda name, doc: ([prelim_callbacks, callback], [])]) - return rr, queue + return callback, queue From e57e126de0f86fd5164879d0e4b44a90ff0c8241 Mon Sep 17 00:00:00 2001 From: maffettone Date: Fri, 3 Dec 2021 17:09:06 -0500 Subject: [PATCH 4/7] Only return callback not runrouter --- federation/plumbing/queue_server.py | 1 - 1 file changed, 1 deletion(-) diff --git a/federation/plumbing/queue_server.py b/federation/plumbing/queue_server.py index 1589562..2aacf4b 100644 --- a/federation/plumbing/queue_server.py +++ b/federation/plumbing/queue_server.py @@ -9,7 +9,6 @@ def index_reccomender_factory( sample_data_key, *, queue=None, - cache_callback=None, ): if queue is None: queue = Queue() From 4f9f7975e4faff9bb37cb0b89a27716ecbd7a545 Mon Sep 17 00:00:00 2001 From: maffettone Date: Fri, 3 Dec 2021 17:18:48 -0500 Subject: [PATCH 5/7] Use queueserver conventions instead of queue.Queue --- federation/plumbing/queue_server.py | 33 +++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/federation/plumbing/queue_server.py b/federation/plumbing/queue_server.py index 2aacf4b..2b473eb 100644 --- a/federation/plumbing/queue_server.py +++ b/federation/plumbing/queue_server.py @@ -1,24 +1,28 @@ -from queue import Queue from bluesky_adaptive.utils import extract_event_page from bluesky_adaptive.recommendations import NoRecommendation def index_reccomender_factory( + *, adaptive_object, sample_index_key, sample_data_key, - *, - queue=None, + queue_server, + # TODO: Add more sensible defaults for these args. + mv_kwargs=None, + count_args=(), + count_kwargs=None, ): - if queue is None: - queue = Queue() + if mv_kwargs is None: + mv_kwargs = {} + if count_kwargs is None: + count_kwargs = {} def callback(name, doc): """Assumes the start doc gives you the sample location, and the event_page gives quality info. The current index is updated at the start But the Agent quality matrix is only updated at tell.""" # TODO: Validate the assumptions on formats - # TODO: Update queue signatures from .put to ...? print(f"callback received {name}") if name == "start": @@ -37,10 +41,21 @@ def callback(name, doc): try: next_point = adaptive_object.ask(1) except NoRecommendation: - queue.put(None) + queue_server.queue_item_add(None) else: - queue.put({sample_index_key: next_point}) + queue_server.queue_item_add( + item_name="mv", + item_args=[next_point], + item_kwargs=mv_kwargs, + item_type="plan", + ) + queue_server.queue_item_add( + item_name="count", + item_args=[*count_args], + item_kwargs=count_kwargs, + item_type="plan", + ) else: print(f"Document {name} is not handled") - return callback, queue + return callback From 453f714574737e6a8cec8873f2e9a82f12aff9c5 Mon Sep 17 00:00:00 2001 From: maffettone Date: Fri, 3 Dec 2021 18:08:17 -0500 Subject: [PATCH 6/7] Add script for simple adaptive consumer using queue server --- .../scripts/adaptive_queueserver_consumer.py | 79 +++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 bluesky_config/scripts/adaptive_queueserver_consumer.py diff --git a/bluesky_config/scripts/adaptive_queueserver_consumer.py b/bluesky_config/scripts/adaptive_queueserver_consumer.py new file mode 100644 index 0000000..55d5cda --- /dev/null +++ b/bluesky_config/scripts/adaptive_queueserver_consumer.py @@ -0,0 +1,79 @@ +import argparse +from databroker._drivers.msgpack import BlueskyMsgpackCatalog +from event_model import RunRouter +from suitcase.msgpack import Serializer +from pathlib import Path +import pprint +from federation.quality.base import SequentialAgent, MarkovAgent +from federation.plumbing.queue_server import index_reccomender_factory +from bluesky.callbacks.zmq import RemoteDispatcher as ZmqRemoteDispatcher +from exp_queueclient import BlueskyHttpserverSession +import logging + + +if __name__ == "__main__": + + arg_parser = argparse.ArgumentParser() + arg_parser.add_argument("--document-cache", type=Path, default=None) + arg_parser.add_argument("--agent", type=str, default="sequential") + arg_parser.add_argument("-n", "--n-samples", type=int, default=30) + arg_parser.add_argument("--seed", type=int, default=1234) + + # TODO: Update default server arguments + arg_parser.add_argument("--zmq-host", type=str, default="xf28id1-ca1") + arg_parser.add_argument("--zmq-subscribe-port", type=int, default=5578) + arg_parser.add_argument("--zmq-subscribe-prefix", type=str, default="rr") + + arg_parser.add_argument( + "-u", "--bluesky-httpserver-url", type=str, default="http://localhost:60610" + ) + + args = arg_parser.parse_args() + pprint.pprint(vars(args)) + + zmq_dispatcher = ZmqRemoteDispatcher( + address=(args.zmq_host, args.zmq_subscribe_port), + prefix=args.zmq_subscribe_prefix.encode(), + ) + + #################################################################### + # CHOOSE YOUR FIGHTER + agent = { + "sequntial": SequentialAgent(args.n_samples), + "markov": MarkovAgent(args.n_samples, max_quality=3, seed=1234), + }[args.agent] + #################################################################### + + if args.document_cache is not None: + cat = BlueskyMsgpackCatalog(str(args.document_cache / "*.msgpack")) + xs = [] + ys = [] + for uid in cat: + h = cat[uid] + xs.append(h.metadata["start"]["sample_number"]) + ys.append(h.primary.read()) + agent.tell_many(xs, ys) + cache_callback = Serializer(args.document_cache, flush=True) + else: + cache_callback = None + + with BlueskyHttpserverSession( + bluesky_httpserver_url=args.bluesky_httpserver_url + ) as session: + #################################################################### + # ENSURE THESE KEYS AND QUEUE ARE APPROPRIATE + index_callback, _ = index_reccomender_factory( + adaptive_object=agent, + sample_index_key="sample number", + sample_data_key="data", + queue_server=session, + ) + #################################################################### + + rr = RunRouter([lambda name, doc: ([cache_callback, index_callback], [])]) + zmq_dispatcher.subscribe(rr) + + logging.debug( + f"ADAPTIVE CONSUMER LISTENING ON {args.zmq_subscribe_prefix.encode()}" + ) + zmq_dispatcher.start() From e8a9c0db109d3d2d41c1772865493b5fa45f9963 Mon Sep 17 00:00:00 2001 From: maffettone Date: Fri, 3 Dec 2021 18:11:29 -0500 Subject: [PATCH 7/7] Ensure item add success --- federation/plumbing/queue_server.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/federation/plumbing/queue_server.py b/federation/plumbing/queue_server.py index 2b473eb..16c4f96 100644 --- a/federation/plumbing/queue_server.py +++ b/federation/plumbing/queue_server.py @@ -43,18 +43,22 @@ def callback(name, doc): except NoRecommendation: queue_server.queue_item_add(None) else: - queue_server.queue_item_add( + response = queue_server.queue_item_add( item_name="mv", item_args=[next_point], item_kwargs=mv_kwargs, item_type="plan", ) - queue_server.queue_item_add( + if response.json()["success"] is False: + raise RuntimeError("Queue Server failed to add item for mv plan") + response = queue_server.queue_item_add( item_name="count", item_args=[*count_args], item_kwargs=count_kwargs, item_type="plan", ) + if response.json()["success"] is False: + raise RuntimeError("Queue Server failed to add item for count plan") else: print(f"Document {name} is not handled")