李三十一

V1

2022/06/10阅读:13主题:全栈蓝

还活在上个时代,Etcd 3.0 实现分布式锁竟如此简单!

上下文

传统 Python 单机系统部署中,由于 GIL 的存在,相同进程中我们可以不用处理并发问题。但是随着业务发展需要,原有单机系统演变成分布式或多进程系统后。这将使原有的单机单进程并发控制策略失效。为了解决该问题需要引入一种跨进程、跨机器的互斥锁机制来控制共享资源的访问,这也就是分布式锁的由来。

所以,分布式锁的引入是为了保障多台机器或多个进程对共享资源读写的同步,保证数据的最终一致性。

分布式锁天生具有如下特点:

  • 互斥性:任何时刻,只允许单客户端(单进程、单节点)能持有锁。
  • 安全性:能有效避免死锁情况,即使客户端持有锁机器发生崩溃或退出故障,也需要保障锁能被正确释放。
  • 可用性:具备高可用能力,锁服务节点故障(即使宕机)也不应影响服务的正常运行。
  • 可重入性:针对同一个锁,加锁和解锁必须为同一个进程,即对称性。

常见分布式锁实现方式:

  • 数据库:采用乐观锁、悲观锁或主键 ID 实现。
  • 缓存:采用 Redis 或 RedLock (Redis组件) 实现。
  • 一致性算法:采用 Zookeeper、Chubby 或 Etcd 实现。

以上,号主知道的常见实现大致就这些了。接下来的篇幅主要讲解如何使用 Etcd 实现分布式锁业务,我们往下看。

基于 ETCD 的分布式锁实现

环境安装

为了保证锁服务可用性,我们搭建一个包含三个 Etcd 节点的 docker 集群环境,此处通过docker-compose进行实验配置。熟悉部署安装的号友,可跳过此部分。

步骤一:镜像拉取

docker pull bitnami/etcd:3.5.2

步骤二:编辑docker-compose.yml

version: '3.5'

services:
  etcd1:
    container_name: builder-etcd1
    image: bitnami/etcd:3.5.2
    ports:
      - 12379:2379
    environment:
      - ALLOW_NONE_AUTHENTICATION=yes
      - ETCD_AUTO_COMPACTION_MODE=revision
      - ETCD_AUTO_COMPACTION_RETENTION=1002
      - ETCD_QUOTA_BACKEND_BYTES=4294967296
      - ETCD_NAME=etcd1
      - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd1:2380
      - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380
      - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379
      - ETCD_ADVERTISE_CLIENT_URLS=http://etcd1:2379
      - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster
      - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380
      - ETCD_INITIAL_CLUSTER_STATE=new
    volumes:
      - ${DOCKER_ROOT_DIR:-.}/volumes/etcd/data1:/bitnami/etcd

  etcd2:
    container_name: builder-etcd2
    image: bitnami/etcd:3.5.2
    ports:
      - 22379:2379
    environment:
      - ALLOW_NONE_AUTHENTICATION=yes
      - ETCD_AUTO_COMPACTION_MODE=revision
      - ETCD_AUTO_COMPACTION_RETENTION=1002
      - ETCD_QUOTA_BACKEND_BYTES=4294967296
      - ETCD_NAME=etcd2
      - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd2:2380
      - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380
      - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379
      - ETCD_ADVERTISE_CLIENT_URLS=http://etcd2:2379
      - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster
      - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380
      - ETCD_INITIAL_CLUSTER_STATE=new
    volumes:
      - ${DOCKER_ROOT_DIR:-.}/volumes/etcd/data2:/bitnami/etcd

  etcd3:
    container_name: builder-etcd3
    image: bitnami/etcd:3.5.2
    ports:
      - 32379:2379
    environment:
      - ALLOW_NONE_AUTHENTICATION=yes
      - ETCD_AUTO_COMPACTION_MODE=revision
      - ETCD_AUTO_COMPACTION_RETENTION=1002
      - ETCD_QUOTA_BACKEND_BYTES=4294967296
      - ETCD_NAME=etcd3
      - ETCD_INITIAL_ADVERTISE_PEER_URLS=http://etcd3:2380
      - ETCD_LISTEN_PEER_URLS=http://0.0.0.0:2380
      - ETCD_LISTEN_CLIENT_URLS=http://0.0.0.0:2379
      - ETCD_ADVERTISE_CLIENT_URLS=http://etcd3:2379
      - ETCD_INITIAL_CLUSTER_TOKEN=etcd-cluster
      - ETCD_INITIAL_CLUSTER=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380
      - ETCD_INITIAL_CLUSTER_STATE=new
    volumes:
      - ${DOCKER_ROOT_DIR:-.}/volumes/etcd/data3:/bitnami/etcd

