Laurenfrost's Blog.
今朝有酒今朝醉,明日愁来明日愁。
From Databricks to Dataproc
Spark 迁移之痛
Contents
成为班逼的第 5 个年头马上就要过去了,当年的萌新彻底成为了老油条。期间先是在 Azure 上折腾 Databricks 环境,后面一转 AWS 云架构开发。到了今年年初,老板大手一挥我干了大半年的 AWS 迁移就停了,开始 allin GCP。考了两个云的 Cloud Architect Professional,结果屁也用不上,属于是三大云厂商的浑水都趟了个遍。这个从入职开始就挂在我头上 Spark 黑锅也跟着我转战南北,直到前几个月开始的迁移+降本增效任务,这个 Spark 数据处理项目走到了 GCP 的 Dataproc 面前。
Dataproc 作为 GCP 提供的 ETL 环境,其实提供了两种解法:Dataproc Serverless 和 Dataproc Cluster。后者相当于传统的 Spark 集群构建思路,选择合适的 Compute Engine 实例在上面安装 Spark 之类的环境然后自己往里提交任务,GCP 另外再提供一套正常类似于 EC2 的 AutoScaling 机制协助你的集群扩容。
| 服务 | Dataproc Serverless | Dataproc Cluster |
|---|---|---|
| 处理框架 | Spark | Spark/Hive/Flink/Trino/Kafka |
| 无服务器 | 是 | 否 |
| 启动时间 | 60 秒 | 90 秒 |
| 基础架构控制 | 否 | 是 |
| 资源管理 | 基于 Spark | 基于 YARN |
| GPU 支持 | 是 | 是 |
| 交互式会话 | 是 | 否 |
| 自定义容器 | 是 | 否 |
| 直接 SSH 到虚拟机 | 否 | 是 |
| Java 版本 | Java 17、11 | 支持过往版本 |
** *这就是 2025 年 12 月时点上GCP 官方文档里的比较
Dataproc Serverless 就不似这种胭脂俗粉,你只要在随便哪个 Juypter Notebook 里 import 一下它的包,然后创建一个 SparkSession,云上自动就会为你分配好一个 Spark 计算资源。一看到 “Serverless” 我这个被云厂商调教成的巴普洛夫的狗就汪汪叫了:啊!彻底摒弃复杂的计算资源管理,计算 Worker 节点的大小还有它们之间的通信也不用操心了,一切都交给伟大的 Google,他会用神奇的魔法消弭一切伤痛。
# pip install dataproc_spark_connect
# 题外话:当然如果你直接在 BigQuery Notebook 的界面里直接运行就不用 install 了
# 不过那个环境本质是 Vertex Colab Enterprice 提供的 Notebook Runtime
from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
import pyspark.sql.functions as f
session = Session()
# 创建 Spark Session,就像往常一样
spark = (
DataprocSparkSession.builder
.appName("APP_NAME")
.dataprocSessionConfig(session)
.getOrCreate()
)
# 然后用就完了
df = spark.read.format("bigquery").option("table", "bigquery-public-data.samples.shakespeare").load()
df.show()
是的,当你执行完上面代码中 DataprocSparkSession.builder.getOrCreate() 的部分后,Jupyter 界面上会出现一个缓慢行走的进度条,告诉你它正在创建一个 DataprocSparkSession。

是的,这个 Session 是实际存在于 GCP 的计算资源。每一次执行 getOrCreate() 都会创建一个 Session,然后你可以在 GCP 的管理界面看到它们:

在这个界面上点开你刚刚创建的 Session,你可以检视这个 Spark Session 的一切设置。 点开 Monitoring 你可以看到各个 Exector 的工作情况。 你甚至可以通过 Google Cloud Logging Query 查询整个项目的日志信息。

