Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions wooey/migrations/0055_add_wooeyjob_retry_fields.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("wooey", "0054_add_script_version_is_active"),
]

operations = [
migrations.AddField(
model_name="wooeyjob",
name="retry_count",
field=models.PositiveSmallIntegerField(default=0),
),
migrations.AlterField(
model_name="wooeyjob",
name="status",
field=models.CharField(
choices=[
("completed", "Completed"),
("deleted", "Deleted"),
("FAILURE", "Failed"),
("error", "Error"),
("queued", "Queued"),
("RETRY", "Retrying"),
("running", "Running"),
("submitted", "Submitted"),
],
default="submitted",
max_length=255,
),
),
]
16 changes: 14 additions & 2 deletions wooey/models/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,8 @@ class WooeyJob(models.Model):
DELETED = "deleted"
FAILED = states.FAILURE
ERROR = "error"
QUEUED = "queued"
RETRY = states.RETRY
RUNNING = "running"
SUBMITTED = "submitted"

Expand All @@ -241,11 +243,14 @@ class WooeyJob(models.Model):
(DELETED, _("Deleted")),
(FAILED, _("Failed")),
(ERROR, _("Error")),
(QUEUED, _("Queued")),
(RETRY, _("Retrying")),
(RUNNING, _("Running")),
(SUBMITTED, _("Submitted")),
)

status = models.CharField(max_length=255, default=SUBMITTED, choices=STATUS_CHOICES)
retry_count = models.PositiveSmallIntegerField(default=0)

save_path = models.CharField(max_length=255, blank=True, null=True)
command = models.TextField()
Expand Down Expand Up @@ -290,17 +295,24 @@ def submit_to_celery(self, **kwargs):
param.job = self
param.recreate()
param.save()
self.celery_id = None
self.retry_count = 0
self.status = self.SUBMITTED
rerun = kwargs.pop("rerun", False)
if rerun:
self.command = ""
self.save()
task_kwargs = {"wooey_job": self.pk, "rerun": rerun}
job_pk = self.pk
task_kwargs = {"wooey_job": job_pk, "rerun": rerun}

if rerun:
utils.purge_output(job=self)

def submit_task():
tasks.queue_script_job(job_pk, rerun=rerun)

if wooey_settings.WOOEY_CELERY:
transaction.on_commit(lambda: tasks.submit_script.delay(**task_kwargs))
transaction.on_commit(submit_task)
else:
transaction.on_commit(lambda: tasks.submit_script(**task_kwargs))
return self
Expand Down
4 changes: 4 additions & 0 deletions wooey/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os
import sys
import tempfile
from datetime import timedelta

from django.conf import settings
from django.utils.translation import gettext_lazy as _
Expand Down Expand Up @@ -37,6 +38,9 @@ def get(key, default):
WOOEY_EPHEMERAL_FILES = get("WOOEY_EPHEMERAL_FILES", False)
WOOEY_FILE_DIR = get("WOOEY_FILE_DIR", "wooey_files")
WOOEY_JOB_EXPIRATION = get("WOOEY_JOB_EXPIRATION", {"anonymous": None, "users": None})
WOOEY_JOB_QUEUE_TIMEOUT = get("WOOEY_JOB_QUEUE_TIMEOUT", timedelta(hours=24))
WOOEY_JOB_RESUBMIT_TIMEOUT = get("WOOEY_JOB_RESUBMIT_TIMEOUT", timedelta(hours=1))
WOOEY_JOB_RESUBMIT_LIMIT = get("WOOEY_JOB_RESUBMIT_LIMIT", 3)
WOOEY_REALTIME_CACHE = get("WOOEY_REALTIME_CACHE", None)
WOOEY_SCRIPT_DIR = get("WOOEY_SCRIPT_DIR", "wooey_scripts")

