nearlink_sdr.mac.security_manager 源代码

"""安全流程集成管理 -- TXS-10002-2025 标准 9.2-9.4。

将配对信令 (security.py)、密码学原语 (crypto.py) 和链路管理器
整合为端到端的安全流程:
- 配对状态机 (PairingManager): 驱动配对信令交互和密钥协商
- 帧加密上下文 (FrameCryptoContext): 管理 payload_count 和 AES-CCM 加解密
- ECDH 密钥交换 (P-256): 生成密钥对和共享密钥
"""

from __future__ import annotations

__all__ = [
    "ECDHKeyPair",
    "FrameCryptoContext",
    "PairingFailureReason",
    "PairingManager",
    "PairingState",
    "run_pairing_procedure",
]


import logging
import os
from dataclasses import dataclass, field
from enum import IntEnum, auto

from nearlink_sdr.mac.crypto import (
    AuthMethod,
    KdfType,
    aes_ccm_decrypt,
    aes_ccm_encrypt,
    build_ccm_nonce_async,
    compute_iv,
    derive_link_key,
    derive_session_key,
    generate_confirm_code,
)
from nearlink_sdr.mac.security import (
    GNodeConfirmCode,
    PairingConfirm,
    PairingFailure,
    PairingInitialInfo,
    PairingInitiate,
    PairingRequest,
    PairingResponse,
    RaMessage,
    RbMessage,
    TNodeConfirmCode,
)

log = logging.getLogger(__name__)


# ── 配对状态 ──


class PairingState(IntEnum):
    """States of the pairing procedure.

    Members are numbered consecutively starting at zero, in the order in
    which they are declared.
    """

    IDLE = 0
    INITIATED = 1
    REQUEST_SENT = 2
    RESPONSE_SENT = 3
    CONFIRM_SENT = 4
    PUBLIC_KEY_EXCHANGED = 5
    CONFIRM_CODE_SENT = 6
    COMPLETED = 7
    FAILED = 8
class PairingFailureReason(IntEnum):
    """Reason codes reported when pairing fails (standard section 9.2)."""

    # Authentication-input related failures.
    PASSKEY_ENTRY_FAILED = 0x01
    OOB_NOT_AVAILABLE = 0x02
    AUTHENTICATION_REQUIREMENTS = 0x03
    CONFIRM_VALUE_FAILED = 0x04

    # Capability / parameter negotiation failures.
    PAIRING_NOT_SUPPORTED = 0x05
    ENCRYPTION_KEY_SIZE = 0x06
    COMMAND_NOT_SUPPORTED = 0x07

    # Generic and procedural failures.
    UNSPECIFIED_REASON = 0x08
    REPEATED_ATTEMPTS = 0x09
    INVALID_PARAMETERS = 0x0A

    # Key-agreement verification failures.
    DHKEY_CHECK_FAILED = 0x0B
    NUMERIC_COMPARISON_FAILED = 0x0C
