diff --git a/.github/workflows/verify.yml b/.github/workflows/verify.yml
index 6fd5632..dbd486f 100644
--- a/.github/workflows/verify.yml
+++ b/.github/workflows/verify.yml
@@ -61,7 +61,18 @@ jobs:
run: dotnet build --no-restore --configuration Release --verbosity minimal
- name: Test
- run: dotnet test --no-build --verbosity normal --configuration Release --collect:"XPlat Code Coverage" --logger "trx" --results-directory ./TestResults
+ run: >
+ dotnet test
+ --no-build
+ --configuration Release
+ --verbosity normal
+ --maxcpucount
+ -p:TestTfmsInParallel=true
+ --collect:"XPlat Code Coverage"
+ --logger "trx"
+ --results-directory ./TestResults
+ --
+ RunConfiguration.MaxCpuCount=0
- name: Test results report
uses: dorny/test-reporter@v2
diff --git a/Atomizer.sln b/Atomizer.sln
index b1e1e6b..1519eee 100644
--- a/Atomizer.sln
+++ b/Atomizer.sln
@@ -43,6 +43,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Atomizer.Redis.Tests", "tes
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Atomizer.Redis.Example", "samples\Atomizer.Redis.Example\Atomizer.Redis.Example.csproj", "{1D05317E-F0B0-4638-99D8-CCB018BCB681}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Atomizer.FlowTests", "tests\Atomizer.FlowTests\Atomizer.FlowTests.csproj", "{F37FF791-43BD-4394-8BED-95C9F013DCA8}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -195,6 +197,18 @@ Global
{1D05317E-F0B0-4638-99D8-CCB018BCB681}.Release|x64.Build.0 = Release|Any CPU
{1D05317E-F0B0-4638-99D8-CCB018BCB681}.Release|x86.ActiveCfg = Release|Any CPU
{1D05317E-F0B0-4638-99D8-CCB018BCB681}.Release|x86.Build.0 = Release|Any CPU
+ {F37FF791-43BD-4394-8BED-95C9F013DCA8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {F37FF791-43BD-4394-8BED-95C9F013DCA8}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {F37FF791-43BD-4394-8BED-95C9F013DCA8}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {F37FF791-43BD-4394-8BED-95C9F013DCA8}.Debug|x64.Build.0 = Debug|Any CPU
+ {F37FF791-43BD-4394-8BED-95C9F013DCA8}.Debug|x86.ActiveCfg = Debug|Any CPU
+ {F37FF791-43BD-4394-8BED-95C9F013DCA8}.Debug|x86.Build.0 = Debug|Any CPU
+ {F37FF791-43BD-4394-8BED-95C9F013DCA8}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {F37FF791-43BD-4394-8BED-95C9F013DCA8}.Release|Any CPU.Build.0 = Release|Any CPU
+ {F37FF791-43BD-4394-8BED-95C9F013DCA8}.Release|x64.ActiveCfg = Release|Any CPU
+ {F37FF791-43BD-4394-8BED-95C9F013DCA8}.Release|x64.Build.0 = Release|Any CPU
+ {F37FF791-43BD-4394-8BED-95C9F013DCA8}.Release|x86.ActiveCfg = Release|Any CPU
+ {F37FF791-43BD-4394-8BED-95C9F013DCA8}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -212,5 +226,6 @@ Global
{33964D8D-5DA4-442C-8CEB-50F36DF81199} = {57DC23B8-FC3C-41A3-AEB0-21C016F935F6}
{B6852BFA-D9C6-48EE-9ED6-35B542B834C9} = {7233FD90-018C-47BF-9001-EB1681DFE75B}
{1D05317E-F0B0-4638-99D8-CCB018BCB681} = {5D128655-88B1-4CA4-B12B-9E6C067D6DA3}
+ {F37FF791-43BD-4394-8BED-95C9F013DCA8} = {7233FD90-018C-47BF-9001-EB1681DFE75B}
EndGlobalSection
EndGlobal
diff --git a/src/Atomizer/AssemblyAttributes.cs b/src/Atomizer/AssemblyAttributes.cs
index f63bf55..30a85dc 100644
--- a/src/Atomizer/AssemblyAttributes.cs
+++ b/src/Atomizer/AssemblyAttributes.cs
@@ -1,4 +1,5 @@
using System.Runtime.CompilerServices;
[assembly: InternalsVisibleTo("Atomizer.Tests")]
+[assembly: InternalsVisibleTo("Atomizer.FlowTests")]
[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")]
diff --git a/src/Atomizer/Configuration/QueueOptions.cs b/src/Atomizer/Configuration/QueueOptions.cs
index c345ea6..d4adc62 100644
--- a/src/Atomizer/Configuration/QueueOptions.cs
+++ b/src/Atomizer/Configuration/QueueOptions.cs
@@ -36,9 +36,12 @@ public sealed class QueueOptions
///
/// Gets the interval at which the internal processing loop ticks.
- /// Default is 1 second.
+ ///
+ /// Default is 1 second. The setter is internal so that test assemblies (via InternalsVisibleTo)
+ /// can reduce the tick interval for timing-sensitive flow tests without exposing mutation to external consumers.
+ ///
///
- public TimeSpan TickInterval { get; private set; } = TimeSpan.FromSeconds(1);
+ public TimeSpan TickInterval { get; internal set; } = TimeSpan.FromSeconds(1);
///
/// Initializes a new with the specified queue key.
diff --git a/src/Atomizer/Processing/QueuePoller.cs b/src/Atomizer/Processing/QueuePoller.cs
index e9c5c57..3829bbe 100644
--- a/src/Atomizer/Processing/QueuePoller.cs
+++ b/src/Atomizer/Processing/QueuePoller.cs
@@ -73,6 +73,11 @@ await storage.ExecuteInLeaseAsync(
foreach (var job in jobs)
{
+ if (job.Status == AtomizerJobStatus.Processing)
+ {
+ job.Release(now);
+ }
+
job.Lease(leaseToken, now, queue.VisibilityTimeout);
acquired.Add(job);
}
diff --git a/tests/Atomizer.FlowTests/Atomizer.FlowTests.csproj b/tests/Atomizer.FlowTests/Atomizer.FlowTests.csproj
new file mode 100644
index 0000000..ee6825f
--- /dev/null
+++ b/tests/Atomizer.FlowTests/Atomizer.FlowTests.csproj
@@ -0,0 +1,46 @@
+
+
+ enable
+ enable
+ Exe
+ Atomizer.FlowTests
+ net8.0
+ 14
+ false
+ true
+
+
+
+
+
+
+
+
+
+
+ all
+ runtime; build; native; contentfiles; analyzers; buildtransitive
+
+
+
+ all
+ runtime; build; native; contentfiles; analyzers; buildtransitive
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/tests/Atomizer.FlowTests/AtomizerFlowTests.cs b/tests/Atomizer.FlowTests/AtomizerFlowTests.cs
new file mode 100644
index 0000000..b1b622e
--- /dev/null
+++ b/tests/Atomizer.FlowTests/AtomizerFlowTests.cs
@@ -0,0 +1,100 @@
+using Atomizer.FlowTests.Drivers;
+using Atomizer.FlowTests.Infrastructure;
+
+namespace Atomizer.FlowTests;
+
+public abstract partial class AtomizerFlowTests : IAsyncLifetime
+{
+ private readonly FlowTestDriver _driver;
+ private readonly List _hosts = new List();
+ private readonly string _runId = Guid.NewGuid().ToString("N");
+ private FlowTestRecorder _recorder = null!;
+
+ protected AtomizerFlowTests(FlowTestDriver driver)
+ {
+ _driver = driver;
+ }
+
+ public async ValueTask InitializeAsync()
+ {
+ _recorder = new FlowTestRecorder();
+ await _driver.ResetAsync(TestContext.Current.CancellationToken);
+ }
+
+ public async ValueTask DisposeAsync()
+ {
+ foreach (var host in _hosts.AsEnumerable().Reverse())
+ {
+ await host.DisposeAsync();
+ }
+
+ await _driver.ResetAsync(CancellationToken.None);
+ }
+
+ private async Task StartHostAsync(Action? configure = null)
+ {
+ var host = await FlowTestHost.CreateAsync(
+ _driver,
+ _runId,
+ _recorder,
+ configure,
+ TestContext.Current.CancellationToken
+ );
+ _hosts.Add(host);
+ return host;
+ }
+
+ private async Task MoveScheduleIntoPastAsync(
+ FlowTestHost host,
+ JobKey scheduleKey,
+ DateTimeOffset lastEnqueueAt,
+ DateTimeOffset nextRunAt
+ )
+ {
+ var schedule = (await host.GetSchedulesAsync(TestContext.Current.CancellationToken)).Single(schedule =>
+ schedule.JobKey == scheduleKey
+ );
+ schedule.LastEnqueueAt = lastEnqueueAt;
+ schedule.NextRunAt = nextRunAt;
+ schedule.CreatedAt = lastEnqueueAt.AddSeconds(-10);
+ await host.UpdateSchedulesAsync(new[] { schedule }, TestContext.Current.CancellationToken);
+ }
+
+ private async Task WaitUntilDeletedAsync(FlowTestHost host, Guid jobId)
+ {
+ using var timeout = CancellationTokenSource.CreateLinkedTokenSource(TestContext.Current.CancellationToken);
+ timeout.CancelAfter(FlowTestTimings.WaitTimeout);
+
+ while (true)
+ {
+ if (await host.GetJobAsync(jobId, timeout.Token) is null)
+ return;
+
+ try
+ {
+ await Task.Delay(FlowTestTimings.PollInterval, timeout.Token);
+ }
+ catch (OperationCanceledException) when (timeout.IsCancellationRequested)
+ {
+ throw new TimeoutException($"Job {jobId} was not deleted by retention.");
+ }
+ }
+ }
+
+ private string NewKey() => $"{_driver.Name.Replace(" ", string.Empty).ToLowerInvariant()}-{Guid.NewGuid():N}";
+}
+
+[Collection(nameof(InMemoryFlowTestDriver))]
+public sealed class InMemoryAtomizerFlowTests(InMemoryFlowTestDriver driver) : AtomizerFlowTests(driver);
+
+[Collection(nameof(PostgreSqlFlowTestDriver))]
+public sealed class PostgreSqlAtomizerFlowTests(PostgreSqlFlowTestDriver driver) : AtomizerFlowTests(driver);
+
+[Collection(nameof(MySqlFlowTestDriver))]
+public sealed class MySqlAtomizerFlowTests(MySqlFlowTestDriver driver) : AtomizerFlowTests(driver);
+
+[Collection(nameof(SqlServerFlowTestDriver))]
+public sealed class SqlServerAtomizerFlowTests(SqlServerFlowTestDriver driver) : AtomizerFlowTests(driver);
+
+[Collection(nameof(RedisFlowTestDriver))]
+public sealed class RedisAtomizerFlowTests(RedisFlowTestDriver driver) : AtomizerFlowTests(driver);
diff --git a/tests/Atomizer.FlowTests/DequeueFlowTests.cs b/tests/Atomizer.FlowTests/DequeueFlowTests.cs
new file mode 100644
index 0000000..cac16eb
--- /dev/null
+++ b/tests/Atomizer.FlowTests/DequeueFlowTests.cs
@@ -0,0 +1,53 @@
+using Atomizer.FlowTests.TestJobs;
+
+namespace Atomizer.FlowTests;
+
+public abstract partial class AtomizerFlowTests
+{
+ [Fact]
+ public async Task DequeueAsync_WhenPendingJobIsCancelledBeforeProcessing_ShouldPersistCancelledStateAndNotDispatch()
+ {
+ var host = await StartHostAsync(options => options.AutoStart = false);
+ var key = NewKey();
+
+ var jobId = await host.Client.EnqueueAsync(
+ new FlowPayload(key),
+ cancellation: TestContext.Current.CancellationToken
+ );
+ var dequeued = await host.Client.DequeueAsync(jobId, TestContext.Current.CancellationToken);
+
+ await host.StartAsync(TestContext.Current.CancellationToken);
+ await Task.Delay(TimeSpan.FromMilliseconds(300), TestContext.Current.CancellationToken);
+
+ dequeued.Should().BeTrue();
+ var job = await host.GetJobAsync(jobId, TestContext.Current.CancellationToken);
+ job.Should().NotBeNull();
+ job!.Status.Should().Be(AtomizerJobStatus.Cancelled);
+ _recorder.AttemptsFor(key).Should().BeEmpty();
+ }
+
+ [Fact]
+ public async Task DequeueAsync_WhenJobAlreadyCompleted_ShouldReturnFalseAndKeepCompletedState()
+ {
+ var host = await StartHostAsync();
+ var key = NewKey();
+
+ var jobId = await host.Client.EnqueueAsync(
+ new FlowPayload(key),
+ cancellation: TestContext.Current.CancellationToken
+ );
+ await host.WaitForJobAsync(
+ jobId,
+ job => job.Status == AtomizerJobStatus.Completed,
+ TestContext.Current.CancellationToken
+ );
+
+ var dequeued = await host.Client.DequeueAsync(jobId, TestContext.Current.CancellationToken);
+ var job = await host.GetJobAsync(jobId, TestContext.Current.CancellationToken);
+
+ dequeued.Should().BeFalse();
+ job.Should().NotBeNull();
+ job!.Status.Should().Be(AtomizerJobStatus.Completed);
+ _recorder.AttemptsFor(key).Should().ContainSingle();
+ }
+}
diff --git a/tests/Atomizer.FlowTests/Drivers/EntityFrameworkCoreFlowTestDrivers.cs b/tests/Atomizer.FlowTests/Drivers/EntityFrameworkCoreFlowTestDrivers.cs
new file mode 100644
index 0000000..90394c5
--- /dev/null
+++ b/tests/Atomizer.FlowTests/Drivers/EntityFrameworkCoreFlowTestDrivers.cs
@@ -0,0 +1,122 @@
+using Atomizer.EntityFrameworkCore;
+using Atomizer.EntityFrameworkCore.Entities;
+using Atomizer.FlowTests.Infrastructure;
+using DotNet.Testcontainers.Containers;
+using Microsoft.EntityFrameworkCore;
+using Microsoft.Extensions.DependencyInjection;
+using Testcontainers.MsSql;
+using Testcontainers.MySql;
+using Testcontainers.PostgreSql;
+
+namespace Atomizer.FlowTests.Drivers;
+
+public abstract class EntityFrameworkCoreFlowTestDriver : FlowTestDriver
+{
+ private readonly IDatabaseContainer _container;
+ private readonly string? _schema;
+
+ protected EntityFrameworkCoreFlowTestDriver(IDatabaseContainer container, string? schema)
+ {
+ _container = container;
+ _schema = schema;
+ }
+
+ public override async ValueTask InitializeAsync()
+ {
+ await _container.StartAsync();
+ await using var dbContext = CreateDbContext();
+ await dbContext.Database.EnsureCreatedAsync();
+ }
+
+ public override async ValueTask DisposeAsync()
+ {
+ await _container.DisposeAsync();
+ }
+
+ public override void ConfigureServices(IServiceCollection services, string runId)
+ {
+ services.AddScoped(_ => CreateDbContext());
+ }
+
+ public override void ConfigureStorage(AtomizerOptions options, string runId)
+ {
+ options.UseEntityFrameworkCoreStorage();
+ }
+
+ public override async Task ResetAsync(CancellationToken cancellationToken)
+ {
+ await using var dbContext = CreateDbContext();
+
+ dbContext.Set().RemoveRange(dbContext.Set());
+ dbContext.Set().RemoveRange(dbContext.Set());
+ dbContext.Set().RemoveRange(dbContext.Set());
+ dbContext.Set().RemoveRange(dbContext.Set());
+
+ await dbContext.SaveChangesAsync(cancellationToken);
+ }
+
+ protected abstract void ConfigureProvider(DbContextOptionsBuilder optionsBuilder);
+
+ private FlowTestDbContext CreateDbContext()
+ {
+ var optionsBuilder = new DbContextOptionsBuilder();
+ ConfigureProvider(optionsBuilder);
+ return new FlowTestDbContext(optionsBuilder.Options, _schema);
+ }
+
+ protected string ConnectionString => _container.GetConnectionString();
+}
+
+public sealed class PostgreSqlFlowTestDriver : EntityFrameworkCoreFlowTestDriver
+{
+ public PostgreSqlFlowTestDriver()
+ : base(
+ new PostgreSqlBuilder().WithDatabase("atomizer").WithUsername("postgres").WithPassword("secret").Build(),
+ "Atomizer"
+ ) { }
+
+ public override string Name => "PostgreSQL";
+
+ protected override void ConfigureProvider(DbContextOptionsBuilder optionsBuilder)
+ {
+ optionsBuilder.UseNpgsql(ConnectionString);
+ }
+}
+
+public sealed class MySqlFlowTestDriver : EntityFrameworkCoreFlowTestDriver
+{
+ public MySqlFlowTestDriver()
+ : base(
+ new MySqlBuilder().WithDatabase("atomizer").WithUsername("root").WithPassword("secret").Build(),
+ schema: null
+ ) { }
+
+ public override string Name => "MySQL";
+
+ protected override void ConfigureProvider(DbContextOptionsBuilder optionsBuilder)
+ {
+ optionsBuilder.UseMySql(ConnectionString, ServerVersion.AutoDetect(ConnectionString));
+ }
+}
+
+public sealed class SqlServerFlowTestDriver : EntityFrameworkCoreFlowTestDriver
+{
+ public SqlServerFlowTestDriver()
+ : base(new MsSqlBuilder().Build(), "Atomizer") { }
+
+ public override string Name => "SQL Server";
+
+ protected override void ConfigureProvider(DbContextOptionsBuilder optionsBuilder)
+ {
+ optionsBuilder.UseSqlServer(ConnectionString);
+ }
+}
+
+[CollectionDefinition(nameof(PostgreSqlFlowTestDriver))]
+public sealed class PostgreSqlFlowTestCollection : ICollectionFixture;
+
+[CollectionDefinition(nameof(MySqlFlowTestDriver))]
+public sealed class MySqlFlowTestCollection : ICollectionFixture;
+
+[CollectionDefinition(nameof(SqlServerFlowTestDriver))]
+public sealed class SqlServerFlowTestCollection : ICollectionFixture;
diff --git a/tests/Atomizer.FlowTests/Drivers/FlowTestDriver.cs b/tests/Atomizer.FlowTests/Drivers/FlowTestDriver.cs
new file mode 100644
index 0000000..25dc1aa
--- /dev/null
+++ b/tests/Atomizer.FlowTests/Drivers/FlowTestDriver.cs
@@ -0,0 +1,18 @@
+using Microsoft.Extensions.DependencyInjection;
+
+namespace Atomizer.FlowTests.Drivers;
+
+public abstract class FlowTestDriver : IAsyncLifetime
+{
+ public abstract string Name { get; }
+
+ public virtual ValueTask InitializeAsync() => ValueTask.CompletedTask;
+
+ public virtual ValueTask DisposeAsync() => ValueTask.CompletedTask;
+
+ public virtual Task ResetAsync(CancellationToken cancellationToken) => Task.CompletedTask;
+
+ public virtual void ConfigureServices(IServiceCollection services, string runId) { }
+
+ public abstract void ConfigureStorage(AtomizerOptions options, string runId);
+}
diff --git a/tests/Atomizer.FlowTests/Drivers/InMemoryFlowTestDriver.cs b/tests/Atomizer.FlowTests/Drivers/InMemoryFlowTestDriver.cs
new file mode 100644
index 0000000..3aea139
--- /dev/null
+++ b/tests/Atomizer.FlowTests/Drivers/InMemoryFlowTestDriver.cs
@@ -0,0 +1,37 @@
+using Atomizer.Abstractions;
+using Atomizer.Core;
+using Atomizer.Storage;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging.Abstractions;
+
+namespace Atomizer.FlowTests.Drivers;
+
+public sealed class InMemoryFlowTestDriver : FlowTestDriver
+{
+ private InMemoryStorage? _storage;
+
+ public override string Name => "InMemory";
+
+ public override Task ResetAsync(CancellationToken cancellationToken)
+ {
+ _storage = null;
+ return Task.CompletedTask;
+ }
+
+ public override void ConfigureStorage(AtomizerOptions options, string runId)
+ {
+ options.JobStorageOptions = new JobStorageOptions(sp =>
+ {
+ _storage ??= new InMemoryStorage(
+ new InMemoryJobStorageOptions { AmountOfJobsToRetainInMemory = 1000 },
+ sp.GetRequiredService(),
+ NullLogger.Instance
+ );
+
+ return _storage;
+ });
+ }
+}
+
+[CollectionDefinition(nameof(InMemoryFlowTestDriver))]
+public sealed class InMemoryFlowTestCollection : ICollectionFixture;
diff --git a/tests/Atomizer.FlowTests/Drivers/RedisFlowTestDriver.cs b/tests/Atomizer.FlowTests/Drivers/RedisFlowTestDriver.cs
new file mode 100644
index 0000000..2c41093
--- /dev/null
+++ b/tests/Atomizer.FlowTests/Drivers/RedisFlowTestDriver.cs
@@ -0,0 +1,47 @@
+using Atomizer.Redis;
+using StackExchange.Redis;
+using Testcontainers.Redis;
+
+namespace Atomizer.FlowTests.Drivers;
+
+public sealed class RedisFlowTestDriver : FlowTestDriver
+{
+ private readonly RedisContainer _container = new RedisBuilder("redis:7-alpine").Build();
+
+ public override string Name => "Redis";
+
+ public IConnectionMultiplexer Connection { get; private set; } = null!;
+
+ public override async ValueTask InitializeAsync()
+ {
+ await _container.StartAsync();
+ var configuration = ConfigurationOptions.Parse(_container.GetConnectionString());
+ configuration.AllowAdmin = true;
+ Connection = await ConnectionMultiplexer.ConnectAsync(configuration);
+ }
+
+ public override async ValueTask DisposeAsync()
+ {
+ if (Connection is not null)
+ await Connection.DisposeAsync();
+
+ await _container.DisposeAsync();
+ }
+
+ public override void ConfigureStorage(AtomizerOptions options, string runId)
+ {
+ options.UseRedisStorage(Connection, storage => storage.KeyPrefix = $"flow:{runId}");
+ }
+
+ public override async Task ResetAsync(CancellationToken cancellationToken)
+ {
+ foreach (var endpoint in Connection.GetEndPoints())
+ {
+ var server = Connection.GetServer(endpoint);
+ await server.FlushDatabaseAsync();
+ }
+ }
+}
+
+[CollectionDefinition(nameof(RedisFlowTestDriver))]
+public sealed class RedisFlowTestCollection : ICollectionFixture;
diff --git a/tests/Atomizer.FlowTests/EnqueueFlowTests.cs b/tests/Atomizer.FlowTests/EnqueueFlowTests.cs
new file mode 100644
index 0000000..89a5b4e
--- /dev/null
+++ b/tests/Atomizer.FlowTests/EnqueueFlowTests.cs
@@ -0,0 +1,275 @@
+using Atomizer.FlowTests.Infrastructure;
+using Atomizer.FlowTests.TestJobs;
+
+namespace Atomizer.FlowTests;
+
+public abstract partial class AtomizerFlowTests
+{
+ [Fact]
+ public async Task EnqueueAsync_WhenHandlerSucceeds_ShouldProcessAndPersistCompletedJob()
+ {
+ var host = await StartHostAsync();
+ var key = NewKey();
+
+ var jobId = await host.Client.EnqueueAsync(
+ new FlowPayload(key),
+ cancellation: TestContext.Current.CancellationToken
+ );
+
+ await _recorder.WaitForCountAsync(key, 1, TestContext.Current.CancellationToken);
+ var job = await host.WaitForJobAsync(
+ jobId,
+ job => job.Status == AtomizerJobStatus.Completed,
+ TestContext.Current.CancellationToken
+ );
+
+ job.Attempts.Should().Be(1);
+ job.CompletedAt.Should().NotBeNull();
+ job.FailedAt.Should().BeNull();
+ job.Errors.Should().BeEmpty();
+ }
+
+ [Fact]
+ public async Task EnqueueAsync_WhenHandlerFailsWithoutRetry_ShouldPersistFailedJob()
+ {
+ var host = await StartHostAsync();
+ var key = NewKey();
+
+ var jobId = await host.Client.EnqueueAsync(
+ new FailingFlowPayload(key),
+ options => options.RetryStrategy = RetryStrategy.None,
+ TestContext.Current.CancellationToken
+ );
+
+ var job = await host.WaitForJobAsync(
+ jobId,
+ job => job.Status == AtomizerJobStatus.Failed,
+ TestContext.Current.CancellationToken
+ );
+
+ _recorder.AttemptsFor(key).Should().ContainSingle();
+ job.Attempts.Should().Be(1);
+ job.CompletedAt.Should().BeNull();
+ job.FailedAt.Should().NotBeNull();
+ job.Errors.Should().ContainSingle();
+ }
+
+ [Fact]
+ public async Task EnqueueAsync_WhenHandlerFailsThenSucceeds_ShouldRetryAndPersistCompletedJob()
+ {
+ var host = await StartHostAsync();
+ var key = NewKey();
+
+ var jobId = await host.Client.EnqueueAsync(
+ new EventuallySuccessfulFlowPayload(key, FailuresBeforeSuccess: 1),
+ options => options.RetryStrategy = RetryStrategy.Fixed(TimeSpan.Zero, maxAttempts: 2),
+ TestContext.Current.CancellationToken
+ );
+
+ await _recorder.WaitForCountAsync(key, 2, TestContext.Current.CancellationToken);
+ var job = await host.WaitForJobAsync(
+ jobId,
+ job => job.Status == AtomizerJobStatus.Completed,
+ TestContext.Current.CancellationToken
+ );
+
+ job.Attempts.Should().Be(2);
+ job.Errors.Should().ContainSingle();
+ job.CompletedAt.Should().NotBeNull();
+ }
+
+ [Fact]
+ public async Task EnqueueAsync_WhenIdempotencyKeyIsReused_ShouldProcessSinglePersistedJob()
+ {
+ var host = await StartHostAsync();
+ var key = NewKey();
+ var idempotencyKey = $"idem-{key}";
+
+ var firstJobId = await host.Client.EnqueueAsync(
+ new FlowPayload(key),
+ options => options.IdempotencyKey = idempotencyKey,
+ TestContext.Current.CancellationToken
+ );
+ var secondJobId = await host.Client.EnqueueAsync(
+ new FlowPayload(key),
+ options => options.IdempotencyKey = idempotencyKey,
+ TestContext.Current.CancellationToken
+ );
+
+ secondJobId.Should().Be(firstJobId);
+ await host.WaitForJobAsync(
+ firstJobId,
+ job => job.Status == AtomizerJobStatus.Completed,
+ TestContext.Current.CancellationToken
+ );
+ var jobs = await host.GetJobsAsync(TestContext.Current.CancellationToken);
+
+ jobs.Count(job => job.IdempotencyKey == idempotencyKey).Should().Be(1);
+ _recorder.AttemptsFor(key).Should().ContainSingle();
+ }
+
+ [Fact]
+ public async Task EnqueueAsync_WhenPartitioned_ShouldPersistSequenceAndCompleteJobs()
+ {
+ var host = await StartHostAsync(options =>
+ {
+ options.AutoStart = false;
+ options.QueueBatchSize = 1;
+ options.QueueDegreeOfParallelism = 1;
+ });
+ var key = NewKey();
+ var partition = new PartitionKey($"partition-{key}");
+ var expectedKeys = Enumerable.Range(1, 4).Select(index => $"{key}-{index}").ToArray();
+ var payloadKeysByJobId = new Dictionary();
+
+ foreach (var payloadKey in expectedKeys)
+ {
+ var jobId = await host.Client.EnqueueAsync(
+ new FlowPayload(payloadKey),
+ options => options.PartitionKey = partition,
+ TestContext.Current.CancellationToken
+ );
+ payloadKeysByJobId[jobId] = payloadKey;
+ }
+
+ await host.StartAsync(TestContext.Current.CancellationToken);
+ var completedJobs = await host.WaitForJobsAsync(
+ jobs =>
+ payloadKeysByJobId.Keys.All(jobId =>
+ jobs.Any(job => job.Id == jobId && job.Status == AtomizerJobStatus.Completed)
+ ),
+ "Partitioned jobs were not all persisted as completed.",
+ TestContext.Current.CancellationToken
+ );
+
+ var persistedJobs = completedJobs
+ .Where(job => payloadKeysByJobId.ContainsKey(job.Id))
+ .OrderBy(job => job.SequenceNumber)
+ .ThenBy(job => job.ScheduledAt)
+ .ThenBy(job => job.CreatedAt)
+ .ToArray();
+
+ persistedJobs.Select(job => payloadKeysByJobId[job.Id]).Should().Equal(expectedKeys);
+ persistedJobs.Select(job => job.SequenceNumber).Should().Equal(1, 2, 3, 4);
+ persistedJobs
+ .Should()
+ .AllSatisfy(job =>
+ {
+ job.PartitionKey.Should().Be(partition);
+ job.Status.Should().Be(AtomizerJobStatus.Completed);
+ });
+ }
+
+ [Fact]
+ public async Task EnqueueAsync_WhenMultipleQueuesConfigured_ShouldProcessEachQueue()
+ {
+ var host = await StartHostAsync();
+ var defaultKey = NewKey();
+ var criticalKey = NewKey();
+
+ var defaultJobId = await host.Client.EnqueueAsync(
+ new FlowPayload(defaultKey),
+ cancellation: TestContext.Current.CancellationToken
+ );
+ var criticalJobId = await host.Client.EnqueueAsync(
+ new FlowPayload(criticalKey),
+ options => options.Queue = FlowQueues.Critical,
+ TestContext.Current.CancellationToken
+ );
+
+ await host.WaitForJobAsync(
+ defaultJobId,
+ job => job.Status == AtomizerJobStatus.Completed,
+ TestContext.Current.CancellationToken
+ );
+ var criticalJob = await host.WaitForJobAsync(
+ criticalJobId,
+ job => job.Status == AtomizerJobStatus.Completed,
+ TestContext.Current.CancellationToken
+ );
+
+ criticalJob.QueueKey.Should().Be(FlowQueues.Critical);
+ _recorder.AttemptsFor(defaultKey).Should().ContainSingle();
+ _recorder.AttemptsFor(criticalKey).Should().ContainSingle();
+ }
+
+ [Fact]
+ public async Task EnqueueAsync_WhenQueueIsNotConfigured_ShouldRemainPending()
+ {
+ var host = await StartHostAsync();
+ var key = NewKey();
+ var unconfiguredQueue = new QueueKey($"missing-{key}");
+
+ var jobId = await host.Client.EnqueueAsync(
+ new FlowPayload(key),
+ options => options.Queue = unconfiguredQueue,
+ TestContext.Current.CancellationToken
+ );
+
+ await Task.Delay(TimeSpan.FromMilliseconds(500), TestContext.Current.CancellationToken);
+
+ var job = await host.GetJobAsync(jobId, TestContext.Current.CancellationToken);
+ job.Should().NotBeNull();
+ job!.Status.Should().Be(AtomizerJobStatus.Pending);
+ job.QueueKey.Should().Be(unconfiguredQueue);
+ _recorder.AttemptsFor(key).Should().BeEmpty();
+ }
+
+ [Fact]
+ public async Task EnqueueAsync_WhenHandlerIsMissing_ShouldPersistFailedJob()
+ {
+ var host = await StartHostAsync();
+ var key = NewKey();
+
+ var jobId = await host.Client.EnqueueAsync(
+ new MissingHandlerPayload(key),
+ options => options.RetryStrategy = RetryStrategy.None,
+ TestContext.Current.CancellationToken
+ );
+
+ var job = await host.WaitForJobAsync(
+ jobId,
+ job => job.Status == AtomizerJobStatus.Failed,
+ TestContext.Current.CancellationToken
+ );
+
+ _recorder.AttemptsFor(key).Should().BeEmpty();
+ job.Attempts.Should().Be(1);
+ job.Errors.Should().ContainSingle();
+ }
+
+ [Fact]
+ public async Task EnqueueAsync_WhenTwoHostsProcessSameQueue_ShouldCompleteEachJobExactlyOnce()
+ {
+ var firstHost = await StartHostAsync(options => options.AutoStart = false);
+ var secondHost = await StartHostAsync(options => options.AutoStart = false);
+ var keys = Enumerable.Range(1, 12).Select(_ => NewKey()).ToArray();
+ var jobIds = new List();
+
+ foreach (var key in keys)
+ {
+ jobIds.Add(
+ await firstHost.Client.EnqueueAsync(
+ new FlowPayload(key),
+ cancellation: TestContext.Current.CancellationToken
+ )
+ );
+ }
+
+ await Task.WhenAll(
+ firstHost.StartAsync(TestContext.Current.CancellationToken),
+ secondHost.StartAsync(TestContext.Current.CancellationToken)
+ );
+
+ await firstHost.WaitForJobsAsync(
+ jobs => jobIds.All(jobId => jobs.Any(job => job.Id == jobId && job.Status == AtomizerJobStatus.Completed)),
+ "Two processing hosts did not complete every queued job.",
+ TestContext.Current.CancellationToken
+ );
+
+ foreach (var key in keys)
+ {
+ _recorder.AttemptsFor(key).Should().ContainSingle();
+ }
+ }
+}
diff --git a/tests/Atomizer.FlowTests/ExecuteFlowTests.cs b/tests/Atomizer.FlowTests/ExecuteFlowTests.cs
new file mode 100644
index 0000000..e247b18
--- /dev/null
+++ b/tests/Atomizer.FlowTests/ExecuteFlowTests.cs
@@ -0,0 +1,68 @@
+using Atomizer.FlowTests.TestJobs;
+
+namespace Atomizer.FlowTests;
+
+public abstract partial class AtomizerFlowTests
+{
+ [Fact]
+ public async Task ExecuteAsync_WhenHandlerSucceeds_ShouldPersistCompletedJobImmediately()
+ {
+ var host = await StartHostAsync(options => options.AddProcessing = false);
+ var key = NewKey();
+
+ var jobId = await host.Client.ExecuteAsync(
+ new FlowPayload(key),
+ cancellation: TestContext.Current.CancellationToken
+ );
+
+ var job = await host.GetJobAsync(jobId, TestContext.Current.CancellationToken);
+ job.Should().NotBeNull();
+ job!.Status.Should().Be(AtomizerJobStatus.Completed);
+ job.Attempts.Should().Be(1);
+ _recorder.AttemptsFor(key).Should().ContainSingle();
+ }
+
+ [Fact]
+ public async Task ExecuteAsync_WhenHandlerThrows_ShouldPersistFailedJobAndRethrow()
+ {
+ var host = await StartHostAsync(options => options.AddProcessing = false);
+ var key = NewKey();
+
+ var exception = await Assert.ThrowsAsync(() =>
+ host.Client.ExecuteAsync(new FailingFlowPayload(key), cancellation: TestContext.Current.CancellationToken)
+ );
+
+ exception.Message.Should().Be(key);
+ var attempt = _recorder.AttemptsFor(key).Should().ContainSingle().Subject;
+ var job = await host.GetJobAsync(attempt.JobId, TestContext.Current.CancellationToken);
+ job.Should().NotBeNull();
+ job!.Status.Should().Be(AtomizerJobStatus.Failed);
+ job.Errors.Should().ContainSingle();
+ }
+
+ [Fact]
+ public async Task ExecuteAsync_WhenIdempotencyKeyAlreadyExists_ShouldNotDispatchDuplicateExecution()
+ {
+ var host = await StartHostAsync(options => options.AddProcessing = false);
+ var firstKey = NewKey();
+ var secondKey = NewKey();
+ var idempotencyKey = $"direct-{firstKey}";
+
+ var firstJobId = await host.Client.ExecuteAsync(
+ new FlowPayload(firstKey),
+ options => options.IdempotencyKey = idempotencyKey,
+ TestContext.Current.CancellationToken
+ );
+ var secondJobId = await host.Client.ExecuteAsync(
+ new FlowPayload(secondKey),
+ options => options.IdempotencyKey = idempotencyKey,
+ TestContext.Current.CancellationToken
+ );
+
+ secondJobId.Should().Be(firstJobId);
+ _recorder.AttemptsFor(firstKey).Should().ContainSingle();
+ _recorder.AttemptsFor(secondKey).Should().BeEmpty();
+ var jobs = await host.GetJobsAsync(TestContext.Current.CancellationToken);
+ jobs.Count(job => job.IdempotencyKey == idempotencyKey).Should().Be(1);
+ }
+}
diff --git a/tests/Atomizer.FlowTests/Infrastructure/FlowQueues.cs b/tests/Atomizer.FlowTests/Infrastructure/FlowQueues.cs
new file mode 100644
index 0000000..3a141c0
--- /dev/null
+++ b/tests/Atomizer.FlowTests/Infrastructure/FlowQueues.cs
@@ -0,0 +1,6 @@
+namespace Atomizer.FlowTests.Infrastructure;
+
+internal static class FlowQueues
+{
+ public static readonly QueueKey Critical = new QueueKey("critical");
+}
diff --git a/tests/Atomizer.FlowTests/Infrastructure/FlowTestDbContext.cs b/tests/Atomizer.FlowTests/Infrastructure/FlowTestDbContext.cs
new file mode 100644
index 0000000..4a3b298
--- /dev/null
+++ b/tests/Atomizer.FlowTests/Infrastructure/FlowTestDbContext.cs
@@ -0,0 +1,21 @@
+using Atomizer.EntityFrameworkCore;
+using Microsoft.EntityFrameworkCore;
+
+namespace Atomizer.FlowTests.Infrastructure;
+
+public sealed class FlowTestDbContext : DbContext
+{
+ private readonly string? _schema;
+
+ public FlowTestDbContext(DbContextOptions options, string? schema = null)
+ : base(options)
+ {
+ _schema = schema;
+ }
+
+ protected override void OnModelCreating(ModelBuilder modelBuilder)
+ {
+ modelBuilder.AddAtomizerEntities(_schema);
+ base.OnModelCreating(modelBuilder);
+ }
+}
diff --git a/tests/Atomizer.FlowTests/Infrastructure/FlowTestHost.cs b/tests/Atomizer.FlowTests/Infrastructure/FlowTestHost.cs
new file mode 100644
index 0000000..bb84fc9
--- /dev/null
+++ b/tests/Atomizer.FlowTests/Infrastructure/FlowTestHost.cs
@@ -0,0 +1,253 @@
+using Atomizer.Abstractions;
+using Atomizer.FlowTests.Drivers;
+using Atomizer.FlowTests.TestJobs;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+
+namespace Atomizer.FlowTests.Infrastructure;
+
+internal sealed class FlowTestHost : IAsyncDisposable
+{
+ private readonly IHost _host;
+ private bool _stopped;
+
+ private FlowTestHost(IHost host)
+ {
+ _host = host;
+ }
+
+ public IAtomizerClient Client => _host.Services.GetRequiredService();
+
+ public static async Task CreateAsync(
+ FlowTestDriver driver,
+ string runId,
+ FlowTestRecorder recorder,
+ Action? configure,
+ CancellationToken cancellationToken
+ )
+ {
+ var options = new FlowHostOptions();
+ configure?.Invoke(options);
+
+ var host = new HostBuilder()
+ .ConfigureServices(services =>
+ {
+ services.AddLogging();
+ services.AddSingleton(recorder);
+ driver.ConfigureServices(services, runId);
+
+ services.AddAtomizer(atomizer =>
+ {
+ atomizer.AddHandlersFrom();
+ atomizer.AddQueue(QueueKey.Default, queue => ConfigureQueue(queue, options));
+ atomizer.AddQueue(FlowQueues.Critical, queue => ConfigureQueue(queue, options));
+ atomizer.ConfigureScheduling(scheduling =>
+ {
+ scheduling.StorageCheckInterval = FlowTestTimings.PollInterval;
+ scheduling.ScheduleLeadTime = FlowTestTimings.ScheduleLeadTime;
+ scheduling.TickInterval = FlowTestTimings.PollInterval;
+ });
+
+ driver.ConfigureStorage(atomizer, runId);
+ options.ConfigureAtomizer?.Invoke(atomizer);
+ });
+
+ if (options.AddProcessing)
+ {
+ services.AddAtomizerProcessing(processing =>
+ {
+ processing.GracefulShutdownTimeout = options.GracefulShutdownTimeout;
+ processing.HeartbeatInterval = FlowTestTimings.PollInterval;
+ processing.StaleSweepInterval = FlowTestTimings.PollInterval;
+ processing.StaleServerTimeout = TimeSpan.FromMilliseconds(200);
+ processing.JobRetention = options.JobRetention;
+ processing.JobRetentionSweepInterval = FlowTestTimings.PollInterval;
+ options.ConfigureProcessing?.Invoke(processing);
+ });
+ }
+ })
+ .Build();
+
+ var testHost = new FlowTestHost(host);
+ if (options.AutoStart)
+ {
+ await testHost.StartAsync(cancellationToken);
+ }
+
+ return testHost;
+ }
+
+ public async Task StartAsync(CancellationToken cancellationToken)
+ {
+ await _host.StartAsync(cancellationToken);
+ _stopped = false;
+ }
+
+ public async Task StopAsync(CancellationToken cancellationToken)
+ {
+ if (_stopped)
+ return;
+
+ await _host.StopAsync(cancellationToken);
+ _stopped = true;
+ }
+
+ public async ValueTask DisposeAsync()
+ {
+ await StopAsync(CancellationToken.None);
+ _host.Dispose();
+ }
+
+ public async Task GetJobAsync(Guid jobId, CancellationToken cancellationToken)
+ {
+ using var scope = _host.Services.CreateScope();
+ var storage = scope.ServiceProvider.GetRequiredService();
+ return await storage.GetJobByIdAsync(jobId, cancellationToken);
+ }
+
+ public async Task> GetJobsAsync(CancellationToken cancellationToken)
+ {
+ using var scope = _host.Services.CreateScope();
+ var storage = scope.ServiceProvider.GetRequiredService();
+ var result = await storage.GetJobsAsync(new JobQuery { Take = 500 }, cancellationToken);
+ return result.Items;
+ }
+
+ public async Task> GetSchedulesAsync(CancellationToken cancellationToken)
+ {
+ using var scope = _host.Services.CreateScope();
+ var storage = scope.ServiceProvider.GetRequiredService();
+ return await storage.GetSchedulesAsync(cancellationToken);
+ }
+
+ public async Task UpdateJobsAsync(IEnumerable jobs, CancellationToken cancellationToken)
+ {
+ using var scope = _host.Services.CreateScope();
+ var storage = scope.ServiceProvider.GetRequiredService();
+ await storage.UpdateJobsAsync(jobs, cancellationToken);
+ }
+
+ public async Task UpdateSchedulesAsync(IEnumerable schedules, CancellationToken cancellationToken)
+ {
+ using var scope = _host.Services.CreateScope();
+ var storage = scope.ServiceProvider.GetRequiredService();
+ await storage.UpdateSchedulesAsync(schedules, cancellationToken);
+ }
+
+ public async Task UpsertHeartbeatAsync(AtomizerActiveServer server, CancellationToken cancellationToken)
+ {
+ using var scope = _host.Services.CreateScope();
+ var storage = scope.ServiceProvider.GetRequiredService();
+ await storage.UpsertHeartbeatAsync(server, cancellationToken);
+ }
+
+ public async Task WaitForJobAsync(
+ Guid jobId,
+ Func predicate,
+ CancellationToken cancellationToken
+ )
+ {
+ return await WaitUntilAsync(
+ async ct => await GetJobAsync(jobId, ct),
+ job => job is not null && predicate(job),
+ $"Job {jobId} did not reach the expected state.",
+ cancellationToken
+ ) ?? throw new InvalidOperationException("Wait returned null despite predicate success.");
+ }
+
+ public async Task> WaitForJobsAsync(
+ Func, bool> predicate,
+ string timeoutMessage,
+ CancellationToken cancellationToken
+ )
+ {
+ return await WaitUntilAsync(async ct => await GetJobsAsync(ct), predicate, timeoutMessage, cancellationToken);
+ }
+
+ public async Task> WaitForSchedulesAsync(
+ Func, bool> predicate,
+ string timeoutMessage,
+ CancellationToken cancellationToken
+ )
+ {
+ return await WaitUntilAsync(
+ async ct => await GetSchedulesAsync(ct),
+ predicate,
+ timeoutMessage,
+ cancellationToken
+ );
+ }
+
+ private static async Task WaitUntilAsync(
+ Func> probe,
+ Func predicate,
+ string timeoutMessage,
+ CancellationToken cancellationToken
+ )
+ {
+ using var timeout = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+ timeout.CancelAfter(FlowTestTimings.WaitTimeout);
+ Exception? lastProbeException = null;
+
+ while (true)
+ {
+ T value;
+ try
+ {
+ value = await probe(timeout.Token);
+ }
+ catch (OperationCanceledException) when (timeout.IsCancellationRequested)
+ {
+ throw new TimeoutException(timeoutMessage, lastProbeException);
+ }
+ catch (Exception ex) when (!timeout.IsCancellationRequested)
+ {
+ lastProbeException = ex;
+ await DelayOrTimeoutAsync(timeout.Token, timeoutMessage, lastProbeException);
+ continue;
+ }
+
+ if (predicate(value))
+ return value;
+
+ await DelayOrTimeoutAsync(timeout.Token, timeoutMessage, lastProbeException);
+ }
+ }
+
+ private static async Task DelayOrTimeoutAsync(
+ CancellationToken cancellationToken,
+ string timeoutMessage,
+ Exception? innerException
+ )
+ {
+ try
+ {
+ await Task.Delay(FlowTestTimings.PollInterval, cancellationToken);
+ }
+ catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
+ {
+ throw new TimeoutException(timeoutMessage, innerException);
+ }
+ }
+
+ private static void ConfigureQueue(QueueOptions queue, FlowHostOptions options)
+ {
+ queue.BatchSize = options.QueueBatchSize;
+ queue.DegreeOfParallelism = options.QueueDegreeOfParallelism;
+ queue.StorageCheckInterval = FlowTestTimings.PollInterval;
+ queue.TickInterval = FlowTestTimings.PollInterval;
+ queue.VisibilityTimeout = FlowTestTimings.QueueVisibilityTimeout;
+ }
+}
+
+internal sealed class FlowHostOptions
+{
+ public bool AddProcessing { get; set; } = true;
+ public bool AutoStart { get; set; } = true;
+ public int QueueBatchSize { get; set; } = 10;
+ public int QueueDegreeOfParallelism { get; set; } = 4;
+ public TimeSpan GracefulShutdownTimeout { get; set; } = TimeSpan.FromSeconds(1);
+ public TimeSpan? JobRetention { get; set; }
+ public Action? ConfigureAtomizer { get; set; }
+ public Action? ConfigureProcessing { get; set; }
+}
diff --git a/tests/Atomizer.FlowTests/Infrastructure/FlowTestRecorder.cs b/tests/Atomizer.FlowTests/Infrastructure/FlowTestRecorder.cs
new file mode 100644
index 0000000..f5b3085
--- /dev/null
+++ b/tests/Atomizer.FlowTests/Infrastructure/FlowTestRecorder.cs
@@ -0,0 +1,99 @@
+namespace Atomizer.FlowTests.Infrastructure;
+
+internal sealed class FlowTestRecorder
+{
+ private readonly object _sync = new object();
+ private readonly List _attempts = new List();
+ private TaskCompletionSource _changed = CreateSignal();
+ private int _order;
+
+ public FlowAttempt Record(string key, JobContext context)
+ {
+ lock (_sync)
+ {
+ var attempt = new FlowAttempt(
+ key,
+ context.Job.Id,
+ context.Job.QueueKey,
+ context.Job.Attempts,
+ Interlocked.Increment(ref _order),
+ DateTimeOffset.UtcNow
+ );
+ _attempts.Add(attempt);
+ _changed.TrySetResult();
+ _changed = CreateSignal();
+ return attempt;
+ }
+ }
+
+ public IReadOnlyList AttemptsFor(string key)
+ {
+ lock (_sync)
+ {
+ return _attempts.Where(attempt => attempt.Key == key).ToArray();
+ }
+ }
+
+ public IReadOnlyList Attempts
+ {
+ get
+ {
+ lock (_sync)
+ {
+ return _attempts.ToArray();
+ }
+ }
+ }
+
+ public async Task WaitForCountAsync(string key, int expectedCount, CancellationToken cancellationToken)
+ {
+ await WaitUntilAsync(
+ attempts => attempts.Count(attempt => attempt.Key == key) >= expectedCount,
+ $"Expected at least {expectedCount} attempt(s) for '{key}'.",
+ cancellationToken
+ );
+ }
+
+ public async Task WaitUntilAsync(
+ Func, bool> predicate,
+ string timeoutMessage,
+ CancellationToken cancellationToken
+ )
+ {
+ using var timeout = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
+ timeout.CancelAfter(FlowTestTimings.WaitTimeout);
+
+ while (true)
+ {
+ Task changedTask;
+ lock (_sync)
+ {
+ if (predicate(_attempts.ToArray()))
+ return;
+
+ changedTask = _changed.Task;
+ }
+
+ try
+ {
+ await changedTask.WaitAsync(timeout.Token);
+ }
+ catch (OperationCanceledException) when (timeout.IsCancellationRequested)
+ {
+ throw new TimeoutException(timeoutMessage);
+ }
+ }
+ }
+
+ private static TaskCompletionSource CreateSignal() =>
+ new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
+}
+
+internal sealed record FlowAttempt(
+ string Key,
+ Guid JobId,
+ QueueKey QueueKey,
+ int Attempt,
+ int Order,
+ DateTimeOffset StartedAt
+);
diff --git a/tests/Atomizer.FlowTests/Infrastructure/FlowTestTimings.cs b/tests/Atomizer.FlowTests/Infrastructure/FlowTestTimings.cs
new file mode 100644
index 0000000..5bab229
--- /dev/null
+++ b/tests/Atomizer.FlowTests/Infrastructure/FlowTestTimings.cs
@@ -0,0 +1,9 @@
+namespace Atomizer.FlowTests.Infrastructure;
+
+internal static class FlowTestTimings
+{
+ public static readonly TimeSpan PollInterval = TimeSpan.FromMilliseconds(25);
+ public static readonly TimeSpan QueueVisibilityTimeout = TimeSpan.FromSeconds(5);
+ public static readonly TimeSpan ScheduleLeadTime = TimeSpan.FromMilliseconds(250);
+ public static readonly TimeSpan WaitTimeout = TimeSpan.FromSeconds(60);
+}
diff --git a/tests/Atomizer.FlowTests/ReliabilityFlowTests.cs b/tests/Atomizer.FlowTests/ReliabilityFlowTests.cs
new file mode 100644
index 0000000..09616fd
--- /dev/null
+++ b/tests/Atomizer.FlowTests/ReliabilityFlowTests.cs
@@ -0,0 +1,132 @@
+using Atomizer.FlowTests.TestJobs;
+
+namespace Atomizer.FlowTests;
+
+public abstract partial class AtomizerFlowTests
+{
+ [Fact]
+ public async Task HeartbeatRecovery_WhenStaleServerOwnsProcessingJob_ShouldReleaseAndProcessJob()
+ {
+ var host = await StartHostAsync(options => options.AutoStart = false);
+ var key = NewKey();
+ var staleInstanceId = $"stale-{key}";
+ var jobId = await host.Client.EnqueueAsync(
+ new FlowPayload(key),
+ cancellation: TestContext.Current.CancellationToken
+ );
+ var job = await host.GetJobAsync(jobId, TestContext.Current.CancellationToken);
+ job.Should().NotBeNull();
+ job!.Lease(
+ new LeaseToken(
+ $"{staleInstanceId}{LeaseToken.Delimiter}{QueueKey.Default}{LeaseToken.Delimiter}{Guid.NewGuid():N}"
+ ),
+ DateTimeOffset.UtcNow.AddSeconds(-10),
+ TimeSpan.FromMinutes(10)
+ );
+ await host.UpdateJobsAsync(new[] { job }, TestContext.Current.CancellationToken);
+ await host.UpsertHeartbeatAsync(
+ new AtomizerActiveServer
+ {
+ InstanceId = staleInstanceId,
+ LastHeartbeatAt = DateTimeOffset.UtcNow.AddSeconds(-10),
+ },
+ TestContext.Current.CancellationToken
+ );
+
+ await host.StartAsync(TestContext.Current.CancellationToken);
+
+ var completed = await host.WaitForJobAsync(
+ jobId,
+ job => job.Status == AtomizerJobStatus.Completed,
+ TestContext.Current.CancellationToken
+ );
+ completed.LeaseToken.Should().BeNull();
+ _recorder.AttemptsFor(key).Should().ContainSingle();
+ }
+
+ [Fact]
+ public async Task VisibilityTimeout_WhenProcessingLeaseExpires_ShouldRetryAndCompleteJob()
+ {
+ var host = await StartHostAsync(options => options.AutoStart = false);
+ var key = NewKey();
+ var staleInstanceId = $"expired-{key}";
+ var jobId = await host.Client.EnqueueAsync(
+ new FlowPayload(key),
+ cancellation: TestContext.Current.CancellationToken
+ );
+ var job = await host.GetJobAsync(jobId, TestContext.Current.CancellationToken);
+ job.Should().NotBeNull();
+ job!.Lease(
+ new LeaseToken(
+ $"{staleInstanceId}{LeaseToken.Delimiter}{QueueKey.Default}{LeaseToken.Delimiter}{Guid.NewGuid():N}"
+ ),
+ DateTimeOffset.UtcNow.AddSeconds(-10),
+ TimeSpan.FromMilliseconds(100)
+ );
+ await host.UpdateJobsAsync(new[] { job }, TestContext.Current.CancellationToken);
+
+ await host.StartAsync(TestContext.Current.CancellationToken);
+
+ var completed = await host.WaitForJobAsync(
+ jobId,
+ job => job.Status == AtomizerJobStatus.Completed,
+ TestContext.Current.CancellationToken
+ );
+ completed.LeaseToken.Should().BeNull();
+ _recorder.AttemptsFor(key).Should().ContainSingle();
+ }
+
+ [Fact]
+ public async Task StopAsync_WhenInFlightJobExceedsGracePeriod_ShouldReleaseForNextHost()
+ {
+ var key = NewKey();
+ var firstHost = await StartHostAsync(options =>
+ {
+ options.GracefulShutdownTimeout = TimeSpan.FromMilliseconds(100);
+ });
+ var jobId = await firstHost.Client.EnqueueAsync(
+ new StopAndResumeFlowPayload(key),
+ cancellation: TestContext.Current.CancellationToken
+ );
+
+ await _recorder.WaitForCountAsync(key, 1, TestContext.Current.CancellationToken);
+ await firstHost.StopAsync(TestContext.Current.CancellationToken);
+ var released = await firstHost.WaitForJobAsync(
+ jobId,
+ job => job.Status == AtomizerJobStatus.Pending,
+ TestContext.Current.CancellationToken
+ );
+ released.LeaseToken.Should().BeNull();
+
+ var secondHost = await StartHostAsync();
+ var completed = await secondHost.WaitForJobAsync(
+ jobId,
+ job => job.Status == AtomizerJobStatus.Completed,
+ TestContext.Current.CancellationToken
+ );
+
+ completed.Attempts.Should().Be(1);
+ var attempts = _recorder.AttemptsFor(key);
+ attempts.Count.Should().BeGreaterThanOrEqualTo(2);
+ attempts.Select(attempt => attempt.JobId).Distinct().Should().ContainSingle().Which.Should().Be(jobId);
+ }
+
+ [Fact]
+ public async Task JobRetention_WhenTerminalJobExpires_ShouldDeleteJob()
+ {
+ var host = await StartHostAsync(options => options.JobRetention = TimeSpan.FromMilliseconds(200));
+ var key = NewKey();
+
+ var jobId = await host.Client.EnqueueAsync(
+ new FlowPayload(key),
+ cancellation: TestContext.Current.CancellationToken
+ );
+ await host.WaitForJobAsync(
+ jobId,
+ job => job.Status == AtomizerJobStatus.Completed,
+ TestContext.Current.CancellationToken
+ );
+
+ await WaitUntilDeletedAsync(host, jobId);
+ }
+}
diff --git a/tests/Atomizer.FlowTests/ScheduleFlowTests.cs b/tests/Atomizer.FlowTests/ScheduleFlowTests.cs
new file mode 100644
index 0000000..ba22988
--- /dev/null
+++ b/tests/Atomizer.FlowTests/ScheduleFlowTests.cs
@@ -0,0 +1,282 @@
+using Atomizer.FlowTests.Infrastructure;
+using Atomizer.FlowTests.TestJobs;
+
+namespace Atomizer.FlowTests;
+
+public abstract partial class AtomizerFlowTests
+{
+ [Fact]
+ public async Task ScheduleAsync_WhenRunAtArrives_ShouldWaitUntilDueThenProcess()
+ {
+ var host = await StartHostAsync();
+ var key = NewKey();
+ var runAt = TruncateToMilliseconds(DateTimeOffset.UtcNow.AddSeconds(2));
+
+ var jobId = await host.Client.ScheduleAsync(
+ new FlowPayload(key),
+ runAt,
+ cancellation: TestContext.Current.CancellationToken
+ );
+
+ await Task.Delay(TimeSpan.FromMilliseconds(350), TestContext.Current.CancellationToken);
+ _recorder.AttemptsFor(key).Should().BeEmpty();
+ var pending = await host.GetJobAsync(jobId, TestContext.Current.CancellationToken);
+ pending.Should().NotBeNull();
+ pending!.Status.Should().Be(AtomizerJobStatus.Pending);
+
+ var completed = await host.WaitForJobAsync(
+ jobId,
+ job => job.Status == AtomizerJobStatus.Completed,
+ TestContext.Current.CancellationToken
+ );
+
+ completed.ScheduledAt.Should().Be(runAt);
+ _recorder.AttemptsFor(key).Should().ContainSingle();
+ }
+
+ private static DateTimeOffset TruncateToMilliseconds(DateTimeOffset value) =>
+ new(value.Ticks - value.Ticks % TimeSpan.TicksPerMillisecond, value.Offset);
+
+ [Fact]
+ public async Task ScheduleRecurringAsync_WhenOccurrenceIsDue_ShouldEnqueueProcessAndAdvanceSchedule()
+ {
+ var host = await StartHostAsync();
+ var key = NewKey();
+ var scheduleKey = new JobKey($"recurring-{key}");
+
+ await host.Client.ScheduleRecurringAsync(
+ new FlowPayload(key),
+ scheduleKey,
+ Schedule.Every().Second(),
+ cancellation: TestContext.Current.CancellationToken
+ );
+
+ await _recorder.WaitForCountAsync(key, 1, TestContext.Current.CancellationToken);
+ var jobs = await host.WaitForJobsAsync(
+ jobs =>
+ jobs.Any(job =>
+ job.ScheduleJobKey == scheduleKey
+ && job.PayloadType == typeof(FlowPayload)
+ && job.Status == AtomizerJobStatus.Completed
+ ),
+ $"Recurring schedule {scheduleKey} did not enqueue and complete a job.",
+ TestContext.Current.CancellationToken
+ );
+ var schedules = await host.WaitForSchedulesAsync(
+ schedules => schedules.Single(schedule => schedule.JobKey == scheduleKey).LastEnqueueAt is not null,
+ $"Recurring schedule {scheduleKey} did not advance.",
+ TestContext.Current.CancellationToken
+ );
+ var schedule = schedules.Single(schedule => schedule.JobKey == scheduleKey);
+
+ jobs.Count(job => job.ScheduleJobKey == scheduleKey).Should().BeGreaterThanOrEqualTo(1);
+ schedule.NextRunAt.Should().BeAfter(schedule.LastEnqueueAt!.Value);
+ }
+
+ [Fact]
+ public async Task ScheduleRecurringAsync_WhenCatchUpMisfires_ShouldEnqueueBoundedCatchUpJobs()
+ {
+ var host = await StartHostAsync(options => options.AutoStart = false);
+ var key = NewKey();
+ var scheduleKey = new JobKey($"catchup-{key}");
+ var now = DateTimeOffset.UtcNow;
+
+ await host.Client.ScheduleRecurringAsync(
+ new FlowPayload(key),
+ scheduleKey,
+ Schedule.Every(10).Seconds(),
+ options =>
+ {
+ options.MisfirePolicy = MisfirePolicy.CatchUp;
+ options.MaxCatchUp = 2;
+ },
+ TestContext.Current.CancellationToken
+ );
+ await MoveScheduleIntoPastAsync(host, scheduleKey, now.AddSeconds(-31), now.AddSeconds(-30));
+
+ await host.StartAsync(TestContext.Current.CancellationToken);
+
+ var jobs = await host.WaitForJobsAsync(
+ jobs =>
+ {
+ var scheduleJobs = jobs.Where(job => job.ScheduleJobKey == scheduleKey).ToArray();
+ return scheduleJobs.Length == 2 && scheduleJobs.All(job => job.Status == AtomizerJobStatus.Completed);
+ },
+ $"Catch-up schedule {scheduleKey} did not enqueue exactly two completed jobs.",
+ TestContext.Current.CancellationToken
+ );
+
+ jobs.Count(job => job.ScheduleJobKey == scheduleKey).Should().Be(2);
+ _recorder.AttemptsFor(key).Should().HaveCount(2);
+ }
+
+ [Fact]
+ public async Task ScheduleRecurringAsync_WhenMisfireIsIgnored_ShouldAdvanceWithoutEnqueuing()
+ {
+ var host = await StartHostAsync(options => options.AutoStart = false);
+ var key = NewKey();
+ var scheduleKey = new JobKey($"ignore-{key}");
+ var now = DateTimeOffset.UtcNow;
+
+ await host.Client.ScheduleRecurringAsync(
+ new FlowPayload(key),
+ scheduleKey,
+ Schedule.Every().Hour(),
+ options => options.MisfirePolicy = MisfirePolicy.Ignore,
+ TestContext.Current.CancellationToken
+ );
+ await MoveScheduleIntoPastAsync(host, scheduleKey, now.AddHours(-2), now.AddHours(-1));
+
+ await host.StartAsync(TestContext.Current.CancellationToken);
+
+ await host.WaitForSchedulesAsync(
+ schedules => schedules.Single(schedule => schedule.JobKey == scheduleKey).LastEnqueueAt is not null,
+ $"Ignore schedule {scheduleKey} did not advance.",
+ TestContext.Current.CancellationToken
+ );
+ await Task.Delay(TimeSpan.FromMilliseconds(300), TestContext.Current.CancellationToken);
+
+ var jobs = await host.GetJobsAsync(TestContext.Current.CancellationToken);
+ jobs.Should().NotContain(job => job.ScheduleJobKey == scheduleKey);
+ _recorder.AttemptsFor(key).Should().BeEmpty();
+ }
+
+ [Fact]
+ public async Task ScheduleRecurringAsync_WhenDisabledScheduleIsDue_ShouldNotEnqueueOccurrence()
+ {
+ var host = await StartHostAsync(options => options.AutoStart = false);
+ var key = NewKey();
+ var scheduleKey = new JobKey($"disabled-{key}");
+ var now = DateTimeOffset.UtcNow;
+
+ await host.Client.ScheduleRecurringAsync(
+ new FlowPayload(key),
+ scheduleKey,
+ Schedule.Every().Second(),
+ options => options.Enabled = false,
+ TestContext.Current.CancellationToken
+ );
+ await MoveScheduleIntoPastAsync(host, scheduleKey, now.AddSeconds(-2), now.AddSeconds(-1));
+
+ await host.StartAsync(TestContext.Current.CancellationToken);
+ await Task.Delay(TimeSpan.FromMilliseconds(500), TestContext.Current.CancellationToken);
+
+ var schedules = await host.GetSchedulesAsync(TestContext.Current.CancellationToken);
+ schedules.Single(schedule => schedule.JobKey == scheduleKey).Enabled.Should().BeFalse();
+ var jobs = await host.GetJobsAsync(TestContext.Current.CancellationToken);
+ jobs.Should().NotContain(job => job.ScheduleJobKey == scheduleKey);
+ _recorder.AttemptsFor(key).Should().BeEmpty();
+ }
+
+ [Fact]
+ public async Task ScheduleRecurringAsync_WhenOptionsAreConfigured_ShouldPropagateToEnqueuedJob()
+ {
+ var host = await StartHostAsync(options => options.AutoStart = false);
+ var key = NewKey();
+ var scheduleKey = new JobKey($"options-{key}");
+ var partition = new PartitionKey($"partition-{key}");
+ var now = DateTimeOffset.UtcNow;
+
+ await host.Client.ScheduleRecurringAsync(
+ new FailingFlowPayload(key),
+ scheduleKey,
+ Schedule.Every().Hour(),
+ options =>
+ {
+ options.Queue = FlowQueues.Critical;
+ options.PartitionKey = partition;
+ options.RetryStrategy = RetryStrategy.None;
+ options.MisfirePolicy = MisfirePolicy.ExecuteNow;
+ },
+ TestContext.Current.CancellationToken
+ );
+ await MoveScheduleIntoPastAsync(host, scheduleKey, now.AddHours(-2), now.AddHours(-1));
+
+ await host.StartAsync(TestContext.Current.CancellationToken);
+
+ var jobs = await host.WaitForJobsAsync(
+ jobs =>
+ jobs.Any(job =>
+ job.ScheduleJobKey == scheduleKey
+ && job.Status == AtomizerJobStatus.Failed
+ && job.QueueKey == FlowQueues.Critical
+ ),
+ $"Recurring schedule {scheduleKey} did not enqueue a failed critical-queue job.",
+ TestContext.Current.CancellationToken
+ );
+ var job = jobs.Single(job => job.ScheduleJobKey == scheduleKey);
+
+ job.QueueKey.Should().Be(FlowQueues.Critical);
+ job.PartitionKey.Should().Be(partition);
+ job.Attempts.Should().Be(1);
+ job.Errors.Should().ContainSingle();
+ _recorder.AttemptsFor(key).Should().ContainSingle();
+ }
+
+ [Fact]
+ public async Task DeleteRecurringAsync_WhenScheduleIsDeletedBeforeStart_ShouldNotEnqueueOccurrence()
+ {
+ var host = await StartHostAsync(options => options.AutoStart = false);
+ var key = NewKey();
+ var scheduleKey = new JobKey($"deleted-{key}");
+
+ await host.Client.ScheduleRecurringAsync(
+ new FlowPayload(key),
+ scheduleKey,
+ Schedule.Every().Second(),
+ cancellation: TestContext.Current.CancellationToken
+ );
+ var deleted = await host.Client.DeleteRecurringAsync(scheduleKey, TestContext.Current.CancellationToken);
+
+ await host.StartAsync(TestContext.Current.CancellationToken);
+ await Task.Delay(TimeSpan.FromMilliseconds(500), TestContext.Current.CancellationToken);
+
+ deleted.Should().BeTrue();
+ var schedules = await host.GetSchedulesAsync(TestContext.Current.CancellationToken);
+ var jobs = await host.GetJobsAsync(TestContext.Current.CancellationToken);
+ schedules.Should().NotContain(schedule => schedule.JobKey == scheduleKey);
+ jobs.Should().NotContain(job => job.ScheduleJobKey == scheduleKey);
+ _recorder.AttemptsFor(key).Should().BeEmpty();
+ }
+
+ [Fact]
+ public async Task ScheduleRecurringAsync_WhenTwoSchedulersSeeSameDueSchedule_ShouldEnqueueSingleOccurrence()
+ {
+ var firstHost = await StartHostAsync(options => options.AutoStart = false);
+ var secondHost = await StartHostAsync(options => options.AutoStart = false);
+ var key = NewKey();
+ var scheduleKey = new JobKey($"distributed-{key}");
+ var now = DateTimeOffset.UtcNow;
+
+ await firstHost.Client.ScheduleRecurringAsync(
+ new FlowPayload(key),
+ scheduleKey,
+ Schedule.Every().Hour(),
+ options => options.MisfirePolicy = MisfirePolicy.ExecuteNow,
+ TestContext.Current.CancellationToken
+ );
+ await MoveScheduleIntoPastAsync(firstHost, scheduleKey, now.AddHours(-2), now.AddHours(-1));
+
+ await Task.WhenAll(
+ firstHost.StartAsync(TestContext.Current.CancellationToken),
+ secondHost.StartAsync(TestContext.Current.CancellationToken)
+ );
+
+ await firstHost.WaitForJobsAsync(
+ jobs =>
+ {
+ var scheduleJobs = jobs.Where(job => job.ScheduleJobKey == scheduleKey).ToArray();
+ return scheduleJobs.Length == 1 && scheduleJobs[0].Status == AtomizerJobStatus.Completed;
+ },
+ $"Distributed recurring schedule {scheduleKey} did not produce one completed occurrence.",
+ TestContext.Current.CancellationToken
+ );
+ await Task.Delay(TimeSpan.FromMilliseconds(300), TestContext.Current.CancellationToken);
+
+ var jobs = await firstHost.GetJobsAsync(TestContext.Current.CancellationToken);
+ var scheduleJobs = jobs.Where(job => job.ScheduleJobKey == scheduleKey).ToArray();
+ scheduleJobs.Should().ContainSingle();
+ scheduleJobs[0].Status.Should().Be(AtomizerJobStatus.Completed);
+ _recorder.AttemptsFor(key).Should().ContainSingle();
+ }
+}
diff --git a/tests/Atomizer.FlowTests/TestJobs/FlowTestJobs.cs b/tests/Atomizer.FlowTests/TestJobs/FlowTestJobs.cs
new file mode 100644
index 0000000..224faf7
--- /dev/null
+++ b/tests/Atomizer.FlowTests/TestJobs/FlowTestJobs.cs
@@ -0,0 +1,50 @@
+using Atomizer.FlowTests.Infrastructure;
+
+namespace Atomizer.FlowTests.TestJobs;
+
+internal sealed class FlowTestJobMarker;
+
+internal sealed class FlowJob(FlowTestRecorder recorder) : IAtomizerJob
+{
+ public Task HandleAsync(FlowPayload payload, JobContext context)
+ {
+ recorder.Record(payload.Key, context);
+ return Task.CompletedTask;
+ }
+}
+
+internal sealed class FailingFlowJob(FlowTestRecorder recorder) : IAtomizerJob
+{
+ public Task HandleAsync(FailingFlowPayload payload, JobContext context)
+ {
+ recorder.Record(payload.Key, context);
+ throw new InvalidOperationException(payload.Key);
+ }
+}
+
+internal sealed class EventuallySuccessfulFlowJob(FlowTestRecorder recorder)
+ : IAtomizerJob
+{
+ public Task HandleAsync(EventuallySuccessfulFlowPayload payload, JobContext context)
+ {
+ var attempt = recorder.Record(payload.Key, context);
+ if (attempt.Attempt <= payload.FailuresBeforeSuccess)
+ {
+ throw new InvalidOperationException($"attempt-{attempt.Attempt}");
+ }
+
+ return Task.CompletedTask;
+ }
+}
+
+internal sealed class StopAndResumeFlowJob(FlowTestRecorder recorder) : IAtomizerJob
+{
+ public async Task HandleAsync(StopAndResumeFlowPayload payload, JobContext context)
+ {
+ recorder.Record(payload.Key, context);
+ if (recorder.AttemptsFor(payload.Key).Count > 1)
+ return;
+
+ await Task.Delay(Timeout.InfiniteTimeSpan, context.CancellationToken);
+ }
+}
diff --git a/tests/Atomizer.FlowTests/TestJobs/FlowTestPayloads.cs b/tests/Atomizer.FlowTests/TestJobs/FlowTestPayloads.cs
new file mode 100644
index 0000000..458d550
--- /dev/null
+++ b/tests/Atomizer.FlowTests/TestJobs/FlowTestPayloads.cs
@@ -0,0 +1,11 @@
+namespace Atomizer.FlowTests.TestJobs;
+
+internal sealed record FlowPayload(string Key);
+
+internal sealed record FailingFlowPayload(string Key);
+
+internal sealed record EventuallySuccessfulFlowPayload(string Key, int FailuresBeforeSuccess);
+
+internal sealed record MissingHandlerPayload(string Key);
+
+internal sealed record StopAndResumeFlowPayload(string Key);
diff --git a/tests/Atomizer.FlowTests/xunit.runner.json b/tests/Atomizer.FlowTests/xunit.runner.json
new file mode 100644
index 0000000..849b99c
--- /dev/null
+++ b/tests/Atomizer.FlowTests/xunit.runner.json
@@ -0,0 +1,5 @@
+{
+ "$schema": "https://xunit.net/schema/current/xunit.runner.schema.json",
+ "parallelizeTestCollections": false,
+ "parallelizeAssembly": true
+}