# Spark

[TOC]

## PySpark

### initialize and stop


```python
# use SparkContext
from pyspark import SparkContext
sc = SparkContext()
# some code
sc.stop()
```
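In place of the `# some code` placeholder, a minimal RDD round trip looks like this (a sketch; the input list is made up):

```python
# distribute a local list as an RDD, transform it, and bring the result back
rdd = sc.parallelize([1, 2, 3, 4])
squares = rdd.map(lambda x: x * x).collect()  # [1, 4, 9, 16]
```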


```python
# use Spark SQL
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("test") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
# some code
spark.stop()
```
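A `SparkSession` already owns a `SparkContext`, so there is no need to create both; the context is exposed as an attribute (a minimal sketch):

```python
# reuse the context created by the session instead of building a second one
sc = spark.sparkContext
print(sc.applicationId)
```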

### read and write


```python
# load a parquet file
data = spark.read.parquet("file_name.parquet")

# show the DataFrame
data.show()

# write a json file
data.write.json("filename.json")
```
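Other formats follow the same reader/writer pattern; a sketch with assumed file paths, showing a CSV read with common options and an explicit save mode:

```python
# read a CSV that has a header row, letting Spark infer column types
df = spark.read.csv("people.csv", header=True, inferSchema=True)

# "overwrite" replaces existing output; "append", "ignore" and "error" are the other modes
df.write.mode("overwrite").parquet("people.parquet")
```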

### DataFrame

```python
import pyspark.sql.functions as F
import pyspark.sql.types as types

# From SQL
df = spark.sql('select name, age from table')

# collect/show
df.show()
df.take(5)
df.collect()
df.take(1)[0].asDict()['name']

# Rename column
df.withColumnRenamed('old_name', 'new_name')

# add column
df.withColumn('new_col', F.lit(1))

# drop column
df.drop('new_col')

# filter
df.filter(F.col('cnt') > 1)
df.where('cnt > 2')

# join (both key columns are kept; drop one afterwards to avoid ambiguity)
df1.join(df2, df1['key1'] == df2['key2'], how='inner')

# to RDD
df.rdd

# map with rdd (pass column names to toDF, otherwise they default to _1, _2, ...)
df.rdd.map(lambda x: (x[0] + 1,)).toDF(['new_col'])

# map with udf
func = lambda x: x + 1                         # define the function
udf_func = F.udf(func, types.FloatType())      # wrap it as a UDF with an explicit return type
df.withColumn('new_col', udf_func('old_col'))  # apply it to a column
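
# A vectorized alternative (assumption: Spark 3.x with pandas and pyarrow
# installed): a pandas UDF operates on whole pd.Series at a time and is
# usually much faster than a row-at-a-time udf
import pandas as pd

@F.pandas_udf('float')
def add_one(s: pd.Series) -> pd.Series:
    return s + 1.0

df.withColumn('new_col', add_one('old_col'))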

# save to hive
df.createOrReplaceTempView('tmp_table')  # createTempView fails if the view already exists
spark.sql('''
    INSERT OVERWRITE TABLE hive_table PARTITION(ds=20200825)
    SELECT name, age FROM tmp_table
''')

# or insert directly into an existing table (the table name is a string)
df.write.insertInto('target_db.target_table', overwrite=False)
```
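If the target table does not have to exist beforehand, `saveAsTable` is a common alternative; a minimal sketch, assuming a table `target_db.target_table` partitioned by `ds`:

```python
# mode("overwrite") replaces the table; partitionBy writes one directory per ds value
df.write \
    .mode("overwrite") \
    .partitionBy("ds") \
    .saveAsTable("target_db.target_table")
```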
