Skip to content

Commit

Permalink
Upgrade CSP to Perspective 3.0.3
Browse files Browse the repository at this point in the history
  • Loading branch information
sinistersnare committed Oct 7, 2024
1 parent 9da93f1 commit 7363855
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 65 deletions.
31 changes: 14 additions & 17 deletions csp/adapters/perspective.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import threading
from datetime import timedelta
from perspective import PerspectiveTornadoHandler, Server
from typing import Dict, Optional, Union

import csp
Expand All @@ -13,21 +14,20 @@
except ImportError:
raise ImportError("perspective adapter requires tornado package")


try:
from perspective import PerspectiveManager, Table as Table_, View as View_, __version__, set_threadpool_size
from perspective import Server, Table as Table_, View as View_, __version__, set_threadpool_size

MAJOR, MINOR, PATCH = map(int, __version__.split("."))
if (MAJOR, MINOR, PATCH) < (0, 6, 2):
raise ImportError("perspective adapter requires 0.6.2 or greater of the perspective-python package")
if (MAJOR, MINOR, PATCH) < (3, 1, 0):
raise ImportError("perspective adapter requires version 3.1.0 or greater of the perspective-python package")
except ImportError:
raise ImportError("perspective adapter requires 0.6.2 or greater of the perspective-python package")
raise ImportError("perspective adapter requires version 3.1.0 or greater of the perspective-python package")


# Run perspective update in a separate tornado loop
def perspective_thread(client):
    """Run a dedicated tornado IOLoop for the perspective client.

    Perspective 3.x clients need a loop callback to schedule async table
    updates; this thread owns that loop so updates never block the engine
    thread.  Blocks forever in ``loop.start()`` — run as a daemon thread.

    :param client: perspective local client (from ``Server.new_local_client()``)
    """
    # NOTE: the SOURCE diff interleaved the pre-commit (manager-based) and
    # post-commit (client-based) lines; this is the post-commit version only.
    loop = tornado.ioloop.IOLoop()
    client.set_loop_callback(loop.add_callback)
    loop.start()


Expand All @@ -54,19 +54,17 @@ def _apply_updates(table: object, data: {str: ts[object]}, throttle: timedelta):


@csp.node
def _launch_application(port: int, manager: object, stub: ts[object]):
def _launch_application(port: int, server: Server, stub: ts[object]):
with csp.state():
s_app = None
s_ioloop = None
s_iothread = None

with csp.start():
from perspective import PerspectiveTornadoHandler

s_app = tornado.web.Application(
[
# create a websocket endpoint that the client Javascript can access
(r"/websocket", PerspectiveTornadoHandler, {"manager": manager, "check_origin": True})
(r"/websocket", PerspectiveTornadoHandler, {"perspective_server": server, "check_origin": True})
],
websocket_ping_interval=15,
)
Expand Down Expand Up @@ -197,20 +195,19 @@ def create_table(self, name, limit=None, index=None):

def _instantiate(self):
    """Build the perspective Server/Client pair, host one perspective table
    per configured csp table, and launch the update and web-server nodes.

    Post-commit (perspective 3.x) flow: a single ``Server`` owns the data,
    a local client hosts tables on it, and the tornado handler is given the
    server itself.
    """
    set_threadpool_size(self._threadpool_size)
    # TODO (from upstream): one Server per adapter instance; consider
    # accepting a shared Client parameter instead.
    server = Server()
    client = server.new_local_client()

    # Perspective table updates are flushed on a dedicated tornado IOLoop
    # thread; daemonized so it never blocks interpreter shutdown.
    thread = threading.Thread(target=perspective_thread, kwargs=dict(client=client))
    thread.daemon = True
    thread.start()

    for table_name, table in self._tables.items():
        # csp.Enum columns are published as strings — perspective has no
        # native enum column type.
        schema = {
            k: v.tstype.typ if not issubclass(v.tstype.typ, csp.Enum) else str for k, v in table.columns.items()
        }
        # In perspective 3.x, client.table(...) both creates and hosts the
        # table (replaces the old PerspectiveManager.host_table pattern).
        ptable = client.table(schema, name=table_name, limit=table.limit, index=table.index)

        _apply_updates(ptable, table.columns, self._throttle)

    _launch_application(self._port, server, csp.const("stub"))
22 changes: 15 additions & 7 deletions csp/dataframe.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from datetime import datetime, timedelta
from datetime import date, datetime, timedelta
from typing import Dict, Optional

