分布式信号量(Redis)

2023-12-25 14:22:32

什么是信号量

信号量,由并发编程领域的先锋人物Edsger Wybe Dijkstra提出的一种解决同步不同执行线程的方法。

信号量(英语:semaphore)又称为信号标,是一个同步对象,用于保持在0至指定最大值之间的一个计数值。当线程完成一次对该semaphore对象的等待(wait)时,该计数值减一;当线程完成一次对semaphore对象的释放(release)时,计数值加一。当计数值为0,则线程等待该semaphore对象不再能成功直至该semaphore对象变成signaled状态。semaphore对象的计数值大于0,为signaled状态;计数值等于0,为nonsignaled状态。

简单理解,如果我们把工作区理解为一座房子,那信号量就是房子的入场券,券的数目是固定的,所有需要进入房子的人都需要拿到券,离开房子时需要返还券,未能拿到券的人可以选择等待或者直接离开。由于券是固定的,保证同时进入房子的人最多就是券的个数。当券的个数限制为1时,那么这个信号量就是互斥锁。
在单机场景下,进程内的信号量可以满足生产需求,但是在分布式场景下,面对多进程间的协同工作机制,可能就需要构建分布式的信号量来满足需求了。
因为涉及到多节点之间的协同工作,必须要一个可靠的中间件作为协调中心调度这一切。Redis拥有极快的响应速度和强大的数据结构,很适合作为这个调度中心。

实现思路

思路1:利用列表实现

  1. 获取、释放和计数机制

实现信号量的第一步就是需要有获取、释放以及计数的机制,考虑到这些特点,首先想到的就是列表这一数据结构。我们可以构建一个列表,保持列表的长度不变,作为信号量的大小。具体操作有:

  • 初始化:初始化信号量时需要初始化列表,并往其中放入信号量大小的数据。为保证原子性,只能由一个客户端写入,其他客户端直接返回(可以用SETNX保证,PS:其他客户端返回进行获取信号量操作时,可能此时列表并未设置成功,可能导致数据的不一致,可能需要分布式阻塞锁等方式阻塞其他客户端以保证一致性);
  • 获取:Redis的LPOP/RPOP天然匹配信号量的tryAcquire,而BLPOP/BRPOP正好对应阻塞获取;
  • 释放:自然是将列表中的数据PUSH回去;
  • 计数:列表的数目表示信号量的大小。
  1. 信号量的过期机制
    为了防止因客户端崩溃导致信号量无法释放问题,我们需要记录每个获取到信号量的客户端获取的时间,并当那些超过过期时间的客户端释放,即将其PUSH回列表。但是需要考虑的是,和初始化操作相同,此时应该保证只有一个客户端在做这件事。
    记录客户端获取时间,可以直接使用redis的基本的字符串数据结构即可。

思路2:利用有序集合实现

  1. 获取、释放和计数机制

我们构建一个有序集合,这个有序集合的member即为各个客户端的身份信息,其score即为获取的时间,这样我们可以根据获取时间的先后排序,只有排名比信号量大小要小的客户端被允许拿到信号量,这样即可实现。具体操作有:

  • 初始化:不需要对redis服务端做任何操作,只需要返回各自客户端需要初始化的一些参数;
  • 获取:首先向有序集合中添加一个member,若能获取到信号量,则返回成功;若获取不到,对于非阻塞获取,直接返回失败即可;对于阻塞获取,我们可以使用BRPOP实现阻塞等待;
  • 释放:需要删除有序集合中的member;当然还应该对等待列表中pPUSH一个值以使得等待列表能够尝试继续获取信号量;
  • 计数:有序集合天然支持排序。
  1. 信号量的过期制度
    如果有序集合的score对应的即是获取的时间,那么每次获取信号量之前去除一下超时的客户端。

方案对比
以上叙述了两种实现信号量的简易方案,可以简单比较如下:

  • 初始化:利用列表实现的信号量初始化时需要初始化服务端,且为了保证数据一致性,需要保证:只有一个客户端在初始化,且最好其他客户端等待此客户端初始化完全后才能工作(可能需要分布式锁实现);而利用有序集合实现的客户端则不需要;
  • 保证信号量大小不变:利用列表实现时,由各个客户端保证信号量大小,不管是过期信号量的回收还是正常释放,都是由客户端将列表元素PUSH回去,虽然分布式锁等机制能够保证安全,但是依然存在风险以及性能损耗;而有序集合实现时,由服务端保证数据的一致性,客户端唯一需要做的就是比较获取到的分数是否符合要求。