哇!这是现代计算设施的力量!这是云原生的胜利!荣誉归于伟大的 Google!
现在我要开始我的迁移了!把我那乱七八糟的 Notebooks 从昂贵的 Databricks 环境中抢救出来,让它们在 GCP 的自由空间中撒欢吧!
Dataproc Serverless 的苦痛
一切都是如此美好,就像刚刚结识的女友一样。等过了一开始的兴奋劲头,就会不断认识到她的各种缺点:Dataproc Serverless 什么都好,就是不好用。
SparkSession 启动缓慢,而且会因为网络问题失败
官方文档上说 “启动时间 60 秒”,实际上启动时间远不止这些。如果一切顺利的话它才能在 60 秒内搞定,万一中间出了什么岔子:比如恰好当前可用区里资源不够用(而这种情况经常发生),它就不得不去当前 Region 的其他可用区里寻找资源。有时候三个可用区里全都没有资源,那这个时候就只能干瞪眼了。
Dataproc 在设计上出于安全考虑,它是不对外直接暴露 Spark 的端口的。GCP 自家 Compute Engine 就有这样的操作:当你要访问的计算实例没有公共 IP,或者没有设置 SSH 密钥无法直接通过 SSH 访问的时候,可以简单的通过 gcloud 的命令行操作连接到 VM 内:
gcloud compute ssh <instance-id> --project <project-id> --region <region-id>
而且这个 Python 库本质是创建了一个代理,把本地 PySpark 客户端的网络请求送到 GCP 中,然后 GCP 再把数据包交给你刚创建的 Session。
# https://github.com/GoogleCloudDataproc/dataproc-spark-connect-python/blob/9228492bb4aefd30c3cc1b27948aabcc07a729c0/google/cloud/dataproc_spark_connect/client/proxy.py#L62
def connect_tcp_bridge(hostname):
path = "tcp-over-websocket-bridge/35218cb7-1201-4940-89e8-48d8f03fed96"
creds, _ = googleauth.default(
scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
creds.refresh(googleauthrequests.Request())
return websocketclient.connect(
f"wss://{hostname}/{path}",
additional_headers={"Authorization": f"Bearer {creds.token}"},
open_timeout=30,
)
在构建上面的 tcp bridge 的同时,会在本地 127.0.0.1 开一个端口:
# https://github.com/GoogleCloudDataproc/dataproc-spark-connect-python/blob/9228492bb4aefd30c3cc1b27948aabcc07a729c0/google/cloud/dataproc_spark_connect/client/proxy.py#L217
def _run(self, s):
with socket.create_server(("127.0.0.1", self._port)) as frontend_socket:
if self._port == 0:
self._port = frontend_socket.getsockname()[1]
s.release()
while not self._killed:
conn, addr = frontend_socket.accept()
logger.debug(f"Accepted a connection from {addr}...")
self._conn_number += 1
threading.Thread(
target=forward_connection,
args=[self._conn_number, conn, addr, self._target_host],
daemon=True,
).start()
最后它再通过这个 proxy,尝试跟 Dataproc 构建 GRPC channel,而这也是正常 SparkSession 与 Cluster 建立连接的方式。
但不知道为什么这个 proxy 在本地创建的端口并不总能跟 GRPC channel 要连接的端口对上,最终 GRPC Connection Timeout。然后整个体验就变成:我成功创建了一个 DataprocSparkSession(因为创建云资源只需要访问 Google Cloud 的 Dataproc API),然后进度卡在连接到刚创建的 SparkSession 这一步。5 分钟后(默认的 GRPC Connection Timeout 时间),这一步就失败了。
失败也就罢了,关键是创建的这个 DataprocSparkSession 并没有随着连接构建失败自动终止,它还在继续花钱… 我理解因为这两部分完全是分开的:一个是云资源的操作,一个是对本地网络环境的调整,它们很难也不应该直接耦合在一起。只是作为用户这就很让人恼火了:如果我重新尝试运行这个代码块,它不会复用已经存在的 DataprocSparkSession,而是重新再创建一个新的。明明这个方法叫 getOrCreate(),结果完全是 noGetOnlyCreate()。
感觉这是库本身的毛病,作为上游库我也拿他没有办法。
依赖库很难安装,装上去了也不会共享给 Executor
在解决网络问题之后,现在需要为我的机器学习脚本准备运行环境。为了减少迁移对业务的影响,我需要尽量把之前在 Databricks 上运行的机器学习库搬过来。这时候问题就来了,Dataproc Serverless 并没有像 Databricks 的依赖库配置界面,我没有办法直接在某个管理界面直接输入 某个库的 maven 坐标一键安装。

为了解决这个问题,我尝试在创建 SparkSession 的时候通过 Spark config 来让 spark 去 maven repo 自己安装 jars:
spark = DataprocSparkSession.builder \
.appName("testApp") \
.dataprocSessionConfig(session) \
.config("spark.jars.packages",
"org.postgresql:postgresql:42.7.7,"
"com.microsoft.azure:synapseml_2.12:1.0.13,"
"org.apache.sedona:sedona-spark-shaded-3.5_2.12:1.7.2,"
"org.datasyslab:geotools-wrapper:1.7.2-28.5") \
.getOrCreate()
通过这种方式的确可以成功创建 SparkSession,但到实际执行的时候就出问题了:Executor 会报错说找不到相应的 jar file。
但是当我们查看 jar file 的位置的时候它们又确实存在:
spark.conf.get("spark.jars")
# file:///usr/lib/spark/connector/spark-connect_2.12-3.5.1.jar,file:///home/spark/.ivy2/jars/org.postgresql_postgresql-42.7.3.jar,file:///home/spark/.ivy2/jars/com.microsoft.azure_synapseml_2.12-1.0.13.jar,file:///home/spark/.ivy2/jars/org.apache.sedona_sedona-spark-shaded-3.5_2.12-1.7.2.jar,file:///home/spark/.ivy2/jars/org.datasyslab_geotools-wrapper-1.7.2-28.5.jar,file:///home/spark/.ivy2/jars/org.checkerframework_checker-qual-3.42.0.jar,file:///home/spark/.ivy2/jars/com.microsoft.azure_synapseml-core_2.12-1.0.13.jar,file:///home/spark/.ivy2/jars/com.microsoft.azure_synapseml-deep-learning_2.12-1.0.13.jar,file:///home/spark/.ivy2/jars/com.microsoft.azure_synapseml-cognitive_2.12-1.0.13.jar,file:///home/spark/.ivy2/jars/com.microsoft.azure_synapseml-vw_2.12-1.0.13.jar,file:///home/spark/.ivy2/jars/com.microsoft.azure_synapseml-lightgbm_2.12-1.0.13.jar,file:///home/spark/.ivy2/jars/...后续省略
这是因为 Driver 获取的 jar 不会自动传给 Executor。实际执行 spark.read 的时候,就会发生下列问题:
df = spark.read \
.format("jdbc") \
.option("url", f"jdbc:postgresql://{host}:{port}/{database}") \
.option("driver", "org.postgresql.Driver") \
.option("user", username) \
.option("password", password) \
.option("dbtable", source_table) \
.load()
# SparkConnectGrpcException: (org.apache.spark.SparkException) Job aborted due to stage failure: Task 0 in stage 4.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4.0 (TID 19) (10.146.15.230 executor 0): java.lang.ClassNotFoundException: org.postgresql.Driver
# at org.apache.spark.executor.ExecutorClassLoader.findClass(ExecutorClassLoader.scala:124)
# at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592)
# at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
# at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
# at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1(JdbcDialects.scala:157)
# at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1$adapted(JdbcDialects.scala:156)
# at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:229)
Docker?可能也不是一个好的选择
通过阅读 GCP 的官方文档,我们可以看到他提供了一个选项,可以让我们自己构建一个 Docker Image 提交上去运行。
但根据文档的描述,这个用法只适用于提交编译好的 jars 作为一个完整的任务 batch 执行的模式,并不支持在 Jupyter Notebook 上互动的操作:
gcloud dataproc batches submit spark \
--container-image=gcr.io/my-project-id/my-image:1.0.1 \
--region=<region-id> \
--jars=file:///opt/spark/jars/spark-examples.jar \
--class=org.apache.spark.examples.SparkPi \
-- <arguments>
对于需要反复测试评估的机器学习应用的场景,这样的简单功能无法满足需求。
Dataproc Cluster 的毛病
我受够了这一切。经过两三周的尝试后,我决定放弃 Serverless。也许 Cluster 没有那么优雅,但至少一切都在用户的掌控之中,不是吗?
哈哈
我可能是被 Databricks 惯坏了。
首先如何访问 Spark?
Cluster 自带的 Jupyter 难用
在创建 Dataproc Cluster 的界面里可以选择安装 Jupyter 模组,配合上面默认勾选的“Enable component gateway”,可以把原本只能内部访问的 Jupyter/Jupyter Lab 界面暴露出来,在浏览器上就可以直接访问。

但是它就真的只是最普通的 Jupyter Notebook,没有任何 UI 优化自动补全。而且它自己不总是能连接到 Cluster 里的 Kernel,并不总是能正确建立 Spark Session。原因不明。
local 的 Jupyter 没法直接访问
既然如此,我也不是必须使用它提供的 Jupyter Notebook,我本地的 Notebook 按理来说也可以使用。默认 Spark Connect
spark.remote的端口在15002,只要连接到创建的 Cluster 中的 Spark 服务的端口就行,就像这样:from pyspark.sql import SparkSession spark = SparkSession.builder.remote("sc://<dataproc-cluster-ip>:15002").getOrCreate()问题在于 Spark Connect 本身是没有任何认证逻辑的,如果把 Spark Connect 端口直接暴露到公网的话,那就基本和赛博裸奔没有区别。
出于安全考虑,Dataproc Cluster 默认只分配 Internal IP 地址。在这种情况下还想连接过去的话,最合理的方式就是通过 gcloud 的命令行操作连接到 VM 内产生一个 SSH 代理。
然后一切的问题就回到一开始的网络连接问题。
当然网络的问题可以通过网络的方法解决。虽然没有实际验证过,但理论上在同一个 VPC 中创建一个安装了 OpenVPN Server 的实例作为跳板,同时配置好相应的 security group 确保 “互联网 - 跳板 VPN - Dataproc” 的整个链路的畅通。这样只要在自己的本地配置好 VPN,就完全可以实现到内网间的 Spark Remote 连接。
但这也太蠢了。
Jupyter Kernel 莫名其妙断联
目前我选择的方案是使用 Vertex AI Workbench 提供的 Jupyter Notebook 实例。它的 Jupyter Lab 默认安装了 Dataproc 相关的插件,可以自动识别到相同 project 中的 Dataproc Cluster,然后自动建立到该 Cluster 环境的 kernel。这一点还是很方便的。
不过用了一段时间发现 kernel 偶尔会突然毫无征兆的挂掉,没有什么原因,可能单纯只是网络波动(?)在着急推进项目进度的时节令人加倍烦躁。
但目前为止这是兼顾各方面问题的最合理的解决方案了,忍了。
如何安装依赖库
Dataproc Cluster 依然没有像 Databricks 一样美丽的依赖库安装界面。在创建 Cluster 的时候有一个选项 Initialization actions 可以指定 Cluster 初始化时执行的脚本,只要往这个脚本里写入安装依赖库的命令就可以解决问题,尽管并不直观。

如图所示这个脚本需要上传到 GCS 才行(所以创建 Dataproc Cluster 的 IAM Role 需要相应的 GCS 访问权限)。
#!/bin/bash
set -e -x
apt-get update -qq
apt-get install -y -qq maven
cat <<'EOF' > /tmp/temppom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.dataproc.deps</groupId>
<artifactId>dataproc-deps</artifactId>
<version>1.0</version>
<dependencies>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>synapseml_2.12</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>synapseml-lightgbm_2.12</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.sedona</groupId>
<artifactId>sedona-spark-shaded-3.5_2.12</artifactId>
<version>1.8.0</version>
</dependency>
<dependency>
<groupId>org.datasyslab</groupId>
<artifactId>geotools-wrapper</artifactId>
<version>1.8.0-33.1</version>
</dependency>
</dependencies>
</project>
EOF
mvn -f /tmp/temppom.xml dependency:copy-dependencies \
-DoutputDirectory=/usr/lib/spark/jars \
-DexcludeGroupIds=org.scala-lang,org.apache.spark,org.apache.hadoop
rm /tmp/temppom.xml
echo "All deps have been installed to /usr/lib/spark/jars now!"
按照上述思路生成一个 pom.xml 文件,然后调用 maven 命令安装它就好了。唯一需要注意的点就是必须排除 scala-lang 之类的基础依赖,否则容易导致运行环境冲突,进而引发各种神秘问题。
磁盘空间需要一开始自己分配,后续无法添加
磁盘空间这个问题也是从 Databricks 迁移过来之后才意识到的问题。按照 Spark 本身的机制,默认会把计算相关的所有数据保存在内存里,如果数据量超出了给 Executor 分配的内存空间大小,就会触发将数据缓存到磁盘中的逻辑。这个时候如果磁盘空间也不够的话,就会触发 Fatal Error,整个计算任务就会被废弃。
Databricks 是不需要考虑磁盘空间的问题的,它的集群配置页面也没用相关的设置。在 Databricks 里,系统会动态分配集群的磁盘空间,用多少就加多少。用户唯一需要关心的就是合理调整 Executor,避免数据溢出/数据倾斜导致的计算效率下降。整个计算任务是不会直接 fail 掉的。
但 Dataproc Cluster 的设计思想完全不同,它面向的是更资深的用户:用户在创建集群的阶段就需要考虑好实例的类型和大小、挂载的磁盘类型、挂载的磁盘空间大小。一旦创建好了之后就不能再更改,如需更改,就删掉当前的 Cluster 然后再创建一个。
这样一来对于新用户来说就不得不反复尝试不同的配置组合,直到确定好全部的参数。要是对相关服务不够熟悉的话那就是彻底的灾难体验。
项目代码管理的迷思
上面的致命问题都解决之后,终于可以在业务中好好使用 Spark 了。随之而来的是脚本代码管理的问题:在公司里大家需要团队协作,代码最好能在一个持久性的代码管理工具中管理,而不是把脚本扔得到处都是。
在 Databricks 中这个逻辑就非常自然:计算集群和脚本代码本身是解耦的。在 Databricks 的网页界面上就可以编辑、管理不同脚本和数据,同时它集成了多用户访问和权限管理。但对于 Dataproc Cluster 来说它就仅仅只是计算集群。要是你使用 Dataproc Cluster 上集成的 Jupyter 的话就更糟糕:没有 Cluster 就没有办法查看和修改代码。进一步从设计思想上来说 Dataproc Cluster 就是可以随便创建随便删除的,每次新建 Cluster 就是一个全新的存储空间;删除 Cluster 之后里面的 Jupyter Notebook 也就相应地消失了。
这个问题不是没有办法解决。借助 jupyterlab-git 插件,就可以让 Jupyter Notebook 拥有 git 版本管理功能。如果进一步配置 ssh-key,就可以接入 GitHub,把所有的代码同步到远程仓库里。
现在的问题是,这个 ssh-key 是开发者自己的 ssh-key,配置在开发者自己的 GitHub 的设置里。一个公司团队的资源,却只能通过某个个人的名义同步代码,这在组织的管理实践不太恰当。假如这个开发者离开了团队,随着这个开发者的访问权限失效,ssh-key 配置也同时失效了,这个时候团队开发进程也就不得不暂停。
题外话:bigquery 的臭毛病
在整个服务迁移的过程中,我也尝试了一下 BigQuery。
BigQuery 这个服务看起来眉清目秀,算起来飞一般快。而且按数据量收费,1TB 数据 7.5 美刀,讲道理不是特别贵。
但你休想从 GCP 这里薅羊毛!如果你的 SQL 占用的 CPU 时间过长,比如大量的 Geometry Polygon 的相交查询,它会直接停止计算:
Query exceeded resource limits. This query used 2400379 CPU seconds but
would charge only 2940M Analysis bytes. This exceeds the ratio supported
by the on-demand pricing model. Please consider moving this workload to
a capacity-based pricing model, which does not have this limit. 2400379
CPU seconds were used, and this query must use less than 752600 CPU seconds.
这种时候就必须把表拆分成一个个的小表然后分开计算了。
Hello? Internal Error
当讲道理这些都还能接受。都是生意嘛,反正花的是公司的钱,我这边只要有能说得过去的产出一切都好说。
但如果我报错 Internal Error 阁下又当如何应对?
An internal error occurred and the request could not be completed. This
is usually caused by a transient issue. Retrying the job with back-off as
described in the BigQuery SLA should solve the problem:
https://cloud.google.com/bigquery/sla. If the error continues to occur
please contact support at https://cloud.google.com/support. Error: 80038528
工单处理非常缓慢,是的,非常缓慢
当晚就有在线印度客服回答我的问题,非常热情,直到我给他展示我遇到的 Internal Error。然后他让我等邮件回复。两天后他们回复了:
Thank you for your patience as we continue to investigate your BigQuery ML model training issue. I’ve received an update from our internal team that I believe will be helpful.
Based on our analysis of the error logs, it appears the “internal error” you’re seeing is likely caused by memory constraints during the model training process. This can often happen with larger datasets or complex models.
To address this, we recommend that you please try running your BigQuery ML training job with a larger memory type.
We are confident that increasing the available memory for the job will help resolve this issue for you. Please do not hesitate to let us know if you have any further questions or if you require any assistance with this adjustment.
好的,让我重新用 “更大的内存” 重新训练我的 BigQuery ML。这他妈简直就是废话,我当然能猜到这是资源不够导致的,但你这是 BigQuery 诶! 你让我调整资源?我上哪去调整资源?你怎么不让我进数据机房搬设备插网线呢?
Thank you for your patience and understanding regarding your recent request.
We have consulted with our product engineering team regarding your concern, and they have confirmed that we will be increasing the training machine memory size as requested.
We will provide you with an update as soon as this change has been implemented. Please expect an update no later than 10:30 PM IST (UTC+5:30) xx-xxx-2025.
回复的邮件说让我等下一封邮件,一定会给我调整。然后下一封也是一模一样的内容。一共延期了四次,每次都说让我等三四天。前前后后总共折腾了 3 周多一点,最后终于调好了:
Thank you for your patience and understanding while we addressed this for you.
We have consulted with our product engineering team, and they have successfully increased the memory size for your training machine.
Please try running your process again at your convenience. Should you encounter any further issues, please do not hesitate to let us know, and we will be happy to assist you further.
Looking forward to your response.
哈哈哈,谢谢你们的辛勤工作,我早就放弃这个方案了。你们白调整了。
感想
每一次逃离 Databricks 环境的尝试,都是在进一步证明 Databricks 的伟大。
伟大,无需多言