Skip to content

Commit

Permalink
fix: address PR review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
sagar-salvi-apptware committed Oct 28, 2024
1 parent 3e2eae7 commit 195f78c
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,6 @@
"displayName": "Dremio",
"description": "Import Spaces, Sources, Tables and statistics from Dremio.",
"docsUrl": "https://datahubproject.io/docs/metadata-ingestion/",
"recipe": "source:\n type: dremio\n config:\n # Coordinates\n hostname: null\n port: null\n\n #Credentials with personal access token\n authentication_method: PAT\n password: pass\n #Or Credentials with basic auth\n #authentication_method: password\n #username: null\n #password: null\n #tls: true-->https/false-->http\n\n stateful_ingestion:\n enabled: true"
"recipe": "source:\n type: dremio\n config:\n # Coordinates\n hostname: null\n port: null\n #true if https, otherwise false\n tls: true\n\n #Credentials with personal access token\n authentication_method: PAT\n password: pass\n\n #Or Credentials with basic auth\n #authentication_method: password\n #username: null\n #password: null\n\n stateful_ingestion:\n enabled: true"
}
]
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
"""This Module contains controller functions for dremio source"""

__author__ = "Shabbir Mohammed Hussain, Shehroz Abdullah, Hamza Rehman, Jonny Dixon"

