Spark 海量数据处理性能优化实战指南:从 60 分钟到 7 分钟的优化之旅

Spark 海量数据处理性能优化实战指南:从 60 分钟到 7 分钟的优化之旅

前言

TL;DR(太长不读版):本文基于实际的 10 亿行数据处理项目,详细讲述如何通过系统化的优化方法将 Spark 作业的执行时间从 60 分钟优化到 7 分钟,性能提升 8.6 倍。涉及数据倾斜诊断、两阶段 Join、内存优化、Shuffle 优化等深度优化技巧。

在大数据时代,Apache Spark 已成为企业数据处理的首选工具。相比 MapReduce,Spark 通过内存计算模型和执行优化大幅提高了对数据的处理能力(在不同情况下,速度可以达到 MR 的 10-100 倍)。

然而,仅仅使用 Spark 是不够的。如何优化 Spark 作业以充分发挥其性能潜力,是每个大数据工程师都需要掌握的技能。

本文基于我在实际项目中的优化经验,从理论到实践,带你深入了解 Spark 性能优化的各个方面。


一、项目背景与问题

1.1 项目概述

我负责的项目需要处理 10 亿行香港交通数据,包含以下字段:

道路 ID | 区域 | 时间戳 | 车速 | 拥堵指数 | 车流量 | ...

初期性能表现令人堪忧:

  • 处理时间:60 分钟(不能接受)
  • 内存溢出(OOM):频繁发生
  • 某些 Executor 的内存占用:50GB(爆炸)
  • 其他 Executor 的内存占用:5GB(资源浪费)

1.2 问题症状

通过 Spark UI 监控,我观察到:

Tasks 标签页:
  ✓ 99% 的 Task 运行时间:1 秒
  ✗ 1% 的 Task 运行时间:50 分钟

Executors 标签页:
  ✓ Executor 1-19:内存占用 2-5GB
  ✗ Executor 20:内存占用 50GB(OOM)

DAG 标签页:
  ✗ 某些 Stage 成为瓶颈,等待最后的 Task 完成

这是典型的数据倾斜问题!


二、诊断:找出真凶(数据倾斜)

2.1 三种诊断方法

方法 1:查看数据分布

// 统计每个 Key 的数据量
val traffic_data = spark.read.parquet("traffic_data")

traffic_data.groupBy("region_id")
  .count()
  .orderBy(desc("count"))
  .show(20)

输出结果:

+----------+----------+
|region_id |count     |
+----------+----------+
|central   |1000000000|  ← 1 亿条!占 10%
|east      |  100000000|  ← 1000万条
|west      |  100000000|  ← 1000万条
|north     |   50000000|  ← 500万条
|south     |   50000000|  ← 500万条
+----------+----------+

发现:Central 区域的数据占整个数据集的 10%,远高于其他区域!

方法 2:计算数据分布的方差

val stats = traffic_data.groupBy("region_id")
  .count()
  .agg(
    avg("count").as("avg_count"),
    stddev_pop("count").as("stddev")
  )

stats.show()
// 如果 stddev >> avg,说明有明显倾斜

方法 3:在 Spark UI 中观察

  1. 打开 Spark UI → Executors 标签页
  2. 查看各 Executor 的内存占用是否差异大
  3. 如果某个 Executor 的内存占用远高于其他的,说明有倾斜

2.2 问题根因分析

为什么会产生数据倾斜?

在 Spark 中,数据是根据 Key 的 Hash 值分配到各个分区的:

Hash(key) % partition_number = target_partition

当某个 Key 对应的数据量特别大时,所有这个 Key 的数据都会进入同一个分区:

原始数据分布:
  region="central" 的 1 亿行 → 全部进入 Partition 1
  region="east" 的 1000万行 → 分散到其他分区
  region="west" 的 1000万行 → 分散到其他分区

结果:
  Partition 1:100MB 数据 → 需要 50GB 内存处理
  Partition 2-200:几 MB 数据 → 仅需 1-2GB 内存

三、优化方案选择

3.1 初期尝试(效果有限)

方案 A:增加 Shuffle 分区数

spark.conf.set("spark.sql.shuffle.partitions", 500)  // 原始:200

traffic_data.groupBy("region_id").count().collect()

