"""MAC 层服务质量管理 -- TXS-10002-2025 标准 6.5/7.2
实现异步/同步数据链路的数据传输质量控制:
- ARQ 自动重传请求: 序列号管理、ACK/NACK 反馈、重传决策 (6.5.1)
- HARQ 混合自动重传: 基于 TB 和 CBG 的反馈模式 (6.5.1.1)
- 流控: 发送缓冲区管理与流控信令 (控制信息 flow_ctrl 字段)
- 链路质量跟踪: MCS 反馈、FER 估计、AMC 建议 (控制信息 LQI 字段)
- 发送队列: 优先级分级与 PDU 管理
"""
from __future__ import annotations
__all__ = [
"MAX_CBG",
"MAX_SN_FT1",
"MAX_SN_FT2",
"MAX_SN_FT34",
"ArqState",
"FeedbackMode",
"FlowController",
"HarqController",
"LinkQualityTracker",
"LinkType",
"Priority",
"QosManager",
"TxDecision",
"TxQueue",
"TxQueueItem",
]
from collections import deque
from dataclasses import dataclass, field
from enum import IntEnum, auto
# ---------------------------------------------------------------------------
# 常量
# ---------------------------------------------------------------------------
MAX_SN_FT1 = 2 # FT1: 1-bit SN, 取值 0/1
MAX_SN_FT2 = 2 # FT2 异步模式 A2: 1-bit SN
MAX_SN_FT34 = 32 # FT3/FT4 异步模式 A3: 5-bit SN
MAX_CBG = 8 # 最大编码块组数
[文档]
class FeedbackMode(IntEnum):
"""HARQ 反馈模式。"""
TB = 0 # 基于传输块整体
CBG = 1 # 基于编码块组
[文档]
class LinkType(IntEnum):
"""数据链路类型。"""
ASYNC = 0 # 异步数据链路 (可靠传输)
SYNC = 1 # 同步数据链路 (非可靠传输)
[文档]
class TxDecision(IntEnum):
"""发送决策。"""
NEW_DATA = auto() # 发送新数据
RETRANSMIT = auto() # 重传旧数据
RETRANSMIT_CBG = auto() # 部分 CBG 重传 (MCS=15)
EMPTY = auto() # 发送空包
[文档]
class Priority(IntEnum):
"""数据传输优先级 (数值越小优先级越高)。"""
CONTROL = 0 # 控制面信令
REALTIME = 1 # 实时数据 (同步等时)
HIGH = 2 # 高优先级数据
NORMAL = 3 # 常规数据
LOW = 4 # 低优先级/后台数据
# ---------------------------------------------------------------------------
# 6.5.1 ARQ 控制器
# ---------------------------------------------------------------------------
[文档]
@dataclass
class ArqState:
"""ARQ 序列号与重传状态。
标准 6.5.1.1 描述的异步数据链路传输流程:
- tx_sn: 当前发送序列号
- expected_rx_sn: 期望接收的对端序列号
- pending_ack: 当前等待 ACK 的数据是否未确认
- retransmit_count: 当前包的重传次数
- max_retransmit: 最大重传次数 (0 = 无限, 异步链路默认)
"""
frame_type: int = 2
link_type: LinkType = LinkType.ASYNC
tx_sn: int = 0
expected_rx_sn: int = 0
pending_ack: bool = False
retransmit_count: int = 0
max_retransmit: int = 0
_last_tx_data: bytes = b""
@property
def sn_modulus(self) -> int:
if self.frame_type == 1:
return MAX_SN_FT1
if self.frame_type == 2:
return MAX_SN_FT2
return MAX_SN_FT34
[文档]
def on_tx_new(self, data: bytes) -> int:
"""发送新数据包, 返回使用的 SN。"""
self._last_tx_data = data
self.pending_ack = True
self.retransmit_count = 0
return self.tx_sn
[文档]
def on_ack_received(self) -> None:
"""收到 ACK: 翻转 SN, 清除重传状态。"""
self.pending_ack = False
self.retransmit_count = 0
self.tx_sn = (self.tx_sn + 1) % self.sn_modulus
[文档]
def on_nack_received(self) -> TxDecision:
"""收到 NACK: 决定是否重传。
异步链路: 持续重传直到收到 ACK。
同步链路: 超出 max_retransmit 后丢弃。
"""
if (
self.link_type == LinkType.SYNC
and self.max_retransmit > 0
and self.retransmit_count >= self.max_retransmit
):
self._discard_pending()
return TxDecision.NEW_DATA
self.retransmit_count += 1
return TxDecision.RETRANSMIT
[文档]
def on_no_feedback(self) -> TxDecision:
"""未收到反馈: 与 NACK 处理相同。"""
return self.on_nack_received()
[文档]
def on_rx_packet(self, rx_sn: int) -> bool:
"""接收数据包, 返回是否为新数据。
若 rx_sn == expected_rx_sn: 新数据, 反馈 ACK。
否则: 重复包, 仍反馈 ACK。
"""
if rx_sn == self.expected_rx_sn:
self.expected_rx_sn = (self.expected_rx_sn + 1) % self.sn_modulus
return True
return False
def _discard_pending(self) -> None:
"""丢弃当前待确认数据 (同步链路超时)。"""
self.pending_ack = False
self.retransmit_count = 0
self._last_tx_data = b""
self.tx_sn = (self.tx_sn + 1) % self.sn_modulus
# ---------------------------------------------------------------------------
# 6.5.1.1 HARQ 控制器
# ---------------------------------------------------------------------------
[文档]
@dataclass
class HarqController:
"""混合自动重传请求控制器。
管理 HARQ 反馈字段 (8 bit) 和 CBG 级部分重传逻辑。
TB 模式: harq_feedback 指示期望对端传输的数据包 SN。
CBG 模式: harq_feedback 每 bit 对应一个 CBG 的 ACK/NACK。
"""
feedback_mode: FeedbackMode = FeedbackMode.TB
n_cbg: int = 1
_cbg_status: list[bool] = field(default_factory=list)
def __post_init__(self) -> None:
if not self._cbg_status:
self._cbg_status = [False] * min(self.n_cbg, MAX_CBG)
[文档]
def encode_harq_feedback(self, ack: bool, peer_sn: int = 0) -> int:
"""编码 HARQ 反馈字段 (8 bit)。
TB 模式: 返回期望对端传输的 SN。
CBG 模式: 返回 CBG ACK/NACK bitmap。
"""
if self.feedback_mode == FeedbackMode.TB:
return peer_sn & 0xFF
# CBG 模式: 每 bit 对应一个 CBG
bitmap = 0
for i, ok in enumerate(self._cbg_status):
if ok:
bitmap |= (1 << i)
return bitmap & 0xFF
[文档]
def decode_harq_feedback(self, harq_field: int) -> list[bool]:
"""解码 HARQ 反馈字段。
TB 模式: 返回 [全部 ACK] 或 [全部 NACK]。
CBG 模式: 返回每个 CBG 的 ACK 状态。
"""
if self.feedback_mode == FeedbackMode.TB:
return [True] * self.n_cbg
return [(harq_field >> i) & 1 == 1 for i in range(self.n_cbg)]
[文档]
def update_cbg_status(self, cbg_ack: list[bool]) -> None:
"""更新 CBG 接收状态。"""
for i in range(min(len(cbg_ack), len(self._cbg_status))):
if cbg_ack[i]:
self._cbg_status[i] = True
[文档]
def needs_cbg_retransmit(self) -> bool:
"""是否需要 CBG 级部分重传。"""
if self.feedback_mode != FeedbackMode.CBG:
return False
return any(self._cbg_status) and not all(self._cbg_status)
[文档]
def get_retransmit_cbg_mask(self) -> int:
"""获取需要重传的 CBG 掩码 (用于 B1 控制信息 data_length 字段)。
标准 6.5.1.1: MCS=15 时, data_length 低 M bit 指示需要传输的 CBG。
"""
mask = 0
for i, ok in enumerate(self._cbg_status):
if not ok:
mask |= (1 << i)
return mask
[文档]
def decide_tx(self, ack_received: bool, arq: ArqState) -> TxDecision:
"""综合 HARQ 反馈决定发送行为。"""
if ack_received:
arq.on_ack_received()
self.reset_cbg()
return TxDecision.NEW_DATA
if self.needs_cbg_retransmit():
arq.retransmit_count += 1
return TxDecision.RETRANSMIT_CBG
return arq.on_nack_received()
[文档]
def reset_cbg(self) -> None:
"""重置 CBG 状态 (新数据传输前)。"""
self._cbg_status = [False] * len(self._cbg_status)
@property
def all_cbg_acked(self) -> bool:
return all(self._cbg_status)
# ---------------------------------------------------------------------------
# 流控管理
# ---------------------------------------------------------------------------
[文档]
@dataclass
class FlowController:
"""发送流控管理器。
根据发送缓冲区状态生成 flow_ctrl 指示字段:
- 0: 没有后续数据
- 1: 有后续数据待发送
同时实现简单的背压机制。
"""
buffer_high_watermark: int = 16
buffer_low_watermark: int = 4
_buffer_count: int = 0
_paused: bool = False
[文档]
def enqueue(self, count: int = 1) -> None:
"""数据入队。"""
self._buffer_count += count
if self._buffer_count >= self.buffer_high_watermark:
self._paused = True
[文档]
def dequeue(self, count: int = 1) -> None:
"""数据出队。"""
self._buffer_count = max(0, self._buffer_count - count)
if self._buffer_count <= self.buffer_low_watermark:
self._paused = False
@property
def flow_ctrl_bit(self) -> int:
"""控制信息 flow_ctrl 字段值。"""
return 1 if self._buffer_count > 0 else 0
@property
def is_paused(self) -> bool:
"""是否处于背压暂停状态。"""
return self._paused
@property
def buffer_count(self) -> int:
return self._buffer_count
# ---------------------------------------------------------------------------
# 链路质量跟踪
# ---------------------------------------------------------------------------
[文档]
@dataclass
class LinkQualityTracker:
"""链路质量跟踪与 AMC 建议。
跟踪最近 N 帧的 CRC 通过率, 计算瞬时 FER,
根据 FER 阈值建议 MCS 调整方向。
标准: 控制信息 LQI 字段 (前 4 bit 为 MCS 指示)。
"""
window_size: int = 32
fer_target_low: float = 0.01
fer_target_high: float = 0.10
_history: deque[bool] = field(default_factory=deque)
_current_mcs: int = 7
@property
def current_mcs(self) -> int:
return self._current_mcs
@current_mcs.setter
def current_mcs(self, value: int) -> None:
self._current_mcs = max(0, min(12, value))
[文档]
def record(self, crc_ok: bool) -> None:
"""记录一帧接收结果。"""
self._history.append(crc_ok)
if len(self._history) > self.window_size:
self._history.popleft()
@property
def fer(self) -> float:
"""当前窗口 FER。"""
if not self._history:
return 0.0
n_fail = sum(1 for ok in self._history if not ok)
return n_fail / len(self._history)
@property
def success_rate(self) -> float:
"""当前窗口成功率。"""
return 1.0 - self.fer
[文档]
def suggest_mcs_adjustment(self) -> int:
"""建议 MCS 调整方向。
:returns: +1: 建议提升 MCS (链路质量好)
-1: 建议降低 MCS (链路质量差)
0: 保持当前 MCS
"""
if len(self._history) < self.window_size // 2:
return 0
if self.fer < self.fer_target_low and self._current_mcs < 12:
return 1
if self.fer > self.fer_target_high and self._current_mcs > 0:
return -1
return 0
[文档]
def apply_suggestion(self) -> int:
"""应用 MCS 调整建议并返回新 MCS。"""
adj = self.suggest_mcs_adjustment()
self.current_mcs = self._current_mcs + adj
return self._current_mcs
[文档]
def encode_lqi(self) -> int:
"""编码 LQI 字段 (8 bit): 前 4 bit MCS, 后 4 bit 预留。"""
return (self._current_mcs & 0x0F) << 4
[文档]
@staticmethod
def decode_lqi(lqi: int) -> int:
"""解码 LQI 字段, 提取 MCS 指示。"""
return (lqi >> 4) & 0x0F
# ---------------------------------------------------------------------------
# 发送队列
# ---------------------------------------------------------------------------
[文档]
@dataclass
class TxQueueItem:
"""发送队列项。"""
priority: Priority
data: bytes
segment_type: int = 0
retransmit: bool = False
attempt: int = 0
[文档]
class TxQueue:
"""优先级发送队列。
高优先级数据优先出队; 同优先级内按FIFO顺序;
重传数据优先于同优先级新数据。
"""
def __init__(self, max_size: int = 64) -> None:
self._queues: dict[Priority, deque[TxQueueItem]] = {
p: deque() for p in Priority
}
self._max_size = max_size
self._total = 0
[文档]
def push(self, item: TxQueueItem) -> bool:
"""入队, 队满时返回 False。"""
if self._total >= self._max_size:
return False
q = self._queues[item.priority]
if item.retransmit:
q.appendleft(item)
else:
q.append(item)
self._total += 1
return True
[文档]
def pop(self) -> TxQueueItem | None:
"""出队最高优先级项。"""
for p in Priority:
q = self._queues[p]
if q:
self._total -= 1
return q.popleft()
return None
[文档]
def peek(self) -> TxQueueItem | None:
"""查看但不移除最高优先级项。"""
for p in Priority:
q = self._queues[p]
if q:
return q[0]
return None
@property
def size(self) -> int:
return self._total
@property
def is_empty(self) -> bool:
return self._total == 0
@property
def is_full(self) -> bool:
return self._total >= self._max_size
[文档]
def count_by_priority(self, priority: Priority) -> int:
"""指定优先级的队列项数量。"""
return len(self._queues[priority])
[文档]
def clear(self) -> None:
"""清空所有队列。"""
for q in self._queues.values():
q.clear()
self._total = 0
# ---------------------------------------------------------------------------
# QoS 管理器 (集成)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class QosManager:
"""QoS 服务质量管理器。
集成 ARQ、HARQ、流控、链路质量跟踪和发送队列,
为 MAC-PHY 接口提供统一的数据传输质量管理。
"""
arq: ArqState = field(default_factory=ArqState)
harq: HarqController = field(default_factory=HarqController)
flow: FlowController = field(default_factory=FlowController)
quality: LinkQualityTracker = field(default_factory=LinkQualityTracker)
tx_queue: TxQueue = field(default_factory=TxQueue)
[文档]
def prepare_tx(self) -> tuple[TxDecision, TxQueueItem | None]:
"""准备下一帧发送。
:returns: (发送决策, 队列项或 None)
"""
if self.arq.pending_ack:
return TxDecision.RETRANSMIT, None
item = self.tx_queue.pop()
if item is None:
return TxDecision.EMPTY, None
self.arq.on_tx_new(item.data)
self.flow.dequeue()
return TxDecision.NEW_DATA, item
[文档]
def on_tx_feedback(self, crc_ok: bool) -> TxDecision:
"""处理来自对端的 ACK/NACK 反馈。"""
self.quality.record(crc_ok)
if crc_ok:
return self.harq.decide_tx(True, self.arq)
return self.harq.decide_tx(False, self.arq)
[文档]
def submit_data(
self,
data: bytes,
priority: Priority = Priority.NORMAL,
) -> bool:
"""提交数据到发送队列。"""
item = TxQueueItem(priority=priority, data=data)
ok = self.tx_queue.push(item)
if ok:
self.flow.enqueue()
return ok
[文档]
def get_ctrl_fields(self) -> dict[str, int]:
"""获取当前控制信息字段值。"""
return {
"tx_sn": self.arq.tx_sn,
"rx_sn": self.arq.expected_rx_sn,
"flow_ctrl": self.flow.flow_ctrl_bit,
"empty_packet": 1 if self.tx_queue.is_empty and not self.arq.pending_ack else 0,
"harq_feedback": self.harq.encode_harq_feedback(
not self.arq.pending_ack, self.arq.tx_sn,
),
}