Source code for pyrxd.gravity.radiant_leg

"""Concrete Radiant covenant leg for the Gravity Taproot-HTLC atomic swap.

This is the production ``radiant_leg`` the
:class:`pyrxd.gravity.swap_coordinator.SwapCoordinator` drives (the coordinator
tests use a duck-typed fake; this is the real object). It composes:

* :mod:`pyrxd.gravity.htlc_covenant` — the funded covenant SPK builders;
* :mod:`pyrxd.gravity.htlc_spend` — the claim (preimage) / refund (CSV) TX builders;
* a :class:`RadiantChainIO` over :class:`pyrxd.network.electrumx.ElectrumXClient`
  for broadcast + confirmation polling + reading the funded covenant value;
* a :class:`SeenStore` (in-memory) for H-freshness.

Plus a :class:`RxinDexerRefAdapter` that resolves a genesis ref to a
:class:`pyrxd.gravity.ref_authenticity.ResolvedRef` via the RXinDexer
``glyph.get_token`` RPC, so the coordinator's pre-lock REF-authenticity gate has a
real backend.

Design notes (T7 plan D5/D6, reviewed)
--------------------------------------
* ``RadiantChainIO`` is a thin helper (broadcast + wait_confirmations + read UTXO),
  NOT unified with :class:`pyrxd.gravity.trade.GravityTrade` — that drives the
  *different* SPV-oracle finalize swap.
* The leg holds the party's own Radiant pkhs (taker + maker) so it can build the
  covenant and the spend holder outputs. ``expected_covenant_scriptpubkey`` builds
  the covenant from the negotiated terms and **asserts the resulting
  ``hash256(holder)`` binds equal the terms' ``taker_dest_hash``/``maker_dest_hash``**
  — fail-closed if the leg's configured pkhs don't produce the covenant the terms
  committed to (a wrong-key/wrong-party guard).
* ``carrier_value`` (the funded covenant output value) is read from the on-chain
  UTXO, never self-reported.
* **AUDIT GATE:** reuses :func:`pyrxd.btc_wallet.htlc_leg.require_audit_cleared` —
  the leg refuses to construct for a value-bearing network without the explicit
  opt-in (the always-succeeding fakes hide the one-sided-loss surface).
* ``SeenStore`` is an in-memory ``set`` for this milestone (a SQLite durable store
  is deferred to the audit-gated track; a blocking ``sqlite3`` call would stall the
  async loop). The duck-typed ``has_seen``/``mark_seen`` shape lets a durable store
  drop in later.
"""

from __future__ import annotations

import logging
from typing import Protocol, runtime_checkable

from pyrxd.btc_wallet.htlc_leg import require_audit_cleared
from pyrxd.btc_wallet.taproot import TimeUnit
from pyrxd.glyph.types import GlyphRef
from pyrxd.gravity.htlc_covenant import (
    HtlcCovenant,
    build_htlc_covenant_ft,
    build_htlc_covenant_nft,
    build_htlc_covenant_rxd,
)
from pyrxd.gravity.htlc_spend import FeeInput, build_htlc_claim_tx, build_htlc_refund_tx
from pyrxd.gravity.ref_authenticity import ResolvedRef
from pyrxd.gravity.swap_state import NegotiatedTerms, SwapRecord
from pyrxd.security.errors import NetworkError, ValidationError
from pyrxd.security.types import Hex20

__all__ = [
    "FeeUtxoSource",
    "RadiantBroadcaster",
    "RadiantChainIO",
    "RadiantCovenantLeg",
    "RxinDexerRefAdapter",
    "SeenStore",
]

logger = logging.getLogger(__name__)


# --------------------------------------------------------------------------- SeenStore