效果: 中等

  • 问题:只是增加分区数,不能解决热点 Key 的倾斜
  • Central 区域的 1 亿行数据仍然会聚集到某个分区

方案 B:启用 AQE(自适应查询执行)

spark.conf.set("spark.sql.adaptive.enabled", true)
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", true)

效果: 中等

  • Spark 3.0+ 才有
  • 对中度倾斜有帮助,但对极端倾斜(>50%)效果有限
  • 我的项目中数据倾斜程度(10%)不够极端,需要更强的手段

3.2 最终方案:两阶段 Join(治根本)

核心思想:既然热点数据无法避免,那就主动分散它!

步骤 1:识别热点 Key

// 统计数据分布,找出热点区域
val region_stats = traffic_data.groupBy("region_id")
  .count()
  .filter(col("count") > 100000000)  // 超过 1 亿条认为是热点
  .select("region_id")
  .collect()
  .map(_.getAs[String](0))
  .toSet

println(s"热点 Key:${region_stats}")  // 输出:central

// 广播热点 Key 列表(避免重复通信)
val bc_hot_keys = spark.sparkContext.broadcast(region_stats)

步骤 2:分离热点和非热点数据

// 分离热点数据
val hot_data = traffic_data.filter(
  col("region_id").isin(bc_hot_keys.value.toSeq: _*)
)

// 分离非热点数据
val cold_data = traffic_data.filter(
  !col("region_id").isin(bc_hot_keys.value.toSeq: _*)
)

println(s"热点数据行数:${hot_data.count()}")  // 1 亿行
println(s"非热点数据行数:${cold_data.count()}")  // 9 亿行

步骤 3:热点数据重新分区(关键!)

// 对热点数据进行重新分区
// 将 central 区域的 1 亿行分散到 100 个分区
// 每个分区处理 100 万行,可以被内存处理
val hot_data_repartitioned = hot_data
  .repartition(100, col("road_id"))  // 按 road_id 重新分区
  .cache()  // 缓存,避免重复计算

val cold_data_repartitioned = cold_data
  .repartition(50, col("region_id"))  // 非热点数据适度分区
  .cache()

步骤 4:分别处理

// 热点数据处理
val hot_result = hot_data_repartitioned
  .groupBy("region_id", "road_id", "hour")
  .agg(
    avg("speed").as("avg_speed"),
    count("*").as("count")
  )
  .cache()

// 非热点数据处理(相同的逻辑)
val cold_result = cold_data_repartitioned
  .groupBy("region_id", "road_id", "hour")
  .agg(
    avg("speed").as("avg_speed"),
    count("*").as("count")
  )
  .cache()

步骤 5:合并结果

// 使用 unionByName 合并两个 DataFrame(确保列顺序相同)
val final_result = hot_result
  .unionByName(cold_result)
  .cache()

// 触发计算
final_result.count()  // 这时会真正执行

// 释放缓存
hot_data_repartitioned.unpersist()
cold_data_repartitioned.unpersist()
hot_result.unpersist()
cold_result.unpersist()

3.3 两阶段 Join 的工作原理

【处理流程图】

原始数据(10 亿行)
  │
  ├─→ Hot Data(Central:1 亿行)
  │      │
  │      └─→ Repartition 到 100 个分区
  │           │
  │           ├─→ Partition 1:100 万行 Central
  │           ├─→ Partition 2:100 万行 Central
  │           ├─→ ...
  │           └─→ Partition 100:100 万行 Central
  │
  ├─→ Cold Data(East/West/North/South:9 亿行)
  │      │
  │      └─→ Repartition 到 50 个分区
  │           │
  │           ├─→ Partition 1-50:均衡分布
  │
  ├─→ 分别处理
  │      │
  │      Hot:100 个 Task 并行处理(每个 Task 1-2 GB 内存)
  │      Cold:50 个 Task 并行处理
  │
  └─→ 合并结果

关键优势:

  • ✅ 无内存溢出:每个分区数据量可控
  • ✅ 充分并行化:100 个 Task 同时处理 Central 数据
  • ✅ 完全准确:没有丢失任何数据

四、性能对比与优化结果

4.1 性能指标

指标 优化前 优化后 改进
处理时间 60 分钟 7 分钟 ✅ 快 8.6 倍
内存溢出 频繁发生 零发生 ✅ 100% 改进
最大内存占用 50GB 3-5GB ✅ 节省 90%
内存分布 严重不均 均衡 ✅ 完全均衡
CPU 利用率 50%(瓶颈阶段) 95% ✅ 提升 90%

