Skip to content

Commit

Permalink
Merge pull request #34 from elementary-data/empty_state_and_metrics
Browse files Browse the repository at this point in the history
Empty state and metrics
  • Loading branch information
oravi authored Nov 3, 2021
2 parents 1d6467b + 5727bb4 commit 490b196
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 9 deletions.
7 changes: 5 additions & 2 deletions lineage/bigquery_query_history.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,13 @@ def _query_history_table(self, start_date: datetime, end_date: datetime) -> [Que
profile_schema_name=schema_name)

queries.append(query)
self._query_history_stats.update_stats(query_context)

logger.debug("Finished fetching bigquery history job results")

return queries

def properties(self) -> dict:
return {'platform_type': 'bigquery',
'ignore_schema': self._ignore_schema}
query_history_properties = {'platform_type': 'bigquery'}
query_history_properties.update(self._query_history_stats.to_dict())
return query_history_properties
22 changes: 22 additions & 0 deletions lineage/empty_graph_helper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@


class EmptyGraphHelper(object):
    """Produces the troubleshooting text shown when the result graph is empty."""

    def __init__(self, platform_type):
        # Platform identifier; compared against 'snowflake' and 'bigquery' below.
        self._platform_type = platform_type

    def get_integration_docs_link(self):
        """Return the documentation URL matching the stored platform type.

        Any platform other than 'snowflake' or 'bigquery' gets the generic
        query-history guide.
        """
        if self._platform_type == 'snowflake':
            return "https://docs.elementary-data.com/integrations/snowflake"
        if self._platform_type == 'bigquery':
            return "https://docs.elementary-data.com/integrations/bigquery"
        return "https://docs.elementary-data.com/guides/usage/query-history-1"

    def get_help_message(self):
        """Return the multi-line help message, with the platform docs link embedded."""
        docs_link = self.get_integration_docs_link()
        return f"""
We are deeply sorry but unfortunately the result graph is empty.
Please try the following steps to fix the problem -
\t1. Try running again with --ignore-schema=true
\t2. Check that you have sufficient permissions (follow this short guide here - {docs_link})
\t3. Join our slack channel here - https://bit.ly/slack-elementary, we promise to help and be nice!
"""
8 changes: 7 additions & 1 deletion lineage/lineage_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,10 @@ def properties(self):
'queries_count': self._queries_count,
'failed_queries': self._failed_queries_count}

def draw_graph(self, should_open_browser: bool = True) -> None:
def draw_graph(self, should_open_browser: bool = True) -> bool:
if len(self._lineage_graph.edges) == 0:
return False

# Visualize the graph
net = Network(height="95%", width="100%", directed=True, heading=self._load_header())
net.from_nx(self._lineage_graph)
Expand All @@ -204,3 +207,6 @@ def draw_graph(self, should_open_browser: bool = True) -> None:
net.save_graph("elementary_lineage.html")
if should_open_browser:
webbrowser.open_new_tab('elementary_lineage.html')

return True

5 changes: 4 additions & 1 deletion lineage/main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import click

from lineage.dbt_utils import extract_credentials_and_data_from_profiles
from lineage.empty_graph_helper import EmptyGraphHelper
from lineage.tracking import track_cli_start, track_cli_end, track_cli_exception
from lineage.exceptions import ConfigError
from pyfiglet import Figlet
Expand Down Expand Up @@ -155,7 +156,9 @@ def main(start_date: datetime, end_date: datetime, profiles_dir: str, profile_na
f'specify a table name that exists in the database configured in your profiles file.')
lineage_graph.filter_on_table(resolved_table_name, direction, depth)

lineage_graph.draw_graph(should_open_browser=open_browser)
success = lineage_graph.draw_graph(should_open_browser=open_browser)
if not success:
print(EmptyGraphHelper(credentials.type).get_help_message())

track_cli_end(anonymous_tracking, lineage_graph.properties(), query_history.properties())

