Commit

Support for multiple output column UDFs (#1426)
## Description

Adds `@register_multioutput_udf()`
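A minimal sketch of the input/output contract this PR gives multioutput UDFs (dict in, dict out; `DataFrame` in, `DataFrame` out), using only pandas. In whylogs the function would additionally be decorated with `@register_multioutput_udf(["x"])`, which prepends the UDF name to each returned column (`powers.xx`, `powers.xxx`) as described in the diff below; the plain `powers` function here is just an illustration of the required signature.

```python
from typing import Dict, List, Union

import pandas as pd


def powers(data: Union[Dict[str, List], pd.DataFrame]) -> Union[Dict[str, List], pd.DataFrame]:
    if isinstance(data, pd.DataFrame):
        # DataFrame input: operate column-wise across all rows
        result = pd.DataFrame()
        result["xx"] = data["x"] * data["x"]
        result["xxx"] = data["x"] * data["x"] * data["x"]
        return result
    # Dictionary (single-row) input: each column is a one-element list
    x = data["x"][0]
    return {"xx": [x * x], "xxx": [x * x * x]}


df_out = powers(pd.DataFrame({"x": [1, 2, 3]}))  # columns "xx" and "xxx"
row_out = powers({"x": [2]})                     # {"xx": [4], "xxx": [8]}
```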


- [ ] I have reviewed the [Guidelines for Contributing](CONTRIBUTING.md)
and the [Code of Conduct](CODE_OF_CONDUCT.md).

---------

Co-authored-by: Jamie Broomall <88007022+jamie256@users.noreply.github.com>
richard-rogers and jamie256 authored Dec 20, 2023
1 parent a90e7c7 commit 08e9125
Showing 4 changed files with 205 additions and 14 deletions.
73 changes: 64 additions & 9 deletions python/examples/experimental/whylogs_UDF_examples.ipynb
Original file line number Diff line number Diff line change
@@ -31,7 +31,7 @@
},
{
"cell_type": "code",
"execution_count": 2,
"execution_count": null,
"metadata": {
"colab": {
"base_uri": "https://localhost:8080/"
@@ -58,13 +58,14 @@
"source": [
"## Types of UDF\n",
"\n",
"whylogs supports three kinds of UDFs:\n",
"whylogs supports four kinds of UDFs:\n",
"\n",
"* Dataset UDFs take one or more named columns as input and produce a new column as output.\n",
"* Multioutput UDFs take one or more named columns as input and produce one or more new columns as output.\n",
"* Type UDFs are applied to all columns of a specified type and produce a new column as output.\n",
"* Metric UDFs can be applied to a column specified by name or type, and do not produce a column. Instead, their output is tracked by a whylogs `UdfMetric` instance attached to the input column in the dataset profile.\n",
"\n",
"Dataset and type UDFs produce their output columns before whylogs profiles the dataset. Thus the full machinery of whylogs schema specification and segmentation apply to the output columns. The `UdfMetric` has its own submetric schema mechanism to control the statistics tracked for metric UDF output, but since metric UDFs do not create columns they cannot be used for segmentation.\n",
"Dataset, multioutput, and type UDFs produce their output columns before whylogs profiles the dataset. Thus the full machinery of whylogs schema specification and segmentation apply to the output columns. The `UdfMetric` has its own submetric schema mechanism to control the statistics tracked for metric UDF output, but since metric UDFs do not create columns they cannot be used for segmentation.\n",
"\n",
"### Dataset UDFs\n",
"\n",
@@ -74,6 +75,14 @@
"\n",
"The dataframe or dictionary only contains the columns the UDF is registered to access (see the section on registration below). `DataFrame` inputs may contain multiple rows. Dictionary inputs contain only a single row, but it is presented as a list containing one value. This allows UDFs to be written using the intersection of the `DataFrame` and dictionary/list APIs to handle both cases. Performance-critical UDFs can check the type of input to provide implementations optimized for the specific input type. The returned list or series should contain one value for each input row.\n",
"\n",
"### Multioutput UDFs\n",
"\n",
"The signature for multioutput UDFs is\n",
"\n",
"`f(Union[Dict[str, List], pd.DataFrame]) -> Union[Dict[str, List], pd.DataFrame]`\n",
"\n",
"These are very similar to dataset UDFs. Where dataset UDFs use the UDF's name as the name of their single output column, multioutput UDFs prepend the UDF's name to the names of the columns returned by the UDF.\n",
"\n",
"### Type UDFs\n",
"\n",
"The signature for type UDFs is\n",
@@ -137,6 +146,28 @@
"* `namespace`, if present, is prepended to the UDF name to help manage UDF name collisions.\n",
"* `schema_name` helps manage collections of UDFs. A UDF can be registered in a specified schema. If omitted, it will be registered to the default schema. `udf_schema()` merges the UDFs registered in the requested schemas.\n",
"\n",
"### Multioutput UDFs\n",
"\n",
"The `@register_multioutput_udf` decorator declares multioutput UDFs.\n",
"```\n",
"from whylogs.experimental.core.udf_schema import register_multioutput_udf\n",
"import pandas as pd\n",
"\n",
"@register_multioutput_udf([\"x\"])\n",
"def powers(data: Union[Dict[str, List], pd.DataFrame]) -> Union[Dict[str, List], pd.DataFrame]:\n",
" if isinstance(data, pd.DataFrame):\n",
" result = pd.DataFrame()\n",
" result[\"xx\"] = data[\"x\"] * data[\"x\"]\n",
" result[\"xxx\"] = data[\"x\"] * data[\"x\"] * data[\"x\"]\n",
" return result\n",
" else:\n",
" result = {\"xx\" : [data[\"x\"][0] * data[\"x\"][0]]}\n",
" result[\"xxx\"] = [data[\"x\"][0] * data[\"x\"][0] * data[\"x\"][0]]\n",
" return result\n",
"```\n",
"\n",
"If you log a `DataFrame` (or single row via a dictionary) containing a column named `x`, columns named `powers.xx` and `powers.xxx` containing the squared and cubed input column will be added by applying the `powers()` function before whylogs produces its profile. If any of the input columns is missing, the UDF will not be invoked. While dataset UDFs do not execute if their output column already exists, multioutput UDFs always produce their output columns.\n",
"\n",
"### Type UDFs\n",
"\n",
"The `@register_type_udf` decorator declares type UDFs to be applied to columns of a specified type. Types can be specified as subclass of `whylogs.core.datatypes.DataType` or a plain Python type.\n",
@@ -228,7 +259,12 @@
"source": [
"import whylogs as why\n",
"from whylogs.core.datatypes import Fractional, String\n",
"from whylogs.experimental.core.udf_schema import register_dataset_udf, register_type_udf, udf_schema\n",
"from whylogs.experimental.core.udf_schema import (\n",
" register_dataset_udf,\n",
" register_multioutput_udf,\n",
" register_type_udf,\n",
" udf_schema\n",
")\n",
"from whylogs.experimental.core.metrics.udf_metric import register_metric_udf\n",
"\n",
"from typing import Any, Dict, List, Union\n",
@@ -242,6 +278,19 @@
" return [mass / volume for mass, volume in zip(data[\"mass\"], data[\"volume\"])]\n",
"\n",
"\n",
"@register_multioutput_udf([\"x\"])\n",
"def powers(data: Union[Dict[str, List], pd.DataFrame]) -> Union[Dict[str, List], pd.DataFrame]:\n",
" if isinstance(data, pd.DataFrame):\n",
" result = pd.DataFrame()\n",
" result[\"xx\"] = data[\"x\"] * data[\"x\"]\n",
" result[\"xxx\"] = data[\"x\"] * data[\"x\"] * data[\"x\"]\n",
" return result\n",
" else:\n",
" result = {\"xx\": [data[\"x\"][0] * data[\"x\"][0]]}\n",
" result[\"xxx\"] = [data[\"x\"][0] * data[\"x\"][0] * data[\"x\"][0]]\n",
" return result\n",
"\n",
"\n",
"@register_type_udf(Fractional)\n",
"def square(input: Union[List, pd.Series]) -> Union[List, pd.Series]:\n",
" return [x * x for x in input]\n",
@@ -252,7 +301,13 @@
" return input.upper()\n",
"\n",
"\n",
"df = pd.DataFrame({\"mass\": [1, 2, 3], \"volume\": [4, 5, 6], \"score\": [1.9, 4.2, 3.1], \"lower\": [\"a\", \"b\", \"c\"]})\n",
"df = pd.DataFrame({\n",
" \"mass\": [1, 2, 3],\n",
" \"volume\": [4, 5, 6],\n",
" \"score\": [1.9, 4.2, 3.1],\n",
" \"lower\": [\"a\", \"b\", \"c\"],\n",
" \"x\": [1, 2, 3]\n",
"})\n",
"schema = udf_schema()\n",
"result = why.log(df, schema=schema)\n",
"result.view().to_pandas()\n"
@@ -265,7 +320,7 @@
"id": "LVKWYEwkEXd5",
"outputId": "470b8700-b22e-4912-d9e7-037922c1b694"
},
"execution_count": 3,
"execution_count": null,
"outputs": [
{
"output_type": "execute_result",
@@ -734,7 +789,7 @@
"id": "t_q4gAsyJHhq",
"outputId": "54dae1e6-18b6-4a49-f98d-91063c5986f6"
},
"execution_count": 4,
"execution_count": null,
"outputs": [
{
"output_type": "execute_result",
@@ -775,7 +830,7 @@
"id": "rPajILVsJtaf",
"outputId": "c20857f4-5dae-4e6a-a283-c09a2036075d"
},
"execution_count": 7,
"execution_count": null,
"outputs": [
{
"output_type": "execute_result",
@@ -1010,7 +1065,7 @@
"id": "J6R9Aauv4qmW",
"outputId": "b9c6b175-8bd1-4caa-a854-c57820c1509d"
},
"execution_count": 8,
"execution_count": null,
"outputs": [
{
"output_type": "execute_result",
1 change: 1 addition & 0 deletions python/test_notebooks/notebook_tests.py
@@ -36,6 +36,7 @@
"LocalStore_with_Constraints.ipynb", # skipped because it has over 4 minutes of thread.sleep in it
"KS_Profiling.ipynb", # skipped because this takes a few minutes to run
"Monitoring_Embeddings.ipynb", # skipped because needs user input
"whylogs_UDF_examples.ipynb", # skipped until multiple output column UDFs released
]


37 changes: 37 additions & 0 deletions python/tests/experimental/core/test_udf_schema.py
@@ -19,6 +19,7 @@
UdfSchema,
UdfSpec,
register_dataset_udf,
register_multioutput_udf,
register_type_udf,
udf_schema,
unregister_udf,
@@ -54,6 +55,42 @@ def test_udf_pandas() -> None:
assert len(data.columns) == 1


@register_multioutput_udf(["xx1", "xx2"])
def f1(x: Union[Dict[str, List], pd.DataFrame]) -> Union[Dict[str, List], pd.DataFrame]:
if isinstance(x, dict):
return {"foo": [x["xx1"][0]], "bar": [x["xx2"][0]]}
else:
return pd.DataFrame({"foo": x["xx1"], "bar": x["xx2"]})


@register_multioutput_udf(["xx1", "xx2"], prefix="blah")
def f2(x: Union[Dict[str, List], pd.DataFrame]) -> Union[Dict[str, List], pd.DataFrame]:
if isinstance(x, dict):
return {"foo": [x["xx1"][0]], "bar": [x["xx2"][0]]}
else:
return pd.DataFrame({"foo": x["xx1"], "bar": x["xx2"]})


def test_multioutput_udf_row() -> None:
schema = udf_schema()
row = {"xx1": 42, "xx2": 3.14}
results = why.log(row, schema=schema).view()
assert results.get_column("f1.foo") is not None
assert results.get_column("f1.bar") is not None
assert results.get_column("blah.foo") is not None
assert results.get_column("blah.bar") is not None


def test_multioutput_udf_dataframe() -> None:
schema = udf_schema()
df = pd.DataFrame({"xx1": [42, 7], "xx2": [3.14, 2.72]})
results = why.log(df, schema=schema).view()
assert results.get_column("f1.foo") is not None
assert results.get_column("f1.bar") is not None
assert results.get_column("blah.foo") is not None
assert results.get_column("blah.bar") is not None


@register_dataset_udf(["col1"], schema_name="unit-tests")
def add5(x: Union[Dict[str, List], pd.DataFrame]) -> Union[List, pd.Series]:
return [xx + 5 for xx in x["col1"]]
108 changes: 103 additions & 5 deletions python/whylogs/experimental/core/udf_schema.py
@@ -53,6 +53,11 @@ class UdfSpec:
default_factory=dict
) # new column name -> callable to compute new column value
column_type: Optional[DataType] = None
prefix: Optional[str] = None

# for multiple output column UDFs
udf: Optional[Callable[[Any], Any]] = None
name: Optional[str] = None

def __post_init__(self):
if self.column_type is not None:
@@ -67,6 +72,7 @@ def __post_init__(self):
def _apply_udfs_on_row(
values: Union[List, Dict[str, List]], udfs: Dict, new_columns: Dict[str, Any], input_cols: Collection[str]
) -> None:
"""multiple input columns, single output column"""
for new_col, udf in udfs.items():
if new_col in input_cols:
continue
@@ -78,9 +84,33 @@
logger.exception(f"Evaluating UDF {new_col} failed")


def _apply_udf_on_row(
name: str,
prefix: Optional[str],
values: Union[List, Dict[str, List]],
udf: Callable,
new_columns: Dict[str, Any],
input_cols: Collection[str],
) -> None:
"""
multiple input columns, multiple output columns
udf(Union[Dict[str, List], pd.DataFrame]) -> Union[Dict[str, List], pd.DataFrame]
"""

try:
# TODO: Document assumption: dictionary in -> dictionary out
for new_col, value in udf(values).items():
new_col = prefix + "." + new_col if prefix else new_col
new_columns[new_col] = value[0]

except Exception as e: # noqa
logger.exception(f"Evaluating UDF {name} failed with error {e}")


def _apply_udfs_on_dataframe(
pandas: pd.DataFrame, udfs: Dict, new_df: pd.DataFrame, input_cols: Collection[str]
) -> None:
"""multiple input columns, single output column"""
for new_col, udf in udfs.items():
if new_col in input_cols:
continue
Expand All @@ -92,6 +122,30 @@ def _apply_udfs_on_dataframe(
logger.exception(f"Evaluating UDF {new_col} failed on columns {pandas.keys()} with error {e}")


def _apply_udf_on_dataframe(
name: str,
prefix: Optional[str],
pandas: pd.DataFrame,
udf: Callable,
new_df: pd.DataFrame,
input_cols: Collection[str],
) -> None:
"""
multiple input columns, multiple output columns
udf(Union[Dict[str, List], pd.DataFrame]) -> Union[Dict[str, List], pd.DataFrame]
"""

try:
# TODO: I think it's OKAY if udf returns a dictionary
udf_output = pd.DataFrame(udf(pandas))
udf_output = udf_output.rename(columns={old: prefix + "." + old for old in udf_output.keys()}) # type: ignore
for new_col in udf_output.keys():
new_df[new_col] = udf_output[new_col]
except Exception as e: # noqa
logger.exception(f"Evaluating UDF {name} failed on columns {pandas.keys()} with error {e}")
return pd.DataFrame()


def _apply_type_udfs(pandas: pd.Series, udfs: Dict, new_df: pd.DataFrame, input_cols: Collection[str]) -> None:
for new_col, udf in udfs.items():
if new_col in input_cols:
@@ -151,7 +205,10 @@ def _run_udfs_on_row(
for spec in self.multicolumn_udfs:
if spec.column_names and set(spec.column_names).issubset(set(row.keys())):
inputs = {col: [row[col]] for col in spec.column_names}
_apply_udfs_on_row(inputs, spec.udfs, new_columns, input_cols)
if spec.udf is not None:
_apply_udf_on_row(spec.name, spec.prefix, inputs, spec.udf, new_columns, input_cols) # type: ignore
else:
_apply_udfs_on_row(inputs, spec.udfs, new_columns, input_cols)

for column, value in row.items():
why_type = type(self.type_mapper(type(value)))
@@ -162,7 +219,12 @@
def _run_udfs_on_dataframe(self, pandas: pd.DataFrame, new_df: pd.DataFrame, input_cols: Collection[str]) -> None:
for spec in self.multicolumn_udfs:
if spec.column_names and set(spec.column_names).issubset(set(pandas.keys())):
_apply_udfs_on_dataframe(pandas[spec.column_names], spec.udfs, new_df, input_cols)
if spec.udf is not None:
_apply_udf_on_dataframe(
spec.name, spec.prefix, pandas[spec.column_names], spec.udf, new_df, input_cols # type: ignore
)
else:
_apply_udfs_on_dataframe(pandas[spec.column_names], spec.udfs, new_df, input_cols)

for column, dtype in pandas.dtypes.items():
why_type = type(self.type_mapper(dtype))
@@ -174,15 +236,15 @@ def _run_udfs(
self, pandas: Optional[pd.DataFrame] = None, row: Optional[Dict[str, Any]] = None
) -> Tuple[Optional[pd.DataFrame], Optional[Mapping[str, Any]]]:
new_columns = deepcopy(row) if row else None
new_df = pd.DataFrame() if pandas is not None else None
new_df = pd.DataFrame()
if row is not None:
self._run_udfs_on_row(row, new_columns, row.keys()) # type: ignore

if pandas is not None:
self._run_udfs_on_dataframe(pandas, new_df, pandas.keys())
new_df = pd.concat([pandas, new_df], axis=1)

return new_df, new_columns
return new_df if pandas is not None else None, new_columns

def apply_udfs(
self, pandas: Optional[pd.DataFrame] = None, row: Optional[Dict[str, Any]] = None
Expand All @@ -203,6 +265,42 @@ def _reset_udfs(reset_metric_udfs: bool = True) -> None:
_resolver_specs = defaultdict(list)


def register_multioutput_udf(
col_names: List[str],
udf_name: Optional[str] = None,
prefix: Optional[str] = None,
namespace: Optional[str] = None,
schema_name: str = "",
) -> Callable[[Any], Any]:
"""
Decorator to easily configure UDFs for your data set. Decorate your UDF
functions, then call generate_udf_dataset_schema() to create a UdfSchema
that includes the UDFs configured by your decorator parameters. The decorated
function will automatically be a UDF in the UdfSchema.
Specify udf_name to give the output of the UDF a name. udf_name
defaults to the name of the decorated function. Note that all lambdas are
named "<lambda>", so omitting udf_name on more than one lambda will result
in name collisions. If you pass a namespace, it will be prepended to the UDF name.
Specifying schema_name will register the UDF in a particular schema. If omitted,
it will be registered to the default schema.
For multiple output column UDFs, the udf_name is prepended to the column
name supplied by the UDF. The signature for multiple output column UDFs
is f(Union[Dict[str, List], pd.DataFrame]) -> Union[Dict[str, List], pd.DataFrame]
"""

def decorator_register(func):
global _multicolumn_udfs
name = udf_name or func.__name__
name = f"{namespace}.{name}" if namespace else name
output_prefix = prefix if prefix else name
_multicolumn_udfs[schema_name].append(UdfSpec(col_names, prefix=output_prefix, udf=func, name=name))
return func

return decorator_register


def register_dataset_udf(
col_names: List[str],
udf_name: Optional[str] = None,
@@ -310,7 +408,7 @@ def generate_udf_specs(
include_default_schema: bool = True,
) -> List[UdfSpec]:
"""
Generates a list UdfSpecs that implement the UDFs specified by the
Generates a list of UdfSpecs that implement the UDFs specified by the
@register_dataset_udf, @register_type_udf, and @register_metric_udf
decorators. You can provide a list of other_udf_specs to include in
addition to those UDFs registered via the decorator.
