yuff100

V1

2023/05/26阅读:28主题:极简黑

分布式事务的21种武器 - 5

在分布式系统中,事务的处理分布在不同组件、服务中,因此分布式事务的ACID保障面临着一些特殊难点。本系列文章介绍了21种分布式事务设计模式,并分析其实现原理和优缺点,在面对具体分布式事务问题时,可以选择合适的模式进行处理。原文: Exploring Solutions for Distributed Transactions (5)

Duncan Meyer @Unsplash
Duncan Meyer @Unsplash

在不同业务场景下,可以有不同的解决方案,常见方法有:

  1. 阻塞重试(Blocking Retry)
  2. 二阶段和三阶段提交(Two-Phase Commit (2PC) and Three-Phase Commit (3PC))
  3. 基于后台队列的异步处理(Using Queues to Process Asynchronously in the Background)
  4. TCC补偿(TCC Compensation Matters)
  5. 本地消息表(异步保证)/发件箱模式(Local Message Table (Asynchronously Ensured)/Outbox Pattern)
  6. MQ事务(MQ Transaction)
  7. Saga模式(Saga Pattern)
  8. 事件驱动(Event Sourcing)
  9. 命令查询职责分离(Command Query Responsibility Segregation, CQRS)
  10. 原子提交(Atomic Commitment)
  11. 并行提交(Parallel Commits)
  12. 事务复制(Transactional Replication)
  13. 一致性算法(Consensus Algorithms)
  14. 时间戳排序(Timestamp Ordering)
  15. 乐观并发控制(Optimistic Concurrency Control)
  16. 拜占庭容错(Byzantine Fault Tolerance, BFT)
  17. 分布式锁(Distributed Locking)
  18. 分片(Sharding)
  19. 多版本并发控制(Multi-Version Concurrency Control, MVCC)
  20. 分布式快照(Distributed Snapshots)
  21. 主从复制(Leader-Follower Replication)

本文将介绍一致性算法、时间戳排序以及乐观并发控制三种模式。

13. 一致性算法(Consensus Algorithms)
图片来源: https://www.baeldung.com/cs/consensus-algorithms-distributed-systems
图片来源: https://www.baeldung.com/cs/consensus-algorithms-distributed-systems
  • 系统中的所有节点都同意最终结果或决策。
  • 涉及如下步骤:
    1. 提议(Proposal) —— 一个节点向系统中的其他节点提出一个值
    2. 广播(Broadcast) —— 将建议的值广播到系统中所有节点
    3. 确认(Acknowledgment) —— 每个节点确认提议并向提议者返回确认消息
    4. 决定(Decision) —— 一旦提议者收到大多数节点的确认,就可以对提议的值做出决定
    5. 承诺(Commitment) —— 系统中所有节点提交决定
  • Paxos算法还包括以下几个步骤:
    1. 准备阶段(Prepare phase) —— 提议者选定提案,并向系统中的大多数节点发送准备请求
    2. 承诺阶段(Promise phase) —— 如果某个节点收到的准备消息序列号高于之前看到的序列号,将以承诺响应,并不再接受任何低于该序列号的提案
    3. 接受阶段(Accept phase) —— 如果提议者收到大多数节点的承诺,j就向大多数节点发送带有提案序列号的"接受"请求
    4. 已接受阶段(Accepted phase) —— 如果某个节点收到的接受消息序列号高于以前见过的任何接受请求序列号,就接受该提案,并向提议者和所有其他节点发送接受消息
    5. 学习阶段(Learn phase) —— 一旦提议者从大多数节点接收到被接受的消息,就可以提交提议。
from typing import List, Tuple