4.2 执行时间对比

优化前(60 分钟):
  Stage 1(读取数据):2 分钟
  Stage 2(Shuffle):45 分钟 ← 倾斜导致的等待
  Stage 3(聚合):12 分钟
  Stage 4(输出):1 分钟

优化后(7 分钟):
  Stage 1(读取 + 分离):1 分钟
  Stage 2(Hot 数据处理):4 分钟
  Stage 3(Cold 数据处理):2 分钟
  Stage 4(合并 + 输出):0.5 分钟

4.3 内存占用对比

优化前(每个 Executor 独立):
  ┌─────────────────────────────────┐
  │ Executor 1-19:2-5GB(利用不足) │
  ├─────────────────────────────────┤
  │ Executor 20:50GB(OOM)        │
  └─────────────────────────────────┘

优化后(内存均衡分布):
  ┌─────────────────────────────────┐
  │ Executor 1-20:3-4GB(充分利用) │
  │ Executor 1-20:3-4GB(充分利用) │
  │ ...(所有 Executor 都在最优范围内)
  └─────────────────────────────────┘

五、除数据倾斜外的其他优化方法

5.1 Shuffle 优化

美团·点评的交互式用户行为分析系统通过将 Spark 作业的 Shuffle 操作提前到 Hive ETL 中,从而让 Spark 直接使用预处理的 Hive 中间表,大幅度提升了性能,将部分作业的性能提升了 6 倍以上。

减少 Shuffle 次数

// ❌ 不好的做法:多次 Shuffle
val result1 = data.groupBy("key").count()
val result2 = data.groupBy("key").sum("value")
val result3 = data1.join(data2, "key")  // 每一步都会 Shuffle

// ✅ 好的做法:一次操作完成
val result = data.groupBy("key")
  .agg(
    count("*").as("count"),
    sum("value").as("total_value")
  )
val result_with_join = result
  .join(data2, "key")

使用预聚合算子

// ❌ 不好:groupByKey 产生大量 Shuffle
val rdd_grouped = data.groupByKey()  // 所有数据都要网络传输

// ✅ 好:reduceByKey 先本地聚合再网络传输
val rdd_reduced = data.reduceByKey(_ + _)  // 数据量大幅减少

// ✅ 最好:aggregateByKey 支持自定义聚合函数
val rdd_agg = data.aggregateByKey(0)(
  (acc, v) => acc + v,
  (acc1, acc2) => acc1 + acc2
)

groupByKey 算子是一个低效的算子,其会产生大量的 Shuffle。其功能可以用 reduceByKey 和 aggregateByKey 代替,通过在每个 partition 内部先做一次数据的合并操作,大大减少了 Shuffle 的数据量。

5.2 内存优化

配置合理的 Executor 内存

# 启动 Spark 作业时的内存配置
spark-submit \
  --executor-memory 8g \           # 每个 Executor 的内存
  --driver-memory 4g \              # Driver 内存
  --executor-cores 4 \              # 每个 Executor 的核心数
  --num-executors 20 \              # Executor 数量
  application.jar

内存分配公式:

单个 Task 所需内存 = 总数据量 / 分区数 / Executor 核心数

推荐设置:
  executor-memory = 单个 Task 内存 × executor-cores × 1.5(留余量)

在我的项目中:

总数据量:10 GB(内存中的压缩大小)
分区数:100(优化后)
Executor 核心数:4

单个 Task 内存 = 10GB / 100 / 4 = 25MB
Executor 内存 = 25MB × 4 × 1.5 = 150MB ← 完全足够

最终配置:executor-memory = 8GB(留有充足的余量)

启用 Kryo 序列化

Kryo 序列化是一种较新的格式,可带来比 Java 更快、更紧凑的序列化。

val conf = new SparkConf()
  .setAppName("OptimizedTraffic")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.unsafe", "true")
  .set("spark.kryoserializer.buffer.max", "256m")

val spark = SparkSession.builder()
  .config(conf)
  .getOrCreate()

效果: 序列化速度快 10 倍,内存占用减少 50%

5.3 缓存与持久化优化

缓存策略

