nearlink_sdr.phy.mock_backend 源代码
"""Mock SDR 后端 -- 无硬件仿真。
从原 usrp.py 中的 MockUSRP / LoopbackBuffer 逻辑提取而来,
实现 SDRDevice 接口, 用于无硬件环境的开发与测试。
"""
from __future__ import annotations
__all__ = [
"LoopbackBuffer",
"MockDevice",
]
import logging
import numpy as np
from nearlink_sdr.phy.sdr_backend import SDRConfig, SDRDevice
logger = logging.getLogger(__name__)
[文档]
class LoopbackBuffer:
"""TX->RX 环回缓冲区, 可选信道损伤。"""
def __init__(self, channel_model: object | None = None, seed: int = 42):
self._buffer: list[np.ndarray] = []
self._channel = channel_model
self._rng = np.random.default_rng(seed)
[文档]
def push(self, samples: np.ndarray) -> None:
"""TX 端写入样本。"""
sig = np.asarray(samples, dtype=np.complex64).ravel()
if self._channel is not None:
sig = np.asarray(self._channel.apply_awgn(sig), dtype=np.complex64)
self._buffer.append(sig)
[文档]
def pull(self, n: int) -> np.ndarray:
"""RX 端读取最多 n 个样本。"""
if not self._buffer:
return np.array([], dtype=np.complex64)
head = self._buffer[0]
if len(head) <= n:
self._buffer.pop(0)
return head
out = head[:n]
self._buffer[0] = head[n:]
return out
@property
def available(self) -> int:
return sum(len(b) for b in self._buffer)
[文档]
def clear(self) -> None:
self._buffer.clear()
[文档]
class MockDevice(SDRDevice):
"""模拟 SDR 设备, 用于无硬件测试。
支持环回模式: TX 发射的数据经可选信道模型后送入 RX。
"""
def __init__(
self,
config: SDRConfig,
loopback: LoopbackBuffer | None = None,
):
super().__init__(config)
self._loopback = loopback
self._rng = np.random.default_rng(42)
self._freq_hz = config.center_freq_hz
self._sample_rate_hz = config.sample_rate_hz
self._rx_gain_db = config.rx_gain_db
self._tx_gain_db = config.tx_gain_db
self._bw_hz = config.bandwidth_hz
self.configure()
[文档]
def set_frequency(self, freq_hz: float) -> None:
self._freq_hz = freq_hz
[文档]
def set_sample_rate(self, rate_hz: float) -> None:
self._sample_rate_hz = rate_hz
self._config.sample_rate_hz = rate_hz
[文档]
def set_rx_gain(self, gain_db: float) -> None:
self._rx_gain_db = gain_db
self._config.rx_gain_db = gain_db
[文档]
def set_tx_gain(self, gain_db: float) -> None:
self._tx_gain_db = gain_db
self._config.tx_gain_db = gain_db
[文档]
def set_bandwidth(self, bw_hz: float) -> None:
self._bw_hz = bw_hz
[文档]
def transmit(self, samples: np.ndarray) -> int:
data = np.ascontiguousarray(samples.astype(np.complex64))
if self._loopback is not None:
self._loopback.push(data)
return len(data)
[文档]
def receive(self, num_samps: int) -> np.ndarray:
if self._loopback is not None and self._loopback.available > 0:
chunks = []
remaining = num_samps
while remaining > 0 and self._loopback.available > 0:
chunk = self._loopback.pull(remaining)
chunks.append(chunk)
remaining -= len(chunk)
result = np.concatenate(chunks) if chunks else np.array([], dtype=np.complex64)
if len(result) < num_samps:
pad = np.zeros(num_samps - len(result), dtype=np.complex64)
result = np.concatenate([result, pad])
return result[:num_samps]
# 无环回时返回噪声
noise = (
self._rng.standard_normal(num_samps)
+ 1j * self._rng.standard_normal(num_samps)
).astype(np.complex64) * 0.01
return noise
[文档]
def close(self) -> None:
if self._loopback is not None:
self._loopback.clear()
logger.info("MockDevice 已关闭")
[文档]
def status_string(self) -> str:
cfg = self._config
return (
f"[Mock] CH{cfg.channel_num} ({cfg.band}) "
f"Fc={self._freq_hz / 1e6:.1f}MHz "
f"Rate={self._sample_rate_hz / 1e6:.1f}Msps "
f"RXG={self._rx_gain_db:.1f}dB TXG={self._tx_gain_db:.1f}dB"
)
@property
def loopback(self) -> LoopbackBuffer | None:
return self._loopback