综上所述,我们采取有序集合的方式实现分布式的信号量。

方案和代码实现(Golang)

在上述思路的前提下,我们可能还需要做到以下几点。
公平的计数方式
虽然上述思路中选择将各个客户端的时间信息作为score,但是考虑到各个主机的系统时间的差异,所以以时间戳作为score可能并不是很公平。
考虑到此,我们在redis服务端维护一个自增的计数变量,每次发出请求需要对其自增,并以获取到的自增值作为score传入有序集合中。
考虑到并发,假设目前信号量池中只有一个信号量,若客户端A首先获取了自增量,客户端B随后获取了自增量,按照道理此时应该是客户端A获取到信号量,但是假如客户端B比客户端A早加入有序集合,那么此时客户端B将拿到信号量,因为其自增量属于第一位,其后客户端A再加入有序集合,也将获取到信号量,因为其score比B还小,这将导致拿到信号量的客户端多于信号量大小。所以我们必须保证获取自增量和加入有序集合是一个原子操作,这里,我们可以用lua脚本实现。
代码实现如下,其中r.ownerKey表示有序集合,r.incrKey表示自增量,r.identifyId()表示客户端的唯一标识,我们使用hostname-pid-goroutineId作为唯一标识,r.permit表示信号量大小。

var acquireScript = redis.NewScript(`
   local cnt = redis.call("INCR", KEYS[2])
   redis.call("ZADD", KEYS[1], cnt, ARGV[1])
   print(ARGV[4])
   local res = redis.call("ZRANK", KEYS[1], ARGV[1])
   if res >= tonumber(ARGV[2]) then
      return 0
   else
      return 1
   end
`)

func (r *RedisSem) TryAcquire() bool {   
   keys := []string{r.ownerKey r.incrKey}
   values := []interface{}{r.identifyId(), r.permit}
   res, err := acquireScript.Run(context.Background(), r.rc, keys, values...).Int()
   if err != nil || res == 0 {
      r.rc.ZRem(context.Background(), r.ownerKey, r.identifyId())
      return false
   }

   return true
}

func (r *RedisSem) identifyId() string {
   hostname, _ := os.Hostname()
   // 使用 hostname-pid-goroutineId 作为唯一标识
   return fmt.Sprintf("%v-%v-%v", hostname, os.Getpid(), util.GoroutineId())
}

超时清除机制

考虑到客户端可能因为某种原因panic或者未及时释放信号量,这时候需要一定的机制去除掉超时的信号量客户端,这里我们利用另一个有序集合(名为r.timeKey,其member和r.ownerKey一致)记录获取信号量的时间,首先删除r.timeKey中超时的成员,然后使用redis的ZINTERSTORE指令取两个有序集合中的交集,并将较小的分数(一般来说,自增量都会小于时间戳,为尽力保证这点,我们取纳秒时间戳作为r.timeKey的score)重新赋值回r.ownerKey。

考虑到高并发时每个客户端都执行一遍以上操作,这将是没有必要且耗费性能的,因为超时清除机制只能算是一种保底策略,无需时时刻刻都执行,且对时间精度的要求并不需要那么高。为此,我们设置一个r.clearKey,采取redis的SETNX保证一个时刻只有一个客户端在执行此操作。并且设置此值的时间间隔设置为r.interval,以保证并发性能。

代码实现如下:

var acquireScript = redis.NewScript(`
   local cnt = redis.call("INCR", KEYS[4])
   redis.call("ZADD", KEYS[1], ARGV[2], ARGV[1])
   redis.call("ZADD", KEYS[2], cnt, ARGV[1])
   print(ARGV[4])
   local res = redis.call("ZRANK", KEYS[2], ARGV[1])
   if res >= tonumber(ARGV[3]) then
      return 0
   else
      return 1
   end
`)

