# Source code for nearlink_sdr.phy.uhd_backend
"""UHD 后端 -- Ettus USRP 设备支持。
封装 UHD Python API, 实现 SDRDevice 接口。
保留对 USRP E310 等设备的完整支持。
"""
from __future__ import annotations
__all__ = [
"UHDDevice",
"uhd_available",
]
import logging
import numpy as np
from nearlink_sdr.phy.sdr_backend import SDRConfig, SDRDevice
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Conditional UHD import
# ---------------------------------------------------------------------------
# The import is attempted once at module load. On failure the module stays
# importable: ``uhd`` is bound to None and the flag below records the outcome.
try:
    import uhd
    _UHD_AVAILABLE = True
except ImportError:
    uhd = None # type: ignore[assignment]
    _UHD_AVAILABLE = False
# [docs]
def uhd_available() -> bool:
    """Report whether the UHD Python bindings could be imported.

    Returns:
        The module-level ``_UHD_AVAILABLE`` flag, which is set once when this
        module is loaded.
    """
    return bool(_UHD_AVAILABLE)
# E310 hardware constants.
# NOTE(review): none of these limits is referenced in the visible part of this
# module; presumably they are intended for range validation -- confirm usage.
E310_FREQ_MIN_HZ = 70e6  # lowest frequency, in Hz (per the name suffix)
E310_FREQ_MAX_HZ = 6e9  # highest frequency, in Hz (per the name suffix)
E310_BW_MAX_HZ = 56e6  # maximum bandwidth, in Hz (per the name suffix)
E310_RX_GAIN_MAX = 76.0  # maximum RX gain; presumably dB -- unit not in the name
E310_TX_GAIN_MAX = 89.75  # maximum TX gain; presumably dB -- unit not in the name
# [docs]
class UHDDevice(SDRDevice):
    """USRP device interface backed by the UHD Python API.

    A ``uhd.usrp.MultiUSRP`` handle is opened in the constructor. The TX and
    RX streamers are created lazily on the first ``transmit`` / ``receive``
    call. Streaming always uses channel 0 with ``fc32`` host-side samples and
    ``sc16`` over-the-wire samples.
    """

    def __init__(self, config: SDRConfig):
        """Open the USRP named by ``config.device_args`` and configure it.

        Args:
            config: Device configuration; ``device_args`` is passed verbatim
                to ``uhd.usrp.MultiUSRP``.

        Raises:
            RuntimeError: If the ``uhd`` module could not be imported.
        """
        if not _UHD_AVAILABLE:
            raise RuntimeError("UHD 未安装。请安装 uhd Python 绑定。")
        super().__init__(config)
        dev_args = config.device_args
        self._usrp = uhd.usrp.MultiUSRP(dev_args)
        logger.info("已连接 USRP: %s", self._usrp.get_mboard_name())
        # Streaming objects are created on first use.
        self._tx_streamer = None
        self._rx_streamer = None
        self._rx_buffer = None
        self._rx_metadata = None
        # configure() is not defined in this class; presumably inherited from
        # SDRDevice and responsible for applying ``config`` via the setters.
        self.configure()

    def set_frequency(self, freq_hz: float) -> None:
        """Tune both the RX and TX front ends of channel 0 to ``freq_hz``."""
        tune_req = uhd.libpyuhd.types.tune_request(freq_hz)
        self._usrp.set_rx_freq(tune_req, 0)
        self._usrp.set_tx_freq(tune_req, 0)

    def set_sample_rate(self, rate_hz: float) -> None:
        """Set the RX and TX sample rate and record it in the config."""
        self._usrp.set_rx_rate(rate_hz)
        self._usrp.set_tx_rate(rate_hz)
        self._config.sample_rate_hz = rate_hz

    def set_rx_gain(self, gain_db: float) -> None:
        """Set the receive gain and record it in the config."""
        self._usrp.set_rx_gain(gain_db)
        self._config.rx_gain_db = gain_db

    def set_tx_gain(self, gain_db: float) -> None:
        """Set the transmit gain and record it in the config."""
        self._usrp.set_tx_gain(gain_db)
        self._config.tx_gain_db = gain_db

    def set_bandwidth(self, bw_hz: float) -> None:
        """Set the RX and TX analog bandwidth.

        Non-positive values are ignored, leaving the device's current
        bandwidth setting untouched.
        """
        if bw_hz > 0:
            self._usrp.set_rx_bandwidth(bw_hz)
            self._usrp.set_tx_bandwidth(bw_hz)

    def _ensure_tx_streamer(self) -> None:
        """Create the channel-0 TX streamer if it does not exist yet."""
        if self._tx_streamer is None:
            stream_args = uhd.usrp.StreamArgs("fc32", "sc16")
            stream_args.channels = [0]
            self._tx_streamer = self._usrp.get_tx_stream(stream_args)

    def _ensure_rx_streamer(self, buf_size: int) -> None:
        """Create the RX streamer if needed and size the receive buffer.

        The buffer holds ``min(buf_size, streamer max)`` samples (at least
        one). It is grown when a later call asks for more than the current
        capacity, so a small first request does not permanently limit the
        chunk size used by subsequent receives.
        """
        if self._rx_streamer is None:
            stream_args = uhd.usrp.StreamArgs("fc32", "sc16")
            stream_args.channels = [0]
            self._rx_streamer = self._usrp.get_rx_stream(stream_args)
            self._rx_metadata = uhd.types.RXMetadata()
        max_samps = self._rx_streamer.get_max_num_samps()
        wanted = max(1, min(buf_size, max_samps))
        if self._rx_buffer is None or self._rx_buffer.shape[1] < wanted:
            self._rx_buffer = np.zeros((1, wanted), dtype=np.complex64)

    def transmit(self, samples: np.ndarray) -> int:
        """Send ``samples`` as a single burst on channel 0.

        Args:
            samples: Samples to transmit; converted to a contiguous
                ``complex64`` array before sending.

        Returns:
            The value returned by the streamer's ``send`` call.
        """
        self._ensure_tx_streamer()
        # One conversion handles both dtype and memory layout.
        data = np.ascontiguousarray(samples, dtype=np.complex64)
        metadata = uhd.types.TXMetadata()
        metadata.start_of_burst = True
        metadata.end_of_burst = True
        return self._tx_streamer.send(data, metadata, self._config.stream_timeout_s)

    def receive(self, num_samps: int) -> np.ndarray:
        """Receive up to ``num_samps`` samples from channel 0.

        Continuous streaming is started, samples are collected in chunks of
        at most 4096, and streaming is stopped again even if ``recv`` raises.

        Args:
            num_samps: Number of samples requested.

        Returns:
            A ``complex64`` array with the samples actually collected. It is
            shorter than ``num_samps`` if a ``recv`` call returned no samples.

        Raises:
            ValueError: If ``num_samps`` is negative.
        """
        # Validate before touching the hardware so a bad request cannot leave
        # a half-initialised streamer/buffer pair behind.
        if num_samps < 0:
            raise ValueError(f"num_samps 不能为负: {num_samps}")
        if num_samps == 0:
            return np.zeros(0, dtype=np.complex64)

        self._ensure_rx_streamer(min(num_samps, 4096))
        result = np.zeros(num_samps, dtype=np.complex64)
        collected = 0

        stream_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.start_cont)
        stream_cmd.stream_now = True
        self._rx_streamer.issue_stream_cmd(stream_cmd)
        try:
            while collected < num_samps:
                n_recv = self._rx_streamer.recv(
                    self._rx_buffer, self._rx_metadata, self._config.stream_timeout_s
                )
                if n_recv == 0:
                    # Nothing arrived for this call; return what we have.
                    break
                # The buffer may hold more than is still needed.
                n_copy = min(n_recv, num_samps - collected)
                result[collected : collected + n_copy] = self._rx_buffer[0, :n_copy]
                collected += n_copy
        finally:
            # Always stop continuous streaming, including on errors.
            stop_cmd = uhd.types.StreamCMD(uhd.types.StreamMode.stop_cont)
            self._rx_streamer.issue_stream_cmd(stop_cmd)
        return result[:collected]

    def close(self) -> None:
        """Drop the streamers and receive buffer.

        The ``MultiUSRP`` handle itself is kept, so ``status_string`` remains
        usable afterwards.
        """
        self._tx_streamer = None
        self._rx_streamer = None
        self._rx_buffer = None
        self._rx_metadata = None
        logger.info("UHD 设备已关闭")

    def status_string(self) -> str:
        """Return a one-line summary of the board name and current config."""
        cfg = self._config
        return (
            f"[UHD] {self._usrp.get_mboard_name()} "
            f"CH{cfg.channel_num} ({cfg.band}) "
            f"Fc={cfg.center_freq_hz / 1e6:.1f}MHz "
            f"Rate={cfg.sample_rate_hz / 1e6:.1f}Msps "
            f"RXG={cfg.rx_gain_db:.1f}dB TXG={cfg.tx_gain_db:.1f}dB"
        )