跳转至

2.4 迁移分区数据限流

迁移计划描述

假设有迁移计划:

t0, p0: 101, 102 -> 102, 103                 101 -> 103
t0, p1: 102, 103 -> 103, 104                 102 -> 104
t0, p2: 103, 101 -> 104, 101                 103 -> 104
t1, p0: 101, 102, 103 -> 102, 103, 104       101 -> 104

根据迁移计划获取添加 leader 限制的副本

topic partition broker leader
t0 p0 101
t0 p0 102
t0 p1 102
t0 p1 103
t0 p2 103
t0 p2 101
t1 p0 101
t1 p0 102
t1 p0 103

根据迁移计划获取添加 follower 限制的副本

topic partition broker
t0 p0 103
t0 p1 104
t0 p2 104
t1 p0 104

限流最小值计算

  • leader 流量限制:对每个 broker,可以根据重新分配分区方案,获取该 broker 上涉及到的 topic partition leader 写入流量之和,broker 提供的 fetch 请求响应流量必须大于写入流量,因此设置该 broker 的 leader.replication.throttled.rate 大于这个值。
  • follower 流量限制:对每个 broker,可以根据重新分配分区方案,获取该 broker 上新增的 topic partition,进一步获取这些 topic partition 在当前 leader 节点写入流量之和,broker 的 fetch 请求流量必须大于写入流量,因此设置该 broker 的 follower.replication.throttled.rate 大于这个值。
# Current partition assignment (before the reassignment).
# Each entry gives one topic partition and the brokers holding its replicas.
current_assignments = [
    {"topic": "t0", "partition": 0, "replicas": [101, 102]},
    {"topic": "t0", "partition": 1, "replicas": [102, 103]},
    {"topic": "t0", "partition": 2, "replicas": [103, 101]},
    {"topic": "t1", "partition": 0, "replicas": [101, 102, 103]},
]

# Target partition assignment (after the reassignment).
# Each entry gives one topic partition and the brokers that should hold its replicas.
proposed_assignments = [
    {"topic": "t0", "partition": 0, "replicas": [102, 103]},
    {"topic": "t0", "partition": 1, "replicas": [103, 104]},
    {"topic": "t0", "partition": 2, "replicas": [104, 101]},
    {"topic": "t1", "partition": 0, "replicas": [102, 103, 104]},
]


def get_traffic(broker, topic, partition):
    """Return the traffic value for the given broker / topic / partition.

    Placeholder: the real value is meant to be fetched from BCM. This stub
    ignores its arguments and always returns 1.
    """
    stub_traffic = 1
    return stub_traffic


# Current leader of each topic partition: {topic: {partition: broker}}.
topic_partition_leader = {}

# Partitions currently hosted by each broker: {broker: {topic: [partition, ...]}}.
current_distribution = {}

# Current replica list of each topic partition: {topic: {partition: [broker, ...]}}.
current_distribution_map = {}

for assignment in current_assignments:
    topic = assignment["topic"]
    partition = assignment["partition"]
    replicas = assignment["replicas"]

    # The preferred leader of a topic partition is the first entry of its replica list.
    preferred_leader = replicas[0]
    if topic not in topic_partition_leader:
        topic_partition_leader[topic] = {}
    topic_partition_leader[topic][partition] = preferred_leader

    for broker_id in replicas:
        hosted_topics = current_distribution.setdefault(broker_id, {})
        hosted_topics.setdefault(topic, []).append(partition)

    if topic not in current_distribution_map:
        current_distribution_map[topic] = {}
    current_distribution_map[topic][partition] = replicas


# Partitions each broker will host after the reassignment:
# {broker: {topic: [partition, ...]}}.
proposed_distribution = {}

# Target replica list of each topic partition: {topic: {partition: [broker, ...]}}.
proposed_distribution_map = {}

for assignment in proposed_assignments:
    topic = assignment["topic"]
    partition = assignment["partition"]
    replicas = assignment["replicas"]

    for broker_id in replicas:
        hosted_topics = proposed_distribution.setdefault(broker_id, {})
        hosted_topics.setdefault(topic, []).append(partition)

    if topic not in proposed_distribution_map:
        proposed_distribution_map[topic] = {}
    proposed_distribution_map[topic][partition] = replicas

# Partitions each broker is going to gain: {broker: {topic: [partition, ...]}}.
add_partitions = {}

# Partitions each broker is going to lose: {broker: {topic: [partition, ...]}}.
remove_partitions = {}

