PySpark (Zero to Hero)
- Vinay Chaudhari
Before you start:
- Open Google Colab or any IDE and run the following command:
- pip install pyspark
**** MAKE A PR FOR CONTRIBUTIONS AND SUGGESTIONS :) ****
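The examples below use the pre-2.0 entry points sqlContext and sc without creating them; many notebooks provide them for you. If yours does not, a minimal setup sketch (names chosen to match the snippets below):
from pyspark.sql import SparkSession, SQLContext
spark = SparkSession.builder.appName('zero-to-hero').getOrCreate()
sc = spark.sparkContext        # used below by sc.parallelize
sqlContext = SQLContext(sc)    # used below by createDataFrame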
#agg
x = sqlContext.createDataFrame([("vinay","sunny",100),("deepak","parag",200),("akash","pravin",300)],['from','to','amount'])
y = x.agg({"amount":"avg"})
x.show()
y.show()
+------+------+------+
| from| to|amount|
+------+------+------+
| vinay| sunny| 100|
|deepak| parag| 200|
| akash|pravin| 300|
+------+------+------+
+-----------+
|avg(amount)|
+-----------+
| 200.0|
+-----------+
#alias
from pyspark.sql.functions import col
x = sqlContext.createDataFrame([("vinay","sunny",100),("deepak","parag",200),("akash","pravin",300)],['from','to','amount'])
y = x.alias("transactions")
x.show()
y.select(col("transactions.to")).show()
+------+------+------+
| from| to|amount|
+------+------+------+
| vinay| sunny| 100|
|deepak| parag| 200|
| akash|pravin| 300|
+------+------+------+
+------+
| to|
+------+
| sunny|
| parag|
|pravin|
+------+
#cache
x = sqlContext.createDataFrame([("vinay","sunny",100),("deepak","parag",200),("akash","pravin",300)],['from','to','amount'])
x.cache()
print(x.count()) #first action materializes x in memory
print(x.count()) #later actions avoid IO overhead
3
3
#coalesce
x_rdd = sc.parallelize([("vinay","sunny",100),("deepak","parag",200),("akash","pravin",300)],4)
x = sqlContext.createDataFrame(x_rdd,['from','to','amount'])
y = x.coalesce(numPartitions=1)
print(x.rdd.getNumPartitions())
print(y.rdd.getNumPartitions())
x.show()
y.show()
4
1
+------+------+------+
| from| to|amount|
+------+------+------+
| vinay| sunny| 100|
|deepak| parag| 200|
| akash|pravin| 300|
+------+------+------+
+------+------+------+
| from| to|amount|
+------+------+------+
| vinay| sunny| 100|
|deepak| parag| 200|
| akash|pravin| 300|
+------+------+------+
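# note: coalesce only merges existing partitions (no full shuffle), which makes it
# cheaper than repartition for reducing the partition count; it cannot increase it.
# A sketch on the same 4-partition x as above:
z = x.coalesce(8)
print(z.rdd.getNumPartitions()) # still 4 - coalesce never adds partitions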
#collect
x = sqlContext.createDataFrame([("vinay","sunny",100),("deepak","parag",200),("akash","pravin",300)],['from','to','amount'])
y = x.collect() # returns all rows to the driver as a list of Row objects
x.show()
print(y)
+------+------+------+
| from| to|amount|
+------+------+------+
| vinay| sunny| 100|
|deepak| parag| 200|
| akash|pravin| 300|
+------+------+------+
[Row(from='vinay', to='sunny', amount=100), Row(from='deepak', to='parag', amount=200), Row(from='akash', to='pravin', amount=300)]
#columns
x = sqlContext.createDataFrame([("vinay","sunny",100),("deepak","parag",200),("akash","pravin",300)],['from','to','amount'])
y = x.columns
x.show()
print(y)
+------+------+------+
| from| to|amount|
+------+------+------+
| vinay| sunny| 100|
|deepak| parag| 200|
| akash|pravin| 300|
+------+------+------+
['from', 'to', 'amount']
#corr : Calculates the (Pearson) correlation
# of two columns of a DataFrame as a double value.
x = sqlContext.createDataFrame([("vinay","sunny",100,300),("deepak","parag",200,600),("akash","pravin",300,900)], ['from','to','amount','fees'])
y = x.corr(col1="amount",col2="fees")
x.show()
print(y)
+------+------+------+----+
| from| to|amount|fees|
+------+------+------+----+
| vinay| sunny| 100| 300|
|deepak| parag| 200| 600|
| akash|pravin| 300| 900|
+------+------+------+----+
1.0
#count
#Returns the number of rows in this DataFrame.
x = sqlContext.createDataFrame([("vinay","sunny",100),("deepak","parag",200),("akash","pravin",300)],['from','to','amount'])
x.show()
print(x.count())
+------+------+------+
| from| to|amount|
+------+------+------+
| vinay| sunny| 100|
|deepak| parag| 200|
| akash|pravin| 300|
+------+------+------+
3
#cov
#Calculate the sample covariance for the given columns,
#specified by their names, as a double value.
x = sqlContext.createDataFrame([("vinay","sunny",100,300),("deepak","parag",200,600),("akash","pravin",300,900)], ['from','to','amount','fees'])
y = x.cov(col1="amount",col2="fees")
x.show()
print(y)
+------+------+------+----+
| from| to|amount|fees|
+------+------+------+----+
| vinay| sunny| 100| 300|
|deepak| parag| 200| 600|
| akash|pravin| 300| 900|
+------+------+------+----+
30000.0
#crosstab
x = sqlContext.createDataFrame([("vinay","deepak",0.1),("sunny","pratik",0.2),("parag","akash",0.3)], ['from','to','amt'])
y = x.crosstab(col1='from',col2='to')
x.show()
y.show()
+-----+------+---+
| from| to|amt|
+-----+------+---+
|vinay|deepak|0.1|
|sunny|pratik|0.2|
|parag| akash|0.3|
+-----+------+---+
+-------+-----+------+------+
|from_to|akash|deepak|pratik|
+-------+-----+------+------+
| parag| 1| 0| 0|
| vinay| 0| 1| 0|
| sunny| 0| 0| 1|
+-------+-----+------+------+
#cube
# Create a multi-dimensional cube for the current DataFrame using the specified columns,
# so we can run aggregation on them
x = sqlContext.createDataFrame([("vinay","deepak",1),("sunny","pratik",2),("parag","akash",3)], ['from','to','amt'])
y = x.cube('from','to')
x.show()
print(y)
y.sum().show()
y.max().show()
+-----+------+---+
| from| to|amt|
+-----+------+---+
|vinay|deepak| 1|
|sunny|pratik| 2|
|parag| akash| 3|
+-----+------+---+
<pyspark.sql.group.GroupedData object at 0x000001D968C4A320>
+-----+------+--------+
| from| to|sum(amt)|
+-----+------+--------+
| null| akash| 3|
| null| null| 6|
|vinay|deepak| 1|
|vinay| null| 1|
| null|deepak| 1|
|parag| akash| 3|
| null|pratik| 2|
|parag| null| 3|
|sunny| null| 2|
|sunny|pratik| 2|
+-----+------+--------+
+-----+------+--------+
| from| to|max(amt)|
+-----+------+--------+
| null| akash| 3|
| null| null| 3|
|vinay|deepak| 1|
|vinay| null| 1|
| null|deepak| 1|
|parag| akash| 3|
| null|pratik| 2|
|parag| null| 3|
|sunny| null| 2|
|sunny|pratik| 2|
+-----+------+--------+
# Describe
x = sqlContext.createDataFrame([("vinay","deepak",1),("sunny","pratik",2),("parag","akash",3)], ['from','to','amt'])
x.show()
x.describe().show()
+-----+------+---+
| from| to|amt|
+-----+------+---+
|vinay|deepak| 1|
|sunny|pratik| 2|
|parag| akash| 3|
+-----+------+---+
+-------+-----+------+---+
|summary| from| to|amt|
+-------+-----+------+---+
| count| 3| 3| 3|
| mean| null| null|2.0|
| stddev| null| null|1.0|
| min|parag| akash| 1|
| max|vinay|pratik| 3|
+-------+-----+------+---+
# Distinct
x = sqlContext.createDataFrame([("vinay","deepak",1),("sunny","pratik",2),("parag","akash",3),("parag","akash",3),("parag","akash",3)], ['from','to','amt'])
y = x.distinct()
x.show()
y.show()
+-----+------+---+
| from| to|amt|
+-----+------+---+
|vinay|deepak| 1|
|sunny|pratik| 2|
|parag| akash| 3|
|parag| akash| 3|
|parag| akash| 3|
+-----+------+---+
+-----+------+---+
| from| to|amt|
+-----+------+---+
|sunny|pratik| 2|
|vinay|deepak| 1|
|parag| akash| 3|
+-----+------+---+
# Drop
x = sqlContext.createDataFrame([("vinay","deepak",1),("sunny","pratik",2),("parag","akash",3)], ['from','to','amt'])
y = x.drop('amt')
x.show()
y.show()
+-----+------+---+
| from| to|amt|
+-----+------+---+
|vinay|deepak| 1|
|sunny|pratik| 2|
|parag| akash| 3|
+-----+------+---+
+-----+------+
| from| to|
+-----+------+
|vinay|deepak|
|sunny|pratik|
|parag| akash|
+-----+------+
# dropDuplicates
x = sqlContext.createDataFrame([("vinay","deepak",1),("sunny","pratik",2),("parag","akash",3),("parag","akash",3),("parag","akash",3)], ['from','to','amt'])
y = x.dropDuplicates(subset=['from','to'])
x.show()
y.show()
+-----+------+---+
| from| to|amt|
+-----+------+---+
|vinay|deepak| 1|
|sunny|pratik| 2|
|parag| akash| 3|
|parag| akash| 3|
|parag| akash| 3|
+-----+------+---+
+-----+------+---+
| from| to|amt|
+-----+------+---+
|vinay|deepak| 1|
|sunny|pratik| 2|
|parag| akash| 3|
+-----+------+---+
#dropna
x = sqlContext.createDataFrame([(None,"vinay",0.1),("vinay","sunny",None),("Peter",None,0.3),("Mark","Steve",0.2)], ['from','to','amount'])
y = x.dropna(how='any',subset=['from','to'])
x.show()
y.show()
+-----+-----+------+
| from| to|amount|
+-----+-----+------+
| null|vinay| 0.1|
|vinay|sunny| null|
|Peter| null| 0.3|
| Mark|Steve| 0.2|
+-----+-----+------+
+-----+-----+------+
| from| to|amount|
+-----+-----+------+
|vinay|sunny| null|
| Mark|Steve| 0.2|
+-----+-----+------+
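# dropna also accepts thresh: keep rows with at least that many non-null values.
# A sketch on the same x as above:
x.dropna(thresh=3).show() # only the (Mark, Steve, 0.2) row has 3 non-null values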
#dtypes
x = sqlContext.createDataFrame([("vinay","deepak",1),("sunny","pratik",2),("parag","akash",3),("parag","akash",3),("parag","akash",3)], ['from','to','amt'])
y = x.dtypes
x.show()
print(y)
+-----+------+---+
| from| to|amt|
+-----+------+---+
|vinay|deepak| 1|
|sunny|pratik| 2|
|parag| akash| 3|
|parag| akash| 3|
|parag| akash| 3|
+-----+------+---+
[('from', 'string'), ('to', 'string'), ('amt', 'bigint')]
#Explain
x = sqlContext.createDataFrame([("vinay","deepak",1),("sunny","pratik",2),("parag","akash",3)], ['from','to','amt'])
x.show()
x.agg({"amt":"avg"}).explain(extended = True)
+-----+------+---+
| from| to|amt|
+-----+------+---+
|vinay|deepak| 1|
|sunny|pratik| 2|
|parag| akash| 3|
+-----+------+---+
== Parsed Logical Plan ==
'Aggregate ['avg(amt#169L) AS avg(amt)#187]
+- AnalysisBarrier
+- LogicalRDD [from#167, to#168, amt#169L], false
== Analyzed Logical Plan ==
avg(amt): double
Aggregate [avg(amt#169L) AS avg(amt)#187]
+- LogicalRDD [from#167, to#168, amt#169L], false
== Optimized Logical Plan ==
Aggregate [avg(amt#169L) AS avg(amt)#187]
+- Project [amt#169L]
+- LogicalRDD [from#167, to#168, amt#169L], false
== Physical Plan ==
*(2) HashAggregate(keys=[], functions=[avg(amt#169L)], output=[avg(amt)#187])
+- Exchange SinglePartition
+- *(1) HashAggregate(keys=[], functions=[partial_avg(amt#169L)], output=[sum#192, count#193L])
+- *(1) Project [amt#169L]
+- Scan ExistingRDD[from#167,to#168,amt#169L]
#fillna
x = sqlContext.createDataFrame([(None,"deepak",1),("sunny",None,2),("parag",None,3)], ['from','to','amt'])
y = x.fillna(value = '---',subset = ['from','to'])
x.show()
y.show()
+-----+------+---+
| from| to|amt|
+-----+------+---+
| null|deepak| 1|
|sunny| null| 2|
|parag| null| 3|
+-----+------+---+
+-----+------+---+
| from| to|amt|
+-----+------+---+
| ---|deepak| 1|
|sunny| ---| 2|
|parag| ---| 3|
+-----+------+---+
# Filter
x = sqlContext.createDataFrame([("vinay","deepak",1),("sunny","pratik",2),("parag","akash",3)], ['from','to','amt'])
y = x.filter("amt > 2")
x.show()
y.show()
+-----+------+---+
| from| to|amt|
+-----+------+---+
|vinay|deepak| 1|
|sunny|pratik| 2|
|parag| akash| 3|
+-----+------+---+
+-----+-----+---+
| from| to|amt|
+-----+-----+---+
|parag|akash| 3|
+-----+-----+---+
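# filter equally accepts Column expressions; a sketch equivalent to the SQL string above:
x.filter(x.amt > 2).show()
x.filter((x.amt > 1) & (x.to != 'akash')).show() # combine predicates with & and |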
# First
x = sqlContext.createDataFrame([("vinay","deepak",1),("sunny","pratik",2),("parag","akash",3)], ['from','to','amt'])
y = x.first()
x.show()
print(y)
+-----+------+---+
| from| to|amt|
+-----+------+---+
|vinay|deepak| 1|
|sunny|pratik| 2|
|parag| akash| 3|
+-----+------+---+
Row(from='vinay', to='deepak', amt=1)
# foreach
from __future__ import print_function
# setup
fn = './foreachExampleDataFrames.txt'
open(fn, 'w').close() # clear the file
def fappend(el, f):
    '''appends el to file f'''
    with open(f, 'a') as out:
        print(el, file=out)
# example
x = sqlContext.createDataFrame([("vinay","deepak",1),("sunny","pratik",2),("parag","akash",3)], ['from','to','amt'])
y = x.foreach(lambda row: fappend(row, fn)) # runs fappend on each Row, writing into foreachExampleDataFrames.txt
x.show() # original dataframe
print(y) # foreach returns 'None'
# print the contents of the file
with open(fn, "r") as foreachExample:
    print(foreachExample.read())
+-----+------+---+
| from| to|amt|
+-----+------+---+
|vinay|deepak| 1|
|sunny|pratik| 2|
|parag| akash| 3|
+-----+------+---+
None
Row(from='vinay', to='deepak', amt=1)
Row(from='sunny', to='pratik', amt=2)
# foreachPartition
from __future__ import print_function
# setup
fn = './foreachExampleDataFrames.txt'
open(fn, 'w').close() # clear the file
def fappendPartition(rows, f):
    '''appends every Row in the iterator rows to file f'''
    with open(f, 'a') as out:
        for row in rows:
            print(row, file=out)
# example
x = sqlContext.createDataFrame([("vinay","deepak",1),("sunny","pratik",2),("parag","akash",3)], ['from','to','amt'])
y = x.foreachPartition(lambda rows: fappendPartition(rows, fn)) # writes each partition's rows into the file
x.show() # original dataframe
print(y) # foreachPartition returns 'None'
# print the contents of the file
with open(fn, "r") as foreachExample:
    print(foreachExample.read())
+-----+------+---+
| from| to|amt|
+-----+------+---+
|vinay|deepak| 1|
|sunny|pratik| 2|
|parag| akash| 3|
+-----+------+---+
None
Row(from='parag', to='akash', amt=3)
Row(from='sunny', to='pratik', amt=2)
Row(from='vinay', to='deepak', amt=1)
# freqItems
x = sqlContext.createDataFrame([("Vinay","sunny",50), \
("Deepak","sunny",30), \
("Vinay","Parag",20), \
("Vinay","ram",50), \
("sham","sunny",90), \
("Vinay","pushpak",50), \
("om","sunny",50), \
("sagar","sunny",50), \
("Vinay","rahul",80), \
("akash","sunny",50), \
("puranik","pranav",70)],\
['from','to','amount'])
y = x.freqItems(cols=['from','amount'],support=0.8)
x.show()
y.show()
+-------+-------+------+
| from| to|amount|
+-------+-------+------+
| Vinay| sunny| 50|
| Deepak| sunny| 30|
| Vinay| Parag| 20|
| Vinay| ram| 50|
| sham| sunny| 90|
| Vinay|pushpak| 50|
| om| sunny| 50|
| sagar| sunny| 50|
| Vinay| rahul| 80|
| akash| sunny| 50|
|puranik| pranav| 70|
+-------+-------+------+
+--------------+----------------+
|from_freqItems|amount_freqItems|
+--------------+----------------+
| [Vinay]| [50]|
+--------------+----------------+
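# note: freqItems uses an approximate algorithm (Karp, Schenker, and Papadimitriou)
# that can return false positives; 'Vinay' (5/11) and 50 (6/11) both appear here
# even though their true frequencies are below the requested support of 0.8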
# groupBy
x = sqlContext.createDataFrame([("vinay","deepak",1),("sunny","pratik",2),("parag","akash",3)], ['from','to','amt'])
y = x.groupBy('amt')
x.show()
print(y)
+-----+------+---+
| from| to|amt|
+-----+------+---+
|vinay|deepak| 1|
|sunny|pratik| 2|
|parag| akash| 3|
+-----+------+---+
<pyspark.sql.group.GroupedData object at 0x0000021742513CC0>
# groupBy (col1).avg(col2)
x = sqlContext.createDataFrame([("vinay","deepak",12466641),("sunny","pratik",451232),("parag","akash",2555455)], ['from','to','amt'])
y = x.groupBy('from').avg('amt')
x.show()
y.show()
+-----+------+--------+
| from| to| amt|
+-----+------+--------+
|vinay|deepak|12466641|
|sunny|pratik| 451232|
|parag| akash| 2555455|
+-----+------+--------+
+-----+-----------+
| from| avg(amt)|
+-----+-----------+
|parag| 2555455.0|
|sunny| 451232.0|
|vinay|1.2466641E7|
+-----+-----------+
# head
x = sqlContext.createDataFrame([("vinay","deepak",12466641),("sunny","pratik",451232),("parag","akash",2555455)], ['from','to','amt'])
y = x.head(2)
x.show()
print(y)
+-----+------+--------+
| from| to| amt|
+-----+------+--------+
|vinay|deepak|12466641|
|sunny|pratik| 451232|
|parag| akash| 2555455|
+-----+------+--------+
[Row(from='vinay', to='deepak', amt=12466641), Row(from='sunny', to='pratik', amt=451232)]
# intersect
x = sqlContext.createDataFrame([("vinay","deepak",12466641),("sunny","pratik",451232),("parag","akash",2555455),("parag","akash",2555455)], ['from','to','amt'])
y = sqlContext.createDataFrame([("vinay","deepak",12466641),("sunny","pratik",451232),("parag","akash",2555455),("parag","akashay",2555455)], ['from','to','amt'])
z = x.intersect(y)
x.show()
y.show()
z.show()
+-----+------+--------+
| from| to| amt|
+-----+------+--------+
|vinay|deepak|12466641|
|sunny|pratik| 451232|
|parag| akash| 2555455|
|parag| akash| 2555455|
+-----+------+--------+
+-----+-------+--------+
| from| to| amt|
+-----+-------+--------+
|vinay| deepak|12466641|
|sunny| pratik| 451232|
|parag| akash| 2555455|
|parag|akashay| 2555455|
+-----+-------+--------+
+-----+------+--------+
| from| to| amt|
+-----+------+--------+
|sunny|pratik| 451232|
|vinay|deepak|12466641|
|parag| akash| 2555455|
+-----+------+--------+
# isLocal
x = sqlContext.createDataFrame([("vinay","deepak",12466641),("sunny","pratik",451232),("parag","akash",2555455),("parag","akash",2555455)], ['from','to','amt'])
y = x.isLocal()
x.show()
print(y)
+-----+------+--------+
| from| to| amt|
+-----+------+--------+
|vinay|deepak|12466641|
|sunny|pratik| 451232|
|parag| akash| 2555455|
|parag| akash| 2555455|
+-----+------+--------+
False
# join (inner)
x = sqlContext.createDataFrame([("vinay","deepak",12466641),("sunny","pratik",451232),("parag","akash",2555455),("Salman","akash",2555455)], ['from','to','amt'])
y = sqlContext.createDataFrame([('Andy',20),("Steve",40),("Elon",80)], ['name','age'])
z = x.join(y, x.to == y.name, 'inner').select('from','to','amt','age') # no 'to' matches a 'name', so the result is empty
x.show()
y.show()
z.show()
+------+------+--------+
| from| to| amt|
+------+------+--------+
| vinay|deepak|12466641|
| sunny|pratik| 451232|
| parag| akash| 2555455|
|Salman| akash| 2555455|
+------+------+--------+
+-----+---+
| name|age|
+-----+---+
| Andy| 20|
|Steve| 40|
| Elon| 80|
+-----+---+
+----+---+---+---+
|from| to|amt|age|
+----+---+---+---+
+----+---+---+---+
# join (outer)
x = sqlContext.createDataFrame([("vinay","deepak",12466641),("sunny","pratik",451232),("parag","akash",2555455),("Salman","akash",2555455)], ['from','to','amt'])
y = sqlContext.createDataFrame([('Andy',20),("Steve",40),("Elon",80)], ['name','age'])
z = x.join(y, x.to == y.name, 'outer').select('from','to','amt','age') # keeps unmatched rows from both sides
x.show()
y.show()
z.show()
+------+------+--------+
| from| to| amt|
+------+------+--------+
| vinay|deepak|12466641|
| sunny|pratik| 451232|
| parag| akash| 2555455|
|Salman| akash| 2555455|
+------+------+--------+
+-----+---+
| name|age|
+-----+---+
| Andy| 20|
|Steve| 40|
| Elon| 80|
+-----+---+
+------+------+--------+----+
| from| to| amt| age|
+------+------+--------+----+
| null| null| null| 40|
| sunny|pratik| 451232|null|
| vinay|deepak|12466641|null|
| null| null| null| 20|
| parag| akash| 2555455|null|
|Salman| akash| 2555455|null|
| null| null| null| 80|
+------+------+--------+----+
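# the other join types take the same form; a sketch with 'left_outer',
# which keeps every row of x and fills age with null where no name matches:
w = x.join(y, x.to == y.name, 'left_outer').select('from','to','amt','age')
w.show() # all four x rows, age = null throughout for this data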
# Limit
x = sqlContext.createDataFrame([("vinay","deepak",12466641),("sunny","pratik",451232),("parag","akash",2555455),("Salman","akash",2555455)], ['from','to','amt'])
y = x.limit(2)
x.show()
y.show()
+------+------+--------+
| from| to| amt|
+------+------+--------+
| vinay|deepak|12466641|
| sunny|pratik| 451232|
| parag| akash| 2555455|
|Salman| akash| 2555455|
+------+------+--------+
+-----+------+--------+
| from| to| amt|
+-----+------+--------+
|vinay|deepak|12466641|
|sunny|pratik| 451232|
+-----+------+--------+
# na
x = sqlContext.createDataFrame([(None,"Bob",0.1),("Bob","Carol",None),("Carol",None,0.3),("Bob","Carol",0.2)], ['from','to','amt'])
y = x.na # returns an object for handling missing values, supports drop, fill, and replace methods
x.show()
print(y)
y.drop().show()
y.fill({'from':'unknown','to':'unknown','amt':0}).show()
y.fill('--').show()
+-----+-----+----+
| from| to| amt|
+-----+-----+----+
| null| Bob| 0.1|
| Bob|Carol|null|
|Carol| null| 0.3|
| Bob|Carol| 0.2|
+-----+-----+----+
<pyspark.sql.dataframe.DataFrameNaFunctions object at 0x0000021742513940>
+----+-----+---+
|from| to|amt|
+----+-----+---+
| Bob|Carol|0.2|
+----+-----+---+
+-------+-------+---+
| from| to|amt|
+-------+-------+---+
|unknown| Bob|0.1|
| Bob| Carol|0.0|
| Carol|unknown|0.3|
| Bob| Carol|0.2|
+-------+-------+---+
+-----+-----+----+
| from| to| amt|
+-----+-----+----+
| --| Bob| 0.1|
| Bob|Carol|null|
|Carol| --| 0.3|
| Bob|Carol| 0.2|
+-----+-----+----+
# orderBy
x = sqlContext.createDataFrame([("vinay","deepak",12466641),("sunny","pratik",451232),("parag","akash",2555455)], ['from','to','amt'])
y = x.orderBy(['amt'],ascending=[False])
z = x.orderBy(['amt'],ascending=[True])
x.show()
y.show()
z.show()
+-----+------+--------+
| from| to| amt|
+-----+------+--------+
|vinay|deepak|12466641|
|sunny|pratik| 451232|
|parag| akash| 2555455|
+-----+------+--------+
+-----+------+--------+
| from| to| amt|
+-----+------+--------+
|vinay|deepak|12466641|
|parag| akash| 2555455|
|sunny|pratik| 451232|
+-----+------+--------+
+-----+------+--------+
| from| to| amt|
+-----+------+--------+
|sunny|pratik| 451232|
|parag| akash| 2555455|
|vinay|deepak|12466641|
+-----+------+--------+
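# the same descending sort, written with the desc() column function:
from pyspark.sql.functions import desc
x.orderBy(desc('amt')).show()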
# printSchema
x = sqlContext.createDataFrame([("vinay","deepak",12466641),("sunny","pratik",451232),("parag","akash",2555455)], ['from','to','amt'])
x.show()
x.printSchema()
+-----+------+--------+
| from| to| amt|
+-----+------+--------+
|vinay|deepak|12466641|
|sunny|pratik| 451232|
|parag| akash| 2555455|
+-----+------+--------+
root
|-- from: string (nullable = true)
|-- to: string (nullable = true)
|-- amt: long (nullable = true)
# randomSplit
x = sqlContext.createDataFrame([("vinay","deepak",12466641),("sunny","pratik",451232),("parag","akash",2555455)], ['from','to','amt'])
y = x.randomSplit([0.5,0.5])
x.show()
y[0].show()
y[1].show()
+-----+------+--------+
| from| to| amt|
+-----+------+--------+
|vinay|deepak|12466641|
|sunny|pratik| 451232|
|parag| akash| 2555455|
+-----+------+--------+
+-----+------+-------+
| from| to| amt|
+-----+------+-------+
|sunny|pratik| 451232|
|parag| akash|2555455|
+-----+------+-------+
+-----+------+--------+
| from| to| amt|
+-----+------+--------+
|vinay|deepak|12466641|
+-----+------+--------+
# rdd
x = sqlContext.createDataFrame([("vinay","deepak",12466641),("sunny","pratik",451232),("parag","akash",2555455)], ['from','to','amt'])
y = x.rdd
x.show()
print(y.collect())
+-----+------+--------+
| from| to| amt|
+-----+------+--------+
|vinay|deepak|12466641|
|sunny|pratik| 451232|
|parag| akash| 2555455|
+-----+------+--------+
[Row(from='vinay', to='deepak', amt=12466641), Row(from='sunny', to='pratik', amt=451232), Row(from='parag', to='akash', amt=2555455)]
# registerTempTable
x = sqlContext.createDataFrame([("vinay","deepak",12466641),("sunny","pratik",451232),("parag","akash",2555455)], ['from','to','amt'])
x.registerTempTable(name="TRANS")
y = sqlContext.sql('SELECT * FROM TRANS WHERE amt > 451232')
x.show()
y.show()
+-----+------+--------+
| from| to| amt|
+-----+------+--------+
|vinay|deepak|12466641|
|sunny|pratik| 451232|
|parag| akash| 2555455|
+-----+------+--------+
+-----+------+--------+
| from| to| amt|
+-----+------+--------+
|vinay|deepak|12466641|
|parag| akash| 2555455|
+-----+------+--------+
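# note: registerTempTable is deprecated since Spark 2.0;
# the drop-in replacement is createOrReplaceTempView:
x.createOrReplaceTempView("TRANS")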
# repartition
x = sqlContext.createDataFrame([("vinay","deepak",12466641),("sunny","pratik",451232),("parag","akash",2555455)], ['from','to','amt'])
y = x.repartition(3)
print(x.rdd.getNumPartitions())
print(y.rdd.getNumPartitions())
y.show()
4
3
+-----+------+--------+
| from| to| amt|
+-----+------+--------+
|parag| akash| 2555455|
|vinay|deepak|12466641|
|sunny|pratik| 451232|
+-----+------+--------+
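# repartition can also hash-partition by column(s), which helps a later join or groupBy;
# a sketch:
z = x.repartition(2, 'from') # 2 partitions, rows co-located by 'from'
print(z.rdd.getNumPartitions()) # 2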
# replace
x = sqlContext.createDataFrame([("vinay","deepak",12466641),("sunny","pratik",451232),("parag","akash",2555455)], ['from','to','amt'])
y = x.replace('vinay','sunny',['from','to'])
x.show()
y.show()
+-----+------+--------+
| from| to| amt|
+-----+------+--------+
|vinay|deepak|12466641|
|sunny|pratik| 451232|
|parag| akash| 2555455|
+-----+------+--------+
+-----+------+--------+
| from| to| amt|
+-----+------+--------+
|sunny|deepak|12466641|
|sunny|pratik| 451232|
|parag| akash| 2555455|
+-----+------+--------+
# replace
x = sqlContext.createDataFrame([('Sunny',"chirag",0.1),("deepak","vinay",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.replace('Sunny','Pranav',['from','to'])
x.show()
y.show()
+------+------+---+
| from| to|amt|
+------+------+---+
| Sunny|chirag|0.1|
|deepak| vinay|0.2|
| Carol| Dave|0.3|
+------+------+---+
+------+------+---+
| from| to|amt|
+------+------+---+
|Pranav|chirag|0.1|
|deepak| vinay|0.2|
| Carol| Dave|0.3|
+------+------+---+
#rollup
x = sqlContext.createDataFrame([("vinay","deepak",1246.6641),("sunny","pratik",4512.32),("parag","akash",2555.455)], ['from','to','amt'])
y = x.rollup(['from','to'])
x.show()
print(y)
# y is a GroupedData object;
# aggregations will be applied to all numerical columns
y.sum().show()
y.max().show()
y.min().show()
+-----+------+---------+
| from| to| amt|
+-----+------+---------+
|vinay|deepak|1246.6641|
|sunny|pratik| 4512.32|
|parag| akash| 2555.455|
+-----+------+---------+
<pyspark.sql.group.GroupedData object at 0x000002C3E1627A58>
+-----+------+---------+
| from| to| sum(amt)|
+-----+------+---------+
| null| null|8314.4391|
|vinay|deepak|1246.6641|
|vinay| null|1246.6641|
|parag| akash| 2555.455|
|parag| null| 2555.455|
|sunny| null| 4512.32|
|sunny|pratik| 4512.32|
+-----+------+---------+
+-----+------+---------+
| from| to| max(amt)|
+-----+------+---------+
| null| null| 4512.32|
|vinay|deepak|1246.6641|
|vinay| null|1246.6641|
|parag| akash| 2555.455|
|parag| null| 2555.455|
|sunny| null| 4512.32|
|sunny|pratik| 4512.32|
+-----+------+---------+
+-----+------+---------+
| from| to| min(amt)|
+-----+------+---------+
| null| null|1246.6641|
|vinay|deepak|1246.6641|
|vinay| null|1246.6641|
|parag| akash| 2555.455|
|parag| null| 2555.455|
|sunny| null| 4512.32|
|sunny|pratik| 4512.32|
+-----+------+---------+
# sample
# Returns a randomly sampled subset of rows; here without
# replacement, keeping roughly the given fraction of rows.
x = sqlContext.createDataFrame([("vinay","deepak",1246.6641),("sunny","pratik",4512.32),("parag","akash",2555.455)], ['from','to','amt'])
y = x.sample(False,0.5)
x.show()
y.show()
+-----+------+---------+
| from| to| amt|
+-----+------+---------+
|vinay|deepak|1246.6641|
|sunny|pratik| 4512.32|
|parag| akash| 2555.455|
+-----+------+---------+
+-----+------+--------+
| from| to| amt|
+-----+------+--------+
|sunny|pratik| 4512.32|
|parag| akash|2555.455|
+-----+------+--------+
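# pass a seed for a reproducible sample; a sketch:
x.sample(False, 0.5, seed=42).show()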
#schema
x = sqlContext.createDataFrame([("vinay","deepak",1246.6641),("sunny","pratik",4512.32),("parag","akash",2555.455)], ['from','to','amt'])
y = x.schema
x.show()
print(y)
+-----+------+---------+
| from| to| amt|
+-----+------+---------+
|vinay|deepak|1246.6641|
|sunny|pratik| 4512.32|
|parag| akash| 2555.455|
+-----+------+---------+
StructType(List(StructField(from,StringType,true),StructField(to,StringType,true),StructField(amt,DoubleType,true)))
# selectExpr
x = sqlContext.createDataFrame([("vinay","deepak",1246.6641),("sunny","pratik",4512.32),("parag","akash",2555.455)], ['from','to','amt'])
y = x.selectExpr(['substr(from,1,1)','amt+1000'])
x.show()
y.show()
+-----+------+---------+
| from| to| amt|
+-----+------+---------+
|vinay|deepak|1246.6641|
|sunny|pratik| 4512.32|
|parag| akash| 2555.455|
+-----+------+---------+
+---------------------+------------+
|substring(from, 1, 1)|(amt + 1000)|
+---------------------+------------+
| v| 2246.6641|
| s| 5512.32|
| p| 3555.455|
+---------------------+------------+
# show
x = sqlContext.createDataFrame([("vinay","deepak",1246.6641),("sunny","pratik",4512.32),("parag","akash",2555.455)], ['from','to','amt'])
x.show()
+-----+------+---------+
| from| to| amt|
+-----+------+---------+
|vinay|deepak|1246.6641|
|sunny|pratik| 4512.32|
|parag| akash| 2555.455|
+-----+------+---------+
# sort
x = sqlContext.createDataFrame([("vinay","deepak",1246.6641),("sunny","pratik",4512.32),("parag","akash",2555.455)], ['from','to','amt'])
y = x.sort(['amt'])
x.show()
y.show()
+-----+------+---------+
| from| to| amt|
+-----+------+---------+
|vinay|deepak|1246.6641|
|sunny|pratik| 4512.32|
|parag| akash| 2555.455|
+-----+------+---------+
+-----+------+---------+
| from| to| amt|
+-----+------+---------+
|vinay|deepak|1246.6641|
|parag| akash| 2555.455|
|sunny|pratik| 4512.32|
+-----+------+---------+
# sortWithinPartitions
x = sqlContext.createDataFrame([('vinay',"Bobby",0.1,1),("Bobby","sunny",0.2,2),("deepak","parag",0.3,2)], \
['from','to','amt','p_id']).repartition(2,'p_id')
y = x.sortWithinPartitions(['to'])
x.show()
y.show()
print(x.rdd.glom().collect()) # glom() groups the elements of each partition into a list
print("\n")
print(y.rdd.glom().collect())
+------+-----+---+----+
| from| to|amt|p_id|
+------+-----+---+----+
| Bobby|sunny|0.2| 2|
|deepak|parag|0.3| 2|
| vinay|Bobby|0.1| 1|
+------+-----+---+----+
+------+-----+---+----+
| from| to|amt|p_id|
+------+-----+---+----+
|deepak|parag|0.3| 2|
| Bobby|sunny|0.2| 2|
| vinay|Bobby|0.1| 1|
+------+-----+---+----+
[[Row(from='Bobby', to='sunny', amt=0.2, p_id=2), Row(from='deepak', to='parag', amt=0.3, p_id=2)], [Row(from='vinay', to='Bobby', amt=0.1, p_id=1)]]
[[Row(from='deepak', to='parag', amt=0.3, p_id=2), Row(from='Bobby', to='sunny', amt=0.2, p_id=2)], [Row(from='vinay', to='Bobby', amt=0.1, p_id=1)]]
# stat
# Returns a DataFrameStatFunctions object exposing the
# statistics methods (corr, cov, crosstab, freqItems, ...).
x = sqlContext.createDataFrame([("vinay","Bobby",0.1,0.001),("Bobby","sunny",0.2,0.02),("sunny","pranav",0.3,0.02)], ['from','to','amt','fees'])
y = x.stat
x.show()
print(y)
print(y.corr(col1="amt",col2="fees"))
+-----+------+---+-----+
| from| to|amt| fees|
+-----+------+---+-----+
|vinay| Bobby|0.1|0.001|
|Bobby| sunny|0.2| 0.02|
|sunny|pranav|0.3| 0.02|
+-----+------+---+-----+
<pyspark.sql.dataframe.DataFrameStatFunctions object at 0x00000241596F37F0>
0.8660254037844386
# subtract
x = sqlContext.createDataFrame([("vinay","Bobby",0.1,0.001),("Bobby","sunny",0.2,0.02),("sunny","pranav",0.3,0.02)], ['from','to','amt','fees'])
y = sqlContext.createDataFrame([("vinay","Bobby",0.1,0.001),("Bobby","sunny",0.2,0.02),("sunny","pranav",0.3,0.01)], ['from','to','amt','fees'])
z = x.subtract(y)
x.show()
y.show()
z.show()
+-----+------+---+-----+
| from| to|amt| fees|
+-----+------+---+-----+
|vinay| Bobby|0.1|0.001|
|Bobby| sunny|0.2| 0.02|
|sunny|pranav|0.3| 0.02|
+-----+------+---+-----+
+-----+------+---+-----+
| from| to|amt| fees|
+-----+------+---+-----+
|vinay| Bobby|0.1|0.001|
|Bobby| sunny|0.2| 0.02|
|sunny|pranav|0.3| 0.01|
+-----+------+---+-----+
+-----+------+---+----+
| from| to|amt|fees|
+-----+------+---+----+
|sunny|pranav|0.3|0.02|
+-----+------+---+----+
# take
x = sqlContext.createDataFrame([("vinay","Bobby",0.1,0.001),("Bobby","sunny",0.2,0.02),("sunny","pranav",0.3,0.02)], ['from','to','amt','fees'])
y = x.take(num=2)
x.show()
print(y)
+-----+------+---+-----+
| from| to|amt| fees|
+-----+------+---+-----+
|vinay| Bobby|0.1|0.001|
|Bobby| sunny|0.2| 0.02|
|sunny|pranav|0.3| 0.02|
+-----+------+---+-----+
[Row(from='vinay', to='Bobby', amt=0.1, fees=0.001), Row(from='Bobby', to='sunny', amt=0.2, fees=0.02)]
#toDF
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.toDF("seller","buyer","amt")
x.show()
y.show()
+-----+-----+---+
| from| to|amt|
+-----+-----+---+
|Alice| Bob|0.1|
| Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+
+------+-----+---+
|seller|buyer|amt|
+------+-----+---+
| Alice| Bob|0.1|
| Bob|Carol|0.2|
| Carol| Dave|0.3|
+------+-----+---+
# toJSON
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Alice",0.3)], ['from','to','amt'])
y = x.toJSON()
x.show()
print(y)
print("\n")
print(y.collect())
+-----+-----+---+
| from| to|amt|
+-----+-----+---+
|Alice| Bob|0.1|
| Bob|Carol|0.2|
|Carol|Alice|0.3|
+-----+-----+---+
MapPartitionsRDD[193] at toJavaRDD at NativeMethodAccessorImpl.java:0
['{"from":"Alice","to":"Bob","amt":0.1}', '{"from":"Bob","to":"Carol","amt":0.2}', '{"from":"Carol","to":"Alice","amt":0.3}']
# toPandas
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Alice",0.3)], ['from','to','amt'])
y = x.toPandas() # note the parentheses: x.toPandas without them is the bound method, not a pandas DataFrame
x.show()
print(type(y))
y
+-----+-----+---+
| from| to|amt|
+-----+-----+---+
|Alice| Bob|0.1|
| Bob|Carol|0.2|
|Carol|Alice|0.3|
+-----+-----+---+
<class 'pandas.core.frame.DataFrame'>
    from     to  amt
0  Alice    Bob  0.1
1    Bob  Carol  0.2
2  Carol  Alice  0.3
# unionAll
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Alice",0.3)], ['from','to','amt'])
y = sqlContext.createDataFrame([('sunny',"Bob",0.1),("vinay","Carol",0.2),("pranav","Alice",0.3)], ['from','to','amt'])
z = x.unionAll(y)
x.show()
y.show()
z.show()
+-----+-----+---+
| from| to|amt|
+-----+-----+---+
|Alice| Bob|0.1|
| Bob|Carol|0.2|
|Carol|Alice|0.3|
+-----+-----+---+
+------+-----+---+
| from| to|amt|
+------+-----+---+
| sunny| Bob|0.1|
| vinay|Carol|0.2|
|pranav|Alice|0.3|
+------+-----+---+
+------+-----+---+
| from| to|amt|
+------+-----+---+
| Alice| Bob|0.1|
| Bob|Carol|0.2|
| Carol|Alice|0.3|
| sunny| Bob|0.1|
| vinay|Carol|0.2|
|pranav|Alice|0.3|
+------+-----+---+
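# note: unionAll is deprecated since Spark 2.0 in favour of union(), which behaves
# identically (duplicates are kept; chain .distinct() to drop them):
x.union(y).show() # same rows as z above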
# unpersist
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Alice",0.3)], ['from','to','amt'])
x.cache()
x.count()
x.show()
print(x.is_cached)
x.unpersist()
print(x.is_cached)
+-----+-----+---+
| from| to|amt|
+-----+-----+---+
|Alice| Bob|0.1|
| Bob|Carol|0.2|
|Carol|Alice|0.3|
+-----+-----+---+
True
False
# where
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Alice",0.3)], ['from','to','amt'])
y = x.where("amt > 0.2")
x.show()
y.show()
+-----+-----+---+
| from| to|amt|
+-----+-----+---+
|Alice| Bob|0.1|
| Bob|Carol|0.2|
|Carol|Alice|0.3|
+-----+-----+---+
+-----+-----+---+
| from| to|amt|
+-----+-----+---+
|Carol|Alice|0.3|
+-----+-----+---+
# withColumn
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Alice",0.3)], ['from','to','amt'])
y = x.withColumn('conf',x.amt.isNotNull())
x.show()
y.show()
+-----+-----+---+
| from| to|amt|
+-----+-----+---+
|Alice| Bob|0.1|
| Bob|Carol|0.2|
|Carol|Alice|0.3|
+-----+-----+---+
+-----+-----+---+----+
| from| to|amt|conf|
+-----+-----+---+----+
|Alice| Bob|0.1|true|
| Bob|Carol|0.2|true|
|Carol|Alice|0.3|true|
+-----+-----+---+----+
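# withColumn can also derive a computed column from existing ones; a sketch
# (amt_pct is a hypothetical column name):
x.withColumn('amt_pct', x.amt * 100).show()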
# withColumnRenamed
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.withColumnRenamed('amt','amount')
x.show()
y.show()
+-----+-----+---+
| from| to|amt|
+-----+-----+---+
|Alice| Bob|0.1|
| Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+
+-----+-----+------+
| from| to|amount|
+-----+-----+------+
|Alice| Bob| 0.1|
| Bob|Carol| 0.2|
|Carol| Dave| 0.3|
+-----+-----+------+
# write
x = sqlContext.createDataFrame([('Alice',"Bob",0.1),("Bob","Carol",0.2),("Carol","Dave",0.3)], ['from','to','amt'])
y = x.write.mode('overwrite').json('./dataframeWriteExample.json') # writes a directory of JSON part files
x.show()
x.show()
# Read the DF back in from file
sqlContext.read.json('./dataframeWriteExample.json').show()
+-----+-----+---+
| from| to|amt|
+-----+-----+---+
|Alice| Bob|0.1|
| Bob|Carol|0.2|
|Carol| Dave|0.3|
+-----+-----+---+
+---+-----+-----+
|amt| from| to|
+---+-----+-----+
|0.3|Carol| Dave|
|0.1|Alice| Bob|
|0.2| Bob|Carol|
+---+-----+-----+
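# the same writer API handles other formats; a sketch with parquet and csv
# (output paths are hypothetical):
x.write.mode('overwrite').parquet('./dataframeWriteExample.parquet')
x.write.mode('overwrite').csv('./dataframeWriteExample.csv', header=True)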