Quick Start
cd $SPARK_HOME
./bin/spark-submit examples/src/main/python/pi.py 10
# ---output---
# Pi is roughly 3.145440
Running Spark
YARN
cluster mode: the command only submits the job and does not wait for it to finish; the Spark driver runs inside YARN's ApplicationMaster
./bin/spark-submit --class path.to.your.Class --master yarn --deploy-mode cluster [options] <app jar> [app options]
./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 1 \
--queue default \
examples/jars/spark-examples*.jar \
10
client mode: the command submits the job and waits for it to finish; the Spark driver runs on the client, and the ApplicationMaster is only used to request resources
./bin/spark-submit --class path.to.your.Class --master yarn --deploy-mode client [options] <app jar> [app options]
Spark Connect
Start the Connect server
cd $SPARK_HOME
./sbin/start-connect-server.sh --packages org.apache.spark:spark-connect_2.12:3.5.3
# check that the Connect server is running
jps
lsof -i :15002
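With the server running, a PySpark client can attach to it over the Connect protocol. A minimal sketch, assuming the server listens on the default port 15002 on localhost:
from pyspark.sql import SparkSession
# connect to the Spark Connect server (default port 15002 assumed)
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
spark.range(5).show()
spark.stop()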
PySpark
Install
Spark Doc - PySpark Getting Started - Installation
pip install pyspark
# Spark SQL
pip install pyspark[sql]
# pandas API on Spark
pip install pyspark[pandas_on_spark] plotly  # plotly is optional, for plotting data
# Spark Connect
pip install pyspark[connect]
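A quick check that the package is importable (a sketch):
import pyspark
print(pyspark.__version__)  # prints the installed PySpark version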
initialize and stop
# using SparkContext
from pyspark import SparkContext
sc = SparkContext()
# some code
sc.stop()
# using SparkSession (Spark SQL)
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("test") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
# some code
spark.stop()
# with a local warehouse directory and an embedded Derby metastore (Hive support)
spark = SparkSession \
.builder \
.appName("test") \
.config("spark.sql.warehouse.dir", "file:/path/to/spark/spark-warehouse") \
.config("javax.jdo.option.ConnectionURL", "jdbc:derby:;databaseName=/path/to/spark/metastore_db;create=true") \
.enableHiveSupport() \
.getOrCreate()
check config
conf = spark.sparkContext.getConf().getAll()
for (key, value) in conf:
    print(f"{key}: {value}")
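Individual settings can also be read or changed at runtime through spark.conf (only options that are modifiable after startup, such as SQL settings); a short sketch:
# read a single setting
print(spark.conf.get("spark.sql.shuffle.partitions"))
# change a runtime-modifiable SQL setting
spark.conf.set("spark.sql.shuffle.partitions", "50")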
read and write
# load parquet file
data = spark.read.parquet("file_name.parquet")
# show dataframe
data.show()
# write json file
data.write.json("filename.json")
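Readers and writers also take format-specific options and a save mode. A sketch, assuming a CSV file with a header row and a ds column to partition by:
# load a CSV with a header row and inferred column types
csv_df = spark.read.option("header", True).option("inferSchema", True).csv("file_name.csv")
# write parquet, overwriting existing output and partitioning by ds
csv_df.write.mode("overwrite").partitionBy("ds").parquet("output_dir")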
DataFrame
import pyspark.sql.functions as F
import pyspark.sql.types as types
# From SQL
df = spark.sql('select name, age from table')
# collect/show
df.show()
df.take(5)
df.collect()
df.take(1)[0].asDict()['name']
# Rename column
df.withColumnRenamed('old_name', 'new_name')
# add column
df.withColumn('new_col', F.lit(1))
# drop column
df.drop('new_col')
# filter
df.filter(F.col('cnt') > 1)
df.where('cnt > 2')
# join
df1.join(df2, df1['key1'] == df2['key2'], how='inner')
# to RDD
df.rdd
# map with rdd
df.rdd.map(lambda x: (x[0]+1, )).toDF()
# map with udf
func = lambda x: x + 1.0 # define function (return a float to match the declared FloatType)
udf_func = F.udf(func, types.FloatType()) # convert function to UDF
df.withColumn('new_col', udf_func('old_col')) # map function to column
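# vectorized alternative to the row-at-a-time UDF above (a sketch; assumes pandas and pyarrow are installed)
import pandas as pd
@F.pandas_udf(types.FloatType())
def add_one(s: pd.Series) -> pd.Series:
    return s + 1.0
df.withColumn('new_col', add_one('old_col'))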
# save to hive
df.createTempView('tmp_table')
spark.sql('''
INSERT OVERWRITE TABLE hive_table PARTITION(ds=20200825)
SELECT name, age FROM tmp_table
''')
# or insert with the DataFrameWriter API
df.write.insertInto('target_db.target_table', overwrite=False)
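# save as a managed table instead (a sketch; the table name here is illustrative)
df.write.mode('overwrite').saveAsTable('target_db.managed_table')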
Spark SQL
start spark sql
./bin/spark-sql
usage
SHOW DATABASES;
USE default;
SHOW TABLES;
-- by default uses the local filesystem and writes to $SPARK_HOME/spark-warehouse/<table_name>/*.parquet
CREATE TABLE t_test_table (
id INT,
name STRING
);
-- use another location
CREATE TABLE t_test_table (
id INT,
name STRING
) LOCATION '/data/yzchen/Softwares/spark/test-warehouse/t_test_table';
INSERT INTO t_test_table VALUES (1, 'test');
sparkmagic
Install
pip install sparkmagic
pip install ipywidgets
Newer versions of IPython have a compatibility issue; edit lib/python3.12/site-packages/hdijupyterutils/ipythondisplay.py as follows (see the related GitHub issue)
# from IPython.core.display import display # comment this line
from IPython.display import display # add this line
Edit the config with vi ~/.sparkmagic/config.json; see example_config.json for reference
{
"livy_session_startup_timeout_seconds": 300,
"livy_server_heartbeat_timeout_seconds": 1000,
"session_configs_defaults": {
"conf": {
"spark.driver.cores": "1",
"spark.driver.memory": "7g"
}
}
}
Override the config temporarily (in a notebook cell)
%%spark config
{
"conf": {
"spark.driver.cores": "2",
"spark.driver.memory": "14g"
}
}
Start a session
%spark add -s <session-name> -l python -u http://<endpoint> -a username -p <token>
Run SQL
%%spark
spark.sql("SHOW TABLES").show(2, False)
or
%%spark -c sql
SHOW TABLES;