锁之浅见


在计算机科学中,锁是在执行多线程时用于强行限制资源访问的同步机制,即用于在并发控制中保证对互斥要求的满足。一般的锁是建议锁(advisory lock),每个线程在访问对应资源前都需获取锁的信息,再根据信息决定是否可以访问。若访问对应信息,锁的状态会改变为锁定,因此其他线程此时不会访问该资源,当资源结束后,会恢复锁的状态,允许其他线程的访问。有些系统有强制锁(mandatory lock),若有未授权的线程想要访问锁定的数据,在访问时就会产生异常。

以上是维基百科中对于锁的定义,在现代化软件尤其是互联网行业中,系统的吞吐量成为技术衡量的重要指标,因为并发编程的广泛使用几乎是必然的。当我们享受着并发编程带来的更高硬件利用率以及更好用户体验的同时,针对共享资源的访问控制成为一个绕不开的话题。有时候我们甚至需要牺牲一定的并发性来保障数据的一致性和可靠性,一个优秀的系统不是偏执的,而是在多种因素之间取得一种完美的平衡,如何优雅地给程序加锁需要我们去探索和思考。

Java 中 ReentrantLock 设计带来的启发


我们知道 ReentrantLockJDK 在代码层面实现的锁,而它是通过扩展大名鼎鼎的 AQS 实现的,众所周知 AQSJava 为我们提供的一种同步器抽象,本身利用了 Template Method Pattern 的设计思想,通过扩展 AQS 我们可以方便地实现自定义的同步器,而锁本身也只是一种较为特殊的同步器而已,Java 中例如 CountDownLatchCyclicBarrierSemaphore 这样的经典同步器都不外乎是通过扩展 AQS 实现,为我们在并发编程中提供了强大的功能支持。通过探究 ReentrantLock 的源码设计我们可以感悟到锁的本质是什么?因为分布锁虽然在网络通信上更具有其复杂性,在作为锁本身的逻辑上是殊途同归的。

锁的试探性获取

public boolean tryLock() {
    return sync.nonfairTryAcquire(1);
}

ReentrantLock#tryLock() 方法代表对锁的试探性获取,这个操作是非阻塞的,直接返回获取结果。ReentrantLock 内部有个两个同步器的实现,分别为 NonfairSyncFairSync,它们都继承于 Sync,而 tryLock 最终调用的是同一个方法。

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

因为对于这个操作,并没有公平锁和非公平锁之分。如果当前 AQS 中的 state 为 0,则通过 compareAndSetState 尝试竞争锁即可。否则判断锁是否被当前线程持有,若当前线程已持有该锁则对 state 累加操作,这是对可重入性的支持,否则本次竞争失败。

锁的阻塞式获取(有限等待)

public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
    return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}

在实际的业务中如果某项资源具有竞争性,而我们的业务操作又是必须完成的,那么我们往往会倾向于阻塞式地去获取锁,如果锁已被其他线程持有,当前线程会进入阻塞状态直到所被它的拥有者释放重新进入竞争。

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

跟踪代码我们发现首先依然会尝试使用非阻塞式方式去竞争一次锁,若竞争失败,则调用 doAcquireNanos 方法。该方法是在 AQS 中实现的,是加锁的核心逻辑所在。

private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

该方法的主要逻辑如下:

  1. 计算 deadline 最后期限,在循环中判断,一旦当前时间超过最后期限则本次锁竞争失败。
  2. 将当前线程构建为 Node 节点,进行入队操作,并返回该节点。
  3. 取到当前节点的前驱节点,并判断前驱节点是否为链表头节点,若为链表头节点,表示当前线程有资格竞争锁,这很大程度上是为了避免不必要的重试。
  4. 判断 tryAcquire 方法的执行结果,该方法由 AQS 子类实现,这也是公平锁与非公平锁的根本实现差异所在,分别在 NonfairSyncFairSync 中有各自实现。两者的主要区别在于公平锁需要判断当前链表头节点之后是否还有线程在竞争锁,因为公平锁需要遵循先到先得的顺序依次获取锁。
  5. 若成功获取锁,则将当前节点设置为头结点,返回。
  6. shouldParkAfterFailedAcquire 方法判断当前线程是否需要被挂起(只要当前节点前已有线程被挂起,则当前线程也需要被挂起),Unsafe#park 挂起线程后等待别唤醒重新竞争锁,或者超出最大等待时间后自动解除挂起状态,重新加入竞争。
  7. 若最终依然没有获取锁,则调用 cancelAcquire 方法,则释放相关资源。

