news 2026/6/11 22:11:57

Logstash详解配置使用

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Logstash详解配置使用

1. Logstash 是什么?

Logstash 是 Elastic Stack (ELK/Elastic Stack) 的核心成员之一,扮演着数据处理管道的角色。你可以把它想象成一个功能强大的“数据加工厂”。

它的主要工作流程是:

  1. 收集 (Input):从各种来源(如日志文件、数据库、消息队列、网络端口等)收集数据。
  2. 处理 (Filter):对收集到的数据进行解析、转换、丰富和结构化。这是 Logstash 最强大的部分。
  3. 输出 (Output):将处理好的数据发送到一个或多个目的地(如 Elasticsearch、数据库、文件、消息队列等)。

这个Input -> Filter -> Output的流程被称为一个管道 (Pipeline)

2.核心概念

  • 事件 (Event):在 Logstash 中,每一条数据(比如一行日志)都被视为一个“事件”。事件是一个 JSON 对象,包含了原始数据以及 Logstash 处理过程中添加的元数据。
  • 插件 (Plugin):Logstash 的功能都是通过插件实现的。插件分为三种类型,对应管道的三个阶段:
    • 输入插件 (Input Plugins):负责数据收集。
    • 过滤插件 (Filter Plugins):负责数据处理。
    • 输出插件 (Output Plugins):负责数据发送。
  • 配置文件 (Configuration File):定义 Logstash 管道行为的文件。通常以.conf结尾。

3. 安装与运行

前提条件
  • 安装 Java 8 或 Java 11。你可以通过java -version命令检查。
安装
  1. 从 Elastic 官网下载 Logstash。
  2. 解压下载的压缩包。
运行

Logstash 的运行非常简单,核心命令是:

# -f 参数指定配置文件 bin/logstash -f <your_config_file.conf>

在开始编写复杂配置前,先来一个最简单的“Hello World”示例。

创建一个名为first-pipeline.conf的文件:

# first-pipeline.conf # 输入:从标准输入(你的键盘)接收数据 input { stdin {} } # 输出:将数据格式化后输出到标准输出(你的屏幕) output { stdout { codec => rubydebug # rubydebug 是一种格式,让输出更易读 } }

运行它:

bin/logstash -f first-pipeline.conf

启动后,在控制台输入 “hello logstash”,然后按回车。你会看到类似下面的输出:

{ "message" => "hello logstash", "@version" => "1", "host" => "your-hostname", "@timestamp" => 2023-10-27T08:30:00.123Z }

这证明你的 Logstash 管道已经成功运行。message是你输入的内容,@version,host,@timestamp是 Logstash 自动为每个事件添加的元数据。


4. 配置文件详解

Logstash 配置文件由三个主要部分组成:input,filter,output

# 输入段 input { # ... 一个或多个输入插件配置 ... } # 过滤段(可选) filter { # ... 一个或多个过滤插件配置 ... } # 输出段 output { # ... 一个或多个输出插件配置 ... }
4.1 输入插件 (Input)

作用:定义数据从哪里来。

