Skip to content

Commit

Permalink
fix: add minimimal emscripten support via non-HTTP sources (#956)
Browse files Browse the repository at this point in the history
* refactor: small cleanups

* fix: move import of http.client to function

* feat: add trivial in-memory source

* feat: add `BlockingObjectSource`

* fix: support no-threads

* fix: import

* fix: various implementation issues

* fix: properly open and serialise source

* docs: reference `no_threads`

* refactor: rename to `"use_threads"`

* feat: use `use_threads` everywhere

* test: fix test

* test: fix another test

* test: fix another test

* fix: expose private attributes from Source

* test: include `use_threads`

* style: pre-commit fixes

* fix: add fixture to check thread count

* Update src/uproot/_dask.py

Co-authored-by: Jim Pivarski <jpivarski@users.noreply.github.com>

* Update src/uproot/behaviors/TBranch.py

Co-authored-by: Jim Pivarski <jpivarski@users.noreply.github.com>

* Update src/uproot/behaviors/TBranch.py

Co-authored-by: Jim Pivarski <jpivarski@users.noreply.github.com>

* Update src/uproot/reading.py

Co-authored-by: Jim Pivarski <jpivarski@users.noreply.github.com>

* Update src/uproot/reading.py

Co-authored-by: Jim Pivarski <jpivarski@users.noreply.github.com>

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Jim Pivarski <jpivarski@users.noreply.github.com>
  • Loading branch information
3 people authored Sep 20, 2023
1 parent 5ae172a commit c0adce7
Show file tree
Hide file tree
Showing 11 changed files with 338 additions and 169 deletions.
1 change: 1 addition & 0 deletions src/uproot/_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ def dask(
* timeout (float for HTTP, int for XRootD; 30)
* max_num_elements (None or int; None)
* num_workers (int; 1)
* use_threads (bool; False on the emscripten platform (i.e. in a web browser), else True)
* num_fallback_workers (int; 10)
* begin_chunk_size (memory_size; 512)
* minimal_ttree_metadata (bool; True)
Expand Down
2 changes: 2 additions & 0 deletions src/uproot/behaviors/TBranch.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ def iterate(
* timeout (float for HTTP, int for XRootD; 30)
* max_num_elements (None or int; None)
* num_workers (int; 1)
* use_threads (bool; False on the emscripten platform (i.e. in a web browser), else True)
* num_fallback_workers (int; 10)
* begin_chunk_size (memory_size; 512)
* minimal_ttree_metadata (bool; True)
Expand Down Expand Up @@ -332,6 +333,7 @@ def concatenate(
* timeout (float for HTTP, int for XRootD; 30)
* max_num_elements (None or int; None)
* num_workers (int; 1)
* use_threads (bool; False on the emscripten platform (i.e. in a web browser), else True)
* num_fallback_workers (int; 10)
* begin_chunk_size (memory_size; 512)
* minimal_ttree_metadata (bool; True)
Expand Down
3 changes: 3 additions & 0 deletions src/uproot/reading.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def open(
* timeout (float for HTTP, int for XRootD; 30)
* max_num_elements (None or int; None)
* num_workers (int; 1)
* use_threads (bool; False on the emscripten platform (i.e. in a web browser), else True)
* num_fallback_workers (int; 10)
* begin_chunk_size (memory_size; 403, the smallest a ROOT file can be)
* minimal_ttree_metadata (bool; True)
Expand Down Expand Up @@ -184,6 +185,7 @@ def __getitem__(self, where):
"timeout": 30,
"max_num_elements": None,
"num_workers": 1,
"use_threads": sys.platform != "emscripten",
"num_fallback_workers": 10,
"begin_chunk_size": 403, # the smallest a ROOT file can be
"minimal_ttree_metadata": True,
Expand Down Expand Up @@ -540,6 +542,7 @@ class ReadOnlyFile(CommonFileMethods):
* timeout (float for HTTP, int for XRootD; 30)
* max_num_elements (None or int; None)
* num_workers (int; 1)
* use_threads (bool; False on the emscripten platform (i.e. in a web browser), else True)
* num_fallback_workers (int; 10)
* begin_chunk_size (memory_size; 403, the smallest a ROOT file can be)
* minimal_ttree_metadata (bool; True)
Expand Down
16 changes: 11 additions & 5 deletions src/uproot/source/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ class MultithreadedFileSource(uproot.source.chunk.MultithreadedSource):
"""
Args:
file_path (str): The filesystem path of the file to open.
options: Must include ``"num_workers"``.
options: Must include ``"num_workers"`` and ``"use_threads"``.
A :doc:`uproot.source.chunk.MultithreadedSource` that manages many
:doc:`uproot.source.file.FileResource` objects.
Expand All @@ -239,18 +239,24 @@ class MultithreadedFileSource(uproot.source.chunk.MultithreadedSource):
ResourceClass = FileResource

def __init__(self, file_path, **options):
self._num_workers = options["num_workers"]
self._num_requests = 0
self._num_requested_chunks = 0
self._num_requested_bytes = 0
self._use_threads = options["use_threads"]
self._num_workers = options["num_workers"]

self._file_path = file_path
self._open()

def _open(self):
self._executor = uproot.source.futures.ResourceThreadPoolExecutor(
[FileResource(self._file_path) for x in range(self._num_workers)]
)
if self._use_threads:
self._executor = uproot.source.futures.ResourceThreadPoolExecutor(
[FileResource(self._file_path) for x in range(self._num_workers)]
)
else:
self._executor = uproot.source.futures.ResourceTrivialExecutor(
FileResource(self._file_path)
)
self._num_bytes = os.path.getsize(self._file_path)

def __getstate__(self):
Expand Down
4 changes: 4 additions & 0 deletions src/uproot/source/futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,10 @@ def closed(self):
"""
return self._closed

def __enter__(self):
self._resource.__enter__()

def __exit__(self, exception_type, exception_value, traceback):
self.shutdown()
self._resource.__exit__(exception_type, exception_value, traceback)
self._closed = True
97 changes: 60 additions & 37 deletions src/uproot/source/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import queue
import re
import sys
from http.client import HTTPConnection, HTTPSConnection
from urllib.parse import urlparse

import uproot
Expand All @@ -37,6 +36,8 @@ def make_connection(parsed_url, timeout):
Creates a ``http.client.HTTPConnection`` or a ``http.client.HTTPSConnection``,
depending on the URL scheme.
"""
from http.client import HTTPConnection, HTTPSConnection

if parsed_url.scheme == "https":
return HTTPSConnection(
parsed_url.hostname, parsed_url.port, None, None, timeout
Expand Down Expand Up @@ -283,13 +284,13 @@ def multifuture(source, ranges, futures, results):
``results`` and ``futures``. Subsequent attempts would immediately
use the :ref:`uproot.source.http.HTTPSource.fallback`.
"""
connection = [make_connection(source.parsed_url, source.timeout)]
connection = make_connection(source.parsed_url, source.timeout)

range_strings = []
for start, stop in ranges:
range_strings.append(f"{start}-{stop - 1}")

connection[0].request(
connection.request(
"GET",
full_path(source.parsed_url),
headers=dict(
Expand All @@ -298,25 +299,24 @@ def multifuture(source, ranges, futures, results):
)

def task(resource):
nonlocal connection
try:
response = connection[0].getresponse()
response = connection.getresponse()

if 300 <= response.status < 400:
connection[0].close()
connection.close()

for k, x in response.getheaders():
if k.lower() == "location":
redirect_url = urlparse(x)
connection[0] = make_connection(
redirect_url, source.timeout
)
connection[0].request(
connection = make_connection(redirect_url, source.timeout)
connection.request(
"GET",
full_path(redirect_url),
headers=dict(
{"Range": "bytes=" + ", ".join(range_strings)},
headers={
"Range": "bytes=" + ", ".join(range_strings),
**source.auth_headers,
),
},
)
task(resource)
return
Expand All @@ -343,7 +343,7 @@ def task(resource):
future._set_excinfo(excinfo)

finally:
connection[0].close()
connection.close()

return uproot.source.futures.ResourceFuture(task)

Expand Down Expand Up @@ -531,7 +531,7 @@ class HTTPSource(uproot.source.chunk.Source):
"""
Args:
file_path (str): A URL of the file to open.
options: Must include ``"num_fallback_workers"`` and ``"timeout"``.
options: Must include ``"num_fallback_workers"``, ``"use_threads"``, and ``"timeout"``.
A :doc:`uproot.source.chunk.Source` that first attempts an HTTP(S)
multipart GET, but if the server doesn't support it, it falls back to many
Expand All @@ -553,24 +553,29 @@ def __init__(self, file_path, **options):
self._num_requested_chunks = 0
self._num_requested_bytes = 0

self._use_threads = options["use_threads"]
self._file_path = file_path
self._num_bytes = None

self._fallback = None
self._fallback_options = options.copy()
self._fallback_options["num_workers"] = self._num_fallback_workers

# Parse the URL here, so that we can expose these fields
self._parsed_url = urlparse(file_path)
self._auth_headers = basic_auth_headers(self._parsed_url)

self._open()

def _open(self):
# if running in a jupyter lite environment, then use a TrivialExecutor
if sys.platform == "emscripten":
self._executor = uproot.source.futures.ResourceTrivialExecutor(
HTTPResource(self._file_path, self._timeout)
)
else:
if self._use_threads:
self._executor = uproot.source.futures.ResourceThreadPoolExecutor(
[HTTPResource(self._file_path, self._timeout)]
)
else:
self._executor = uproot.source.futures.ResourceTrivialExecutor(
HTTPResource(self._file_path, self._timeout)
)

def __getstate__(self):
state = dict(self.__dict__)
Expand Down Expand Up @@ -667,20 +672,14 @@ def parsed_url(self):
"""
A ``urllib.parse.ParseResult`` version of the ``file_path``.
"""
if sys.platform == "emscripten":
return urlparse(self._file_path)
else:
return self._executor.workers[0].resource.parsed_url
return self._parsed_url

@property
def auth_headers(self):
"""
Dict containing auth headers, if any
"""
if sys.platform == "emscripten":
return basic_auth_headers(self.parsed_url)
else:
return self._executor.workers[0].resource.auth_headers
return self._auth_headers

@property
def fallback(self):
Expand All @@ -704,7 +703,7 @@ class MultithreadedHTTPSource(uproot.source.chunk.MultithreadedSource):
"""
Args:
file_path (str): A URL of the file to open.
options: Must include ``"num_workers"`` and ``"timeout"``.
options: Must include ``"num_workers"``, ``"use_threads"``, and ``"timeout"``.
A :doc:`uproot.source.chunk.MultithreadedSource` that manages many
:doc:`uproot.source.http.HTTPResource` objects.
Expand All @@ -713,19 +712,43 @@ class MultithreadedHTTPSource(uproot.source.chunk.MultithreadedSource):
ResourceClass = HTTPResource

def __init__(self, file_path, **options):
num_workers = options["num_workers"]
timeout = options["timeout"]
self._num_requests = 0
self._num_requested_chunks = 0
self._num_requested_bytes = 0
self._use_threads = options["use_threads"]
self._num_workers = options["num_workers"]

self._file_path = file_path
self._num_bytes = None
self._timeout = timeout
self._timeout = options["timeout"]

self._executor = uproot.source.futures.ResourceThreadPoolExecutor(
[HTTPResource(file_path, timeout) for x in range(num_workers)]
)
# Parse the URL here, so that we can expose these fields
self._parsed_url = urlparse(file_path)
self._auth_headers = basic_auth_headers(self._parsed_url)

self._open()

def __getstate__(self):
state = dict(self.__dict__)
state.pop("_executor")
return state

def __setstate__(self, state):
self.__dict__ = state
self._open()

def _open(self):
if self._use_threads:
self._executor = uproot.source.futures.ResourceThreadPoolExecutor(
[
HTTPResource(self._file_path, self._timeout)
for x in range(self._num_workers)
]
)
else:
self._executor = uproot.source.futures.ResourceTrivialExecutor(
HTTPResource(self._file_path, self._timeout)
)

@property
def timeout(self):
Expand All @@ -747,11 +770,11 @@ def parsed_url(self):
"""
A ``urllib.parse.ParseResult`` version of the ``file_path``.
"""
return self._executor.workers[0].resource.parsed_url
return self._parsed_url

@property
def auth_headers(self):
"""
Dict containing auth headers, if any
"""
return self._executor.workers[0].resource.auth_headers
return self._auth_headers
13 changes: 10 additions & 3 deletions src/uproot/source/object.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,14 @@ def __init__(self, obj, **options):
self._num_requested_bytes = 0

self._file_path = repr(obj)
self._executor = uproot.source.futures.ResourceThreadPoolExecutor(
[ObjectResource(obj)]
)

if options["use_threads"]:
self._executor = uproot.source.futures.ResourceThreadPoolExecutor(
[ObjectResource(obj)]
)
else:
self._executor = uproot.source.futures.ResourceTrivialExecutor(
ObjectResource(obj)
)

self._num_bytes = None
Loading

0 comments on commit c0adce7

Please sign in to comment.