博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
基于flume+kafka+logstash+es的分布式日志系统
阅读量:4294 次
发布时间:2019-05-27

本文共 5395 字,大约阅读时间需要 17 分钟。

本文将从以下几点讲解的分布式日志系统

1.日志埋点

2.日志收集

3.日志处理分析

4.日志查询展示

 

先看一下日志数据流程图:

flume监听日志文件收集每行日志发到kafka,logstash消费kafka中的消息将日志解析成json插入到es,es提供日志查询

1.日志埋点

由于我们项目结构是网关+dubbo服务实例的分布式系统,埋点主要在http的网关和rpc的服务实例。

网关我们在BaseActionServlet类中记录了请求和响应日志。在一个请求进来的时候我们创建一个上下文对象DistributedContext保存当前线程的logId(唯一)及一些业务相关参数,在rpc调用的时候将这个上下文放在dubbo的RpcContext中在整个调用链中传递,使整个调用链能够通过logId串联。

ConsumerSetContextFilter

1

2

3

4

5

6

7

8

9

10

@Activate(group = Constants.CONSUMER)

public class ConsumerSetContextFilter implements Filter {

    @Override

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {

        // 消费者将上下文对象放在RpcContext中

        RpcContext.getContext().setAttachment("context", ObjectUtil.toString(DistributedContext.getContext()));

        Result result = invoker.invoke(invocation);

        return result;

    }

}

ProviderSetContextFilter

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

@Activate(group = Constants.PROVIDER)

public class ProviderSetContextFilter implements Filter {

    @Override

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {

        DistributedContext context = null;

        // 提供者从RpcContext中取出上下文对象

        String contextJson = RpcContext.getContext().getAttachment("context");

        if (StringUtil.isNullOrBlank(contextJson)) {

            DistributedContext.start();

            context = DistributedContext.getContext();

        } else {

            context = GsonUtil.fromJson(contextJson, DistributedContext.class);

        }

        // 将上下文对象放在当前线程ThreadLocal中

        DistributedContext.setContext((DistributedContext)context);

        Result result = invoker.invoke(invocation);

        return result;

    }

}

 

 

2.日志收集

日志收集我们用的是apach的flume: ,  它的source+channel+sink的设计非常适合日志收集

Agent component diagram

                                                            flume数据模型

完全与业务系统解耦,flume宕机不影响系统运行。

我们在每台宿主机中装上flume,配置好agent的source channel 和 sink:

a1.sources = r1 r2 r3 a1.sinks = k1a1.channels = c1#source配置监听文件a1.sources.r1.type = TAILDIRa1.sources.r1.positionFile = /data/xx/flume_taildir_position/gateway.jsona1.sources.r1.filegroups = f1a1.sources.r1.filegroups.f1 = /data/xx/gateway/gateway.loga1.sources.r2.type = TAILDIRa1.sources.r2.positionFile = /data/xx/flume_taildir_position/demo.jsona1.sources.r2.filegroups = f2a1.sources.r2.filegroups.f2 = /data/xx/demo/demo.loga1.sources.r3.type = TAILDIRa1.sources.r3.positionFile = /data/xx/flume_taildir_position/authority.jsona1.sources.r3.filegroups = f3a1.sources.r3.filegroups.f3 = /data/xx/authority/authority.log#channel配置a1.channels.c1.type = filea1.channels.c1.checkpointDir = /data/soft/apache-flume-1.7.0-bin/channel_dir/checkpointa1.channels.c1.dataDirs = /data/soft/apache-flume-1.7.0-bin/channel_dir/data#sink配置,将数据丢到kafkaa1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.brokerList=10.200.xxx.xxx:9092a1.sinks.k1.topic=dcv6_loga1.sinks.k1.serializer.class=kafka.serializer.StringEncoder#source与channel绑定a1.sources.r1.channels = c1a1.sources.r2.channels = c1a1.sources.r3.channels = c1#channel与sink绑定a1.sinks.k1.channel = c1

flume的TailDir收集方式支持断点续传,他会将监听的文件inode值与行数持久化在文件中,重启后也可以从上一次监听的位置开始收集。

