Specification: In-Memory Registry #96

Open · wants to merge 9 commits into base: 0.28-affirm
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "feast-affirm"
version = "0.28+affirm147"
version = "0.28+affirm170"
description = "Feast - Affirm"
authors = ["Francisco Arceo", "Ross Briden", "Maks Stachowiak"]
readme = "README.md"
65 changes: 65 additions & 0 deletions sdk/python/feast/data_source.py
@@ -13,6 +13,7 @@
# limitations under the License.

import enum
import copy
import warnings
from abc import ABC, abstractmethod
from datetime import timedelta
@@ -96,6 +97,14 @@ def to_proto(self) -> DataSourceProto.KafkaOptions:

return kafka_options_proto

def __copy__(self):
return KafkaOptions(
kafka_bootstrap_servers=self.kafka_bootstrap_servers,
message_format=self.message_format,
topic=self.topic,
watermark_delay_threshold=self.watermark_delay_threshold
)


class KinesisOptions:
"""
@@ -112,6 +121,13 @@ def __init__(
self.region = region
self.stream_name = stream_name

def __copy__(self):
return KinesisOptions(
record_format=copy.copy(self.record_format),
region=self.region,
stream_name=self.stream_name
)

@classmethod
def from_proto(cls, kinesis_options_proto: DataSourceProto.KinesisOptions):
"""
@@ -438,6 +454,22 @@ def __eq__(self, other):
def __hash__(self):
return super().__hash__()

def __copy__(self):
return KafkaSource(
name=self.name,
field_mapping=dict(self.field_mapping),
kafka_bootstrap_servers=self.kafka_options.kafka_bootstrap_servers,
message_format=self.kafka_options.message_format,
watermark_delay_threshold=self.kafka_options.watermark_delay_threshold,
topic=self.kafka_options.topic,
created_timestamp_column=self.created_timestamp_column,
timestamp_field=self.timestamp_field,
description=self.description,
tags=dict(self.tags),
owner=self.owner,
batch_source=copy.copy(self.batch_source) if self.batch_source else None,
)

@staticmethod
def from_proto(data_source: DataSourceProto):
watermark_delay_threshold = None
@@ -561,6 +593,15 @@ def __eq__(self, other):
def __hash__(self):
return super().__hash__()

def __copy__(self):
return RequestSource(
name=self.name,
schema=[copy.copy(field) for field in self.schema],
description=self.description,
tags=dict(self.tags),
owner=self.owner
)

@staticmethod
def from_proto(data_source: DataSourceProto):
schema_pb = data_source.request_data_options.schema
@@ -637,6 +678,21 @@ def from_proto(data_source: DataSourceProto):
else None,
)

def __copy__(self):
return KinesisSource(
name=self.name,
timestamp_field=self.timestamp_field,
field_mapping=dict(self.field_mapping),
record_format=copy.copy(self.kinesis_options.record_format),
region=self.kinesis_options.region,
stream_name=self.kinesis_options.stream_name,
created_timestamp_column=self.created_timestamp_column,
description=self.description,
tags=dict(self.tags),
owner=self.owner,
batch_source=copy.copy(self.batch_source) if self.batch_source else None
)

@staticmethod
def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
pass
@@ -771,6 +827,15 @@ def __eq__(self, other):
def __hash__(self):
return super().__hash__()

def __copy__(self):
return PushSource(
name=self.name,
batch_source=copy.copy(self.batch_source),
description=self.description,
tags=dict(self.tags),
owner=self.owner
)

def validate(self, config: RepoConfig):
pass

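Reviewer note: a minimal sketch of the shallow-copy behaviour these `__copy__` methods are meant to guarantee. The source name, topic, bootstrap server, and tags below are illustrative assumptions, not values from this PR; the keyword arguments mirror the ones the diff itself passes to `KafkaSource`.

```python
import copy

from feast import KafkaSource
from feast.data_format import JsonFormat

# Hypothetical source; the names and servers are placeholders.
source = KafkaSource(
    name="driver_stats_stream",
    kafka_bootstrap_servers="localhost:9092",
    topic="driver_stats",
    timestamp_field="event_timestamp",
    message_format=JsonFormat(schema_json="driver_id integer"),
    tags={"team": "risk"},
)

clone = copy.copy(source)
clone.tags["team"] = "fraud"

# __copy__ rebuilds the tags dict and the KafkaOptions, so mutating the clone
# does not leak back into the original.
assert source.tags["team"] == "risk"
assert clone.kafka_options is not source.kafka_options
```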
13 changes: 13 additions & 0 deletions sdk/python/feast/entity.py
@@ -139,6 +139,19 @@ def is_valid(self):
if not self.value_type:
raise ValueError(f"The entity {self.name} does not have a type.")

def __copy__(self):
entity = Entity(
name=self.name,
join_keys=[self.join_key],
description=self.description,
tags=dict(self.tags),
owner=self.owner,
value_type=self.value_type
)
entity.created_timestamp = self.created_timestamp
entity.last_updated_timestamp = self.last_updated_timestamp
return entity

@classmethod
def from_proto(cls, entity_proto: EntityProto):
"""
40 changes: 40 additions & 0 deletions sdk/python/feast/errors.py
@@ -5,6 +5,7 @@
from feast.field import Field



class DataSourceNotFoundException(Exception):
def __init__(self, path):
super().__init__(
@@ -118,6 +119,13 @@ def __init__(self, name: str, project: str):
)


class DuplicateValidationReference(Exception):
def __init__(self, name: str, project: str) -> None:
super(DuplicateValidationReference, self).__init__(
f"Duplicate validation reference {name} for project {project}."
)


class FeastProviderLoginError(Exception):
"""Error class that indicates a user has not authenticated with their provider."""

@@ -411,3 +419,35 @@ def __init__(self):
class PushSourceNotFoundException(Exception):
def __init__(self, push_source_name: str):
super().__init__(f"Unable to find push source '{push_source_name}'.")


class RegistryNotBuiltException(Exception):
def __init__(self, registry_name: str) -> None:
super(RegistryNotBuiltException, self).__init__(f"Registry {registry_name} must be built before being queried.")


class EntityNameCollisionException(Exception):
def __init__(self, entity_name: str, project: str) -> None:
super(EntityNameCollisionException, self).__init__(f"Duplicate entity {entity_name} for project {project}.")


class FeatureServiceNameCollisionException(Exception):
def __init__(self, service_name: str, project: str) -> None:
super(FeatureServiceNameCollisionException, self).__init__(
f"Duplicate feature service {service_name} for project {project}."
)


class MissingInfraObjectException(Exception):
def __init__(self, project: str) -> None:
super(MissingInfraObjectException, self).__init__(f"No infra objects found for project {project}.")


class SavedDatasetCollisionException(Exception):
def __init__(self, project: str, name: str) -> None:
super(SavedDatasetCollisionException, self).__init__(f"Duplicate saved dataset {name} for project {project}.")


class MissingProjectMetadataException(Exception):
def __init__(self, project: str) -> None:
super(MissingProjectMetadataException, self).__init__(f"No project metadata for project {project}.")
7 changes: 7 additions & 0 deletions sdk/python/feast/feature_logging.py
@@ -1,4 +1,5 @@
import abc
import copy
from typing import TYPE_CHECKING, Dict, Optional, Type, cast

import pyarrow as pa
@@ -155,6 +156,12 @@ def __init__(self, destination: LoggingDestination, sample_rate: float = 1.0):
self.destination = destination
self.sample_rate = sample_rate

def __copy__(self):
return LoggingConfig(
destination=copy.copy(self.destination),
sample_rate=self.sample_rate
)

@classmethod
def from_proto(cls, config_proto: LoggingConfigProto) -> Optional["LoggingConfig"]:
proto_kind = cast(str, config_proto.WhichOneof("destination"))
19 changes: 19 additions & 0 deletions sdk/python/feast/feature_service.py
@@ -1,3 +1,4 @@
import copy
from datetime import datetime
from typing import Dict, List, Optional, Union

@@ -185,6 +186,24 @@ def __eq__(self, other):

return True

def __copy__(self):
fs = FeatureService(
name=self.name,
features=[],
tags=dict(self.tags),
description=self.description,
owner=self.owner,
logging_config=copy.copy(self.logging_config)
)
fs.feature_view_projections.extend(
[
copy.copy(projection) for projection in self.feature_view_projections
]
)
fs.created_timestamp = self.created_timestamp
fs.last_updated_timestamp = self.last_updated_timestamp
return fs

@classmethod
def from_proto(cls, feature_service_proto: FeatureServiceProto):
"""
38 changes: 26 additions & 12 deletions sdk/python/feast/feature_store.py
@@ -81,6 +81,7 @@
from feast.infra.registry.base_registry import BaseRegistry
from feast.infra.registry.registry import Registry
from feast.infra.registry.sql import SqlRegistry
from feast.infra.registry.memory import MemoryRegistry
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.online_response import OnlineResponse
from feast.protos.feast.serving.ServingService_pb2 import (
@@ -167,16 +168,24 @@ def __init__(
self.repo_path, utils.get_default_yaml_file_path(self.repo_path)
)

self._provider = get_provider(self.config)

# RB: ordering matters here because `apply_total` assumes a constructed `FeatureStore` instance
registry_config = self.config.get_registry_config()
if registry_config.registry_type == "sql":
self._registry = SqlRegistry(registry_config, None, is_feast_apply=is_feast_apply)
elif registry_config.registry_type == "memory":
self._registry = MemoryRegistry(registry_config, repo_path, is_feast_apply=is_feast_apply)

# RB: MemoryRegistry is stateless, meaning we'll need to call `apply` with each new FeatureStore instance
if not is_feast_apply:
from feast.repo_operations import apply_total
apply_total(repo_config=self.config, repo_path=self.repo_path, skip_source_validation=False, store=self)
else:
r = Registry(registry_config, repo_path=self.repo_path)
r._initialize_registry(self.config.project)
self._registry = r

self._provider = get_provider(self.config)

@log_exceptions
def version(self) -> str:
"""Returns the version of the current Feast SDK/CLI."""
@@ -212,6 +221,10 @@ def refresh_registry(self):
downloaded synchronously, which may increase latencies if the triggering method is get_online_features().
"""
registry_config = self.config.get_registry_config()

# RB: MemoryRegistry lives in process memory, so there is nothing remote to refresh
if registry_config.registry_type == "memory":
return
registry = Registry(registry_config, repo_path=self.repo_path)
registry.refresh(self.config.project)

@@ -1002,15 +1015,15 @@ def apply(
tables_to_delete: List[FeatureView] = views_to_delete + sfvs_to_delete if not partial else [] # type: ignore
tables_to_keep: List[FeatureView] = views_to_update + sfvs_to_update # type: ignore

self._get_provider().update_infra(
project=self.project,
tables_to_delete=tables_to_delete,
tables_to_keep=tables_to_keep,
entities_to_delete=entities_to_delete if not partial else [],
entities_to_keep=entities_to_update,
partial=partial,
)

if not self.config.ignore_infra_changes:
self._get_provider().update_infra(
project=self.project,
tables_to_delete=tables_to_delete,
tables_to_keep=tables_to_keep,
entities_to_delete=entities_to_delete if not partial else [],
entities_to_keep=entities_to_update,
partial=partial,
)
self._registry.commit()

@log_exceptions_and_usage
@@ -1023,7 +1036,8 @@ def teardown(self):

entities = self.list_entities()

self._get_provider().teardown_infra(self.project, tables, entities)
if not self.config.ignore_infra_changes:
self._get_provider().teardown_infra(self.project, tables, entities)
self._registry.teardown()

@log_exceptions_and_usage
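Reviewer note: a hedged sketch of how the memory-registry path is exercised end to end. It assumes a `feature_store.yaml` whose registry section sets `registry_type: memory` and a repo-level `ignore_infra_changes` option (both are read from `self.config` in this diff but defined elsewhere), and that this fork's `FeatureStore.__init__` accepts `is_feast_apply`, as the hunk above suggests.

```python
from feast import FeatureStore

# During `feast apply` the CLI constructs the store with is_feast_apply=True,
# so apply_total populates the in-memory registry once and __init__ does not
# re-enter it.
store = FeatureStore(repo_path=".", is_feast_apply=True)

# A later process (e.g. a serving job) constructs the store normally; because
# MemoryRegistry keeps no persisted state, __init__ re-runs apply_total to
# rebuild the registry before anything is read from it.
serving_store = FeatureStore(repo_path=".")
print([fv.name for fv in serving_store.list_feature_views()])
```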
17 changes: 12 additions & 5 deletions sdk/python/feast/feature_view.py
@@ -220,18 +220,25 @@ def __hash__(self):
def __copy__(self):
fv = FeatureView(
name=self.name,
ttl=self.ttl,
source=self.stream_source if self.stream_source else self.batch_source,
schema=self.schema,
tags=self.tags,
description=self.description,
tags=dict(self.tags),
owner=self.owner,
online=self.online,
ttl=self.ttl,
source=copy.copy(self.stream_source if self.stream_source else self.batch_source),
)

# This is deliberately set outside of the FV initialization as we do not have the Entity objects.
fv.entities = self.entities
fv.entities = list(self.entities)
fv.features = copy.copy(self.features)
fv.entity_columns = copy.copy(self.entity_columns)
fv.projection = copy.copy(self.projection)

fv.created_timestamp = self.created_timestamp
fv.last_updated_timestamp = self.last_updated_timestamp

for interval in self.materialization_intervals:
fv.materialization_intervals.append(interval)
return fv

def __eq__(self, other):
12 changes: 11 additions & 1 deletion sdk/python/feast/feature_view_projection.py
@@ -1,3 +1,4 @@
import copy
from typing import TYPE_CHECKING, Dict, List, Optional

from attr import dataclass
@@ -38,6 +39,15 @@ class FeatureViewProjection:
def name_to_use(self):
return self.name_alias or self.name

def __copy__(self):
return FeatureViewProjection(
name=self.name,
name_alias=self.name_alias,
desired_features=self.desired_features,
features=copy.copy(self.features),
join_key_map=dict(self.join_key_map)
)

def to_proto(self) -> FeatureViewProjectionProto:
feature_reference_proto = FeatureViewProjectionProto(
feature_view_name=self.name,
@@ -67,7 +77,7 @@ def from_proto(proto: FeatureViewProjectionProto):
def from_definition(base_feature_view: "BaseFeatureView"):
return FeatureViewProjection(
name=base_feature_view.name,
name_alias=None,
name_alias="",
features=base_feature_view.features,
desired_features=[],
)
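Reviewer note: a small sketch of why moving `from_definition` to `name_alias=""` is behaviour-preserving for `name_to_use()`. The empty string is falsy, so the alias still falls back to the view name, while the proto field now always receives a concrete string instead of `None`. The toy class below mirrors only that one expression.

```python
class ProjectionSketch:
    """Toy stand-in for FeatureViewProjection.name_to_use()."""

    def __init__(self, name: str, name_alias: str = ""):
        self.name = name
        self.name_alias = name_alias

    def name_to_use(self) -> str:
        # "" is falsy, so an unset alias still resolves to the view name.
        return self.name_alias or self.name


assert ProjectionSketch("driver_hourly_stats").name_to_use() == "driver_hourly_stats"
assert ProjectionSketch("driver_hourly_stats", "stats").name_to_use() == "stats"
```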