news 2026/6/10 23:51:01

Flink DataGen SQL Connector 本地造数、压测、边界数据与“像真数据”的生成技巧

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Flink DataGen SQL Connector 本地造数、压测、边界数据与“像真数据”的生成技巧

1. 核心概念:bounded vs unbounded

  • 默认unbounded:无限造数
  • 配置number-of-rowsbounded:到数量就停
  • 只要任意字段用了sequence,表也会变成 bounded(序列先跑完就结束)

最小示例:

CREATETABLEOrders(order_numberBIGINT,priceDECIMAL(32,2),buyerROW<first_name STRING,last_name STRING>,order_timeTIMESTAMP(3))WITH('connector'='datagen');

2. 三个最常用的全局参数

2.1 控速:rows-per-second

默认 10000 行/秒:

WITH('connector'='datagen','rows-per-second'='5000')

2.2 有界:number-of-rows

让作业像 batch 一样跑完退出:

WITH('connector'='datagen','number-of-rows'='100000')

2.3 并行:scan.parallelism

控制 source 并行度(不写就用全局默认):

WITH('connector'='datagen','scan.parallelism'='4')

3. “像真数据”的关键:字段级配置 fields.#.*

DataGen 的强大在于每个字段都能调:

3.1 random vs sequence(最重要)

  • fields.xxx.kind = random:随机
  • fields.xxx.kind = sequence:从 start 到 end

示例:订单号从 1 到 100000(bounded),价格随机范围 1~999:

CREATETABLEGenOrders(order_idBIGINT,priceDECIMAL(10,2),buyer_idBIGINT,tsTIMESTAMP(3))WITH('connector'='datagen','rows-per-second'='20000','fields.order_id.kind'='sequence','fields.order_id.start'='1','fields.order_id.end'='100000','fields.price.kind'='random','fields.price.min'='1','fields.price.max'='999','fields.buyer_id.min'='1','fields.buyer_id.max'='1000000','fields.ts.max-past'='10 min');

3.2 NULL 注入:null-rate(做脏数据/容错测试必备)

例如 5% 的 buyer_id 为空:

'fields.buyer_id.null-rate'='0.05'

3.3 时间字段:max-past

TIMESTAMP / TIMESTAMP_LTZ 会生成“过去的时间”,最大过去范围由max-past控制:

'fields.order_time.max-past'='1 h'

注意:文档里强调TIME/DATE 永远是本机当前时间/日期,所以如果你想更“可控”的时间字段,通常用计算列生成(见第 6 节)。

4. 字符串/字节数组长度:length + var-len

DataGen 对长度约束有三条规则(很容易踩坑):

  • CHAR/BINARY(定长):长度只能在 schema 定,不能自定义
  • VARCHAR/VARBINARY(变长但有上限):fields.#.length不能超过 schema 定义的长度
  • STRING/BYTES(超长类型):默认 length=100,可设到< 2^31

示例:seller 是 VARCHAR(150),开启变长生成:

CREATETABLEOrders2(idBIGINT,sellerVARCHAR(150),commentSTRING)WITH('connector'='datagen','fields.seller.var-len'='true','fields.seller.length'='150','fields.comment.length'='2000');

5. 集合类型造数:ARRAY / MAP / MULTISET 的长度

默认集合大小是 3,可以用fields.#.length指定:

CREATETABLEGenCollections(f0 ARRAY<INT>,f1 MAP<INT,STRING>,f2 MULTISET<INT>)WITH('connector'='datagen','fields.f0.length'='10','fields.f1.length'='11','fields.f2.length'='12');

这个特别适合验证:下游 sink(比如 ES/OpenSearch)对嵌套结构的序列化、映射、字段膨胀问题。

6. 高级用法:LIKE 复制真实表结构,再 EXCLUDING ALL

你经常需要“模拟某个真实表”,又不想手写 schema。这时就用:

