Commit

Resolve conflicts between docs PRs; change example so it makes more sense

Signed-off-by: Adam Glustein <[email protected]>
AdamGlustein committed Aug 13, 2024
1 parent 1b6f6e5 commit b03e2e6
Showing 8 changed files with 307 additions and 210 deletions.
4 changes: 2 additions & 2 deletions docs/wiki/_Sidebar.md
@@ -11,8 +11,9 @@ Notes for editors:
**Get Started (Tutorials)**

- [Installation](Installation)
- [First steps](First-Steps)
- [First Steps](First-Steps)
- [More with CSP](More-with-CSP)
- [Build a Basic App](Build-a-Basic-App)
- [IO with Adapters](IO-with-Adapters)

**Concepts**
@@ -28,7 +29,6 @@ Notes for editors:
**How-to guides**

- [Use Statistical Nodes](Use-Statistical-Nodes)
- Use Adapters (coming soon)
- [Create Dynamic Baskets](Create-Dynamic-Baskets)
- Write Adapters:
- [Write Historical Input Adapters](Write-Historical-Input-Adapters)
149 changes: 149 additions & 0 deletions docs/wiki/get-started/Build-a-Basic-App.md
@@ -0,0 +1,149 @@
We have looked at the features of CSP nodes and graphs, as well as how to run an application using `csp.run`. In this tutorial, we will apply what we learned in [First Steps](First-Steps) and [More with CSP](More-with-CSP) to build a basic retail app which maintains an online shopping cart.
We will also introduce two important new concepts: the [`csp.Struct`](csp.Struct-API) data structure and multi-output nodes using `csp.Outputs`.

Our application will track a customer's shopping cart and apply a 10% discount for any items added to the cart in the first minute. Check out the complete code [here](examples/01_basics/e5_retail_cart.py).

## Structured data with `csp.Struct`

An individual item in a shopping cart consists of many fields; for example, the product's name, quantity and cost. The shopping cart itself may contain a list of these items as a field, plus a user ID or name. We also want to store updates to the shopping cart in an organized data structure, which has fields indicating the item in question and whether it was added or removed.

In `csp`, you can use a [`csp.Struct`](csp.Struct-API) to store typed fields together in a single data type. A `csp.Struct` has several advantages over a standard Python dataclass. For example, each field can be accessed as its own time series, ticking independently each time it updates. Structs also have built-in methods for conversion to JSON or dictionary objects. Because they are implemented in C++, structs also perform much better within `csp` than standard Python user-defined types.

```python
import csp
from typing import List

class Item(csp.Struct):
    name: str
    cost: float
    qty: int

class Cart(csp.Struct):
    user_id: int
    items: List[Item]

class CartUpdate(csp.Struct):
    item: Item
    add: bool
```

You can set any subset of a struct's fields; the others remain unset, with a special value of `csp.UNSET`. For example, when a `CartUpdate` removes an item, the item's cost is not set.

## Track cart updates

Recall from [More with CSP](More-with-CSP) that we can store state variables in a `csp.node` using a `csp.state` block. We will create a node that tracks updates to a user's cart by storing the `Cart` struct as a state variable named `s_cart`.

> \[!TIP\]
> By convention, state variables are prefixed with `s_` for readability.

A CSP node can return multiple named outputs. To annotate a multi-output node, we use `csp.Outputs` syntax for the return type annotation. To tick out each named value, we use the `csp.output` function. After each update event, we will tick out the total value of the user's cart and the number of items present.

To apply a discount for all items added in the first minute, we can use an alarm. We discussed how to use a `csp.alarm` as an internal time-series in the [Poisson counter example](More-with-CSP). We only want to update the cart when the user adds, removes or purchases items. The node needs to know the current discount rate, but a change in the rate should not itself trigger an update. To achieve this, we make the alarm time-series `discount` a *passive* input.

A *passive* input is a time-series input that does not cause the node to execute when it ticks. When we access the input within the node, we always get its most recent value. By contrast, an *active* input triggers the node to compute whenever it ticks. So far, every input we've worked with has been an active input. We will set the `discount` input to be passive at graph startup.
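
To make the active/passive distinction concrete, here is a toy model in plain Python. It is not csp's API: `Tick` and `run_node` are hypothetical names invented for this sketch, and times are plain seconds. The model assumes only what the paragraph above states: every tick updates an input's latest value, but only ticks on active inputs cause the node to execute.

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Set, Tuple


@dataclass
class Tick:
    time: int    # seconds since the start of the run
    name: str    # which input ticked
    value: Any   # the new value of that input


def run_node(ticks: List[Tick], passive: Set[str]) -> List[Tuple[int, Dict[str, Any]]]:
    """Replay ticks in time order and record each node execution.

    Returns (time, snapshot of latest input values) per execution.
    """
    latest: Dict[str, Any] = {}
    executions = []
    for tick in sorted(ticks, key=lambda t: t.time):
        latest[tick.name] = tick.value       # every tick updates the latest value
        if tick.name not in passive:         # only active inputs trigger execution
            executions.append((tick.time, dict(latest)))
    return executions


ticks = [
    Tick(0, "discount", 0.9),    # discount starts at 10% off
    Tick(60, "discount", 1.0),   # full price after one minute
    Tick(15, "event", "add X"),
    Tick(30, "event", "add Y"),
    Tick(75, "event", "add Z"),
]
runs = run_node(ticks, passive={"discount"})
for time, seen in runs:
    print(time, seen)
```

In this sketch the node runs three times, once per `event` tick. The `discount` change at 60 seconds triggers nothing by itself, yet the execution at 75 seconds sees the updated rate.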

> \[!TIP\]
> By default, all `csp.ts` inputs are active. You can change the activity of an input at any point during execution by using `csp.make_passive` or `csp.make_active`.
```python
from csp import ts
from datetime import timedelta
from functools import reduce

@csp.node
def update_cart(event: ts[CartUpdate], user_id: int) -> csp.Outputs(total=ts[float], num_items=ts[int]):
    """
    Track the cart total and number of items.
    """
    with csp.alarms():
        discount = csp.alarm(float)

    with csp.state():
        # create an empty shopping cart
        s_cart = Cart(user_id=user_id, items=[])

    with csp.start():
        csp.make_passive(discount)
        csp.schedule_alarm(discount, timedelta(), 0.9)  # 10% off for the first minute
        csp.schedule_alarm(discount, timedelta(minutes=1), 1.0)  # full price after!

    if csp.ticked(event):
        if event.add:
            # apply current discount
            event.item.cost *= discount
            s_cart.items.append(event.item)
        else:
            # remove the given qty of the item
            new_items = []
            remaining_qty = event.item.qty
            for item in s_cart.items:
                if item.name == event.item.name:
                    if item.qty > remaining_qty:
                        item.qty -= remaining_qty
                        remaining_qty = 0  # request satisfied; leave later entries untouched
                        new_items.append(item)
                    else:
                        remaining_qty -= item.qty
                else:
                    new_items.append(item)
            s_cart.items = new_items

    current_total = reduce(lambda a, b: a + b.cost * b.qty, s_cart.items, 0)
    current_num_items = reduce(lambda a, b: a + b.qty, s_cart.items, 0)
    csp.output(total=current_total, num_items=current_num_items)
```

## Create workflow graph

To create example cart updates, we will use a [`csp.curve`](Base-Adapters-API#cspcurve), as in previous examples. The `csp.curve` replays a list of events at specific times.

```python
from datetime import datetime

st = datetime(2020, 1, 1)

@csp.graph
def my_graph():
    # Example cart updates
    events = csp.curve(
        CartUpdate,
        [
            # Add 1 unit of X at $10 with a 10% discount
            (st + timedelta(seconds=15), CartUpdate(item=Item(name="X", cost=10, qty=1), add=True)),
            # Add 2 units of Y at $15 each, with a 10% discount
            (st + timedelta(seconds=30), CartUpdate(item=Item(name="Y", cost=15, qty=2), add=True)),
            # Remove 1 unit of Y
            (st + timedelta(seconds=45), CartUpdate(item=Item(name="Y", qty=1), add=False)),
            # Add 1 unit of Z at $20 with no discount, since our minute expired
            (st + timedelta(seconds=75), CartUpdate(item=Item(name="Z", cost=20, qty=1), add=True)),
        ],
    )

    csp.print("Events", events)

    current_cart = update_cart(events, user_id=42)

    csp.print("Cart number of items", current_cart.num_items)
    csp.print("Cart total", current_cart.total)
```

## Execute the graph

Execute the program and observe the outputs that our shopping cart provides.

```python
def main():
    csp.run(my_graph, starttime=st)
```

```raw
2020-01-01 00:00:15 Events:CartUpdate( item=Item( name=X, cost=10.0, qty=1 ), add=True )
2020-01-01 00:00:15 Cart total:9.0
2020-01-01 00:00:15 Cart number of items:1
2020-01-01 00:00:30 Events:CartUpdate( item=Item( name=Y, cost=15.0, qty=2 ), add=True )
2020-01-01 00:00:30 Cart total:36.0
2020-01-01 00:00:30 Cart number of items:3
2020-01-01 00:00:45 Events:CartUpdate( item=Item( name=Y, cost=<unset>, qty=1 ), add=False )
2020-01-01 00:00:45 Cart total:22.5
2020-01-01 00:00:45 Cart number of items:2
2020-01-01 00:01:15 Events:CartUpdate( item=Item( name=Z, cost=20.0, qty=1 ), add=True )
2020-01-01 00:01:15 Cart total:42.5
2020-01-01 00:01:15 Cart number of items:3
```
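
As a sanity check on the totals in the log above, the arithmetic can be redone in plain Python without csp. This is only a sketch under simplifying assumptions: the cart is a dictionary with one entry per product name (the node above keeps a list), times are seconds since `st`, and `discount_at` is a hypothetical helper that stands in for the alarm schedule.

```python
# (seconds since start, product name, unit cost or None, quantity, add?)
events = [
    (15, "X", 10.0, 1, True),
    (30, "Y", 15.0, 2, True),
    (45, "Y", None, 1, False),   # removals do not carry a cost
    (75, "Z", 20.0, 1, True),
]


def discount_at(seconds: int) -> float:
    """Hypothetical stand-in for the alarm: 10% off during the first minute."""
    return 0.9 if seconds < 60 else 1.0


cart = {}        # product name -> [discounted unit cost, quantity]
snapshots = []   # (seconds, cart total, number of items) after each event
for seconds, name, cost, qty, add in events:
    if add:
        cart[name] = [cost * discount_at(seconds), qty]
    else:
        cart[name][1] -= qty
        if cart[name][1] <= 0:
            del cart[name]
    total = sum(unit_cost * count for unit_cost, count in cart.values())
    num_items = sum(count for _, count in cart.values())
    snapshots.append((seconds, total, num_items))

for snapshot in snapshots:
    print(snapshot)
```

The first two additions are discounted (9.0 for X and 13.5 per unit of Y), which gives totals of 9.0 and 36.0. Removing one unit of Y leaves 22.5, and the full-price Z brings the total to 42.5, in line with the log.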
18 changes: 10 additions & 8 deletions docs/wiki/get-started/First-Steps.md
@@ -1,15 +1,15 @@
CSP is a graph-based stream processing library, where you create directed graphs for real-time event streaming workflows.
In this introductory tutorial, you will write a CSP program to calculate the Bid-Ask Spread for specified `bid` and `ask` values.
csp is a graph-based stream processing library, where you create directed graphs for real-time event streaming workflows.
In this introductory tutorial, you will write a csp program to calculate the Bid-Ask Spread for specified `bid` and `ask` values.

> \[!TIP\]
> The bid–ask spread is the difference between the prices quoted for an immediate sale (ask) and an immediate purchase (bid) for stocks, futures contracts, options, or currency pairs in some auction scenario.
> ~ [Bid–ask spread on Wikipedia](https://en.wikipedia.org/wiki/Bid%E2%80%93ask_spread)
## Introduction to real-time event stream processing

Real-time data is continuously collected or updated data, such as IT log monitoring or weather recordings. Stream processing is the practice of working with or analyzing this data in real time. Streaming applications are driven by updates or changes to the input values. In CSP, you refer to the input changes as "ticks", and write the analysis workflow as a directed graph.
Real-time data is continuously collected or updated data, such as IT log monitoring or weather recordings. Stream processing is the practice of working with or analyzing this data in real time. Streaming applications are driven by updates or changes to the input values. In csp, you refer to the input changes as "ticks", and write the analysis workflow as a directed graph.

CSP programs are written in a functional-style, and consist of:
csp programs are written in a functional-style, and consist of:

- runtime components in the form of `csp.node` methods, and
- graph-building components in the form of `csp.graph` components.
@@ -23,11 +23,11 @@ import csp
from csp import ts
```

Data streams are often expressed as Time Series, and CSP defines a high-level `ts` type that denotes a Time Series input. Most CSP computation nodes require Time Series inputs.
Data streams are often expressed as Time Series, and csp defines a high-level `ts` type that denotes a Time Series input. Most csp computation nodes require Time Series inputs.

## Create a `csp.node` to calculate spread

`csp.node`s are the computational building-blocks of a CSP program. You can use the `@csp.node` decorator to create a node that calculates the bid-ask spread.
`csp.node`s are the computational building-blocks of a csp program. You can use the `@csp.node` decorator to create a node that calculates the bid-ask spread.

```python
@csp.node
@@ -39,14 +39,14 @@ def spread(bid: ts[float], ask: ts[float]) -> ts[float]:
The `bid` and `ask` values are expected to be Time Series values.

> \[!IMPORTANT\]
> CSP nodes are strictly typed, and the type is enforced by the C++ engine.
> csp nodes are strictly typed, and the type is enforced by the C++ engine.

This node needs to be executed each time the `ask` and `bid` values change, so we use the following built-in nodes:

- `csp.valid` - to ensure the values have ticked at least once, where a "tick" refers to any change in the input
- `csp.ticked(bid, ask)` - to check if ask OR bid have ticked since we last checked
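
The two checks can be illustrated with a toy replay in plain Python. This is not csp code: `replay_spread` is a hypothetical function that mimics the checks, on the assumption that the spread is computed as `ask - bid` and that output starts only once both inputs have a value.

```python
from typing import Dict, List, Optional, Tuple


def replay_spread(updates: List[Tuple[int, str, float]]) -> List[Tuple[int, float]]:
    """Replay (time, input name, value) updates and return (time, spread) outputs."""
    latest: Dict[str, Optional[float]] = {"bid": None, "ask": None}
    outputs = []
    for time, name, value in updates:
        latest[name] = value  # this input ticked, so the computation runs
        # analogue of the validity check: both inputs must have ticked at least once
        if latest["bid"] is not None and latest["ask"] is not None:
            outputs.append((time, latest["ask"] - latest["bid"]))
    return outputs


updates = [
    (1, "bid", 2.0),    # ask has no value yet, so nothing is output
    (2, "ask", 2.5),
    (3, "bid", 2.25),
    (4, "ask", 3.0),
]
spreads = replay_spread(updates)
print(spreads)
```

The first update produces no output because `ask` has not ticked yet. Every later update to either input produces a new spread from the latest values of both.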

CSP has several helpful nodes for common computations. Check out the API Reference documentation pages to learn more.
csp has several helpful nodes for common computations. Check out the API Reference documentation pages to learn more.

## Create the graph

@@ -72,6 +72,8 @@ def my_graph():

During runtime, only the inputs, `csp.node`s, and outputs will be active as data flows through the graph. The graph run is driven by input **ticks**.

[csp.const](Base-Adapters-API#cspconst)

> \[!TIP\]
> You can also create csp-friendly constant time series values with [csp.const](Base-Adapters-API#cspconst).
10 changes: 5 additions & 5 deletions docs/wiki/get-started/IO-with-Adapters.md
@@ -1,13 +1,13 @@
In [First Steps](First-Steps) and [More with csp](More-with-CSP) you created example/sample data for the streaming workflows. In real workflows, you use data stored in particular formats and storage spaces, that can be accessed directly or through an API.

CSP has [several built-in "adapters" to access certain types of data](Input-Output-Adapters-API), including Kafka and Parquet. CSP requires friendly (Time Series) data types, and the I/O adapters form the interface between the data types. You can also write your own adapters for other data types; check the corresponding how-to guides for more details.
csp has [several built-in "adapters" to access certain types of data](Input-Output-Adapters-API), including Kafka and Parquet. csp requires friendly (Time Series) data types, and the I/O adapters form the interface between the data types. You can also write your own adapters for other data types; check the corresponding how-to guides for more details.

In this tutorial, you write to, and read from, Parquet files on the local file system.

CSP has the `ParquetWriter` and `ParquetReader` adapters to stream data to and from Parquet files. Check out the complete [API in the Reference documentation](https://github.com/Point72/csp/wiki/Input-Output-Adapters-API#parquet).
csp has the `ParquetWriter` and `ParquetReader` adapters to stream data to and from Parquet files. Check out the complete [API in the Reference documentation](https://github.com/Point72/csp/wiki/Input-Output-Adapters-API#parquet).

> \[!IMPORTANT\]
> csp can handle historical and real-time data, and the CSP program remains similar in both cases.
> csp can handle historical and real-time data, and the csp program remains similar in both cases.
## Example

@@ -25,7 +25,7 @@ class Example(csp.Struct):

In a graph, create some sample values for `Example` and use `ParquetWriter` to stream it to a Parquet file.

1. The `timestamp_column_name` is how CSP preserves the timestamps. Since you need to read this file back into CSP, you can provide a column name. If this were the final output and the timestamp information were not required, you could provide `None`.
1. The `timestamp_column_name` is how csp preserves the timestamps. Since you need to read this file back into csp, you can provide a column name. If this were the final output and the timestamp information were not required, you could provide `None`.

1. You can provide optional configurations to `config` in the `ParquetOutputConfig` format (which can set `allow_overwrite`, `batch_size`, `compression`, `write_arrow_binary`).

@@ -62,4 +62,4 @@ def read_struct(file_name: str):
csp.print("struct_all", struct_all)
```

Go through the complete example at [e1_parquet_write_read.py](https://github.com/Point72/csp/blob/main/examples/03_using_adapters/parquet/e1_parquet_write_read.py) and check out the [API reference](Input-Output-Adapters-API#parquet) for more details.
Go through the complete example at [examples/03_using_adapters/parquet/e1_parquet_write_read.py](https://github.com/Point72/csp/blob/main/examples/03_using_adapters/parquet/e1_parquet_write_read.py) and check out the [API reference](Input-Output-Adapters-API#parquet) for more details.