class PaxosNode:
    def __init__(self, node_id: int, nodes: List[int]):
        self.node_id = node_id
        self.nodes = nodes
        self.state = "proposed"
        self.proposed_value = None
        self.accepted_value = None
        self.accepted_round = -1
    
    def run_paxos(self, value: int) -> int:
        while True:
            if self.state == "proposed":
                self.proposed_value = value
                self.state = "prepare"
            
            if self.state == "prepare":
                max_round, max_val = self.prepare()
                if max_val is None:
                    self.state = "accept"
                else:
                    self.state = "proposed"
            
            if self.state == "accept":
                self.accepted_value = self.proposed_value
                self.accepted_round = max_round
                self.send_accept()
                self.state = "decided"
            
            if self.state == "decided":
                return self.accepted_value
    
    def prepare(self) -> Tuple[int, int]:
        max_round = -1
        max_val = None
        for node in self.nodes:
            round, val = node.receive_prepare()
            if round > max_round:
                max_round = round
                max_val = val
        
        return max_round, max_val
    
    def send_prepare(self, round: int):
        for node in self.nodes:
            node.receive_prepare_request(round, self.node_id)
    
    def receive_prepare_request(self, round: int, sender_id: int):
        if round > self.accepted_round:
            self.accepted_round = round
            self.send_prepare(round)
    
    def receive_prepare(self) -> Tuple[int, int]:
        return self.accepted_round, self.accepted_value
    
    def send_accept(self):
        for node in self.nodes:
            node.receive_accept_request(self.accepted_round, self.accepted_value)
    
    def receive_accept_request(self, round: int, value: int):
        if round >= self.accepted_round:
            self.accepted_round = round
            self.accepted_value = value
            self.send_accepted()
    
    def send_accepted(self):
        for node in self.nodes:
            node.receive_accepted(self.accepted_round, self.accepted_value)
    
    def receive_accepted(self, round: int, value: int):
        if round == self.accepted_round:
            self.proposed_value = value

示例代码

  • 示例代码为Paxos算法的一种实现,即使存在网络故障或其他可能发生的故障,也能在一组节点之间就某个值达成共识。
  • PaxosNode类表示分布式系统中参与Paxos算法的节点。
  • 构造函数接受2个参数,node_id是节点的ID,nodes是系统中所有节点的ID列表。
  • run_paxos方法运行Paxos算法,接受一个值作为输入,并返回系统中所有节点都同意的值。该方法将无限循环执行,直到商定一个值为止。
  • prepare方法向系统中所有节点发送"prepare"消息,并等待响应。该方法返回系统中其他节点可以接受的最大整数值,如果不接受任何值,则返回None
  • send_prepare方法,当节点想要向系统中的其他节点发送"prepare"消息时,调用该方法。
  • receive_prepare_request方法,当一个节点从另一个节点接收到"prepare"消息时,调用该方法。如果"prepare"消息携带的整数大于该节点接受的整数,则该节点更新其接受的整数,并向系统中其他节点发送"prepare"消息。
  • receive_prepare方法,当节点接收到对"prepare"消息的响应时,调用该方法,返回可接受的整数值。
  • send_accept方法,当节点想要向系统中的其他节点发送"accept"消息时,调用该方法。
  • receive_accept_request方法,当节点接收到来自另一个节点的"accept"消息时,调用该方法。如果"accept"消息携带的整数大于或等于该节点已接受的整数,则该节点更新其接受的整数值,并向系统中其他节点发送"accepted"消息。
  • send_accepted方法,当节点想要向系统中其他节点发送"accepted”消息时,调用该方法。
  • receive_accepted方法,当节点从另一个节点接收到"accepted"消息时,调用该方法。如果"accepted"消息的整数与节点已接受的整数相同,则节点使用"accepted"消息中的值更新其建议值。

优点

  • 所有节点都同意系统的状态
  • 可以容忍某些节点的故障,即使某些节点发生故障,系统也可以继续运行

缺点

  • 算法比较复杂,难以实现
  • 算法执行较慢,可能导致系统延迟增加
  • 算法需要在节点之间通信,增加了网络带宽和处理能力方面的开销

适用场景

  • 确保交易一致和准确的金融系统
  • 确保所有节点具有相同供应链视图的供应链管理系统

14. 时间戳排序(Timestamp Ordering)
图片来源: https://www.geeksforgeeks.org/multiversion-timestamp-ordering/
图片来源: https://www.geeksforgeeks.org/multiversion-timestamp-ordering/
  • 一种用于在分布式系统中对事务排序的共识算法。
  • 每个事务被分配一个时间戳,并且事务按照其时间戳的顺序执行。
  • 涉及如下步骤:
    1. 每个节点为接收到的事务生成唯一的时间戳,可以用全局时钟生成时间戳,也可以使用带有某种同步机制的本地时钟生成时间戳。
    2. 时间戳由(T, N)组成,其中T为时间戳值,N为生成时间戳的节点标识符。
    3. 当节点接收到新事务时,会根据之前接收到的所有事务的时间戳检查该事务的时间戳。如果新事务的时间戳比之前接收到的事务的时间戳都早,则立即执行。如果新事务的时间戳比以前接收到的事务的时间戳都要新,那么将被延迟,直到所有旧事务都执行完为止。
    4. 如果两个事务具有相同的时间戳,则使用tie-breaking机制来解决冲突。一种可能的tie-breaking机制是使用节点标识符作为判断标准,首先执行具有较低节点标识符的事务。
    5. 一旦事务被执行,结果就会传播到所有其他节点。
    6. 如果某个节点发生故障或断开网络连接,则一旦它重新加入网络,它的事务可以由另一个节点重新执行。事务可以按照时间戳顺序执行,以确保系统状态保持一致。
