# SPDX-License-Identifier: GPLv3-or-later
# Copyright © 2025 pygaindalf Rui Pinheiro
import random
import re
import sys
from collections.abc import Hashable
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any, ClassVar, Protocol, Self, override, runtime_checkable
from pydantic import GetCoreSchemaHandler
from pydantic_core import CoreSchema, core_schema
from ...util.helpers import script_info
if TYPE_CHECKING:
from ..models.entity import Entity, EntityRecord
UID_SEPARATOR = ":"
UID_ID_REGEX = re.compile(r"^[a-zA-Z0-9@_#-]+$")
# MARK: Uid Class
[docs]
@dataclass(frozen=True, slots=True)
class Uid:
namespace: str = "DEFAULT"
id: Hashable = field(default_factory=lambda: random.getrandbits(sys.hash_info.width))
def __post_init__(self) -> None:
if not self.namespace or not isinstance(self.namespace, str):
msg = "Namespace must be a non-empty string."
raise ValueError(msg)
if re.search(UID_ID_REGEX, self.namespace) is None:
msg = f"ID '{self.namespace}' is not valid. It must match the pattern '{UID_ID_REGEX.pattern}'."
raise ValueError(msg)
if self.id is None:
msg = "ID must be an integer or string."
raise ValueError(msg)
if re.search(UID_ID_REGEX, self.id_as_str) is None:
msg = f"ID '{self.id}' is not valid. When converted to string, it must match the pattern '{UID_ID_REGEX.pattern}'."
raise ValueError(msg)
[docs]
def as_tuple(self) -> tuple[str, Hashable]:
return (self.namespace, self.id)
@property
def id_as_str(self) -> str:
"""Returns the ID as a string, suitable for display."""
if isinstance(self.id, int):
return format(self.id, "x")
else:
return str(self.id)
@override
def __hash__(self) -> int:
return hash(self.as_tuple())
@override
def __eq__(self, other: object) -> bool:
if not isinstance(other, Uid):
return False
return self.as_tuple() == other.as_tuple()
@override
def __ne__(self, other: object) -> bool:
return not self.__eq__(other)
def __lt__(self, other: object) -> bool:
if not isinstance(other, Uid):
return NotImplemented
return self.as_tuple() < other.as_tuple()
def __le__(self, other: object) -> bool:
if not isinstance(other, Uid):
return NotImplemented
return self.as_tuple() <= other.as_tuple()
def __gt__(self, other: object) -> bool:
if not isinstance(other, Uid):
return NotImplemented
return self.as_tuple() > other.as_tuple()
def __ge__(self, other: object) -> bool:
if not isinstance(other, Uid):
return NotImplemented
return self.as_tuple() >= other.as_tuple()
@property
def entity_or_none(self) -> Entity | None:
from ..models.entity import Entity
return Entity.by_uid_or_none(self)
@property
def entity(self) -> Entity:
from ..models.entity import Entity
return Entity.by_uid(self)
@property
def record_or_none(self) -> EntityRecord | None:
from ..models.entity import EntityRecord
return EntityRecord.by_uid_or_none(self)
@property
def record(self) -> EntityRecord:
from ..models.entity import EntityRecord
return EntityRecord.by_uid(self)
@override
def __str__(self) -> str:
return f"{self.namespace}{UID_SEPARATOR}{self.id_as_str}"
@override
def __repr__(self) -> str:
return f"<Uid: {self!s}>"
@classmethod
def __get_pydantic_core_schema__(cls, source: type[Any], handler: GetCoreSchemaHandler) -> CoreSchema:
return core_schema.is_instance_schema(
cls,
serialization=core_schema.to_string_ser_schema(
when_used="always",
),
)
[docs]
@classmethod
def from_string(cls, value: str) -> Uid:
if UID_SEPARATOR not in value:
msg = f"Invalid UID string: '{value}'"
raise ValueError(msg)
namespace, id_str = value.split(UID_SEPARATOR, maxsplit=1)
# Try to parse id_str as an integer (hex), otherwise keep as string
try:
id_value: Hashable = int(id_str, 16)
except ValueError:
id_value = id_str
return Uid(namespace=namespace, id=id_value)
[docs]
@classmethod
def from_value(cls, value: Any) -> Uid:
if isinstance(value, Uid):
return value
elif isinstance(value, str):
return cls.from_string(value)
else:
msg = f"Cannot create Uid from value of type '{type(value).__name__}'"
raise TypeError(msg)
# MARK: Incrementing Uid Factory
[docs]
class IncrementingUidFactory:
_instance: ClassVar[IncrementingUidFactory]
counters: dict[str, int]
[docs]
def __new__(cls) -> Self:
instance = getattr(cls, "_instance", None)
if not instance:
instance = cls._instance = super().__new__(cls)
return instance
[docs]
def __init__(self) -> None:
# Ensure singleton behavior for each namespace
if not hasattr(self, "namespace"):
self.counters = {}
[docs]
def next(self, namespace: str, /, *, increment: bool = True) -> Uid:
# Warning: This method is not thread-safe.
counter = self.counters.get(namespace, 1)
uid = Uid(namespace=namespace, id=counter)
if increment:
self.counters[namespace] = counter + 1
return uid
if script_info.is_unit_test():
def reset(self) -> None:
self.counters.clear()
# MARK: Protocol
[docs]
@runtime_checkable
class UidProtocol(Protocol):
@property
def uid(self) -> Uid: ...