From 55cfe6082bc4533f86228e1d3245c706b8130795 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 21 Jan 2022 17:00:42 -0500 Subject: [PATCH 01/17] Re-key ib's 'unreportable trades' (tick 48) as --- piker/brokers/ib.py | 12 ++++++++++-- piker/data/_normalize.py | 12 ++++++++---- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index b378f5f20..4dcf7b14c 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -1032,7 +1032,11 @@ async def get_client( # https://interactivebrokers.github.io/tws-api/tick_types.html tick_types = { 77: 'trade', - 48: 'utrade', + + # a "utrade" aka an off exchange "unreportable" (dark) vlm: + # https://interactivebrokers.github.io/tws-api/tick_types.html#rt_volume + 48: 'dark_trade', + 0: 'bsize', 1: 'bid', 2: 'ask', @@ -1046,13 +1050,17 @@ async def get_client( def normalize( ticker: Ticker, calc_price: bool = False + ) -> dict: # convert named tuples to dicts so we send usable keys new_ticks = [] for tick in ticker.ticks: if tick and not isinstance(tick, dict): td = tick._asdict() - td['type'] = tick_types.get(td['tickType'], 'n/a') + td['type'] = tick_types.get( + td['tickType'], + 'n/a', + ) new_ticks.append(td) diff --git a/piker/data/_normalize.py b/piker/data/_normalize.py index 3474879e9..56d64b75e 100644 --- a/piker/data/_normalize.py +++ b/piker/data/_normalize.py @@ -25,14 +25,18 @@ def iterticks( quote: dict, - types: Tuple[str] = ('trade', 'utrade'), + types: Tuple[str] = ('trade', 'dark_trade'), + ) -> AsyncIterator: - """Iterate through ticks delivered per quote cycle. - """ + ''' + Iterate through ticks delivered per quote cycle. + + ''' # print(f"{quote}\n\n") ticks = quote.get('ticks', ()) if ticks: for tick in ticks: # print(f"{quote['symbol']}: {tick}") - if tick.get('type') in types: + ttype = tick.get('type') + if ttype in types: yield tick From 13b8807f1ffb1956d9fa2d9183aa7d2bb41658dc Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 21 Jan 2022 17:01:18 -0500 Subject: [PATCH 02/17] Print dark trades to console for the moment --- piker/fsp/_volume.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/piker/fsp/_volume.py b/piker/fsp/_volume.py index e662343f4..9bb88a418 100644 --- a/piker/fsp/_volume.py +++ b/piker/fsp/_volume.py @@ -105,6 +105,7 @@ async def _tina_vwap( # @fsp.config( # name='dolla_vlm', +# fields=('dolla_vlm', 'dark_$vlm' # ohlc=False, # style='step', # ) @@ -138,6 +139,10 @@ async def dolla_vlm( async for quote in source: for tick in iterticks(quote): + ttype = tick.get('type') + if ttype == 'dark_trade': + print(f'dark_trade: {tick}') + # this computes tick-by-tick weightings from here forward size = tick['size'] price = tick['price'] @@ -152,7 +157,9 @@ async def dolla_vlm( ][0] lvlm += price * size - tina_lvlm = c+h+l/3 * v + + # TODO: plot both to compare? + # tina_lvlm = c+h+l/3 * v # print(f' tinal vlm: {tina_lvlm}') yield lvlm From c6a3c66e7eeb541ee155e7737e858052372edf28 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 26 Jan 2022 14:38:49 -0500 Subject: [PATCH 03/17] WIP start a `@piker.fsp` API for registering processors --- piker/fsp/_api.py | 178 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 178 insertions(+) create mode 100644 piker/fsp/_api.py diff --git a/piker/fsp/_api.py b/piker/fsp/_api.py new file mode 100644 index 000000000..6c3c6cd76 --- /dev/null +++ b/piker/fsp/_api.py @@ -0,0 +1,178 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship of pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +FSP (financial signal processing) apis. + +''' +from __future__ import annotations +from functools import partial +from typing import ( + Any, + Callable, + Awaitable, + Optional, +) + +import numpy as np +import tractor +from tractor._portal import NamespacePath +# import wrapt + +from ..data._sharedmem import ( + ShmArray, + maybe_open_shm_array, +) + + +# global fsp registry filled out by @fsp decorator below +_fsp_builtins = {} + # 'rsi': _rsi, + # 'wma': _wma, + # 'vwap': _tina_vwap, + # 'dolla_vlm': dolla_vlm, +# } + +def _load_builtins() -> dict[tuple, Callable]: + from ._momo import _rsi, _wma + from ._volume import tina_vwap, dolla_vlm + + return _fsp_builtins + +# TODO: things to figure the heck out: +# - how to handle non-plottable values (pyqtgraph has facility for this +# now in `arrayToQPath()`) +# - composition of fsps / implicit chaining syntax (we need an issue) + + +class Fsp: + ''' + "Financial signal processor" decorator wrapped async function. + + ''' + + # TODO: checkout the advanced features from ``wrapt``: + # - dynamic enable toggling, + # https://wrapt.readthedocs.io/en/latest/decorators.html#dynamically-disabling-decorators + # - custom object proxies, might be useful for implementing n-compose + # https://wrapt.readthedocs.io/en/latest/wrappers.html#custom-object-proxies + # - custom function wrappers, + # https://wrapt.readthedocs.io/en/latest/wrappers.html#custom-function-wrappers + + def __init__( + self, + func: Callable[..., Awaitable], + *, + outputs: tuple[str] = (), + display_name: Optional[str] = None, + **config, + + ) -> None: + # if wrapped is not None: + # self.name = wrapped.__name__ + # TODO: should we make this a wrapt object proxy? + self.func = func + self.__name__ = func.__name__ + self.__module__ = func.__module__ + self.ns_path: tuple[str, str] = NamespacePath.from_ref(func) + _fsp_builtins[self.ns_path] = func + self.outputs = outputs + self.config: dict[str, Any] = config + + # @wrapt.decorator + def __call__( + self, + + # TODO: when we settle on py3.10 we should probably use the new + # type annots from pep 612: + # https://www.python.org/dev/peps/pep-0612/ + # instance, + *args, + **kwargs + ): + return self.func(*args, **kwargs) + # return wrapped(*args, **kwargs) + + +def fsp( + wrapped=None, + *, + outputs: tuple[str] = (), + display_name: Optional[str] = None, + **config, + +) -> Fsp: + # @wrapt.decorator + # def wrapper(wrapped, instance, args, kwargs): + # return wrapped(*args, **kwargs) + + if wrapped is None: + # return functools.partial(with_optional_arguments, + # myarg1=myarg1, myarg2=myarg2) + return partial( + Fsp, + outputs=outputs, + display_name=display_name, + **config, + ) + + # return wrapper(wrapped) + return Fsp(wrapped, outputs=(wrapped.__name__,)) + # outputs=outputs, + # display_name=display_name, + # **config, + # )(wrapped) + + +def maybe_mk_fsp_shm( + sym: str, + target: fsp, + # field_name: str, + # display_name: Optional[str] = None, + readonly: bool = True, + +) -> (ShmArray, bool): + ''' + Allocate a single row shm array for an symbol-fsp pair if none + exists, otherwise load the shm already existing for that token. + + ''' + uid = tractor.current_actor().uid + + # load declared fields from fsp and allocate in + # shm array. + # if not display_name: + # display_name = field_name + + # TODO: load function here and introspect + # return stream type(s) + display_name = target.__name__ + + # TODO: should `index` be a required internal field? + fsp_dtype = np.dtype( + [('index', int)] + + [(field_name, float) for field_name in target.outputs] + ) + + key = f'{sym}.fsp.{display_name}.{".".join(uid)}' + + shm, opened = maybe_open_shm_array( + key, + # TODO: create entry for each time frame + dtype=fsp_dtype, + readonly=True, + ) + return shm, opened From 72f44742736a9b49af69070c4251e758eb8f6b13 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 27 Jan 2022 09:08:03 -0500 Subject: [PATCH 04/17] Use new decorator on volume fsp routines --- piker/fsp/_volume.py | 39 ++++++++++++++++++++++----------------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/piker/fsp/_volume.py b/piker/fsp/_volume.py index 9bb88a418..2a7ac2e85 100644 --- a/piker/fsp/_volume.py +++ b/piker/fsp/_volume.py @@ -1,5 +1,5 @@ # piker: trading gear for hackers -# Copyright (C) 2018-present Tyler Goodlet (in stewardship of piker0) +# Copyright (C) Tyler Goodlet (in stewardship of pikers) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -19,6 +19,7 @@ import numpy as np from tractor.trionics._broadcast import AsyncReceiver +from ._api import fsp from ..data._normalize import iterticks from ..data._sharedmem import ShmArray @@ -50,7 +51,8 @@ def wap( ) -async def _tina_vwap( +@fsp +async def tina_vwap( source: AsyncReceiver[dict], ohlcv: ShmArray, # OHLC sampled history @@ -62,7 +64,8 @@ async def _tina_vwap( AsyncIterator[np.ndarray], float ]: - '''Streaming volume weighted moving average. + ''' + Streaming volume weighted moving average. Calling this "tina" for now since we're using HLC3 instead of tick. @@ -100,27 +103,25 @@ async def _tina_vwap( w_tot += price * size # yield ((((o + h + l) / 3) * v) weights_tot) / v_tot - yield w_tot / v_tot + yield 'tina_vwap', w_tot / v_tot -# @fsp.config( -# name='dolla_vlm', -# fields=('dolla_vlm', 'dark_$vlm' -# ohlc=False, -# style='step', -# ) +@fsp( + outputs=('dolla_vlm', 'dark_vlm'), + ohlc=False, + curve_style='step', +) async def dolla_vlm( source: AsyncReceiver[dict], ohlcv: ShmArray, # OHLC sampled history -) -> Union[ - AsyncIterator[np.ndarray], - float +) -> AsyncIterator[ + tuple[str, Union[np.ndarray, float]], ]: ''' "Dollar Volume", aka the volume in asset-currency-units (usually a fiat) computed from some price function for the sample step - *times* the asset unit volume. + *multiplied* (*) by the asset unit volume. Useful for comparing cross asset "money flow" in #s that are asset-currency-independent. @@ -130,7 +131,7 @@ async def dolla_vlm( chl3 = (a['close'] + a['high'] + a['low']) / 3 v = a['volume'] - # history + # on first iteration yield history yield chl3 * v i = ohlcv.index @@ -141,7 +142,11 @@ async def dolla_vlm( ttype = tick.get('type') if ttype == 'dark_trade': - print(f'dark_trade: {tick}') + # print(f'dark_trade: {tick}') + key = 'dark_vlm' + + else: + key = 'dolla_vlm' # this computes tick-by-tick weightings from here forward size = tick['size'] @@ -162,4 +167,4 @@ async def dolla_vlm( # tina_lvlm = c+h+l/3 * v # print(f' tinal vlm: {tina_lvlm}') - yield lvlm + yield key, lvlm From cc5390376ca6f81cbc7c67bd01638ecd3b0ee111 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 27 Jan 2022 18:57:16 -0500 Subject: [PATCH 05/17] Use `Fsp` abstration layer through engine and UI Instead of referencing the remote processing funcs by a `str` name start embracing the new `@fsp`/`Fsp` API such that wrapped processing functions are first class APIs. Summary of the changeset: - move and load the fsp built-in set in the new `.fsp._api` module - handle processors ("fsps") which want to yield multiple keyed-values (interleaved in time) by expecting both history that is keyed and assigned to the appropriate struct-array field, *and* real-time `yield`ed value in tuples of the form `tuple[str, float]` such that any one (async) processing function can deliver multiple outputs from the same base calculation. - drop `maybe_mk_fsp_shm()` from UI module - expect and manage `Fsp` instances (`@fsp` decorated funcs) throughout the UI code, particularly the `FspAdmin` layer. --- piker/fsp/_engine.py | 58 +++++++++------- piker/ui/_fsp.py | 162 ++++++++++++++++++++++--------------------- 2 files changed, 117 insertions(+), 103 deletions(-) diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index aafaf76cc..dac672bb6 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -1,5 +1,5 @@ # piker: trading gear for hackers -# Copyright (C) Tyler Goodlet (in stewardship of piker0) +# Copyright (C) Tyler Goodlet (in stewardship of pikers) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -27,29 +27,19 @@ import trio from trio_typing import TaskStatus import tractor +from tractor._portal import NamespacePath from ..log import get_logger, get_console_log from .. import data from ..data import attach_shm_array from ..data.feed import Feed from ..data._sharedmem import ShmArray -from ._momo import _rsi, _wma -from ._volume import _tina_vwap, dolla_vlm +# from ._momo import _rsi, _wma +# from ._volume import _tina_vwap, dolla_vlm +from ._api import _load_builtins log = get_logger(__name__) -_fsp_builtins = { - 'rsi': _rsi, - 'wma': _wma, - 'vwap': _tina_vwap, - 'dolla_vlm': dolla_vlm, -} - -# TODO: things to figure the heck out: -# - how to handle non-plottable values (pyqtgraph has facility for this -# now in `arrayToQPath()`) -# - composition of fsps / implicit chaining syntax (we need an issue) - @dataclass class TaskTracker: @@ -88,7 +78,7 @@ async def fsp_compute( src: ShmArray, dst: ShmArray, - func_name: str, + # func_name: str, func: Callable, attach_stream: bool = False, @@ -115,15 +105,27 @@ async def fsp_compute( # and get historical output history_output = await out_stream.__anext__() + func_name = func.__name__ profiler(f'{func_name} generated history') - # build a struct array which includes an 'index' field to push - # as history + # build struct array with an 'index' field to push as history history = np.array( np.arange(len(history_output)), dtype=dst.array.dtype ) - history[func_name] = history_output + + # TODO: push using a[['f0', 'f1', .., 'fn']] = .. syntax no? + # if the output array is multi-field then push + # each respective field. + fields = getattr(history.dtype, 'fields', None) + if fields: + for key in fields.keys(): + if key in history.dtype.fields: + history[func_name] = history_output + + # single-key output stream + else: + history[func_name] = history_output # TODO: XXX: # THERE'S A BIG BUG HERE WITH THE `index` field since we're @@ -164,8 +166,9 @@ async def fsp_compute( async for processed in out_stream: log.debug(f"{func_name}: {processed}") + key, output = processed index = src.index - dst.array[-1][func_name] = processed + dst.array[-1][key] = output # NOTE: for now we aren't streaming this to the consumer # stream latest array index entry which basically just acts @@ -194,7 +197,7 @@ async def cascade( src_shm_token: dict, dst_shm_token: tuple[str, np.dtype], - func_name: str, + ns_path: NamespacePath, zero_on_step: bool = False, loglevel: Optional[str] = None, @@ -213,10 +216,14 @@ async def cascade( src = attach_shm_array(token=src_shm_token) dst = attach_shm_array(readonly=False, token=dst_shm_token) - func: Callable = _fsp_builtins.get(func_name) + # func: Callable = _fsp_builtins.get(tuple(ns_path)) + func: Fsp = _load_builtins().get( + NamespacePath(ns_path) + ) + if not func: # TODO: assume it's a func target path - raise ValueError('Unknown fsp target: {func_name}') + raise ValueError(f'Unknown fsp target: {ns_path}') # open a data feed stream with requested broker async with data.feed.maybe_open_feed( @@ -231,11 +238,12 @@ async def cascade( ) as (feed, quote_stream): - profiler(f'{func_name}: feed up') + profiler(f'{func}: feed up') assert src.token == feed.shm.token # last_len = new_len = len(src.array) + func_name = func.__name__ async with ( trio.open_nursery() as n, ): @@ -252,7 +260,7 @@ async def cascade( src=src, dst=dst, - func_name=func_name, + # func_name=func_name, func=func ) diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 22196f231..97dd5022e 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -29,6 +29,7 @@ from pydantic import create_model import tractor # from tractor.trionics import gather_contexts +from tractor._portal import NamespacePath import pyqtgraph as pg import trio from trio_typing import TaskStatus @@ -38,57 +39,25 @@ from ..calc import humanize from ..data._sharedmem import ( ShmArray, - maybe_open_shm_array, try_read, ) from ._chart import ( ChartPlotWidget, LinkedSplits, ) -from .. import fsp from ._forms import ( FieldsForm, mk_form, open_form_input_handling, ) +from ..fsp._api import maybe_mk_fsp_shm, Fsp +from ..fsp import cascade +from ..fsp._volume import tina_vwap, dolla_vlm from ..log import get_logger log = get_logger(__name__) -def maybe_mk_fsp_shm( - sym: str, - field_name: str, - display_name: Optional[str] = None, - readonly: bool = True, - -) -> (ShmArray, bool): - ''' - Allocate a single row shm array for an symbol-fsp pair if none - exists, otherwise load the shm already existing for that token. - - ''' - uid = tractor.current_actor().uid - if not display_name: - display_name = field_name - - # TODO: load function here and introspect - # return stream type(s) - - # TODO: should `index` be a required internal field? - fsp_dtype = np.dtype([('index', int), (field_name, float)]) - - key = f'{sym}.fsp.{display_name}.{".".join(uid)}' - - shm, opened = maybe_open_shm_array( - key, - # TODO: create entry for each time frame - dtype=fsp_dtype, - readonly=True, - ) - return shm, opened - - def has_vlm(ohlcv: ShmArray) -> bool: # make sure that the instrument supports volume history # (sometimes this is not the case for some commodities and @@ -148,11 +117,13 @@ async def open_fsp_sidepane( assert len(conf) == 1 # for now # add (single) selection widget - for display_name, config in conf.items(): - schema[display_name] = { + for name, config in conf.items(): + # schema[display_name] = { + # name = target.__name__ + schema[name] = { 'label': '**fsp**:', 'type': 'select', - 'default_value': [display_name], + 'default_value': [name], } # add parameters for selection "options" @@ -180,7 +151,7 @@ async def open_fsp_sidepane( # https://pydantic-docs.helpmanual.io/usage/models/#dynamic-model-creation FspConfig = create_model( 'FspConfig', - name=display_name, + name=name, **params, ) sidepane.model = FspConfig() @@ -228,8 +199,9 @@ async def run_fsp_ui( linkedsplits: LinkedSplits, shm: ShmArray, started: trio.Event, - func_name: str, - display_name: str, + target: Fsp, + # func_name: str, + # display_name: str, conf: dict[str, dict], loglevel: str, # profiler: pg.debug.Profiler, @@ -245,12 +217,14 @@ async def run_fsp_ui( ''' # profiler(f'started UI task for fsp: {func_name}') + name = target.__name__ async with ( # side UI for parameters/controls open_fsp_sidepane( linkedsplits, - {display_name: conf}, + {name: conf}, + # {display_name: conf}, ) as sidepane, ): await started.wait() @@ -264,24 +238,29 @@ async def run_fsp_ui( chart = linkedsplits.subplots[overlay_with] chart.draw_curve( - name=display_name, + # name=display_name, + name=name, data=shm.array, overlay=True, color='default_light', - array_key=func_name, + # array_key=func_name, + array_key=name, separate_axes=conf.get('separate_axes', False), **conf.get('chart_kwargs', {}) ) # specially store ref to shm for lookup in display loop - chart._overlays[display_name] = shm + # chart._overlays[display_name] = shm + chart._overlays[name] = shm else: # create a new sub-chart widget for this fsp chart = linkedsplits.add_plot( - name=display_name, + name=name, + # name=display_name, array=shm.array, - array_key=func_name, + # array_key=func_name, + array_key=name, sidepane=sidepane, # curve by default @@ -299,7 +278,8 @@ async def run_fsp_ui( # should **not** be the same sub-chart widget assert chart.name != linkedsplits.chart.name - array_key = func_name + # array_key = func_name + array_key = name # profiler(f'fsp:{func_name} chart created') @@ -307,7 +287,8 @@ async def run_fsp_ui( update_fsp_chart( chart, shm, - display_name, + name, + # display_name, array_key=array_key, ) @@ -410,7 +391,7 @@ async def open_chain( started: trio.Event, dst_shm: ShmArray, conf: dict, - func_name: str, + target: Fsp, loglevel: str, ) -> None: @@ -420,11 +401,12 @@ async def open_chain( ''' brokername, sym = self.linked.symbol.front_feed() + ns_path = NamespacePath.from_ref(target) async with ( portal.open_context( # chaining entrypoint - fsp.cascade, + cascade, # data feed key brokername=brokername, @@ -435,7 +417,8 @@ async def open_chain( dst_shm_token=dst_shm.token, # target - func_name=func_name, + ns_path=str(ns_path), + # func_name=func_name, loglevel=loglevel, zero_on_step=conf.get('zero_on_step', False), @@ -444,8 +427,13 @@ async def open_chain( ctx.open_stream() as stream, ): # register output data - self._registry[(brokername, sym, func_name)] = ( - stream, dst_shm, complete) + self._registry[ + (brokername, sym, ns_path) + ] = ( + stream, + dst_shm, + complete + ) started.set() @@ -455,7 +443,8 @@ async def open_chain( async def start_engine_task( self, - display_name: str, + target: Fsp, + # display_name: str, conf: dict[str, dict[str, Any]], worker_name: Optional[str] = None, @@ -464,17 +453,21 @@ async def start_engine_task( ) -> (ShmArray, trio.Event): # unpack FSP details from config dict - func_name = conf['func_name'] - + # func_name = conf['func_name'] + # func_name = target.__name__ + fqsn = self.linked.symbol.front_feed() # allocate an output shm array dst_shm, opened = maybe_mk_fsp_shm( - self.linked.symbol.front_feed(), - field_name=func_name, - display_name=display_name, + fqsn, + # field_name=func_name, + # display_name=display_name, + target=target, readonly=True, ) - if not opened: - raise RuntimeError(f'Already started FSP {func_name}') + # if not opened: + # raise RuntimeError( + # f'Already started FSP `{fqsn}:{func_name}`' + # ) portal = self.cluster.get(worker_name) or self.rr_next_portal() complete = trio.Event() @@ -487,7 +480,8 @@ async def start_engine_task( started, dst_shm, conf, - func_name, + # func_name, + target, loglevel, ) @@ -495,16 +489,21 @@ async def start_engine_task( async def open_fsp_chart( self, - display_name: str, + + target: Fsp, + # display_name: str, + conf: dict, # yeah probably dumb.. loglevel: str = 'error', ) -> (trio.Event, ChartPlotWidget): - func_name = conf['func_name'] + # func_name = conf['func_name'] + # func_name = target.__name__ shm, started = await self.start_engine_task( - display_name, + target, + # display_name, conf, loglevel, ) @@ -517,8 +516,9 @@ async def open_fsp_chart( self.linked, shm, started, - func_name, - display_name, + # func_name, + # display_name, + target, conf=conf, loglevel=loglevel, @@ -671,8 +671,9 @@ def maxmin(name) -> tuple[float, float]: # spawn and overlay $ vlm on the same subchart shm, started = await admin.start_engine_task( - 'dolla_vlm', - # linked.symbol.front_feed(), # data-feed symbol key + # 'dolla_vlm', + dolla_vlm, + { # fsp engine conf 'func_name': 'dolla_vlm', 'zero_on_step': True, @@ -759,15 +760,18 @@ def maxmin(name) -> tuple[float, float]: axis.size_to_values() # built-in vlm fsps - for display_name, conf in { - 'vwap': { - 'func_name': 'vwap', + # for display_name, conf in { + for target, conf in { + # 'vwap': { + tina_vwap: { + # 'func_name': 'vwap', 'overlay': 'ohlc', # overlays with OHLCV (main) chart 'anchor': 'session', }, }.items(): started = await admin.open_fsp_chart( - display_name, + # display_name, + target, conf, ) @@ -822,20 +826,22 @@ async def start_fsp_displays( open_fsp_admin(linked, ohlcv) as admin, ): statuses = [] - for display_name, conf in fsp_conf.items(): + # for display_name, conf in fsp_conf.items(): + for target, conf in fsp_conf.items(): started = await admin.open_fsp_chart( - display_name, + # display_name, + target, conf, ) done = linked.window().status_bar.open_status( - f'loading fsp, {display_name}..', + f'loading fsp, {target}..', group_key=group_status_key, ) statuses.append((started, done)) for fsp_loaded, status_cb in statuses: await fsp_loaded.wait() - profiler(f'attached to fsp portal: {display_name}') + profiler(f'attached to fsp portal: {target}') status_cb() # blocks on nursery until all fsp actors complete From 9d9929fb899b9369b5a585fa6b32ec84c4a774eb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 28 Jan 2022 07:18:14 -0500 Subject: [PATCH 06/17] Drop old `wrapt` cruft, add `Fsp.name` --- piker/fsp/_api.py | 77 ++++++++++++++++++++--------------------------- 1 file changed, 32 insertions(+), 45 deletions(-) diff --git a/piker/fsp/_api.py b/piker/fsp/_api.py index 6c3c6cd76..05bb3f99f 100644 --- a/piker/fsp/_api.py +++ b/piker/fsp/_api.py @@ -18,8 +18,15 @@ FSP (financial signal processing) apis. ''' + +# TODO: things to figure the heck out: +# - how to handle non-plottable values (pyqtgraph has facility for this +# now in `arrayToQPath()`) +# - composition of fsps / implicit chaining syntax (we need an issue) + from __future__ import annotations from functools import partial +from pprint import pformat from typing import ( Any, Callable, @@ -30,32 +37,27 @@ import numpy as np import tractor from tractor._portal import NamespacePath -# import wrapt from ..data._sharedmem import ( ShmArray, maybe_open_shm_array, ) +from ..log import get_logger +log = get_logger(__name__) # global fsp registry filled out by @fsp decorator below -_fsp_builtins = {} - # 'rsi': _rsi, - # 'wma': _wma, - # 'vwap': _tina_vwap, - # 'dolla_vlm': dolla_vlm, -# } +_fsp_registry = {} + def _load_builtins() -> dict[tuple, Callable]: - from ._momo import _rsi, _wma - from ._volume import tina_vwap, dolla_vlm - return _fsp_builtins + # import to implicity trigger registration via ``@fsp`` + from . import _momo # noqa + from . import _volume # noqa -# TODO: things to figure the heck out: -# - how to handle non-plottable values (pyqtgraph has facility for this -# now in `arrayToQPath()`) -# - composition of fsps / implicit chaining syntax (we need an issue) + log.info(f'Registered FSP set:\n{pformat(_fsp_registry)}') + return _fsp_registry class Fsp: @@ -81,18 +83,24 @@ def __init__( **config, ) -> None: - # if wrapped is not None: - # self.name = wrapped.__name__ - # TODO: should we make this a wrapt object proxy? + + # TODO (maybe): + # - type introspection? + # - should we make this a wrapt object proxy? self.func = func - self.__name__ = func.__name__ - self.__module__ = func.__module__ + self.__name__ = func.__name__ # XXX: must have func-object name + self.ns_path: tuple[str, str] = NamespacePath.from_ref(func) - _fsp_builtins[self.ns_path] = func self.outputs = outputs self.config: dict[str, Any] = config - # @wrapt.decorator + # register with declared set. + _fsp_registry[self.ns_path] = func + + @property + def name(self) -> str: + return self.__name__ + def __call__( self, @@ -104,7 +112,6 @@ def __call__( **kwargs ): return self.func(*args, **kwargs) - # return wrapped(*args, **kwargs) def fsp( @@ -115,13 +122,8 @@ def fsp( **config, ) -> Fsp: - # @wrapt.decorator - # def wrapper(wrapped, instance, args, kwargs): - # return wrapped(*args, **kwargs) if wrapped is None: - # return functools.partial(with_optional_arguments, - # myarg1=myarg1, myarg2=myarg2) return partial( Fsp, outputs=outputs, @@ -129,19 +131,12 @@ def fsp( **config, ) - # return wrapper(wrapped) return Fsp(wrapped, outputs=(wrapped.__name__,)) - # outputs=outputs, - # display_name=display_name, - # **config, - # )(wrapped) def maybe_mk_fsp_shm( sym: str, target: fsp, - # field_name: str, - # display_name: Optional[str] = None, readonly: bool = True, ) -> (ShmArray, bool): @@ -152,22 +147,14 @@ def maybe_mk_fsp_shm( ''' uid = tractor.current_actor().uid - # load declared fields from fsp and allocate in - # shm array. - # if not display_name: - # display_name = field_name - - # TODO: load function here and introspect - # return stream type(s) - display_name = target.__name__ - - # TODO: should `index` be a required internal field? + # TODO: load output types from `Fsp` + # - should `index` be a required internal field? fsp_dtype = np.dtype( [('index', int)] + [(field_name, float) for field_name in target.outputs] ) - key = f'{sym}.fsp.{display_name}.{".".join(uid)}' + key = f'{sym}.fsp.{target.name}.{".".join(uid)}' shm, opened = maybe_open_shm_array( key, From be93ded0e5b7afd2d43df9ebf5c3196b06d432d8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 28 Jan 2022 07:43:23 -0500 Subject: [PATCH 07/17] Log fsp registy entries in `cascade` startup --- piker/fsp/_api.py | 2 -- piker/fsp/_engine.py | 12 +++++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/piker/fsp/_api.py b/piker/fsp/_api.py index 05bb3f99f..af57d2f84 100644 --- a/piker/fsp/_api.py +++ b/piker/fsp/_api.py @@ -26,7 +26,6 @@ from __future__ import annotations from functools import partial -from pprint import pformat from typing import ( Any, Callable, @@ -56,7 +55,6 @@ def _load_builtins() -> dict[tuple, Callable]: from . import _momo # noqa from . import _volume # noqa - log.info(f'Registered FSP set:\n{pformat(_fsp_registry)}') return _fsp_registry diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index dac672bb6..53c872e88 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -34,8 +34,7 @@ from ..data import attach_shm_array from ..data.feed import Feed from ..data._sharedmem import ShmArray -# from ._momo import _rsi, _wma -# from ._volume import _tina_vwap, dolla_vlm +from ._api import Fsp from ._api import _load_builtins log = get_logger(__name__) @@ -78,7 +77,6 @@ async def fsp_compute( src: ShmArray, dst: ShmArray, - # func_name: str, func: Callable, attach_stream: bool = False, @@ -216,8 +214,12 @@ async def cascade( src = attach_shm_array(token=src_shm_token) dst = attach_shm_array(readonly=False, token=dst_shm_token) - # func: Callable = _fsp_builtins.get(tuple(ns_path)) - func: Fsp = _load_builtins().get( + reg = _load_builtins() + lines = '\n'.join([f'{key.rpartition(":")[2]} => {key}' for key in reg]) + log.info( + f'Registered FSP set:\n{lines}' + ) + func: Fsp = reg.get( NamespacePath(ns_path) ) From bd2460846e9ab8c058aa7af21f8a11798ff01061 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 28 Jan 2022 07:43:49 -0500 Subject: [PATCH 08/17] Decorate momo routines --- piker/fsp/_momo.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/piker/fsp/_momo.py b/piker/fsp/_momo.py index 2ee55e000..29e94f986 100644 --- a/piker/fsp/_momo.py +++ b/piker/fsp/_momo.py @@ -23,6 +23,7 @@ import numpy as np from numba import jit, float64, optional, int64 +from ._api import fsp from ..data._normalize import iterticks from ..data._sharedmem import ShmArray @@ -106,7 +107,7 @@ def ema( # nopython=True, # nogil=True # ) -def rsi( +def _rsi( # TODO: use https://github.com/ramonhagenaars/nptyping signal: 'np.ndarray[float64]', @@ -146,7 +147,7 @@ def rsi( return rsi, up_ema[-1], down_ema[-1] -def wma( +def _wma( signal: np.ndarray, length: int, @@ -169,10 +170,8 @@ def wma( return np.convolve(signal, weights, 'valid') -# @piker.fsp.emit( -# timeframes=['1s', '5s', '15s', '1m', '5m', '1H'], -# ) -async def _rsi( +@fsp +async def rsi( source: 'QuoteStream[Dict[str, Any]]', # noqa ohlcv: ShmArray, @@ -188,11 +187,11 @@ async def _rsi( sig = ohlcv.array['close'] # wilder says to seed the RSI EMAs with the SMA for the "period" - seed = wma(ohlcv.last(period)['close'], period)[0] + seed = _wma(ohlcv.last(period)['close'], period)[0] # TODO: the emas here should be seeded with a period SMA as per # wilder's original formula.. - rsi_h, last_up_ema_close, last_down_ema_close = rsi( + rsi_h, last_up_ema_close, last_down_ema_close = _rsi( sig, period, seed, seed) up_ema_last = last_up_ema_close down_ema_last = last_down_ema_close @@ -218,7 +217,7 @@ async def _rsi( last_down_ema_close = down_ema_last index = ohlcv.index - rsi_out, up_ema_last, down_ema_last = rsi( + rsi_out, up_ema_last, down_ema_last = _rsi( sig, period=period, up_ema_last=last_up_ema_close, @@ -227,7 +226,8 @@ async def _rsi( yield rsi_out[-1:] -async def _wma( +@fsp +async def wma( source, #: AsyncStream[np.ndarray], length: int, @@ -243,10 +243,10 @@ async def _wma( ''' # deliver historical output as "first yield" - yield wma(ohlcv.array['close'], length) + yield _wma(ohlcv.array['close'], length) # begin real-time section async for quote in source: for tick in iterticks(quote, type='trade'): - yield wma(ohlcv.last(length)) + yield _wma(ohlcv.last(length)) From b112f24e7ede1795c9147647d2c62fd11a498157 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 28 Jan 2022 07:51:13 -0500 Subject: [PATCH 09/17] Drop old commented cruft, use `Fsp.name` --- piker/ui/_fsp.py | 55 ++++++++---------------------------------------- 1 file changed, 9 insertions(+), 46 deletions(-) diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 97dd5022e..5112ede9d 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -28,8 +28,6 @@ import numpy as np from pydantic import create_model import tractor -# from tractor.trionics import gather_contexts -from tractor._portal import NamespacePath import pyqtgraph as pg import trio from trio_typing import TaskStatus @@ -118,8 +116,6 @@ async def open_fsp_sidepane( # add (single) selection widget for name, config in conf.items(): - # schema[display_name] = { - # name = target.__name__ schema[name] = { 'label': '**fsp**:', 'type': 'select', @@ -200,8 +196,6 @@ async def run_fsp_ui( shm: ShmArray, started: trio.Event, target: Fsp, - # func_name: str, - # display_name: str, conf: dict[str, dict], loglevel: str, # profiler: pg.debug.Profiler, @@ -216,19 +210,18 @@ async def run_fsp_ui( config. ''' - # profiler(f'started UI task for fsp: {func_name}') - name = target.__name__ + name = target.name + # profiler(f'started UI task for fsp: {name}') async with ( # side UI for parameters/controls open_fsp_sidepane( linkedsplits, {name: conf}, - # {display_name: conf}, ) as sidepane, ): await started.wait() - # profiler(f'fsp:{func_name} attached to fsp ctx-stream') + # profiler(f'fsp:{name} attached to fsp ctx-stream') overlay_with = conf.get('overlay', False) if overlay_with: @@ -238,28 +231,23 @@ async def run_fsp_ui( chart = linkedsplits.subplots[overlay_with] chart.draw_curve( - # name=display_name, name=name, data=shm.array, overlay=True, color='default_light', - # array_key=func_name, array_key=name, separate_axes=conf.get('separate_axes', False), **conf.get('chart_kwargs', {}) ) # specially store ref to shm for lookup in display loop - # chart._overlays[display_name] = shm chart._overlays[name] = shm else: # create a new sub-chart widget for this fsp chart = linkedsplits.add_plot( name=name, - # name=display_name, array=shm.array, - # array_key=func_name, array_key=name, sidepane=sidepane, @@ -278,17 +266,15 @@ async def run_fsp_ui( # should **not** be the same sub-chart widget assert chart.name != linkedsplits.chart.name - # array_key = func_name array_key = name - # profiler(f'fsp:{func_name} chart created') + # profiler(f'fsp:{name} chart created') # first UI update, usually from shm pushed history update_fsp_chart( chart, shm, name, - # display_name, array_key=array_key, ) @@ -301,7 +287,7 @@ async def run_fsp_ui( # logic inside ``.paint()`` for ``self.opts['fillLevel']`` which # might be the best solution? - # graphics = chart.update_from_array(chart.name, array[func_name]) + # graphics = chart.update_from_array(chart.name, array[name]) # graphics.curve.setBrush(50, 50, 200, 100) # graphics.curve.setFillLevel(50) @@ -401,7 +387,7 @@ async def open_chain( ''' brokername, sym = self.linked.symbol.front_feed() - ns_path = NamespacePath.from_ref(target) + ns_path = str(target.ns_path) async with ( portal.open_context( @@ -417,8 +403,7 @@ async def open_chain( dst_shm_token=dst_shm.token, # target - ns_path=str(ns_path), - # func_name=func_name, + ns_path=ns_path, loglevel=loglevel, zero_on_step=conf.get('zero_on_step', False), @@ -444,23 +429,18 @@ async def start_engine_task( self, target: Fsp, - # display_name: str, conf: dict[str, dict[str, Any]], worker_name: Optional[str] = None, - loglevel: str = 'error', + loglevel: str = 'info', ) -> (ShmArray, trio.Event): - # unpack FSP details from config dict - # func_name = conf['func_name'] - # func_name = target.__name__ fqsn = self.linked.symbol.front_feed() + # allocate an output shm array dst_shm, opened = maybe_mk_fsp_shm( fqsn, - # field_name=func_name, - # display_name=display_name, target=target, readonly=True, ) @@ -474,13 +454,11 @@ async def start_engine_task( started = trio.Event() self.tn.start_soon( self.open_chain, - portal, complete, started, dst_shm, conf, - # func_name, target, loglevel, ) @@ -491,19 +469,14 @@ async def open_fsp_chart( self, target: Fsp, - # display_name: str, conf: dict, # yeah probably dumb.. loglevel: str = 'error', ) -> (trio.Event, ChartPlotWidget): - # func_name = conf['func_name'] - # func_name = target.__name__ - shm, started = await self.start_engine_task( target, - # display_name, conf, loglevel, ) @@ -516,8 +489,6 @@ async def open_fsp_chart( self.linked, shm, started, - # func_name, - # display_name, target, conf=conf, @@ -671,7 +642,6 @@ def maxmin(name) -> tuple[float, float]: # spawn and overlay $ vlm on the same subchart shm, started = await admin.start_engine_task( - # 'dolla_vlm', dolla_vlm, { # fsp engine conf @@ -760,17 +730,13 @@ def maxmin(name) -> tuple[float, float]: axis.size_to_values() # built-in vlm fsps - # for display_name, conf in { for target, conf in { - # 'vwap': { tina_vwap: { - # 'func_name': 'vwap', 'overlay': 'ohlc', # overlays with OHLCV (main) chart 'anchor': 'session', }, }.items(): started = await admin.open_fsp_chart( - # display_name, target, conf, ) @@ -819,17 +785,14 @@ async def start_fsp_displays( disabled=False ) - # async with gather_contexts(( async with ( # NOTE: this admin internally opens an actor cluster open_fsp_admin(linked, ohlcv) as admin, ): statuses = [] - # for display_name, conf in fsp_conf.items(): for target, conf in fsp_conf.items(): started = await admin.open_fsp_chart( - # display_name, target, conf, ) From 67de8f24b98e1b3ccbb2ff3f1c74e0413ef766c8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 28 Jan 2022 08:45:28 -0500 Subject: [PATCH 10/17] Init history output with `np.zeros()` --- piker/fsp/_engine.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index 53c872e88..0a318a3e8 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -107,8 +107,8 @@ async def fsp_compute( profiler(f'{func_name} generated history') # build struct array with an 'index' field to push as history - history = np.array( - np.arange(len(history_output)), + history = np.zeros( + len(history_output), dtype=dst.array.dtype ) From 28b5be0719a8e8ec349630e7ad192a0d9bcccac7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 28 Jan 2022 08:46:04 -0500 Subject: [PATCH 11/17] Accumulate dark vlm ticks independently per sample step --- piker/fsp/_volume.py | 37 ++++++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/piker/fsp/_volume.py b/piker/fsp/_volume.py index 2a7ac2e85..7cf7d7b4f 100644 --- a/piker/fsp/_volume.py +++ b/piker/fsp/_volume.py @@ -135,19 +135,12 @@ async def dolla_vlm( yield chl3 * v i = ohlcv.index - lvlm = 0 + output = vlm = 0 + dvlm = 0 async for quote in source: for tick in iterticks(quote): - ttype = tick.get('type') - if ttype == 'dark_trade': - # print(f'dark_trade: {tick}') - key = 'dark_vlm' - - else: - key = 'dolla_vlm' - # this computes tick-by-tick weightings from here forward size = tick['size'] price = tick['price'] @@ -155,16 +148,30 @@ async def dolla_vlm( li = ohlcv.index if li > i: i = li - lvlm = 0 + vlm = 0 + dvlm = 0 - c, h, l, v = ohlcv.last()[ - ['close', 'high', 'low', 'volume'] - ][0] + # TODO: for marginned instruments (futes, etfs?) we need to + # show the margin $vlm by multiplying by whatever multiplier + # is reported in the sym info. - lvlm += price * size + ttype = tick.get('type') + if ttype == 'dark_trade': + print(f'dark_trade: {tick}') + key = 'dark_vlm' + dvlm += price * size + output = dvlm + + else: + key = 'dolla_vlm' + vlm += price * size + output = vlm # TODO: plot both to compare? + # c, h, l, v = ohlcv.last()[ + # ['close', 'high', 'low', 'volume'] + # ][0] # tina_lvlm = c+h+l/3 * v # print(f' tinal vlm: {tina_lvlm}') - yield key, lvlm + yield key, output From 06934be0474894641f01faf1e1fae93da60d311f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 28 Jan 2022 08:46:24 -0500 Subject: [PATCH 12/17] Overlay dark $volume B) --- piker/ui/_fsp.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 5112ede9d..704ecf9ec 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -678,15 +678,10 @@ def maxmin(name) -> tuple[float, float]: pi.vb._maxmin = partial(maxmin, name='dolla_vlm') curve, _ = chart.draw_curve( - name='dolla_vlm', data=shm.array, - array_key='dolla_vlm', overlay=pi, - # color='bracket', - # TODO: this color or dark volume - # color='charcoal', step_mode=True, # **conf.get('chart_kwargs', {}) ) @@ -703,6 +698,17 @@ def maxmin(name) -> tuple[float, float]: # ``.draw_curve()``. chart._overlays['dolla_vlm'] = shm + curve, _ = chart.draw_curve( + + name='dark_vlm', + data=shm.array, + array_key='dark_vlm', + overlay=pi, + color='charcoal', # darker theme hue + step_mode=True, + # **conf.get('chart_kwargs', {}) + ) + chart._overlays['dark_vlm'] = shm # XXX: old dict-style config before it was moved into the # helper task # 'dolla_vlm': { From 6a0fba1eb36c3ff5bb71b98ba7f6dbdeba532da0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 28 Jan 2022 11:45:47 -0500 Subject: [PATCH 13/17] Support maxmin over multiple arrays; Keep dark vlm in view --- piker/ui/_fsp.py | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 704ecf9ec..937b7e1ec 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -592,14 +592,22 @@ async def open_vlm_displays( ) # force 0 to always be in view - def maxmin(name) -> tuple[float, float]: - mxmn = chart.maxmin(name=name) - if mxmn: - return 0, mxmn[1] + def maxmin( + names: list[str], - return 0, 0 + ) -> tuple[float, float]: + mx = 0 + for name in names: + mxmn = chart.maxmin(name=name) + if mxmn: + mx = max(mxmn[1], mx) - chart.view._maxmin = partial(maxmin, name='volume') + # if mx: + # return 0, mxmn[1] + + return 0, mx + + chart.view._maxmin = partial(maxmin, names=['volume']) # TODO: fix the x-axis label issue where if you put # the axis on the left it's totally not lined up... @@ -675,7 +683,11 @@ def maxmin(name) -> tuple[float, float]: ) # add custom auto range handler - pi.vb._maxmin = partial(maxmin, name='dolla_vlm') + pi.vb._maxmin = partial( + maxmin, + # keep both regular and dark vlm in view + names=['dolla_vlm', 'dark_vlm'], + ) curve, _ = chart.draw_curve( name='dolla_vlm', From 95b31cbc0fb89572bcf69569fc967709aa925977 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 29 Jan 2022 12:44:45 -0500 Subject: [PATCH 14/17] Drop references to deprecated `tractor.msg.pub` --- piker/brokers/data.py | 4 ++-- piker/brokers/kraken.py | 2 -- piker/data/_sampling.py | 11 +++++------ 3 files changed, 7 insertions(+), 10 deletions(-) diff --git a/piker/brokers/data.py b/piker/brokers/data.py index 48b20d80b..035d6f4c2 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -98,7 +98,7 @@ class BrokerFeed: ) -@tractor.msg.pub(tasks=['stock', 'option']) +@tractor.trionics.msgpub(tasks=['stock', 'option']) async def stream_poll_requests( get_topics: Callable, get_quotes: Coroutine, @@ -293,7 +293,7 @@ async def start_quote_stream( await stream_poll_requests( - # ``msg.pub`` required kwargs + # ``trionics.msgpub`` required kwargs task_name=feed_type, ctx=ctx, topics=symbols, diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 3278e40bb..24d2dab3f 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -553,8 +553,6 @@ async def subscribe(ws: wsproto.WSConnection): quote = ohlc topic = quote['symbol'].lower() - # XXX: format required by ``tractor.msg.pub`` - # requires a ``Dict[topic: str, quote: dict]`` await send_chan.send({topic: quote}) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 5e702e08a..b29b0f7dc 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -176,12 +176,11 @@ async def sample_and_broadcast( # TODO: ``numba`` this! for sym, quote in quotes.items(): - # TODO: in theory you can send the IPC msg *before* - # writing to the sharedmem array to decrease latency, - # however, that will require `tractor.msg.pub` support - # here or at least some way to prevent task switching - # at the yield such that the array write isn't delayed - # while another consumer is serviced.. + # TODO: in theory you can send the IPC msg *before* writing + # to the sharedmem array to decrease latency, however, that + # will require at least some way to prevent task switching + # at the yield such that the array write isn't delayed while + # another consumer is serviced.. # start writing the shm buffer with appropriate # trade data From 296863348df159bf689ec39e47599dbe006cb72f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 30 Jan 2022 12:46:20 -0500 Subject: [PATCH 15/17] Update imports to `tractor.msg.NamespacePath` --- piker/fsp/_api.py | 2 +- piker/fsp/_engine.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/piker/fsp/_api.py b/piker/fsp/_api.py index af57d2f84..f2c7cdc8f 100644 --- a/piker/fsp/_api.py +++ b/piker/fsp/_api.py @@ -35,7 +35,7 @@ import numpy as np import tractor -from tractor._portal import NamespacePath +from tractor.msg import NamespacePath from ..data._sharedmem import ( ShmArray, diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index 0a318a3e8..afd986e0b 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -27,7 +27,7 @@ import trio from trio_typing import TaskStatus import tractor -from tractor._portal import NamespacePath +from tractor.msg import NamespacePath from ..log import get_logger, get_console_log from .. import data From bb8fade16f92b0a556bad5bfaf20c00ea0bef2ca Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 30 Jan 2022 12:46:54 -0500 Subject: [PATCH 16/17] Update `msgpub` import from `tractor.experimental` --- piker/brokers/data.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/piker/brokers/data.py b/piker/brokers/data.py index 035d6f4c2..f0a8d3674 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -38,6 +38,7 @@ import trio import tractor +from tractor.experimental import msgpub from async_generator import asynccontextmanager from ..log import get_logger, get_console_log @@ -98,7 +99,7 @@ class BrokerFeed: ) -@tractor.trionics.msgpub(tasks=['stock', 'option']) +@msgpub(tasks=['stock', 'option']) async def stream_poll_requests( get_topics: Callable, get_quotes: Coroutine, From 20a24283a1545185ae5c43605e848e1bf1cd6be0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 30 Jan 2022 12:51:32 -0500 Subject: [PATCH 17/17] Link to `tractor`'s master branch instead of pin --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index ca910e010..680360b0e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ # we require a pinned dev branch to get some edge features that # are often untested in tractor's CI and/or being tested by us # first before committing as core features in tractor's base. --e git+git://github.com/goodboy/tractor.git@piker_pin#egg=tractor +-e git+git://github.com/goodboy/tractor.git@master#egg=tractor # `pyqtgraph` peeps keep breaking, fixing, improving so might as well # pin this to a dev branch that we have more control over especially