Merge branch 'master' into ing-732-app-ingestion
treff7es authored Oct 19, 2024
2 parents f6df316 + 8f7f2c1 commit 67f80d0
Showing 27 changed files with 509 additions and 187 deletions.
@@ -12,7 +12,13 @@
TableSchemaMetadataValue,
)
from dagster._core.execution.stats import RunStepKeyStatsSnapshot, StepEventStatus
from dagster._core.snap import JobSnapshot

try:
from dagster._core.snap import JobSnapshot # type: ignore[attr-defined]
except ImportError:
# Import changed since Dagster 1.8.12 to this -> https://github.com/dagster-io/dagster/commit/29a37d1f0260cfd112849633d1096ffc916d6c95
from dagster._core.snap import JobSnap as JobSnapshot

from dagster._core.snap.node import OpDefSnap
from dagster._core.storage.dagster_run import DagsterRun, DagsterRunStatsSnapshot
from datahub.api.entities.datajob import DataFlow, DataJob
28 changes: 15 additions & 13 deletions metadata-ingestion/docs/transformer/dataset_transformer.md
@@ -122,12 +122,13 @@ transformers:
```
## Simple Add Dataset ownership
### Config Details
| Field | Required | Type | Default | Description |
|--------------------|----------|--------------|-------------|---------------------------------------------------------------------|
| `owner_urns` | ✅ | list[string] | | List of owner urns. |
| `ownership_type` | | string | "DATAOWNER" | ownership type of the owners (either as enum or ownership type urn) |
| `replace_existing` | | boolean | `false` | Whether to remove ownership from entity sent by ingestion source. |
| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. |
| Field              | Required | Type         | Default     | Description                                                                                                   |
|--------------------|----------|--------------|-------------|---------------------------------------------------------------------------------------------------------------|
| `owner_urns`       | ✅        | list[string] |             | List of owner urns.                                                                                           |
| `ownership_type`   |          | string       | "DATAOWNER" | Ownership type of the owners (either as an enum or an ownership type urn).                                    |
| `replace_existing` |          | boolean      | `false`     | Whether to remove ownership from the entity sent by the ingestion source.                                     |
| `semantics`        |          | enum         | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS.                                              |
| `on_conflict`      |          | enum         | `DO_UPDATE` | Whether to make changes if owners already exist. If set to DO_NOTHING, the `semantics` setting is irrelevant. |

For transformer behaviour on `replace_existing` and `semantics`, please refer section [Relationship Between replace_existing And semantics](#relationship-between-replace_existing-and-semantics).
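As a quick illustration of where the new `on_conflict` field sits, here is a minimal sketch of a `simple_add_dataset_ownership` transformer entry expressed as a Python dict (the docs normally show this section as YAML; every value below is illustrative rather than taken from this diff):

```python
# Sketch only: one entry for the `transformers` list of an ingestion recipe,
# written as a Python dict. All urns and values are illustrative.
simple_ownership_transformer = {
    "type": "simple_add_dataset_ownership",
    "config": {
        "owner_urns": [
            "urn:li:corpuser:username1",
            "urn:li:corpGroup:groupname",
        ],
        "ownership_type": "TECHNICAL_OWNER",  # enum value or an ownership type urn
        "semantics": "PATCH",                 # merge with what already exists in DataHub GMS
        "on_conflict": "DO_NOTHING",          # new field: skip entities that already have owners
    },
}
```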

@@ -191,13 +192,14 @@

## Pattern Add Dataset ownership
### Config Details
| Field | Required | Type | Default | Description |
|--------------------|----------|----------------------|-------------|-----------------------------------------------------------------------------------------|
| `owner_pattern` | ✅ | map[regx, list[urn]] | | entity urn with regular expression and list of owners urn apply to matching entity urn. |
| `ownership_type` | | string | "DATAOWNER" | ownership type of the owners (either as enum or ownership type urn) |
| `replace_existing` | | boolean | `false` | Whether to remove owners from entity sent by ingestion source. |
| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. |
| `is_container` | | bool | `false` | Whether to also consider a container or not. If true, then ownership will be attached to both the dataset and its container. |
| Field              | Required | Type                  | Default     | Description                                                                                                                        |
|--------------------|----------|-----------------------|-------------|-------------------------------------------------------------------------------------------------------------------------------------|
| `owner_pattern`    | ✅        | map[regex, list[urn]] |             | Map of entity urn regex patterns to the lists of owner urns applied to matching entity urns.                                      |
| `ownership_type`   |          | string                | "DATAOWNER" | Ownership type of the owners (either as an enum or an ownership type urn).                                                        |
| `replace_existing` |          | boolean               | `false`     | Whether to remove owners from the entity sent by the ingestion source.                                                            |
| `semantics`        |          | enum                  | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS.                                                                  |
| `is_container`     |          | bool                  | `false`     | Whether to also apply ownership to the dataset's container. If true, ownership is attached to both the dataset and its container. |
| `on_conflict`      |          | enum                  | `DO_UPDATE` | Whether to make changes if owners already exist. If set to DO_NOTHING, the `semantics` setting is irrelevant.                     |

Let’s suppose we’d like to append a series of users who we know own certain datasets but aren’t detected during normal ingestion. To do so, we can use the `pattern_add_dataset_ownership` module that’s included in the ingestion framework. It matches each dataset’s `urn` against the configured patterns and assigns the corresponding owners, as in the sketch below.
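A minimal sketch of such a configuration, again as a Python dict mirroring the table above; the nested `rules` key follows the shape these docs use for pattern maps, and all regexes and urns are made-up examples:

```python
# Sketch only: map dataset-urn regexes to the owners that should be added.
# The regexes and urns below are hypothetical.
pattern_ownership_transformer = {
    "type": "pattern_add_dataset_ownership",
    "config": {
        "owner_pattern": {
            "rules": {
                ".*example1.*": ["urn:li:corpuser:username1"],
                ".*example2.*": ["urn:li:corpuser:username2"],
            }
        },
        "ownership_type": "DEVELOPER",
        "is_container": True,        # also attach owners to the dataset's container
        "on_conflict": "DO_UPDATE",  # still apply changes when owners already exist
    },
}
```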

@@ -85,17 +85,34 @@ def scroll_urns_by_filter(
self,
entity_type: str,
extra_or_filters: List[Dict[str, str]],
extra_and_filters: List[Dict[str, str]] = [],
) -> Iterable[str]:
"""
Scroll through all urns that match the given filters
"""

key_aspect = self.ENTITY_KEY_ASPECT_MAP.get(entity_type)
assert key_aspect, f"No key aspect found for entity type {entity_type}"
if extra_or_filters and extra_and_filters:
raise ValueError(
"Only one of extra_or_filters and extra_and_filters should be provided"
)

count = 1000
query = " OR ".join(
[f"{filter['field']}:{filter['value']}" for filter in extra_or_filters]
query = (
" OR ".join(
[
f"{filter['field']}:\"{filter['value']}\""
for filter in extra_or_filters
]
)
if extra_or_filters
else " AND ".join(
[
f"{filter['field']}:\"{filter['value']}\""
for filter in extra_and_filters
]
)
)
scroll_id = None
while True:
@@ -252,3 +269,23 @@ def search_by_key(

def delete(self, graph_client: DataHubGraph, hard: bool = True) -> None:
graph_client.delete_entity(str(PlatformResourceUrn(self.id)), hard=hard)

@staticmethod
def search_by_filters(
graph_client: DataHubGraph,
and_filters: List[Dict[str, str]] = [],
or_filters: List[Dict[str, str]] = [],
) -> Iterable["PlatformResource"]:
if and_filters and or_filters:
raise ValueError(
"Only one of and_filters and or_filters should be provided"
)
openapi_client = OpenAPIGraphClient(graph_client)
for urn in openapi_client.scroll_urns_by_filter(
entity_type="platformResource",
extra_or_filters=or_filters if or_filters else [],
extra_and_filters=and_filters if and_filters else [],
):
platform_resource = PlatformResource.from_datahub(graph_client, urn)
if platform_resource:
yield platform_resource
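A hedged usage sketch for the new `search_by_filters` helper, assuming a reachable DataHub instance and that `PlatformResource` (defined in this file) is importable; the search field names in the filters are illustrative assumptions, while the filter dict shape matches `scroll_urns_by_filter` above:

```python
# Sketch only: find platform resources matching either of two filters.
# `PlatformResource` is the class defined above; the field names and values are hypothetical.
from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig

graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

for resource in PlatformResource.search_by_filters(
    graph_client=graph,
    or_filters=[
        {"field": "platform", "value": "snowflake"},
        {"field": "primaryKey", "value": "my-external-id"},
    ],
):
    print(resource.id)
```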
1 change: 1 addition & 0 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
@@ -351,6 +351,7 @@ def get_tags(self, entity_urn: str) -> Optional[GlobalTagsClass]:
def get_glossary_terms(self, entity_urn: str) -> Optional[GlossaryTermsClass]:
return self.get_aspect(entity_urn=entity_urn, aspect_type=GlossaryTermsClass)

@functools.lru_cache(maxsize=1)
def get_domain(self, entity_urn: str) -> Optional[DomainsClass]:
return self.get_aspect(entity_urn=entity_urn, aspect_type=DomainsClass)
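For context on the `maxsize=1` choice, a standalone sketch (not DataHub code) showing that consecutive calls with the same urn are served from the cache, while a new urn evicts the single cached entry:

```python
import functools

fetches = 0

@functools.lru_cache(maxsize=1)
def get_domain_cached(entity_urn: str) -> str:
    # Stand-in for the real get_aspect() round-trip to DataHub GMS.
    global fetches
    fetches += 1
    return f"domain-for-{entity_urn}"

get_domain_cached("urn:li:dataset:a")
get_domain_cached("urn:li:dataset:a")  # cache hit, no extra fetch
get_domain_cached("urn:li:dataset:b")  # evicts the previous entry
assert fetches == 2
```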

@@ -1989,6 +1989,11 @@ def _translate_dbt_name_to_upstream_urn(dbt_name: str) -> str:
time=mce_builder.get_sys_time(),
actor=_DEFAULT_ACTOR,
)
sibling_urn = node.get_urn(
self.config.target_platform,
self.config.env,
self.config.target_platform_instance,
)
return UpstreamLineageClass(
upstreams=[
UpstreamClass(
@@ -1997,6 +2002,7 @@ def _translate_dbt_name_to_upstream_urn(dbt_name: str) -> str:
auditStamp=auditStamp,
)
for upstream in upstream_urns
if not (node.node_type == "model" and upstream == sibling_urn)
],
fineGrainedLineages=(
(cll or None) if self.config.include_column_lineage else None
@@ -27,7 +27,10 @@
PlatformDetail,
)
from datahub.ingestion.source.fivetran.data_classes import Connector, Job
from datahub.ingestion.source.fivetran.fivetran_log_api import FivetranLogAPI
from datahub.ingestion.source.fivetran.fivetran_log_api import (
MAX_JOBS_PER_CONNECTOR,
FivetranLogAPI,
)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
)
@@ -72,11 +75,6 @@ def __init__(self, config: FivetranSourceConfig, ctx: PipelineContext):

self.audit_log = FivetranLogAPI(self.config.fivetran_log_config)

# Create and register the stateful ingestion use-case handler.
self.stale_entity_removal_handler = StaleEntityRemovalHandler.create(
self, self.config, self.ctx
)

def _extend_lineage(self, connector: Connector, datajob: DataJob) -> None:
input_dataset_urn_list: List[DatasetUrn] = []
output_dataset_urn_list: List[DatasetUrn] = []
@@ -267,6 +265,13 @@ def _get_connector_workunits(
).as_workunit(is_primary_source=False)

# Map Fivetran's job/sync history entity with Datahub's data process entity
if len(connector.jobs) >= MAX_JOBS_PER_CONNECTOR:
self.report.warning(
title="Not all sync history was captured",
message=f"The connector had more than {MAX_JOBS_PER_CONNECTOR} sync runs in the past {self.config.history_sync_lookback_period} days. "
f"Only the most recent {MAX_JOBS_PER_CONNECTOR} syncs were ingested.",
context=f"{connector.connector_name} (connector_id: {connector.connector_id})",
)
for job in connector.jobs:
dpi = self._generate_dpi_from_job(job, datajob)
yield from self._get_dpi_workunits(job, dpi)
@@ -279,7 +284,9 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> Source:
def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
return [
*super().get_workunit_processors(),
self.stale_entity_removal_handler.workunit_processor,
StaleEntityRemovalHandler.create(
self, self.config, self.ctx
).workunit_processor,
]

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
@@ -22,6 +22,10 @@

logger: logging.Logger = logging.getLogger(__name__)

# We don't want to generate a massive number of dataProcesses for a single connector.
# This is primarily used as a safeguard to prevent performance issues.
MAX_JOBS_PER_CONNECTOR = 1000


class FivetranLogAPI:
def __init__(self, fivetran_log_config: FivetranLogConfig) -> None:
@@ -158,34 +162,32 @@ def _get_table_lineage(

return table_lineage_list

def _get_all_connector_sync_logs(self, syncs_interval: int) -> Dict[str, Dict]:
sync_logs = {}
for row in self._query(
self.fivetran_log_query.get_sync_logs_query().format(
db_clause=self.fivetran_log_query.db_clause,
syncs_interval=syncs_interval,
)
):
if row[Constant.CONNECTOR_ID] not in sync_logs:
sync_logs[row[Constant.CONNECTOR_ID]] = {
row[Constant.SYNC_ID]: {
row["message_event"]: (
row[Constant.TIME_STAMP].timestamp(),
row[Constant.MESSAGE_DATA],
)
}
}
elif row[Constant.SYNC_ID] not in sync_logs[row[Constant.CONNECTOR_ID]]:
sync_logs[row[Constant.CONNECTOR_ID]][row[Constant.SYNC_ID]] = {
row["message_event"]: (
row[Constant.TIME_STAMP].timestamp(),
row[Constant.MESSAGE_DATA],
)
}
else:
sync_logs[row[Constant.CONNECTOR_ID]][row[Constant.SYNC_ID]][
row["message_event"]
] = (row[Constant.TIME_STAMP].timestamp(), row[Constant.MESSAGE_DATA])
def _get_all_connector_sync_logs(
self, syncs_interval: int, connector_ids: List[str]
) -> Dict[str, Dict[str, Dict[str, Tuple[float, Optional[str]]]]]:
sync_logs: Dict[str, Dict[str, Dict[str, Tuple[float, Optional[str]]]]] = {}

# Format connector_ids as a comma-separated string of quoted IDs
formatted_connector_ids = ", ".join(f"'{id}'" for id in connector_ids)

query = self.fivetran_log_query.get_sync_logs_query().format(
db_clause=self.fivetran_log_query.db_clause,
syncs_interval=syncs_interval,
max_jobs_per_connector=MAX_JOBS_PER_CONNECTOR,
connector_ids=formatted_connector_ids,
)

for row in self._query(query):
connector_id = row[Constant.CONNECTOR_ID]
sync_id = row[Constant.SYNC_ID]

if connector_id not in sync_logs:
sync_logs[connector_id] = {}

sync_logs[connector_id][sync_id] = {
"sync_start": (row["start_time"].timestamp(), None),
"sync_end": (row["end_time"].timestamp(), row["end_message_data"]),
}

return sync_logs
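To make the nested return type concrete, here is a hypothetical shape of the `sync_logs` mapping for a single connector with one completed sync (ids, timestamps, and message payload are invented):

```python
# Illustrative only: connector_id -> sync_id -> event -> (epoch seconds, message_data)
sync_logs = {
    "connector_1": {
        "sync_abc123": {
            "sync_start": (1729300000.0, None),
            "sync_end": (1729300900.0, '{"status": "SUCCESSFUL"}'),
        }
    }
}
```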

@@ -244,7 +246,10 @@ def _fill_connectors_table_lineage(self, connectors: List[Connector]) -> None:
def _fill_connectors_jobs(
self, connectors: List[Connector], syncs_interval: int
) -> None:
sync_logs = self._get_all_connector_sync_logs(syncs_interval)
connector_ids = [connector.connector_id for connector in connectors]
sync_logs = self._get_all_connector_sync_logs(
syncs_interval, connector_ids=connector_ids
)
for connector in connectors:
connector.jobs = self._get_jobs_list(sync_logs.get(connector.connector_id))

@@ -37,14 +37,32 @@ def get_users_query(self) -> str:

def get_sync_logs_query(self) -> str:
return """
SELECT connector_id,
sync_id,
message_event,
message_data,
time_stamp
FROM {db_clause}log
WHERE message_event in ('sync_start', 'sync_end')
and time_stamp > CURRENT_TIMESTAMP - INTERVAL '{syncs_interval} days'"""
WITH ranked_syncs AS (
SELECT
connector_id,
sync_id,
MAX(CASE WHEN message_event = 'sync_start' THEN time_stamp END) as start_time,
MAX(CASE WHEN message_event = 'sync_end' THEN time_stamp END) as end_time,
MAX(CASE WHEN message_event = 'sync_end' THEN message_data END) as end_message_data,
ROW_NUMBER() OVER (PARTITION BY connector_id ORDER BY MAX(time_stamp) DESC) as rn
FROM {db_clause}log
WHERE message_event in ('sync_start', 'sync_end')
AND time_stamp > CURRENT_TIMESTAMP - INTERVAL '{syncs_interval} days'
AND connector_id IN ({connector_ids})
GROUP BY connector_id, sync_id
)
SELECT
connector_id,
sync_id,
start_time,
end_time,
end_message_data
FROM ranked_syncs
WHERE rn <= {max_jobs_per_connector}
AND start_time IS NOT NULL
AND end_time IS NOT NULL
ORDER BY connector_id, end_time DESC
"""

def get_table_lineage_query(self) -> str:
return f"""
@@ -929,7 +929,6 @@ def from_api( # noqa: C901
reporter: SourceReport,
source_config: LookerDashboardSourceConfig,
) -> Optional["LookerExplore"]: # noqa: C901
from datahub.ingestion.source.looker.lookml_source import _BASE_PROJECT_NAME

try:
explore = client.lookml_model_explore(model, explore_name)
@@ -1194,7 +1193,6 @@ def _to_metadata_events( # noqa: C901
) -> Optional[List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]]]:
# We only generate MCE-s for explores that contain from clauses and do NOT contain joins
# All other explores (passthrough explores and joins) end in correct resolution of lineage, and don't need additional nodes in the graph.
from datahub.ingestion.source.looker.lookml_source import _BASE_PROJECT_NAME

dataset_snapshot = DatasetSnapshot(
urn=self.get_explore_urn(config),
@@ -1207,15 +1205,19 @@
dataset_snapshot.aspects.append(browse_paths)
dataset_snapshot.aspects.append(StatusClass(removed=False))

custom_properties = {}
if self.label is not None:
custom_properties["looker.explore.label"] = str(self.label)
if self.source_file is not None:
custom_properties["looker.explore.file"] = str(self.source_file)
custom_properties = {
"project": self.project_name,
"model": self.model_name,
"looker.explore.label": self.label,
"looker.explore.name": self.name,
"looker.explore.file": self.source_file,
}
dataset_props = DatasetPropertiesClass(
name=str(self.label) if self.label else LookerUtil._display_name(self.name),
description=self.description,
customProperties=custom_properties,
customProperties={
k: str(v) for k, v in custom_properties.items() if v is not None
},
)
dataset_props.externalUrl = self._get_url(base_url)
