Source code for app.util.helpers.callguard

# SPDX-License-Identifier: GPLv3-or-later
# Copyright © 2025 pygaindalf Rui Pinheiro

import logging
import inspect
import functools
import dataclasses
import re

from types import FrameType
from typing import (Any, Iterable, ParamSpec, Concatenate, TypedDict, Unpack, overload, Literal, Annotated, Protocol, runtime_checkable, Mapping,
    cast as typing_cast,
)
from collections.abc import Callable
import pydantic
from typing_extensions import runtime
from pydantic import BaseModel

from ..logging import getLogger


# MARK: Configuration
# Module-level switches consulted by the callguard helpers below.
CALLGUARD_ENABLED = True # Global enable/disable switch
CALLGUARD_COMPARE_MODULE = True # Whether to compare caller and callee modules
CALLGUARD_COMPARE_SELF = True # Whether to compare 'self' or 'cls' instances (if False, will skip self/cls comparison)
CALLGUARD_SELF_IS_FIRST_ARGUMENT = True # Whether to treat the first positional argument of a frame's code object as 'self'/'cls' (if False, frame locals named 'self' or 'cls' are looked up instead)
CALLGUARD_STRICT_SELF = True # Whether to enforce that the first argument is named 'self' or 'cls' (only consulted when CALLGUARD_SELF_IS_FIRST_ARGUMENT is True; any other name is then treated as "no self")

LOG = getLogger(__name__)
LOG.disabled = True # This module's logger is switched off at import time


# MARK: Frame inspection utilities
def get_execution_frame(*, frames_up : int = 0) -> FrameType:
    """Return a frame located above this function on the call stack.

    At least one hop is always taken, so ``frames_up=0`` and ``frames_up=1``
    both yield the direct caller's frame; ``frames_up=2`` yields the caller's
    caller, and so on.

    WARNING: It is important to 'del frame' once you are done with the frame!

    Raises:
        ValueError: if ``frames_up`` is negative.
        RuntimeError: if no current frame is available, or the stack is
            shallower than requested.
    """
    if frames_up < 0:
        raise ValueError("frames_up must be non-negative")

    current = inspect.currentframe()
    if current is None:
        raise RuntimeError("No current frame available")

    try:
        # Walk up the stack one frame at a time (never fewer than one hop)
        for _ in range(max(frames_up, 1)):
            parent = current.f_back
            if parent is None:
                raise RuntimeError("No caller frame available")
            current = parent
        return current
    except BaseException:
        # Drop our frame reference before propagating, so the traceback does not keep it alive
        del current
        raise
[docs] def get_execution_frame_module(frame : FrameType) -> str | None: return frame.f_globals.get("__name__", None)
def get_execution_frame_self_varname(frame : FrameType) -> str | Iterable[str] | None:
    """Determine which local variable name(s) may hold 'self'/'cls' in the given frame.

    Returns:
        - ``('self', 'cls')`` when CALLGUARD_SELF_IS_FIRST_ARGUMENT is off;
        - otherwise the name of the frame's first positional argument;
        - None when the frame's code takes no positional arguments, or when
          CALLGUARD_STRICT_SELF is on and that argument is not named 'self' or 'cls'.
    """
    conventional_names = ('self', 'cls')
    if not CALLGUARD_SELF_IS_FIRST_ARGUMENT:
        return conventional_names

    # Get the name of the first argument of the function/method in this frame
    code = frame.f_code
    if code.co_argcount <= 0:
        return None

    first_argument = code.co_varnames[0]
    if CALLGUARD_STRICT_SELF and first_argument not in conventional_names:
        return None
    return first_argument
def get_execution_frame_self(frame : FrameType) -> object | None:
    """Return the 'self'/'cls' object bound in the given frame, or None when none can be found.

    Candidate variable names come from get_execution_frame_self_varname; the
    first candidate whose local value is not None is returned.
    """
    candidates = get_execution_frame_self_varname(frame)
    if candidates is None:
        return None

    # A single name is normalised to a one-element tuple so both cases share the loop below
    if isinstance(candidates, str):
        candidates = (candidates,)

    for candidate in candidates:
        if (value := frame.f_locals.get(candidate, None)) is not None:
            return value
    return None
