# Copyright © The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.
"""Data models for db scopes."""
import re
from collections.abc import Collection
from typing import (
Any,
Generic,
TYPE_CHECKING,
TypeAlias,
TypeVar,
Union,
assert_never,
cast,
)
from django.core.exceptions import ValidationError
from django.db import models
from django.db.models import QuerySet, UniqueConstraint
from debusine.db.context import context
from debusine.db.models import permissions
from debusine.db.models.permissions import (
PartialCheckResult,
PermissionUser,
ROLES,
permission_check,
permission_filter,
)
if TYPE_CHECKING:
from django.contrib.auth.models import AnonymousUser
from django_stubs_ext.db.models import TypedModelMeta
from debusine.db.models.auth import User
else:
TypedModelMeta = object
#: Name of the fallback scope used for transitioning to scoped models
FALLBACK_SCOPE_NAME = "debusine"

#: Scope names reserved for use in toplevel URL path components
RESERVED_SCOPE_NAMES = frozenset(
    (
        "accounts",
        "admin",
        "api",
        "api-auth",
        "artifact",
        "task-status",
        "user",
        "workers",
        "work-request",
        "workspace",
    )
)

#: Regexp matching the structure of scope names
scope_name_regex = re.compile(r"^[A-Za-z][A-Za-z0-9+._-]*$")


def is_valid_scope_name(value: str) -> bool:
    """
    Check if value is a valid scope name.

    A valid name is not listed in RESERVED_SCOPE_NAMES and consists of an
    ASCII letter followed by any number of ASCII letters, digits, or one of
    the characters ``+``, ``.``, ``_`` and ``-``.

    :param value: candidate scope name
    :returns: True if the name is acceptable, False otherwise
    """
    if value in RESERVED_SCOPE_NAMES:
        return False
    # Use fullmatch() rather than match(): "$" also matches just before a
    # trailing newline, so match() would accept a name ending in a newline.
    return scope_name_regex.fullmatch(value) is not None
def validate_scope_name(value: str) -> None:
    """
    Validate a scope name, for use as a Django field validator.

    :param value: candidate scope name
    :raises ValidationError: if is_valid_scope_name() rejects the value
    """
    if is_valid_scope_name(value):
        return
    message = "%(value)r is not a valid scope name"
    raise ValidationError(message, params={"value": value})
A = TypeVar("A")


class ScopeQuerySet(QuerySet["Scope", A], Generic[A]):
    """QuerySet for Scope with permission-based filtering methods."""

    @permission_filter
    def can_display(
        self, user: PermissionUser  # noqa: U100
    ) -> "ScopeQuerySet[A]":
        """Restrict to Scopes the user may display (currently all of them)."""
        # The decorator is expected to have resolved the user already
        assert user is not None
        return self

    @permission_filter
    def can_create_workspace(self, user: PermissionUser) -> "ScopeQuerySet[A]":
        """Restrict to Scopes in which the user may create workspaces."""
        # The decorator is expected to have resolved the user already
        assert user is not None
        if user.is_authenticated:
            # Only users holding the OWNER role on a scope qualify
            owner_condition = ROLES(user, Scope.Roles.OWNER)
            return self.filter(owner_condition)
        # Unauthenticated users cannot create workspaces anywhere
        return self.none()
class ScopeManager(models.Manager["Scope"]):
    """Model manager for Scope, producing ScopeQuerySet instances."""

    def get_roles_model(self) -> type["ScopeRole"]:
        """Return the model class holding role assignments for scopes."""
        return ScopeRole

    def get_queryset(self) -> ScopeQuerySet[Any]:
        """Return a ScopeQuerySet for this manager's model and database."""
        queryset: ScopeQuerySet[Any] = ScopeQuerySet(
            self.model, using=self._db
        )
        return queryset
class ScopeRoles(permissions.Roles):
    """Roles that can be assigned on a Scope."""

    # Presumably a (stored value, display label) pair in the style of Django
    # choices -- confirm against permissions.Roles.
    OWNER = ("owner", "Owner")
