Document in detail how events at duplicate timestamps are handled (#348)
Signed-off-by: Adam Glustein <Adam.Glustein@Point72.com>
AdamGlustein authored Aug 5, 2024
1 parent 4153d01 commit a386b6c
Showing 3 changed files with 70 additions and 2 deletions.
1 change: 1 addition & 0 deletions docs/wiki/api-references/Base-Nodes-API.md
@@ -174,6 +174,7 @@ csp.unroll(x: ts[['T']]) → ts['T']
Given a timeseries of a *list* of values, unroll will "unroll" the values in the list into a timeseries of the elements.
`unroll` preserves the order of the values across all list ticks.
Ticks will be unrolled in subsequent engine cycles.
For a detailed explanation of this behavior, see the documentation on [duplicate timestamps](Execution-Modes#handling-duplicate-timestamps).

## `csp.collect`

2 changes: 1 addition & 1 deletion docs/wiki/concepts/Common-Mistakes.md
@@ -74,7 +74,7 @@ from typing import List
def next_movie_showing(show_times: ts[List[datetime]]) -> ts[datetime]:
    next_showing = None
    for time in show_times:
-        if time >= csp.now(): # list may include some shows today that have already past, so let's filter those out
+        if time >= datetime.now(): # list may include some shows today that have already past, so let's filter those out
            if next_showing is None or time < next_showing:
                next_showing = time

69 changes: 68 additions & 1 deletion docs/wiki/concepts/Execution-Modes.md
@@ -6,14 +6,15 @@ All inputs in simulation are driven off the provided timestamped data of its inp
In realtime mode, the engine runs in wallclock time as of "now".
Realtime engines can get data from realtime adapters, which source data on separate threads and pass it through to the engine (e.g. ActiveMQ events arriving on an ActiveMQ thread and being passed along to the engine in "realtime").

-Since engines can run in both simulated and realtime mode, users should **always** use **`csp.now()`** to get the current time in `csp.node`s.
+Since engines can run in both simulated and realtime mode, users should **always** use **`csp.now()`** to get the current time in a `csp.node`.

## Table of Contents

- [Table of Contents](#table-of-contents)
- [Simulation Mode](#simulation-mode)
- [Realtime Mode](#realtime-mode)
- [csp.PushMode](#csppushmode)
- [Handling Duplicate Timestamps](#handling-duplicate-timestamps)
- [Realtime Group Event Synchronization](#realtime-group-event-synchronization)

## Simulation Mode
@@ -50,6 +51,72 @@ When consuming data from input adapters there are three choices on how one can c
| **BURST** | Simulation | all ticks from input source with duplicate timestamps (on the same timeseries) will tick once with a list of all values |
| | Realtime | all ticks that occurred since previous engine cycle will tick once with a list of all the values |
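
As a rough illustration of the simulation row above, the grouping that BURST performs can be sketched in plain Python. This is not `csp` adapter code: the `burst` helper and the sample events are hypothetical, and the sketch assumes the input is already sorted by timestamp.

```python
from datetime import datetime
from itertools import groupby


def burst(events):
    """Group (timestamp, value) pairs, already sorted by timestamp,
    into one (timestamp, [values]) tick per distinct timestamp."""
    return [
        (stamp, [value for _, value in group])
        for stamp, group in groupby(events, key=lambda event: event[0])
    ]


# Hypothetical input: two events share the first timestamp.
t0 = datetime(2020, 1, 1)
t1 = datetime(2020, 1, 2)
events = [(t0, "a"), (t0, "b"), (t1, "c")]

# Two ticks result: ["a", "b"] at t0 and ["c"] at t1.
bursts = burst(events)
print(bursts)
```

Values within each list keep their arrival order, which mirrors the "list of all values" wording in the table.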

## Handling Duplicate Timestamps

In `csp`, there can be multiple engine cycles that occur at the same engine time. This is often the case when using nodes with internal alarms (e.g. [`csp.unroll`](Base-Nodes-API#cspunroll)) or using feedback edges ([`csp.feedback`](Feedback-and-Delayed-Edge#cspfeedback)).
If multiple events are scheduled at the same timestamp on a single time-series edge, they will be executed on separate cycles *in the order* they were scheduled. For example, consider the code snippet below:

```python
import csp
from csp import ts
from datetime import datetime, timedelta

@csp.node
def ticks_n_times(x: ts[int], n: int) -> ts[int]:
    # Ticks out a value n times, incrementing it each time
    with csp.alarms():
        alarm = csp.alarm(int)

    if csp.ticked(x):
        for i in range(n):
            csp.schedule_alarm(alarm, timedelta(), x+i)

    if csp.ticked(alarm):
        return alarm

@csp.graph
def duplicate_timestamps():
    v = csp.const(1)
    csp.print('ticks_once', ticks_n_times(v, 1))
    csp.print('ticks_twice', ticks_n_times(v, 2))
    csp.print('ticks_thrice', ticks_n_times(v, 3))

csp.run(duplicate_timestamps, starttime=datetime(2020,1,1))
```

When we run this graph, the output is:

```raw
2020-01-01 00:00:00 ticks_once:1
2020-01-01 00:00:00 ticks_twice:1
2020-01-01 00:00:00 ticks_thrice:1
2020-01-01 00:00:00 ticks_twice:2
2020-01-01 00:00:00 ticks_thrice:2
2020-01-01 00:00:00 ticks_thrice:3
```
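
One way to reason about this ordering is a toy model in plain Python. It is only a sketch under stated assumptions, not how the `csp` engine is implemented: each edge is modeled as a queue of values scheduled at the same timestamp, and each engine cycle delivers at most one value per edge. The `run_cycles` helper is hypothetical, and the order of edges within a cycle simply follows dictionary insertion order here.

```python
from collections import deque


def run_cycles(pending):
    """Toy model: `pending` maps an edge name to the values scheduled on it
    at one timestamp, in scheduling order. Returns the ticks per cycle."""
    queues = {name: deque(values) for name, values in pending.items()}
    cycles = []
    while any(queues.values()):
        # One engine cycle: every edge with a pending event ticks exactly once.
        cycles.append([(name, q.popleft()) for name, q in queues.items() if q])
    return cycles


cycles = run_cycles({
    "ticks_once": [1],
    "ticks_twice": [1, 2],
    "ticks_thrice": [1, 2, 3],
})
for number, ticks in enumerate(cycles, start=1):
    print(f"cycle {number}: {ticks}")
```

The model needs three cycles: all three edges tick in the first, two in the second, and only `ticks_thrice` in the third, which matches the grouping of the output lines above.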

A real-life example is using `csp.unroll` to tick out a list of values on separate engine cycles. If we were to use `csp.sample` on the output, we would get the *first* value that is unrolled at each timestamp. Why?
The event that is scheduled on the sampling timer is its first (and only) event at that time; thus, it is executed on the first engine cycle, and samples the first unrolled value.

```python
def sampling_unroll():
    u = csp.unroll(csp.const.using(T=[int])([1, 2, 3]))
    s = csp.sample(csp.const(True), u)
    csp.print('unrolled', u)
    csp.print('sampled', s)

csp.run(sampling_unroll, starttime=datetime(2020,1,1))
```

Output:

```raw
2020-01-01 00:00:00 unrolled:1
2020-01-01 00:00:00 sampled:1
2020-01-01 00:00:00 unrolled:2
2020-01-01 00:00:00 unrolled:3
```
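
The same reasoning can be traced with a small plain-Python model. Again, this is a hedged sketch rather than the engine's actual logic: the `simulate_sample` helper is hypothetical, each edge delivers at most one event per cycle, and the value edge is assumed to be processed before the sampling trigger within a cycle.

```python
from collections import deque


def simulate_sample(trigger_ticks, unrolled_values):
    """Toy model of sampling an unrolled edge at a single timestamp."""
    trigger = deque(trigger_ticks)
    unrolled = deque(unrolled_values)
    log = []
    latest = None
    while trigger or unrolled:
        # One engine cycle: each edge delivers at most one pending event.
        if unrolled:
            latest = unrolled.popleft()
            log.append(("unrolled", latest))
        if trigger:
            trigger.popleft()
            if latest is not None:
                # The trigger samples whatever value is current in this cycle.
                log.append(("sampled", latest))
    return log


log = simulate_sample([True], [1, 2, 3])
for name, value in log:
    print(f"{name}:{value}")
```

Because the trigger has only one event, it is consumed in the first cycle, so the later unrolled values are never sampled.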

## Realtime Group Event Synchronization

The CSP framework supports properly synchronizing events across multiple timeseries that are sourced from the same realtime adapter.