import csp.baselib
Expand Down Expand Up @@ -198,7 +198,7 @@ def to_pandas_ts(self, trigger, window, tindex=None, wait_all_valid=True):

return make_pandas(trigger, self._data, window, tindex, wait_all_valid)

def to_perspective(self, starttime: datetime, endtime: datetime = None, realtime: bool = False):
def to_perspective(self, client, starttime: datetime, endtime: datetime = None, realtime: bool = False):
import csp

try:
Expand Down Expand Up @@ -229,7 +229,7 @@ def join(self):
return perspective.PerspectiveWidget(df.ffill(), plugin="Y Line", columns=self._columns, group_by="index")

@csp.node
def apply_updates(table: object, data: {str: csp.ts[object]}, timecol: str, throttle: timedelta):
def apply_updates(table: object, data: Dict[str, csp.ts[object]], timecol: str, throttle: timedelta):
with csp.alarms():
alarm = csp.alarm(bool)
with csp.state():
Expand All @@ -240,7 +240,7 @@ def apply_updates(table: object, data: {str: csp.ts[object]}, timecol: str, thro

if csp.ticked(data):
s_buffer.append(dict(data.tickeditems()))
s_buffer[-1][timecol] = csp.now()
s_buffer[-1][timecol] = int(csp.now().timestamp() * 1000)

if csp.ticked(alarm):
if len(s_buffer) > 0:
Expand All @@ -250,9 +250,17 @@ def apply_updates(table: object, data: {str: csp.ts[object]}, timecol: str, thro
csp.schedule_alarm(alarm, throttle, True)

timecol = "time"
schema = {k: v.tstype.typ for k, v in self._data.items()}
schema[timecol] = datetime
table = perspective.Table(schema)
perspective_type_map = {
str: "string",
float: "float",
int: "integer",
date: "date",
datetime: "datetime",
bool: "boolean",
}
schema = {k: perspective_type_map[v.tstype.typ] for k, v in self._data.items()}
schema[timecol] = "datetime"
table = client.table(schema)
runner = csp.run_on_thread(
apply_updates,
table,
Expand Down
71 changes: 55 additions & 16 deletions csp/impl/pandas_perspective.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pandas as pd
import pyarrow as pa
import pytz
from datetime import datetime, timedelta
from datetime import date, datetime, timedelta, timezone
from pandas.compat import set_function_name
from typing import Optional

Expand Down Expand Up @@ -40,7 +41,8 @@ def _apply_updates(
if throttle > timedelta(0):
csp.schedule_alarm(alarm, throttle, True)
s_has_time_col = time_col and time_col not in data.keys()
s_datetime_cols = set([c for c, t in table.schema().items() if t == datetime])
s_datetime_cols = set([c for c, t in table.schema().items() if t == "datetime"])
s_date_cols = set([c for c, t in table.schema().items() if t == "date"])

with csp.stop():
try:
Expand Down Expand Up @@ -81,14 +83,22 @@ def _apply_updates(
row[index_col] = idx
if s_has_time_col:
if localize:
row[time_col] = pytz.utc.localize(csp.now())
row[time_col] = int(pytz.utc.localize(csp.now()).timestamp() * 1000)
else:
row[time_col] = csp.now()
row[time_col] = int(pytz.utc.localize(csp.now()).timestamp() * 1000)
else:
row = new_rows[idx]

if localize and col in s_datetime_cols and value.tzinfo is None:
row[col] = pytz.utc.localize(value)
if col in s_date_cols:
row[col] = int(
datetime(year=value.year, month=value.month, day=value.day, tzinfo=timezone.utc).timestamp() * 1000
)

elif localize and col in s_datetime_cols:
if value.tzinfo is None:
row[col] = int(pytz.utc.localize(value).timestamp() * 1000)
else:
row[col] = int(pytz.utc.localize(value).timestamp() * 1000)
else:
row[col] = value

Expand Down Expand Up @@ -160,28 +170,41 @@ def __init__(
self._limit = limit
self._localize = localize

# TODO: we do not want 1 server per table, make a Client param?
self._psp_server = perspective.Server()
self._psp_client = self._psp_server.new_local_client()

self._basket = _frame_to_basket(data)
self._static_frame = data.csp.static_frame()
self._static_table = perspective.Table(self._static_frame)
self._static_table = self._psp_client.table(self._static_frame)
static_schema = self._static_table.schema()
# Since the index will be accounted for separately, remove the index from the static table schema,
# and re-enter it under index_col
raw_index_name = self._static_frame.index.name or "index"
index_type = static_schema.pop(raw_index_name)
schema = {index_col: index_type}
perspective_type_map = {
str: "string",
float: "float",
int: "integer",
date: "date",
datetime: "datetime",
bool: "boolean",
}

if time_col:
schema[time_col] = datetime
schema[time_col] = "datetime"
for col, series in data.items():
if is_csp_type(series):
schema[col] = series.dtype.subtype
schema[col] = perspective_type_map[series.dtype.subtype]
else:
schema[col] = static_schema[col]

if self._keep_history:
self._table = perspective.Table(schema, index=None, limit=limit)
self._table = self._psp_client.table(schema, index=None, limit=limit)
self._static_records = self._static_frame.to_dict(orient="index")
else:
self._table = perspective.Table(schema, index=self._index_col)
self._table = self._psp_client.table(schema, index=self._index_col)
self._static_frame.index = self._static_frame.index.rename(self._index_col)
self._table.update(self._static_frame)
self._static_records = None # No need to update dynamically
Expand Down Expand Up @@ -222,7 +245,7 @@ def run_historical(self, starttime, endtime):
index = self._index_col
if self._limit:
df = df.sort_values(self._time_col).tail(self._limit).reset_index(drop=True)
return perspective.Table(df.to_dict("series"), index=index)
return self._psp_client.table(df, index=index)

def run(self, starttime=None, endtime=timedelta(seconds=60), realtime=True, clear=False):
"""Run a graph that sends data to the table on the current thread.
Expand Down Expand Up @@ -280,7 +303,7 @@ def get_widget(self, **override_kwargs):
"sort": [[self._time_col, "desc"]],
}
else:
kwargs = {"columns": list(self._table.schema())}
kwargs = {"columns": list(self._table.columns())}
kwargs.update(override_kwargs)
return perspective.PerspectiveWidget(self._table, **kwargs)

Expand All @@ -294,14 +317,30 @@ def _method(self, **options):

@classmethod
def _add_view_methods(cls):
cls.to_df = cls._create_view_method(perspective.View.to_df)
cls.to_dict = cls._create_view_method(perspective.View.to_dict)
cls.to_json = cls._create_view_method(perspective.View.to_json)
cls.to_csv = cls._create_view_method(perspective.View.to_csv)
cls.to_numpy = cls._create_view_method(perspective.View.to_numpy)
cls.to_columns = cls._create_view_method(perspective.View.to_columns)
cls.to_arrow = cls._create_view_method(perspective.View.to_arrow)

def to_df(self, **kwargs):
    """Materialize the table's current contents as a pandas DataFrame.

    Fetches the data as an Arrow IPC stream via ``self.to_arrow()``, decodes
    it with pyarrow, and normalizes two dtype quirks so results match
    pandas-built frames: millisecond datetimes are widened to nanoseconds,
    and categorical categories are re-sorted alphabetically (pyarrow does
    not enforce category order).

    :param kwargs: forwarded to ``pyarrow.Table.to_pandas``
    :return: a ``pandas.DataFrame`` of the table contents
    """
    arrow_bytes = self.to_arrow()
    arrow_table = pa.ipc.open_stream(arrow_bytes).read_all()
    frame = pd.DataFrame(arrow_table.to_pandas(**kwargs))

    # DAVIS: `pyarrow` does not force alphabetical order on categories, so
    # we correct this here to make assertions pass. We can enforce this in
    # Perspective at a performance hit/API complexity.
    for name in frame.columns:
        dtype = frame[name].dtype
        if dtype == "datetime64[ms]":
            frame[name] = frame[name].astype("datetime64[ns]")
        elif dtype == "category":
            ordered = frame[name].cat.categories.sort_values()
            frame[name] = frame[name].cat.reorder_categories(ordered)

    if frame.index.dtype == "category":
        frame.index = frame.index.cat.reorder_categories(frame.index.cat.categories.sort_values())

    return frame


CspPerspectiveTable._add_view_methods()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,13 @@ def test_perspective(self):
starttime = datetime(2021, 4, 26)
endtime = starttime + timedelta(seconds=10)

_ = df.to_perspective(starttime, endtime)
server = perspective.Server()
client = server.new_local_client()

_ = df.to_perspective(client, starttime, endtime)

# realtime
widget = df.to_perspective(datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True)
widget = df.to_perspective(client, datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True)
import time

time.sleep(1)
Expand Down
Loading

0 comments on commit 7363855

Please sign in to comment.