nearlink_sdr.mac.frame 源代码
"""数据链路层控制面帧结构 -- TXS-10002-2025 标准 7.3.2
控制面帧由数据类型索引 + 数据长度 + 控制面数据组成。
支持信令消息的注册、编码和解码。
"""
from __future__ import annotations
__all__ = [
"AsyncDataFrame",
"ControlFrame",
"MuxFrame",
"SegmentType",
"SyncDataFrame",
]
from dataclasses import dataclass
from enum import IntEnum
[文档]
class SegmentType(IntEnum):
"""分段类型指示 (7.3.3.2)。"""
COMPLETE = 0 # 完整单一分段
FIRST = 1 # SDU 第一个分段
MIDDLE = 2 # SDU 中间分段
LAST = 3 # SDU 最后分段
# ---------------------------------------------------------------------------
# 7.3.2.1 控制面帧通用结构
# ---------------------------------------------------------------------------
[文档]
@dataclass
class ControlFrame:
"""控制面帧: [data_type_index: 2B][data_length: 1B][payload: NB]"""
data_type_index: int # 16 bits (0x0000 - 0xFFFF)
payload: bytes
[文档]
def pack(self) -> bytes:
"""编码为字节流。"""
header = self.data_type_index.to_bytes(2, "big")
length = len(self.payload).to_bytes(1, "big")
return header + length + self.payload
[文档]
@classmethod
def unpack(cls, data: bytes) -> tuple[ControlFrame, int]:
"""从字节流解码, 返回 (帧, 已消费字节数)。"""
if len(data) < 3:
raise ValueError(f"控制面帧头部不足: 需要 3 字节, 实际 {len(data)}")
data_type_index = int.from_bytes(data[0:2], "big")
payload_len = data[2]
total = 3 + payload_len
if len(data) < total:
raise ValueError(f"控制面帧数据不足: 需要 {total} 字节, 实际 {len(data)}")
payload = data[3:total]
return cls(data_type_index, payload), total
# ---------------------------------------------------------------------------
# 7.3.3.2 异步数据链路帧
# ---------------------------------------------------------------------------
[文档]
@dataclass
class AsyncDataFrame:
"""异步数据帧: [分段类型: 2b][数据长度: 11b][预留: 3b][数据: NB]"""
segment_type: int # 2 bits
data: bytes
[文档]
def pack(self) -> bytes:
"""编码为字节流 (不含数据包类型索引前缀)。"""
length = len(self.data)
if length > 2047:
raise ValueError(f"数据长度超过 2047 字节: {length}")
# byte0: reserved(2b) + segment_type(2b) + data_length_high(4b)
# byte1: data_length_low(7b) + reserved(1b)
b0 = ((self.segment_type & 0x03) << 4) | ((length >> 7) & 0x0F)
b1 = ((length & 0x7F) << 1)
return bytes([b0, b1]) + self.data
[文档]
@classmethod
def unpack(cls, data: bytes) -> AsyncDataFrame:
"""从字节流解码 (不含数据包类型索引)。"""
if len(data) < 2:
raise ValueError(f"异步数据帧头部不足: 需要 2 字节, 实际 {len(data)}")
segment_type = (data[0] >> 4) & 0x03
length = ((data[0] & 0x0F) << 7) | ((data[1] >> 1) & 0x7F)
payload = data[2:2 + length]
if len(payload) < length:
raise ValueError(f"异步数据帧数据不足: 需要 {length} 字节, 实际 {len(payload)}")
return cls(segment_type, payload)
# ---------------------------------------------------------------------------
# 7.3.3.3 链接态同步数据链路帧
# ---------------------------------------------------------------------------
[文档]
@dataclass
class SyncDataFrame:
"""链接态同步数据帧 (非周期适配)。"""
pdu_seq: int # 5 bits, PDU 顺序号
event_group: int # 8 bits, 事件组编号
frame_format: int # 1 bit, 0=周期适配, 1=非周期适配
segment_type: int # 2 bits
data: bytes
time_offset: int | None = None # 24 bits, 微秒, 首段/单一分段
sdu_seq: int | None = None # 5 bits, 仅周期适配
[文档]
def pack(self) -> bytes:
"""编码为字节流 (不含数据包类型索引前缀)。"""
length = len(self.data)
if length > 2047:
raise ValueError(f"数据长度超过 2047 字节: {length}")
# byte0: pdu_seq(5b) + reserved(3b)
b0 = (self.pdu_seq & 0x1F) << 3
# byte1: event_group(8b)
b1 = self.event_group & 0xFF
# byte2: frame_format(1b) + segment_type(2b) + data_length_high(5b)
b2 = ((self.frame_format & 1) << 7) | ((self.segment_type & 0x03) << 5) | ((length >> 6) & 0x1F)
# byte3: data_length_low(6b) + reserved(2b)
b3 = ((length & 0x3F) << 2)
result = bytes([b0, b1, b2, b3])
# 时间偏移量 (首段和单一分段, 非周期适配)
if self.frame_format == 1 and self.segment_type in (SegmentType.COMPLETE, SegmentType.FIRST):
offset = self.time_offset if self.time_offset is not None else 0
result += offset.to_bytes(3, "big")
# SDU 顺序号 (周期适配)
if self.frame_format == 0:
seq = self.sdu_seq if self.sdu_seq is not None else 0
result += bytes([(seq & 0x1F) << 3])
result += self.data
return result
[文档]
@classmethod
def unpack(cls, data: bytes) -> SyncDataFrame:
"""从字节流解码 (不含数据包类型索引)。"""
if len(data) < 4:
raise ValueError(f"同步数据帧头部不足: 需要 4 字节, 实际 {len(data)}")
pdu_seq = (data[0] >> 3) & 0x1F
event_group = data[1]
frame_format = (data[2] >> 7) & 1
segment_type = (data[2] >> 5) & 0x03
length = ((data[2] & 0x1F) << 6) | ((data[3] >> 2) & 0x3F)
offset = 4
time_offset = None
sdu_seq = None
if frame_format == 1 and segment_type in (SegmentType.COMPLETE, SegmentType.FIRST):
if len(data) < offset + 3:
raise ValueError("时间偏移量数据不足")
time_offset = int.from_bytes(data[offset:offset + 3], "big")
offset += 3
if frame_format == 0:
if len(data) < offset + 1:
raise ValueError("SDU 顺序号数据不足")
sdu_seq = (data[offset] >> 3) & 0x1F
offset += 1
payload = data[offset:offset + length]
if len(payload) < length:
raise ValueError(f"同步数据帧数据不足: 需要 {length} 字节, 实际 {len(payload)}")
return cls(pdu_seq, event_group, frame_format, segment_type,
payload, time_offset, sdu_seq)
# ---------------------------------------------------------------------------
# 7.3.4 数据面与控制面复用帧
# ---------------------------------------------------------------------------
[文档]
@dataclass
class MuxFrame:
"""数据面与控制面复用帧。
存储一组控制面帧和一个可选数据帧 (异步或同步)。
"""
control_frames: list[ControlFrame]
data_frame: AsyncDataFrame | SyncDataFrame | None = None
[文档]
def pack(self) -> bytes:
"""编码全部帧为字节流。"""
result = bytearray()
for cf in self.control_frames:
result.extend(cf.pack())
if self.data_frame is not None:
result.extend(self.data_frame.pack())
return bytes(result)