Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the BLUESKY_KAFKA_BOOTSTRAP_SERVERS env var required #22

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions .ci/bl-specific.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
#!/bin/bash

# cp -v <...> ~/.ipython/profile_${TEST_PROFILE}/...
# TODO: watch for https://github.com/NSLS-II-HXN/hxnfly/pull/8 to be
# merged/released, then this install can be removed from here.
python3 -m pip install --no-deps -vv git+https://github.com/NSLS-II-HXN/hxnfly@change-xspress3-imports

conda install -y -c ${CONDA_CHANNEL_NAME} 03-id-hxn-collection
sudo mkdir -v -p /home/xf03id/
sudo chown -Rv $USER: /home/xf03id/
touch /home/xf03id/benchmark.out
69 changes: 46 additions & 23 deletions startup/00-startup.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,45 @@
import certifi
import functools
import os
import time
import uuid
import warnings
import pandas as pd
from collections import deque
from datetime import datetime, timedelta, tzinfo

import certifi
import ophyd
import pandas as pd
import pymongo
import six
from ophyd.signal import EpicsSignalBase

EpicsSignalBase.set_defaults(timeout=10, connection_timeout=10)

# Set up a Broker.
# TODO clean this up
from bluesky_kafka import Publisher
from databroker import Broker
from databroker.headersource.mongo import MDS
from databroker.assets.mongo import Registry

from databroker.headersource.core import doc_or_uid_to_uid

from datetime import timedelta, datetime, tzinfo

import pymongo
from pymongo import MongoClient

import uuid
from databroker.headersource.mongo import MDS
from databroker.v0 import Broker
from jsonschema import validate as js_validate
import six
from collections import deque
from pymongo import MongoClient

import os
os.environ["PPMAC_HOST"] = "xf03idc-ppmac1"


bootstrap_servers = os.getenv("BLUESKY_KAFKA_BOOTSTRAP_SERVERS", None)
if bootstrap_servers is None:
# https://github.com/NSLS-II/nslsii/blob/b332c34813adf798c38184292d21537ef4f653bb/nslsii/__init__.py#L710-L712
msg = ("The 'BLUESKY_KAFKA_BOOTSTRAP_SERVERS' environment variable must "
"be defined as a comma-delimited list of Kafka server addresses "
"or hostnames and ports as a string such as "
"``'kafka1:9092,kafka2:9092``")
raise RuntimeError(msg)

