Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -670,9 +670,9 @@ jobs:
python-version:
- 3.9
package:
- "sqlalchemy>=2"
- "sqlalchemy<2"
- "numpy==1.19.5"
- "numpy==1.22.4" # Min supported version of pandas 2.2
- "perspective-python<3"

runs-on: ${{ matrix.os }}

Expand Down Expand Up @@ -709,11 +709,15 @@ jobs:

- name: Python Test Steps
run: make test TEST_ARGS="-k TestDBReader"
if: ${{ contains( 'sqlalchemy', matrix.package )}}
if: ${{ contains( matrix.package, 'sqlalchemy' )}}

- name: Python Test Steps
run: make test
if: ${{ contains( 'numpy', matrix.package )}}
if: ${{ contains( matrix.package, 'numpy' )}}

- name: Python Test Steps
run: make test TEST_ARGS="-k Perspective"
if: ${{ contains( matrix.package, 'perspective' )}}

###########################################################################################################
#.........................................................................................................#
Expand Down
2 changes: 1 addition & 1 deletion conda/dev-environment-unix.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ dependencies:
- librdkafka
- lz4-c
- mamba
- mdformat>=0.7.17,<0.8
- mdformat=0.7.17
- ninja
- numpy<2
- pandas
Expand Down
2 changes: 1 addition & 1 deletion conda/dev-environment-win.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ dependencies:
- lz4-c
- make
- mamba
- mdformat>=0.7.17,<0.8
- mdformat=0.7.17
- ninja
- numpy<2
- pandas
Expand Down
78 changes: 57 additions & 21 deletions csp/adapters/perspective.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
import threading
from datetime import timedelta
from perspective import Table as Table_, View as View_
from typing import Dict, Optional, Union

import csp
from csp import ts
from csp.impl.perspective_common import (
date_to_perspective,
datetime_to_perspective,
is_perspective3,
perspective_type_map,
)
from csp.impl.wiring.delayed_node import DelayedNodeWrapperDef

try:
Expand All @@ -14,20 +21,17 @@
raise ImportError("perspective adapter requires tornado package")


try:
from perspective import PerspectiveManager, Table as Table_, View as View_, __version__, set_threadpool_size

MAJOR, MINOR, PATCH = map(int, __version__.split("."))
if (MAJOR, MINOR, PATCH) < (0, 6, 2):
raise ImportError("perspective adapter requires 0.6.2 or greater of the perspective-python package")
except ImportError:
raise ImportError("perspective adapter requires 0.6.2 or greater of the perspective-python package")
_PERSPECTIVE_3 = is_perspective3()
if _PERSPECTIVE_3:
from perspective import Server
else:
from perspective import PerspectiveManager


# Run perspective update in a separate tornado loop
def perspective_thread(manager):
def perspective_thread(client):
loop = tornado.ioloop.IOLoop()
manager.set_loop_callback(loop.add_callback)
client.set_loop_callback(loop.add_callback)
loop.start()


Expand All @@ -38,12 +42,25 @@ def _apply_updates(table: object, data: {str: ts[object]}, throttle: timedelta):

with csp.state():
s_buffer = []
s_datetime_cols = set()
s_date_cols = set()

with csp.start():
csp.schedule_alarm(alarm, throttle, True)
if _PERSPECTIVE_3:
s_datetime_cols = set([c for c, t in table.schema().items() if t == "datetime"])
s_date_cols = set([c for c, t in table.schema().items() if t == "date"])

if csp.ticked(data):
s_buffer.append(dict(data.tickeditems()))
row = dict(data.tickeditems())
if _PERSPECTIVE_3:
for col, value in row.items():
if col in s_datetime_cols:
row[col] = datetime_to_perspective(row[col])
if col in s_date_cols:
row[col] = date_to_perspective(row[col])

s_buffer.append(row)

if csp.ticked(alarm):
if len(s_buffer) > 0:
Expand All @@ -54,19 +71,25 @@ def _apply_updates(table: object, data: {str: ts[object]}, throttle: timedelta):


@csp.node
def _launch_application(port: int, manager: object, stub: ts[object]):
def _launch_application(port: int, server: object, stub: ts[object]):
with csp.state():
s_app = None
s_ioloop = None
s_iothread = None

with csp.start():
from perspective import PerspectiveTornadoHandler
if _PERSPECTIVE_3:
from perspective.handlers.tornado import PerspectiveTornadoHandler

handler_args = {"perspective_server": server, "check_origin": True}
else:
from perspective import PerspectiveTornadoHandler

handler_args = {"manager": server, "check_origin": True}
s_app = tornado.web.Application(
[
# create a websocket endpoint that the client Javascript can access
(r"/websocket", PerspectiveTornadoHandler, {"manager": manager, "check_origin": True})
(r"/websocket", PerspectiveTornadoHandler, handler_args)
],
websocket_ping_interval=15,
)
Expand Down Expand Up @@ -196,21 +219,34 @@ def create_table(self, name, limit=None, index=None):
return table

