From 38b2c7cd60b31cda5149c3edfe80ee83c58f9471 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Thu, 1 Dec 2022 17:57:48 -0800 Subject: [PATCH 01/12] weather-dl now uses `gsutil cp` for file upload. A partial implementation of #256. My intention here is to see if this can speed up weather-dl requests. --- environment.yml | 1 + setup.py | 1 + weather_dl/download_pipeline/fetcher.py | 18 +- weather_dl/download_pipeline/fetcher_test.py | 329 ++++++++----------- weather_dl/download_pipeline/util.py | 6 + weather_dl/setup.py | 1 + 6 files changed, 159 insertions(+), 197 deletions(-) diff --git a/environment.yml b/environment.yml index 4209f530..1d69c1f1 100644 --- a/environment.yml +++ b/environment.yml @@ -30,6 +30,7 @@ dependencies: - pandas=1.5.1 - pip=22.3 - pygrib=2.1.4 + - gsutil=5.6 - pip: - earthengine-api==0.1.329 - .[test] diff --git a/setup.py b/setup.py index 8092ec1f..3c1c4c37 100644 --- a/setup.py +++ b/setup.py @@ -40,6 +40,7 @@ "requests>=2.24.0", "google-cloud-firestore", "urllib3==1.26.5", + "gsutil==5.6", ] weather_mv_requirements = [ diff --git a/weather_dl/download_pipeline/fetcher.py b/weather_dl/download_pipeline/fetcher.py index 7fb0a861..080e7a35 100644 --- a/weather_dl/download_pipeline/fetcher.py +++ b/weather_dl/download_pipeline/fetcher.py @@ -11,22 +11,20 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import apache_beam as beam import dataclasses -import io import logging -import shutil import tempfile import typing as t -from apache_beam.io.gcp.gcsio import WRITE_CHUNK_SIZE + +import apache_beam as beam from .clients import CLIENTS, Client +from .config import Config from .manifest import Manifest, NoOpManifest, Location from .parsers import prepare_target_name -from .config import Config from .partition import skip_partition from .stores import Store, FSStore -from .util import retry_with_exponential_backoff +from .util import copy, retry_with_exponential_backoff logger = logging.getLogger(__name__) @@ -54,12 +52,6 @@ def __post_init__(self): if self.store is None: self.store = FSStore() - @retry_with_exponential_backoff - def upload(self, src: io.FileIO, dest: str) -> None: - """Upload blob to cloud storage, with retries.""" - with self.store.open(dest, 'wb') as dest_: - shutil.copyfileobj(src, dest_, WRITE_CHUNK_SIZE) - @retry_with_exponential_backoff def retrieve(self, client: Client, dataset: str, selection: t.Dict, dest: str) -> None: """Retrieve from download client, with retries.""" @@ -82,7 +74,7 @@ def fetch_data(self, config: Config, *, worker_name: str = 'default') -> None: self.retrieve(client, config.dataset, config.selection, temp.name) logger.info(f'[{worker_name}] Uploading to store for {target!r}.') - self.upload(temp, target) + copy(temp.name, target) logger.info(f'[{worker_name}] Upload to store complete for {target!r}.') diff --git a/weather_dl/download_pipeline/fetcher_test.py b/weather_dl/download_pipeline/fetcher_test.py index 0108cc3e..83efc8e6 100644 --- a/weather_dl/download_pipeline/fetcher_test.py +++ b/weather_dl/download_pipeline/fetcher_test.py @@ -13,53 +13,15 @@ # limitations under the License. import io -import socket +import os import tempfile -import typing as t import unittest from unittest.mock import patch, ANY +from .config import Config from .fetcher import Fetcher from .manifest import MockManifest, Location -from .stores import InMemoryStore, FSStore -from .config import Config - - -class UploadTest(unittest.TestCase): - def setUp(self) -> None: - self.message = b'the quick brown fox jumped over the lazy dog'.split() - self.store = FSStore() - self.fetcher = Fetcher('fake', store=self.store) - - def test_upload_writes_to_store(self): - with tempfile.NamedTemporaryFile() as src: - src.writelines(self.message) - src.flush() - src.seek(0) - with tempfile.NamedTemporaryFile('wb') as dst: - self.fetcher.upload(src, dst.name) - with open(dst.name, 'rb') as dst1: - self.assertEqual(dst1.readlines()[0], b''.join(self.message)) - - def test_retries_after_socket_timeout_error(self): - class SocketTimeoutStore(InMemoryStore): - count = 0 - - def open(self, filename: str, mode: str = 'r') -> t.IO: - self.count += 1 - raise socket.timeout('Deliberate error.') - - socket_store = SocketTimeoutStore() - - fetcher = Fetcher('fake', store=socket_store) - - with tempfile.NamedTemporaryFile() as src: - src.writelines(self.message) - src.flush() - with tempfile.NamedTemporaryFile('wb') as dst: - with self.assertRaises(socket.timeout): - fetcher.upload(src, dst.name) - self.assertEqual(socket_store.count, 8) +from .stores import InMemoryStore class FetchDataTest(unittest.TestCase): @@ -67,162 +29,161 @@ class FetchDataTest(unittest.TestCase): def setUp(self) -> None: self.dummy_manifest = MockManifest(Location('dummy-manifest')) - @patch('weather_dl.download_pipeline.stores.InMemoryStore.open', return_value=io.StringIO()) - @patch('cdsapi.Client.retrieve') - def test_fetch_data(self, mock_retrieve, mock_gcs_file): - config = Config.from_dict({ - 'parameters': { - 'dataset': 'reanalysis-era5-pressure-levels', - 'partition_keys': ['year', 'month'], - 'target_path': 'gs://weather-dl-unittest/download-{:02d}-{:02d}.nc', - 'api_url': 'https//api-url.com/v1/', - 'api_key': '12345', - }, - 'selection': { - 'features': ['pressure'], - 'month': ['12'], - 'year': ['01'] - } - }) - - fetcher = Fetcher('cds', self.dummy_manifest, InMemoryStore()) - fetcher.fetch_data(config) - - mock_gcs_file.assert_called_with( - 'gs://weather-dl-unittest/download-01-12.nc', - 'wb' - ) - - mock_retrieve.assert_called_with( - 'reanalysis-era5-pressure-levels', - config.selection, - ANY) - - @patch('weather_dl.download_pipeline.stores.InMemoryStore.open', return_value=io.StringIO()) @patch('cdsapi.Client.retrieve') - def test_fetch_data__manifest__returns_success(self, mock_retrieve, mock_gcs_file): - config = Config.from_dict({ - 'parameters': { - 'dataset': 'reanalysis-era5-pressure-levels', - 'partition_keys': ['year', 'month'], - 'target_path': 'gs://weather-dl-unittest/download-{:02d}-{:02d}.nc', - 'api_url': 'https//api-url.com/v1/', - 'api_key': '12345', - }, - 'selection': { - 'features': ['pressure'], - 'month': ['12'], - 'year': ['01'] - } - }) - - fetcher = Fetcher('cds', self.dummy_manifest, InMemoryStore()) - fetcher.fetch_data(config) - - self.assertDictContainsSubset(dict( - selection=config.selection, - location='gs://weather-dl-unittest/download-01-12.nc', - status='success', - error=None, - user='unknown', - ), list(self.dummy_manifest.records.values())[0]._asdict()) + def test_fetch_data(self, mock_retrieve): + with tempfile.TemporaryDirectory() as tmpdir: + config = Config.from_dict({ + 'parameters': { + 'dataset': 'reanalysis-era5-pressure-levels', + 'partition_keys': ['year', 'month'], + 'target_path': os.path.join(tmpdir, 'download-{:02d}-{:02d}.nc'), + 'api_url': 'https//api-url.com/v1/', + 'api_key': '12345', + }, + 'selection': { + 'features': ['pressure'], + 'month': ['12'], + 'year': ['01'] + } + }) - @patch('cdsapi.Client.retrieve') - def test_fetch_data__manifest__records_retrieve_failure(self, mock_retrieve): - config = Config.from_dict({ - 'parameters': { - 'dataset': 'reanalysis-era5-pressure-levels', - 'partition_keys': ['year', 'month'], - 'target_path': 'gs://weather-dl-unittest/download-{:02d}-{:02d}.nc', - 'api_url': 'https//api-url.com/v1/', - 'api_key': '12345', - }, - 'selection': { - 'features': ['pressure'], - 'month': ['12'], - 'year': ['01'] - } - }) - - error = IOError("We don't have enough permissions to download this.") - mock_retrieve.side_effect = error - - with self.assertRaises(IOError) as e: fetcher = Fetcher('cds', self.dummy_manifest, InMemoryStore()) fetcher.fetch_data(config) - actual = list(self.dummy_manifest.records.values())[0]._asdict() + self.assertTrue(os.path.exists(os.path.join(tmpdir, 'download-01-12.nc'))) - self.assertDictContainsSubset(dict( - selection=config.selection, - location='gs://weather-dl-unittest/download-01-12.nc', - status='failure', - user='unknown', - ), actual) + mock_retrieve.assert_called_with( + 'reanalysis-era5-pressure-levels', + config.selection, + ANY) - self.assertIn(error.args[0], actual['error']) - self.assertIn(error.args[0], e.exception.args[0]) - - @patch('weather_dl.download_pipeline.stores.InMemoryStore.open', return_value=io.StringIO()) @patch('cdsapi.Client.retrieve') - def test_fetch_data__manifest__records_gcs_failure(self, mock_retrieve, mock_gcs_file): - config = Config.from_dict({ - 'parameters': { - 'dataset': 'reanalysis-era5-pressure-levels', - 'partition_keys': ['year', 'month'], - 'target_path': 'gs://weather-dl-unittest/download-{:02d}-{:02d}.nc', - 'api_url': 'https//api-url.com/v1/', - 'api_key': '12345', - }, - 'selection': { - 'features': ['pressure'], - 'month': ['12'], - 'year': ['01'] - } - }) - - error = IOError("Can't open gcs file.") - mock_gcs_file.side_effect = error - - with self.assertRaises(IOError) as e: + def test_fetch_data__manifest__returns_success(self, mock_retrieve): + with tempfile.TemporaryDirectory() as tmpdir: + config = Config.from_dict({ + 'parameters': { + 'dataset': 'reanalysis-era5-pressure-levels', + 'partition_keys': ['year', 'month'], + 'target_path': os.path.join(tmpdir, 'download-{:02d}-{:02d}.nc'), + 'api_url': 'https//api-url.com/v1/', + 'api_key': '12345', + }, + 'selection': { + 'features': ['pressure'], + 'month': ['12'], + 'year': ['01'] + } + }) + fetcher = Fetcher('cds', self.dummy_manifest, InMemoryStore()) fetcher.fetch_data(config) - actual = list(self.dummy_manifest.records.values())[0]._asdict() - self.assertDictContainsSubset(dict( - selection=config.selection, - location='gs://weather-dl-unittest/download-01-12.nc', - status='failure', - user='unknown', - ), actual) + self.assertDictContainsSubset(dict( + selection=config.selection, + location=os.path.join(tmpdir, 'download-01-12.nc'), + status='success', + error=None, + user='unknown', + ), list(self.dummy_manifest.records.values())[0]._asdict()) - self.assertIn(error.args[0], actual['error']) - self.assertIn(error.args[0], e.exception.args[0]) + @patch('cdsapi.Client.retrieve') + def test_fetch_data__manifest__records_retrieve_failure(self, mock_retrieve): + with tempfile.TemporaryDirectory() as tmpdir: + config = Config.from_dict({ + 'parameters': { + 'dataset': 'reanalysis-era5-pressure-levels', + 'partition_keys': ['year', 'month'], + 'target_path': os.path.join(tmpdir, 'download-{:02d}-{:02d}.nc'), + 'api_url': 'https//api-url.com/v1/', + 'api_key': '12345', + }, + 'selection': { + 'features': ['pressure'], + 'month': ['12'], + 'year': ['01'] + } + }) + + error = IOError("We don't have enough permissions to download this.") + mock_retrieve.side_effect = error + + with self.assertRaises(IOError) as e: + fetcher = Fetcher('cds', self.dummy_manifest, InMemoryStore()) + fetcher.fetch_data(config) + + actual = list(self.dummy_manifest.records.values())[0]._asdict() + + self.assertDictContainsSubset(dict( + selection=config.selection, + location=os.path.join(tmpdir, 'download-01-12.nc'), + status='failure', + user='unknown', + ), actual) + + self.assertIn(error.args[0], actual['error']) + self.assertIn(error.args[0], e.exception.args[0]) + + @patch('cdsapi.Client.retrieve') + def test_fetch_data__manifest__records_gcs_failure(self, mock_retrieve): + with tempfile.TemporaryDirectory() as tmpdir: + config = Config.from_dict({ + 'parameters': { + 'dataset': 'reanalysis-era5-pressure-levels', + 'partition_keys': ['year', 'month'], + 'target_path': os.path.join(tmpdir, 'download-{:02d}-{:02d}.nc'), + 'api_url': 'https//api-url.com/v1/', + 'api_key': '12345', + }, + 'selection': { + 'features': ['pressure'], + 'month': ['12'], + 'year': ['01'] + } + }) + + error = IOError("Can't open gcs file.") + mock_retrieve.side_effect = error + + with self.assertRaises(IOError) as e: + fetcher = Fetcher('cds', self.dummy_manifest, InMemoryStore()) + fetcher.fetch_data(config) + + actual = list(self.dummy_manifest.records.values())[0]._asdict() + self.assertDictContainsSubset(dict( + selection=config.selection, + location=os.path.join(tmpdir, 'download-01-12.nc'), + status='failure', + user='unknown', + ), actual) + + self.assertIn(error.args[0], actual['error']) + self.assertIn(error.args[0], e.exception.args[0]) @patch('weather_dl.download_pipeline.stores.InMemoryStore.open', return_value=io.StringIO()) @patch('cdsapi.Client.retrieve') def test_fetch_data__skips_existing_download(self, mock_retrieve, mock_gcs_file): - config = Config.from_dict({ - 'parameters': { - 'dataset': 'reanalysis-era5-pressure-levels', - 'partition_keys': ['year', 'month'], - 'target_path': 'gs://weather-dl-unittest/download-{year:02d}-{month:02d}.nc', - 'api_url': 'https//api-url.com/v1/', - 'api_key': '12345', - }, - 'selection': { - 'features': ['pressure'], - 'month': ['12'], - 'year': ['01'] - } - }) - - # target file already exists in store... - store = InMemoryStore() - store.store['gs://weather-dl-unittest/download-01-12.nc'] = '' - - fetcher = Fetcher('cds', self.dummy_manifest, store) - fetcher.fetch_data(config) - - self.assertFalse(mock_gcs_file.called) - self.assertFalse(mock_retrieve.called) + with tempfile.TemporaryDirectory() as tmpdir: + config = Config.from_dict({ + 'parameters': { + 'dataset': 'reanalysis-era5-pressure-levels', + 'partition_keys': ['year', 'month'], + 'target_path': os.path.join(tmpdir, 'download-{:02d}-{:02d}.nc'), + 'api_url': 'https//api-url.com/v1/', + 'api_key': '12345', + }, + 'selection': { + 'features': ['pressure'], + 'month': ['12'], + 'year': ['01'] + } + }) + + # target file already exists in store... + store = InMemoryStore() + store.store[os.path.join(tmpdir, 'download-01-12.nc')] = '' + + fetcher = Fetcher('cds', self.dummy_manifest, store) + fetcher.fetch_data(config) + + self.assertFalse(mock_gcs_file.called) + self.assertFalse(mock_retrieve.called) diff --git a/weather_dl/download_pipeline/util.py b/weather_dl/download_pipeline/util.py index c47650b8..7bbe8590 100644 --- a/weather_dl/download_pipeline/util.py +++ b/weather_dl/download_pipeline/util.py @@ -13,6 +13,7 @@ # limitations under the License. import itertools import socket +import subprocess import sys import typing as t @@ -58,3 +59,8 @@ def ichunked(iterable: t.Iterable, n: int) -> t.Iterator[t.Iterable]: yield itertools.chain([first], it) except StopIteration: pass + + +def copy(src: str, dst: str) -> None: + """Copy data via `gsutil cp`.""" + subprocess.run(f'gsutil cp {src!r} {dst!r}', shell=True, check=True) diff --git a/weather_dl/setup.py b/weather_dl/setup.py index 92e76aa8..8d35866c 100644 --- a/weather_dl/setup.py +++ b/weather_dl/setup.py @@ -40,6 +40,7 @@ "requests>=2.24.0", "urllib3==1.26.5", "google-cloud-firestore==2.6.0", + "gsutil==5.6" ] setup( From d59b3432b00c78899a68420f5b1028efd2e5dc6f Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Thu, 1 Dec 2022 18:10:22 -0800 Subject: [PATCH 02/12] Temporary: no gsutil version. --- environment.yml | 2 +- setup.py | 2 +- weather_dl/setup.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/environment.yml b/environment.yml index 1d69c1f1..020c31fa 100644 --- a/environment.yml +++ b/environment.yml @@ -30,7 +30,7 @@ dependencies: - pandas=1.5.1 - pip=22.3 - pygrib=2.1.4 - - gsutil=5.6 + - gsutil - pip: - earthengine-api==0.1.329 - .[test] diff --git a/setup.py b/setup.py index 3c1c4c37..ade0e030 100644 --- a/setup.py +++ b/setup.py @@ -40,7 +40,7 @@ "requests>=2.24.0", "google-cloud-firestore", "urllib3==1.26.5", - "gsutil==5.6", + "gsutil", ] weather_mv_requirements = [ diff --git a/weather_dl/setup.py b/weather_dl/setup.py index 8d35866c..9d3252c3 100644 --- a/weather_dl/setup.py +++ b/weather_dl/setup.py @@ -40,7 +40,7 @@ "requests>=2.24.0", "urllib3==1.26.5", "google-cloud-firestore==2.6.0", - "gsutil==5.6" + "gsutil" ] setup( From 2d0fa7c02bb19feb09eb5226d477cc38c86ada6a Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Thu, 1 Dec 2022 18:10:39 -0800 Subject: [PATCH 03/12] Bump weather-dl version. --- weather_dl/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weather_dl/setup.py b/weather_dl/setup.py index 9d3252c3..469e1043 100644 --- a/weather_dl/setup.py +++ b/weather_dl/setup.py @@ -46,7 +46,7 @@ setup( name='download_pipeline', packages=find_packages(), - version='0.1.10', + version='0.1.11', author='Anthromets', author_email='anthromets-ecmwf@google.com', url='https://weather-tools.readthedocs.io/en/latest/weather_dl/', From 40b056de59254dfbcefe9e2d22192a3498f4b4d4 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Thu, 1 Dec 2022 23:03:26 -0800 Subject: [PATCH 04/12] pinning gsutil version. --- environment.yml | 2 +- setup.py | 2 +- weather_dl/setup.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/environment.yml b/environment.yml index 020c31fa..d7b2aac5 100644 --- a/environment.yml +++ b/environment.yml @@ -30,7 +30,7 @@ dependencies: - pandas=1.5.1 - pip=22.3 - pygrib=2.1.4 - - gsutil + - gsutil=4.61 - pip: - earthengine-api==0.1.329 - .[test] diff --git a/setup.py b/setup.py index ade0e030..b2deecf2 100644 --- a/setup.py +++ b/setup.py @@ -40,7 +40,7 @@ "requests>=2.24.0", "google-cloud-firestore", "urllib3==1.26.5", - "gsutil", + "gsutil==4.61", ] weather_mv_requirements = [ diff --git a/weather_dl/setup.py b/weather_dl/setup.py index 469e1043..152b728c 100644 --- a/weather_dl/setup.py +++ b/weather_dl/setup.py @@ -40,7 +40,7 @@ "requests>=2.24.0", "urllib3==1.26.5", "google-cloud-firestore==2.6.0", - "gsutil" + "gsutil==4.61" ] setup( From 6731c24618d8d15f4b6608a75ef81a47ca24f8bb Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Fri, 2 Dec 2022 12:18:40 -0800 Subject: [PATCH 05/12] Use gcloud alpha storage cp, which is even faster :) --- weather_dl/download_pipeline/util.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/weather_dl/download_pipeline/util.py b/weather_dl/download_pipeline/util.py index 7bbe8590..84f300cb 100644 --- a/weather_dl/download_pipeline/util.py +++ b/weather_dl/download_pipeline/util.py @@ -62,5 +62,9 @@ def ichunked(iterable: t.Iterable, n: int) -> t.Iterator[t.Iterable]: def copy(src: str, dst: str) -> None: - """Copy data via `gsutil cp`.""" - subprocess.run(f'gsutil cp {src!r} {dst!r}', shell=True, check=True) + """Copy data via `gcloud storage cp`. + + Here, we take advantage of GCP's faster storage transfer: + https://cloud.google.com/blog/products/storage-data-transfer/new-gcloud-storage-enables-super-fast-data-transfers/ + """ + subprocess.run(f'gcloud alpha storage cp {src!r} {dst!r}', shell=True, check=True) From 35b9a69d7469de4dea5e0bbe51cae600a6d866cc Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Fri, 2 Dec 2022 12:26:11 -0800 Subject: [PATCH 06/12] Set up gcloud sdk, accounting for runtime auth issue. --- environment.yml | 3 +-- setup.py | 2 +- weather_dl/setup.py | 3 ++- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/environment.yml b/environment.yml index d7b2aac5..e160134c 100644 --- a/environment.yml +++ b/environment.yml @@ -1,7 +1,6 @@ name: weather-tools channels: - conda-forge - - defaults dependencies: - python=3.8.13 - apache-beam=2.40.0 @@ -30,7 +29,7 @@ dependencies: - pandas=1.5.1 - pip=22.3 - pygrib=2.1.4 - - gsutil=4.61 + - google-cloud-sdk=410.0.0 - pip: - earthengine-api==0.1.329 - .[test] diff --git a/setup.py b/setup.py index b2deecf2..bd3b1111 100644 --- a/setup.py +++ b/setup.py @@ -40,7 +40,7 @@ "requests>=2.24.0", "google-cloud-firestore", "urllib3==1.26.5", - "gsutil==4.61", + "gcloud==0.18.3", ] weather_mv_requirements = [ diff --git a/weather_dl/setup.py b/weather_dl/setup.py index 152b728c..e7159e4e 100644 --- a/weather_dl/setup.py +++ b/weather_dl/setup.py @@ -40,7 +40,8 @@ "requests>=2.24.0", "urllib3==1.26.5", "google-cloud-firestore==2.6.0", - "gsutil==4.61" + # "gcloud" should already be installed in the host image. + # If we install it here, we'll hit auth issues. ] setup( From b129eb01bb31b2ac20adceb2055556521baf905c Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Fri, 2 Dec 2022 15:32:07 -0800 Subject: [PATCH 07/12] Added error handling to the subprocess call for copying. Co-authored-by: Rahul Mahrsee <86819420+mahrsee1997@users.noreply.github.com> --- weather_dl/download_pipeline/util.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/weather_dl/download_pipeline/util.py b/weather_dl/download_pipeline/util.py index 84f300cb..dbcff3f3 100644 --- a/weather_dl/download_pipeline/util.py +++ b/weather_dl/download_pipeline/util.py @@ -67,4 +67,8 @@ def copy(src: str, dst: str) -> None: Here, we take advantage of GCP's faster storage transfer: https://cloud.google.com/blog/products/storage-data-transfer/new-gcloud-storage-enables-super-fast-data-transfers/ """ - subprocess.run(f'gcloud alpha storage cp {src!r} {dst!r}', shell=True, check=True) + try: + subprocess.run(f'gcloud alpha storage cp {src!r} {dst!r}', shell=True, check=True, capture_output=True) + except subprocess.CalledProcessError as e: + logger.error(f'Failed to copy file {src!r} to {dst!r} due to {e.stderr.decode("utf-8")}') + raise From d355924ebbd15247336749886329382090be92b9 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Fri, 2 Dec 2022 15:37:20 -0800 Subject: [PATCH 08/12] fix: added import. --- weather_dl/download_pipeline/util.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/weather_dl/download_pipeline/util.py b/weather_dl/download_pipeline/util.py index dbcff3f3..5f5eeb24 100644 --- a/weather_dl/download_pipeline/util.py +++ b/weather_dl/download_pipeline/util.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import itertools +import logging import socket import subprocess import sys @@ -19,6 +20,8 @@ from apache_beam.utils import retry +logger = logging.getLogger(__name__) + def _retry_if_valid_input_but_server_or_socket_error_and_timeout_filter(exception) -> bool: if isinstance(exception, socket.timeout): From 914ce351968e9ff5a1504f1a879c34beb3b8ae8f Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Fri, 2 Dec 2022 17:16:29 -0800 Subject: [PATCH 09/12] Changing subprocess invocation to be more secure. Thanks @shoyer. Co-authored-by: Stephan Hoyer --- weather_dl/download_pipeline/util.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weather_dl/download_pipeline/util.py b/weather_dl/download_pipeline/util.py index 5f5eeb24..d2a18fbd 100644 --- a/weather_dl/download_pipeline/util.py +++ b/weather_dl/download_pipeline/util.py @@ -71,7 +71,7 @@ def copy(src: str, dst: str) -> None: https://cloud.google.com/blog/products/storage-data-transfer/new-gcloud-storage-enables-super-fast-data-transfers/ """ try: - subprocess.run(f'gcloud alpha storage cp {src!r} {dst!r}', shell=True, check=True, capture_output=True) + subprocess.run(['gcloud', 'alpha', 'storage', 'cp', src, dest], check=True, capture_output=True) except subprocess.CalledProcessError as e: logger.error(f'Failed to copy file {src!r} to {dst!r} due to {e.stderr.decode("utf-8")}') raise From 2f75701d104c844688c7c030aac8230b3291e17a Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Fri, 2 Dec 2022 17:20:12 -0800 Subject: [PATCH 10/12] nit: dst, not dest. --- weather_dl/download_pipeline/util.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/weather_dl/download_pipeline/util.py b/weather_dl/download_pipeline/util.py index d2a18fbd..e22df866 100644 --- a/weather_dl/download_pipeline/util.py +++ b/weather_dl/download_pipeline/util.py @@ -71,7 +71,7 @@ def copy(src: str, dst: str) -> None: https://cloud.google.com/blog/products/storage-data-transfer/new-gcloud-storage-enables-super-fast-data-transfers/ """ try: - subprocess.run(['gcloud', 'alpha', 'storage', 'cp', src, dest], check=True, capture_output=True) + subprocess.run(['gcloud', 'alpha', 'storage', 'cp', src, dst], check=True, capture_output=True) except subprocess.CalledProcessError as e: logger.error(f'Failed to copy file {src!r} to {dst!r} due to {e.stderr.decode("utf-8")}') raise From ea0e197782c5c3e972345b6b3369eab34005a75d Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Fri, 2 Dec 2022 17:27:19 -0800 Subject: [PATCH 11/12] nit: remove gcloud pip dependency. --- setup.py | 1 - 1 file changed, 1 deletion(-) diff --git a/setup.py b/setup.py index 8b5e3bfa..5729f5a4 100644 --- a/setup.py +++ b/setup.py @@ -40,7 +40,6 @@ "requests>=2.24.0", "google-cloud-firestore", "urllib3==1.26.5", - "gcloud==0.18.3", ] weather_mv_requirements = [ From 57f78b44e5249eac015436916b512bcbe20538a8 Mon Sep 17 00:00:00 2001 From: Alex Merose Date: Mon, 5 Dec 2022 13:10:06 -0800 Subject: [PATCH 12/12] Using gsutil for now until we upgrade project deps. --- weather_dl/download_pipeline/util.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/weather_dl/download_pipeline/util.py b/weather_dl/download_pipeline/util.py index e22df866..08982013 100644 --- a/weather_dl/download_pipeline/util.py +++ b/weather_dl/download_pipeline/util.py @@ -65,13 +65,9 @@ def ichunked(iterable: t.Iterable, n: int) -> t.Iterator[t.Iterable]: def copy(src: str, dst: str) -> None: - """Copy data via `gcloud storage cp`. - - Here, we take advantage of GCP's faster storage transfer: - https://cloud.google.com/blog/products/storage-data-transfer/new-gcloud-storage-enables-super-fast-data-transfers/ - """ + """Copy data via `gsutil cp`.""" try: - subprocess.run(['gcloud', 'alpha', 'storage', 'cp', src, dst], check=True, capture_output=True) + subprocess.run(['gsutil', 'cp', src, dst], check=True, capture_output=True) except subprocess.CalledProcessError as e: logger.error(f'Failed to copy file {src!r} to {dst!r} due to {e.stderr.decode("utf-8")}') raise