• -------------------------------------------------------------
  • ====================================

sentinel控制台监控数据持久化【InfluxDB】

技能 dewbay 6年前 (2019-05-08) 3852次浏览 已收录 0个评论 扫描二维码

根据官方 wiki 文档,sentinel控制台的实时监控数据,默认仅存储 5 分钟以内的数据。如需持久化,需要定制实现相关接口。

https://github.com/alibaba/Sentinel/wiki/在生产环境中使用-Sentinel-控制台 也给出了指导步骤:

1.自行扩展实现 MetricsRepository 接口;

2.注册成 Spring Bean 并在相应位置通过 @Qualifier 注解指定对应的 bean name 即可。

本文使用时序数据库 InfluxDB 来进行持久化,从下载开始,一步步编写一个基于 InfluxDB 的存储实现。

—————————————————————————————————————————————————————————————–

InfluxDB 官网:https://www.influxdata.com

关键词:

高性能时序数据库

go 语言编写没有外部依赖

支持 HTTP API 读写

支持类 SQL 查询语法

通过数据保留策略(Retention Policies)支持自动清理历史数据

通过连续查询(Continuous Queries)支持数据归档

最新版本:1.6.4

下载

windows:wget https://dl.influxdata.com/influxdb/releases/influxdb-1.6.4_windows_amd64.zip

linux:wget https://dl.influxdata.com/influxdb/releases/influxdb-1.6.4_linux_amd64.tar.gz

注:windows 下载安装 wget  https://eternallybored.org/misc/wget/

在 windows 环境,解压 zip 文件至 D:\influxdb\influxdb-1.6.4-1 目录:

sentinel控制台监控数据持久化【InfluxDB】

打开 cmd 命令行窗口,在 D:\influxdb\influxdb-1.6.4-1 执行命令启动 influxdb 服务端:influxd

sentinel控制台监控数据持久化【InfluxDB】

 再打开一个 cmd 窗口,在目录下输入 influx 启动客户端: // 后面可以带上参数:-precision rfc3339 指定时间格式显示

sentinel控制台监控数据持久化【InfluxDB】

show databases 发现只有系统的 2 个数据库,这里我们新建一个 sentinel_db,输入命令:create database sentinel_db

sentinel控制台监控数据持久化【InfluxDB】

use sentinel_db  使用 sentinel_db 数据库

show measurements  查看数据库中的数据表(measurement)

sentinel控制台监控数据持久化【InfluxDB】

可以看到,这几个 InfluxDB 命令跟 MySQL 很相似。

==============================================================

InfluxDB 名词概念:

database:数据库 // 关系数据库的 database

measurement:数据库中的表 // 关系数据库中的 table

point:表里的一行数据 // 关系数据库中的 row

point 由 3 部分组成:

time:每条数据记录的时间,也是数据库自动生成的主索引;// 类似主键

fields:各种记录的值;// 没有索引的字段

tags:各种有索引的属性 // 有索引的字段

==============================================================

在官方 github 上,有一个 java 的客户端库:

在 sentinel-dashboard 的 pom.xml 中,加入 maven 依赖:

<dependency>
    <groupId>org.influxdb</groupId>
    <artifactId>influxdb-java</artifactId>
    <version>2.14</version>
</dependency>

封装一个工具类:存储 InfluxDB 连接信息以及方便调用

sentinel控制台监控数据持久化【InfluxDB】
/**
 * @author cdfive
 * @date 2018-10-19
 */
@Component
public class InfluxDBUtils {

    private static Logger logger = LoggerFactory.getLogger(InfluxDBUtils.class);

    private static String url;

    private static String username;

    private static String password;

    private static InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();

    @Value("${influxdb.url}")
    public void setUrl(String url) {
        InfluxDBUtils.url = url;
    }

    @Value("${influxdb.username}")
    public void setUsername(String username) {
        InfluxDBUtils.username = username;
    }

    @Value("${influxdb.password}")
    public void setPassword(String password) {
        InfluxDBUtils.password = password;
    }

    public static void init(String url, String username, String password) {
        InfluxDBUtils.url = url;
        InfluxDBUtils.username = username;
        InfluxDBUtils.password = password;
    }

    public static <T> T process(String database, InfluxDBCallback callback) {
        InfluxDB influxDB = null;
        T t = null;
        try {
            influxDB = InfluxDBFactory.connect(url, username, password);
            influxDB.setDatabase(database);

            t = callback.doCallBack(database, influxDB);
        } catch (Exception e) {
            logger.error("[process exception]", e);
        } finally {
            if (influxDB != null) {
                try {
                    influxDB.close();
                } catch (Exception e) {
                    logger.error("[influxDB.close exception]", e);
                }
            }
        }

        return t;
    }

    public static void insert(String database, InfluxDBInsertCallback influxDBInsertCallback) {
        process(database, new InfluxDBCallback() {
            @Override
            public <T> T doCallBack(String database, InfluxDB influxDB) {
                influxDBInsertCallback.doCallBack(database, influxDB);
                return null;
            }
        });

    }

    public static QueryResult query(String database, InfluxDBQueryCallback influxDBQueryCallback) {
        return process(database, new InfluxDBCallback() {
            @Override
            public <T> T doCallBack(String database, InfluxDB influxDB) {
                QueryResult queryResult = influxDBQueryCallback.doCallBack(database, influxDB);
                return (T) queryResult;
            }
        });
    }

    public static <T> List<T> queryList(String database, String sql, Map<String, Object> paramMap, Class<T> clasz) {
        QueryResult queryResult = query(database, new InfluxDBQueryCallback() {
            @Override
            public QueryResult doCallBack(String database, InfluxDB influxDB) {
                BoundParameterQuery.QueryBuilder queryBuilder = BoundParameterQuery.QueryBuilder.newQuery(sql);
                queryBuilder.forDatabase(database);

                if (paramMap != null && paramMap.size() > 0) {
                    Set<Map.Entry<String, Object>> entries = paramMap.entrySet();
                    for (Map.Entry<String, Object> entry : entries) {
                        queryBuilder.bind(entry.getKey(), entry.getValue());
                    }
                }

                return influxDB.query(queryBuilder.create());
            }
        });

        return resultMapper.toPOJO(queryResult, clasz);
    }

    public interface InfluxDBCallback {
        <T> T doCallBack(String database, InfluxDB influxDB);
    }

    public interface InfluxDBInsertCallback {
        void doCallBack(String database, InfluxDB influxDB);
    }

    public interface InfluxDBQueryCallback {
        QueryResult doCallBack(String database, InfluxDB influxDB);
    }
}
sentinel控制台监控数据持久化【InfluxDB】

其中:

url、username、password 用于存储 InfluxDB 的连接、用户名、密码信息,定义为 static 属性,因此在 set 方法上使用@Value 注解从配置文件读取属性值;

resultMapper 用于查询结果到实体类的映射;

init 方法用于初始化 url、username、password;

process 为通用的处理方法,负责打开关闭连接,并且调用 InfluxDBCallback 回调方法;

insert 为插入数据方法,配合 InfluxDBInsertCallback 回调使用;

query 为通用的查询方法,配合 InfluxDBQueryCallback 回调方法使用,返回 QueryResult 对象;

queryList 为查询列表方法,调用 query 得到 QueryResult,再通过 resultMapper 转换为 List<实体类>;

在 resources 目录下的 application.properties 文件中,增加 InfluxDB 的配置: 

influxdb.url=${influxdb.url}
influxdb.username=${influxdb.username}
influxdb.password=${influxdb.password}

用${xxx}占位符,这样可以通过 maven 的 pom.xml 添加 profile 配置不同环境(开发、测试、生产) 或 从配置中心读取参数。

在 datasource.entity 包下,新建 influxdb 包,下面新建 sentinel_metric 数据表(measurement)对应的实体类 MetricPO:

sentinel控制台监控数据持久化【InfluxDB】
package com.taobao.csp.sentinel.dashboard.datasource.entity.influxdb;

import org.influxdb.annotation.Column;
import org.influxdb.annotation.Measurement;

import java.time.Instant;

/**
 * @author cdfive
 * @date 2018-10-19
 */
@Measurement(name = "sentinel_metric")
public class MetricPO {

    @Column(name = "time")
    private Instant time;

    @Column(name = "id")
    private Long id;

    @Column(name = "gmtCreate")
    private Long gmtCreate;

    @Column(name = "gmtModified")
    private Long gmtModified;

    @Column(name = "app", tag = true)
    private String app;

    @Column(name = "resource", tag = true)
    private String resource;

    @Column(name = "passQps")
    private Long passQps;

    @Column(name = "successQps")
    private Long successQps;

    @Column(name = "blockQps")
    private Long blockQps;

    @Column(name = "exceptionQps")
    private Long exceptionQps;

    @Column(name = "rt")
    private double rt;

    @Column(name = "count")
    private int count;

    @Column(name = "resourceCode")
    private int resourceCode;

    // getter setter 省略
}
sentinel控制台监控数据持久化【InfluxDB】

该类参考 MetricEntity 创建,加上 influxdb-java 包提供的注解,通过@Measurement(name = “sentinel_metric”)指定数据表(measurement)名称,

time 作为时序数据库的时间列;

app、resource 设置为 tag 列,通过注解标识为 tag=true;

其它字段为 filed 列;

接着在 InMemoryMetricsRepository 所在的 repository.metric 包下新建 InfluxDBMetricsRepository 类,实现 MetricsRepository<MetricEntity>接口:

sentinel控制台监控数据持久化【InfluxDB】
package com.taobao.csp.sentinel.dashboard.repository.metric;

import com.alibaba.csp.sentinel.util.StringUtil;
import com.taobao.csp.sentinel.dashboard.datasource.entity.MetricEntity;
import com.taobao.csp.sentinel.dashboard.datasource.entity.influxdb.MetricPO;
import com.taobao.csp.sentinel.dashboard.util.InfluxDBUtils;
import org.apache.commons.lang.time.DateFormatUtils;
import org.apache.commons.lang.time.DateUtils;
import org.influxdb.InfluxDB;
import org.influxdb.dto.Point;
import org.springframework.stereotype.Repository;
import org.springframework.util.CollectionUtils;

import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * metrics 数据 InfluxDB 存储实现
 * @author cdfive
 * @date 2018-10-19
 */
@Repository("influxDBMetricsRepository")
public class InfluxDBMetricsRepository implements MetricsRepository<MetricEntity> {

    /**时间格式*/
    private static final String DATE_FORMAT_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS";

    /**数据库名称*/
    private static final String SENTINEL_DATABASE = "sentinel_db";

    /**数据表名称*/
    private static final String METRIC_MEASUREMENT = "sentinel_metric";

    /**北京时间领先 UTC 时间 8 小时 UTC: Universal Time Coordinated,世界统一时间*/
    private static final Integer UTC_8 = 8;

    @Override
    public void save(MetricEntity metric) {
        if (metric == null || StringUtil.isBlank(metric.getApp())) {
            return;
        }

        InfluxDBUtils.insert(SENTINEL_DATABASE, new InfluxDBUtils.InfluxDBInsertCallback() {
            @Override
            public void doCallBack(String database, InfluxDB influxDB) {
                if (metric.getId() == null) {
                    metric.setId(System.currentTimeMillis());
                }
                doSave(influxDB, metric);
            }
        });
    }

    @Override
    public void saveAll(Iterable<MetricEntity> metrics) {
        if (metrics == null) {
            return;
        }

        Iterator<MetricEntity> iterator = metrics.iterator();
        boolean next = iterator.hasNext();
        if (!next) {
            return;
        }

        InfluxDBUtils.insert(SENTINEL_DATABASE, new InfluxDBUtils.InfluxDBInsertCallback() {
            @Override
            public void doCallBack(String database, InfluxDB influxDB) {
                while (iterator.hasNext()) {
                    MetricEntity metric = iterator.next();
                    if (metric.getId() == null) {
                        metric.setId(System.currentTimeMillis());
                    }
                    doSave(influxDB, metric);
                }
            }
        });
    }

    @Override
    public List<MetricEntity> queryByAppAndResourceBetween(String app, String resource, long startTime, long endTime) {
        List<MetricEntity> results = new ArrayList<MetricEntity>();
        if (StringUtil.isBlank(app)) {
            return results;
        }

        if (StringUtil.isBlank(resource)) {
            return results;
        }

        StringBuilder sql = new StringBuilder();
        sql.append("SELECT * FROM " + METRIC_MEASUREMENT);
        sql.append(" WHERE app=$app");
        sql.append(" AND resource=$resource");
        sql.append(" AND time>=$startTime");
        sql.append(" AND time<=$endTime");

        Map<String, Object> paramMap = new HashMap<String, Object>();
        paramMap.put("app", app);
        paramMap.put("resource", resource);
        paramMap.put("startTime", DateFormatUtils.format(new Date(startTime), DATE_FORMAT_PATTERN));
        paramMap.put("endTime", DateFormatUtils.format(new Date(endTime), DATE_FORMAT_PATTERN));

        List<MetricPO> metricPOS = InfluxDBUtils.queryList(SENTINEL_DATABASE, sql.toString(), paramMap, MetricPO.class);

        if (CollectionUtils.isEmpty(metricPOS)) {
            return results;
        }

        for (MetricPO metricPO : metricPOS) {
            results.add(convertToMetricEntity(metricPO));
        }

        return results;
    }

    @Override
    public List<String> listResourcesOfApp(String app) {
        List<String> results = new ArrayList<>();
        if (StringUtil.isBlank(app)) {
            return results;
        }

        StringBuilder sql = new StringBuilder();
        sql.append("SELECT * FROM " + METRIC_MEASUREMENT);
        sql.append(" WHERE app=$app");
        sql.append(" AND time>=$startTime");

        Map<String, Object> paramMap = new HashMap<String, Object>();
        long startTime = System.currentTimeMillis() - 1000 * 60;
        paramMap.put("app", app);
        paramMap.put("startTime", DateFormatUtils.format(new Date(startTime), DATE_FORMAT_PATTERN));

        List<MetricPO> metricPOS = InfluxDBUtils.queryList(SENTINEL_DATABASE, sql.toString(), paramMap, MetricPO.class);

        if (CollectionUtils.isEmpty(metricPOS)) {
            return results;
        }

        List<MetricEntity> metricEntities = new ArrayList<MetricEntity>();
        for (MetricPO metricPO : metricPOS) {
            metricEntities.add(convertToMetricEntity(metricPO));
        }

        Map<String, MetricEntity> resourceCount = new HashMap<>(32);

        for (MetricEntity metricEntity : metricEntities) {
            String resource = metricEntity.getResource();
            if (resourceCount.containsKey(resource)) {
                MetricEntity oldEntity = resourceCount.get(resource);
                oldEntity.addPassQps(metricEntity.getPassQps());
                oldEntity.addRtAndSuccessQps(metricEntity.getRt(), metricEntity.getSuccessQps());
                oldEntity.addBlockQps(metricEntity.getBlockQps());
                oldEntity.addExceptionQps(metricEntity.getExceptionQps());
                oldEntity.addCount(1);
            } else {
                resourceCount.put(resource, MetricEntity.copyOf(metricEntity));
            }
        }

        // Order by last minute b_qps DESC.
        return resourceCount.entrySet()
                .stream()
                .sorted((o1, o2) -> {
                    MetricEntity e1 = o1.getValue();
                    MetricEntity e2 = o2.getValue();
                    int t = e2.getBlockQps().compareTo(e1.getBlockQps());
                    if (t != 0) {
                        return t;
                    }
                    return e2.getPassQps().compareTo(e1.getPassQps());
                })
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    private MetricEntity convertToMetricEntity(MetricPO metricPO) {
        MetricEntity metricEntity = new MetricEntity();

        metricEntity.setId(metricPO.getId());
        metricEntity.setGmtCreate(new Date(metricPO.getGmtCreate()));
        metricEntity.setGmtModified(new Date(metricPO.getGmtModified()));
        metricEntity.setApp(metricPO.getApp());
        metricEntity.setTimestamp(Date.from(metricPO.getTime().minusMillis(TimeUnit.HOURS.toMillis(UTC_8))));// 查询数据减 8 小时
        metricEntity.setResource(metricPO.getResource());
        metricEntity.setPassQps(metricPO.getPassQps());
        metricEntity.setSuccessQps(metricPO.getSuccessQps());
        metricEntity.setBlockQps(metricPO.getBlockQps());
        metricEntity.setExceptionQps(metricPO.getExceptionQps());
        metricEntity.setRt(metricPO.getRt());
        metricEntity.setCount(metricPO.getCount());

        return metricEntity;
    }

    private void doSave(InfluxDB influxDB, MetricEntity metric) {
        influxDB.write(Point.measurement(METRIC_MEASUREMENT)
                .time(DateUtils.addHours(metric.getTimestamp(), UTC_8).getTime(), TimeUnit.MILLISECONDS)// 因 InfluxDB 默认 UTC 时间,按北京时间算写入数据加 8 小时
                .tag("app", metric.getApp())
                .tag("resource", metric.getResource())
                .addField("id", metric.getId())
                .addField("gmtCreate", metric.getGmtCreate().getTime())
                .addField("gmtModified", metric.getGmtModified().getTime())
                .addField("passQps", metric.getPassQps())
                .addField("successQps", metric.getSuccessQps())
                .addField("blockQps", metric.getBlockQps())
                .addField("exceptionQps", metric.getExceptionQps())
                .addField("rt", metric.getRt())
                .addField("count", metric.getCount())
                .addField("resourceCode", metric.getResourceCode())
                .build());
    }
}
sentinel控制台监控数据持久化【InfluxDB】

其中:

save、saveAll 方法通过调用 InfluxDBUtils.insert 和 InfluxDBInsertCallback 回调方法,往 sentinel_db 库的 sentinel_metric 数据表写数据;

saveAll 方法不是循环调用 save 方法,而是在回调内部循环 Iterable<MetricEntity> metrics 处理,这样 InfluxDBFactory.connect 连接只打开关闭一次;

doSave 方法中,.time(DateUtils.addHours(metric.getTimestamp(), 8).getTime(), TimeUnit.MILLISECONDS)

因 InfluxDB 的 UTC 时间暂时没找到修改方法,所以这里 time 时间列加了 8 个小时时差;

queryByAppAndResourceBetween、listResourcesOfApp 里面的查询方法,使用 InfluxDB 提供的类 sql 语法,编写查询语句即可。

最后一步,在 MetricController、MetricFetcher 两个类,找到 metricStore 属性,在@Autowired 注解上面加上@Qualifier(“jpaMetricsRepository”)注解:

@Qualifier("influxDBMetricsRepository")
@Autowired
private MetricsRepository<MetricEntity> metricStore;

来验证下成果:

设置 sentinel-dashboard 工程启动参数:-Dserver.port=8080 -Dcsp.sentinel.dashboard.server=localhost:8080 -Dproject.name=sentinel-dashboard

启动工程,打开 http://localhost:8080,查看各页面均显示正常,

在命令行通过 InfluxDB 客户端命令,show measurements,可以看到已经生成了 sentinel_metric 数据表(measurement);

查询总数:select count(id) from sentinel_metric

查询最新 5 行数据:select * from sentinel_metric order by time desc limit 5

注:命令行语句结束不用加分号

—————————————————————————————————————————————————————————————–

代码参考:https://github.com/cdfive/Sentinel/tree/winxuan_develop/sentinel-dashboard

扩展:

1.考虑以什么时间维度归档历史数据;

2.结合 grafana 将监控数据进行多维度的统计和呈现。

—————————————————————————————————————————————————————————————–

参考:

Sentinel官方文档:

https://github.com/alibaba/Sentinel/wiki/在生产环境中使用-Sentinel控制台

InfluxDB 官网文档 https://docs.influxdata.com/influxdb/v1.6/introduction/getting-started/

InfluxDB 简明手册 https://xtutu.gitbooks.io/influxdb-handbook/content/


露水湾 , 版权所有丨如未注明 , 均为原创丨本网站采用BY-NC-SA协议进行授权
转载请注明原文链接:sentinel控制台监控数据持久化【InfluxDB】
喜欢 (0)
[]
分享 (0)
关于作者:
发表我的评论
取消评论

表情 贴图 加粗 删除线 居中 斜体 签到

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址