Expand Down
2 changes: 2 additions & 0 deletions lineage/query_history.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from lineage.bigquery_query import BigQueryQuery
from lineage.exceptions import SerializationError
from lineage.query import Query
from lineage.query_history_stats import QueryHistoryStats
from lineage.snowflake_query import SnowflakeQuery
from lineage.utils import is_flight_mode_on
import json
Expand All @@ -22,6 +23,7 @@ def __init__(self, con, profile_database_name: str, profile_schema_name: str,
self._profile_schema_name = profile_schema_name
self._should_export_query_history = should_export_query_history
self._ignore_schema = ignore_schema
self._query_history_stats = QueryHistoryStats()

def _serialize_query_history(self, queries: [str]) -> None:
if self._should_export_query_history:
Expand Down
25 changes: 25 additions & 0 deletions lineage/query_history_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from collections import defaultdict
from lineage.query_context import QueryContext


class QueryHistoryStats(object):
    """Accumulates summary statistics over the query contexts it is fed.

    Tracks a per-query-type counter plus the distinct ``user_name`` and
    ``role_name`` values observed, and exposes them as a flat dict.
    """

    def __init__(self):
        # query_type -> number of times that type was seen.
        self._query_type_stats = defaultdict(int)
        # Distinct role names and user names seen so far.
        self._roles = set()
        self._users = set()

    def update_stats(self, query_context: 'QueryContext'):
        """Fold a single query context into the running statistics.

        Each of ``query_type``, ``user_name`` and ``role_name`` is ignored
        when it is ``None``.

        :param query_context: object exposing ``query_type``, ``user_name``
            and ``role_name`` attributes.
        """
        if query_context.query_type is not None:
            self._query_type_stats[query_context.query_type] += 1

        if query_context.user_name is not None:
            self._users.add(query_context.user_name)

        if query_context.role_name is not None:
            # Roles go into their own set; adding them to the users set would
            # leave role_count stuck at 0 and inflate user_count.
            self._roles.add(query_context.role_name)

    def to_dict(self):
        """Return the statistics as a dict.

        The result holds one ``<query_type>: count`` entry per observed query
        type, plus ``user_count`` and ``role_count`` with the number of
        distinct users and roles. It is a copy, so mutating it does not
        affect the accumulated counters.
        """
        query_history_stats_dict = self._query_type_stats.copy()
        query_history_stats_dict['user_count'] = len(self._users)
        query_history_stats_dict['role_count'] = len(self._roles)
        return query_history_stats_dict
10 changes: 7 additions & 3 deletions lineage/snowflake_query_history.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ def _enrich_history_with_view_definitions(self, cursor: SnowflakeCursor, databas
profile_schema_name=schema_name)

view_queries.append(query)
self._query_history_stats.update_stats(query_context)

return view_queries

Expand Down Expand Up @@ -141,13 +142,16 @@ def _query_history_table(self, start_date: datetime, end_date: datetime) -> [Que
profile_schema_name=schema_name)

queries.append(query)
self._query_history_stats.update_stats(query_context)

logger.debug("Finished fetching snowflake history query results")

queries.extend(self._enrich_history_with_view_definitions(cursor, database_name, schema_name))

return queries

def properties(self) -> dict:
return {'platform_type': 'snowflake',
'query_history_source': self.query_history_source,
'ignore_schema': self._ignore_schema}
query_history_properties = {'platform_type': 'snowflake',
'query_history_source': self.query_history_source}
query_history_properties.update(self._query_history_stats.to_dict())
return query_history_properties
2 changes: 1 addition & 1 deletion lineage/tracking.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def track_cli_start(profiles_dir: str, profile_data: dict) -> Optional['Anonymou
try:
anonymous_tracking = AnonymousTracking(profiles_dir, profile_data.get('anonymous_usage_tracking'))
anonymous_tracking.init()
cli_start_properties = dict()
cli_start_properties = {'platform_type': profile_data.get('type')}
cli_start_properties.update(get_run_properties())
anonymous_tracking.send_event('cli-start', properties=cli_start_properties)
return anonymous_tracking
Expand Down
17 changes: 16 additions & 1 deletion lineage/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
import logging
import sys
from pathlib import Path
from typing import Optional

import click
import pkg_resources

FORMATTER = logging.Formatter("%(asctime)s — %(name)s — %(levelname)s — %(message)s")
LOG_FILE = "edl.log"
Expand Down Expand Up @@ -54,6 +57,15 @@ def is_dbt_installed() -> bool:
return False


def get_package_version() -> Optional[str]:
    """Return the installed 'elementary-lineage' distribution version.

    The lookup is best-effort: if it raises for any reason, ``None`` is
    returned instead of propagating the error.
    """
    try:
        distribution = pkg_resources.get_distribution('elementary-lineage')
        return distribution.version
    except Exception:
        # Deliberately swallow the failure; the caller treats None as "unknown".
        return None


def get_run_properties() -> dict:

click_context = click.get_current_context()
Expand Down Expand Up @@ -84,4 +96,7 @@ def get_run_properties() -> dict:
'full_table_names': params.get('full_table_names'),
'direction': params.get('direction'),
'depth': params.get('depth'),
'dbt_installed': is_dbt_installed()}
'ignore_schema': params.get('ignore_schema'),
'dbt_installed': is_dbt_installed(),
'version': get_package_version()}

0 comments on commit 490b196

Please sign in to comment.