# ── ECDH P-256 密钥交换 ──
@dataclass
class ECDHKeyPair:
    """P-256 elliptic-curve key pair used for the ECDH exchange.

    :ivar private_key_bytes: Private key serialized as unencrypted PKCS#8
        DER; empty until :meth:`generate` has been used.
    :ivar public_key_x: X coordinate of the public point (32 bytes once
        generated).
    :ivar public_key_y: Y coordinate of the public point (32 bytes once
        generated).
    """

    private_key_bytes: bytes = b""
    public_key_x: bytes = b""
    public_key_y: bytes = b""

    @classmethod
    def generate(cls) -> ECDHKeyPair:
        """Generate a fresh P-256 key pair.

        :returns: A key pair whose private key is stored as PKCS#8 DER and
            whose public point is split into its X and Y coordinates.
        """
        from cryptography.hazmat.primitives.asymmetric.ec import (
            SECP256R1,
            generate_private_key,
        )
        from cryptography.hazmat.primitives.serialization import (
            Encoding,
            NoEncryption,
            PrivateFormat,
            PublicFormat,
        )

        private_key = generate_private_key(SECP256R1())
        private_bytes = private_key.private_bytes(
            Encoding.DER,
            PrivateFormat.PKCS8,
            NoEncryption(),
        )
        public_bytes = private_key.public_key().public_bytes(
            Encoding.X962,
            PublicFormat.UncompressedPoint,
        )
        # X9.62 uncompressed point layout: 0x04 || X (32 B) || Y (32 B).
        return cls(
            private_key_bytes=private_bytes,
            public_key_x=public_bytes[1:33],
            public_key_y=public_bytes[33:65],
        )

    def compute_shared_secret(self, peer_pub_x: bytes, peer_pub_y: bytes) -> bytes:
        """Compute the ECDH shared secret with the peer's public key.

        :param peer_pub_x: X coordinate of the peer public key (32 bytes).
        :param peer_pub_y: Y coordinate of the peer public key (32 bytes).
        :returns: The shared secret produced by the ECDH exchange.
        :raises ValueError: If this key pair holds no private key, if either
            coordinate is not exactly 32 bytes long, or if the key material
            is rejected by the cryptography backend.
        """
        coord_len = 32  # size of one P-256 field element in bytes

        # Validate up front: the two coordinates are concatenated below, so
        # mis-sized inputs (e.g. 31 + 33 bytes) would otherwise be silently
        # re-split at the wrong boundary and parsed as a different point.
        if not self.private_key_bytes:
            raise ValueError("ECDH key pair has no private key; call generate() first")
        if len(peer_pub_x) != coord_len or len(peer_pub_y) != coord_len:
            raise ValueError(
                "peer public key coordinates must each be "
                f"{coord_len} bytes (got {len(peer_pub_x)} and {len(peer_pub_y)})"
            )

        from cryptography.hazmat.primitives.asymmetric.ec import (
            ECDH,
            SECP256R1,
            EllipticCurvePublicKey,
        )
        from cryptography.hazmat.primitives.serialization import (
            load_der_private_key,
        )

        private_key = load_der_private_key(self.private_key_bytes, password=None)

        # Rebuild the peer public key from its uncompressed X9.62 encoding.
        peer_public_key = EllipticCurvePublicKey.from_encoded_point(
            SECP256R1(),
            b"\x04" + peer_pub_x + peer_pub_y,
        )
        return private_key.exchange(ECDH(), peer_public_key)