// 根据内存情况选择缓存级别
val df = spark.read.parquet("data")

// 1. 内存充足时:完全缓存到内存
df.persist(StorageLevel.MEMORY_ONLY)

// 2. 内存紧张时:内存不足溢出到磁盘
df.persist(StorageLevel.MEMORY_AND_DISK)  // ✅ 推荐

// 3. 内存极其紧张时:只缓存到磁盘
df.persist(StorageLevel.DISK_ONLY)

// 4. 高可用场景:复制两份缓存
df.persist(StorageLevel.MEMORY_AND_DISK_2)

及时释放缓存

try {
  val result = df.cache()
  // 处理数据
  val output = expensiveOperation(result)
  output.collect()
} finally {
  // 用完立即释放,避免长期占用内存
  df.unpersist()
}

5.4 文件格式优化

在进行大数据作业时,点查、宽表查询、大表 Join 操作相对频繁,推荐使用 Parquet + Gzip、ORC + Zlib 的组合方式,这样的组合方式兼顾了列式存储与可分割的情况。

// ❌ 不好:使用 CSV(行式存储,不可分割)
val csv_df = spark.read.csv("data.csv")
// 问题:无法并行读取,查询需要全表扫描

// ✅ 好:使用 Parquet(列式存储)
val parquet_df = spark.read.parquet("data.parquet")
val orc_df = spark.read.orc("data.orc")

// 保存时选择合适的压缩
parquet_df.write
  .mode("overwrite")
  .option("compression", "snappy")  // 快速压缩
  .parquet("output")

// 对于需要高压缩率的场景
parquet_df.write
  .mode("overwrite")
  .option("compression", "gzip")  // 高压缩率
  .parquet("output")

格式对比:

          文件大小    查询速度    压缩率
CSV       最大        最慢       最低
Parquet   中等        快         中等
ORC       最小        最快       最高

推荐:Parquet + Snappy(平衡性能和压缩率)

5.5 代码级优化

使用 DataFrame 而不是 RDD

// ❌ 慢:RDD 方式(无优化)
val result = sc.textFile("data.txt")
  .map(line => (line.split(",")(0), 1))
  .reduceByKey(_ + _)
  .collect()

// ✅ 快:DataFrame 方式(Catalyst 优化器优化)
val df = spark.read.csv("data.csv")
val result = df.groupBy(col("id"))
  .count()
  .collect()

// 性能对比:DataFrame 快 10-100 倍!
// 为什么?Catalyst 优化器会自动优化执行计划

及早过滤数据

// ❌ 不好:先读全部数据再过滤
val df = spark.read.parquet("huge_table")
val result = df
  .filter(col("date") > "2024-01-01")
  .groupBy("user_id")
  .count()

// ✅ 好:用分区字段过滤(分区剪裁)
val df = spark.read.parquet("huge_table")
  .where(col("date") > "2024-01-01")  // 谓词下推
  .groupBy("user_id")
  .count()

// ✅ 最好:直接指定分区
val df = spark.read
  .parquet("huge_table/date=2024-01")  // 仅读这个分区
  .groupBy("user_id")
  .count()

// 性能差异:100GB → 10GB 数据量(减少 10 倍!)

避免笛卡尔积

// ❌ 极其不好:隐含的笛卡尔积
val df1 = spark.read.parquet("table1")  // 100万行
val df2 = spark.read.parquet("table2")  // 100万行
val bad_result = df1.join(df2)  // 结果 10^12 行!OOM!

// ✅ 正确做法:指定 Join 条件
val result = df1.join(df2, df1("product_id") === df2("id"))

六、监控与调试

6.1 Spark UI 的关键指标

当优化作业时,重点关注 Spark UI 的以下部分:

1. Executors 标签页

重点查看:
✓ 每个 Executor 的内存占用是否均衡
✓ 是否有某个 Executor 的内存接近 100%(即将 OOM)
✓ GC 时间是否过长

2. Tasks 标签页

重点查看:
✓ Task 的执行时间分布(是否某些 Task 特别慢)
✓ 是否有 Task 失败和重试
✓ Shuffle Read/Write 的大小是否均衡

3. DAG 可视化

重点查看:
✓ 是否有不必要的 Stage
✓ 各 Stage 的输入输出大小
✓ 是否可以优化掉某些 Shuffle

