news 2026/4/16 10:14:15

优化 PySpark 中的数据处理性能

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
优化 PySpark 中的数据处理性能

原文:towardsdatascience.com/optimizing-the-data-processing-performance-in-pyspark-4b895857c8aa?source=collection_archive---------3-----------------------#2024-11-07

PySpark 技术与策略,解决常见的性能挑战:一个实用的操作指南

https://medium.com/@johnleungTJ?source=post_page---byline--4b895857c8aa--------------------------------https://towardsdatascience.com/?source=post_page---byline--4b895857c8aa-------------------------------- John Leung

·发表于Towards Data Science ·阅读时间:9 分钟·发布日期:2024 年 11 月 7 日

Apache Spark由于其强大的分布式数据处理能力,近年来已成为领先的数据分析引擎之一。PySpark 是 Spark 的 Python API,常用于个人和企业项目中,以解决数据挑战。例如,我们可以使用 PySpark 高效地实现时间序列数据的特征工程,包括数据摄取、提取和可视化。然而,尽管 PySpark 能够处理大规模数据集,但在一些特定场景下,如极端数据分布和复杂的数据转换流程,性能瓶颈仍然可能出现。

本文将探讨在Databricks上使用 PySpark 进行数据处理时常见的性能问题,并介绍各种优化策略,以实现更快的执行速度。

https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/7a51dfd6e0bb834e68f2dbd4ac63cace.png

图片来源:Veri Ivanova 来自Unsplash

假设你开设了一家在线零售店,提供多种产品,主要面向美国客户。你计划通过分析当前交易的购买习惯来满足现有客户的更多需求,并吸引更多新客户。这促使你投入大量精力处理交易记录,作为准备步骤。

#0 模拟数据

我们首先模拟了 100 万条交易记录(在实际的大数据场景中,预计会处理更大的数据集),这些记录包含了客户 ID、购买的产品和交易细节,如支付方式和总金额。值得一提的是,客户 ID #100 的产品代理商有着庞大的客户群,因此在你的店铺中占据了大部分代发货的购买。

以下是演示此场景的代码:

importcsvimportdatetimeimportnumpyasnpimportrandom# Remove existing ‘retail_transactions.csv’ file, if any! rm-f/p/a/t/h retail_transactions.csv# Set the no of transactions and othet configsno_of_iterations=1000000data=[]csvFile='retail_transactions.csv'# Open a file in write modewithopen(csvFile,'w',newline='')asf:fieldnames=['orderID','customerID','productID','state','paymentMthd','totalAmt','invoiceTime']writer=csv.DictWriter(f,fieldnames=fieldnames)writer.writeheader()fornuminrange(no_of_iterations):# Create a transaction record with random valuesnew_txn={'orderID':num,'customerID':random.choice([100,random.randint(1,100000)]),'productID':np.random.randint(10000,size=random.randint(1,5)).tolist(),'state':random.choice(['CA','TX','FL','NY','PA','OTHERS']),'paymentMthd':random.choice(['Credit card','Debit card','Digital wallet','Cash on delivery','Cryptocurrency']),'totalAmt':round(random.random()*5000,2),'invoiceTime':datetime.datetime.now().isoformat()}data.append(new_txn)writer.writerows(data)

在模拟数据之后,我们使用 Databrick 的 Jupyter Notebook 将 CSV 文件加载到 PySpark DataFrame 中。

# Set file location and typefile_location="/FileStore/tables/retail_transactions.csv"file_type="csv"# Define CSV optionsschema="orderID INTEGER, customerID INTEGER, productID INTEGER, state STRING, paymentMthd STRING, totalAmt DOUBLE, invoiceTime TIMESTAMP"first_row_is_header="true"delimiter=","# Read CSV files into DataFramedf=spark.read.format(file_type)\.schema(schema)\.option("header",first_row_is_header)\.option("delimiter",delimiter)\.load(file_location)

我们还创建了一个可重用的装饰器工具,用于衡量和比较每个函数内不同方法的执行时间。

