Skip to content

Commit

Permalink
feat: add option to limit max number of processes (#31)
Browse files Browse the repository at this point in the history
* feat: add option to limit max number of processes

* ci: add geocoding cache for tests

* fix: add cache directory creation

* ci: update pre-commit-config

* ci: update refurb language version

* ci: bump pdm version
  • Loading branch information
RaczeQ authored Dec 17, 2024
1 parent c38ce3f commit 9f97305
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 6 deletions.
6 changes: 3 additions & 3 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ repos:
- id: mypy
additional_dependencies: ['types-requests', 'types-six']
- repo: https://github.com/pdm-project/pdm
rev: 2.17.3
rev: 2.22.0
hooks:
- id: pdm-lock-check
- id: pdm-export
Expand All @@ -39,7 +39,7 @@ repos:
hooks:
- id: nbstripout
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.6.0
rev: v5.0.0
hooks:
- id: trailing-whitespace
- id: end-of-file-fixer
Expand All @@ -55,7 +55,7 @@ repos:
- id: refurb
args: ["--python-version", "3.9", "--format", "github"]
language: python
language_version: python3.10
language_version: python3.12
stages: [manual]
- repo: https://github.com/FHPythonUtils/LicenseCheck
rev: "2024.2"
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Option to pass max number of workers for downloading the data [#30](https://github.com/kraina-ai/overturemaestro/issues/30)

## [0.1.1] - 2024-11-24

### Changed
Expand Down
16 changes: 13 additions & 3 deletions overturemaestro/_parquet_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ def map_parquet_dataset(
filesystem: Optional["fs.FileSystem"] = None,
report_progress_as_text: bool = True,
verbosity_mode: VERBOSITY_MODE = "transient",
max_workers: Optional[int] = None,
) -> None:
"""
Apply a function over parquet dataset in a multiprocessing environment.
Expand All @@ -143,6 +144,8 @@ def map_parquet_dataset(
verbosity mode. Can be one of: silent, transient and verbose. Silent disables
output completely. Transient tracks progress, but removes output after finished.
Verbose leaves all progress outputs in the stdout. Defaults to "transient".
max_workers (Optional[int], optional): Max number of multiprocessing workers used to
process the dataset. Defaults to None.
"""
with TrackProgressSpinner(
"Preparing multiprocessing environment", verbosity_mode=verbosity_mode
Expand All @@ -163,14 +166,21 @@ def map_parquet_dataset(
dataset = pq.ParquetDataset(dataset_path, filesystem=filesystem)

no_cpus = multiprocessing.cpu_count()

min_no_workers = 32 if no_cpus >= 8 else 16
no_workers = min(
no_scan_workers = min(
max(min_no_workers, no_cpus + 4), 64
) # minimum 16 / 32 workers, but not more than 64

no_processing_workers = no_cpus

if max_workers:
no_scan_workers = min(max_workers, no_scan_workers)
no_processing_workers = min(max_workers, no_processing_workers)

with TrackProgressBar(verbosity_mode=verbosity_mode) as progress:
total_files = len(dataset.files)
with ProcessPoolExecutor(max_workers=min(no_workers, total_files)) as ex:
with ProcessPoolExecutor(max_workers=min(no_scan_workers, total_files)) as ex:
fn = partial(_read_row_group_number, filesystem=dataset.filesystem)
row_group_numbers = list(
progress.track(
Expand Down Expand Up @@ -202,7 +212,7 @@ def map_parquet_dataset(
dataset.filesystem,
),
)
for _ in range(min(no_cpus, total))
for _ in range(min(no_processing_workers, total))
]
with TrackProgressBar(verbosity_mode=verbosity_mode) as progress_bar:
progress_bar.add_task(description=progress_description, total=total)
Expand Down
19 changes: 19 additions & 0 deletions overturemaestro/data_downloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def download_data_for_multiple_types(
ignore_cache: bool = False,
working_directory: Union[str, Path] = "files",
verbosity_mode: "VERBOSITY_MODE" = "transient",
max_workers: Optional[int] = None,
) -> list[Path]: ...


Expand All @@ -44,6 +45,7 @@ def download_data_for_multiple_types(
ignore_cache: bool = False,
working_directory: Union[str, Path] = "files",
verbosity_mode: "VERBOSITY_MODE" = "transient",
max_workers: Optional[int] = None,
) -> list[Path]: ...


Expand All @@ -56,6 +58,7 @@ def download_data_for_multiple_types(
ignore_cache: bool = False,
working_directory: Union[str, Path] = "files",
verbosity_mode: "VERBOSITY_MODE" = "transient",
max_workers: Optional[int] = None,
) -> list[Path]: ...


Expand All @@ -67,6 +70,7 @@ def download_data_for_multiple_types(
ignore_cache: bool = False,
working_directory: Union[str, Path] = "files",
verbosity_mode: "VERBOSITY_MODE" = "transient",
max_workers: Optional[int] = None,
) -> list[Path]:
"""
Downloads the data for the given release for multiple types.
Expand All @@ -84,6 +88,8 @@ def download_data_for_multiple_types(
verbosity mode. Can be one of: silent, transient and verbose. Silent disables
output completely. Transient tracks progress, but removes output after finished.
Verbose leaves all progress outputs in the stdout. Defaults to "transient".
max_workers (Optional[int], optional): Max number of multiprocessing workers used to
process the dataset. Defaults to None.
Returns:
list[Path]: List of saved Geoparquet files paths.
Expand All @@ -97,6 +103,7 @@ def download_data_for_multiple_types(
ignore_cache=ignore_cache,
working_directory=working_directory,
verbosity_mode=verbosity_mode,
max_workers=max_workers,
)
for theme_value, type_value in theme_type_pairs
]
Expand All @@ -113,6 +120,7 @@ def download_data(
ignore_cache: bool = False,
working_directory: Union[str, Path] = "files",
verbosity_mode: "VERBOSITY_MODE" = "transient",
max_workers: Optional[int] = None,
) -> Path: ...


Expand All @@ -128,6 +136,7 @@ def download_data(
ignore_cache: bool = False,
working_directory: Union[str, Path] = "files",
verbosity_mode: "VERBOSITY_MODE" = "transient",
max_workers: Optional[int] = None,
) -> Path: ...


Expand All @@ -143,6 +152,7 @@ def download_data(
ignore_cache: bool = False,
working_directory: Union[str, Path] = "files",
verbosity_mode: "VERBOSITY_MODE" = "transient",
max_workers: Optional[int] = None,
) -> Path: ...


Expand All @@ -157,6 +167,7 @@ def download_data(
ignore_cache: bool = False,
working_directory: Union[str, Path] = "files",
verbosity_mode: "VERBOSITY_MODE" = "transient",
max_workers: Optional[int] = None,
) -> Path:
"""
Downloads the data for the given release.
Expand All @@ -180,6 +191,8 @@ def download_data(
verbosity mode. Can be one of: silent, transient and verbose. Silent disables
output completely. Transient tracks progress, but removes output after finished.
Verbose leaves all progress outputs in the stdout. Defaults to "transient".
max_workers (Optional[int], optional): Max number of multiprocessing workers used to
process the dataset. Defaults to None.
Returns:
Path: Saved Geoparquet file path.
Expand Down Expand Up @@ -224,6 +237,7 @@ def download_data(
result_file_path=result_file_path,
work_directory=tmp_dir_path,
verbosity_mode=verbosity_mode,
max_workers=max_workers,
)
return result_file_path

Expand All @@ -237,6 +251,7 @@ def _download_data(
result_file_path: Path,
work_directory: Path,
verbosity_mode: "VERBOSITY_MODE",
max_workers: Optional[int],
) -> None:
import time
from concurrent.futures import ProcessPoolExecutor
Expand Down Expand Up @@ -279,6 +294,9 @@ def _download_data(
max(min_no_workers, multiprocessing.cpu_count() + 4), 32
) # minimum 8 workers, but not more than 32

if max_workers:
no_workers = min(max_workers, no_workers)

with TrackProgressBar(verbosity_mode=verbosity_mode) as progress:
total_row_groups = len(row_groups_to_download)
fn = partial(
Expand Down Expand Up @@ -314,6 +332,7 @@ def _download_data(
progress_description=f"Filtering data by geometry ({theme}/{type})",
report_progress_as_text=False,
verbosity_mode=verbosity_mode,
max_workers=max_workers,
)

filtered_parquet_files = list(destination_path.glob("*.parquet"))
Expand Down
Loading

0 comments on commit 9f97305

Please sign in to comment.