func (r *RedisSem) TryAcquire() bool {
   // 首先清除超时信号量
   if r.tryLock() {
      // 1. 清除时间戳 zset 的超时数据
      r.rc.ZRemRangeByScore(context.Background(), r.timeKey, "0", strconv.FormatInt(time.Now().UnixNano()-r.timeout, 10))
      // 2. 取交集,取最小值存入ownKey(一般而言最小值肯定是cnt),使得ownKey也过滤掉超时信号量
      r.rc.ZInterStore(context.Background(), r.ownerKey, &redis.ZStore{
         Keys:      []string{r.timeKey, r.ownerKey},
         Aggregate: "MIN",
      })
   }
   
   keys := []string{r.timeKey, r.ownerKey, r.waitKey, r.incrKey}
   values := []interface{}{r.identifyId(), time.Now().UnixNano(), r.permit}
   res, err := acquireScript.Run(context.Background(), r.rc, keys, values...).Int()
   if err != nil || res == 0 {
      r.rc.ZRem(context.Background(), r.timeKey, r.identifyId())
      r.rc.ZRem(context.Background(), r.ownerKey, r.identifyId())
      return false
   }

   logrus.Infof("[%s] acquire the redis semephore[%s] success!", r.identifyId(), r.name)

   return true
}


func (r *RedisSem) tryLock() bool {
   booCmd := r.rc.SetNX(context.Background(), r.clearKey, "true", r.interval)
   if booCmd.Err() != nil || !booCmd.Val() {
      return false
   }
   return true
}

func (r *RedisSem) identifyId() string {
   hostname, _ := os.Hostname()
   // 使用 hostname-pid-goroutineId 作为唯一标识
   return fmt.Sprintf("%v-%v-%v", hostname, os.Getpid(), util.GoroutineId())
}

阻塞请求

对于信号量而言,不仅有以上的TryAcquire尝试获取,也会有阻塞获取的需求。我们采用redis的列表的BRPOP实现阻塞,如下所示。

func (r *RedisSem) Acquire(ctx context.Context) error {
   ok := r.TryAcquire()
   if !ok {
      cmd := r.rc.BRPop(ctx, 0, r.waitKey)
      if cmd.Err() != nil {
         logrus.Errorf("[%s] RedisSem Acquire semephore [%s] BRPop failed: %+v", r.identifyId(), r.name, cmd.Err())
         return ErrGetSem
      }
      return r.Acquire(ctx)
   }
   return nil
}

其释放如下所示:

func (r *RedisSem) Release() {
   r.rc.ZRem(context.Background(), r.timeKey, r.identifyId())
   r.rc.ZRem(context.Background(), r.ownerKey, r.identifyId())
   r.rc.RPush(context.Background(), r.waitKey, r.identifyId())
   r.rc.Expire(context.Background(), r.waitKey, 5*time.Second)
   r.releaseCh <- struct{}{}
}

信号量续约机制

由于我们设置了超时时间,目的是为了预防有些客户端panic后无法释放信号量。但是可能有些操作会很耗时,所以我们可以在超时时间的一半左右开始重新设置r.timeKey,这样就可以实现续约机制。代码如下,我们只需要在获取信号量成功的时候,异步执行以下后台任务。

// 用于信号量的续约
func (r *RedisSem) extension(identifyId string) {
   interval := util.SetIf0(r.opt.timeout, 2*time.Minute) / 2
   tick := time.NewTicker(interval)
   defer tick.Stop()

   for {
      select {
      case <-tick.C:
         intCmd := r.rc.ZAdd(context.Background(), r.timeKey, &redis.Z{
            Score:  float64(time.Now().UnixNano()),
            Member: identifyId,
         })
         if intCmd.Err() != nil {
            logrus.Errorf("[%s] extension the redis semaphore[%s] failed: %+v", identifyId, r.name, intCmd.Err())
         }
      case <-r.releaseCh:
         // 高并发时,可能该信号量在release之后还进行了续约,所以我们删除掉这次信号量,以保证安全
         r.rc.ZRem(context.Background(), r.timeKey, identifyId)
         r.rc.ZRem(context.Background(), r.ownerKey, identifyId)
         return
      }
   }
}

文章来源:https://blog.csdn.net/ldxxxxll/article/details/135197052
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。