锁的阻塞式获取(无限等待)

ReentrantLock#lock 方法的逻辑与 ReentrantLock#tryLock(long timeout, TimeUnit unit) 类似,只是前者没有时间限制,会无限期地重试竞争锁,除非当前线程被 interrupt 才会停止,一般来讲此种方式不推荐使用,因为在大部分的业务场景中我们往往会设定一个当前业务所能容忍的最大耗时,长时间地阻塞并没有意义,同时会导致系统中业务竞争该资源时被阻塞,这时候我们更倾向于制造一种快速失败的熔断效应,例如在分布式锁的实现中往往会在中间件设定锁的最大存活时间,避免死锁问题的产生。

Redisson 中分布式锁实现带来的启发


锁的获取

Redisson 作为一个优秀的 Redis 开源客户端,它最大的意义在于将各种 Redis 操作抽象为我们所熟悉的 Java 对象,使我们在使用 Redis 的过程中隔离中间件本身的复杂性,就像在进行 Java 原生编程那样自然。

@Override
public RLock getLock(String name) {
    return new RedissonLock(connectionManager.getCommandExecutor(), name);
}

显然在分布锁层面上 Redisson 也为我们提供了这样的抽象,我们可以借用 RedissonClient#getLock 获取一个 RedissonLock 对象,其中已经封装了我们需要的加锁解锁等操作。

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

RedissonLock#tryLockInnerAsync 方法是加锁的核心逻辑,Redisson 的操作几乎都是基于 Lua 脚本实现的

  1. 判断以当前锁名称为 key 的缓存是否存在。
  2. 若 key 不存在,则存入一个 Hash 结构,Hash 中的 key 为当前客户端ID + 线程ID,也就是说可以精确标识某一服务中的某一线程,Hash 中的 value 为 1。设置锁的缓存时间,避免客户端宕机死锁。加锁成功直接返回。
  3. 若 key 所对应的 Hash 表已存在,则直接对其中当前线程对应的 value 进行自增操作,并重新设置过期时间,加锁成功返回。
  4. 若加锁失败,则返回当前锁的过期剩余时间。

锁的等待

那么 Redisson 是如何实现锁的等待机制的呢?我们知道对于一个成熟的分布式锁设计, 当我们第一次尝试获取锁失败后,在一定时间范围内需要一种重试机制,帮忙我们多次竞争锁。但是这种重试机制该如何设计呢?如果在短时间内密集重试实际是没有意义的,因为此时别的业务线程还未释放该锁,此时的重试行为只是无端增加了对于中间件的访问量,在极端情况下可能造成流量激增,甚至造成中间件服务宕机(本人在实际场景中由于锁设计不当就遇到过此类问题)。也就是我们需要模仿一种类似于 AQS 中那样的 park 机制,当这个锁未被它的持有者释放时,应该挂起其余的竞争者线程,避免无意义的重试,直到该锁被释放时再唤醒它们加入新的竞争。这种行为在单体服务中实现是相对容易的,而在分布式系统中意味着需要服务间的通信,因为当服务A的线程获取到锁时,必须挂起服务B中的线程,这无疑增加了复杂性。那么 Redisson 是怎么做的?它利用了 Redis 中的发布订阅模式。我们可以想象成每一个锁都持有一个 Topic 主题,当一个线程竞争锁失败后,它会成为这个 TopicSubscriber 订阅者,一旦 Topic 的内容被修改为可竞争状态,它便解除挂起状态,重新加入竞争,除非超出最大等待时间解除订阅,返回失败。

RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
    if (!subscribeFuture.cancel(false)) {
        subscribeFuture.onComplete((res, e) -> {
            if (e == null) {
                unsubscribe(subscribeFuture, threadId);
            }
        });
    }
    acquireFailed(threadId);
    return false;
}

那么我们可以猜测在 Redisson 的解锁的逻辑中必然存在发布消息的逻辑。

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                    "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return nil;",
            Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
  1. 如果 Hash 表中当前线程持有的锁已经不存在,表示锁可能已经过期被自动释放,此时直接返回即可。
  2. 减小 Hash 表中的 value 值,如果之后 value 依然大于0,表示锁依然被持有,重试设置缓存时间后返回。
  3. 如果 value 值已经被减为0,则表示锁已经被完全释放,删除 Hash 表缓存,并发布 Topic 通知。

