from __future__ import annotations
import hashlib
import hmac
from base64 import b64decode, b64encode
from collections.abc import Callable
from coincurve import PrivateKey as CcPrivateKey
from coincurve import PublicKey as CcPublicKey
from .aes_cbc import aes_decrypt_with_iv, aes_encrypt_with_iv
from .base58 import base58check_encode
from .constants import NETWORK_ADDRESS_PREFIX_DICT, NETWORK_WIF_PREFIX_DICT, PUBLIC_KEY_COMPRESSED_PREFIX_LIST, Network
from .curve import Point, curve
from .curve import curve_add as curve_add
from .curve import curve_multiply as curve_multiply
from .hash import hash160, hash256, hmac_sha256
from .security.errors import ValidationError
from .utils import (
decode_wif,
deserialize_ecdsa_recoverable,
serialize_ecdsa_der,
stringify_ecdsa_recoverable,
text_digest,
unstringify_ecdsa_recoverable,
)
class PublicKey:
def __init__(self, public_key: str | bytes | Point | CcPublicKey):
"""
create public key from serialized hex string or bytes, or curve point, or CoinCurve public key
"""
self.compressed: bool = True # use compressed format public key by default
if isinstance(public_key, Point):
# from curve point
self.key: CcPublicKey = CcPublicKey.from_point(public_key.x, public_key.y)
elif isinstance(public_key, CcPublicKey):
# from CoinCurve public key
self.key: CcPublicKey = public_key
else:
if isinstance(public_key, str):
# from serialized public key in hex string
pk: bytes = bytes.fromhex(public_key)
elif isinstance(public_key, bytes):
# from serialized public key in bytes
pk: bytes = public_key
else:
raise TypeError("unsupported public key type")
# here we have serialized public key in bytes
self.key: CcPublicKey = CcPublicKey(pk)
self.compressed: bool = pk[:1] in PUBLIC_KEY_COMPRESSED_PREFIX_LIST
def point(self) -> Point:
return Point(*self.key.point())
def serialize(self, compressed: bool | None = None) -> bytes:
compressed = self.compressed if compressed is None else compressed
return self.key.format(compressed)
def hex(self, compressed: bool | None = None) -> str:
return self.serialize(compressed).hex()
def hash160(self, compressed: bool | None = None) -> bytes:
"""
:returns: public key hash corresponding to this public key
"""
return hash160(self.serialize(compressed))
hash = hash160
def address(self, compressed: bool | None = None, network: Network = Network.MAINNET) -> str:
"""
:returns: P2PKH address corresponding to this public key
"""
return base58check_encode(NETWORK_ADDRESS_PREFIX_DICT.get(network) + self.hash160(compressed))
def verify(self, signature: bytes, message: bytes, hasher: Callable[[bytes], bytes] | None = hash256) -> bool:
"""
verify serialized ECDSA signature in bitcoin strict DER (low-s) format
"""
return self.key.verify(signature, message, hasher)
def verify_recoverable(
self, signature: bytes, message: bytes, hasher: Callable[[bytes], bytes] | None = hash256
) -> bool:
"""
verify serialized recoverable ECDSA signature in format "r (32 bytes) + s (32 bytes) + recovery_id (1 byte)"
"""
r, s, _ = deserialize_ecdsa_recoverable(signature)
der = serialize_ecdsa_der((r, s))
return self.verify(der, message, hasher) and self == recover_public_key(signature, message, hasher)
def derive_shared_secret(self, key: PrivateKey) -> bytes:
return PublicKey(self.key.multiply(key.serialize())).serialize()
def encrypt(self, message: bytes) -> bytes:
"""
Electrum ECIES (aka BIE1) encryption
"""
# generate an ephemeral EC private key in order to derive shared secret (ECDH key)
ephemeral_private_key = PrivateKey()
# derive ECDH key
ecdh_key: bytes = self.derive_shared_secret(ephemeral_private_key)
# SHA512(ECDH_KEY), then we have
# key_e and iv used in AES, key_m used in HMAC.SHA256
key: bytes = hashlib.sha512(ecdh_key).digest()
iv, key_e, key_m = key[0:16], key[16:32], key[32:]
# make AES encryption
cipher: bytes = aes_encrypt_with_iv(key_e, iv, message)
# encrypted = magic_bytes (4 bytes) + ephemeral_public_key (33 bytes) + cipher (16 bytes at least)
encrypted: bytes = b"BIE1" + ephemeral_private_key.public_key().serialize() + cipher
# mac = HMAC_SHA256(encrypted), 32 bytes
mac: bytes = hmac.new(key_m, encrypted, hashlib.sha256).digest()
# give out encrypted + mac
return encrypted + mac
def encrypt_text(self, text: str) -> str:
"""
:returns: BIE1 encrypted text, base64 encoded
"""
message: bytes = text.encode("utf-8")
return b64encode(self.encrypt(message)).decode("ascii")
def derive_child(self, private_key: PrivateKey, invoice_number: str) -> PublicKey:
"""
derive a child key with BRC-42
:param private_key: the private key of the other party
:param invoice_number: the invoice number used to derive the child key
:return: the derived child key
"""
shared_key = self.derive_shared_secret(private_key)
hashing = hmac_sha256(shared_key, invoice_number.encode("utf-8"))
point = curve_multiply(int.from_bytes(hashing, "big"), curve.g)
final_point = curve_add(self.point(), point)
return PublicKey(final_point)
def __eq__(self, o: object) -> bool:
if isinstance(o, PublicKey):
return self.key == o.key
return super().__eq__(o) # pragma: no cover
def __str__(self) -> str: # pragma: no cover
return f"<PublicKey hex={self.hex()}>"
def __repr__(self) -> str: # pragma: no cover
return self.__str__()
[docs]
class PrivateKey:
[docs]
def __init__(self, private_key: str | int | bytes | CcPrivateKey | None = None, network: Network | None = None):
"""
create private key from WIF (str), or int, or bytes, or CoinCurve private key
random a new private key if None
"""
self.network: Network = network or Network.MAINNET
self.compressed: bool = True # use compressed WIF by default
if private_key is None:
# create a new private key
self.key: CcPrivateKey = CcPrivateKey()
elif isinstance(private_key, CcPrivateKey):
# from CoinCurve private key
self.key: CcPrivateKey = private_key
else:
if isinstance(private_key, str):
# from wif
private_key_bytes, self.compressed, self.network = decode_wif(private_key)
self.key: CcPrivateKey = CcPrivateKey(private_key_bytes)
elif isinstance(private_key, int):
# from private key as int
self.key: CcPrivateKey = CcPrivateKey.from_int(private_key)
elif isinstance(private_key, bytes):
# from private key integer in bytes
self.key: CcPrivateKey = CcPrivateKey(private_key)
else:
raise TypeError("unsupported private key type")
[docs]
def public_key(self) -> PublicKey:
return PublicKey(self.key.public_key.format(self.compressed))
[docs]
def address(self, compressed: bool | None = None, network: Network | None = None) -> str:
"""
:returns: P2PKH address corresponding to this private key
"""
compressed = self.compressed if compressed is None else compressed
network = network or self.network
return self.public_key().address(compressed, network)
[docs]
def wif(self, compressed: bool | None = None, network: Network | None = None) -> str:
compressed = self.compressed if compressed is None else compressed
network = network or self.network
key_bytes = self.serialize()
compressed_bytes = b"\x01" if compressed else b""
return base58check_encode(NETWORK_WIF_PREFIX_DICT.get(network) + key_bytes + compressed_bytes)
[docs]
def int(self) -> int:
return self.key.to_int()
[docs]
def serialize(self) -> bytes:
return self.key.secret
[docs]
def hex(self) -> str:
return self.serialize().hex()
[docs]
def der(self) -> bytes: # pragma: no cover
return self.key.to_der()
[docs]
def pem(self) -> bytes: # pragma: no cover
return self.key.to_pem()
[docs]
def sign(self, message: bytes, hasher: Callable[[bytes], bytes] | None = hash256, k: int | None = None) -> bytes:
"""
:returns: ECDSA signature in bitcoin strict DER (low-s) format
Low-s enforcement: coincurve's sign() calls libsecp256k1 which
normalises signatures to low-s (SECP256K1_EC_NORMALIZED) by default.
For custom k, _sign_custom_k() explicitly enforces low-s.
"""
if k is not None:
return self._sign_custom_k(message, hasher, k)
# coincurve sign() uses libsecp256k1 which enforces low-s automatically. # nosec B105
return self.key.sign(message, hasher)
def _sign_custom_k(self, message: bytes, hasher: Callable[[bytes], bytes], k: int) -> bytes:
# TODO: This could be done using self.key.sign() but the interface needs a custom k value function to be injected into te C binary
# of libsecp256k1, since the default one does some transformations to the value.
# See https://github.com/rustyrussell/secp256k1-py/blob/5bad581d959d722bf6c2df5eaa996fd4c24096aa/tests/test_custom_nonce.py#L51ffi%20=%20FFI()
# https://github.com/bitcoin-core/secp256k1/blob/master/src/secp256k1.c#L518
z = int.from_bytes(hasher(message), "big")
# Ensure k is valid
k = k % curve.n
if k == 0:
raise ValueError("Invalid nonce k")
# Compute R = k * G and obtain its x-coordinate (r)
R = curve_multiply(k, curve.g)
if R is None:
raise ValueError("Invalid R value")
r = R.x
# Compute s = k^(-1) * (z + r * d) mod n
d = int.from_bytes(self.serialize(), "big")
s = (pow(k, -1, curve.n) * (z + r * d)) % curve.n
if s == 0:
raise ValueError("Invalid s value")
# Ensure the signature is canonical (low S value)
if s > curve.n // 2:
s = curve.n - s
# Convert r and s to bytes
r_bytes = r.to_bytes(32, "big")
s_bytes = s.to_bytes(32, "big")
# Add prefix if the MSB is set
if r_bytes[0] & 0x80:
r_bytes = b"\x00" + r_bytes
if s_bytes[0] & 0x80:
s_bytes = b"\x00" + s_bytes
# Serialize the signature in DER format
signature = (
b"\x30"
+ (4 + len(r_bytes) + len(s_bytes)).to_bytes(1, "big")
+ b"\x02"
+ len(r_bytes).to_bytes(1, "big")
+ r_bytes
+ b"\x02"
+ len(s_bytes).to_bytes(1, "big")
+ s_bytes
)
return signature
[docs]
def verify(self, signature: bytes, message: bytes, hasher: Callable[[bytes], bytes] | None = hash256) -> bool:
"""
verify ECDSA signature in bitcoin strict DER (low-s) format
"""
return self.public_key().verify(signature, message, hasher)
[docs]
def sign_recoverable(self, message: bytes, hasher: Callable[[bytes], bytes] | None = hash256) -> bytes:
"""
:returns: serialized recoverable ECDSA signature (aka compact signature) in format
r (32 bytes) + s (32 bytes) + recovery_id (1 byte)
"""
return self.key.sign_recoverable(message, hasher)
[docs]
def verify_recoverable(
self, signature: bytes, message: bytes, hasher: Callable[[bytes], bytes] | None = hash256
) -> bool:
"""
verify serialized recoverable ECDSA signature in format "r (32 bytes) + s (32 bytes) + recovery_id (1 byte)"
"""
return self.public_key().verify_recoverable(signature, message, hasher)
[docs]
def sign_text(self, text: str) -> tuple[str, str]:
"""sign arbitrary text with bitcoin private key
:returns: (p2pkh_address, stringified_recoverable_ecdsa_signature)
This function follows Bitcoin Signed Message Format.
For BRC-77, use signed_message.py instead.
"""
message: bytes = text_digest(text)
return self.address(), stringify_ecdsa_recoverable(self.sign_recoverable(message), self.compressed)
[docs]
def derive_shared_secret(self, key: PublicKey) -> bytes:
return PublicKey(key.key.multiply(self.serialize())).serialize()
[docs]
def decrypt(self, message: bytes) -> bytes:
"""
Electrum ECIES (aka BIE1) decryption
"""
if len(message) < 85:
raise ValidationError("invalid encrypted length")
encrypted, mac = message[:-32], message[-32:]
# encrypted = magic_bytes (4 bytes) + ephemeral_public_key (33 bytes) + cipher_text (16 bytes at least)
magic_bytes, ephemeral_public_key, cipher = encrypted[:4], PublicKey(encrypted[4:37]), encrypted[37:]
if magic_bytes.decode("utf-8") != "BIE1":
raise ValidationError("invalid magic bytes")
# restore ECDH key
ecdh_key = self.derive_shared_secret(ephemeral_public_key)
# restore iv, key_e, key_m
key = hashlib.sha512(ecdh_key).digest()
iv, key_e, key_m = key[0:16], key[16:32], key[32:]
# verify mac — use hmac.compare_digest for constant-time comparison # nosec B105
expected_mac = hmac.new(key_m, encrypted, hashlib.sha256).digest()
if not hmac.compare_digest(expected_mac, mac):
raise ValidationError("incorrect hmac checksum")
# make the AES decryption
return aes_decrypt_with_iv(key_e, iv, cipher)
[docs]
def decrypt_text(self, text: str) -> str:
"""
decrypt BIE1 encrypted, base64 encoded text
"""
message: bytes = b64decode(text)
return self.decrypt(message).decode("utf-8")
[docs]
def encrypt(self, message: bytes) -> bytes: # pragma: no cover
"""
Electrum ECIES (aka BIE1) encryption
"""
return self.public_key().encrypt(message)
[docs]
def encrypt_text(self, text: str) -> str: # pragma: no cover
"""
:returns: BIE1 encrypted text, base64 encoded
"""
return self.public_key().encrypt_text(text)
[docs]
def derive_child(self, public_key: PublicKey, invoice_number: str) -> PrivateKey:
"""
derive a child key with BRC-42
:param public_key: the public key of the other party
:param invoice_number: the invoice number used to derive the child key
:return: the derived child key
"""
shared_key = self.derive_shared_secret(public_key)
hashing = hmac_sha256(shared_key, invoice_number.encode("utf-8"))
return PrivateKey((self.int() + int.from_bytes(hashing, "big")) % curve.n)
def __eq__(self, o: object) -> bool:
if isinstance(o, PrivateKey):
return hmac.compare_digest(self.key.secret, o.key.secret)
return super().__eq__(o) # pragma: no cover
# PrivateKey is not hashable — putting secrets in dict/set risks leaking via hash collisions.
__hash__ = None # type: ignore[assignment]
def __reduce_ex__(self, protocol: int) -> object:
raise TypeError("PrivateKey cannot be pickled — serializing key material defeats in-memory protection")
__reduce__ = __reduce_ex__ # type: ignore[assignment]
def __copy__(self) -> PrivateKey:
raise TypeError("PrivateKey cannot be copied (use explicit construction)")
def __deepcopy__(self, memo: dict) -> PrivateKey:
raise TypeError("PrivateKey cannot be deep-copied (use explicit construction)")
def __str__(self) -> str:
# SECURITY: do not include key bytes, WIF, or integer scalar in repr.
return f"<PrivateKey network={self.network} compressed={self.compressed}>"
def __repr__(self) -> str:
return self.__str__()
[docs]
@classmethod
def from_hex(cls, octets: str | bytes) -> PrivateKey:
b: bytes = octets if isinstance(octets, bytes) else bytes.fromhex(octets)
return PrivateKey(CcPrivateKey(b))
[docs]
@classmethod
def from_der(cls, octets: str | bytes) -> PrivateKey: # pragma: no cover
b: bytes = octets if isinstance(octets, bytes) else bytes.fromhex(octets)
return PrivateKey(CcPrivateKey.from_der(b))
[docs]
@classmethod
def from_pem(cls, octets: str | bytes) -> PrivateKey: # pragma: no cover
b: bytes = octets if isinstance(octets, bytes) else bytes.fromhex(octets)
return PrivateKey(CcPrivateKey.from_pem(b))
def verify_signed_text(
text: str, address: str, signature: str, hasher: Callable[[bytes], bytes] | None = hash256
) -> bool:
"""
verify signed arbitrary text
"""
serialized_recoverable, compressed = unstringify_ecdsa_recoverable(signature)
r, s, _ = deserialize_ecdsa_recoverable(serialized_recoverable)
message: bytes = text_digest(text)
public_key: PublicKey = recover_public_key(serialized_recoverable, message, hasher)
der: bytes = serialize_ecdsa_der((r, s))
return public_key.verify(der, message, hasher) and public_key.address(compressed=compressed) == address
def recover_public_key(
signature: bytes, message: bytes, hasher: Callable[[bytes], bytes] | None = hash256
) -> PublicKey:
"""
recover public key from serialized recoverable ECDSA signature in format
"r (32 bytes) + s (32 bytes) + recovery_id (1 byte)"
"""
return PublicKey(CcPublicKey.from_signature_and_message(signature, message, hasher))