# Source code for ewokscore.hashing

import hashlib
import random
from collections.abc import Mapping, Iterable, Set
from typing import Any, Optional, Type, Union

import numpy
from ewoksutils.import_utils import qualname

from . import missing_data

def classhashdata(cls: Type) -> bytes:
    """Return the bytes that represent a class when hashing.

    The value is whatever ``qualname`` yields for ``cls``, encoded with the
    default string encoding (UTF-8).
    """
    qualified_name = qualname(cls)
    return qualified_name.encode()

def multitype_sorted(sequence: Iterable, key=None) -> list:
    """Sort items, also when they are of mutually incomparable types.

    A plain ``sorted`` is attempted first. When that raises ``TypeError``
    (for example when integers and strings are mixed), the items are grouped
    by the type name of their sort key, the groups are ordered by that type
    name and each group is sorted on its own.

    :param sequence: items to sort; one-shot iterators are supported
    :param key: optional function that extracts the sort key from an item
    :returns: a new sorted list
    :raises TypeError: when items with the same key type name still cannot
        be compared with each other
    """
    # Materialize once: a one-shot iterator would otherwise be exhausted by
    # the first sorting attempt, leaving nothing for the fallback below.
    items = list(sequence)
    try:
        return sorted(items, key=key)
    except TypeError:
        # Keys of different types do not support "<": sort per type instead.
        pass

    if key is None:

        def key(item):
            return item

    groups = dict()
    for item in items:
        typename = type(key(item)).__name__
        groups.setdefault(typename, list()).append(item)

    return [
        item
        for _, group in sorted(groups.items(), key=lambda tpl: tpl[0])
        for item in sorted(group, key=key)
    ]

class UniversalHash:
    """Wrapper around a hex digest string.

    Instances can be used in sets and as dict keys, and are compared and
    ordered through their string form.
    """

    def __init__(self, hexdigest: Union[str, bytes]):
        """
        :param hexdigest: the digest as text, or as bytes which get decoded
        :raises TypeError: when the digest is neither ``str`` nor ``bytes``
        """
        digest = hexdigest.decode() if isinstance(hexdigest, bytes) else hexdigest
        if not isinstance(digest, str):
            raise TypeError(digest, type(digest))
        self._hexdigest = digest

    def __str__(self):
        return self._hexdigest

    def __repr__(self):
        return f"UniversalHash('{self}')"

    def __hash__(self):
        # Python-hashable so that instances work in sets and as dict keys
        return hash(self._hexdigest)

    def __eq__(self, other):
        # Equality is decided on the string forms of both operands
        mine = str(self)
        theirs = str(other)
        return mine == theirs

    def __lt__(self, other):
        # Ordering is the ordering of the string forms
        mine = str(self)
        theirs = str(other)
        return mine < theirs

def uhash(value, _hash=None) -> UniversalHash:
    """Universal hash (as opposed to python's `hash`).

    When `_hash` is not given, a new SHA-256 object is created and the
    function returns a `UniversalHash` built from its hex digest. Nested
    calls pass the same hash object through `_hash`; those calls only feed
    the hash object and return `None`.

    Containers are handled recursively: the items of a mapping are sorted by
    key with `multitype_sorted` before the keys and the values are hashed,
    the elements of a set are sorted with `multitype_sorted`, and any other
    iterable is hashed element by element in iteration order.

    NOTE(review): this copy of the function is incomplete. The bodies of the
    scalar branches and at least two `else:` lines are missing (see the
    inline notes); restore them from the upstream module rather than
    guessing, since any difference changes the resulting digests.
    """
    # Avoid using python's hash!
    # True only for the outermost call, which owns the hash object
    bdigest = _hash is None
    if bdigest:
        _hash = hashlib.sha256()
    # NOTE(review): each of the following branches up to `Mapping` has no
    # body in this copy of the file; how these values were fed into `_hash`
    # cannot be told from here.
    if value is None:
    elif isinstance(value, HasUhash):
    elif isinstance(value, UniversalHash):
    elif isinstance(value, bytes):
    elif isinstance(value, str):
    elif isinstance(value, int):
    elif isinstance(value, float):
    elif isinstance(value, (numpy.ndarray, numpy.number)):
    elif isinstance(value, Mapping):
        # Sort the items by key so the result does not depend on insertion order
        lst = multitype_sorted(value.items(), key=lambda item: item[0])
        if lst:
            keys, values = zip(*lst)
            # NOTE(review): an `else:` presumably preceded the next line
            # (empty mapping); as written it would discard the unzipped
            # keys and values.
            keys = values = list()
        uhash(keys, _hash=_hash)
        uhash(values, _hash=_hash)
    elif isinstance(value, Set):
        # Sets have no defined order: sort before hashing
        values = multitype_sorted(value)
        uhash(values, _hash=_hash)
    elif isinstance(value, Iterable):
        # Ordered
        for v in value:
            uhash(v, _hash=_hash)
        # NOTE(review): an `else:` presumably preceded the following lines,
        # making the `raise` the fallback for unsupported types.
        # TODO: register custom types
        raise TypeError(f"cannot uhash {value} (type: {type(value)})")
    if bdigest:
        return UniversalHash(_hash.hexdigest())

