Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

weather-mv: yielding asset_name from IngestIntoEE step. #483

Merged
merged 2 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions weather_mv/loader_pipeline/ee.py
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ def start_ingestion(self, asset_data: AssetData) -> t.Optional[str]:
response_json = json.loads(response.content)
return response_json.get('name')
else: # as a COG based image.
- result = ee.data.createAsset({
+ ee.data.createAsset({
'name': asset_name,
'type': self.ee_asset_type,
'gcs_location': {
Expand All @@ -764,11 +764,11 @@ def start_ingestion(self, asset_data: AssetData) -> t.Optional[str]:
'endTime': asset_data.end_time,
'properties': asset_data.properties,
})
- return result.get('id')
+ return asset_name
elif self.ee_asset_type == 'TABLE': # ingest a feature collection.
self.wait_for_task_queue()
task_id = ee.data.newTaskId(1)[0]
- response = ee.data.startTableIngestion(task_id, {
+ ee.data.startTableIngestion(task_id, {
'name': asset_name,
'sources': [{
'uris': [asset_data.target_path]
Expand All @@ -777,7 +777,7 @@ def start_ingestion(self, asset_data: AssetData) -> t.Optional[str]:
'endTime': asset_data.end_time,
'properties': asset_data.properties
})
- return response.get('id')
+ return asset_name
except ee.EEException as e:
if "Could not parse a valid CRS from the first overview of the GeoTIFF" in repr(e):
logger.info(f"Failed to create asset '{asset_name}' in earth engine: {e}. Moving on...")
Expand All @@ -794,8 +794,8 @@ def start_ingestion(self, asset_data: AssetData) -> t.Optional[str]:
@timeit('IngestIntoEE')
def process(self, asset_data: AssetData) -> t.Iterator[t.Tuple[str, float]]:
"""Uploads an asset into the earth engine."""
- asset_id = self.start_ingestion(asset_data)
+ asset_name = self.start_ingestion(asset_data)
metric.Metrics.counter('Success', 'IngestIntoEE').inc()

asset_start_time = asset_data.start_time
- yield asset_id, asset_start_time
+ yield asset_name, asset_start_time
4 changes: 2 additions & 2 deletions weather_mv/loader_pipeline/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def process(self, element):
try:
if len(element) == 0:
raise ValueError("time_dict not found.")
- (asset_id, asset_start_time), time_dict = element
+ (asset_name, asset_start_time), time_dict = element
if not isinstance(time_dict, OrderedDict):
raise ValueError("time_dict not found.")

Expand Down Expand Up @@ -178,7 +178,7 @@ def process(self, element):
in zip(time_dict.items(), list(time_dict.items())[1:])
}
logger.info(
- f"Step intervals for {uri}:{asset_id} :: {json.dumps(step_intervals, indent=4)}"
+ f"Step intervals for {uri}:{asset_name} :: {json.dumps(step_intervals, indent=4)}"
)

yield ("custom_metrics", (data_latency_ms / 1000, element_processing_time / 1000))
Expand Down
2 changes: 1 addition & 1 deletion weather_mv/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@
packages=find_packages(),
author='Anthromets',
author_email='anthromets-ecmwf@google.com',
- version='0.2.31',
+ version='0.2.32',
url='https://weather-tools.readthedocs.io/en/latest/weather_mv/',
description='A tool to load weather data into BigQuery.',
install_requires=beam_gcp_requirements + base_requirements,
Expand Down
Loading