from typing import List, Tuple

class Timestamp:
    def __init__(self, node_id: int):
        self.node_id = node_id
        self.counter = 0

    def increment(self):
        self.counter += 1

    def __str__(self):
        return f"{self.node_id}:{self.counter}"

class Event:
    def __init__(self, node_id: int, timestamp: Timestamp, data: str):
        self.node_id = node_id
        self.timestamp = timestamp
        self.data = data

    def __str__(self):
        return f"{self.node_id} {self.timestamp} {self.data}"

class Network:
    def __init__(self, nodes: List[int]):
        self.nodes = nodes
        self.message_queues = {node: [] for node in nodes}

    def send(self, sender: int, receiver: int, message: str):
        self.message_queues[receiver].append((sender, message))

    def receive(self, node_id: int) -> Tuple[int, str]:
        if len(self.message_queues[node_id]) > 0:
            return self.message_queues[node_id].pop(0)
        else:
            return None

class Node:
    def __init__(self, node_id: int, network: Network, initial_data: List[str]):
        self.node_id = node_id
        self.network = network
        self.clock = Timestamp(node_id)
        self.queue = []
        for data in initial_data:
            self.queue.append(Event(node_id, self.clock, data))
            self.clock.increment()

    def run(self):
        while True:
            event = self.queue.pop(0)
            print(f"Node {self.node_id} executing event {event}")
            self.clock = max(self.clock, event.timestamp)  # Update local clock

示例代码

  • 实现分布式系统的时间戳排序算法
  • 定义了几个类和方法来实现时间戳排序算法的模拟
  • Timestamp类表示时间戳,由node_idcounter组成
  • Event类表示分布式系统中的事件。事件由node_id(生成事件的节点ID)、timestamp(事件的时间戳)和data(事件的有效负载)组成。
  • Network类表示连接分布式系统中节点的网络
  • Node类代表节点,由node_id(节点ID)、network(所连接的网络)、clock(本地时间戳)和queue(节点已经生成并等待执行的事件列表)组成。
  • increment()方法增加时间戳的计数器值
  • __str__()方法以"node_id:counter"的格式返回时间戳的字符串表示形式。
  • __str__()方法以"node_id:timestamp data"的格式返回事件的字符串表示形式。
  • send()方法将消息从一个节点发送到另一个节点
  • receive()方法接收给定节点队列中的下一条消息,如果队列中没有消息,则返回None
  • run()方法是节点的主循环,实现为无限循环,按时间戳顺序执行队列中的事件。当某个事件被执行时,节点将其本地时钟更新为其当前时间戳和被执行事件时间戳的最大值。
  • nodes属性是系统中节点的id列表
  • message_queues属性将每个节点ID映射到字典

优点

  • 所有事件都是有序的,并且在系统中所有节点上以一致的顺序发生
  • 解决并发事件之间的冲突

缺点

  • 过程复杂且资源密集
  • 时钟同步、一致性等问题

适用场景

  • 金融系统: 交易按照正确顺序进行处理,而不管来自哪个节点
  • 供应链管理系统: 跟踪货物流动,确保所有事件都按照正确的顺序处理

15. 乐观并发控制(Optimistic Concurrency Control)
  • 一种在数据库管理系统(DBMS)中用于处理并发访问数据的技术
  • 主要没有冲突,可以允许多个事务并发修改相同的数据项
  • 涉及如下步骤:
    • 开始事务(T1) —— 当事务开始时,读取需要的数据库记录,并将读取的版本记录在其私有工作区中。
    • 修改数据 —— 当事务修改数据时,将修改记录在其私有工作区中,而不更新实际的数据库记录。
    • 事务结束(T1) —— 当事务准备提交时,检查是否有其他事务在读取数据后修改了相同的数据。另外,将其私有工作区中的数据版本与数据库中数据的当前版本进行比较。
    • 验证检查 —— 如果数据库中数据的当前版本与T1读取的版本相同,则T1可以将其更改提交到数据库。但是,如果数据的当前版本与T1读取的版本不同,则意味着T1的更改与另一个事务的更改发生了冲突。
    • 回滚 —— 如果T1的更改与另一个事务的更改发生了冲突,那么T1必须中止并回滚其更改。T1可以在延迟后重试事务或采取其他适当操作。
    • 提交 —— 如果T1的更改不与任何其他事务的更改冲突,那么T1可以将其更改提交到数据库,用其私有工作区中所做的修改来更新数据库记录。
