From 6bc5c9b27d1aa790c6cbf952ea1e5876a5cfbb4a Mon Sep 17 00:00:00 2001 From: Jose Javier Merchante Date: Thu, 6 Nov 2025 13:16:32 +0100 Subject: [PATCH 1/2] Reschedule for active tasks with no active jobs If a task fails to create a job for any reason, it will not be executed. Ensure that every task has a related job. Signed-off-by: Jose Javier Merchante --- src/grimoirelab/core/scheduler/scheduler.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/grimoirelab/core/scheduler/scheduler.py b/src/grimoirelab/core/scheduler/scheduler.py index e94e491..0cd0804 100644 --- a/src/grimoirelab/core/scheduler/scheduler.py +++ b/src/grimoirelab/core/scheduler/scheduler.py @@ -188,6 +188,14 @@ def maintain_tasks() -> None: for task in tasks: job_db = task.jobs.filter(status__in=active_status).order_by("-scheduled_at").first() + if not job_db: + logger.error( + "Task in active status but no active job found; rescheduling", + task_uuid=task.uuid, + ) + _enqueue_task(task, scheduled_at=datetime_utcnow()) + continue + if not _is_job_removed_or_stopped(job_db, task.default_job_queue): continue From 99c7af6343faa068aff0bd26a629c6ccb37b4d19 Mon Sep 17 00:00:00 2001 From: Jose Javier Merchante Date: Tue, 11 Nov 2025 12:03:51 +0100 Subject: [PATCH 2/2] Add SortingHat tasks and related APIs This commit includes SortingHat tasks to GrimoireLab scheduler. This allows to run SortingHat jobs from GrimoireLab, such as affiliate, unify, genderize, import identities, and generate recommendations. Include new generic API endpoints to list, create, retrieve, reschedule, and cancel tasks: Include the following API endpoints: ``` /api/v1/tasks/ /api/v1/tasks// /api/v1/tasks/// /api/v1/tasks///reschedule/ /api/v1/tasks///cancel/ /api/v1/tasks///jobs/ /api/v1/tasks///jobs// /api/v1/tasks///jobs//logs ``` Currently, could be 'eventizer', 'recommend_affiliations', 'affiliate', 'recommend_matches', 'unify', 'recommend_gender', 'genderize', or 'import_identities'. Signed-off-by: Jose Javier Merchante # Conflicts: # poetry.lock --- README.md | 9 + .../sortinghat-tasks-and-updated-api.yml | 8 + src/grimoirelab/core/app/urls.py | 9 +- src/grimoirelab/core/config/settings.py | 2 + src/grimoirelab/core/runner/commands/run.py | 30 + src/grimoirelab/core/scheduler/api.py | 327 ++--- src/grimoirelab/core/scheduler/jobs.py | 2 +- src/grimoirelab/core/scheduler/models.py | 14 +- src/grimoirelab/core/scheduler/serializers.py | 265 ++++ ...erizetask_importidentitiestask_and_more.py | 355 ++++++ .../core/scheduler/tasks/models.py | 353 +++++- src/grimoirelab/core/scheduler/urls.py | 31 +- src/grimoirelab/core/scheduler/views.py | 118 -- tests/unit/scheduler/test_api.py | 1119 +++++++++++++++++ tests/unit/scheduler/test_models.py | 3 + tests/unit/scheduler/test_scheduler.py | 6 + tests/unit/scheduler/test_tasks_sortinghat.py | 464 +++++++ ui/src/services/api/scheduler.js | 16 +- 18 files changed, 2825 insertions(+), 306 deletions(-) create mode 100644 releases/unreleased/sortinghat-tasks-and-updated-api.yml create mode 100644 src/grimoirelab/core/scheduler/serializers.py create mode 100644 src/grimoirelab/core/scheduler/tasks/migrations/0006_affiliatetask_genderizetask_importidentitiestask_and_more.py delete mode 100644 src/grimoirelab/core/scheduler/views.py create mode 100644 tests/unit/scheduler/test_api.py create mode 100644 tests/unit/scheduler/test_tasks_sortinghat.py diff --git a/README.md b/README.md index fc20829..955bb7b 100644 --- a/README.md +++ b/README.md @@ -179,6 +179,15 @@ and store it into SortingHat database. grimoirelab run ushers ``` +### Run SortingHat workers + +Run the SortingHat workers that will run jobs related to identities management. +Like affiliate, unify, genderize, recommendations and identities importers. + +``` +grimoirelab run sortinghat-workers +``` + #### Run the backend API Run the backend API server that will provide a REST API to manage the diff --git a/releases/unreleased/sortinghat-tasks-and-updated-api.yml b/releases/unreleased/sortinghat-tasks-and-updated-api.yml new file mode 100644 index 0000000..7a2b638 --- /dev/null +++ b/releases/unreleased/sortinghat-tasks-and-updated-api.yml @@ -0,0 +1,8 @@ +--- +title: SortingHat tasks and updated API +category: added +author: Jose Javier Merchante +issue: null +notes: > + Include SortingHat tasks in GrimoireLab scheduler and extend the + API to support them alongside eventizer tasks. diff --git a/src/grimoirelab/core/app/urls.py b/src/grimoirelab/core/app/urls.py index 3804b70..86767cd 100644 --- a/src/grimoirelab/core/app/urls.py +++ b/src/grimoirelab/core/app/urls.py @@ -13,14 +13,14 @@ from sortinghat.core.views import SortingHatGraphQLView from ..views import api_login -from grimoirelab.core.scheduler.urls import urlpatterns as sched_urlpatterns +from grimoirelab.core.scheduler.urls import tasks_urlpatterns from grimoirelab.core.datasources.urls import ecosystems_urlpatterns + urlpatterns = [ path("login", api_login, name="api_login"), path("token/", TokenObtainPairView.as_view(), name="token_obtain_pair"), path("token/refresh/", TokenRefreshView.as_view(), name="token_refresh"), - path("scheduler/", include(sched_urlpatterns)), path( "api/v1/", include( @@ -34,10 +34,13 @@ SortingHatGraphQLView.as_view(graphiql=settings.DEBUG, schema=schema) ), ), + # Tasks API + path("tasks/", include(tasks_urlpatterns)), ] ), ), re_path( - r"^(?!static|scheduler|datasources).*$", TemplateView.as_view(template_name="index.html") + r"^(?!static|login|token|api).*$", + TemplateView.as_view(template_name="index.html"), ), ] diff --git a/src/grimoirelab/core/config/settings.py b/src/grimoirelab/core/config/settings.py index 4f12a56..dfa063c 100644 --- a/src/grimoirelab/core/config/settings.py +++ b/src/grimoirelab/core/config/settings.py @@ -293,6 +293,7 @@ # GRIMOIRELAB_Q_EVENTIZER_JOBS = os.environ.get("GRIMOIRELAB_Q_EVENTIZER_JOBS", "eventizer_jobs") +GRIMOIRELAB_Q_SORTINGHAT_JOBS = os.environ.get("GRIMOIRELAB_Q_SORTINGHAT_JOBS", "sortinghat_jobs") _RQ_DATABASE = { "HOST": os.environ.get("GRIMOIRELAB_REDIS_HOST", "127.0.0.1"), @@ -304,6 +305,7 @@ RQ_QUEUES = { "default": _RQ_DATABASE, GRIMOIRELAB_Q_EVENTIZER_JOBS: _RQ_DATABASE, + GRIMOIRELAB_Q_SORTINGHAT_JOBS: _RQ_DATABASE, } GRIMOIRELAB_EVENTS_STREAM_NAME = os.environ.get("GRIMOIRELAB_EVENTS_STREAM_NAME", "events") diff --git a/src/grimoirelab/core/runner/commands/run.py b/src/grimoirelab/core/runner/commands/run.py index 31db20e..1f72400 100644 --- a/src/grimoirelab/core/runner/commands/run.py +++ b/src/grimoirelab/core/runner/commands/run.py @@ -209,6 +209,36 @@ def eventizers(workers: int, verbose: bool, burst: bool): ) +@run.command() +@worker_options(workers=5) +def sortinghat_workers(workers: int, verbose: bool, burst: bool): + """Start a pool of SortingHat workers. + + The workers on the pool will process SortingHat related tasks. + + The number of workers running in the pool can be defined with the + parameter '--workers'. + + To enable verbose mode, use the '--verbose' flag. + + If the '--burst' flag is enabled, the pool will process all the events + and exit. + + Workers get jobs from the GRIMOIRELAB_Q_SORTINGHAT_JOBS queue defined + in the configuration file. + """ + _wait_redis_ready() + _wait_database_ready() + + django.core.management.call_command( + "rqworker-pool", + settings.GRIMOIRELAB_Q_SORTINGHAT_JOBS, + num_workers=workers, + burst=burst, + verbosity=3 if verbose else 1, + ) + + def _sleep_backoff(attempt: int) -> None: """Sleep with exponential backoff""" diff --git a/src/grimoirelab/core/scheduler/api.py b/src/grimoirelab/core/scheduler/api.py index 7c3663b..1e1a5f3 100644 --- a/src/grimoirelab/core/scheduler/api.py +++ b/src/grimoirelab/core/scheduler/api.py @@ -16,8 +16,6 @@ # along with this program. If not, see . # -import django_rq - from django.db.models import ( F, OuterRef, @@ -27,148 +25,65 @@ from rest_framework import ( filters, generics, - pagination, response, - serializers, + status, + views, ) +from rest_framework.exceptions import ValidationError +from .errors import NotFoundError from .models import SchedulerStatus, get_registered_task_model -from .tasks.models import EventizerTask - - -class EventizerPaginator(pagination.PageNumberPagination): - page_size = 25 - page_size_query_param = "size" - max_page_size = 100 - - def get_paginated_response(self, data): - return response.Response( - { - "links": {"next": self.get_next_link(), "previous": self.get_previous_link()}, - "count": self.page.paginator.count, - "page": self.page.number, - "total_pages": self.page.paginator.num_pages, - "results": data, - } - ) - - -class EventizerTaskListSerializer(serializers.ModelSerializer): - status = serializers.CharField(source="get_status_display") - last_jobs = serializers.SerializerMethodField() - - class Meta: - model = EventizerTask - fields = [ - "uuid", - "status", - "runs", - "failures", - "last_run", - "last_jobs", - "scheduled_at", - "datasource_type", - "datasource_category", - ] - - def get_last_jobs(self, obj): - job_klass = get_registered_task_model("eventizer")[1] - jobs = job_klass.objects.filter(task=obj).order_by("-job_num")[:10] - return EventizerJobSummarySerializer(jobs, many=True).data - - -class EventizerJobListSerializer(serializers.ModelSerializer): - status = serializers.CharField(source="get_status_display") - - class Meta: - model = get_registered_task_model("eventizer")[1] - fields = [ - "uuid", - "job_num", - "status", - "scheduled_at", - "finished_at", - "queue", - ] - - -class EventizerTaskSerializer(serializers.ModelSerializer): - status = serializers.CharField(source="get_status_display") - - class Meta: - model = EventizerTask - fields = [ - "uuid", - "status", - "runs", - "failures", - "last_run", - "job_interval", - "scheduled_at", - "datasource_type", - "datasource_category", - ] - - -class EventizerJobSummarySerializer(serializers.ModelSerializer): - status = serializers.CharField(source="get_status_display") - - class Meta: - model = get_registered_task_model("eventizer")[1] - fields = [ - "uuid", - "job_num", - "status", - "scheduled_at", - "finished_at", - ] - - -class EventizerJobSerializer(serializers.ModelSerializer): - status = serializers.CharField(source="get_status_display") - progress = serializers.SerializerMethodField() - - class Meta: - model = get_registered_task_model("eventizer")[1] - fields = [ - "uuid", - "job_num", - "status", - "scheduled_at", - "finished_at", - "queue", - "progress", - ] - - def get_progress(self, obj): - if obj.status == SchedulerStatus.RUNNING: - rq_job = django_rq.get_queue(obj.queue).fetch_job(obj.uuid) - if rq_job: - return rq_job.progress.to_dict() - return obj.progress - - -class EventizerJobLogsSerializer(serializers.ModelSerializer): - logs = serializers.SerializerMethodField() - - class Meta: - model = get_registered_task_model("eventizer")[1] - fields = [ - "uuid", - "logs", - ] - - def get_logs(self, obj): - if obj.status == SchedulerStatus.RUNNING: - rq_job = django_rq.get_queue(obj.queue).fetch_job(obj.uuid) - if rq_job: - return rq_job.job_log - return obj.logs - - -class EventizerTaskList(generics.ListAPIView): - serializer_class = EventizerTaskListSerializer - pagination_class = EventizerPaginator +from .scheduler import schedule_task, reschedule_task, cancel_task +from .serializers import ( + EventizerTaskSerializer, + SchedulerPaginator, + JobSummarySerializer, + JobSerializer, + JobLogsSerializer, + AffiliateTaskSerializer, + GenderizeTaskSerializer, + UnifyTaskSerializer, + ImportIdentitiesTaskSerializer, + RecommendAffiliationsTaskSerializer, + RecommendGenderTaskSerializer, + RecommendMatchesTaskSerializer, +) +from .tasks.models import ( + EventizerTask, + AffiliateTask, + UnifyTask, + GenderizeTask, + ImportIdentitiesTask, + RecommendAffiliationsTask, + RecommendGenderTask, + RecommendMatchesTask, +) + + +TASKS_SERIALIZERS = { + EventizerTask.TASK_TYPE: EventizerTaskSerializer, + AffiliateTask.TASK_TYPE: AffiliateTaskSerializer, + GenderizeTask.TASK_TYPE: GenderizeTaskSerializer, + UnifyTask.TASK_TYPE: UnifyTaskSerializer, + ImportIdentitiesTask.TASK_TYPE: ImportIdentitiesTaskSerializer, + RecommendAffiliationsTask.TASK_TYPE: RecommendAffiliationsTaskSerializer, + RecommendGenderTask.TASK_TYPE: RecommendGenderTaskSerializer, + RecommendMatchesTask.TASK_TYPE: RecommendMatchesTaskSerializer, +} + + +class ListTaskTypes(views.APIView): + """API view to list all registered task types.""" + + def get(self, request, *args, **kwargs): + task_types = list(TASKS_SERIALIZERS.keys()) + return response.Response(task_types, status=200) + + +class ListCreateTasks(generics.ListCreateAPIView): + """API view to list all tasks paginated or create a new task.""" + + pagination_class = SchedulerPaginator filter_backends = [filters.OrderingFilter] ordering_fields = [ "scheduled_at", @@ -176,8 +91,20 @@ class EventizerTaskList(generics.ListAPIView): ] ordering = [F("last_run").desc(nulls_first=True)] + def get_serializer_class(self): + task_type = self.kwargs["task_type"] + try: + return TASKS_SERIALIZERS[task_type] + except KeyError: + raise ValidationError(f"Unknown task type: '{task_type}'") + def get_queryset(self): - queryset = EventizerTask.objects.all() + task_type = self.kwargs["task_type"] + try: + queryset = get_registered_task_model(task_type)[0].objects.all() + except KeyError: + raise ValidationError(f"Unknown task type: '{task_type}'") + status = self.request.query_params.get("status") last_run_status = self.request.query_params.get("last_run_status") if status is not None: @@ -187,7 +114,7 @@ def get_queryset(self): queryset = queryset.filter(status=status) if last_run_status is not None: annotation = Subquery( - get_registered_task_model("eventizer")[1] + get_registered_task_model(task_type)[1] .objects.filter(task_id=OuterRef("id"), finished_at__isnull=False) .order_by("-job_num") .values("status")[:1] @@ -197,46 +124,122 @@ def get_queryset(self): ) return queryset + def create(self, request, *args, **kwargs): + task_type = self.kwargs["task_type"] + serializer = self.get_serializer(data=request.data) + if not serializer.is_valid(): + return response.Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) + task = schedule_task(task_type, **serializer.create_scheduler_task_args()) + output_serializer = self.get_serializer(task) + return response.Response(output_serializer.data, status=status.HTTP_201_CREATED) + + +class RetrieveDestroyTask(generics.RetrieveDestroyAPIView): + """API view to retrieve or delete a specific task.""" -class EventizerTaskDetail(generics.RetrieveAPIView): - queryset = EventizerTask.objects.all() lookup_field = "uuid" - serializer_class = EventizerTaskSerializer - pagination_class = EventizerPaginator + def get_serializer_class(self): + task_type = self.kwargs["task_type"] + try: + return TASKS_SERIALIZERS[task_type] + except KeyError: + raise ValidationError(f"Unknown task type: '{task_type}'") + + def get_queryset(self): + task_type = self.kwargs["task_type"] + try: + task_model = get_registered_task_model(task_type)[0] + except KeyError: + raise ValidationError(f"Unknown task type: '{task_type}'") + return task_model.objects.all() + + +class RescheduleTask(views.APIView): + """API view to reschedule a specific task.""" + + def post(self, request, *args, **kwargs): + task_id = self.kwargs["uuid"] + try: + reschedule_task(task_id) + except NotFoundError: + return response.Response( + {"detail": f"Task with id '{task_id}' not found."}, + status=status.HTTP_404_NOT_FOUND, + ) + data = [f"Task '{task_id}' rescheduled."] + return response.Response(data, status=200) + + +class CancelTask(views.APIView): + """API view to cancel a specific task.""" + + def post(self, request, *args, **kwargs): + task_id = self.kwargs["uuid"] + + cancel_task(task_id) + + data = [f"Task '{task_id}' cancelled."] + return response.Response(data, status=200) -class EventizerJobList(generics.ListAPIView): - serializer_class = EventizerJobListSerializer - pagination_class = EventizerPaginator + +class ListJobs(generics.ListAPIView): + """API view to list all jobs paginated for a specific task.""" + + serializer_class = JobSummarySerializer + pagination_class = SchedulerPaginator + filter_backends = [filters.OrderingFilter] + ordering_fields = [ + "scheduled_at", + "last_run", + ] def get_queryset(self): + task_type = self.kwargs["task_type"] task_id = self.kwargs["task_id"] - queryset = ( - get_registered_task_model("eventizer")[1] - .objects.filter(task__uuid=task_id) - .order_by("-scheduled_at") - ) + try: + job_model = get_registered_task_model(task_type)[1] + except KeyError: + raise ValidationError(f"Unknown task type: '{task_type}'") + + queryset = job_model.objects.filter(task__uuid=task_id).order_by("-job_num") + status = self.request.query_params.get("status") if status is not None: queryset = queryset.filter(status=status) return queryset -class EventizerJobDetail(generics.RetrieveAPIView): +class JobDetail(generics.RetrieveAPIView): + """API view to retrieve detailed information about a specific job task.""" + lookup_field = "uuid" - serializer_class = EventizerJobSerializer - pagination_class = EventizerPaginator + serializer_class = JobSerializer + pagination_class = SchedulerPaginator def get_queryset(self): + task_type = self.kwargs["task_type"] task_id = self.kwargs["task_id"] - return get_registered_task_model("eventizer")[1].objects.filter(task__uuid=task_id) + try: + job_model = get_registered_task_model(task_type)[1] + except KeyError: + raise ValidationError(f"Unknown task type: '{task_type}'") + + return job_model.objects.filter(task__uuid=task_id) -class EventizerJobLogs(generics.RetrieveAPIView): +class JobLogs(generics.RetrieveAPIView): + """API view to retrieve log entries for a specific job task.""" + lookup_field = "uuid" - serializer_class = EventizerJobLogsSerializer - pagination_class = EventizerPaginator + serializer_class = JobLogsSerializer def get_queryset(self): + task_type = self.kwargs["task_type"] task_id = self.kwargs["task_id"] - return get_registered_task_model("eventizer")[1].objects.filter(task__uuid=task_id) + try: + job_model = get_registered_task_model(task_type)[1] + except KeyError: + raise ValidationError(f"Unknown task type: '{task_type}'") + + return job_model.objects.filter(task__uuid=task_id) diff --git a/src/grimoirelab/core/scheduler/jobs.py b/src/grimoirelab/core/scheduler/jobs.py index 22ddc6d..4a1daa0 100644 --- a/src/grimoirelab/core/scheduler/jobs.py +++ b/src/grimoirelab/core/scheduler/jobs.py @@ -47,7 +47,7 @@ class GrimoireLabJob(rq.job.Job): """ # Default packages to log - PACKAGES_TO_LOG = [__name__, "chronicler", "perceval", "rq"] + PACKAGES_TO_LOG = [__name__, "chronicler", "perceval", "rq", "sortinghat"] def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) diff --git a/src/grimoirelab/core/scheduler/models.py b/src/grimoirelab/core/scheduler/models.py index a8999d9..8fa1c6f 100644 --- a/src/grimoirelab/core/scheduler/models.py +++ b/src/grimoirelab/core/scheduler/models.py @@ -22,6 +22,7 @@ import uuid from django.conf import settings +from django.contrib.auth.models import User from django.core.serializers.json import DjangoJSONEncoder from django.db.models import ( BooleanField, @@ -224,6 +225,15 @@ def default(self, o): return super().default(o) +class JobArgsEncoder(DjangoJSONEncoder): + """JSON encoder for job arguments.""" + + def default(self, o): + if isinstance(o, User): + return o.username + return DjangoJSONEncoder.default(self, o) + + class Job(BaseModel): """Base class for jobs executed by the scheduler. @@ -243,7 +253,7 @@ class Job(BaseModel): # Job data uuid = CharField(max_length=MAX_SIZE_CHAR_INDEX, unique=True) job_num = PositiveIntegerField(null=False) - job_args = JSONField(null=True, default=None) + job_args = JSONField(encoder=JobArgsEncoder, null=True, default=None) # Status status = IntegerField(choices=SchedulerStatus.choices, default=SchedulerStatus.ENQUEUED) @@ -362,7 +372,7 @@ def get_registered_task_model(task_type: str) -> tuple[type[Task], type[Job]]: return GRIMOIRELAB_TASK_MODELS[task_type] -def get_all_registered_task_models() -> Iterator[type[Task], type[Job]]: +def get_all_registered_task_models() -> Iterator[tuple[type[Task], type[Job]]]: """Return all registered task models. :returns: an iterator with all registered task classes and diff --git a/src/grimoirelab/core/scheduler/serializers.py b/src/grimoirelab/core/scheduler/serializers.py new file mode 100644 index 0000000..29da1af --- /dev/null +++ b/src/grimoirelab/core/scheduler/serializers.py @@ -0,0 +1,265 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) GrimoireLab Contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +import django_rq + +from rest_framework import ( + pagination, + response, + serializers, +) + +from .models import SchedulerStatus +from .tasks.models import ( + EventizerTask, + AffiliateTask, + UnifyTask, + GenderizeTask, + ImportIdentitiesTask, + RecommendAffiliationsTask, + RecommendGenderTask, + RecommendMatchesTask, +) + + +class SchedulerPaginator(pagination.PageNumberPagination): + """Paginator for scheduler serializers. + + It extends the default PageNumberPagination to set a default + page size and a maximum page size. + """ + + page_size = 25 + page_size_query_param = "size" + max_page_size = 100 + + def get_paginated_response(self, data): + return response.Response( + { + "links": {"next": self.get_next_link(), "previous": self.get_previous_link()}, + "count": self.page.paginator.count, + "page": self.page.number, + "total_pages": self.page.paginator.num_pages, + "results": data, + } + ) + + +class TaskSerializerMixin(serializers.ModelSerializer): + """Mixin to serialize common fields of all Task models. + + This class defines the common fields and methods to be used by + the task serializers. + Subclasses must define the Meta class with the 'model' + to be serialized, the 'fields' that will be included in the + Response serialization, and the 'task_args' that will be + validated to create a new task. + """ + + uuid = serializers.CharField(read_only=True) + status = serializers.CharField(source="get_status_display", read_only=True) + runs = serializers.IntegerField(read_only=True) + failures = serializers.IntegerField(read_only=True) + last_run = serializers.DateTimeField(read_only=True) + scheduled_at = serializers.DateTimeField(read_only=True) + last_jobs = serializers.SerializerMethodField(read_only=True) + task_args = serializers.JSONField(required=True) + job_max_retries = serializers.IntegerField(required=False) + job_interval = serializers.IntegerField(required=False) + burst = serializers.BooleanField(required=False) + + class Meta: + model = None + fields = [ + "uuid", + "status", + "runs", + "failures", + "last_run", + "job_interval", + "scheduled_at", + "job_max_retries", + "task_args", + "burst", + "last_jobs", + ] + task_args = ["task_args", "job_interval", "job_max_retries", "burst"] + + def get_last_jobs(self, obj): + """Get the last jobs associated with the task.""" + + jobs = obj.jobs.order_by("-job_num")[:10] + return JobSummarySerializer(jobs, many=True).data + + def create_scheduler_task_args(self) -> dict: + """Extract the arguments to schedule the task from the validated data. + + This method simplifies the creation of new tasks by extracting + the relevant arguments from the validated data using the + 'task_args' defined in the Meta class. + """ + task_args = {} + for key in self.Meta.task_args: + if key in self.validated_data: + task_args[key] = self.validated_data[key] + return task_args + + +class JobSummarySerializer(serializers.Serializer): + """Serializer for job summary information. + + This serializer is used to provide a short summary of jobs + associated with a task. + """ + + uuid = serializers.CharField() + job_num = serializers.IntegerField() + status = serializers.CharField(source="get_status_display") + scheduled_at = serializers.DateTimeField(allow_null=True) + started_at = serializers.DateTimeField(allow_null=True) + finished_at = serializers.DateTimeField(allow_null=True) + + class Meta: + fields = [ + "uuid", + "job_num", + "status", + "scheduled_at", + "started_at", + "finished_at", + "queue", + ] + + +class JobSerializer(JobSummarySerializer): + """Serializer for job details. + + This serializer extends the JobSummarySerializer to include + the progress information. + """ + + progress = serializers.SerializerMethodField() + + class Meta: + fields = JobSummarySerializer.Meta.fields + [ + "progress", + ] + + def get_progress(self, obj): + if obj.status == SchedulerStatus.RUNNING: + rq_job = django_rq.get_queue(obj.queue).fetch_job(obj.uuid) + if rq_job: + return rq_job.progress.to_dict() + return obj.progress + + +class JobLogsSerializer(JobSummarySerializer): + """Serializer for Job logs. + + This serializer provides a summary of the job along with its logs. + """ + + uuid = serializers.CharField() + status = serializers.CharField(source="get_status_display") + logs = serializers.SerializerMethodField() + + class Meta: + fields = [ + "uuid", + "status", + "logs", + ] + + def get_logs(self, obj): + if obj.status == SchedulerStatus.RUNNING: + rq_job = django_rq.get_queue(obj.queue).fetch_job(obj.uuid) + if rq_job: + return rq_job.job_log + return obj.logs + + +class EventizerTaskSerializer(TaskSerializerMixin): + """Serializer for EventizerTask model.""" + + class Meta: + model = EventizerTask + fields = TaskSerializerMixin.Meta.fields + ["datasource_type", "datasource_category"] + task_args = TaskSerializerMixin.Meta.task_args + ["datasource_type", "datasource_category"] + + +class RecommendAffiliationsTaskSerializer(TaskSerializerMixin): + """Serializer for RecommendAffiliationsTask model.""" + + class Meta: + model = RecommendAffiliationsTask + fields = TaskSerializerMixin.Meta.fields + RecommendAffiliationsTask.EXTRA_TASK_FIELDS + task_args = TaskSerializerMixin.Meta.task_args + RecommendAffiliationsTask.EXTRA_TASK_FIELDS + + +class RecommendMatchesTaskSerializer(TaskSerializerMixin): + """Serializer for RecommendMatchesTask model.""" + + class Meta: + model = RecommendMatchesTask + fields = TaskSerializerMixin.Meta.fields + RecommendMatchesTask.EXTRA_TASK_FIELDS + task_args = TaskSerializerMixin.Meta.task_args + RecommendMatchesTask.EXTRA_TASK_FIELDS + + +class RecommendGenderTaskSerializer(TaskSerializerMixin): + """Serializer for RecommendGenderTask model.""" + + class Meta: + model = RecommendGenderTask + fields = TaskSerializerMixin.Meta.fields + RecommendGenderTask.EXTRA_TASK_FIELDS + task_args = TaskSerializerMixin.Meta.task_args + RecommendGenderTask.EXTRA_TASK_FIELDS + + +class AffiliateTaskSerializer(TaskSerializerMixin): + """Serializer for AffiliateTask model.""" + + class Meta: + model = AffiliateTask + fields = TaskSerializerMixin.Meta.fields + AffiliateTask.EXTRA_TASK_FIELDS + task_args = TaskSerializerMixin.Meta.task_args + AffiliateTask.EXTRA_TASK_FIELDS + + +class UnifyTaskSerializer(TaskSerializerMixin): + """Serializer for UnifyTask model.""" + + class Meta: + model = UnifyTask + fields = TaskSerializerMixin.Meta.fields + UnifyTask.EXTRA_TASK_FIELDS + task_args = TaskSerializerMixin.Meta.task_args + UnifyTask.EXTRA_TASK_FIELDS + + +class GenderizeTaskSerializer(TaskSerializerMixin): + """Serializer for GenderizeTask model.""" + + class Meta: + model = GenderizeTask + fields = TaskSerializerMixin.Meta.fields + GenderizeTask.EXTRA_TASK_FIELDS + task_args = TaskSerializerMixin.Meta.task_args + GenderizeTask.EXTRA_TASK_FIELDS + + +class ImportIdentitiesTaskSerializer(TaskSerializerMixin): + """Serializer for ImportIdentitiesTask model.""" + + class Meta: + model = ImportIdentitiesTask + fields = TaskSerializerMixin.Meta.fields + ImportIdentitiesTask.EXTRA_TASK_FIELDS + task_args = TaskSerializerMixin.Meta.task_args + ImportIdentitiesTask.EXTRA_TASK_FIELDS diff --git a/src/grimoirelab/core/scheduler/tasks/migrations/0006_affiliatetask_genderizetask_importidentitiestask_and_more.py b/src/grimoirelab/core/scheduler/tasks/migrations/0006_affiliatetask_genderizetask_importidentitiestask_and_more.py new file mode 100644 index 0000000..4f49a90 --- /dev/null +++ b/src/grimoirelab/core/scheduler/tasks/migrations/0006_affiliatetask_genderizetask_importidentitiestask_and_more.py @@ -0,0 +1,355 @@ +# Generated by Django 5.2.10 on 2026-01-30 16:19 + +import datetime +import django.db.models.deletion +import grimoirelab.core.models +import grimoirelab.core.scheduler.models +import grimoirelab_toolkit.datetime +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('tasks', '0005_canceled_status'), + ] + + operations = [ + migrations.CreateModel( + name='AffiliateTask', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('created_at', grimoirelab.core.models.CreationDateTimeField(default=grimoirelab_toolkit.datetime.datetime_utcnow, editable=False)), + ('uuid', models.CharField(max_length=191, unique=True)), + ('task_type', models.CharField(max_length=128)), + ('task_args', models.JSONField(default=None, null=True)), + ('status', models.IntegerField(choices=[(1, 'new'), (2, 'enqueued'), (3, 'running'), (4, 'completed'), (5, 'failed'), (6, 'recovery'), (7, 'canceled')], default=1)), + ('runs', models.PositiveIntegerField(default=0)), + ('failures', models.PositiveIntegerField(default=0)), + ('last_run', models.DateTimeField(default=None, null=True)), + ('scheduled_at', models.DateTimeField(default=None, null=True)), + ('job_interval', models.PositiveIntegerField(default=7200)), + ('job_max_retries', models.PositiveIntegerField(default=5, null=True)), + ('burst', models.BooleanField(default=False)), + ('uuids', models.JSONField(default=None, null=True)), + ('last_modified', models.DateTimeField(default=datetime.datetime(1900, 1, 1, 0, 0, tzinfo=datetime.timezone.utc))), + ], + options={ + 'abstract': False, + }, + ), + migrations.CreateModel( + name='GenderizeTask', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('created_at', grimoirelab.core.models.CreationDateTimeField(default=grimoirelab_toolkit.datetime.datetime_utcnow, editable=False)), + ('last_modified', grimoirelab.core.models.LastModificationDateTimeField(default=grimoirelab_toolkit.datetime.datetime_utcnow, editable=False)), + ('uuid', models.CharField(max_length=191, unique=True)), + ('task_type', models.CharField(max_length=128)), + ('task_args', models.JSONField(default=None, null=True)), + ('status', models.IntegerField(choices=[(1, 'new'), (2, 'enqueued'), (3, 'running'), (4, 'completed'), (5, 'failed'), (6, 'recovery'), (7, 'canceled')], default=1)), + ('runs', models.PositiveIntegerField(default=0)), + ('failures', models.PositiveIntegerField(default=0)), + ('last_run', models.DateTimeField(default=None, null=True)), + ('scheduled_at', models.DateTimeField(default=None, null=True)), + ('job_interval', models.PositiveIntegerField(default=7200)), + ('job_max_retries', models.PositiveIntegerField(default=5, null=True)), + ('burst', models.BooleanField(default=False)), + ('uuids', models.JSONField(default=None, null=True)), + ('exclude', models.BooleanField(default=True)), + ('no_strict_matching', models.BooleanField(default=False)), + ], + options={ + 'abstract': False, + }, + ), + migrations.CreateModel( + name='ImportIdentitiesTask', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('created_at', grimoirelab.core.models.CreationDateTimeField(default=grimoirelab_toolkit.datetime.datetime_utcnow, editable=False)), + ('last_modified', grimoirelab.core.models.LastModificationDateTimeField(default=grimoirelab_toolkit.datetime.datetime_utcnow, editable=False)), + ('uuid', models.CharField(max_length=191, unique=True)), + ('task_type', models.CharField(max_length=128)), + ('task_args', models.JSONField(default=None, null=True)), + ('status', models.IntegerField(choices=[(1, 'new'), (2, 'enqueued'), (3, 'running'), (4, 'completed'), (5, 'failed'), (6, 'recovery'), (7, 'canceled')], default=1)), + ('runs', models.PositiveIntegerField(default=0)), + ('failures', models.PositiveIntegerField(default=0)), + ('last_run', models.DateTimeField(default=None, null=True)), + ('scheduled_at', models.DateTimeField(default=None, null=True)), + ('job_interval', models.PositiveIntegerField(default=7200)), + ('job_max_retries', models.PositiveIntegerField(default=5, null=True)), + ('burst', models.BooleanField(default=False)), + ('backend_name', models.CharField(max_length=255)), + ('url', models.CharField(max_length=2048)), + ], + options={ + 'abstract': False, + }, + ), + migrations.CreateModel( + name='RecommendAffiliationsTask', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('created_at', grimoirelab.core.models.CreationDateTimeField(default=grimoirelab_toolkit.datetime.datetime_utcnow, editable=False)), + ('uuid', models.CharField(max_length=191, unique=True)), + ('task_type', models.CharField(max_length=128)), + ('task_args', models.JSONField(default=None, null=True)), + ('status', models.IntegerField(choices=[(1, 'new'), (2, 'enqueued'), (3, 'running'), (4, 'completed'), (5, 'failed'), (6, 'recovery'), (7, 'canceled')], default=1)), + ('runs', models.PositiveIntegerField(default=0)), + ('failures', models.PositiveIntegerField(default=0)), + ('last_run', models.DateTimeField(default=None, null=True)), + ('scheduled_at', models.DateTimeField(default=None, null=True)), + ('job_interval', models.PositiveIntegerField(default=7200)), + ('job_max_retries', models.PositiveIntegerField(default=5, null=True)), + ('burst', models.BooleanField(default=False)), + ('uuids', models.JSONField(default=None, null=True)), + ('last_modified', models.DateTimeField(default=datetime.datetime(1900, 1, 1, 0, 0, tzinfo=datetime.timezone.utc))), + ], + options={ + 'abstract': False, + }, + ), + migrations.CreateModel( + name='RecommendGenderTask', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('created_at', grimoirelab.core.models.CreationDateTimeField(default=grimoirelab_toolkit.datetime.datetime_utcnow, editable=False)), + ('last_modified', grimoirelab.core.models.LastModificationDateTimeField(default=grimoirelab_toolkit.datetime.datetime_utcnow, editable=False)), + ('uuid', models.CharField(max_length=191, unique=True)), + ('task_type', models.CharField(max_length=128)), + ('task_args', models.JSONField(default=None, null=True)), + ('status', models.IntegerField(choices=[(1, 'new'), (2, 'enqueued'), (3, 'running'), (4, 'completed'), (5, 'failed'), (6, 'recovery'), (7, 'canceled')], default=1)), + ('runs', models.PositiveIntegerField(default=0)), + ('failures', models.PositiveIntegerField(default=0)), + ('last_run', models.DateTimeField(default=None, null=True)), + ('scheduled_at', models.DateTimeField(default=None, null=True)), + ('job_interval', models.PositiveIntegerField(default=7200)), + ('job_max_retries', models.PositiveIntegerField(default=5, null=True)), + ('burst', models.BooleanField(default=False)), + ('uuids', models.JSONField(default=None, null=True)), + ('exclude', models.BooleanField(default=True)), + ('no_strict_matching', models.BooleanField(default=False)), + ], + options={ + 'abstract': False, + }, + ), + migrations.CreateModel( + name='RecommendMatchesTask', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('created_at', grimoirelab.core.models.CreationDateTimeField(default=grimoirelab_toolkit.datetime.datetime_utcnow, editable=False)), + ('uuid', models.CharField(max_length=191, unique=True)), + ('task_type', models.CharField(max_length=128)), + ('task_args', models.JSONField(default=None, null=True)), + ('status', models.IntegerField(choices=[(1, 'new'), (2, 'enqueued'), (3, 'running'), (4, 'completed'), (5, 'failed'), (6, 'recovery'), (7, 'canceled')], default=1)), + ('runs', models.PositiveIntegerField(default=0)), + ('failures', models.PositiveIntegerField(default=0)), + ('last_run', models.DateTimeField(default=None, null=True)), + ('scheduled_at', models.DateTimeField(default=None, null=True)), + ('job_interval', models.PositiveIntegerField(default=7200)), + ('job_max_retries', models.PositiveIntegerField(default=5, null=True)), + ('burst', models.BooleanField(default=False)), + ('criteria', models.JSONField(default=None, null=True)), + ('source_uuids', models.JSONField(default=None, null=True)), + ('target_uuids', models.JSONField(default=None, null=True)), + ('exclude', models.BooleanField(default=True)), + ('strict', models.BooleanField(default=True)), + ('match_source', models.BooleanField(default=False)), + ('guess_github_user', models.BooleanField(default=False)), + ('last_modified', models.DateTimeField(default=datetime.datetime(1900, 1, 1, 0, 0, tzinfo=datetime.timezone.utc))), + ], + options={ + 'abstract': False, + }, + ), + migrations.CreateModel( + name='UnifyTask', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('created_at', grimoirelab.core.models.CreationDateTimeField(default=grimoirelab_toolkit.datetime.datetime_utcnow, editable=False)), + ('uuid', models.CharField(max_length=191, unique=True)), + ('task_type', models.CharField(max_length=128)), + ('task_args', models.JSONField(default=None, null=True)), + ('status', models.IntegerField(choices=[(1, 'new'), (2, 'enqueued'), (3, 'running'), (4, 'completed'), (5, 'failed'), (6, 'recovery'), (7, 'canceled')], default=1)), + ('runs', models.PositiveIntegerField(default=0)), + ('failures', models.PositiveIntegerField(default=0)), + ('last_run', models.DateTimeField(default=None, null=True)), + ('scheduled_at', models.DateTimeField(default=None, null=True)), + ('job_interval', models.PositiveIntegerField(default=7200)), + ('job_max_retries', models.PositiveIntegerField(default=5, null=True)), + ('burst', models.BooleanField(default=False)), + ('criteria', models.JSONField(default=None, null=True)), + ('source_uuids', models.JSONField(default=None, null=True)), + ('target_uuids', models.JSONField(default=None, null=True)), + ('exclude', models.BooleanField(default=True)), + ('strict', models.BooleanField(default=True)), + ('match_source', models.BooleanField(default=False)), + ('guess_github_user', models.BooleanField(default=False)), + ('last_modified', models.DateTimeField(default=datetime.datetime(1900, 1, 1, 0, 0, tzinfo=datetime.timezone.utc))), + ], + options={ + 'abstract': False, + }, + ), + migrations.AlterField( + model_name='eventizerjob', + name='job_args', + field=models.JSONField(default=None, encoder=grimoirelab.core.scheduler.models.JobArgsEncoder, null=True), + ), + migrations.CreateModel( + name='AffiliateJob', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('created_at', grimoirelab.core.models.CreationDateTimeField(default=grimoirelab_toolkit.datetime.datetime_utcnow, editable=False)), + ('last_modified', grimoirelab.core.models.LastModificationDateTimeField(default=grimoirelab_toolkit.datetime.datetime_utcnow, editable=False)), + ('uuid', models.CharField(max_length=191, unique=True)), + ('job_num', models.PositiveIntegerField()), + ('job_args', models.JSONField(default=None, encoder=grimoirelab.core.scheduler.models.JobArgsEncoder, null=True)), + ('status', models.IntegerField(choices=[(1, 'new'), (2, 'enqueued'), (3, 'running'), (4, 'completed'), (5, 'failed'), (6, 'recovery'), (7, 'canceled')], default=2)), + ('progress', models.JSONField(default=None, encoder=grimoirelab.core.scheduler.models.JobResultEncoder, null=True)), + ('logs', models.JSONField(default=None, null=True)), + ('queue', models.CharField(default=None, max_length=128, null=True)), + ('scheduled_at', models.DateTimeField(default=None, null=True)), + ('started_at', models.DateTimeField(default=None, null=True)), + ('finished_at', models.DateTimeField(default=None, null=True)), + ('task', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='jobs', to='tasks.affiliatetask')), + ], + options={ + 'abstract': False, + }, + ), + migrations.CreateModel( + name='GenderizeJob', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('created_at', grimoirelab.core.models.CreationDateTimeField(default=grimoirelab_toolkit.datetime.datetime_utcnow, editable=False)), + ('last_modified', grimoirelab.core.models.LastModificationDateTimeField(default=grimoirelab_toolkit.datetime.datetime_utcnow, editable=False)), + ('uuid', models.CharField(max_length=191, unique=True)), + ('job_num', models.PositiveIntegerField()), + ('job_args', models.JSONField(default=None, encoder=grimoirelab.core.scheduler.models.JobArgsEncoder, null=True)), + ('status', models.IntegerField(choices=[(1, 'new'), (2, 'enqueued'), (3, 'running'), (4, 'completed'), (5, 'failed'), (6, 'recovery'), (7, 'canceled')], default=2)), + ('progress', models.JSONField(default=None, encoder=grimoirelab.core.scheduler.models.JobResultEncoder, null=True)), + ('logs', models.JSONField(default=None, null=True)), + ('queue', models.CharField(default=None, max_length=128, null=True)), + ('scheduled_at', models.DateTimeField(default=None, null=True)), + ('started_at', models.DateTimeField(default=None, null=True)), + ('finished_at', models.DateTimeField(default=None, null=True)), + ('task', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='jobs', to='tasks.genderizetask')), + ], + options={ + 'abstract': False, + }, + ), + migrations.CreateModel( + name='ImportIdentitiesJob', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('created_at', grimoirelab.core.models.CreationDateTimeField(default=grimoirelab_toolkit.datetime.datetime_utcnow, editable=False)), + ('last_modified', grimoirelab.core.models.LastModificationDateTimeField(default=grimoirelab_toolkit.datetime.datetime_utcnow, editable=False)), + ('uuid', models.CharField(max_length=191, unique=True)), + ('job_num', models.PositiveIntegerField()), + ('job_args', models.JSONField(default=None, encoder=grimoirelab.core.scheduler.models.JobArgsEncoder, null=True)), + ('status', models.IntegerField(choices=[(1, 'new'), (2, 'enqueued'), (3, 'running'), (4, 'completed'), (5, 'failed'), (6, 'recovery'), (7, 'canceled')], default=2)), + ('progress', models.JSONField(default=None, encoder=grimoirelab.core.scheduler.models.JobResultEncoder, null=True)), + ('logs', models.JSONField(default=None, null=True)), + ('queue', models.CharField(default=None, max_length=128, null=True)), + ('scheduled_at', models.DateTimeField(default=None, null=True)), + ('started_at', models.DateTimeField(default=None, null=True)), + ('finished_at', models.DateTimeField(default=None, null=True)), + ('task', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='jobs', to='tasks.importidentitiestask')), + ], + options={ + 'abstract': False, + }, + ), + migrations.CreateModel( + name='RecommendAffiliationsJob', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('created_at', grimoirelab.core.models.CreationDateTimeField(default=grimoirelab_toolkit.datetime.datetime_utcnow, editable=False)), + ('last_modified', grimoirelab.core.models.LastModificationDateTimeField(default=grimoirelab_toolkit.datetime.datetime_utcnow, editable=False)), + ('uuid', models.CharField(max_length=191, unique=True)), + ('job_num', models.PositiveIntegerField()), + ('job_args', models.JSONField(default=None, encoder=grimoirelab.core.scheduler.models.JobArgsEncoder, null=True)), + ('status', models.IntegerField(choices=[(1, 'new'), (2, 'enqueued'), (3, 'running'), (4, 'completed'), (5, 'failed'), (6, 'recovery'), (7, 'canceled')], default=2)), + ('progress', models.JSONField(default=None, encoder=grimoirelab.core.scheduler.models.JobResultEncoder, null=True)), + ('logs', models.JSONField(default=None, null=True)), + ('queue', models.CharField(default=None, max_length=128, null=True)), + ('scheduled_at', models.DateTimeField(default=None, null=True)), + ('started_at', models.DateTimeField(default=None, null=True)), + ('finished_at', models.DateTimeField(default=None, null=True)), + ('task', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='jobs', to='tasks.recommendaffiliationstask')), + ], + options={ + 'abstract': False, + }, + ), + migrations.CreateModel( + name='RecommendGenderJob', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('created_at', grimoirelab.core.models.CreationDateTimeField(default=grimoirelab_toolkit.datetime.datetime_utcnow, editable=False)), + ('last_modified', grimoirelab.core.models.LastModificationDateTimeField(default=grimoirelab_toolkit.datetime.datetime_utcnow, editable=False)), + ('uuid', models.CharField(max_length=191, unique=True)), + ('job_num', models.PositiveIntegerField()), + ('job_args', models.JSONField(default=None, encoder=grimoirelab.core.scheduler.models.JobArgsEncoder, null=True)), + ('status', models.IntegerField(choices=[(1, 'new'), (2, 'enqueued'), (3, 'running'), (4, 'completed'), (5, 'failed'), (6, 'recovery'), (7, 'canceled')], default=2)), + ('progress', models.JSONField(default=None, encoder=grimoirelab.core.scheduler.models.JobResultEncoder, null=True)), + ('logs', models.JSONField(default=None, null=True)), + ('queue', models.CharField(default=None, max_length=128, null=True)), + ('scheduled_at', models.DateTimeField(default=None, null=True)), + ('started_at', models.DateTimeField(default=None, null=True)), + ('finished_at', models.DateTimeField(default=None, null=True)), + ('task', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='jobs', to='tasks.recommendgendertask')), + ], + options={ + 'abstract': False, + }, + ), + migrations.CreateModel( + name='RecommendMatchesJob', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('created_at', grimoirelab.core.models.CreationDateTimeField(default=grimoirelab_toolkit.datetime.datetime_utcnow, editable=False)), + ('last_modified', grimoirelab.core.models.LastModificationDateTimeField(default=grimoirelab_toolkit.datetime.datetime_utcnow, editable=False)), + ('uuid', models.CharField(max_length=191, unique=True)), + ('job_num', models.PositiveIntegerField()), + ('job_args', models.JSONField(default=None, encoder=grimoirelab.core.scheduler.models.JobArgsEncoder, null=True)), + ('status', models.IntegerField(choices=[(1, 'new'), (2, 'enqueued'), (3, 'running'), (4, 'completed'), (5, 'failed'), (6, 'recovery'), (7, 'canceled')], default=2)), + ('progress', models.JSONField(default=None, encoder=grimoirelab.core.scheduler.models.JobResultEncoder, null=True)), + ('logs', models.JSONField(default=None, null=True)), + ('queue', models.CharField(default=None, max_length=128, null=True)), + ('scheduled_at', models.DateTimeField(default=None, null=True)), + ('started_at', models.DateTimeField(default=None, null=True)), + ('finished_at', models.DateTimeField(default=None, null=True)), + ('task', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='jobs', to='tasks.recommendmatchestask')), + ], + options={ + 'abstract': False, + }, + ), + migrations.CreateModel( + name='UnifyJob', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('created_at', grimoirelab.core.models.CreationDateTimeField(default=grimoirelab_toolkit.datetime.datetime_utcnow, editable=False)), + ('last_modified', grimoirelab.core.models.LastModificationDateTimeField(default=grimoirelab_toolkit.datetime.datetime_utcnow, editable=False)), + ('uuid', models.CharField(max_length=191, unique=True)), + ('job_num', models.PositiveIntegerField()), + ('job_args', models.JSONField(default=None, encoder=grimoirelab.core.scheduler.models.JobArgsEncoder, null=True)), + ('status', models.IntegerField(choices=[(1, 'new'), (2, 'enqueued'), (3, 'running'), (4, 'completed'), (5, 'failed'), (6, 'recovery'), (7, 'canceled')], default=2)), + ('progress', models.JSONField(default=None, encoder=grimoirelab.core.scheduler.models.JobResultEncoder, null=True)), + ('logs', models.JSONField(default=None, null=True)), + ('queue', models.CharField(default=None, max_length=128, null=True)), + ('scheduled_at', models.DateTimeField(default=None, null=True)), + ('started_at', models.DateTimeField(default=None, null=True)), + ('finished_at', models.DateTimeField(default=None, null=True)), + ('task', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='jobs', to='tasks.unifytask')), + ], + options={ + 'abstract': False, + }, + ), + ] diff --git a/src/grimoirelab/core/scheduler/tasks/models.py b/src/grimoirelab/core/scheduler/tasks/models.py index d0e2fd4..0f76cae 100644 --- a/src/grimoirelab/core/scheduler/tasks/models.py +++ b/src/grimoirelab/core/scheduler/tasks/models.py @@ -21,7 +21,21 @@ import typing from django.conf import settings -from django.db.models import CharField +from django.contrib.auth import get_user_model +from django.db.models import CharField, JSONField, BooleanField, DateTimeField + +from sortinghat.core.context import SortingHatContext +from sortinghat.core.importer.backend import find_import_identities_backends +from sortinghat.core.jobs import ( + recommend_affiliations, + recommend_matches, + recommend_gender, + affiliate, + unify, + genderize, + import_identities, +) +from sortinghat.core.models import MIN_PERIOD_DATE from ...scheduler.models import ( SchedulerStatus, @@ -176,4 +190,341 @@ def on_failure_callback(*args, **kwargs): return _on_failure_callback(*args, **kwargs) +class BaseIdentitiesTask(Task): + """Base class for Identities tasks and jobs.""" + + # This attribute indicates the additional arguments used to run the task + # to simplify methods like create_task, prepare_job_parameters and serializers. + EXTRA_TASK_FIELDS = [] + + @classmethod + def create_task( + cls, + task_args: dict[str, Any], + job_interval: int, + job_max_retries: int, + burst: bool = False, + *args, + **kwargs, + ) -> Self: + """Create a new Identities task. + + This method will create a new Identities task. Besides the + common arguments to create a task, this method requires + additional arguments specific to the Identities tasks that + could be defined in the `EXTRA_TASK_FIELDS` attribute. + """ + task = super().create_task( + task_args, job_interval, job_max_retries, burst=burst, *args, **kwargs + ) + for field in cls.EXTRA_TASK_FIELDS: + if field in kwargs: + setattr(task, field, kwargs[field]) + task.save() + + return task + + def prepare_job_parameters(self): + """Generate the parameters for a new job. + + By default, this method will generate the parameters + for a new job based on the original parameters set for the task. + If the task has additional fields defined in the + `EXTRA_TASK_FIELDS` attribute, those fields will be added + to the job arguments. + """ + system_user = get_user_model().objects.get(username=settings.SYSTEM_BOT_USER) + ctx = SortingHatContext(user=system_user, job_id=None, tenant=None) + + job_args = { + "ctx": ctx, + } + for field in self.EXTRA_TASK_FIELDS: + job_args[field] = getattr(self, field) + + return job_args + + def can_be_retried(self): + return True + + @property + def default_job_queue(self): + return settings.GRIMOIRELAB_Q_SORTINGHAT_JOBS + + @staticmethod + def on_success_callback(*args, **kwargs): + return _on_success_callback(*args, **kwargs) + + @staticmethod + def on_failure_callback(*args, **kwargs): + return _on_failure_callback(*args, **kwargs) + + class Meta: + abstract = True + + +class RecommendAffiliationsTask(BaseIdentitiesTask): + """Task to generate a list of affiliation recommendations from individuals. + + This task generates a list of recommendations which include the + organizations where individuals can be affiliated. + This job returns a dictionary with which individuals are recommended to be + affiliated to which organization. + """ + + TASK_TYPE = "recommend_affiliations" + EXTRA_TASK_FIELDS = ["uuids", "last_modified"] + + uuids = JSONField(null=True, default=None) + last_modified = DateTimeField(default=MIN_PERIOD_DATE) + + @staticmethod + def job_function(*args, **kwargs): + ctx = kwargs.get("ctx") + if ctx and not isinstance(ctx, SortingHatContext): + ctx[0] = get_user_model().objects.get(username=ctx[0]) + kwargs["ctx"] = SortingHatContext(*ctx) + + return recommend_affiliations(*args, **kwargs) + + +class RecommendMatchesTask(BaseIdentitiesTask): + """Task to generate affiliation recommendations. + + This task generates a list of recommendations which include the + matching identities from the individuals which can be merged with. + This task returns a dictionary with which individuals are recommended to be + merged to which individual (or which identities if `verbose` mode is activated). + """ + + TASK_TYPE = "recommend_matches" + EXTRA_TASK_FIELDS = [ + "criteria", + "source_uuids", + "target_uuids", + "exclude", + "strict", + "match_source", + "guess_github_user", + "last_modified", + ] + + criteria = JSONField(null=True, default=None) + source_uuids = JSONField(null=True, default=None) + target_uuids = JSONField(null=True, default=None) + exclude = BooleanField(default=True) + strict = BooleanField(default=True) + match_source = BooleanField(default=False) + guess_github_user = BooleanField(default=False) + last_modified = DateTimeField(default=MIN_PERIOD_DATE) + + @staticmethod + def job_function(*args, **kwargs): + ctx = kwargs.get("ctx") + if ctx and not isinstance(ctx, SortingHatContext): + ctx[0] = get_user_model().objects.get(username=ctx[0]) + kwargs["ctx"] = SortingHatContext(*ctx) + + return recommend_matches(*args, **kwargs) + + +class RecommendGenderTask(BaseIdentitiesTask): + """Task to generate a list of gender recommendations. + + This task generates a list of recommendations with the + probable gender of the given individuals. + """ + + TASK_TYPE = "recommend_gender" + EXTRA_TASK_FIELDS = ["uuids", "exclude", "no_strict_matching"] + + uuids = JSONField(null=True, default=None) + exclude = BooleanField(default=True) + no_strict_matching = BooleanField(default=False) + + @staticmethod + def job_function(*args, **kwargs): + ctx = kwargs.get("ctx") + if ctx and not isinstance(ctx, SortingHatContext): + ctx[0] = get_user_model().objects.get(username=ctx[0]) + kwargs["ctx"] = SortingHatContext(*ctx) + + return recommend_gender(*args, **kwargs) + + +class AffiliateTask(BaseIdentitiesTask): + """Task to Affiliates identities in SortingHat. + + This task automates the affiliation process obtaining + a list of recommendations where individuals can be + affiliated. After that, individuals are enrolled to them. + The job returns a dictionary with which individuals were + enrolled and the errors generated during this process. + """ + + TASK_TYPE = "affiliate" + EXTRA_TASK_FIELDS = ["uuids", "last_modified"] + + uuids = JSONField(null=True, default=None) + last_modified = DateTimeField(default=MIN_PERIOD_DATE) + + @staticmethod + def job_function(*args, **kwargs): + ctx = kwargs.get("ctx") + if ctx and not isinstance(ctx, SortingHatContext): + ctx[0] = get_user_model().objects.get(username=ctx[0]) + kwargs["ctx"] = SortingHatContext(*ctx) + + return affiliate(*args, **kwargs) + + +class UnifyTask(BaseIdentitiesTask): + """Task to Unify identities in SortingHat. + + This task automates the identities unify process obtaining + a list of recommendations where matching individuals can be merged. + After that, matching individuals are merged. + This job returns a list with the individuals which have been merged + and the errors generated during this process. + """ + + TASK_TYPE = "unify" + EXTRA_TASK_FIELDS = [ + "criteria", + "source_uuids", + "target_uuids", + "exclude", + "strict", + "match_source", + "guess_github_user", + "last_modified", + ] + + criteria = JSONField(null=True, default=None) + source_uuids = JSONField(null=True, default=None) + target_uuids = JSONField(null=True, default=None) + exclude = BooleanField(default=True) + strict = BooleanField(default=True) + match_source = BooleanField(default=False) + guess_github_user = BooleanField(default=False) + last_modified = DateTimeField(default=MIN_PERIOD_DATE) + + @staticmethod + def job_function(*args, **kwargs): + ctx = kwargs.get("ctx") + if ctx and not isinstance(ctx, SortingHatContext): + ctx[0] = get_user_model().objects.get(username=ctx[0]) + kwargs["ctx"] = SortingHatContext(*ctx) + + return unify(*args, **kwargs) + + +class GenderizeTask(BaseIdentitiesTask): + """Task to assign a gender to a set of individuals. + + This task autocompletes the gender information (stored in + the profile) of unique identities after obtaining a list + of recommendations for their gender based on their name. + """ + + TASK_TYPE = "genderize" + EXTRA_TASK_FIELDS = ["uuids", "exclude", "no_strict_matching"] + + uuids = JSONField(null=True, default=None) + exclude = BooleanField(default=True) + no_strict_matching = BooleanField(default=False) + + @staticmethod + def job_function(*args, **kwargs): + ctx = kwargs.get("ctx") + if ctx and not isinstance(ctx, SortingHatContext): + ctx[0] = get_user_model().objects.get(username=ctx[0]) + kwargs["ctx"] = SortingHatContext(*ctx) + + return genderize(*args, **kwargs) + + +class ImportIdentitiesTask(BaseIdentitiesTask): + """Task to Import identities in SortingHat.""" + + TASK_TYPE = "import_identities" + EXTRA_TASK_FIELDS = ["backend_name", "url"] + + backend_name = CharField(max_length=255) + url = CharField(max_length=2048) + + @classmethod + def create_task( + cls, + task_args: dict[str, Any], + job_interval: int, + job_max_retries: int, + backend_name: str, + url: str, + burst: bool = False, + *args, + **kwargs, + ) -> Self: + """Create a new task to import identities to SortingHat. + + This task imports identities to SortingHat using the + data obtained from the URL using the specified backend. + """ + task = super().create_task( + task_args, job_interval, job_max_retries, burst=burst, *args, **kwargs + ) + task.backend_name = backend_name + task.url = url + task.save() + + return task + + @staticmethod + def job_function(*args, **kwargs): + ctx = kwargs.get("ctx") + if ctx and not isinstance(ctx, SortingHatContext): + ctx[0] = get_user_model().objects.get(username=ctx[0]) + kwargs["ctx"] = SortingHatContext(*ctx) + + return import_identities(*args, **kwargs) + + def prepare_job_parameters(self): + """Generate the parameters for a new job. + + This method generates the parameters required to + execute the job associated with this task. If the task + has been executed before, it includes the timestamp of + the last modification to only import new or updated + identities. + """ + + system_user = get_user_model().objects.get(username=settings.SYSTEM_BOT_USER) + ctx = SortingHatContext(user=system_user, job_id=None, tenant=None) + + job_args = { + "ctx": ctx, + "backend_name": self.backend_name, + "url": self.url, + **self.task_args, + } + backends = find_import_identities_backends() + try: + backend_args = backends[self.backend_name]["args"] + except KeyError: + backend_args = {} + if "from_date" in backend_args: + last_job = self.jobs.order_by("job_num").filter(status=SchedulerStatus.COMPLETED).last() + if last_job and last_job.started_at: + job_args["from_date"] = last_job.started_at.isoformat() + + return job_args + + register_task_model(EventizerTask.TASK_TYPE, EventizerTask) +register_task_model(AffiliateTask.TASK_TYPE, AffiliateTask) +register_task_model(UnifyTask.TASK_TYPE, UnifyTask) +register_task_model(GenderizeTask.TASK_TYPE, GenderizeTask) +register_task_model(RecommendAffiliationsTask.TASK_TYPE, RecommendAffiliationsTask) +register_task_model(RecommendMatchesTask.TASK_TYPE, RecommendMatchesTask) +register_task_model(RecommendGenderTask.TASK_TYPE, RecommendGenderTask) +register_task_model(ImportIdentitiesTask.TASK_TYPE, ImportIdentitiesTask) diff --git a/src/grimoirelab/core/scheduler/urls.py b/src/grimoirelab/core/scheduler/urls.py index c6358d1..7cb84fd 100644 --- a/src/grimoirelab/core/scheduler/urls.py +++ b/src/grimoirelab/core/scheduler/urls.py @@ -16,19 +16,28 @@ # along with this program. If not, see . # -from django.urls import path, re_path +from django.urls import path from . import api -from . import views -urlpatterns = [ - re_path(r"^add_task", views.add_task), - re_path(r"^reschedule_task", views.reschedule_task), - re_path(r"^cancel_task", views.cancel_task), - path("tasks/", api.EventizerTaskList.as_view()), - path("tasks//", api.EventizerTaskDetail.as_view()), - path("tasks//jobs/", api.EventizerJobList.as_view()), - path("tasks//jobs//", api.EventizerJobDetail.as_view()), - path("tasks//jobs//logs/", api.EventizerJobLogs.as_view()), +tasks_urlpatterns = [ + path("", api.ListTaskTypes.as_view(), name="task-types"), + path("/", api.ListCreateTasks.as_view(), name="tasks"), + path("//", api.RetrieveDestroyTask.as_view(), name="task-detail"), + path( + "//reschedule/", + api.RescheduleTask.as_view(), + name="task-reschedule", + ), + path("//cancel/", api.CancelTask.as_view(), name="task-cancel"), + path("//jobs/", api.ListJobs.as_view(), name="jobs"), + path( + "//jobs//", api.JobDetail.as_view(), name="job-detail" + ), + path( + "//jobs//logs/", + api.JobLogs.as_view(), + name="job-logs", + ), ] diff --git a/src/grimoirelab/core/scheduler/views.py b/src/grimoirelab/core/scheduler/views.py deleted file mode 100644 index 1859599..0000000 --- a/src/grimoirelab/core/scheduler/views.py +++ /dev/null @@ -1,118 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Copyright (C) GrimoireLab Contributors -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . -# - -from rest_framework.decorators import api_view -from rest_framework.response import Response -from django.conf import settings - -from .scheduler import ( - cancel_task as scheduler_cancel_task, - schedule_task, - reschedule_task as scheduler_reschedule_task, -) - - -@api_view(["POST"]) -def add_task(request): - """Create a Task to fetch items - - The body should contain a JSON similar to: - { - 'type': 'eventizer', - 'task_args': { - 'datasource_type': 'git', - 'datasource_category': 'commit', - 'backend_args': { - 'uri': 'https://github.com/chaoss/grimoirelab.git' - } - }, - 'scheduler': { - 'job_interval': 86400, - 'job_max_retries': 3 - } - } - """ - data = request.data - - task_type = data["type"] - - job_interval = settings.GRIMOIRELAB_JOB_INTERVAL - job_max_retries = settings.GRIMOIRELAB_JOB_MAX_RETRIES - - if "scheduler" in data: - job_interval = data["scheduler"].get("job_interval", job_interval) - job_max_retries = data["scheduler"].get("job_max_retries", job_max_retries) - - task_args = data["task_args"]["backend_args"] - - task = schedule_task( - task_type, - task_args, - datasource_type=data["task_args"]["datasource_type"], - datasource_category=data["task_args"]["datasource_category"], - job_interval=job_interval, - job_max_retries=job_max_retries, - ) - - response = { - "status": "ok", - "message": f"Task {task.id} added correctly", - } - return Response(response, status=200) - - -@api_view(["POST"]) -def reschedule_task(request): - """Reschedule a Task - - The body should contain the task id to reschedule: - { - 'taskId': 'task_id' - } - """ - data = request.data - task_id = data["taskId"] - - scheduler_reschedule_task(task_id) - - response = { - "status": "ok", - "message": f"Task {task_id} rescheduled correctly", - } - return Response(response, status=200) - - -@api_view(["POST"]) -def cancel_task(request): - """Cancel a Task - - The body should contain the task id to cancel: - { - 'taskId': 'task_id' - } - """ - data = request.data - task_id = data["taskId"] - - scheduler_cancel_task(task_id) - - response = { - "status": "ok", - "message": f"Task {task_id} canceled correctly", - } - return Response(response, status=200) diff --git a/tests/unit/scheduler/test_api.py b/tests/unit/scheduler/test_api.py new file mode 100644 index 0000000..c9e669f --- /dev/null +++ b/tests/unit/scheduler/test_api.py @@ -0,0 +1,1119 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) GrimoireLab Contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +from unittest.mock import patch + +from django.conf import settings +from django.contrib.auth import get_user_model +from django.urls import reverse +from rest_framework import status +from rest_framework.test import APITestCase + +from grimoirelab.core.scheduler.models import ( + SchedulerStatus, + get_all_registered_task_names, +) +from grimoirelab.core.scheduler.tasks.models import ( + EventizerTask, + AffiliateTask, + UnifyTask, + GenderizeTask, + ImportIdentitiesTask, + RecommendAffiliationsTask, + RecommendGenderTask, + RecommendMatchesTask, +) + + +class ListTaskTypesApiTest(APITestCase): + """Unit tests for the List Task Types API""" + + def setUp(self): + user = get_user_model().objects.create(username="test", is_superuser=True) + self.client.force_authenticate(user=user) + + def test_list_task_types(self): + """Test that it returns a list of available task types""" + + url = reverse("task-types") + response = self.client.get(url) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertIsInstance(response.data, list) + names = get_all_registered_task_names() + for name in names: + self.assertIn(name, response.data) + + def test_unauthenticated_request(self): + """Test that it returns an error if no credentials were provided""" + + self.client.force_authenticate(user=None) + + url = reverse("task-types") + response = self.client.get(url) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + self.assertEqual(response.data["detail"], "Authentication credentials were not provided.") + + +class CreateTasksApiTest(APITestCase): + """Unit tests for the Create Tasks API""" + + def setUp(self): + user = get_user_model().objects.create(username="test", is_superuser=True) + get_user_model().objects.get_or_create(username=settings.SYSTEM_BOT_USER) + self.client.force_authenticate(user=user) + + def test_create_eventizer_task(self): + """Test creating an eventizer task""" + + url = reverse("tasks", kwargs={"task_type": "eventizer"}) + data = { + "datasource_type": "git", + "datasource_category": "commit", + "task_args": {"uri": "https://github.com/example/repo.git"}, + "job_interval": 3600, + "job_max_retries": 5, + "burst": True, + } + response = self.client.post(url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_201_CREATED) + self.assertEqual(response.data["status"], SchedulerStatus.ENQUEUED.label) + self.assertEqual(response.data["datasource_type"], "git") + self.assertEqual(response.data["datasource_category"], "commit") + self.assertEqual(response.data["task_args"], {"uri": "https://github.com/example/repo.git"}) + self.assertEqual(response.data["job_interval"], 3600) + self.assertEqual(response.data["job_max_retries"], 5) + self.assertEqual(response.data["burst"], True) + + def test_create_affiliate_task(self): + """Test creating an affiliate task""" + + url = reverse("tasks", kwargs={"task_type": "affiliate"}) + data = { + "task_args": {}, + "job_interval": 3600, + "job_max_retries": 5, + "burst": True, + "uuids": ["uuid1", "uuid2"], + # last_modified is optional, defaulted in model + } + response = self.client.post(url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_201_CREATED) + self.assertEqual(response.data["status"], SchedulerStatus.ENQUEUED.label) + self.assertEqual(response.data["task_args"], {}) + self.assertEqual(response.data["job_interval"], 3600) + self.assertEqual(response.data["job_max_retries"], 5) + self.assertEqual(response.data["burst"], True) + self.assertEqual(response.data["uuids"], ["uuid1", "uuid2"]) + self.assertEqual(response.data["last_modified"], "1900-01-01T00:00:00Z") + + def test_create_unify_task(self): + """Test creating a unify task""" + + url = reverse("tasks", kwargs={"task_type": "unify"}) + data = { + "task_args": {}, + "job_interval": 3600, + "job_max_retries": 5, + "burst": True, + "criteria": {"email": True}, + "source_uuids": ["uuid1"], + "target_uuids": ["uuid2"], + "exclude": False, + "strict": True, + "match_source": False, + "guess_github_user": False, + } + response = self.client.post(url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_201_CREATED) + self.assertEqual(response.data["status"], SchedulerStatus.ENQUEUED.label) + self.assertEqual(response.data["task_args"], {}) + self.assertEqual(response.data["criteria"], {"email": True}) + self.assertEqual(response.data["source_uuids"], ["uuid1"]) + self.assertEqual(response.data["target_uuids"], ["uuid2"]) + self.assertEqual(response.data["exclude"], False) + self.assertEqual(response.data["strict"], True) + self.assertEqual(response.data["match_source"], False) + self.assertEqual(response.data["guess_github_user"], False) + + def test_create_genderize_task(self): + """Test creating a genderize task""" + + url = reverse("tasks", kwargs={"task_type": "genderize"}) + data = { + "task_args": {}, + "job_interval": 3600, + "job_max_retries": 5, + "burst": True, + "uuids": ["uuid1", "uuid2"], + "exclude": False, + "no_strict_matching": True, + } + response = self.client.post(url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_201_CREATED) + self.assertEqual(response.data["status"], SchedulerStatus.ENQUEUED.label) + self.assertEqual(response.data["task_args"], {}) + self.assertEqual(response.data["uuids"], ["uuid1", "uuid2"]) + self.assertEqual(response.data["exclude"], False) + self.assertEqual(response.data["no_strict_matching"], True) + + def test_create_import_identities_task(self): + """Test creating an import identities task""" + + url = reverse("tasks", kwargs={"task_type": "import_identities"}) + data = { + "task_args": {}, + "job_interval": 3600, + "job_max_retries": 5, + "burst": True, + "backend_name": "test_backend", + "url": "https://example.com/identities.json", + } + response = self.client.post(url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_201_CREATED) + self.assertEqual(response.data["status"], SchedulerStatus.ENQUEUED.label) + self.assertEqual(response.data["task_args"], {}) + self.assertEqual(response.data["backend_name"], "test_backend") + self.assertEqual(response.data["url"], "https://example.com/identities.json") + + def test_create_recommend_affiliations_task(self): + """Test creating a recommend affiliations task""" + + url = reverse("tasks", kwargs={"task_type": "recommend_affiliations"}) + data = { + "task_args": {}, + "job_interval": 3600, + "job_max_retries": 5, + "burst": True, + "uuids": ["uuid1", "uuid2"], + } + response = self.client.post(url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_201_CREATED) + self.assertEqual(response.data["status"], SchedulerStatus.ENQUEUED.label) + self.assertEqual(response.data["task_args"], {}) + self.assertEqual(response.data["uuids"], ["uuid1", "uuid2"]) + + def test_create_recommend_gender_task(self): + """Test creating a recommend gender task""" + + url = reverse("tasks", kwargs={"task_type": "recommend_gender"}) + data = { + "task_args": {}, + "job_interval": 3600, + "job_max_retries": 5, + "burst": True, + "uuids": ["uuid1", "uuid2"], + "exclude": False, + "no_strict_matching": True, + } + response = self.client.post(url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_201_CREATED) + self.assertEqual(response.data["status"], SchedulerStatus.ENQUEUED.label) + self.assertEqual(response.data["task_args"], {}) + self.assertEqual(response.data["uuids"], ["uuid1", "uuid2"]) + self.assertEqual(response.data["exclude"], False) + self.assertEqual(response.data["no_strict_matching"], True) + + def test_create_recommend_matches_task(self): + """Test creating a recommend matches task""" + + url = reverse("tasks", kwargs={"task_type": "recommend_matches"}) + data = { + "task_args": {}, + "job_interval": 3600, + "job_max_retries": 5, + "burst": True, + "criteria": {"email": True}, + "source_uuids": ["uuid1"], + "exclude": False, + "strict": True, + "match_source": False, + "guess_github_user": False, + } + response = self.client.post(url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_201_CREATED) + self.assertEqual(response.data["status"], SchedulerStatus.ENQUEUED.label) + self.assertEqual(response.data["task_args"], {}) + self.assertEqual(response.data["criteria"], {"email": True}) + self.assertEqual(response.data["source_uuids"], ["uuid1"]) + self.assertEqual(response.data["target_uuids"], None) + self.assertEqual(response.data["exclude"], False) + self.assertEqual(response.data["strict"], True) + self.assertEqual(response.data["match_source"], False) + self.assertEqual(response.data["guess_github_user"], False) + + def test_create_task_invalid_type(self): + """Test creating a task with an invalid task type""" + + url = reverse("tasks", kwargs={"task_type": "invalid-type"}) + data = { + "task_args": {}, + } + response = self.client.post(url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertIn("Unknown task type", str(response.data)) + + def test_create_task_missing_required_fields(self): + """Test creating a task with missing required fields""" + + url = reverse("tasks", kwargs={"task_type": "eventizer"}) + data = { + "datasource_type": "git", + # Missing datasource_category and task_args + } + response = self.client.post(url, data, format="json") + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + + +class ListTasksAPITestCase(APITestCase): + """Test case for listing tasks via the API""" + + def setUp(self): + user = get_user_model().objects.create(username="test", is_superuser=True) + get_user_model().objects.get_or_create(username=settings.SYSTEM_BOT_USER) + self.client.force_authenticate(user=user) + + def test_list_eventizer_tasks(self): + """Test that it returns a list of eventizer tasks""" + + # Create some test tasks + task1 = EventizerTask.create_task( + task_args={"uri": "https://github.com/example/repo1.git"}, + job_interval=3600, + job_max_retries=3, + datasource_type="git", + datasource_category="commit", + ) + task2 = EventizerTask.create_task( + task_args={"uri": "https://github.com/example/repo2"}, + job_interval=3600, + job_max_retries=3, + datasource_type="github", + datasource_category="issue", + ) + + url = reverse("tasks", kwargs={"task_type": "eventizer"}) + response = self.client.get(url) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["count"], 2) + self.assertEqual(len(response.data["results"]), 2) + + task_data = response.data["results"][0] + self.assertEqual(task1.uuid, task_data["uuid"]) + self.assertEqual(task1.datasource_type, task_data["datasource_type"]) + self.assertEqual(task1.datasource_category, task_data["datasource_category"]) + self.assertEqual(task1.task_args, task_data["task_args"]) + self.assertEqual(task1.job_interval, task_data["job_interval"]) + self.assertEqual(task1.job_max_retries, task_data["job_max_retries"]) + + task_data = response.data["results"][1] + self.assertEqual(task2.uuid, task_data["uuid"]) + self.assertEqual(task2.datasource_type, task_data["datasource_type"]) + self.assertEqual(task2.datasource_category, task_data["datasource_category"]) + self.assertEqual(task2.task_args, task_data["task_args"]) + self.assertEqual(task2.job_interval, task_data["job_interval"]) + self.assertEqual(task2.job_max_retries, task_data["job_max_retries"]) + + def test_list_affiliate_tasks(self): + """Test that it returns a list of affiliate tasks""" + + task1 = AffiliateTask.create_task( + task_args={}, + job_interval=3600, + job_max_retries=3, + uuids=["uuid1", "uuid2"], + ) + task2 = AffiliateTask.create_task( + task_args={}, + job_interval=7200, + job_max_retries=2, + uuids=["uuid3"], + ) + + url = reverse("tasks", kwargs={"task_type": "affiliate"}) + response = self.client.get(url) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["count"], 2) + self.assertEqual(len(response.data["results"]), 2) + + task_data = response.data["results"][0] + self.assertEqual(task1.uuid, task_data["uuid"]) + self.assertEqual(task1.uuids, task_data["uuids"]) + self.assertEqual(task1.task_args, task_data["task_args"]) + self.assertEqual(task1.job_interval, task_data["job_interval"]) + self.assertEqual(task1.job_max_retries, task_data["job_max_retries"]) + + task_data = response.data["results"][1] + self.assertEqual(task2.uuid, task_data["uuid"]) + self.assertEqual(task2.uuids, task_data["uuids"]) + self.assertEqual(task2.task_args, task_data["task_args"]) + self.assertEqual(task2.job_interval, task_data["job_interval"]) + self.assertEqual(task2.job_max_retries, task_data["job_max_retries"]) + + def test_list_unify_tasks(self): + """Test that it returns a list of unify tasks""" + + task1 = UnifyTask.create_task( + task_args={}, + job_interval=3600, + job_max_retries=3, + criteria={"email": True}, + source_uuids=["uuid1"], + target_uuids=["uuid2"], + exclude=False, + strict=True, + match_source=False, + guess_github_user=False, + ) + task2 = UnifyTask.create_task( + task_args={}, + job_interval=7200, + job_max_retries=2, + criteria={"email": False}, + source_uuids=["uuid3"], + target_uuids=["uuid4"], + exclude=True, + strict=False, + match_source=True, + guess_github_user=True, + ) + + url = reverse("tasks", kwargs={"task_type": "unify"}) + response = self.client.get(url) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["count"], 2) + self.assertEqual(len(response.data["results"]), 2) + + task_data = response.data["results"][0] + self.assertEqual(task1.uuid, task_data["uuid"]) + self.assertEqual(task1.criteria, task_data["criteria"]) + self.assertEqual(task1.source_uuids, task_data["source_uuids"]) + self.assertEqual(task1.target_uuids, task_data["target_uuids"]) + self.assertEqual(task1.exclude, task_data["exclude"]) + self.assertEqual(task1.strict, task_data["strict"]) + self.assertEqual(task1.match_source, task_data["match_source"]) + self.assertEqual(task1.guess_github_user, task_data["guess_github_user"]) + self.assertEqual(task1.task_args, task_data["task_args"]) + self.assertEqual(task1.job_interval, task_data["job_interval"]) + self.assertEqual(task1.job_max_retries, task_data["job_max_retries"]) + + task_data = response.data["results"][1] + self.assertEqual(task2.uuid, task_data["uuid"]) + self.assertEqual(task2.criteria, task_data["criteria"]) + self.assertEqual(task2.source_uuids, task_data["source_uuids"]) + self.assertEqual(task2.target_uuids, task_data["target_uuids"]) + self.assertEqual(task2.exclude, task_data["exclude"]) + self.assertEqual(task2.strict, task_data["strict"]) + self.assertEqual(task2.match_source, task_data["match_source"]) + self.assertEqual(task2.guess_github_user, task_data["guess_github_user"]) + self.assertEqual(task2.task_args, task_data["task_args"]) + self.assertEqual(task2.job_interval, task_data["job_interval"]) + self.assertEqual(task2.job_max_retries, task_data["job_max_retries"]) + + def test_list_genderize_tasks(self): + """Test that it returns a list of genderize tasks""" + + task1 = GenderizeTask.create_task( + task_args={}, + job_interval=3600, + job_max_retries=3, + uuids=["uuid1", "uuid2"], + exclude=False, + no_strict_matching=True, + ) + task2 = GenderizeTask.create_task( + task_args={}, + job_interval=7200, + job_max_retries=2, + uuids=["uuid3"], + exclude=True, + no_strict_matching=False, + ) + + url = reverse("tasks", kwargs={"task_type": "genderize"}) + response = self.client.get(url) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["count"], 2) + self.assertEqual(len(response.data["results"]), 2) + + task_data = response.data["results"][0] + self.assertEqual(task1.uuid, task_data["uuid"]) + self.assertEqual(task1.uuids, task_data["uuids"]) + self.assertEqual(task1.exclude, task_data["exclude"]) + self.assertEqual(task1.no_strict_matching, task_data["no_strict_matching"]) + self.assertEqual(task1.task_args, task_data["task_args"]) + self.assertEqual(task1.job_interval, task_data["job_interval"]) + self.assertEqual(task1.job_max_retries, task_data["job_max_retries"]) + + task_data = response.data["results"][1] + self.assertEqual(task2.uuid, task_data["uuid"]) + self.assertEqual(task2.uuids, task_data["uuids"]) + self.assertEqual(task2.exclude, task_data["exclude"]) + self.assertEqual(task2.no_strict_matching, task_data["no_strict_matching"]) + self.assertEqual(task2.task_args, task_data["task_args"]) + self.assertEqual(task2.job_interval, task_data["job_interval"]) + self.assertEqual(task2.job_max_retries, task_data["job_max_retries"]) + + def test_list_import_identities_tasks(self): + """Test that it returns a list of import identities tasks""" + + task1 = ImportIdentitiesTask.create_task( + task_args={}, + job_interval=3600, + job_max_retries=3, + backend_name="test_backend1", + url="https://example.com/identities1.json", + ) + task2 = ImportIdentitiesTask.create_task( + task_args={}, + job_interval=7200, + job_max_retries=2, + backend_name="test_backend2", + url="https://example.com/identities2.json", + ) + + url = reverse("tasks", kwargs={"task_type": "import_identities"}) + response = self.client.get(url) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["count"], 2) + self.assertEqual(len(response.data["results"]), 2) + + task_data = response.data["results"][0] + self.assertEqual(task1.uuid, task_data["uuid"]) + self.assertEqual(task1.backend_name, task_data["backend_name"]) + self.assertEqual(task1.url, task_data["url"]) + self.assertEqual(task1.task_args, task_data["task_args"]) + self.assertEqual(task1.job_interval, task_data["job_interval"]) + self.assertEqual(task1.job_max_retries, task_data["job_max_retries"]) + + task_data = response.data["results"][1] + self.assertEqual(task2.uuid, task_data["uuid"]) + self.assertEqual(task2.backend_name, task_data["backend_name"]) + self.assertEqual(task2.url, task_data["url"]) + self.assertEqual(task2.task_args, task_data["task_args"]) + self.assertEqual(task2.job_interval, task_data["job_interval"]) + self.assertEqual(task2.job_max_retries, task_data["job_max_retries"]) + + def test_list_recommend_affiliations_tasks(self): + """Test that it returns a list of recommend affiliations tasks""" + + task1 = RecommendAffiliationsTask.create_task( + task_args={}, + job_interval=3600, + job_max_retries=3, + uuids=["uuid1", "uuid2"], + ) + task2 = RecommendAffiliationsTask.create_task( + task_args={}, + job_interval=7200, + job_max_retries=2, + uuids=["uuid3"], + ) + + url = reverse("tasks", kwargs={"task_type": "recommend_affiliations"}) + response = self.client.get(url) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["count"], 2) + self.assertEqual(len(response.data["results"]), 2) + + task_data = response.data["results"][0] + self.assertEqual(task1.uuid, task_data["uuid"]) + self.assertEqual(task1.uuids, task_data["uuids"]) + self.assertEqual(task1.task_args, task_data["task_args"]) + self.assertEqual(task1.job_interval, task_data["job_interval"]) + self.assertEqual(task1.job_max_retries, task_data["job_max_retries"]) + + task_data = response.data["results"][1] + self.assertEqual(task2.uuid, task_data["uuid"]) + self.assertEqual(task2.uuids, task_data["uuids"]) + self.assertEqual(task2.task_args, task_data["task_args"]) + self.assertEqual(task2.job_interval, task_data["job_interval"]) + self.assertEqual(task2.job_max_retries, task_data["job_max_retries"]) + + def test_list_recommend_gender_tasks(self): + """Test that it returns a list of recommend gender tasks""" + + task1 = RecommendGenderTask.create_task( + task_args={}, + job_interval=3600, + job_max_retries=3, + uuids=["uuid1", "uuid2"], + exclude=False, + no_strict_matching=True, + ) + task2 = RecommendGenderTask.create_task( + task_args={}, + job_interval=7200, + job_max_retries=2, + uuids=["uuid3"], + exclude=True, + no_strict_matching=False, + ) + + url = reverse("tasks", kwargs={"task_type": "recommend_gender"}) + response = self.client.get(url) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["count"], 2) + self.assertEqual(len(response.data["results"]), 2) + + task_data = response.data["results"][0] + self.assertEqual(task1.uuid, task_data["uuid"]) + self.assertEqual(task1.uuids, task_data["uuids"]) + self.assertEqual(task1.exclude, task_data["exclude"]) + self.assertEqual(task1.no_strict_matching, task_data["no_strict_matching"]) + self.assertEqual(task1.task_args, task_data["task_args"]) + self.assertEqual(task1.job_interval, task_data["job_interval"]) + self.assertEqual(task1.job_max_retries, task_data["job_max_retries"]) + + task_data = response.data["results"][1] + self.assertEqual(task2.uuid, task_data["uuid"]) + self.assertEqual(task2.uuids, task_data["uuids"]) + self.assertEqual(task2.exclude, task_data["exclude"]) + self.assertEqual(task2.no_strict_matching, task_data["no_strict_matching"]) + self.assertEqual(task2.task_args, task_data["task_args"]) + self.assertEqual(task2.job_interval, task_data["job_interval"]) + self.assertEqual(task2.job_max_retries, task_data["job_max_retries"]) + + def test_list_recommend_matches_tasks(self): + """Test that it returns a list of recommend matches tasks""" + + task1 = RecommendMatchesTask.create_task( + task_args={}, + job_interval=3600, + job_max_retries=3, + criteria={"email": True}, + source_uuids=["uuid1"], + exclude=False, + strict=True, + match_source=False, + guess_github_user=False, + ) + task2 = RecommendMatchesTask.create_task( + task_args={}, + job_interval=7200, + job_max_retries=2, + criteria={"email": False}, + source_uuids=["uuid3"], + exclude=True, + strict=False, + match_source=True, + guess_github_user=True, + ) + + url = reverse("tasks", kwargs={"task_type": "recommend_matches"}) + response = self.client.get(url) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["count"], 2) + self.assertEqual(len(response.data["results"]), 2) + + task_data = response.data["results"][0] + self.assertEqual(task1.uuid, task_data["uuid"]) + self.assertEqual(task1.criteria, task_data["criteria"]) + self.assertEqual(task1.source_uuids, task_data["source_uuids"]) + self.assertEqual(task1.exclude, task_data["exclude"]) + self.assertEqual(task1.strict, task_data["strict"]) + self.assertEqual(task1.match_source, task_data["match_source"]) + self.assertEqual(task1.guess_github_user, task_data["guess_github_user"]) + self.assertEqual(task1.task_args, task_data["task_args"]) + self.assertEqual(task1.job_interval, task_data["job_interval"]) + self.assertEqual(task1.job_max_retries, task_data["job_max_retries"]) + + task_data = response.data["results"][1] + self.assertEqual(task2.uuid, task_data["uuid"]) + self.assertEqual(task2.criteria, task_data["criteria"]) + self.assertEqual(task2.source_uuids, task_data["source_uuids"]) + self.assertEqual(task2.exclude, task_data["exclude"]) + self.assertEqual(task2.strict, task_data["strict"]) + self.assertEqual(task2.match_source, task_data["match_source"]) + self.assertEqual(task2.guess_github_user, task_data["guess_github_user"]) + self.assertEqual(task2.task_args, task_data["task_args"]) + self.assertEqual(task2.job_interval, task_data["job_interval"]) + self.assertEqual(task2.job_max_retries, task_data["job_max_retries"]) + + def test_list_tasks_with_status_filter(self): + """Test filtering tasks by status""" + + task1 = EventizerTask.create_task( + task_args={"uri": "https://github.com/example/repo1.git"}, + job_interval=3600, + job_max_retries=3, + datasource_type="git", + datasource_category="commit", + ) + task1.status = SchedulerStatus.NEW + task1.save() + + task2 = EventizerTask.create_task( + task_args={"uri": "https://github.com/example/repo2.git"}, + job_interval=3600, + job_max_retries=3, + datasource_type="git", + datasource_category="commit", + ) + task2.status = SchedulerStatus.COMPLETED + task2.save() + + url = reverse("tasks", kwargs={"task_type": "eventizer"}) + response = self.client.get(url, {"status": SchedulerStatus.NEW}) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["count"], 1) + self.assertEqual(response.data["results"][0]["status"], "new") + + def test_list_tasks_pagination(self): + """Test that tasks are properly paginated""" + + # Create multiple tasks + for i in range(30): + EventizerTask.create_task( + task_args={"uri": f"https://github.com/example/repo{i}.git"}, + job_interval=3600, + job_max_retries=3, + datasource_type="git", + datasource_category="commit", + ) + + url = reverse("tasks", kwargs={"task_type": "eventizer"}) + response = self.client.get(url, {"page": 2, "size": 10}) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["count"], 30) + self.assertEqual(response.data["page"], 2) + self.assertEqual(response.data["total_pages"], 3) + self.assertEqual(len(response.data["results"]), 10) + + def test_list_tasks_invalid_type(self): + """Test listing tasks with invalid task type""" + + url = reverse("tasks", kwargs={"task_type": "invalid-type"}) + response = self.client.get(url) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertIn("Unknown task type", str(response.data)) + + def test_unauthenticated_request(self): + """Test that it returns an error if no credentials were provided""" + + self.client.force_authenticate(user=None) + + url = reverse("tasks", kwargs={"task_type": "eventizer"}) + response = self.client.get(url) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + + +class RetrieveDestroyTaskApiTest(APITestCase): + """Unit tests for the Retrieve Destroy Task API""" + + def setUp(self): + user = get_user_model().objects.create(username="test", is_superuser=True) + self.client.force_authenticate(user=user) + + def test_get_eventizer_task(self): + """Test retrieving an eventizer task""" + + task = EventizerTask.create_task( + task_args={"uri": "https://github.com/example/repo.git"}, + job_interval=3600, + job_max_retries=3, + datasource_type="git", + datasource_category="commit", + ) + + url = reverse("task-detail", kwargs={"task_type": "eventizer", "uuid": task.uuid}) + response = self.client.get(url) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["uuid"], task.uuid) + self.assertEqual(response.data["datasource_type"], "git") + self.assertEqual(response.data["datasource_category"], "commit") + + def test_get_affiliate_task(self): + """Test retrieving an affiliate task""" + + task = AffiliateTask.create_task( + task_args={}, + job_interval=3600, + job_max_retries=3, + uuids=["uuid1", "uuid2"], + ) + + url = reverse("task-detail", kwargs={"task_type": "affiliate", "uuid": task.uuid}) + response = self.client.get(url) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["uuid"], task.uuid) + self.assertEqual(response.data["uuids"], ["uuid1", "uuid2"]) + self.assertEqual(response.data["task_args"], {}) + self.assertEqual(response.data["job_interval"], 3600) + self.assertEqual(response.data["job_max_retries"], 3) + + def test_get_unify_task(self): + """Test retrieving a unify task""" + + task = UnifyTask.create_task( + task_args={}, + job_interval=3600, + job_max_retries=3, + criteria={"email": True}, + source_uuids=["uuid1"], + target_uuids=["uuid2"], + exclude=False, + strict=True, + match_source=False, + guess_github_user=False, + ) + + url = reverse("task-detail", kwargs={"task_type": "unify", "uuid": task.uuid}) + response = self.client.get(url) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["uuid"], task.uuid) + self.assertEqual(response.data["criteria"], {"email": True}) + self.assertEqual(response.data["source_uuids"], ["uuid1"]) + self.assertEqual(response.data["target_uuids"], ["uuid2"]) + self.assertEqual(response.data["exclude"], False) + self.assertEqual(response.data["strict"], True) + self.assertEqual(response.data["match_source"], False) + self.assertEqual(response.data["guess_github_user"], False) + self.assertEqual(response.data["task_args"], {}) + self.assertEqual(response.data["job_interval"], 3600) + self.assertEqual(response.data["job_max_retries"], 3) + + def test_get_genderize_task(self): + """Test retrieving a genderize task""" + + task = GenderizeTask.create_task( + task_args={}, + job_interval=3600, + job_max_retries=3, + uuids=["uuid1", "uuid2"], + exclude=False, + no_strict_matching=True, + ) + + url = reverse("task-detail", kwargs={"task_type": "genderize", "uuid": task.uuid}) + response = self.client.get(url) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["uuid"], task.uuid) + self.assertEqual(response.data["uuids"], ["uuid1", "uuid2"]) + self.assertEqual(response.data["exclude"], False) + self.assertEqual(response.data["no_strict_matching"], True) + self.assertEqual(response.data["task_args"], {}) + self.assertEqual(response.data["job_interval"], 3600) + self.assertEqual(response.data["job_max_retries"], 3) + + def test_get_import_identities_task(self): + """Test retrieving an import identities task""" + + task = ImportIdentitiesTask.create_task( + task_args={}, + job_interval=3600, + job_max_retries=3, + backend_name="test_backend", + url="https://example.com/identities.json", + ) + + url = reverse("task-detail", kwargs={"task_type": "import_identities", "uuid": task.uuid}) + response = self.client.get(url) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["uuid"], task.uuid) + self.assertEqual(response.data["backend_name"], "test_backend") + self.assertEqual(response.data["url"], "https://example.com/identities.json") + self.assertEqual(response.data["task_args"], {}) + self.assertEqual(response.data["job_interval"], 3600) + self.assertEqual(response.data["job_max_retries"], 3) + + def test_get_recommend_affiliations_task(self): + """Test retrieving a recommend affiliations task""" + + task = RecommendAffiliationsTask.create_task( + task_args={}, + job_interval=3600, + job_max_retries=3, + uuids=["uuid1", "uuid2"], + ) + + url = reverse( + "task-detail", kwargs={"task_type": "recommend_affiliations", "uuid": task.uuid} + ) + response = self.client.get(url) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["uuid"], task.uuid) + self.assertEqual(response.data["uuids"], ["uuid1", "uuid2"]) + self.assertEqual(response.data["task_args"], {}) + self.assertEqual(response.data["job_interval"], 3600) + self.assertEqual(response.data["job_max_retries"], 3) + + def test_get_recommend_gender_task(self): + """Test retrieving a recommend gender task""" + + task = RecommendGenderTask.create_task( + task_args={}, + job_interval=3600, + job_max_retries=3, + uuids=["uuid1", "uuid2"], + exclude=False, + no_strict_matching=True, + ) + + url = reverse("task-detail", kwargs={"task_type": "recommend_gender", "uuid": task.uuid}) + response = self.client.get(url) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["uuid"], task.uuid) + self.assertEqual(response.data["uuids"], ["uuid1", "uuid2"]) + self.assertEqual(response.data["exclude"], False) + self.assertEqual(response.data["no_strict_matching"], True) + self.assertEqual(response.data["task_args"], {}) + self.assertEqual(response.data["job_interval"], 3600) + self.assertEqual(response.data["job_max_retries"], 3) + + def test_get_recommend_matches_task(self): + """Test retrieving a recommend matches task""" + + task = RecommendMatchesTask.create_task( + task_args={}, + job_interval=3600, + job_max_retries=3, + criteria={"email": True}, + source_uuids=["uuid1"], + exclude=False, + strict=True, + match_source=False, + guess_github_user=False, + ) + + url = reverse("task-detail", kwargs={"task_type": "recommend_matches", "uuid": task.uuid}) + response = self.client.get(url) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertEqual(response.data["uuid"], task.uuid) + self.assertEqual(response.data["criteria"], {"email": True}) + self.assertEqual(response.data["source_uuids"], ["uuid1"]) + self.assertEqual(response.data["exclude"], False) + self.assertEqual(response.data["strict"], True) + self.assertEqual(response.data["match_source"], False) + self.assertEqual(response.data["guess_github_user"], False) + self.assertEqual(response.data["task_args"], {}) + self.assertEqual(response.data["job_interval"], 3600) + self.assertEqual(response.data["job_max_retries"], 3) + + def test_get_task_not_found(self): + """Test retrieving a task that doesn't exist""" + + fake_uuid = "fake_task_uuid" + url = reverse("task-detail", kwargs={"task_type": "eventizer", "uuid": fake_uuid}) + response = self.client.get(url) + self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) + + def test_get_task_invalid_type(self): + """Test retrieving a task with invalid task type""" + + fake_uuid = "fake_task_uuid" + url = reverse("task-detail", kwargs={"task_type": "invalid-type", "uuid": fake_uuid}) + response = self.client.get(url) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertIn("Unknown task type", str(response.data)) + + def test_delete_task(self): + """Test deleting a task""" + + task = EventizerTask.create_task( + task_args={"uri": "https://github.com/example/repo.git"}, + job_interval=3600, + job_max_retries=3, + datasource_type="git", + datasource_category="commit", + ) + + url = reverse("task-detail", kwargs={"task_type": "eventizer", "uuid": task.uuid}) + response = self.client.delete(url) + self.assertEqual(response.status_code, status.HTTP_204_NO_CONTENT) + + # Verify task is deleted + self.assertFalse(EventizerTask.objects.filter(uuid=task.uuid).exists()) + + def test_unauthenticated_request(self): + """Test that it returns an error if no credentials were provided""" + + self.client.force_authenticate(user=None) + + fake_uuid = "fake_task_uuid" + url = reverse("task-detail", kwargs={"task_type": "eventizer", "uuid": fake_uuid}) + response = self.client.get(url) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + + +class RescheduleTaskApiTest(APITestCase): + """Unit tests for the Reschedule Task API""" + + def setUp(self): + user = get_user_model().objects.create(username="test", is_superuser=True) + self.client.force_authenticate(user=user) + + @patch("grimoirelab.core.scheduler.api.reschedule_task") + def test_reschedule_task(self, mock_reschedule_task): + """Test rescheduling a task""" + + task_uuid = "test_task_uuid" + url = reverse("task-reschedule", kwargs={"task_type": "eventizer", "uuid": task_uuid}) + + response = self.client.post(url) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertIn("rescheduled", response.data[0]) + mock_reschedule_task.assert_called_once_with(task_uuid) + + @patch("grimoirelab.core.scheduler.api.reschedule_task") + def test_reschedule_task_not_found(self, mock_reschedule_task): + """Test rescheduling a task that doesn't exist""" + + from grimoirelab.core.scheduler.errors import NotFoundError + + mock_reschedule_task.side_effect = NotFoundError(element="Task") + + task_uuid = "test_task_uuid" + url = reverse("task-reschedule", kwargs={"task_type": "eventizer", "uuid": task_uuid}) + + response = self.client.post(url) + self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) + self.assertIn("not found", response.data["detail"]) + + def test_unauthenticated_request(self): + """Test that it returns an error if no credentials were provided""" + + self.client.force_authenticate(user=None) + + task_uuid = "test_task_uuid" + url = reverse("task-reschedule", kwargs={"task_type": "eventizer", "uuid": task_uuid}) + response = self.client.post(url) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + + +class CancelTaskApiTest(APITestCase): + """Unit tests for the Cancel Task API""" + + def setUp(self): + user = get_user_model().objects.create(username="test", is_superuser=True) + self.client.force_authenticate(user=user) + + @patch("grimoirelab.core.scheduler.api.cancel_task") + def test_cancel_task(self, mock_cancel_task): + """Test cancelling a task""" + + task_uuid = "test_task_uuid" + url = reverse("task-cancel", kwargs={"task_type": "eventizer", "uuid": task_uuid}) + + response = self.client.post(url) + self.assertEqual(response.status_code, status.HTTP_200_OK) + self.assertIn("cancelled", response.data[0]) + mock_cancel_task.assert_called_once_with(task_uuid) + + def test_unauthenticated_request(self): + """Test that it returns an error if no credentials were provided""" + + self.client.force_authenticate(user=None) + + task_uuid = "test_task_uuid" + url = reverse("task-cancel", kwargs={"task_type": "eventizer", "uuid": task_uuid}) + response = self.client.post(url) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + + +class ListJobsApiTest(APITestCase): + """Unit tests for the List Jobs API""" + + def setUp(self): + user = get_user_model().objects.create(username="test", is_superuser=True) + self.client.force_authenticate(user=user) + + def test_list_jobs_invalid_task_type(self): + """Test listing jobs with invalid task type""" + + task_uuid = "test_task_uuid" + url = reverse("jobs", kwargs={"task_type": "invalid-type", "task_id": task_uuid}) + response = self.client.get(url) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertIn("Unknown task type", str(response.data)) + + def test_unauthenticated_request(self): + """Test that it returns an error if no credentials were provided""" + + self.client.force_authenticate(user=None) + + task_uuid = "test_task_uuid" + url = reverse("jobs", kwargs={"task_type": "eventizer", "task_id": task_uuid}) + response = self.client.get(url) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + + +class JobDetailApiTest(APITestCase): + """Unit tests for the Job Detail API""" + + def setUp(self): + user = get_user_model().objects.create(username="test", is_superuser=True) + self.client.force_authenticate(user=user) + + def test_get_job_invalid_task_type(self): + """Test retrieving job details with invalid task type""" + + task_uuid = "test_task_uuid" + job_uuid = "test_job_uuid" + url = reverse( + "job-detail", + kwargs={"task_type": "invalid-type", "task_id": task_uuid, "uuid": job_uuid}, + ) + response = self.client.get(url) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertIn("Unknown task type", str(response.data)) + + def test_unauthenticated_request(self): + """Test that it returns an error if no credentials were provided""" + + self.client.force_authenticate(user=None) + + task_uuid = "test_task_uuid" + job_uuid = "test_job_uuid" + url = reverse( + "job-detail", kwargs={"task_type": "eventizer", "task_id": task_uuid, "uuid": job_uuid} + ) + response = self.client.get(url) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + + +class JobLogsApiTest(APITestCase): + """Unit tests for the Job Logs API""" + + def setUp(self): + user = get_user_model().objects.create(username="test", is_superuser=True) + self.client.force_authenticate(user=user) + + def test_get_job_logs_invalid_task_type(self): + """Test retrieving job logs with invalid task type""" + + task_uuid = "test_task_uuid" + job_uuid = "test_job_uuid" + url = reverse( + "job-logs", kwargs={"task_type": "invalid-type", "task_id": task_uuid, "uuid": job_uuid} + ) + response = self.client.get(url) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertIn("Unknown task type", str(response.data)) + + def test_unauthenticated_request(self): + """Test that it returns an error if no credentials were provided""" + + self.client.force_authenticate(user=None) + + task_uuid = "test_task_uuid" + job_uuid = "test_job_uuid" + url = reverse( + "job-logs", kwargs={"task_type": "eventizer", "task_id": task_uuid, "uuid": job_uuid} + ) + response = self.client.get(url) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) diff --git a/tests/unit/scheduler/test_models.py b/tests/unit/scheduler/test_models.py index 5927881..4a4fb49 100644 --- a/tests/unit/scheduler/test_models.py +++ b/tests/unit/scheduler/test_models.py @@ -120,6 +120,9 @@ class TestTaskRegistration(GrimoireLabTestCase): def setUp(self): GRIMOIRELAB_TASK_MODELS.clear() + def tearDown(self): + GRIMOIRELAB_TASK_MODELS.clear() + def test_register_task(self): """Task is correctly registered and job class is created""" diff --git a/tests/unit/scheduler/test_scheduler.py b/tests/unit/scheduler/test_scheduler.py index 0a86793..86b75aa 100644 --- a/tests/unit/scheduler/test_scheduler.py +++ b/tests/unit/scheduler/test_scheduler.py @@ -153,6 +153,7 @@ def setUp(self): task_class, job_class = register_task_model("test_task", SchedulerTestTask) def cleanup_test_model(): + GRIMOIRELAB_TASK_MODELS.clear() with django.db.connection.schema_editor() as schema_editor: schema_editor.delete_model(job_class) schema_editor.delete_model(task_class) @@ -294,6 +295,7 @@ def setUp(self): ) def cleanup_test_model(): + GRIMOIRELAB_TASK_MODELS.clear() with django.db.connection.schema_editor() as schema_editor: schema_editor.delete_model(job_class_sched) schema_editor.delete_model(task_class_sched) @@ -457,6 +459,7 @@ def setUp(self): ) def cleanup_test_model(): + GRIMOIRELAB_TASK_MODELS.clear() with django.db.connection.schema_editor() as schema_editor: schema_editor.delete_model(self.job_class) schema_editor.delete_model(self.task_class) @@ -527,6 +530,7 @@ def setUp(self): task_class, job_class = register_task_model("test_task", SchedulerTestTask) def cleanup_test_model(): + GRIMOIRELAB_TASK_MODELS.clear() with django.db.connection.schema_editor() as schema_editor: schema_editor.delete_model(job_class) schema_editor.delete_model(task_class) @@ -681,6 +685,7 @@ def setUp(self): ) def cleanup_test_model(): + GRIMOIRELAB_TASK_MODELS.clear() with django.db.connection.schema_editor() as schema_editor: schema_editor.delete_model(self.job_class) schema_editor.delete_model(self.task_class) @@ -824,6 +829,7 @@ def setUp(self): ) def cleanup_test_model(): + GRIMOIRELAB_TASK_MODELS.clear() with django.db.connection.schema_editor() as schema_editor: schema_editor.delete_model(self.job_class) schema_editor.delete_model(self.task_class) diff --git a/tests/unit/scheduler/test_tasks_sortinghat.py b/tests/unit/scheduler/test_tasks_sortinghat.py new file mode 100644 index 0000000..f3386d1 --- /dev/null +++ b/tests/unit/scheduler/test_tasks_sortinghat.py @@ -0,0 +1,464 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) GrimoireLab Contributors +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . +# + +import unittest + +from typing import Any + +import django_rq + +from django.conf import settings +from django.contrib.auth import get_user_model + +from grimoirelab.core.scheduler.models import register_task_model +from grimoirelab_toolkit.datetime import datetime_utcnow, str_to_datetime + +from grimoirelab.core.scheduler.scheduler import schedule_task, reschedule_task +from grimoirelab.core.scheduler.tasks.models import ( + AffiliateTask, + UnifyTask, + RecommendAffiliationsTask, + RecommendMatchesTask, + ImportIdentitiesTask, +) + +from sortinghat.core import api +from sortinghat.core.context import SortingHatContext +from sortinghat.core.importer.backend import IdentitiesImporter +from sortinghat.core.models import Individual, AffiliationRecommendation, MergeRecommendation + + +from ..base import GrimoireLabTestCase + + +def setup_sortinghat_database() -> dict[str, Any]: + """Set up SortingHat database for tests. + + The individuals and organizations are obtained partially from tests + in Sortinghat: https://github.com/chaoss/grimoirelab-sortinghat + + returns: A dictionary with created objects. + """ + user = get_user_model().objects.create(username="test") + ctx = SortingHatContext(user) + + # Organizations and domains + org_1 = api.add_organization(ctx, "Example") + api.add_domain(ctx, "Example", "example.com", is_top_domain=True) + + org_2 = api.add_organization(ctx, "Example Int.") + api.add_domain(ctx, "Example Int.", "u.example.com", is_top_domain=True) + api.add_domain(ctx, "Example Int.", "es.u.example.com") + api.add_domain(ctx, "Example Int.", "en.u.example.com") + + org_3 = api.add_organization(ctx, "Bitergia") + api.add_domain(ctx, "Bitergia", "bitergia.com") + api.add_domain(ctx, "Bitergia", "bitergia.org") + + api.add_organization(ctx, "LibreSoft") + + # john_smith identity + john_smith = api.add_identity(ctx, email="jsmith@example.com", name="John Smith", source="scm") + js2 = api.add_identity(ctx, name="John Smith", source="scm", uuid=john_smith.uuid) + js3 = api.add_identity(ctx, username="jsmith", source="scm", uuid=john_smith.uuid) + + # jsmith + jsmith = api.add_identity(ctx, name="J. Smith", username="john_smith", source="alt") + jsm2 = api.add_identity( + ctx, name="John Smith", username="jsmith", source="alt", uuid=jsmith.uuid + ) + jsm3 = api.add_identity(ctx, email="jsmith@example.com", source="alt", uuid=jsmith.uuid) + + # jane_rae + jane_rae = api.add_identity(ctx, name="Janer Rae", source="mls") + jr2 = api.add_identity( + ctx, email="jane.rae@example.net", name="Jane Rae Doe", source="mls", uuid=jane_rae.uuid + ) + + # js_alt + js_alt = api.add_identity(ctx, name="J. Smith", username="john_smith", source="scm") + js_alt2 = api.add_identity( + ctx, email="J_Smith@bitergia.com", username="john_smith", source="mls", uuid=js_alt.uuid + ) + js_alt3 = api.add_identity(ctx, username="Smith. J", source="mls", uuid=js_alt.uuid) + js_alt4 = api.add_identity( + ctx, email="J_Smith@bitergia.com", name="Smith. J", source="mls", uuid=js_alt.uuid + ) + + # jrae + jrae = api.add_identity(ctx, email="jrae@example.net", name="Jane Rae Doe", source="mls") + jrae2 = api.add_identity(ctx, name="jrae", source="mls", uuid=jrae.uuid) + jrae3 = api.add_identity(ctx, name="jrae", source="scm", uuid=jrae.uuid) + + return { + "org_1": org_1, + "org_2": org_2, + "org_3": org_3, + "john_smith": john_smith, + "js2": js2, + "js3": js3, + "jsmith": jsmith, + "jsm2": jsm2, + "jsm3": jsm3, + "jane_rae": jane_rae, + "jr2": jr2, + "js_alt": js_alt, + "js_alt2": js_alt2, + "js_alt3": js_alt3, + "js_alt4": js_alt4, + "jrae": jrae, + "jrae2": jrae2, + "jrae3": jrae3, + } + + +class TestAffiliateTask(GrimoireLabTestCase): + """Unit tests for AffiliateTask class""" + + @classmethod + def setUpClass(cls): + super().setUpClass() + # GRIMOIRELAB_TASK_MODELS is empty from other tests that removes the tasks + try: + register_task_model(AffiliateTask.TASK_TYPE, AffiliateTask) + except ValueError: + # Already registered + pass + + def setUp(self): + """Initialize database with a dataset""" + + get_user_model().objects.get_or_create(username=settings.SYSTEM_BOT_USER) + + data = setup_sortinghat_database() + for key, value in data.items(): + setattr(self, key, value) + + def test_affiliate_task(self): + """Test affiliate task execution""" + + task = schedule_task("affiliate", task_args={}, burst=True, uuids=[self.jsmith.uuid]) + self.assertIsInstance(task, AffiliateTask) + self.assertEqual(task.uuids, [self.jsmith.uuid]) + + # Execute the job + worker = django_rq.workers.get_worker(task.default_job_queue) + processed = worker.work(burst=True, with_scheduler=True) + self.assertEqual(processed, 1) + + # Check database objects + individual_db = Individual.objects.get(mk=self.john_smith.uuid) + enrollments_db = individual_db.enrollments.all() + self.assertEqual(len(enrollments_db), 0) + + individual_db = Individual.objects.get(mk=self.jsmith.uuid) + enrollments_db = individual_db.enrollments.all() + self.assertEqual(len(enrollments_db), 1) + enrollment_db_1 = enrollments_db[0] + self.assertEqual(enrollment_db_1.group.name, self.org_1.name) + + +class TestRecommendAffiliationsTask(GrimoireLabTestCase): + """Unit tests for RecommendAffiliationsTask class""" + + @classmethod + def setUpClass(cls): + super().setUpClass() + # GRIMOIRELAB_TASK_MODELS might be empty from other tests that removes the tasks + try: + register_task_model(RecommendAffiliationsTask.TASK_TYPE, RecommendAffiliationsTask) + except ValueError: + # Already registered + pass + + def setUp(self): + """Initialize database with a dataset""" + + get_user_model().objects.get_or_create(username=settings.SYSTEM_BOT_USER) + + data = setup_sortinghat_database() + for key, value in data.items(): + setattr(self, key, value) + + def test_recommend_affiliations_task(self): + """Test recommend affiliations task execution""" + + task = schedule_task("recommend_affiliations", task_args={}, burst=True) + self.assertIsInstance(task, RecommendAffiliationsTask) + self.assertEqual(task.uuids, None) + + # Execute the job + worker = django_rq.workers.get_worker(task.default_job_queue) + processed = worker.work(burst=True, with_scheduler=True) + self.assertEqual(processed, 1) + + # Check database objects + recommendations = AffiliationRecommendation.objects.all() + self.assertEqual(len(recommendations), 3) + + for recommendation in recommendations: + if recommendation.individual.mk == self.john_smith.uuid: + self.assertEqual(recommendation.organization.name, self.org_1.name) + elif recommendation.individual.mk == self.jsmith.uuid: + self.assertEqual(recommendation.organization.name, self.org_1.name) + elif recommendation.individual.mk == self.js_alt.uuid: + self.assertEqual(recommendation.organization.name, self.org_3.name) + else: + self.fail("Unexpected individual in affiliation recommendations") + + +class TestUnifyTask(GrimoireLabTestCase): + """Unit tests for UnifyTask class""" + + @classmethod + def setUpClass(cls): + super().setUpClass() + # GRIMOIRELAB_TASK_MODELS might be empty from other tests that removes the tasks + try: + register_task_model(UnifyTask.TASK_TYPE, UnifyTask) + except ValueError: + # Already registered + pass + + def setUp(self): + """Initialize database with a dataset""" + + get_user_model().objects.get_or_create(username=settings.SYSTEM_BOT_USER) + + data = setup_sortinghat_database() + for key, value in data.items(): + setattr(self, key, value) + + def test_task(self): + """Test unify task execution""" + + source_uuids = [self.john_smith.uuid, self.jrae3.uuid, self.jr2.uuid] + target_uuids = [ + self.john_smith.uuid, + self.js2.uuid, + self.js3.uuid, + self.jsmith.uuid, + self.jsm2.uuid, + self.jsm3.uuid, + self.jane_rae.uuid, + self.jr2.uuid, + self.js_alt.uuid, + self.js_alt2.uuid, + self.js_alt3.uuid, + self.js_alt4.uuid, + self.jrae.uuid, + self.jrae2.uuid, + self.jrae3.uuid, + ] + criteria = ["email", "name", "username"] + + task = schedule_task( + "unify", + task_args={}, + burst=True, + criteria=criteria, + source_uuids=source_uuids, + target_uuids=target_uuids, + ) + self.assertIsInstance(task, UnifyTask) + self.assertEqual(task.source_uuids, source_uuids) + self.assertEqual(task.target_uuids, target_uuids) + self.assertEqual(task.criteria, criteria) + + # Execute the job + worker = django_rq.workers.get_worker(task.default_job_queue) + processed = worker.work(burst=True, with_scheduler=True) + self.assertEqual(processed, 1) + + # Checking if the identities have been merged + # Individual 1 + individual_1 = Individual.objects.get(mk=self.jsmith.uuid) + identities = individual_1.identities.all() + self.assertEqual(len(identities), 6) + + id1 = identities[0] + self.assertEqual(id1, self.jsm2) + + id2 = identities[1] + self.assertEqual(id2, self.jsmith) + + id3 = identities[2] + self.assertEqual(id3, self.jsm3) + + id4 = identities[3] + self.assertEqual(id4, self.john_smith) + + id5 = identities[4] + self.assertEqual(id5, self.js2) + + id6 = identities[5] + self.assertEqual(id6, self.js3) + + # Individual 2 + individual_2 = Individual.objects.get(mk=self.jrae.uuid) + identities = individual_2.identities.all() + self.assertEqual(len(identities), 5) + + id1 = identities[0] + self.assertEqual(id1, self.jrae2) + + id2 = identities[1] + self.assertEqual(id2, self.jrae3) + + id3 = identities[2] + self.assertEqual(id3, self.jrae) + + id4 = identities[3] + self.assertEqual(id4, self.jane_rae) + + id5 = identities[4] + self.assertEqual(id5, self.jr2) + + +class TestRecommendMatchesTask(GrimoireLabTestCase): + """Unit tests for RecommendMatchesTask class""" + + @classmethod + def setUpClass(cls): + super().setUpClass() + # GRIMOIRELAB_TASK_MODELS might be empty from other tests that removes the tasks + try: + register_task_model(RecommendMatchesTask.TASK_TYPE, RecommendMatchesTask) + except ValueError: + # Already registered + pass + + def setUp(self): + """Initialize database with a dataset""" + + get_user_model().objects.get_or_create(username=settings.SYSTEM_BOT_USER) + + data = setup_sortinghat_database() + for key, value in data.items(): + setattr(self, key, value) + + def test_recommend_matches_task(self): + """Test recommend matches task execution""" + + criteria = ["email", "name", "username"] + + task = schedule_task( + "recommend_matches", + task_args={}, + burst=True, + criteria=criteria, + ) + self.assertIsInstance(task, RecommendMatchesTask) + self.assertEqual(task.source_uuids, None) + self.assertEqual(task.target_uuids, None) + self.assertEqual(task.criteria, criteria) + + # Execute the job + worker = django_rq.workers.get_worker(task.default_job_queue) + processed = worker.work(burst=True, with_scheduler=True) + self.assertEqual(processed, 1) + + # Check database objects + recommendations_expected = [ + sorted([self.js_alt.individual.mk, self.jsmith.individual.mk]), + sorted([self.jsmith.individual.mk, self.john_smith.individual.mk]), + sorted([self.jrae.individual.mk, self.jane_rae.individual.mk]), + ] + + recommendations = MergeRecommendation.objects.all() + self.assertEqual(len(recommendations), 3) + for recommendation in recommendations: + uuids = [recommendation.individual1.mk, recommendation.individual2.mk] + self.assertIn(uuids, recommendations_expected) + + +class MockTestImporter(IdentitiesImporter): + NAME = "test_backend" + + def __init__(self, ctx, url, from_date=None, token=None): + super().__init__(ctx, url) + self.token = token + self.from_date = from_date + + def get_individuals(self): + from sortinghat.core.importer.models import Individual, Identity + + indiv = Individual() + indiv.identities.append(Identity(source="test_backend", username="test_user")) + return [indiv] + + +class TestImportIdentities(GrimoireLabTestCase): + """Unit tests for import_identities""" + + @classmethod + def setUpClass(cls): + super().setUpClass() + # GRIMOIRELAB_TASK_MODELS might be empty from other tests that removes the tasks + try: + register_task_model(ImportIdentitiesTask.TASK_TYPE, ImportIdentitiesTask) + except ValueError: + # Already registered + pass + + def setUp(self): + """Initialize database""" + + self.user = get_user_model().objects.get_or_create(username=settings.SYSTEM_BOT_USER) + + @unittest.mock.patch("sortinghat.core.importer.backend.find_backends") + def test_import_identities(self, mock_find_backends): + """Check if the importer is executed correctly""" + + mock_find_backends.return_value = {"test_backend": MockTestImporter} + + task = schedule_task( + "import_identities", + task_args={}, + burst=True, + backend_name="test_backend", + url="my_url", + ) + self.assertIsInstance(task, ImportIdentitiesTask) + self.assertEqual(task.backend_name, "test_backend") + self.assertEqual(task.url, "my_url") + + dt_before = datetime_utcnow() + + # Execute the job + worker = django_rq.workers.get_worker(task.default_job_queue) + processed = worker.work(burst=True, with_scheduler=True) + self.assertEqual(processed, 1) + + dt_after = datetime_utcnow() + + # Check individual and identity are inserted + indiv = Individual.objects.first() + identity = indiv.identities.first() + self.assertEqual(identity.source, "test_backend") + self.assertEqual(identity.username, "test_user") + + # The next execution the from_date should be set + reschedule_task(task.uuid) + + job = task.jobs.order_by("-created_at").first() + + self.assertIsNotNone(job) + self.assertIn("from_date", job.job_args) + self.assertGreater(str_to_datetime(job.job_args["from_date"]), dt_before) + self.assertLess(str_to_datetime(job.job_args["from_date"]), dt_after) diff --git a/ui/src/services/api/scheduler.js b/ui/src/services/api/scheduler.js index ca372ed..09dd4e1 100644 --- a/ui/src/services/api/scheduler.js +++ b/ui/src/services/api/scheduler.js @@ -1,12 +1,12 @@ import { client } from './client' export const scheduler = { - list: (params) => client.get(`/scheduler/tasks`, { params }), - get: (taskId) => client.get(`/scheduler/tasks/${taskId}`), - create: (data) => client.post(`/scheduler/add_task`, data), - cancel: (taskId) => client.post(`/scheduler/cancel_task`, { taskId }), - reschedule: (taskId) => client.post(`/scheduler/reschedule_task`, { taskId }), - getTaskJobs: (taskId, params) => client.get(`/scheduler/tasks/${taskId}/jobs/`, { params }), - getJob: (taskId, jobId) => client.get(`/scheduler/tasks/${taskId}/jobs/${jobId}`), - getJobLogs: (taskId, jobId) => client.get(`/scheduler/tasks/${taskId}/jobs/${jobId}/logs/`) + list: (params) => client.get(`/api/v1/tasks/eventizer/`, { params }), + get: (taskId) => client.get(`/api/v1/tasks/eventizer/${taskId}/`), + create: (data) => client.post(`/api/v1/tasks/eventizer/`, data), + cancel: (taskId) => client.post(`/api/v1/tasks/eventizer/${taskId}/cancel/`, { taskId }), + reschedule: (taskId) => client.post(`/api/v1/tasks/eventizer/${taskId}/reschedule/`, { taskId }), + getTaskJobs: (taskId, params) => client.get(`/api/v1/tasks/eventizer/${taskId}/jobs/`, { params }), + getJob: (taskId, jobId) => client.get(`/api/v1/tasks/eventizer/${taskId}/jobs/${jobId}/`), + getJobLogs: (taskId, jobId) => client.get(`/api/v1/tasks/eventizer/${taskId}/jobs/${jobId}/logs/`) }