Skip to content

clojurians-org/simple-datahub

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

36 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

项目说明:

此项目基于LinkedIn Datahub元数据平台二次开发, 进行简化改造及功能扩展

  • [OK] 去除docker依赖
  • [OK] 切换到postgresql数据集
  • 将外部系统初始化脚本集成至gms系统中[分析中]
  • UI中文支持 [部分完成]
  • 提供clojure/haskell 数据库模式脚本功能 [部分完成]
  • 切换MAE/MCE作业成clojure/haskell脚本模式[部分完成]
  • 集成血缘分析haskell(queryparser/hssqlppp)脚本功能 [开发中]
  • 添加调度器airflow集成[分析中]
  • 增加ETLCode实体类型及clojure/haskell脚本 [分析中]
  • 集成数据质量Apache Griffin平台 [分析中]
  • 添加apache-flink实时计算clojure脚本支持[分析中]
  • 切换至typescript + react前端技术重构[分析中]
  • 切换至NIX集群部署模式[分析中]

项目外部系统依赖

  • Kafka 及Kafka schema-registry集群 [实时计算及事件触发]
  • PostgreSQL数据库 [存储功能]
  • ElasticSearch数据库 [搜索功能]
  • Neo4j数据库 [实体关系模型]

软件开发环境

nix软件包管理器

  1. 安装nix => [https://nixos.wiki/wiki/Nix_Installation_Guide]
  2. 添加nixos-19.09 channel并更新
nix-channel --add https://nixos.org/channels/nixos-19.09
nix-channel --update nixos-19.09

运行流程及部署过程

一. 运行流程:

此项目由三部分组成: Kafka-Stream MCE/MAE作业, gms后台服务, datahub-frontend前台服务

  1. 上游ETL脚本推送avro数据至Kafka Topic[MetadataChangeEvent]
  2. MCE作业读取Kafka[MetadataChangeEvent]数据写入gms后台
  3. gms后台写入postgresql, 比较变更前后的值推送Kafka事件[MetadataAuditEvent]
  4. MAE作业读取Kafka[MetadataAuditEvent], 写入数据至ES及neo4j
  5. datahub-frontend前端通过gms元数据服务[postgresql + elasticsearch + neo4j]访问元数据

二. 部署过程

外部系统准备

  • kafka: 创建主题及schema[MetadataChangeEvent, MetadataAuditEvent]
# 1. create topic
kafka-topics.sh --create --if-not-exists --zookeeper 10.132.37.201:2181/monitor --partitions 1 --replication-factor 1 --topic MetadataChangeEvent
kafka-topics.sh --create --if-not-exists --zookeeper 10.132.37.201:2181/monitor --partitions 1 --replication-factor 1 --topic MetadataAuditEvent

# 2. create schema
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data @./nix/MetadataChangeEvent.avsc  http://10.132.37.201:8081/subjects/MetadataChangeEvent-value/versions
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data @./nix/MetadataAuditEvent.avsc  http://10.132.37.201:8081/subjects/MetadataAuditEvent-value/versions
  • pg: 创建数据库及表结构[metadata_aspect]
CREATE USER datahub WITH PASSWORD 'datahub' ;            
CREATE DATABASE datahub ;                                                                                                                            
psql -f nix/init.sql -h 10.132.37.200 -U datahub 
  • es: 创建文档索引结构[corpuserinfodocument, datasetdocument]
curl -H 'Content-Type: Application/JSON' -XPUT 10.132.37.201:9200/corpuserinfodocument --data @./nix/corpuser-index-config.json;
curl -H 'Content-Type: Application/JSON' -XPUT 10.132.37.201:9200/datasetdocument --data @./nix/dataset-index-config.json
curl -H 'Content-Type: Application/JSON' -XPUT 10.132.37.201:9200/datasetdocument/doc/_mapping --data @./nix/dataset-index-mapping.json

程序启动过程

  • 启动gms后台服务[写入 := kafka -> postgresql <> kafka, 服务 := postgresql <> kafka <> elasticsearch <> neo4j]
export EBEAN_DATASOURCE_URL=jdbc:postgresql://10.132.37.200:5432/datahub
export EBEAN_DATASOURCE_USERNAME=datahub
export EBEAN_DATASOURCE_PASSWORD=datahub
export KAFKA_BOOTSTRAP_SERVER=10.132.37.201:9092
export KAFKA_SCHEMAREGISTRY_URL=http://10.132.37.201:8081
export ELASTICSEARCH_HOST=10.132.37.201
export NEO4J_URI=bolt://localhost
export NEO4J_USERNAME=neo4j
export NEO4J_PASSWORD=datahub
./gradlew :gms:war:build
./gradlew :gms:war:JettyRunWar
  • 启动MCE作业[kafka/MetadataChangeEvent -> gms]
export KAFKA_BOOTSTRAP_SERVER=10.132.37.201:9092
export KAFKA_SCHEMAREGISTRY_URL=http://10.132.37.201:8081
export GMS_HOST=10.132.37.200
export GMS_PORT=8080
./gradlew :metadata-jobs:mce-consumer-job:build
./gradlew :metadata-jobs:mce-consumer-job:run
  • 启动MAE作业 [kafka/MetadataAuditEvent -> es <> neo4j]
export KAFKA_BOOTSTRAP_SERVER=10.132.37.201:9092
export KAFKA_SCHEMAREGISTRY_URL=http://10.132.37.201:8081
export ELASTICSEARCH_HOST=10.132.37.201
export NEO4J_URI=bolt://localhost
export NEO4J_USERNAME=neo4j
export NEO4J_PASSWORD=datahub
./gradlew :metadata-jobs:mae-consumer-job:build
./gradlew :metadata-jobs:mae-consumer-job:run
  • 启动datahub-frontend前台服务[gms rest-api -> datahub-frontend]

打开网址 http://10.132.37.201:9001 进行访问

./gradlew :datahub-frontend:build
cd datahub-frontend/run && ./run-local-frontend

项目采集工具:

请确保nix已安装, 可参见[软件开发环境]章节, 切换至contrib/metadata-etl目录
clj命令相关配置文件入口: conf/gms.conf.edn

测试文件列表

文件名类型描述
metadata_sample/demo.datENTITYdataset/user实体文件
metadata_sample/hive_1.sqlETL-SQLHIVE SQL ETL过程
metadata_sample/pg_procedure.sqlETL-SQLPostgresql 存储过程

命令行文件列表

文件名类型描述支持系统
bin/gmscat.cljproducer:clj推送接口数据至GMS存储JSON输入流
bin/dataset_jdbc_generator.cljgenerator:clj查询JDBC数据表模式PostgreSQL/MySQL/Oracle/SQL Server
bin/lineage_hive_generator.hsgenerator:hs解析HIVE SQL生成血缘关系HIVE
bin/gms_dependency_sandbox.cljsandbox:clj启动本地GMS依赖服务Kafka, Kafka Schema-Registry, PostgreSQL, ElasticSearch, Neo4j
bin/gms_mae_conduit.cljjob:cljmae事件作业# 未来合并mce,mae作业
bin/procedure_jdbc_connector.cljfetcher:clj拉取oracle存储过程Oracle

接口文件格式采集

export NIX_PATH=~/.nix-defexpr/channels
cat metadata_sample/demo.dat | bin/gmscat.clj :gmscat/uat

数据集模式采集 [未来标准接口采用管道方式, 方便本地留存及演示数据备份]

  • JDBC查询采集
export NIX_PATH=~/.nix-defexpr/channels
bin/dataset_jdbc_generator.clj :jdbc.ora/edw | bin/gmscat.clj :gmscat/uat
  • HIVE 查询采集
  • KAFKA 查询采集
  • ES 查询采集
  • ClickHouse 查询采集
  • Cassandra 查询采集

血缘分析采集

  • HIVE SQL解析采集 脚本需要传递文件列表 - [可采用ls, find, 或者cat *.list各种形式得到]
export NIX_PATH=~/.nix-defexpr/channels
ls metadata_sample/hive_*.sql | bin/lineage_hive_generator.hs
find . -name "*.sql" | grep hive | bin/lineage_hive_generator.hs
  • ORACLE SQL解析采集

工作流采集

  • airflow

About

simplify linkedin datahub metadata platform

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published