"""链路控制信令 -- TXS-10002-2025 标准 7.3.2.2-7.3.2.32
提供链路建立、参数协商、断开等核心控制面信令的编解码。
"""
from __future__ import annotations
__all__ = [
"AsyncLinkParamRequest",
"AsyncLinkParamResponse",
"AsyncMulticastLinkSetup",
"AsyncMulticastParamExchangeRequest",
"AsyncMulticastParamExchangeResponse",
"AsyncMulticastParamUpdateIndication",
"AsyncMulticastParamUpdateRequest",
"AsyncMulticastReconfig",
"AsyncTTLinkSetup",
"AsyncUnicastUpdate",
"BroadcastHopMap5GUpdate",
"BroadcastHopMapUpdate",
"BroadcastLinkDisconnect",
"BroadcastLinkParamUpdate",
"BroadcastLinkSetup",
"Channel5GStatusIndication",
"ChannelReportConfig",
"ChannelStatusIndication",
"ClockAccuracyRequest",
"ClockAccuracyResponse",
"CoordinateConfig",
"CoordinateReport",
"CoordinateRequest",
"CrcSwitchIndication",
"CrcSwitchRequest",
"DataLengthRequest",
"DataLengthResponse",
"FeatureExchangeRequest",
"FeatureExchangeResponse",
"HopMap5GUpdate",
"HopMapUpdate",
"HopTableUpdate",
"IntervalUpdateIndication",
"IntervalUpdateRequest",
"IntervalUpdateResponse",
"IsochronousLinkSetup",
"IsochronousParamExchangeRequest",
"IsochronousParamExchangeResponse",
"IsochronousParamUpdateIndication",
"IsochronousParamUpdateRequest",
"LinkDisconnect",
"MinAvailableChannels",
"MultiIntervalUpdateIndication",
"MultiIntervalUpdateRequest",
"MultiIntervalUpdateResponse",
"MulticastDisconnect",
"NarrowbandDelayRequest",
"NarrowbandDelayResponse",
"NarrowbandFreqTable24Update",
"NarrowbandFreqTable51Update",
"NarrowbandFreqTable58Update",
"NarrowbandMeasAction",
"NarrowbandMeasCapRequest",
"NarrowbandMeasCapResponse",
"NarrowbandMeasConfig",
"NarrowbandMeasConfigUpdateIndication",
"NarrowbandMeasConfigUpdateRequest",
"NarrowbandMeasReport",
"NarrowbandProxySensingFeedback",
"NarrowbandProxySensingRequest",
"NarrowbandSensingAction",
"NarrowbandSensingCapRequest",
"NarrowbandSensingCapResponse",
"NarrowbandSensingConfig",
"NarrowbandSensingConfigFeedback",
"NarrowbandSensingFeedback",
"NarrowbandSensingReport",
"NarrowbandSensingRequest",
"PhyUpdateIndication",
"PhyUpdateRequest",
"PingRequest",
"PingResponse",
"ResourceReservation",
"ResourceReservationTerminate",
"RoleSwitchRequest",
"SMFParamUpdateIndication",
"SMFParamUpdateRequest",
"SMFSignalingTerminate",
"SMFTimeSlotUpdateRequest",
"SMFTimeSlotUpdateResponse",
"SecurityPauseRequest",
"SecurityPauseResponse",
"SecurityRequest",
"SecurityResponse",
"SecurityStartRequest",
"SecurityStartResponse",
"SensingDeviceStatusReport",
"SignalingReject",
"SystemTimeIndication",
"TimeOffsetIndication",
"TimeoutUpdateRequest",
"UWBMeasAction",
"UWBMeasCapRequest",
"UWBMeasCapResponse",
"UWBMeasConfig",
"UWBMeasConfigFeedback",
"UWBMeasReport",
"UWBProxySensingFeedback",
"UWBProxySensingRequest",
"UWBSensingAction",
"UWBSensingCapRequest",
"UWBSensingCapResponse",
"UWBSensingConfig",
"UWBSensingConfigFeedback",
"UWBSensingProcessFeedback",
"UWBSensingProcessRequest",
"UWBSensingReport",
"UnknownFeatureFeedback",
"VersionExchange",
]
import struct
from dataclasses import dataclass, fields
# ---------------------------------------------------------------------------
# 7.3.2.2 收发间隔更新请求 (0x0000, 8 bits / 1 byte)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class IntervalUpdateRequest:
"""收发间隔更新请求。
字段:
- interval_type: 4 bits, 请求的收发间隔类型 (0-15)
- reserved: 4 bits
"""
interval_type: int # 0-15
DATA_TYPE_INDEX = 0x0000
BYTE_LENGTH = 1
[文档]
def pack(self) -> bytes:
return bytes([(self.interval_type & 0x0F) << 4])
[文档]
@classmethod
def unpack(cls, data: bytes) -> IntervalUpdateRequest:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(interval_type=(data[0] >> 4) & 0x0F)
# ---------------------------------------------------------------------------
# 7.3.2.3 收发间隔更新响应 (0x0001, 8 bits / 1 byte)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class IntervalUpdateResponse:
"""收发间隔更新响应。
字段:
- interval_type: 4 bits, 响应的收发间隔类型 (0-15)
- reserved: 4 bits
"""
interval_type: int # 0-15
DATA_TYPE_INDEX = 0x0001
BYTE_LENGTH = 1
[文档]
def pack(self) -> bytes:
return bytes([(self.interval_type & 0x0F) << 4])
[文档]
@classmethod
def unpack(cls, data: bytes) -> IntervalUpdateResponse:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(interval_type=(data[0] >> 4) & 0x0F)
# ---------------------------------------------------------------------------
# 7.3.2.4 收发间隔更新指示 (0x0002, 64 bits / 8 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class IntervalUpdateIndication:
"""收发间隔更新指示。
字段:
- link_id: 24 bits, 逻辑链路标识
- interval_type: 4 bits, 更新后的收发间隔类型
- reserved: 4 bits
- effective_slot: 32 bits, 信令生效时隙号
"""
link_id: int # 1 ~ 2^24-1
interval_type: int # 0-15
effective_slot: int # 0 ~ 2^30-1
DATA_TYPE_INDEX = 0x0002
BYTE_LENGTH = 8
[文档]
def pack(self) -> bytes:
b = self.link_id.to_bytes(3, "big")
b += bytes([(self.interval_type & 0x0F) << 4])
b += self.effective_slot.to_bytes(4, "big")
return b
[文档]
@classmethod
def unpack(cls, data: bytes) -> IntervalUpdateIndication:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
link_id = int.from_bytes(data[0:3], "big")
interval_type = (data[3] >> 4) & 0x0F
effective_slot = int.from_bytes(data[4:8], "big")
return cls(link_id, interval_type, effective_slot)
# ---------------------------------------------------------------------------
# 7.3.2.5 信令被拒指示 (0x0003, 24 bits / 3 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class SignalingReject:
"""信令被拒指示。
字段:
- rejected_index: 16 bits, 被拒的数据类型索引
- error_reason: 8 bits, 出错原因
"""
rejected_index: int # 0-65535
error_reason: int # 0-255
DATA_TYPE_INDEX = 0x0003
BYTE_LENGTH = 3
[文档]
def pack(self) -> bytes:
return self.rejected_index.to_bytes(2, "big") + bytes([self.error_reason & 0xFF])
[文档]
@classmethod
def unpack(cls, data: bytes) -> SignalingReject:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
rejected_index = int.from_bytes(data[0:2], "big")
return cls(rejected_index, data[2])
# ---------------------------------------------------------------------------
# 7.3.2.12 特性交互请求 (0x000A, 80 bits / 10 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class FeatureExchangeRequest:
"""特性交互请求: 80-bit 特性集位图。"""
feature_set: int # 0 ~ 2^80-1
DATA_TYPE_INDEX = 0x000A
BYTE_LENGTH = 10
[文档]
def pack(self) -> bytes:
return self.feature_set.to_bytes(10, "big")
[文档]
@classmethod
def unpack(cls, data: bytes) -> FeatureExchangeRequest:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(int.from_bytes(data[:10], "big"))
# ---------------------------------------------------------------------------
# 7.3.2.13 特性交互响应 (0x000B, 80 bits / 10 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class FeatureExchangeResponse:
"""特性交互响应: 80-bit 特性集位图。"""
feature_set: int # 0 ~ 2^80-1
DATA_TYPE_INDEX = 0x000B
BYTE_LENGTH = 10
[文档]
def pack(self) -> bytes:
return self.feature_set.to_bytes(10, "big")
[文档]
@classmethod
def unpack(cls, data: bytes) -> FeatureExchangeResponse:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(int.from_bytes(data[:10], "big"))
# ---------------------------------------------------------------------------
# 7.3.2.15 版本交互指示 (0x000D, 40 bits / 5 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class VersionExchange:
"""版本交互指示。
字段:
- spec_version: 8 bits, 规格版本
- company_id: 16 bits, 公司标识符
- sub_version: 16 bits, 子规格版本
"""
spec_version: int # 0-255
company_id: int # 0-65535
sub_version: int # 0-65535
DATA_TYPE_INDEX = 0x000D
BYTE_LENGTH = 5
[文档]
def pack(self) -> bytes:
return (bytes([self.spec_version & 0xFF])
+ self.company_id.to_bytes(2, "big")
+ self.sub_version.to_bytes(2, "big"))
[文档]
@classmethod
def unpack(cls, data: bytes) -> VersionExchange:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(data[0], int.from_bytes(data[1:3], "big"),
int.from_bytes(data[3:5], "big"))
# ---------------------------------------------------------------------------
# 7.3.2.16 数据长度请求 (0x000E, 64 bits / 8 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class DataLengthRequest:
"""数据长度请求 (MTU 协商)。
字段:
- max_rx_bytes: 16 bits, 最大接收字节数 (31-2047)
- max_rx_time: 16 bits, 最大接收时间 (346-65535 us)
- max_tx_bytes: 16 bits, 最大发送字节数 (31-2047)
- max_tx_time: 16 bits, 最大发送时间 (346-65535 us)
"""
max_rx_bytes: int
max_rx_time: int
max_tx_bytes: int
max_tx_time: int
DATA_TYPE_INDEX = 0x000E
BYTE_LENGTH = 8
[文档]
def pack(self) -> bytes:
return struct.pack(">HHHH", self.max_rx_bytes, self.max_rx_time,
self.max_tx_bytes, self.max_tx_time)
[文档]
@classmethod
def unpack(cls, data: bytes) -> DataLengthRequest:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
vals = struct.unpack(">HHHH", data[:8])
return cls(*vals)
# ---------------------------------------------------------------------------
# 7.3.2.17 数据长度响应 (0x000F, 64 bits / 8 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class DataLengthResponse:
"""数据长度响应。字段与 DataLengthRequest 相同。"""
max_rx_bytes: int
max_rx_time: int
max_tx_bytes: int
max_tx_time: int
DATA_TYPE_INDEX = 0x000F
BYTE_LENGTH = 8
[文档]
def pack(self) -> bytes:
return struct.pack(">HHHH", self.max_rx_bytes, self.max_rx_time,
self.max_tx_bytes, self.max_tx_time)
[文档]
@classmethod
def unpack(cls, data: bytes) -> DataLengthResponse:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
vals = struct.unpack(">HHHH", data[:8])
return cls(*vals)
# ---------------------------------------------------------------------------
# 7.3.2.18 信道上报指示 (0x0010, 24 bits / 3 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class ChannelReportConfig:
"""信道上报指示。
字段:
- enable: 8 bits, 使能信道上报 (0 或 1)
- min_interval: 8 bits, 最小时间间隔 (5-150, 单位 200ms)
- max_delay: 8 bits, 最大时延 (5-150, 单位 200ms)
"""
enable: int # 0 或 1
min_interval: int # 5-150
max_delay: int # 5-150
DATA_TYPE_INDEX = 0x0010
BYTE_LENGTH = 3
[文档]
def pack(self) -> bytes:
return bytes([self.enable & 0xFF, self.min_interval & 0xFF,
self.max_delay & 0xFF])
[文档]
@classmethod
def unpack(cls, data: bytes) -> ChannelReportConfig:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(data[0], data[1], data[2])
# ---------------------------------------------------------------------------
# 7.3.2.23 CRC 切换请求 (0x0015, 96 bits / 12 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class CrcSwitchRequest:
"""CRC 切换请求。
字段:
- link_id: 24 bits, 逻辑链路标识
- tx_crc_type: 1 bit, 先发链路 CRC 类型 (0=CRC24, 1=CRC32)
- rx_crc_type: 1 bit, 后发链路 CRC 类型
- reserved: 6 bits
- tx_crc_init: 32 bits, 先发链路 CRC 初始值
- rx_crc_init: 32 bits, 后发链路 CRC 初始值
"""
link_id: int
tx_crc_type: int # 0=CRC24, 1=CRC32
rx_crc_type: int # 0=CRC24, 1=CRC32
tx_crc_init: int # CRC 初始值
rx_crc_init: int # CRC 初始值
DATA_TYPE_INDEX = 0x0015
BYTE_LENGTH = 12
[文档]
def pack(self) -> bytes:
b = self.link_id.to_bytes(3, "big")
flags = ((self.tx_crc_type & 1) << 7) | ((self.rx_crc_type & 1) << 6)
b += bytes([flags])
b += self.tx_crc_init.to_bytes(4, "big")
b += self.rx_crc_init.to_bytes(4, "big")
return b
[文档]
@classmethod
def unpack(cls, data: bytes) -> CrcSwitchRequest:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
link_id = int.from_bytes(data[0:3], "big")
tx_crc_type = (data[3] >> 7) & 1
rx_crc_type = (data[3] >> 6) & 1
tx_crc_init = int.from_bytes(data[4:8], "big")
rx_crc_init = int.from_bytes(data[8:12], "big")
return cls(link_id, tx_crc_type, rx_crc_type, tx_crc_init, rx_crc_init)
# ---------------------------------------------------------------------------
# 7.3.2.24 CRC 切换指示 (0x0016, 128 bits / 16 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class CrcSwitchIndication:
"""CRC 切换指示。增加生效时隙号。"""
link_id: int
tx_crc_type: int
rx_crc_type: int
tx_crc_init: int
rx_crc_init: int
effective_slot: int # 32 bits
DATA_TYPE_INDEX = 0x0016
BYTE_LENGTH = 16
[文档]
def pack(self) -> bytes:
b = self.link_id.to_bytes(3, "big")
flags = ((self.tx_crc_type & 1) << 7) | ((self.rx_crc_type & 1) << 6)
b += bytes([flags])
b += self.tx_crc_init.to_bytes(4, "big")
b += self.rx_crc_init.to_bytes(4, "big")
b += self.effective_slot.to_bytes(4, "big")
return b
[文档]
@classmethod
def unpack(cls, data: bytes) -> CrcSwitchIndication:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
link_id = int.from_bytes(data[0:3], "big")
tx_crc_type = (data[3] >> 7) & 1
rx_crc_type = (data[3] >> 6) & 1
tx_crc_init = int.from_bytes(data[4:8], "big")
rx_crc_init = int.from_bytes(data[8:12], "big")
effective_slot = int.from_bytes(data[12:16], "big")
return cls(link_id, tx_crc_type, rx_crc_type,
tx_crc_init, rx_crc_init, effective_slot)
# ---------------------------------------------------------------------------
# 7.3.2.30 时钟精度请求 (0x001C, 8 bits / 1 byte)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class ClockAccuracyRequest:
"""时钟精度请求。"""
accuracy: int # 0-255, 枚举
DATA_TYPE_INDEX = 0x001C
BYTE_LENGTH = 1
[文档]
def pack(self) -> bytes:
return bytes([self.accuracy & 0xFF])
[文档]
@classmethod
def unpack(cls, data: bytes) -> ClockAccuracyRequest:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(data[0])
# ---------------------------------------------------------------------------
# 7.3.2.31 时钟精度响应 (0x001D, 8 bits / 1 byte)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class ClockAccuracyResponse:
"""时钟精度响应。"""
accuracy: int # 0-255
DATA_TYPE_INDEX = 0x001D
BYTE_LENGTH = 1
[文档]
def pack(self) -> bytes:
return bytes([self.accuracy & 0xFF])
[文档]
@classmethod
def unpack(cls, data: bytes) -> ClockAccuracyResponse:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(data[0])
# ---------------------------------------------------------------------------
# 7.3.2.32 链路断开指示 (0x001E, 32 bits / 4 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class LinkDisconnect:
"""链路断开指示。
字段:
- link_id: 24 bits, 逻辑链路标识
- error_reason: 8 bits, 出错原因
"""
link_id: int # 0 ~ 2^24-1
error_reason: int # 0-255
DATA_TYPE_INDEX = 0x001E
BYTE_LENGTH = 4
[文档]
def pack(self) -> bytes:
return self.link_id.to_bytes(3, "big") + bytes([self.error_reason & 0xFF])
[文档]
@classmethod
def unpack(cls, data: bytes) -> LinkDisconnect:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
link_id = int.from_bytes(data[0:3], "big")
return cls(link_id, data[3])
# ---------------------------------------------------------------------------
# 7.3.2.6 安全请求 (0x0004, 104 bits / 13 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class SecurityRequest:
"""安全请求。
字段:
- g_node_iv: 32 bits, G节点初始化向量
- g_node_skd: 64 bits, G节点连接密钥分散器
- enc_indication: 8 bits, 加密和完整性保护指示
"""
g_node_iv: int
g_node_skd: int
enc_indication: int
DATA_TYPE_INDEX = 0x0004
BYTE_LENGTH = 13
[文档]
def pack(self) -> bytes:
return (self.g_node_iv.to_bytes(4, "big")
+ self.g_node_skd.to_bytes(8, "big")
+ bytes([self.enc_indication & 0xFF]))
[文档]
@classmethod
def unpack(cls, data: bytes) -> SecurityRequest:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
g_iv = int.from_bytes(data[0:4], "big")
g_skd = int.from_bytes(data[4:12], "big")
return cls(g_iv, g_skd, data[12])
# ---------------------------------------------------------------------------
# 7.3.2.7 安全响应 (0x0005, 96 bits / 12 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class SecurityResponse:
"""安全响应。
字段:
- t_node_iv: 32 bits, T节点初始化向量
- t_node_skd: 64 bits, T节点连接密钥分散器
"""
t_node_iv: int
t_node_skd: int
DATA_TYPE_INDEX = 0x0005
BYTE_LENGTH = 12
[文档]
def pack(self) -> bytes:
return (self.t_node_iv.to_bytes(4, "big")
+ self.t_node_skd.to_bytes(8, "big"))
[文档]
@classmethod
def unpack(cls, data: bytes) -> SecurityResponse:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(int.from_bytes(data[0:4], "big"),
int.from_bytes(data[4:12], "big"))
# ---------------------------------------------------------------------------
# 7.3.2.8 安全启动请求 (0x0006, 0 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class SecurityStartRequest:
"""安全启动请求 (无载荷)。"""
DATA_TYPE_INDEX = 0x0006
BYTE_LENGTH = 0
[文档]
def pack(self) -> bytes:
return b""
[文档]
@classmethod
def unpack(cls, data: bytes) -> SecurityStartRequest:
return cls()
# ---------------------------------------------------------------------------
# 7.3.2.9 安全启动响应 (0x0007, 8 bits / 1 byte)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class SecurityStartResponse:
"""安全启动响应。
字段:
- enc_indication: 8 bits, 加密和完整性保护指示
"""
enc_indication: int
DATA_TYPE_INDEX = 0x0007
BYTE_LENGTH = 1
[文档]
def pack(self) -> bytes:
return bytes([self.enc_indication & 0xFF])
[文档]
@classmethod
def unpack(cls, data: bytes) -> SecurityStartResponse:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(data[0])
# ---------------------------------------------------------------------------
# 7.3.2.10 安全暂停请求 (0x0008, 0 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class SecurityPauseRequest:
"""安全暂停请求 (无载荷)。"""
DATA_TYPE_INDEX = 0x0008
BYTE_LENGTH = 0
[文档]
def pack(self) -> bytes:
return b""
[文档]
@classmethod
def unpack(cls, data: bytes) -> SecurityPauseRequest:
return cls()
# ---------------------------------------------------------------------------
# 7.3.2.11 安全暂停响应 (0x0009, 0 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class SecurityPauseResponse:
"""安全暂停响应 (无载荷)。"""
DATA_TYPE_INDEX = 0x0009
BYTE_LENGTH = 0
[文档]
def pack(self) -> bytes:
return b""
[文档]
@classmethod
def unpack(cls, data: bytes) -> SecurityPauseResponse:
return cls()
# ---------------------------------------------------------------------------
# 7.3.2.14 未知特性反馈 (0x000C, 16 bits / 2 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class UnknownFeatureFeedback:
"""未知特性反馈。
字段:
- unknown_type: 16 bits, 收到的未知数据类型索引
"""
unknown_type: int
DATA_TYPE_INDEX = 0x000C
BYTE_LENGTH = 2
[文档]
def pack(self) -> bytes:
return self.unknown_type.to_bytes(2, "big")
[文档]
@classmethod
def unpack(cls, data: bytes) -> UnknownFeatureFeedback:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(int.from_bytes(data[:2], "big"))
# ---------------------------------------------------------------------------
# 7.3.2.19 信道状态指示 (0x0011, 160 bits / 20 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class ChannelStatusIndication:
"""信道状态指示。
字段:
- channel_map: 160 bits, 每信道 2 bit 质量指示 (0=未知, 1=好, 3=差)
"""
channel_map: bytes # 20 bytes
DATA_TYPE_INDEX = 0x0011
BYTE_LENGTH = 20
[文档]
def pack(self) -> bytes:
return self.channel_map[:20].ljust(20, b"\x00")
[文档]
@classmethod
def unpack(cls, data: bytes) -> ChannelStatusIndication:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(bytes(data[:20]))
# ---------------------------------------------------------------------------
# 7.3.2.20 跳频表更新指示 (0x0012, 可变长度)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class HopTableUpdate:
"""跳频表更新指示 (系统管理帧)。
字段:
- effective_slot: 32 bits, 信令生效时隙号
- channel_count: 8 bits, 跳频表频点个数
- channel_table: 可变, 跳频表频点列表
"""
effective_slot: int
channel_count: int
channel_table: bytes
DATA_TYPE_INDEX = 0x0012
BYTE_LENGTH = 5 # 最小长度
[文档]
def pack(self) -> bytes:
return (self.effective_slot.to_bytes(4, "big")
+ bytes([self.channel_count & 0xFF])
+ self.channel_table)
[文档]
@classmethod
def unpack(cls, data: bytes) -> HopTableUpdate:
if len(data) < 5:
raise ValueError("数据不足: 至少需要 5 字节")
slot = int.from_bytes(data[0:4], "big")
count = data[4]
table = bytes(data[5:5 + count])
return cls(slot, count, table)
# ---------------------------------------------------------------------------
# 7.3.2.21 跳频地图更新指示 (0x0013, 112 bits / 14 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class HopMapUpdate:
"""跳频地图更新指示 (数据链路)。
字段:
- hop_map: 80 bits, 跳频地图位图
- effective_slot: 32 bits, 信令生效时隙号
"""
hop_map: bytes # 10 bytes
effective_slot: int
DATA_TYPE_INDEX = 0x0013
BYTE_LENGTH = 14
[文档]
def pack(self) -> bytes:
return self.hop_map[:10].ljust(10, b"\x00") + self.effective_slot.to_bytes(4, "big")
[文档]
@classmethod
def unpack(cls, data: bytes) -> HopMapUpdate:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(bytes(data[:10]), int.from_bytes(data[10:14], "big"))
# ---------------------------------------------------------------------------
# 7.3.2.22 最少可用信道指示 (0x0014, 16 bits / 2 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class MinAvailableChannels:
"""最少可用信道指示。
字段:
- frame_type: 4 bits, 无线帧类型
- bandwidth: 2 bits, 带宽指示
- pilot_density: 2 bits, 导频密度指示
- min_channels: 8 bits, 最小信道数 (2-76)
"""
frame_type: int
bandwidth: int
pilot_density: int
min_channels: int
DATA_TYPE_INDEX = 0x0014
BYTE_LENGTH = 2
[文档]
def pack(self) -> bytes:
b0 = ((self.frame_type & 0x0F) << 4
| (self.bandwidth & 0x03) << 2
| (self.pilot_density & 0x03))
return bytes([b0, self.min_channels & 0xFF])
[文档]
@classmethod
def unpack(cls, data: bytes) -> MinAvailableChannels:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
ft = (data[0] >> 4) & 0x0F
bw = (data[0] >> 2) & 0x03
pd = data[0] & 0x03
return cls(ft, bw, pd, data[1])
# ---------------------------------------------------------------------------
# 7.3.2.25 物理层更新请求 (0x0017, 32 bits / 4 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class PhyUpdateRequest:
"""物理层更新请求。
字段:
- tx_frame_type: 4 bits, 先发链路无线帧类型
- rx_frame_type: 4 bits, 后发链路无线帧类型
- tx_bandwidth: 2 bits, 先发链路带宽
- rx_bandwidth: 2 bits, 后发链路带宽
- tx_pilot_density: 2 bits, 先发链路导频密度
- rx_pilot_density: 2 bits, 后发链路导频密度
- tx_feedback_type: 6 bits, 先发链路反馈类型
- rx_feedback_type: 3 bits, 后发链路反馈类型
- reserved: 7 bits
"""
tx_frame_type: int
rx_frame_type: int
tx_bandwidth: int
rx_bandwidth: int
tx_pilot_density: int
rx_pilot_density: int
tx_feedback_type: int
rx_feedback_type: int
DATA_TYPE_INDEX = 0x0017
BYTE_LENGTH = 4
[文档]
def pack(self) -> bytes:
b0 = ((self.tx_frame_type & 0x0F) << 4) | (self.rx_frame_type & 0x0F)
b1 = ((self.tx_bandwidth & 0x03) << 6
| (self.rx_bandwidth & 0x03) << 4
| (self.tx_pilot_density & 0x03) << 2
| (self.rx_pilot_density & 0x03))
b2 = ((self.tx_feedback_type & 0x3F) << 2
| (self.rx_feedback_type >> 1) & 0x03)
b3 = ((self.rx_feedback_type & 0x01) << 7)
return bytes([b0, b1, b2, b3])
[文档]
@classmethod
def unpack(cls, data: bytes) -> PhyUpdateRequest:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
tx_ft = (data[0] >> 4) & 0x0F
rx_ft = data[0] & 0x0F
tx_bw = (data[1] >> 6) & 0x03
rx_bw = (data[1] >> 4) & 0x03
tx_pd = (data[1] >> 2) & 0x03
rx_pd = data[1] & 0x03
tx_fb = (data[2] >> 2) & 0x3F
rx_fb = ((data[2] & 0x03) << 1) | ((data[3] >> 7) & 0x01)
return cls(tx_ft, rx_ft, tx_bw, rx_bw, tx_pd, rx_pd, tx_fb, rx_fb)
# ---------------------------------------------------------------------------
# 7.3.2.26 物理层更新指示 (0x0018, 64 bits / 8 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class PhyUpdateIndication:
"""物理层更新指示 (含生效时隙)。"""
tx_frame_type: int
rx_frame_type: int
tx_bandwidth: int
rx_bandwidth: int
tx_pilot_density: int
rx_pilot_density: int
tx_feedback_type: int
rx_feedback_type: int
effective_slot: int
DATA_TYPE_INDEX = 0x0018
BYTE_LENGTH = 8
[文档]
def pack(self) -> bytes:
phy_req = PhyUpdateRequest(
self.tx_frame_type, self.rx_frame_type,
self.tx_bandwidth, self.rx_bandwidth,
self.tx_pilot_density, self.rx_pilot_density,
self.tx_feedback_type, self.rx_feedback_type,
)
return phy_req.pack() + self.effective_slot.to_bytes(4, "big")
[文档]
@classmethod
def unpack(cls, data: bytes) -> PhyUpdateIndication:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
req = PhyUpdateRequest.unpack(data[:4])
slot = int.from_bytes(data[4:8], "big")
return cls(
req.tx_frame_type, req.rx_frame_type,
req.tx_bandwidth, req.rx_bandwidth,
req.tx_pilot_density, req.rx_pilot_density,
req.tx_feedback_type, req.rx_feedback_type, slot,
)
# ---------------------------------------------------------------------------
# 7.3.2.33 异步组播链路参数重配置指示 (0x001F, 248 bits / 31 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class AsyncMulticastReconfig:
"""异步组播链路参数重配置指示。"""
effective_ref_slot: int # 32 bits
event_group_offset: int # 16 bits
event_group_period: int # 16 bits
event_period: int # 16 bits
delay_period: int # 16 bits
timeout: int # 16 bits
intra_event_interval: int # 16 bits
inter_event_interval: int # 16 bits
event_count: int # 8 bits
payload_count: int # 39 bits
scheduling_slot: int # 3 bits
tx_rx_indication: int # 1 bit
tx_max_pdu: int # 11 bits
rx_max_pdu: int # 11 bits
tx_max_time_offset: int # 9 bits
rx_max_time_offset: int # 9 bits
DATA_TYPE_INDEX = 0x001F
BYTE_LENGTH = 30
[文档]
def pack(self) -> bytes:
buf = self.effective_ref_slot.to_bytes(4, "big")
buf += struct.pack(">HHHHHHHB",
self.event_group_offset, self.event_group_period,
self.event_period, self.delay_period,
self.timeout, self.intra_event_interval,
self.inter_event_interval, self.event_count)
# payload_count(39) + reserved(5) + scheduling_slot(3) +
# tx_rx_indication(1) + tx_max_pdu(11) + rx_max_pdu(11) +
# tx_max_time_offset(9) + rx_max_time_offset(9) = 88 bits = 11 bytes
pc = self.payload_count & 0x7FFFFFFFFF
bits = (pc << 49
| (self.scheduling_slot & 0x07) << 41
| (self.tx_rx_indication & 0x01) << 40
| (self.tx_max_pdu & 0x7FF) << 29
| (self.rx_max_pdu & 0x7FF) << 18
| (self.tx_max_time_offset & 0x1FF) << 9
| (self.rx_max_time_offset & 0x1FF))
buf += bits.to_bytes(11, "big")
return buf
[文档]
@classmethod
def unpack(cls, data: bytes) -> AsyncMulticastReconfig:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
efs = int.from_bytes(data[0:4], "big")
ego, egp, ep, dp, to, iei, iei2, ec = struct.unpack(
">HHHHHHHB", data[4:19])
tail = int.from_bytes(data[19:30], "big")
pc = (tail >> 49) & 0x7FFFFFFFFF
ss = (tail >> 41) & 0x07
tri = (tail >> 40) & 0x01
tmp = (tail >> 29) & 0x7FF
rmp = (tail >> 18) & 0x7FF
tmto = (tail >> 9) & 0x1FF
rmto = tail & 0x1FF
return cls(efs, ego, egp, ep, dp, to, iei, iei2, ec,
pc, ss, tri, tmp, rmp, tmto, rmto)
# ---------------------------------------------------------------------------
# 7.3.2.34 链接态异步链路参数更新指示 (0x003A, 120 bits / 15 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class AsyncUnicastUpdate:
"""链接态单播异步链路参数更新指示。
标准定义 120 bits = 15 bytes。timeout 字段不包含在标准序列化中。
"""
effective_ref_slot: int # 32 bits
event_group_offset: int # 16 bits
event_group_period: int # 16 bits
intra_event_interval: int # 16 bits
inter_event_interval: int # 16 bits
delay_period: int # 16 bits
timeout: int = 0 # 不参与序列化
scheduling_slot: int = 0 # 3 bits
tx_rx_indication: int = 0 # 1 bit
DATA_TYPE_INDEX = 0x003A
BYTE_LENGTH = 15
[文档]
def pack(self) -> bytes:
buf = self.effective_ref_slot.to_bytes(4, "big")
buf += struct.pack(">HHHHH",
self.event_group_offset, self.event_group_period,
self.intra_event_interval, self.inter_event_interval,
self.delay_period)
tail = ((self.scheduling_slot & 0x07) << 5
| (self.tx_rx_indication & 0x01) << 4)
buf += bytes([tail])
return buf
[文档]
@classmethod
def unpack(cls, data: bytes) -> AsyncUnicastUpdate:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
efs = int.from_bytes(data[0:4], "big")
ego = int.from_bytes(data[4:6], "big")
egp = int.from_bytes(data[6:8], "big")
iei = int.from_bytes(data[8:10], "big")
iei2 = int.from_bytes(data[10:12], "big")
dp = int.from_bytes(data[12:14], "big")
tail = data[14]
ss = (tail >> 5) & 0x07
tri = (tail >> 4) & 0x01
return cls(efs, ego, egp, iei, iei2, dp, 0, ss, tri)
# ---------------------------------------------------------------------------
# 7.3.2.45 广播链路断开指示 (0x002A, 40 bits / 5 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class BroadcastLinkDisconnect:
"""广播链路断开指示。
字段:
- link_id: 24 bits, 逻辑链路标识
- error_reason: 8 bits, 出错原因
- reserved: 8 bits
"""
link_id: int
error_reason: int
DATA_TYPE_INDEX = 0x002A
BYTE_LENGTH = 5
[文档]
def pack(self) -> bytes:
return self.link_id.to_bytes(3, "big") + bytes([self.error_reason & 0xFF, 0])
[文档]
@classmethod
def unpack(cls, data: bytes) -> BroadcastLinkDisconnect:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(int.from_bytes(data[0:3], "big"), data[3])
# ---------------------------------------------------------------------------
# 7.3.2.51 系统管理帧信令传输终止 (0x0030, 8 bits / 1 byte)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class SMFSignalingTerminate:
"""系统管理帧信令传输终止。"""
terminate_type: int # 8 bits
DATA_TYPE_INDEX = 0x0030
BYTE_LENGTH = 1
[文档]
def pack(self) -> bytes:
return bytes([self.terminate_type & 0xFF])
[文档]
@classmethod
def unpack(cls, data: bytes) -> SMFSignalingTerminate:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(data[0])
# ---------------------------------------------------------------------------
# 7.3.2.52 角色切换请求 (0x0031, 32 bits / 4 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class RoleSwitchRequest:
"""角色切换请求。
字段:
- effective_slot: 32 bits, 信令生效时隙号
"""
effective_slot: int
DATA_TYPE_INDEX = 0x0031
BYTE_LENGTH = 4
[文档]
def pack(self) -> bytes:
return self.effective_slot.to_bytes(4, "big")
[文档]
@classmethod
def unpack(cls, data: bytes) -> RoleSwitchRequest:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(int.from_bytes(data[:4], "big"))
# ---------------------------------------------------------------------------
# 7.3.2.53 时间偏移指示 (0x0032, 64 bits / 8 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class TimeOffsetIndication:
"""时间偏移指示。"""
time_offset: int # 64 bits
DATA_TYPE_INDEX = 0x0032
BYTE_LENGTH = 8
[文档]
def pack(self) -> bytes:
return self.time_offset.to_bytes(8, "big")
[文档]
@classmethod
def unpack(cls, data: bytes) -> TimeOffsetIndication:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(int.from_bytes(data[:8], "big"))
# ---------------------------------------------------------------------------
# 7.3.2.54 PING 请求 (0x0033, 0 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class PingRequest:
"""PING 请求 (无载荷)。"""
DATA_TYPE_INDEX = 0x0033
BYTE_LENGTH = 0
[文档]
def pack(self) -> bytes:
return b""
[文档]
@classmethod
def unpack(cls, data: bytes) -> PingRequest:
return cls()
# ---------------------------------------------------------------------------
# 7.3.2.55 PING 响应 (0x0034, 0 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class PingResponse:
"""PING 响应 (无载荷)。"""
DATA_TYPE_INDEX = 0x0034
BYTE_LENGTH = 0
[文档]
def pack(self) -> bytes:
return b""
[文档]
@classmethod
def unpack(cls, data: bytes) -> PingResponse:
return cls()
# ---------------------------------------------------------------------------
# 7.3.2.62 超时时间更新请求 (0x003C, 16 bits / 2 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class TimeoutUpdateRequest:
"""超时时间更新请求。"""
timeout: int # 16 bits, 单位 10ms
DATA_TYPE_INDEX = 0x003C
BYTE_LENGTH = 2
[文档]
def pack(self) -> bytes:
return self.timeout.to_bytes(2, "big")
[文档]
@classmethod
def unpack(cls, data: bytes) -> TimeoutUpdateRequest:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(int.from_bytes(data[:2], "big"))
# ---------------------------------------------------------------------------
# 7.3.2.63 组播链路断开指示 (0x003D, 24 bits / 3 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class MulticastDisconnect:
"""组播链路断开指示。"""
link_id: int # 24 bits
# 注: 标准定义 3 字节 (link_id 仅 24 bits, 无额外字段)
DATA_TYPE_INDEX = 0x003D
BYTE_LENGTH = 3
[文档]
def pack(self) -> bytes:
return self.link_id.to_bytes(3, "big")
[文档]
@classmethod
def unpack(cls, data: bytes) -> MulticastDisconnect:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(int.from_bytes(data[:3], "big"))
# ---------------------------------------------------------------------------
# 7.3.2.35 链接态异步链路参数更新请求 (0x0020, 216 bits / 27 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class AsyncLinkParamRequest:
"""链接态异步链路参数更新请求。"""
event_group_period_min: int # 16 bits
event_group_period_max: int # 16 bits
delay_period: int # 16 bits
timeout: int # 16 bits (10ms)
expected_period_unit: int # 8 bits
effective_ref_slot: int # 32 bits
offsets: tuple[int, ...] # 6 × 16 bits
time_slot_length: int # 8 bits
time_slot_count: int # 8 bits
DATA_TYPE_INDEX = 0x0020
BYTE_LENGTH = 27
[文档]
def pack(self) -> bytes:
buf = struct.pack(">HHHH", self.event_group_period_min,
self.event_group_period_max,
self.delay_period, self.timeout)
buf += bytes([self.expected_period_unit & 0xFF])
buf += self.effective_ref_slot.to_bytes(4, "big")
offs = (self.offsets + (0,) * 6)[:6]
for o in offs:
buf += struct.pack(">H", o & 0xFFFF)
buf += bytes([self.time_slot_length & 0xFF, self.time_slot_count & 0xFF])
return buf
[文档]
@classmethod
def unpack(cls, data: bytes) -> AsyncLinkParamRequest:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
egpm, egpx, dp, to = struct.unpack(">HHHH", data[0:8])
epu = data[8]
ers = int.from_bytes(data[9:13], "big")
offs = tuple(struct.unpack(">H", data[13 + i * 2:15 + i * 2])[0] for i in range(6))
tsl = data[25]
tsc = data[26]
return cls(egpm, egpx, dp, to, epu, ers, offs, tsl, tsc)
# ---------------------------------------------------------------------------
# 7.3.2.36 链接态异步链路参数更新响应 (0x0021, 216 bits / 27 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class AsyncLinkParamResponse:
"""链接态异步链路参数更新响应 (与请求字段完全相同)。"""
event_group_period_min: int
event_group_period_max: int
delay_period: int
timeout: int
expected_period_unit: int
effective_ref_slot: int
offsets: tuple[int, ...]
time_slot_length: int
time_slot_count: int
DATA_TYPE_INDEX = 0x0021
BYTE_LENGTH = 27
[文档]
def pack(self) -> bytes:
buf = struct.pack(">HHHH", self.event_group_period_min,
self.event_group_period_max,
self.delay_period, self.timeout)
buf += bytes([self.expected_period_unit & 0xFF])
buf += self.effective_ref_slot.to_bytes(4, "big")
offs = (self.offsets + (0,) * 6)[:6]
for o in offs:
buf += struct.pack(">H", o & 0xFFFF)
buf += bytes([self.time_slot_length & 0xFF, self.time_slot_count & 0xFF])
return buf
[文档]
@classmethod
def unpack(cls, data: bytes) -> AsyncLinkParamResponse:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
egpm, egpx, dp, to = struct.unpack(">HHHH", data[0:8])
epu = data[8]
ers = int.from_bytes(data[9:13], "big")
offs = tuple(struct.unpack(">H", data[13 + i * 2:15 + i * 2])[0] for i in range(6))
tsl = data[25]
tsc = data[26]
return cls(egpm, egpx, dp, to, epu, ers, offs, tsl, tsc)
# ---------------------------------------------------------------------------
# 7.3.2.37 同步等时链路建链指示 (0x0022, 448 bits / 56 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class IsochronousLinkSetup:
"""同步等时链路建链指示。"""
event_group_set_id: int # 8
event_group_id: int # 8
effective_slot: int # 32
event_group_period: int # 16
event_period: int # 16
intra_event_interval: int # 16
inter_event_interval: int # 16
event_count: int # 8
sync_anchor_delay: int # 24
sync_ref_delay: int # 24
scheduling_slot: int # 3
tx_rx_indication: int # 1
tx_adapt_mode: int # 1
rx_adapt_mode: int # 1
tx_link_id: int # 24
rx_link_id: int # 24
tx_frame_type: int # 4
rx_frame_type: int # 4
tx_bandwidth: int # 2
rx_bandwidth: int # 2
tx_pilot_density: int # 2
rx_pilot_density: int # 2
tx_sdu_max: int # 12
rx_sdu_max: int # 12
tx_sdu_period: int # 20
rx_sdu_period: int # 20
tx_pdu_max: int # 11
rx_pdu_max: int # 11
tx_max_time_offset: int # 9
rx_max_time_offset: int # 9
tx_new_pkt_count: int # 4
rx_new_pkt_count: int # 4
tx_crc_init: int # 32
rx_crc_init: int # 32
tx_discard_period: int # 8
rx_discard_period: int # 8
tx_crc_type: int # 1
rx_crc_type: int # 1
tx_feedback_type: int # 6
rx_feedback_type: int # 3
DATA_TYPE_INDEX = 0x0022
BYTE_LENGTH = 56
[文档]
def pack(self) -> bytes:
buf = bytes([self.event_group_set_id & 0xFF, self.event_group_id & 0xFF])
buf += self.effective_slot.to_bytes(4, "big")
buf += struct.pack(">HHHHB", self.event_group_period, self.event_period,
self.intra_event_interval, self.inter_event_interval,
self.event_count)
buf += self.sync_anchor_delay.to_bytes(3, "big")
buf += self.sync_ref_delay.to_bytes(3, "big")
# 编码剩余位域 (23 bytes = 184 bits)
b = 0
b = (b << 3) | (self.scheduling_slot & 0x07)
b = (b << 2) | 0 # reserved
b = (b << 1) | (self.tx_rx_indication & 0x01)
b = (b << 1) | (self.tx_adapt_mode & 0x01)
b = (b << 1) | (self.rx_adapt_mode & 0x01)
b = (b << 24) | (self.tx_link_id & 0xFFFFFF)
b = (b << 24) | (self.rx_link_id & 0xFFFFFF)
b = (b << 4) | (self.tx_frame_type & 0x0F)
b = (b << 4) | (self.rx_frame_type & 0x0F)
b = (b << 2) | (self.tx_bandwidth & 0x03)
b = (b << 2) | (self.rx_bandwidth & 0x03)
b = (b << 2) | (self.tx_pilot_density & 0x03)
b = (b << 2) | (self.rx_pilot_density & 0x03)
b = (b << 12) | (self.tx_sdu_max & 0xFFF)
b = (b << 12) | (self.rx_sdu_max & 0xFFF)
b = (b << 20) | (self.tx_sdu_period & 0xFFFFF)
b = (b << 20) | (self.rx_sdu_period & 0xFFFFF)
b = (b << 11) | (self.tx_pdu_max & 0x7FF)
b = (b << 11) | (self.rx_pdu_max & 0x7FF)
b = (b << 9) | (self.tx_max_time_offset & 0x1FF)
b = (b << 9) | (self.rx_max_time_offset & 0x1FF)
b = (b << 4) | (self.tx_new_pkt_count & 0x0F)
b = (b << 4) | (self.rx_new_pkt_count & 0x0F)
buf += b.to_bytes(23, "big")
buf += self.tx_crc_init.to_bytes(4, "big")
buf += self.rx_crc_init.to_bytes(4, "big")
buf += bytes([self.tx_discard_period & 0xFF, self.rx_discard_period & 0xFF])
tail = ((self.tx_crc_type & 1) << 15
| (self.rx_crc_type & 1) << 14
| (self.tx_feedback_type & 0x3F) << 8
| (self.rx_feedback_type & 0x07) << 5)
buf += tail.to_bytes(2, "big")
return buf
[文档]
@classmethod
def unpack(cls, data: bytes) -> IsochronousLinkSetup:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
egsi = data[0]
egi = data[1]
es = int.from_bytes(data[2:6], "big")
egp, ep, iei, iei2, ec = struct.unpack(">HHHHB", data[6:15])
sad = int.from_bytes(data[15:18], "big")
srd = int.from_bytes(data[18:21], "big")
b = int.from_bytes(data[21:44], "big")
rnpc = b & 0x0F
b >>= 4
tnpc = b & 0x0F
b >>= 4
rmto = b & 0x1FF
b >>= 9
tmto = b & 0x1FF
b >>= 9
rpm = b & 0x7FF
b >>= 11
tpm = b & 0x7FF
b >>= 11
rsp = b & 0xFFFFF
b >>= 20
tsp = b & 0xFFFFF
b >>= 20
rsm = b & 0xFFF
b >>= 12
tsm = b & 0xFFF
b >>= 12
rpd = b & 0x03
b >>= 2
tpd = b & 0x03
b >>= 2
rb = b & 0x03
b >>= 2
tb = b & 0x03
b >>= 2
rft = b & 0x0F
b >>= 4
tft = b & 0x0F
b >>= 4
rli = b & 0xFFFFFF
b >>= 24
tli = b & 0xFFFFFF
b >>= 24
ram = b & 0x01
b >>= 1
tam = b & 0x01
b >>= 1
tri = b & 0x01
b >>= 1
b >>= 2 # reserved
ss = b & 0x07
tci = int.from_bytes(data[44:48], "big")
rci = int.from_bytes(data[48:52], "big")
tdp = data[52]
rdp = data[53]
tail = int.from_bytes(data[54:56], "big")
tct = (tail >> 15) & 1
rct = (tail >> 14) & 1
tfb = (tail >> 8) & 0x3F
rfb = (tail >> 5) & 0x07
return cls(
egsi, egi, es, egp, ep, iei, iei2, ec, sad, srd,
ss, tri, tam, ram, tli, rli, tft, rft, tb, rb, tpd, rpd,
tsm, rsm, tsp, rsp, tpm, rpm, tmto, rmto, tnpc, rnpc,
tci, rci, tdp, rdp, tct, rct, tfb, rfb,
)
# ---------------------------------------------------------------------------
# 7.3.2.38 同步等时链路参数交互请求 (0x0023, 416 bits / 52 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class IsochronousParamExchangeRequest:
"""同步等时链路参数交互请求。"""
event_group_set_id: int
event_group_id: int
event_group_period: int
event_period: int
intra_event_interval: int
inter_event_interval: int
event_count: int
sync_anchor_delay: int # 24
sync_ref_delay: int # 24
param_tag_id: int # 4
tx_rx_indication: int # 1
tx_adapt_mode: int # 1
rx_adapt_mode: int # 1
tx_link_id: int # 24
rx_link_id: int # 24
tx_frame_type: int # 4
rx_frame_type: int # 4
tx_bandwidth: int
rx_bandwidth: int
tx_pilot_density: int
rx_pilot_density: int
tx_sdu_max: int
rx_sdu_max: int
tx_sdu_period: int
rx_sdu_period: int
tx_pdu_max: int
rx_pdu_max: int
tx_max_time_offset: int
rx_max_time_offset: int
tx_new_pkt_count: int
rx_new_pkt_count: int
tx_crc_init: int
rx_crc_init: int
tx_discard_period: int
rx_discard_period: int
tx_crc_type: int
rx_crc_type: int
tx_feedback_type: int
rx_feedback_type: int
DATA_TYPE_INDEX = 0x0023
BYTE_LENGTH = 52
[文档]
def pack(self) -> bytes:
buf = bytes([self.event_group_set_id & 0xFF, self.event_group_id & 0xFF])
buf += struct.pack(">HHHHB", self.event_group_period, self.event_period,
self.intra_event_interval, self.inter_event_interval,
self.event_count)
buf += self.sync_anchor_delay.to_bytes(3, "big")
buf += self.sync_ref_delay.to_bytes(3, "big")
b = 0
b = (b << 4) | (self.param_tag_id & 0x0F)
b = (b << 1) | 0 # reserved
b = (b << 1) | (self.tx_rx_indication & 0x01)
b = (b << 1) | (self.tx_adapt_mode & 0x01)
b = (b << 1) | (self.rx_adapt_mode & 0x01)
b = (b << 24) | (self.tx_link_id & 0xFFFFFF)
b = (b << 24) | (self.rx_link_id & 0xFFFFFF)
b = (b << 4) | (self.tx_frame_type & 0x0F)
b = (b << 4) | (self.rx_frame_type & 0x0F)
b = (b << 2) | (self.tx_bandwidth & 0x03)
b = (b << 2) | (self.rx_bandwidth & 0x03)
b = (b << 2) | (self.tx_pilot_density & 0x03)
b = (b << 2) | (self.rx_pilot_density & 0x03)
b = (b << 12) | (self.tx_sdu_max & 0xFFF)
b = (b << 12) | (self.rx_sdu_max & 0xFFF)
b = (b << 20) | (self.tx_sdu_period & 0xFFFFF)
b = (b << 20) | (self.rx_sdu_period & 0xFFFFF)
b = (b << 11) | (self.tx_pdu_max & 0x7FF)
b = (b << 11) | (self.rx_pdu_max & 0x7FF)
b = (b << 9) | (self.tx_max_time_offset & 0x1FF)
b = (b << 9) | (self.rx_max_time_offset & 0x1FF)
b = (b << 4) | (self.tx_new_pkt_count & 0x0F)
b = (b << 4) | (self.rx_new_pkt_count & 0x0F)
buf += b.to_bytes(23, "big")
buf += self.tx_crc_init.to_bytes(4, "big")
buf += self.rx_crc_init.to_bytes(4, "big")
buf += bytes([self.tx_discard_period & 0xFF, self.rx_discard_period & 0xFF])
tail = ((self.tx_crc_type & 1) << 15
| (self.rx_crc_type & 1) << 14
| (self.tx_feedback_type & 0x3F) << 8
| (self.rx_feedback_type & 0x07) << 5)
buf += tail.to_bytes(2, "big")
return buf
[文档]
@classmethod
def unpack(cls, data: bytes) -> IsochronousParamExchangeRequest:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
egsi = data[0]
egi = data[1]
egp, ep, iei, iei2, ec = struct.unpack(">HHHHB", data[2:11])
sad = int.from_bytes(data[11:14], "big")
srd = int.from_bytes(data[14:17], "big")
b = int.from_bytes(data[17:40], "big")
rnpc = b & 0x0F
b >>= 4
tnpc = b & 0x0F
b >>= 4
rmto = b & 0x1FF
b >>= 9
tmto = b & 0x1FF
b >>= 9
rpm = b & 0x7FF
b >>= 11
tpm = b & 0x7FF
b >>= 11
rsp = b & 0xFFFFF
b >>= 20
tsp = b & 0xFFFFF
b >>= 20
rsm = b & 0xFFF
b >>= 12
tsm = b & 0xFFF
b >>= 12
rpd = b & 0x03
b >>= 2
tpd = b & 0x03
b >>= 2
rb = b & 0x03
b >>= 2
tb = b & 0x03
b >>= 2
rft = b & 0x0F
b >>= 4
tft = b & 0x0F
b >>= 4
rli = b & 0xFFFFFF
b >>= 24
tli = b & 0xFFFFFF
b >>= 24
ram = b & 0x01
b >>= 1
tam = b & 0x01
b >>= 1
tri = b & 0x01
b >>= 1
b >>= 1 # reserved
pti = b & 0x0F
tci = int.from_bytes(data[40:44], "big")
rci = int.from_bytes(data[44:48], "big")
tdp = data[48]
rdp = data[49]
tail = int.from_bytes(data[50:52], "big")
tct = (tail >> 15) & 1
rct = (tail >> 14) & 1
tfb = (tail >> 8) & 0x3F
rfb = (tail >> 5) & 0x07
return cls(
egsi, egi, egp, ep, iei, iei2, ec, sad, srd,
pti, tri, tam, ram, tli, rli, tft, rft, tb, rb, tpd, rpd,
tsm, rsm, tsp, rsp, tpm, rpm, tmto, rmto, tnpc, rnpc,
tci, rci, tdp, rdp, tct, rct, tfb, rfb,
)
# ---------------------------------------------------------------------------
# 7.3.2.39 同步等时链路参数交互响应 (0x0024, 416 bits / 52 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class IsochronousParamExchangeResponse(IsochronousParamExchangeRequest):
"""同步等时链路参数交互响应 (与请求字段相同)。"""
DATA_TYPE_INDEX = 0x0024
BYTE_LENGTH = 52
[文档]
@classmethod
def unpack(cls, data: bytes) -> IsochronousParamExchangeResponse:
req = IsochronousParamExchangeRequest.unpack(data)
return cls(**{f.name: getattr(req, f.name) for f in fields(req)})
# ---------------------------------------------------------------------------
# 7.3.2.40 同步等时链路参数更新请求 (0x0025, 24 bits / 3 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class IsochronousParamUpdateRequest:
"""同步等时链路参数更新请求。"""
param_tag_id: int # 4 bits
event_group_set_id: int # 8 bits
event_group_id: int # 8 bits
DATA_TYPE_INDEX = 0x0025
BYTE_LENGTH = 3
[文档]
def pack(self) -> bytes:
b0 = (self.param_tag_id & 0x0F) << 4
return bytes([b0, self.event_group_set_id & 0xFF, self.event_group_id & 0xFF])
[文档]
@classmethod
def unpack(cls, data: bytes) -> IsochronousParamUpdateRequest:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls((data[0] >> 4) & 0x0F, data[1], data[2])
# ---------------------------------------------------------------------------
# 7.3.2.41 同步等时链路参数更新指示 (0x0026, 72 bits / 9 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class IsochronousParamUpdateIndication:
"""同步等时链路参数更新指示。"""
param_tag_id: int # 4 bits
event_group_set_id: int # 8 bits
event_group_id: int # 8 bits
effective_ref_slot: int # 32 bits
event_group_offset: int # 16 bits
DATA_TYPE_INDEX = 0x0026
BYTE_LENGTH = 9
[文档]
def pack(self) -> bytes:
b0 = (self.param_tag_id & 0x0F) << 4
buf = bytes([b0, self.event_group_set_id & 0xFF, self.event_group_id & 0xFF])
buf += self.effective_ref_slot.to_bytes(4, "big")
buf += struct.pack(">H", self.event_group_offset)
return buf
[文档]
@classmethod
def unpack(cls, data: bytes) -> IsochronousParamUpdateIndication:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
pti = (data[0] >> 4) & 0x0F
egsi = data[1]
egi = data[2]
ers = int.from_bytes(data[3:7], "big")
ego = struct.unpack(">H", data[7:9])[0]
return cls(pti, egsi, egi, ers, ego)
# ---------------------------------------------------------------------------
# 7.3.2.42 链接态广播链路建立指示 (0x0027, 可变长)
# 固定部分 + 80 bit 跳频地图 (2.4GHz) = 68 bytes
# ---------------------------------------------------------------------------
[文档]
@dataclass
class BroadcastLinkSetup:
"""链接态广播链路建立指示 (2.4GHz 80-bit 跳频地图版本)。"""
transmission_type: int # 1 bit
adapt_mode: int # 1 bit
event_group_set_id: int # 8
event_group_count: int # 8
event_group_id: int # 8
effective_slot: int # 32
event_group_interval: int # 8
event_group_period: int # 16
event_period: int # 16
event_count: int # 8
base_link_id: int # 24
frame_type: int # 4
bandwidth: int # 2
pilot_density: int # 2
sdu_max: int # 12
sdu_period: int # 20
pdu_max: int # 11
new_pkt_count: int # 4
crc_type: int # 1
crc_base_init: int # 32
hop_map: bytes # 80 bits = 10 bytes
sync_anchor_delay: int # 24
sync_ref_delay: int # 24
DATA_TYPE_INDEX = 0x0027
BYTE_LENGTH = 44 # 不含可选加密字段
[文档]
def pack(self) -> bytes:
b0 = ((self.transmission_type & 1) << 7
| (self.adapt_mode & 1) << 6)
buf = bytes([b0, self.event_group_set_id & 0xFF,
self.event_group_count & 0xFF, self.event_group_id & 0xFF])
buf += self.effective_slot.to_bytes(4, "big")
buf += bytes([self.event_group_interval & 0xFF])
buf += struct.pack(">HHB", self.event_group_period,
self.event_period, self.event_count)
buf += self.base_link_id.to_bytes(3, "big")
# 位域: frame_type(4) + bandwidth(2) + pilot_density(2) = 8 bits
b_cfg = ((self.frame_type & 0x0F) << 4
| (self.bandwidth & 0x03) << 2
| (self.pilot_density & 0x03))
buf += bytes([b_cfg])
# sdu_max(12) + sdu_period(20) = 32 bits
sp = ((self.sdu_max & 0xFFF) << 20) | (self.sdu_period & 0xFFFFF)
buf += sp.to_bytes(4, "big")
# pdu_max(11) + new_pkt_count(4) + crc_type(1) = 16 bits
pc = ((self.pdu_max & 0x7FF) << 5
| (self.new_pkt_count & 0x0F) << 1
| (self.crc_type & 1))
buf += pc.to_bytes(2, "big")
buf += self.crc_base_init.to_bytes(4, "big")
hm = self.hop_map[:10] if len(self.hop_map) >= 10 else self.hop_map + b"\x00" * (10 - len(self.hop_map))
buf += hm
buf += self.sync_anchor_delay.to_bytes(3, "big")
buf += self.sync_ref_delay.to_bytes(3, "big")
return buf
[文档]
@classmethod
def unpack(cls, data: bytes) -> BroadcastLinkSetup:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
tt = (data[0] >> 7) & 1
am = (data[0] >> 6) & 1
egsi = data[1]
egc = data[2]
egi = data[3]
es = int.from_bytes(data[4:8], "big")
egi2 = data[8]
egp, ep, ec = struct.unpack(">HHB", data[9:14])
bli = int.from_bytes(data[14:17], "big")
bcfg = data[17]
ft = (bcfg >> 4) & 0x0F
bw = (bcfg >> 2) & 0x03
pd = bcfg & 0x03
sp = int.from_bytes(data[18:22], "big")
sm = (sp >> 20) & 0xFFF
spr = sp & 0xFFFFF
pc = int.from_bytes(data[22:24], "big")
pm = (pc >> 5) & 0x7FF
npc = (pc >> 1) & 0x0F
ct = pc & 1
cbi = int.from_bytes(data[24:28], "big")
hm = data[28:38]
sad = int.from_bytes(data[38:41], "big")
srd = int.from_bytes(data[41:44], "big") if len(data) >= 44 else 0
return cls(tt, am, egsi, egc, egi, es, egi2, egp, ep, ec,
bli, ft, bw, pd, sm, spr, pm, npc, ct, cbi, hm, sad, srd)
# ---------------------------------------------------------------------------
# 7.3.2.43 广播链路参数更新指示 (0x0028, 240 bits / 30 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class BroadcastLinkParamUpdate:
"""广播链路参数更新指示。"""
event_group_set_id: int
event_group_count: int
event_group_id: int
event_group_period: int # 16
event_period: int # 16
event_count: int # 8
frame_type: int # 4
bandwidth: int # 2
pilot_density: int # 2
sdu_max: int # 12
new_pkt_count: int # 4
adapt_mode: int # 1
sdu_period: int # 20
pdu_max: int # 11
crc_type: int # 1
crc_base_init: int # 32
sync_anchor_delay: int # 24
sync_ref_delay: int # 24
effective_ref_slot: int # 32
event_group_offset: int # 16
DATA_TYPE_INDEX = 0x0028
BYTE_LENGTH = 32
[文档]
def pack(self) -> bytes:
buf = bytes([self.event_group_set_id & 0xFF,
self.event_group_count & 0xFF,
self.event_group_id & 0xFF])
buf += struct.pack(">HHB", self.event_group_period,
self.event_period, self.event_count)
b_cfg = ((self.frame_type & 0x0F) << 4
| (self.bandwidth & 0x03) << 2
| (self.pilot_density & 0x03))
buf += bytes([b_cfg])
# sdu_max(12) + new_pkt_count(4) + adapt_mode(1) + reserved(7) = 24 bits
sm_np = ((self.sdu_max & 0xFFF) << 12
| (self.new_pkt_count & 0x0F) << 8
| (self.adapt_mode & 1) << 7)
buf += sm_np.to_bytes(3, "big")
# sdu_period(20) + pdu_max(11) + crc_type(1) = 32 bits
sp_pm = ((self.sdu_period & 0xFFFFF) << 12
| (self.pdu_max & 0x7FF) << 1
| (self.crc_type & 1))
buf += sp_pm.to_bytes(4, "big")
buf += self.crc_base_init.to_bytes(4, "big")
buf += self.sync_anchor_delay.to_bytes(3, "big")
buf += self.sync_ref_delay.to_bytes(3, "big")
buf += self.effective_ref_slot.to_bytes(4, "big")
buf += struct.pack(">H", self.event_group_offset)
return buf
[文档]
@classmethod
def unpack(cls, data: bytes) -> BroadcastLinkParamUpdate:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
egsi, egc, egi = data[0], data[1], data[2]
egp, ep, ec = struct.unpack(">HHB", data[3:8])
bcfg = data[8]
ft = (bcfg >> 4) & 0x0F
bw = (bcfg >> 2) & 0x03
pd = bcfg & 0x03
sm_np = int.from_bytes(data[9:12], "big")
sm = (sm_np >> 12) & 0xFFF
npc = (sm_np >> 8) & 0x0F
am = (sm_np >> 7) & 1
sp_pm = int.from_bytes(data[12:16], "big")
spr = (sp_pm >> 12) & 0xFFFFF
pm = (sp_pm >> 1) & 0x7FF
ct = sp_pm & 1
cbi = int.from_bytes(data[16:20], "big")
sad = int.from_bytes(data[20:23], "big")
srd = int.from_bytes(data[23:26], "big")
ers = int.from_bytes(data[26:30], "big")
ego = 0
if len(data) >= 32:
ego = struct.unpack(">H", data[30:32])[0]
return cls(egsi, egc, egi, egp, ep, ec, ft, bw, pd,
sm, npc, am, spr, pm, ct, cbi, sad, srd, ers, ego)
# ---------------------------------------------------------------------------
# 7.3.2.44 广播链路跳频地图更新指示 (0x0029, 112 bits / 14 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class BroadcastHopMapUpdate:
"""广播链路跳频地图更新指示。"""
hop_map: bytes # 80 bits = 10 bytes
effective_slot: int # 32 bits
DATA_TYPE_INDEX = 0x0029
BYTE_LENGTH = 14
[文档]
def pack(self) -> bytes:
hm = self.hop_map[:10] if len(self.hop_map) >= 10 else self.hop_map + b"\x00" * (10 - len(self.hop_map))
return hm + self.effective_slot.to_bytes(4, "big")
[文档]
@classmethod
def unpack(cls, data: bytes) -> BroadcastHopMapUpdate:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(data[:10], int.from_bytes(data[10:14], "big"))
# ---------------------------------------------------------------------------
# 7.3.2.46 系统管理帧参数更新请求 (0x002B, 64 bits / 8 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class SMFParamUpdateRequest:
"""系统管理帧参数更新请求。"""
smf_period: int # 16 bits
smf_start_offset: int # 8 bits
link_id: int # 24 bits
frame_type: int # 4 bits
bandwidth: int # 2 bits
pilot_density: int # 2 bits
crc_type: int # 1 bit
DATA_TYPE_INDEX = 0x002B
BYTE_LENGTH = 8
[文档]
def pack(self) -> bytes:
buf = struct.pack(">HB", self.smf_period, self.smf_start_offset)
buf += self.link_id.to_bytes(3, "big")
b_cfg = ((self.frame_type & 0x0F) << 4
| (self.bandwidth & 0x03) << 2
| (self.pilot_density & 0x03))
b_tail = ((self.crc_type & 1) << 7)
buf += bytes([b_cfg, b_tail])
return buf
[文档]
@classmethod
def unpack(cls, data: bytes) -> SMFParamUpdateRequest:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
sp = struct.unpack(">H", data[0:2])[0]
so = data[2]
li = int.from_bytes(data[3:6], "big")
ft = (data[6] >> 4) & 0x0F
bw = (data[6] >> 2) & 0x03
pd = data[6] & 0x03
ct = (data[7] >> 7) & 1
return cls(sp, so, li, ft, bw, pd, ct)
# ---------------------------------------------------------------------------
# 7.3.2.47 系统管理帧参数更新指示 (0x002C, 96 bits / 12 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class SMFParamUpdateIndication:
"""系统管理帧参数更新指示。"""
smf_period: int
smf_start_offset: int
link_id: int
frame_type: int
bandwidth: int
pilot_density: int
crc_type: int
crc_init: int # 32 bits -> 但标准说 96 bits total, 所以这里不含
DATA_TYPE_INDEX = 0x002C
BYTE_LENGTH = 12
[文档]
def pack(self) -> bytes:
buf = struct.pack(">HB", self.smf_period, self.smf_start_offset)
buf += self.link_id.to_bytes(3, "big")
b_cfg = ((self.frame_type & 0x0F) << 4
| (self.bandwidth & 0x03) << 2
| (self.pilot_density & 0x03))
b_tail = ((self.crc_type & 1) << 7)
buf += bytes([b_cfg, b_tail])
buf += self.crc_init.to_bytes(4, "big")
return buf
[文档]
@classmethod
def unpack(cls, data: bytes) -> SMFParamUpdateIndication:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
sp = struct.unpack(">H", data[0:2])[0]
so = data[2]
li = int.from_bytes(data[3:6], "big")
ft = (data[6] >> 4) & 0x0F
bw = (data[6] >> 2) & 0x03
pd = data[6] & 0x03
ct = (data[7] >> 7) & 1
ci = int.from_bytes(data[8:12], "big")
return cls(sp, so, li, ft, bw, pd, ct, ci)
# ---------------------------------------------------------------------------
# 7.3.2.48 系统管理帧时间片更新请求 (0x002D, 104 bits / 13 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class SMFTimeSlotUpdateRequest:
"""系统管理帧时间片更新请求。"""
link_id: int # 24 bits
current_offset: int # 16 bits
offsets: tuple[int, ...] # 4 × 16 bits
DATA_TYPE_INDEX = 0x002D
BYTE_LENGTH = 13
[文档]
def pack(self) -> bytes:
buf = self.link_id.to_bytes(3, "big")
buf += struct.pack(">H", self.current_offset)
offs = (self.offsets + (0,) * 4)[:4]
for o in offs:
buf += struct.pack(">H", o & 0xFFFF)
return buf
[文档]
@classmethod
def unpack(cls, data: bytes) -> SMFTimeSlotUpdateRequest:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
li = int.from_bytes(data[0:3], "big")
co = struct.unpack(">H", data[3:5])[0]
offs = tuple(struct.unpack(">H", data[5 + i * 2:7 + i * 2])[0] for i in range(4))
return cls(li, co, offs)
# ---------------------------------------------------------------------------
# 7.3.2.49 系统管理帧时间片更新响应 (0x002E, 72 bits / 9 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class SMFTimeSlotUpdateResponse:
"""系统管理帧时间片更新响应。"""
link_id: int # 24 bits
offset: int # 16 bits
effective_slot: int # 32 bits
DATA_TYPE_INDEX = 0x002E
BYTE_LENGTH = 9
[文档]
def pack(self) -> bytes:
buf = self.link_id.to_bytes(3, "big")
buf += struct.pack(">H", self.offset)
buf += self.effective_slot.to_bytes(4, "big")
return buf
[文档]
@classmethod
def unpack(cls, data: bytes) -> SMFTimeSlotUpdateResponse:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
li = int.from_bytes(data[0:3], "big")
o = struct.unpack(">H", data[3:5])[0]
es = int.from_bytes(data[5:9], "big")
return cls(li, o, es)
# ===================================================================
# 0x0035 - 0x003B 5GHz / 多级收发间隔
# ===================================================================
# ---------------------------------------------------------------------------
# 7.3.2.50 5GHz 频段信道状态指示 (0x0035, 400 bits / 50 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class Channel5GStatusIndication:
"""5GHz 频段信道状态指示 (400-bit 信道分类)。"""
channel_classification: bytes # 50 bytes
DATA_TYPE_INDEX = 0x0035
BYTE_LENGTH = 50
[文档]
def pack(self) -> bytes:
return (self.channel_classification + b"\x00" * 50)[:50]
[文档]
@classmethod
def unpack(cls, data: bytes) -> Channel5GStatusIndication:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(data[:50])
# ---------------------------------------------------------------------------
# 5GHz 跳频地图更新指示 (0x0036 数据链路 / 0x003B 广播链路, 232 bits / 29 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class HopMap5GUpdate:
"""5GHz 跳频地图更新指示 (数据链路)。"""
hop_map: bytes # 200 bits = 25 bytes
effective_slot: int # 32 bits
DATA_TYPE_INDEX = 0x0036
BYTE_LENGTH = 29
[文档]
def pack(self) -> bytes:
hm = (self.hop_map + b"\x00" * 25)[:25]
return hm + self.effective_slot.to_bytes(4, "big")
[文档]
@classmethod
def unpack(cls, data: bytes) -> HopMap5GUpdate:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(data[:25], int.from_bytes(data[25:29], "big"))
[文档]
@dataclass
class BroadcastHopMap5GUpdate(HopMap5GUpdate):
"""5GHz 广播链路跳频地图更新指示。"""
DATA_TYPE_INDEX = 0x003B
BYTE_LENGTH = 29
[文档]
@classmethod
def unpack(cls, data: bytes) -> BroadcastHopMap5GUpdate:
base = HopMap5GUpdate.unpack(data)
return cls(base.hop_map, base.effective_slot)
# ---------------------------------------------------------------------------
# 多级收发间隔更新 (0x0037 请求 / 0x0038 响应, 248 bits / 31 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class MultiIntervalUpdateRequest:
"""多级收发间隔更新请求 (31 个间隔字节)。"""
intervals: bytes # 31 bytes
DATA_TYPE_INDEX = 0x0037
BYTE_LENGTH = 31
[文档]
def pack(self) -> bytes:
return (self.intervals + b"\x00" * 31)[:31]
[文档]
@classmethod
def unpack(cls, data: bytes) -> MultiIntervalUpdateRequest:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(data[:31])
[文档]
@dataclass
class MultiIntervalUpdateResponse(MultiIntervalUpdateRequest):
"""多级收发间隔更新响应 (与请求结构相同)。"""
DATA_TYPE_INDEX = 0x0038
BYTE_LENGTH = 31
[文档]
@classmethod
def unpack(cls, data: bytes) -> MultiIntervalUpdateResponse:
base = MultiIntervalUpdateRequest.unpack(data)
return cls(base.intervals)
# ---------------------------------------------------------------------------
# 多级收发间隔更新指示 (0x0039, 288 bits / 36 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class MultiIntervalUpdateIndication:
"""多级收发间隔更新指示。"""
intervals: bytes # 31 bytes
update_flags: int # 6 bits (打包在一个字节的高 6 位)
effective_slot: int # 32 bits
DATA_TYPE_INDEX = 0x0039
BYTE_LENGTH = 36
[文档]
def pack(self) -> bytes:
buf = (self.intervals + b"\x00" * 31)[:31]
buf += bytes([(self.update_flags & 0x3F) << 2])
buf += self.effective_slot.to_bytes(4, "big")
return buf
[文档]
@classmethod
def unpack(cls, data: bytes) -> MultiIntervalUpdateIndication:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
intervals = data[:31]
uf = (data[31] >> 2) & 0x3F
es = int.from_bytes(data[32:36], "big")
return cls(intervals, uf, es)
# ===================================================================
# 0x003E - 0x0043 系统时间 / 异步组播
# ===================================================================
# ---------------------------------------------------------------------------
# 系统时间指示 (0x003E, 可变长)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class SystemTimeIndication:
"""系统时间指示 (可变长, 按原始载荷存储)。"""
payload: bytes
DATA_TYPE_INDEX = 0x003E
BYTE_LENGTH = 0 # 可变
[文档]
def pack(self) -> bytes:
return self.payload
[文档]
@classmethod
def unpack(cls, data: bytes) -> SystemTimeIndication:
return cls(bytes(data))
# ---------------------------------------------------------------------------
# 异步组播链路建链指示 (0x003F, 360 bits / 45 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class AsyncMulticastLinkSetup:
"""异步组播链路建链指示。"""
event_group_set_id: int # 8
event_group_id: int # 8
effective_slot: int # 32
event_group_period: int # 16
event_period: int # 16
intra_event_interval: int # 16
inter_event_interval: int # 16
scheduling_slot: int # 3
tx_rx_indication: int # 1
tx_link_id: int # 24
rx_link_id: int # 24
tx_frame_type: int # 4
rx_frame_type: int # 4
tx_bandwidth: int # 2
rx_bandwidth: int # 2
tx_pilot_density: int # 2
rx_pilot_density: int # 2
tx_sdu_max: int # 12
rx_sdu_max: int # 12
tx_sdu_period: int # 20
rx_sdu_period: int # 20
tx_pdu_max: int # 11
rx_pdu_max: int # 11
tx_max_time_offset: int # 9
rx_max_time_offset: int # 9
tx_crc_init: int # 32
rx_crc_init: int # 32
tx_crc_type: int # 1
rx_crc_type: int # 1
tx_feedback_type: int # 6
rx_feedback_type: int # 3
DATA_TYPE_INDEX = 0x003F
BYTE_LENGTH = 45
[文档]
def pack(self) -> bytes:
buf = bytes([self.event_group_set_id & 0xFF, self.event_group_id & 0xFF])
buf += self.effective_slot.to_bytes(4, "big")
buf += struct.pack(">HHHH", self.event_group_period, self.event_period,
self.intra_event_interval, self.inter_event_interval)
# 剩余字段合并为单一位域: 247 bits + 1 reserved = 248 bits = 31 bytes
b = 0
b = (b << 3) | (self.scheduling_slot & 0x07)
b = (b << 1) | (self.tx_rx_indication & 0x01)
b = (b << 24) | (self.tx_link_id & 0xFFFFFF)
b = (b << 24) | (self.rx_link_id & 0xFFFFFF)
b = (b << 4) | (self.tx_frame_type & 0x0F)
b = (b << 4) | (self.rx_frame_type & 0x0F)
b = (b << 2) | (self.tx_bandwidth & 0x03)
b = (b << 2) | (self.rx_bandwidth & 0x03)
b = (b << 2) | (self.tx_pilot_density & 0x03)
b = (b << 2) | (self.rx_pilot_density & 0x03)
b = (b << 12) | (self.tx_sdu_max & 0xFFF)
b = (b << 12) | (self.rx_sdu_max & 0xFFF)
b = (b << 20) | (self.tx_sdu_period & 0xFFFFF)
b = (b << 20) | (self.rx_sdu_period & 0xFFFFF)
b = (b << 11) | (self.tx_pdu_max & 0x7FF)
b = (b << 11) | (self.rx_pdu_max & 0x7FF)
b = (b << 9) | (self.tx_max_time_offset & 0x1FF)
b = (b << 9) | (self.rx_max_time_offset & 0x1FF)
b = (b << 32) | (self.tx_crc_init & 0xFFFFFFFF)
b = (b << 32) | (self.rx_crc_init & 0xFFFFFFFF)
b = (b << 1) | (self.tx_crc_type & 1)
b = (b << 1) | (self.rx_crc_type & 1)
b = (b << 6) | (self.tx_feedback_type & 0x3F)
b = (b << 3) | (self.rx_feedback_type & 0x07)
b <<= 1 # 1 bit reserved
buf += b.to_bytes(31, "big")
return buf
[文档]
@classmethod
def unpack(cls, data: bytes) -> AsyncMulticastLinkSetup:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
egsi = data[0]
egi = data[1]
es = int.from_bytes(data[2:6], "big")
egp, ep, iei, iei2 = struct.unpack(">HHHH", data[6:14])
b = int.from_bytes(data[14:45], "big")
b >>= 1 # reserved
rfb = b & 0x07
b >>= 3
tfb = b & 0x3F
b >>= 6
rct = b & 0x01
b >>= 1
tct = b & 0x01
b >>= 1
rci = b & 0xFFFFFFFF
b >>= 32
tci = b & 0xFFFFFFFF
b >>= 32
rmto = b & 0x1FF
b >>= 9
tmto = b & 0x1FF
b >>= 9
rpm = b & 0x7FF
b >>= 11
tpm = b & 0x7FF
b >>= 11
rsp = b & 0xFFFFF
b >>= 20
tsp = b & 0xFFFFF
b >>= 20
rsm = b & 0xFFF
b >>= 12
tsm = b & 0xFFF
b >>= 12
rpd = b & 0x03
b >>= 2
tpd = b & 0x03
b >>= 2
rb = b & 0x03
b >>= 2
tb = b & 0x03
b >>= 2
rft = b & 0x0F
b >>= 4
tft = b & 0x0F
b >>= 4
rli = b & 0xFFFFFF
b >>= 24
tli = b & 0xFFFFFF
b >>= 24
tri = b & 0x01
b >>= 1
ss = b & 0x07
return cls(
egsi, egi, es, egp, ep, iei, iei2,
ss, tri, tli, rli, tft, rft, tb, rb, tpd, rpd,
tsm, rsm, tsp, rsp, tpm, rpm, tmto, rmto,
tci, rci, tct, rct, tfb, rfb,
)
# ---------------------------------------------------------------------------
# 异步组播链路参数交互请求/响应 (0x0040/0x0041, 328 bits / 41 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class AsyncMulticastParamExchangeRequest:
"""异步组播链路参数交互请求。"""
payload: bytes # 41 bytes (复杂位域, 按原始载荷存储)
DATA_TYPE_INDEX = 0x0040
BYTE_LENGTH = 41
[文档]
def pack(self) -> bytes:
return (self.payload + b"\x00" * 41)[:41]
[文档]
@classmethod
def unpack(cls, data: bytes) -> AsyncMulticastParamExchangeRequest:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(bytes(data[:41]))
[文档]
@dataclass
class AsyncMulticastParamExchangeResponse:
"""异步组播链路参数交互响应 (与请求结构相同)。"""
payload: bytes
DATA_TYPE_INDEX = 0x0041
BYTE_LENGTH = 41
[文档]
def pack(self) -> bytes:
return (self.payload + b"\x00" * 41)[:41]
[文档]
@classmethod
def unpack(cls, data: bytes) -> AsyncMulticastParamExchangeResponse:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(bytes(data[:41]))
# ---------------------------------------------------------------------------
# 异步组播链路参数更新请求 (0x0042, 24 bits / 3 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class AsyncMulticastParamUpdateRequest:
"""异步组播链路参数更新请求。"""
param_tag_id: int # 4 bits
event_group_set_id: int # 8
event_group_id: int # 8
DATA_TYPE_INDEX = 0x0042
BYTE_LENGTH = 3
[文档]
def pack(self) -> bytes:
b0 = (self.param_tag_id & 0x0F) << 4
return bytes([b0, self.event_group_set_id & 0xFF, self.event_group_id & 0xFF])
[文档]
@classmethod
def unpack(cls, data: bytes) -> AsyncMulticastParamUpdateRequest:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls((data[0] >> 4) & 0x0F, data[1], data[2])
# ---------------------------------------------------------------------------
# 异步组播链路参数更新指示 (0x0043, 72 bits / 9 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class AsyncMulticastParamUpdateIndication:
"""异步组播链路参数更新指示。"""
param_tag_id: int
event_group_set_id: int
event_group_id: int
effective_ref_slot: int # 32
event_group_offset: int # 16
DATA_TYPE_INDEX = 0x0043
BYTE_LENGTH = 9
[文档]
def pack(self) -> bytes:
b0 = (self.param_tag_id & 0x0F) << 4
buf = bytes([b0, self.event_group_set_id & 0xFF, self.event_group_id & 0xFF])
buf += self.effective_ref_slot.to_bytes(4, "big")
buf += struct.pack(">H", self.event_group_offset)
return buf
[文档]
@classmethod
def unpack(cls, data: bytes) -> AsyncMulticastParamUpdateIndication:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(
(data[0] >> 4) & 0x0F, data[1], data[2],
int.from_bytes(data[3:7], "big"),
struct.unpack(">H", data[7:9])[0],
)
# ===================================================================
# 0x0044 - 0x0050 窄带跳频测量
# ===================================================================
# ---------------------------------------------------------------------------
# 零载荷信令基类
# ---------------------------------------------------------------------------
@dataclass
class _ZeroPayload:
"""零载荷信令基类。"""
def pack(self) -> bytes:
return b""
@classmethod
def unpack(cls, data: bytes):
return cls()
[文档]
@dataclass
class NarrowbandMeasCapRequest(_ZeroPayload):
"""窄带跳频测量能力请求。"""
DATA_TYPE_INDEX = 0x0044
BYTE_LENGTH = 0
# ---------------------------------------------------------------------------
# 窄带跳频测量能力响应 (0x0045, 256 bits / 32 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class NarrowbandMeasCapResponse:
"""窄带跳频测量能力响应。"""
payload: bytes # 32 bytes
DATA_TYPE_INDEX = 0x0045
BYTE_LENGTH = 32
[文档]
def pack(self) -> bytes:
return (self.payload + b"\x00" * 32)[:32]
[文档]
@classmethod
def unpack(cls, data: bytes) -> NarrowbandMeasCapResponse:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(bytes(data[:32]))
# ---------------------------------------------------------------------------
# 窄带跳频测量频点表配置更新指示
# 0x0046 (2.4GHz, 11B), 0x0047 (5.1GHz, 26B), 0x0048 (5.8GHz, 17B)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class NarrowbandFreqTable24Update:
"""窄带跳频测量频点表配置更新指示 (2.4GHz)。"""
config_index: int # 8 bits
freq_table: bytes # 80 bits = 10 bytes
DATA_TYPE_INDEX = 0x0046
BYTE_LENGTH = 11
[文档]
def pack(self) -> bytes:
ft = (self.freq_table + b"\x00" * 10)[:10]
return bytes([self.config_index & 0xFF]) + ft
[文档]
@classmethod
def unpack(cls, data: bytes) -> NarrowbandFreqTable24Update:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(data[0], data[1:11])
[文档]
@dataclass
class NarrowbandFreqTable51Update:
"""窄带跳频测量频点表配置更新指示 (5.1GHz)。"""
config_index: int
freq_table: bytes # 200 bits = 25 bytes
DATA_TYPE_INDEX = 0x0047
BYTE_LENGTH = 26
[文档]
def pack(self) -> bytes:
ft = (self.freq_table + b"\x00" * 25)[:25]
return bytes([self.config_index & 0xFF]) + ft
[文档]
@classmethod
def unpack(cls, data: bytes) -> NarrowbandFreqTable51Update:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(data[0], data[1:26])
[文档]
@dataclass
class NarrowbandFreqTable58Update:
"""窄带跳频测量频点表配置更新指示 (5.8GHz)。"""
config_index: int
freq_table: bytes # 128 bits = 16 bytes
DATA_TYPE_INDEX = 0x0048
BYTE_LENGTH = 17
[文档]
def pack(self) -> bytes:
ft = (self.freq_table + b"\x00" * 16)[:16]
return bytes([self.config_index & 0xFF]) + ft
[文档]
@classmethod
def unpack(cls, data: bytes) -> NarrowbandFreqTable58Update:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(data[0], data[1:17])
# ---------------------------------------------------------------------------
# 窄带跳频测量信号配置 (0x0049, 可变长)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class NarrowbandMeasConfig:
"""窄带跳频测量信号配置 (可变长, 按原始载荷存储)。"""
payload: bytes
DATA_TYPE_INDEX = 0x0049
BYTE_LENGTH = 0
[文档]
def pack(self) -> bytes:
return self.payload
[文档]
@classmethod
def unpack(cls, data: bytes) -> NarrowbandMeasConfig:
return cls(bytes(data))
# ---------------------------------------------------------------------------
# 窄带跳频测量信息上报 (0x004A, 可变长)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class NarrowbandMeasReport:
"""窄带跳频测量信息上报 (可变长)。"""
payload: bytes
DATA_TYPE_INDEX = 0x004A
BYTE_LENGTH = 0
[文档]
def pack(self) -> bytes:
return self.payload
[文档]
@classmethod
def unpack(cls, data: bytes) -> NarrowbandMeasReport:
return cls(bytes(data))
# ---------------------------------------------------------------------------
# 窄带跳频测量行为指示 (0x004B, 48 bits / 6 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class NarrowbandMeasAction:
"""窄带跳频测量行为指示。"""
config_index: int # 8
start_slot: int # 32
action_config: int # 8
DATA_TYPE_INDEX = 0x004B
BYTE_LENGTH = 6
[文档]
def pack(self) -> bytes:
return bytes([self.config_index & 0xFF]) + self.start_slot.to_bytes(4, "big") + bytes([self.action_config & 0xFF])
[文档]
@classmethod
def unpack(cls, data: bytes) -> NarrowbandMeasAction:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(data[0], int.from_bytes(data[1:5], "big"), data[5])
# ---------------------------------------------------------------------------
# 坐标信息请求 (0x004C, 0 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class CoordinateRequest(_ZeroPayload):
"""坐标信息请求。"""
DATA_TYPE_INDEX = 0x004C
BYTE_LENGTH = 0
# ---------------------------------------------------------------------------
# 坐标信息上报/配置 (0x004D/0x004E, 192 bits / 24 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class CoordinateReport:
"""坐标信息上报。"""
rel_x: int # 32
rel_y: int # 32
rel_z: int # 32
abs_lon: int # 32 (经度)
abs_lat: int # 32 (纬度)
abs_alt: int # 32 (海拔)
DATA_TYPE_INDEX = 0x004D
BYTE_LENGTH = 24
[文档]
def pack(self) -> bytes:
return struct.pack(">iiiiii", self.rel_x, self.rel_y, self.rel_z,
self.abs_lon, self.abs_lat, self.abs_alt)
[文档]
@classmethod
def unpack(cls, data: bytes) -> CoordinateReport:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
vals = struct.unpack(">iiiiii", data[:24])
return cls(*vals)
[文档]
@dataclass
class CoordinateConfig:
"""坐标信息配置 (与上报结构相同)。"""
rel_x: int
rel_y: int
rel_z: int
abs_lon: int
abs_lat: int
abs_alt: int
DATA_TYPE_INDEX = 0x004E
BYTE_LENGTH = 24
[文档]
def pack(self) -> bytes:
return struct.pack(">iiiiii", self.rel_x, self.rel_y, self.rel_z,
self.abs_lon, self.abs_lat, self.abs_alt)
[文档]
@classmethod
def unpack(cls, data: bytes) -> CoordinateConfig:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
vals = struct.unpack(">iiiiii", data[:24])
return cls(*vals)
# ---------------------------------------------------------------------------
# 窄带跳频测量时延信息请求 (0x004F, 0 bytes)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class NarrowbandDelayRequest(_ZeroPayload):
"""窄带跳频测量时延信息请求。"""
DATA_TYPE_INDEX = 0x004F
BYTE_LENGTH = 0
# ---------------------------------------------------------------------------
# 窄带跳频测量时延信息响应 (0x0050, 可变长)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class NarrowbandDelayResponse:
"""窄带跳频测量时延信息响应 (可变长)。"""
payload: bytes
DATA_TYPE_INDEX = 0x0050
BYTE_LENGTH = 0
[文档]
def pack(self) -> bytes:
return self.payload
[文档]
@classmethod
def unpack(cls, data: bytes) -> NarrowbandDelayResponse:
return cls(bytes(data))
# ===================================================================
# 0x0051 异步 TT 链路建链指示 (可变长)
# ===================================================================
[文档]
@dataclass
class AsyncTTLinkSetup:
"""异步 TT 链路建链指示 (可变长)。"""
payload: bytes
DATA_TYPE_INDEX = 0x0051
BYTE_LENGTH = 0
[文档]
def pack(self) -> bytes:
return self.payload
[文档]
@classmethod
def unpack(cls, data: bytes) -> AsyncTTLinkSetup:
return cls(bytes(data))
# ===================================================================
# 0x0052 - 0x005C 超宽带脉冲测量/感知
# ===================================================================
[文档]
@dataclass
class UWBMeasCapRequest(_ZeroPayload):
"""超宽带脉冲测量能力请求。"""
DATA_TYPE_INDEX = 0x0052
BYTE_LENGTH = 0
[文档]
@dataclass
class UWBMeasCapResponse:
"""超宽带脉冲测量能力响应 (400 bits / 50 bytes)。"""
payload: bytes
DATA_TYPE_INDEX = 0x0053
BYTE_LENGTH = 50
[文档]
def pack(self) -> bytes:
return (self.payload + b"\x00" * 50)[:50]
[文档]
@classmethod
def unpack(cls, data: bytes) -> UWBMeasCapResponse:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(bytes(data[:50]))
[文档]
@dataclass
class UWBMeasConfig:
"""超宽带脉冲测量配置 (可变长)。"""
payload: bytes
DATA_TYPE_INDEX = 0x0054
BYTE_LENGTH = 0
[文档]
def pack(self) -> bytes:
return self.payload
[文档]
@classmethod
def unpack(cls, data: bytes) -> UWBMeasConfig:
return cls(bytes(data))
[文档]
@dataclass
class UWBMeasConfigFeedback:
"""超宽带脉冲测量配置反馈。"""
config_index: int # 8
status: int # 8
DATA_TYPE_INDEX = 0x0055
BYTE_LENGTH = 2
[文档]
def pack(self) -> bytes:
return bytes([self.config_index & 0xFF, self.status & 0xFF])
[文档]
@classmethod
def unpack(cls, data: bytes) -> UWBMeasConfigFeedback:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(data[0], data[1])
[文档]
@dataclass
class UWBMeasReport:
"""超宽带脉冲测量信息上报 (可变长)。"""
payload: bytes
DATA_TYPE_INDEX = 0x0056
BYTE_LENGTH = 0
[文档]
def pack(self) -> bytes:
return self.payload
[文档]
@classmethod
def unpack(cls, data: bytes) -> UWBMeasReport:
return cls(bytes(data))
[文档]
@dataclass
class UWBSensingCapRequest(_ZeroPayload):
"""超宽带脉冲感知能力请求。"""
DATA_TYPE_INDEX = 0x0057
BYTE_LENGTH = 0
[文档]
@dataclass
class UWBSensingCapResponse:
"""超宽带脉冲感知能力响应 (408 bits / 51 bytes)。"""
payload: bytes
DATA_TYPE_INDEX = 0x0058
BYTE_LENGTH = 51
[文档]
def pack(self) -> bytes:
return (self.payload + b"\x00" * 51)[:51]
[文档]
@classmethod
def unpack(cls, data: bytes) -> UWBSensingCapResponse:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(bytes(data[:51]))
# ---------------------------------------------------------------------------
# 0x0059 超宽带脉冲感知配置 (可变长)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class UWBSensingConfig:
"""超宽带脉冲感知配置 (可变长)。"""
payload: bytes
DATA_TYPE_INDEX = 0x0059
BYTE_LENGTH = 0
[文档]
def pack(self) -> bytes:
return self.payload
[文档]
@classmethod
def unpack(cls, data: bytes) -> UWBSensingConfig:
return cls(bytes(data))
[文档]
@dataclass
class UWBSensingConfigFeedback:
"""超宽带脉冲感知配置反馈。"""
config_index: int
status: int
DATA_TYPE_INDEX = 0x005A
BYTE_LENGTH = 2
[文档]
def pack(self) -> bytes:
return bytes([self.config_index & 0xFF, self.status & 0xFF])
[文档]
@classmethod
def unpack(cls, data: bytes) -> UWBSensingConfigFeedback:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(data[0], data[1])
[文档]
@dataclass
class UWBSensingReport:
"""超宽带脉冲感知信息上报 (可变长)。"""
payload: bytes
DATA_TYPE_INDEX = 0x005B
BYTE_LENGTH = 0
[文档]
def pack(self) -> bytes:
return self.payload
[文档]
@classmethod
def unpack(cls, data: bytes) -> UWBSensingReport:
return cls(bytes(data))
[文档]
@dataclass
class UWBSensingAction:
"""超宽带脉冲感知行为指示。"""
config_index: int
start_slot: int # 32
action_config: int # 8
DATA_TYPE_INDEX = 0x005C
BYTE_LENGTH = 6
[文档]
def pack(self) -> bytes:
return bytes([self.config_index & 0xFF]) + self.start_slot.to_bytes(4, "big") + bytes([self.action_config & 0xFF])
[文档]
@classmethod
def unpack(cls, data: bytes) -> UWBSensingAction:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(data[0], int.from_bytes(data[1:5], "big"), data[5])
# ===================================================================
# 0x005D - 0x005E 资源预留
# ===================================================================
[文档]
@dataclass
class ResourceReservation:
"""资源预留指示。"""
config_index: int # 8
effective_slot: int # 32
event_group_period: int # 16
event_period: int # 16
event_length: int # 16
event_count: int # 8
scheduling_slot: int # 3
DATA_TYPE_INDEX = 0x005D
BYTE_LENGTH = 13
[文档]
def pack(self) -> bytes:
buf = bytes([self.config_index & 0xFF])
buf += self.effective_slot.to_bytes(4, "big")
buf += struct.pack(">HHHB", self.event_group_period,
self.event_period, self.event_length,
self.event_count)
buf += bytes([(self.scheduling_slot & 0x07) << 5])
return buf
[文档]
@classmethod
def unpack(cls, data: bytes) -> ResourceReservation:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
ci = data[0]
es = int.from_bytes(data[1:5], "big")
egp, ep, el, ec = struct.unpack(">HHHB", data[5:12])
ss = (data[12] >> 5) & 0x07
return cls(ci, es, egp, ep, el, ec, ss)
[文档]
@dataclass
class ResourceReservationTerminate:
"""资源预留终止。"""
config_index: int # 8
reason: int # 8
DATA_TYPE_INDEX = 0x005E
BYTE_LENGTH = 2
[文档]
def pack(self) -> bytes:
return bytes([self.config_index & 0xFF, self.reason & 0xFF])
[文档]
@classmethod
def unpack(cls, data: bytes) -> ResourceReservationTerminate:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(data[0], data[1])
# ===================================================================
# 0x005F - 0x0069 窄带跳频感知
# ===================================================================
[文档]
@dataclass
class NarrowbandSensingRequest:
"""窄带跳频感知流程请求。"""
payload: bytes # 16 bytes
DATA_TYPE_INDEX = 0x005F
BYTE_LENGTH = 16
[文档]
def pack(self) -> bytes:
return (self.payload + b"\x00" * 16)[:16]
[文档]
@classmethod
def unpack(cls, data: bytes) -> NarrowbandSensingRequest:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(bytes(data[:16]))
[文档]
@dataclass
class NarrowbandSensingFeedback:
"""窄带跳频感知流程反馈。"""
process_index: int # 8
status: int # 8
DATA_TYPE_INDEX = 0x0060
BYTE_LENGTH = 2
[文档]
def pack(self) -> bytes:
return bytes([self.process_index & 0xFF, self.status & 0xFF])
[文档]
@classmethod
def unpack(cls, data: bytes) -> NarrowbandSensingFeedback:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(data[0], data[1])
[文档]
@dataclass
class NarrowbandProxySensingRequest:
"""窄带跳频代理感知请求。"""
proxy_index: int # 8
sensing_index: int # 8
meas_quantity: int # 32
report_period: int # 32
bandwidth: int # 8
DATA_TYPE_INDEX = 0x0061
BYTE_LENGTH = 11
[文档]
def pack(self) -> bytes:
buf = bytes([self.proxy_index & 0xFF, self.sensing_index & 0xFF])
buf += self.meas_quantity.to_bytes(4, "big")
buf += self.report_period.to_bytes(4, "big")
buf += bytes([self.bandwidth & 0xFF])
return buf
[文档]
@classmethod
def unpack(cls, data: bytes) -> NarrowbandProxySensingRequest:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(
data[0], data[1],
int.from_bytes(data[2:6], "big"),
int.from_bytes(data[6:10], "big"),
data[10],
)
[文档]
@dataclass
class NarrowbandProxySensingFeedback:
"""窄带跳频代理感知反馈。"""
proxy_index: int
sensing_index: int # 16
status: int # 8
meas_quantity1: int # 32
meas_quantity2: int # 32
bandwidth1: int # 8
bandwidth2: int # 8
DATA_TYPE_INDEX = 0x0062
BYTE_LENGTH = 14
[文档]
def pack(self) -> bytes:
buf = bytes([self.proxy_index & 0xFF])
buf += struct.pack(">HB", self.sensing_index, self.status)
buf += self.meas_quantity1.to_bytes(4, "big")
buf += self.meas_quantity2.to_bytes(4, "big")
buf += bytes([self.bandwidth1 & 0xFF, self.bandwidth2 & 0xFF])
return buf
[文档]
@classmethod
def unpack(cls, data: bytes) -> NarrowbandProxySensingFeedback:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
pi = data[0]
si, st = struct.unpack(">HB", data[1:4])
mq1 = int.from_bytes(data[4:8], "big")
mq2 = int.from_bytes(data[8:12], "big")
return cls(pi, si, st, mq1, mq2, data[12], data[13])
[文档]
@dataclass
class NarrowbandSensingCapRequest(_ZeroPayload):
"""窄带跳频感知能力请求。"""
DATA_TYPE_INDEX = 0x0063
BYTE_LENGTH = 0
[文档]
@dataclass
class NarrowbandSensingCapResponse:
"""窄带跳频感知能力响应 (50 bytes)。"""
payload: bytes
DATA_TYPE_INDEX = 0x0064
BYTE_LENGTH = 50
[文档]
def pack(self) -> bytes:
return (self.payload + b"\x00" * 50)[:50]
[文档]
@classmethod
def unpack(cls, data: bytes) -> NarrowbandSensingCapResponse:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(bytes(data[:50]))
[文档]
@dataclass
class NarrowbandSensingConfig:
"""窄带跳频感知配置 (可变长)。"""
payload: bytes
DATA_TYPE_INDEX = 0x0065
BYTE_LENGTH = 0
[文档]
def pack(self) -> bytes:
return self.payload
[文档]
@classmethod
def unpack(cls, data: bytes) -> NarrowbandSensingConfig:
return cls(bytes(data))
[文档]
@dataclass
class NarrowbandSensingConfigFeedback:
"""窄带跳频感知配置反馈。"""
config_index: int
status: int
DATA_TYPE_INDEX = 0x0066
BYTE_LENGTH = 2
[文档]
def pack(self) -> bytes:
return bytes([self.config_index & 0xFF, self.status & 0xFF])
[文档]
@classmethod
def unpack(cls, data: bytes) -> NarrowbandSensingConfigFeedback:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(data[0], data[1])
[文档]
@dataclass
class SensingDeviceStatusReport:
"""感知设备状态上报。"""
config_index: int # 8
stability: int # 1 bit (存储在字节高位)
DATA_TYPE_INDEX = 0x0067
BYTE_LENGTH = 2
[文档]
def pack(self) -> bytes:
return bytes([self.config_index & 0xFF, (self.stability & 1) << 7])
[文档]
@classmethod
def unpack(cls, data: bytes) -> SensingDeviceStatusReport:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(data[0], (data[1] >> 7) & 1)
[文档]
@dataclass
class NarrowbandSensingReport:
"""窄带跳频感知信息上报 (可变长)。"""
payload: bytes
DATA_TYPE_INDEX = 0x0068
BYTE_LENGTH = 0
[文档]
def pack(self) -> bytes:
return self.payload
[文档]
@classmethod
def unpack(cls, data: bytes) -> NarrowbandSensingReport:
return cls(bytes(data))
[文档]
@dataclass
class NarrowbandSensingAction:
"""窄带跳频感知行为指示。"""
config_index: int
start_slot: int
action_config: int
DATA_TYPE_INDEX = 0x0069
BYTE_LENGTH = 6
[文档]
def pack(self) -> bytes:
return bytes([self.config_index & 0xFF]) + self.start_slot.to_bytes(4, "big") + bytes([self.action_config & 0xFF])
[文档]
@classmethod
def unpack(cls, data: bytes) -> NarrowbandSensingAction:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(data[0], int.from_bytes(data[1:5], "big"), data[5])
# ===================================================================
# 0x006A - 0x0070 配置更新 / UWB 感知
# ===================================================================
[文档]
@dataclass
class NarrowbandMeasConfigUpdateRequest:
"""窄带跳频测量信号配置更新请求 (32 bytes)。"""
payload: bytes
DATA_TYPE_INDEX = 0x006A
BYTE_LENGTH = 32
[文档]
def pack(self) -> bytes:
return (self.payload + b"\x00" * 32)[:32]
[文档]
@classmethod
def unpack(cls, data: bytes) -> NarrowbandMeasConfigUpdateRequest:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(bytes(data[:32]))
[文档]
@dataclass
class NarrowbandMeasConfigUpdateIndication:
"""窄带跳频测量信号配置更新指示 (与请求结构相同, 32 bytes)。"""
payload: bytes
DATA_TYPE_INDEX = 0x006B
BYTE_LENGTH = 32
[文档]
def pack(self) -> bytes:
return (self.payload + b"\x00" * 32)[:32]
[文档]
@classmethod
def unpack(cls, data: bytes) -> NarrowbandMeasConfigUpdateIndication:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(bytes(data[:32]))
[文档]
@dataclass
class UWBSensingProcessRequest:
"""超宽带脉冲感知流程请求 (16 bytes)。"""
payload: bytes
DATA_TYPE_INDEX = 0x006C
BYTE_LENGTH = 16
[文档]
def pack(self) -> bytes:
return (self.payload + b"\x00" * 16)[:16]
[文档]
@classmethod
def unpack(cls, data: bytes) -> UWBSensingProcessRequest:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(bytes(data[:16]))
[文档]
@dataclass
class UWBSensingProcessFeedback:
"""超宽带脉冲感知流程反馈。"""
process_index: int
status: int
DATA_TYPE_INDEX = 0x006D
BYTE_LENGTH = 2
[文档]
def pack(self) -> bytes:
return bytes([self.process_index & 0xFF, self.status & 0xFF])
[文档]
@classmethod
def unpack(cls, data: bytes) -> UWBSensingProcessFeedback:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(data[0], data[1])
[文档]
@dataclass
class UWBProxySensingRequest:
"""超宽带脉冲代理感知请求 (11 bytes)。"""
proxy_index: int
sensing_index: int
meas_quantity: int
report_period: int
bandwidth: int
DATA_TYPE_INDEX = 0x006E
BYTE_LENGTH = 11
[文档]
def pack(self) -> bytes:
buf = bytes([self.proxy_index & 0xFF, self.sensing_index & 0xFF])
buf += self.meas_quantity.to_bytes(4, "big")
buf += self.report_period.to_bytes(4, "big")
buf += bytes([self.bandwidth & 0xFF])
return buf
[文档]
@classmethod
def unpack(cls, data: bytes) -> UWBProxySensingRequest:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(
data[0], data[1],
int.from_bytes(data[2:6], "big"),
int.from_bytes(data[6:10], "big"),
data[10],
)
[文档]
@dataclass
class UWBProxySensingFeedback:
"""超宽带脉冲代理感知反馈 (14 bytes)。"""
proxy_index: int
sensing_index: int # 16
status: int
meas_quantity1: int # 32
meas_quantity2: int # 32
bandwidth1: int
bandwidth2: int
DATA_TYPE_INDEX = 0x006F
BYTE_LENGTH = 14
[文档]
def pack(self) -> bytes:
buf = bytes([self.proxy_index & 0xFF])
buf += struct.pack(">HB", self.sensing_index, self.status)
buf += self.meas_quantity1.to_bytes(4, "big")
buf += self.meas_quantity2.to_bytes(4, "big")
buf += bytes([self.bandwidth1 & 0xFF, self.bandwidth2 & 0xFF])
return buf
[文档]
@classmethod
def unpack(cls, data: bytes) -> UWBProxySensingFeedback:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
pi = data[0]
si, st = struct.unpack(">HB", data[1:4])
mq1 = int.from_bytes(data[4:8], "big")
mq2 = int.from_bytes(data[8:12], "big")
return cls(pi, si, st, mq1, mq2, data[12], data[13])
[文档]
@dataclass
class UWBMeasAction:
"""超宽带脉冲测量行为指示。"""
config_index: int
start_slot: int
action_config: int
DATA_TYPE_INDEX = 0x0070
BYTE_LENGTH = 6
[文档]
def pack(self) -> bytes:
return bytes([self.config_index & 0xFF]) + self.start_slot.to_bytes(4, "big") + bytes([self.action_config & 0xFF])
[文档]
@classmethod
def unpack(cls, data: bytes) -> UWBMeasAction:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
return cls(data[0], int.from_bytes(data[1:5], "big"), data[5])