from typing import List

class Account:
    def __init__(self, id: int, balance: float):
        self.id = id
        self.balance = balance
        self.version = 0

    def withdraw(self, amount: float):
        self.balance -= amount
        self.version += 1

    def deposit(self, amount: float):
        self.balance += amount
        self.version += 1

class OptimisticConcurrencyControl:
    def __init__(self, accounts: List[Account]):
        self.accounts = accounts

    def transfer(self, sender_id: int, receiver_id: int, amount: float):
        # Find sender and receiver accounts
        sender = next(acc for acc in self.accounts if acc.id == sender_id)
        receiver = next(acc for acc in self.accounts if acc.id == receiver_id)

        # Create copies of the accounts to modify
        sender_copy = Account(sender.id, sender.balance)
        receiver_copy = Account(receiver.id, receiver.balance)

        # Withdraw from sender and deposit to receiver
        sender_copy.withdraw(amount)
        receiver_copy.deposit(amount)

        # Update the global accounts list if there are no conflicts
        for i, acc in enumerate(self.accounts):
            if acc.id == sender_id:
                if acc.version != sender.version:
                    raise Exception("Optimistic Concurrency Control failed")
                self.accounts[i] = sender_copy
            elif acc.id == receiver_id:
                if acc.version != receiver.version:
                    raise Exception("Optimistic Concurrency Control failed")
                self.accounts[i] = receiver_copy

示例代码

  • Account类表示包含ID、余额和版本号的银行帐户,版本号用于跟踪帐户更新的次数。
  • OptimisticConcurrencyControl类将帐户列表作为输入。
  • transfer方法的输入为发送方和接收方账户id以及要转账的金额。
  • withdrawdeposit方法,修改帐户余额并增加版本号。
  • 将全局帐号列表中帐号的版本号与发送方、接收方帐号的版本号进行比较,检查更新帐号是否存在冲突,如果存在冲突就会引发异常。如果没有冲突,用修改后的帐户更新全局帐户列表。在发生冲突的情况下,可以重试事务,或者通知用户手动解决冲突。

优点

  • 不需要锁或复杂的数据结构
  • 事务可以同时修改相同的数据,因此可以支持高并发性
  • 事务不需要等待锁被释放

缺点

  • 如果多个事务频繁修改相同的数据项,将造成大量修改失败并回滚
  • 需要额外的处理开销来比较数据版本并检查冲突
  • 不提供任何关于事务提交顺序的保证

适用场景

  • 电子商务应用 —— 大量用户同时访问同一个数据库
  • 银行和金融应用 —— 大量交易同时发生

挑战

  • 不正确的实现将导致数据不一致
  • 需要处理冲突和回滚的机制,会增加应用程序的复杂性

参考文献

What is a consensus algorithm?

Consensus Algorithms in Blockchain

How Many Consensus Algorithms Are There? An Overview

Analysis of the Blockchain Consensus Algorithms

Consensus Algorithms Distributed Systems

Multiversion Timestamp Ordering

DBMS Timestamp Ordering Protocol

Timestamp-based Concurrency Control

Timestamp Ordering Protocol in DBMS

Timestamp-based Ordering Protocol in DBMS

What is an optimistic concurrency control in DBMS

Optimistic vs Pessimistic Concurrency: What Every Developer Should Know

Dealing with Optimistic Concurrency Control Collisions


你好,我是俞凡,在Motorola做过研发,现在在Mavenir做技术工作,对通信、网络、后端架构、云原生、DevOps、CICD、区块链、AI等技术始终保持着浓厚的兴趣,平时喜欢阅读、思考,相信持续学习、终身成长,欢迎一起交流学习。
微信公众号:DeepNoMind

- END -

分类:

后端

标签:

数据结构与算法

作者介绍

yuff100
V1

俞凡,公众号DeepNoMind