importtime# Measure the excution time of a given functiondeftime_decorator(func):defwrapper(*args,**kwargs):begin_time=time.time()output=func(*args,**kwargs)end_time=time.time()print(f"Execution time of function{func.__name__}:{round(end_time-begin_time,2)}seconds.")returnoutputreturnwrapper

好的,所有准备工作已经完成。接下来我们将探讨以下几个章节中执行性能的不同潜在挑战。

#1 存储

Spark 使用弹性分布式数据集(RDD)作为其核心构建块,数据默认通常保存在内存中。无论是执行计算(如连接和聚合)还是在集群中存储数据,所有操作都会在统一区域中贡献内存使用。

https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/f7f20c42fa3519fee671c569416280dc.png

一个包含执行内存和存储内存的统一区域(图源:作者)

如果设计不当,可能导致可用内存不足。这会导致过多的分区溢出到磁盘,从而导致性能下降。

缓存和持久化中间结果或频繁访问的数据集是常见的做法。虽然缓存和持久化具有相同的目的,但它们的存储级别可能有所不同。应当合理利用资源,以确保高效的读写操作。

例如,如果转换后的数据会在不同的后续阶段中重复用于计算和算法,建议对这些数据进行缓存。

代码示例:假设我们想要调查使用数字钱包作为支付方式的不同交易记录子集。

frompyspark.sql.functionsimportcol@time_decoratordefwithout_cache(data):# 1st filteringdf2=data.where(col("paymentMthd")=="Digital wallet")count=df2.count()# 2nd filteringdf3=df2.where(col("totalAmt")>2000)count=df3.count()returncount display(without_cache(df))
frompyspark.sql.functionsimportcol@time_decoratordefafter_cache(data):# 1st filtering with cachedf2=data.where(col("paymentMthd")=="Digital wallet").cache()count=df2.count()# 2nd filteringdf3=df2.where(col("totalAmt")>2000)count=df3.count()returncount display(after_cache(df))

缓存之后,即使我们想要根据不同的交易金额阈值或其他数据维度来过滤转换后的数据集,执行时间也会更易于控制。

#2 洗牌

当我们执行如连接 DataFrame 或按数据字段分组的操作时,会发生洗牌。这是必要的,目的是将所有记录重新分布到集群中,并确保具有相同键的记录位于同一个节点。这有助于同时处理并合并结果。

https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/ab84ce7db5a3682b102879ebcf4c8646.png

洗牌连接(图源:作者)

然而,这种洗牌操作是代价高昂的——由于数据在节点间的移动,执行时间长且额外的网络开销。

为了减少洗牌操作,有几种策略:

(1) 对于小数据集,使用广播变量,将只读副本发送到每个工作节点进行本地处理

虽然“较小”数据集通常定义为每个执行器最大内存阈值为 8GB,但广播的理想大小应通过针对特定案例的实验来确定。

https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/23bfddffdcd8abbc41eace540e8e045e.png

广播连接(作者图片)

(2) 提前过滤,尽早尽可能减少处理的数据量;

(3) 控制分区数量,以确保最佳性能

代码示例:假设我们想返回与我们的州列表匹配的交易记录及其全名

frompyspark.sql.functionsimportcol@time_decoratordefno_broadcast_var(data):# Create small dataframesmall_data=[("CA","California"),("TX","Texas"),("FL","Florida")]small_df=spark.createDataFrame(small_data,["state","stateLF"])# Perform joiningresult_no_broadcast=data.join(small_df,"state")returnresult_no_broadcast.count()display(no_broadcast_var(df))
frompyspark.sql.functionsimportcol,broadcast@time_decoratordefhave_broadcast_var(data):small_data=[("CA","California"),("TX","Texas"),("FL","Florida")]small_df=spark.createDataFrame(small_data,["state","stateFullName"])# Create broadcast variable and perform joiningresult_have_broadcast=data.join(broadcast(small_df),"state")returnresult_have_broadcast.count()display(have_broadcast_var(df))

