news 2026/4/18 8:32:23

PyFlink JAR、Python 包、requirements、虚拟环境、模型文件,远程集群怎么一次搞定?

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
PyFlink JAR、Python 包、requirements、虚拟环境、模型文件,远程集群怎么一次搞定?

1. 先记住一条总原则:混用 DataStream + Table 时,用 DataStream API 配依赖

文档强调了一句非常关键的话:

如果一个 Job 里混用了 Python DataStream API 和 Python Table API,建议通过 DataStream API去指定依赖,这样两边都能生效。

也就是:

  • 纯 Table:table_env.get_config()/table_env.add_python_*
  • 混用:优先StreamExecutionEnvironmentadd_jars / add_python_file / set_python_requirements / add_python_archive / set_python_executable

2. JAR 依赖:pipeline.jars vs pipeline.classpaths vs add_jars vs add_classpaths

2.1 Table API 方式

A)pipeline.jars:上传到集群(最常用)

  • 只能file://本地路径
  • 会把 JAR上传到集群
table_env.get_config().set("pipeline.jars","file:///my/jar/path/connector.jar;file:///my/jar/path/json.jar")

Windows 示例(注意还是 file:///):

table_env.get_config().set("pipeline.jars","file:///E:/my/jar/path/connector.jar;file:///E:/my/jar/path/json.jar")

B)pipeline.classpaths:不上传,只加到 classpath(要求集群也能访问同路径)

  • 你必须保证 client、cluster 都能访问这些 URL(比如共享盘、同目录、分发好了)
table_env.get_config().set("pipeline.classpaths","file:///opt/flink/jars/connector.jar;file:///opt/flink/jars/json.jar")

一句话:

  • 你不想折腾分发:用pipeline.jars
  • 你已经把 jar 管理好并且集群路径一致:用pipeline.classpaths

2.2 DataStream API 方式(混用场景首选)

A)add_jars(...):上传到集群

env.add_jars("file:///my/jar/path/connector1.jar","file:///my/jar/path/connector2.jar")

B)add_classpaths(...):加到 client + cluster classpath(同样要求可达)

env.add_classpaths("file:///opt/flink/jars/connector1.jar","file:///opt/flink/jars/connector2.jar")

2.3 提交参数--jarfile的限制

  • 只支持一个 jar,所以多个依赖通常要求你自己打fat jar / uber jar

3. Python 依赖:三种层级(文件/目录、requirements、归档环境)

3.1 python.files / add_python_file:带“代码/包”到 PYTHONPATH

适合:

  • 你的 UDF 写在my_udf.py
  • 或者你有一坨自研包目录my_pkg/
  • 或者你打好了*.whl / *.egg(zip 本质)

Table API:

table_env.add_python_file("/path/to/my_udf.py")table_env.add_python_file("/path/to/my_pkg/")# 目录也可以

DataStream API:

env.add_python_file("/path/to/my_udf.py")env.add_python_file("/path/to/my_pkg/")

等价的还有:

  • 配置python.files
  • 提交参数-pyfs / --pyFiles

关键点:这些会被加到Python UDF worker 的 PYTHONPATH

3.2 requirements.txt / set_python_requirements:让集群 pip 安装第三方依赖

适合:

  • numpy/pandas/requests/sklearn 这种 pip 依赖
  • 你希望 Flink 在 worker 上自动安装

Table API:

