-
Notifications
You must be signed in to change notification settings - Fork 0
/
pyspark_airlines.py
26 lines (20 loc) · 1 KB
/
pyspark_airlines.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import pyspark as ps
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
def main():
    """Load the airlines file from GCS and overwrite the BigQuery airlines table.

    Reads ``gs://covid19flights_dim_tables/airlines.dat`` as a headerless,
    comma-separated file with an explicit eight-column schema, then writes
    the resulting DataFrame to ``covid19flights:covid19_airtraffic.airlines``
    through the ``bigquery`` data source in ``overwrite`` mode.

    The Spark session is always stopped before returning, including when
    the read or the write raises, so the job does not leave a session
    running after a failure.
    """
    spark = SparkSession.builder.appName("airlines").getOrCreate()
    try:
        # The BigQuery connector stages the data in this GCS bucket before
        # loading it into the destination table.
        spark.conf.set("temporaryGcsBucket", "pyspark_output_files")

        data_file = "gs://covid19flights_dim_tables/airlines.dat"

        # The file has no header row, so columns are assigned by position:
        # the order of the fields below must match the order in the file.
        schema = StructType([
            StructField("index", IntegerType(), True),
            StructField("airline_name", StringType(), True),
            StructField("alias", StringType(), True),
            StructField("airline_iata_code", StringType(), True),
            StructField("airline_icao_code", StringType(), True),
            StructField("airline_callsign_id", StringType(), True),
            StructField("country", StringType(), True),
            StructField("active", StringType(), True),
        ])

        airlines_df = spark.read.csv(
            data_file,
            header=False,
            schema=schema,
            sep=",",
        )

        # "overwrite" replaces the existing contents of the destination table.
        (
            airlines_df.write.format("bigquery")
            .option("table", "covid19flights:covid19_airtraffic.airlines")
            .mode("overwrite")
            .save()
        )
    finally:
        # Release the session whether the job succeeded or failed.
        spark.stop()
# Run the load job only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()