Skip to content

Commit

Permalink
Merge pull request #260 from pikers/dark_vlm
Browse files Browse the repository at this point in the history
Dark vlm
  • Loading branch information
goodboy authored Jan 30, 2022
2 parents 8fe2bd6 + 20a2428 commit a2698c7
Show file tree
Hide file tree
Showing 11 changed files with 368 additions and 179 deletions.
5 changes: 3 additions & 2 deletions piker/brokers/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

import trio
import tractor
from tractor.experimental import msgpub
from async_generator import asynccontextmanager

from ..log import get_logger, get_console_log
Expand Down Expand Up @@ -98,7 +99,7 @@ class BrokerFeed:
)


@tractor.msg.pub(tasks=['stock', 'option'])
@msgpub(tasks=['stock', 'option'])
async def stream_poll_requests(
get_topics: Callable,
get_quotes: Coroutine,
Expand Down Expand Up @@ -293,7 +294,7 @@ async def start_quote_stream(

await stream_poll_requests(

# ``msg.pub`` required kwargs
# ``trionics.msgpub`` required kwargs
task_name=feed_type,
ctx=ctx,
topics=symbols,
Expand Down
12 changes: 10 additions & 2 deletions piker/brokers/ib.py
Original file line number Diff line number Diff line change
Expand Up @@ -1032,7 +1032,11 @@ async def get_client(
# https://interactivebrokers.github.io/tws-api/tick_types.html
tick_types = {
77: 'trade',
48: 'utrade',

# a "utrade" aka an off exchange "unreportable" (dark) vlm:
# https://interactivebrokers.github.io/tws-api/tick_types.html#rt_volume
48: 'dark_trade',

0: 'bsize',
1: 'bid',
2: 'ask',
Expand All @@ -1046,13 +1050,17 @@ async def get_client(
def normalize(
ticker: Ticker,
calc_price: bool = False

) -> dict:
# convert named tuples to dicts so we send usable keys
new_ticks = []
for tick in ticker.ticks:
if tick and not isinstance(tick, dict):
td = tick._asdict()
td['type'] = tick_types.get(td['tickType'], 'n/a')
td['type'] = tick_types.get(
td['tickType'],
'n/a',
)

new_ticks.append(td)

Expand Down
2 changes: 0 additions & 2 deletions piker/brokers/kraken.py
Original file line number Diff line number Diff line change
Expand Up @@ -553,8 +553,6 @@ async def subscribe(ws: wsproto.WSConnection):
quote = ohlc
topic = quote['symbol'].lower()

# XXX: format required by ``tractor.msg.pub``
# requires a ``Dict[topic: str, quote: dict]``
await send_chan.send({topic: quote})


Expand Down
12 changes: 8 additions & 4 deletions piker/data/_normalize.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,18 @@

def iterticks(
quote: dict,
types: Tuple[str] = ('trade', 'utrade'),
types: Tuple[str] = ('trade', 'dark_trade'),

) -> AsyncIterator:
"""Iterate through ticks delivered per quote cycle.
"""
'''
Iterate through ticks delivered per quote cycle.
'''
# print(f"{quote}\n\n")
ticks = quote.get('ticks', ())
if ticks:
for tick in ticks:
# print(f"{quote['symbol']}: {tick}")
if tick.get('type') in types:
ttype = tick.get('type')
if ttype in types:
yield tick
11 changes: 5 additions & 6 deletions piker/data/_sampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,12 +176,11 @@ async def sample_and_broadcast(
# TODO: ``numba`` this!
for sym, quote in quotes.items():

# TODO: in theory you can send the IPC msg *before*
# writing to the sharedmem array to decrease latency,
# however, that will require `tractor.msg.pub` support
# here or at least some way to prevent task switching
# at the yield such that the array write isn't delayed
# while another consumer is serviced..
# TODO: in theory you can send the IPC msg *before* writing
# to the sharedmem array to decrease latency, however, that
# will require at least some way to prevent task switching
# at the yield such that the array write isn't delayed while
# another consumer is serviced..

# start writing the shm buffer with appropriate
# trade data
Expand Down
163 changes: 163 additions & 0 deletions piker/fsp/_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship of pikers)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

'''
FSP (financial signal processing) apis.
'''

# TODO: things to figure the heck out:
# - how to handle non-plottable values (pyqtgraph has facility for this
# now in `arrayToQPath()`)
# - composition of fsps / implicit chaining syntax (we need an issue)

from __future__ import annotations
from functools import partial
from typing import (
Any,
Callable,
Awaitable,
Optional,
)

import numpy as np
import tractor
from tractor.msg import NamespacePath

from ..data._sharedmem import (
ShmArray,
maybe_open_shm_array,
)
from ..log import get_logger

log = get_logger(__name__)

# global fsp registry filled out by @fsp decorator below
_fsp_registry = {}


def _load_builtins() -> dict[tuple, Callable]:

# import to implicity trigger registration via ``@fsp``
from . import _momo # noqa
from . import _volume # noqa

return _fsp_registry


class Fsp:
'''
"Financial signal processor" decorator wrapped async function.
'''

# TODO: checkout the advanced features from ``wrapt``:
# - dynamic enable toggling,
# https://wrapt.readthedocs.io/en/latest/decorators.html#dynamically-disabling-decorators
# - custom object proxies, might be useful for implementing n-compose
# https://wrapt.readthedocs.io/en/latest/wrappers.html#custom-object-proxies
# - custom function wrappers,
# https://wrapt.readthedocs.io/en/latest/wrappers.html#custom-function-wrappers

def __init__(
self,
func: Callable[..., Awaitable],
*,
outputs: tuple[str] = (),
display_name: Optional[str] = None,
**config,

) -> None:

# TODO (maybe):
# - type introspection?
# - should we make this a wrapt object proxy?
self.func = func
self.__name__ = func.__name__ # XXX: must have func-object name

self.ns_path: tuple[str, str] = NamespacePath.from_ref(func)
self.outputs = outputs
self.config: dict[str, Any] = config

# register with declared set.
_fsp_registry[self.ns_path] = func

@property
def name(self) -> str:
return self.__name__

def __call__(
self,

# TODO: when we settle on py3.10 we should probably use the new
# type annots from pep 612:
# https://www.python.org/dev/peps/pep-0612/
# instance,
*args,
**kwargs
):
return self.func(*args, **kwargs)


def fsp(
wrapped=None,
*,
outputs: tuple[str] = (),
display_name: Optional[str] = None,
**config,

) -> Fsp:

if wrapped is None:
return partial(
Fsp,
outputs=outputs,
display_name=display_name,
**config,
)

return Fsp(wrapped, outputs=(wrapped.__name__,))


def maybe_mk_fsp_shm(
sym: str,
target: fsp,
readonly: bool = True,

) -> (ShmArray, bool):
'''
Allocate a single row shm array for an symbol-fsp pair if none
exists, otherwise load the shm already existing for that token.
'''
uid = tractor.current_actor().uid

# TODO: load output types from `Fsp`
# - should `index` be a required internal field?
fsp_dtype = np.dtype(
[('index', int)] +
[(field_name, float) for field_name in target.outputs]
)

key = f'{sym}.fsp.{target.name}.{".".join(uid)}'

shm, opened = maybe_open_shm_array(
key,
# TODO: create entry for each time frame
dtype=fsp_dtype,
readonly=True,
)
return shm, opened
Loading

0 comments on commit a2698c7

Please sign in to comment.