前言

一般我们需要进行日志分析场景:直接在日志文件中 grepawk 就可以获得自己想要的信息。但在规模较大的场景中,此方法效率低下,面临问题包括日志量太大如何归档、文本搜索太慢怎么办、如何多维度查询。需要集中化的日志管理,所有服务器上的日志收集汇总。常见解决思路是建立集中式日志收集系统,将所有节点上的日志统一收集,管理,访问。ELK提供了一整套解决方案,并且都是开源软件,之间互相配合使用,完美衔接,高效的满足了很多场合的应用。

ELK

ELK是三个开源项目的首字母缩写,这三个项目分别是:ElasticsearchLogstashKibanaElasticsearch 是一个搜索和分析引擎。Logstash 是服务器端数据处理管道,能够同时从多个来源采集数据,转换数据,然后将数据发送到诸如 Elasticsearch 等“存储库”中。Kibana 则可以让用户在Elasticsearch 中使用图形和图表对数据进行可视化。

Elasticsearch

Elasticsearch 是一个实时的分布式搜索分析引擎,它能让你以前所未有的速度和规模去检索你的数据。

下载地址:https://www.elastic.co/cn/downloads/elasticsearch

配置

下面列举一下常见的配置项

  • cluster.name:集群名称,默认为elasticsearch
  • node.master:该节点是否可以作为主节点(注意这里只是有资格作为主节点,不代表该节点一定就是master),默认为true
  • node.name:节点名称,如果不配置es则会自动获取
  • path.conf:配置文件路径,默认为es根目录的config文件夹
  • path.data:配置索引数据的文件路径,默认是es根目录下的data文件夹,可以设置多个存储路径,用逗号隔开
  • path.logs:配置日志文件路径,默认是es根目录下的logs文件夹
  • path.plugins:配置es插件路径,默认是es根目录下的plugins文件夹
  • http.port:es的http端口,默认为9200
  • transport.tcp.port:与其它节点交互的端口,默认为9300
  • transport.tcp.compress:配置是否压缩tcp传输时的数据,默认为false,不压缩。
  • network.bind_host:配置绑定地址,默认为0.0.0.0
  • network.publish_host:设置其他节点连接此节点的ip地址,如果不设置的话,则自动获取,publish_host的地址必须为真实地址
  • network.host:同时设置bind_hostpublish_host这两个参数
  • http.enabled:是否对外使用http协议,默认为true
  • index.number_of_replicas:设置索引副本个数,默认为1
  • http.cors.enabled:是否支持跨域,默认为false
  • http.cors.allow-origin:当设置允许跨域,默认为*

测试

了解的配置文件项之后我们可以来进行简单的配置

vim config/elasticsearch.yml

cluster.name: master

node.name: elk-1

path.data: /data/es

path.logs: /var/log/es/

bootstrap.memory_lock: true

network.host: 0.0.0.0

http.port: 9200

http.cors.enabled: true

http.cors.allow-origin: "*"

然后启动es

bin/elasticsearch

检查服务端口是否正常监听

[qiyou@example es]$ netstat -utnl|grep -E "9200|9300"
tcp6       0      0 127.0.0.1:9200          :::*                    LISTEN     
tcp6       0      0 ::1:9200                :::*                    LISTEN     
tcp6       0      0 127.0.0.1:9300          :::*                    LISTEN     
tcp6       0      0 ::1:9300                :::*                    LISTEN

检查es是否正常工作,可以看到是正常工作的

[qiyou@example es]$ curl -i -XGET 'localhost:9200/_count?pretty' 
HTTP/1.1 200 OK
content-type: application/json; charset=UTF-8
content-length: 114

{
  "count" : 0,
  "_shards" : {
    "total" : 0,
    "successful" : 0,
    "skipped" : 0,
    "failed" : 0
  }
}

上面用命令检索数据来是不是感觉麻烦,我们可以安装es插件elasticsearch-head,项目链接:https://github.com/mobz/elasticsearch-head

git clone git://github.com/mobz/elasticsearch-head.git
cd elasticsearch-head
npm install
npm run start

