"""广播帧编解码 -- TXS-10002-2025 标准 7.1.4
实现 SLE 广播、发现和接入所需的帧结构编解码:
- 广播帧通用结构 (BroadcastFrame)
- 扩展广播帧资源配置信息 (ExtAdvResourceConfig)
- 发现接入资源配置信息 (DiscoveryAccessResourceConfig)
- 接入基本信息 (AccessBasicInfo)
- 接入请求信息 (AccessRequestInfo)
- 接入响应信息 (AccessResponseInfo)
- 启动系统管理帧信息 (SystemMgmtFrameInfo)
"""
from __future__ import annotations
__all__ = [
"AccessBasicInfo",
"AccessRequestInfo",
"AccessResponseEntry",
"AccessResponseInfo",
"AccessResponseType",
"AddrType",
"BroadcastDataType",
"BroadcastFilter",
"BroadcastFrame",
"DiscoveryAccessEntry",
"DiscoveryAccessResourceConfig",
"ExtAdvResourceConfig",
"FilterCondition",
"FilterOp",
"GTNegotiation",
"NarrowbandMeasurementConfig",
"NonLinkedBroadcastLinkInfo",
"QueryRequestFilterInfo",
"RequestType",
"SystemMgmtFrameInfo",
"TransportIndicationInfo",
"UWBPulseMeasurementConfig",
]
import struct
from dataclasses import dataclass, field
from enum import IntEnum
# ---------------------------------------------------------------------------
# 枚举定义
# ---------------------------------------------------------------------------
[文档]
class AddrType(IntEnum):
"""媒体接入层标识类型 (7.1.4 表25)"""
ALLIANCE_ASSIGNED = 0
LOCAL_ORG = 2
RESOLVABLE_RANDOM = 3
NON_RESOLVABLE_RANDOM = 4
ALLIANCE_RESERVED = 5
PRIVATE = 6
[文档]
class BroadcastDataType(IntEnum):
"""广播帧数据类型 (7.1.4 表26)"""
DISCOVERY_ACCESS_RESOURCE = 0x00
TRANSPORT_INDICATION = 0x01
ACCESS_BASIC = 0x02
ACCESS_REQUEST = 0x03
ACCESS_RESPONSE = 0x04
SYSTEM_MGMT_FRAME = 0x05
UNLINKED_BROADCAST_LINK = 0x06
QUERY_REQUEST_FILTER = 0x07
NB_HOPPING_MEAS_CONFIG = 0x08
UWB_PULSE_MEAS_CONFIG = 0x09
UPPER_LAYER_DATA = 0xFF
[文档]
class RequestType(IntEnum):
"""请求类型标志 (7.1.4.2)"""
QUERY = 0
ACCESS = 1
UNRESTRICTED = 2
[文档]
class GTNegotiation(IntEnum):
"""GT 角色协商结果 (7.1.4.2)"""
NEGOTIATE_T = 0b00
NEGOTIATE_G = 0b01
FIXED_T = 0b10
FIXED_G = 0b11
[文档]
class AccessResponseType(IntEnum):
"""接入响应类型 (7.1.4.6)"""
ACCEPT = 0
GT_NEGOTIATION_FAIL = 1
RESOURCE_LIMITED = 2
USER_REJECT = 3
# ---------------------------------------------------------------------------
# 7.1.4.1 扩展广播帧资源配置信息 (4 字节)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class ExtAdvResourceConfig:
"""扩展广播帧资源配置信息 (7.1.4.1, 表27)
4 字节结构:
- 频点索引: 8 bits
- 偏移量: 12 bits
- 无线帧类型指示: 4 bits
- 带宽指示: 2 bits
- 导频密度指示: 2 bits
- 时钟精度: 3 bits
- 偏移量单位: 1 bit
"""
channel_index: int # 频点索引 (0-255)
offset: int # 偏移量 (0-4095)
frame_type: int # 无线帧类型 (0-15)
bandwidth: int # 带宽 (0=1M, 1=2M, 2=4M)
pilot_density: int # 导频密度 (0=4:1, 1=8:1, 2=16:1, 3=无)
clock_accuracy: int # 时钟精度 (0-7)
offset_unit: int # 偏移量单位 (0=1基础时隙, 1=10基础时隙)
BYTE_LENGTH = 4
[文档]
def pack(self) -> bytes:
b0 = self.channel_index & 0xFF
b1 = (self.offset >> 4) & 0xFF
b2 = ((self.offset & 0x0F) << 4) | (self.frame_type & 0x0F)
b3 = (
((self.bandwidth & 0x03) << 6)
| ((self.pilot_density & 0x03) << 4)
| ((self.clock_accuracy & 0x07) << 1)
| (self.offset_unit & 0x01)
)
return bytes([b0, b1, b2, b3])
[文档]
@classmethod
def unpack(cls, data: bytes) -> ExtAdvResourceConfig:
if len(data) < cls.BYTE_LENGTH:
raise ValueError(f"数据不足: 需要 {cls.BYTE_LENGTH} 字节")
channel_index = data[0]
offset = (data[1] << 4) | ((data[2] >> 4) & 0x0F)
frame_type = data[2] & 0x0F
bandwidth = (data[3] >> 6) & 0x03
pilot_density = (data[3] >> 4) & 0x03
clock_accuracy = (data[3] >> 1) & 0x07
offset_unit = data[3] & 0x01
return cls(
channel_index, offset, frame_type,
bandwidth, pilot_density, clock_accuracy, offset_unit,
)
# ---------------------------------------------------------------------------
# 7.1.4.2 发现接入资源配置信息
# ---------------------------------------------------------------------------
[文档]
@dataclass
class DiscoveryAccessEntry:
"""单个发现接入资源条目"""
request_type: int # 请求类型标志 (2 bits)
carry_info_indication: int # 请求携带信息指示 (2 bits)
peer_addr_type: int # 对端标识类型 (3 bits)
addr_present: int # 标识指示 (1 bit)
peer_addr: bytes # 对端标识 (6字节, 仅当 addr_present=1 时有效)
[文档]
@dataclass
class DiscoveryAccessResourceConfig:
"""发现接入资源配置信息 (7.1.4.2, 表28)"""
request_offset: int # 请求偏移量 (2字节, us)
request_max_length: int # 请求最大长度 (1字节)
response_offset: int # 请求响应偏移量 (2字节, us)
gt_negotiation: int # GT 协商指示 (2 bits)
entry_count: int # 请求/响应个数 (6 bits)
entries: list[DiscoveryAccessEntry] = field(default_factory=list)
[文档]
def pack(self) -> bytes:
buf = struct.pack(">HBH", self.request_offset,
self.request_max_length, self.response_offset)
buf += bytes([
((self.gt_negotiation & 0x03) << 6) | (self.entry_count & 0x3F)
])
for entry in self.entries:
flag = (
((entry.request_type & 0x03) << 6)
| ((entry.carry_info_indication & 0x03) << 4)
| ((entry.peer_addr_type & 0x07) << 1)
| (entry.addr_present & 0x01)
)
buf += bytes([flag])
if entry.addr_present:
buf += entry.peer_addr[:6].ljust(6, b"\x00")
return buf
[文档]
@classmethod
def unpack(cls, data: bytes) -> DiscoveryAccessResourceConfig:
if len(data) < 6:
raise ValueError("数据不足: 至少需要 6 字节")
req_offset, req_max_len, resp_offset = struct.unpack(">HBH", data[:5])
gt_neg = (data[5] >> 6) & 0x03
count = data[5] & 0x3F
entries: list[DiscoveryAccessEntry] = []
pos = 6
for _ in range(count):
if pos >= len(data):
break
flag = data[pos]
pos += 1
req_type = (flag >> 6) & 0x03
carry_info = (flag >> 4) & 0x03
peer_type = (flag >> 1) & 0x07
addr_present = flag & 0x01
peer_addr = b""
if addr_present and pos + 6 <= len(data):
peer_addr = bytes(data[pos:pos + 6])
pos += 6
entries.append(DiscoveryAccessEntry(
req_type, carry_info, peer_type, addr_present, peer_addr,
))
return cls(req_offset, req_max_len, resp_offset, gt_neg, count, entries)
# ---------------------------------------------------------------------------
# 7.1.4.4 接入基本信息 (可变长)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class AccessBasicInfo:
"""接入基本信息 (7.1.4.4, 表31)"""
smf_baseline_slot: int # 系统管理帧基线时隙 (4字节)
smf_offset: int # 启动系统管理帧偏移量 (2字节, us)
smf_link_id: int # 系统管理帧逻辑链路标识 (4字节)
smf_period: int # 系统管理帧周期间隔 (3字节, 基础时隙)
smf_frame_type: int # 系统管理帧无线帧类型 (4 bits)
smf_bandwidth: int # 系统管理帧带宽 (2 bits)
smf_pilot_density: int # 系统管理帧导频密度 (2 bits)
access_link_id: int # 接入逻辑链路标识 (3字节)
access_period: int # 接入周期间隔 (2字节, 基础时隙)
access_timeout: int # 接入超时时间 (1字节)
sleep_clock_accuracy: int # 睡眠时钟精度 (3 bits)
access_crc_type: int # 接入CRC类型 (1 bit, 0=CRC24, 1=CRC32)
access_crc_init: int # 接入CRC初始值 (4字节)
hop_map: bytes # 数据传输可用跳频地图 (10或25字节)
smf_channel_count: int # 系统管理帧可用频点个数 (1字节)
smf_channel_table: bytes # 系统管理帧频点表 (可变长)
[文档]
def pack(self) -> bytes:
buf = self.smf_baseline_slot.to_bytes(4, "big")
buf += self.smf_offset.to_bytes(2, "big")
buf += self.smf_link_id.to_bytes(4, "big")
buf += self.smf_period.to_bytes(3, "big")
b_config = (
((self.smf_frame_type & 0x0F) << 4)
| ((self.smf_bandwidth & 0x03) << 2)
| (self.smf_pilot_density & 0x03)
)
buf += bytes([b_config])
buf += self.access_link_id.to_bytes(3, "big")
buf += self.access_period.to_bytes(2, "big")
buf += bytes([self.access_timeout & 0xFF])
b_flags = (
((self.sleep_clock_accuracy & 0x07) << 5)
| (self.access_crc_type & 0x01)
)
buf += bytes([b_flags])
buf += self.access_crc_init.to_bytes(4, "big")
buf += self.hop_map
buf += bytes([self.smf_channel_count & 0xFF])
buf += self.smf_channel_table
return buf
[文档]
@classmethod
def unpack(cls, data: bytes) -> AccessBasicInfo:
if len(data) < 25:
raise ValueError("数据不足: 至少需要 25 字节")
smf_baseline = int.from_bytes(data[0:4], "big")
smf_offset = int.from_bytes(data[4:6], "big")
smf_link_id = int.from_bytes(data[6:10], "big")
smf_period = int.from_bytes(data[10:13], "big")
config = data[13]
smf_ft = (config >> 4) & 0x0F
smf_bw = (config >> 2) & 0x03
smf_pd = config & 0x03
access_lid = int.from_bytes(data[14:17], "big")
access_period = int.from_bytes(data[17:19], "big")
access_timeout = data[19]
flags = data[20]
sca = (flags >> 5) & 0x07
crc_type = flags & 0x01
crc_init = int.from_bytes(data[21:25], "big")
# 根据跳频地图长度判断: 2.4GHz = 10字节, 5.xGHz = 25字节
remaining = len(data) - 25
if remaining >= 25 + 1:
hop_map = bytes(data[25:50])
pos = 50
else:
hop_map = bytes(data[25:35])
pos = 35
smf_ch_count = data[pos] if pos < len(data) else 0
pos += 1
smf_ch_table = bytes(data[pos:pos + smf_ch_count])
return cls(
smf_baseline, smf_offset, smf_link_id, smf_period,
smf_ft, smf_bw, smf_pd, access_lid, access_period,
access_timeout, sca, crc_type, crc_init, hop_map,
smf_ch_count, smf_ch_table,
)
# ---------------------------------------------------------------------------
# 7.1.4.5 接入请求信息 (可变长)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class AccessRequestInfo:
"""接入请求信息 (7.1.4.5, 表32/33)"""
structure_indication: int # 结构指示字段 (8 bits 位图)
gt_role: int | None = None # GT角色标志 (2 bits)
frame_support: int | None = None # 无线帧结构指示 (4 bits 位图)
bandwidth_support: int | None = None # 带宽指示 (3 bits 位图)
mcs_support: int | None = None # MCS指示 (13 bits 位图)
pilot_support: int | None = None # 导频指示 (4 bits 位图)
slot_support: int | None = None # 调度时隙指示 (5 bits 位图)
switch_delay: int | None = None # 收发切换时延 (4 bits)
crc_support: int | None = None # CRC指示 (2 bits 位图)
[文档]
def pack(self) -> bytes:
# 按位收集存在的字段, 组成比特流后按字节对齐
bits: list[int] = []
bits.extend(_int_to_bits(self.structure_indication, 8))
if self.structure_indication & 0x01 and self.gt_role is not None:
bits.extend(_int_to_bits(self.gt_role, 2))
if self.structure_indication & 0x02 and self.frame_support is not None:
bits.extend(_int_to_bits(self.frame_support, 4))
if self.structure_indication & 0x04 and self.bandwidth_support is not None:
bits.extend(_int_to_bits(self.bandwidth_support, 3))
if self.structure_indication & 0x08 and self.mcs_support is not None:
bits.extend(_int_to_bits(self.mcs_support, 13))
if self.structure_indication & 0x10 and self.pilot_support is not None:
bits.extend(_int_to_bits(self.pilot_support, 4))
if self.structure_indication & 0x20 and self.slot_support is not None:
bits.extend(_int_to_bits(self.slot_support, 5))
if self.structure_indication & 0x40 and self.switch_delay is not None:
bits.extend(_int_to_bits(self.switch_delay, 4))
if self.structure_indication & 0x80 and self.crc_support is not None:
bits.extend(_int_to_bits(self.crc_support, 2))
# 高位补零对齐到字节
pad = (8 - len(bits) % 8) % 8
bits.extend([0] * pad)
return _bits_to_bytes(bits)
[文档]
@classmethod
def unpack(cls, data: bytes) -> AccessRequestInfo:
if len(data) < 1:
raise ValueError("数据不足: 至少需要 1 字节")
bits = _bytes_to_bits(data)
pos = 0
si = _bits_to_int(bits, pos, 8)
pos += 8
gt = frame = bw = mcs = pilot = slot = switch = crc = None
if si & 0x01:
gt = _bits_to_int(bits, pos, 2)
pos += 2
if si & 0x02:
frame = _bits_to_int(bits, pos, 4)
pos += 4
if si & 0x04:
bw = _bits_to_int(bits, pos, 3)
pos += 3
if si & 0x08:
mcs = _bits_to_int(bits, pos, 13)
pos += 13
if si & 0x10:
pilot = _bits_to_int(bits, pos, 4)
pos += 4
if si & 0x20:
slot = _bits_to_int(bits, pos, 5)
pos += 5
if si & 0x40:
switch = _bits_to_int(bits, pos, 4)
pos += 4
if si & 0x80:
crc = _bits_to_int(bits, pos, 2)
pos += 2
return cls(si, gt, frame, bw, mcs, pilot, slot, switch, crc)
# ---------------------------------------------------------------------------
# 7.1.4.6 接入响应信息 (可变长)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class AccessResponseEntry:
"""单个接入响应条目"""
peer_addr: bytes # 对端标识 (6字节)
response_type: int # 请求响应类型 (3 bits)
peer_addr_type: int # 对端标识类型 (3 bits)
repeat_indication: int # 重复指示 (2 bits)
capability: bytes | None = None # 广播节点能力指示 (6字节, 可选)
[文档]
@dataclass
class AccessResponseInfo:
"""接入响应信息 (7.1.4.6, 表34)"""
entries: list[AccessResponseEntry] = field(default_factory=list)
[文档]
def pack(self) -> bytes:
count = len(self.entries)
buf = bytes([(count & 0x3F) << 2])
for entry in self.entries:
buf += entry.peer_addr[:6].ljust(6, b"\x00")
flag = (
((entry.response_type & 0x07) << 5)
| ((entry.peer_addr_type & 0x07) << 2)
| (entry.repeat_indication & 0x03)
)
buf += bytes([flag])
if entry.repeat_indication in (1, 2) and entry.capability:
buf += entry.capability[:6].ljust(6, b"\x00")
return buf
[文档]
@classmethod
def unpack(cls, data: bytes) -> AccessResponseInfo:
if len(data) < 1:
raise ValueError("数据不足")
count = (data[0] >> 2) & 0x3F
entries: list[AccessResponseEntry] = []
pos = 1
for _ in range(count):
if pos + 7 > len(data):
break
peer_addr = bytes(data[pos:pos + 6])
pos += 6
flag = data[pos]
pos += 1
resp_type = (flag >> 5) & 0x07
peer_type = (flag >> 2) & 0x07
repeat = flag & 0x03
cap = None
if repeat in (1, 2) and pos + 6 <= len(data):
cap = bytes(data[pos:pos + 6])
pos += 6
entries.append(AccessResponseEntry(
peer_addr, resp_type, peer_type, repeat, cap,
))
return cls(entries)
# ---------------------------------------------------------------------------
# 7.1.4.7 启动系统管理帧信息
# ---------------------------------------------------------------------------
[文档]
@dataclass
class SystemMgmtFrameInfo:
"""启动系统管理帧信息 (7.1.4.7, 表35)"""
baseline_slot: int # 系统管理帧基线时隙 (4字节)
offset: int # 偏移量 (2字节, us)
access_addr: int # 系统管理帧接入地址 (3字节)
period: int # 周期间隔 (3字节, 基础时隙)
frame_type: int # 无线帧类型 (4 bits)
bandwidth: int # 带宽 (2 bits)
pilot_density: int # 导频密度 (2 bits)
channel_count: int # 可用频点个数 (1字节)
channel_table: bytes # 频点表 (可变长)
[文档]
def pack(self) -> bytes:
buf = self.baseline_slot.to_bytes(4, "big")
buf += self.offset.to_bytes(2, "big")
buf += self.access_addr.to_bytes(3, "big")
buf += self.period.to_bytes(3, "big")
config = (
((self.frame_type & 0x0F) << 4)
| ((self.bandwidth & 0x03) << 2)
| (self.pilot_density & 0x03)
)
buf += bytes([config])
buf += bytes([self.channel_count & 0xFF])
buf += self.channel_table
return buf
[文档]
@classmethod
def unpack(cls, data: bytes) -> SystemMgmtFrameInfo:
if len(data) < 14:
raise ValueError("数据不足: 至少需要 14 字节")
baseline = int.from_bytes(data[0:4], "big")
offset = int.from_bytes(data[4:6], "big")
access_addr = int.from_bytes(data[6:9], "big")
period = int.from_bytes(data[9:12], "big")
config = data[12]
ft = (config >> 4) & 0x0F
bw = (config >> 2) & 0x03
pd = config & 0x03
ch_count = data[13]
ch_table = bytes(data[14:14 + ch_count])
return cls(baseline, offset, access_addr, period, ft, bw, pd,
ch_count, ch_table)
# ---------------------------------------------------------------------------
# 7.1.4.3 传输指示信息
# ---------------------------------------------------------------------------
[文档]
@dataclass
class TransportIndicationInfo:
"""传输指示信息 (7.1.4.3, 表29)
总长度 480 bits (2.4GHz) 或 600 bits (5.xGHz),
取决于跳频地图长度 (80 或 200 bits)。
"""
system_slot_seq: int # 系统基础时隙顺序号 (32 bits)
event_group_offset: int # 事件组起始偏移时间 (24 bits, us)
event_group_period: int # 事件组周期 (16 bits, 调度时隙)
event_period: int # 事件周期 (16 bits, 调度时隙)
intra_event_interval: int # 事件内间隔 (16 bits, us)
inter_event_interval: int # 事件间间隔和事件组间间隔 (16 bits, us)
event_count: int # 事件总数 (8 bits)
peer_addr: bytes # 对端媒体接入层标识 (48 bits / 6字节)
peer_addr_type: int # 对端标识类型 (3 bits)
sleep_clock_accuracy: int # 睡眠时钟精度 (3 bits)
first_last_indication: int # 先发后发指示 (1 bit)
reserved_1: int = 0 # 保留 (1 bit)
tx_frame_type: int = 0 # 先发链路无线帧类型指示 (4 bits)
rx_frame_type: int = 0 # 后发链路无线帧类型指示 (4 bits)
tx_crc_type: int = 0 # 先发链路CRC类型 (1 bit)
rx_crc_type: int = 0 # 后发链路CRC类型 (1 bit)
tx_feedback_type: int = 0 # 先发链路反馈类型指示 (6 bits)
rx_feedback_type: int = 0 # 后发链路反馈类型指示 (3 bits)
system_schedule_slot: int = 0 # 系统调度时隙 (3 bits)
reserved_2: int = 0 # 保留 (2 bits)
tx_link_id: int = 0 # 先发链路逻辑链路标识 (24 bits)
rx_link_id: int = 0 # 后发链路逻辑链路标识 (24 bits)
tx_bandwidth: int = 0 # 先发链路带宽指示 (2 bits)
rx_bandwidth: int = 0 # 后发链路带宽指示 (2 bits)
tx_pilot_density: int = 0 # 先发链路导频密度指示 (2 bits)
rx_pilot_density: int = 0 # 后发链路导频密度指示 (2 bits)
tx_pdu_max: int = 0 # 先发链路PDU最大值 (11 bits)
rx_pdu_max: int = 0 # 后发链路PDU最大值 (11 bits)
tx_max_time_offset: int = 0 # 先发链路最大时间偏移量 (9 bits)
rx_max_time_offset: int = 0 # 后发链路最大时间偏移量 (9 bits)
tx_crc_init: int = 0 # 先发链路CRC初始值 (32 bits)
rx_crc_init: int = 0 # 后发链路CRC初始值 (32 bits)
delay_period: int = 0 # 延迟周期 (16 bits)
timeout: int = 0 # 超时时间 (16 bits)
hop_map: bytes = b"" # 跳频地图 (10 或 25 字节)
is_5g: bool = False # 是否使用 5.xGHz 跳频地图
[文档]
def pack(self) -> bytes:
"""编码为字节序列"""
bits: list[int] = []
bits.extend(_int_to_bits(self.system_slot_seq, 32))
bits.extend(_int_to_bits(self.event_group_offset, 24))
bits.extend(_int_to_bits(self.event_group_period, 16))
bits.extend(_int_to_bits(self.event_period, 16))
bits.extend(_int_to_bits(self.intra_event_interval, 16))
bits.extend(_int_to_bits(self.inter_event_interval, 16))
bits.extend(_int_to_bits(self.event_count, 8))
# peer_addr: 48 bits
for b in self.peer_addr[:6].ljust(6, b"\x00"):
bits.extend(_int_to_bits(b, 8))
bits.extend(_int_to_bits(self.peer_addr_type, 3))
bits.extend(_int_to_bits(self.sleep_clock_accuracy, 3))
bits.extend(_int_to_bits(self.first_last_indication, 1))
bits.extend(_int_to_bits(self.reserved_1, 1))
bits.extend(_int_to_bits(self.tx_frame_type, 4))
bits.extend(_int_to_bits(self.rx_frame_type, 4))
bits.extend(_int_to_bits(self.tx_crc_type, 1))
bits.extend(_int_to_bits(self.rx_crc_type, 1))
bits.extend(_int_to_bits(self.tx_feedback_type, 6))
bits.extend(_int_to_bits(self.rx_feedback_type, 3))
bits.extend(_int_to_bits(self.system_schedule_slot, 3))
bits.extend(_int_to_bits(self.reserved_2, 2))
bits.extend(_int_to_bits(self.tx_link_id, 24))
bits.extend(_int_to_bits(self.rx_link_id, 24))
bits.extend(_int_to_bits(self.tx_bandwidth, 2))
bits.extend(_int_to_bits(self.rx_bandwidth, 2))
bits.extend(_int_to_bits(self.tx_pilot_density, 2))
bits.extend(_int_to_bits(self.rx_pilot_density, 2))
bits.extend(_int_to_bits(self.tx_pdu_max, 11))
bits.extend(_int_to_bits(self.rx_pdu_max, 11))
bits.extend(_int_to_bits(self.tx_max_time_offset, 9))
bits.extend(_int_to_bits(self.rx_max_time_offset, 9))
bits.extend(_int_to_bits(self.tx_crc_init, 32))
bits.extend(_int_to_bits(self.rx_crc_init, 32))
bits.extend(_int_to_bits(self.delay_period, 16))
bits.extend(_int_to_bits(self.timeout, 16))
# hop_map: 80 or 200 bits
hop_len = 25 if self.is_5g else 10
hop = self.hop_map[:hop_len].ljust(hop_len, b"\x00")
for b in hop:
bits.extend(_int_to_bits(b, 8))
return _bits_to_bytes(bits)
[文档]
@classmethod
def unpack(
cls, data: bytes, is_5g: bool = False,
) -> TransportIndicationInfo:
"""从字节序列解码"""
hop_len = 25 if is_5g else 10
expected_bits = 400 + hop_len * 8
if len(data) * 8 < expected_bits:
raise ValueError(
f"数据不足: 需要至少 {expected_bits} bits "
f"({(expected_bits + 7) // 8} 字节)"
)
bits = _bytes_to_bits(data)
p = 0
def _take(w: int) -> int:
nonlocal p
v = _bits_to_int(bits, p, w)
p += w
return v
system_slot_seq = _take(32)
event_group_offset = _take(24)
event_group_period = _take(16)
event_period = _take(16)
intra_event_interval = _take(16)
inter_event_interval = _take(16)
event_count = _take(8)
peer_addr = bytes([_take(8) for _ in range(6)])
peer_addr_type = _take(3)
sleep_clock_accuracy = _take(3)
first_last_indication = _take(1)
reserved_1 = _take(1)
tx_frame_type = _take(4)
rx_frame_type = _take(4)
tx_crc_type = _take(1)
rx_crc_type = _take(1)
tx_feedback_type = _take(6)
rx_feedback_type = _take(3)
system_schedule_slot = _take(3)
reserved_2 = _take(2)
tx_link_id = _take(24)
rx_link_id = _take(24)
tx_bandwidth = _take(2)
rx_bandwidth = _take(2)
tx_pilot_density = _take(2)
rx_pilot_density = _take(2)
tx_pdu_max = _take(11)
rx_pdu_max = _take(11)
tx_max_time_offset = _take(9)
rx_max_time_offset = _take(9)
tx_crc_init = _take(32)
rx_crc_init = _take(32)
delay_period = _take(16)
timeout = _take(16)
hop_map = bytes([_take(8) for _ in range(hop_len)])
return cls(
system_slot_seq=system_slot_seq,
event_group_offset=event_group_offset,
event_group_period=event_group_period,
event_period=event_period,
intra_event_interval=intra_event_interval,
inter_event_interval=inter_event_interval,
event_count=event_count,
peer_addr=peer_addr,
peer_addr_type=peer_addr_type,
sleep_clock_accuracy=sleep_clock_accuracy,
first_last_indication=first_last_indication,
reserved_1=reserved_1,
tx_frame_type=tx_frame_type,
rx_frame_type=rx_frame_type,
tx_crc_type=tx_crc_type,
rx_crc_type=rx_crc_type,
tx_feedback_type=tx_feedback_type,
rx_feedback_type=rx_feedback_type,
system_schedule_slot=system_schedule_slot,
reserved_2=reserved_2,
tx_link_id=tx_link_id,
rx_link_id=rx_link_id,
tx_bandwidth=tx_bandwidth,
rx_bandwidth=rx_bandwidth,
tx_pilot_density=tx_pilot_density,
rx_pilot_density=rx_pilot_density,
tx_pdu_max=tx_pdu_max,
rx_pdu_max=rx_pdu_max,
tx_max_time_offset=tx_max_time_offset,
rx_max_time_offset=rx_max_time_offset,
tx_crc_init=tx_crc_init,
rx_crc_init=rx_crc_init,
delay_period=delay_period,
timeout=timeout,
hop_map=hop_map,
is_5g=is_5g,
)
# ---------------------------------------------------------------------------
# 7.1.4.8 非链接态广播链路信息
# ---------------------------------------------------------------------------
[文档]
@dataclass
class NonLinkedBroadcastLinkInfo:
"""非链接态广播链路信息 (7.1.4.8, 表36/37)"""
transmission_type: int # 传输类型 (1 bit, 0=异步, 1=同步)
service_adapt_mode: int # 业务适配方式 (1 bit, 0=周期, 1=非周期)
reserved: int = 0 # 保留 (6 bits)
system_slot_seq: int = 0 # 系统基础时隙顺序号 (32 bits)
event_group_offset: int = 0 # 事件组起始偏移时间 (24 bits, us)
event_group_set_id: int = 0 # 事件组集合标识 (8 bits)
event_group_count: int = 0 # 事件组个数 (8 bits)
event_group_interval: int = 0 # 事件组间隔 (8 bits, 基础时隙)
event_group_period: int = 0 # 事件组周期 (16 bits, 基础时隙)
event_period: int = 0 # 事件周期 (16 bits, 基础时隙)
event_count: int = 0 # 事件总数 (8 bits)
sync_anchor_delay: int = 0 # 同步锚点延迟 (24 bits, us)
sync_ref_delay: int = 0 # 同步参考延迟 (24 bits, us)
base_link_id: int = 0 # 基础逻辑链路标识 (24 bits)
frame_type: int = 0 # 无线帧类型 (4 bits)
bandwidth: int = 0 # 带宽指示 (2 bits)
pilot_density: int = 0 # 导频密度指示 (2 bits)
sdu_max: int = 0 # SDU最大值 (12 bits)
sdu_period: int = 0 # SDU周期 (20 bits, us)
pdu_max: int = 0 # PDU最大值 (11 bits)
new_packet_count: int = 0 # 新数据包计数 (4 bits)
crc_type: int = 0 # CRC类型 (1 bit)
crc_base_init: int = 0 # CRC基础初始值 (32 bits)
hop_map: bytes = b"" # 跳频地图 (10 或 25 字节)
giv: bytes = b"" # 加密使用的GIV (8 字节)
gskd: bytes = b"" # 生成加密密钥的GSKD (16 字节)
is_5g: bool = False # 是否使用 5.xGHz 跳频地图
[文档]
def pack(self) -> bytes:
"""编码为字节序列"""
bits: list[int] = []
bits.extend(_int_to_bits(self.transmission_type, 1))
bits.extend(_int_to_bits(self.service_adapt_mode, 1))
bits.extend(_int_to_bits(self.reserved, 6))
bits.extend(_int_to_bits(self.system_slot_seq, 32))
bits.extend(_int_to_bits(self.event_group_offset, 24))
bits.extend(_int_to_bits(self.event_group_set_id, 8))
bits.extend(_int_to_bits(self.event_group_count, 8))
bits.extend(_int_to_bits(self.event_group_interval, 8))
bits.extend(_int_to_bits(self.event_group_period, 16))
bits.extend(_int_to_bits(self.event_period, 16))
bits.extend(_int_to_bits(self.event_count, 8))
bits.extend(_int_to_bits(self.sync_anchor_delay, 24))
bits.extend(_int_to_bits(self.sync_ref_delay, 24))
bits.extend(_int_to_bits(self.base_link_id, 24))
bits.extend(_int_to_bits(self.frame_type, 4))
bits.extend(_int_to_bits(self.bandwidth, 2))
bits.extend(_int_to_bits(self.pilot_density, 2))
bits.extend(_int_to_bits(self.sdu_max, 12))
bits.extend(_int_to_bits(self.sdu_period, 20))
bits.extend(_int_to_bits(self.pdu_max, 11))
bits.extend(_int_to_bits(self.new_packet_count, 4))
bits.extend(_int_to_bits(self.crc_type, 1))
bits.extend(_int_to_bits(self.crc_base_init, 32))
# hop_map
hop_len = 25 if self.is_5g else 10
hop = self.hop_map[:hop_len].ljust(hop_len, b"\x00")
for b in hop:
bits.extend(_int_to_bits(b, 8))
# giv: 8 字节
giv = self.giv[:8].ljust(8, b"\x00")
for b in giv:
bits.extend(_int_to_bits(b, 8))
# gskd: 16 字节
gskd = self.gskd[:16].ljust(16, b"\x00")
for b in gskd:
bits.extend(_int_to_bits(b, 8))
return _bits_to_bytes(bits)
[文档]
@classmethod
def unpack(
cls, data: bytes, is_5g: bool = False,
) -> NonLinkedBroadcastLinkInfo:
"""从字节序列解码"""
hop_len = 25 if is_5g else 10
# 固定位: 288 + hop + giv(64) + gskd(128)
fixed_bits = 288 + hop_len * 8 + 64 + 128
if len(data) * 8 < fixed_bits:
raise ValueError(
f"数据不足: 需要至少 {fixed_bits} bits "
f"({(fixed_bits + 7) // 8} 字节)"
)
bits = _bytes_to_bits(data)
p = 0
def _take(w: int) -> int:
nonlocal p
v = _bits_to_int(bits, p, w)
p += w
return v
transmission_type = _take(1)
service_adapt_mode = _take(1)
reserved = _take(6)
system_slot_seq = _take(32)
event_group_offset = _take(24)
event_group_set_id = _take(8)
event_group_count = _take(8)
event_group_interval = _take(8)
event_group_period = _take(16)
event_period = _take(16)
event_count = _take(8)
sync_anchor_delay = _take(24)
sync_ref_delay = _take(24)
base_link_id = _take(24)
frame_type = _take(4)
bandwidth = _take(2)
pilot_density = _take(2)
sdu_max = _take(12)
sdu_period = _take(20)
pdu_max = _take(11)
new_packet_count = _take(4)
crc_type = _take(1)
crc_base_init = _take(32)
hop_map = bytes([_take(8) for _ in range(hop_len)])
giv = bytes([_take(8) for _ in range(8)])
gskd = bytes([_take(8) for _ in range(16)])
return cls(
transmission_type=transmission_type,
service_adapt_mode=service_adapt_mode,
reserved=reserved,
system_slot_seq=system_slot_seq,
event_group_offset=event_group_offset,
event_group_set_id=event_group_set_id,
event_group_count=event_group_count,
event_group_interval=event_group_interval,
event_group_period=event_group_period,
event_period=event_period,
event_count=event_count,
sync_anchor_delay=sync_anchor_delay,
sync_ref_delay=sync_ref_delay,
base_link_id=base_link_id,
frame_type=frame_type,
bandwidth=bandwidth,
pilot_density=pilot_density,
sdu_max=sdu_max,
sdu_period=sdu_period,
pdu_max=pdu_max,
new_packet_count=new_packet_count,
crc_type=crc_type,
crc_base_init=crc_base_init,
hop_map=hop_map,
giv=giv,
gskd=gskd,
is_5g=is_5g,
)
# ---------------------------------------------------------------------------
# 7.1.4.9 查询请求过滤信息
# ---------------------------------------------------------------------------
[文档]
@dataclass
class QueryRequestFilterInfo:
"""查询请求过滤信息 (7.1.4.9, 表38)
变长结构, 包含:
- 16比特标准服务标识列表
- 128比特自定义服务标识列表
"""
uuid_16_list: list[int] = field(default_factory=list)
uuid_128_list: list[bytes] = field(default_factory=list)
[文档]
def pack(self) -> bytes:
"""编码为字节序列"""
buf = b""
# 16-bit UUID 个数 (1字节)
count_16 = len(self.uuid_16_list)
buf += bytes([count_16 & 0xFF])
for uuid_16 in self.uuid_16_list:
buf += struct.pack(">H", uuid_16 & 0xFFFF)
# 128-bit UUID 个数 (1字节)
count_128 = len(self.uuid_128_list)
buf += bytes([count_128 & 0xFF])
for uuid_128 in self.uuid_128_list:
buf += uuid_128[:16].ljust(16, b"\x00")
return buf
[文档]
@classmethod
def unpack(cls, data: bytes) -> QueryRequestFilterInfo:
"""从字节序列解码"""
if len(data) < 1:
raise ValueError("数据不足: 至少需要 1 字节")
pos = 0
count_16 = data[pos]
pos += 1
uuid_16_list: list[int] = []
for _ in range(count_16):
if pos + 2 > len(data):
break
uuid_16 = struct.unpack(">H", data[pos:pos + 2])[0]
uuid_16_list.append(uuid_16)
pos += 2
uuid_128_list: list[bytes] = []
if pos < len(data):
count_128 = data[pos]
pos += 1
for _ in range(count_128):
if pos + 16 > len(data):
break
uuid_128_list.append(bytes(data[pos:pos + 16]))
pos += 16
return cls(uuid_16_list, uuid_128_list)
# ---------------------------------------------------------------------------
# 7.1.4 广播帧通用结构
# ---------------------------------------------------------------------------
[文档]
@dataclass
class BroadcastFrame:
"""广播帧通用结构 (7.1.4, 表25)
结构指示 (5 bits) 控制可选字段:
- bit 0: 本端标识存在
- bit 1: 解析密钥标识存在
- bit 2: 对端标识存在
- bit 3: 扩展广播帧资源配置存在
- bit 4: 数据部分存在
"""
structure_indication: int # 广播帧结构指示 (5 bits)
local_addr_type: int # 本端标识类型 (3 bits)
peer_addr_type: int # 对端标识类型 (3 bits)
local_addr: bytes # 本端标识 (6字节)
irk_id: int # 解析密钥标识 (1字节)
peer_addr: bytes # 对端标识 (6字节)
ext_adv_config: ExtAdvResourceConfig | None = None
data_items: list[tuple[int, bytes]] = field(default_factory=list)
[文档]
def pack(self) -> bytes:
si = self.structure_indication & 0x1F
b0 = (si << 3)
b1 = (
((self.local_addr_type & 0x07) << 5)
| ((self.peer_addr_type & 0x07) << 2)
)
buf = bytes([b0, b1])
if si & 0x01:
buf += self.local_addr[:6].ljust(6, b"\x00")
if si & 0x02:
buf += bytes([self.irk_id & 0xFF])
if si & 0x04:
buf += self.peer_addr[:6].ljust(6, b"\x00")
if si & 0x08 and self.ext_adv_config:
buf += self.ext_adv_config.pack()
if si & 0x10:
for dtype, content in self.data_items:
buf += bytes([dtype & 0xFF, len(content) & 0xFF])
buf += content
return buf
[文档]
@classmethod
def unpack(cls, data: bytes) -> BroadcastFrame:
if len(data) < 2:
raise ValueError("数据不足: 至少需要 2 字节")
si = (data[0] >> 3) & 0x1F
local_type = (data[1] >> 5) & 0x07
peer_type = (data[1] >> 2) & 0x07
pos = 2
local_addr = b"\x00" * 6
irk_id = 0
peer_addr = b"\x00" * 6
ext_config = None
items: list[tuple[int, bytes]] = []
if si & 0x01:
local_addr = bytes(data[pos:pos + 6])
pos += 6
if si & 0x02:
irk_id = data[pos]
pos += 1
if si & 0x04:
peer_addr = bytes(data[pos:pos + 6])
pos += 6
if si & 0x08:
ext_config = ExtAdvResourceConfig.unpack(data[pos:pos + 4])
pos += 4
if si & 0x10:
while pos + 2 <= len(data):
dtype = data[pos]
dlen = data[pos + 1]
pos += 2
if pos + dlen > len(data):
break
content = bytes(data[pos:pos + dlen])
pos += dlen
items.append((dtype, content))
return cls(si, local_type, peer_type, local_addr,
irk_id, peer_addr, ext_config, items)
# ---------------------------------------------------------------------------
# 7.1.5 广播帧过滤
# ---------------------------------------------------------------------------
[文档]
class FilterOp(IntEnum):
"""过滤条件组合运算符"""
AND = 0
OR = 1
NOT = 2
[文档]
@dataclass
class FilterCondition:
"""单个过滤条件"""
field_name: str # 匹配字段: address, name, uuid, service_data, vendor_data
value: bytes = b"" # 匹配值
negate: bool = False # 是否取反
[文档]
@dataclass
class BroadcastFilter:
"""广播帧过滤器 (7.1.5)
支持按设备地址、设备名称、服务UUID、服务数据和厂商数据等信息
的单个条件或多个条件的与/或/非组合过滤。
"""
conditions: list[FilterCondition] = field(default_factory=list)
operator: FilterOp = FilterOp.AND
[文档]
def match(self, frame: BroadcastFrame) -> bool:
"""检查广播帧是否满足过滤条件"""
if not self.conditions:
return True
results = [self._eval_condition(c, frame) for c in self.conditions]
if self.operator == FilterOp.AND:
return all(results)
elif self.operator == FilterOp.OR:
return any(results)
else: # NOT: 对第一个条件取反
return not results[0] if results else True
@staticmethod
def _eval_condition(cond: FilterCondition, frame: BroadcastFrame) -> bool:
matched = False
if cond.field_name == "address":
matched = frame.local_addr == cond.value
elif cond.field_name == "name":
for _dtype, content in frame.data_items:
if content == cond.value:
matched = True
break
elif cond.field_name in ("uuid", "service_data", "vendor_data"):
for _dtype, content in frame.data_items:
if cond.value in content:
matched = True
break
return not matched if cond.negate else matched
# ---------------------------------------------------------------------------
# 7.1.4.10 非链接态窄带跳频测量信息配置
# ---------------------------------------------------------------------------
[文档]
@dataclass
class NarrowbandMeasurementConfig:
"""非链接态窄带跳频测量信息配置 (7.1.4.10, 表39)
包含感知信号配置、事件组参数、跳频配置、初始化阶段参数、
先发链路参数和跳频信道位图等字段。
"""
config_index: int = 0 # 感知信号配置索引 (8b)
event_group_start_offset: int = 0 # 事件组起始偏移时间 (24b, us)
nb_event_period: int = 0 # 窄带跳频感知事件周期 (16b)
nb_event_group_period: int = 0 # 窄带跳频感知事件组周期 (16b)
nb_events_in_group: int = 0 # 事件组内事件总数 (8b)
nb_event_group_count: int = 0 # 事件组总数 (8b)
nb_event_stat_count: int = 0 # 事件统计数量 (8b)
schedule_slot: int = 4 # 系统调度时隙 (3b, 默认125us)
meas_signal_bw: int = 0 # 测量信号带宽指示 (2b)
hopping_mode: int = 0 # 跳频方式指示 (2b)
init_channel: int = 0 # 初始化阶段的频点 (10b)
init_interaction_type: int = 0 # 初始化阶段交互类型指示 (2b)
init_sync_signal: int = 0 # 初始化阶段同步信号指示 (4b)
init_sync_config: int = 0 # 初始化阶段同步信号配置 (24b)
init_meas_signal_type: int = 0 # 初始化阶段测量信号类型指示 (1b)
init_meas_signal1_len: int = 0 # 初始化阶段的测量信号1的长度 (3b)
init_intra_event_interval: int = 0 # 初始化阶段事件内间隔 (16b, us)
init_switch_interval: int = 0 # 初始化阶段的测量帧内部切换间隔 (8b, us)
init_meas_signal2_len: int = 0 # 初始化阶段的测量信号2的长度 (8b)
init_inter_event_interval: int = 0 # 初始化阶段事件间间隔 (16b, us)
mf1_inter_event_interval: int = 0 # 测量帧类型1事件间间隔 (16b, us)
mf2_inter_event_interval: int = 0 # 测量帧类型2事件间间隔 (16b, us)
nb_intra_event_interval: int = 0 # 窄带跳频感知事件内间隔 (16b, us)
nb_inter_group_interval: int = 0 # 窄带跳频感知事件组间间隔 (24b, us)
nb_event_total: int = 0 # 窄带跳频感知事件总数 (8b)
mf1_event_period: int = 0 # 测量帧类型1事件的周期 (8b)
mf1_switch_interval: int = 0 # 测量帧类型1内部切换间隔 (8b, us)
tx_sync_signal: int = 0 # 先发链路同步信号指示 (4b)
tx_antenna_count: int = 0 # 先发节点发送天线数量指示 (4b)
tx_sync_config: int = 0 # 先发链路同步信号配置 (24b)
tx_meas_signal_type: int = 0 # 先发链路测量信号类型指示 (1b)
tx_meas_signal1_len: int = 0 # 先发链路测量信号1长度指示 (3b)
tx_meas_signal1_security: int = 0 # 先发链路测量信号1安全类型指示 (2b)
tx_meas_signal2_multitone: int = 0 # 先发链路测量信号2多音指示 (2b)
tx_first_sub_signal_len: int = 0 # 先发链路第一个子测量信号的长度指示 (8b)
tx_antenna_switch_interval: int = 0 # 先发节点天线切换间隔 (8b, us)
tx_meas_signal2_len: int = 0 # 先发链路测量信号2长度指示 (8b)
hop_band_bitmap: int = 0 # 跳频信道频带指示 (8b, 位图)
hop_channel_2g4: int = 0 # 2.4GHz跳频信道指示 (80b)
hop_channel_5g1: int = 0 # 5.1GHz跳频信道指示 (200b)
hop_channel_5g8: int = 0 # 5.8GHz跳频信道指示 (128b)
stability: int = 0 # 稳定性指示 (1b)
reserved: int = 0 # 预留 (4b)
[文档]
def pack(self) -> bytes:
bits: list[int] = []
bits.extend(_int_to_bits(self.config_index, 8))
bits.extend(_int_to_bits(self.event_group_start_offset, 24))
bits.extend(_int_to_bits(self.nb_event_period, 16))
bits.extend(_int_to_bits(self.nb_event_group_period, 16))
bits.extend(_int_to_bits(self.nb_events_in_group, 8))
bits.extend(_int_to_bits(self.nb_event_group_count, 8))
bits.extend(_int_to_bits(self.nb_event_stat_count, 8))
bits.extend(_int_to_bits(self.schedule_slot, 3))
bits.extend(_int_to_bits(self.meas_signal_bw, 2))
bits.extend(_int_to_bits(self.hopping_mode, 2))
bits.extend(_int_to_bits(self.init_channel, 10))
bits.extend(_int_to_bits(self.init_interaction_type, 2))
bits.extend(_int_to_bits(self.init_sync_signal, 4))
bits.extend(_int_to_bits(self.init_sync_config, 24))
bits.extend(_int_to_bits(self.init_meas_signal_type, 1))
bits.extend(_int_to_bits(self.init_meas_signal1_len, 3))
bits.extend(_int_to_bits(self.init_intra_event_interval, 16))
bits.extend(_int_to_bits(self.init_switch_interval, 8))
bits.extend(_int_to_bits(self.init_meas_signal2_len, 8))
bits.extend(_int_to_bits(self.init_inter_event_interval, 16))
bits.extend(_int_to_bits(self.mf1_inter_event_interval, 16))
bits.extend(_int_to_bits(self.mf2_inter_event_interval, 16))
bits.extend(_int_to_bits(self.nb_intra_event_interval, 16))
bits.extend(_int_to_bits(self.nb_inter_group_interval, 24))
bits.extend(_int_to_bits(self.nb_event_total, 8))
bits.extend(_int_to_bits(self.mf1_event_period, 8))
bits.extend(_int_to_bits(self.mf1_switch_interval, 8))
bits.extend(_int_to_bits(self.tx_sync_signal, 4))
bits.extend(_int_to_bits(self.tx_antenna_count, 4))
bits.extend(_int_to_bits(self.tx_sync_config, 24))
bits.extend(_int_to_bits(self.tx_meas_signal_type, 1))
bits.extend(_int_to_bits(self.tx_meas_signal1_len, 3))
bits.extend(_int_to_bits(self.tx_meas_signal1_security, 2))
bits.extend(_int_to_bits(self.tx_meas_signal2_multitone, 2))
bits.extend(_int_to_bits(self.tx_first_sub_signal_len, 8))
bits.extend(_int_to_bits(self.tx_antenna_switch_interval, 8))
bits.extend(_int_to_bits(self.tx_meas_signal2_len, 8))
bits.extend(_int_to_bits(self.hop_band_bitmap, 8))
# 根据频带位图决定包含哪些信道指示
if self.hop_band_bitmap & 0x01:
bits.extend(_int_to_bits(self.hop_channel_2g4, 80))
if self.hop_band_bitmap & 0x02:
bits.extend(_int_to_bits(self.hop_channel_5g1, 200))
if self.hop_band_bitmap & 0x04:
bits.extend(_int_to_bits(self.hop_channel_5g8, 128))
bits.extend(_int_to_bits(self.stability, 1))
bits.extend(_int_to_bits(self.reserved, 4))
# 补齐到字节边界
pad = (8 - len(bits) % 8) % 8
bits.extend([0] * pad)
return _bits_to_bytes(bits)
[文档]
@classmethod
def unpack(cls, data: bytes) -> NarrowbandMeasurementConfig:
bits = _bytes_to_bits(data)
p = 0
def _take(w: int) -> int:
nonlocal p
v = _bits_to_int(bits, p, w)
p += w
return v
obj = cls()
obj.config_index = _take(8)
obj.event_group_start_offset = _take(24)
obj.nb_event_period = _take(16)
obj.nb_event_group_period = _take(16)
obj.nb_events_in_group = _take(8)
obj.nb_event_group_count = _take(8)
obj.nb_event_stat_count = _take(8)
obj.schedule_slot = _take(3)
obj.meas_signal_bw = _take(2)
obj.hopping_mode = _take(2)
obj.init_channel = _take(10)
obj.init_interaction_type = _take(2)
obj.init_sync_signal = _take(4)
obj.init_sync_config = _take(24)
obj.init_meas_signal_type = _take(1)
obj.init_meas_signal1_len = _take(3)
obj.init_intra_event_interval = _take(16)
obj.init_switch_interval = _take(8)
obj.init_meas_signal2_len = _take(8)
obj.init_inter_event_interval = _take(16)
obj.mf1_inter_event_interval = _take(16)
obj.mf2_inter_event_interval = _take(16)
obj.nb_intra_event_interval = _take(16)
obj.nb_inter_group_interval = _take(24)
obj.nb_event_total = _take(8)
obj.mf1_event_period = _take(8)
obj.mf1_switch_interval = _take(8)
obj.tx_sync_signal = _take(4)
obj.tx_antenna_count = _take(4)
obj.tx_sync_config = _take(24)
obj.tx_meas_signal_type = _take(1)
obj.tx_meas_signal1_len = _take(3)
obj.tx_meas_signal1_security = _take(2)
obj.tx_meas_signal2_multitone = _take(2)
obj.tx_first_sub_signal_len = _take(8)
obj.tx_antenna_switch_interval = _take(8)
obj.tx_meas_signal2_len = _take(8)
obj.hop_band_bitmap = _take(8)
if obj.hop_band_bitmap & 0x01:
obj.hop_channel_2g4 = _take(80)
if obj.hop_band_bitmap & 0x02:
obj.hop_channel_5g1 = _take(200)
if obj.hop_band_bitmap & 0x04:
obj.hop_channel_5g8 = _take(128)
obj.stability = _take(1)
obj.reserved = _take(4)
return obj
# ---------------------------------------------------------------------------
# 7.1.4.11 非链接态超宽带脉冲测量信息配置
# ---------------------------------------------------------------------------
[文档]
@dataclass
class UWBPulseMeasurementConfig:
"""非链接态超宽带脉冲测量信息配置 (7.1.4.11, 表41)"""
config_index: int = 0 # 测量信号配置索引 (8b)
event_group_start_offset: int = 0 # 事件组起始偏移时间 (24b, us)
init_event_group_period: int = 0 # 初始化阶段事件组周期 (32b, 调度时隙)
schedule_slot: int = 4 # 系统调度时隙 (3b, 默认125us)
init_channel_count: int = 0 # 初始化阶段的频点个数 (5b)
init_channel_list: list[int] = field(default_factory=list) # 频点列表 (每个2字节)
init_interaction_type: int = 0 # 初始化阶段交互类型指示 (3b)
init_sync_signal: int = 0 # 初始化阶段同步信号指示 (5b)
init_sync_config: int = 0 # 初始化阶段同步信号配置 (24b)
init_intra_event_interval: int = 0 # 初始化阶段事件内间隔 (16b, us)
init_switch_interval: int = 0 # 初始化阶段测量帧内部切换间隔 (8b, us)
init_meas_signal_type: int = 0 # 初始化阶段测量信号类型指示 (2b)
init_meas_signal1_len: int = 0 # 初始化阶段测量信号1的长度 (6b)
init_meas_signal2_len: int = 0 # 初始化阶段测量信号2的长度 (8b)
tx_antenna_count: int = 0 # 先发节点发送天线数量指示 (4b)
tx_multi_antenna_config: int = 0 # 先发节点多天线切换资源配置指示 (8b)
uwb_first_frame_start: int = 0 # 超宽带脉冲第一测量帧起始时刻 (22b)
uwb_event_period: int = 0 # 超宽带脉冲测量事件周期 (16b)
uwb_event_group_period: int = 0 # 超宽带脉冲测量事件组周期 (16b, us)
uwb_events_in_group: int = 0 # 超宽带脉冲测量事件组内事件总数 (8b)
uwb_event_group_count: int = 0 # 超宽带脉冲测量事件组总数 (16b)
uwb_event_stat_count: int = 0 # 超宽带脉冲测量事件统计数量 (8b)
uwb_channel: int = 0 # 超宽带脉冲信道指示 (8b)
uwb_signal_bw: int = 0 # 超宽带脉冲信号带宽指示 (2b)
channel_overlap_mode: int = 0 # 信道重叠模式指示 (4b)
channel_order_mode: int = 0 # 信道使用顺序模式指示 (2b)
channel_stitch_count: int = 0 # 信道拼接数量指示 (8b)
channel_stitch_step: int = 0 # 信道拼接测量子片段步进指示 (4b)
sync_symbol_kl: int = 0 # 同步符号码字长度K和占空因子L (4b)
sync_symbol_n: int = 0 # 同步符号个数N (16b)
sync_symbol_index: int = 0 # 同步符号码字的索引 (8b)
meas_symbol_shift: int = 0 # 测量符号码字循环移位 (8b)
meas_seg_symbol_count: int = 0 # 测量子片段符号数Nseg (16b)
meas_seg_count: int = 0 # 测量子片段数Mseg (8b)
meas_seg_gap: int = 0 # 测量子片段间隔粒度Ngap (16b)
meas_symbol_cp_len: int = 0 # 测量符号循环前缀长度Lcp (6b)
meas_symbol_zero_len: int = 0 # 测量符号补零长度Lzero (8b)
stability: int = 0 # 稳定性指示 (1b)
device_status: int = 0 # 感知设备状态信息 (7b)
[文档]
def pack(self) -> bytes:
bits: list[int] = []
bits.extend(_int_to_bits(self.config_index, 8))
bits.extend(_int_to_bits(self.event_group_start_offset, 24))
bits.extend(_int_to_bits(self.init_event_group_period, 32))
bits.extend(_int_to_bits(self.schedule_slot, 3))
bits.extend(_int_to_bits(self.init_channel_count, 5))
# 频点列表: 每个频点 16 bits
for i in range(self.init_channel_count):
ch = self.init_channel_list[i] if i < len(self.init_channel_list) else 0
bits.extend(_int_to_bits(ch, 16))
bits.extend(_int_to_bits(self.init_interaction_type, 3))
bits.extend(_int_to_bits(self.init_sync_signal, 5))
bits.extend(_int_to_bits(self.init_sync_config, 24))
bits.extend(_int_to_bits(self.init_intra_event_interval, 16))
bits.extend(_int_to_bits(self.init_switch_interval, 8))
bits.extend(_int_to_bits(self.init_meas_signal_type, 2))
bits.extend(_int_to_bits(self.init_meas_signal1_len, 6))
bits.extend(_int_to_bits(self.init_meas_signal2_len, 8))
bits.extend(_int_to_bits(self.tx_antenna_count, 4))
bits.extend(_int_to_bits(self.tx_multi_antenna_config, 8))
bits.extend(_int_to_bits(self.uwb_first_frame_start, 22))
bits.extend(_int_to_bits(self.uwb_event_period, 16))
bits.extend(_int_to_bits(self.uwb_event_group_period, 16))
bits.extend(_int_to_bits(self.uwb_events_in_group, 8))
bits.extend(_int_to_bits(self.uwb_event_group_count, 16))
bits.extend(_int_to_bits(self.uwb_event_stat_count, 8))
bits.extend(_int_to_bits(self.uwb_channel, 8))
bits.extend(_int_to_bits(self.uwb_signal_bw, 2))
bits.extend(_int_to_bits(self.channel_overlap_mode, 4))
bits.extend(_int_to_bits(self.channel_order_mode, 2))
bits.extend(_int_to_bits(self.channel_stitch_count, 8))
bits.extend(_int_to_bits(self.channel_stitch_step, 4))
bits.extend(_int_to_bits(self.sync_symbol_kl, 4))
bits.extend(_int_to_bits(self.sync_symbol_n, 16))
bits.extend(_int_to_bits(self.sync_symbol_index, 8))
bits.extend(_int_to_bits(self.meas_symbol_shift, 8))
bits.extend(_int_to_bits(self.meas_seg_symbol_count, 16))
bits.extend(_int_to_bits(self.meas_seg_count, 8))
bits.extend(_int_to_bits(self.meas_seg_gap, 16))
bits.extend(_int_to_bits(self.meas_symbol_cp_len, 6))
bits.extend(_int_to_bits(self.meas_symbol_zero_len, 8))
bits.extend(_int_to_bits(self.stability, 1))
bits.extend(_int_to_bits(self.device_status, 7))
# 补齐到字节边界
pad = (8 - len(bits) % 8) % 8
bits.extend([0] * pad)
return _bits_to_bytes(bits)
[文档]
@classmethod
def unpack(cls, data: bytes) -> UWBPulseMeasurementConfig:
bits = _bytes_to_bits(data)
p = 0
def _take(w: int) -> int:
nonlocal p
v = _bits_to_int(bits, p, w)
p += w
return v
obj = cls()
obj.config_index = _take(8)
obj.event_group_start_offset = _take(24)
obj.init_event_group_period = _take(32)
obj.schedule_slot = _take(3)
obj.init_channel_count = _take(5)
obj.init_channel_list = [_take(16) for _ in range(obj.init_channel_count)]
obj.init_interaction_type = _take(3)
obj.init_sync_signal = _take(5)
obj.init_sync_config = _take(24)
obj.init_intra_event_interval = _take(16)
obj.init_switch_interval = _take(8)
obj.init_meas_signal_type = _take(2)
obj.init_meas_signal1_len = _take(6)
obj.init_meas_signal2_len = _take(8)
obj.tx_antenna_count = _take(4)
obj.tx_multi_antenna_config = _take(8)
obj.uwb_first_frame_start = _take(22)
obj.uwb_event_period = _take(16)
obj.uwb_event_group_period = _take(16)
obj.uwb_events_in_group = _take(8)
obj.uwb_event_group_count = _take(16)
obj.uwb_event_stat_count = _take(8)
obj.uwb_channel = _take(8)
obj.uwb_signal_bw = _take(2)
obj.channel_overlap_mode = _take(4)
obj.channel_order_mode = _take(2)
obj.channel_stitch_count = _take(8)
obj.channel_stitch_step = _take(4)
obj.sync_symbol_kl = _take(4)
obj.sync_symbol_n = _take(16)
obj.sync_symbol_index = _take(8)
obj.meas_symbol_shift = _take(8)
obj.meas_seg_symbol_count = _take(16)
obj.meas_seg_count = _take(8)
obj.meas_seg_gap = _take(16)
obj.meas_symbol_cp_len = _take(6)
obj.meas_symbol_zero_len = _take(8)
obj.stability = _take(1)
obj.device_status = _take(7)
return obj
# ---------------------------------------------------------------------------
# 比特操作辅助函数
# ---------------------------------------------------------------------------
def _int_to_bits(val: int, width: int) -> list[int]:
"""整数转比特列表 (高位在前)"""
return [(val >> (width - 1 - i)) & 1 for i in range(width)]
def _bits_to_int(bits: list[int], start: int, width: int) -> int:
"""比特列表转整数"""
val = 0
for i in range(width):
if start + i < len(bits):
val = (val << 1) | (bits[start + i] & 1)
else:
val <<= 1
return val
def _bits_to_bytes(bits: list[int]) -> bytes:
"""比特列表转字节序列"""
result = []
for i in range(0, len(bits), 8):
byte_val = 0
for j in range(8):
if i + j < len(bits):
byte_val = (byte_val << 1) | (bits[i + j] & 1)
else:
byte_val <<= 1
result.append(byte_val)
return bytes(result)
def _bytes_to_bits(data: bytes) -> list[int]:
"""字节序列转比特列表"""
bits: list[int] = []
bits.extend((b >> i) & 1 for b in data for i in range(7, -1, -1))
return bits