# ── 配对管理器 ──
@dataclass
class PairingManager:
    """Pairing state machine (standard section 9.2).

    Drives the pairing signalling exchange between a G node and a T node:

    1. pairing initiate / request / response
    2. ECDH public-key exchange
    3. mutual confirm-code verification
    4. link-key derivation
    5. session-key derivation

    :ivar is_g_node: Whether the local side is the G node.
    :ivar kdf_type: Key-derivation function type.
    :ivar auth_method: Authentication method.
    :ivar max_key_length: Maximum key length in bytes.
    :ivar local_address: Address of the local node.
    :ivar peer_address: Address of the peer node.
    :ivar state: Current pairing state.
    :ivar failure_reason: Reason recorded when pairing failed, else ``None``.
    """

    is_g_node: bool = True
    kdf_type: KdfType = KdfType.AES_CMAC
    auth_method: AuthMethod = AuthMethod.NUMERIC_COMPARISON
    max_key_length: int = 16
    local_address: bytes = b"\x00" * 6
    peer_address: bytes = b"\x00" * 6

    # State
    state: PairingState = PairingState.IDLE
    failure_reason: PairingFailureReason | None = None

    # Key material
    local_keypair: ECDHKeyPair = field(default_factory=ECDHKeyPair)
    peer_pub_x: bytes = b""
    peer_pub_y: bytes = b""
    local_random: bytes = b""
    peer_random: bytes = b""
    dh_key: bytes = b""
    link_key: bytes = b""
    session_key: bytes = b""
    integrity_key: bytes | None = None

    # Signalling history (every message this side has produced)
    _outgoing: list[object] = field(default_factory=list)

    def start_pairing(self) -> list[object]:
        """Start pairing and return the signalling messages to send.

        A fresh ECDH key pair and a fresh 16-byte random value are generated
        on every call. The G node emits ``PairingInitiate`` followed by
        ``PairingRequest``; the T node emits nothing and waits for the G node.

        :returns: Messages to transmit to the peer (empty for the T node).
        """
        self.state = PairingState.IDLE
        # Drop the failure recorded by a previous attempt so that a restarted
        # procedure does not report a stale reason.
        self.failure_reason = None
        self._outgoing.clear()

        self.local_keypair = ECDHKeyPair.generate()
        self.local_random = os.urandom(16)

        if self.is_g_node:
            initiate = PairingInitiate(auth_request=0x01)
            request = PairingRequest(
                io_capability=0x03,
                oob_data_flag=0,
                auth_request=0x01,
                max_key_length=self.max_key_length,
                security_dist_info=0x01,
                crypto_capability=b"\x01\x00\x00\x00",
                psk_indication=0,
            )
            self._outgoing.extend([initiate, request])
            self.state = PairingState.REQUEST_SENT
        else:
            # The T node waits for the G node to initiate.
            self.state = PairingState.INITIATED

        return list(self._outgoing)

    def process_message(self, msg: object) -> list[object]:
        """Process one received pairing message.

        :param msg: A pairing signalling message; unrecognised types are
            ignored.
        :returns: Response messages to transmit to the peer (possibly empty).
        """
        # Checked in order with isinstance, first match wins.
        dispatch = (
            (PairingInitiate, self._handle_initiate),
            (PairingRequest, self._handle_request),
            (PairingResponse, self._handle_response),
            (PairingConfirm, self._handle_confirm),
            (PairingInitialInfo, self._handle_initial_info),
            (RaMessage, self._handle_ra),
            (RbMessage, self._handle_rb),
            (TNodeConfirmCode, self._handle_t_confirm),
            (GNodeConfirmCode, self._handle_g_confirm),
        )

        responses: list[object] = []
        for msg_type, handler in dispatch:
            if isinstance(msg, msg_type):
                responses = handler(msg)
                break
        else:
            if isinstance(msg, PairingFailure):
                self._record_peer_failure(msg)

        self._outgoing.extend(responses)
        return responses

    def _record_peer_failure(self, msg: PairingFailure) -> None:
        """Enter the FAILED state using the reason reported by the peer."""
        self.state = PairingState.FAILED
        try:
            reason = PairingFailureReason(msg.reason)
        except ValueError:
            # The reason code comes from the peer and may lie outside the
            # enum; an unknown code must not crash message processing.
            log.warning("收到未知的配对失败原因码: %r", msg.reason)
            reason = PairingFailureReason.UNSPECIFIED_REASON
        self.failure_reason = reason
        log.warning("配对失败: %s", self.failure_reason.name)

    def _handle_initiate(self, msg: PairingInitiate) -> list[object]:
        """T node: note that the G node has initiated pairing."""
        if not self.is_g_node:
            self.state = PairingState.INITIATED
        return []

    def _handle_request(self, msg: PairingRequest) -> list[object]:
        """T node: answer a pairing request with a pairing response."""
        if self.is_g_node:
            return []
        response = PairingResponse(
            io_capability=0x03,
            oob_data_flag=0,
            auth_request=0x01,
            max_key_length=self.max_key_length,
            security_dist_info=0x01,
            crypto_capability=b"\x01\x00\x00\x00",
            psk_indication=0,
        )
        self.state = PairingState.RESPONSE_SENT
        return [response]

    def _handle_response(self, msg: PairingResponse) -> list[object]:
        """G node: send PairingConfirm carrying the local public key."""
        if not self.is_g_node:
            return []
        confirm = PairingConfirm(
            key_length=self.max_key_length,
            auth_method=int(self.auth_method),
            crypto_algorithm=b"\x01\x00\x00\x00",
            g_public_key_x=self.local_keypair.public_key_x,
            g_public_key_y=self.local_keypair.public_key_y,
        )
        self.state = PairingState.CONFIRM_SENT
        return [confirm]

    def _handle_confirm(self, msg: PairingConfirm) -> list[object]:
        """T node: store the G public key and reply with the T public key."""
        if self.is_g_node:
            return []
        self.peer_pub_x = msg.g_public_key_x
        self.peer_pub_y = msg.g_public_key_y
        initial_info = PairingInitialInfo(
            t_public_key_x=self.local_keypair.public_key_x,
            t_public_key_y=self.local_keypair.public_key_y,
        )
        self.state = PairingState.PUBLIC_KEY_EXCHANGED
        return [initial_info]

    def _handle_initial_info(self, msg: PairingInitialInfo) -> list[object]:
        """G node: store the T public key, compute the DH key, send Ra."""
        if not self.is_g_node:
            return []
        self.peer_pub_x = msg.t_public_key_x
        self.peer_pub_y = msg.t_public_key_y
        self._compute_dh_key()
        self.state = PairingState.PUBLIC_KEY_EXCHANGED
        return [RaMessage(ra=self.local_random)]

    def _handle_ra(self, msg: RaMessage) -> list[object]:
        """T node: store Ra, compute the DH key, send Rb."""
        if self.is_g_node:
            return []
        self.peer_random = msg.ra
        self._compute_dh_key()
        return [RbMessage(rb=self.local_random)]

    def _handle_rb(self, msg: RbMessage) -> list[object]:
        """G node: store Rb, then generate and send the G confirm code."""
        if not self.is_g_node:
            return []
        self.peer_random = msg.rb
        g_confirm = GNodeConfirmCode(confirm_code=self._generate_confirm_code())
        self.state = PairingState.CONFIRM_CODE_SENT
        return [g_confirm]

    def _handle_t_confirm(self, msg: TNodeConfirmCode) -> list[object]:
        """G node: verify the T confirm code and finish pairing."""
        if not self.is_g_node:
            return []
        expected = self._generate_peer_confirm_code()
        if not self._confirm_codes_match(msg.confirm_code, expected):
            return self._fail_confirm_value()
        self._derive_keys()
        self.state = PairingState.COMPLETED
        log.info("配对完成 (G 节点)")
        return []

    def _handle_g_confirm(self, msg: GNodeConfirmCode) -> list[object]:
        """T node: verify the G confirm code, then send the T confirm code."""
        if self.is_g_node:
            return []
        expected = self._generate_peer_confirm_code()
        if not self._confirm_codes_match(msg.confirm_code, expected):
            return self._fail_confirm_value()
        t_confirm = TNodeConfirmCode(confirm_code=self._generate_confirm_code())
        self._derive_keys()
        self.state = PairingState.COMPLETED
        log.info("配对完成 (T 节点)")
        return [t_confirm]

    def _fail_confirm_value(self) -> list[object]:
        """Enter FAILED for a confirm-code mismatch and build the failure reply."""
        reason = PairingFailureReason.CONFIRM_VALUE_FAILED
        self.state = PairingState.FAILED
        self.failure_reason = reason
        return [PairingFailure(reason=int(reason))]

    @staticmethod
    def _confirm_codes_match(received: object, expected: object) -> bool:
        """Compare two confirm codes without leaking the mismatch position."""
        import hmac

        bytes_like = (bytes, bytearray, memoryview)
        if isinstance(received, bytes_like) and isinstance(expected, bytes_like):
            return hmac.compare_digest(received, expected)
        # compare_digest rejects other types; keep plain equality for them.
        return received == expected

    def _compute_dh_key(self) -> None:
        """Compute the ECDH shared secret from the stored peer public key."""
        self.dh_key = self.local_keypair.compute_shared_secret(
            self.peer_pub_x,
            self.peer_pub_y,
        )

    def _ordered_public_keys(self) -> tuple[bytes, bytes]:
        """Return ``(g_pub, t_pub)``, each as X || Y, whatever the local role."""
        local_pub = self.local_keypair.public_key_x + self.local_keypair.public_key_y
        peer_pub = self.peer_pub_x + self.peer_pub_y
        if self.is_g_node:
            return local_pub, peer_pub
        return peer_pub, local_pub

    def _generate_confirm_code(self) -> bytes:
        """Generate the local confirm code (keyed on the local random value)."""
        g_pub, t_pub = self._ordered_public_keys()
        return generate_confirm_code(
            self.kdf_type,
            self.auth_method,
            self.local_random,
            g_pub,
            t_pub,
        )

    def _generate_peer_confirm_code(self) -> bytes:
        """Generate the confirm code expected from the peer, for verification."""
        g_pub, t_pub = self._ordered_public_keys()
        return generate_confirm_code(
            self.kdf_type,
            self.auth_method,
            self.peer_random,
            g_pub,
            t_pub,
        )

    def _derive_keys(self) -> None:
        """Derive the link key and the session keys from the DH key."""
        # Ra is always the G node's random value, Rb the T node's; the same
        # role-based ordering applies to the two addresses.
        if self.is_g_node:
            ra, rb = self.local_random, self.peer_random
            g_addr, t_addr = self.local_address, self.peer_address
        else:
            ra, rb = self.peer_random, self.local_random
            g_addr, t_addr = self.peer_address, self.local_address

        self.link_key = derive_link_key(
            self.kdf_type,
            self.dh_key,
            ra,
            rb,
            g_addr,
            t_addr,
        )

        # Deterministic diversifiers: the first 8 bytes of Ra and of Rb.
        g_div = ra[:8]
        t_div = rb[:8]

        self.session_key, self.integrity_key = derive_session_key(
            self.kdf_type,
            self.link_key,
            g_div,
            t_div,
            use_authenticated=True,
        )

    @property
    def is_paired(self) -> bool:
        """Whether pairing has reached the COMPLETED state."""
        return self.state == PairingState.COMPLETED
