Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion .github/workflows/verify.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,18 @@ jobs:
run: dotnet build --no-restore --configuration Release --verbosity minimal

- name: Test
run: dotnet test --no-build --verbosity normal --configuration Release --collect:"XPlat Code Coverage" --logger "trx" --results-directory ./TestResults
run: >
dotnet test
--no-build
--configuration Release
--verbosity normal
--maxcpucount
-p:TestTfmsInParallel=true
--collect:"XPlat Code Coverage"
--logger "trx"
--results-directory ./TestResults
--
RunConfiguration.MaxCpuCount=0

- name: Test results report
uses: dorny/test-reporter@v2
Expand Down
15 changes: 15 additions & 0 deletions Atomizer.sln
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Atomizer.Redis.Tests", "tes
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Atomizer.Redis.Example", "samples\Atomizer.Redis.Example\Atomizer.Redis.Example.csproj", "{1D05317E-F0B0-4638-99D8-CCB018BCB681}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Atomizer.FlowTests", "tests\Atomizer.FlowTests\Atomizer.FlowTests.csproj", "{F37FF791-43BD-4394-8BED-95C9F013DCA8}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -195,6 +197,18 @@ Global
{1D05317E-F0B0-4638-99D8-CCB018BCB681}.Release|x64.Build.0 = Release|Any CPU
{1D05317E-F0B0-4638-99D8-CCB018BCB681}.Release|x86.ActiveCfg = Release|Any CPU
{1D05317E-F0B0-4638-99D8-CCB018BCB681}.Release|x86.Build.0 = Release|Any CPU
{F37FF791-43BD-4394-8BED-95C9F013DCA8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F37FF791-43BD-4394-8BED-95C9F013DCA8}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F37FF791-43BD-4394-8BED-95C9F013DCA8}.Debug|x64.ActiveCfg = Debug|Any CPU
{F37FF791-43BD-4394-8BED-95C9F013DCA8}.Debug|x64.Build.0 = Debug|Any CPU
{F37FF791-43BD-4394-8BED-95C9F013DCA8}.Debug|x86.ActiveCfg = Debug|Any CPU
{F37FF791-43BD-4394-8BED-95C9F013DCA8}.Debug|x86.Build.0 = Debug|Any CPU
{F37FF791-43BD-4394-8BED-95C9F013DCA8}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F37FF791-43BD-4394-8BED-95C9F013DCA8}.Release|Any CPU.Build.0 = Release|Any CPU
{F37FF791-43BD-4394-8BED-95C9F013DCA8}.Release|x64.ActiveCfg = Release|Any CPU
{F37FF791-43BD-4394-8BED-95C9F013DCA8}.Release|x64.Build.0 = Release|Any CPU
{F37FF791-43BD-4394-8BED-95C9F013DCA8}.Release|x86.ActiveCfg = Release|Any CPU
{F37FF791-43BD-4394-8BED-95C9F013DCA8}.Release|x86.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -212,5 +226,6 @@ Global
{33964D8D-5DA4-442C-8CEB-50F36DF81199} = {57DC23B8-FC3C-41A3-AEB0-21C016F935F6}
{B6852BFA-D9C6-48EE-9ED6-35B542B834C9} = {7233FD90-018C-47BF-9001-EB1681DFE75B}
{1D05317E-F0B0-4638-99D8-CCB018BCB681} = {5D128655-88B1-4CA4-B12B-9E6C067D6DA3}
{F37FF791-43BD-4394-8BED-95C9F013DCA8} = {7233FD90-018C-47BF-9001-EB1681DFE75B}
EndGlobalSection
EndGlobal
1 change: 1 addition & 0 deletions src/Atomizer/AssemblyAttributes.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("Atomizer.Tests")]
[assembly: InternalsVisibleTo("Atomizer.FlowTests")]
[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")]
7 changes: 5 additions & 2 deletions src/Atomizer/Configuration/QueueOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@ public sealed class QueueOptions

/// <summary>
/// Gets the interval at which the internal processing loop ticks.
/// <remarks>Default is 1 second.</remarks>
/// <remarks>
/// Default is 1 second. The setter is internal so that test assemblies (via InternalsVisibleTo)
/// can reduce the tick interval for timing-sensitive flow tests without exposing mutation to external consumers.
/// </remarks>
/// </summary>
public TimeSpan TickInterval { get; private set; } = TimeSpan.FromSeconds(1);
public TimeSpan TickInterval { get; internal set; } = TimeSpan.FromSeconds(1);

/// <summary>
/// Initializes a new <see cref="QueueOptions"/> with the specified queue key.
Expand Down
5 changes: 5 additions & 0 deletions src/Atomizer/Processing/QueuePoller.cs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ await storage.ExecuteInLeaseAsync(

foreach (var job in jobs)
{
if (job.Status == AtomizerJobStatus.Processing)
{
job.Release(now);
}

job.Lease(leaseToken, now, queue.VisibilityTimeout);
acquired.Add(job);
}
Expand Down
46 changes: 46 additions & 0 deletions tests/Atomizer.FlowTests/Atomizer.FlowTests.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<OutputType>Exe</OutputType>
<RootNamespace>Atomizer.FlowTests</RootNamespace>
<TargetFramework>net8.0</TargetFramework>
<LangVersion>14</LangVersion>
<IsPackable>false</IsPackable>
<IsTestProject>true</IsTestProject>
</PropertyGroup>
<ItemGroup>
<Content Include="xunit.runner.json" CopyToOutputDirectory="PreserveNewest" />
</ItemGroup>
<ItemGroup>
<Using Include="Xunit" />
<Using Include="AwesomeAssertions" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="coverlet.collector" Version="6.0.4">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="AwesomeAssertions" Version="9.4.0" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="6.0.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="8.0.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.13.0" />
<PackageReference Include="Npgsql.EntityFrameworkCore.PostgreSQL" Version="6.0.0" />
<PackageReference Include="Pomelo.EntityFrameworkCore.MySql" Version="6.0.0" />
<PackageReference Include="StackExchange.Redis" Version="2.11.3" />
<PackageReference Include="Testcontainers.MsSql" Version="4.10.0" />
<PackageReference Include="Testcontainers.MySql" Version="4.10.0" />
<PackageReference Include="Testcontainers.PostgreSql" Version="4.10.0" />
<PackageReference Include="Testcontainers.Redis" Version="4.10.0" />
<PackageReference Include="xunit.v3" Version="2.0.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="3.0.2" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\Atomizer.EntityFrameworkCore\Atomizer.EntityFrameworkCore.csproj" />
<ProjectReference Include="..\..\src\Atomizer.Redis\Atomizer.Redis.csproj" />
</ItemGroup>
</Project>
100 changes: 100 additions & 0 deletions tests/Atomizer.FlowTests/AtomizerFlowTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
using Atomizer.FlowTests.Drivers;
using Atomizer.FlowTests.Infrastructure;

namespace Atomizer.FlowTests;

public abstract partial class AtomizerFlowTests : IAsyncLifetime
{
private readonly FlowTestDriver _driver;
private readonly List<FlowTestHost> _hosts = new List<FlowTestHost>();
private readonly string _runId = Guid.NewGuid().ToString("N");
private FlowTestRecorder _recorder = null!;

protected AtomizerFlowTests(FlowTestDriver driver)
{
_driver = driver;
}

public async ValueTask InitializeAsync()
{
_recorder = new FlowTestRecorder();
await _driver.ResetAsync(TestContext.Current.CancellationToken);
}

public async ValueTask DisposeAsync()
{
foreach (var host in _hosts.AsEnumerable().Reverse())
{
await host.DisposeAsync();
}

await _driver.ResetAsync(CancellationToken.None);
}

private async Task<FlowTestHost> StartHostAsync(Action<FlowHostOptions>? configure = null)
{
var host = await FlowTestHost.CreateAsync(
_driver,
_runId,
_recorder,
configure,
TestContext.Current.CancellationToken
);
_hosts.Add(host);
return host;
}

private async Task MoveScheduleIntoPastAsync(
FlowTestHost host,
JobKey scheduleKey,
DateTimeOffset lastEnqueueAt,
DateTimeOffset nextRunAt
)
{
var schedule = (await host.GetSchedulesAsync(TestContext.Current.CancellationToken)).Single(schedule =>
schedule.JobKey == scheduleKey
);
schedule.LastEnqueueAt = lastEnqueueAt;
schedule.NextRunAt = nextRunAt;
schedule.CreatedAt = lastEnqueueAt.AddSeconds(-10);
await host.UpdateSchedulesAsync(new[] { schedule }, TestContext.Current.CancellationToken);
}

private async Task WaitUntilDeletedAsync(FlowTestHost host, Guid jobId)
{
using var timeout = CancellationTokenSource.CreateLinkedTokenSource(TestContext.Current.CancellationToken);
timeout.CancelAfter(FlowTestTimings.WaitTimeout);

while (true)
{
if (await host.GetJobAsync(jobId, timeout.Token) is null)
return;

try
{
await Task.Delay(FlowTestTimings.PollInterval, timeout.Token);
}
catch (OperationCanceledException) when (timeout.IsCancellationRequested)
{
throw new TimeoutException($"Job {jobId} was not deleted by retention.");
}
}
}

private string NewKey() => $"{_driver.Name.Replace(" ", string.Empty).ToLowerInvariant()}-{Guid.NewGuid():N}";
}

[Collection(nameof(InMemoryFlowTestDriver))]
public sealed class InMemoryAtomizerFlowTests(InMemoryFlowTestDriver driver) : AtomizerFlowTests(driver);

[Collection(nameof(PostgreSqlFlowTestDriver))]
public sealed class PostgreSqlAtomizerFlowTests(PostgreSqlFlowTestDriver driver) : AtomizerFlowTests(driver);

[Collection(nameof(MySqlFlowTestDriver))]
public sealed class MySqlAtomizerFlowTests(MySqlFlowTestDriver driver) : AtomizerFlowTests(driver);

[Collection(nameof(SqlServerFlowTestDriver))]
public sealed class SqlServerAtomizerFlowTests(SqlServerFlowTestDriver driver) : AtomizerFlowTests(driver);

[Collection(nameof(RedisFlowTestDriver))]
public sealed class RedisAtomizerFlowTests(RedisFlowTestDriver driver) : AtomizerFlowTests(driver);
53 changes: 53 additions & 0 deletions tests/Atomizer.FlowTests/DequeueFlowTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
using Atomizer.FlowTests.TestJobs;

namespace Atomizer.FlowTests;

public abstract partial class AtomizerFlowTests
{
[Fact]
public async Task DequeueAsync_WhenPendingJobIsCancelledBeforeProcessing_ShouldPersistCancelledStateAndNotDispatch()
{
var host = await StartHostAsync(options => options.AutoStart = false);
var key = NewKey();

var jobId = await host.Client.EnqueueAsync(
new FlowPayload(key),
cancellation: TestContext.Current.CancellationToken
);
var dequeued = await host.Client.DequeueAsync(jobId, TestContext.Current.CancellationToken);

await host.StartAsync(TestContext.Current.CancellationToken);
await Task.Delay(TimeSpan.FromMilliseconds(300), TestContext.Current.CancellationToken);

dequeued.Should().BeTrue();
var job = await host.GetJobAsync(jobId, TestContext.Current.CancellationToken);
job.Should().NotBeNull();
job!.Status.Should().Be(AtomizerJobStatus.Cancelled);
_recorder.AttemptsFor(key).Should().BeEmpty();
}

[Fact]
public async Task DequeueAsync_WhenJobAlreadyCompleted_ShouldReturnFalseAndKeepCompletedState()
{
var host = await StartHostAsync();
var key = NewKey();

var jobId = await host.Client.EnqueueAsync(
new FlowPayload(key),
cancellation: TestContext.Current.CancellationToken
);
await host.WaitForJobAsync(
jobId,
job => job.Status == AtomizerJobStatus.Completed,
TestContext.Current.CancellationToken
);

var dequeued = await host.Client.DequeueAsync(jobId, TestContext.Current.CancellationToken);
var job = await host.GetJobAsync(jobId, TestContext.Current.CancellationToken);

dequeued.Should().BeFalse();
job.Should().NotBeNull();
job!.Status.Should().Be(AtomizerJobStatus.Completed);
_recorder.AttemptsFor(key).Should().ContainSingle();
}
}
Loading
Loading