如果kafka宕机,flume也会将数据持久化到channel配置的file中,这样可以很好的保证数据不会轻易丢失。

 

 

3.日志处理分析

logstash可以将日志格式化为json,方便我们处理日志的存取。因此用logstash消费kafka中的数据保存到es中。

如果logstash消费速度小于日志生成的速度,只需再起一个logstash进程去消费即可,这个拓展性要好一点。

logstash配置如下:

input {        kafka {                zk_connect => "10.200.100.xx1:2181,10.200.100.xx2:2181,10.200.100.xx3:2181"                group_id => "g1"                 topic_id => "dcv6_log"                fetch_message_max_bytes => 20480000                reset_beginning => false # boolean (optional), default: false                  consumer_threads => 20  # number (optional), default: 1                  decorate_events => true # boolean (optional), default: false                  type => "ORTHER"                codec => plain        }}filter{        grok {                match => {"message" => "\[(?
\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}.\d{3})\] \[%{DATA:level}\] \[%{DATA:className}\] \[%{DATA:methodName}\] \[%{DATA:thread}\] \[%{GREEDYDATA:msg}\] \[%{NUMBER:clientType:int}\] \[%{NUMBER:step:int}\] \[%{NUMBER:flag:int}\] \[%{DATA:ip}\] \[%{DATA:clientIp}\] \[%{NUMBER:costTime:int}\] \[%{DATA:isErr}\] \[%{DATA:errName}\] \[%{DATA:logId}\] \[%{DATA:sysName}\] \[%{DATA:actionName}\] \[%{DATA:apiName}\]"} remove_field => ["message"] #分析完之后,原始数据就可以被删掉了。原始数据占用的字段叫做message。 remove_field => "kafka" } if [actionName] == "xx.log.getReqLog" { drop {} } if [actionName] == "xx.log.getBusinessLog" { drop {} } if[flag]==1{ mutate { replace => { type => "REQUEST" } } } else if[flag]==2{ mutate { replace => { type => "RESPONSE" } } } else if[flag]==3{ mutate { replace => { type => "EXCEPTION" } } } else if[flag]==4{ mutate { replace => { type => "BUSINESS" } } } else if[flag]==5{ mutate { replace => { type => "INNER_API" } } } else { mutate { replace => { type => "OTHER" } } } date { match => [ "createTime", "yyyy-MM-dd HH:mm:ss.SSS" ] locale => "cn" timezone=>"+00:00" }}output { elasticsearch { hosts => ["10.200.100.xx6:9200"] index=>"dcv6-%{+YYYY.MM.dd}" } #stdout { codec => rubydebug }}

 

 

4.日志查询展示

这个利用es的java api即可编写日志查询接口。

 

 

目前该分布式日志系统中平均每天2亿条日志(100+G)的数据量,高峰期日志消费速率会延迟10分钟左右。

es的查询较慢,可以考虑做下集群。

 

转载地址:http://riuws.baihongyu.com/

你可能感兴趣的文章
海龟交易法则08_风险与资金管理
查看>>
海龟交易法则09_海龟式积木
查看>>
海龟交易法则10_通用积木
查看>>
海龟交易法则14_掌控心魔
查看>>
海龟交易法则15_万事俱备
查看>>
海龟交易法则16_附原版海龟交易法则
查看>>
克罗谈投资策略01_期货交易中的墨菲法则
查看>>
克罗谈投资策略02_赢家和输家
查看>>
克罗谈投资策略03_你所期望的赌博方式
查看>>
克罗谈投资策略04_感觉与现实
查看>>
通向财务自由之路01_导读
查看>>
通向财务自由之路02_成功的决定因素:你
查看>>
中低频量化交易策略研发01_引言
查看>>
中低频量化交易策略研发06_推进的择时策略
查看>>
史丹·温斯坦称傲牛熊市的秘密
查看>>
期货市场技术分析01_理论基础
查看>>
期货市场技术分析02_趋势的基本概念
查看>>
期货市场技术分析03_主要反转形态
查看>>
期货市场技术分析04_持续形态
查看>>
期货市场技术分析05_交易量和持仓兴趣
查看>>