"""测量链路传输与测量量计算 -- TXS-10002-2025 标准 6.7/6.8。
实现窄带跳频和超宽带脉冲测量链路的参数配置、时间资源调度和测量量计算:
- 6.7.2/6.7.3: 测量/感知链路参数模型
- 6.7.4: 时间资源配置 (事件组调度)
- 6.7.5/6.8.5: 测量量 (飞行时间/角度/CIR/距离多普勒)
"""
from __future__ import annotations
__all__ = [
"UWB_CONFIG_TIME_GRANULARITY_TC",
"AntennaOrderType",
"CIRConfig",
"CSIFeedback",
"ChannelSpliceMode",
"EventFrameType",
"HoppingOrder",
"MeasBandwidth",
"MeasDirection",
"MeasLinkParams",
"NodeMeasConfig",
"SecurityType",
"TimeRefType",
"UWBMeasLinkParams",
"UWBMeasMode",
"angle_estimate",
"compute_csi_feedback",
"csi_to_rx_power",
"ds_twr_2msg",
"ds_twr_3msg",
"event_schedule",
"event_start_times",
"extract_cir",
"range_doppler",
"uwb_event_count_per_mode",
"uwb_event_sender",
]
import cmath
import math
from dataclasses import dataclass, field
from enum import IntEnum
import numpy as np
from numpy.typing import NDArray
# ---------------------------------------------------------------------------
# 枚举定义
# ---------------------------------------------------------------------------
[文档]
class MeasDirection(IntEnum):
"""测量方向。"""
UNIDIRECTIONAL = 0 # 单向测量
BIDIRECTIONAL = 1 # 双向测量
[文档]
class HoppingOrder(IntEnum):
"""跳频方式 (6.7.2)。"""
LOW_TO_HIGH = 0 # 信道号从低到高
HIGH_TO_LOW = 1 # 信道号从高到低
ALGORITHMIC = 2 # 按跳频算法
[文档]
class MeasBandwidth(IntEnum):
"""测量信号带宽 (6.7.2)。"""
BW_1MHZ = 0
BW_2MHZ = 1
BW_4MHZ = 2
[文档]
class SecurityType(IntEnum):
"""安全类型。"""
NONE = 0
SECURE = 1
[文档]
class TimeRefType(IntEnum):
"""初始时间参考类型 (6.8.5.4)。"""
FIRST_PATH = 0 # 第一径到达时间
STRONGEST_PATH = 1 # 最强径到达时间
TX_TIME = 2 # 信号发送时间
[文档]
class AntennaOrderType(IntEnum):
"""天线对次序类型 (6.7.2)。"""
SEQUENTIAL = 0
RANDOM = 1
[文档]
class EventFrameType(IntEnum):
"""事件中传输的帧类型。"""
INIT = 0 # 初始化阶段事件
TYPE_1 = 1 # 测量帧类型 1
TYPE_2 = 2 # 测量帧类型 2
# ---------------------------------------------------------------------------
# 6.7.2/6.7.3 测量/感知链路参数
# ---------------------------------------------------------------------------
[文档]
@dataclass
class NodeMeasConfig:
"""单个节点的测量参数。"""
sync_signal_type: int = 0 # 同步信号类型
sync_signal_length: int = 64 # 同步信号长度
meas_signal_length: int = 128 # 测量序列长度
switch_interval_us: float = 0 # 同步-测量切换间隔 (us)
ft1_duration_us: float = 200 # 测量帧类型1传输时长 (us)
ft2_duration_us: float = 100 # 测量帧类型2传输时长 (us)
security_type: SecurityType = SecurityType.NONE
antenna_count: int = 1
[文档]
@dataclass
class MeasLinkParams:
"""测量/感知链路参数 (6.7.2/6.7.3)。"""
# 事件组时间参数
event_group_start_us: float = 0 # 事件组起始时刻 (us)
event_group_period_us: float = 10000 # 事件组周期 (us)
ft1_inter_event_us: float = 500 # 类型1事件间间隔 (us)
ft2_inter_event_us: float = 300 # 类型2事件间间隔 (us)
init_inter_event_us: float = 400 # 初始化阶段事件间间隔 (us)
intra_event_us: float = 100 # 事件内间隔 (us)
inter_event_us: float = 200 # 事件间间隔 (us)
inter_group_us: float = 1000 # 事件组间间隔 (us)
event_count: int = 4 # 事件总数/事件组
ft1_period: int = 2 # 类型1事件周期
has_init_phase: bool = False # 是否存在初始化阶段
# 方向与跳频
direction: MeasDirection = MeasDirection.UNIDIRECTIONAL
first_sender: int = 0 # 先发后发指示 (0=配置方先发)
hopping_order: HoppingOrder = HoppingOrder.LOW_TO_HIGH
bandwidth: MeasBandwidth = MeasBandwidth.BW_1MHZ
init_channel: int = 0 # 初始化频点信道号
# 先发/后发节点配置
first_node: NodeMeasConfig = field(default_factory=NodeMeasConfig)
second_node: NodeMeasConfig = field(default_factory=NodeMeasConfig)
# 多天线参数
antenna_switch_interval_us: float = 10 # 天线切换间隔 (us)
first_sub_signal_length: int = 0 # 第一个子测量信号长度
antenna_order_type: AntennaOrderType = AntennaOrderType.SEQUENTIAL
antenna_random_k: int = 0 # 天线随机数位宽
# 安全
security_random_seed: int = 0
@property
def has_multi_antenna(self) -> bool:
return (self.first_node.antenna_count > 1
or self.second_node.antenna_count > 1)
# ---------------------------------------------------------------------------
# 6.7.4 时间资源配置
# ---------------------------------------------------------------------------
[文档]
def event_schedule(params: MeasLinkParams) -> list[tuple[int, EventFrameType]]:
"""生成一个事件组内的事件调度序列。
:param params: 测量链路参数。
:returns: 列表, 每个元素为 (事件索引, 帧类型)。
"""
schedule: list[tuple[int, EventFrameType]] = []
for idx in range(params.event_count):
if params.has_init_phase and idx == 0:
schedule.append((idx, EventFrameType.INIT))
continue
effective_idx = idx
if params.ft1_period == 0:
# 所有事件都是类型2
schedule.append((idx, EventFrameType.TYPE_2))
elif params.ft1_period == 1:
schedule.append((idx, EventFrameType.TYPE_1))
else:
# 初始化占了第0个, 其余按周期分
if params.has_init_phase:
# 索引0已是INIT, 其余看 effective_idx 是否为 ft1_period 倍数
if effective_idx % params.ft1_period == 0:
schedule.append((idx, EventFrameType.TYPE_1))
else:
schedule.append((idx, EventFrameType.TYPE_2))
else:
if effective_idx % params.ft1_period == 0:
schedule.append((idx, EventFrameType.TYPE_1))
else:
schedule.append((idx, EventFrameType.TYPE_2))
return schedule
[文档]
def event_start_times(
params: MeasLinkParams,
group_index: int = 0,
) -> list[float]:
"""计算一个事件组内各事件的起始时刻 (us)。
:param params: 测量链路参数。
:param group_index: 事件组序号。
:returns: 各事件的起始时刻列表 (us)。
"""
schedule = event_schedule(params)
group_offset = (params.event_group_start_us
+ group_index * params.event_group_period_us)
times: list[float] = []
current = group_offset
for i, (_, ft) in enumerate(schedule):
times.append(current)
if i < len(schedule) - 1:
if ft == EventFrameType.INIT:
gap = params.init_inter_event_us
elif ft == EventFrameType.TYPE_1:
gap = params.ft1_inter_event_us
else:
gap = params.ft2_inter_event_us
current += gap
return times
# ---------------------------------------------------------------------------
# 6.8.5.1 双边两消息测量量 (DS-TWR 2-message)
# ---------------------------------------------------------------------------
[文档]
def ds_twr_2msg(ra: float, db: float) -> float:
"""双边两消息飞行时间估计 (6.8.5.1)。
:param ra: 先发节点测量的往返时间差 Ra = Ra2 - Ra1。
:param db: 后发节点测量的处理时间差 Db = Db2 - Db1。
:returns: 单向飞行时间估计 T_prop (与 ra/db 相同时间单位)。
"""
return 0.5 * (ra - db)
# ---------------------------------------------------------------------------
# 6.8.5.2 双边三消息测量量 (DS-TWR 3-message)
# ---------------------------------------------------------------------------
[文档]
def ds_twr_3msg(ra: float, rb: float, da: float, db: float) -> float:
"""双边三消息飞行时间估计 (6.8.5.2)。
:param ra: 先发节点 Ra = Ra2 - Ra1。
:param rb: 后发节点 Rb = Rb2 - Rb1。
:param da: 先发节点 Da = Da2 - Da1。
:param db: 后发节点 Db = Db2 - Db1。
:returns: 单向飞行时间估计 T_prop。
"""
numerator = ra * rb - da * db
denominator = ra + rb + da + db
if denominator == 0:
return 0.0
return numerator / denominator
# ---------------------------------------------------------------------------
# 6.8.5.3 角度测量量 (AoA/AoD)
# ---------------------------------------------------------------------------
[文档]
def angle_estimate(
delta_phi: float,
wavelength: float,
antenna_spacing: float,
) -> float:
"""根据相位差估计到达角/出发角 (6.8.5.3)。
:param delta_phi: 两天线接收/发送信号的载波相位差 (rad)。
:param wavelength: 信号波长 (m)。
:param antenna_spacing: 天线间距 (m)。
:returns: 角度估计值 (rad), 范围 [0, pi]。
"""
cos_val = (delta_phi * wavelength) / (2.0 * math.pi * antenna_spacing)
cos_val = max(-1.0, min(1.0, cos_val))
return math.acos(cos_val)
# ---------------------------------------------------------------------------
# 6.8.5.4 信道冲击响应测量量 (CIR)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class CIRConfig:
"""信道冲击响应测量配置。"""
time_ref: TimeRefType = TimeRefType.FIRST_PATH
delay_offset_ns: float = 0.0 # 多径时延偏移量 (ns)
delay_length_ns: float = 100.0 # 多径时延指示长度 (ns)
sample_rate_hz: float = 499.2e6 # 时域采样频率
# ---------------------------------------------------------------------------
# 6.8.5.5 距离多普勒测量量
# ---------------------------------------------------------------------------
[文档]
def range_doppler(
cir_frames: NDArray[np.complex128],
) -> NDArray[np.complex128]:
"""距离多普勒测量量计算 (6.8.5.5)。
对多帧 CIR 数据在慢时间维度做 FFT 获取距离-多普勒图。
:param cir_frames: 形状 (n_frames, n_delay_bins) 的多帧 CIR 数据。
:returns: 形状 (n_frames, n_delay_bins) 的距离-多普勒矩阵。
"""
if cir_frames.ndim == 1:
return np.fft.fft(cir_frames)
# 沿帧维度 (慢时间, axis=0) 做 FFT
return np.fft.fft(cir_frames, axis=0)
# ---------------------------------------------------------------------------
# CSI 反馈量化 (6.7.5)
# ---------------------------------------------------------------------------
[文档]
@dataclass
class CSIFeedback:
"""信道状态信息反馈 (6.7.5)。
接收信号功率(dBm) = 20*lg(abs(相对IQ值/1024)) + 参考接收功率值
"""
ref_power_dbm: float # 参考接收功率值 (dBm)
relative_iq: complex # 相对 IQ 值 (线性复数)
[文档]
def compute_csi_feedback(
rx_power_dbm: float,
channel_iq: complex,
) -> CSIFeedback:
"""从接收功率和信道 IQ 估计值生成 CSI 反馈。
:param rx_power_dbm: 接收信号功率 (dBm)。
:param channel_iq: 信道估计的复数 IQ 值。
:returns: CSIFeedback 实例。
"""
iq_mag = abs(channel_iq)
if iq_mag < 1e-30:
return CSIFeedback(ref_power_dbm=rx_power_dbm, relative_iq=0j)
# 选择 ref_power 使 relative_iq 量化精度最好
# 标准: rx_power = 20*lg(|rel_iq|/1024) + ref_power
# 取 ref_power = rx_power, 则 |rel_iq| = 1024
# 保持相位: rel_iq = 1024 * exp(j*angle)
phase = cmath.phase(channel_iq)
relative_iq = 1024.0 * cmath.exp(1j * phase)
return CSIFeedback(
ref_power_dbm=rx_power_dbm,
relative_iq=relative_iq,
)
[文档]
def csi_to_rx_power(feedback: CSIFeedback) -> float:
"""从 CSI 反馈恢复接收信号功率。
:param feedback: CSI 反馈数据。
:returns: 接收信号功率 (dBm)。
"""
iq_mag = abs(feedback.relative_iq)
if iq_mag < 1e-30:
return feedback.ref_power_dbm - 200.0
return 20.0 * math.log10(iq_mag / 1024.0) + feedback.ref_power_dbm
# ---------------------------------------------------------------------------
# 6.8.3/6.8.4 超宽带脉冲测量链路参数与配置
# ---------------------------------------------------------------------------
[文档]
class UWBMeasMode(IntEnum):
"""超宽带脉冲测量模式 (6.8.1)。"""
ONE_WAY = 0 # 单向
DS_TWR_2MSG = 1 # 双边两消息
DS_TWR_3MSG = 2 # 双边三消息
[文档]
class ChannelSpliceMode(IntEnum):
"""信道拼接方式 (6.8.3)。"""
NONE = 0 # 不拼接
OVERLAP = 1 # 重叠方式
CONTINUOUS = 2 # 连续方式
NON_CONTINUOUS = 3 # 非连续方式
UWB_CONFIG_TIME_GRANULARITY_TC = 256 # 超宽带脉冲配置时间粒度 = 256*Tc
[文档]
@dataclass
class UWBMeasLinkParams:
"""超宽带脉冲测量链路参数 (6.8.3/6.8.4)。"""
mode: UWBMeasMode = UWBMeasMode.DS_TWR_2MSG
event_group_period_us: float = 10000
event_count: int = 4
intra_event_us: float = 100 # 事件内间隔
inter_event_us: float = 500 # 事件间间隔
inter_group_us: float = 2000 # 事件组间间隔
first_sender: int = 0 # 0=配置方先发
alternate_sender: bool = False # 交替先发标志
# 信道
channels: list[int] = field(default_factory=lambda: [0])
splice_mode: ChannelSpliceMode = ChannelSpliceMode.NONE
# 符号
code_length: int = 31 # 码字长度
cyclic_shift: int = 0 # 码字循环移位
# 多天线
tx_antenna_count: int = 1
rx_antenna_count: int = 1
antenna_switch_start_seg: int = 0 # 天线切换起始测量子片段
sub_segment_gap_us: float = 5.0 # 测量子片段间间隔
# 安全
secure_mode: bool = False
# 聚合上报
aggregate_events: int = 1 # 聚合多少个事件后上报
@property
def total_antenna_pairs(self) -> int:
return self.tx_antenna_count * self.rx_antenna_count
[文档]
def uwb_event_sender(
params: UWBMeasLinkParams,
event_index: int,
) -> int:
"""确定指定事件中谁先发。
:param params: UWB 测量参数。
:param event_index: 事件索引。
:returns: 0=配置方先发, 1=对端先发。
"""
if not params.alternate_sender:
return params.first_sender
return (params.first_sender + event_index) % 2
[文档]
def uwb_event_count_per_mode(mode: UWBMeasMode) -> int:
"""每个测量事件中包含的 UWB 帧数。"""
if mode == UWBMeasMode.ONE_WAY:
return 1
if mode == UWBMeasMode.DS_TWR_2MSG:
return 2
return 3 # DS_TWR_3MSG