I want to use a Mage pipeline to load data from a partitioned object in GCS and export it to BigQuery. The object is named other_data and is partitioned by year, month, and day. But the object key 'other_data/*/*/*/*' seems to be treated as a wrong object name, even though loading from the specific path 'other_data/year=2024/month=10/day=2/40949e85e0734000910e2c0179278e00-0.parquet' works #558

Open
phuccodetrau opened this issue Oct 29, 2024 · 0 comments


Data loader:

from mage_ai.settings.repo import get_repo_path
from mage_ai.io.config import ConfigFileLoader
from mage_ai.io.google_cloud_storage import GoogleCloudStorage
from os import path

if 'data_loader' not in globals():
    from mage_ai.data_preparation.decorators import data_loader
if 'test' not in globals():
    from mage_ai.data_preparation.decorators import test


@data_loader
def load_from_google_cloud_storage(*args, **kwargs):
    """
    Template for loading data from a Google Cloud Storage bucket.
    Specify your configuration settings in 'io_config.yaml'.

    Docs: https://docs.mage.ai/design/data-loading#googlecloudstorage
    """
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    bucket_name = 'weather_bigdata_20241'
    object_key = 'other_data/*/*/*/*'

    return GoogleCloudStorage.with_config(ConfigFileLoader(config_path, config_profile)).load(
        bucket_name,
        object_key,
    )
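
For comparison, here is a workaround sketch I put together (my own code, not Mage's loader API; it assumes the google-cloud-storage client, pandas, and pyarrow are installed in the environment). It expands the year/month/day partitions by listing blobs under the prefix instead of relying on the wildcard:

import io

import pandas as pd
from google.cloud import storage


def load_partitioned_parquet(bucket_name: str, prefix: str) -> pd.DataFrame:
    """Read every parquet object under a GCS prefix into one DataFrame."""
    client = storage.Client()
    frames = [
        pd.read_parquet(io.BytesIO(blob.download_as_bytes()))
        for blob in client.list_blobs(bucket_name, prefix=prefix)
        if blob.name.endswith('.parquet')
    ]
    if not frames:
        # An empty result here would also explain the schema error below
        raise ValueError(f'No parquet objects found under prefix {prefix!r}')
    return pd.concat(frames, ignore_index=True)


# e.g. load_partitioned_parquet('weather_bigdata_20241', 'other_data/')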

Data exporter:

from mage_ai.settings.repo import get_repo_path
from mage_ai.io.bigquery import BigQuery
from mage_ai.io.config import ConfigFileLoader
from pandas import DataFrame
from os import path

if 'data_exporter' not in globals():
    from mage_ai.data_preparation.decorators import data_exporter


@data_exporter
def export_data_to_big_query(df: DataFrame, **kwargs) -> None:
    """
    Template for exporting data to a BigQuery warehouse.
    Specify your configuration settings in 'io_config.yaml'.

    Docs: https://docs.mage.ai/design/data-loading#bigquery
    """
    table_id = 'strong-ward-437213-j6.bigdata_20241.test_parquet'
    config_path = path.join(get_repo_path(), 'io_config.yaml')
    config_profile = 'default'

    BigQuery.with_config(ConfigFileLoader(config_path, config_profile)).export(
        df,
        table_id,
        if_exists='replace',  # Specify resolution policy if table name already exists
    )
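
Since the export is where things blow up, one idea is to guard the loader's output before it ever reaches BigQuery. A hypothetical test block in the loader file, following the @test decorator pattern that Mage templates already import (the column check is my own addition):

@test
def test_output(output, *args) -> None:
    """Fail fast if the loader returned nothing for BigQuery to infer a schema from."""
    assert output is not None, 'The output is undefined'
    assert len(output.columns) > 0, 'Output has no columns; the object key may not have matched anything'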

Issue:

GoogleCloudStorage initialized
└─ Loading data frame from bucket 'weather_bigdata_20241' at key 'other_data'...
DONE

BigQuery initialized
├─ Connecting to BigQuery warehouse...DONE
└─ Exporting data to table 'strong-ward-437213-j6.bigdata_20241.test_parquet'...

BadRequest                                Traceback (most recent call last)

File /home/src/magic-zoomcamp/data_exporters/weather_bq.py:23, in export_data_to_big_query(df, **kwargs)
     20 config_path = path.join(get_repo_path(), 'io_config.yaml')
     21 config_profile = 'default'
---> 23 BigQuery.with_config(ConfigFileLoader(config_path, config_profile)).export(
     24     df,
     25     table_id,
     26     if_exists='replace',  # Specify resolution policy if table name already exists
     27 )

File /usr/local/lib/python3.10/site-packages/mage_ai/io/bigquery.py:354, in BigQuery.export(self, df, table_id, database, if_exists, overwrite_types, query_string, verbose, unique_conflict_method, unique_constraints, write_disposition, create_dataset, **configuration_params)
    352 if verbose:
    353     with self.printer.print_msg(f'Exporting data to table \'{table_id}\''):
--> 354         __process(database=database, write_disposition=write_disposition)
    355 else:
    356     __process(database=database)

File /usr/local/lib/python3.10/site-packages/mage_ai/io/bigquery.py:343, in BigQuery.export.<locals>.__process(database, write_disposition)
    341 elif if_exists == ExportWritePolicy.FAIL:
    342     write_disposition = WriteDisposition.WRITE_EMPTY
--> 343 self.__write_table(
    344     df,
    345     table_id,
    346     overwrite_types=overwrite_types,
    347     write_disposition=write_disposition,
    348     create_dataset=create_dataset,
    349     **configuration_params,
    350 )

File /usr/local/lib/python3.10/site-packages/mage_ai/io/bigquery.py:403, in BigQuery.__write_table(self, df, table_id, overwrite_types, create_dataset, **configuration_params)
    400 if type(df) is DataFrame:
    401     df.columns = df.columns.str.replace(' ', '_')
--> 403 return self.client.load_table_from_dataframe(df, table_id, job_config=config).result()

File /usr/local/lib/python3.10/site-packages/google/cloud/bigquery/job/base.py:952, in _AsyncJob.result(self, retry, timeout)
    949     self._begin(retry=retry, timeout=timeout)
    951 kwargs = {} if retry is DEFAULT_RETRY else {"retry": retry}
--> 952 return super(_AsyncJob, self).result(timeout=timeout, **kwargs)

File /usr/local/lib/python3.10/site-packages/google/api_core/future/polling.py:261, in PollingFuture.result(self, timeout, retry, polling)
    256 self._blocking_poll(timeout=timeout, retry=retry, polling=polling)
    258 if self._exception is not None:
    259     # pylint: disable=raising-bad-type
    260     # Pylint doesn't recognize that this is valid in this case.
--> 261     raise self._exception
    263 return self._result

BadRequest: 400 Table test_parquet_eec7e159_f7d0_4966_81c2_355a00ea9287_source does not have a schema.
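
Two things stand out to me. First, the loader log above prints the key as 'other_data', not the full wildcard, which suggests the glob is stripped rather than expanded. Second, the 'does not have a schema' BadRequest is what load_table_from_dataframe typically raises when the DataFrame has no columns at all. A quick check I could add to the loader (hypothetical, reusing the loader's own variables) to confirm an empty frame is coming back:

    df = GoogleCloudStorage.with_config(ConfigFileLoader(config_path, config_profile)).load(
        bucket_name,
        object_key,
    )
    print(df.shape, list(df.columns))  # (0, 0) and [] would confirm an empty, schema-less frame
    return df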