kafka_publisher = Publisher(
topic="hxn.bluesky.datum.documents",
bootstrap_servers=os.environ['BLUESKY_KAFKA_BOOTSTRAP_SERVERS'],
bootstrap_servers=bootstrap_servers,
key=str(uuid.uuid4()),
producer_config={
"acks": 1,
Expand Down Expand Up @@ -257,6 +267,7 @@ def bulk_register_datum_table(self, resource_uid, dkwargs_table, validate=False)
mds_db1 = MDS(_mds_config_db1, auth=False)
db1 = Broker(mds_db1, CompositeRegistry(_fs_config_db1))


class CompositeBroker(Broker):
"""wrapper for two databases"""

Expand Down Expand Up @@ -326,6 +337,7 @@ def insert(self, name, doc):
db = CompositeBroker(mds_db1, CompositeRegistry(_fs_config_db1))

from hxntools.handlers import register as _hxn_register_handlers

_hxn_register_handlers(db)
del _hxn_register_handlers
# do the rest of the standard configuration
Expand All @@ -336,6 +348,7 @@ def insert(self, name, doc):
# configure_olog(get_ipython().user_ns)

from bluesky.callbacks.best_effort import BestEffortCallback

bec = BestEffortCallback()

# un import *
Expand All @@ -355,11 +368,11 @@ def insert(self, name, doc):
RE.md['beamline_id'] = 'HXN'
RE.verbose = True

# set up some HXN specific callbacks
from ophyd.callbacks import UidPublish
from hxntools.scan_number import HxnScanNumberPrinter
from hxntools.scan_status import HxnScanStatus
from ophyd import EpicsSignal
# set up some HXN specific callbacks
from ophyd.callbacks import UidPublish

uid_signal = EpicsSignal('XF:03IDC-ES{BS-Scan}UID-I', name='uid_signal')
uid_broadcaster = UidPublish(uid_signal)
Expand Down Expand Up @@ -389,6 +402,7 @@ def ensure_proposal_id(md):

# be nice on segfaults
import faulthandler

faulthandler.enable()

# set up logging framework
Expand Down Expand Up @@ -418,7 +432,8 @@ def ensure_proposal_id(md):
pd.options.display.max_columns = 10


from bluesky.plan_stubs import mov
from bluesky.plan_stubs import mov

# from bluesky.utils import register_transform

def register_transform(RE, *, prefix='<'):
Expand All @@ -435,6 +450,7 @@ def register_transform(RE, *, prefix='<'):
valid python syntax or an existing transform you are on your own.
'''
import IPython

# from IPython.core.inputtransformer2 import StatelessInputTransformer

# @StatelessInputTransformer.wrap
Expand Down Expand Up @@ -522,7 +538,16 @@ def _epicssignal_get(self, *, as_string=None, connection_timeout=1.0, **kwargs):
if as_string is None:
as_string = self._string

with self._lock:
###########################################
# Usedf only for old ophyd 1.3.3 and older.
from distutils.version import LooseVersion

import ophyd
if ophyd.__version__ < LooseVersion('1.4'):
self._metadata_lock = self._lock
###########################################

with self._metadata_lock:
if not self._read_pv.connected:
if not self._read_pv.wait_for_connection(connection_timeout):
raise TimeoutError('Failed to connect to %s' %
Expand Down Expand Up @@ -557,11 +582,9 @@ def _epicssignal_get(self, *, as_string=None, connection_timeout=1.0, **kwargs):
return ret


from ophyd import EpicsSignal
from ophyd import EpicsSignalRO
from ophyd import EpicsSignal, EpicsSignalRO
from ophyd.areadetector import EpicsSignalWithRBV

EpicsSignal.get = _epicssignal_get
EpicsSignalRO.get = _epicssignal_get
EpicsSignalWithRBV.get = _epicssignal_get

6 changes: 3 additions & 3 deletions startup/13-mll.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ class HxnMLLSample(NamedDevice):
sz = Cpt(EpicsMotor, 'XF:03IDC-ES{ANC350:3-Ax:2}Mtr', doc='coarse z')

kill = Cpt(EpicsSignal, 'XF:03IDC-ES{Ppmac:1}KillAll-Cmd.PROC',
doc='kill all piezos')
doc='kill all piezos', kind='omitted')
zero = Cpt(EpicsSignal, 'XF:03IDC-ES{Ppmac:1}KillZero-Cmd.PROC',
doc='zero all piezos')
doc='zero all piezos', kind='omitted')
# sz1 = Cpt(EpicsMotor, 'XF:03IDC-ES{ANC350:3-Ax:3}Mtr', doc='coarse z1')
# sz1 was replaced with vz when controller 2 died

Expand Down Expand Up @@ -74,7 +74,7 @@ class HxnMLLDiffractionSample(NamedDevice):
dssz = Cpt(EpicsMotor, 'XF:03IDC-ES{Ppmac:1-dssz}Mtr', doc='fine_z')

kill = Cpt(EpicsSignal, 'XF:03IDC-ES{Ppmac:1-Diff}Kill-Cmd.PROC',
doc='kill all piezos')
doc='kill all piezos', kind='omitted')


smlld = HxnMLLDiffractionSample('', name='smlld')
Expand Down
4 changes: 2 additions & 2 deletions startup/15-zp.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ class HxnZPSample(NamedDevice):
smarthy = Cpt(SmarpodRotationAxis, axis=3, doc='smarpod theta around y')
smarthz = Cpt(SmarpodRotationAxis, axis=1, doc='smarpod theta around z')

kill = Cpt(EpicsSignal, 'XF:03IDC-ES{Ppmac:1-ZP}Kill-Cmd.PROC')
zero = Cpt(EpicsSignal, 'XF:03IDC-ES{Ppmac:1-ZP}Zero-Cmd.PROC')
kill = Cpt(EpicsSignal, 'XF:03IDC-ES{Ppmac:1-ZP}Kill-Cmd.PROC', kind='omitted')
zero = Cpt(EpicsSignal, 'XF:03IDC-ES{Ppmac:1-ZP}Zero-Cmd.PROC', kind='omitted')
mode = Cpt(EpicsSignal, 'XF:03IDC-ES{Ppmac:1-ZP}Mode-I')


Expand Down
11 changes: 8 additions & 3 deletions startup/21-xspress3.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from ophyd.device import (Component as Cpt)

from hxntools.detectors.xspress3 import (Xspress3FileStore,
Xspress3Channel)
from nslsii.detectors.xspress3 import (Xspress3FileStore,
Xspress3Channel)
from hxntools.detectors.hxn_xspress3 import HxnXspress3DetectorBase
import threading
from ophyd import DeviceStatus
Expand Down Expand Up @@ -139,7 +139,12 @@ def unstage(self, *args, **kwargs):


# Create directories on the xspress3 server, otherwise scans can fail:
xspress3.make_directories.put(True)
#====================================================================
# Note from DAMA: commented it out during the 2020-2 deployment visit.
# The corresponding code is in
# https://github.com/NSLS-II/nslsii/blob/master/nslsii/detectors/xspress3.py
#====================================================================
# xspress3.make_directories.put(True)


elem_K_list = np.array(['Na','Mg','Al','Si','P','S','Cl','Ar','K','Ca','Sc','Ti','V','Cr','Mn','Fe','Co','Ni','Cu','Zn','Ga','Ge','As','Se','Br','Kr','Rb','Sr','Y','Zr','Nb','Mo','Tc','Ru','Rh','Pd','Ag','Cd','In','Sn','Sb','Te','I','Xe','Cs','Ba','La','Hf','Ta','W','Re','Os','Ir','Pt','Au','Hg','Tl','Pb','Bi','Po','At','Rn','Fr','Ra','Ac','Ce','Pr','Nd','Pm','Sm','Eu','Gd','Tb','Dy','Ho','Er','Tm','Yb','Lu','Th','Pa','U','Np','Pu','Am','Cm','Bk','Cf'])
Expand Down
Loading