常用插件示例

  • file: 从文件中读取数据,非常适合读取日志文件。它会记录读取位置,不会重复读取。

    input { file { path => "/var/log/nginx/access.log" # 要读取的文件路径 start_position => "beginning" # 从文件开头开始读取 sincedb_path => "/dev/null" # (可选) 在开发测试时,禁止记录读取位置,方便重复测试 } }
  • beats: 接收来自 Filebeat、Metricbeat 等 Beats 家族成员发送的数据。这是生产环境中非常推荐的组合。

    input { beats { port => 5044 # 监听的端口 } }
  • tcp/udp: 监听一个 TCP 或 UDP 端口来接收数据。

    input { tcp { port => 9999 codec => "json" # 假设传入的数据是 JSON 格式 } }
  • kafka: 从 Kafka 主题中消费数据。

    input { kafka { bootstrap_servers => "kafka-server:9092" topics => ["log-topic"] } }
  • jdbc: 通过 JDBC 定期查询数据库。

    input { jdbc { jdbc_driver_library => "/path/to/mysql-connector-java.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://db-server:3306/mydatabase" jdbc_user => "user" jdbc_password => "password" statement => "SELECT * from logs WHERE timestamp > :sql_last_value" schedule => "*/5 * * * *" # 每5分钟执行一次 } }
4.2 过滤插件 (Filter)

作用:数据处理的核心,对事件进行解析、转换、丰富。

常用插件示例

  • grok:(最重要)用于解析非结构化文本数据,将其转换为结构化数据。它使用预定义的正则表达式模式。

    • 场景:解析 Nginx 或 Apache 的访问日志。
    • 示例
      # 原始日志: 127.0.0.1 - - [27/Oct/2023:12:00:00 +0800] "GET /index.html HTTP/1.1" 200 1234 filter { grok { # %{COMBINEDAPACHELOG} 是一个预定义的 grok 模式 match => { "message" => "%{COMBINEDAPACHELOG}" } } }
      解析后,事件会增加clientip,verb,request,response,bytes等字段。

      技巧:使用 Kibana Grok Debugger 工具可以非常方便地测试和创建你的 Grok 模式。

  • mutate: 用于对字段进行通用操作,如重命名、删除、替换和类型转换。

    filter { mutate { # 将 "response" 字段的值转换为整数 convert => { "response" => "integer" } # 重命名 "clientip" 字段为 "client_address" rename => { "clientip" => "client_address" } # 删除不需要的字段 remove_field => ["field_to_remove_1", "field_to_remove_2"] # 添加新字段 add_field => { "processed_by" => "logstash-server-1" } } }
  • date: 解析字段中的时间字符串,并用它来替换 Logstash 事件的默认@timestamp字段。这对于确保事件时间准确性至关重要。

    # Nginx 日志中的时间格式: [27/Oct/2023:12:00:00 +0800] filter { grok { match => { "message" => "%{HTTPDATE:log_timestamp}" } } date { # 匹配 log_timestamp 字段中的时间 match => [ "log_timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ] # 成功解析后,删除原始的 log_timestamp 字段 remove_field => [ "log_timestamp" ] } }
  • json: 解析值为 JSON 字符串的字段。

    # 如果原始 message 是: { "user": "john", "action": "login" } filter { json { source => "message" # 指定要解析的字段 } } # 解析后,事件会增加 user 和 action 两个字段
  • geoip: 根据 IP 地址信息,添加地理位置数据(如国家、城市、经纬度)。

    filter { geoip { source => "client_address" # 指定包含 IP 地址的字段 } } # 成功后,会添加一个 geoip 字段,里面包含 location, country_name 等信息

条件判断 (Conditionals)

filter块支持if,else if,else语句,可以根据不同类型的日志执行不同的处理逻辑。

filter { if [type] == "nginx-access" { grok { ... } } else if [type] == "app-json" { json { ... } } else { # 其他处理 } }
4.3 输出插件 (Output)

作用:定义处理好的数据发送到哪里。

常用插件示例

  • elasticsearch:(最常用)将数据发送到 Elasticsearch 集群。

    output { elasticsearch { hosts => ["http://es-node1:9200", "http://es-node2:9200"] # ES 集群地址 index => "logstash-%{+YYYY.MM.dd}" # 索引名称,按天创建 user => "elastic" password => "your_password" } }
  • stdout: 输出到标准输出,主要用于调试。

    output { stdout { codec => rubydebug # 格式化输出,便于查看 } }
  • file: 将数据写入文件。

    output { file { path => "/tmp/logstash_output.log" } }
  • kafka: 将数据发送到 Kafka。

    output { kafka { bootstrap_servers => "kafka-server:9092" topic_id => "processed_logs" } }

5. 实战案例:解析 Nginx 访问日志并存入 Elasticsearch

目标

  1. 读取 Nginx 访问日志文件/var/log/nginx/access.log
  2. 使用 Grok 解析日志内容。
  3. 根据客户端 IP 添加地理位置信息。
  4. 将处理后的数据存入 Elasticsearch。
  5. 在开发调试时,同时在控制台打印输出。

创建配置文件nginx-pipeline.conf:

# nginx-pipeline.conf input { file { path => "/path/to/your/nginx/access.log" start_position => "beginning" # 在生产环境中,应去掉下面这行或指定一个真实路径 sincedb_path => "/dev/null" } } filter { # 1. 使用 grok 解析非结构化日志 grok { # COMBINEDAPACHELOG 是一个内置的模式,适用于常见的 Nginx/Apache 日志格式 # 示例日志: 192.168.1.1 - - [27/Oct/2023:14:10:30 +0800] "GET /api/v1/users HTTP/1.1" 200 512 "-" "Mozilla/5.0..." match => { "message" => "%{COMBINEDAPACHELOG}" } } # 2. 如果 grok 解析成功,则继续处理 if "_grokparsefailure" not in [tags] { # 2.1 解析时间戳,并设置为事件的主时间戳 date { match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ] } # 2.2 使用 geoip 插件,通过 clientip 字段获取地理位置信息 geoip { source => "clientip" } # 2.3 转换字段类型 mutate { convert => { "bytes" => "integer" } convert => { "response" => "integer" } } # 2.4 (可选) 解析成功后,可以删除原始的 message 字段以节省空间 mutate { remove_field => [ "message", "timestamp" ] } } } output { # 输出到 Elasticsearch elasticsearch { hosts => ["http://localhost:9200"] index => "nginx-access-%{+YYYY.MM.dd}" } # 同时输出到控制台,方便调试 stdout { codec => rubydebug } }

运行与测试

  1. 检查配置:在运行前,先检查配置文件语法是否正确。

    bin/logstash -f nginx-pipeline.conf --config.test_and_exit

    如果看到Configuration OK,说明配置没问题。

  2. 运行管道

    bin/logstash -f nginx-pipeline.conf
  3. 生成日志(可选):向你的 Nginx 访问日志文件中追加一行新日志,观察 Logstash 控制台的输出。
    处理后的输出应该类似这样 (rubydebug 格式):

    { "agent" => "\"Mozilla/5.0...\"", "clientip" => "192.168.1.1", "verb" => "GET", "request" => "/api/v1/users", "httpversion" => "1.1", "response" => 200, "bytes" => 512, "geoip" => { "ip" => "192.168.1.1", "country_name" => "China", "city_name" => "Beijing", "location" => { "lon" => 116.3883, "lat" => 39.9289 } }, "@timestamp" => 2023-10-27T06:10:30.000Z, // 注意时间已经转为 UTC ... }

    同时,这些结构化数据也会被发送到 Elasticsearch 中名为nginx-access-YYYY.MM.DD的索引里。


6. 高级主题与最佳实践

  • 多管道 (Multiple Pipelines):不要把所有逻辑都放在一个巨大的配置文件里。使用pipelines.yml文件来定义和管理多个独立的管道,提高模块化和可维护性。
  • 持久化队列 (Persistent Queues):当输出端(如 Elasticsearch)不可用时,Logstash 默认将事件缓存在内存中。开启持久化队列 (queue.type: persisted) 可以将队列写入磁盘,防止 Logstash 重启或崩溃时数据丢失。
  • 死信队列 (Dead Letter Queues - DLQ):当事件无法被处理(如 JSON 解析失败)或无法发送到输出端时,可以将其发送到死信队列,以便后续分析和处理,而不是直接丢弃。
  • 性能调优
    • pipeline.workers:增加 worker 数量可以并行处理事件,提高吞吐量(受 CPU 核心数限制)。
    • pipeline.batch.size:调整每个批次处理的事件数量。
  • 监控:通过 Logstash 的监控 API (curl -XGET 'localhost:9600/_node/stats') 来获取其运行状态、性能指标和插件信息。
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/10 15:06:38

SpringDoc 基本使用指南

SpringDoc 是基于 Spring Boot 的现代化 API 文档生成工具&#xff0c;通过自动化扫描代码和注解&#xff0c;生成符合 OpenAPI 3.0 规范 的交互式文档&#xff0c;并集成 Swagger UI 提供可视化测试界面。以下是其核心详解&#xff1a; 核心特性与优势 开箱即用 仅需添加依赖&…

作者头像 李华
网站建设 2026/6/10 21:29:10

零基础怎么学网络安全?超详细全程指南,带你从入门到精通

一、学习建议 1.了解基础概念&#xff1a; 开始之前&#xff0c;了解网络安全的基本概念和术语是很重要的。你可以查找网络安全入门教程或在线课程&#xff0c;了解网络安全领域的基本概念&#xff0c;如黑客、漏洞、攻击类型等。 2.网络基础知识&#xff1a; 学习计算机网…

作者头像 李华
网站建设 2026/6/10 12:53:05

手游防DDoS攻击SDK

手游防DDoS攻击SDK的关键功能手游防DDoS攻击的SDK通常需要具备流量清洗、IP黑白名单、协议分析、速率限制等功能。流量清洗通过识别异常流量并过滤恶意请求&#xff0c;确保正常玩家访问不受影响。IP黑白名单允许开发者手动或自动封禁可疑IP地址&#xff0c;减少攻击源。协议分…

作者头像 李华
网站建设 2026/6/10 12:54:56

HoRain云--揭秘AQS:ReentrantLock高效锁机制

&#x1f3ac; HoRain 云小助手&#xff1a;个人主页 ⛺️生活的理想&#xff0c;就是为了理想的生活! ⛳️ 推荐 前些天发现了一个超棒的服务器购买网站&#xff0c;性价比超高&#xff0c;大内存超划算&#xff01;忍不住分享一下给大家。点击跳转到网站。 目录 ⛳️ 推荐 …

作者头像 李华