"""异步/同步数据链路传输规程 (标准 6.5.1 ~ 6.5.3)。
实现数据链路参数管理、流控决策、同步数据丢弃机制、
事件组集合调度, 以及同步业务适配 (周期/非周期) 的
分段重组逻辑。
"""
from __future__ import annotations
__all__ = [
"AdaptMode",
"AperiodicFragment",
"AperiodicServiceAdaptor",
"AsyncDataLinkParams",
"AsyncFlowControl",
"EventGroupSet",
"PeriodicServiceAdaptor",
"SyncDataDiscard",
"SyncDataLinkParams",
"SyncFlowControl",
"TransmissionMode",
]
import math
from dataclasses import dataclass
from enum import IntEnum
# ---------------------------------------------------------------------------
# 6.5 传输类型枚举
# ---------------------------------------------------------------------------
[文档]
class TransmissionMode(IntEnum):
"""传输模式 (6.5.1.4 ~ 6.5.1.7, 6.5.2.6 ~ 6.5.2.10)。"""
UNICAST = 0
MULTICAST = 1
BIDIRECTIONAL_MULTICAST = 2
FEEDBACK_MULTICAST = 3
BROADCAST_ASYNC = 4
BROADCAST_SYNC = 5
[文档]
class AdaptMode(IntEnum):
"""同步业务适配方式 (6.5.3.1)。"""
PERIODIC = 0
APERIODIC = 1
# ---------------------------------------------------------------------------
# 6.5.1.2 异步数据链路参数
# ---------------------------------------------------------------------------
[文档]
@dataclass
class AsyncDataLinkParams:
"""异步数据链路参数 (6.5.1.2)。"""
event_group_start_us: int = 0
event_group_period_us: int = 0
event_period_us: int = 0
intra_event_interval_us: int = 0
inter_event_interval_us: int = 0
event_group_interval_us: int = 0
first_tx: bool = True
tx_pdu_max: int = 251
rx_pdu_max: int = 251
tx_max_time_us: int = 0
rx_max_time_us: int = 0
tx_time_offset: int = 0
rx_time_offset: int = 0
tx_max_time_offset: int = 0
rx_max_time_offset: int = 0
tx_sdu_max: int = 0
rx_sdu_max: int = 0
mode: TransmissionMode = TransmissionMode.UNICAST
@property
def max_event_length(self) -> int:
"""事件最大长度 = 先发最大偏移 + 后发最大偏移。"""
return self.tx_max_time_offset + self.rx_max_time_offset
# ---------------------------------------------------------------------------
# 6.5.1.3 异步数据链路流控
# ---------------------------------------------------------------------------
[文档]
class AsyncFlowControl:
"""异步数据链路流控决策 (6.5.1.3)。
根据标准规则, 判断先发节点是否应停止在当前事件组
后续事件中发送。
"""
[文档]
def should_stop_unicast_tx(
self,
tx_flow_ctrl: int,
rx_flow_ctrl: int | None,
rx_ack: bool | None,
rx_data_len: int | None,
) -> bool:
"""单播先发节点停止条件 (6.5.1.3)。
条件一: 先发发送 flow_ctrl=0 且收到后发 flow_ctrl=0, ACK, data_len=0
条件二: 先发发送了消息但未收到后发消息 (rx 参数为 None)
"""
if rx_flow_ctrl is None or rx_ack is None or rx_data_len is None:
return True
return tx_flow_ctrl == 0 and rx_flow_ctrl == 0 and rx_ack and rx_data_len == 0
[文档]
def should_stop_multicast_semi_reliable(
self, tx_flow_ctrl: int, any_nack: bool,
) -> bool:
"""半可靠组播组长停止条件。
组长发送 flow=0 且未收到任何 NACK。
"""
return tx_flow_ctrl == 0 and not any_nack
[文档]
def should_stop_multicast_full(
self,
tx_flow_ctrl: int,
member_feedbacks: list[tuple[int, bool, int]],
) -> bool:
"""非半可靠组播/双向组播组长停止条件。
组长发送 flow=0, 收到所有组员 flow=0、ACK、data_len=0。
"""
if tx_flow_ctrl != 0:
return False
for flow, ack, data_len in member_feedbacks:
if flow != 0 or not ack or data_len != 0:
return False
return True
[文档]
def should_stop_feedback_multicast_leader(
self,
tx_flow_ctrl: int,
member_feedbacks: list[tuple[int, bool, int]],
) -> bool:
"""反馈组播组长停止条件, 同 should_stop_multicast_full。"""
return self.should_stop_multicast_full(tx_flow_ctrl, member_feedbacks)
[文档]
def should_stop_feedback_multicast_member(
self, prev_flow_ctrl: int, leader_ack: bool,
) -> bool:
"""反馈组播组员停止条件。
前一事件中组员发送 flow=0, 当前事件收到组长 ACK=成功。
"""
return prev_flow_ctrl == 0 and leader_ack
# ---------------------------------------------------------------------------
# 6.5.2.2 同步数据链路参数
# ---------------------------------------------------------------------------
[文档]
@dataclass
class SyncDataLinkParams:
"""同步数据链路参数 (6.5.2.2)。"""
event_group_start_us: int = 0
event_group_period_us: int = 0
event_period_us: int = 0
intra_event_interval_us: int = 0
inter_event_interval_us: int = 0
event_group_interval_us: int = 0
event_count: int = 1
first_tx: bool = True
tx_pdu_max: int = 251
rx_pdu_max: int = 251
tx_max_time_us: int = 0
rx_max_time_us: int = 0
tx_time_offset: int = 0
rx_time_offset: int = 0
tx_max_time_offset: int = 0
rx_max_time_offset: int = 0
tx_sdu_max: int = 0
rx_sdu_max: int = 0
new_pkt_count: int = 1
discard_period: int = 3
adapt_mode: AdaptMode = AdaptMode.PERIODIC
sync_anchor_delay_us: int = 0
sync_ref_delay_us: int = 0
mode: TransmissionMode = TransmissionMode.UNICAST
@property
def max_event_length(self) -> int:
return self.tx_max_time_offset + self.rx_max_time_offset
# ---------------------------------------------------------------------------
# 6.5.2.4 同步数据链路数据丢弃机制
# ---------------------------------------------------------------------------
[文档]
@dataclass
class SyncDataDiscard:
"""同步数据丢弃机制 (6.5.2.4)。
跟踪发送序列号、本地基准值和有效载荷计数, 支持
主动丢弃和被动丢弃判定。
"""
new_pkt_count: int = 2
discard_period: int = 3
tx_sn: int = 0
event_group_index: int = 0
payload_count: int = 0
_last_ack: bool | None = None
@property
def local_baseline(self) -> int:
"""本地基准值 (6.5.2.4)。"""
if self.event_group_index < self.discard_period:
return 0
return self.new_pkt_count * (
self.event_group_index - self.discard_period + 1
)
[文档]
def on_ack(self) -> None:
"""收到 ACK, 发送序列号 +1。"""
self._last_ack = True
self.tx_sn += 1
self.payload_count += 1
[文档]
def on_nack_or_timeout(self) -> None:
"""收到 NACK 或未收到反馈, 序列号不变。"""
self._last_ack = False
[文档]
def on_event_group_boundary(self) -> None:
"""跨事件组边界, 更新序列号和事件组索引。"""
self.tx_sn = max(0, self.tx_sn - self.new_pkt_count)
self.event_group_index += 1
[文档]
def is_discarded(self, payload_id: int) -> bool:
"""判定 payload_id 是否已被被动丢弃。"""
return payload_id < self.local_baseline
[文档]
def should_active_discard(self, max_retransmit: int) -> bool:
"""主动丢弃判定: 对于 FT3/FT4, 发送端可自主决定。"""
return self._last_ack is False and max_retransmit > 0
[文档]
def active_discard(self) -> None:
"""执行主动丢弃, 翻转序列号发送下一包。"""
self.tx_sn += 1
self.payload_count += 1
# ---------------------------------------------------------------------------
# 6.5.2.3 同步数据链路流控
# ---------------------------------------------------------------------------
[文档]
class SyncFlowControl:
"""同步数据链路流控 (6.5.2.3)。
与异步流控逻辑基本一致, 但同步链路中先发节点在
满足停止条件后直接停止, 后发节点的行为由事件组
内事件总数约束。
"""
[文档]
def should_stop_unicast_tx(
self,
tx_flow_ctrl: int,
rx_flow_ctrl: int | None,
rx_ack: bool | None,
rx_data_len: int | None,
) -> bool:
"""单播先发停止 (6.5.2.3)。"""
if rx_flow_ctrl is None or rx_ack is None or rx_data_len is None:
return False
return (
tx_flow_ctrl == 0
and rx_flow_ctrl == 0
and rx_ack
and rx_data_len == 0
)
[文档]
def should_stop_multicast_semi_reliable(
self, tx_flow_ctrl: int, any_nack: bool,
) -> bool:
return tx_flow_ctrl == 0 and not any_nack
[文档]
def should_stop_multicast_full(
self,
tx_flow_ctrl: int,
member_feedbacks: list[tuple[int, bool, int]],
) -> bool:
if tx_flow_ctrl != 0:
return False
for flow, ack, data_len in member_feedbacks:
if flow != 0 or not ack or data_len != 0:
return False
return True
[文档]
def should_stop_feedback_leader(
self,
tx_flow_ctrl: int,
member_feedbacks: list[tuple[int, bool, int]],
) -> bool:
return self.should_stop_multicast_full(tx_flow_ctrl, member_feedbacks)
[文档]
def should_stop_feedback_member(
self, prev_flow_ctrl: int, leader_ack: bool,
) -> bool:
return prev_flow_ctrl == 0 and leader_ack
# ---------------------------------------------------------------------------
# 6.5.2.5 事件组集合
# ---------------------------------------------------------------------------
[文档]
@dataclass
class EventGroupSet:
"""事件组集合 (6.5.2.5)。
多个具有相同事件组周期的事件组可组合为事件组集合。
"""
event_group_count: int = 1
event_group_spacing_slots: int = 0
event_group_period_us: int = 0
schedule_slot_us: int = 125
[文档]
def event_group_offsets(self) -> list[int]:
"""返回集合中各事件组相对于集合起始的偏移 (μs)。"""
return [
i * self.event_group_spacing_slots * self.schedule_slot_us
for i in range(self.event_group_count)
]
[文档]
def set_duration_us(self) -> int:
"""集合总时长 (μs): 最后一个事件组起始 + 事件组周期。"""
offsets = self.event_group_offsets()
if not offsets:
return 0
return offsets[-1] + self.event_group_period_us
[文档]
def validate(self) -> bool:
"""校验事件组集合的时间资源不能交叠。"""
offsets = self.event_group_offsets()
for i in range(len(offsets) - 1):
if offsets[i] + self.event_group_period_us > offsets[i + 1]:
return False
return True
# ---------------------------------------------------------------------------
# 6.5.3.2 周期适配方法
# ---------------------------------------------------------------------------
[文档]
@dataclass
class PeriodicServiceAdaptor:
"""同步数据链路周期适配 (6.5.3.2)。
将上层 SDU 分段为 PDU, 并按事件组周期进行时间同步。
"""
sdu_max: int = 256
pdu_max: int = 251
event_group_period_us: int = 0
sdu_period_us: int = 0
sync_ref_delay_us: int = 0
sync_anchor_delay_us: int = 0
discard_period: int = 3
@property
def segments_per_sdu(self) -> int:
"""每个 SDU 需要的 PDU 分段数。"""
if self.pdu_max <= 0:
return 1
return math.ceil(self.sdu_max / self.pdu_max)
@property
def sdus_per_event_group(self) -> int:
"""每个事件组周期内传输的 SDU 数量。"""
if self.sdu_period_us <= 0:
return 1
return self.event_group_period_us // self.sdu_period_us
@property
def frames_per_event_group(self) -> int:
"""每个事件组周期内传输的无线帧数。"""
return self.segments_per_sdu * self.sdus_per_event_group
[文档]
def segment_sdu(self, sdu: bytes) -> list[bytes]:
"""将 SDU 分段为 PDU 列表。"""
if not sdu:
return [b""]
segments: list[bytes] = [
sdu[offset: offset + self.pdu_max]
for offset in range(0, len(sdu), self.pdu_max)
]
expected = self.segments_per_sdu
while len(segments) < expected:
segments.append(b"")
return segments
[文档]
def reassemble_sdu(self, pdus: list[bytes]) -> bytes:
"""将 PDU 列表重组为 SDU。"""
return b"".join(pdus)
[文档]
def tx_accept_time(
self, event_group_start_us: int,
) -> int:
"""发送端 t0: 开始接收 SDU 的时刻。"""
sdus_n = self.sdus_per_event_group
t1 = event_group_start_us
return t1 - (sdus_n - 1) * self.sdu_period_us - self.sync_ref_delay_us
[文档]
def rx_deliver_time(
self, event_group_start_us: int,
) -> int:
"""接收端 r1: 向高层递交第一个 SDU 的时刻。"""
return (
event_group_start_us
+ self.sync_anchor_delay_us
+ (self.discard_period - 1) * self.event_group_period_us
)
# ---------------------------------------------------------------------------
# 6.5.3.3 非周期适配方法
# ---------------------------------------------------------------------------
[文档]
@dataclass
class AperiodicFragment:
"""非周期适配分片 (6.5.3.3)。"""
is_first: bool = False
is_last: bool = False
time_offset_us: int = 0
data: bytes = b""
[文档]
class AperiodicServiceAdaptor:
"""同步数据链路非周期适配 (6.5.3.3)。
将 SDU 分片, 每个包含首段的分片携带时间偏移量。
"""
def __init__(self, pdu_max: int = 251, header_size: int = 4) -> None:
self.pdu_max = pdu_max
self.header_size = header_size
[文档]
def fragment_sdu(
self, sdu: bytes, time_offset_us: int = 0,
) -> list[AperiodicFragment]:
"""将 SDU 分片。"""
payload_cap = self.pdu_max - self.header_size
if payload_cap <= 0:
return [AperiodicFragment(
is_first=True, is_last=True,
time_offset_us=time_offset_us, data=sdu,
)]
fragments: list[AperiodicFragment] = []
offset = 0
while offset < len(sdu):
chunk = sdu[offset: offset + payload_cap]
frag = AperiodicFragment(
is_first=(offset == 0),
is_last=(offset + payload_cap >= len(sdu)),
time_offset_us=time_offset_us if offset == 0 else 0,
data=chunk,
)
fragments.append(frag)
offset += payload_cap
if not fragments:
fragments.append(AperiodicFragment(
is_first=True, is_last=True,
time_offset_us=time_offset_us, data=b"",
))
return fragments
[文档]
def reassemble_sdu(
self, fragments: list[AperiodicFragment],
) -> tuple[bytes, int]:
"""从分片重组 SDU, 返回 (sdu, time_offset_us)。"""
time_offset = 0
data_parts: list[bytes] = []
for frag in fragments:
if frag.is_first:
time_offset = frag.time_offset_us
data_parts.append(frag.data)
return b"".join(data_parts), time_offset
[文档]
def rx_deliver_time(
self,
event_group_start_us: int,
sync_anchor_delay_us: int,
discard_period: int,
event_group_period_us: int,
sdu_period_us: int,
sdu_time_offset_us: int,
) -> int:
"""接收端 r1 (6.5.3.3.2)。"""
return (
event_group_start_us
+ sync_anchor_delay_us
+ discard_period * event_group_period_us
+ sdu_period_us
- sdu_time_offset_us
)