class HasUhash:
    """Base class for objects that are identified by a universal hash.

    Subclasses provide the ``uhash`` property. Python hashing, equality and
    the string representations are all derived from it.
    """

    @property
    def uhash(self) -> Optional["UniversalHash"]:
        """The universal hash of this object, or ``None`` when it has none."""
        raise NotImplementedError

    def __hash__(self):
        # make it python hashable (to use in sets and dict keys)
        uhash = self.uhash
        if uhash is None:
            # No universal hash available: fall back to object identity
            return hash(id(self))
        return hash(uhash)

    def __eq__(self, other):
        """Compare universal hashes.

        :raises TypeError: when ``other`` is neither a ``HasUhash`` nor a
            ``UniversalHash``
        """
        if isinstance(other, HasUhash):
            uhash = other.uhash
        elif isinstance(other, UniversalHash):
            uhash = other
        else:
            raise TypeError(other, type(other))
        return self.uhash == uhash

    def _get_repr_data(self) -> dict:
        """Name/value pairs shown by ``__repr__`` and ``__str__``."""
        data = dict()
        uhash = self.uhash
        if uhash is None:
            data["uhash"] = None
        else:
            data["uhash"] = repr(str(uhash))
        return data

    def __repr__(self):
        data = self._get_repr_data()
        if data:
            sdata = ", ".join([f"{k}={v}" for k, v in data.items()])
            return f"{super().__repr__()}({sdata})"
        return super().__repr__()

    def __str__(self):
        data = self._get_repr_data()
        if data:
            sdata = ", ".join([f"{k}={v}" for k, v in data.items()])
            return f"{qualname(type(self))}({sdata})"
        return qualname(type(self))

# Types accepted for the `pre_uhash` argument of `UniversalHashable`
PreUhashTypes = Union[str, bytes, UniversalHash, HasUhash]

