Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dark vlm #260

Merged
merged 17 commits into from
Jan 30, 2022
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions piker/brokers/ib.py
Original file line number Diff line number Diff line change
Expand Up @@ -1032,7 +1032,11 @@ async def get_client(
# https://interactivebrokers.github.io/tws-api/tick_types.html
tick_types = {
77: 'trade',
48: 'utrade',

# a "utrade" aka an off exchange "unreportable" (dark) vlm:
# https://interactivebrokers.github.io/tws-api/tick_types.html#rt_volume
48: 'dark_trade',

0: 'bsize',
1: 'bid',
2: 'ask',
Expand All @@ -1046,13 +1050,17 @@ async def get_client(
def normalize(
ticker: Ticker,
calc_price: bool = False

) -> dict:
# convert named tuples to dicts so we send usable keys
new_ticks = []
for tick in ticker.ticks:
if tick and not isinstance(tick, dict):
td = tick._asdict()
td['type'] = tick_types.get(td['tickType'], 'n/a')
td['type'] = tick_types.get(
td['tickType'],
'n/a',
)

new_ticks.append(td)

Expand Down
12 changes: 8 additions & 4 deletions piker/data/_normalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,18 @@

def iterticks(
quote: dict,
types: Tuple[str] = ('trade', 'utrade'),
types: Tuple[str] = ('trade', 'dark_trade'),

) -> AsyncIterator:
"""Iterate through ticks delivered per quote cycle.
"""
'''
Iterate through ticks delivered per quote cycle.

'''
# print(f"{quote}\n\n")
ticks = quote.get('ticks', ())
if ticks:
for tick in ticks:
# print(f"{quote['symbol']}: {tick}")
if tick.get('type') in types:
ttype = tick.get('type')
if ttype in types:
yield tick
163 changes: 163 additions & 0 deletions piker/fsp/_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship of pikers)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

'''
FSP (financial signal processing) apis.

'''

# TODO: things to figure the heck out:
# - how to handle non-plottable values (pyqtgraph has facility for this
# now in `arrayToQPath()`)
# - composition of fsps / implicit chaining syntax (we need an issue)

from __future__ import annotations
from functools import partial
from typing import (
Any,
Callable,
Awaitable,
Optional,
)

import numpy as np
import tractor
from tractor._portal import NamespacePath

from ..data._sharedmem import (
ShmArray,
maybe_open_shm_array,
)
from ..log import get_logger

log = get_logger(__name__)

# global fsp registry filled out by @fsp decorator below
_fsp_registry = {}


def _load_builtins() -> dict[tuple, Callable]:

# import to implicity trigger registration via ``@fsp``
from . import _momo # noqa
from . import _volume # noqa

return _fsp_registry


class Fsp:
'''
"Financial signal processor" decorator wrapped async function.

'''

# TODO: checkout the advanced features from ``wrapt``:
# - dynamic enable toggling,
# https://wrapt.readthedocs.io/en/latest/decorators.html#dynamically-disabling-decorators
# - custom object proxies, might be useful for implementing n-compose
# https://wrapt.readthedocs.io/en/latest/wrappers.html#custom-object-proxies
# - custom function wrappers,
# https://wrapt.readthedocs.io/en/latest/wrappers.html#custom-function-wrappers

def __init__(
self,
func: Callable[..., Awaitable],
*,
outputs: tuple[str] = (),
display_name: Optional[str] = None,
**config,

) -> None:

# TODO (maybe):
# - type introspection?
# - should we make this a wrapt object proxy?
self.func = func
self.__name__ = func.__name__ # XXX: must have func-object name

self.ns_path: tuple[str, str] = NamespacePath.from_ref(func)
self.outputs = outputs
self.config: dict[str, Any] = config

# register with declared set.
_fsp_registry[self.ns_path] = func

@property
def name(self) -> str:
return self.__name__

def __call__(
self,

# TODO: when we settle on py3.10 we should probably use the new
# type annots from pep 612:
# https://www.python.org/dev/peps/pep-0612/
# instance,
*args,
**kwargs
):
return self.func(*args, **kwargs)


def fsp(
wrapped=None,
*,
outputs: tuple[str] = (),
display_name: Optional[str] = None,
**config,

) -> Fsp:

if wrapped is None:
return partial(
Fsp,
outputs=outputs,
display_name=display_name,
**config,
)

return Fsp(wrapped, outputs=(wrapped.__name__,))


def maybe_mk_fsp_shm(
sym: str,
target: fsp,
readonly: bool = True,

) -> (ShmArray, bool):
'''
Allocate a single row shm array for an symbol-fsp pair if none
exists, otherwise load the shm already existing for that token.

'''
uid = tractor.current_actor().uid

# TODO: load output types from `Fsp`
# - should `index` be a required internal field?
fsp_dtype = np.dtype(
[('index', int)] +
[(field_name, float) for field_name in target.outputs]
)

key = f'{sym}.fsp.{target.name}.{".".join(uid)}'

shm, opened = maybe_open_shm_array(
key,
# TODO: create entry for each time frame
dtype=fsp_dtype,
readonly=True,
)
return shm, opened
64 changes: 37 additions & 27 deletions piker/fsp/_engine.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship of piker0)
# Copyright (C) Tyler Goodlet (in stewardship of pikers)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
Expand Down Expand Up @@ -27,29 +27,18 @@
import trio
from trio_typing import TaskStatus
import tractor
from tractor._portal import NamespacePath

from ..log import get_logger, get_console_log
from .. import data
from ..data import attach_shm_array
from ..data.feed import Feed
from ..data._sharedmem import ShmArray
from ._momo import _rsi, _wma
from ._volume import _tina_vwap, dolla_vlm
from ._api import Fsp
from ._api import _load_builtins

log = get_logger(__name__)

_fsp_builtins = {
'rsi': _rsi,
'wma': _wma,
'vwap': _tina_vwap,
'dolla_vlm': dolla_vlm,
}

# TODO: things to figure the heck out:
# - how to handle non-plottable values (pyqtgraph has facility for this
# now in `arrayToQPath()`)
# - composition of fsps / implicit chaining syntax (we need an issue)


@dataclass
class TaskTracker:
Expand Down Expand Up @@ -88,7 +77,6 @@ async def fsp_compute(
src: ShmArray,
dst: ShmArray,

func_name: str,
func: Callable,

attach_stream: bool = False,
Expand All @@ -115,15 +103,27 @@ async def fsp_compute(
# and get historical output
history_output = await out_stream.__anext__()

func_name = func.__name__
profiler(f'{func_name} generated history')

# build a struct array which includes an 'index' field to push
# as history
history = np.array(
np.arange(len(history_output)),
# build struct array with an 'index' field to push as history
history = np.zeros(
len(history_output),
dtype=dst.array.dtype
)
history[func_name] = history_output

# TODO: push using a[['f0', 'f1', .., 'fn']] = .. syntax no?
# if the output array is multi-field then push
# each respective field.
fields = getattr(history.dtype, 'fields', None)
if fields:
for key in fields.keys():
if key in history.dtype.fields:
history[func_name] = history_output

# single-key output stream
else:
history[func_name] = history_output

# TODO: XXX:
# THERE'S A BIG BUG HERE WITH THE `index` field since we're
Expand Down Expand Up @@ -164,8 +164,9 @@ async def fsp_compute(
async for processed in out_stream:

log.debug(f"{func_name}: {processed}")
key, output = processed
index = src.index
dst.array[-1][func_name] = processed
dst.array[-1][key] = output

# NOTE: for now we aren't streaming this to the consumer
# stream latest array index entry which basically just acts
Expand Down Expand Up @@ -194,7 +195,7 @@ async def cascade(
src_shm_token: dict,
dst_shm_token: tuple[str, np.dtype],

func_name: str,
ns_path: NamespacePath,

zero_on_step: bool = False,
loglevel: Optional[str] = None,
Expand All @@ -213,10 +214,18 @@ async def cascade(
src = attach_shm_array(token=src_shm_token)
dst = attach_shm_array(readonly=False, token=dst_shm_token)

func: Callable = _fsp_builtins.get(func_name)
reg = _load_builtins()
lines = '\n'.join([f'{key.rpartition(":")[2]} => {key}' for key in reg])
log.info(
f'Registered FSP set:\n{lines}'
)
func: Fsp = reg.get(
NamespacePath(ns_path)
)

if not func:
# TODO: assume it's a func target path
raise ValueError('Unknown fsp target: {func_name}')
raise ValueError(f'Unknown fsp target: {ns_path}')

# open a data feed stream with requested broker
async with data.feed.maybe_open_feed(
Expand All @@ -231,11 +240,12 @@ async def cascade(

) as (feed, quote_stream):

profiler(f'{func_name}: feed up')
profiler(f'{func}: feed up')

assert src.token == feed.shm.token
# last_len = new_len = len(src.array)

func_name = func.__name__
async with (
trio.open_nursery() as n,
):
Expand All @@ -252,7 +262,7 @@ async def cascade(
src=src,
dst=dst,

func_name=func_name,
# func_name=func_name,
func=func
)

Expand Down
Loading