news 2026/4/17 19:22:51

CSV Format Flink / PyFlink 读写 CSV 的正确姿势(含 Schema 高级配置)

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
CSV Format Flink / PyFlink 读写 CSV 的正确姿势(含 Schema 高级配置)

1、依赖引入

Java/Scala 工程需要加 Flink CSV 依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>2.2.0</version></dependency>

PyFlink 用户一般可以直接在作业里使用(前提是集群环境里对应的 jar 能被加载;如果你是在远程集群跑,仍然需要按你前面“依赖管理”章节的方式把 jar 加入pipeline.jarsenv.add_jars())。

2、Java:快速读取 POJO(自动推导 Schema)

最省事的方式:让 Jackson 根据 POJO 字段推导 CSV schema:

CsvReaderFormat<SomePojo>csvFormat=CsvReaderFormat.forPojo(SomePojo.class);FileSource<SomePojo>source=FileSource.forRecordStreamFormat(csvFormat,Path.fromLocalFile(...)).build();

注意:CSV 列顺序必须和 POJO 字段顺序一致。必要时加:

@JsonPropertyOrder({"field1","field2",...})

否则列对不上会出现解析错位(最常见的“字段都不为空但值都错”的隐性 bug)。

3、Java:高级配置(自定义分隔符、禁用引号等)

需要精细控制时,用forSchema(...)自己生成CsvSchema,例如把分隔符改成|,并禁用 quote:

Function<CsvMapper,CsvSchema>schemaGenerator=mapper->mapper.schemaFor(CityPojo.class).withoutQuoteChar().withColumnSeparator('|');CsvReaderFormat<CityPojo>csvFormat=CsvReaderFormat.forSchema(()->newCsvMapper(),schemaGenerator,TypeInformation.of(CityPojo.class));FileSource<CityPojo>source=FileSource.forRecordStreamFormat(csvFormat,Path.fromLocalFile(...)).build();

对应 CSV:

Berlin|52.5167|13.3833|Germany|DE|Berlin|primary|3644826 San Francisco|37.7562|-122.443|United States|US|California||3592294 Beijing|39.905|116.3914|China|CN|Beijing|primary|19433000

更复杂的类型也能做(比如数组列),通过CsvSchema.ColumnType.ARRAY并指定数组元素分隔符:

CsvReaderFormat<ComplexPojo>csvFormat=CsvReaderFormat.forSchema(CsvSchema.builder().addColumn(newCsvSchema.Column(0,"id",CsvSchema.ColumnType.NUMBER)).addColumn(newCsvSchema.Column(4,"array",CsvSchema.ColumnType.ARRAY).withArrayElementSeparator("#")).build(),TypeInformation.of(ComplexPojo.class));

4、PyFlink:手动定义 CSV Schema(输出为 Row)

PyFlink 里通常自己建 schema,每一列映射为 Row 字段:

frompyflink.common.watermark_strategyimportWatermarkStrategyfrompyflink.tableimportDataTypesfrompyflink.datastreamimportStreamExecutionEnvironmentfrompyflink.datastream.connectors.file_systemimportFileSourcefrompyflink.formats.csvimportCsvReaderFormat,CsvSchema# 具体 import 以你环境包结构为准env=StreamExecutionEnvironment.get_execution_environment()schema=CsvSchema.builder()\.add_number_column('id',number_type=DataTypes.BIGINT())\.add_array_column('array',separator='#',element_type=DataTypes.INT())\.set_column_separator(',')\.build()source=FileSource.for_record_stream_format(CsvReaderFormat.for_schema(schema),CSV_FILE_PATH).build()ds=env.from_source(source,WatermarkStrategy.no_watermarks(),'csv-source')# ds 的 record 类型是 Row(具名字段 + 复合类型)# Types.ROW_NAMED(['id', 'array'], [Types.LONG(), Types.LIST(Types.INT())])

对应 CSV:

0,1#2#3 1, 2,1

补一个实战提醒:如果某列可能为空(比如上面的array),你后续算子处理时要把None/空数组的分支写好,否则很容易在 map/flat_map 里触发类型错误。

5、PyFlink:写 CSV(Bulk Format)

写 CSV 通常用CsvBulkWriters生成 BulkWriterFactory,再配合FileSink.for_bulk_format(...)

frompyflink.tableimportDataTypesfrompyflink.datastream.connectors.file_systemimportFileSinkfrompyflink.formats.csvimportCsvBulkWriters,CsvSchema# 具体 import 以你环境包结构为准schema=CsvSchema.builder()\.add_number_column('id',number_type=DataTypes.BIGINT())\.add_array_column('array',separator='#',element_type=DataTypes.INT())\.set_column_separator(',')\.build()sink=FileSink.for_bulk_format(OUTPUT_DIR,CsvBulkWriters.for_schema(schema)).build()ds.sink_to(sink)

6、运行模式:Batch / Streaming 都可用

CsvReaderFormat类似TextLineInputFormat,既可用于批也可用于流(持续监控目录等),具体取决于你用的 Source/RuntimeMode 以及文件系统是否支持持续发现。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/12 7:13:40

收藏这份AI客服构建指南:有赞从0到1的实践经验与思考

有赞分享了AI客服系统从0到1的完整实践历程。项目始于黑客马拉松&#xff0c;初期选用Dify平台快速验证&#xff0c;后采用混合架构应对性能挑战。文章详细阐述了模型选择、Workflow设计、上下文管理、知识工程等关键技术环节&#xff0c;并分享了评测优化和协作管理的经验。核…

作者头像 李华
网站建设 2026/4/18 6:29:53

国外学术论文怎么找:实用检索技巧与资源平台推荐

刚开始做科研的时候&#xff0c;我一直以为&#xff1a; 文献检索就是在知网、Google Scholar 里反复换关键词。 直到后来才意识到&#xff0c;真正消耗精力的不是“搜不到”&#xff0c;而是—— 你根本不知道最近这个领域发生了什么。 生成式 AI 出现之后&#xff0c;学术检…

作者头像 李华
网站建设 2026/4/3 3:00:49

卡尔曼滤波做轨迹跟踪 鲁棒卡尔曼滤波做野值剔除后的预测 扩展卡尔曼滤波对GPS数据进行状态估计滤波

卡尔曼滤波做轨迹跟踪 鲁棒卡尔曼滤波做野值剔除后的预测 扩展卡尔曼滤波对GPS数据进行状态估计滤波 轨迹跟踪这活儿听起来玄乎&#xff0c;其实咱们每天都在用——手机导航里那个蓝色小圆点&#xff0c;背后八成藏着卡尔曼滤波的数学魔法。今天咱们扯点实在的&#xff0c;用P…

作者头像 李华
网站建设 2026/4/17 18:02:49

电鱼智能 RK3399 赋能配送机器人的多屏交互与人脸识别支付

什么是 电鱼智能 RK3399&#xff1f;电鱼智能 RK3399 是一款高性能、高扩展性的六核&#xff08;2A72 4A53&#xff09;嵌入式核心板。虽然发布已有几年&#xff0c;但它在多媒体处理方面依然表现强劲。它支持 双路 MIPI/LVDS/HDMI/eDP 显示接口&#xff0c;且内置了双路 ISP&…

作者头像 李华
网站建设 2026/4/18 5:24:32

python基于vue的教室预约管理平台的设计与实现django flask pycharm

目录教室预约管理平台的设计与实现摘要开发技术路线相关技术介绍核心代码参考示例结论源码lw获取/同行可拿货,招校园代理 &#xff1a;文章底部获取博主联系方式&#xff01;教室预约管理平台的设计与实现摘要 基于Python的教室预约管理平台采用前后端分离架构&#xff0c;前端…

作者头像 李华