Skip to content
1 change: 1 addition & 0 deletions docs/10_overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,4 @@ Below, you can find additional documentation for `mlp`:
- [Generation Configuration](./30_generate.md)
- [Hydration Logic](./40_hydrate.md)
- [Interpolatation Template](./50_interpolate.md)
- [Filtered Jobs](./60_filtered_job.md)
86 changes: 86 additions & 0 deletions docs/60_filtered_job.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Filtered Jobs

`mlp` supports the execution of Kubernetes `Job` resources before the main deploy phase. These filtered jobs
can be used to run database migrations, data transformations, or any other task that must complete successfully
before the rest of the resources are applied to the cluster.

## How It Works

Filtered jobs are identified by the annotation `mia-platform.eu/deploy` on a `Job` resource. When the
`--filtered-job-annotation` flag is provided to the `deploy` command, `mlp` will scan the resources and
separate all `Job` resources whose annotation value matches the one provided.

These jobs are executed before the remaining resources are applied. If the flag is not provided, any `Job`
resource carrying the `mia-platform.eu/deploy` annotation will be stripped from the resource list and not
applied at all.

## Annotating a Job

To mark a `Job` as a filtered job, add the `mia-platform.eu/deploy` annotation with the desired value:

```yaml
apiVersion: batch/v1
kind: Job
metadata:
name: db-migration
annotations:
mia-platform.eu/deploy: pre-deploy
spec:
template:
spec:
restartPolicy: Never
containers:
- name: migrate
image: my-app:latest
command: ["./migrate"]
```

Then pass the matching value to the deploy command:

```sh
mlp deploy --filtered-job-annotation pre-deploy ...
```

## Optional Jobs

A filtered job can be marked as optional by adding the annotation `mia-platform.eu/deploy-optional: "true"`.
Optional jobs are non-blocking: if they fail, the failure is logged as a warning and the deploy process
continues normally. Mandatory jobs (those without the optional annotation) will block and fail the deploy if
they cannot complete successfully.

```yaml
apiVersion: batch/v1
kind: Job
metadata:
name: optional-cleanup
annotations:
mia-platform.eu/deploy: pre-deploy
mia-platform.eu/deploy-optional: "true"
spec:
template:
spec:
restartPolicy: Never
containers:
- name: cleanup
image: my-app:latest
command: ["./cleanup"]
```

## Retry and Timeout

Each filtered job is retried automatically on failure. Before each retry the failed job is deleted from
the cluster so that a fresh instance can be created. The number of retries and the per-execution timeout
can be controlled via dedicated flags:

| Flag | Default | Description |
|---|---|---|
| `--filtered-job-annotation` | _(empty)_ | Annotation value used to identify filtered jobs |
| `--filtered-job-max-retries` | `3` | Maximum number of retry attempts for a failed job |
| `--filtered-job-timeout` | `30s` | Timeout for a single job execution attempt |

If a job exceeds the configured timeout it is considered failed and the retry logic applies as normal.

## Dry Run

