nearlink_sdr.phy.control_info 源代码

"""物理层控制信息 -- TXS-10002-2025 标准 6.4

A 组: 无线帧类型 1 和 2, CRC12, 种子为同步序列低 12 位。
B 组: 无线帧类型 3 和 4, CRC24B, 种子 0x555555, CRC 与 LLID 异或。
"""

from __future__ import annotations

__all__ = [
    "ControlInfoA1",
    "ControlInfoA2",
    "ControlInfoA3",
    "ControlInfoA4",
    "ControlInfoA5",
    "ControlInfoA6",
    "ControlInfoA7",
    "ControlInfoB1",
    "ControlInfoB2",
    "ControlInfoB3",
    "ControlInfoB4",
    "ControlInfoB5",
    "ControlInfoType",
    "polar_decode_control",
    "polar_encode_control",
]


from dataclasses import dataclass
from enum import IntEnum

import numpy as np
from numpy.typing import NDArray

from nearlink_sdr.common.crc import (
    CRC12_POLY,
    CRC24B_POLY,
    crc_attach,
    crc_check,
)

# ---------------------------------------------------------------------------
# 辅助工具
# ---------------------------------------------------------------------------


def _int_to_bits(value: int, width: int) -> NDArray[np.int_]:
    """将无符号整数转为低位在前 (LSB-first) 的比特数组。"""
    bits = np.zeros(width, dtype=int)
    for i in range(width):
        bits[i] = (value >> i) & 1
    return bits


def _bits_to_int(bits: NDArray[np.int_]) -> int:
    """将低位在前 (LSB-first) 的比特数组转为无符号整数。"""
    val = 0
    for i, b in enumerate(bits):
        val |= int(b) << i
    return val


