Architect batch/stream data processing systems from nyc-tlc-trip-records-data, via:
- Batch (ETL) process : E (extract : tlc-trip-record-data.page -> S3) -> T (transform : S3 -> Spark) -> L (load : Spark -> MySQL)
- Stream process : Event -> Event digest -> Event storage

The system can then support calculations such as Top Driver By Area, Order By Time Window, latest-top-driver, and Top Busy Areas.
- Batch data : nyc-tlc-trip-records-data
- Stream data : TaxiEvent, streamed from file
- Tech : Spark, Hadoop, Hive, EMR, S3, MySQL, Kinesis, DynamoDB, Scala, Python, ELK, Kafka
- Batch pipeline : DataLoad -> DataTransform -> CreateView -> SaveToDB -> SaveToHive (see the sketch after this list)
- Download batch data : download_sample_data.sh
- Batch data : transactional-data, reference-data -> processed-data -> output-transactions -> output-materializedview
- Stream pipeline : TaxiEvent -> EventLoad -> KafkaEventLoad
- Stream data : taxi-event
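
As a concrete picture of the batch E -> T -> L flow described above, here is a minimal single-job sketch. The S3 path, column names, table names, and credentials are illustrative placeholders, not the repo's actual code; in the repo this flow is split into the DataLoad / DataTransform / SaveToDB stages run via spark-submit.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions._

object BatchEtlSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("nyc-taxi-batch-etl-sketch")
      .getOrCreate()

    // E (extract): raw trip records previously copied to S3
    val raw = spark.read
      .option("header", "true")
      .csv("s3a://my-bucket/nyc-tlc/green_tripdata_2019-01.csv") // hypothetical path

    // T (transform): basic type casting and cleanup
    val trips = raw
      .withColumn("trip_distance", col("trip_distance").cast("double"))
      .filter(col("trip_distance") > 0)

    // L (load): write the processed records to MySQL over JDBC
    trips.write
      .mode(SaveMode.Overwrite)
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/nyc_taxi") // hypothetical DB
      .option("dbtable", "green_trips_processed")
      .option("user", "root")
      .option("password", "<password>")
      .save()

    spark.stop()
  }
}
```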
Please also check NYC_Taxi_Trip_Duration if you are interested in data science projects using a similar taxi dataset.
- Architecture idea (Batch):
- Architecture idea (Stream):
├── Dockerfile        : Scala Spark Dockerfile
├── build.sbt         : Scala sbt build file
├── config            : Configuration files for DB/Kafka/AWS, etc.
├── data              : Raw/processed/output data (batch/stream)
├── doc               : Repo references/docs/pictures
├── elk               : ELK (Elasticsearch, Logstash, Kibana) config/scripts
├── fluentd           : Fluentd helper scripts
├── kafka             : Kafka helper scripts
├── pyspark           : Legacy pipeline code (Python)
├── requirements.txt  : Python dependencies
├── script            : Helper scripts (env/services)
├── src               : Batch/stream processing code (Scala)
└── utility           : Helper scripts (pipeline)
Prerequisites
- Install (batch)
    - Spark 2.4.3
    - Java 1.8.0_11 (Java 8)
    - Scala 2.11.12
    - sbt 1.3.5
    - MySQL
    - Hive (optional)
    - Hadoop (optional)
    - Python 3 (optional)
    - PySpark (optional)
- Install (stream)
    - Zookeeper
    - Kafka
    - Elasticsearch 7.6.1
    - Kibana 7.6.1
    - Logstash 7.6.1
- Set up
    - Run on local:
        - n/a
    - Run on cloud:
        - AWS account and key_pair for access to the services below:
            - EMR
            - EC2
            - S3
            - DynamoDB
            - Kinesis
- Config
    - Update config with your credentials
    - Update elk config with your use cases
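
For reference, a job could pick those credentials up at runtime along these lines. This is only a sketch assuming a Java-properties style file; the file name and keys are hypothetical (check the actual files under config/).

```scala
import java.io.FileInputStream
import java.util.Properties

object ConfigSketch {
  // Load DB credentials from a properties file (file name and keys are hypothetical).
  def loadMysqlProps(path: String = "config/mysql.properties"): Properties = {
    val props = new Properties()
    val in    = new FileInputStream(path)
    try props.load(in) finally in.close()
    props // expected keys (assumed): "url", "user", "password"
  }
}
```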
Quick-Start-Batch-Pipeline-Manually
# STEP 1) Download the dataset
bash script/download_sample_data.sh
# STEP 2) sbt build
sbt compile
sbt assembly
# STEP 3) Load data
spark-submit \
--class DataLoad.LoadReferenceData \
target/scala-2.11/nyc_taxi_pipeline_2.11-1.0.jar
spark-submit \
--class DataLoad.LoadGreenTripData \
target/scala-2.11/nyc_taxi_pipeline_2.11-1.0.jar
spark-submit \
--class DataLoad.LoadYellowTripData \
target/scala-2.11/nyc_taxi_pipeline_2.11-1.0.jar
# STEP 4) Transform data
spark-submit \
--class DataTransform.TransformGreenTaxiData \
target/scala-2.11/nyc_taxi_pipeline_2.11-1.0.jar
spark-submit \
--class DataTransform.TransformYellowTaxiData \
target/scala-2.11/nyc_taxi_pipeline_2.11-1.0.jar
# STEP 5) Create view
spark-submit \
--class CreateView.CreateMaterializedView \
target/scala-2.11/nyc_taxi_pipeline_2.11-1.0.jar
# STEP 6) Save to JDBC (mysql)
spark-submit \
--class SaveToDB.JDBCToMysql \
target/scala-2.11/nyc_taxi_pipeline_2.11-1.0.jar
# STEP 7) Save to Hive
spark-submit \
--class SaveToHive.SaveMaterializedviewToHive \
target/scala-2.11/nyc_taxi_pipeline_2.11-1.0.jar
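
Each --class above is a plain Scala object with a main method. For orientation only, a CreateView/SaveToHive-style stage might look roughly like the sketch below; the paths, column names, and table names are illustrative assumptions, not the repo's actual schema.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions._

object MaterializedViewSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("create-materialized-view-sketch")
      .enableHiveSupport() // needed so saveAsTable lands in Hive
      .getOrCreate()

    // read the processed batch output from the earlier transform step
    val trips = spark.read.parquet("data/processed-data/green") // hypothetical path

    // e.g. a "Top busy areas"-style aggregation: trip count per pickup zone
    val busyAreas = trips
      .groupBy(col("pickup_location_id")) // hypothetical column
      .agg(count(lit(1)).as("trip_count"))
      .orderBy(desc("trip_count"))

    // persist as a Hive table (STEP 7); a JDBC writer would cover STEP 6
    busyAreas.write
      .mode(SaveMode.Overwrite)
      .saveAsTable("nyc_taxi.top_busy_areas")

    spark.stop()
  }
}
```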
Quick-Start-Stream-Pipeline-Manually
# STEP 1) sbt build
sbt compile
sbt assembly
# STEP 2) Create Taxi event
spark-submit \
--class TaxiEvent.CreateBasicTaxiEvent \
target/scala-2.11/nyc_taxi_pipeline_2.11-1.0.jar
# check the event
curl localhost:44444
# STEP 3) Process Taxi event
spark-submit \
--class EventLoad.SparkStream_demo_LoadTaxiEvent \
target/scala-2.11/nyc_taxi_pipeline_2.11-1.0.jar
# STEP 4) Send Taxi event to Kafka
# start zookeeper, kafka
brew services start zookeeper
brew services start kafka
# create kafka topic
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic first_topic
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic streams-taxi
# curl event to kafka producer
curl localhost:44444 | kafka-console-producer --broker-list 127.0.0.1:9092 --topic first_topic
# STEP 5) Spark process kafka stream
spark-submit \
--class KafkaEventLoad.LoadKafkaEventExample \
target/scala-2.11/nyc_taxi_pipeline_2.11-1.0.jar
# STEP 6) Spark process kafka stream
spark-submit \
--class KafkaEventLoad.LoadTaxiKafkaEventWriteToKafka \
target/scala-2.11/nyc_taxi_pipeline_2.11-1.0.jar
# STEP 7) Run Elasticsearch, Kibana, Logstash
# make sure curl localhost:44444 can get the taxi event
cd ~
kibana-7.6.1-darwin-x86_64/bin/kibana
elasticsearch-7.6.1/bin/elasticsearch
logstash-7.6.1/bin/logstash -f /Users/$USER/NYC_Taxi_Pipeline/elk/logstash/logstash_taxi_event_file.conf
# test insert toy data to logstash
# (logstash config: elk/logstash.conf)
#nc 127.0.0.1 5000 < data/event_sample.json
# then visit kibana UI : localhost:5601
# then visit "management" -> "index_patterns" -> "Create index pattern"
# create new index pattern : logstash-* (do not select timestamp as the time filter)
# then visit the "Discover" tab and check the data
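
For orientation only, a KafkaEventLoad-style stage (STEP 5/6) might look roughly like the following Structured Streaming sketch; topic names and the trivial transform are illustrative assumptions, and the spark-sql-kafka-0-10 package has to be on the spark-submit classpath.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object KafkaStreamSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("taxi-kafka-stream-sketch")
      .getOrCreate()

    // read taxi events from the topic fed by the console producer above
    val events = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "127.0.0.1:9092")
      .option("subscribe", "first_topic")
      .load()

    // "event digest": treat the Kafka value as a string and apply a trivial transform
    val digested = events
      .selectExpr("CAST(value AS STRING) AS value")
      .withColumn("value", upper(col("value")))

    // "event storage": write the digested events to another topic
    val query = digested.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "127.0.0.1:9092")
      .option("topic", "streams-taxi")
      .option("checkpointLocation", "/tmp/taxi-stream-checkpoint") // required by the Kafka sink
      .start()

    query.awaitTermination()
  }
}
```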
Dependency
- Spark 2.4.3
- Java 8
- Apache Hadoop 2.7
- Jars
TODO
# 1. Tune the main pipeline for large-scale data (to process the whole nyc-tlc-trip dataset)
# 2. Add front-end UI (Flask, to visualize supply & demand and surge pricing)
# 3. Add tests
# 4. Dockerize the project
# 5. Tune the Spark batch/stream code
# 6. Tune the Kafka/ZooKeeper cluster settings