#3 倾斜性

数据有时会分布不均,尤其是用于处理的键字段。这会导致分区大小不平衡,其中某些分区比平均值大或小得多。

由于执行性能受到最长运行任务的限制,因此需要解决过载节点的问题。

一种常见的方法是加盐。其原理是通过向倾斜键添加随机数,使得数据在分区中更加均匀分布。假设在基于倾斜键进行聚合时,我们将使用加盐后的键进行聚合,然后再使用原始键进行聚合。另一种方法是重新分区,它通过增加分区的数量来帮助数据更均匀地分布。

https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/119db606882ab6d12a63942cc007751e.png

数据分布——加盐前后的情况(作者图片)

代码示例:我们想聚合一个不对称的数据集,主要由客户 ID #100 引起的倾斜。

frompyspark.sql.functionsimportcol,desc@time_decoratordefno_salting(data):# Perform aggregationagg_data=data.groupBy("customerID").agg({"totalAmt":"sum"}).sort(desc("sum(totalAmt)"))returnagg_data display(no_salting(df))
frompyspark.sql.functionsimportcol,lit,concat,rand,split,desc@time_decoratordefhave_salting(data):# Salt the customerID by adding the suffixsalted_data=data.withColumn("salt",(rand()*8).cast("int"))\.withColumn("saltedCustomerID",concat(col("customerID"),lit("_"),col("salt")))# Perform aggregationagg_data=salted_data.groupBy("saltedCustomerID").agg({"totalAmt":"sum"})# Remove salt for further aggregationfinal_result=agg_data.withColumn("customerID",split(col("saltedCustomerID"),"_")[0]).groupBy("customerID").agg({"sum(totalAmt)":"sum"}).sort(desc("sum(sum(totalAmt))"))returnfinal_result display(have_salting(df))

向倾斜键添加一个随机的前缀或后缀都可以有效。通常,5 到 10 个随机值是一个很好的起点,可以在扩展数据和保持高复杂性之间取得平衡。

#4 序列化

人们通常更倾向于使用用户定义函数(UDFs),因为它在定制数据处理逻辑方面更灵活。然而,UDFs 是按行逐一操作的。代码需要被 Python 解释器序列化,发送到执行器 JVM,然后再反序列化。这会产生高昂的序列化开销,且阻碍 Spark 对代码的优化和高效处理。

简单直接的方法是尽可能避免使用 UDFs。

我们应首先考虑使用内置 Spark 函数,这些函数可以处理聚合、数组/映射操作、日期/时间戳以及 JSON 数据处理等任务。如果内置函数无法满足你的需求,确实可以考虑使用pandasUDFs。与 UDFs 相比,它们建立在 Apache Arrow 基础上,具有更低的开销和更高的性能。

代码示例:交易价格根据来源州进行折扣。

frompyspark.sql.functionsimportudffrompyspark.sql.typesimportDoubleTypefrompyspark.sqlimportfunctionsasFimportnumpyasnp# UDF to calculate discounted amountdefcalculate_discount(state,amount):ifstate=="CA":returnamount*0.90# 10% offelse:returnamount*0.85# 15% offdiscount_udf=udf(calculate_discount,DoubleType())@time_decoratordefhave_udf(data):# Use the UDFdiscounted_data=data.withColumn("discountedTotalAmt",discount_udf("state","totalAmt"))# Show the resultsreturndiscounted_data.select("customerID","totalAmt","state","discountedTotalAmt").show()display(have_udf(df))
frompyspark.sql.functionsimportwhen@time_decoratordefno_udf(data):# Use when and otherwise to discount the amount based on conditionsdiscounted_data=data.withColumn("discountedTotalAmt",when(data.state=="CA",data.totalAmt*0.90)# 10% off.otherwise(data.totalAmt*0.85))# 15% off# Show the resultsreturndiscounted_data.select("customerID","totalAmt","state","discountedTotalAmt").show()display(no_udf(df))