Expand Down
2 changes: 2 additions & 0 deletions wooey/static/wooey/css/base.css
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ li.active > a.tab-error {

.status-submitted-toggle,
.status-pending-toggle,
.status-queued-toggle,
.status-running-toggle,
.status-completed-toggle,
.status-revoked-toggle,
Expand All @@ -448,6 +449,7 @@ li.active > a.tab-error {

.status-submitted .status-submitted-toggle,
.status-pending .status-pending-toggle,
.status-queued .status-queued-toggle,
.status-running .status-running-toggle,
.status-completed .status-completed-toggle,
.status-revoked .status-revoked-toggle,
Expand Down
166 changes: 142 additions & 24 deletions wooey/tasks.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
from __future__ import absolute_import

import os
import subprocess
import sys
import tarfile
import tempfile
import traceback
import zipfile
from datetime import timedelta
from threading import Thread

from django.utils.text import get_valid_filename
from django.core.files import File
from django.conf import settings
from django.utils.translation import gettext_lazy as _

from celery import app
from celery.schedules import crontab
from celery.signals import worker_process_init
from django.conf import settings
from django.core.files import File
from django.db.models import F
from django.utils.text import get_valid_filename
from django.utils.translation import gettext_lazy as _

from .backend import utils
from . import settings as wooey_settings
from .backend import utils

try:
from Queue import Empty, Queue
Expand All @@ -30,6 +32,32 @@
celery_app = app.app_or_default()


def revoke_job_task(task_id):
if task_id:
celery_app.control.revoke(task_id)


def queue_script_job(
job_id, rerun=False, increment_retry_count=False, revoke_existing=False
):
from .models import WooeyJob

job = WooeyJob.objects.get(pk=job_id)
if revoke_existing:
revoke_job_task(job.celery_id)
WooeyJob.objects.filter(pk=job_id).update(celery_id=None)

async_result = submit_script.delay(wooey_job=job_id, rerun=rerun)
update_kwargs = {
"celery_id": async_result.id,
"status": WooeyJob.QUEUED,
}
if increment_retry_count:
update_kwargs["retry_count"] = F("retry_count") + 1
WooeyJob.objects.filter(pk=job_id).update(**update_kwargs)
return async_result


def enqueue_output(out, q):
for line in iter(out.readline, b""):
q.put(line.decode("utf-8"))
Expand Down Expand Up @@ -321,6 +349,7 @@ def submit_script(**kwargs):
@celery_app.task()
def cleanup_wooey_jobs(**kwargs):
from django.utils import timezone

from .models import WooeyJob

cleanup_settings = wooey_settings.WOOEY_JOB_EXPIRATION
Expand All @@ -337,47 +366,136 @@ def cleanup_wooey_jobs(**kwargs):
).delete()


def _extract_task_ids(worker_info):
task_ids = set()
if not worker_info:
return task_ids

for tasks in worker_info.values():
for task in tasks or []:
request = task.get("request")
if isinstance(request, dict) and request.get("id"):
task_ids.add(request["id"])
continue

if task.get("id"):
task_ids.add(task["id"])

return task_ids


@celery_app.task()
def cleanup_dead_jobs():
def cleanup_stuck_jobs():
"""
This cleans up jobs that have been marked as ran, but are not queue'd in celery. It is meant
to cleanup jobs that have been lost due to a server crash or some other reason a job is
in limbo.
This cleans up jobs that are stuck in limbo between Wooey and the task broker.
"""
from django.utils import timezone

from .models import WooeyJob

# Get active tasks from Celery
inspect = celery_app.control.inspect()
worker_info = inspect.active()
active_info = inspect.active()
reserved_info = inspect.reserved()
scheduled_info = inspect.scheduled()

# If we cannot connect to the workers, we do not know if the tasks are running or not, so
# we cannot mark them as dead
if not worker_info:
# If we cannot connect to the workers, we do not know if the tasks are running or queued.
if all(info is None for info in (active_info, reserved_info, scheduled_info)):
return

active_tasks = {
task["id"] for worker, tasks in worker_info.items() for task in tasks
}
now = timezone.now()
minimum_cleanup_age = timedelta(minutes=10)
oldest_cleanup_eligible = now - minimum_cleanup_age
active_task_ids = _extract_task_ids(active_info)
queued_task_ids = (
active_task_ids
| _extract_task_ids(reserved_info)
| _extract_task_ids(scheduled_info)
)

queue_timeout = (
wooey_settings.WOOEY_JOB_QUEUE_TIMEOUT
if wooey_settings.WOOEY_JOB_QUEUE_TIMEOUT is not None
else timedelta(hours=24)
)
resubmit_timeout = (
wooey_settings.WOOEY_JOB_RESUBMIT_TIMEOUT
if wooey_settings.WOOEY_JOB_RESUBMIT_TIMEOUT is not None
else timedelta(hours=1)
)
resubmit_limit = (
wooey_settings.WOOEY_JOB_RESUBMIT_LIMIT
if wooey_settings.WOOEY_JOB_RESUBMIT_LIMIT is not None
else 0
)

# find jobs that are marked as running but not present in celery's active tasks
active_jobs = WooeyJob.objects.filter(status=WooeyJob.RUNNING)
active_jobs = WooeyJob.objects.filter(
status=WooeyJob.RUNNING,
created_date__lte=oldest_cleanup_eligible,
)
to_disable = set()
for job in active_jobs:
if job.celery_id not in active_tasks:
if job.celery_id not in active_task_ids:
to_disable.add(job.pk)

queued_jobs = WooeyJob.objects.filter(
status__in=(WooeyJob.SUBMITTED, WooeyJob.RETRY, WooeyJob.QUEUED),
created_date__lte=oldest_cleanup_eligible,
)
jobs_to_resubmit = []
jobs_to_mark_queued = set()
task_ids_to_revoke = set()
for job in queued_jobs:
if job.celery_id in active_task_ids:
continue

if job.created_date <= now - queue_timeout:
task_ids_to_revoke.add(job.celery_id)
to_disable.add(job.pk)
continue

if job.celery_id in queued_task_ids:
if job.status in (WooeyJob.SUBMITTED, WooeyJob.RETRY):
jobs_to_mark_queued.add(job.pk)
continue

if job.status == WooeyJob.QUEUED:
continue

if job.modified_date > now - resubmit_timeout:
continue

if job.retry_count >= resubmit_limit:
task_ids_to_revoke.add(job.celery_id)
to_disable.add(job.pk)
continue

jobs_to_resubmit.append(job.pk)

for task_id in task_ids_to_revoke:
revoke_job_task(task_id)

WooeyJob.objects.filter(pk__in=jobs_to_mark_queued).update(status=WooeyJob.QUEUED)
WooeyJob.objects.filter(pk__in=to_disable).update(status=WooeyJob.FAILED)

for job_id in jobs_to_resubmit:
WooeyJob.objects.filter(pk=job_id).update(status=WooeyJob.RETRY)
queue_script_job(
job_id,
rerun=False,
increment_retry_count=True,
revoke_existing=True,
)


celery_app.conf.beat_schedule.update(
{
"cleanup-old-jobs": {
"task": "wooey.tasks.cleanup_wooey_jobs",
"schedule": crontab(hour=0, minute=0), # cleanup at midnight each day
},
"cleanup-dead-jobs": {
"task": "wooey.tasks.cleanup_dead_jobs",
"schedule": crontab(minute="*/10"), # run every 6 minutes
"cleanup-stuck-jobs": {
"task": "wooey.tasks.cleanup_stuck_jobs",
"schedule": crontab(minute="*/10"), # run every 10 minutes
},
}
)
2 changes: 1 addition & 1 deletion wooey/templates/wooey/jobs/job_list.html
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ <h3 id="wooey-job-list-title">{{ title }}</h3>
<span class="label label-success"><span class="glyphicon glyphicon-ok"></span> {% trans "Success" %}</span>
{% elif job.status == 'running' %}
<span class="label label-success"><span class="glyphicon glyphicon-refresh spinning"></span> {% trans "Executing" %}</span>
{% elif job.status == 'pending' %}
{% elif job.status == 'queued' or job.status == 'pending' %}
<span class="label label-default"><span class="glyphicon time"></span> {% trans "Queued" %}</span>
{% elif job.status == 'revoked' %}
<span class="label label-danger"><span class="glyphicon glyphicon-stop"></span> {% trans "Halted" %}</span>
Expand Down
4 changes: 2 additions & 2 deletions wooey/templates/wooey/jobs/job_view.html
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ <h1 class="text-center">{{ job_error }}</h1>
<input name="job-id" value="{{ job_info.job.pk }}" type="hidden">

{% get_wooey_setting "WOOEY_CELERY_STOPPABLE_JOBS" as stoppable_jobs %}
<button {% if not stoppable_jobs %}data-placement="bottom" data-toggle="tooltip" title="The current celery broker does not support stoppable jobs and stopping will likely not work. To resolve this, use RabbitMQ as a celery message broker."{% endif %}class="btn btn-primary btn-danger status-running-toggle status-submitted-toggle status-pending-toggle" name="celery-command" value="stop" type="submit">
<button {% if not stoppable_jobs %}data-placement="bottom" data-toggle="tooltip" title="The current celery broker does not support stoppable jobs and stopping will likely not work. To resolve this, use RabbitMQ as a celery message broker."{% endif %}class="btn btn-primary btn-danger status-running-toggle status-submitted-toggle status-pending-toggle status-queued-toggle" name="celery-command" value="stop" type="submit">
<span class="glyphicon glyphicon-stop" aria-hidden="true"></span> {% trans "Stop" %}
</button>
<button class="btn btn-primary btn-warning status-completed-toggle status-revoked-toggle status-failure-toggle" name="celery-command" value="rerun" type="submit">
Expand Down Expand Up @@ -137,7 +137,7 @@ <h1>{{ job_info.job.job_name }} <small>{{ job_info.job.job_description }}</small
• {% blocktrans with job_last_modified_date=job_info.last_modified|timesince %}Updated {{ job_last_modified_date }} ago{% endblocktrans %}

<span class="status-submitted-toggle label label-default"><span class="glyphicon glyphicon-hourglass"></span> {% trans "Waiting" %}</span>
<span class="status-pending-toggle label label-default"><span class="glyphicon time"></span> {% trans "Queued" %}</span>
<span class="status-pending-toggle status-queued-toggle label label-default"><span class="glyphicon time"></span> {% trans "Queued" %}</span>
<span class="status-running-toggle label label-success"><span class="glyphicon glyphicon-refresh spinning"></span> {% trans "Executing" %}</span>
<span class="status-completed-toggle label label-success"><span class="glyphicon glyphicon-ok"></span> {% trans "Success" %}</span>
<span class="status-revoked-toggle label label-danger"><span class="glyphicon glyphicon-stop"></span> {% trans "Halted" %}</span>
Expand Down
Loading
Loading