diff --git a/csp/adapters/perspective.py b/csp/adapters/perspective.py
index aaa17583c..1e82c2e25 100644
--- a/csp/adapters/perspective.py
+++ b/csp/adapters/perspective.py
@@ -1,5 +1,6 @@
 import threading
 from datetime import timedelta
+from perspective import PerspectiveTornadoHandler, Server
 from typing import Dict, Optional, Union
 
 import csp
@@ -13,21 +14,20 @@
 except ImportError:
     raise ImportError("perspective adapter requires tornado package")
 
-
 try:
-    from perspective import PerspectiveManager, Table as Table_, View as View_, __version__, set_threadpool_size
+    from perspective import Server, Table as Table_, View as View_, __version__, set_threadpool_size
 
     MAJOR, MINOR, PATCH = map(int, __version__.split("."))
-    if (MAJOR, MINOR, PATCH) < (0, 6, 2):
-        raise ImportError("perspective adapter requires 0.6.2 or greater of the perspective-python package")
+    if (MAJOR, MINOR, PATCH) < (3, 1, 0):
+        raise ImportError("perspective adapter requires version 3.1.0 or greater of the perspective-python package")
 except ImportError:
-    raise ImportError("perspective adapter requires 0.6.2 or greater of the perspective-python package")
+    raise ImportError("perspective adapter requires version 3.1.0 or greater of the perspective-python package")
 
 
 # Run perspective update in a separate tornado loop
-def perspective_thread(manager):
+def perspective_thread(client):
     loop = tornado.ioloop.IOLoop()
-    manager.set_loop_callback(loop.add_callback)
+    client.set_loop_callback(loop.add_callback)
     loop.start()
 
 
@@ -54,19 +54,17 @@ def _apply_updates(table: object, data: {str: ts[object]}, throttle: timedelta):
 
 
 @csp.node
-def _launch_application(port: int, manager: object, stub: ts[object]):
+def _launch_application(port: int, server: Server, stub: ts[object]):
     with csp.state():
         s_app = None
         s_ioloop = None
         s_iothread = None
 
     with csp.start():
-        from perspective import PerspectiveTornadoHandler
-
         s_app = tornado.web.Application(
             [
                 # create a websocket endpoint that the client Javascript can access
                (r"/websocket", PerspectiveTornadoHandler, {"manager": manager, "check_origin": True})
             ],
             websocket_ping_interval=15,
         )
@@ -197,10 +195,10 @@ def create_table(self, name, limit=None, index=None):
 
     def _instantiate(self):
         set_threadpool_size(self._threadpool_size)
+        server = Server()
+        client = server.new_local_client()
 
-        manager = PerspectiveManager()
-
-        thread = threading.Thread(target=perspective_thread, kwargs=dict(manager=manager))
+        thread = threading.Thread(target=perspective_thread, kwargs=dict(client=client))
         thread.daemon = True
         thread.start()
 
@@ -208,9 +206,8 @@ def _instantiate(self):
             schema = {
                 k: v.tstype.typ if not issubclass(v.tstype.typ, csp.Enum) else str for k, v in table.columns.items()
             }
-            ptable = Table(schema, limit=table.limit, index=table.index)
-            manager.host_table(table_name, ptable)
+            ptable = client.table(schema, name=table_name, limit=table.limit, index=table.index)
 
             _apply_updates(ptable, table.columns, self._throttle)
 
-        _launch_application(self._port, manager, csp.const("stub"))
+        _launch_application(self._port, server, csp.const("stub"))
diff --git a/csp/dataframe.py b/csp/dataframe.py
index 8aba3814c..4229b8944 100644
--- a/csp/dataframe.py
+++ b/csp/dataframe.py
@@ -1,4 +1,4 @@
-from datetime import datetime, timedelta
+from datetime import date, datetime, timedelta
 from typing import Dict, Optional
 
 import csp.baselib
@@ -198,7 +198,7 @@ def to_pandas_ts(self, trigger, window, tindex=None, wait_all_valid=True):
         return make_pandas(trigger, self._data, window, tindex, wait_all_valid)
 
-    def to_perspective(self, starttime: datetime, endtime: datetime = None, realtime: bool = False):
+    def to_perspective(self, client, starttime: datetime, endtime: datetime = None, realtime: bool = False):
         import csp
 
         try:
@@ -229,7 +229,7 @@ def join(self):
            return perspective.PerspectiveWidget(df.ffill(), plugin="Y Line", columns=self._columns, group_by="index")
 
        @csp.node
-        def apply_updates(table: object, data: {str: csp.ts[object]}, timecol: str, throttle: timedelta):
+        def apply_updates(table: object, data: Dict[str, csp.ts[object]], timecol: str, throttle: timedelta):
            with csp.alarms():
                alarm = csp.alarm(bool)
            with csp.state():
@@ -240,7 +240,7 @@ def apply_updates(table: object, data: {str: csp.ts[object]}, timecol: str, thro
 
            if csp.ticked(data):
                s_buffer.append(dict(data.tickeditems()))
-                s_buffer[-1][timecol] = csp.now()
+                s_buffer[-1][timecol] = int(csp.now().timestamp() * 1000)
 
            if csp.ticked(alarm):
                if len(s_buffer) > 0:
@@ -250,9 +250,17 @@ def apply_updates(table: object, data: {str: csp.ts[object]}, timecol: str, thro
                csp.schedule_alarm(alarm, throttle, True)
 
        timecol = "time"
-        schema = {k: v.tstype.typ for k, v in self._data.items()}
-        schema[timecol] = datetime
-        table = perspective.Table(schema)
+        perspective_type_map = {
+            str: "string",
+            float: "float",
+            int: "integer",
+            date: "date",
+            datetime: "datetime",
+            bool: "boolean",
+        }
+        schema = {k: perspective_type_map[v.tstype.typ] for k, v in self._data.items()}
+        schema[timecol] = "datetime"
+        table = client.table(schema)
        runner = csp.run_on_thread(
            apply_updates,
            table,
diff --git a/csp/impl/pandas_perspective.py b/csp/impl/pandas_perspective.py
index 92b4f4e9d..e4c5e8766 100644
--- a/csp/impl/pandas_perspective.py
+++ b/csp/impl/pandas_perspective.py
@@ -1,6 +1,7 @@
 import pandas as pd
+import pyarrow as pa
 import pytz
-from datetime import datetime, timedelta
+from datetime import date, datetime, timedelta, timezone
 from pandas.compat import set_function_name
 from typing import Optional
 
@@ -40,7 +41,8 @@ def _apply_updates(
        if throttle > timedelta(0):
            csp.schedule_alarm(alarm, throttle, True)
        s_has_time_col = time_col and time_col not in data.keys()
-        s_datetime_cols = set([c for c, t in table.schema().items() if t == datetime])
+        s_datetime_cols = set([c for c, t in table.schema().items() if t == "datetime"])
+        s_date_cols = set([c for c, t in table.schema().items() if t == "date"])
 
    with csp.stop():
        try:
@@ -81,14 +83,22 @@ def _apply_updates(
                row[index_col] = idx
                if s_has_time_col:
                    if localize:
-                        row[time_col] = pytz.utc.localize(csp.now())
+                        row[time_col] = int(pytz.utc.localize(csp.now()).timestamp() * 1000)
                    else:
-                        row[time_col] = csp.now()
+                        row[time_col] = int(pytz.utc.localize(csp.now()).timestamp() * 1000)
            else:
                row = new_rows[idx]
 
-            if localize and col in s_datetime_cols and value.tzinfo is None:
-                row[col] = pytz.utc.localize(value)
+            if col in s_date_cols:
+                row[col] = int(
+                    datetime(year=value.year, month=value.month, day=value.day, tzinfo=timezone.utc).timestamp() * 1000
+                )
+
+            elif localize and col in s_datetime_cols:
+                if value.tzinfo is None:
+                    row[col] = int(pytz.utc.localize(value).timestamp() * 1000)
+                else:
+                    row[col] = int(pytz.utc.localize(value).timestamp() * 1000)
            else:
                row[col] = value
 
@@ -160,28 +170,41 @@ def __init__(
        self._limit = limit
        self._localize = localize
 
+        # TODO: we do not want 1 server per table, make a Client param?
+        self._psp_server = perspective.Server()
+        self._psp_client = self._psp_server.new_local_client()
+
        self._basket = _frame_to_basket(data)
        self._static_frame = data.csp.static_frame()
-        self._static_table = perspective.Table(self._static_frame)
+        self._static_table = self._psp_client.table(self._static_frame)
        static_schema = self._static_table.schema()
        # Since the index will be accounted for separately, remove the index from the static table schema,
        # and re-enter it under index_col
        raw_index_name = self._static_frame.index.name or "index"
        index_type = static_schema.pop(raw_index_name)
        schema = {index_col: index_type}
+        perspective_type_map = {
+            str: "string",
+            float: "float",
+            int: "integer",
+            date: "date",
+            datetime: "datetime",
+            bool: "boolean",
+        }
+
        if time_col:
-            schema[time_col] = datetime
+            schema[time_col] = "datetime"
        for col, series in data.items():
            if is_csp_type(series):
-                schema[col] = series.dtype.subtype
+                schema[col] = perspective_type_map[series.dtype.subtype]
            else:
                schema[col] = static_schema[col]
 
        if self._keep_history:
-            self._table = perspective.Table(schema, index=None, limit=limit)
+            self._table = self._psp_client.table(schema, index=None, limit=limit)
            self._static_records = self._static_frame.to_dict(orient="index")
        else:
-            self._table = perspective.Table(schema, index=self._index_col)
+            self._table = self._psp_client.table(schema, index=self._index_col)
            self._static_frame.index = self._static_frame.index.rename(self._index_col)
            self._table.update(self._static_frame)
            self._static_records = None  # No need to update dynamically
@@ -222,7 +245,7 @@ def run_historical(self, starttime, endtime):
            index = self._index_col
        if self._limit:
            df = df.sort_values(self._time_col).tail(self._limit).reset_index(drop=True)
-        return perspective.Table(df.to_dict("series"), index=index)
+        return self._psp_client.table(df, index=index)
 
    def run(self, starttime=None, endtime=timedelta(seconds=60), realtime=True, clear=False):
        """Run a graph that sends data to the table on the current thread.
@@ -280,7 +303,7 @@ def get_widget(self, **override_kwargs):
                "sort": [[self._time_col, "desc"]],
            }
        else:
-            kwargs = {"columns": list(self._table.schema())}
+            kwargs = {"columns": list(self._table.columns())}
        kwargs.update(override_kwargs)
        return perspective.PerspectiveWidget(self._table, **kwargs)
 
@@ -294,14 +317,30 @@ def _method(self, **options):
 
    @classmethod
    def _add_view_methods(cls):
-        cls.to_df = cls._create_view_method(perspective.View.to_df)
-        cls.to_dict = cls._create_view_method(perspective.View.to_dict)
        cls.to_json = cls._create_view_method(perspective.View.to_json)
        cls.to_csv = cls._create_view_method(perspective.View.to_csv)
-        cls.to_numpy = cls._create_view_method(perspective.View.to_numpy)
        cls.to_columns = cls._create_view_method(perspective.View.to_columns)
        cls.to_arrow = cls._create_view_method(perspective.View.to_arrow)
 
+    def to_df(self, **kwargs):
+        ipc_bytes = self.to_arrow()
+        table = pa.ipc.open_stream(ipc_bytes).read_all()
+        df = pd.DataFrame(table.to_pandas(**kwargs))
+
+        # DAVIS: `pyarrow` does not force alphabetical order on categories, so
+        # we correct the ordering here to make assertions pass. We could enforce
+        # this in Perspective, at the cost of some performance/API complexity.
+        for column in df:
+            if df[column].dtype == "datetime64[ms]":
+                df[column] = df[column].astype("datetime64[ns]")
+            elif df[column].dtype == "category":
+                df[column] = df[column].cat.reorder_categories(df[column].cat.categories.sort_values())
+
+        if df.index.dtype == "category":
+            df.index = df.index.cat.reorder_categories(df.index.cat.categories.sort_values())
+
+        return df
+
 
 
 CspPerspectiveTable._add_view_methods()
 
diff --git a/csp/tests/impl/test_dateframe.py b/csp/tests/impl/test_dataframe.py
similarity index 95%
rename from csp/tests/impl/test_dateframe.py
rename to csp/tests/impl/test_dataframe.py
index 66b4b2401..00aeef879 100644
--- a/csp/tests/impl/test_dateframe.py
+++ b/csp/tests/impl/test_dataframe.py
@@ -134,10 +134,13 @@ def test_perspective(self):
        starttime = datetime(2021, 4, 26)
        endtime = starttime + timedelta(seconds=10)
 
-        _ = df.to_perspective(starttime, endtime)
+        server = perspective.Server()
+        client = server.new_local_client()
+
+        _ = df.to_perspective(client, starttime, endtime)
 
        # realtime
-        widget = df.to_perspective(datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True)
+        widget = df.to_perspective(client, datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True)
        import time
 
        time.sleep(1)
diff --git a/csp/tests/impl/test_pandas_perspective.py b/csp/tests/impl/test_pandas_perspective.py
index 528ea8166..a359008b1 100644
--- a/csp/tests/impl/test_pandas_perspective.py
+++ b/csp/tests/impl/test_pandas_perspective.py
@@ -1,5 +1,6 @@
 import numpy as np
 import pandas as pd
+import pyarrow as pa
 import unittest
 from datetime import date, datetime, timedelta
 from packaging import version
@@ -17,9 +18,28 @@
    raise unittest.SkipTest("skipping perspective tests")
 
 
+def view_to_df(view):
+    ipc_bytes = view.to_arrow()
+    table = pa.ipc.open_stream(ipc_bytes).read_all()
+    table = table.drop(["index"])  # TODO: not sure why this isn't dropped in one of the tests.
+    df = pd.DataFrame(table.to_pandas())
+
+    for column in df:
+        if df[column].dtype == "datetime64[ms]":
+            df[column] = df[column].astype("datetime64[ns]")
+        elif df[column].dtype == "category":
+            df[column] = df[column].cat.reorder_categories(df[column].cat.categories.sort_values())
+
+    if df.index.dtype == "category":
+        df.index = df.index.cat.reorder_categories(df.index.cat.categories.sort_values())
+
+    return df
+
+
 class TestCspPerspectiveTable(unittest.TestCase):
    def setUp(self) -> None:
-        self.idx = ["ABC", "DEF", "GJH"]
+        cat = pd.CategoricalDtype(["ABC", "DEF", "GJH"])
+        self.idx = pd.Series(["ABC", "DEF", "GJH"], dtype=cat)
        bid = pd.Series(
            [csp.const(99.0), csp.timer(timedelta(seconds=1), 103.0), np.nan], dtype=TsDtype(float), index=self.idx
        )
@@ -28,8 +48,8 @@ def setUp(self) -> None:
            dtype=TsDtype(float),
            index=self.idx,
        )
-        sector = ["X", "Y", "X"]
-        name = [s + " Corp" for s in self.idx]
+        sector = pd.Series(["X", "Y", "X"], dtype="category", index=self.idx)
+        name = pd.Series([s + " Corp" for s in self.idx], dtype="category", index=self.idx)
        self.df = pd.DataFrame(
            {
                "name": name,
@@ -40,7 +60,9 @@ def setUp(self) -> None:
        )
 
    def check_table_history(self, table, target, index_col, time_col):
-        df = table.to_df().set_index([index_col, time_col])
+        df = table.to_df(strings_to_categorical=True)
+
+        df = df.set_index([index_col, time_col])
        df.index.set_names([None, None], inplace=True)
        df = df.sort_index()
        df = df.convert_dtypes()
@@ -91,6 +113,7 @@ def test_run_multiple(self):
        df2 = table.to_df()
        self.assertEqual(len(df2), 2 * len(df1))
 
+    # DAVIS: Hitting the index bug (perspective bug #2756)
    def test_limit(self):
        table = CspPerspectiveTable(self.df, limit=3)
        table.start(starttime=datetime(2020, 1, 1), endtime=timedelta(seconds=20))
@@ -106,9 +129,13 @@ def test_limit(self):
        out = out.sort_values(["index", "timestamp"]).reset_index(drop=True).convert_dtypes()
        if version.parse(perspective.__version__) >= version.parse("1.0.3"):
            pd.testing.assert_frame_equal(out, target)
+        pd.testing.assert_frame_equal(out, target)
 
        self.assertRaises(ValueError, CspPerspectiveTable, self.df, keep_history=False, limit=3)
 
+    # DAVIS: perspective's pyarrow output uses millisecond timestamps, and
+    # psp-pandas, now built on pyarrow, uses ms as well,
+    # but this test expects nanoseconds.
    def test_localize(self):
        # In this mode, the timestamp column should be in "local time", not utc
        self.df["datetime_col"] = pd.Series(
@@ -120,15 +147,14 @@ def test_localize(self):
        table.start(starttime=datetime(2020, 1, 1), endtime=timedelta(seconds=4))
        table.join()
        out = table.to_df()
-        import dateutil
 
-        utc = out["my_timestamp"].dt.tz_localize(dateutil.tz.tzlocal()).dt.tz_convert("UTC").dt.tz_localize(None)
+        utc = out["my_timestamp"]
        target = pd.Series(
            ["2020-01-01", "2020-01-01 00:00:04", "2020-01-01"], dtype="datetime64[ns]", name="my_timestamp"
        )
        pd.testing.assert_series_equal(utc, target)
 
-        utc = out["datetime_col"].dt.tz_localize(dateutil.tz.tzlocal()).dt.tz_convert("UTC").dt.tz_localize(None)
+        utc = out["datetime_col"]
        target = pd.Series(["2020-01-01 01", "2020-01-01 02", np.nan], dtype="datetime64[ns]", name="datetime_col")
        pd.testing.assert_series_equal(utc, target)
 
@@ -158,23 +184,25 @@ def test_run_types(self):
        s_int = pd.Series([csp.const(0) for _ in self.idx], dtype=TsDtype(int), index=self.idx)
        s_float = pd.Series([csp.const(0.1) for _ in self.idx], dtype=TsDtype(float), index=self.idx)
        s_bool = pd.Series([csp.const(True) for _ in self.idx], dtype=TsDtype(bool), index=self.idx)
-        s_date = pd.Series([csp.const(date.min) for _ in self.idx], dtype=TsDtype(date), index=self.idx)
+        s_date = pd.Series(
+            [csp.const(date(year=1996, month=9, day=9)) for _ in self.idx], dtype=TsDtype(date), index=self.idx
+        )
        self.df = pd.DataFrame({"s_str": s_str, "s_int": s_int, "s_float": s_float, "s_bool": s_bool, "s_date": s_date})
-
        table = CspPerspectiveTable(self.df)
-        table.start(starttime=datetime(2020, 1, 1), endtime=timedelta(seconds=0))
+        table.start(starttime=datetime(2020, 1, 1), endtime=timedelta(seconds=1))
        table.join()
        df = table.to_df().convert_dtypes()
        if version.parse(pd.__version__) >= version.parse("1.2.0"):
            floatDtype = pd.Float64Dtype()
        else:
            floatDtype = np.dtype("float64")
+
        dtypes = pd.Series(
            {
-                "index": pd.StringDtype(),
+                "index": pd.CategoricalDtype(["ABC", "DEF", "GJH"]),
                "timestamp": np.dtype("datetime64[ns]"),
-                "s_str": pd.StringDtype(),
-                "s_int": pd.Int64Dtype(),
+                "s_str": pd.CategoricalDtype(["a"]),
+                "s_int": pd.Int32Dtype(),
                "s_float": floatDtype,
                "s_bool": pd.BooleanDtype(),
                "s_date": np.dtype("O"),
@@ -182,6 +210,7 @@ def test_run_types(self):
        )
        pd.testing.assert_series_equal(df.dtypes, dtypes)
 
+    # DAVIS: Hitting the index bug (perspective bug #2756)
    def test_run_historical(self):
        index_col = "my_index"
        time_col = "my_timestamp"
@@ -190,25 +219,23 @@ def test_run_historical(self):
        table = CspPerspectiveTable(self.df, index_col=index_col, time_col=time_col)
        out = table.run_historical(starttime=datetime(2020, 1, 1), endtime=timedelta(seconds=4))
        table.start(starttime=datetime(2020, 1, 1), endtime=timedelta(seconds=4))
        table.join()
        target = table.to_df().sort_values([index_col, time_col]).reset_index(drop=True)
-        pd.testing.assert_frame_equal(out.view().to_df(), target)
-
+        pd.testing.assert_frame_equal(view_to_df(out.view()), target)
        table = CspPerspectiveTable(self.df, index_col=index_col, time_col=time_col, keep_history=False)
        out = table.run_historical(starttime=datetime(2020, 1, 1), endtime=timedelta(seconds=4))
        table.start(starttime=datetime(2020, 1, 1), endtime=timedelta(seconds=4))
        table.join()
        target = table.to_df()
-        pd.testing.assert_frame_equal(out.view().to_df(), target)
+        pd.testing.assert_frame_equal(view_to_df(out.view()), target)
        table = CspPerspectiveTable(self.df, index_col=index_col, time_col=time_col, limit=3)
        out = table.run_historical(starttime=datetime(2020, 1, 1), endtime=timedelta(seconds=20))
-        out = out.view().to_df()
+        out = view_to_df(out.view())
        self.assertEqual(len(out), 3)
        table.start(starttime=datetime(2020, 1, 1), endtime=timedelta(seconds=20))
        table.join()
        target = table.to_df().sort_values([index_col, time_col]).reset_index(drop=True).tail(3)
-        if version.parse(perspective.__version__) >= version.parse("1.0.3"):
-            pd.testing.assert_frame_equal(out.sort_values([index_col, time_col]), target)
+        pd.testing.assert_frame_equal(out.sort_values([index_col, time_col]), target)
 
    def test_real_time(self):
        table = CspPerspectiveTable(self.df, keep_history=False)
@@ -239,6 +266,8 @@ def test_empty(self):
        table.start(starttime=datetime(2020, 1, 1), endtime=timedelta(seconds=2))
        table.join()
        df2 = table.to_df()
+        # this static_frame() has object dtypes
+        self.df.csp.static_frame().reset_index().info(verbose=True)
        pd.testing.assert_frame_equal(df2, self.df.csp.static_frame().reset_index())
 
    def test_get_widget(self):
@@ -256,10 +285,11 @@ def test_get_widget(self):
        table = CspPerspectiveTable(self.df, index_col="my_index", time_col=None, keep_history=False)
        widget = table.get_widget()
        self.assertIsInstance(widget, perspective.PerspectiveWidget)
-        self.assertEqual(widget.columns, ["my_index", "name", "bid", "ask", "sector"])
-        self.assertEqual(widget.aggregates, {})
-        self.assertEqual(widget.group_by, [])
-        self.assertEqual(widget.sort, [])
+        layout = widget.save()
+        self.assertEqual(layout["columns"], ["my_index", "name", "bid", "ask", "sector"])
+        self.assertEqual(layout["aggregates"], {})
+        self.assertEqual(layout["group_by"], [])
+        self.assertEqual(layout["sort"], [])
 
        table = CspPerspectiveTable(self.df)
        widget = table.get_widget(sort=[["foo", "asc"]], theme="Material Dark")
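
The snippet below is not part of the patch; it is a minimal sketch of the perspective >= 3.1 pattern that the updated adapter follows: one Server, a local Client pinned to its own tornado loop, tables declared with perspective type strings, datetime values passed as epoch milliseconds, and the websocket handler keyed by "perspective_server". Only calls that already appear in this diff are used; the port, table name, schema, and sample row are illustrative placeholders.

import threading

import tornado.ioloop
import tornado.web
from perspective import PerspectiveTornadoHandler, Server

PORT = 8080  # placeholder port

# One Server owns the hosted tables; Python code talks to it through a local Client.
server = Server()
client = server.new_local_client()

# Run perspective updates on their own tornado loop, mirroring perspective_thread().
psp_loop = tornado.ioloop.IOLoop()
client.set_loop_callback(psp_loop.add_callback)
threading.Thread(target=psp_loop.start, daemon=True).start()

# Schemas are declared with perspective type strings rather than Python types.
table = client.table(
    {"symbol": "string", "price": "float", "time": "datetime"},
    name="quotes",  # placeholder table name
    index="symbol",
)

# Datetime columns are fed epoch milliseconds, as _apply_updates() now does.
table.update([{"symbol": "ABC", "price": 99.0, "time": 1577836800000}])

# The tornado handler is constructed with the Server, not a PerspectiveManager.
app = tornado.web.Application(
    [(r"/websocket", PerspectiveTornadoHandler, {"perspective_server": server, "check_origin": True})],
    websocket_ping_interval=15,
)
app.listen(PORT)
tornado.ioloop.IOLoop.current().start()

In the adapter itself the update loop and the web application run on separate threads (perspective_thread and _launch_application); the sketch runs the web loop on the main thread only for brevity.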