def _instantiate(self):
set_threadpool_size(self._threadpool_size)

manager = PerspectiveManager()
if _PERSPECTIVE_3:
server = Server()
client = server.new_local_client()
thread = threading.Thread(target=perspective_thread, kwargs=dict(client=client))
else:
from perspective import set_threadpool_size

thread = threading.Thread(target=perspective_thread, kwargs=dict(manager=manager))
set_threadpool_size(self._threadpool_size)
manager = PerspectiveManager()
thread = threading.Thread(target=perspective_thread, kwargs=dict(manager=manager))
thread.daemon = True
thread.start()

for table_name, table in self._tables.items():
schema = {
k: v.tstype.typ if not issubclass(v.tstype.typ, csp.Enum) else str for k, v in table.columns.items()
}
ptable = Table(schema, limit=table.limit, index=table.index)
manager.host_table(table_name, ptable)
if _PERSPECTIVE_3:
psp_type_map = perspective_type_map()
schema = {col: psp_type_map.get(typ, typ) for col, typ in schema.items()}
ptable = client.table(schema, name=table_name, limit=table.limit, index=table.index)
else:
ptable = Table(schema, limit=table.limit, index=table.index)
manager.host_table(table_name, ptable)

_apply_updates(ptable, table.columns, self._throttle)

_launch_application(self._port, manager, csp.const("stub"))
if _PERSPECTIVE_3:
_launch_application(self._port, server, csp.const("stub"))
else:
_launch_application(self._port, manager, csp.const("stub"))
40 changes: 33 additions & 7 deletions csp/dataframe.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import datetime, timedelta
from datetime import date, datetime, timedelta
from packaging import version
from typing import Dict, Optional

import csp.baselib
Expand All @@ -12,6 +13,7 @@ class DataFrame:
def __init__(self, data: Optional[Dict] = None):
self._data = data or {}
self._columns = list(self._data.keys())
self._psp_client = None

@property
def columns(self):
Expand Down Expand Up @@ -204,10 +206,17 @@ def to_perspective(self, starttime: datetime, endtime: datetime = None, realtime
try:
import perspective

if version.parse(perspective.__version__) >= version.parse("3"):
_PERSPECTIVE_3 = True
from perspective.widget import PerspectiveWidget
else:
_PERSPECTIVE_3 = False
from perspective import PerspectiveWidget

global RealtimePerspectiveWidget
if RealtimePerspectiveWidget is None:

class RealtimePerspectiveWidget(perspective.PerspectiveWidget):
class RealtimePerspectiveWidget(PerspectiveWidget):
def __init__(self, engine_runner, *args, **kwargs):
super().__init__(*args, **kwargs)
self._runner = engine_runner
Expand All @@ -222,14 +231,14 @@ def join(self):
self._runner.join()

except ImportError:
raise ImportError("eval_perspective requires perspective-python installed")
raise ImportError("to_perspective requires perspective-python installed")

if not realtime:
df = self.to_pandas(starttime, endtime)
return perspective.PerspectiveWidget(df.ffill(), plugin="Y Line", columns=self._columns, group_by="index")
return PerspectiveWidget(df.ffill(), plugin="Y Line", columns=self._columns, group_by="index")

@csp.node
def apply_updates(table: object, data: {str: csp.ts[object]}, timecol: str, throttle: timedelta):
def apply_updates(table: object, data: Dict[str, csp.ts[object]], timecol: str, throttle: timedelta):
with csp.alarms():
alarm = csp.alarm(bool)
with csp.state():
Expand All @@ -240,7 +249,10 @@ def apply_updates(table: object, data: {str: csp.ts[object]}, timecol: str, thro

if csp.ticked(data):
s_buffer.append(dict(data.tickeditems()))
s_buffer[-1][timecol] = csp.now()
if _PERSPECTIVE_3:
s_buffer[-1][timecol] = int(csp.now().timestamp() * 1000)
else:
s_buffer[-1][timecol] = csp.now()

if csp.ticked(alarm):
if len(s_buffer) > 0:
Expand All @@ -252,7 +264,21 @@ def apply_updates(table: object, data: {str: csp.ts[object]}, timecol: str, thro
timecol = "time"
schema = {k: v.tstype.typ for k, v in self._data.items()}
schema[timecol] = datetime
table = perspective.Table(schema)
if _PERSPECTIVE_3:
perspective_type_map = {
str: "string",
float: "float",
int: "integer",
date: "date",
datetime: "datetime",
bool: "boolean",
}
schema = {col: perspective_type_map[typ] for col, typ in schema.items()}
if self._psp_client is None:
self._psp_client = perspective.Server().new_local_client()
table = self._psp_client.table(schema)
else:
table = perspective.Table(schema)
runner = csp.run_on_thread(
apply_updates,
table,
Expand Down
Loading
Loading