[docs] class SeenStore: """In-memory H-freshness store (the coordinator's ``reserve``/``has_seen``). Records every hashlock H the coordinator has committed to funding, so a reused H is rejected for BOTH reasons: economic (free-option replay) and cross-swap preimage replay. ``reserve(H)`` is the authoritative atomic test-and-set the coordinator calls PRE-broadcast; ``has_seen`` is a read-only advisory probe (the pre-lock gate's cheap early-reject), never the binding decision. NON-DURABLE (``durable = False``): a plain ``set``, so freshness does NOT survive a restart or a second process. That is acceptable only for a single-process, single-shot run that mints a fresh H per swap (the dust runbook); the coordinator's construct-time guard refuses this store on a value-bearing network unless the operator passes ``CoordinatorConfig(accept_nondurable_seen=True)``. A durable replacement (SQLite ``INSERT OR IGNORE`` keyed on H, declaring ``durable = True``) is deferred to the external-audit track; it MUST stay non-blocking (``asyncio.to_thread`` behind an async ``reserve``) and fsync the reservation BEFORE the BTC broadcast. The method shape is duck-compatible so that durable store drops in unchanged. """ durable = False
[docs] def __init__(self) -> None: self._seen: set[bytes] = set()
[docs] def reserve(self, hashlock: bytes) -> bool: """Atomically record H if unseen; True if freshly reserved, else False. Atomic on the single-threaded event loop precisely because there is no ``await`` between the membership test and the add. """ h = bytes(hashlock) if h in self._seen: return False self._seen.add(h) return True
[docs] def has_seen(self, hashlock: bytes) -> bool: return bytes(hashlock) in self._seen
[docs] def mark_seen(self, hashlock: bytes) -> None: # Retained as an unused primitive for the roundtrip test + back-compat; the # coordinator's authoritative consume is reserve() (atomic, pre-broadcast). self._seen.add(bytes(hashlock))
# --------------------------------------------------------------------------- chain IO @runtime_checkable class RadiantBroadcaster(Protocol): """Submit a raw Radiant tx; idempotent on an already-known tx.""" async def broadcast(self, raw_tx: bytes) -> str: # pragma: no cover - Protocol ...
[docs] class RadiantChainIO: """Thin chain helper over an ``ElectrumXClient``-like object. Provides exactly what the leg needs: broadcast, confirmation depth, and the on-chain value of a covenant output. NOT unified with ``GravityTrade`` (that drives the SPV-oracle finalize swap, a different protocol). The injected ``client`` must expose ``broadcast(raw)->txid``, ``get_transaction_verbose(txid)->dict`` (with ``confirmations``), and ``get_utxos(script_hash)->list`` (records with ``tx_hash``/``tx_pos``/``value``). """
[docs] def __init__(self, client) -> None: for m in ("broadcast", "get_transaction_verbose", "get_utxos"): if not hasattr(client, m): raise ValidationError(f"RadiantChainIO client must provide {m}()") self._client = client
[docs] async def broadcast(self, raw_tx: bytes) -> str: if not isinstance(raw_tx, (bytes, bytearray)) or len(raw_tx) == 0: raise ValidationError("raw_tx must be non-empty bytes") try: return str(await self._client.broadcast(bytes(raw_tx))) except Exception as exc: msg = str(exc).lower() if "already" in msg and ("known" in msg or "mempool" in msg or "chain" in msg): # Idempotent: the node already has it. Re-derive nothing; the caller # tracks the txid from the builder. Surface a sentinel for the leg. raise _AlreadyKnown() from exc raise NetworkError(f"radiant broadcast failed: {exc}") from exc
[docs] async def confirmations(self, txid: str) -> int: info = await self._client.get_transaction_verbose(txid) if not isinstance(info, dict): raise NetworkError("get_transaction_verbose did not return a dict") return int(info.get("confirmations", 0) or 0)
[docs] async def find_covenant_utxo(self, spk: bytes, *, expected_value: int | None = None) -> tuple[str, int, int]: """Locate the funded covenant UTXO for ``spk`` -> ``(outpoint, value, height)``. Scans the UTXO set of the covenant scriptPubKey (ElectrumX script-hash = ``sha256(spk)`` reversed). The covenant funds exactly one output, so there is one matching UTXO; if ``expected_value`` is given, the match must equal it (a wrong value is a mis-funded covenant -> fail-closed). The returned value is the ON-CHAIN value, never a self-report. """ import hashlib # A script-hash-keyed client (e.g. SshTrRadiantClient via scantxoutset) can only # resolve a script_hash back to its SPK from a registry; an UNregistered covenant # SPK scans EMPTY and is misread as "not funded / already spent". A fresh per-swap # claim leg (sidecar_leg_resolver) never pre-registers, so register the SPK we are # about to scan here — idempotent, and a no-op for clients without register_spk. register = getattr(self._client, "register_spk", None) if callable(register): register(bytes(spk)) script_hash = hashlib.sha256(bytes(spk)).digest()[::-1] utxos = await self._client.get_utxos(script_hash) if not utxos: raise NetworkError("no UTXO found for the covenant scriptPubKey (not yet funded / wrong SPK)") if expected_value is not None: utxos = [u for u in utxos if int(u.value) == int(expected_value)] if not utxos: raise NetworkError("no covenant UTXO matches the expected carrier value; fail-closed") if len(utxos) > 1: raise NetworkError(f"ambiguous covenant UTXO set ({len(utxos)} candidates); fail-closed") u = utxos[0] return f"{u.tx_hash}:{u.tx_pos}", int(u.value), int(u.height)
[docs] async def covenant_unspent_incl_mempool(self, outpoint: str) -> bool | None: """Mempool-AWARE liveness of a covenant outpoint — the complement to ``find_covenant_utxo``'s mempool-BLIND scantxoutset scan. ``True`` = unspent considering the mempool; ``False`` = spent (confirmed OR by a PENDING mempool tx); ``None`` = the client cannot answer (the caller keeps its own idempotency guard). Lets the autonomous claim executor treat a covenant already spent IN THE MEMPOOL as claimed — killing the per-tick re-carve drain WITHOUT a durable cross-restart store and WITHOUT the SeenStore's eviction blind spot (a truly-unspent covenant, e.g. after a claim is evicted by a reorg, correctly re-fires). """ fn = getattr(self._client, "txout_unspent_incl_mempool", None) if not callable(fn): return None txid, _sep, vout = outpoint.partition(":") if not _sep or not vout.isdigit(): raise ValidationError(f"bad covenant outpoint {outpoint!r}") return bool(await fn(txid, int(vout)))
class _AlreadyKnown(Exception): """Internal sentinel: a broadcast hit an already-known tx (idempotent success).""" # --------------------------------------------------------------------------- ref adapter
[docs] class RxinDexerRefAdapter: """Resolve a genesis ref to a :class:`ResolvedRef` via RXinDexer ``glyph.get_token``. Implements the ``RefAuthenticityIndexer`` protocol the pre-lock gate awaits. Maps the indexer's token dict to the inspectable fields the gate binds: * **genesis_outpoint** — from the token's ``ref_outpoint`` (``txid:vout``), re-encoded to the 36-byte wire ref so it compares equal to the advertised ``genesis_ref``. (``glyph.get_token`` only returns genuinely-minted Glyph tokens, so a resolvable token IS a ``gly`` reveal — see ``has_gly_marker``.) * **has_gly_marker** — ``True`` whenever the indexer returned a token dict for the ref (the indexer only indexes real ``gly`` envelopes). A bare wallet-UTXO singleton (the R1 forgery) resolves to ``None`` and the gate fails closed. * **payload_hash** — from ``payload_hash`` (bytes), or ``b""`` if absent. * **confirmations** — read separately from the genesis tx via ``chain_io`` (``glyph.get_token`` does not carry confs). NOTE (T7 plan D3): a single indexer is a SPOF, and decoding a token dict is NOT SPV authenticity (no Merkle/header binding). For the regtest milestone the local node is ground truth; SPV-bound / multi-source cross-checking is the audit-gated track. This adapter is the single-indexer regtest backend. """
[docs] def __init__(self, indexer, chain_io: RadiantChainIO) -> None: if not hasattr(indexer, "glyph_get_token"): raise ValidationError("indexer must provide glyph_get_token()") if not isinstance(chain_io, RadiantChainIO): raise ValidationError("chain_io must be a RadiantChainIO") self._indexer = indexer self._chain_io = chain_io
[docs] async def resolve_ref(self, genesis_ref: bytes) -> ResolvedRef | None: ref = GlyphRef.from_bytes(bytes(genesis_ref)) # raises on malformed -> gate fail-closed token = await self._indexer.glyph_get_token(f"{ref.txid}:{ref.vout}") if token is None: return None # unknown token -> the gate fails closed (R1 forgery) if not isinstance(token, dict): raise NetworkError(f"glyph_get_token returned {type(token).__name__}, expected dict|None") resolved_outpoint = self._genesis_outpoint(token, ref) payload_hash = self._payload_hash(token) confs = await self._chain_io.confirmations(ref.txid) return ResolvedRef( genesis_outpoint=resolved_outpoint, has_gly_marker=True, # glyph.get_token only resolves real gly reveals payload_hash=payload_hash, confirmations=confs, )
@staticmethod def _genesis_outpoint(token: dict, queried: GlyphRef) -> bytes: """Re-encode the token's reported genesis outpoint to the 36-byte wire ref. RXinDexer's ``glyph.get_token`` reports the genesis outpoint under ``glyph_id`` (``txid:vout``), alongside ``txid``+``vout`` and an ``is_reveal`` flag (verified against a live regtest RXinDexer 2026-06-01: a genuine reveal resolves with ``glyph_id == queried`` and ``is_reveal=True``; the commit outpoint and bare wallet UTXOs resolve to ``None``). We also accept the legacy ``ref_outpoint`` / ``ref_txid`` + ``ref_vout`` field names as fallbacks for other indexer builds. The token must be a genesis REVEAL for the outpoint to be a genesis: a transfer UTXO would report the genesis under ``glyph_id`` but is itself a different outpoint than ``queried``, so the gate's ``genesis_outpoint == advertised_ref`` binding would (correctly) reject it. If the indexer reports no resolvable outpoint, we return a value that will NOT equal the advertised ref, so the binding fails closed. """ # RXinDexer native: glyph_id == "txid:vout" of the genesis reveal. glyph_id = token.get("glyph_id") if isinstance(glyph_id, str) and glyph_id.count(":") == 1: txid, vout_s = glyph_id.split(":") try: return GlyphRef(txid=txid, vout=int(vout_s)).to_bytes() except (ValidationError, ValueError): return b"\x00" * 36 # RXinDexer native: separate txid + vout fields. txid = token.get("txid") vout = token.get("vout") if isinstance(txid, str) and isinstance(vout, int) and not isinstance(vout, bool): try: return GlyphRef(txid=txid, vout=vout).to_bytes() except (ValidationError, ValueError): return b"\x00" * 36 # Legacy/alternate indexer field names. outpoint = token.get("ref_outpoint") if isinstance(outpoint, str) and outpoint.count(":") == 1: txid, vout_s = outpoint.split(":") try: return GlyphRef(txid=txid, vout=int(vout_s)).to_bytes() except (ValidationError, ValueError): return b"\x00" * 36 rtxid = token.get("ref_txid") rvout = token.get("ref_vout") if isinstance(rtxid, str) and isinstance(rvout, int) and not isinstance(rvout, bool): try: return GlyphRef(txid=rtxid, vout=rvout).to_bytes() except (ValidationError, ValueError): return b"\x00" * 36 # No outpoint reported -> cannot confirm it equals the advertised ref. return b"\x00" * 36 @staticmethod def _payload_hash(token: dict) -> bytes: ph = token.get("payload_hash") if isinstance(ph, str): try: return bytes.fromhex(ph) except ValueError: return b"" if isinstance(ph, (bytes, bytearray)): return bytes(ph) return b""
# --------------------------------------------------------------------------- fee source @runtime_checkable class FeeUtxoSource(Protocol): """Supplies a plain-RXD fee UTXO (+ its WIF) for a covenant spend.""" def next_fee_input(self) -> FeeInput: # pragma: no cover - Protocol ... # --------------------------------------------------------------------------- the leg
[docs] class RadiantCovenantLeg: """The concrete Radiant ``radiant_leg`` (HTLC covenant claim/refund). Parameters ---------- network: Radiant network tag (regtest test chains bypass the audit gate). taker_pkh / maker_pkh: The taker (claim) and maker (refund) Radiant holder pubkey-hashes. The covenant binds ``hash256(holder(pkh))``; these must reproduce the terms' ``taker_dest_hash``/``maker_dest_hash`` (asserted in :meth:`expected_covenant_scriptpubkey`). chain_io: A :class:`RadiantChainIO` (broadcast + confirmations + UTXO value). fee_source: A :class:`FeeUtxoSource` supplying the fee input for each spend. min_confirmations: Confirmations required before the funded covenant value is trusted. audit_cleared: Explicit opt-in for a value-bearing ``network`` (see :func:`pyrxd.btc_wallet.htlc_leg.require_audit_cleared`). """
[docs] def __init__( self, *, network: str, taker_pkh: bytes, maker_pkh: bytes, chain_io: RadiantChainIO, fee_source: FeeUtxoSource, min_confirmations: int = 1, audit_cleared: bool = False, ) -> None: require_audit_cleared(network, audit_cleared=audit_cleared) if not isinstance(chain_io, RadiantChainIO): raise ValidationError("chain_io must be a RadiantChainIO") if not isinstance(fee_source, FeeUtxoSource): raise ValidationError("fee_source must implement next_fee_input()") if not isinstance(min_confirmations, int) or isinstance(min_confirmations, bool) or min_confirmations < 0: raise ValidationError("min_confirmations must be a non-negative int") self.network = network self.taker_pkh = bytes(Hex20(taker_pkh)) self.maker_pkh = bytes(Hex20(maker_pkh)) self.chain_io = chain_io self.fee_source = fee_source self.min_confirmations = min_confirmations
# -- covenant construction (binds the leg's pkhs to the terms) ---------- def _build_covenant(self, terms: NegotiatedTerms) -> HtlcCovenant: if not isinstance(terms, NegotiatedTerms): raise ValidationError("terms must be a NegotiatedTerms") # F-002 (belt-and-suspenders; NegotiatedTerms already enforces this): the # covenant CSV operand is a BIP68 BLOCK count with no SECONDS path on this # leg, so terms.t_rxd.value is used raw as refund_csv. Refuse a non-BLOCKS # t_rxd fail-closed rather than silently coercing it. if terms.t_rxd.unit is not TimeUnit.BLOCKS: raise ValidationError("Radiant leg requires a BLOCKS t_rxd (no SECONDS CSV encoding); fail-closed") variant = terms.asset_variant if variant == "rxd": cov = build_htlc_covenant_rxd( amount=terms.radiant_amount, taker_pkh=self.taker_pkh, maker_pkh=self.maker_pkh, hashlock=terms.hashlock, refund_csv=terms.t_rxd.value, ) else: ref = GlyphRef.from_bytes(terms.genesis_ref) if variant == "ft": cov = build_htlc_covenant_ft( genesis_txid=ref.txid, genesis_vout=ref.vout, amount=terms.radiant_amount, taker_pkh=self.taker_pkh, maker_pkh=self.maker_pkh, hashlock=terms.hashlock, refund_csv=terms.t_rxd.value, ) elif variant == "nft": cov = build_htlc_covenant_nft( genesis_txid=ref.txid, genesis_vout=ref.vout, nft_carrier_value=terms.radiant_amount, taker_pkh=self.taker_pkh, maker_pkh=self.maker_pkh, hashlock=terms.hashlock, refund_csv=terms.t_rxd.value, ) else: # pragma: no cover - NegotiatedTerms already constrains the variant raise ValidationError(f"unsupported asset_variant {variant!r}") # Bind the leg's configured pkhs to what the terms committed: the covenant's # hash256(holder) MUST equal the negotiated dest hashes, else the leg is # configured for the wrong party/keys — fail closed before any spend. if cov.expected_taker_hash != terms.taker_dest_hash: raise ValidationError("covenant taker hash != terms.taker_dest_hash (wrong taker pkh?); fail-closed") if cov.expected_maker_hash != terms.maker_dest_hash: raise ValidationError("covenant maker hash != terms.maker_dest_hash (wrong maker pkh?); fail-closed") return cov
[docs] async def expected_covenant_scriptpubkey(self, terms: NegotiatedTerms) -> bytes: """The covenant SPK the on-chain lock must equal (built from the terms).""" return self._build_covenant(terms).funded_spk
[docs] async def covenant_outpoint(self, terms: NegotiatedTerms) -> str: """Locate the funded covenant UTXO ``txid:vout`` by scanning its SPK's UTXO set. The maker locks the asset into the covenant SPK (a pure function of the terms); the leg finds that single funded UTXO on-chain via ElectrumX. The carrier value is bound to ``terms.radiant_amount`` so a mis-funded covenant fails closed. """ cov = self._build_covenant(terms) outpoint, _value, _height = await self.chain_io.find_covenant_utxo( cov.funded_spk, expected_value=terms.radiant_amount ) return outpoint
# -- spends ------------------------------------------------------------- async def _resolve_covenant(self, record: SwapRecord) -> tuple[HtlcCovenant, str, int]: """Build the covenant, locate its funded UTXO, conf-gate it, return value. Reads the on-chain value (never a self-report) and rejects a covenant shallower than ``min_confirmations`` so a reorg cannot un-fund it mid-spend. """ cov = self._build_covenant(record.terms) outpoint, value, _height = await self.chain_io.find_covenant_utxo( cov.funded_spk, expected_value=record.terms.radiant_amount ) txid = outpoint.split(":")[0] confs = await self.chain_io.confirmations(txid) if confs < self.min_confirmations: raise NetworkError( f"covenant has {confs} confirmations < required {self.min_confirmations}; not yet spendable" ) if ( value <= 0 ): # pragma: no cover - defense-in-depth; find_covenant_utxo already pins value>0 via expected_value raise NetworkError("covenant output value is non-positive; fail-closed") return cov, outpoint, value
[docs] async def claim_asset(self, record: SwapRecord, preimage: bytes) -> str: """Build + broadcast the TAKER's claim spend (reveals ``p``). Returns the txid.""" if not isinstance(record, SwapRecord): raise ValidationError("record must be a SwapRecord") cov, outpoint, carrier = await self._resolve_covenant(record) tx = build_htlc_claim_tx( covenant=cov, covenant_outpoint=outpoint, carrier_value=carrier, preimage=bytes(preimage), fee=self.fee_source.next_fee_input(), ) return await self._broadcast(tx)
[docs] async def refund_asset(self, record: SwapRecord) -> str: """Build + broadcast the MAKER's CSV refund spend. Returns the txid.""" if not isinstance(record, SwapRecord): raise ValidationError("record must be a SwapRecord") cov, outpoint, carrier = await self._resolve_covenant(record) tx = build_htlc_refund_tx( covenant=cov, covenant_outpoint=outpoint, carrier_value=carrier, fee=self.fee_source.next_fee_input(), ) return await self._broadcast(tx)
async def _broadcast(self, tx) -> str: raw = tx.serialize() try: return await self.chain_io.broadcast(raw) except _AlreadyKnown: # Idempotent: the node already has this exact tx -> its txid is authoritative. return tx.txid()