From c0adce70d460cacd039fd273076f1c37a4318777 Mon Sep 17 00:00:00 2001 From: Angus Hollands Date: Wed, 20 Sep 2023 22:27:56 +0100 Subject: [PATCH] fix: add minimal emscripten support via non-HTTP sources (#956) * refactor: small cleanups * fix: move import of http.client to function * feat: add trivial in-memory source * feat: add `BlockingObjectSource` * fix: support no-threads * fix: import * fix: various implementation issues * fix: properly open and serialise source * docs: reference `no_threads` * refactor: rename to `"use_threads"` * feat: use `use_threads` everywhere * test: fix test * test: fix another test * test: fix another test * fix: expose private attributes from Source * test: include `use_threads` * style: pre-commit fixes * fix: add fixture to check thread count * Update src/uproot/_dask.py Co-authored-by: Jim Pivarski * Update src/uproot/behaviors/TBranch.py Co-authored-by: Jim Pivarski * Update src/uproot/behaviors/TBranch.py Co-authored-by: Jim Pivarski * Update src/uproot/reading.py Co-authored-by: Jim Pivarski * Update src/uproot/reading.py Co-authored-by: Jim Pivarski --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: Jim Pivarski --- src/uproot/_dask.py | 1 + src/uproot/behaviors/TBranch.py | 2 + src/uproot/reading.py | 3 + src/uproot/source/file.py | 16 +- src/uproot/source/futures.py | 4 + src/uproot/source/http.py | 97 +++++--- src/uproot/source/object.py | 13 +- src/uproot/source/xrootd.py | 38 ++- tests/test_0001-source-class.py | 291 ++++++++++++++-------- tests/test_0006-notify-when-downloaded.py | 23 +- tests/test_0007-single-chunk-interface.py | 19 +- 11 files changed, 338 insertions(+), 169 deletions(-) diff --git a/src/uproot/_dask.py b/src/uproot/_dask.py index 4503b9ff2..0737159f5 100644 --- a/src/uproot/_dask.py +++ b/src/uproot/_dask.py @@ -127,6 +127,7 @@ def dask( * timeout (float for HTTP, int for XRootD; 30) * max_num_elements (None or int; None) * 
num_workers (int; 1) + * use_threads (bool; False on the emscripten platform (i.e. in a web browser), else True) * num_fallback_workers (int; 10) * begin_chunk_size (memory_size; 512) * minimal_ttree_metadata (bool; True) diff --git a/src/uproot/behaviors/TBranch.py b/src/uproot/behaviors/TBranch.py index 17ba05fa4..c277d3b30 100644 --- a/src/uproot/behaviors/TBranch.py +++ b/src/uproot/behaviors/TBranch.py @@ -160,6 +160,7 @@ def iterate( * timeout (float for HTTP, int for XRootD; 30) * max_num_elements (None or int; None) * num_workers (int; 1) + * use_threads (bool; False on the emscripten platform (i.e. in a web browser), else True) * num_fallback_workers (int; 10) * begin_chunk_size (memory_size; 512) * minimal_ttree_metadata (bool; True) @@ -332,6 +333,7 @@ def concatenate( * timeout (float for HTTP, int for XRootD; 30) * max_num_elements (None or int; None) * num_workers (int; 1) + * use_threads (bool; False on the emscripten platform (i.e. in a web browser), else True) * num_fallback_workers (int; 10) * begin_chunk_size (memory_size; 512) * minimal_ttree_metadata (bool; True) diff --git a/src/uproot/reading.py b/src/uproot/reading.py index ecfa3bfaf..83d9487cb 100644 --- a/src/uproot/reading.py +++ b/src/uproot/reading.py @@ -85,6 +85,7 @@ def open( * timeout (float for HTTP, int for XRootD; 30) * max_num_elements (None or int; None) * num_workers (int; 1) + * use_threads (bool; False on the emscripten platform (i.e. 
in a web browser), else True) * num_fallback_workers (int; 10) * begin_chunk_size (memory_size; 403, the smallest a ROOT file can be) * minimal_ttree_metadata (bool; True) @@ -184,6 +185,7 @@ def __getitem__(self, where): "timeout": 30, "max_num_elements": None, "num_workers": 1, + "use_threads": sys.platform != "emscripten", "num_fallback_workers": 10, "begin_chunk_size": 403, # the smallest a ROOT file can be "minimal_ttree_metadata": True, @@ -540,6 +542,7 @@ class ReadOnlyFile(CommonFileMethods): * timeout (float for HTTP, int for XRootD; 30) * max_num_elements (None or int; None) * num_workers (int; 1) + * use_threads (bool; False on the emscripten platform (i.e. in a web browser), else True) * num_fallback_workers (int; 10) * begin_chunk_size (memory_size; 403, the smallest a ROOT file can be) * minimal_ttree_metadata (bool; True) diff --git a/src/uproot/source/file.py b/src/uproot/source/file.py index a535e7aea..f5c0630db 100644 --- a/src/uproot/source/file.py +++ b/src/uproot/source/file.py @@ -230,7 +230,7 @@ class MultithreadedFileSource(uproot.source.chunk.MultithreadedSource): """ Args: file_path (str): The filesystem path of the file to open. - options: Must include ``"num_workers"``. + options: Must include ``"num_workers"`` and ``"use_threads"``. A :doc:`uproot.source.chunk.MultithreadedSource` that manages many :doc:`uproot.source.file.FileResource` objects. 
@@ -239,18 +239,24 @@ class MultithreadedFileSource(uproot.source.chunk.MultithreadedSource): ResourceClass = FileResource def __init__(self, file_path, **options): - self._num_workers = options["num_workers"] self._num_requests = 0 self._num_requested_chunks = 0 self._num_requested_bytes = 0 + self._use_threads = options["use_threads"] + self._num_workers = options["num_workers"] self._file_path = file_path self._open() def _open(self): - self._executor = uproot.source.futures.ResourceThreadPoolExecutor( - [FileResource(self._file_path) for x in range(self._num_workers)] - ) + if self._use_threads: + self._executor = uproot.source.futures.ResourceThreadPoolExecutor( + [FileResource(self._file_path) for x in range(self._num_workers)] + ) + else: + self._executor = uproot.source.futures.ResourceTrivialExecutor( + FileResource(self._file_path) + ) self._num_bytes = os.path.getsize(self._file_path) def __getstate__(self): diff --git a/src/uproot/source/futures.py b/src/uproot/source/futures.py index dc3dc27d5..d74ad2f62 100644 --- a/src/uproot/source/futures.py +++ b/src/uproot/source/futures.py @@ -446,6 +446,10 @@ def closed(self): """ return self._closed + def __enter__(self): + self._resource.__enter__() + def __exit__(self, exception_type, exception_value, traceback): self.shutdown() + self._resource.__exit__(exception_type, exception_value, traceback) self._closed = True diff --git a/src/uproot/source/http.py b/src/uproot/source/http.py index cb34f0402..6278e0733 100644 --- a/src/uproot/source/http.py +++ b/src/uproot/source/http.py @@ -19,7 +19,6 @@ import queue import re import sys -from http.client import HTTPConnection, HTTPSConnection from urllib.parse import urlparse import uproot @@ -37,6 +36,8 @@ def make_connection(parsed_url, timeout): Creates a ``http.client.HTTPConnection`` or a ``http.client.HTTPSConnection``, depending on the URL scheme. 
""" + from http.client import HTTPConnection, HTTPSConnection + if parsed_url.scheme == "https": return HTTPSConnection( parsed_url.hostname, parsed_url.port, None, None, timeout @@ -283,13 +284,13 @@ def multifuture(source, ranges, futures, results): ``results`` and ``futures``. Subsequent attempts would immediately use the :ref:`uproot.source.http.HTTPSource.fallback`. """ - connection = [make_connection(source.parsed_url, source.timeout)] + connection = make_connection(source.parsed_url, source.timeout) range_strings = [] for start, stop in ranges: range_strings.append(f"{start}-{stop - 1}") - connection[0].request( + connection.request( "GET", full_path(source.parsed_url), headers=dict( @@ -298,25 +299,24 @@ def multifuture(source, ranges, futures, results): ) def task(resource): + nonlocal connection try: - response = connection[0].getresponse() + response = connection.getresponse() if 300 <= response.status < 400: - connection[0].close() + connection.close() for k, x in response.getheaders(): if k.lower() == "location": redirect_url = urlparse(x) - connection[0] = make_connection( - redirect_url, source.timeout - ) - connection[0].request( + connection = make_connection(redirect_url, source.timeout) + connection.request( "GET", full_path(redirect_url), - headers=dict( - {"Range": "bytes=" + ", ".join(range_strings)}, + headers={ + "Range": "bytes=" + ", ".join(range_strings), **source.auth_headers, - ), + }, ) task(resource) return @@ -343,7 +343,7 @@ def task(resource): future._set_excinfo(excinfo) finally: - connection[0].close() + connection.close() return uproot.source.futures.ResourceFuture(task) @@ -531,7 +531,7 @@ class HTTPSource(uproot.source.chunk.Source): """ Args: file_path (str): A URL of the file to open. - options: Must include ``"num_fallback_workers"`` and ``"timeout"``. + options: Must include ``"num_fallback_workers"``, ``"use_threads"``, and ``"timeout"``. 
A :doc:`uproot.source.chunk.Source` that first attempts an HTTP(S) multipart GET, but if the server doesn't support it, it falls back to many @@ -553,24 +553,29 @@ def __init__(self, file_path, **options): self._num_requested_chunks = 0 self._num_requested_bytes = 0 + self._use_threads = options["use_threads"] self._file_path = file_path self._num_bytes = None self._fallback = None self._fallback_options = options.copy() self._fallback_options["num_workers"] = self._num_fallback_workers + + # Parse the URL here, so that we can expose these fields + self._parsed_url = urlparse(file_path) + self._auth_headers = basic_auth_headers(self._parsed_url) + self._open() def _open(self): - # if running in a jupyter lite environment, then use a TrivialExecutor - if sys.platform == "emscripten": - self._executor = uproot.source.futures.ResourceTrivialExecutor( - HTTPResource(self._file_path, self._timeout) - ) - else: + if self._use_threads: self._executor = uproot.source.futures.ResourceThreadPoolExecutor( [HTTPResource(self._file_path, self._timeout)] ) + else: + self._executor = uproot.source.futures.ResourceTrivialExecutor( + HTTPResource(self._file_path, self._timeout) + ) def __getstate__(self): state = dict(self.__dict__) @@ -667,20 +672,14 @@ def parsed_url(self): """ A ``urllib.parse.ParseResult`` version of the ``file_path``. """ - if sys.platform == "emscripten": - return urlparse(self._file_path) - else: - return self._executor.workers[0].resource.parsed_url + return self._parsed_url @property def auth_headers(self): """ Dict containing auth headers, if any """ - if sys.platform == "emscripten": - return basic_auth_headers(self.parsed_url) - else: - return self._executor.workers[0].resource.auth_headers + return self._auth_headers @property def fallback(self): @@ -704,7 +703,7 @@ class MultithreadedHTTPSource(uproot.source.chunk.MultithreadedSource): """ Args: file_path (str): A URL of the file to open. - options: Must include ``"num_workers"`` and ``"timeout"``. 
+ options: Must include ``"num_workers"``, ``"use_threads"``, and ``"timeout"``. A :doc:`uproot.source.chunk.MultithreadedSource` that manages many :doc:`uproot.source.http.HTTPResource` objects. @@ -713,19 +712,43 @@ class MultithreadedHTTPSource(uproot.source.chunk.MultithreadedSource): ResourceClass = HTTPResource def __init__(self, file_path, **options): - num_workers = options["num_workers"] - timeout = options["timeout"] self._num_requests = 0 self._num_requested_chunks = 0 self._num_requested_bytes = 0 + self._use_threads = options["use_threads"] + self._num_workers = options["num_workers"] self._file_path = file_path self._num_bytes = None - self._timeout = timeout + self._timeout = options["timeout"] - self._executor = uproot.source.futures.ResourceThreadPoolExecutor( - [HTTPResource(file_path, timeout) for x in range(num_workers)] - ) + # Parse the URL here, so that we can expose these fields + self._parsed_url = urlparse(file_path) + self._auth_headers = basic_auth_headers(self._parsed_url) + + self._open() + + def __getstate__(self): + state = dict(self.__dict__) + state.pop("_executor") + return state + + def __setstate__(self, state): + self.__dict__ = state + self._open() + + def _open(self): + if self._use_threads: + self._executor = uproot.source.futures.ResourceThreadPoolExecutor( + [ + HTTPResource(self._file_path, self._timeout) + for x in range(self._num_workers) + ] + ) + else: + self._executor = uproot.source.futures.ResourceTrivialExecutor( + HTTPResource(self._file_path, self._timeout) + ) @property def timeout(self): @@ -747,11 +770,11 @@ def parsed_url(self): """ A ``urllib.parse.ParseResult`` version of the ``file_path``. 
""" - return self._executor.workers[0].resource.parsed_url + return self._parsed_url @property def auth_headers(self): """ Dict containing auth headers, if any """ - return self._executor.workers[0].resource.auth_headers + return self._auth_headers diff --git a/src/uproot/source/object.py b/src/uproot/source/object.py index f8a63c1e1..019d1f57f 100644 --- a/src/uproot/source/object.py +++ b/src/uproot/source/object.py @@ -112,7 +112,14 @@ def __init__(self, obj, **options): self._num_requested_bytes = 0 self._file_path = repr(obj) - self._executor = uproot.source.futures.ResourceThreadPoolExecutor( - [ObjectResource(obj)] - ) + + if options["use_threads"]: + self._executor = uproot.source.futures.ResourceThreadPoolExecutor( + [ObjectResource(obj)] + ) + else: + self._executor = uproot.source.futures.ResourceTrivialExecutor( + ObjectResource(obj) + ) + self._num_bytes = None diff --git a/src/uproot/source/xrootd.py b/src/uproot/source/xrootd.py index 57dd06b3e..be0897b17 100644 --- a/src/uproot/source/xrootd.py +++ b/src/uproot/source/xrootd.py @@ -11,6 +11,7 @@ """ +import contextlib import sys import uproot @@ -68,6 +69,11 @@ def get_server_config(file): return readv_iov_max, readv_ior_max +@contextlib.contextmanager +def trivial_resource(): + yield + + class XRootDResource(uproot.source.chunk.Resource): """ Args: @@ -249,7 +255,7 @@ class XRootDSource(uproot.source.chunk.Source): """ Args: file_path (str): A URL of the file to open. - options: Must include ``"timeout"``, ``"max_num_elements"`` and ``"num_workers"`` + options: Must include ``"timeout"``, ``"max_num_elements"``, ``"use_threads"``, and ``"num_workers"`` A :doc:`uproot.source.chunk.Source` that uses XRootD's vector-read to get many chunks in one request. 
@@ -260,6 +266,7 @@ class XRootDSource(uproot.source.chunk.Source): def __init__(self, file_path, **options): self._timeout = options["timeout"] self._desired_max_num_elements = options["max_num_elements"] + self._use_threads = options["use_threads"] self._num_workers = options["num_workers"] self._num_requests = 0 self._num_requested_chunks = 0 @@ -274,9 +281,14 @@ def _open(self): # this ThreadPool does not need a resource, it's only used to submit # futures that wait for chunks that have been split to merge them. - self._executor = uproot.source.futures.ResourceThreadPoolExecutor( - [None for i in range(self._num_workers)] - ) + if self._use_threads: + self._executor = uproot.source.futures.ResourceThreadPoolExecutor( + [trivial_resource() for x in range(self._num_workers)] + ) + else: + self._executor = uproot.source.futures.ResourceTrivialExecutor( + trivial_resource() + ) self._max_num_elements, self._max_element_size = get_server_config( self._resource.file @@ -431,7 +443,7 @@ class MultithreadedXRootDSource(uproot.source.chunk.MultithreadedSource): """ Args: file_path (str): A URL of the file to open. - options: Must include ``"num_workers"`` and ``"timeout"``. + options: Must include ``"num_workers"``, ``"use_threads"``, and ``"timeout"``. A :doc:`uproot.source.chunk.MultithreadedSource` that manages many :doc:`uproot.source.xrootd.XRootDResource` objects. 
@@ -442,6 +454,7 @@ class MultithreadedXRootDSource(uproot.source.chunk.MultithreadedSource): def __init__(self, file_path, **options): self._num_workers = options["num_workers"] self._timeout = options["timeout"] + self._use_threads = options["use_threads"] self._num_requests = 0 self._num_requested_chunks = 0 self._num_requested_bytes = 0 @@ -451,12 +464,17 @@ def __init__(self, file_path, **options): self._open() def _open(self): - self._executor = uproot.source.futures.ResourceThreadPoolExecutor( - [ + if self._use_threads: + self._executor = uproot.source.futures.ResourceThreadPoolExecutor( + [ + XRootDResource(self._file_path, self._timeout) + for x in range(self._num_workers) + ] + ) + else: + self._executor = uproot.source.futures.ResourceTrivialExecutor( XRootDResource(self._file_path, self._timeout) - for x in range(self._num_workers) - ] - ) + ) def __getstate__(self): state = dict(self.__dict__) diff --git a/tests/test_0001-source-class.py b/tests/test_0001-source-class.py index da80c2441..616f7f627 100644 --- a/tests/test_0001-source-class.py +++ b/tests/test_0001-source-class.py @@ -5,10 +5,11 @@ import queue import sys from io import StringIO +import contextlib import numpy import pytest - +import threading import uproot @@ -19,57 +20,79 @@ def tobytes(x): return x.tostring() -def test_file(tmpdir): - filename = os.path.join(str(tmpdir), "tmp.raw") +@pytest.fixture +def use_threads(request): + if request.param: + yield + return + else: + print("CHECK") + n_threads = threading.active_count() + yield request.param + assert threading.active_count() == n_threads + +@pytest.mark.parametrize( + "use_threads,num_workers", + [(True, 1), (True, 2), (False, 0)], + indirect=["use_threads"], +) +def test_file(use_threads, num_workers, tmp_path): + filename = tmp_path / "tmp.raw" with open(filename, "wb") as tmp: tmp.write(b"****** ...+++++++!!!!!@@@@@") - for num_workers in [1, 2]: - source = uproot.source.file.MultithreadedFileSource( - filename, 
num_workers=num_workers + with uproot.source.file.MultithreadedFileSource( + filename, num_workers=num_workers, use_threads=use_threads + ) as source: + notifications = queue.Queue() + chunks = source.chunks( + [(0, 6), (6, 10), (10, 13), (13, 20), (20, 25), (25, 30)], + notifications, ) - with source as tmp: - notifications = queue.Queue() - chunks = tmp.chunks( - [(0, 6), (6, 10), (10, 13), (13, 20), (20, 25), (25, 30)], - notifications, - ) - assert [tobytes(chunk.raw_data) for chunk in chunks] == [ - b"******", - b" ", - b"...", - b"+++++++", - b"!!!!!", - b"@@@@@", - ] + assert [tobytes(chunk.raw_data) for chunk in chunks] == [ + b"******", + b" ", + b"...", + b"+++++++", + b"!!!!!", + b"@@@@@", + ] - assert source.num_bytes == 30 + assert source.num_bytes == 30 -def test_file_fail(tmpdir): - filename = os.path.join(str(tmpdir), "tmp.raw") +@pytest.mark.parametrize( + "use_threads,num_workers", + [(True, 1), (True, 2), (False, 0)], + indirect=["use_threads"], +) +def test_file_fail(use_threads, num_workers, tmp_path): + filename = tmp_path / "tmp.raw" with open(filename, "wb") as tmp: tmp.write(b"****** ...+++++++!!!!!@@@@@") - for num_workers in [1, 2]: - with pytest.raises(Exception): - uproot.source.file.MultithreadedFileSource( - filename + "-does-not-exist", num_workers=num_workers - ) + with pytest.raises(Exception): + uproot.source.file.MultithreadedFileSource( + filename + "-does-not-exist", + num_workers=num_workers, + use_threads=use_threads, + ) -def test_memmap(tmpdir): - filename = os.path.join(str(tmpdir), "tmp.raw") +@pytest.mark.parametrize("use_threads", [True, False], indirect=True) +def test_memmap(use_threads, tmp_path): + filename = tmp_path / "tmp.raw" with open(filename, "wb") as tmp: tmp.write(b"****** ...+++++++!!!!!@@@@@") - source = uproot.source.file.MemmapSource(filename, num_fallback_workers=1) - with source as tmp: + with uproot.source.file.MemmapSource( + filename, num_fallback_workers=1, use_threads=use_threads + ) as source: 
notifications = queue.Queue() - chunks = tmp.chunks( + chunks = source.chunks( [(0, 6), (6, 10), (10, 13), (13, 20), (20, 25), (25, 30)], notifications ) assert [tobytes(chunk.raw_data) for chunk in chunks] == [ @@ -84,35 +107,43 @@ def test_memmap(tmpdir): assert source.num_bytes == 30 -def test_memmap_fail(tmpdir): - filename = os.path.join(str(tmpdir), "tmp.raw") +@pytest.mark.parametrize("use_threads", [True, False], indirect=True) +def test_memmap_fail(use_threads, tmp_path): + filename = tmp_path / "tmp.raw" with open(filename, "wb") as tmp: tmp.write(b"****** ...+++++++!!!!!@@@@@") with pytest.raises(Exception): - uproot.source.file.MultithreadedFileSource(filename + "-does-not-exist") + with uproot.source.file.MemmapSource( + tmp_path / f"{filename.name}-does-not-exist", + num_fallback_workers=1, + use_threads=use_threads, + ): + ... @pytest.mark.skip(reason="RECHECK: example.com is flaky, too") +@pytest.mark.parametrize("use_threads", [True, False], indirect=True) @pytest.mark.network -def test_http(): - source = uproot.source.http.HTTPSource( - "https://example.com", timeout=10, num_fallback_workers=1 - ) - with source as tmp: +def test_http(use_threads): + with uproot.source.http.HTTPSource( + "https://example.com", + timeout=10, + num_fallback_workers=1, + use_threads=use_threads, + ) as tmp: notifications = queue.Queue() chunks = tmp.chunks([(0, 100), (50, 55), (200, 400)], notifications) one, two, three = (tobytes(chunk.raw_data) for chunk in chunks) assert len(one) == 100 assert len(two) == 5 assert len(three) == 200 - assert source.fallback is None + assert tmp.fallback is None - source = uproot.source.http.MultithreadedHTTPSource( - "https://example.com", num_workers=1, timeout=10 - ) - with source as tmp: + with uproot.source.http.MultithreadedHTTPSource( + "https://example.com", num_workers=1, timeout=10, use_threads=use_threads + ) as tmp: notifications = queue.Queue() chunks = tmp.chunks([(0, 100), (50, 55), (200, 400)], notifications) assert 
[tobytes(x.raw_data) for x in chunks] == [one, two, three] @@ -134,10 +165,14 @@ def colons_and_ports(): @pytest.mark.skip(reason="RECHECK: example.com is flaky, too") +@pytest.mark.parametrize("use_threads", [True, False], indirect=True) @pytest.mark.network -def test_http_port(): +def test_http_port(use_threads): source = uproot.source.http.HTTPSource( - "https://example.com:443", timeout=10, num_fallback_workers=1 + "https://example.com:443", + timeout=10, + num_fallback_workers=1, + use_threads=use_threads, ) with source as tmp: notifications = queue.Queue() @@ -148,7 +183,7 @@ def test_http_port(): assert len(three) == 200 source = uproot.source.http.MultithreadedHTTPSource( - "https://example.com:443", num_workers=1, timeout=10 + "https://example.com:443", num_workers=1, timeout=10, use_threads=use_threads ) with source as tmp: notifications = queue.Queue() @@ -156,29 +191,36 @@ def test_http_port(): assert [tobytes(x.raw_data) for x in chunks] == [one, two, three] +@pytest.mark.parametrize("use_threads", [True, False], indirect=True) @pytest.mark.network -def test_http_size(): +def test_http_size(use_threads): with uproot.source.http.HTTPSource( "https://scikit-hep.org/uproot3/examples/Zmumu.root", timeout=10, num_fallback_workers=1, + use_threads=use_threads, ) as source: size1 = source.num_bytes with uproot.source.http.MultithreadedHTTPSource( - "https://scikit-hep.org/uproot3/examples/Zmumu.root", num_workers=1, timeout=10 + "https://scikit-hep.org/uproot3/examples/Zmumu.root", + num_workers=1, + timeout=10, + use_threads=use_threads, ) as source: size2 = source.num_bytes assert size1 == size2 +@pytest.mark.parametrize("use_threads", [True, False], indirect=True) @pytest.mark.network -def test_http_size_port(): +def test_http_size_port(use_threads): with uproot.source.http.HTTPSource( "https://scikit-hep.org:443/uproot3/examples/Zmumu.root", timeout=10, num_fallback_workers=1, + use_threads=use_threads, ) as source: size1 = source.num_bytes @@ -186,16 
+228,21 @@ def test_http_size_port(): "https://scikit-hep.org:443/uproot3/examples/Zmumu.root", num_workers=1, timeout=10, + use_threads=use_threads, ) as source: size2 = source.num_bytes assert size1 == size2 +@pytest.mark.parametrize("use_threads", [True, False], indirect=True) @pytest.mark.network -def test_http_fail(): +def test_http_fail(use_threads): source = uproot.source.http.HTTPSource( - "https://wonky.cern/does-not-exist", timeout=0.1, num_fallback_workers=1 + "https://wonky.cern/does-not-exist", + timeout=0.1, + num_fallback_workers=1, + use_threads=use_threads, ) with pytest.raises(Exception) as err: notifications = queue.Queue() @@ -203,50 +250,67 @@ def test_http_fail(): chunks[0].raw_data +@pytest.mark.parametrize( + "use_threads,num_workers", + [(True, 1), (True, 2), (False, 0)], + indirect=["use_threads"], +) @pytest.mark.network -def test_no_multipart(): - for num_workers in [1, 2]: - with uproot.source.http.MultithreadedHTTPSource( - "https://scikit-hep.org/uproot3/examples/Zmumu.root", - num_workers=num_workers, - timeout=10, - ) as source: - notifications = queue.Queue() - chunks = source.chunks([(0, 100), (50, 55), (200, 400)], notifications) - one, two, three = (tobytes(chunk.raw_data) for chunk in chunks) - assert len(one) == 100 - assert len(two) == 5 - assert len(three) == 200 - assert one[:4] == b"root" +def test_no_multipart(use_threads, num_workers): + with uproot.source.http.MultithreadedHTTPSource( + "https://scikit-hep.org/uproot3/examples/Zmumu.root", + num_workers=num_workers, + timeout=10, + use_threads=use_threads, + ) as source: + notifications = queue.Queue() + chunks = source.chunks([(0, 100), (50, 55), (200, 400)], notifications) + one, two, three = (tobytes(chunk.raw_data) for chunk in chunks) + assert len(one) == 100 + assert len(two) == 5 + assert len(three) == 200 + assert one[:4] == b"root" +@pytest.mark.parametrize( + "use_threads,num_workers", + [(True, 1), (True, 2), (False, 0)], + indirect=["use_threads"], +) 
@pytest.mark.network -def test_no_multipart_fail(): - for num_workers in [1, 2]: - source = uproot.source.http.MultithreadedHTTPSource( - "https://wonky.cern/does-not-exist", num_workers=num_workers, timeout=0.1 - ) - with pytest.raises(Exception) as err: - notifications = queue.Queue() - chunks = source.chunks([(0, 100), (50, 55), (200, 400)], notifications) - chunks[0].raw_data +def test_no_multipart_fail(use_threads, num_workers): + source = uproot.source.http.MultithreadedHTTPSource( + "https://wonky.cern/does-not-exist", + num_workers=num_workers, + timeout=0.1, + use_threads=use_threads, + ) + with pytest.raises(Exception) as err: + notifications = queue.Queue() + chunks = source.chunks([(0, 100), (50, 55), (200, 400)], notifications) + chunks[0].raw_data +@pytest.mark.parametrize( + "use_threads,num_workers", + [(True, 1), (True, 2), (False, 0)], + indirect=["use_threads"], +) @pytest.mark.network -def test_fallback(): - for num_workers in [1, 2]: - with uproot.source.http.HTTPSource( - "https://scikit-hep.org/uproot3/examples/Zmumu.root", - timeout=10, - num_fallback_workers=num_workers, - ) as source: - notifications = queue.Queue() - chunks = source.chunks([(0, 100), (50, 55), (200, 400)], notifications) - one, two, three = (tobytes(chunk.raw_data) for chunk in chunks) - assert len(one) == 100 - assert len(two) == 5 - assert len(three) == 200 - assert one[:4] == b"root" +def test_fallback(use_threads, num_workers): + with uproot.source.http.HTTPSource( + "https://scikit-hep.org/uproot3/examples/Zmumu.root", + timeout=10, + num_fallback_workers=num_workers, + use_threads=use_threads, + ) as source: + notifications = queue.Queue() + chunks = source.chunks([(0, 100), (50, 55), (200, 400)], notifications) + one, two, three = (tobytes(chunk.raw_data) for chunk in chunks) + assert len(one) == 100 + assert len(two) == 5 + assert len(three) == 200 + assert one[:4] == b"root" @pytest.mark.skip( @@ -254,12 +318,14 @@ def test_fallback(): ) @pytest.mark.network 
@pytest.mark.xrootd -def test_xrootd(): +@pytest.mark.parametrize("use_threads", [True, False], indirect=True) +def test_xrootd(use_threads): pytest.importorskip("XRootD") with uproot.source.xrootd.MultithreadedXRootDSource( "root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root", num_workers=1, timeout=20, + use_threads=use_threads, ) as source: notifications = queue.Queue() chunks = source.chunks([(0, 100), (50, 55), (200, 400)], notifications) @@ -275,22 +341,28 @@ def test_xrootd(): ) @pytest.mark.network @pytest.mark.xrootd -def test_xrootd_deadlock(): +@pytest.mark.parametrize("use_threads", [True, False], indirect=True) +def test_xrootd_deadlock(use_threads): pytest.importorskip("XRootD") # Attach this file to the "test_xrootd_deadlock" function so it leaks pytest.uproot_test_xrootd_deadlock_f = uproot.source.xrootd.XRootDResource( "root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root", timeout=20, + use_threads=use_threads, ) @pytest.mark.network @pytest.mark.xrootd -def test_xrootd_fail(): +@pytest.mark.parametrize("use_threads", [True, False], indirect=True) +def test_xrootd_fail(use_threads): pytest.importorskip("XRootD") with pytest.raises(Exception) as err: - source = uproot.source.xrootd.MultithreadedXRootDSource( - "root://wonky.cern/does-not-exist", num_workers=1, timeout=1 + uproot.source.xrootd.MultithreadedXRootDSource( + "root://wonky.cern/does-not-exist", + num_workers=1, + timeout=1, + use_threads=use_threads, ) @@ -299,13 +371,15 @@ def test_xrootd_fail(): ) @pytest.mark.network @pytest.mark.xrootd -def test_xrootd_vectorread(): +@pytest.mark.parametrize("use_threads", [True, False], indirect=True) +def test_xrootd_vectorread(use_threads): pytest.importorskip("XRootD") with uproot.source.xrootd.XRootDSource( "root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root", timeout=10, max_num_elements=None, num_workers=1, + 
use_threads=use_threads, ) as source: notifications = queue.Queue() chunks = source.chunks([(0, 100), (50, 55), (200, 400)], notifications) @@ -321,13 +395,15 @@ def test_xrootd_vectorread(): ) @pytest.mark.network @pytest.mark.xrootd -def test_xrootd_vectorread_max_element_split(): +@pytest.mark.parametrize("use_threads", [True, False], indirect=True) +def test_xrootd_vectorread_max_element_split(use_threads): pytest.importorskip("XRootD") with uproot.source.xrootd.XRootDSource( "root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root", timeout=10, max_num_elements=None, num_workers=1, + use_threads=use_threads, ) as source: notifications = queue.Queue() max_element_size = 2097136 @@ -341,7 +417,8 @@ def test_xrootd_vectorread_max_element_split(): ) @pytest.mark.network @pytest.mark.xrootd -def test_xrootd_vectorread_max_element_split_consistency(): +@pytest.mark.parametrize("use_threads", [True, False], indirect=True) +def test_xrootd_vectorread_max_element_split_consistency(use_threads): pytest.importorskip("XRootD") filename = "root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root" @@ -358,6 +435,7 @@ def get_chunk(Source, **kwargs): timeout=10, max_num_elements=None, num_workers=1, + use_threads=use_threads, ) chunk2 = get_chunk( uproot.source.xrootd.MultithreadedXRootDSource, timeout=10, num_workers=1 @@ -367,14 +445,16 @@ def get_chunk(Source, **kwargs): @pytest.mark.network @pytest.mark.xrootd -def test_xrootd_vectorread_fail(): +@pytest.mark.parametrize("use_threads", [True, False], indirect=True) +def test_xrootd_vectorread_fail(use_threads): pytest.importorskip("XRootD") with pytest.raises(Exception) as err: - source = uproot.source.xrootd.XRootDSource( + uproot.source.xrootd.XRootDSource( "root://wonky.cern/does-not-exist", timeout=1, max_num_elements=None, num_workers=1, + use_threads=use_threads, ) @@ -383,13 +463,15 @@ def test_xrootd_vectorread_fail(): ) 
@pytest.mark.network @pytest.mark.xrootd -def test_xrootd_size(): +@pytest.mark.parametrize("use_threads", [True, False], indirect=True) +def test_xrootd_size(use_threads): pytest.importorskip("XRootD") with uproot.source.xrootd.XRootDSource( "root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root", timeout=10, max_num_elements=None, num_workers=1, + use_threads=use_threads, ) as source: size1 = source.num_bytes @@ -398,6 +480,7 @@ def test_xrootd_size(): "root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root", timeout=10, num_workers=1, + use_threads=use_threads, ) as source: size2 = source.num_bytes @@ -410,13 +493,15 @@ def test_xrootd_size(): ) @pytest.mark.network @pytest.mark.xrootd -def test_xrootd_numpy_int(): +@pytest.mark.parametrize("use_threads", [True, False], indirect=True) +def test_xrootd_numpy_int(use_threads): pytest.importorskip("XRootD") with uproot.source.xrootd.XRootDSource( "root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root", timeout=10, max_num_elements=None, num_workers=1, + use_threads=use_threads, ) as source: chunk = source.chunk(numpy.int64(0), numpy.int64(100)) assert len(chunk.raw_data) == 100 diff --git a/tests/test_0006-notify-when-downloaded.py b/tests/test_0006-notify-when-downloaded.py index 177258a28..527983642 100644 --- a/tests/test_0006-notify-when-downloaded.py +++ b/tests/test_0006-notify-when-downloaded.py @@ -17,7 +17,9 @@ def test_file(tmpdir): tmp.write(b"****** ...+++++++!!!!!@@@@@") notifications = queue.Queue() - with uproot.source.file.MultithreadedFileSource(filename, num_workers=1) as source: + with uproot.source.file.MultithreadedFileSource( + filename, num_workers=1, use_threads=True + ) as source: chunks = source.chunks( [(0, 6), (6, 10), (10, 13), (13, 20), (20, 25), (25, 30)], notifications=notifications, @@ -34,7 +36,9 @@ def test_file_workers(tmpdir): tmp.write(b"****** 
...+++++++!!!!!@@@@@") notifications = queue.Queue() - with uproot.source.file.MultithreadedFileSource(filename, num_workers=5) as source: + with uproot.source.file.MultithreadedFileSource( + filename, num_workers=5, use_threads=True + ) as source: chunks = source.chunks( [(0, 6), (6, 10), (10, 13), (13, 20), (20, 25), (25, 30)], notifications=notifications, @@ -51,7 +55,9 @@ def test_memmap(tmpdir): tmp.write(b"****** ...+++++++!!!!!@@@@@") notifications = queue.Queue() - with uproot.source.file.MemmapSource(filename, num_fallback_workers=1) as source: + with uproot.source.file.MemmapSource( + filename, num_fallback_workers=1, use_threads=True + ) as source: chunks = source.chunks( [(0, 6), (6, 10), (10, 13), (13, 20), (20, 25), (25, 30)], notifications=notifications, @@ -67,7 +73,7 @@ def test_memmap(tmpdir): def test_http_multipart(): notifications = queue.Queue() with uproot.source.http.HTTPSource( - "https://example.com", timeout=10, num_fallback_workers=1 + "https://example.com", timeout=10, num_fallback_workers=1, use_threads=True ) as source: chunks = source.chunks( [(0, 100), (50, 55), (200, 400)], notifications=notifications @@ -83,7 +89,7 @@ def test_http_multipart(): def test_http(): notifications = queue.Queue() with uproot.source.http.MultithreadedHTTPSource( - "https://example.com", timeout=10, num_workers=1 + "https://example.com", timeout=10, num_workers=1, use_threads=True ) as source: chunks = source.chunks( [(0, 100), (50, 55), (200, 400)], notifications=notifications @@ -99,7 +105,7 @@ def test_http(): def test_http_workers(): notifications = queue.Queue() with uproot.source.http.MultithreadedHTTPSource( - "https://example.com", timeout=10, num_workers=2 + "https://example.com", timeout=10, num_workers=2, use_threads=True ) as source: chunks = source.chunks( [(0, 100), (50, 55), (200, 400)], notifications=notifications @@ -117,6 +123,7 @@ def test_http_fallback(): "https://scikit-hep.org/uproot3/examples/Zmumu.root", timeout=10, 
num_fallback_workers=1, + use_threads=True, ) as source: chunks = source.chunks( [(0, 100), (50, 55), (200, 400)], notifications=notifications @@ -134,6 +141,7 @@ def test_http_fallback_workers(): "https://scikit-hep.org/uproot3/examples/Zmumu.root", timeout=10, num_fallback_workers=5, + use_threads=True, ) as source: chunks = source.chunks( [(0, 100), (50, 55), (200, 400)], notifications=notifications @@ -156,6 +164,7 @@ def test_xrootd(): "root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root", num_workers=1, timeout=10, + use_threads=True, ) as source: chunks = source.chunks( [(0, 100), (50, 55), (200, 400)], notifications=notifications @@ -178,6 +187,7 @@ def test_xrootd_workers(): "root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root", num_workers=5, timeout=10, + use_threads=True, ) as source: chunks = source.chunks( [(0, 100), (50, 55), (200, 400)], notifications=notifications @@ -201,6 +211,7 @@ def test_xrootd_vectorread(): timeout=10, max_num_elements=None, num_workers=1, + use_threads=True, ) as source: chunks = source.chunks( [(0, 100), (50, 55), (200, 400)], notifications=notifications diff --git a/tests/test_0007-single-chunk-interface.py b/tests/test_0007-single-chunk-interface.py index 77859e1f2..359e796c2 100644 --- a/tests/test_0007-single-chunk-interface.py +++ b/tests/test_0007-single-chunk-interface.py @@ -32,7 +32,7 @@ def test_file(tmpdir): for num_workers in [1, 2]: with uproot.source.file.MultithreadedFileSource( - filename, num_workers=num_workers + filename, num_workers=num_workers, use_threads=True ) as source: for i, (start, stop) in enumerate( [(0, 6), (6, 10), (10, 13), (13, 20), (20, 25), (25, 30)] @@ -60,7 +60,9 @@ def test_memmap(tmpdir): b"@@@@@", ] - with uproot.source.file.MemmapSource(filename, num_fallback_workers=1) as source: + with uproot.source.file.MemmapSource( + filename, num_fallback_workers=1, use_threads=True + ) as source: for i, 
(start, stop) in enumerate( [(0, 6), (6, 10), (10, 13), (13, 20), (20, 25), (25, 30)] ): @@ -78,7 +80,7 @@ def test_memmap(tmpdir): def test_http(): for num_workers in [1, 2]: with uproot.source.http.MultithreadedHTTPSource( - "https://example.com", num_workers=num_workers, timeout=10 + "https://example.com", num_workers=num_workers, timeout=10, use_threads=True ) as source: for start, stop in [(0, 100), (50, 55), (200, 400)]: chunk = source.chunk(start, stop) @@ -93,6 +95,7 @@ def test_http_fail(): "https://wonky.cern/does-not-exist", num_workers=num_workers, timeout=0.1, + use_threads=True, ) as source: source.chunk(0, 100) @@ -101,7 +104,7 @@ def test_http_fail(): @pytest.mark.network def test_http_multipart(): with uproot.source.http.HTTPSource( - "https://example.com", timeout=10, num_fallback_workers=1 + "https://example.com", timeout=10, num_fallback_workers=1, use_threads=True ) as source: for start, stop in [(0, 100), (50, 55), (200, 400)]: chunk = source.chunk(start, stop) @@ -112,7 +115,10 @@ def test_http_multipart(): def test_http_multipart_fail(): with pytest.raises(Exception): with uproot.source.http.HTTPSource( - "https://wonky.cern/does-not-exist", timeout=0.1, num_fallback_workers=1 + "https://wonky.cern/does-not-exist", + timeout=0.1, + num_fallback_workers=1, + use_threads=True, ) as source: tobytes(source.chunk(0, 100).raw_data) @@ -128,6 +134,7 @@ def test_xrootd(): "root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root", num_workers=1, timeout=10, + use_threads=True, ) as source: one = tobytes(source.chunk(0, 100).raw_data) assert len(one) == 100 @@ -149,6 +156,7 @@ def test_xrootd_worker(): "root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/Run2012B_DoubleMuParked.root", num_workers=5, timeout=10, + use_threads=True, ) as source: one = tobytes(source.chunk(0, 100).raw_data) assert len(one) == 100 @@ -171,6 +179,7 @@ def test_xrootd_vectorread(): timeout=10, max_num_elements=None, 
num_workers=1, + use_threads=True, ) as source: one = tobytes(source.chunk(0, 100).raw_data) assert len(one) == 100