6.2 常见问题的诊断

问题                原因                      解决方案
─────────────────────────────────────────────────────────
内存溢出(OOM)     数据倾斜或内存配置小      • 数据倾斜处理
                                          • 增加 executor-memory
                                          • 增加分区数

某个 Task 超慢    计算倾斜或数据倾斜        • 优化计算逻辑
                                          • 两阶段 Join
                                          • 加盐处理

GC 时间过长      内存不足导致频繁 GC       • 减少 executor-cores
                                          • 增加 executor-memory
                                          • 启用 Kryo 序列化

Shuffle 瓶颈     Shuffle 数据量大          • 减少 Shuffle 次数
                                          • 预聚合优化
                                          • 使用 Hive 预处理

磁盘 I/O 瓶颈    文件格式不优化           • 改用 Parquet/ORC
                                          • 启用压缩
                                          • 合并小文件

七、完整优化检查清单

在优化任何 Spark 作业时,按照以下清单逐项检查:

第一阶段:问题诊断

  • [ ] 运行作业,记录执行时间、内存占用、是否 OOM
  • [ ] 打开 Spark UI,查看 Executors 标签页,判断是否有倾斜
  • [ ] 执行 groupBy(key).count() 分析数据分布
  • [ ] 查看 DAG,判断是否有不必要的 Shuffle

第二阶段:数据倾斜优化

如果有数据倾斜(某个 Key 占 >20% 数据):

  • [ ] 识别热点 Key
  • [ ] 评估是否可以过滤掉倾斜数据
  • [ ] 如果不能过滤,采用两阶段 Join 或加盐处理
  • [ ] 验证优化效果(内存均衡、性能提升)

第三阶段:Shuffle 优化

  • [ ] 检查是否有不必要的 Shuffle 操作
  • [ ] 用 reduceByKey 替换 groupByKey
  • [ ] 考虑将 Shuffle 提前到 Hive/ETL 层
  • [ ] 调整 spark.sql.shuffle.partitions 参数

第四阶段:内存和序列化优化

  • [ ] 启用 Kryo 序列化
  • [ ] 调整 executor-memory 和 executor-cores
  • [ ] 及时释放缓存
  • [ ] 检查是否有内存泄漏

第五阶段:文件格式优化

  • [ ] 改用 Parquet 或 ORC
  • [ ] 启用压缩(Snappy 或 Gzip)
  • [ ] 避免小文件(合并小文件)
  • [ ] 合理设置分区

第六阶段:代码优化

  • [ ] 改用 DataFrame/SQL 而不是 RDD
  • [ ] 及早过滤数据(谓词下推)
  • [ ] 避免笛卡尔积
  • [ ] 使用高效的 API(如 mapPartitions 而不是 map)

第七阶段:验证与监控

  • [ ] 对比优化前后的性能指标
  • [ ] 确保结果的准确性
  • [ ] 建立监控告警
  • [ ] 在生产环境验证

八、性能优化公式

任务计算总时间 = 总计算时间 / 并行度,为了提升性能,可以通过缓存避免重复计算(减少总计算时间)和通过优化 Shuffle、提高并行度等方式提升并行化效率。

总执行时间公式:

T_total = T_computation / parallelism + T_communication + T_io

其中:
  T_computation:总的计算时间(可通过缓存减少)
  parallelism:并行度(可通过增加分区数提高)
  T_communication:网络通信时间(Shuffle)
  T_io:磁盘 I/O 时间

在我的项目中的应用:

优化前:
  T_computation = 100 分钟(重复计算)
  parallelism = 200 个分区
  T_communication = 50 分钟(Shuffle)
  T_io = 10 分钟
  T_total ≈ 100/200 + 50 + 10 = 60 分钟 ✗

优化后:
  T_computation = 50 分钟(通过缓存避免重复)
  parallelism = 1000 个分区(分散倾斜数据)
  T_communication = 2 分钟(减少 Shuffle,两阶段处理)
  T_io = 1 分钟(改用 Parquet)
  T_total ≈ 50/1000 + 2 + 1 = 3 分钟 → 实际 7 分钟(包含其他开销)

九、特殊场景的优化建议

9.1 迭代计算场景

// 案例:机器学习中的梯度下降
var model = initial_model