CREATETABLEOrders(order_numberBIGINT,priceDECIMAL(32,2),buyerROW<first_name STRING,last_name STRING>,order_timeTIMESTAMP(3))WITH(...真实表的 connector...);CREATETEMPORARYTABLEGenOrdersWITH('connector'='datagen','number-of-rows'='10')LIKEOrders(EXCLUDINGALL);

这个模式用于:

  • 在没有真实源的情况下,把 SQL pipeline 先跑通
  • 回归测试时稳定复现 schema 变更带来的影响

7. 常见测试配方(直接抄)

7.1 “压测下游 sink”配方

目标:把 sink(ES/JDBC/HBase)顶满看吞吐/反压

  • rows-per-second
  • 合理scan.parallelism
  • 关键字段尽量sequence避免过多重复(尤其 upsert sink)
CREATETABLEGenHot(idBIGINT,v STRING,tsTIMESTAMP(3))WITH('connector'='datagen','rows-per-second'='200000','scan.parallelism'='8','fields.id.kind'='sequence','fields.id.start'='1','fields.id.end'='10000000','fields.v.length'='200','fields.v.var-len'='true','fields.ts.max-past'='5 min');

7.2 “脏数据/空值/长字段”配方

目标:验证 UDF、下游 schema、映射、容错逻辑

CREATETABLEGenDirty(idBIGINT,name STRING,note STRING,scoreINT)WITH('connector'='datagen','rows-per-second'='5000','fields.name.null-rate'='0.02','fields.note.length'='5000','fields.note.var-len'='true','fields.score.min'='-100','fields.score.max'='200');

8. 最容易踩的坑(你一定会遇到)

  • VARCHAR(20)却配置fields.xxx.length=200:不生效/报错(长度不能超过 schema)
  • 配了sequence还以为是流:只要有 sequence 就可能 bounded(序列先结束)
  • DATE/TIME 不是随机:永远是本机当前日期/时间
  • rows-per-second太高导致下游反压:你会看到 source 自动被 backpressure 限速(这其实是你想要的压测现象)
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/10 13:43:31

如何查阅最新的研究论文:实用方法与技巧指南

刚开始做科研的时候&#xff0c;我一直以为&#xff1a; 文献检索就是在知网、Google Scholar 里反复换关键词。 直到后来才意识到&#xff0c;真正消耗精力的不是“搜不到”&#xff0c;而是—— 你根本不知道最近这个领域发生了什么。 生成式 AI 出现之后&#xff0c;学术检…

作者头像 李华
网站建设 2026/6/10 13:44:25

springboot_ssm859微空间私人定向共享系统

目录 具体实现截图摘要 系统所用技术介绍写作提纲源码文档获取/同行可拿货,招校园代理 &#xff1a;文章底部获取博主联系方式&#xff01; 具体实现截图 摘要 SpringBoot_SSM859微空间私人定向共享系统是基于SpringBoot框架和SSM&#xff08;SpringSpringMVCMyBatis&#xf…

作者头像 李华
网站建设 2026/6/10 13:44:13

java_ssm97爱宠宠物医院挂号预约系统管理系统设计与实现

目录具体实现截图摘要系统所用技术介绍写作提纲源码文档获取/同行可拿货,招校园代理 &#xff1a;文章底部获取博主联系方式&#xff01;具体实现截图 摘要 爱宠宠物医院挂号预约系统是一款基于Java SSM框架&#xff08;Spring、Spring MVC、MyBatis&#xff09;开发的智能化…

作者头像 李华
网站建设 2026/6/10 9:52:32

芯片制造企业网页应用,JAVA大文件分块上传如何实现?

大文件传输解决方案设计方案 作为福建某软件公司的技术负责人&#xff0c;针对大文件传输需求&#xff0c;我提出以下技术方案&#xff1a; 一、需求分析与技术挑战 核心需求&#xff1a; 支持50G大文件传输文件/文件夹上传下载&#xff08;保留层级结构&#xff09;断点续传…

作者头像 李华