QoS 服务质量管理¶
概述¶
mac.qos 模块实现了标准 6.5 节定义的数据传输质量控制, 包含 ARQ 重传管理、
HARQ 混合自动重传、流控、链路质量跟踪和优先级发送队列。
基本用法¶
创建 QoS 管理器¶
from nearlink_sdr.mac.qos import Priority, QosManager
mgr = QosManager()
提交数据并发送¶
mgr.submit_data(b"\x01\x02\x03", priority=Priority.NORMAL)
decision, _item = mgr.prepare_tx()
# decision == TxDecision.NEW_DATA, _item 包含待发送数据
处理 ACK/NACK 反馈¶
from nearlink_sdr.mac.qos import TxDecision
decision = mgr.on_tx_feedback(crc_ok=True)
if decision == TxDecision.NEW_DATA:
# 对端确认, 可发送下一帧
pass
获取控制信息字段¶
fields = mgr.get_ctrl_fields()
# fields["tx_sn"], fields["rx_sn"], fields["flow_ctrl"], etc.
ARQ 序列号管理¶
ArqState 管理不同帧类型的发送/接收序列号:
帧类型 |
SN 位宽 |
取值范围 |
|---|---|---|
FT1 |
1 bit |
0-1 |
FT2 |
1 bit |
0-1 |
FT3/FT4 |
5 bit |
0-31 |
异步链路默认无限重传, 同步链路可设置 max_retransmit 限制。
from nearlink_sdr.mac.qos import ArqState, LinkType
arq = ArqState(frame_type=3, link_type=LinkType.SYNC, max_retransmit=5)
HARQ 反馈¶
HarqController 支持两种反馈模式:
TB 模式: 整个传输块 ACK/NACK,
harq_feedback字段为期望 SNCBG 模式: 按编码块组反馈,
harq_feedback每 bit 对应一个 CBG
from nearlink_sdr.mac.qos import FeedbackMode, HarqController
harq = HarqController(feedback_mode=FeedbackMode.CBG, n_cbg=4)
harq.update_cbg_status([True, False, True, False])
mask = harq.get_retransmit_cbg_mask() # 返回位掩码, 每个 bit 对应一个 CBG
链路质量跟踪与 AMC¶
LinkQualityTracker 通过滑动窗口统计 FER, 建议 MCS 调整:
from nearlink_sdr.mac.qos import LinkQualityTracker
tracker = LinkQualityTracker(window_size=32)
frame_results = [True] * 28 + [False] * 4 # 模拟 32 帧结果
for crc_ok in frame_results:
tracker.record(crc_ok)
adj = tracker.suggest_mcs_adjustment()
# +1: 建议提升 MCS, -1: 建议降低, 0: 保持
new_mcs = tracker.apply_suggestion()
流控¶
FlowController 基于高/低水位机制实现背压:
from nearlink_sdr.mac.qos import FlowController
fc = FlowController(buffer_high_watermark=16, buffer_low_watermark=4)
fc.enqueue(10)
print(fc.flow_ctrl_bit) # 1 (有后续数据)
print(fc.is_paused) # False
优先级发送队列¶
TxQueue 提供 5 级优先级 (CONTROL > REALTIME > HIGH > NORMAL > LOW),
重传数据在同优先级内优先出队:
from nearlink_sdr.mac.qos import Priority, TxQueue, TxQueueItem
q = TxQueue(max_size=64)
q.push(TxQueueItem(priority=Priority.NORMAL, data=b"hello"))
q.push(TxQueueItem(priority=Priority.CONTROL, data=b"ctrl"))
item = q.pop() # 返回 CONTROL 优先级的 ctrl
运行示例¶
make examples