class UniversalHashable(HasUhash):
    """The universal hash of an instance of this class is based on:

     * pre-uhash
     * instance nonce (if any)

    The universal hash is equal to the pre-uhash when an instance nonce is not provided.

    The pre-uhash is either provided or based on:

     * data
     * class nonce (class qualifier name, class version, superclass nonce)
    """

    __CLASS_NONCE = None
    __VERSION = None
    MISSING_DATA = missing_data.MISSING_DATA

    def __init__(
        self,
        pre_uhash: Optional[PreUhashTypes] = None,
        instance_nonce: Optional[Any] = None,
    ):
        """
        :param pre_uhash: fixed pre-uhash; when ``None`` the hash is derived
            from ``_uhash_data()`` and the class nonce
        :param instance_nonce: optional extra value mixed into the hash
        """
        self.set_uhash_init(pre_uhash=pre_uhash, instance_nonce=instance_nonce)

    def __init_subclass__(subcls, version=None, **kwargs):
        """Compute the class nonce of each subclass when it is defined.

        :param version: class version, part of the class nonce data
        """
        # NOTE(review): this call is not visible in the copy this was
        # restored from; it keeps cooperative subclass hooks working.
        super().__init_subclass__(**kwargs)
        # Read the inherited nonce before it is overwritten below
        supercls_data = subcls.class_nonce()
        subcls.__VERSION = version
        subcls_data = subcls.class_nonce_data()
        subcls.__CLASS_NONCE = str(uhash((subcls_data, supercls_data)))

    def set_uhash_init(
        self,
        pre_uhash: Optional[PreUhashTypes] = None,
        instance_nonce: Optional[Any] = None,
    ):
        """Set the pre-uhash and instance nonce and remember both as the
        original values used by the ``undo_*`` methods and ``get_uhash_init``.
        """
        # NOTE(review): inferred statement; the next line reads
        # `self.__pre_uhash`, which nothing else initializes.
        self.__set_pre_uhash(pre_uhash)
        self.__original_pre_uhash = self.__pre_uhash
        self.__instance_nonce = instance_nonce
        self.__original__instance_nonce = instance_nonce

    def get_uhash_init(self, serialize=False):
        """Return the original ``pre_uhash`` and ``instance_nonce`` as a dict.

        :param serialize: when true, a ``HasUhash`` or ``UniversalHash``
            pre-uhash is converted to a string
        """
        pre_uhash = self.__original_pre_uhash
        if serialize:
            if isinstance(pre_uhash, HasUhash):
                pre_uhash = str(pre_uhash.uhash)
            elif isinstance(pre_uhash, UniversalHash):
                pre_uhash = str(pre_uhash)
        return {
            "pre_uhash": pre_uhash,
            "instance_nonce": self.__original__instance_nonce,
        }

    def __set_pre_uhash(self, pre_uhash):
        """Normalize and store the pre-uhash."""
        if pre_uhash is None:
            self.__pre_uhash = None
        elif isinstance(pre_uhash, (str, bytes)):
            # Text and bytes are taken to be a hex digest
            self.__pre_uhash = UniversalHash(pre_uhash)
        elif isinstance(pre_uhash, (UniversalHash, HasUhash)):
            self.__pre_uhash = pre_uhash
        else:
            # Anything else is hashed
            self.__pre_uhash = uhash(pre_uhash)

    @classmethod
    def class_nonce(cls):
        """The class nonce computed by ``__init_subclass__``."""
        return cls.__CLASS_NONCE

    @classmethod
    def class_nonce_data(cls):
        """The class-specific data that goes into the class nonce."""
        return qualname(cls), cls.__VERSION

    def instance_nonce(self):
        """The current instance nonce (``None`` when there is none)."""
        return self.__instance_nonce

    def fix_uhash(self):
        """Fix the uhash when it is derived from the uhash data."""
        if self.__pre_uhash is not None:
            # Already based on a pre-uhash: nothing to fix
            return
        # Compute the hash without the instance nonce; the nonce is mixed in
        # again by `uhash` on top of the stored pre-uhash.
        keep, self.__instance_nonce = self.__instance_nonce, None
        try:
            pre_uhash = self.uhash
        finally:
            self.__instance_nonce = keep
        # NOTE(review): inferred statement; the value computed above is
        # otherwise unused and `undo_fix_uhash` reverts this attribute.
        self.__pre_uhash = pre_uhash

    def undo_fix_uhash(self):
        """Revert ``fix_uhash`` by restoring the original pre-uhash."""
        self.__pre_uhash = self.__original_pre_uhash

    def cleanup_references(self):
        """Remove all references to other hashables.
        Side effect: fixes the uhash when it depends on another hashable.
        """
        if isinstance(self.__pre_uhash, HasUhash):
            pre_uhash = self.__pre_uhash.uhash
            self.__pre_uhash = pre_uhash
            self.__original_pre_uhash = pre_uhash

    @property
    def uhash(self) -> Optional[UniversalHash]:
        """The universal hash, or ``None`` when it cannot be determined
        (missing data, or a referenced hashable without a hash).
        """
        _uhash = self.__pre_uhash
        if _uhash is None:
            # Derived from the data and the class nonce
            data = self._uhash_data()
            if missing_data.is_missing_data(data):
                return None
            cnonce = self.class_nonce()
            inonce = self.instance_nonce()
            if inonce is None:
                return uhash((data, cnonce))
            return uhash((data, cnonce, inonce))

        # Based on a pre-uhash, possibly owned by another hashable
        if isinstance(_uhash, HasUhash):
            _uhash = _uhash.uhash
            if _uhash is None:
                return None
        inonce = self.instance_nonce()
        if inonce is None:
            return _uhash
        return uhash((_uhash, inonce))

    def _uhash_data(self):
        """Data the hash is derived from when no pre-uhash is set.

        Returns ``MISSING_DATA`` here; subclasses can override it.
        """
        return self.MISSING_DATA

    def uhash_randomize(self):
        """Replace the instance nonce by a random integer."""
        # Integer bounds: `random.randint` raises TypeError for float
        # arguments such as 1e100 on Python 3.12 and later.
        self.__instance_nonce = random.randint(-(10**100), 10**100)

    def undo_randomize(self):
        """Restore the instance nonce given at initialization."""
        self.__instance_nonce = self.__original__instance_nonce