[文档] class ControlInfoType(IntEnum): """控制信息类型编码。""" A1 = 0b00001 A2 = 0b00010 A3 = 0b00011 A4 = 0b00100 A5 = 0b00101 A6 = 0b00110 A7 = 0b00111 B1 = 0b10001 B2 = 0b10010 B3 = 0b10011 B4 = 0b10100 B5 = 0b10101
# ========================================================================= # A 组控制信息 (帧类型 1/2) # ========================================================================= def _a_group_pack(info_bits: NDArray[np.int_], sync_seed: int, lqi: int | None = None) -> NDArray[np.int_]: """A 组通用打包: 拼接 LQI(可选) + info_bits + CRC12。""" parts = [] if lqi is not None: parts.append(_int_to_bits(lqi, 8)) parts.append(info_bits) payload = np.concatenate(parts) return crc_attach(payload, CRC12_POLY, 12, seed=sync_seed) def _a_group_unpack(bits: NDArray[np.int_], sync_seed: int, has_lqi: bool) -> tuple[int | None, NDArray[np.int_]]: """A 组通用解包: 校验 CRC12, 返回 (lqi, info_bits)。""" if not crc_check(bits, CRC12_POLY, 12, seed=sync_seed): raise ValueError("CRC12 校验失败") payload = bits[:-12] lqi = None if has_lqi: lqi = _bits_to_int(payload[:8]) payload = payload[8:] return lqi, payload # --------------------------------------------------------------------------- # A1: 广播 / 发现 / 接入 # ---------------------------------------------------------------------------
[文档] @dataclass class ControlInfoA1: broadcast_type: int # 3 bit packet_type: int # 3 bit reserved: int # 6 bit data_length: int # 8 bit
[文档] def pack(self, sync_seed: int, lqi: int | None = None) -> NDArray[np.int_]: info = np.concatenate([ _int_to_bits(self.broadcast_type, 3), _int_to_bits(self.packet_type, 3), _int_to_bits(self.reserved, 6), _int_to_bits(self.data_length, 8), ]) return _a_group_pack(info, sync_seed, lqi)
[文档] @classmethod def unpack(cls, bits: NDArray[np.int_], sync_seed: int, has_lqi: bool = False) -> tuple[int | None, ControlInfoA1]: lqi, info = _a_group_unpack(bits, sync_seed, has_lqi) off = 0 broadcast_type = _bits_to_int(info[off:off + 3]) off += 3 packet_type = _bits_to_int(info[off:off + 3]) off += 3 reserved = _bits_to_int(info[off:off + 6]) off += 6 data_length = _bits_to_int(info[off:off + 8]) return lqi, cls(broadcast_type, packet_type, reserved, data_length)
# --------------------------------------------------------------------------- # A2: 异步单播 # ---------------------------------------------------------------------------
[文档] @dataclass class ControlInfoA2: packet_type: int # 2 bit empty_packet: int # 1 bit tx_sn: int # 1 bit rx_sn: int # 1 bit flow_ctrl: int # 1 bit sys_mgmt_rx: int # 1 bit reserved: int # 2 bit data_length: int # 11 bit
[文档] def pack(self, sync_seed: int, lqi: int | None = None) -> NDArray[np.int_]: info = np.concatenate([ _int_to_bits(self.packet_type, 2), _int_to_bits(self.empty_packet, 1), _int_to_bits(self.tx_sn, 1), _int_to_bits(self.rx_sn, 1), _int_to_bits(self.flow_ctrl, 1), _int_to_bits(self.sys_mgmt_rx, 1), _int_to_bits(self.reserved, 2), _int_to_bits(self.data_length, 11), ]) return _a_group_pack(info, sync_seed, lqi)
[文档] @classmethod def unpack(cls, bits: NDArray[np.int_], sync_seed: int, has_lqi: bool = False) -> tuple[int | None, ControlInfoA2]: lqi, info = _a_group_unpack(bits, sync_seed, has_lqi) off = 0 packet_type = _bits_to_int(info[off:off + 2]) off += 2 empty_packet = _bits_to_int(info[off:off + 1]) off += 1 tx_sn = _bits_to_int(info[off:off + 1]) off += 1 rx_sn = _bits_to_int(info[off:off + 1]) off += 1 flow_ctrl = _bits_to_int(info[off:off + 1]) off += 1 sys_mgmt_rx = _bits_to_int(info[off:off + 1]) off += 1 reserved = _bits_to_int(info[off:off + 2]) off += 2 data_length = _bits_to_int(info[off:off + 11]) return lqi, cls(packet_type, empty_packet, tx_sn, rx_sn, flow_ctrl, sys_mgmt_rx, reserved, data_length)
# --------------------------------------------------------------------------- # A3: 同步单播 # ---------------------------------------------------------------------------
[文档] @dataclass class ControlInfoA3: packet_type: int # 2 bit empty_packet: int # 1 bit tx_sn: int # 5 bit rx_sn: int # 5 bit flow_ctrl: int # 1 bit async_sched: int # 1 bit reserved: int # 2 bit data_length: int # 11 bit
[文档] def pack(self, sync_seed: int, lqi: int | None = None) -> NDArray[np.int_]: info = np.concatenate([ _int_to_bits(self.packet_type, 2), _int_to_bits(self.empty_packet, 1), _int_to_bits(self.tx_sn, 5), _int_to_bits(self.rx_sn, 5), _int_to_bits(self.flow_ctrl, 1), _int_to_bits(self.async_sched, 1), _int_to_bits(self.reserved, 2), _int_to_bits(self.data_length, 11), ]) return _a_group_pack(info, sync_seed, lqi)
[文档] @classmethod def unpack(cls, bits: NDArray[np.int_], sync_seed: int, has_lqi: bool = False) -> tuple[int | None, ControlInfoA3]: lqi, info = _a_group_unpack(bits, sync_seed, has_lqi) off = 0 packet_type = _bits_to_int(info[off:off + 2]) off += 2 empty_packet = _bits_to_int(info[off:off + 1]) off += 1 tx_sn = _bits_to_int(info[off:off + 5]) off += 5 rx_sn = _bits_to_int(info[off:off + 5]) off += 5 flow_ctrl = _bits_to_int(info[off:off + 1]) off += 1 async_sched = _bits_to_int(info[off:off + 1]) off += 1 reserved = _bits_to_int(info[off:off + 2]) off += 2 data_length = _bits_to_int(info[off:off + 11]) return lqi, cls(packet_type, empty_packet, tx_sn, rx_sn, flow_ctrl, async_sched, reserved, data_length)
# --------------------------------------------------------------------------- # A4: 组播组长 (异步 / 同步) # ---------------------------------------------------------------------------
[文档] @dataclass class ControlInfoA4: packet_type: int # 2 bit empty_packet: int # 1 bit tx_sn: int # 1 bit rx_sn: int # 8 bit flow_ctrl: int # 1 bit async_sched: int # 1 bit reserved: int # 3 bit data_length: int # 11 bit
[文档] def pack(self, sync_seed: int, lqi: int | None = None) -> NDArray[np.int_]: info = np.concatenate([ _int_to_bits(self.packet_type, 2), _int_to_bits(self.empty_packet, 1), _int_to_bits(self.tx_sn, 1), _int_to_bits(self.rx_sn, 8), _int_to_bits(self.flow_ctrl, 1), _int_to_bits(self.async_sched, 1), _int_to_bits(self.reserved, 3), _int_to_bits(self.data_length, 11), ]) return _a_group_pack(info, sync_seed, lqi)
[文档] @classmethod def unpack(cls, bits: NDArray[np.int_], sync_seed: int, has_lqi: bool = False) -> tuple[int | None, ControlInfoA4]: lqi, info = _a_group_unpack(bits, sync_seed, has_lqi) off = 0 packet_type = _bits_to_int(info[off:off + 2]) off += 2 empty_packet = _bits_to_int(info[off:off + 1]) off += 1 tx_sn = _bits_to_int(info[off:off + 1]) off += 1 rx_sn = _bits_to_int(info[off:off + 8]) off += 8 flow_ctrl = _bits_to_int(info[off:off + 1]) off += 1 async_sched = _bits_to_int(info[off:off + 1]) off += 1 reserved = _bits_to_int(info[off:off + 3]) off += 3 data_length = _bits_to_int(info[off:off + 11]) return lqi, cls(packet_type, empty_packet, tx_sn, rx_sn, flow_ctrl, async_sched, reserved, data_length)
# --------------------------------------------------------------------------- # A5: 组播组员 # ---------------------------------------------------------------------------
[文档] @dataclass class ControlInfoA5: packet_type: int # 2 bit empty_packet: int # 1 bit tx_sn: int # 1 bit rx_sn: int # 1 bit flow_ctrl: int # 1 bit async_sched: int # 1 bit reserved: int # 2 bit data_length: int # 11 bit
[文档] def pack(self, sync_seed: int, lqi: int | None = None) -> NDArray[np.int_]: info = np.concatenate([ _int_to_bits(self.packet_type, 2), _int_to_bits(self.empty_packet, 1), _int_to_bits(self.tx_sn, 1), _int_to_bits(self.rx_sn, 1), _int_to_bits(self.flow_ctrl, 1), _int_to_bits(self.async_sched, 1), _int_to_bits(self.reserved, 2), _int_to_bits(self.data_length, 11), ]) return _a_group_pack(info, sync_seed, lqi)
[文档] @classmethod def unpack(cls, bits: NDArray[np.int_], sync_seed: int, has_lqi: bool = False) -> tuple[int | None, ControlInfoA5]: lqi, info = _a_group_unpack(bits, sync_seed, has_lqi) off = 0 packet_type = _bits_to_int(info[off:off + 2]) off += 2 empty_packet = _bits_to_int(info[off:off + 1]) off += 1 tx_sn = _bits_to_int(info[off:off + 1]) off += 1 rx_sn = _bits_to_int(info[off:off + 1]) off += 1 flow_ctrl = _bits_to_int(info[off:off + 1]) off += 1 async_sched = _bits_to_int(info[off:off + 1]) off += 1 reserved = _bits_to_int(info[off:off + 2]) off += 2 data_length = _bits_to_int(info[off:off + 11]) return lqi, cls(packet_type, empty_packet, tx_sn, rx_sn, flow_ctrl, async_sched, reserved, data_length)
# --------------------------------------------------------------------------- # A6: 同步广播 # ---------------------------------------------------------------------------
[文档] @dataclass class ControlInfoA6: packet_type: int # 2 bit packet_sn: int # 5 bit packet_group: int # 3 bit end_indicator: int # 1 bit sys_mgmt_rx: int # 1 bit reserved: int # 5 bit data_length: int # 11 bit
[文档] def pack(self, sync_seed: int, lqi: int | None = None) -> NDArray[np.int_]: info = np.concatenate([ _int_to_bits(self.packet_type, 2), _int_to_bits(self.packet_sn, 5), _int_to_bits(self.packet_group, 3), _int_to_bits(self.end_indicator, 1), _int_to_bits(self.sys_mgmt_rx, 1), _int_to_bits(self.reserved, 5), _int_to_bits(self.data_length, 11), ]) return _a_group_pack(info, sync_seed, lqi)
[文档] @classmethod def unpack(cls, bits: NDArray[np.int_], sync_seed: int, has_lqi: bool = False) -> tuple[int | None, ControlInfoA6]: lqi, info = _a_group_unpack(bits, sync_seed, has_lqi) off = 0 packet_type = _bits_to_int(info[off:off + 2]) off += 2 packet_sn = _bits_to_int(info[off:off + 5]) off += 5 packet_group = _bits_to_int(info[off:off + 3]) off += 3 end_indicator = _bits_to_int(info[off:off + 1]) off += 1 sys_mgmt_rx = _bits_to_int(info[off:off + 1]) off += 1 reserved = _bits_to_int(info[off:off + 5]) off += 5 data_length = _bits_to_int(info[off:off + 11]) return lqi, cls(packet_type, packet_sn, packet_group, end_indicator, sys_mgmt_rx, reserved, data_length)
# --------------------------------------------------------------------------- # A7: 系统管理帧 # ---------------------------------------------------------------------------
[文档] @dataclass class ControlInfoA7: packet_sn: int # 5 bit packet_group: int # 3 bit reserved: int # 9 bit data_length: int # 11 bit
[文档] def pack(self, sync_seed: int, lqi: int | None = None) -> NDArray[np.int_]: info = np.concatenate([ _int_to_bits(self.packet_sn, 5), _int_to_bits(self.packet_group, 3), _int_to_bits(self.reserved, 9), _int_to_bits(self.data_length, 11), ]) return _a_group_pack(info, sync_seed, lqi)
[文档] @classmethod def unpack(cls, bits: NDArray[np.int_], sync_seed: int, has_lqi: bool = False) -> tuple[int | None, ControlInfoA7]: lqi, info = _a_group_unpack(bits, sync_seed, has_lqi) off = 0 packet_sn = _bits_to_int(info[off:off + 5]) off += 5 packet_group = _bits_to_int(info[off:off + 3]) off += 3 reserved = _bits_to_int(info[off:off + 9]) off += 9 data_length = _bits_to_int(info[off:off + 11]) return lqi, cls(packet_sn, packet_group, reserved, data_length)
# ========================================================================= # B 组控制信息 (帧类型 3/4) # ========================================================================= _CRC24B_SEED = 0x555555 def _llid_mask(llid: int) -> NDArray[np.int_]: """将 LLID 扩展为 24 位掩码用于 CRC 异或。""" return _int_to_bits(llid & 0xFFFFFF, 24) def _b_group_pack(info_bits: NDArray[np.int_], llid: int) -> NDArray[np.int_]: """B 组通用打包: info_bits + CRC24B (种子 0x555555, 异或 LLID)。""" mask = _llid_mask(llid) return crc_attach(info_bits, CRC24B_POLY, 24, seed=_CRC24B_SEED, mask=mask) def _b_group_check(bits: NDArray[np.int_], llid: int) -> NDArray[np.int_]: """B 组通用解包: 校验 CRC24B, 返回 info_bits。""" mask = _llid_mask(llid) if not crc_check(bits, CRC24B_POLY, 24, seed=_CRC24B_SEED, mask=mask): raise ValueError("CRC24B 校验失败") return bits[:-24] # --------------------------------------------------------------------------- # B1: 单播 / 组播 PSK # ---------------------------------------------------------------------------
[文档] @dataclass class ControlInfoB1: frame_format: int # 1 bit harq_feedback: int # 8 bit packet_sn: int # 1 bit mcs: int # 4 bit data_length: int # 11 bit flow_ctrl: int # 1 bit upper_link: int # 1 bit
[文档] def pack(self, llid: int) -> NDArray[np.int_]: info = np.concatenate([ _int_to_bits(self.frame_format, 1), _int_to_bits(self.harq_feedback, 8), _int_to_bits(self.packet_sn, 1), _int_to_bits(self.mcs, 4), _int_to_bits(self.data_length, 11), _int_to_bits(self.flow_ctrl, 1), _int_to_bits(self.upper_link, 1), ]) return _b_group_pack(info, llid)
[文档] @classmethod def unpack(cls, bits: NDArray[np.int_], llid: int) -> ControlInfoB1: info = _b_group_check(bits, llid) off = 0 frame_format = _bits_to_int(info[off:off + 1]) off += 1 harq_feedback = _bits_to_int(info[off:off + 8]) off += 8 packet_sn = _bits_to_int(info[off:off + 1]) off += 1 mcs = _bits_to_int(info[off:off + 4]) off += 4 data_length = _bits_to_int(info[off:off + 11]) off += 11 flow_ctrl = _bits_to_int(info[off:off + 1]) off += 1 upper_link = _bits_to_int(info[off:off + 1]) return cls(frame_format, harq_feedback, packet_sn, mcs, data_length, flow_ctrl, upper_link)
# --------------------------------------------------------------------------- # B2: 反馈组播组长 # ---------------------------------------------------------------------------
[文档] @dataclass class ControlInfoB2: harq_feedback: int # 25 bit flow_ctrl: int # 1 bit upper_link: int # 1 bit
[文档] def pack(self, llid: int) -> NDArray[np.int_]: info = np.concatenate([ _int_to_bits(self.harq_feedback, 25), _int_to_bits(self.flow_ctrl, 1), _int_to_bits(self.upper_link, 1), ]) return _b_group_pack(info, llid)
[文档] @classmethod def unpack(cls, bits: NDArray[np.int_], llid: int) -> ControlInfoB2: info = _b_group_check(bits, llid) off = 0 harq_feedback = _bits_to_int(info[off:off + 25]) off += 25 flow_ctrl = _bits_to_int(info[off:off + 1]) off += 1 upper_link = _bits_to_int(info[off:off + 1]) return cls(harq_feedback, flow_ctrl, upper_link)
# --------------------------------------------------------------------------- # B3: 同步广播 (帧类型 3/4) # ---------------------------------------------------------------------------
[文档] @dataclass class ControlInfoB3: packet_group: int # 5 bit packet_sn: int # 5 bit mcs: int # 4 bit data_length: int # 11 bit flow_ctrl: int # 1 bit max_sn_indicator: int # 1 bit
[文档] def pack(self, llid: int) -> NDArray[np.int_]: info = np.concatenate([ _int_to_bits(self.packet_group, 5), _int_to_bits(self.packet_sn, 5), _int_to_bits(self.mcs, 4), _int_to_bits(self.data_length, 11), _int_to_bits(self.flow_ctrl, 1), _int_to_bits(self.max_sn_indicator, 1), ]) return _b_group_pack(info, llid)
[文档] @classmethod def unpack(cls, bits: NDArray[np.int_], llid: int) -> ControlInfoB3: info = _b_group_check(bits, llid) off = 0 packet_group = _bits_to_int(info[off:off + 5]) off += 5 packet_sn = _bits_to_int(info[off:off + 5]) off += 5 mcs = _bits_to_int(info[off:off + 4]) off += 4 data_length = _bits_to_int(info[off:off + 11]) off += 11 flow_ctrl = _bits_to_int(info[off:off + 1]) off += 1 max_sn_indicator = _bits_to_int(info[off:off + 1]) return cls(packet_group, packet_sn, mcs, data_length, flow_ctrl, max_sn_indicator)
# --------------------------------------------------------------------------- # B4: 异步广播 (帧类型 3/4) # ---------------------------------------------------------------------------
[文档] @dataclass class ControlInfoB4: broadcast_group_id: int # 5 bit data_update: int # 1 bit packet_sn: int # 4 bit mcs: int # 4 bit data_length: int # 11 bit flow_ctrl: int # 1 bit max_sn_indicator: int # 1 bit
[文档] def pack(self, llid: int) -> NDArray[np.int_]: info = np.concatenate([ _int_to_bits(self.broadcast_group_id, 5), _int_to_bits(self.data_update, 1), _int_to_bits(self.packet_sn, 4), _int_to_bits(self.mcs, 4), _int_to_bits(self.data_length, 11), _int_to_bits(self.flow_ctrl, 1), _int_to_bits(self.max_sn_indicator, 1), ]) return _b_group_pack(info, llid)
[文档] @classmethod def unpack(cls, bits: NDArray[np.int_], llid: int) -> ControlInfoB4: info = _b_group_check(bits, llid) off = 0 broadcast_group_id = _bits_to_int(info[off:off + 5]) off += 5 data_update = _bits_to_int(info[off:off + 1]) off += 1 packet_sn = _bits_to_int(info[off:off + 4]) off += 4 mcs = _bits_to_int(info[off:off + 4]) off += 4 data_length = _bits_to_int(info[off:off + 11]) off += 11 flow_ctrl = _bits_to_int(info[off:off + 1]) off += 1 max_sn_indicator = _bits_to_int(info[off:off + 1]) return cls(broadcast_group_id, data_update, packet_sn, mcs, data_length, flow_ctrl, max_sn_indicator)
# --------------------------------------------------------------------------- # B5: 基础广播 / 发现 / 接入 (帧类型 3/4) # ---------------------------------------------------------------------------
[文档] @dataclass class ControlInfoB5: reserved: int # 7 bit msg_type: int # 3 bit accessible: int # 1 bit queryable: int # 1 bit directed_content: int # 1 bit undirected_content: int # 1 bit data_update: int # 1 bit mcs: int # 4 bit data_length: int # 8 bit
[文档] def pack(self, llid: int) -> NDArray[np.int_]: info = np.concatenate([ _int_to_bits(self.reserved, 7), _int_to_bits(self.msg_type, 3), _int_to_bits(self.accessible, 1), _int_to_bits(self.queryable, 1), _int_to_bits(self.directed_content, 1), _int_to_bits(self.undirected_content, 1), _int_to_bits(self.data_update, 1), _int_to_bits(self.mcs, 4), _int_to_bits(self.data_length, 8), ]) return _b_group_pack(info, llid)
[文档] @classmethod def unpack(cls, bits: NDArray[np.int_], llid: int) -> ControlInfoB5: info = _b_group_check(bits, llid) off = 0 reserved = _bits_to_int(info[off:off + 7]) off += 7 msg_type = _bits_to_int(info[off:off + 3]) off += 3 accessible = _bits_to_int(info[off:off + 1]) off += 1 queryable = _bits_to_int(info[off:off + 1]) off += 1 directed_content = _bits_to_int(info[off:off + 1]) off += 1 undirected_content = _bits_to_int(info[off:off + 1]) off += 1 data_update = _bits_to_int(info[off:off + 1]) off += 1 mcs = _bits_to_int(info[off:off + 4]) off += 4 data_length = _bits_to_int(info[off:off + 8]) return cls(reserved, msg_type, accessible, queryable, directed_content, undirected_content, data_update, mcs, data_length)
# ========================================================================= # 帧类型 2 Polar 编码辅助 # =========================================================================
[文档] def polar_encode_control(raw_bits: NDArray[np.int_], n_coded: int = 64) -> NDArray[np.int_]: """对帧类型 2 的 A 组控制信息进行 Polar 编码。 输入 40 或 48 比特 (含 LQI + CRC12), 编码到 64 比特。 """ from nearlink_sdr.common.polar import PolarEncoder k = len(raw_bits) encoder = PolarEncoder(n_coded, k) return encoder.encode(raw_bits)
[文档] def polar_decode_control(coded_bits: NDArray[np.int_], k_info: int, n_coded: int = 64) -> NDArray[np.int_]: """对帧类型 2 的 A 组控制信息进行 Polar 解码。 输入 64 比特, 解码到 k_info 比特 (40 或 48)。 """ from nearlink_sdr.common.polar import get_polar_decoder decoder = get_polar_decoder(n_coded, k_info) llr = (1 - 2 * coded_bits.astype(float)) * 10.0 return decoder.decode(llr)