Skip to content

Commit

Permalink
Default to threaded aggregation.
Browse files Browse the repository at this point in the history
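Groups therefore no longer maintain their original order by default; the performance gain from threaded aggregation has become significant enough to justify the change. Aggregate functions that require ordering (`first`, `last`) remain unthreaded, and an `ordered` option is available to disable threading explicitly.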
  • Loading branch information
coady committed Aug 17, 2024
1 parent ad7c0cb commit 1000c42
Show file tree
Hide file tree
Showing 5 changed files with 16 additions and 5 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).

## Unreleased
### Changed
* Grouping defaults to parallelized but unordered
* Partitioning supports facet counts and arbitrary functions
* Optimizations for dictionary arrays

## [1.7](https://pypi.org/project/graphique/1.7/) - 2024-07-19
### Changed
Expand Down
9 changes: 7 additions & 2 deletions graphique/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class Agg:

associatives = {'all', 'any', 'first', 'last', 'max', 'min', 'one', 'product', 'sum'}
associatives |= {'count'} # transformed to be associative
ordered = {'first', 'last'}

def __init__(self, name: str, alias: str = '', **options):
self.name = name
Expand Down Expand Up @@ -411,12 +412,15 @@ def runs(self, *names: str, **predicates: tuple) -> tuple:
table = Table.union(scalars, Table.from_offsets(lists, offsets))
return table, Column.diff(offsets)

def group(self, *names: str, counts: str = '', **funcs: Sequence[Agg]) -> pa.Table:
def group(
self, *names: str, counts: str = '', ordered: bool = False, **funcs: Sequence[Agg]
) -> pa.Table:
"""Group by and aggregate.
Args:
*names: columns to group by
counts: alias for optional row counts
ordered: do not use threads
**funcs: aggregate funcs with columns options
"""
prefix = 'hash_' if names else ''
Expand All @@ -428,7 +432,8 @@ def group(self, *names: str, counts: str = '', **funcs: Sequence[Agg]) -> pa.Tab
if counts:
aggs.append(([], 'hash_count_all', None, counts))
decl = Declaration.scan(ds.dataset(self) if isinstance(self, pa.Table) else self, columns)
return decl.aggregate(aggs, names).to_table(use_threads=False)
use_threads = not ordered and Agg.ordered.isdisjoint(funcs)
return decl.aggregate(aggs, names).to_table(use_threads)

def aggregate(self, counts: str = '', **funcs: Sequence[Agg]) -> dict:
"""Return aggregated scalars as a row of data."""
Expand Down
4 changes: 3 additions & 1 deletion graphique/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,13 +232,15 @@ def slice(
@doc_field(
by="column names; empty will aggregate into a single row table",
counts="optionally include counts in an aliased column",
ordered="optionally disable parallelization to maintain ordering",
aggregate="aggregation functions applied to other columns",
)
def group(
self,
info: Info,
by: list[str] = [],
counts: str = '',
ordered: bool = False,
aggregate: HashAggregates = {}, # type: ignore
) -> Self:
"""Return table grouped by columns.
Expand All @@ -263,7 +265,7 @@ def group(
for agg in itertools.chain(*aggs.values()):
agg.name = agg.alias
loaded = isinstance(table, pa.Table)
table = T.group(table, *by, counts=counts, **aggs)
table = T.group(table, *by, counts=counts, ordered=ordered, **aggs)
return type(self)(table if loaded else self.add_metric(info, table, mode='group'))

def fragments(self, info: Info, counts: str = '', aggregate: HashAggregates = {}) -> pa.Table: # type: ignore
Expand Down
2 changes: 1 addition & 1 deletion tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def test_chunks():
assert C.index(array, 'a', start=3) == 4
assert C.index(array, 'b', start=2) == -1
table = pa.table({'col': array})
tbl = T.group(table, 'col', count_distinct=[Agg('col', 'count')])
tbl = T.group(table, 'col', ordered=True, count_distinct=[Agg('col', 'count')])
assert tbl['col'].to_pylist() == list('abc')
assert tbl['count'].to_pylist() == [1] * 3

Expand Down
2 changes: 1 addition & 1 deletion tests/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ def test_group(client):
with pytest.raises(ValueError, match="cannot represent"):
client.execute('{ group(by: "state", aggregate: {list: {name: "city"}}) { row { city } } }')
data = client.execute(
"""{ group(by: ["state"], aggregate: {list: {name: "county"}}) { length tables { length
"""{ group(by: ["state"], ordered: true, aggregate: {list: {name: "county"}}) { length tables { length
columns { state { values } county { min max } } }
scan(columns: {list: {valueLength: {name: "county"}}, alias: "c"}) {
column(name: "c") { ... on IntColumn { values } } } } }"""
Expand Down

0 comments on commit 1000c42

Please sign in to comment.