1. Logstash 是什么?
Logstash 是 Elastic Stack (ELK/Elastic Stack) 的核心成员之一,扮演着数据处理管道的角色。你可以把它想象成一个功能强大的“数据加工厂”。
它的主要工作流程是:
- 收集 (Input):从各种来源(如日志文件、数据库、消息队列、网络端口等)收集数据。
- 处理 (Filter):对收集到的数据进行解析、转换、丰富和结构化。这是 Logstash 最强大的部分。
- 输出 (Output):将处理好的数据发送到一个或多个目的地(如 Elasticsearch、数据库、文件、消息队列等)。
这个Input -> Filter -> Output的流程被称为一个管道 (Pipeline)。
2.核心概念
- 事件 (Event):在 Logstash 中,每一条数据(比如一行日志)都被视为一个“事件”。事件是一个 JSON 对象,包含了原始数据以及 Logstash 处理过程中添加的元数据。
- 插件 (Plugin):Logstash 的功能都是通过插件实现的。插件分为三种类型,对应管道的三个阶段:
- 输入插件 (Input Plugins):负责数据收集。
- 过滤插件 (Filter Plugins):负责数据处理。
- 输出插件 (Output Plugins):负责数据发送。
- 配置文件 (Configuration File):定义 Logstash 管道行为的文件。通常以
.conf结尾。
3. 安装与运行
前提条件
- 安装 Java 8 或 Java 11。你可以通过
java -version命令检查。
安装
- 从 Elastic 官网下载 Logstash。
- 解压下载的压缩包。
运行
Logstash 的运行非常简单,核心命令是:
# -f 参数指定配置文件 bin/logstash -f <your_config_file.conf>在开始编写复杂配置前,先来一个最简单的“Hello World”示例。
创建一个名为first-pipeline.conf的文件:
# first-pipeline.conf # 输入:从标准输入(你的键盘)接收数据 input { stdin {} } # 输出:将数据格式化后输出到标准输出(你的屏幕) output { stdout { codec => rubydebug # rubydebug 是一种格式,让输出更易读 } }运行它:
bin/logstash -f first-pipeline.conf启动后,在控制台输入 “hello logstash”,然后按回车。你会看到类似下面的输出:
{ "message" => "hello logstash", "@version" => "1", "host" => "your-hostname", "@timestamp" => 2023-10-27T08:30:00.123Z }这证明你的 Logstash 管道已经成功运行。message是你输入的内容,@version,host,@timestamp是 Logstash 自动为每个事件添加的元数据。
4. 配置文件详解
Logstash 配置文件由三个主要部分组成:input,filter,output。
# 输入段 input { # ... 一个或多个输入插件配置 ... } # 过滤段(可选) filter { # ... 一个或多个过滤插件配置 ... } # 输出段 output { # ... 一个或多个输出插件配置 ... }4.1 输入插件 (Input)
作用:定义数据从哪里来。
常用插件示例:
file: 从文件中读取数据,非常适合读取日志文件。它会记录读取位置,不会重复读取。input { file { path => "/var/log/nginx/access.log" # 要读取的文件路径 start_position => "beginning" # 从文件开头开始读取 sincedb_path => "/dev/null" # (可选) 在开发测试时,禁止记录读取位置,方便重复测试 } }beats: 接收来自 Filebeat、Metricbeat 等 Beats 家族成员发送的数据。这是生产环境中非常推荐的组合。input { beats { port => 5044 # 监听的端口 } }tcp/udp: 监听一个 TCP 或 UDP 端口来接收数据。input { tcp { port => 9999 codec => "json" # 假设传入的数据是 JSON 格式 } }kafka: 从 Kafka 主题中消费数据。input { kafka { bootstrap_servers => "kafka-server:9092" topics => ["log-topic"] } }jdbc: 通过 JDBC 定期查询数据库。input { jdbc { jdbc_driver_library => "/path/to/mysql-connector-java.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://db-server:3306/mydatabase" jdbc_user => "user" jdbc_password => "password" statement => "SELECT * from logs WHERE timestamp > :sql_last_value" schedule => "*/5 * * * *" # 每5分钟执行一次 } }
4.2 过滤插件 (Filter)
作用:数据处理的核心,对事件进行解析、转换、丰富。
常用插件示例:
grok:(最重要)用于解析非结构化文本数据,将其转换为结构化数据。它使用预定义的正则表达式模式。- 场景:解析 Nginx 或 Apache 的访问日志。
- 示例:
解析后,事件会增加# 原始日志: 127.0.0.1 - - [27/Oct/2023:12:00:00 +0800] "GET /index.html HTTP/1.1" 200 1234 filter { grok { # %{COMBINEDAPACHELOG} 是一个预定义的 grok 模式 match => { "message" => "%{COMBINEDAPACHELOG}" } } }clientip,verb,request,response,bytes等字段。技巧:使用 Kibana Grok Debugger 工具可以非常方便地测试和创建你的 Grok 模式。
mutate: 用于对字段进行通用操作,如重命名、删除、替换和类型转换。filter { mutate { # 将 "response" 字段的值转换为整数 convert => { "response" => "integer" } # 重命名 "clientip" 字段为 "client_address" rename => { "clientip" => "client_address" } # 删除不需要的字段 remove_field => ["field_to_remove_1", "field_to_remove_2"] # 添加新字段 add_field => { "processed_by" => "logstash-server-1" } } }date: 解析字段中的时间字符串,并用它来替换 Logstash 事件的默认@timestamp字段。这对于确保事件时间准确性至关重要。# Nginx 日志中的时间格式: [27/Oct/2023:12:00:00 +0800] filter { grok { match => { "message" => "%{HTTPDATE:log_timestamp}" } } date { # 匹配 log_timestamp 字段中的时间 match => [ "log_timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ] # 成功解析后,删除原始的 log_timestamp 字段 remove_field => [ "log_timestamp" ] } }json: 解析值为 JSON 字符串的字段。# 如果原始 message 是: { "user": "john", "action": "login" } filter { json { source => "message" # 指定要解析的字段 } } # 解析后,事件会增加 user 和 action 两个字段geoip: 根据 IP 地址信息,添加地理位置数据(如国家、城市、经纬度)。filter { geoip { source => "client_address" # 指定包含 IP 地址的字段 } } # 成功后,会添加一个 geoip 字段,里面包含 location, country_name 等信息
条件判断 (Conditionals)
filter块支持if,else if,else语句,可以根据不同类型的日志执行不同的处理逻辑。
filter { if [type] == "nginx-access" { grok { ... } } else if [type] == "app-json" { json { ... } } else { # 其他处理 } }4.3 输出插件 (Output)
作用:定义处理好的数据发送到哪里。
常用插件示例:
elasticsearch:(最常用)将数据发送到 Elasticsearch 集群。output { elasticsearch { hosts => ["http://es-node1:9200", "http://es-node2:9200"] # ES 集群地址 index => "logstash-%{+YYYY.MM.dd}" # 索引名称,按天创建 user => "elastic" password => "your_password" } }stdout: 输出到标准输出,主要用于调试。output { stdout { codec => rubydebug # 格式化输出,便于查看 } }file: 将数据写入文件。output { file { path => "/tmp/logstash_output.log" } }kafka: 将数据发送到 Kafka。output { kafka { bootstrap_servers => "kafka-server:9092" topic_id => "processed_logs" } }
5. 实战案例:解析 Nginx 访问日志并存入 Elasticsearch
目标:
- 读取 Nginx 访问日志文件
/var/log/nginx/access.log。 - 使用 Grok 解析日志内容。
- 根据客户端 IP 添加地理位置信息。
- 将处理后的数据存入 Elasticsearch。
- 在开发调试时,同时在控制台打印输出。
创建配置文件nginx-pipeline.conf:
# nginx-pipeline.conf input { file { path => "/path/to/your/nginx/access.log" start_position => "beginning" # 在生产环境中,应去掉下面这行或指定一个真实路径 sincedb_path => "/dev/null" } } filter { # 1. 使用 grok 解析非结构化日志 grok { # COMBINEDAPACHELOG 是一个内置的模式,适用于常见的 Nginx/Apache 日志格式 # 示例日志: 192.168.1.1 - - [27/Oct/2023:14:10:30 +0800] "GET /api/v1/users HTTP/1.1" 200 512 "-" "Mozilla/5.0..." match => { "message" => "%{COMBINEDAPACHELOG}" } } # 2. 如果 grok 解析成功,则继续处理 if "_grokparsefailure" not in [tags] { # 2.1 解析时间戳,并设置为事件的主时间戳 date { match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ] } # 2.2 使用 geoip 插件,通过 clientip 字段获取地理位置信息 geoip { source => "clientip" } # 2.3 转换字段类型 mutate { convert => { "bytes" => "integer" } convert => { "response" => "integer" } } # 2.4 (可选) 解析成功后,可以删除原始的 message 字段以节省空间 mutate { remove_field => [ "message", "timestamp" ] } } } output { # 输出到 Elasticsearch elasticsearch { hosts => ["http://localhost:9200"] index => "nginx-access-%{+YYYY.MM.dd}" } # 同时输出到控制台,方便调试 stdout { codec => rubydebug } }运行与测试:
检查配置:在运行前,先检查配置文件语法是否正确。
bin/logstash -f nginx-pipeline.conf --config.test_and_exit如果看到
Configuration OK,说明配置没问题。运行管道:
bin/logstash -f nginx-pipeline.conf生成日志(可选):向你的 Nginx 访问日志文件中追加一行新日志,观察 Logstash 控制台的输出。
处理后的输出应该类似这样 (rubydebug 格式):{ "agent" => "\"Mozilla/5.0...\"", "clientip" => "192.168.1.1", "verb" => "GET", "request" => "/api/v1/users", "httpversion" => "1.1", "response" => 200, "bytes" => 512, "geoip" => { "ip" => "192.168.1.1", "country_name" => "China", "city_name" => "Beijing", "location" => { "lon" => 116.3883, "lat" => 39.9289 } }, "@timestamp" => 2023-10-27T06:10:30.000Z, // 注意时间已经转为 UTC ... }同时,这些结构化数据也会被发送到 Elasticsearch 中名为
nginx-access-YYYY.MM.DD的索引里。
6. 高级主题与最佳实践
- 多管道 (Multiple Pipelines):不要把所有逻辑都放在一个巨大的配置文件里。使用
pipelines.yml文件来定义和管理多个独立的管道,提高模块化和可维护性。 - 持久化队列 (Persistent Queues):当输出端(如 Elasticsearch)不可用时,Logstash 默认将事件缓存在内存中。开启持久化队列 (
queue.type: persisted) 可以将队列写入磁盘,防止 Logstash 重启或崩溃时数据丢失。 - 死信队列 (Dead Letter Queues - DLQ):当事件无法被处理(如 JSON 解析失败)或无法发送到输出端时,可以将其发送到死信队列,以便后续分析和处理,而不是直接丢弃。
- 性能调优:
pipeline.workers:增加 worker 数量可以并行处理事件,提高吞吐量(受 CPU 核心数限制)。pipeline.batch.size:调整每个批次处理的事件数量。
- 监控:通过 Logstash 的监控 API (
curl -XGET 'localhost:9600/_node/stats') 来获取其运行状态、性能指标和插件信息。