Quick Start

cd $SPARK_HOME
./bin/spark-submit examples/src/main/python/pi.py 10
# ---output---
# Pi is roughly 3.145440

Running Spark

YARN

cluster mode: the command only submits the job and does not wait for it to finish; the Spark driver runs inside the YARN application master

./bin/spark-submit --class path.to.your.Class --master yarn --deploy-mode cluster [options] <app jar> [app options]

./bin/spark-submit --class org.apache.spark.examples.SparkPi \
    --master yarn \
    --deploy-mode cluster \
    --driver-memory 4g \
    --executor-memory 2g \
    --executor-cores 1 \
    --queue default \
    examples/jars/spark-examples*.jar \
    10

client mode: the command submits the job and waits for it to finish; the Spark driver runs on the client, and the application master is only used to request resources

./bin/spark-submit --class path.to.your.Class --master yarn --deploy-mode client [options] <app jar> [app options]
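
For example, the same SparkPi job from the cluster-mode example above can be submitted in client mode (same jar and arguments, only the deploy mode changes):

./bin/spark-submit --class org.apache.spark.examples.SparkPi \
    --master yarn \
    --deploy-mode client \
    --executor-memory 2g \
    --executor-cores 1 \
    examples/jars/spark-examples*.jar \
    10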

Spark Connect

Spark Doc - Spark Connect Overview

start the Connect server

cd $SPARK_HOME
./sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.5.3

# check that the Connect server is running
jps
lsof -i :15002
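
Once the server is running, a PySpark client can connect to it instead of starting a local driver (a minimal sketch, assuming the default port 15002 and a local pyspark[connect] install):

from pyspark.sql import SparkSession

# connect to the Spark Connect server rather than creating an in-process driver
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
spark.range(5).show()
spark.stop()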

PySpark

Install

Spark Doc - PySpark Getting Started - Installation

pip install pyspark

# Spark SQL
pip install pyspark[sql]
# pandas API on Spark
pip install pyspark[pandas_on_spark] plotly  # plotly is optional, for plotting data
# Spark Connect
pip install pyspark[connect]

initialize and stop


# using SparkContext
from pyspark import SparkContext
sc = SparkContext()
# some code
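# e.g. a tiny RDD job (sketch) to verify the context works
rdd = sc.parallelize(range(10))
print(rdd.map(lambda x: x * x).sum())  # 285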
sc.stop()


# using SparkSession (Spark SQL)
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("test") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
# some code
spark.stop()

# with default local warehouse and metastore
spark = SparkSession \
    .builder \
    .appName("test") \
    .config("spark.sql.warehouse.dir", "file:/path/to/spark/spark-warehouse") \
    .config("javax.jdo.option.ConnectionURL", "jdbc:derby:;databaseName=/path/to/spark/metastore_db;create=true") \
    .enableHiveSupport() \
    .getOrCreate()

check config

conf = spark.sparkContext.getConf().getAll()
for (key, value) in conf:
    print(f"{key}: {value}")

read and write


# load parquet file
data = spark.read.parquet("file_name.parquet")

# show dataframe
data.show()

# write json file
data.write.json("filename.json")
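
Other formats follow the same reader/writer pattern; a sketch with a hypothetical CSV file and output directory:

# load a CSV file with a header row, letting Spark infer column types
csv_df = spark.read.option("header", True).option("inferSchema", True).csv("file_name.csv")

# write parquet, overwriting existing output and partitioning by a (hypothetical) ds column
csv_df.write.mode("overwrite").partitionBy("ds").parquet("output_dir")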

DataFrame

import pyspark.sql.functions as F
import pyspark.sql.types as types

# From SQL
df = spark.sql('select name, age from table')

# collect/show
df.show()
df.take(5)
df.collect()
df.take(1)[0].asDict()['name']

# Rename column
df.withColumnRenamed('old_name', 'new_name')

# add column
df.withColumn('new_col', F.lit(1))

# drop column
df.drop('new_col')

# filter
df.filter(F.col('cnt') > 1)
df.where('cnt > 2')

# join
df1.join(df2, df1['key1'] == df2['key2'], how='inner')

# to RDD
df.rdd

# map with rdd
df.rdd.map(lambda x: (x[0]+1, )).toDF()

# map with udf
func = lambda x: x + 1  # define function
udf_func = F.udf(func, types.FloatType())  # convert function to UDF
df.withColumn('new_col', udf_func('old_col'))  # apply the UDF to a column
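
# alternative: a vectorized pandas UDF (sketch; assumes pandas and pyarrow are installed)
import pandas as pd

@F.pandas_udf(types.FloatType())
def add_one(s: pd.Series) -> pd.Series:
    return s + 1

df.withColumn('new_col', add_one('old_col'))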

# save to hive
df.createTempView('tmp_table')
spark.sql('''
    INSERT OVERWRITE TABLE hive_table PARTITION(ds=20200825)
    SELECT name, age FROM tmp_table  
''')

df.write.insertInto('target_db.target_table', overwrite=False)
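
When the target table may not exist yet, saveAsTable can create it (a sketch; the table name is the same hypothetical one as above):

df.write.mode('overwrite').saveAsTable('target_db.target_table')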

Spark SQL

start spark sql

./bin/spark-sql

usage

SHOW DATABASES;

USE default;

SHOW TABLES;


-- by default the local filesystem is used; data is written to $SPARK_HOME/spark-warehouse/<table_name>/*.parquet
CREATE TABLE t_test_table (
    id INT,
    name STRING
);

-- use another location
CREATE TABLE t_test_table (
    id INT,
    name STRING
) LOCATION '/data/yzchen/Softwares/spark/test-warehouse/t_test_table';

INSERT INTO t_test_table VALUES (1, 'test');
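
-- verify the insert
SELECT * FROM t_test_table;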

sparkmagic

Install

pip install sparkmagic
pip install ipywidgets

Newer IPython versions have a compatibility issue; edit lib/python3.12/site-packages/hdijupyterutils/ipythondisplay.py as follows (see the related GitHub issue):

# from IPython.core.display import display  # comment this line
from IPython.display import display         # add this line

Edit the configuration with vi ~/.sparkmagic/config.json; example_config.json can be used as a reference.

{
  "livy_session_startup_timeout_seconds": 300,
  "livy_server_heartbeat_timeout_seconds": 1000,
  "session_configs_defaults": {
    "conf": {
      "spark.driver.cores": "1",
      "spark.driver.memory": "7g"
    }
  }
}

Temporarily override the configuration

%%spark config
{
   "conf": {
       "spark.driver.cores": "2",
       "spark.driver.memory": "14g"
   }
}

Start a session

%spark add -s <session-name> -l python -u http://<endpoint> -a <username> -p <token>

Run SQL

%%spark
spark.sql("SHOW TABLES").show(2, False)

or

%%spark -c sql
SHOW TABLES;
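
Query results can also be pulled back into the local notebook as a pandas DataFrame with -o (the variable name df_tables is arbitrary):

%%spark -c sql -o df_tables
SHOW TABLES;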

References