networks:
  default:
    name: builder_dev

部署配置文件 docker-compose.yml 下载,详见 [1]

步骤三:启动服务

docker-compose -f docker-compose.yml up

步骤四:验证集群

验证集群节点的版本

docker exec -it builder-etcd1 /bin/bash -c "etcd --version"
docker exec -it builder-etcd2 /bin/bash -c "etcd --version"
docker exec -it builder-etcd3 /bin/bash -c "etcd --version"

#
 输出
etcd Version: 3.5.2
Git SHA: 99018a77b
Go Version: go1.16.3
Go OS/Arch: linux/amd64

验证三个节点返回的数据一致

docker exec -it builder-etcd1 /bin/bash -c "etcdctl put greeting \"hello, etcd\""
# OK
docker exec -it builder-etcd1 /bin/bash -c "etcdctl get greeting"
docker exec -it builder-etcd2 /bin/bash -c "etcdctl get greeting"
docker exec -it builder-etcd3 /bin/bash -c "etcdctl get greeting"
# 输出
greeting
hello, etcd

锁的实现基础机制

此处为 Etcd 核心机制,下文讲解的分布式锁就是基于这些基础功能和基础特性实现。

Lease 机制:即租约机制(TTL),Etcd 可以为存储的 kv 对设置租约,当租约到期,kv 将失效删除;当然也支持 refresh 续约。

Revision 机制:存储的每个 key 带有一个 Revision 属性值,Etcd 每进行一次事务操作,对应的全局 Revision 值都会加一,因此每个 key 对应的 Revision 属性值都是全局唯一的。通过比较 Revision 的大小就能知道写操作的顺序。

公平锁机制:多个程序同时抢锁时,会根据 Revision 值大小依次获得锁,可以有效避免 “惊群效应”,公平获取。

Prefix 机制:即前缀机制,可以根据前缀获取该目录下所有的 key 及对应的属性(包括 key, value 以及 revision 等)。

Watch 机制:即监听机制,Watch 机制支持 Watch 某个固定的 key,也支持 Watch 一个目录(前缀机制),当被 Watch 的 key 或目录发生变化,客户端将收到通知(🐮🐮🐮)。

锁的实现逻辑

在 Etcd 的 clientv3 包中,官方实现了分布式锁,为了了解它的工作机制和实现逻辑,我们先做一个简单的模拟使用

下面我们通过etcdctl进行锁模拟。

# 终端 1
docker exec -it builder-etcd1 /bin/bash -c "etcdctl lock /lock/kv/uuid_a"
# /lock/kv/uuid_a/12f78036a2766634

#
 终端 2
docker exec -it builder-etcd2 /bin/bash -c "etcdctl lock /lock/kv/uuid_a"
# 终端 3
docker exec -it builder-etcd3 /bin/bash -c "etcdctl lock /lock/kv/uuid_a"

#
 终端 1 结束,显示终端 2
# /lock/kv/uuid_a/41ce8036a27a471d
# 终端 2 结束,显示终端 3
# /lock/kv/uuid_a/58538036a276e81f

由上,三个不同节点同时对/lock/kv/uuid_a进行加锁,但是却被依次获得锁(简单演示,看看就行!)。

通过分析分析源码,发现 Etcd 分布式锁的实现主要集中在 go.etcd.io/etcd/client/v3/concurrency 包中,通过以下三个方法进行提供:

