Handle Concurrent Job Execution - Allowing User to Skip Job If an Instance Is Executing #774
@odinserj When will you include this commit in the main branch? This is a very important feature for us.
@anupkumarsharma Beware that adding new arguments with default values to an existing API surface breaks the binary contract this library has with other dependent libraries until they get a chance to recompile. The correct way is to keep the existing API surface and add new method overloads accepting the new argument in question.
@burningice2866 - There are three flavours of such methods. If I create an overload of each flavour then, unless I am missing something, we have to create six different methods. This sounds a bit messy. One option is to take a params[] array rather than keep adding parameters. Thoughts? On the other hand, if a user is upgrading the library, shouldn't we assume a recompilation is likely?
@anupkumarsharma You're overlooking the scenario of a solution with multiple libraries depending on Hangfire but built against different version numbers (the version numbers are arbitrary and only there to prove my point). Say the solution is upgrading to Hangfire 2.9, which contains these changed default values. In this case you can't update Hangfire for the solution without having to update all the libraries, and you don't control their release cycles. Arguments with default values are a pain and I always object to using them in public-facing APIs, since most people misunderstand how they are implemented by the language/compiler. https://lostechies.com/jimmybogard/2010/05/18/caveats-of-c-4-0-optional-parameters/ is a nice article which sums it up.
@burningice2866 - OK, I was not aware of that. I will add an overloaded method and update the pull request.
Are there any updates as to when this will be completed and committed to the main branch?
@esmmrodriguez - This pull request has been open for quite a few months. Admins, any idea?
Hi, any progress?

Any updates on this one?

Would LOVE to have this capability. Any chance this is going in soon?
This can be pretty easily solved with job filters. In the simplest case, a filter can just cancel creation of new job instances while a previous one is still running:

```csharp
public class SkipWhenPreviousInstanceIsRunningFilter : JobFilterAttribute, IClientFilter, IApplyStateFilter
{
    public void OnCreating(CreatingContext context)
    {
        // GetValueFromHash is only available on storages deriving from JobStorageConnection.
        var connection = context.Connection as JobStorageConnection;
        if (connection == null) return;

        // The filter only applies to recurring jobs.
        if (!context.Parameters.ContainsKey("RecurringJobId")) return;

        var recurringJobId = context.Parameters["RecurringJobId"] as string;
        if (string.IsNullOrWhiteSpace(recurringJobId)) return;

        var running = connection.GetValueFromHash($"recurring-job:{recurringJobId}", "Running");

        // Cancel creation while the previous instance is still marked as running.
        context.Canceled = "true".Equals(running, StringComparison.OrdinalIgnoreCase);
    }

    public void OnCreated(CreatedContext filterContext)
    {
    }

    public void OnStateApplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
    {
        if (context.NewState is EnqueuedState)
        {
            SetRunning(context, transaction, true);
        }
        else if (context.NewState.IsFinal || context.NewState is FailedState)
        {
            SetRunning(context, transaction, false);
        }
    }

    public void OnStateUnapplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
    {
    }

    private static void SetRunning(ApplyStateContext context, IWriteOnlyTransaction transaction, bool running)
    {
        var recurringJobId = JobHelper.FromJson<string>(
            context.Connection.GetJobParameter(context.BackgroundJob.Id, "RecurringJobId"));
        if (string.IsNullOrWhiteSpace(recurringJobId)) return;

        transaction.SetRangeInHash($"recurring-job:{recurringJobId}",
            new[] { new KeyValuePair<string, string>("Running", running ? "true" : "false") });
    }
}
```

In more complex cases, an …
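The filter's bookkeeping can be sketched storage-agnostically. Below is a minimal Python model of the same idea (the dictionary, state names, and function names are illustrative stand-ins, not Hangfire's API): a "Running" flag in a per-recurring-job hash is set on enqueue, cleared on a final state, and gates creation of new instances.

```python
# Minimal model of the SkipWhenPreviousInstanceIsRunningFilter logic.
# `storage` stands in for Hangfire's recurring-job hash.

storage = {}  # maps "recurring-job:<id>" -> {"Running": "true"/"false"}

def on_creating(recurring_job_id):
    """Return False (cancel creation) while the previous instance is running."""
    running = storage.get(f"recurring-job:{recurring_job_id}", {}).get("Running")
    return running != "true"

def on_state_applied(recurring_job_id, new_state):
    """Mirror OnStateApplied: set the flag on enqueue, clear it on a final state."""
    entry = storage.setdefault(f"recurring-job:{recurring_job_id}", {})
    if new_state == "Enqueued":
        entry["Running"] = "true"
    elif new_state in ("Succeeded", "Deleted", "Failed"):
        entry["Running"] = "false"

# First creation succeeds and marks the job as running.
assert on_creating("sync-job")
on_state_applied("sync-job", "Enqueued")
# A second creation attempt while the first instance runs is canceled.
assert not on_creating("sync-job")
# Once the first instance finishes, creation is allowed again.
on_state_applied("sync-job", "Succeeded")
assert on_creating("sync-job")
```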
Hello! I didn't find any documentation on working with connections or transactions. Can you please explain this code a bit? I'm not saying the approach is wrong, just asking. If we are working with storage internals here anyway, why can't we just query whether there are any enqueued jobs in the database and cancel the context if there are?
Hello @odinserj, can you please help with this workaround? We found an unfortunate side effect of this solution. If the server was stopped while a recurring job instance was running, the "Running" flag will not be removed, and it'll prevent re-executing the aborted job. I know that we should use cancellation tokens to prevent this, but the solution still looks unreliable. A service may be stopped in an unsafe way, such as an app crash, hard reset, container recycle, etc. Is there any way to safely query the current recurring job status? Our case: we have to use long polling to check for updates from an integrated system. We need to do that very often (every few minutes), but handling these updates can take about half an hour. As a result we have a growing number of unwanted jobs in the queue. I know that this architecture sucks, but this is what we have due to restrictions on the customer side. We can't make them change.
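The failure mode described above is that a boolean flag has no owner: if the process dies, nothing clears it. One common mitigation (not part of the thread's code; the names and timeout are illustrative) is to store a timestamp instead of a boolean and treat a sufficiently old flag as stale, so a crashed server cannot block the job forever:

```python
# Sketch: replace the boolean "Running" flag with a "RunningSince" timestamp
# so that a crash cannot leave the job blocked indefinitely.

storage = {}

def mark_running(job_id, now):
    storage[f"recurring-job:{job_id}"] = {"RunningSince": str(now)}

def clear_running(job_id):
    storage.pop(f"recurring-job:{job_id}", None)

def is_running(job_id, now, timeout=1800):
    """Treat the flag as stale once it is older than `timeout` seconds."""
    entry = storage.get(f"recurring-job:{job_id}")
    if entry is None:
        return False
    return now - float(entry["RunningSince"]) < timeout

mark_running("poll-job", now=0)
# Server crashes here; clear_running() is never called.
assert is_running("poll-job", now=60)        # still considered running
assert not is_running("poll-job", now=3600)  # flag expired, job may run again
```

A heartbeat that periodically refreshes the timestamp while the job executes would make long-running jobs safe against false expiry; that refinement is omitted for brevity.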
Is this fixed now?
@LeszekKalibrate the full version is available here: https://gist.github.com/odinserj/a6ad7ba6686076c9b9b2e03fcf6bf74e, and the race condition is fixed there whenever the storage implementation supports transaction-owned distributed locks. The official storages (Hangfire.SqlServer, Hangfire.InMemory, Hangfire.Pro.Redis) support this feature, but I'm not sure about community-based implementations.
Hello @odinserj, and thanks for your time. Does it cover server crash scenarios where a job is not canceled gracefully?
Yes, as it's fully transactional. However, please note that after a server crash there can be a delay in scheduling new recurring job executions, because the aborted job has to be re-queued and executed first to clear the "Running" state.
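The point about being "fully transactional" can be illustrated with a toy model (not Hangfire's storage API): when the job's state and the recurring job's flag change in one atomic unit, a crash rolls both back together, so the flag can never disagree with the recorded job state.

```python
# Toy model of a transaction-owned update: the job's state and the
# recurring job's "Running" flag change atomically, or not at all.

def apply_state(db, job_id, recurring_id, new_state, fail=False):
    snapshot = dict(db)  # emulate transaction rollback
    db[f"job:{job_id}"] = new_state
    db[f"recurring-job:{recurring_id}:Running"] = (
        "true" if new_state == "Enqueued" else "false")
    if fail:  # crash before commit: neither write becomes visible
        db.clear()
        db.update(snapshot)

db = {}
apply_state(db, "1", "sync", "Enqueued")
# A crash during the Succeeded transition rolls both keys back together,
# so the job is still Enqueued and will be re-executed, clearing the flag.
apply_state(db, "1", "sync", "Succeeded", fail=True)
assert db["job:1"] == "Enqueued"
assert db["recurring-job:sync:Running"] == "true"
apply_state(db, "1", "sync", "Succeeded")
assert db["recurring-job:sync:Running"] == "false"
```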
This pull request is in response to feature request #746.
Goal - To provide the user an option to skip a job if there is already an executing instance of the same job.
Approach - My approach is to tweak the recurring job scheduler to pre-check the state of the last executed job for a particular job id. If that state is Enqueued or Processing, the scheduler adds a new state called Skipped, with appropriate details, instead of adding one more enqueued instance. This conditional logic only executes when the user has opted in to the check, configured by a bool flag called disableConcurrentExecution. By default it's false, meaning the job doesn't care about concurrent execution; when true, the check ensures there is only one executing instance of the job.
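The scheduler pre-check described above reduces to a small decision rule. A Python sketch of that rule (function and state names are illustrative, not the pull request's actual code):

```python
# Toy model of the proposed scheduler change: before enqueueing a recurring
# job, inspect the last instance's state and emit "Skipped" instead of a
# second "Enqueued" when the flag is set and an instance is still active.

def schedule(last_state, disable_concurrent_execution):
    if disable_concurrent_execution and last_state in ("Enqueued", "Processing"):
        return "Skipped"
    return "Enqueued"

assert schedule("Processing", disable_concurrent_execution=True) == "Skipped"
assert schedule("Succeeded", disable_concurrent_execution=True) == "Enqueued"
# Default behaviour (flag off) never skips, preserving backward compatibility.
assert schedule("Processing", disable_concurrent_execution=False) == "Enqueued"
```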
Since we have introduced a new state, Skipped, we need to add this job state to the dashboard. This is already part of the pull request and working as expected.
I have tested it and it runs as expected. Let me know if there is something I have missed.