另外由于 Hash 表中的 key 值是与线程ID相关的,所以并不会出现A线程误释放B线程锁的情况,对于分布式锁设计来说这也是一个重点所在。

Redis 分布式锁的简易实现


在实际的工作中,使用诸如 RedissonCurator 这样的开源客户端帮助我们实现分布锁往往是较为健壮可靠的,从不重复造轮子的原则出发,这样的基础组件已经经过前人不断的验证,可以帮忙我们构建稳定的系统。但有时基于项目的现实情况,我们需要自己编码,以下是我实现的 Redis 分布式锁简易版本,还有一些细节待完善,仅作记录。

锁的抽象

package com.wanshifu.user.framework.business.cache.lock;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 分布式锁
 * {@link #tryLock(String)} 尝试获取锁,不阻塞立刻返回
 * {@link #tryLock(String, int)} 设定重试次数,重试完获取不到锁则抛出 {@link DistributedLockException}
 * {@link #tryLock(String, long, TimeUnit)} 在指定时间内不断尝试获取锁,超出时间获取不到则抛出 {@link DistributedLockException}
 * @author zhangyifan@wshifu.com
 */
public interface DistributedLock {

    boolean tryLock(String requestId);

    void unlock(String requestId);

    default boolean tryLock(String requestId, int retryCount) throws DistributedLockException {
        if (retryCount <= 0) {
            return tryLock(requestId);
        }
        AtomicInteger incrementer = new AtomicInteger(0);
        while (true) {
            if (incrementer.incrementAndGet() > retryCount) {
                throw new DistributedLockException(String.format("acquire distributed lock failed, retried over %s times!", retryCount));
            }
            if (tryLock(requestId)) {
                return true;
            } else {
                Thread.yield();
            }
        }
    }

    default boolean tryLock(String requestId, long time, TimeUnit timeUnit) throws DistributedLockException {
        long deadline = System.nanoTime() + timeUnit.toNanos(time);
        while (true) {
            if (deadline - System.nanoTime() <= 0) {
                throw new DistributedLockException(String.format("acquire distributed lock failed, retried over %s %s!", time, timeUnit));
            }
            if (tryLock(requestId)) {
                return true;
            } else {
                Thread.yield();
            }
        }
    }

}

Redis 实现

package com.wanshifu.user.framework.business.cache.lock;

import com.wanshifu.framework.redis.autoconfigure.component.RedisHelper;
import com.wanshifu.user.framework.business.core.constant.CacheKeyConstant;

import java.util.concurrent.TimeUnit;

/**
 * 分布式锁 redis 实现
 * @author zhangyifan@wshifu.com
 */
public class RedisLock implements DistributedLock {

    private final RedisHelper redisHelper;

    private final String redisKey;

    private final long expire;

    private final TimeUnit timeUnit;

    public RedisLock(RedisHelper redisHelper, String name) {
        this(redisHelper, name, 5, TimeUnit.SECONDS);
    }

    public RedisLock(RedisHelper redisHelper, String name, long expire, TimeUnit timeUnit) {
        this.redisHelper = redisHelper;
        this.redisKey = String.format(CacheKeyConstant.DISTRIBUTED_LOCK_KEY, name);
        this.expire = expire;
        this.timeUnit = timeUnit;
    }

    @Override
    public boolean tryLock(String requestId) {
        return redisHelper.tryGetDistributedLock(redisKey, requestId, (int) timeUnit.toMillis(expire));
    }

    @Override
    public void unlock(String requestId) {
        redisHelper.releaseDistributedLock(redisKey, requestId);
    }

}

注解支持

package com.wanshifu.user.framework.business.cache.lock;

import java.lang.annotation.*;
import java.util.concurrent.TimeUnit;

/**
 * @see DistributedLock
 * @see RedisLock
 * @author zhangyifan@wshifu.com
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DistributedLocked {

    /**
     * 锁名称,默认为MD5(className + methodName)
     */
    String name() default "";

    /**
     * 过期时间,默认为5s
     */
    int expireTime() default 5;

    /**
     * 过期时间单位,默认为秒
     */
    TimeUnit expireUnit() default TimeUnit.SECONDS;

    /**
     * 业务ID,默认与name相同
     */
    String requestId() default "";

    /**
     * 最大等待时间,默认为60s
     */
    long waitTime() default 60;

    /**
     * 最大等待时间单位,默认为秒
     */
    TimeUnit waitUnit() default TimeUnit.SECONDS;

    /**
     * 重试次数,当重试次数>0时,优先使用重试次数,忽略等待时间
     */
    int retryCount() default 0;

    /**
     * 锁的实现,暂时只支持Redis方式
     */
    Class<? extends DistributedLock> implementationClz() default RedisLock.class;

    /**
     * 获取不到锁时的降级处理,默认抛出异常 {@link DistributedLockException}
     */
    String fallbackMethod() default "";

}

AOP 支持

package com.wanshifu.user.framework.business.cache.lock;

import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.commons.lang3.StringUtils;
import org.springframework.aop.aspectj.AspectJExpressionPointcutAdvisor;
import org.springframework.cglib.core.ReflectUtils;
import org.springframework.util.DigestUtils;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

/**
 * 针对 {@link DistributedLocked} 切面处理
 * @author zhangyifan@wshifu.com
 */
public class DistributedLockedAdvisor extends AspectJExpressionPointcutAdvisor {

    private static final String POINTCUT = "@annotation(com.wanshifu.user.framework.business.cache.lock.DistributedLocked)";

    @Resource
    private DistributedLockProvider distributedLockProvider;

    @PostConstruct
    public void init() {
        setExpression(POINTCUT);
        setAdvice(new DistributedLockedAdvice(distributedLockProvider));
    }

    private static class DistributedLockedAdvice implements MethodInterceptor {

        private final DistributedLockProvider distributedLockProvider;

        public DistributedLockedAdvice(DistributedLockProvider distributedLockProvider) {
            this.distributedLockProvider = distributedLockProvider;
        }

        @Override
        public Object invoke(MethodInvocation invocation) throws Throwable {
            Method targetMethod = invocation.getMethod();
            DistributedLocked[] distributedLockeds = targetMethod.getDeclaredAnnotationsByType(DistributedLocked.class);
            if (Objects.isNull(distributedLockeds) || distributedLockeds.length == 0) {
                throw new RuntimeException("DistributedLocked annotation not found!");
            }
            DistributedLocked distributedLocked = distributedLockeds[0];
            String lockName = distributedLocked.name();
            if (StringUtils.isBlank(lockName)) {
                lockName = DigestUtils.md5DigestAsHex((targetMethod.getDeclaringClass().getName() + targetMethod.getName()).getBytes(StandardCharsets.UTF_8));
            }
            DistributedLock lock = distributedLockProvider.getRedisLock(lockName, distributedLocked.expireTime(), distributedLocked.expireUnit());
            String requestId = distributedLocked.requestId();
            if (StringUtils.isBlank(requestId)) {
                requestId = lockName;
            }
            int retryCount = distributedLocked.retryCount();
            try {
                if (retryCount > 0 && lock.tryLock(requestId, retryCount)) {
                    return invocation.proceed();
                }
                if (lock.tryLock(requestId, distributedLocked.waitTime(), distributedLocked.waitUnit())) {
                    return invocation.proceed();
                }
                throw new DistributedLockException("acquire distributed lock failed!");
            } catch (DistributedLockException ex) {
                String fallbackMethodName = distributedLocked.fallbackMethod();
                if (StringUtils.isBlank(fallbackMethodName)) {
                    throw ex;
                }
                Method fallbackMethod = ReflectUtils.findDeclaredMethod(targetMethod.getDeclaringClass(),
                        fallbackMethodName, targetMethod.getParameterTypes());
                if (Objects.isNull(fallbackMethod)) {
                    throw ex;
                }
                if (!targetMethod.getReturnType().isAssignableFrom(fallbackMethod.getReturnType())) {
                    throw ex;
                }
                fallbackMethod.setAccessible(true);
                return fallbackMethod.invoke(invocation.getThis(), invocation.getArguments());
            } finally {
                lock.unlock(requestId);
            }
        }
    }

}

文章作者: Ethan Zhang
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Ethan Zhang !
评论
 上一篇
观 Spring Cloud OpenFeign 设计有感 观 Spring Cloud OpenFeign 设计有感
Spring Cloud Feign 是 Spring 官方提供的一个轻量级 web 声明式客户端,它的设计初衷在于使我们更容易地编写 web 客户端程序,它将一个 web 请求所需要的信息抽象为基于接口和注解的元信息,使我们从复杂的编码
2020-09-15 Ethan Zhang
下一篇 
响应式编程之浅见 响应式编程之浅见
在计算中,响应式编程或反应式编程(英语:Reactive programming)是一种面向数据流和变化传播的声明式编程范式。这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。 以
2020-06-28 Ethan Zhang