# MARK: Option types type CallguardWrapped[T,**P, R] = Callable[Concatenate[T,P], R]
@dataclasses.dataclass(slots=True)
class CallguardHandlerInfo[T : object, **P, R]:
    """Context describing a single guarded call, handed to checkers and handlers."""
    method_name : str  # Name reported for the guarded method (used in the "Unauthorized call" error)
    check_module : bool  # Whether the checker should compare caller and callee modules
    check_self : bool  # Whether the checker should compare caller and callee 'self'/'cls'

    caller_frame : FrameType  # Frame that invoked the guard wrapper
    callee_frame : FrameType  # Frame of the guard wrapper itself

    caller_self : object  # 'self'/'cls' found in the caller frame (None when none was found)
    callee_self : object  # Object the guarded method was invoked on

    caller_module : str  # '__name__' global of the caller frame
    callee_module : str  # '__module__' of the guarded method

    default_checker : 'Callable[[CallguardHandlerInfo], bool]'  # Stock authorisation check, available to custom handlers
    default_handler : 'Callable[Concatenate[T, Callable[Concatenate[T,P], R], CallguardHandlerInfo, P], R]'  # Stock handler: runs the checker, then the method
class CallguardOptions(TypedDict, total=False):
    """Optional keyword options accepted when guarding a single callable, classmethod or property."""
    method_name : str  # Name to report for the guarded method; the callable guard falls back to method.__name__
    check_module : bool  # Whether to also compare caller/callee modules; the callable guard defaults this to False
class CallguardClassOptions(TypedDict, total=False):
    """Optional keyword options accepted when guarding a whole class."""
    force : bool  # Guard even when the class is reported as not callguard-enabled (default False)
    private_methods : bool  # Guard attributes whose name starts with '_' but is not a dunder (default True)
    public_methods : bool  # Guard attributes whose name does not start with '_' (default False)
    dunder_methods : bool  # Guard '__dunder__' attributes (default False)
    pydantic_decorators : bool  # When False (default), names registered in a pydantic model's __pydantic_decorators__ are skipped
    ignore_patterns : Iterable[str | re.Pattern[str]]  # Attribute names matching (re.match) any of these patterns are skipped
def _callguard_enabled(obj : Any = None) -> bool:
    """Tell whether the given object should be callguarded.

    Guarding is skipped when CALLGUARD_ENABLED is off, when the object carries
    a truthy ``__callguard_disabled__`` attribute, or when it is already
    marked with a truthy ``__callguarded__`` attribute.
    """
    result = CALLGUARD_ENABLED and not getattr(obj, '__callguard_disabled__', False) and not getattr(obj, '__callguarded__', False)
    if not result:
        # Not every object has a __name__ (e.g. the default None, or property objects before
        # Python 3.13), and the f-string is evaluated even while the logger is disabled.
        obj_name = getattr(obj, '__name__', None) or repr(obj)
        LOG.debug(f"Callguard: Object {obj_name} is not callguard-enabled, skipping")
    return result

