Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(processing_engine): Runtime and Write-back improvements #25672

Merged
merged 1 commit into from
Dec 18, 2024

Conversation

jacksonrnewhouse
Copy link
Contributor

This makes two major changes, running plugins in their own tasks and enabling writing back line protocol to the WriteBuffer.

Running Plugins as Separate Tasks

Each plugin now runs in its own task, started either when created through the API or when the system is started. It is connected to the rest of the system through two mechanisms:

  • A broadcast channel whose sender is owned by the QueryableBuffer and whose receiver it polls from.
  • An Arc to enable write-backs. This also gives access to the catalog and much of the rest of the db as we continue to build out the API.
    Right now the only message being sent is Arc<WalContents>, but I expect the PluginEvent enum to be expanded both for other triggering events and control messages, e.g. Shutdown, Status, etc.

Write-Back Line Protocol

The running pythons have a reference to the WriteBuffer, and this PR also adds a simple receiver for python to write line protocols to with the insert_line_protocol call. Everything is buffered per PluginEvent, and data is copied several times, so there is a lot of room for improvement.

Example REST calls

Creating a Plugin:

curl -X POST 'http://127.0.0.1:8181/api/v3/configure/processing_engine_plugin' \ 
-H 'Content-Type: application/json' \
-d '{
    "db": "load_test",
    "plugin_name": "counter_output",
    "plugin_type": "wal_rows",                 
    "function_name": "count_points",
    "code": "import time\nfrom collections import defaultdict\nfrom influxdb_client_3.write_client.client.write.point import Point\n\ndef count_points(iterator, output):\n    start_time = time.time()\n    table_counts = defaultdict(int)\n    \n    while True:\n        point = iterator.next_point()\n        if point is None:\n            break\n            \n        measurement = point._name\n        table_counts[measurement] += 1\n    \n    for table_name, count in table_counts.items():\n        stats_point = Point(\"table_counts\")\n        stats_point.tag(\"table_name\", table_name)\n        stats_point.field(\"row_count\", count)\n        output.insert_line_protocol(str(stats_point))\n    \n    end_time = time.time()\n    duration = end_time - start_time\n    \n    print(f\"Processing completed in {duration:.2f} seconds\")"
}'

Creating a Trigger:

 curl -X POST 'http://127.0.0.1:8181/api/v3/configure/processing_engine_trigger' \
-H 'Content-Type: application/json' \
-d '{
    "db": "load_test",
    "plugin_name": "counter_output",
    "trigger_name": "counter_output_trigger",
    "trigger_specification": {
         "single_table_wal_write": {"table_name": "measurement_data"}
    }
}'

@jacksonrnewhouse jacksonrnewhouse force-pushed the processing_engine/write-back branch 2 times, most recently from a24353c to 4e9c322 Compare December 17, 2024 21:39
Copy link
Member

@pauldix pauldix left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple of small comments, but otherwise LGTM

influxdb3_py_api/src/system_py.rs Outdated Show resolved Hide resolved
influxdb3_write/src/write_buffer/mod.rs Outdated Show resolved Hide resolved
* Move processing engine invocation to a seperate tokio task.
* Support writing back line protocol from python via insert_line_protocol().
* Update structs to work with bincode.
@jacksonrnewhouse jacksonrnewhouse force-pushed the processing_engine/write-back branch from 4e9c322 to 354b185 Compare December 18, 2024 00:03
@jacksonrnewhouse jacksonrnewhouse merged commit 8bfccb7 into main Dec 18, 2024
13 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants