nearlink_sdr.phy.usrp 源代码

"""USRP E310 硬件接口层。

封装 UHD Python API, 提供针对 SparkLink SLE 标准优化的 USRP 设备管理、
TX/RX 流接口以及与现有 PHY 模块的集成 Pipeline。

硬件参数 (USRP E310 / AD9361):
- 频率范围: 70 MHz - 6 GHz
- 最大带宽: 56 MHz
- ADC/DAC: 12-bit
- RX 增益范围: 0 - 76 dB
- TX 增益范围: 0 - 89.75 dB
- 天线端口: TX/RX, RX2
"""

from __future__ import annotations

__all__ = [
    "DEFAULT_CPU_FORMAT",
    "DEFAULT_WIRE_FORMAT",
    "E310_ADC_BITS",
    "E310_BW_MAX_HZ",
    "E310_FREQ_MAX_HZ",
    "E310_FREQ_MIN_HZ",
    "E310_RX_GAIN_MAX",
    "E310_RX_GAIN_MIN",
    "E310_TX_GAIN_MAX",
    "E310_TX_GAIN_MIN",
    "SLE_BANDWIDTHS_MHZ",
    "RXStream",
    "SLETransceiver",
    "TXStream",
    "USRPConfig",
    "USRPDevice",
    "uhd_available",
]


import logging
from collections.abc import Callable
from dataclasses import dataclass

import numpy as np

from nearlink_sdr.phy.freq_hopping import (
    BAND_2400,
    channel_to_freq,
)

logger = logging.getLogger(__name__)

# ---------------------------------------------------------------------------
# UHD 条件导入 — 开发/测试环境可能不具备 UHD 驱动
# ---------------------------------------------------------------------------

try:
    import uhd
    _UHD_AVAILABLE = True
except ImportError:
    uhd = None  # type: ignore[assignment]
    _UHD_AVAILABLE = False


[文档] def uhd_available() -> bool: """检查当前环境是否安装了 UHD 驱动。""" return _UHD_AVAILABLE
# --------------------------------------------------------------------------- # E310 硬件常量 # --------------------------------------------------------------------------- E310_FREQ_MIN_HZ = 70e6 E310_FREQ_MAX_HZ = 6e9 E310_BW_MAX_HZ = 56e6 E310_RX_GAIN_MIN = 0.0 E310_RX_GAIN_MAX = 76.0 E310_TX_GAIN_MIN = 0.0 E310_TX_GAIN_MAX = 89.75 E310_ADC_BITS = 12 # SLE 标准支持的带宽 (MHz) SLE_BANDWIDTHS_MHZ = (1.0, 2.0, 4.0) # 默认 I/O 格式 DEFAULT_CPU_FORMAT = "fc32" # np.complex64 DEFAULT_WIRE_FORMAT = "sc16" # 16-bit signed complex on wire # --------------------------------------------------------------------------- # 设备配置 # ---------------------------------------------------------------------------
[文档] @dataclass class USRPConfig: """USRP 设备配置参数。 :ivar device_args: UHD 设备地址参数, 空字符串表示自动发现。 :ivar channel_num: SLE 射频信道号。 :ivar band: 频段标识 ("2400" / "5100" / "5800")。 :ivar sample_rate_hz: 采样率 (Hz)。 :ivar rx_gain_db: 接收增益 (dB)。 :ivar tx_gain_db: 发射增益 (dB)。 :ivar bandwidth_hz: 前端模拟滤波器带宽 (Hz), 0 表示不设置。 :ivar rx_antenna: 接收天线端口名称。 :ivar tx_antenna: 发射天线端口名称。 :ivar clock_source: 参考时钟源。 :ivar time_source: PPS 时间源。 :ivar cpu_format: 主机端样本格式。 :ivar wire_format: 线缆端样本格式。 :ivar num_recv_frames: 接收缓冲区帧数, 增大可减少溢出。 :ivar stream_timeout_s: 流操作超时时间 (秒)。 """ device_args: str = "" channel_num: int = 0 band: str = BAND_2400 sample_rate_hz: float = 1e6 rx_gain_db: float = 30.0 tx_gain_db: float = 20.0 bandwidth_hz: float = 0.0 rx_antenna: str = "RX2" tx_antenna: str = "TX/RX" clock_source: str = "internal" time_source: str = "internal" cpu_format: str = DEFAULT_CPU_FORMAT wire_format: str = DEFAULT_WIRE_FORMAT num_recv_frames: int = 512 stream_timeout_s: float = 3.0 def __post_init__(self): self.validate()
[文档] def validate(self): """校验配置参数有效性。""" freq_hz = self.center_freq_hz if not (E310_FREQ_MIN_HZ <= freq_hz <= E310_FREQ_MAX_HZ): raise ValueError( f"中心频率 {freq_hz / 1e6:.1f} MHz 超出 E310 支持范围 " f"[{E310_FREQ_MIN_HZ / 1e6:.0f}, {E310_FREQ_MAX_HZ / 1e6:.0f}] MHz" ) if not (E310_RX_GAIN_MIN <= self.rx_gain_db <= E310_RX_GAIN_MAX): raise ValueError( f"RX gain {self.rx_gain_db} dB 超出范围 " f"[{E310_RX_GAIN_MIN}, {E310_RX_GAIN_MAX}]" ) if not (E310_TX_GAIN_MIN <= self.tx_gain_db <= E310_TX_GAIN_MAX): raise ValueError( f"TX gain {self.tx_gain_db} dB 超出范围 " f"[{E310_TX_GAIN_MIN}, {E310_TX_GAIN_MAX}]" ) if self.sample_rate_hz <= 0 or self.sample_rate_hz > E310_BW_MAX_HZ: raise ValueError( f"采样率 {self.sample_rate_hz / 1e6:.1f} MHz 超出范围 " f"(0, {E310_BW_MAX_HZ / 1e6:.0f}] MHz" )
@property def center_freq_hz(self) -> float: """根据信道号和频段计算中心频率 (Hz)。""" return channel_to_freq(self.channel_num, self.band) * 1e6 @property def bandwidth_mhz(self) -> float: """带宽 (MHz)。""" return self.sample_rate_hz / 1e6
# --------------------------------------------------------------------------- # Mock USRP —— 用于无硬件环境下的开发与测试 # --------------------------------------------------------------------------- class MockStreamCmd: """模拟 uhd.types.StreamCMD。""" def __init__(self, stream_mode): self.stream_mode = stream_mode self.stream_now = True self.num_samps = 0 self.time_spec = None class MockStreamMode: """模拟 uhd.types.StreamMode。""" start_cont = "start_cont" stop_cont = "stop_cont" num_done = "num_done" class MockRXMetadata: """模拟 uhd.types.RXMetadata。""" def __init__(self): self.error_code = 0 # 0 = no error class MockTXMetadata: """模拟 uhd.types.TXMetadata。""" def __init__(self): self.has_time_spec = False self.time_spec = None self.end_of_burst = False self.start_of_burst = False class LoopbackBuffer: """TX→RX 环回缓冲区, 可选信道损伤。 用于 MockUSRP 的 loopback 模式, 将 TX 发射的 IQ 样本经信道模型 送入 RX 接收端, 实现无硬件的端到端仿真。 """ def __init__(self, channel_model: object | None = None, seed: int = 42): self._buffer: list[np.ndarray] = [] self._channel = channel_model self._rng = np.random.default_rng(seed) def push(self, samples: np.ndarray): """TX 端写入样本。""" sig = np.asarray(samples, dtype=np.complex64).ravel() if self._channel is not None: sig = np.asarray(self._channel.apply_awgn(sig), dtype=np.complex64) self._buffer.append(sig) def pull(self, n: int) -> np.ndarray: """RX 端读取最多 n 个样本。""" if not self._buffer: return np.array([], dtype=np.complex64) head = self._buffer[0] if len(head) <= n: self._buffer.pop(0) return head out = head[:n] self._buffer[0] = head[n:] return out @property def available(self) -> int: return sum(len(b) for b in self._buffer) def clear(self): self._buffer.clear() class MockStreamer: """模拟 UHD TX/RX Streamer。""" def __init__( self, direction: str = "rx", samps_per_buffer: int = 1000, loopback: LoopbackBuffer | None = None, ): self._direction = direction self._samps_per_buffer = samps_per_buffer self._running = False self._rng = np.random.default_rng(0) self._loopback = loopback def get_max_num_samps(self) -> int: return self._samps_per_buffer def issue_stream_cmd(self, cmd): if cmd.stream_mode == MockStreamMode.start_cont: self._running = True elif cmd.stream_mode == MockStreamMode.stop_cont: self._running = False def recv(self, buffer: np.ndarray, metadata, timeout: float = 3.0) -> int: """模拟接收: 环回模式返回 TX 发送的数据, 否则返回噪声。""" if not self._running: return 0 n = min(buffer.shape[-1], self._samps_per_buffer) if self._loopback is not None and self._loopback.available > 0: data = self._loopback.pull(n) actual = len(data) if buffer.ndim == 2: buffer[0, :actual] = data else: buffer[:actual] = data return actual noise = (self._rng.standard_normal(n) + 1j * self._rng.standard_normal(n)).astype(np.complex64) * 0.01 if buffer.ndim == 2: buffer[0, :n] = noise else: buffer[:n] = noise return n def send(self, buffer: np.ndarray, metadata, timeout: float = 3.0) -> int: """模拟发射: 环回模式写入缓冲区, 否则直接消耗。""" data = buffer[0] if buffer.ndim == 2 else buffer if self._loopback is not None: self._loopback.push(data) return len(data) class MockUSRP: """模拟 uhd.usrp.MultiUSRP, 用于无硬件测试。 记录所有配置调用的参数, 并提供可验证的状态。 支持 loopback 模式: TX 发射的数据经可选信道模型后送入 RX。 """ def __init__(self, args: str = "", loopback: LoopbackBuffer | None = None): self._args = args self._rx_rate = 1e6 self._tx_rate = 1e6 self._rx_freq = 2.402e9 self._tx_freq = 2.402e9 self._rx_gain = 30.0 self._tx_gain = 20.0 self._rx_bw = 0.0 self._tx_bw = 0.0 self._rx_antenna = "RX2" self._tx_antenna = "TX/RX" self._clock_source = "internal" self._time_source = "internal" self._call_log: list[tuple[str, dict]] = [] self._loopback = loopback def _log(self, method: str, **kwargs): self._call_log.append((method, kwargs)) def set_rx_rate(self, rate: float, chan: int = 0): self._log("set_rx_rate", rate=rate, chan=chan) self._rx_rate = rate def get_rx_rate(self, chan: int = 0) -> float: return self._rx_rate def set_tx_rate(self, rate: float, chan: int = 0): self._log("set_tx_rate", rate=rate, chan=chan) self._tx_rate = rate def get_tx_rate(self, chan: int = 0) -> float: return self._tx_rate def set_rx_freq(self, tune_request, chan: int = 0): self._log("set_rx_freq", freq=tune_request, chan=chan) if isinstance(tune_request, (int, float)): self._rx_freq = float(tune_request) else: self._rx_freq = float(tune_request) def get_rx_freq(self, chan: int = 0) -> float: return self._rx_freq def set_tx_freq(self, tune_request, chan: int = 0): self._log("set_tx_freq", freq=tune_request, chan=chan) if isinstance(tune_request, (int, float)): self._tx_freq = float(tune_request) else: self._tx_freq = float(tune_request) def get_tx_freq(self, chan: int = 0) -> float: return self._tx_freq def set_rx_gain(self, gain: float, chan: int = 0): self._log("set_rx_gain", gain=gain, chan=chan) self._rx_gain = gain def get_rx_gain(self, chan: int = 0) -> float: return self._rx_gain def set_tx_gain(self, gain: float, chan: int = 0): self._log("set_tx_gain", gain=gain, chan=chan) self._tx_gain = gain def get_tx_gain(self, chan: int = 0) -> float: return self._tx_gain def set_rx_bandwidth(self, bw: float, chan: int = 0): self._log("set_rx_bandwidth", bw=bw, chan=chan) self._rx_bw = bw def get_rx_bandwidth(self, chan: int = 0) -> float: return self._rx_bw def set_tx_bandwidth(self, bw: float, chan: int = 0): self._log("set_tx_bandwidth", bw=bw, chan=chan) self._tx_bw = bw def get_tx_bandwidth(self, chan: int = 0) -> float: return self._tx_bw def set_rx_antenna(self, ant: str, chan: int = 0): self._log("set_rx_antenna", ant=ant, chan=chan) self._rx_antenna = ant def get_rx_antenna(self, chan: int = 0) -> str: return self._rx_antenna def set_tx_antenna(self, ant: str, chan: int = 0): self._log("set_tx_antenna", ant=ant, chan=chan) self._tx_antenna = ant def get_tx_antenna(self, chan: int = 0) -> str: return self._tx_antenna def set_clock_source(self, source: str, mboard: int = 0): self._log("set_clock_source", source=source, mboard=mboard) self._clock_source = source def set_time_source(self, source: str, mboard: int = 0): self._log("set_time_source", source=source, mboard=mboard) self._time_source = source def get_mboard_name(self, mboard: int = 0) -> str: return "E310 (Mock)" def get_rx_stream(self, stream_args) -> MockStreamer: self._log("get_rx_stream", args=str(stream_args)) return MockStreamer("rx", loopback=self._loopback) def get_tx_stream(self, stream_args) -> MockStreamer: self._log("get_tx_stream", args=str(stream_args)) return MockStreamer("tx", loopback=self._loopback) def get_pp_string(self) -> str: return ( f"MockUSRP E310\n" f" RX: freq={self._rx_freq / 1e6:.1f} MHz, " f"rate={self._rx_rate / 1e6:.1f} Msps, " f"gain={self._rx_gain:.1f} dB\n" f" TX: freq={self._tx_freq / 1e6:.1f} MHz, " f"rate={self._tx_rate / 1e6:.1f} Msps, " f"gain={self._tx_gain:.1f} dB" ) # --------------------------------------------------------------------------- # USRP 设备管理 # ---------------------------------------------------------------------------
[文档] class USRPDevice: """USRP E310 设备管理。 封装 UHD MultiUSRP 对象, 提供 SLE 标准所需的设备配置和控制接口。 无硬件时自动使用 MockUSRP。 """ def __init__( self, config: USRPConfig | None = None, use_mock: bool = False, loopback: LoopbackBuffer | None = None, ): """初始化 USRP 设备。 :param config: 设备配置, None 时使用默认配置。 :param use_mock: 强制使用 MockUSRP。 :param loopback: 环回缓冲区, 仅 Mock 模式有效。 """ self._config = config or USRPConfig() self._use_mock = use_mock or not _UHD_AVAILABLE if self._use_mock: self._usrp = MockUSRP(self._config.device_args, loopback=loopback) logger.info("使用 MockUSRP (无硬件模式)") else: dev_args = self._config.device_args if self._config.num_recv_frames > 0: sep = "," if dev_args else "" dev_args += f"{sep}num_recv_frames={self._config.num_recv_frames}" self._usrp = uhd.usrp.MultiUSRP(dev_args) logger.info("已连接 USRP: %s", self._usrp.get_mboard_name()) self._configure() def _configure(self): """将 USRPConfig 的参数应用到设备。""" cfg = self._config freq_hz = cfg.center_freq_hz # 时钟与同步源 self._usrp.set_clock_source(cfg.clock_source) self._usrp.set_time_source(cfg.time_source) # RX 配置 self._usrp.set_rx_rate(cfg.sample_rate_hz) self._set_freq("rx", freq_hz) self._usrp.set_rx_gain(cfg.rx_gain_db) self._usrp.set_rx_antenna(cfg.rx_antenna) if cfg.bandwidth_hz > 0: self._usrp.set_rx_bandwidth(cfg.bandwidth_hz) # TX 配置 self._usrp.set_tx_rate(cfg.sample_rate_hz) self._set_freq("tx", freq_hz) self._usrp.set_tx_gain(cfg.tx_gain_db) self._usrp.set_tx_antenna(cfg.tx_antenna) if cfg.bandwidth_hz > 0: self._usrp.set_tx_bandwidth(cfg.bandwidth_hz) logger.info("设备配置完成: %s", self.status_string()) def _set_freq(self, direction: str, freq_hz: float, chan: int = 0): """设置中心频率。""" if self._use_mock: if direction == "rx": self._usrp.set_rx_freq(freq_hz, chan) else: self._usrp.set_tx_freq(freq_hz, chan) else: tune_req = uhd.libpyuhd.types.tune_request(freq_hz) if direction == "rx": self._usrp.set_rx_freq(tune_req, chan) else: self._usrp.set_tx_freq(tune_req, chan)
[文档] def tune_channel(self, channel_num: int, band: str | None = None): """切换到指定 SLE 射频信道。 :param channel_num: 射频信道号。 :param band: 频段标识, None 沿用当前配置。 """ if band is not None: self._config.band = band self._config.channel_num = channel_num freq_hz = self._config.center_freq_hz self._set_freq("rx", freq_hz) self._set_freq("tx", freq_hz) logger.debug("切换至信道 %d, 频率 %.1f MHz", channel_num, freq_hz / 1e6)
[文档] def set_rx_gain(self, gain_db: float): """设置 RX 增益。""" if not (E310_RX_GAIN_MIN <= gain_db <= E310_RX_GAIN_MAX): raise ValueError(f"RX gain {gain_db} dB 超出范围") self._config.rx_gain_db = gain_db self._usrp.set_rx_gain(gain_db)
[文档] def set_tx_gain(self, gain_db: float): """设置 TX 增益。""" if not (E310_TX_GAIN_MIN <= gain_db <= E310_TX_GAIN_MAX): raise ValueError(f"TX gain {gain_db} dB 超出范围") self._config.tx_gain_db = gain_db self._usrp.set_tx_gain(gain_db)
[文档] def set_sample_rate(self, rate_hz: float): """设置收发采样率。""" if rate_hz <= 0 or rate_hz > E310_BW_MAX_HZ: raise ValueError(f"采样率 {rate_hz / 1e6:.1f} MHz 超出范围") self._config.sample_rate_hz = rate_hz self._usrp.set_rx_rate(rate_hz) self._usrp.set_tx_rate(rate_hz)
@property def config(self) -> USRPConfig: return self._config @property def usrp(self): """获取底层 UHD/Mock 设备对象。""" return self._usrp @property def is_mock(self) -> bool: return self._use_mock
[文档] def status_string(self) -> str: """返回可读的设备状态摘要。""" cfg = self._config return ( f"[{'Mock' if self._use_mock else 'HW'}] " f"CH{cfg.channel_num} ({cfg.band}) " f"Fc={cfg.center_freq_hz / 1e6:.1f}MHz " f"Rate={cfg.sample_rate_hz / 1e6:.1f}Msps " f"RXG={cfg.rx_gain_db:.1f}dB TXG={cfg.tx_gain_db:.1f}dB" )
# --------------------------------------------------------------------------- # TX 发射流 # ---------------------------------------------------------------------------
[文档] class TXStream: """USRP 发射流接口。 管理 TX streamer 的生命周期, 提供帧级发射方法。 """ def __init__(self, device: USRPDevice): self._device = device self._streamer = None self._metadata = None
[文档] def open(self): """创建 TX streamer。""" if self._device.is_mock: stream_args = "fc32" else: stream_args = uhd.usrp.StreamArgs( self._device.config.cpu_format, self._device.config.wire_format, ) stream_args.channels = [0] self._streamer = self._device.usrp.get_tx_stream(stream_args) if self._device.is_mock: self._metadata = MockTXMetadata() else: self._metadata = uhd.types.TXMetadata() logger.debug("TX streamer 已创建")
[文档] def send(self, samples: np.ndarray) -> int: """发射一帧 IQ 样本。 :param samples: 复基带样本, dtype=complex64, shape (N,)。 :returns: 实际发射的样本数。 """ if self._streamer is None: raise RuntimeError("TX streamer 未打开, 请先调用 open()") samples = np.ascontiguousarray(samples.astype(np.complex64)) self._metadata.start_of_burst = True self._metadata.end_of_burst = True n_sent = self._streamer.send( samples, self._metadata, self._device.config.stream_timeout_s ) return n_sent
[文档] def send_continuous(self, samples: np.ndarray, num_repeats: int = 1) -> int: """连续发射, 重复指定次数。 :param samples: 单帧 IQ 样本。 :param num_repeats: 重复次数。 :returns: 总发射样本数。 """ if self._streamer is None: raise RuntimeError("TX streamer 未打开, 请先调用 open()") samples = np.ascontiguousarray(samples.astype(np.complex64)) total = 0 for i in range(num_repeats): self._metadata.start_of_burst = (i == 0) self._metadata.end_of_burst = (i == num_repeats - 1) n = self._streamer.send( samples, self._metadata, self._device.config.stream_timeout_s ) total += n return total
[文档] def close(self): """关闭 TX streamer。""" self._streamer = None self._metadata = None logger.debug("TX streamer 已关闭")
# --------------------------------------------------------------------------- # RX 接收流 # ---------------------------------------------------------------------------
[文档] class RXStream: """USRP 接收流接口。 管理 RX streamer 的生命周期, 提供定量接收和连续接收方法。 """ def __init__(self, device: USRPDevice): self._device = device self._streamer = None self._metadata = None self._recv_buffer = None self._running = False
[文档] def open(self, samps_per_buffer: int = 1000): """创建 RX streamer 并分配接收缓冲区。 :param samps_per_buffer: 每次 recv 调用的缓冲区大小。 """ if self._device.is_mock: stream_args = "fc32" else: stream_args = uhd.usrp.StreamArgs( self._device.config.cpu_format, self._device.config.wire_format, ) stream_args.channels = [0] self._streamer = self._device.usrp.get_rx_stream(stream_args) if self._device.is_mock: self._metadata = MockRXMetadata() else: self._metadata = uhd.types.RXMetadata() max_samps = self._streamer.get_max_num_samps() buf_size = min(samps_per_buffer, max_samps) self._recv_buffer = np.zeros((1, buf_size), dtype=np.complex64) logger.debug("RX streamer 已创建, 缓冲区 %d 样本", buf_size)
[文档] def recv_num_samps(self, num_samps: int) -> np.ndarray: """接收指定数量的样本。 :param num_samps: 要接收的样本数。 :returns: 复基带样本数组, shape (num_samps,), dtype=complex64。 """ if self._streamer is None: raise RuntimeError("RX streamer 未打开, 请先调用 open()") # 开始连续流 if self._device.is_mock: stream_cmd = MockStreamCmd(MockStreamMode.start_cont) else: stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.start_cont) stream_cmd.stream_now = True self._streamer.issue_stream_cmd(stream_cmd) samples = np.zeros(num_samps, dtype=np.complex64) collected = 0 while collected < num_samps: n_recv = self._streamer.recv( self._recv_buffer, self._metadata, self._device.config.stream_timeout_s, ) if n_recv == 0: break n_copy = min(n_recv, num_samps - collected) samples[collected:collected + n_copy] = self._recv_buffer[0, :n_copy] collected += n_copy # 停止流 if self._device.is_mock: stop_cmd = MockStreamCmd(MockStreamMode.stop_cont) else: stop_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.stop_cont) self._streamer.issue_stream_cmd(stop_cmd) return samples[:collected]
[文档] def start_continuous(self): """启动连续接收模式。""" if self._streamer is None: raise RuntimeError("RX streamer 未打开, 请先调用 open()") if self._device.is_mock: stream_cmd = MockStreamCmd(MockStreamMode.start_cont) else: stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.start_cont) stream_cmd.stream_now = True self._streamer.issue_stream_cmd(stream_cmd) self._running = True
[文档] def recv_once(self) -> np.ndarray: """在连续模式下接收一个缓冲区的样本。 :returns: 接收到的样本, 长度可能小于缓冲区大小。 """ if not self._running: raise RuntimeError("连续接收未启动, 请先调用 start_continuous()") n = self._streamer.recv( self._recv_buffer, self._metadata, self._device.config.stream_timeout_s, ) return self._recv_buffer[0, :n].copy()
[文档] def stop_continuous(self): """停止连续接收。""" if self._running: if self._device.is_mock: stop_cmd = MockStreamCmd(MockStreamMode.stop_cont) else: stop_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.stop_cont) self._streamer.issue_stream_cmd(stop_cmd) self._running = False
[文档] def close(self): """关闭 RX streamer。""" self.stop_continuous() self._streamer = None self._metadata = None self._recv_buffer = None logger.debug("RX streamer 已关闭")
# --------------------------------------------------------------------------- # SLE 收发 Pipeline # ---------------------------------------------------------------------------
[文档] class SLETransceiver: """SparkLink SLE 实时收发 Pipeline。 将 USRP 设备与 PHY 调制/解调模块集成, 提供帧级收发功能。 支持跳频操作。 """ def __init__(self, device: USRPDevice): self._device = device self._tx_stream = TXStream(device) self._rx_stream = RXStream(device) self._is_open = False
[文档] def open(self, rx_buf_size: int = 1000): """初始化收发流。""" self._tx_stream.open() self._rx_stream.open(samps_per_buffer=rx_buf_size) self._is_open = True logger.info("SLE 收发 Pipeline 已就绪")
[文档] def transmit_iq(self, iq_samples: np.ndarray) -> int: """发射预调制的 IQ 样本。 :param iq_samples: 复基带信号。 :returns: 实际发射的样本数。 """ if not self._is_open: raise RuntimeError("Pipeline 未打开") return self._tx_stream.send(iq_samples)
[文档] def receive_iq(self, num_samps: int) -> np.ndarray: """接收指定数量的 IQ 样本。 :param num_samps: 要接收的样本数。 :returns: 接收到的复基带信号。 """ if not self._is_open: raise RuntimeError("Pipeline 未打开") return self._rx_stream.recv_num_samps(num_samps)
[文档] def transmit_frame( self, frame_bits: np.ndarray, modulate_fn: Callable[[np.ndarray], np.ndarray], ) -> int: """调制并发射一帧。 :param frame_bits: 帧比特序列。 :param modulate_fn: 调制函数, 接收比特数组, 返回 IQ 样本。 :returns: 实际发射的样本数。 """ iq = modulate_fn(frame_bits) return self.transmit_iq(iq)
[文档] def receive_frame( self, num_samps: int, demodulate_fn: Callable[[np.ndarray], np.ndarray], ) -> np.ndarray: """接收并解调一帧。 :param num_samps: 接收样本数。 :param demodulate_fn: 解调函数, 接收 IQ 样本, 返回比特数组。 :returns: 解调后的比特序列。 """ iq = self.receive_iq(num_samps) return demodulate_fn(iq)
[文档] def hop_and_transmit( self, channel_num: int, iq_samples: np.ndarray, band: str | None = None, ) -> int: """跳频后发射。 :param channel_num: 目标信道号。 :param iq_samples: 要发射的 IQ 样本。 :param band: 频段标识, None 沿用当前配置。 :returns: 实际发射的样本数。 """ self._device.tune_channel(channel_num, band) return self.transmit_iq(iq_samples)
[文档] def hop_and_receive( self, channel_num: int, num_samps: int, band: str | None = None, ) -> np.ndarray: """跳频后接收。 :param channel_num: 目标信道号。 :param num_samps: 接收样本数。 :param band: 频段标识, None 沿用当前配置。 :returns: 接收到的 IQ 样本。 """ self._device.tune_channel(channel_num, band) return self.receive_iq(num_samps)
[文档] def close(self): """关闭所有收发流。""" self._tx_stream.close() self._rx_stream.close() self._is_open = False logger.info("SLE 收发 Pipeline 已关闭")
@property def device(self) -> USRPDevice: return self._device @property def is_open(self) -> bool: return self._is_open