Spark
[TOC]
PySpark
initialize and stop
# Using SparkContext
from pyspark import SparkContext
sc = SparkContext()
# some code
sc.stop()
# Using SparkSession (Spark SQL)
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("test") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
# some code
spark.stop()
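A SparkSession also owns a SparkContext, so both entry points can be mixed without creating two contexts. A minimal sketch (the sample data is only for illustration):
# reuse the context owned by the session
sc = spark.sparkContext
rdd = sc.parallelize([1, 2, 3])
rdd.count()  # actions run on the same cluster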
read and write
# load parquet file
data = spark.read.parquet("file_name.parquet")
# show dataframe
data.show()
# write json file
data.write.json("filename.json")
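Other formats follow the same reader/writer pattern; reader options control parsing and the save mode controls overwrite behaviour. A minimal sketch (file and directory names are placeholders):
# load csv with a header row and inferred column types
data = spark.read.csv("file_name.csv", header=True, inferSchema=True)
# write parquet, replacing any existing output directory
data.write.mode("overwrite").parquet("output_dir")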
DataFrame
import pyspark.sql.functions as F
import pyspark.sql.types as types
# From SQL
df = spark.sql('select name, age from table')
# collect/show
df.show()
df.take(5)
df.collect()
df.take(1)[0].asDict()['name']
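# to/from pandas (example; assumes the data fits in driver memory)
pdf = df.toPandas()
df2 = spark.createDataFrame(pdf)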
# Rename column
df.withColumnRenamed('old_name', 'new_name')
# add column
df.withColumn('new_col', F.lit(1))
# drop column
df.drop('new_col')
# filter
df.filter(F.col('cnt') > 1)
df.where('cnt > 2')
# join
df1.join(df2, df1['key1'] == df2['key2'], how='inner')
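# group, aggregate and sort (column names are only examples)
df.groupBy('name').agg(F.count('*').alias('cnt'), F.avg('age').alias('avg_age'))
df.orderBy(F.col('age').desc())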
# to RDD
df.rdd
# map with rdd
df.rdd.map(lambda x: (x[0]+1, )).toDF()
# map with udf
func = lambda x: x + 1.0  # define a plain Python function
udf_func = F.udf(func, types.FloatType())  # wrap it as a UDF with an explicit return type
df.withColumn('new_col', udf_func('old_col'))  # apply the UDF to a column
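# vectorized pandas UDF (example; Spark 3.x type-hint style, needs pandas and pyarrow installed)
import pandas as pd
@F.pandas_udf(types.DoubleType())
def plus_one(s: pd.Series) -> pd.Series:
    return s + 1.0
df.withColumn('new_col', plus_one('old_col'))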
# save to hive
df.createTempView('tmp_table')  # register a temp view so it can be referenced in SQL
spark.sql('''
INSERT OVERWRITE TABLE hive_table PARTITION(ds=20200825)
SELECT name, age FROM tmp_table
''')
# or append directly into an existing table (columns are matched by position)
df.write.insertInto('target_db.target_table', overwrite=False)
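saveAsTable can create the target table from the DataFrame's schema, whereas insertInto expects an existing table. A minimal sketch (table names are placeholders):
# create or overwrite a managed table directly
df.write.mode('overwrite').saveAsTable('target_db.new_table')
# read a hive table back as a DataFrame
df2 = spark.table('target_db.new_table')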