Source code for hsslms.lmots

# -*- coding: utf-8 -*-
"""LM-OTS One-Time Signatures

For reference see RFC 8554, section 4.
"""
import io
from secrets import token_bytes
from .utils import LMOTS_ALGORITHM_TYPE
from .utils import INVALID, FAILURE
from .utils import D_MESG, D_PBLC
from .utils import coef, cksm, u16str, u8str, u32str


[docs]class LM_OTS_Pub: """A class used to hold the public key of LM-OTS One-Time Signatures (LMOTS) For a reference see RFC 8554, section 4. Args: pubkey (bytes): typecode || I || q || K """ def __init__(self, pubkey): try: self.pubtype = LMOTS_ALGORITHM_TYPE(int.from_bytes(pubkey[:4], 'big')) except ValueError: raise INVALID n = self.pubtype.n if len(pubkey) != 24+n: raise INVALID self.I = pubkey[4:4+16] # 16-byte string self.q = pubkey[20:20+4] self.K = pubkey[24:] self.pubkey = pubkey def _algo4b(self, message, signature): if len(signature) < 4: raise INVALID sigtype = LMOTS_ALGORITHM_TYPE(int.from_bytes(signature[:4], 'big')) if self.pubtype != sigtype: raise INVALID H, n, w, p, ls = sigtype.H, sigtype.n, sigtype.w, sigtype.p, sigtype.ls if len(signature) != 4 + n * (p+1): raise INVALID C = signature[4:4+n] if type(message) is bytes: Q = H(self.I + self.q + D_MESG + C + message) elif type(message) is io.BufferedReader: Q = H(self.I + self.q + D_MESG + C ) try: while True: buffer = message.read(1024**2) Q.update(buffer) if len(buffer) < 1024**2: break message.close() except IOError: raise FAILURE("Error. Cannot read message.") else: raise FAILURE("Invalid message type.") Q = Q.digest()[:n] Qa = Q + cksm(Q, w, n, ls) Kc = H(self.I + self.q + D_PBLC) for i in range(p): a = coef(Qa, i, w) tmp = signature[4+n+i*n : 4+n+(i+1)*n] # y[i] for j in range(a, 2**w - 1): tmp = H(self.I + self.q + u16str(i) + u8str(j) + tmp).digest()[:n] Kc.update(tmp) # z return Kc.digest()[:n] # Kc
[docs] def verify(self, message, signature): """Signature Verification of LMOTS Tries to verify the signature of a message with the public key associated with the class. Args: message (bytes, BufferedReader): Message to be verified with `signature` signature (bytes): Signature belonging to the `message` Raises: INVALID: If signature is invalid. """ Kc = self._algo4b(message, signature) if Kc != self.K: raise INVALID
def __repr__(self): return str(self.pubkey)
[docs]class LM_OTS_Priv: """A class used to hold the private key of LM-OTS One-Time Signatures (LMOTS) For a reference see RFC 8554, section 4. This class can be used to generate the belonging public key `LM_OTS_Pub`. Args: typecode (LMOTS_ALGORITHM_TYPE): Enumeration of Leighton-Micali One-Time-Signatures (LMOTS) algorithm types I (bytes): 16 random bytes q (int): 32-bit number / no. SEED (bytes): 32 random bytes for PRNG for LM_OTS """ def __init__(self, typecode, I, q, SEED): self.I = I self.q = q self.H, self.n, self.w, self.p, self.ls = typecode.H, typecode.n, typecode.w, typecode.p, typecode.ls self.typecode = u32str(typecode.value) self.x = [self.H(self.I + u32str(self.q) + u16str(i) + b'\xff' + SEED).digest()[:self.n] for i in range(self.p)] self.used = False
[docs] def sign(self, message): """Signature Generation of LMOTS Signs a message with the private key associated with the class. Args: message (bytes, BufferedReader): Message to be signed Raises: FAILURE: If a signature has already been computed, or for other technical reason Returns: bytes: The signature to `message`. """ if self.used == True: raise FAILURE("Private key has already been used for signing.") C = token_bytes(self.n) signature = self.typecode + C; if type(message) is bytes: Q = self.H(self.I + u32str(self.q) + D_MESG + C + message) elif type(message) is io.BufferedReader: Q = self.H(self.I + u32str(self.q) + D_MESG + C) try: while True: buffer = message.read(1024**2) Q.update(buffer) if len(buffer) < 1024**2: break message.close() except IOError: raise FAILURE("Error. Cannot read message.") else: raise FAILURE("Invalid message type.") Q = Q.digest()[:self.n] Qa = Q + cksm(Q, self.w, self.n, self.ls) for i in range(self.p): a = coef(Qa, i, self.w) tmp = self.x[i] for j in range(a): tmp = self.H(self.I + u32str(self.q) + u16str(i) + u8str(j) + tmp).digest()[:self.n] signature += tmp # y self.used = True return signature
[docs] def gen_pub_K(self): u32str_q = u32str(self.q) K = self.H(self.I + u32str_q + D_PBLC) u8str_j = [u8str(j) for j in range(2**self.w - 1)] for i in range(self.p): tmp = self.x[i] Iqi = self.I + u32str_q + u16str(i) for j in u8str_j: tmp = self.H(Iqi + j + tmp).digest()[:self.n] K.update(tmp) return K.digest()[:self.n]
[docs] def gen_pub(self): """Computes the public key associated with the private key in this class. Returns: LM_OTS_Pub: The public key belonging to this private key. """ return LM_OTS_Pub(self.typecode + self.I + u32str(self.q) + self.gen_pub_K()) def __repr__(self): return str(self.typecode + self.I + u32str(self.q) + b''.join(self.x))