news 2026/4/20 10:34:14

flink+kafka实例

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
flink+kafka实例

简单说flink&kafka

Kafka 是高吞吐、高可靠的消息队列,负责承接上游所有动态数据(用户行为、业务日志、设备采集、数据库变更);
Flink 是流批一体的计算引擎,负责对 Kafka 里的 “流动数据” 做实时处理。

下面是企业真实开发中最常用的 3 类代码模板,覆盖 “消费→处理→输出” 全链路:
前置依赖(需补充 Kafka 连接器)

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.17.0</version></dependency>
  1. 基础:消费 Kafka 数据做实时统计(你理解的 “统计功能”)
importorg.apache.flink.api.common.serialization.SimpleStringSchema;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;importjava.util.Properties;publicclassFlinkKafkaCountDemo{publicstaticvoidmain(String[]args)throwsException{StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 1. 配置Kafka连接参数PropertieskafkaProps=newProperties();kafkaProps.setProperty("bootstrap.servers","localhost:9092");kafkaProps.setProperty("group.id","flink-kafka-group");// 2. 从Kafka消费数据(topic:user_behavior)DataStream<String>kafkaStream=env.addSource(newFlinkKafkaConsumer<>("user_behavior",newSimpleStringSchema(),kafkaProps));// 3. 实时统计:按用户行为类型(点击/加购/下单)计数kafkaStream.map(line->line.split(",")[1])// 假设数据格式:userID,behavior,time.keyBy(behavior->behavior).countWindow(60)// 60秒窗口.sum(0).print("实时行为统计:");env.execute("Flink-Kafka Count Job");}}
  1. 进阶:消费 Kafka 做实时 ETL(清洗后写入另一 Topic)
// 承接上面的kafkaStreamDataStream<String>cleanStream=kafkaStream// 过滤脏数据(非JSON/空值).filter(line->line!=null&&line.startsWith("{"))// 转换格式(JSON→CSV).map(line->{JSONObjectjson=JSON.parseObject(line);returnjson.getString("userID")+","+json.getString("orderID")+","+json.getString("amount");});// 将清洗后的数据写入Kafka的clean_order_topiccleanStream.addSink(newFlinkKafkaProducer<>("localhost:9092","clean_order_topic",newSimpleStringSchema()));
  1. 高阶:消费 Kafka 做 CEP 复杂事件检测
// 承接kafkaStream,解析为用户行为实体DataStream<UserBehavior>behaviorStream=kafkaStream.map(line->{String[]fields=line.split(",");returnnewUserBehavior(fields[0],fields[1],Long.parseLong(fields[2]));});// 定义CEP模式:10分钟内连续3次失败登录Pattern<UserBehavior,?>failLoginPattern=Pattern.<UserBehavior>begin("first_fail").where(behavior->"fail_login".equals(behavior.getBehavior())).next("second_fail").where(behavior->"fail_login".equals(behavior.getBehavior())).next("third_fail").where(behavior->"fail_login".equals(behavior.getBehavior())).within(Time.minutes(10));// 检测匹配的事件,输出风控告警PatternStream<UserBehavior>patternStream=CEP.pattern(behaviorStream.keyBy(UserBehavior::getUserID),failLoginPattern);patternStream.select(pattern->"用户"+pattern.get("first_fail").get(0).getUserID()+"10分钟内3次登录失败").print("风控告警:");

总结

核心结论:Flink 接 Kafka 不只是 “统计”,而是覆盖实时统计、ETL、风控、特征工程、CEP 等全场景,“统计” 只是最基础的入门场景;
架构逻辑:Kafka 负责 “数据传输 / 暂存”,Flink 负责 “数据计算 / 处理”,是企业实时大数据的标准组合;
落地关键:无论什么场景,核心流程都是「Kafka 消费数据 → Flink 计算 / 处理 → 输出到存储 / 下游 Kafka / 大屏」。
如果需要某类场景(如金融风控 / 电商实时统计)的完整可运行代码,可以直接说,我会按企业真实开发规范提供。

#“为什么非要用 Flink?自己写接口接数据、做清洗、插数据库不行吗?”
核心差异其实体现在「数据量 / 并发 / 实时性 / 可靠性」这几个维度,尤其是数据量变大后,手写接口的方案会快速失效。

#手写方案的致命问题
并发瓶颈:单消费者线程处理每秒 1 万条会卡死,多消费者需自己做分区分配,极易出现重复消费;
可靠性差:如果插入数据库时程序崩溃,已经处理的日志可能丢了,未处理的可能重复;
扩展难:要加 “5 分钟内去重” 逻辑,需自己写缓存(Redis)+ 定时清理,代码量翻倍;
监控缺失:不知道每秒处理了多少条、失败了多少条,出问题只能查日志。

// Spring Boot + Kafka Consumer@ServicepublicclassLogConsumerService{@AutowiredprivateClickHouseMapperclickHouseMapper;// 手写Kafka消费者@KafkaListener(topics="user_log",groupId="log-group")publicvoidconsume(List<String>logs,Acknowledgmentack){try{// 1. 数据清洗(简单过滤)List<String>cleanLogs=logs.stream().filter(log->log!=null&&!log.contains("null")).collect(Collectors.toList());// 2. 批量插入ClickHouse(自己写批量逻辑)clickHouseMapper.batchInsert(cleanLogs);// 3. 手动提交偏移量(避免丢数据)ack.acknowledge();}catch(Exceptione){// 4. 异常处理(自己写重试,极易重复入库)retryInsert(logs);}}}

#Flink 方案的优势
并发:只需设置env.setParallelism(10),就能用 10 个并行度处理,轻松扛每秒 1 万条;
可靠:Checkpoint 自动记录状态,程序崩溃重启后从 5 秒前的快照继续,数据不丢不重;
扩展:要加 CEP 检测异常行为,只需加几行 Pattern 代码,无需重构;
运维:启动 Flink Web UI(默认 8081 端口),能实时看到处理吞吐量、延迟、失败数,一键扩缩容。

publicclassFlinkLogCleanJob{publicstaticvoidmain(String[]args)throwsException{StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();// 开启Checkpoint(每5秒做一次快照,故障重启不丢数据)env.enableCheckpointing(5000);env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 1. 消费Kafka数据(自动做负载均衡、分区分配)PropertieskafkaProps=newProperties();kafkaProps.setProperty("bootstrap.servers","localhost:9092");kafkaProps.setProperty("group.id","flink-log-group");DataStream<String>logStream=env.addSource(newFlinkKafkaConsumer<>("user_log",newSimpleStringSchema(),kafkaProps));// 2. 数据清洗(内置算子,支持复杂逻辑)DataStream<String>cleanStream=logStream.filter(log->log!=null&&!log.contains("null"))// 过滤脏数据.keyBy(log->log.split(",")[0])// 按用户ID分组.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))// 5秒窗口去重.distinct();// 内置去重算子// 3. 写入ClickHouse(内置连接器,支持Exactly-Once)cleanStream.addSink(ClickHouseSink.builder().setUrl("jdbc:clickhouse://localhost:8123/db").setSql("INSERT INTO user_log VALUES (?)").build());env.execute("Flink Log Clean Job");}}

Flink 是如何实现分布式

Flink 集群分为 3 类节点,分工明确:

<dependencies><!--Flink核心依赖--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>1.17.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>1.17.0</version></dependency><!--Flink-Kafka连接器--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>1.17.0</version></dependency><!--Flink-JDBC(对接ClickHouse--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>1.17.0</version></dependency><!--ClickHouseJDBC驱动--><dependency><groupId>ru.yandex.clickhouse</groupId><artifactId>clickhouse-jdbc</artifactId><version>0.3.2</version></dependency></dependencies>
importorg.apache.flink.api.common.eventtime.WatermarkStrategy;importorg.apache.flink.api.common.functions.FilterFunction;importorg.apache.flink.api.common.functions.MapFunction;importorg.apache.flink.api.common.serialization.SimpleStringSchema;importorg.apache.flink.connector.jdbc.JdbcConnectionOptions;importorg.apache.flink.connector.jdbc.JdbcSink;importorg.apache.flink.connector.kafka.source.KafkaSource;importorg.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;importorg.apache.flink.streaming.api.datastream.DataStream;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;importorg.apache.flink.streaming.api.windowing.time.Time;importjava.sql.PreparedStatement;/** * Flink 分布式数据清洗入库(完整可运行版) * 功能:消费Kafka日志 → 清洗 → 5秒窗口去重 → 写入ClickHouse */publicclassFlinkDistributedLogCleanJob{// 定义ClickHouse表结构(提前创建):CREATE TABLE user_log (user_id String, log_content String, ts BIGINT) ENGINE = MergeTree() ORDER BY ts;publicstaticclassUserLog{privateStringuserId;privateStringlogContent;privatelongts;publicUserLog(StringuserId,StringlogContent,longts){this.userId=userId;this.logContent=logContent;this.ts=ts;}// getterpublicStringgetUserId(){returnuserId;}publicStringgetLogContent(){returnlogContent;}publiclonggetTs(){returnts;}}publicstaticvoidmain(String[]args)throwsException{// 1. 创建Flink执行环境(分布式执行的入口)StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度=3(分布式核心:任务会拆成3个子任务运行在不同TaskManager)env.setParallelism(3);// 2. 开启Checkpoint(分布式容错核心:每5秒快照,故障重启不丢数据)env.enableCheckpointing(5000);// 5秒一次Checkpointenv.getCheckpointConfig().setCheckpointingMode(org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE);// 精准一次语义env.getCheckpointConfig().setCheckpointTimeout(60000);// Checkpoint超时时间60秒// 3. 配置Kafka数据源(分布式消费:自动分配分区到不同子任务)KafkaSource<String>kafkaSource=KafkaSource.<String>builder().setBootstrapServers("localhost:9092")// Kafka集群地址(分布式部署时填多个节点).setTopics("user_log")// 消费的Topic.setGroupId("flink-log-group")// 消费组.setStartingOffsets(OffsetsInitializer.latest())// 从最新偏移量开始消费.setValueOnlyDeserializer(newSimpleStringSchema())// 反序列化.build();// 4. 读取Kafka数据(分布式消费)DataStream<String>kafkaStream=env.fromSource(kafkaSource,WatermarkStrategy.noWatermarks(),"Kafka Source");// 5. 分布式数据清洗:过滤脏数据 + 解析为实体类DataStream<UserLog>cleanLogStream=kafkaStream// 过滤空值/异常日志(分布式执行:每个子任务独立过滤自己的分片数据).filter((FilterFunction<String>)log->log!=null&&!log.contains("null")&&log.split(",").length==3)// 解析日志(格式:user_id,log_content,ts).map((MapFunction<String,UserLog>)log->{String[]fields=log.split(",");returnnewUserLog(fields[0],fields[1],Long.parseLong(fields[2]));});// 6. 分布式窗口去重(按用户ID分组,5秒窗口去重)DataStream<UserLog>distinctLogStream=cleanLogStream.keyBy(UserLog::getUserId)// 按用户ID分组(分布式:相同用户的日志会发到同一个子任务).window(TumblingProcessingTimeWindows.of(Time.seconds(5)))// 5秒滚动窗口.distinct(UserLog::getLogContent);// 窗口内去重(分布式执行)// 7. 分布式写入ClickHouse(每个子任务独立写入,Flink保证Exactly-Once)distinctLogStream.addSink(JdbcSink.sink("INSERT INTO user_log (user_id, log_content, ts) VALUES (?, ?, ?)",// ClickHouse插入SQL(PreparedStatementstmt,UserLoglog)->{// 绑定参数stmt.setString(1,log.getUserId());stmt.setString(2,log.getLogContent());stmt.setLong(3,log.getTs());},newJdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:clickhouse://localhost:8123/default")// ClickHouse地址.withDriverName("ru.yandex.clickhouse.ClickHouseDriver")// 驱动类.withUsername("default")// 用户名.withPassword("")// 密码(默认空).build())).name("ClickHouse Sink");// 8. 提交任务(分布式执行:JobManager接收任务后拆分并调度到TaskManager)env.execute("Flink Distributed Log Clean Job");}}

启动 Flink 集群:

./bin/start-cluster.sh #(会启动 1 个 JobManager + 1 个 TaskManager,默认有 4 个 Slot);
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/20 10:32:17

3分钟为Word添加APA第7版引用模板:告别手动格式化的终极指南

3分钟为Word添加APA第7版引用模板&#xff1a;告别手动格式化的终极指南 【免费下载链接】APA-7th-Edition Microsoft Word XSD for generating APA 7th edition references 项目地址: https://gitcode.com/gh_mirrors/ap/APA-7th-Edition 你是否曾因APA格式问题而延误论…

作者头像 李华
网站建设 2026/4/20 10:31:21

为什么说 GXDE 的 DDE 比 Deepin 25 更适合 Linux 小白?

如果你正在为“从 Windows 转 Linux”做功课&#xff0c;大概率绕不开国产系统的两大代表&#xff1a;Deepin&#xff08;深度&#xff09;和基于它的社区改版 GXDE。虽然 Deepin 25 名声在外&#xff0c;但对于真正的零基础初学者&#xff0c;GXDE&#xff08;Gorgeous eXtend…

作者头像 李华
网站建设 2026/4/20 10:26:18

27考研高数复习笔记1——选填函数题思路

27考研高数复习笔记&#xff1a;函数选填题核​​心思路1.函数问题的通用解题策略在处理函数相关的选择题或填空题时&#xff0c;应遵循以下优先级&#xff1a;优先方案&#xff1a;尝试使用函数法&#xff08;代数性质分析&#xff09;或作图法。数形结合通常是突破复杂函数关…

作者头像 李华