"""SLE 节点实体 -- 统一收发接口。
将 MAC/PHY 各模块整合为单一 SLE 节点, 提供完整的链路生命周期管理
和数据收发能力。支持仿真模式和 USRP 硬件模式。
"""
from __future__ import annotations
__all__ = [
"NodeCallback",
"NodeConfig",
"NodeRole",
"NodeState",
"RxResult",
"SleNode",
"TransportMode",
"TxResult",
]
import logging
from dataclasses import dataclass, field
from enum import IntEnum, auto
from typing import TYPE_CHECKING
import numpy as np
from cryptography.exceptions import InvalidTag
from nearlink_sdr.mac.access import (
AccessWhitelist,
BroadcasterAccessManager,
DiscoveryManager,
InitiatorAccessManager,
NonConnectedBroadcastConfig,
NonConnectedBroadcastManager,
run_access_procedure,
)
from nearlink_sdr.mac.broadcast import BroadcastFilter, BroadcastFrame
from nearlink_sdr.mac.frame import AsyncDataFrame, ControlFrame
from nearlink_sdr.mac.link_manager import (
DisconnectReason,
Event,
EventType,
LinkManager,
LinkManagerCallback,
LinkParams,
LinkState,
Role,
)
from nearlink_sdr.mac.power_control import PowerController
from nearlink_sdr.mac.qos import (
ArqState,
FlowController,
HarqController,
LinkQualityTracker,
LinkType,
Priority,
QosManager,
TxDecision,
TxQueue,
)
from nearlink_sdr.mac.scheduler import (
EventTimingParams,
ScheduleManager,
SmfScheduleConfig,
)
from nearlink_sdr.mac.security_manager import (
FrameCryptoContext,
PairingManager,
PairingState,
)
from nearlink_sdr.mac.smf_scheduler import SMFScheduleParams, SMFTransmitScheduler
from nearlink_sdr.phy.channel import ChannelConfig, ChannelModel
from nearlink_sdr.phy.control_info import ControlInfoA2
from nearlink_sdr.phy.data_link import AsyncDataLinkParams, SyncDataLinkParams
from nearlink_sdr.phy.freq_hopping import (
FreqTable,
data_link_hop,
derive_hop_param2,
)
from nearlink_sdr.phy.mac_interface import (
MacRxResult,
iq_to_mac,
mac_to_iq,
)
from nearlink_sdr.phy.measurement import measurement_signal_1
from nearlink_sdr.phy.sdr_backend import SDRConfig, SDRDevice, create_device
from nearlink_sdr.phy.tx_pipeline import TxConfig
from nearlink_sdr.phy.usrp import SLETransceiver, USRPConfig, USRPDevice
if TYPE_CHECKING:
pass
log = logging.getLogger(__name__)
# ── 节点配置 ──
[文档]
class NodeRole(IntEnum):
"""节点初始角色偏好。"""
AUTO = 0
G_NODE = 1
T_NODE = 2
[文档]
class TransportMode(IntEnum):
"""传输模式。"""
SIMULATION = auto()
USRP = auto()
[文档]
@dataclass
class NodeConfig:
"""SLE 节点配置。
:ivar address: 6 字节设备地址。
:ivar role: 角色偏好 (AUTO/G_NODE/T_NODE)。
:ivar frame_type: 帧类型 (1-4)。
:ivar mcs_index: MCS 索引 (0-12)。
:ivar bandwidth_mhz: 信道带宽 (1/2/4 MHz)。
:ivar pilot_interval: 导频插入间隔 (0/4/8/16)。
:ivar max_pdu: 最大 PDU 长度 (字节)。
:ivar max_retransmit: 最大重传次数。
:ivar enable_encryption: 是否启用加密。
:ivar transport: 传输模式。
:ivar band: 频段标识。
:ivar hop_param2: 跳频参数 2 (0 表示自动从地址派生)。
:ivar blocked_channels: 被阻塞的信道号集合。
:ivar tx_power_dbm: 初始发射功率 (dBm)。
:ivar max_power_dbm: 最大发射功率。
:ivar min_power_dbm: 最小发射功率。
:ivar channel_config: 仿真模式下的信道模型配置 (None 表示理想信道)。
:ivar sdr_config: SDR 硬件配置 (transport=USRP 时使用, 支持 mock/uhd/pluto 后端)。
:ivar usrp_config: (已废弃) USRP 配置, 保留向后兼容; 优先使用 sdr_config。
:ivar smf_enabled: 是否启用系统管理帧调度。
"""
address: bytes = b"\x00" * 6
role: NodeRole = NodeRole.AUTO
frame_type: int = 2
mcs_index: int = 7
bandwidth_mhz: int = 1
pilot_interval: int = 8
max_pdu: int = 256
max_retransmit: int = 3
enable_encryption: bool = False
transport: TransportMode = TransportMode.SIMULATION
band: str = "2400"
hop_param2: int = 0
blocked_channels: set[int] = field(default_factory=set)
tx_power_dbm: float = 0.0
max_power_dbm: float = 20.0
min_power_dbm: float = -127.0
channel_config: ChannelConfig | None = None
sdr_config: SDRConfig | None = None
usrp_config: USRPConfig | None = None
smf_enabled: bool = False
# ── 节点状态 ──
[文档]
class NodeState(IntEnum):
"""节点高层状态, 简化自 LinkState。"""
IDLE = 0
ADVERTISING = auto()
SCANNING = auto()
CONNECTING = auto()
PAIRED = auto()
CONNECTED = auto()
DISCONNECTED = auto()
# ── 节点回调 ──
[文档]
class NodeCallback:
"""节点事件回调, 子类可覆盖。"""
[文档]
def on_state_changed(self, old: NodeState, new: NodeState) -> None:
pass
[文档]
def on_connected(self, peer_address: bytes, role: Role) -> None:
pass
[文档]
def on_data_received(self, data: bytes) -> None:
pass
[文档]
def on_disconnected(self, reason: DisconnectReason) -> None:
pass
[文档]
def on_broadcast_received(self, frame: BroadcastFrame) -> None:
pass
[文档]
def on_discovery_complete(self, devices: dict) -> None:
pass
[文档]
def on_measurement_result(self, result: dict) -> None:
pass
# ── 收发结果 ──
[文档]
@dataclass
class TxResult:
"""发射结果。"""
iq: np.ndarray | None
mac_bytes: bytes | None
decision: TxDecision
encrypted: bool = False
channel: int = -1
[文档]
@dataclass
class RxResult:
"""接收结果。"""
data: bytes | None
success: bool
decrypted: bool = False
# ── SLE 节点实体 ──
[文档]
@dataclass
class SleNode:
"""SparkLink SLE 节点实体。
整合链路管理、安全、QoS、PHY 流水线、跳频、功率控制、
时序调度、接入流程、信道模型为统一收发接口。
用法::
node = SleNode(NodeConfig(address=b"\\x01\\x02\\x03\\x04\\x05\\x06"))
node.send(b"hello", Priority.NORMAL)
tx_result = node.transmit()
# ... 通过信道传输 tx_result.iq ...
rx_result = node.receive(rx_iq, n_bytes)
"""
config: NodeConfig = field(default_factory=NodeConfig)
callback: NodeCallback = field(default_factory=NodeCallback)
# 内部组件 (延迟初始化)
_link_mgr: LinkManager = field(init=False, repr=False)
_qos: QosManager = field(init=False, repr=False)
_tx_config: TxConfig = field(init=False, repr=False)
_pairing: PairingManager | None = field(init=False, default=None, repr=False)
_crypto: FrameCryptoContext | None = field(init=False, default=None, repr=False)
_state: NodeState = field(init=False, default=NodeState.IDLE)
_peer_address: bytes = field(init=False, default=b"")
_tx_count: int = field(init=False, default=0)
_rx_count: int = field(init=False, default=0)
# 跳频管理
_freq_table: FreqTable = field(init=False, repr=False)
_hop_param2: int = field(init=False, default=0)
# 功率控制
_power_ctrl: PowerController = field(init=False, repr=False)
# 时序调度
_scheduler: ScheduleManager = field(init=False, repr=False)
# SMF 调度
_smf_scheduler: SMFTransmitScheduler | None = field(
init=False, default=None, repr=False,
)
# 接入流程
_whitelist: AccessWhitelist = field(init=False, repr=False)
_discovery: DiscoveryManager = field(init=False, repr=False)
_broadcast_filter: BroadcastFilter = field(init=False, repr=False)
_broadcaster_mgr: BroadcasterAccessManager | None = field(
init=False, default=None, repr=False,
)
_initiator_mgr: InitiatorAccessManager | None = field(
init=False, default=None, repr=False,
)
# 数据链路参数
_data_link_params: AsyncDataLinkParams | SyncDataLinkParams | None = field(
init=False, default=None, repr=False,
)
# 信道模型 (仿真模式)
_channel: ChannelModel | None = field(init=False, default=None, repr=False)
# SDR 硬件设备
_sdr_device: SDRDevice | None = field(init=False, default=None, repr=False)
# 旧版 USRP 收发器 (向后兼容)
_transceiver: SLETransceiver | None = field(
init=False, default=None, repr=False,
)
def __post_init__(self) -> None:
cfg = self.config
self._tx_config = TxConfig(
frame_type=cfg.frame_type,
mcs_index=cfg.mcs_index,
pid=int.from_bytes(cfg.address[:3], "big") if len(cfg.address) >= 3 else 0,
whitening_seed=0x52,
crc_seed=0x555555,
crc_len=24,
ctrl_bits_len=self._ctrl_bits_len(),
pilot_interval=cfg.pilot_interval,
)
link_type = LinkType.ASYNC if cfg.frame_type == 2 else LinkType.SYNC
self._qos = QosManager(
arq=ArqState(link_type=link_type, max_retransmit=cfg.max_retransmit),
harq=HarqController(),
flow=FlowController(),
quality=LinkQualityTracker(),
tx_queue=TxQueue(max_size=cfg.max_pdu),
)
self._link_mgr = LinkManager(
local_address=cfg.address,
params=LinkParams(frame_type=cfg.frame_type, bandwidth=cfg.bandwidth_mhz),
callback=_InternalCallback(self),
)
# 跳频
self._freq_table = FreqTable(
band=cfg.band,
bandwidth_mhz=cfg.bandwidth_mhz,
blocked_channels=set(cfg.blocked_channels),
)
self._hop_param2 = (
cfg.hop_param2 or derive_hop_param2(int.from_bytes(cfg.address, "big"))
)
# 功率控制
self._power_ctrl = PowerController(
tx_power_dbm=cfg.tx_power_dbm,
min_power_dbm=cfg.min_power_dbm,
max_power_dbm=cfg.max_power_dbm,
)
# 时序调度
self._scheduler = ScheduleManager()
# SMF 调度
if cfg.smf_enabled:
self._smf_scheduler = SMFTransmitScheduler()
# 接入流程
self._whitelist = AccessWhitelist()
self._discovery = DiscoveryManager(
local_address=cfg.address,
whitelist=self._whitelist,
)
self._broadcast_filter = BroadcastFilter()
# 数据链路参数
if cfg.frame_type == 2:
self._data_link_params = AsyncDataLinkParams()
else:
self._data_link_params = SyncDataLinkParams()
# 信道模型 (仿真模式)
if cfg.transport == TransportMode.SIMULATION and cfg.channel_config is not None:
self._channel = ChannelModel(config=cfg.channel_config)
# SDR 硬件
if cfg.transport == TransportMode.USRP:
if cfg.sdr_config is not None:
self._sdr_device = create_device(cfg.sdr_config)
elif cfg.usrp_config is not None:
# 向后兼容: 旧版 USRPConfig -> SLETransceiver
device = USRPDevice(config=cfg.usrp_config, use_mock=True)
self._transceiver = SLETransceiver(device)
def _ctrl_bits_len(self) -> int:
ft = self.config.frame_type
if ft == 1:
return 20
if ft == 2:
return 28
return 27
# ── 生命周期管理 ──
@property
def state(self) -> NodeState:
return self._state
@property
def link_state(self) -> LinkState:
return self._link_mgr.state
@property
def role(self) -> Role:
return self._link_mgr.role
@property
def peer_address(self) -> bytes:
return self._peer_address
@property
def is_connected(self) -> bool:
return self._state in (NodeState.CONNECTED, NodeState.PAIRED)
[文档]
def start_advertising(self) -> BroadcastFrame | None:
"""进入广播态, 构建并返回扩展广播帧。"""
self._link_mgr.process_event(Event(EventType.START_BROADCAST))
self._set_state(NodeState.ADVERTISING)
self._broadcaster_mgr = BroadcasterAccessManager(
link_manager=self._link_mgr,
local_address=self.config.address,
whitelist=self._whitelist,
)
return self._broadcaster_mgr.build_ext_adv_frame()
[文档]
def start_scanning(self) -> None:
"""进入扫描态, 搜索广播节点。"""
self._link_mgr.process_event(Event(EventType.START_SCAN))
self._set_state(NodeState.SCANNING)
self._initiator_mgr = InitiatorAccessManager(
link_manager=self._link_mgr,
local_address=self.config.address,
whitelist=self._whitelist,
)
[文档]
def on_broadcast_received(self, frame: BroadcastFrame) -> bool:
"""处理收到的广播帧。
在扫描态时, 通过发现管理器和广播过滤器处理帧。
:returns: True 表示该帧包含有效的接入资源配置。
"""
if not self._broadcast_filter.match(frame):
return False
self._discovery.on_broadcast_received(frame)
self.callback.on_broadcast_received(frame)
if self._initiator_mgr is not None:
return self._initiator_mgr.process_ext_adv(frame)
return False
[文档]
def connect(self, peer_address: bytes) -> None:
"""发起接入请求。"""
self._peer_address = peer_address
self._link_mgr.process_event(Event(EventType.SEND_ACCESS_REQUEST))
self._set_state(NodeState.CONNECTING)
[文档]
def accept_connection(self, peer_address: bytes, role: Role = Role.G_NODE) -> None:
"""接受接入请求, 直接进入链接态。
接入完成后自动注册链路调度资源。
"""
self._peer_address = peer_address
self._link_mgr.peer_address = peer_address
self._link_mgr.process_event(
Event(EventType.ACCESS_REQUEST_RECEIVED, {"accepted": True, "role": role})
)
self._set_state(NodeState.CONNECTED)
self._register_link_schedule(link_id=0)
[文档]
def run_access(
self,
peer_address: bytes,
is_broadcaster: bool = True,
) -> tuple[BroadcasterAccessManager, InitiatorAccessManager]:
"""执行完整接入编排流程。"""
b_mgr, i_mgr = run_access_procedure(
broadcaster_addr=self.config.address if is_broadcaster else peer_address,
initiator_addr=peer_address if is_broadcaster else self.config.address,
)
self._peer_address = peer_address
self._set_state(NodeState.CONNECTED)
self._link_mgr.process_event(
Event(EventType.ACCESS_REQUEST_RECEIVED, {"accepted": True, "role": Role.G_NODE})
)
self._register_link_schedule(link_id=0)
return b_mgr, i_mgr
[文档]
def disconnect(self) -> None:
"""断开连接。"""
self._link_mgr.process_event(Event(EventType.DISCONNECT_REQUEST))
self._set_state(NodeState.DISCONNECTED)
self._cleanup()
[文档]
def reset(self) -> None:
"""重置到初始状态。"""
if self._link_mgr.state in (LinkState.CONNECTED, LinkState.PAIRING):
self._link_mgr.process_event(Event(EventType.DISCONNECT_REQUEST))
if self._link_mgr.state != LinkState.IDLE:
self._link_mgr.reset()
self._set_state(NodeState.IDLE)
self._cleanup()
self.__post_init__()
# ── 安全配对 ──
[文档]
def start_pairing(self, peer_address: bytes) -> list:
"""发起配对流程。
:returns: 待发送给对端的配对信令列表。
"""
is_g = self._link_mgr.role == Role.G_NODE
self._pairing = PairingManager(
is_g_node=is_g,
local_address=self.config.address,
peer_address=peer_address,
)
self._link_mgr.process_event(Event(EventType.START_PAIRING))
self._set_state(NodeState.PAIRED)
return self._pairing.start_pairing()
[文档]
def process_pairing_message(self, msg: object) -> list:
"""处理收到的配对信令。
:returns: 待发送的响应信令列表。
"""
if self._pairing is None:
return []
responses = self._pairing.process_message(msg)
if self._pairing.state == PairingState.COMPLETED:
self._setup_crypto()
self._link_mgr.process_event(Event(EventType.PAIRING_COMPLETE))
self._set_state(NodeState.CONNECTED)
elif self._pairing.state == PairingState.FAILED:
self._link_mgr.process_event(Event(EventType.PAIRING_FAILED))
self._set_state(NodeState.DISCONNECTED)
return responses
def _setup_crypto(self) -> None:
if self._pairing is None or not self._pairing.is_paired:
return
is_g = self._link_mgr.role == Role.G_NODE
ik = self._pairing.integrity_key
iv_base = ik[:8] if ik else self._pairing.session_key[:8]
self._crypto = FrameCryptoContext(
session_key=self._pairing.session_key,
iv_base=iv_base,
direction=0 if is_g else 1,
mic_len=4,
frame_type=self.config.frame_type,
link_id=0,
)
# ── 数据发送 ──
[文档]
def send(self, data: bytes, priority: Priority = Priority.NORMAL) -> bool:
"""提交数据到发送队列。"""
return self._qos.submit_data(data, priority)
[文档]
def transmit(self) -> TxResult:
"""从发送队列取出数据, 构建帧并生成 IQ 信号。
自动选择当前跳频信道, 更新发射功率。
"""
decision, item = self._qos.prepare_tx()
if item is None:
return TxResult(iq=None, mac_bytes=None, decision=decision)
payload = item.data
encrypted = False
if self._crypto is not None and self.config.enable_encryption:
aad = b""
ciphertext, mic = self._crypto.encrypt(payload, aad)
payload = ciphertext + mic
encrypted = True
frame = AsyncDataFrame(segment_type=0, data=payload)
mac_bytes = frame.pack()
ctrl_fields = self._qos.get_ctrl_fields()
ctrl_info = _build_ctrl_info(ctrl_fields, len(mac_bytes))
iq = mac_to_iq(mac_bytes, self._tx_config, ctrl_info=ctrl_info)
# 跳频: 计算当前信道
slot = self._scheduler.slot_counter.value
channel = data_link_hop(slot, self._hop_param2, self._freq_table)
# SDR/USRP 模式: 通过硬件发送
if self._sdr_device is not None:
try:
self._sdr_device.transmit(iq)
except (OSError, RuntimeError):
log.warning("SDR 发送失败")
elif self._transceiver is not None:
try:
self._transceiver.transmit_iq(iq)
except (OSError, RuntimeError):
log.warning("USRP 发送失败")
self._tx_count += 1
return TxResult(
iq=iq, mac_bytes=mac_bytes, decision=decision,
encrypted=encrypted, channel=channel,
)
[文档]
def receive(self, iq_signal: np.ndarray, n_mac_bytes: int) -> RxResult:
"""从 IQ 信号解码数据。
如果配置了信道模型, 会在解码前应用信道效应。
"""
# 信道模型
if self._channel is not None:
iq_signal = self._channel.apply_fading(iq_signal)
rx: MacRxResult = iq_to_mac(iq_signal, self._tx_config, n_mac_bytes)
if not rx.crc_ok:
return RxResult(data=None, success=False)
try:
recovered = AsyncDataFrame.unpack(rx.mac_payload)
except (ValueError, IndexError):
return RxResult(data=None, success=False)
data = recovered.data
decrypted = False
if self._crypto is not None and self.config.enable_encryption:
mic_len = self._crypto.mic_len
if len(data) < mic_len:
return RxResult(data=None, success=False)
ciphertext = data[:-mic_len]
mic = data[-mic_len:]
try:
data = self._crypto.decrypt(ciphertext, mic, b"")
decrypted = True
except (ValueError, InvalidTag):
return RxResult(data=None, success=False)
self._rx_count += 1
self.callback.on_data_received(data)
return RxResult(data=data, success=True, decrypted=decrypted)
[文档]
def process_feedback(self, crc_ok: bool) -> TxDecision:
"""处理对端 ACK/NACK 反馈。"""
return self._qos.on_tx_feedback(crc_ok)
# ── 跳频 ──
@property
def freq_table(self) -> FreqTable:
return self._freq_table
@property
def current_channel(self) -> int:
"""当前跳频信道号。"""
return data_link_hop(
self._scheduler.slot_counter.value,
self._hop_param2,
self._freq_table,
)
[文档]
def block_channel(self, channel: int) -> None:
"""阻塞指定信道。"""
self._freq_table.blocked_channels.add(channel)
[文档]
def unblock_channel(self, channel: int) -> None:
"""解除信道阻塞。"""
self._freq_table.blocked_channels.discard(channel)
# ── 功率控制 ──
@property
def power_controller(self) -> PowerController:
return self._power_ctrl
@property
def tx_power_dbm(self) -> float:
return self._power_ctrl.tx_power_dbm
[文档]
def adjust_power(self, delta_db: float) -> float:
"""调整发射功率, 返回调整后的功率值。"""
req = self._power_ctrl.create_request(int(delta_db))
resp = self._power_ctrl.handle_request(req)
self._power_ctrl.apply_response(resp)
return self._power_ctrl.tx_power_dbm
# ── 时序调度 ──
@property
def scheduler(self) -> ScheduleManager:
return self._scheduler
[文档]
def advance_slot(self, n: int = 1) -> int:
"""推进时隙计数器。"""
return self._scheduler.advance_time(n)
def _register_link_schedule(self, link_id: int = 0) -> None:
"""接入完成后注册默认链路调度资源。"""
timing = EventTimingParams()
self._scheduler.register_link(link_id, timing)
# ── SMF ──
@property
def smf_scheduler(self) -> SMFTransmitScheduler | None:
return self._smf_scheduler
# ── 接入白名单与发现 ──
@property
def whitelist(self) -> AccessWhitelist:
return self._whitelist
@property
def discovery(self) -> DiscoveryManager:
return self._discovery
@property
def discovered_devices(self) -> dict:
return self._discovery.discovered_devices
[文档]
def set_broadcast_filter(self, broadcast_filter: BroadcastFilter) -> None:
"""设置广播帧过滤器。"""
self._broadcast_filter = broadcast_filter
# ── 非链接态广播 ──
[文档]
def start_non_connected_broadcast(
self,
nc_config: NonConnectedBroadcastConfig | None = None,
) -> BroadcastFrame | None:
"""发送非链接态广播数据帧。"""
if nc_config is None:
nc_config = NonConnectedBroadcastConfig()
mgr = NonConnectedBroadcastManager(
config=nc_config,
local_address=self.config.address,
)
return mgr.build_non_connected_broadcast_frame()
# ── 信道模型 ──
@property
def channel_model(self) -> ChannelModel | None:
return self._channel
[文档]
def set_channel_model(self, config: ChannelConfig) -> None:
"""配置仿真信道模型。"""
self._channel = ChannelModel(config=config)
[文档]
def clear_channel_model(self) -> None:
"""清除信道模型 (恢复理想信道)。"""
self._channel = None
# ── SDR 硬件 ──
@property
def sdr_device(self) -> SDRDevice | None:
return self._sdr_device
@property
def transceiver(self) -> SLETransceiver | None:
return self._transceiver
[文档]
def open_transceiver(self, rx_buf_size: int = 4096) -> None:
"""打开 SDR/USRP 硬件收发器。"""
if self._transceiver is not None:
self._transceiver.open(rx_buf_size)
[文档]
def close_transceiver(self) -> None:
"""关闭 SDR/USRP 硬件收发器。"""
if self._sdr_device is not None:
self._sdr_device.close()
if self._transceiver is not None:
self._transceiver.close()
[文档]
def receive_iq(self, num_samps: int) -> np.ndarray | None:
"""从 SDR/USRP 接收 IQ 采样。"""
if self._sdr_device is not None:
return self._sdr_device.receive(num_samps)
if self._transceiver is None:
return None
return self._transceiver.receive_iq(num_samps)
# ── 测量 ──
[文档]
def generate_measurement_signal(
self, n_measur: int = 64, security_type: int = 1,
) -> np.ndarray:
"""生成窄带测量信号 (标准 6.2.4)。"""
return measurement_signal_1(n_measur, security_type)
# ── 数据链路参数 ──
@property
def data_link_params(self) -> AsyncDataLinkParams | SyncDataLinkParams | None:
return self._data_link_params
# ── 信令 ──
[文档]
def send_signaling(self, msg: object) -> ControlFrame | None:
"""通过链路管理器发送控制面信令。"""
if not self.is_connected:
return None
return self._link_mgr.send_signaling(msg)
[文档]
def receive_signaling(self, msg: object) -> None:
"""处理收到的控制面信令。"""
self._link_mgr.process_event(Event(EventType.SIGNALING_RECEIVED, msg))
# ── 状态查询 ──
@property
def stats(self) -> dict:
"""返回节点统计信息。"""
qos_fields = self._qos.get_ctrl_fields()
return {
"state": self._state.name,
"role": self._link_mgr.role.name,
"tx_count": self._tx_count,
"rx_count": self._rx_count,
"tx_sn": qos_fields.get("tx_sn", 0),
"rx_sn": qos_fields.get("rx_sn", 0),
"fer": self._qos.quality.fer,
"mcs": self.config.mcs_index,
"queue_size": self._qos.tx_queue.size,
"flow_paused": self._qos.flow.is_paused,
"paired": self._pairing is not None and self._pairing.is_paired,
"encrypted": self._crypto is not None,
"channel": self.current_channel,
"tx_power_dbm": self._power_ctrl.tx_power_dbm,
"hop_param2": self._hop_param2,
}
@property
def recommended_mcs(self) -> int:
"""基于链路质量建议的 MCS 索引。"""
adj = self._qos.quality.suggest_mcs_adjustment()
return max(0, min(12, self.config.mcs_index + adj))
[文档]
def update_mcs(self, mcs_index: int) -> None:
"""更新 MCS 索引。"""
self.config.mcs_index = max(0, min(12, mcs_index))
self._tx_config = TxConfig(
frame_type=self.config.frame_type,
mcs_index=self.config.mcs_index,
pid=self._tx_config.pid,
whitening_seed=self._tx_config.whitening_seed,
crc_seed=self._tx_config.crc_seed,
crc_len=self._tx_config.crc_len,
ctrl_bits_len=self._tx_config.ctrl_bits_len,
pilot_interval=self._tx_config.pilot_interval,
)
# ── 内部方法 ──
def _set_state(self, new: NodeState) -> None:
old = self._state
if old != new:
self._state = new
self.callback.on_state_changed(old, new)
def _cleanup(self) -> None:
self._peer_address = b""
self._pairing = None
self._crypto = None
self._broadcaster_mgr = None
self._initiator_mgr = None
class _InternalCallback(LinkManagerCallback):
"""将 LinkManager 回调转发到 SleNode。"""
def __init__(self, node: SleNode) -> None:
self._node = node
def on_access_accepted(self, role: Role) -> None:
self._node.callback.on_connected(self._node.peer_address, role)
def on_disconnected(self, reason: DisconnectReason) -> None:
self._node._set_state(NodeState.DISCONNECTED)
self._node.callback.on_disconnected(reason)
def _build_ctrl_info(
ctrl_fields: dict[str, int], data_length: int,
) -> ControlInfoA2:
"""从 QoS 控制字段构建 A2 控制信息。"""
return ControlInfoA2(
packet_type=0,
empty_packet=ctrl_fields.get("empty_packet", 0) & 1,
tx_sn=ctrl_fields.get("tx_sn", 0) & 1,
rx_sn=ctrl_fields.get("rx_sn", 0) & 1,
flow_ctrl=ctrl_fields.get("flow_ctrl", 0) & 1,
sys_mgmt_rx=0,
reserved=0,
data_length=data_length,
)
def _build_ctrl_bits(
ctrl_fields: dict[str, int], frame_type: int,
) -> np.ndarray:
"""从 QoS 控制字段构建控制信息比特。"""
if frame_type == 2:
bits = np.zeros(28, dtype=np.int8)
bits[3] = ctrl_fields.get("tx_sn", 0) & 1
bits[4] = ctrl_fields.get("rx_sn", 0) & 1
bits[5] = ctrl_fields.get("flow_ctrl", 0) & 1
elif frame_type in (3, 4):
bits = np.zeros(27, dtype=np.int8)
tx_sn = ctrl_fields.get("tx_sn", 0) & 0x1F
for i in range(5):
bits[3 + i] = (tx_sn >> (4 - i)) & 1
rx_sn = ctrl_fields.get("rx_sn", 0) & 0x1F
for i in range(5):
bits[8 + i] = (rx_sn >> (4 - i)) & 1
bits[13] = ctrl_fields.get("flow_ctrl", 0) & 1
else:
bits = np.zeros(20, dtype=np.int8)
return bits