在这个示例中,我们使用内置的 PySpark 函数“when”和“otherwise”来有效地按顺序检查多个条件。基于我们对这些函数的熟悉,示例几乎是无限的。例如,pyspark.sql.functions.transform,一个帮助对输入数组中的每个元素应用转换的函数,自 PySpark 3.1.0 版本开始引入。

#5 溢出

如在存储部分讨论的那样,溢出是由于内存不足以容纳所有所需数据,导致将临时数据从内存写入磁盘。我们提到的许多性能问题都与溢出有关。例如,在分区之间洗牌大量数据的操作,容易导致内存耗尽并随之发生溢出。

https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/8e0391d49e71ebd4918378b38ed64fe2.png

由于内存不足引起的溢出不同场景(图像由作者提供)

审查 Spark UI 中的性能指标至关重要。如果我们发现溢出(内存)和溢出(磁盘)的统计数据,那么溢出可能是长时间运行任务的原因。为了解决这个问题,可以尝试实例化一个每个工作节点有更多内存的集群,例如通过调节配置值spark.executor.memory来增加执行进程的内存大小;另外,我们还可以配置spark.memory.fraction来调整分配给执行和存储的内存量。

总结

我们遇到了一些常见的导致 PySpark 性能下降的因素,以及可能的改进方法:

最近,自适应查询执行(AQE)被提出用于基于运行时统计信息对查询进行动态规划和重新规划。这支持查询执行过程中发生的不同查询重新优化特性,从而成为一种出色的优化技术。然而,在初期设计阶段理解数据特征仍然至关重要,因为这有助于制定更好的策略,以编写有效的代码和查询,并利用 AQE 进行微调。

在你离开之前

如果你喜欢这篇文章,欢迎关注我的Medium 页面和LinkedIn 页面。通过这样做,你可以及时获取有关数据科学副项目、机器学习运维(MLOps)示范以及项目管理方法学的精彩内容。

## 简化数据工程项目中的 Python 代码

用于数据摄取、验证、处理和测试的 Python 技巧与技术:实用的操作流程

towardsdatascience.com ## 使用 PySpark 在 Databricks 上进行时间序列特征工程

探索 PySpark 在时间序列数据中的潜力:摄取、提取和可视化数据,并附带实践…

towardsdatascience.com

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/15 4:07:41

最优分配与匈牙利算法

原文:towardsdatascience.com/optimum-assignment-and-the-hungarian-algorithm-8b1027628028?sourcecollection_archive---------1-----------------------#2024-07-07 https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/55cb3b…

作者头像 李华
网站建设 2026/4/11 23:37:45

在 Azure 中编排动态时间序列管道

原文:towardsdatascience.com/orchestrating-a-dynamic-time-series-pipeline-with-azure-data-factory-and-databricks-810819608231?sourcecollection_archive---------9-----------------------#2024-05-31 探索如何使用 Azure Data Factory(ADF&…

作者头像 李华
网站建设 2026/4/11 11:29:24

使用UI-TARS-desktop构建智能爬虫系统

使用UI-TARS-desktop构建智能爬虫系统 1. 引言 传统的网页爬虫开发总是让人头疼不已——需要分析网页结构、编写复杂的XPath或CSS选择器、处理动态加载内容,还要应对网站改版带来的各种问题。每次目标网站稍有变动,整个爬虫就可能失效,维护…

作者头像 李华
网站建设 2026/4/12 14:13:37

突破帧率桎梏:WaveTools性能优化引擎的技术架构与硬件适配方案

突破帧率桎梏:WaveTools性能优化引擎的技术架构与硬件适配方案 【免费下载链接】WaveTools 🧰鸣潮工具箱 项目地址: https://gitcode.com/gh_mirrors/wa/WaveTools 游戏性能瓶颈突破与硬件适配方案是当前玩家面临的核心挑战。WaveTools性能优化引…

作者头像 李华