检查是否正常启动

netstat -untl|grep 9100
tcp        0      0 0.0.0.0:9100            0.0.0.0:*               LISTEN

然后访问http://localhost:9100/即可

Logstash

Logstash是一个实时的管道式开源日志收集引擎。Logstash可以动态的将不同来源的数据进行归一并且将格式化的数据存储到你选择的位置。对你的所有做数据清洗和大众化处理,以便做数据分析和可视化。Logstash通过输入、过滤和输出插件Logstash可以对任何类型的事件丰富和转换,通过本地编码器还可以进一步简化此过程。

logstash下载地址:https://www.elastic.co/cn/downloads/logstash

logstash的基本目录结构如下及其含义:

Type Description Default Location Setting
home logstash安装的目录 {extract.path}
bin logstash的二进制脚本以及插件 {extract.path}/bin
settings 配置文件, 包含logstash.ymljvm.options {extract.path}/config path.settings
logs 日志文件 {extract.path}/logs path.logs
plugins 插件存放的目录,每个插件都包含在一个子目录中 {extract.path}/plugins path.plugins
data logstash及其插件为任何持久性需求所使用的数据文件 {extract.path}/data path.data

一个Logstash管道有两个必须的组件,inputoutput,除此之外还有一个可选的组件filterinput插件将数据从源读入,filter插件按照你的定义处理数据,最后通过output插件写入到目的地。

Logstash支持的input插件:https://www.elastic.co/guide/en/logstash/current/input-plugins.html

Logstash支持的output插件:https://www.elastic.co/guide/en/logstash/current/input-plugins.html

注:有的插件默认是没有安装的,可以使用logstash-plugin list列出所有已经安装的插件

配置

input插件

可以指定logstash的事件源

下面列举几种常见的事件源

  • stdin:标准输入
input { 
    stdin { } 
}
  • file:从文件中读取
file {
    path => "/var/log/secure" 
    type => "logstash_log" # 定义类型,一般用于过滤器中
    start_position => "beginning" # 表示从文件开头读取,默认为end
}
  • syslog:从syslog传输过来的事件中读取
syslog{
    port =>"514" # 监听的端口,syslog配置文件中配置:*.* @ip:514 即可
    type => "syslog"
}
  • beats:从Elastic beats接收事件
beats {
    port => 5044   # 监听的端口
}

然后在beat进行如下配置即可
output.logstash:
    hosts: ["localhost:5044"]
  • Redis:从redis中获取事件
redis {
                host => "127.0.0.1"
                port => "6379"
                password => "passwd"
                db => "1"
                data_type => "list"
                key => "redis_key"
                batch_count => 1 
    }

output插件

指定logstash事件的接收源

下面列举几种常见的接受源

  • stdout:标准输出
output{
    stdout{
        codec => "rubydebug"
    }
}
  • file:将事件保存到文件中
file {
   path => "/var/log/logstash/%{host}/{application}
   codec => line { format => "%{message}"} }
}
  • kafka:将事件发送到kafka
kafka{
    bootstrap_servers => "localhost:9092"
    topic_id => "logstash_log"
}
  • elasticseach:将事件发送到es中
elasticsearch {
    hosts => "localhost:9200"
    index => "logstash-%{+YYYY.MM.dd}"  
}
  • redis:将事件发送到redis
redis {
        data_type => "list"
        host => "127.0.0.1"
        port => "6379"
        db => "1"
        password => "passwd"
        key => "redis_key"
}

filter过滤器插件

对事件进行中间处理,filter过滤器这里只列举gork

gork可以将非结构化日志数据解析为结构化和可查询的数据,gork的基本语法为:%{SYNTAX:SEMANTIC}

  • SYNTAXSYNTAX是与文本匹配的模式的名称,如123可以匹配的是NUMBER127.0.0.1可以匹配的是IP

注:NUMBERIP都是gork默认内置的字段,不需要我们额外编写正则表达式,gork默认内置120个预定义匹配字段:https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns

点击收藏 | 1 关注 | 1
登录 后跟帖