news 2026/5/1 19:21:27

实时日志采集与统计分析平台

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
实时日志采集与统计分析平台

一、项目概述

本项目基于 Flume + Kafka + Spark Streaming 构建了一套完整的实时日志采集与统计分析系统,实现了从日志生成、实时采集、消息队列传输到流式计算的完整数据链路,端到端延迟控制在 15 秒以内。


二、前置准备

下载并配置flume和kafka环境

1、Kafka 安装配置步骤(QM131节点)

1.下载 Kafka

切换到模块目录

cd /opt/module

使用华为云镜像下载(速度快)

wget https://mirrors.huaweicloud.com/apache/kafka/3.0.0/kafka_2.13-3.0.0.tgz

2.解压并重命名

tar -zxvf kafka_2.13-3.0.0.tgz

mv kafka_2.13-3.0.0 kafka

3.配置环境变量

echo 'export KAFKA_HOME=/opt/module/kafka' >> /etc/profile

echo 'export PATH=$PATH:$KAFKA_HOME/bin' >> /etc/profile

source /etc/profile

2、验证安装

kafka-topics.sh --version # 应输出 3.0.0

1.优化内存配置(2GB节点)

vi /opt/module/kafka/bin/kafka-server-start.sh

修改内存参数:

export KAFKA_HEAP_OPTS="-Xmx512m -Xms512m"

2.配置 server.properties

vi /opt/module/kafka/config/server.properties

修改以下配置:

properties

broker.id=1

listeners=PLAINTEXT://QM131:9092

advertised.listeners=PLAINTEXT://QM131:9092

log.dirs=/opt/module/kafka/logs

zookeeper.connect=QM131:2181

3.启动 Kafka

cd /opt/module/kafka

3、启动 ZooKeeper(后台)

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

4、启动 Kafka(后台)

bin/kafka-server-start.sh -daemon config/server.properties

5、验证进程

jps # 应看到 QuorumPeerMain 和 Kafka

  1. 创建 Topic

在 QM131 执行 cd /opt/module/kafka

bin/kafka-topics.sh --create \

--bootstrap-server QM131:9092 \

--replication-factor 1 \

--partitions 2 \

--topic user_log_topic

6、验证 Topic

bin/kafka-topics.sh --list --bootstrap-server QM131:9092

二、Flume 安装配置步骤(QM130节点)

1.下载 Flume

cd /opt/module

使用华为云镜像下载

wget https://mirrors.huaweicloud.com/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz

2.解压并重命名

tar -zxvf apache-flume-1.9.0-bin.tar.gz

mv apache-flume-1.9.0-bin flume

3.配置环境变量

echo 'export FLUME_HOME=/opt/module/flume' >> /etc/profile

echo 'export PATH=$PATH:$FLUME_HOME/bin' >> /etc/profile

source /etc/profile

7、验证安装

flume-ng version # 应显示 1.9.0

1.创建项目目录和配置

创建目录

mkdir -p /opt/project/realtime/{data,conf,logs}

8、创建 Flume 配置文件

vi /opt/project/realtime/conf/flume_kafka.conf

配置文件内容:

properties agent.sources = tail_source agent.channels = memory_channel agent.sinks = kafka_sink agent.sources.tail_source.type = exec agent.sources.tail_source.command = tail -F /opt/project/realtime/data/click.log agent.sources.tail_source.shell = /bin/sh -c agent.channels.memory_channel.type = memory agent.channels.memory_channel.capacity = 1000 agent.channels.memory_channel.transactionCapacity = 100 agent.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink agent.sinks.kafka_sink.kafka.bootstrap.servers = QM131:9092 agent.sinks.kafka_sink.kafka.topic = user_log_topic agent.sources.tail_source.channels = memory_channel agent.sinks.kafka_sink.channel = memory_channel

1.创建日志生成脚本

vi /opt/project/realtime/data/generate_log.py

python #!/usr/bin/env python3 import time import random import datetime actions = ['browse', 'add_to_cart', 'collect', 'pay'] categories = ['家电', '数码', '服装', '美妆', '食品'] log_file = "/opt/project/realtime/data/click.log" while True: timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") user_id = random.randint(1000, 9999) action = random.choice(actions) category = random.choice(categories) log_line = f"{timestamp}|{user_id}|{action}|{category}\n" with open(log_file, "a") as f: f.write(log_line) print(log_line.strip()) time.sleep(1)

chmod +x /opt/project/realtime/data/generate_log.py

三、启动命令汇总

启动顺序

顺序 节点 命令

1 QM131 Kafka启动(ZooKeeper + Kafka)

2 QM130 日志生成脚本

3 QM130 Flume采集

4 QM131 Kafka消费者验证(可选)

具体命令

QM131 - 启动Kafka:

cd /opt/module/kafka

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

bin/kafka-server-start.sh -daemon config/server.properties

QM130 - 启动日志脚本:

python3 /opt/project/realtime/data/generate_log.py

QM130 - 启动Flume:

cd /opt/module/flume

bin/flume-ng agent \

--name agent \

--conf conf \

--conf-file /opt/project/realtime/conf/flume_kafka.conf \

