diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index b98b1dd..e19f33c 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -16,7 +16,7 @@ jobs: arrow-version: [''] include: - python-version: 3.x - arrow-version: '--pre ' + arrow-version: '--pre --only-binary :all: ' steps: - uses: actions/checkout@v4 - uses: actions/setup-python@v5 diff --git a/graphique/core.py b/graphique/core.py index 3676811..a5d0e7e 100644 --- a/graphique/core.py +++ b/graphique/core.py @@ -421,13 +421,13 @@ def group(self, *names: str, counts: str = '', **funcs: Sequence[Agg]) -> pa.Tab """ prefix = 'hash_' if names else '' aggs = [agg.astuple(prefix + func) for func in funcs for agg in funcs[func]] - columns = list(names) + [agg[0] for agg in aggs] + columns = { + name: pc.field(name).cast(Column.scalar_type(self.schema.field(name))) + for name in list(names) + [agg[0] for agg in aggs] + } if counts: aggs.append(([], 'hash_count_all', None, counts)) - if isinstance(self, pa.Table): - decl = Declaration('table_source', self.unify_dictionaries()) - else: - decl = Declaration.scan(self, columns=columns) + decl = Declaration.scan(ds.dataset(self) if isinstance(self, pa.Table) else self, columns) return decl.aggregate(aggs, names).to_table(use_threads=False) def aggregate(self, counts: str = '', **funcs: Sequence[Agg]) -> dict: diff --git a/graphique/interface.py b/graphique/interface.py index 98bc9d9..dc3eaf4 100644 --- a/graphique/interface.py +++ b/graphique/interface.py @@ -249,8 +249,7 @@ def group( table, aggs = self.table, dict(aggregate) refs = {agg.name for values in aggs.values() for agg in values} fragments = set(T.fragment_keys(self.table)) - dicts = (pa.types.is_dictionary(table.schema.field(name).type) for name in set(by) | refs) - if isinstance(table, ds.Scanner) or any(dicts): + if isinstance(table, ds.Scanner): table = self.select(info) if fragments and set(by) <= fragments: if set(by) == fragments: