浅尝系统消息预警


在现实的系统运行过程中,问题的产生几乎是不可避免的。因为我们要考虑的是在产生问题时如何将损失降到最低,如果快速发现,定位并解决问题。在这个过程中可以说时间是最重要的成本,所以我们期望系统能够具备自动预警的功能,将问题及时暴露给开发人员,如果等待用户反馈,那么此时这个问题往往已经给公司带来了实质性的损失,至少给用户留下了不良印象。在本人的工作实践中,主要通过钉钉机器人实现预警机制,依赖于钉钉的 API 将系统异常消息发送至内部钉钉群,以便开发人员及时反馈处理。当然钉钉本身只是作为一种消息发送的实现,在实际的应用场景中可以是短信通知,邮件通知等形式,本文主要从抽象层面上探讨并分享系统预警机制的思想和实现。

消息发送器抽象


从设计的抽象层面上讲,如果我们需要发送消息,那么需要一个主体,我把它叫做 MessageSender 消息发送器,由于在本文中主要基于钉钉实现,暂且将之定义为 DingTalkMessageSender,其实这是一个抽象概念,不必与某一实现相捆绑。

package com.wanshifu.user.framework.business.cache.dingtalk;

import java.util.concurrent.TimeUnit;

/**
 * 钉钉消息发送器
 * @author Ethan Zhang
 */
public interface DingTalkMessageSender {

    /**
     * 消息发送器名称,Spring上下文中保证唯一
     */
    String getName();

    /**
     * 默认顺序发送,缓存 1 天
     * @param content 消息内容
     * @param distinctions 去重特征值,不传默认为消息整体
     */
    default void send(String content, String... distinctions) {
        send(content, 1, TimeUnit.DAYS, distinctions);
    }

    /**
     * 消息顺序发送,自定义缓存时间,在缓存时间内消息不会再次发送
     * @param content 消息内容
     * @param duration 缓存时间
     * @param timeUnit 时间单位
     * @param distinctions 去重特征值,不传默认为消息整体
     */
    void send(String content, long duration, TimeUnit timeUnit, String... distinctions);

    /**
     * 消息延迟发送
     * @param content 消息内容
     * @param duration 延迟时间
     * @param timeUnit 时间单位
     * @param distinctions 去重特征值,不传默认为消息整体
     */
    void sendDelay(String content, long duration, TimeUnit timeUnit, String... distinctions);

}

这里我给 MessageSender 定义了两种消息发送方式,分别是消息顺序发送以及消息延时发送, 同时它们都支持在一定时期内保证消息的唯一发送,因为一旦系统出现故障,往往会产生大量重复性的异常信息,此时如果无节制地进行发送只会对开发人员产生干扰,因此我们需要一种去重机制,默认根据消息内容整体去重,也可以摘取消息特征值进行去重,例如我们可以将订单编号作为特征值,可以保证一个订单只发送一次消息。当然这一种机制需要设定一个时间范围,因为在时间范围以外可能是系统另一次的故障事件,我们依然需要保持关注。

默认消息发送器实现


首先我们可以为消息发送器提供一种默认实现,它应该能够满足我们系统普遍性的需求。

DefaultDingTalkMessageSender

对于一个预警消息发送器来说,我们首先可以明确几个原则:

  • 消息发送行为不应该影响主业务流程,即时消息发送失败,异常也不必对外暴露。

  • 消息发送行为可能存在一定的耗时,造成线程阻塞,同时考虑到预警消息并不要求绝对的可靠性,推荐使用异步方式发送。

  • 消息发送的频率是非均匀的,而底层的消息客户端实现可能对发送频率存在限制(例如钉钉限制为每分钟20条),因此我们往往需要一个缓冲队列暂存客户端发送的消息,同时可以设定一个周期性执行器作为消息消费者以固定频率从缓冲区取出消息执行发送操作。

  • 已发送消息的特征值需要缓存至消息中间件(此处以 Redis 为例),已满足分布式环境下的消息去重功能。

package com.wanshifu.user.framework.business.cache.dingtalk;

import com.dingtalk.api.DingTalkClient;
import com.dingtalk.api.request.OapiRobotSendRequest;
import com.dingtalk.api.response.OapiRobotSendResponse;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.util.Assert;
import org.springframework.util.DigestUtils;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static com.wanshifu.user.framework.business.cache.constant.CachePrefixConstant.COMMON_CACHE_PREFIX;

/**
 * 钉钉消息发送默认实现
 * 消息缓冲,定时发送
 * @author Ethan Zhang
 */
@Slf4j
public class DefaultDingTalkMessageSender implements DingTalkMessageSender {

    private static final String PREFIX = COMMON_CACHE_PREFIX + "ding-talk-message-distinct:";

    protected final String name;
    protected final RedisTemplate<Object, Object> redisTemplate;
    protected final DingTalkClient dingTalkClient;
    protected final BlockingQueue<Message> buffer;
    protected final ScheduledThreadPoolExecutor scheduler;

    public DefaultDingTalkMessageSender(String name, RedisTemplate<Object, Object> redisTemplate, DingTalkClient dingTalkClient) {
        Assert.hasText(name, "DingTalkMessageSender name can not be empty!");
        this.name = name;
        this.redisTemplate = redisTemplate;
        this.dingTalkClient = dingTalkClient;
        this.buffer = new PriorityBlockingQueue<>(1 << 6, Comparator.comparing(Message::getTimestamp));
        this.scheduler = new ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors());
    }

    @PostConstruct
    public void init() {
        scheduler.scheduleAtFixedRate(() -> Optional.ofNullable(buffer.poll()).ifPresent(this::send), 5, 5, TimeUnit.SECONDS);
    }

    @PreDestroy
    public void destroy() {
        scheduler.shutdown();
    }

    @Override
    public String getName() {
        return name;
    }

    @Async
    @Override
    public void send(String content, long duration, TimeUnit timeUnit, String... distinctions) {
        long timestamp = System.currentTimeMillis();
        scheduler.execute(() -> {
            Message message = Message.builder()
                    .timestamp(timestamp)
                    .content(content)
                    .expire(timeUnit.toMillis(duration))
                    .distinctions(convertToSet(distinctions))
                    .build();
            if (!buffer.contains(message)) {
                buffer.offer(message);
            }
        });
    }

    @Override
    public void sendDelay(String content, long duration, TimeUnit timeUnit, String... distinctions) {
        scheduler.schedule(() -> send(Message.builder()
                .timestamp(System.currentTimeMillis())
                .content(content)
                .expire(timeUnit.toMillis(duration))
                .distinctions(convertToSet(distinctions))
                .build()), duration, timeUnit);
    }

    protected void send(Message message) {
        if (checkIfSent(message)) {
            return;
        }
        if (sendDingTalkMessage(dingTalkClient, message.getContent())) {
            markSent(message);
        }
    }

    protected boolean sendDingTalkMessage(DingTalkClient client, String content) {
        OapiRobotSendRequest req = new OapiRobotSendRequest();
        req.setMsgtype("text");
        OapiRobotSendRequest.Text text = new OapiRobotSendRequest.Text();
        text.setContent(content);
        req.setText(text);
        OapiRobotSendRequest.At at = new OapiRobotSendRequest.At();
        at.setIsAtAll("true");
        req.setAt(at);
        try {
            OapiRobotSendResponse res = client.execute(req);
            return res.isSuccess();
        } catch (Exception e) {
            log.error("send ding talk message error!", e);
            return false;
        }
    }

    protected void markSent(Message message) {
        Long size = redisTemplate.opsForHash().size(getRedisKey());
        boolean putRes = redisTemplate.opsForHash().putIfAbsent(getRedisKey(), message.getKey(), 1);
        if (0 == size && putRes) {
            redisTemplate.expire(getRedisKey(), message.getExpire(), TimeUnit.MILLISECONDS);
        }
    }

    protected boolean checkIfSent(Message message) {
        try {
            return redisTemplate.opsForHash().hasKey(getRedisKey(), message.getKey());
        } catch (Exception e) {
            log.error("check if send error", e);
            return false;
        }
    }

    protected String getRedisKey() {
        return PREFIX + name;
    }

    private Set<String> convertToSet(String... src) {
        if (null == src || src.length == 0) {
            return Collections.emptySet();
        }
        return Arrays.stream(src).collect(Collectors.toSet());
    }

    @Getter
    @Setter
    @Builder
    public static class Message {

        private long timestamp;
        private String content;
        private long expire;
        private Set<String> distinctions;

        String getKey() {
            return CollectionUtils.isNotEmpty(distinctions) ? encode(String.join("", distinctions)) : encode(content);
        }

        String encode(String content) {
            return DigestUtils.md5DigestAsHex(content.getBytes(StandardCharsets.UTF_8));
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            Message message = (Message) o;
            return this.getKey().equals(message.getKey());
        }

        @Override
        public int hashCode() {
            return Objects.hash(getKey());
        }

    }

}

消息分片发送


MultipartDingTalkMessageSender

在某些场景下,我们需要将统一来源的消息发送至不同的目的地,例如不同的钉钉群,不同的邮件地址或是不同的手机号,如果我们定义为一个 client 指向固定的一个目的地,那么我们需要将消息以某种规则分片到不同的 client,决定它们的走向。因此一个 DingTalkMessageSender 需要持有多个 client,同时多个 client 具有类型负载均衡的作用,假设单个 client 的消息发送频率有限制,那么我们可以组合多个 client 支撑大量的消息发送需求。那么消息如何才能合理分配呢?我们可以定义一种 clientselector 机制,利用选择器选出当前最为恰当的客户端进行消息发送,这一选择是可以交给开发人员自定义的,当前也会提供默认实现。

package com.wanshifu.user.framework.business.cache.dingtalk;

import com.dingtalk.api.DingTalkClient;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.util.Assert;

import javax.annotation.PostConstruct;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

/**
 * 消息分片发送
 * 当消息量大,单个钉钉机器人的发送频率无法满足时,可将消息分片到多个机器人分别发送
 * @author Ethan Zhang
 */
@Slf4j
@Setter
public class MultipartDingTalkMessageSender extends DefaultDingTalkMessageSender {

    private static final int MAX_SEND_MESSAGE_PER_MINUTE = 20;

    private final ConcurrentMap<DingTalkClient, Cache<Long, String>> clients;

    private Caffeine<Object, Object> caffeine = Caffeine.newBuilder()
            .expireAfterWrite(1, TimeUnit.MINUTES)
            .maximumSize(MAX_SEND_MESSAGE_PER_MINUTE);

    private DingTalkClientSelector selector = clients -> clients.entrySet().stream()
            .filter(entry -> entry.getValue().estimatedSize() < MAX_SEND_MESSAGE_PER_MINUTE)
            .map(Map.Entry::getKey).findAny().orElse(null);

    public MultipartDingTalkMessageSender(String name, RedisTemplate<Object, Object> redisTemplate, Collection<DingTalkClient> dingTalkClients) {
        super(name, redisTemplate, null);
        Assert.notEmpty(dingTalkClients, "dingTalkClients can not be empty!");
        clients = new ConcurrentHashMap<>();
        dingTalkClients.forEach(dingTalkClient -> clients.put(dingTalkClient, caffeine.build()));
    }

    @PostConstruct
    @Override
    public void init() {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                Optional.ofNullable(buffer.poll(5, TimeUnit.SECONDS)).ifPresent(this::send);
            } catch (InterruptedException e) {
                log.error("scheduler has bean interrupted!", e);
            }
        }, 5000, 200, TimeUnit.MILLISECONDS);
    }

    @Override
    protected void send(Message message) {
        if (checkIfSent(message)) {
            return;
        }
        DingTalkClient client = selector.select(clients);
        if (Objects.isNull(client)) {
            return;
        }
        if (sendDingTalkMessage(client, message.getContent())) {
            markSent(message);
            clients.get(client).put(message.getTimestamp(), message.getKey());
        }
    }

    /**
     * 客户端选择器,可自定义
     */
    @FunctionalInterface
    public interface DingTalkClientSelector {

        DingTalkClient select(ConcurrentMap<DingTalkClient, Cache<Long, String>> clients);

    }

}

消息合并发送


在某些场景下,通过增大消息发送频率来消费大量消息并不是一种良好的选择,频率的异常预警对于开发人员是一种干扰,长时间的报警可能造成麻木性,失去预警本身的意义,同时我们也不希望重要消息丢失,因此我们可以将一段时间内的消息收集合并发送,例如收集一小时内系统产生的异常信息,合并后一次发送,这样既不会使我们丢失重要信息,也不会带来频繁报警的困扰,当然从消息的即时性上来说有一定的损失。

MergeDingTalkMessageSender

消息的合并逻辑我们可以提供一种抽象叫做 merger,可以提供给开发人员扩展,在这里我仅提供了一种字符串合并的实现,从更为广义的角度去思考,我们是否可以可以将消息合并为文件发送?这都是我们可以根据特定业务场景去尝试的实现。

package com.wanshifu.user.framework.business.cache.dingtalk;

import com.dingtalk.api.DingTalkClient;
import lombok.Setter;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisTemplate;

import javax.annotation.PostConstruct;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * 消息合并发送
 * 消息量大,频繁发送影响体验,可定期合并消息发送
 * @author Ethan Zhang
 */
@Setter
public class MergeDingTalkMessageSender extends DefaultDingTalkMessageSender {

    /**
     * 发送间隔,默认 5s
     */
    private long period = Duration.ofSeconds(5).toMillis();

    private MessageMerger merger = messages -> messages.stream()
            .distinct()
            .filter(message -> !checkIfSent(message))
            .map(message -> String.format("{%s}-{%s}\n\n", formatTimestamp(message.getTimestamp()), message.getContent()))
            .collect(Collectors.joining());

    public MergeDingTalkMessageSender(String name, RedisTemplate<Object, Object> redisTemplate, DingTalkClient dingTalkClient) {
        super(name, redisTemplate, dingTalkClient);
    }

    @PostConstruct
    @Override
    public void init() {
        scheduler.scheduleAtFixedRate(() -> {
            if (buffer.isEmpty()) {
                return;
            }
            List<Message> messages = new ArrayList<>(buffer.size());
            buffer.drainTo(messages);
            sendMerge(messages);
        }, 5000, period, TimeUnit.MILLISECONDS);
    }

    protected void sendMerge(List<Message> messages) {
        String mergeContent = merger.merge(messages);
        if (StringUtils.isBlank(mergeContent)) {
            return;
        }
        if (sendDingTalkMessage(dingTalkClient, mergeContent)) {
            markSentMerge(messages);
        }
    }

    protected void markSentMerge(List<Message> messages) {
        Long size = redisTemplate.opsForHash().size(getRedisKey());
        redisTemplate.opsForHash().putAll(getRedisKey(),
                messages.stream().collect(Collectors.toMap(Message::getKey, message -> 1)));
        if (0 == size) {
            redisTemplate.expire(getRedisKey(),
                    (long) messages.stream().mapToLong(Message::getExpire).average().orElse(0d),
                    TimeUnit.MILLISECONDS);
        }
    }

    private String formatTimestamp(long timestamp) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneId.systemDefault()).format(DateTimeFormatter.ISO_DATE_TIME);
    }

    /**
     * 消息合并器,可自定义
     */
    @FunctionalInterface
    public interface MessageMerger {

        String merge(List<Message> messages);

    }

}

文章作者: Ethan Zhang
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Ethan Zhang !
评论
 上一篇
Spring Security(一)身份认证 Spring Security(一)身份认证
随着互联网技术的蓬勃发展,正所谓道高一尺,魔高一丈,黑客技术也得到了迅猛的发展。黑客们利用各大网站的安全漏洞实施着各种出人意料的攻击,例如常见的 XSS 跨站脚本攻击,CSRF 跨站请求伪造等方式。众所周知,数据是互联网公司最重要的核心资
2020-11-27 Ethan Zhang
下一篇 
Spring Cache 缓存抽象 Spring Cache 缓存抽象
要构建一个高性能高吞吐量的系统,缓存是我们必须考虑的问题。几乎可以这样说,任何一个系统都需要 N 个缓存层来解决数据交互中的 IO 问题,因为磁盘 IO 和内存 IO 在耗时上不是一个数量级的,我们总是更倾向于去读写内存而非磁盘,但内存资
2020-11-17 Ethan Zhang