-Dflume.root.logger=INFO,console

QM131 - Kafka消费者验证:

cd /opt/module/kafka

bin/kafka-console-consumer.sh --bootstrap-server QM131:9092 --topic user_log_topic

二、技术架构

技术栈详情

组件版本作用部署节点
Flume1.9.0日志采集,监控文件变化并发送至KafkaQM130
Kafka3.0.0消息队列,解耦采集与计算,缓冲数据QM131
Spark3.0.0流式计算,实时消费Kafka进行PV统计QM130
ZooKeeper3.8.0Kafka集群协调QM131
Python3.x模拟用户行为日志生成QM130

三、集群环境

节点分配

节点角色部署组件内存
QM130主节点Flume + 日志生成脚本 + Spark提交2GB
QM131Kafka节点ZooKeeper + Kafka Broker2GB
QM132-133从节点Hadoop DataNode2GB

环境配置

  • 操作系统:CentOS 7

  • JDK版本:Java 1.8.0_281

  • Hadoop版本:3.1.4

  • Spark路径:/opt/module/spark-local


四、核心功能实现

4.1 日志生成模块

文件位置:/opt/project/realtime/data/generate_log.py

功能:模拟用户行为日志,每秒生成一条记录

日志格式:

2026-04-29 20:14:01|5394|browse|家电

字段说明:

  • timestamp:行为发生时间

  • user_id:用户ID(1000-9999随机)

  • action:行为类型(browse/add_to_cart/collect/pay)

  • category:商品品类(家电/数码/服装/美妆/食品)

4.2 Flume采集模块

配置文件:/opt/project/realtime/conf/flume_kafka.conf

# Source:监控日志文件变化 agent.sources.tail_source.type = exec agent.sources.tail_source.command = tail -F /opt/project/realtime/data/click.log # Channel:内存通道(容量1000) agent.channels.memory_channel.type = memory agent.channels.memory_channel.capacity = 1000 # Sink:输出到Kafka agent.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink agent.sinks.kafka_sink.kafka.bootstrap.servers = QM131:9092 agent.sinks.kafka_sink.kafka.topic = user_log_topic

Flume 启动 [root@QM130 flume]# bin/flume-ng agent \

--name agent \

--conf conf \

--conf-file /opt/project/realtime/conf/flume_kafka.conf \

-Dflume.root.logger=INFO,console

4.3 Kafka消息队列模块

节点:QM131

配置优化(2GB内存节点):

bash

export KAFKA_HEAP_OPTS="-Xmx512m -Xms512m"

创建的Topic:

  • 名称:user_log_topic

  • 分区数:2

  • 副本因子:1

4.4 Spark Streaming实时计算模块

脚本:/opt/project/realtime/scripts/streaming_file.py

核心逻辑:

  1. 读取Kafka中的日志流

  2. |分割解析日志

  3. 按分钟聚合计算PV

  4. 每10秒输出统计结果

提交命令:

bin/spark-submit \ --master local[2] \ --driver-memory 512m \ /opt/project/realtime/scripts/streaming_file.py

五、项目成果

5.1 运行效果

5.2 性能指标

指标数值
日志生成速率1条/秒
Flume→Kafka延迟< 1秒
Spark处理延迟< 10秒
端到端总延迟< 15秒
单日处理能力10万+条

5.3 已完成功能

  • 模拟用户行为日志持续生成

  • Flume实时监控日志文件变化

  • Kafka消息队列数据缓冲与传输

  • Spark Streaming实时消费与PV统计

  • 每10秒输出分钟级统计数据

  • 2GB小内存节点参数调优


六、踩坑与解决方案