# Visit every broker that appears in either the current or the proposed layout.
for broker in proposed_distribution.keys() | current_distribution.keys():
    topics_before = current_distribution.get(broker, {})
    topics_after = proposed_distribution.get(broker, {})

    for topic in topics_before.keys() | topics_after.keys():
        partitions_before = set(topics_before.get(topic, []))
        partitions_after = set(topics_after.get(topic, []))

        # Hosted now but not in the target layout -> will be removed from this broker.
        leaving = list(partitions_before - partitions_after)
        if leaving:
            if broker not in remove_partitions:
                remove_partitions[broker] = {}
            remove_partitions[broker][topic] = leaving

        # In the target layout but not hosted now -> will be added to this broker.
        arriving = list(partitions_after - partitions_before)
        if arriving:
            if broker not in add_partitions:
                add_partitions[broker] = {}
            add_partitions[broker][topic] = arriving


# Partitions hosted by each broker while the reassignment is in progress:
# {broker: {topic: [partition, ...]}}. This is the current layout plus every
# partition that is being added; nothing has been removed yet.
in_progress_distribution = {}

for broker, topic_partitions in current_distribution.items():
    for topic, partitions in topic_partitions.items():
        # Copy the list so that current_distribution is not mutated below.
        in_progress_distribution.setdefault(broker, {})[topic] = list(partitions)

# Iterate add_partitions directly so that brokers (and topics) which only gain
# partitions, and therefore do not appear in current_distribution, are included.
for broker, topic_partitions in add_partitions.items():
    for topic, partitions in topic_partitions.items():
        in_progress_distribution.setdefault(broker, {}).setdefault(topic, []).extend(partitions)

# Replica list of each topic partition while the reassignment is in progress:
# {topic: {partition: [broker, ...]}} = current replicas followed by the target
# replicas that are not already present.
in_progress_distribution_map = {}

for topic, partitions in current_distribution_map.items():
    for partition, replicas in partitions.items():
        # Copy so that current_distribution_map (and the assignment lists it
        # references) stay untouched.
        merged_replicas = list(replicas)
        # Partitions absent from the proposed plan simply keep their current replicas.
        for replica in proposed_distribution_map.get(topic, {}).get(partition, []):
            if replica not in merged_replicas:
                merged_replicas.append(replica)
        in_progress_distribution_map.setdefault(topic, {})[partition] = merged_replicas

# Current inbound traffic recorded per topic partition: {topic: {partition: traffic}}.
topic_partition_in_traffic = {}

# Per broker: aggregated inbound traffic of the topic partition leaders involved.
broker_leader_in_traffic = {}

# Per broker: aggregated outbound traffic of the topic partition leaders involved.
broker_leader_out_traffic = {}

for broker, topic_partitions in current_distribution.items():
    for topic, partitions in topic_partitions.items():
        # Ideally BytesInPerSec would be fetched per broker / topic / partition
        # (get_traffic(broker, topic, partition)) and summed per broker.
        # In practice only broker / topic level BytesInPerSec is available,
        # hence the None partition argument.
        bytes_in_per_sec = get_traffic(broker, topic, None)

        # Sum of the write traffic of all topics on this broker.
        broker_leader_in_traffic[broker] = broker_leader_in_traffic.get(broker, 0) + bytes_in_per_sec

        # TODO: how many in-sync replicas have to be served?
        # For now the outbound figure is the same sum of write traffic.
        broker_leader_out_traffic[broker] = broker_leader_out_traffic.get(broker, 0) + bytes_in_per_sec

        # Record the traffic of each topic partition as seen on its leader.
        # Notes:
        #   1. Is this broker the partition leader? Not really a problem: traffic is
        #      only reported by the partition leader, other replicas report 0.
        #   2. Every replica is visited, so the values cannot simply be summed;
        #      ideally exactly one replica has a value and the others are 0.
        #   3. When historical traffic is used, a leader switch may have happened and
        #      several replicas may report a value, so the maximum is taken.
        #   4. Partition-level traffic is unavailable, so topic-level traffic is used
        #      instead. If one broker leads several partitions of the topic, the
        #      per-partition figure is overestimated.
        if topic not in topic_partition_in_traffic:
            topic_partition_in_traffic[topic] = {}
        partition_traffic = topic_partition_in_traffic[topic]

        for partition in partitions:
            previous = partition_traffic.get(partition, 0)
            partition_traffic[partition] = max(bytes_in_per_sec, previous)



