Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

weather-dl now uses gsutil cp for file upload. #265

Merged
merged 13 commits into from
Dec 6, 2022
2 changes: 1 addition & 1 deletion environment.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
name: weather-tools
channels:
- conda-forge
- defaults
dependencies:
- python=3.8.13
- apache-beam=2.40.0
Expand Down Expand Up @@ -30,6 +29,7 @@ dependencies:
- pandas=1.5.1
- pip=22.3
- pygrib=2.1.4
- google-cloud-sdk=410.0.0
- pip:
- earthengine-api==0.1.329
- .[test]
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
"requests>=2.24.0",
"google-cloud-firestore",
"urllib3==1.26.5",
"gcloud==0.18.3",
]

weather_mv_requirements = [
Expand Down
18 changes: 5 additions & 13 deletions weather_dl/download_pipeline/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,20 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import apache_beam as beam
import dataclasses
import io
import logging
import shutil
import tempfile
import typing as t
from apache_beam.io.gcp.gcsio import WRITE_CHUNK_SIZE

import apache_beam as beam

from .clients import CLIENTS, Client
from .config import Config
from .manifest import Manifest, NoOpManifest, Location
from .parsers import prepare_target_name
from .config import Config
from .partition import skip_partition
from .stores import Store, FSStore
from .util import retry_with_exponential_backoff
from .util import copy, retry_with_exponential_backoff

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -54,12 +52,6 @@ def __post_init__(self):
if self.store is None:
self.store = FSStore()

@retry_with_exponential_backoff
def upload(self, src: io.FileIO, dest: str) -> None:
"""Upload blob to cloud storage, with retries."""
with self.store.open(dest, 'wb') as dest_:
shutil.copyfileobj(src, dest_, WRITE_CHUNK_SIZE)

@retry_with_exponential_backoff
def retrieve(self, client: Client, dataset: str, selection: t.Dict, dest: str) -> None:
"""Retrieve from download client, with retries."""
Expand All @@ -82,7 +74,7 @@ def fetch_data(self, config: Config, *, worker_name: str = 'default') -> None:
self.retrieve(client, config.dataset, config.selection, temp.name)

logger.info(f'[{worker_name}] Uploading to store for {target!r}.')
self.upload(temp, target)
copy(temp.name, target)

logger.info(f'[{worker_name}] Upload to store complete for {target!r}.')

Expand Down
Loading