When the `--dry-run` flag is active, no jobs are created on the cluster. Instead, `mlp` will print a message
for each job that would have been executed, allowing you to verify the configuration without side effects.
113 changes: 89 additions & 24 deletions pkg/cmd/deploy/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,17 @@ const (
waitFlagDefaultValue = true
waitFlagUsage = "if true, wait for resources to be current before marking them as successfully applied"

filteredJobAnnotationFlagName = "filtered-job-annotation"
filteredJobAnnotationFlagUsage = "the annotation value for mia-platform.eu/deploy to identify filtered jobs"

filteredJobMaxRetriesFlagName = "filtered-job-max-retries"
filteredJobMaxRetriesDefaultValue = 3
filteredJobMaxRetriesFlagUsage = "the maximum number of retries for a failed filtered job"

filteredJobTimeoutFlagName = "filtered-job-timeout"
filteredJobTimeoutDefaultValue = 30 * time.Second
filteredJobTimeoutFlagUsage = "the timeout for a single filtered job execution"

stdinToken = "-"
fieldManager = "mlp"
inventoryName = "eu.mia-platform.mlp"
Expand All @@ -97,25 +108,31 @@ var (
// Flags contains all the flags for the `deploy` command. They will be converted to Options
// that contains all runtime options for the command.
type Flags struct {
ConfigFlags *genericclioptions.ConfigFlags
inputPaths []string
deployType string
forceDeploy bool
ensureNamespace bool
timeout time.Duration
dryRun bool
wait bool
ConfigFlags *genericclioptions.ConfigFlags
inputPaths []string
deployType string
forceDeploy bool
ensureNamespace bool
timeout time.Duration
dryRun bool
wait bool
filteredJobAnnotation string
filteredJobMaxRetries int
filteredJobTimeout time.Duration
}

// Options have the data required to perform the deploy operation
type Options struct {
inputPaths []string
deployType string
forceDeploy bool
ensureNamespace bool
timeout time.Duration
dryRun bool
wait bool
inputPaths []string
deployType string
forceDeploy bool
ensureNamespace bool
timeout time.Duration
dryRun bool
wait bool
filteredJobAnnotation string
filteredJobMaxRetries int
filteredJobTimeout time.Duration

clientFactory util.ClientFactory
clock clock.PassiveClock
Expand Down Expand Up @@ -185,6 +202,9 @@ func (f *Flags) AddFlags(flags *pflag.FlagSet) {
flags.DurationVar(&f.timeout, timeoutFlagName, timeoutDefaultValue, timeoutFlagUsage)
flags.BoolVar(&f.dryRun, dryRunFlagName, dryRunDefaultValue, dryRunFlagUsage)
flags.BoolVar(&f.wait, waitFlagName, waitFlagDefaultValue, waitFlagUsage)
flags.StringVar(&f.filteredJobAnnotation, filteredJobAnnotationFlagName, "", filteredJobAnnotationFlagUsage)
flags.IntVar(&f.filteredJobMaxRetries, filteredJobMaxRetriesFlagName, filteredJobMaxRetriesDefaultValue, filteredJobMaxRetriesFlagUsage)
flags.DurationVar(&f.filteredJobTimeout, filteredJobTimeoutFlagName, filteredJobTimeoutDefaultValue, filteredJobTimeoutFlagUsage)
}

// ToOptions transform the command flags in command runtime arguments
Expand All @@ -194,12 +214,15 @@ func (f *Flags) ToOptions(reader io.Reader, writer io.Writer) (*Options, error)
}

return &Options{
inputPaths: f.inputPaths,
deployType: f.deployType,
forceDeploy: f.forceDeploy,
ensureNamespace: f.ensureNamespace,
timeout: f.timeout,
wait: f.wait,
inputPaths: f.inputPaths,
deployType: f.deployType,
forceDeploy: f.forceDeploy,
ensureNamespace: f.ensureNamespace,
timeout: f.timeout,
wait: f.wait,
filteredJobAnnotation: f.filteredJobAnnotation,
filteredJobMaxRetries: f.filteredJobMaxRetries,
filteredJobTimeout: f.filteredJobTimeout,

clientFactory: util.NewFactory(f.ConfigFlags),
reader: reader,
Expand Down Expand Up @@ -247,6 +270,16 @@ func (o *Options) Run(ctx context.Context) error {
return nil
}

var stop bool
resources, stop, err = o.runFilteredJobPhase(ctx, namespace, resources)
if err != nil {
return err
}
if stop {
logger.V(3).Info("filtered jobs executed, skipping remaining resources apply")
return nil
}

deployIdentifier := map[string]string{
"time": o.clock.Now().Format(time.RFC3339),
}
Expand Down Expand Up @@ -299,13 +332,45 @@ loop:
return nil
}

return errors.New(formatApplyErrors(errorsDuringApplying))
}

func formatApplyErrors(errs []error) string {
builder := new(strings.Builder)
fmt.Fprintf(builder, "applying process has encountered %d error(s):\n", len(errorsDuringApplying))
for _, err := range errorsDuringApplying {
fmt.Fprintf(builder, "applying process has encountered %d error(s):\n", len(errs))
for _, err := range errs {
fmt.Fprintf(builder, "\t- %s\n", err)
}
return builder.String()
}

// runFilteredJobPhase handles filtered job execution. When the --filtered-job-annotation
// flag is set, matching jobs are extracted and executed; if any are found the
// caller should stop the normal apply phase (stop=true). If the flag is set but no jobs
// match the filter, nothing is applied (stop=true with empty resources). When the flag is
// absent, annotated jobs are stripped from the resource list so they are never applied as
// regular resources.
func (o *Options) runFilteredJobPhase(ctx context.Context, namespace string, resources []*unstructured.Unstructured) ([]*unstructured.Unstructured, bool, error) {
if o.filteredJobAnnotation == "" {
return StripAnnotatedJobs(resources), false, nil
}

filteredJobs, remaining := FilterAnnotatedJobs(resources, o.filteredJobAnnotation)
if len(filteredJobs) == 0 {
return nil, true, nil
}

clientSet, err := o.clientFactory.KubernetesClientSet()
if err != nil {
return nil, false, err
}

runner := NewFilteredJobRunner(clientSet, namespace, o.filteredJobMaxRetries, o.filteredJobTimeout, o.writer, o.dryRun)
if err := runner.Run(ctx, filteredJobs); err != nil {
return nil, false, err
}

return errors.New(builder.String())
return remaining, true, nil
}

func deployTypeFlagCompletionfunc(*cobra.Command, []string, string) ([]string, cobra.ShellCompDirective) {
Expand Down
24 changes: 15 additions & 9 deletions pkg/cmd/deploy/deploy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,18 +95,24 @@ func TestOptions(t *testing.T) {
configFlags := genericclioptions.NewConfigFlags(false)

expectedOpts := &Options{
inputPaths: []string{"input"},
deployType: "smart_deploy",
reader: reader,
writer: buffer,
clientFactory: util.NewFactory(configFlags),
clock: clock.RealClock{},
wait: false,
inputPaths: []string{"input"},
deployType: "smart_deploy",
filteredJobTimeout: 5 * time.Minute,
filteredJobMaxRetries: 3,
filteredJobAnnotation: "pre-deploy",
reader: reader,
writer: buffer,
clientFactory: util.NewFactory(configFlags),
clock: clock.RealClock{},
wait: false,
}

flag := &Flags{
inputPaths: []string{"input"},
deployType: "smart_deploy",
inputPaths: []string{"input"},
deployType: "smart_deploy",
filteredJobTimeout: 5 * time.Minute,
filteredJobMaxRetries: 3,
filteredJobAnnotation: "pre-deploy",
}
_, err := flag.ToOptions(reader, buffer)
assert.ErrorContains(t, err, "config flags are required")
Expand Down
Loading