nearlink_sdr.mac.access 源代码

"""接入流程管理 -- TXS-10002-2025 标准 7.1.3。

实现 SLE 设备发现和接入的完整六阶段流程:
  a) 广播方准备并发送可接入扩展广播帧
  b) 接入发起方发送接入请求帧
  c) 广播方接收请求并发送响应
  d) 接入方接收响应并进入链接态
  e) 数据链路建立
  f) 安全流程 (委托给 security 模块)

协调 BroadcastFrame、AccessBasicInfo、TransportIndicationInfo、
AccessRequestInfo、AccessResponseInfo 等数据结构完成端到端接入。
"""

from __future__ import annotations

__all__ = [
    "MAX_ADV_INTERVAL_US",
    "MAX_ADV_RANDOM_DELAY_US",
    "MIN_ADV_INTERVAL_US",
    "MIN_ADV_TO_EXT_ADV_GAP",
    "MIN_EXT_ADV_TO_REQUEST_GAP",
    "MIN_REQUEST_TO_RESPONSE_GAP",
    "AccessConfig",
    "AccessPhase",
    "AccessWhitelist",
    "BroadcasterAccessManager",
    "DiscoveryManager",
    "InitiatorAccessManager",
    "NegotiatedRole",
    "NonConnectedBroadcastConfig",
    "NonConnectedBroadcastManager",
    "NonConnectedBroadcastResult",
    "negotiate_gt_role",
    "parse_non_connected_broadcast",
    "run_access_procedure",
]


import logging
import random
from dataclasses import dataclass, field
from enum import IntEnum
from typing import Any

from nearlink_sdr.mac.broadcast import (
    AccessBasicInfo,
    AccessRequestInfo,
    AccessResponseEntry,
    AccessResponseInfo,
    AccessResponseType,
    BroadcastDataType,
    BroadcastFrame,
    DiscoveryAccessEntry,
    DiscoveryAccessResourceConfig,
    GTNegotiation,
    NonLinkedBroadcastLinkInfo,
    QueryRequestFilterInfo,
    SystemMgmtFrameInfo,
    TransportIndicationInfo,
)
from nearlink_sdr.mac.link_manager import (
    Event,
    EventType,
    LinkManager,
    Role,
)

log = logging.getLogger(__name__)

# ── 最小时间间隔 (标准 7.1.3, 单位 μs) ──
MIN_ADV_TO_EXT_ADV_GAP = 300
MIN_EXT_ADV_TO_REQUEST_GAP = 300
MIN_REQUEST_TO_RESPONSE_GAP = 300

# ── 广播间隔约束 (标准 7.1.1, 单位 μs) ──
MIN_ADV_INTERVAL_US = 4_000              # 4 ms
MAX_ADV_INTERVAL_US = 2_097_151_875      # 2097.151875 s
MAX_ADV_RANDOM_DELAY_US = 2_000          # 2 ms


[文档] class AccessPhase(IntEnum): """接入流程阶段标识。""" IDLE = 0 ADV_SENDING = 1 # a: 广播方发送扩展广播帧 REQ_WINDOW = 2 # b: 接入请求窗口 RSP_WINDOW = 3 # c: 接入响应窗口 LINK_SETUP = 4 # d/e: 链路建立 COMPLETED = 5 # 接入完成
[文档] @dataclass class AccessConfig: """接入流程配置参数。""" # 广播/接入资源配置 request_offset_us: int = 600 # 扩展广播帧结束到请求窗口 (μs) request_max_length: int = 64 # 接入请求帧最大数据长度 (字节) response_offset_us: int = 1200 # 扩展广播帧结束到响应窗口 (μs) window_count: int = 1 # 并行请求窗口个数 # GT 角色协商 gt_preference: int = 0 # 0=偏好T节点, 1=偏好G节点 gt_negotiable: bool = True # 角色是否可协商 # 超时与重试 access_timeout_ms: int = 5000 # 接入超时 max_retries: int = 3 # 最大重试次数
[文档] @dataclass class NegotiatedRole: """GT 角色协商结果。""" local_role: Role peer_role: Role negotiated: bool = False # 是否经过协商 (vs 默认分配)
# --------------------------------------------------------------------------- # 7.1.6 接入白名单 # ---------------------------------------------------------------------------
[文档] @dataclass class AccessWhitelist: """接入白名单 (7.1.6) 广播设备设置白名单后, 只接收白名单中设备的接入请求。 接入设备设置白名单后, 只向白名单中设备发起接入。 """ _addresses: set[bytes] = field(default_factory=set) enabled: bool = False
[文档] def add(self, address: bytes) -> None: """添加一个设备地址 (6 字节)""" self._addresses.add(bytes(address[:6]))
[文档] def remove(self, address: bytes) -> None: """移除一个设备地址""" self._addresses.discard(bytes(address[:6]))
[文档] def clear(self) -> None: """清空白名单""" self._addresses.clear()
[文档] def contains(self, address: bytes) -> bool: """检查地址是否在白名单中""" return bytes(address[:6]) in self._addresses
[文档] def check(self, address: bytes) -> bool: """检查是否允许该地址的接入 白名单未启用时允许所有地址; 启用后仅允许白名单内地址。 """ if not self.enabled: return True return self.contains(address)
@property def addresses(self) -> list[bytes]: return sorted(self._addresses)
# --------------------------------------------------------------------------- # 7.1.2 发现流程 # ---------------------------------------------------------------------------
[文档] @dataclass class DiscoveryManager: """发现流程管理器 (标准 7.1.2) 实现发现设备接收广播帧后的查询请求/响应流程: a) 广播设备发送基础广播帧和扩展广播帧 b) 发现设备收到可查询扩展广播帧后发送查询请求帧 c) 广播设备接收请求并发送查询响应帧 d) 发现设备接收查询响应帧, 完成发现 """ local_address: bytes = b"\x00" * 6 whitelist: AccessWhitelist = field(default_factory=AccessWhitelist) # 发现到的设备列表: {地址: (广播帧, 查询响应帧)} _discovered: dict[bytes, tuple[BroadcastFrame, BroadcastFrame | None]] = ( field(default_factory=dict) ) # 广播方: 查询请求处理器的数据存储 _query_responses: dict[bytes, BroadcastFrame] = field(default_factory=dict) # -- 发现设备端 --
[文档] def on_broadcast_received(self, frame: BroadcastFrame) -> bool: """处理收到的广播帧。 若帧中包含发现/接入资源配置且为可查询帧, 返回 True 表示可发送查询请求。 白名单启用时仅处理白名单内设备。 :param frame: 收到的广播帧。 :returns: True 表示可查询, 需要后续发送查询请求。 """ if not self.whitelist.check(frame.local_addr): return False addr = bytes(frame.local_addr) self._discovered[addr] = (frame, None) # 检查是否包含可查询的发现接入资源配置 for data_type, data_bytes in frame.data_items: if data_type == BroadcastDataType.DISCOVERY_ACCESS_RESOURCE: config = DiscoveryAccessResourceConfig.unpack(data_bytes) if config.entries: for entry in config.entries: if entry.request_type == 1: # 可查询 return True return False
[文档] def build_query_request( self, target_addr: bytes, filter_info: QueryRequestFilterInfo | None = None, upper_layer_data: bytes = b"", ) -> BroadcastFrame: """构造查询请求帧 (阶段 b)。 :param target_addr: 目标广播设备地址。 :param filter_info: 查询过滤信息 (按服务UUID过滤)。 :param upper_layer_data: 高层广播数据。 :returns: 查询请求广播帧。 """ data_items: list[tuple[int, bytes]] = [] if filter_info is not None: data_items.append(( BroadcastDataType.QUERY_REQUEST_FILTER, filter_info.pack(), )) if upper_layer_data: data_items.append(( BroadcastDataType.UPPER_LAYER_DATA, upper_layer_data, )) return BroadcastFrame( structure_indication=0x10, local_addr_type=0, peer_addr_type=0, local_addr=self.local_address, irk_id=0, peer_addr=target_addr, data_items=data_items, )
[文档] def handle_query_response( self, frame: BroadcastFrame, ) -> bool: """处理查询响应帧 (阶段 d)。 :param frame: 收到的查询响应帧。 :returns: True 表示发现完成。 """ addr = bytes(frame.local_addr) if addr in self._discovered: original, _ = self._discovered[addr] self._discovered[addr] = (original, frame) return True return False
# -- 广播设备端 --
[文档] def handle_query_request( self, request_frame: BroadcastFrame, all_services_data: bytes = b"", ) -> BroadcastFrame: """处理查询请求并构造查询响应帧 (阶段 c)。 :param request_frame: 收到的查询请求帧。 :param all_services_data: 本设备支持的所有服务数据。 :returns: 查询响应广播帧。 """ requester_addr = bytes(request_frame.local_addr) # 判断是否携带查询过滤信息 response_data = all_services_data for data_type, _data_bytes in request_frame.data_items: if data_type == BroadcastDataType.QUERY_REQUEST_FILTER: # 有过滤信息时, 应仅返回过滤指定的服务数据 # 此处简化处理: 返回全部数据, 由上层做更精确过滤 break data_items: list[tuple[int, bytes]] = [] if response_data: data_items.append(( BroadcastDataType.UPPER_LAYER_DATA, response_data, )) response = BroadcastFrame( structure_indication=0x10, local_addr_type=0, peer_addr_type=0, local_addr=self.local_address, irk_id=0, peer_addr=requester_addr, data_items=data_items, ) self._query_responses[requester_addr] = response return response
@property def discovered_devices(self) -> dict[bytes, tuple[BroadcastFrame, BroadcastFrame | None]]: """返回已发现的设备及其广播帧/查询响应帧。""" return dict(self._discovered)
[文档] def clear(self) -> None: """清空发现结果。""" self._discovered.clear() self._query_responses.clear()
[文档] def negotiate_gt_role( broadcaster_pref: int, broadcaster_negotiable: bool, initiator_pref: int, initiator_negotiable: bool, ) -> NegotiatedRole: """根据标准 7.1.3 执行 GT 角色协商。 双方各表达角色偏好 (0=T节点, 1=G节点) 和可协商标志。 冲突时默认: 发起方→G节点, 广播方→T节点。 :param broadcaster_pref: 广播方角色偏好 (0=T, 1=G)。 :param broadcaster_negotiable: 广播方角色是否可协商。 :param initiator_pref: 发起方 (接入方) 角色偏好。 :param initiator_negotiable: 发起方角色是否可协商。 :returns: 协商结果, 从发起方视角: local_role 为发起方角色。 """ # 若双方偏好一致 (都想当 G 或都想当 T), 存在冲突 if broadcaster_pref == initiator_pref: # 均不可协商 → 默认分配: 发起方=G, 广播方=T if not broadcaster_negotiable and not initiator_negotiable: return NegotiatedRole( local_role=Role.G_NODE, peer_role=Role.T_NODE, negotiated=False, ) # 广播方可协商 → 发起方保持偏好 if broadcaster_negotiable: if initiator_pref == 1: return NegotiatedRole(Role.G_NODE, Role.T_NODE, True) return NegotiatedRole(Role.T_NODE, Role.G_NODE, True) # 发起方可协商 → 广播方保持偏好 if broadcaster_pref == 1: return NegotiatedRole(Role.T_NODE, Role.G_NODE, True) return NegotiatedRole(Role.G_NODE, Role.T_NODE, True) # 偏好互补: 一方想 G, 另一方想 T → 各取所好 if initiator_pref == 1: return NegotiatedRole(Role.G_NODE, Role.T_NODE, True) return NegotiatedRole(Role.T_NODE, Role.G_NODE, True)
# ── 广播方接入管理 ──
[文档] @dataclass class BroadcasterAccessManager: """广播方接入管理器 (标准 7.1.3 阶段 a/c)。 职责: - 构造可接入扩展广播帧 - 处理收到的接入请求 - 生成接入响应 - 完成 GT 角色协商 """ config: AccessConfig = field(default_factory=AccessConfig) link_manager: LinkManager = field(default_factory=LinkManager) local_address: bytes = b"\x00" * 6 # 链路配置 (用于 AccessBasicInfo 或 TransportIndicationInfo) use_smf: bool = True smf_baseline_slot: int = 0 smf_offset: int = 300 smf_link_id: int = 0x00000001 smf_period: int = 800 smf_frame_type: int = 2 smf_bandwidth: int = 0 smf_pilot_density: int = 0 access_link_id: int = 0x000001 access_period: int = 40 access_timeout: int = 50 sleep_clock_accuracy: int = 7 access_crc_type: int = 0 access_crc_init: int = 0 hop_map: bytes = b"\xFF" * 10 smf_channel_table: bytes = b"\x00\x01\x02" whitelist: AccessWhitelist = field(default_factory=AccessWhitelist) # 广播间隔 (标准 7.1.1, 固定时间间隔, 单位 μs) adv_interval_us: int = 100_000 # 默认 100 ms # 内部状态 _phase: AccessPhase = AccessPhase.IDLE _pending_requests: list[dict[str, Any]] = field(default_factory=list) def __post_init__(self) -> None: if not MIN_ADV_INTERVAL_US <= self.adv_interval_us <= MAX_ADV_INTERVAL_US: raise ValueError( f"adv_interval_us={self.adv_interval_us} 超出标准范围 " f"[{MIN_ADV_INTERVAL_US}, {MAX_ADV_INTERVAL_US}] (§7.1.1)" )
[文档] def next_adv_delay_us(self) -> int: """计算下一次广播的总间隔 (§7.1.1)。 返回固定间隔加 [0, 2ms] 随机延迟, 单位 μs。 """ return self.adv_interval_us + random.randint(0, MAX_ADV_RANDOM_DELAY_US)
[文档] def build_ext_adv_frame(self) -> BroadcastFrame: """构建可接入扩展广播帧 (阶段 a)。 包含发现接入资源配置信息 (7.1.4.2)。 :returns: 填充好的 BroadcastFrame 对象。 """ gt_neg = GTNegotiation.NEGOTIATE_G if self.config.gt_preference == 1: if self.config.gt_negotiable: gt_neg = GTNegotiation.NEGOTIATE_G else: gt_neg = GTNegotiation.FIXED_G else: if self.config.gt_negotiable: gt_neg = GTNegotiation.NEGOTIATE_T else: gt_neg = GTNegotiation.FIXED_T discovery_config = DiscoveryAccessResourceConfig( request_offset=self.config.request_offset_us, request_max_length=self.config.request_max_length, response_offset=self.config.response_offset_us, gt_negotiation=gt_neg, entry_count=self.config.window_count, entries=[ DiscoveryAccessEntry( request_type=1, carry_info_indication=0, peer_addr_type=0, addr_present=0, peer_addr=b"", ) for _ in range(self.config.window_count) ], ) frame = BroadcastFrame( structure_indication=0x11, local_addr_type=0, peer_addr_type=0, local_addr=self.local_address, irk_id=0, peer_addr=b"\x00" * 6, data_items=[ (BroadcastDataType.DISCOVERY_ACCESS_RESOURCE, discovery_config.pack()), ], ) self._phase = AccessPhase.ADV_SENDING return frame
[文档] def handle_access_request( self, request_data: bytes, peer_address: bytes = b"\x00" * 6, ) -> tuple[BroadcastFrame | None, bool]: """处理接入请求 (阶段 c)。 :param request_data: 接入请求帧数据。 :param peer_address: 请求方 MAC 地址。 :returns: (响应帧, 是否接受)。响应帧包含 AccessResponseInfo, 以及 (若接受且角色为 G) AccessBasicInfo 或 TransportIndicationInfo。 """ self._phase = AccessPhase.RSP_WINDOW # 白名单检查 (7.1.6) if not self.whitelist.check(peer_address): log.info("接入请求被白名单拒绝: %s", peer_address.hex()) return None, False # 解析接入请求 req = AccessRequestInfo.unpack(request_data) # GT 角色协商 initiator_gt_pref = 0 initiator_negotiable = True if req.gt_role is not None: initiator_gt_pref = req.gt_role & 0x01 initiator_negotiable = (req.gt_role & 0x02) == 0 negotiation = negotiate_gt_role( broadcaster_pref=self.config.gt_preference, broadcaster_negotiable=self.config.gt_negotiable, initiator_pref=initiator_gt_pref, initiator_negotiable=initiator_negotiable, ) # 从发起方视角看, negotiation.local_role 是发起方角色 # 广播方角色 = negotiation.peer_role broadcaster_role = negotiation.peer_role # 构建响应 response_entry = AccessResponseEntry( peer_addr=peer_address, response_type=AccessResponseType.ACCEPT, peer_addr_type=0, repeat_indication=0, ) response_info = AccessResponseInfo( entries=[response_entry], ) data_items: list[tuple[int, bytes]] = [ (BroadcastDataType.ACCESS_RESPONSE, response_info.pack()), ] # 若广播方成为 G 节点, 需在响应中携带链路配置 if broadcaster_role == Role.G_NODE and self.use_smf: access_basic = AccessBasicInfo( smf_baseline_slot=self.smf_baseline_slot, smf_offset=self.smf_offset, smf_link_id=self.smf_link_id, smf_period=self.smf_period, smf_frame_type=self.smf_frame_type, smf_bandwidth=self.smf_bandwidth, smf_pilot_density=self.smf_pilot_density, access_link_id=self.access_link_id, access_period=self.access_period, access_timeout=self.access_timeout, sleep_clock_accuracy=self.sleep_clock_accuracy, access_crc_type=self.access_crc_type, access_crc_init=self.access_crc_init, hop_map=self.hop_map, smf_channel_count=len(self.smf_channel_table), smf_channel_table=self.smf_channel_table, ) data_items.append( (BroadcastDataType.ACCESS_BASIC, access_basic.pack()) ) response_frame = BroadcastFrame( structure_indication=0x11, local_addr_type=0, peer_addr_type=0, local_addr=self.local_address, irk_id=0, peer_addr=b"\x00" * 6, data_items=data_items, ) # 驱动状态机 self.link_manager.local_address = self.local_address self.link_manager.process_event(Event( EventType.ACCESS_REQUEST_RECEIVED, data={ "accepted": True, "role": broadcaster_role, "peer_address": peer_address, }, )) self._phase = AccessPhase.COMPLETED return response_frame, True
[文档] def reject_access_request( self, peer_address: bytes = b"\x00" * 6, reason: AccessResponseType = AccessResponseType.USER_REJECT, ) -> BroadcastFrame: """拒绝接入请求。 :param peer_address: 请求方 MAC 地址。 :param reason: 拒绝原因。 :returns: 包含拒绝响应的帧。 """ response_entry = AccessResponseEntry( peer_addr=peer_address, response_type=reason, peer_addr_type=0, repeat_indication=0, ) response_info = AccessResponseInfo( entries=[response_entry], ) return BroadcastFrame( structure_indication=0x11, local_addr_type=0, peer_addr_type=0, local_addr=self.local_address, irk_id=0, peer_addr=b"\x00" * 6, data_items=[ (BroadcastDataType.ACCESS_RESPONSE, response_info.pack()), ], )
# ── 接入发起方管理 ──
[文档] @dataclass class InitiatorAccessManager: """接入发起方管理器 (标准 7.1.3 阶段 b/d)。 职责: - 解析收到的扩展广播帧 - 构造接入请求 - 处理接入响应 - 完成 GT 角色协商 """ config: AccessConfig = field(default_factory=AccessConfig) link_manager: LinkManager = field(default_factory=LinkManager) local_address: bytes = b"\x00" * 6 whitelist: AccessWhitelist = field(default_factory=AccessWhitelist) # 解析出的广播方信息 _adv_frame: BroadcastFrame | None = None _discovery_config: DiscoveryAccessResourceConfig | None = None _phase: AccessPhase = AccessPhase.IDLE _retry_count: int = 0
[文档] def process_ext_adv(self, frame: BroadcastFrame) -> bool: """处理收到的可接入扩展广播帧 (阶段 b 准备)。 解析发现接入资源配置, 提取请求窗口参数。 白名单启用时, 仅处理白名单中设备的广播帧 (7.1.6)。 :param frame: 收到的广播帧。 :returns: True 表示帧中包含有效的接入资源配置。 """ # 白名单检查 (7.1.6) if not self.whitelist.check(frame.local_addr): log.info("广播帧被白名单过滤: %s", frame.local_addr.hex()) return False self._adv_frame = frame for data_type, data_bytes in frame.data_items: if data_type == BroadcastDataType.DISCOVERY_ACCESS_RESOURCE: self._discovery_config = DiscoveryAccessResourceConfig.unpack( data_bytes ) self._phase = AccessPhase.REQ_WINDOW # 通知 link_manager self.link_manager.process_event(Event( EventType.BROADCAST_RECEIVED, data=frame, )) return True return False
[文档] def build_access_request(self) -> bytes: """构造接入请求帧数据 (阶段 b)。 根据本地角色偏好构建 AccessRequestInfo。 :returns: 接入请求帧的序列化数据。 """ gt_flag = self.config.gt_preference if not self.config.gt_negotiable: gt_flag |= 0x02 req = AccessRequestInfo( structure_indication=0xFF, # 所有字段均存在 gt_role=gt_flag, frame_support=0x0F, # 支持 FT1-4 bandwidth_support=0x07, # 支持 1M/2M/4M mcs_support=0x1FFF, # 支持所有 MCS pilot_support=0x0F, # 四种导频密度 slot_support=0x1F, # 五种调度时隙 switch_delay=0x00, # 125μs crc_support=0x03, # CRC24 + CRC32 ) # 驱动状态机 self.link_manager.process_event(Event( EventType.SEND_ACCESS_REQUEST, data={ "peer_address": self._adv_frame.local_addr if self._adv_frame else b"", "role": Role.G_NODE if self.config.gt_preference == 1 else Role.T_NODE, }, )) return req.pack()
[文档] def handle_access_response( self, response_data: bytes, ) -> tuple[Role | None, dict[str, Any]]: """处理接入响应 (阶段 d)。 :param response_data: 接入响应帧完整数据 (BroadcastFrame.pack() 格式)。 :returns: (最终角色, 链路参数字典)。角色为 None 表示接入被拒绝。 """ frame = BroadcastFrame.unpack(response_data) link_params: dict[str, Any] = {} response_accepted = False final_role: Role | None = None for data_type, data_bytes in frame.data_items: if data_type == BroadcastDataType.ACCESS_RESPONSE: resp = AccessResponseInfo.unpack(data_bytes) if resp.entries and resp.entries[0].response_type == \ AccessResponseType.ACCEPT: response_accepted = True else: response_accepted = False elif data_type == BroadcastDataType.ACCESS_BASIC: access_info = AccessBasicInfo.unpack(data_bytes) link_params["smf_baseline_slot"] = access_info.smf_baseline_slot link_params["smf_offset"] = access_info.smf_offset link_params["smf_link_id"] = access_info.smf_link_id link_params["access_link_id"] = access_info.access_link_id link_params["supervision_timeout"] = ( access_info.access_timeout * 10 ) link_params["crc_type"] = access_info.access_crc_type link_params["hop_map"] = access_info.hop_map elif data_type == BroadcastDataType.TRANSPORT_INDICATION: ti = TransportIndicationInfo.unpack(data_bytes) link_params["event_group_period"] = ti.event_group_period link_params["event_period"] = ti.event_period if response_accepted: # 确定最终角色: 响应中有 AccessBasicInfo 说明对端是 G 节点 if BroadcastDataType.ACCESS_BASIC in \ [dt for dt, _ in frame.data_items]: final_role = Role.T_NODE # 对端 G, 本端 T else: final_role = Role.G_NODE if self.config.gt_preference == 1 \ else Role.T_NODE self.link_manager.process_event(Event( EventType.ACCESS_RESPONSE_RECEIVED, data={"accepted": True, "role": final_role}, )) self._phase = AccessPhase.COMPLETED else: self._retry_count += 1 self.link_manager.process_event(Event( EventType.ACCESS_RESPONSE_RECEIVED, data={"accepted": False}, )) self._phase = AccessPhase.IDLE return final_role, link_params
@property def can_retry(self) -> bool: """是否还能重试接入。""" return self._retry_count < self.config.max_retries @property def discovery_config(self) -> DiscoveryAccessResourceConfig | None: return self._discovery_config @property def phase(self) -> AccessPhase: return self._phase
# ── 非链接态广播管理 (7.1.7.2) ──
[文档] @dataclass class NonConnectedBroadcastConfig: """非链接态广播传输配置。""" # 广播链路信息 transmission_type: int = 0 # 0=异步, 1=同步 service_adapt_mode: int = 0 # 0=周期, 1=非周期 system_slot_seq: int = 0 event_group_offset: int = 0 event_group_set_id: int = 0 event_group_count: int = 1 event_group_interval: int = 10 event_group_period: int = 160 event_period: int = 40 event_count: int = 1 sync_anchor_delay: int = 0 sync_ref_delay: int = 0 base_link_id: int = 0x000001 frame_type: int = 2 bandwidth: int = 0 pilot_density: int = 0 sdu_max: int = 128 sdu_period: int = 1000 pdu_max: int = 256 new_packet_count: int = 1 crc_type: int = 0 crc_base_init: int = 0 hop_map: bytes = b"\xFF" * 10 is_5g: bool = False # 系统管理帧信息 smf_baseline_slot: int = 0 smf_offset: int = 300 smf_access_addr: int = 0x000001 smf_period: int = 800 smf_frame_type: int = 2 smf_bandwidth: int = 0 smf_pilot_density: int = 0 smf_channel_table: bytes = b"\x00\x01\x02" # 加密参数 (可选) encrypted: bool = False giv: bytes = b"" gskd: bytes = b""
[文档] @dataclass class NonConnectedBroadcastManager: """非链接态广播管理器 (标准 7.1.7.2)。 通过携带非链接态广播信息和启动系统管理帧信息的扩展广播帧 建立非链接态广播传输。 """ config: NonConnectedBroadcastConfig = field( default_factory=NonConnectedBroadcastConfig, ) local_address: bytes = b"\x00" * 6
[文档] def build_non_connected_broadcast_frame(self) -> BroadcastFrame: """构建携带非链接态广播信息的扩展广播帧。 帧中包含两种数据: - UNLINKED_BROADCAST_LINK (0x06): NonLinkedBroadcastLinkInfo - SYSTEM_MGMT_FRAME (0x05): SystemMgmtFrameInfo :returns: 构建好的 BroadcastFrame。 """ cfg = self.config link_info = NonLinkedBroadcastLinkInfo( transmission_type=cfg.transmission_type, service_adapt_mode=cfg.service_adapt_mode, system_slot_seq=cfg.system_slot_seq, event_group_offset=cfg.event_group_offset, event_group_set_id=cfg.event_group_set_id, event_group_count=cfg.event_group_count, event_group_interval=cfg.event_group_interval, event_group_period=cfg.event_group_period, event_period=cfg.event_period, event_count=cfg.event_count, sync_anchor_delay=cfg.sync_anchor_delay, sync_ref_delay=cfg.sync_ref_delay, base_link_id=cfg.base_link_id, frame_type=cfg.frame_type, bandwidth=cfg.bandwidth, pilot_density=cfg.pilot_density, sdu_max=cfg.sdu_max, sdu_period=cfg.sdu_period, pdu_max=cfg.pdu_max, new_packet_count=cfg.new_packet_count, crc_type=cfg.crc_type, crc_base_init=cfg.crc_base_init, hop_map=cfg.hop_map, giv=cfg.giv if cfg.encrypted else b"", gskd=cfg.gskd if cfg.encrypted else b"", is_5g=cfg.is_5g, ) smf_info = SystemMgmtFrameInfo( baseline_slot=cfg.smf_baseline_slot, offset=cfg.smf_offset, access_addr=cfg.smf_access_addr, period=cfg.smf_period, frame_type=cfg.smf_frame_type, bandwidth=cfg.smf_bandwidth, pilot_density=cfg.smf_pilot_density, channel_count=len(cfg.smf_channel_table), channel_table=cfg.smf_channel_table, ) data_items: list[tuple[int, bytes]] = [ (BroadcastDataType.UNLINKED_BROADCAST_LINK, link_info.pack()), (BroadcastDataType.SYSTEM_MGMT_FRAME, smf_info.pack()), ] return BroadcastFrame( structure_indication=0x11, local_addr_type=0, peer_addr_type=0, local_addr=self.local_address, irk_id=0, peer_addr=b"\x00" * 6, data_items=data_items, )
[文档] @dataclass class NonConnectedBroadcastResult: """解析非链接态广播帧的结果。""" link_info: NonLinkedBroadcastLinkInfo smf_info: SystemMgmtFrameInfo broadcaster_addr: bytes
[文档] def parse_non_connected_broadcast( frame: BroadcastFrame, ) -> NonConnectedBroadcastResult | None: """解析扩展广播帧中的非链接态广播信息 (7.1.7.2)。 :param frame: 收到的扩展广播帧。 :returns: 解析结果; 若帧中不包含非链接态广播信息则返回 None。 """ link_info: NonLinkedBroadcastLinkInfo | None = None smf_info: SystemMgmtFrameInfo | None = None for data_type, data_bytes in frame.data_items: if data_type == BroadcastDataType.UNLINKED_BROADCAST_LINK: link_info = NonLinkedBroadcastLinkInfo.unpack(data_bytes) elif data_type == BroadcastDataType.SYSTEM_MGMT_FRAME: smf_info = SystemMgmtFrameInfo.unpack(data_bytes) if link_info is None or smf_info is None: return None return NonConnectedBroadcastResult( link_info=link_info, smf_info=smf_info, broadcaster_addr=frame.local_addr, )
# ── 端到端接入流程 (仿真/测试用) ──
[文档] def run_access_procedure( broadcaster_addr: bytes = b"\x01\x02\x03\x04\x05\x06", initiator_addr: bytes = b"\x0A\x0B\x0C\x0D\x0E\x0F", broadcaster_config: AccessConfig | None = None, initiator_config: AccessConfig | None = None, broadcaster_use_smf: bool = True, ) -> tuple[BroadcasterAccessManager, InitiatorAccessManager]: """执行完整的端到端接入流程 (标准 7.1.3 阶段 a-e)。 用于仿真和集成测试, 不涉及实际射频传输。 :param broadcaster_addr: 广播方 MAC 地址。 :param initiator_addr: 发起方 MAC 地址。 :param broadcaster_config: 广播方配置。 :param initiator_config: 发起方配置。 :param broadcaster_use_smf: 是否使用系统管理帧模式。 :returns: (广播方管理器, 发起方管理器) 二元组, 两者的 link_manager 在成功时均处于 CONNECTED 状态。 """ if broadcaster_config is None: broadcaster_config = AccessConfig() if initiator_config is None: initiator_config = AccessConfig() # 初始化广播方: IDLE → BROADCASTING b_mgr = BroadcasterAccessManager( config=broadcaster_config, local_address=broadcaster_addr, use_smf=broadcaster_use_smf, ) b_mgr.link_manager.local_address = broadcaster_addr b_mgr.link_manager.process_event(Event(EventType.START_BROADCAST)) # 初始化发起方: IDLE → SCANNING i_mgr = InitiatorAccessManager( config=initiator_config, local_address=initiator_addr, ) i_mgr.link_manager.local_address = initiator_addr i_mgr.link_manager.process_event(Event(EventType.START_SCAN)) # 阶段 a: 广播方构建扩展广播帧 ext_adv_frame = b_mgr.build_ext_adv_frame() # 阶段 b: 发起方收到广播帧, 解析并发送接入请求 i_mgr.process_ext_adv(ext_adv_frame) request_data = i_mgr.build_access_request() # 阶段 c: 广播方收到请求, 生成响应 response_frame, _accepted = b_mgr.handle_access_request( request_data, peer_address=initiator_addr, ) # 阶段 d/e: 发起方收到响应 if response_frame is not None: response_bytes = response_frame.pack() i_mgr.handle_access_response(response_bytes) return b_mgr, i_mgr