# Traffic required on the follower side.
# Only brokers that receive new partitions matter here.
broker_follower_traffic = {}

for broker, added_topics in add_partitions.items():
    for topic, partitions in added_topics.items():
        # A topic partition is being added to this broker, so the current
        # traffic of that partition's leader is needed.
        #
        # Ideally this would be broker / topic / partition level traffic, but
        # only broker / topic level BytesInPerSec is available, which makes
        # the figure used here too large.
        for partition in partitions:
            leader_rate = topic_partition_in_traffic[topic][partition]

            # The replica's sync rate has to exceed the leader's write rate.
            broker_follower_traffic[broker] = broker_follower_traffic.get(broker, 0) + leader_rate


# The throttle is the largest per-broker figure among the leader inbound
# traffic and the follower traffic; it stays at -1 when both are empty.
throttle = max([
    -1,
    *broker_leader_in_traffic.values(),
    *broker_follower_traffic.values(),
])

问题:

  • Kafka JMX 不提供 topic partition 级别的流量监控,只能获取 broker - topic 维度的 BytesInPerSec,因此按分区估算的流量会偏大
  • 重分区过程中,发生 leader 切换,可能导致原来设置的流量限制小于该节点 leader 写入速率
  • 分批执行,会使得每个批次需要的实际流量更小
  • 生成重分区方案有随机性,意味着预估依据的方案必须与实际执行的方案一致

迁移耗时预估

def get_log_size(broker, topic, partition):
    """Return the size occupied by the given topic partition.

    Placeholder: the real value is meant to be fetched from BCM. This stub
    ignores its arguments and always returns 1.
    """
    stub_size = 1
    return stub_size

# Log size of every topic partition that is going to be moved:
# {topic: {partition: size}}, measured on the broker the partition leaves.
topic_partition_log_size = {}

for broker, leaving_topics in remove_partitions.items():
    for topic, partitions in leaving_topics.items():
        for partition in partitions:
            size = get_log_size(broker, topic, partition)
            if topic not in topic_partition_log_size:
                topic_partition_log_size[topic] = {}
            topic_partition_log_size[topic][partition] = size

# Per leader broker: total log size it has to serve through fetch responses
# for the topic partitions being moved.
broker_leader_log_size = {}

# Per follower broker: total log size of the topic partitions it newly receives.
broker_follower_log_size = {}

for broker, topic_partitions in add_partitions.items():
    for topic, partitions in topic_partitions.items():
        for partition in partitions:
            log_size = topic_partition_log_size[topic][partition]

            # The receiving broker has to copy the whole partition log.
            broker_follower_log_size[broker] = broker_follower_log_size.get(broker, 0) + log_size

            # The leader is the broker the partition is copied from.
            # The preferred leader is assumed to be the current leader here,
            # which may not hold in practice.
            leader = topic_partition_leader[topic][partition]

            # Accumulate under the leader's own key. Checking the receiving
            # broker instead would reset the leader's running total (or raise
            # KeyError) whenever one leader serves several moved partitions.
            broker_leader_log_size[leader] = broker_leader_log_size.get(leader, 0) + log_size

# Estimated migration duration: the slowest broker determines the total time.
max_duration = 0

# The throttle must not be set to the bare minimum, so add 20% on top.
throttle = throttle * 1.2

# For each side (leader, follower) the bandwidth left for moving data is the
# throttle minus the regular traffic that already has to flow through it.
for log_size_by_broker, traffic_by_broker in (
    (broker_leader_log_size, broker_leader_in_traffic),      # leader duration
    (broker_follower_log_size, broker_follower_traffic),     # follower duration
):
    for broker, log_size in log_size_by_broker.items():
        headroom = throttle - traffic_by_broker[broker]
        if headroom > 0:
            duration = log_size / headroom
        elif log_size > 0:
            # No spare bandwidth (e.g. all measured traffic is 0, so the
            # throttle is 0 as well): the move cannot make progress. Report
            # an unbounded duration instead of raising ZeroDivisionError.
            duration = float("inf")
        else:
            # Nothing to move on this broker.
            duration = 0
        if duration > max_duration:
            max_duration = duration

问题:

  • 这里使用的是限流阈值,实际迁移流量不一定能达到该阈值,因此实际耗时可能比预估值更长
  • 分批执行,并行度下降,会导致迁移时长增加