Skip to content

Commit

Permalink
stats
Browse files Browse the repository at this point in the history
  • Loading branch information
e-nikolov committed Jan 23, 2024
1 parent 677bfec commit 662ffba
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 5 deletions.
2 changes: 1 addition & 1 deletion mpyc-web-py/lib/weblooper.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# import .weblooperV2
from .weblooperV9 import *
from .weblooperV10 import *
108 changes: 108 additions & 0 deletions mpyc-web-py/lib/weblooperV10.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import asyncio
import collections
import contextvars
import types
from typing import Any, Callable, Optional

import js
import rich
from lib.api import async_proxy
from lib.stats import stats
from pyodide.code import run_js
from pyodide.ffi import IN_BROWSER, create_once_callable, create_proxy
from pyodide.webloop import PyodideFuture, PyodideTask, WebLoop


class WebLooper(WebLoop):
def __init__(self):
super().__init__()
stats.reset()
old_add = asyncio.tasks._all_tasks.add

def add(self, task):
stats_add("total_tasks_count")
old_add(task)

asyncio.tasks._all_tasks.add = types.MethodType(add, asyncio.tasks._all_tasks)
stats_set("total_tasks_count", len(asyncio.tasks._all_tasks))

self.running = False
self._ready = collections.deque()
self._run_once_proxy = create_proxy(self._run_once)
self.chan = js.MessageChannel.new()
self.chan.port1.onmessage = self._run_once_proxy

def call_soon(
self,
callback: Callable[..., Any],
*args: Any,
context: contextvars.Context | None = None,
) -> asyncio.Handle:
delay = 0
return self.call_later(delay, callback, *args, context=context)

def call_later( # type: ignore[override]
self,
delay: float,
callback: Callable[..., Any],
*args: Any,
context: contextvars.Context | None = None,
) -> asyncio.Handle:
if delay < 0:
raise ValueError("Can't schedule in the past")
h = asyncio.Handle(callback, args, self, context=context)

def run_handle():
if h._cancelled:
return
try:
h._run()
except SystemExit as e:
if self._system_exit_handler:
self._system_exit_handler(e.code)
else:
raise
except KeyboardInterrupt:
if self._keyboard_interrupt_handler:
self._keyboard_interrupt_handler()
else:
raise

if delay == 0:
stats_add("call_soon_count")
self._ready.append(run_handle)
if not self.running:
self.running = True
self.trigger_run_once()

return h
js.setTimeout(create_once_callable(run_handle), delay * 1000)
return h

def trigger_run_once(self):
stats_add("loop_iters")
self.chan.port2.postMessage(None)

def _run_once(self, *args, **kwargs):
ntodo = len(self._ready)
stats_set("ntodo", ntodo)
async_proxy.maybe_send_stats()

for _ in range(ntodo):
stats_add("loop_inner_iters")
self._ready.popleft()()

nleft = len(self._ready)
stats_set("ready", nleft)
if nleft == 0:
self.running = False
else:
self.trigger_run_once()


def stats_add(path: str, value=1, prefix="asyncio."):
stats.acc_path(f"{prefix}{path}", value)


def stats_set(path: str, value=1, prefix="asyncio."):
stats.set_path(f"{prefix}{path}", value)
8 changes: 4 additions & 4 deletions mpyc-web-py/mpycweb/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,11 @@ async def create_connection(
self.transports[pid] = t
return t, p

# @stats.acc(lambda self, pid, message: stats.sent_to(pid, message))
@stats.acc(lambda self, pid, message: stats.sent_to(pid, message))
def send_ready_message(self, pid: int, message: str):
self.async_proxy.send("proxy:js:mpc:msg:ready", pid, message)

# @stats.acc(lambda self, pid, message: stats.received_from(pid, message))
@stats.acc(lambda self, pid, message: stats.received_from(pid, message))
def on_ready_message(self, pid: int, message: str):
"""
Handle a 'ready' message from a peer.
Expand All @@ -98,12 +98,12 @@ def on_ready_message(self, pid: int, message: str):
return
self.transports[pid].on_ready_message(message)

# @stats.acc(lambda self, pid, message: stats.sent_to(pid, message) | stats.time())
@stats.acc(lambda self, pid, message: stats.sent_to(pid, message) | stats.time())
def send_runtime_message(self, pid: int, message: bytes):
# add timestamp
self.async_proxy.send("proxy:js:mpc:msg:runtime", pid, message)

# @stats.acc(lambda self, pid, message, ts: stats.received_from(pid, message) | stats.time() | stats.latency(ts))
@stats.acc(lambda self, pid, message, ts: stats.received_from(pid, message) | stats.time() | stats.latency(ts))
def on_runtime_message(self, pid: int, message: JsProxy, ts) -> None:
"""
Handle a runtime message from a peer.
Expand Down

0 comments on commit 662ffba

Please sign in to comment.