news 2026/4/18 15:23:41

Flink JSON 序列化/反序列化 Schema KafkaSource/KafkaSink + 自定义 ObjectMapper + PyFlink Row

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Flink JSON 序列化/反序列化 Schema KafkaSource/KafkaSink + 自定义 ObjectMapper + PyFlink Row

1. JsonDeserializationSchema:KafkaSource 中反序列化 POJO

JsonDeserializationSchema实现了 Flink 的DeserializationSchema,因此只要某个 connector 支持DeserializationSchema,你就能直接使用它。

典型用法:KafkaSource 只消费 value,反序列化成 POJO:

JsonDeserializationSchema<SomePojo>jsonFormat=newJsonDeserializationSchema<>(SomePojo.class);KafkaSource<SomePojo>source=KafkaSource.<SomePojo>builder().setValueOnlyDeserializer(jsonFormat)// ....build();

适用场景:

  • Kafka 的 value 是 JSON
  • 你希望在 DataStream 里直接拿到业务对象SomePojo

工程建议:

  • POJO 字段尽量使用包装类型(Integer/Long)应对字段缺失或 null
  • 为了兼容字段变动,可以配合 ObjectMapper 设置忽略未知字段(见第 3 节)

2. JsonSerializationSchema:KafkaSink 中序列化 POJO

写回 Kafka 时,JsonSerializationSchema实现了SerializationSchema,可用于任何支持SerializationSchema的 connector。

典型用法:KafkaSink 写 value,序列化 POJO 为 JSON:

JsonSerializationSchema<SomePojo>jsonFormat=newJsonSerializationSchema<>();KafkaSink<SomePojo>sink=KafkaSink.<SomePojo>builder().setRecordSerializer(newKafkaRecordSerializationSchemaBuilder<SomePojo>().setValueSerializationSchema(jsonFormat)// ....build()).build();

适用场景:

  • 你希望下游系统继续消费 JSON
  • 你不想自己手写 Jackson 序列化逻辑

3. 自定义 ObjectMapper:控制 Jackson 行为(非常常用)

Flink 允许你通过构造函数传入SerializableSupplier<ObjectMapper>来定制 mapper,相当于提供一个“ObjectMapper 工厂”。

你可以用它做很多工程级增强,比如:

  • 忽略未知字段(兼容上游 schema 变更)
  • 注册模块(Java 时间类型、参数名模块等)
  • 开启/关闭某些序列化特性(字段排序、空值处理等)

示例:自定义序列化 mapper,让 map key 有序,并注册模块:

JsonSerializationSchema<SomeClass>jsonFormat=newJsonSerializationSchema<>(()->newObjectMapper().enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS).registerModule(newParameterNamesModule()));

你也可以把“兼容字段变更”的设置加进去(强烈建议生产开启类似配置):

  • FAIL_ON_UNKNOWN_PROPERTIES关闭
  • JavaTimeModule 等

(这里不展开写完整 mapper 配置,你只要知道:用 supplier 你就能完全掌控 Jackson。)

4. PyFlink:Row 类型用 JsonRowSerializationSchema / JsonRowDeserializationSchema

在 PyFlink 中,Flink 内置了 Row 的 JSON Schema:

  • JsonRowDeserializationSchema
  • JsonRowSerializationSchema

这对 Python 流处理特别友好,因为 Python 侧更常操作 Row 而不是 POJO 类。

KafkaSource:JSON -> Row

row_type_info=Types.ROW_NAMED(['name','age'],[Types.STRING(),Types.INT()])json_format=JsonRowDeserializationSchema.builder()\.type_info(row_type_info)\.build()source=KafkaSource.builder()\.set_value_only_deserializer(json_format)\.build()

KafkaSink:Row -> JSON

row_type_info=Types.ROW_NAMED(['name','age'],[Types.STRING(),Types.INT()])json_format=JsonRowSerializationSchema.builder()\.with_type_info(row_type_info)\.build()sink=KafkaSink.builder()\.set_record_serializer(KafkaRecordSerializationSchema.builder().set_topic('test').set_value_serialization_schema(json_format).build())\.build()

适用场景:

  • Python 处理流数据,行结构清晰
  • Kafka 中 value 为 JSON

5. 选型建议:POJO vs ObjectNode vs Row

  • Java POJO:类型安全、IDE 友好、适合稳定 schema 的业务流
  • ObjectNode:更灵活,适合 schema 频繁变化、半结构化数据
  • PyFlink Row:Python 生态更顺手,适合表/行式处理
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/18 11:09:29

智能巡检车、无人机道路检测、AI 路况分析平台 智慧交通 驾驶视角道路病害缺陷检测数据集 建立基于深度学习框架YOLOV8道路病害缺陷检测系统 裂纹 网快 坑洼

道路缺陷检测数据集使用labelimg标注&#xff0c;标签的格式是txt格式&#xff0c;适用于yolo目标检测系列所有版本训练数据集。 标注了&#xff08;裂纹&#xff08;Crack&#xff09;、 检查井&#xff08;Manhole&#xff09;、 网&#xff08;Net&#xff09;、 裂纹块&…

作者头像 李华
网站建设 2026/4/18 11:02:30

机器人日志十年演进

下面给你一条专门针对机器人系统的 「机器人日志十年演进路线&#xff08;2025–2035&#xff09;」。 这里的“日志”不是简单的 printf&#xff0c;而是机器人如何记住自己做过什么、为什么这么做、以及如何避免重蹈覆辙。一、核心判断&#xff08;一句话&#xff09;未来十年…

作者头像 李华
网站建设 2026/4/18 7:01:40

机器人诊断十年演进

下面给你一条专门针对机器人系统的 「机器人诊断十年演进路线&#xff08;2025–2035&#xff09;」。 这里的“诊断”不是 IT 意义上的排错&#xff0c;而是机器人在真实世界中如何理解自身失效、判断风险、选择修复策略&#xff0c;并避免重复犯错。一、核心判断&#xff08;…

作者头像 李华
网站建设 2026/4/18 11:05:52

机器人监控系统十年演进

下面给你一条专门针对机器人系统的 「机器人监控系统十年演进路线&#xff08;2025–2035&#xff09;」。 这里的“监控系统”不是 IT 意义上的 dashboard&#xff0c;而是机器人在真实世界中是否仍然“可控、可信、可持续运行”的核心基础设施。一、核心判断&#xff08;一句…

作者头像 李华
网站建设 2026/4/18 8:50:10

机器人系统架构十年演进

机器人系统架构这条线&#xff0c;十年里会发生的最大变化是&#xff1a;**架构从“把模块连起来”变成“把风险管起来”。**你会看到抽象层不断上移&#xff1a;模块 → 行为 → 风险与治理&#xff1b;而真正拉开差距的&#xff0c;是谁先把“退化、失败、责任边界、人机协同…

作者头像 李华