# ── 帧加密上下文 ──
@dataclass
class FrameCryptoContext:
    """Frame-level AES-CCM encryption/decryption context.

    Keeps separate transmit and receive payload counters and builds the CCM
    nonce for each frame, offering encrypt/decrypt services for MAC frames.

    :ivar session_key: Session key (16 bytes).
    :ivar iv_base: Initialization-vector base (8 bytes).
    :ivar direction: Transfer direction (0 = G→T, 1 = T→G).
    :ivar mic_len: MIC length in bytes (4/8/12/16).
    :ivar frame_type: Frame type (used in the IV computation).
    :ivar link_id: Link identifier (used in the IV computation).
    """

    session_key: bytes = b"\x00" * 16
    iv_base: bytes = b"\x00" * 8
    direction: int = 0
    mic_len: int = 4
    frame_type: int = 2
    link_id: int = 0

    _tx_payload_count: int = 0
    _rx_payload_count: int = 0

    @property
    def tx_count(self) -> int:
        """Payload counter that the next encrypted frame will use."""
        return self._tx_payload_count

    @property
    def rx_count(self) -> int:
        """Payload counter that the next decrypted frame will use."""
        return self._rx_payload_count

    def _build_nonce(self, payload_count: int, data_length: int) -> bytes:
        """Build the 13-byte CCM nonce for one frame.

        ``build_ccm_nonce_async`` returns the 16-byte B0 block laid out as
        flag (1 B) | nonce (13 B) | data_length (2 B); only the nonce part
        is handed to AES-CCM.
        """
        frame_iv = compute_iv(self.iv_base, self.link_id, self.frame_type)
        return build_ccm_nonce_async(
            payload_count=payload_count,
            direction=self.direction,
            iv_base=frame_iv,
            data_length=data_length,
        )[1:14]

    def encrypt(self, plaintext: bytes, aad: bytes = b"") -> tuple[bytes, bytes]:
        """Encrypt one frame and advance the transmit payload counter.

        :param plaintext: Data to encrypt.
        :param aad: Associated data (not encrypted, but integrity-protected).
        :returns: ``(ciphertext, mic)``.
        """
        counter = self._tx_payload_count
        frame_nonce = self._build_nonce(counter, len(plaintext))
        ciphertext, mic = aes_ccm_encrypt(
            self.session_key,
            frame_nonce,
            plaintext,
            aad,
            self.mic_len,
        )
        # Advance only after the encryption call has returned.
        self._tx_payload_count = counter + 1
        return ciphertext, mic

    def decrypt(self, ciphertext: bytes, mic: bytes, aad: bytes = b"") -> bytes:
        """Decrypt one frame and advance the receive payload counter.

        :param ciphertext: Encrypted data.
        :param mic: Message integrity code.
        :param aad: Associated data.
        :returns: The decrypted plaintext.
        """
        counter = self._rx_payload_count
        frame_nonce = self._build_nonce(counter, len(ciphertext))
        plaintext = aes_ccm_decrypt(
            self.session_key,
            frame_nonce,
            ciphertext,
            mic,
            aad,
            self.mic_len,
        )
        # Advance only after the decryption call has returned.
        self._rx_payload_count = counter + 1
        return plaintext

    def reset_counters(self) -> None:
        """Reset both payload counters to zero."""
        self._tx_payload_count = self._rx_payload_count = 0
