Source code for debusine.db.models.workers

# Copyright © The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.

"""Data models for db workers."""

import copy
import hashlib
from typing import Any, Optional, TYPE_CHECKING

from django.db import IntegrityError, models, transaction
from django.db.models import CheckConstraint, Count, F, JSONField, Q, QuerySet
from django.utils import timezone
from django.utils.text import slugify

from debusine.db.models import WorkRequest
from debusine.db.models.auth import Token
from debusine.tasks.models import WorkerType

if TYPE_CHECKING:
    from django_stubs_ext.db.models import TypedModelMeta
else:
    TypedModelMeta = object


class WorkerManager(models.Manager["Worker"]):
    """Manager for Worker model."""

    def connected(self) -> QuerySet["Worker"]:
        """Return connected workers."""
        return Worker.objects.filter(connected_at__isnull=False).order_by(
            'connected_at'
        )

    def waiting_for_work_request(self) -> QuerySet["Worker"]:
        """
        Return workers that can be assigned a new work request.

        Workers with fewer pending or running work requests assigned to them
        than their concurrency level can take more work right now and are
        therefore waiting for a work request.

        External workers must also have an enabled token; Celery workers have
        no token and are exempt from that check.
        """
        # Import here to prevent circular imports
        from debusine.db.models.work_requests import WorkRequest

        running_work_request_count = Count(
            'assigned_work_requests',
            filter=Q(
                assigned_work_requests__status__in=[
                    WorkRequest.Statuses.RUNNING,
                    WorkRequest.Statuses.PENDING,
                ]
            ),
        )
        workers = (
            Worker.objects.filter(connected_at__isnull=False)
            .order_by('connected_at')
            .annotate(count_running=running_work_request_count)
            .filter(count_running__lt=F("concurrency"))
            .filter(Q(worker_type=WorkerType.CELERY) | Q(token__enabled=True))
        )

        return workers
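
    # Illustrative sketch of waiting_for_work_request() (values assumed): a
    # connected external worker with concurrency=1 and no pending or running
    # work requests is annotated with count_running=0 < 1 and therefore shows
    # up in the queryset; once a work request assigned to it is PENDING or
    # RUNNING, count_running reaches the concurrency level and the worker
    # drops out.
    #
    #   idle_workers = Worker.objects.waiting_for_work_request()
    #   if idle_workers.exists():
    #       worker = idle_workers.first()  # oldest connection first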

    @staticmethod
    def _generate_unique_name(name: str, counter: int) -> str:
        """Return name slugified adding "-counter" if counter != 1."""
        new_name = slugify(name.replace('.', '-'))

        if counter != 1:
            new_name += f'-{counter}'

        return new_name
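
    # Example of the naming scheme (hostnames assumed for illustration):
    #
    #   _generate_unique_name("builder-01.example.org", 1)
    #   # -> "builder-01-example-org"
    #   _generate_unique_name("builder-01.example.org", 2)
    #   # -> "builder-01-example-org-2"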

    @classmethod
    def create_with_fqdn(
        cls,
        fqdn: str,
        token: Token,
        worker_type: WorkerType = WorkerType.EXTERNAL,
    ) -> "Worker":
        """Return a new Worker with its name based on fqdn, with token."""
        counter = 1

        while True:
            name = cls._generate_unique_name(fqdn, counter)
            try:
                with transaction.atomic():
                    return Worker.objects.create(
                        name=name,
                        token=token,
                        worker_type=worker_type,
                        registered_at=timezone.now(),
                    )
            except IntegrityError:
                counter += 1
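
    # Illustrative sketch of the retry loop above (hostname assumed): the
    # first attempt uses the plain slug; if another worker already took that
    # name, the unique constraint raises IntegrityError and the next
    # iteration appends "-2", "-3", ... until create() succeeds.
    #
    #   # assuming `token` is an existing Token instance
    #   worker = Worker.objects.create_with_fqdn("host.example.org", token)
    #   worker.name  # "host-example-org", or "host-example-org-2" on a clash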

    @classmethod
    def get_or_create_celery(cls) -> "Worker":
        """Return a new Worker representing the Celery task queue."""
        try:
            return Worker.objects.get(
                name="celery", worker_type=WorkerType.CELERY
            )
        except Worker.DoesNotExist:
            return Worker.objects.create(
                name="celery",
                worker_type=WorkerType.CELERY,
                registered_at=timezone.now(),
            )

    def get_worker_by_token_key_or_none(
        self, token_key: str
    ) -> Optional["Worker"]:
        """Return a Worker identified by its associated secret token."""
        try:
            token_hash = hashlib.sha256(token_key.encode()).hexdigest()
            return Worker.objects.get(token__hash=token_hash)
        except Worker.DoesNotExist:
            return None
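
    # Illustrative sketch (key assumed): the plaintext token key presented by
    # a worker is hashed with SHA-256 and compared against the stored
    # Token.hash, so the lookup never needs the plaintext key in the
    # database.
    #
    #   worker = Worker.objects.get_worker_by_token_key_or_none("secret-key")
    #   # None unless a Token with that hash is linked to a Worker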

    def get_worker_or_none(self, worker_name: str) -> Optional["Worker"]:
        """Return the worker with worker_name or None."""
        try:
            return self.get(name=worker_name)
        except Worker.DoesNotExist:
            return None


class Worker(models.Model):
    """Database model of a worker."""

    name = models.SlugField(
        unique=True,
        help_text='Human readable name of the worker based on the FQDN',
    )
    registered_at = models.DateTimeField()
    connected_at = models.DateTimeField(blank=True, null=True)

    # This is the token used by the Worker to authenticate.
    # Users have their own tokens - this is specific to a single worker.
    token = models.OneToOneField(
        Token, null=True, on_delete=models.PROTECT, related_name="worker"
    )

    static_metadata = JSONField(default=dict, blank=True)
    dynamic_metadata = JSONField(default=dict, blank=True)
    dynamic_metadata_updated_at = models.DateTimeField(blank=True, null=True)

    worker_type = models.CharField(
        max_length=8,
        choices=WorkerType.choices,
        default=WorkerType.EXTERNAL,
        editable=False,
    )

    # Only Celery workers currently support concurrency levels greater
    # than 1.
    concurrency = models.PositiveIntegerField(
        default=1,
        help_text="Number of tasks this worker can run simultaneously",
    )

    class Meta(TypedModelMeta):
        constraints = [
            # Non-Celery workers must have a token.
            CheckConstraint(
                name="%(app_label)s_%(class)s_celery_or_token",
                check=Q(worker_type=WorkerType.CELERY)
                | Q(token__isnull=False),
            )
        ]
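
    # Illustrative note on the constraint above (field values assumed):
    # saving a non-Celery worker without a token violates the
    # celery_or_token check and raises IntegrityError.
    #
    #   Worker.objects.create(
    #       name="external-no-token",
    #       worker_type=WorkerType.EXTERNAL,
    #       registered_at=timezone.now(),
    #   )  # raises IntegrityError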

    def mark_disconnected(self) -> None:
        """Update and save relevant Worker fields after disconnecting."""
        self.connected_at = None
        self.save()

    def running_work_requests(self) -> QuerySet["WorkRequest"]:
        """Return queryset of work requests running on this worker."""
        return self.assigned_work_requests.filter(
            status=WorkRequest.Statuses.RUNNING
        ).order_by("id")

    def mark_connected(self) -> None:
        """Update and save relevant Worker fields after connecting."""
        self.connected_at = timezone.now()
        self.save()

    def connected(self) -> bool:
        """Return True if the Worker is connected."""
        return self.connected_at is not None

    def is_busy(self) -> bool:
        """
        Return True if the Worker is busy with work requests.

        A Worker is busy if it has as many running or pending work requests
        as its concurrency level.
        """
        # Import here to prevent circular imports
        from debusine.db.models.work_requests import WorkRequest

        return (
            WorkRequest.objects.running(worker=self)
            | WorkRequest.objects.pending(worker=self)
        ).count() >= self.concurrency
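
    # Illustrative sketch (counts assumed): with concurrency=2, a worker
    # holding one RUNNING and one PENDING work request reports
    # is_busy() == True, while the same worker with only one RUNNING request
    # still has spare capacity and reports False.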

    def metadata(self) -> dict[str, Any]:
        """
        Return all metadata with static_metadata and dynamic_metadata merged.

        If the same key is in static_metadata and dynamic_metadata:
        static_metadata takes priority.
        """
        return {
            **copy.deepcopy(self.dynamic_metadata),
            **copy.deepcopy(self.static_metadata),
        }
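
    # Example of the merge order (keys and values assumed): static metadata
    # wins when both dictionaries define the same key.
    #
    #   worker.static_metadata = {"max_jobs": 2, "region": "eu"}
    #   worker.dynamic_metadata = {"max_jobs": 4, "cpu_count": 8}
    #   worker.metadata()
    #   # -> {"max_jobs": 2, "cpu_count": 8, "region": "eu"}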

    def set_dynamic_metadata(self, metadata: dict[str, Any]) -> None:
        """Save metadata and update dynamic_metadata_updated_at."""
        self.dynamic_metadata = metadata
        self.dynamic_metadata_updated_at = timezone.now()
        self.save()

    def __str__(self) -> str:
        """Return the id and name of the Worker."""
        return f"Id: {self.id} Name: {self.name}"

    objects = WorkerManager()