# MARK: Method decorator
[docs] def default_callguard_checker[T : object, **P, R](info : CallguardHandlerInfo[T,P,R]) -> bool: if LOG.isEnabledFor(logging.DEBUG): LOG.debug(f"Caller frame: {info.caller_frame.f_code.co_name} in {info.caller_module}") callee_name = info.callee_frame.f_locals.get('method_name', info.callee_frame.f_code.co_name) LOG.debug(f"Callee frame: {callee_name} in {info.callee_module}") if CALLGUARD_COMPARE_MODULE and info.check_module: if info.caller_module != info.callee_module: LOG.error(f"Module mismatch: caller {info.caller_module}, callee {info.callee_module}") return False if CALLGUARD_COMPARE_SELF and info.check_self: if info.caller_self is not info.callee_self: if ( (not isinstance(info.callee_self, type) or not isinstance(info.caller_self, info.callee_self)) and (not isinstance(info.caller_self, type) or not isinstance(info.callee_self, info.caller_self)) ): LOG.error(f"Self mismatch: caller {info.caller_self}, callee {info.callee_self}") return False return True
[docs] class CallguardCallableDecorator[T : object, **P, R]:
[docs] @runtime_checkable class CallguardHandlerProtocol(Protocol): def __callguard_handler__( self : T, method : CallguardWrapped[T,P,R], info : CallguardHandlerInfo, *args : P.args, **kwargs : P.kwargs ) -> R: ...
[docs] def __init__(self, **options : Unpack[CallguardOptions]): self.options = options
def __call__(self, method : CallguardWrapped[T,P,R]) -> CallguardWrapped[T,P,R]: return self.guard(method, **self.options)
[docs] @staticmethod def default_handler(obj : T, method : CallguardWrapped[T,P,R], info : CallguardHandlerInfo[T,P,R], *args : P.args, **kwargs : P.kwargs) -> R: if not info.default_checker(info): raise RuntimeError(f"Callguard: Unauthorized call to {info.method_name} from module {info.caller_module} and caller {info.caller_self}") return method(obj, *args, **kwargs)
[docs] @staticmethod def guard(method : CallguardWrapped[T,P,R], **callguard_options : Unpack[CallguardOptions]) -> CallguardWrapped[T,P,R]: if not _callguard_enabled(method): return method check_module = callguard_options.get('check_module', False) method_name = callguard_options.get('method_name', method.__name__) @functools.wraps(method) def _wrapper(self : T, *args : P.args, **kwargs : P.kwargs) -> Any: LOG.debug(f"Callguard: Guarding call to {method_name}") callee_frame = get_execution_frame(frames_up=1) if callee_frame is None: raise RuntimeError("No callee frame found") try: caller_frame = callee_frame.f_back if caller_frame is None: raise RuntimeError("No caller frame found") try: caller_module = get_execution_frame_module(caller_frame) if caller_module is None: raise RuntimeError("No caller module found") info : CallguardHandlerInfo[T,P,R] = CallguardHandlerInfo( method_name=method_name, check_module=check_module, check_self=True, caller_frame=caller_frame, callee_frame=callee_frame, caller_self=get_execution_frame_self(caller_frame), callee_self=self, caller_module=caller_module, callee_module=method.__module__, default_checker=default_callguard_checker, default_handler=CallguardCallableDecorator[T,P,R].default_handler, ) try: if isinstance(self, CallguardCallableDecorator[T,P,R].CallguardHandlerProtocol): return self.__callguard_handler__(method, info, *args, **kwargs) else: return info.default_handler(self, method, info, *args, **kwargs) finally: del info finally: del caller_frame finally: del callee_frame setattr(_wrapper, '__callguarded__', True) return _wrapper
def callguard_callable(**callguard_options : Unpack[CallguardOptions]) -> CallguardCallableDecorator:
    """Build a method decorator configured with the given callguard options."""
    decorator = CallguardCallableDecorator(**callguard_options)
    return decorator
# MARK: Classmethod Decorator
[docs] class CallguardClassmethodDecorator[T : type, **P, R]: type WrappedClassmethod = classmethod[T, P, R]
[docs] def __init__(self, **options : Unpack[CallguardOptions]): self.options = options
def __call__(self, method : WrappedClassmethod) -> WrappedClassmethod: return self.guard(method, **self.options)
[docs] @staticmethod def guard(method : WrappedClassmethod, **callguard_options : Unpack[CallguardOptions]) -> WrappedClassmethod: if not _callguard_enabled(method): return method callguarded = typing_cast(Callable[Concatenate[T,P],R], CallguardCallableDecorator.guard(method.__func__, **callguard_options)) if callguarded is method.__func__: return method result = classmethod(callguarded) result.__doc__ = method.__doc__ return result
def callguard_classmethod(**callguard_options : Unpack[CallguardOptions]) -> CallguardClassmethodDecorator:
    """Build a classmethod decorator configured with the given callguard options."""
    decorator = CallguardClassmethodDecorator(**callguard_options)
    return decorator