问题解决方案
国外源下载过慢换华为云镜像:mirrors.huaweicloud.com
Kafka依赖包下载失败换用文件流方式(file://协议)
Spark读取HDFS而非本地使用 file:// 前缀指定本地路径
Flume连不上Kafka关闭防火墙:systemctl stop firewalld
2GB内存OOM调小Kafka和Spark堆内存至512M

七、项目亮点

  1. 完整的实时链路:从数据产生、采集、传输到计算全流程打通

  2. 资源受限环境优化:2GB节点下通过参数调优保障稳定运行

  3. 真实模拟数据:模拟用户行为日志,贴近生产环境

  4. 模块化配置:各组件独立配置,易于扩展和维护

  5. 可观测性强:每10秒输出统计结果,实时监控数据流


八、扩展

  • 增加UV统计:

  • uv_df = parsed_df.groupBy("process_minute").agg( approx_count_distinct("user_id").alias("uv") )
  • 品类热度排行:按category分组统计

  • category_df = parsed_df.groupBy("process_minute", "category") \ .count() \ .orderBy("process_minute", col("count").desc())
  • 行为分布

  • action_df = parsed_df.groupBy("process_minute", "action") \ .count() \ .orderBy("process_minute", col("count").desc())

    增强版代码:

  • from pyspark.sql import SparkSession from pyspark.sql.functions import * from pyspark.sql.types import * def main(): spark = SparkSession.builder \ .appName("RealTimeLogAnalysis_FileStream") \ .config("spark.sql.shuffle.partitions", "2") \ .getOrCreate() print("=" * 70) print("Spark Streaming 文件流增强版已启动") print("监控目录: /opt/project/realtime/data") print("=" * 70) # 读取文件流 df = spark.readStream \ .format("text") \ .option("pathGlobFilter", "*.log") \ .load("file:///opt/project/realtime/data") # 定义解析函数 def parse_log(line): try: parts = line.split('|') if len(parts) == 4 and parts[0].startswith('20'): return (parts[0], int(parts[1]), parts[2], parts[3]) except: pass return None schema = StructType([ StructField("timestamp", StringType(), True), StructField("user_id", IntegerType(), True), StructField("action", StringType(), True), StructField("category", StringType(), True) ]) parse_udf = udf(parse_log, schema) # 解析并过滤 parsed_df = df.select(parse_udf(col("value")).alias("data")) \ .filter(col("data").isNotNull()) \ .select( col("data.timestamp").alias("timestamp"), col("data.user_id").alias("user_id"), col("data.action").alias("action"), col("data.category").alias("category") ) # 添加处理时间 parsed_df = parsed_df.withColumn("process_minute", date_format(current_timestamp(), "yyyy-MM-dd HH:mm")) # ============================================================ # 1. PV统计(每分钟) # ============================================================ pv_df = parsed_df.groupBy("process_minute").count() query_pv = pv_df.writeStream \ .outputMode("complete") \ .format("console") \ .trigger(processingTime="10 seconds") \ .queryName("PV统计") \ .start() # ============================================================ # 2. UV统计(使用近似去重) # ============================================================ uv_df = parsed_df.groupBy("process_minute").agg( approx_count_distinct("user_id").alias("uv") ) query_uv = uv_df.writeStream \ .outputMode("complete") \ .format("console") \ .trigger(processingTime="10 seconds") \ .queryName("UV统计") \ .start() # ============================================================ # 3. 品类热度统计 # ============================================================ category_df = parsed_df.groupBy("process_minute", "category") \ .count() \ .orderBy("process_minute", col("count").desc()) query_category = category_df.writeStream \ .outputMode("complete") \ .format("console") \ .trigger(processingTime="10 seconds") \ .queryName("品类热度") \ .start() # ============================================================ # 4. 行为分布统计 # ============================================================ action_df = parsed_df.groupBy("process_minute", "action") \ .count() \ .orderBy("process_minute", col("count").desc()) query_action = action_df.writeStream \ .outputMode("complete") \ .format("console") \ .trigger(processingTime="10 seconds") \ .queryName("行为分布") \ .start() print("功能列表:") print(" PV统计 - 每10秒输出") print(" UV统计(近似) - 每10秒输出") print(" 品类热度 - 每10秒输出") print(" 行为分布 - 每10秒输出") print("=" * 70) print("等待数据流入...") print("=" * 70) query_pv.awaitTermination() if __name__ == "__main__": main()

    结果截图:
    1.实时日志

2.日志采集与传输

3.实时计算

九、后续可拓展方向

  • 结果持久化:写入MySQL或Hive供可视化展示

  • 接入Grafana:构建实时监控大屏

  • 增加告警机制:PV突增时触发报警

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/5/1 19:19:29

【学习笔记】储能系统的铁三角:BMS、PCS、EMS分别管啥

一套典型的储能系统&#xff0c;通常由三个核心控制系统组成&#xff1a;BMS、PCS、EMS。 很多人听完还是一头雾水——都是英文缩写&#xff0c;都是"系统"&#xff0c;都很重要&#xff0c;但它们到底各管什么、有什么区别、哪个更容易出问题&#xff1f; 今天用一个…

作者头像 李华
网站建设 2026/5/1 19:14:35

多模态情感分析中的对比学习

案例&#xff1a;你有没有刷到过这样的视频&#xff1a;一个人嘴上说着“我没事”&#xff0c;声音却在发抖&#xff0c;眼眶还红红的。只看文字&#xff0c;AI会判为“中性”&#xff1b;但加上语音和画面&#xff0c;真实情感是“悲伤”。怎么让模型把这三者对齐、听懂“言外…

作者头像 李华
网站建设 2026/5/1 19:12:33

5分钟彻底解决Windows热键冲突:Hotkey Detective完全使用指南

5分钟彻底解决Windows热键冲突&#xff1a;Hotkey Detective完全使用指南 【免费下载链接】hotkey-detective A small program for investigating stolen key combinations under Windows 7 and later. 项目地址: https://gitcode.com/gh_mirrors/ho/hotkey-detective 你…

作者头像 李华
网站建设 2026/5/1 19:12:31

使用curl命令快速测试Taotoken的API连通性与基础功能

使用curl命令快速测试Taotoken的API连通性与基础功能 1. 准备工作 在开始测试之前&#xff0c;请确保已获取有效的Taotoken API Key。登录Taotoken控制台&#xff0c;在「API密钥管理」页面创建或查看现有密钥。同时确认已安装curl工具&#xff0c;大多数Linux/macOS系统已预…

作者头像 李华