import logging
import time
import uuid
Expand Down Expand Up @@ -161,19 +157,31 @@ def __init__(
self.ui_url = ui_url

def get_container_key(
    self, name: Optional[str], path: Optional[List[str]]
) -> DremioContainerKey:
    """Build the DremioContainerKey identifying a container.

    The key is the dot-joined *path* with *name* appended; a missing
    name yields just the joined path, and a missing/empty path yields
    the bare name (which may be None — TODO confirm downstream key
    handling accepts that).

    :param name: Container name, or None for path-only keys.
    :param path: Ancestor path segments, outermost first.
    :return: A DremioContainerKey scoped to this platform/instance/env.
    """
    if path:
        # Parenthesized so the precedence is explicit: the conditional
        # only decides whether the name is appended to the joined path.
        key = (".".join(path) + "." + name) if name else ".".join(path)
    else:
        key = name

    return DremioContainerKey(
        platform=self.platform,
        instance=self.platform_instance,
        env=str(self.env),
        key=key,
    )

def get_container_urn(
    self, name: Optional[str] = None, path: Optional[List[str]] = None
) -> str:
    """Return the URN for the container identified by *name* and *path*.

    The *path* default is ``None`` rather than a mutable ``[]`` (a
    shared-across-calls pitfall); both are falsy, so
    ``get_container_key`` treats them identically.
    """
    container_key = self.get_container_key(name, path)
    return container_key.as_urn()

def get_container_space_urn(self) -> str:
    """URN of the synthetic top-level "Spaces" container."""
    return self.get_container_urn("Spaces", [])

def get_container_source_urn(self) -> str:
    """URN of the synthetic top-level "Sources" container."""
    return self.get_container_urn("Sources", [])

def create_domain_aspect(self) -> Optional[_Aspect]:
if self.domain:
if self.domain.startswith("urn:li:domain:"):
Expand All @@ -199,16 +207,16 @@ def populate_container_mcp(
)
yield mcp.as_workunit()

# Browse Paths V2
browse_paths_v2 = self._create_browse_paths(container)
if browse_paths_v2:
mcp = MetadataChangeProposalWrapper(
entityUrn=container_urn,
aspect=browse_paths_v2,
)
yield mcp.as_workunit()
if not container.path:
browse_paths_v2 = self._create_browse_paths_containers(container)
if browse_paths_v2:
mcp = MetadataChangeProposalWrapper(
entityUrn=container_urn,
aspect=browse_paths_v2,
)
yield mcp.as_workunit()

# Container Class
# Container Class Folders
container_class = self._create_container_class(container)
if container_class:
mcp = MetadataChangeProposalWrapper(
Expand All @@ -217,6 +225,16 @@ def populate_container_mcp(
)
yield mcp.as_workunit()

# Container Class for Spaces and Sources
if not container.path:
container_class = self._create_container_class_containers(container)
if container_class:
mcp = MetadataChangeProposalWrapper(
entityUrn=container_urn,
aspect=container_class,
)
yield mcp.as_workunit()

# Data Platform Instance
data_platform_instance = self._create_data_platform_instance()
if data_platform_instance:
Expand Down Expand Up @@ -278,15 +296,6 @@ def populate_dataset_mcp(
)
yield mcp.as_workunit()

# Browse Paths V2
browse_paths_v2 = self._create_browse_paths(dataset)
if browse_paths_v2:
mcp = MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=browse_paths_v2,
)
yield mcp.as_workunit()

# Container Class
container_class = self._create_container_class(dataset)
if container_class:
Expand Down Expand Up @@ -380,32 +389,15 @@ def _create_container_properties(
env=self.env,
)

def _create_browse_paths_containers(
    self, entity: DremioContainer
) -> Optional[BrowsePathsV2Class]:
    """Build a single-entry BrowsePathsV2 aspect for a top-level container.

    A Dremio Space is filed under a "Spaces" entry and a Dremio Source
    under "Sources"; any other subclass produces no aspect.

    :param entity: Container whose ``subclass`` selects the path entry.
    :return: The aspect, or None when the subclass matches neither case.
    """
    paths = []

    if entity.subclass == "Dremio Space":
        paths.append(BrowsePathEntryClass(id="Spaces"))
    elif entity.subclass == "Dremio Source":
        paths.append(BrowsePathEntryClass(id="Sources"))

    if paths:
        return BrowsePathsV2Class(path=paths)
    return None
Expand All @@ -414,12 +406,17 @@ def _create_container_class(
self, entity: Union[DremioContainer, DremioDataset]
) -> Optional[ContainerClass]:
if entity.path:
return ContainerClass(
container=self.get_container_urn(
path=entity.path,
name="",
)
)
return ContainerClass(container=self.get_container_urn(path=entity.path))
return None

def _create_container_class_containers(
    self, entity: DremioContainer
) -> Optional[ContainerClass]:
    """Return the parent ContainerClass aspect for a top-level container.

    Spaces are parented under the synthetic "Spaces" container and
    sources under "Sources"; any other subclass yields no aspect.
    """
    urn_factory_by_subclass = {
        "Dremio Space": self.get_container_space_urn,
        "Dremio Source": self.get_container_source_urn,
    }
    urn_factory = urn_factory_by_subclass.get(entity.subclass)
    if urn_factory is None:
        return None
    return ContainerClass(container=urn_factory())

def _create_data_platform_instance(self) -> DataPlatformInstanceClass:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import List, Literal, Optional

import certifi
from pydantic import Field, validator
from pydantic import Field, root_validator, validator

from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.configuration.source_common import (
Expand Down Expand Up @@ -112,15 +112,22 @@ class ProfileConfig(GEProfilingConfig):


class DremioSourceMapping(EnvConfigMixin, PlatformInstanceConfigMixin, ConfigModel):
    """Maps one Dremio source connection to its DataHub platform.

    Both fields are required; ``check_both_fields_present`` additionally
    rejects empty/falsy values with a single clear error message.
    """

    platform: str = Field(
        description="Source connection made by Dremio (e.g. S3, Snowflake)",
    )
    source_name: str = Field(
        description="Alias of platform in Dremio connection",
    )

    @root_validator
    def check_both_fields_present(cls, values):
        """Ensure platform and source_name are both present and non-empty.

        Pydantic already enforces presence of required fields; this
        validator also guards against empty strings and reports the
        pair together rather than field-by-field.
        """
        platform, source_name = values.get("platform"), values.get("source_name")
        if not platform or not source_name:
            raise ValueError(
                "Both 'platform' and 'source_name' must be provided in source_mappings."
            )
        return values


class DremioSourceConfig(
DremioConnectionConfig,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
"""
Dremio source type to Datahub source type.
"""


from typing import Optional


Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
"""This Module contains utility functions for dremio source"""
import itertools
import logging
import re
Expand Down Expand Up @@ -275,11 +274,11 @@ def get_profile_data(self, profiler: DremioProfiler) -> Dict:


class DremioContainer:
subclass: str = "Dremio Container"
container_name: str
location_id: str
path: List[str]
description: Optional[str]
subclass: str

def __init__(
self,
Expand Down Expand Up @@ -373,10 +372,6 @@ def set_datasets(self) -> None:

self.datasets_populated = True

def force_reset_datasets(self) -> None:
self.datasets_populated = False
self.set_datasets()

def get_datasets(self) -> Deque[DremioDataset]:
self.set_datasets()
return self.datasets
Expand Down Expand Up @@ -429,10 +424,6 @@ def set_containers(self) -> None:

self.containers_populated = True

def force_reset_containers(self) -> None:
self.containers_populated = False
self.set_containers()

def get_containers(self) -> Deque:
self.set_containers()
return deque(itertools.chain(self.sources, self.spaces, self.folders))
Expand All @@ -446,10 +437,6 @@ def get_glossary_terms(self) -> Deque[DremioGlossaryTerm]:
self.set_containers()
return self.glossary_terms

def force_set_glossary_terms(self) -> None:
self.force_reset_containers()
self.force_reset_datasets()

def is_valid_query(self, query: Dict[str, Any]) -> bool:
required_fields = [
"job_id",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
"""This module contains the Dremio source class for DataHub ingestion"""
import logging
import re
from collections import defaultdict
Expand Down Expand Up @@ -201,15 +200,10 @@ def _build_source_map(self) -> Dict[str, Dict]:
source_platform_name = source_name

for mapping in self.config.source_mappings or []:
if not mapping.source_name:
continue

if re.search(mapping.source_name, source_type, re.IGNORECASE):
source_platform_name = mapping.source_name.lower()

if not mapping.platform:
continue

datahub_source_type = (
DremioToDataHubSourceTypeMapping.get_datahub_source_type(
source_type
Expand Down Expand Up @@ -281,6 +275,15 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
"""

self.source_map = self._build_source_map()

space_urn = self.dremio_aspects.get_container_space_urn()
container = DremioContainer("Spaces", "", [], self.dremio_catalog.dremio_api)
yield from self.dremio_aspects.populate_container_mcp(space_urn, container)

source_urn = self.dremio_aspects.get_container_source_urn()
container = DremioContainer("Sources", "", [], self.dremio_catalog.dremio_api)
yield from self.dremio_aspects.populate_container_mcp(source_urn, container)

# Process Containers
containers = self.dremio_catalog.get_containers()
for container in containers:
Expand All @@ -293,7 +296,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
self.report.num_containers_failed += 1 # Increment failed containers
self.report.report_failure(
"Failed to process Dremio container",
f"Failed to process container {'.'.join(container.path)}.{container.resource_name}: {exc}",
f"Failed to process container {'.'.join(container.path)}.{container.container_name}: {exc}",
)

# Process Datasets
Expand Down

0 comments on commit 195f78c

Please sign in to comment.