table_env.set_python_requirements(requirements_file_path="/path/to/requirements.txt",requirements_cache_dir="cached_dir"# 可选)

DataStream API:

env.set_python_requirements(requirements_file_path="/path/to/requirements.txt",requirements_cache_dir="cached_dir")

离线安装(集群没网)怎么做?

文档给了关键命令:

pip download -d cached_dir -r requirements.txt --no-binary :all:

然后把这个cached_dir作为requirements_cache_dir传进去,Flink 会上传它用于离线安装。

硬要求(很容易忽视):

  • pip >= 20.3
  • setuptools >= 37.0.0
  • cached_dir 里的包必须匹配集群平台与 Python 版本(比如 manylinux、glibc、cp310/cp311)

等价的还有:

  • 配置python.requirements
  • 提交参数-pyreq / --pyRequirements

3.3 python.archives / add_python_archive:带“环境/数据/模型文件”并自动解压

适合:

  • 你要带模型、词典、数据文件
  • 你要带一个完整虚拟环境(venv/conda 打包)

Table API:

table_env.add_python_archive("/path/to/py_env.zip","myenv")

DataStream API:

env.add_python_archive("/path/to/py_env.zip","myenv")

在 UDF 里访问(相对路径):

defmy_udf():withopen("myenv/py_env/data/data.txt")asf:...

如果没写 target_dir:

table_env.add_python_archive("/path/to/py_env.zip")# UDF 内访问:open("py_env.zip/py_env/data/data.txt")

支持格式:

  • zip 系(zip/jar/whl/egg…)
  • tar 系(tar/tar.gz/tgz…)

注意:如果 archive 里是虚拟环境,一定要和集群平台一致。

4. Python 解释器:worker 端与 client 端是两回事

这是很多人线上翻车的根本原因:
client 侧需要 Python 来解析/编译 UDF;cluster 侧 worker 需要 Python 来执行 UDF。

4.1 worker 端 Python:python.executable / set_python_executable

Table API:

table_env.get_config().set_python_executable("/path/to/python")

DataStream API:

env.set_python_executable("/path/to/python")

解释器放在 archive 里(推荐“自带环境”打法)

env.add_python_archive("/path/to/py_env.zip","venv")env.set_python_executable("venv/py_env/bin/python")

注意:如果指向 archive 内路径,用相对路径,别写绝对路径。

等价方式:

  • 配置python.executable
  • 提交参数-pyexec / --pyExecutable

4.2 client 端 Python:python.client.executable / --pyClientExecutable

client 端用于“编译阶段解析 Python UDF”。你可以:

  • 直接激活你本地的 venv:source my_env/bin/activate
  • 或通过配置/参数指定:python.client.executable/-pyclientexec/PYFLINK_CLIENT_EXECUTABLE

5. 在 Java/SQL 里用 Python UDF:依赖还是走 Python 那套配置

你可以在 Java Table API 或纯 SQL 里注册 Python UDF,例如:

tEnv.executeSql("create temporary system function add_one as 'add_one.add_one' language python");

但 Python 依赖依然要通过这些配置/参数提供:

  • python.files
  • python.archives
  • python.requirements
  • python.executable
  • python.client.executable

6. 一套工程化推荐组合(拿去就能用)

场景A:集群能上网(最省事)

  • JAR:pipeline.jarsenv.add_jars
  • Python:set_python_requirements(requirements.txt)
  • 自研代码:add_python_file(my_udf.py / my_pkg/)
  • 模型文件:add_python_archive(model.zip, "model")

场景B:集群没网(企业内网最常见)

  • requirements 离线缓存:pip download -d cached_dir -r requirements.txt
  • 代码依赖:add_python_file(...)
  • 模型/数据:add_python_archive(...)
  • 如果集群 Python 环境不可控:直接把 venv 打包进 archive,再set_python_executable("venv/.../python")

场景C:你要“零环境依赖”的可移植作业(最稳)

  • 把 venv 打包成 zip(或 conda pack)
  • add_python_archive(venv.zip, "venv")
  • set_python_executable("venv/.../python")
  • 第三方包不再走 requirements(除非你愿意让它再装一遍)

7. 最常见的坑清单(提前避雷)

  • 混用 Table + DataStream,你只在 Table 侧配了依赖,DataStream 侧 UDF 找不到包
    → 混用就统一用StreamExecutionEnvironment

  • pipeline.classpaths指向本地路径,集群节点根本访问不到
    → 不确定就用pipeline.jars / add_jars上传

  • 离线 cached_dir 的 wheel/源码包与集群平台/Python 版本不匹配
    → 必须按集群环境构建缓存(Linux x86_64 + cp310/cp311)

  • worker 端 Python 和 client 端 Python 版本不一致,Arrow/Pandas 相关依赖经常爆炸
    → 统一版本,或者用 archive 自带解释器

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/7 19:25:30

强烈安利9个AI论文写作软件,本科生搞定毕业论文!

强烈安利9个AI论文写作软件,本科生搞定毕业论文! AI工具助力论文写作,告别手忙脚乱 对于大多数本科生来说,撰写毕业论文是一次前所未有的挑战。从选题到开题,从大纲搭建到初稿撰写,每一个环节都可能让人感到…

作者头像 李华
网站建设 2026/4/14 21:44:25

基于Python的智能房价分析与预测系统设计2025_9166ra6h

前言在房地产市场波动加剧的背景下,购房者、投资者、开发商及政策制定者均面临信息不对称的挑战。该系统通过整合多源数据(如历史成交价、区域规划、人口流动等),结合机器学习算法,构建精准的房价预测模型,…

作者头像 李华
网站建设 2026/3/31 16:27:12

9个降AI率工具推荐!自考党高效避坑指南

9个降AI率工具推荐!自考党高效避坑指南 AI降重工具:自考论文的高效护航者 随着人工智能技术的广泛应用,越来越多的学生在撰写论文时依赖AI工具来提高效率。然而,AI生成的内容往往存在明显的痕迹,导致AIGC率过高&#x…

作者头像 李华
网站建设 2026/4/17 16:08:34

吐血推荐!9大AI论文网站测评:研究生科研写作全攻略

吐血推荐!9大AI论文网站测评:研究生科研写作全攻略 AI论文写作工具测评:精准选择助力科研高效推进 在当前学术研究日益数字化的背景下,研究生群体在论文撰写过程中面临诸多挑战,如文献检索效率低、格式规范难掌握、内容…

作者头像 李华
网站建设 2026/4/11 5:31:13

Bound Service Account Token Improvements

Bound Service Account Token Improvements 详细介绍Bound Service Account Token(绑定服务账户令牌)是 Kubernetes 针对传统服务账户令牌安全缺陷推出的增强机制,通过短期有效、对象绑定、受众限制和自动轮换等特性,显著提升了容…

作者头像 李华