Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 27 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,13 @@ Atomizer is a modern, high-performance job scheduling and queueing framework for
- ⏳ **Visibility Timeout** — Prevent job duplication by locking jobs during processing.
- 🕒 **FIFO Partitioned Processing** — Guarantee strict in-order, one-at-a-time execution per partition key (e.g. per customer, per entity).
- 🧪 **In-Memory Driver** — Perfect for local development and testing; spin up queues instantly with zero setup.
- 📈 **Dashboard** — Optional read-only dashboard for jobs, schedules, queue statistics, and worker heartbeats.
- 📈 **Dashboard** — Optional operational dashboard for jobs, schedules, queue statistics, worker heartbeats, and built-in job/schedule actions.
- 🔔 **ASP.NET Core Integration** — Works with DI, logging, and modern C# idioms.

The dashboard gives teams a focused operational view of Atomizer: inspect queue health, review job and schedule details, watch active workers, and trigger common actions without building a custom admin UI.

![Atomizer Dashboard jobs view](assets/atomizer-dashboard-logo-branding-dark.png)

## Quick Start
Get up and running in minutes:

Expand Down Expand Up @@ -114,14 +118,6 @@ builder.Services.AddAtomizer(options =>

Redis storage implements the same `IAtomizerStorage` monitoring methods as the other backends, so it works with `Atomizer.Dashboard` without referencing the dashboard package from `Atomizer.Redis`.

The Redis sample includes the dashboard and a few endpoints for creating jobs:

```bash
docker compose up -d redis
dotnet run --project samples/Atomizer.Redis.Example/Atomizer.Redis.Example.csproj
```

Open `http://localhost:5053/atomizer` to inspect Redis-backed jobs, schedules, queues, and workers.

### 3. Define a Job Handler
Create a handler for your job payload:
Expand All @@ -146,7 +142,7 @@ public class SendNewsletterJob(INewsletterService newsletterService, IEmailServi
}
```

### 4. Enqueue or schedule a Job
### 4. Enqueue, schedule, execute, or dequeue a Job
Add jobs to the queue from your application code:
```csharp
app.MapPost(
Expand All @@ -164,6 +160,22 @@ app.MapPost(
);
```

Use `ExecuteAsync` when the job should run immediately in the current process but still be recorded in Atomizer as completed or failed:

```csharp
var jobId = await atomizerClient.ExecuteAsync(new SendNewsletterCommand(product));
```

If the handler throws, Atomizer records the job as failed and then rethrows the exception to the caller.

Use `DequeueAsync` to cancel a job that is still pending:

```csharp
var dequeued = await atomizerClient.DequeueAsync(jobId);
```

It returns `false` when the job does not exist or is no longer pending.

### 5. FIFO Processing (Partitioned Jobs)
To guarantee jobs for the same entity execute one-at-a-time in enqueue order, assign a `PartitionKey`:

Expand Down Expand Up @@ -191,6 +203,10 @@ await atomizer.ScheduleRecurringAsync(
Schedule.Every(2).Minutes()
);

// Later, when the recurring schedule should stop:
// safe to call even when the schedule has already been deleted or never existed.
await atomizer.DeleteRecurringAsync("LoggerJob");

...
```

Expand All @@ -203,9 +219,7 @@ var app = builder.Build();
app.MapAtomizerDashboard("/atomizer");
```

Then browse to `/atomizer` to inspect jobs, job details, schedules, queue statistics, and active worker heartbeats. The dashboard is read-only in the current release; it does not retry, cancel, or dead-letter jobs.

![Atomizer Dashboard jobs view](assets/atomizer-dashboard-logo-branding-dark.png)
Then browse to `/atomizer` to inspect jobs, job details, schedules, queue statistics, and active worker heartbeats. The dashboard also includes operational actions such as triggering registered job types, retrying failed jobs, cancelling pending jobs, enabling or disabling schedules, and running schedules immediately.

If no authorization filters are configured, dashboard requests are restricted to localhost by default. Add an `IAtomizerDashboardAuthorizationFilter` through `AddAtomizerDashboard(options => options.Authorization.Add(...))` before exposing it outside local development.

Expand Down
49 changes: 48 additions & 1 deletion samples/Atomizer.EFCore.Example/Atomizer.EFCore.Example.http
Original file line number Diff line number Diff line change
@@ -1,6 +1,53 @@
@Atomizer.EFCore.Example_HostAddress = http://localhost:5052
@ProductId = 11111111-1111-1111-1111-111111111111
@PendingJobId = 00000000-0000-0000-0000-000000000000

GET {{Atomizer.EFCore.Example_HostAddress}}/weatherforecast/
POST {{Atomizer.EFCore.Example_HostAddress}}/products
Accept: application/json

###

POST {{Atomizer.EFCore.Example_HostAddress}}/assign-stock
Content-Type: application/json

{
"productId": "{{ProductId}}",
"quantity": 25
}

###

POST {{Atomizer.EFCore.Example_HostAddress}}/execute/log
Accept: application/json

###

POST {{Atomizer.EFCore.Example_HostAddress}}/stock-events
Content-Type: application/json

{
"productId": "{{ProductId}}",
"eventType": "Restocked",
"delta": 25
}

###

POST {{Atomizer.EFCore.Example_HostAddress}}/long-running-job
Content-Type: application/json

{
"durationInSeconds": 10
}

###

# Replace @PendingJobId with a job id returned by a scheduled or enqueued request before it is picked up.
DELETE {{Atomizer.EFCore.Example_HostAddress}}/jobs/{{PendingJobId}}
Accept: application/json

###

# Idempotent: returns no content even when the recurring schedule is already gone.
DELETE {{Atomizer.EFCore.Example_HostAddress}}/recurring/LoggerJob
Accept: application/json
37 changes: 35 additions & 2 deletions samples/Atomizer.EFCore.Example/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,18 @@
await Task.WhenAll(postgres.Database.MigrateAsync(), mysql.Database.MigrateAsync(), sqlServer.Database.MigrateAsync());

var atomizer = app.Services.GetRequiredService<IAtomizerClient>();
const string recurringLoggerJob = "LoggerJob";
const string recurringLoggerCatchUpJob = "LoggerJobCatchUp";

await atomizer.ScheduleRecurringAsync(
new LoggerJobPayload("Recurring job started", LogLevel.Information),
"LoggerJob",
recurringLoggerJob,
Schedule.Every(2).Minutes()
);

await atomizer.ScheduleRecurringAsync(
new LoggerJobPayload("Recurring job started", LogLevel.Information),
"LoggerJobCatchUp",
recurringLoggerCatchUpJob,
Schedule.Cron("0/5 * * * * *"), // Every 5 seconds,
options =>
{
Expand All @@ -86,6 +88,17 @@ await atomizer.ScheduleRecurringAsync(
}
);

app.MapPost(
"/execute/log",
async ([FromServices] IAtomizerClient atomizerClient) =>
{
var jobId = await atomizerClient.ExecuteAsync(
new LoggerJobPayload("Executed immediately from the EF Core sample API", LogLevel.Information)
);
return Results.Ok(new { jobId });
}
);

app.MapPost(
"/products",
async ([FromServices] ExamplePostgresContext dbContext, [FromServices] IAtomizerClient atomizerClient) =>
Expand Down Expand Up @@ -165,6 +178,26 @@ await atomizer.ScheduleRecurringAsync(
}
);

app.MapDelete(
"/jobs/{jobId:guid}",
async (Guid jobId, [FromServices] IAtomizerClient atomizerClient) =>
{
var dequeued = await atomizerClient.DequeueAsync(jobId);
return dequeued
? Results.NoContent()
: Results.Conflict(new { message = "The job was not pending or was not found." });
}
);

app.MapDelete(
"/recurring/{name}",
async (string name, [FromServices] IAtomizerClient atomizerClient) =>
{
await atomizerClient.DeleteRecurringAsync(name);
return Results.NoContent();
}
);

app.MapAtomizerDashboard();

app.Run();
38 changes: 37 additions & 1 deletion samples/Atomizer.Example/Atomizer.Example.http
Original file line number Diff line number Diff line change
@@ -1,6 +1,42 @@
@Atomizer.Example_HostAddress = http://localhost:5191
@PendingJobId = 00000000-0000-0000-0000-000000000000

GET {{Atomizer.Example_HostAddress}}/weatherforecast/
POST {{Atomizer.Example_HostAddress}}/log
Accept: application/json

###

POST {{Atomizer.Example_HostAddress}}/execute/log
Accept: application/json

###

POST {{Atomizer.Example_HostAddress}}/exception
Accept: application/json

###

POST {{Atomizer.Example_HostAddress}}/schedule?runInSeconds=60
Accept: application/json

###

POST {{Atomizer.Example_HostAddress}}/log-to-queue?queue=priority-queue
Accept: application/json

###

POST {{Atomizer.Example_HostAddress}}/long-running?durationInSeconds=10
Accept: application/json

###

# Replace @PendingJobId with a job id returned by /schedule before the job is picked up.
DELETE {{Atomizer.Example_HostAddress}}/jobs/{{PendingJobId}}
Accept: application/json

###

# Idempotent: returns no content even when the recurring schedule is already gone.
DELETE {{Atomizer.Example_HostAddress}}/recurring/LoggerJob
Accept: application/json
55 changes: 47 additions & 8 deletions samples/Atomizer.Example/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,18 @@
}

var atomizer = app.Services.GetRequiredService<IAtomizerClient>();
const string recurringLoggerJob = "LoggerJob";
const string recurringLoggerCatchUpJob = "LoggerJobCatchUp";

await atomizer.ScheduleRecurringAsync(
new LoggerJobPayload("Recurring job started", LogLevel.Information),
"LoggerJob",
recurringLoggerJob,
Schedule.Every(2).Minutes()
);

await atomizer.ScheduleRecurringAsync(
new LoggerJobPayload("Recurring job started", LogLevel.Information),
"LoggerJobCatchUp",
recurringLoggerCatchUpJob,
Schedule.Cron("0/5 * * * * *"), // Every 5 seconds,
options => options.MisfirePolicy = MisfirePolicy.CatchUp
);
Expand All @@ -75,23 +77,37 @@ await atomizer.ScheduleRecurringAsync(
"/log",
async ([FromServices] IAtomizerClient atomizerClient) =>
{
await atomizerClient.EnqueueAsync(new LoggerJobPayload("Hello, Atomizer!", LogLevel.Information));
var jobId = await atomizerClient.EnqueueAsync(new LoggerJobPayload("Hello, Atomizer!", LogLevel.Information));
return Results.Accepted($"/jobs/{jobId}", new { jobId });
}
);

app.MapPost(
"/execute/log",
async ([FromServices] IAtomizerClient atomizerClient) =>
{
var jobId = await atomizerClient.ExecuteAsync(
new LoggerJobPayload("Executed immediately from the sample API", LogLevel.Information)
);
return Results.Ok(new { jobId });
}
);

app.MapPost(
"/exception",
async ([FromServices] IAtomizerClient atomizerClient) =>
{
await atomizerClient.EnqueueAsync(new ExceptionJobPayload("This job will always fail!"));
var jobId = await atomizerClient.EnqueueAsync(new ExceptionJobPayload("This job will always fail!"));
return Results.Accepted($"/jobs/{jobId}", new { jobId });
}
);

app.MapPost(
"/empty",
async ([FromServices] IAtomizerClient atomizerClient) =>
{
await atomizerClient.EnqueueAsync(new EmptyPayload());
var jobId = await atomizerClient.EnqueueAsync(new EmptyPayload());
return Results.Accepted($"/jobs/{jobId}", new { jobId });
}
);

Expand All @@ -100,29 +116,52 @@ await atomizer.ScheduleRecurringAsync(
async ([FromQuery] int runInSeconds, [FromServices] IAtomizerClient atomizerClient) =>
{
var runAt = DateTimeOffset.UtcNow.AddSeconds(runInSeconds);
await atomizerClient.ScheduleAsync(
var jobId = await atomizerClient.ScheduleAsync(
new LoggerJobPayload("This job is scheduled to run in 1 minute.", LogLevel.Information),
runAt
);
return Results.Accepted($"/jobs/{jobId}", new { jobId });
}
);

app.MapPost(
"/log-to-queue",
async (string queue, [FromServices] IAtomizerClient atomizerClient) =>
{
await atomizerClient.EnqueueAsync(
var jobId = await atomizerClient.EnqueueAsync(
new LoggerJobPayload($"Logging to {queue} queue!", LogLevel.Information),
options => options.Queue = queue
);
return Results.Accepted($"/jobs/{jobId}", new { jobId });
}
);

app.MapPost(
"/long-running",
async ([FromQuery] int durationInSeconds, [FromServices] IAtomizerClient atomizerClient) =>
{
await atomizerClient.EnqueueAsync(new LongRunningJobPayload(durationInSeconds));
var jobId = await atomizerClient.EnqueueAsync(new LongRunningJobPayload(durationInSeconds));
return Results.Accepted($"/jobs/{jobId}", new { jobId });
}
);

app.MapDelete(
"/jobs/{jobId:guid}",
async (Guid jobId, [FromServices] IAtomizerClient atomizerClient) =>
{
var dequeued = await atomizerClient.DequeueAsync(jobId);
return dequeued
? Results.NoContent()
: Results.Conflict(new { message = "The job was not pending or was not found." });
}
);

app.MapDelete(
"/recurring/{name}",
async (string name, [FromServices] IAtomizerClient atomizerClient) =>
{
await atomizerClient.DeleteRecurringAsync(name);
return Results.NoContent();
}
);

Expand Down
18 changes: 18 additions & 0 deletions samples/Atomizer.Redis.Example/Atomizer.Redis.Example.http
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
@Atomizer.Redis.Example_HostAddress = http://localhost:5053
@PendingJobId = 00000000-0000-0000-0000-000000000000

POST {{Atomizer.Redis.Example_HostAddress}}/log
Accept: application/json

###

POST {{Atomizer.Redis.Example_HostAddress}}/execute/log
Accept: application/json

###

POST {{Atomizer.Redis.Example_HostAddress}}/exception
Accept: application/json

Expand All @@ -27,3 +33,15 @@ Content-Type: application/json
"accountId": "account-123",
"eventName": "StatementGenerated"
}

###

# Replace @PendingJobId with a job id returned by /schedule before the job is picked up.
DELETE {{Atomizer.Redis.Example_HostAddress}}/jobs/{{PendingJobId}}
Accept: application/json

###

# Idempotent: returns no content even when the recurring schedule is already gone.
DELETE {{Atomizer.Redis.Example_HostAddress}}/recurring/RedisRecurringLogger
Accept: application/json
Loading
Loading