# MARK: Property Decorator
class CallguardPropertyDecorator:
    """Decorator that guards the getter, setter and deleter of a ``property``."""

    def __init__(self, **options : Unpack[CallguardOptions]):
        # Options are stored verbatim and forwarded to guard() on application
        self.options = options

    def __call__(self, prop : property) -> property:
        return self.guard(prop, **self.options)

    @staticmethod
    def guard(prop : property, **callguard_options : Unpack[CallguardOptions]) -> property:
        """Return a property whose accessors are guarded, or ``prop`` itself when nothing changed.

        The reported method name is ``<name>`` for the getter, ``<name>.setter``
        for the setter and ``<name>.deleter`` for the deleter, where ``<name>``
        is the 'method_name' option or, failing that, the property's own name.
        """
        if not _callguard_enabled(prop):
            return prop

        getter = prop.fget
        setter = prop.fset
        deleter = prop.fdel

        orig_name = callguard_options.get('method_name')
        if orig_name is None:
            # property.__name__ only exists from Python 3.13 (and needs a getter), so fall back
            # to the name of whichever accessor is available instead of raising AttributeError.
            orig_name = getattr(prop, '__name__', None)
            if orig_name is None:
                orig_name = getattr(getter or setter or deleter, '__name__', '<property>')

        callguard_options['method_name'] = f"{orig_name}"
        callguarded_getter = CallguardCallableDecorator.guard(getter, **callguard_options) if getter else None

        callguard_options['method_name'] = f"{orig_name}.setter"
        callguarded_setter = CallguardCallableDecorator.guard(setter, **callguard_options) if setter else None

        callguard_options['method_name'] = f"{orig_name}.deleter"
        callguarded_deleter = CallguardCallableDecorator.guard(deleter, **callguard_options) if deleter else None

        # Nothing was wrapped: keep the original property object
        if callguarded_getter is getter and callguarded_setter is setter and callguarded_deleter is deleter:
            return prop

        return property(
            callguarded_getter,
            callguarded_setter,
            callguarded_deleter,
            prop.__doc__
        )
def callguard_property(**callguard_options : Unpack[CallguardOptions]) -> CallguardPropertyDecorator:
    """Build a property decorator configured with the given callguard options."""
    decorator = CallguardPropertyDecorator(**callguard_options)
    return decorator
# MARK: Class decorator
[docs] class CallguardClassDecorator[T : type]:
[docs] @runtime_checkable class PydanticDescriptorProtocol(Protocol): @property def decorator_info(self): ...
[docs] def __init__(self, **callguard_options: Unpack[CallguardClassOptions]): self.options = callguard_options
def __call__(self, cls: T) -> T: return self.guard(cls, **self.options) # type: ignore
[docs] @staticmethod def guard(klass: T, **callguard_class_options: Unpack[CallguardClassOptions]) -> T: LOG.debug(f"Callguard: Guarding class {klass.__name__}") # Check if we should proceed if not callguard_class_options.get('force', False) and not _callguard_enabled(klass): return klass if getattr(klass, f'_{klass.__name__}__callguarded__', False): # Already callguarded LOG.debug(f"Callguard: Class {klass.__name__} is already callguarded, skipping") return klass # Always force callguarding if inherited from a callguarded class callguard_class_options['force'] = True # Default values wrap_private_methods : bool = callguard_class_options.get('private_methods', True ) wrap_public_methods : bool = callguard_class_options.get('public_methods' , False) wrap_dunder_methods : bool = callguard_class_options.get('dunder_methods' , False) wrap_pydantic_decorators : bool = callguard_class_options.get('pydantic_decorators', False) # If class is a pydantic model, preparee the list of decorators if (not wrap_pydantic_decorators) and issubclass(klass, BaseModel): infos = klass.__pydantic_decorators__ validator_dicts = (getattr(infos, v.name) for v in dataclasses.fields(infos)) pydantic_decorators = tuple(k for d in validator_dicts for k in d.keys()) else: pydantic_decorators = () klass = typing_cast(T, klass) # Pyright gets confused with the issubclass call above, so we reset the klass type here # Patch methods and properties in-place modifications = {} d = klass.__dict__ if not isinstance(d, Mapping): raise ValueError(f"klass must have a __dict__ Mapping, got {type(d)} instead") for name, value in d.items(): # Filter out public/dunder methods if name.startswith('_'): if name.startswith('__') and name.endswith('__'): if not wrap_dunder_methods: continue elif not wrap_private_methods: continue elif not wrap_public_methods: continue if name in pydantic_decorators: LOG.debug(f"Callguard: Skipping {klass.__name__}.{name} as it is a pydantic decorator") continue # Filter out ignored 
patterns if any(re.match(pattern, name) for pattern in callguard_class_options.get('ignore_patterns', ())): continue # Call custom filter method, if defined callguard_filter_method = getattr(klass, '__callguard_filter__', None) if callguard_filter_method is not None: if not callable(callguard_filter_method): raise RuntimeError(f"Class {klass.__name__} has a non-callable __callguard_filter__ attribute") if not callguard_filter_method(name, value): LOG.debug(f"Callguard: Skipping {klass.__name__}.{name} due to __callguard_filter__") continue callguard_options : CallguardOptions = { 'check_module': name.startswith('__') or name.startswith(f"_{klass.__name__}__"), 'method_name': name, } # Wrap the method/property if isinstance(value, staticmethod): pass # Static methods can't be guarded, as they have no self/cls elif isinstance(value, property): modifications[name] = CallguardPropertyDecorator.guard(value, **callguard_options) elif isinstance(value, classmethod): modifications[name] = CallguardClassmethodDecorator.guard(value, **callguard_options) elif isinstance(value, type): pass # Classes are not recursively guarded elif callable(value): modifications[name] = CallguardCallableDecorator.guard(value, **callguard_options) else: LOG.debug(f"Skipping non-callable, non-property attribute {name} of type {type(value)}") # Apply modifications for name, value in modifications.items(): LOG.debug(f"Callguard: Patching {klass.__name__}.{name}") setattr(klass, name, value) # Mark class as callguarded setattr(klass, f'_{klass.__name__}__callguarded__', True) if not getattr(klass, '__callguarded__', False): LOG.debug(f"Callguard: Marking class {klass.__name__} as callguarded") setattr(klass, '__callguarded__', True) # Inject __init_subclass__ to auto-guard subclasses, if the inheritance chain does not already have it original_init_subclass = d.get('__init_subclass__', None) if original_init_subclass is not None and not isinstance(original_init_subclass, classmethod): raise 
TypeError(f"Class {klass.__name__} has a non-classmethod __init_subclass__, cannot wrap it") @classmethod def init_subclass_wrapper(subcls): if original_init_subclass is not None: original_init_subclass.__func__(subcls) else: super(klass, subcls).__init_subclass__() CallguardClassDecorator.guard(subcls, **callguard_class_options) setattr(klass, '__init_subclass__', init_subclass_wrapper) # Done return klass
def callguard_class(**callguard_options : Unpack[CallguardClassOptions]) -> CallguardClassDecorator:
    """Build a class decorator configured with the given callguard class options."""
    decorator = CallguardClassDecorator(**callguard_options)
    return decorator