for (i <- 0 until iterations) {
  // ❌ 不好:每次迭代都重新计算
  val gradient = data.map(compute_gradient)
    .reduce(_ + _)
  
  // ✅ 好:缓存数据,避免重复读取
  if (i == 0) {
    data.cache()  // 第一次迭代时缓存
  }
  val gradient = data.map(compute_gradient)
    .reduce(_ + _)
  
  model = update_model(model, gradient)
}

data.unpersist()

性能提升: 对于 10 次迭代,性能可提升 10 倍!

9.2 流式处理场景

对于 Spark Streaming,优化重点不同:

val ssc = new StreamingContext(sc, Seconds(10))

dstream.foreachRDD { rdd =>
  if (!rdd.isEmpty) {
    // ❌ 不好:每个批次都进行 Shuffle Join
    val result = rdd.join(large_reference)  // 每 10 秒一次 Shuffle
    
    // ✅ 好:使用 Broadcast 广播大表
    val bc_reference = sc.broadcast(large_reference.collect())
    val result = rdd.map { case (k, v) =>
      val ref = bc_reference.value.find(_ matches k)
      (k, v, ref)
    }
  }
}

9.3 多表 Join 场景

// ❌ 不好:多个 Join 会产生多次 Shuffle
val result = table1
  .join(table2, "id")
  .join(table3, "id")
  .join(table4, "id")  // 4 次 Shuffle!

// ✅ 好:使用 Broadcast 小表,避免 Shuffle
val result = table1
  .join(broadcast(table2), "id")
  .join(broadcast(table3), "id")
  .join(broadcast(table4), "id")  // 只有第一次 Shuffle

十、实战总结与经验教训

10.1 核心要点

在我的 10 亿行数据处理中,最有效的优化策略是:

  1. 识别瓶颈(最重要) - 80% 的时间花在这里

    • 不是盲目优化,而是精确诊断
    • 使用 Spark UI 找到真正的瓶颈
  2. 两阶段 Join(效果最好) - 性能提升 8.6 倍

    • 应对数据倾斜的绝杀技
    • 代码复杂度增加,但 ROI 极高
  3. 减少 Shuffle(实施最快) - 性能提升 2-3 倍

    • 简单改变,但效果显著
    • 用 reduceByKey 替换 groupByKey
  4. 文件格式优化(维护最省心) - 性能提升 2-5 倍

    • 一次配置,长期受益
    • Parquet + Snappy 是黄金组合

10.2 避免的陷阱

常见陷阱                解决方案
─────────────────────────────────────
盲目增加内存           根因可能不是内存不足,而是倾斜
                      
过度优化               优化的边际收益递减,不要过度
                      
忽视数据特点           不同的数据分布需要不同的优化方案
                      
只关注总时间           关注瓶颈 Stage 的时间,而不是总时间
                      
频繁修改参数           系统化地修改,逐次对比,不要一次改多个
                      
忘记释放缓存           及时调用 unpersist(),否则内存泄漏

10.3 后续思考

如果要进一步优化,可以考虑:

  1. 动态规划倾斜阈值 - 不同数据集的倾斜程度不同
  2. 机器学习优化 - 根据历史数据自动推荐参数
  3. 分布式缓存 - 进一步减少网络传输
  4. GPU 加速 - 对某些计算密集型操作

十一、参考资源与扩展阅读

官方文档

业界实践

  • 美团点评技术团队的 Spark 性能优化指南
  • 个推的性能优化经验总结

相关概念

  • RDD vs DataFrame vs Dataset
  • DAG 和执行计划
  • Catalyst 优化器
  • Tungsten 项目

总结

通过系统化的分析和优化,我成功将一个 10 亿行数据的 Spark 作业从 60 分钟 优化到 7 分钟,性能提升 8.6 倍

核心经验是:

  1. 数据倾斜是最常见的瓶颈 - 一定要诊断清楚
  2. 两阶段 Join 是对付极端倾斜的绝杀技 - 值得学习和掌握
  3. 没有通用方案,只有因地制宜的优化 - 要理解 Spark 的工作原理
  4. 监控和迭代很重要 - 不是一次性的优化

希望这篇文章能帮助你在 Spark 优化的道路上少走弯路!


如果这篇文章对你有帮助,欢迎分享给更多的大数据工程师。 💪

有问题或建议?欢迎在评论区讨论!