news 2026/4/16 10:48:41

Flink JSON 序列化/反序列化 Schema KafkaSource/KafkaSink + 自定义 ObjectMapper + PyFlink Row

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Flink JSON 序列化/反序列化 Schema KafkaSource/KafkaSink + 自定义 ObjectMapper + PyFlink Row

1. JsonDeserializationSchema:KafkaSource 中反序列化 POJO

JsonDeserializationSchema实现了 Flink 的DeserializationSchema,因此只要某个 connector 支持DeserializationSchema,你就能直接使用它。

典型用法:KafkaSource 只消费 value,反序列化成 POJO:

JsonDeserializationSchema<SomePojo>jsonFormat=newJsonDeserializationSchema<>(SomePojo.class);KafkaSource<SomePojo>source=KafkaSource.<SomePojo>builder().setValueOnlyDeserializer(jsonFormat)// ....build();

适用场景:

  • Kafka 的 value 是 JSON
  • 你希望在 DataStream 里直接拿到业务对象SomePojo

工程建议:

  • POJO 字段尽量使用包装类型(Integer/Long)应对字段缺失或 null
  • 为了兼容字段变动,可以配合 ObjectMapper 设置忽略未知字段(见第 3 节)

2. JsonSerializationSchema:KafkaSink 中序列化 POJO

写回 Kafka 时,JsonSerializationSchema实现了SerializationSchema,可用于任何支持SerializationSchema的 connector。

典型用法:KafkaSink 写 value,序列化 POJO 为 JSON:

JsonSerializationSchema<SomePojo>jsonFormat=newJsonSerializationSchema<>();KafkaSink<SomePojo>sink=KafkaSink.<SomePojo>builder().setRecordSerializer(newKafkaRecordSerializationSchemaBuilder<SomePojo>().setValueSerializationSchema(jsonFormat)// ....build()).build();

适用场景:

  • 你希望下游系统继续消费 JSON
  • 你不想自己手写 Jackson 序列化逻辑

3. 自定义 ObjectMapper:控制 Jackson 行为(非常常用)

Flink 允许你通过构造函数传入SerializableSupplier<ObjectMapper>来定制 mapper,相当于提供一个“ObjectMapper 工厂”。

你可以用它做很多工程级增强,比如:

  • 忽略未知字段(兼容上游 schema 变更)
  • 注册模块(Java 时间类型、参数名模块等)
  • 开启/关闭某些序列化特性(字段排序、空值处理等)

示例:自定义序列化 mapper,让 map key 有序,并注册模块:

JsonSerializationSchema<SomeClass>jsonFormat=newJsonSerializationSchema<>(()->newObjectMapper().enable(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS).registerModule(newParameterNamesModule()));

你也可以把“兼容字段变更”的设置加进去(强烈建议生产开启类似配置):

  • FAIL_ON_UNKNOWN_PROPERTIES关闭
  • JavaTimeModule 等

(这里不展开写完整 mapper 配置,你只要知道:用 supplier 你就能完全掌控 Jackson。)

4. PyFlink:Row 类型用 JsonRowSerializationSchema / JsonRowDeserializationSchema

在 PyFlink 中,Flink 内置了 Row 的 JSON Schema:

  • JsonRowDeserializationSchema
  • JsonRowSerializationSchema

这对 Python 流处理特别友好,因为 Python 侧更常操作 Row 而不是 POJO 类。

KafkaSource:JSON -> Row

row_type_info=Types.ROW_NAMED(['name','age'],[Types.STRING(),Types.INT()])json_format=JsonRowDeserializationSchema.builder()\.type_info(row_type_info)\.build()source=KafkaSource.builder()\.set_value_only_deserializer(json_format)\.build()

KafkaSink:Row -> JSON

row_type_info=Types.ROW_NAMED(['name','age'],[Types.STRING(),Types.INT()])json_format=JsonRowSerializationSchema.builder()\.with_type_info(row_type_info)\.build()sink=KafkaSink.builder()\.set_record_serializer(KafkaRecordSerializationSchema.builder().set_topic('test').set_value_serialization_schema(json_format).build())\.build()

适用场景:

  • Python 处理流数据,行结构清晰
  • Kafka 中 value 为 JSON

5. 选型建议:POJO vs ObjectNode vs Row

  • Java POJO:类型安全、IDE 友好、适合稳定 schema 的业务流
  • ObjectNode:更灵活,适合 schema 频繁变化、半结构化数据
  • PyFlink Row:Python 生态更顺手,适合表/行式处理
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/3 1:30:55

CVE-2025-59287 :揭开WSUS不安全反序列化漏洞的致命真相

一、漏洞核心基础信息 1.1 漏洞核心定义 CVE-2025-59287 是 Windows Server Update Services (WSUS) 组件中被披露的高危远程代码执行&#xff08;RCE&#xff09;漏洞&#xff0c;其根本成因是服务端存在不安全反序列化的设计缺陷。该漏洞被赋予 CVSS 9.8 分&#xff08;满分1…

作者头像 李华
网站建设 2026/4/16 0:55:52

腾讯云隐藏福利:如何通过一键操作白嫖CPU升级?性能飙升

当前这个网络世界&#xff0c;IPv6既可以说是大势所趋&#xff0c;也可以说是形势所迫。谈IPv6&#xff0c;必谈其公网地址。对普通人比较友好的是&#xff0c;你终于有了一个可以从全世界任意位置访问的公网IPv6地址&#xff0c;如果嫌麻烦&#xff0c;配置DDNS-go就可以了&am…

作者头像 李华
网站建设 2026/4/15 5:04:10

龙威破局:DragonForce勒索软件攻防战与企业数字化韧性构建指南

2023年底现身的DragonForce勒索软件&#xff0c;凭借跨Windows与VMware ESXi系统的攻击能力、成熟的勒索即服务&#xff08;RaaS&#xff09;模式&#xff0c;已成为2025-2026年全球企业面临的核心网络威胁之一。其不仅延续了“加密窃密”的双重勒索套路&#xff0c;更通过技术…

作者头像 李华
网站建设 2026/4/13 21:48:49

算法题 二叉搜索树的范围和

938. 二叉搜索树的范围和 问题描述 给定二叉搜索树的根节点 root&#xff0c;以及两个整数 low 和 high&#xff0c;返回所有节点值在范围 [low, high] 内的节点值之和。 二叉搜索树&#xff1a; 对于任意节点&#xff0c;左子树的所有节点值都小于该节点值右子树的所有节点值都…

作者头像 李华