# MARK: Generic Decorator @overload def callguard[T : property](obj : T, **callguard_options : Unpack[CallguardOptions]) -> T: ... @overload def callguard[T : classmethod](obj : T, **callguard_options : Unpack[CallguardOptions]) -> T: ... @overload def callguard[T : type](obj : T, **callguard_options : Unpack[CallguardClassOptions]) -> T: ... @overload def callguard[T : Callable](obj : T, **callguard_options : Unpack[CallguardOptions]) -> T: ...
[docs] def callguard[T](obj : T, **callguard_options) -> T: if isinstance(obj, staticmethod): raise ValueError("callguard cannot be applied to staticmethods, as they have no self/cls") elif isinstance(obj, property): return CallguardPropertyDecorator.guard(obj, **callguard_options) elif isinstance(obj, classmethod): return CallguardClassmethodDecorator.guard(obj, **callguard_options) elif isinstance(obj, type): return CallguardClassDecorator.guard(obj, **callguard_options) elif callable(obj): return CallguardCallableDecorator.guard(obj, **callguard_options) else: raise TypeError("callguard can only be applied to classes, methods, or properties")
# MARK: No callguard Decorator
[docs] def no_callguard[T : Any](obj : T) -> T: setattr(obj, '__callguard_disabled__', True) return obj
# MARK: Callguard mixin
class Callguard:
    """Mixin whose subclasses are automatically passed through CallguardClassDecorator.guard.

    The hook is only defined when CALLGUARD_ENABLED is truthy at the time
    this class body executes.
    """
    if CALLGUARD_ENABLED:
        def __init_subclass__(cls, **kwargs : Any) -> None:
            # Forward class keyword arguments so cooperative __init_subclass__ chains keep working
            super().__init_subclass__(**kwargs)
            CallguardClassDecorator.guard(cls)