[docs]class Scope(models.Model):
"""
Scope model.
This is used to create different distinct sets of groups and workspaces
"""
Roles: TypeAlias = ScopeRoles
objects = ScopeManager.from_queryset(ScopeQuerySet)()
name = models.CharField(
max_length=255,
unique=True,
validators=[validate_scope_name],
help_text="internal name for the scope",
)
label = models.CharField(
max_length=255,
unique=True,
help_text="User-visible name for the scope",
)
icon = models.CharField(
max_length=255,
default="",
blank=True,
help_text=(
"Optional user-visible icon,"
" resolved via ``{% static %}`` in templates"
),
)
def __str__(self) -> str:
"""Return basic information of Scope."""
return self.name
[docs] @permission_check("{user} cannot display scope {resource}")
def can_display(self, user: PermissionUser) -> bool: # noqa: U100
"""Check if the scope can be displayed."""
assert user is not None # enforced by decorator
return True
[docs] @permission_check("{user} cannot create workspaces in {resource}")
def can_create_workspace(self, user: PermissionUser) -> bool:
"""Check if the user can create workspaces in this scope."""
assert user is not None # enforced by decorator
# Token is not taken into account here
if not user.is_authenticated:
return False
# Shortcut to avoid hitting the database for common cases
match self.context_has_role(user, Scope.Roles.OWNER):
case PartialCheckResult.ALLOW:
return True
case PartialCheckResult.DENY:
return False
case PartialCheckResult.PASS:
pass
case _ as unreachable:
assert_never(unreachable)
return (
Scope.objects.can_create_workspace(user).filter(pk=self.pk).exists()
)
[docs] def context_has_role(
self, user: "User", roles: ScopeRoles | Collection[ScopeRoles]
) -> PartialCheckResult:
"""
Check user roles in the current context.
:returns:
* ALLOW if the context has enough information to determine that the
user has at least one of the given roles
* DENY if the context has enough information to determine that the
user does not have any of the given roles
* PASS if the context does not have enough information to decide
"""
if not roles:
raise ValueError("context_has_role needs at least one role")
if context.user != user or context.scope != self:
return PartialCheckResult.PASS
if isinstance(roles, ScopeRoles):
roles = (roles,)
for role in roles:
if role in context.scope_roles:
return PartialCheckResult.ALLOW
return PartialCheckResult.DENY
# See https://github.com/typeddjango/django-stubs/issues/1047 for the typing
[docs] def get_roles(
self, user: Union["User", "AnonymousUser"]
) -> QuerySet["ScopeRole", "ScopeRoles"]:
"""Get the roles of the user on this scope."""
if not user.is_authenticated:
result = ScopeRole.objects.none().values_list("role", flat=True)
else:
result = (
ScopeRole.objects.filter(resource=self, group__users=user)
.values_list("role", flat=True)
.distinct()
)
# QuerySet sees a CharField, but we know it's a ScopeRoles enum
return cast(QuerySet["ScopeRole", "ScopeRoles"], result)
class ScopeRole(models.Model):
    """Assignment of a scope role to a group, for one Scope."""

    Roles: TypeAlias = ScopeRoles

    # Scope the role applies to; reverse accessor on Scope is ``roles``
    resource = models.ForeignKey(
        Scope, on_delete=models.CASCADE, related_name="roles"
    )
    # Group receiving the role; reverse accessor on Group is ``scope_roles``
    group = models.ForeignKey(
        "Group", on_delete=models.CASCADE, related_name="scope_roles"
    )
    role = models.CharField(max_length=16, choices=Roles.choices)

    class Meta(TypedModelMeta):
        # A given role can be assigned only once per (scope, group) pair
        constraints = [
            UniqueConstraint(
                name="%(app_label)s_%(class)s_unique_resource_group_role",
                fields=["resource", "group", "role"],
            ),
        ]

    def __str__(self) -> str:
        """Describe the assignment as group, role and target scope."""
        return "{}─{}⟶{}".format(self.group, self.role, self.resource)