# ── 端到端配对流程 (仿真/测试用) ──
def run_pairing_procedure(
    g_address: bytes = b"\x01\x02\x03\x04\x05\x06",
    t_address: bytes = b"\x0A\x0B\x0C\x0D\x0E\x0F",
    kdf_type: KdfType = KdfType.AES_CMAC,
    auth_method: AuthMethod = AuthMethod.NUMERIC_COMPARISON,
) -> tuple[PairingManager, PairingManager]:
    """Run a complete end-to-end pairing procedure (simulation/test helper).

    Creates one G-node manager and one T-node manager and shuttles the
    signalling messages between them in memory. The exchange stops when both
    sides are COMPLETED, when no messages remain, or after 20 rounds.

    :param g_address: Address of the G node.
    :param t_address: Address of the T node.
    :param kdf_type: Key-derivation function type used by both sides.
    :param auth_method: Authentication method used by both sides.
    :returns: ``(G-node manager, T-node manager)``; callers should inspect
        each manager's ``state`` because completion is not guaranteed.
    """
    shared = {"kdf_type": kdf_type, "auth_method": auth_method}
    g_mgr = PairingManager(
        is_g_node=True,
        local_address=g_address,
        peer_address=t_address,
        **shared,
    )
    t_mgr = PairingManager(
        is_g_node=False,
        local_address=t_address,
        peer_address=g_address,
        **shared,
    )

    # The G node initiates; the T node only prepares its own key material.
    in_flight: list[tuple[str, object]] = [("g", m) for m in g_mgr.start_pairing()]
    t_mgr.start_pairing()

    # sender tag -> (manager that receives the message, tag for its replies)
    routes = {"g": (t_mgr, "t"), "t": (g_mgr, "g")}

    rounds_left = 20
    while in_flight and rounds_left > 0:
        rounds_left -= 1
        produced: list[tuple[str, object]] = []
        for sender, message in in_flight:
            receiver, reply_tag = routes[sender]
            replies = receiver.process_message(message)
            produced += [(reply_tag, reply) for reply in replies]
        in_flight = produced

        if all(mgr.state == PairingState.COMPLETED for mgr in (g_mgr, t_mgr)):
            break

    return g_mgr, t_mgr