func NewMutex(s *Session, pfx string) *Mutex        // 新建一个 Mutex 对象
func (m *Mutex) Lock(ctx context.Context) error     // 获取锁,阻塞调用
func (m *Mutex) Unlock(ctx context.Context) error   // 释放锁

由上,使用 Etcd 提供分布式锁变得比预期简单许多,首先实例化一个Mutex对象,然后Lock尝试抢占锁,之后进行业务处理,最后Unlock释放锁即可。

我们先看看Lock函数的实现原理,具体如下:

// Lock locks the mutex with a cancelable context. If the context is canceled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
func (m *Mutex) Lock(ctx context.Context) error {
 resp, err := m.tryAcquire(ctx)
 ...
}
func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) {
 s := m.s
 client := m.s.Client()

 m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
 cmp := v3.Compare(v3.CreateRevision(m.myKey), "="0)
 // put self in lock waiters via myKey; oldest waiter holds lock
 put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
 // reuse key in case this session already holds the lock
 get := v3.OpGet(m.myKey)
 // fetch current holder to complete uncontended path with only one RPC
 getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
 resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
 if err != nil {
  return nil, err
 }
 m.myRev = resp.Header.Revision
 if !resp.Succeeded {
  m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
 }
 return resp, nil
}

首先通过一个事务来尝试加锁,事务围绕着由 m.pfxs.Lease() 组成的 Key,包含 4 个基础操作:cmpputgetgetOwner,具体如下:

  • cmp:比较 Key 的 Revision 版本是否为 0,其中 0 代表该锁不存在
  • put:向 Key 中存储超时时间为 Session 默认时长的空值(加锁操作)
  • get:通过 Key 查询写入
  • getOwner:由于其他 Session 也用同样 m.pfx 加锁,且每个 LeaseID 不同。通过 m.pfx 和 WithFirstCreate()来查询,第一次肯定会 put 成功,但也只有最早使用这个 pfx 的 Session 才是持有锁的,所以这个 getOwner 是该事务的关键。

整个分布式锁的实现方式,通过统一的前缀 m.pfx来 put,然后根据各自的Revision版本号来排队获取锁,避免了惊群效应,效率非常高。

如上图所示,共有 4 个 Session 进行加锁,根据 Revision 排队规则,最终获取锁的顺序分别为:Session2 -> Session3 -> Session1 -> Session4

好了,分布式相关知识点就讲解完了,讲真Watch机制是我的最爱,平时项目中使用也最多。但再次看完这部分原理后,发现Revision机制的实现真的很巧妙,不得不佩服这帮巨佬。

Python 分布式锁使用

接下来,使用 Etcd v3 版 SDK 进行一个简单的业务使用,查遍全网似乎只有 python-etcd3 [2] 这个小众包,号友们将就用,别吐槽。

# -*- coding: utf-8 -*-

import time
import etcd3

etcd = etcd3.client(host='127.0.0.1', port=2379)
lock_name = "uuid_a"

# 实现 A
lock = etcd.lock(lock_name, ttl=20)
lock.acquire()
print(lock.is_acquired())       # 获取加锁状态 或 具体业务
lock.release()

# 实现 B(推荐)
with etcd.lock(lock_name, ttl=20as lock:
    print(lock.is_acquired())   # 获取加锁状态 或 具体业务
    lock.refresh()

结语

本文从基础概念、Etcd 实现原理、Python 业务应用角度对分布式锁进行剖析学习,限于篇幅和个人水平,Go相关内容没有进行继续深入。

Etcd3.0 相对于 2.0 版本,官方使用 gRPC 替换 JSON,在效率、可靠性、可扩展性和易用性等方面都有了很大进步。

Python 对 3.0 版本支持较弱,主要体现在没有稳定的 SDK 包,且大部分包对 Python 的版本支持也仅限于 <= 3.7(😒)。

参考文件

  • [1] https://github.com/liyaodev/docker-compose
  • [2] https://github.com/kragniz/python-etcd3/tree/master

❤️❤️❤️读者每一份热爱都是笔者前进的动力! 我是三十一,感谢各位朋友:求点赞、求评论、求转发,大家下期见!

往期文章:

分类:

后端

标签:

后端

作者介绍

李三十一
V1