284 changes: 284 additions & 0 deletions pip/pip-458.md
# PIP-458: Add Async Resource List Filtering API to AuthorizationProvider

*Status: Draft*

# Background knowledge

Pulsar's `AuthorizationProvider` is a pluggable interface (`pulsar-broker-common`) that brokers use to make authorization decisions. It exposes async methods for checking permissions on tenants, namespaces, topics, and clusters (e.g., `allowTenantOperationAsync`, `allowNamespaceOperationAsync`). `AuthorizationService` wraps this provider and adds a guard on the `authorizationEnabled` configuration flag before delegating.

Pulsar's Admin REST API exposes list endpoints for clusters, tenants, namespaces, and topics. These endpoints are async — they retrieve data from the metadata store, apply post-processing, and call `asyncResponse.resume()` to return the result. Most list endpoints perform an all-or-nothing authorization check before returning the full list (e.g., `TenantOperation.LIST_TENANTS`, `NamespaceOperation.GET_TOPICS`).

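JAX-RS provides a `ContainerResponseFilter` hook that runs once the endpoint has produced its response (for async endpoints, when `asyncResponse.resume()` is called). Its `filter()` method is synchronous (returns `void`), so any metadata access inside it has to block the calling thread. That makes it unsuitable for authorization logic that needs to access metadata asynchronously.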

# Motivation

Currently, if a user is authorized for a LIST operation, they see **all** resources; otherwise they get a 403. There is no way for an `AuthorizationProvider` to filter list results per-item — for example, only returning tenants or namespaces that the user has access to.

Users who need per-item filtering today must use a `ContainerResponseFilter`. However, `filter()` is synchronous and runs on whichever thread calls `asyncResponse.resume()`, which may be a metadata thread or a web executor thread. Blocking on metadata operations inside a response filter can therefore exhaust the thread pool and cause deadlocks.

This PIP proposes adding a default method to `AuthorizationProvider` that allows async per-item filtering of list results, called inside the endpoint method where async execution is natural.

# Goals

## In Scope

- New default method on `AuthorizationProvider` for async resource filtering.
- A `FilterContext` class to carry resource type and parent resource information.
- A corresponding delegation method on `AuthorizationService` that respects the `authorizationEnabled` flag.
- Integration into the list endpoints for clusters, tenants, namespaces, and topics.

## Out of Scope

- Changing the existing authorization check model (the all-or-nothing gate remains).
- Providing a built-in filtering implementation in `PulsarAuthorizationProvider` (this PIP only adds the extension point).

# High Level Design

A new default method `filterAsync` is added to the `AuthorizationProvider` interface. It accepts a `FilterContext` (resource type + optional parent resource), the list of resource names, the user's role, and authentication data. It returns a `CompletableFuture<List<String>>` containing the filtered list.

The default implementation returns the full list unchanged, so existing `AuthorizationProvider` implementations continue to work without modification.

Each list endpoint (`getClusters`, `getTenants`, `getTenantNamespaces`, `getTopics`) inserts a `.thenCompose(resources -> authorizationService.filterAsync(...))` step into its existing async chain, after the list is retrieved and before `asyncResponse.resume()`.

`AuthorizationService` wraps the call with the standard `authorizationEnabled` check — when authorization is disabled, the filtering step is skipped entirely.

# Detailed Design

## Design & Implementation Details

### Interaction with existing authorization gates

The existing all-or-nothing authorization check (e.g., `TenantOperation.LIST_TENANTS`, `NamespaceOperation.GET_TOPICS`) remains unchanged. `filterAsync` is invoked *after* the user passes the existing gate:

- A user who fails the LIST permission check still receives a 403 — `filterAsync` is never called.
- A user who passes the LIST permission check will have their results filtered by `filterAsync`.

This design keeps the existing security model intact. Replacing the gate with pure per-item filtering would change the security semantics (403 → empty list) and is out of scope.
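
The ordering can be illustrated with a small self-contained sketch. It does not use Pulsar classes: `ForbiddenException`, `list`, and the `filter` parameter are hypothetical stand-ins for the 403 `RestException`, the endpoint's async chain, and `authorizationService.filterAsync`. It relies only on the standard `CompletableFuture` behavior that `thenCompose` stages are skipped once an earlier stage has failed.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class GateThenFilterSketch {
    /** Hypothetical stand-in for the 403 raised by the existing LIST gate. */
    static final class ForbiddenException extends RuntimeException {
        ForbiddenException() {
            super("403 Forbidden");
        }
    }

    private GateThenFilterSketch() {}

    /**
     * Mimics a list endpoint: gate first, then the metadata lookup, then the filter.
     * The filter function is only invoked when the gate completed normally.
     */
    static CompletableFuture<List<String>> list(
            boolean passesGate,
            List<String> allResources,
            Function<List<String>, CompletableFuture<List<String>>> filter) {
        // Stand-in for validateTenantOperationAsync(...) and similar gates.
        CompletableFuture<Void> gate = new CompletableFuture<>();
        if (passesGate) {
            gate.complete(null);
        } else {
            gate.completeExceptionally(new ForbiddenException());
        }
        return gate
                // Stand-in for the metadata store lookup.
                .thenCompose(ignored -> CompletableFuture.completedFuture(allResources))
                // Stand-in for authorizationService.filterAsync(...).
                .thenCompose(filter);
    }
}
```

With a failing gate, the returned future completes exceptionally with the `ForbiddenException` as its cause and the filter function is never called, which mirrors the first bullet above.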

### Integration into list endpoints

The `filterAsync` method will be called in the async chain of each list endpoint, after the list is retrieved from the metadata store and before `asyncResponse.resume()`:

**TenantsBase.getTenants():**
```java
validateBothSuperUserAndTenantOperation(null, TenantOperation.LIST_TENANTS)
        .thenCompose(__ -> tenantResources().listTenantsAsync())
        .thenCompose(tenants -> authorizationService.filterAsync(
                new FilterContext(ResourceType.TENANT),
                tenants, clientAppId(), clientAuthData()))
        .thenAccept(filtered -> {
            List<String> deepCopy = new ArrayList<>(filtered);
            deepCopy.sort(null);
            asyncResponse.resume(deepCopy);
        })
```

**ClustersBase.getClusters():**

Note: The clusters list endpoint currently does not perform an authorization check. This PIP does not add one, but `filterAsync` is still called so that a provider can filter cluster names if desired.

```java
clusterResources().listAsync()
        .thenApply(clusters -> clusters.stream()
                .filter(cluster -> !Constants.GLOBAL_CLUSTER.equals(cluster))
                .collect(Collectors.toList()))
        .thenCompose(clusters -> authorizationService.filterAsync(
                new FilterContext(ResourceType.CLUSTER),
                clusters, clientAppId(), clientAuthData()))
        .thenAccept(filtered -> asyncResponse.resume(new LinkedHashSet<>(filtered)))
```

**NamespacesBase.internalGetTenantNamespaces():**
```java
validateTenantOperationAsync(tenant, TenantOperation.LIST_NAMESPACES)
        .thenCompose(__ -> tenantResources().tenantExistsAsync(tenant))
        .thenCompose(existed -> {
            if (!existed) {
                throw new RestException(Status.NOT_FOUND, "Tenant not found");
            }
            return tenantResources().getListOfNamespacesAsync(tenant);
        })
        .thenCompose(namespaces -> authorizationService.filterAsync(
                new FilterContext(ResourceType.NAMESPACE, tenant),
                namespaces, clientAppId(), clientAuthData()))
```

**Namespaces.getTopics():**
```java
validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GET_TOPICS)
        .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
        .thenCompose(policies -> internalGetListOfTopics(response, policies, mode))
        .thenApply(topics -> filterSystemTopic(topics, includeSystemTopic))
        .thenCompose(topics -> authorizationService.filterAsync(
                new FilterContext(ResourceType.TOPIC, namespaceName.toString()),
                topics, clientAppId(), clientAuthData()))
        .thenAccept(response::resume)
```

## Public-facing Changes

### Public API

#### New `ResourceType` enum

```java
public enum ResourceType {
    CLUSTER,
    TENANT,
    NAMESPACE,
    TOPIC
}
```

#### New `FilterContext` class

```java
public class FilterContext {
    private final ResourceType resourceType;
    /**
     * The parent resource under which the listed resources reside.
     * <ul>
     *   <li>CLUSTER: null (clusters are top-level)</li>
     *   <li>TENANT: null (tenants are top-level)</li>
     *   <li>NAMESPACE: tenant name (e.g., "my-tenant")</li>
     *   <li>TOPIC: full namespace name (e.g., "my-tenant/my-namespace")</li>
     * </ul>
     */
    private final String parentResource;

    public FilterContext(ResourceType resourceType) {
        this(resourceType, null);
    }

    public FilterContext(ResourceType resourceType, String parentResource) {
        this.resourceType = resourceType;
        this.parentResource = parentResource;
    }

    public ResourceType getResourceType() {
        return resourceType;
    }

    public String getParentResource() {
        return parentResource;
    }
}
```

#### Resource name formats

The `resources` list passed to `filterAsync` uses the same format as the corresponding list endpoint's response:

| ResourceType | Format | Example |
|---|---|---|
| `CLUSTER` | Short cluster name | `"us-east-1"` |
| `TENANT` | Short tenant name | `"my-tenant"` |
| `NAMESPACE` | `{tenant}/{namespace}` | `"my-tenant/my-namespace"` |
| `TOPIC` | Full topic URL | `"persistent://my-tenant/my-ns/my-topic"` |

Implementations should be prepared to handle these formats when parsing resource names.
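
As a rough illustration of what handling these formats involves, the sketch below extracts the owning tenant from each format in the table. `ResourceNames`, `tenantOf`, and `firstSegment` are hypothetical names, and the nested `ResourceType` is a local stand-in for the enum proposed above so that the example compiles on its own. A real provider would likely prefer Pulsar's own `NamespaceName` and `TopicName` helpers over manual string handling.

```java
import java.util.Optional;

/** Illustrative only: derives the owning tenant from a resource name. */
final class ResourceNames {
    // Local stand-in for the ResourceType enum proposed in this PIP.
    enum ResourceType { CLUSTER, TENANT, NAMESPACE, TOPIC }

    private ResourceNames() {}

    /** Returns the owning tenant, or empty for clusters, which have no tenant. */
    static Optional<String> tenantOf(ResourceType type, String name) {
        switch (type) {
            case CLUSTER:
                return Optional.empty();
            case TENANT:
                // The name is the tenant itself, e.g. "my-tenant".
                return Optional.of(name);
            case NAMESPACE:
                // "{tenant}/{namespace}", e.g. "my-tenant/my-namespace".
                return Optional.of(firstSegment(name, 0));
            case TOPIC: {
                // "persistent://{tenant}/{namespace}/{topic}".
                int scheme = name.indexOf("://");
                if (scheme < 0) {
                    throw new IllegalArgumentException("Not a full topic name: " + name);
                }
                return Optional.of(firstSegment(name, scheme + 3));
            }
            default:
                throw new IllegalArgumentException("Unknown resource type: " + type);
        }
    }

    /** Returns the text between index {@code from} and the next '/'. */
    private static String firstSegment(String name, int from) {
        int slash = name.indexOf('/', from);
        if (slash <= from) {
            throw new IllegalArgumentException("Malformed resource name: " + name);
        }
        return name.substring(from, slash);
    }
}
```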

#### New default method on `AuthorizationProvider`

```java
/**
 * Filter a list of resources based on authorization.
 *
 * <p>Called after a list operation (e.g., list tenants, list namespaces) to allow
 * the authorization provider to filter results per-item. The default implementation
 * returns the full list without filtering.
 *
 * <p>Implementations that perform per-item authorization checks should batch or
 * parallelize checks where possible to avoid serializing N sequential RPCs, which
 * could significantly increase latency for large resource lists.
 *
 * @param context   the filter context containing resource type and parent resource
 * @param resources the list of resource names to filter
 * @param role      the role requesting the list
 * @param authData  authentication data for the role
 * @return a CompletableFuture containing the filtered list of resource names
 */
default CompletableFuture<List<String>> filterAsync(
        FilterContext context, List<String> resources, String role,
        AuthenticationDataSource authData) {
    return CompletableFuture.completedFuture(resources);
}
```

The default implementation returns the full list (no filtering), preserving backward compatibility. Custom `AuthorizationProvider` implementations can override this to implement per-item authorization filtering.
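
A minimal sketch of such an override follows. It is not part of the proposal: `AclProvider` and its in-memory role-to-tenants map are hypothetical, and the nested `ResourceType`, `FilterContext`, `AuthenticationDataSource`, and `AuthorizationProvider` are trimmed stand-ins for the real types so that the example compiles without Pulsar on the classpath.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

final class TenantFilterSketch {
    // Trimmed stand-ins for the types proposed in this PIP.
    enum ResourceType { CLUSTER, TENANT, NAMESPACE, TOPIC }

    static final class FilterContext {
        final ResourceType resourceType;
        final String parentResource;

        FilterContext(ResourceType resourceType, String parentResource) {
            this.resourceType = resourceType;
            this.parentResource = parentResource;
        }
    }

    interface AuthenticationDataSource {}

    interface AuthorizationProvider {
        // Same shape as the proposed default: return the list unchanged.
        default CompletableFuture<List<String>> filterAsync(
                FilterContext context, List<String> resources, String role,
                AuthenticationDataSource authData) {
            return CompletableFuture.completedFuture(resources);
        }
    }

    /** Hypothetical provider: a role sees only the tenants listed for it. */
    static final class AclProvider implements AuthorizationProvider {
        private final Map<String, Set<String>> tenantsByRole;

        AclProvider(Map<String, Set<String>> tenantsByRole) {
            this.tenantsByRole = tenantsByRole;
        }

        @Override
        public CompletableFuture<List<String>> filterAsync(
                FilterContext context, List<String> resources, String role,
                AuthenticationDataSource authData) {
            // This sketch only filters tenant listings; other types pass through.
            if (context.resourceType != ResourceType.TENANT) {
                return CompletableFuture.completedFuture(resources);
            }
            Set<String> allowed = tenantsByRole.getOrDefault(role, Set.of());
            // Build a new list instead of mutating the caller's list.
            List<String> visible = resources.stream()
                    .filter(allowed::contains)
                    .collect(Collectors.toList());
            return CompletableFuture.completedFuture(visible);
        }
    }

    private TenantFilterSketch() {}
}
```

The lookup here is an in-memory map, so the future is already complete when it is returned. A provider backed by a remote ACL service would return a future that completes later, without changing the endpoint code.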

#### New delegation method on `AuthorizationService`

```java
public CompletableFuture<List<String>> filterAsync(
        FilterContext context, List<String> resources, String role,
        AuthenticationDataSource authData) {
    if (!this.conf.isAuthorizationEnabled()) {
        return CompletableFuture.completedFuture(resources);
    }
    return provider.filterAsync(context, resources, role, authData);
}
```

# Security Considerations

- **No weakening of existing checks**: The all-or-nothing LIST permission gate remains. `filterAsync` adds an additional layer of filtering; it cannot grant access to resources that the existing gate would deny.
- **Multi-tenancy**: By design, `filterAsync` enables stricter tenant isolation — providers can ensure that one tenant cannot see another tenant's namespaces or topics, even if the caller has LIST permission.
- **Default is permissive**: The default no-op implementation returns the full list. Deployments that need filtering must explicitly opt in by providing a custom `AuthorizationProvider`.

# Backward & Forward Compatibility

## Upgrade

No special steps required. The new method is a `default` method on the `AuthorizationProvider` interface, returning the full list by default. Existing custom implementations continue to work without changes.

## Downgrade / Rollback

No special steps required. Rolling back to a version without `filterAsync` simply removes the filtering step — list endpoints return their full, unfiltered results as before.

## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations

No impact. The filtering is applied at the REST API layer in each broker independently. It does not affect replication state, topic metadata, or cross-cluster communication.

# Performance Considerations

The `filterAsync` method is invoked on every list request, so implementations should be mindful of performance:

- **Batch authorization checks**: Rather than issuing N sequential authorization RPCs for N resources, implementations should parallelize checks (e.g., using `CompletableFuture.allOf`) or use a batch API if available.
- **Caching**: For deployments with stable ACLs, caching authorization decisions with a short TTL can significantly reduce latency.
- **Short-circuit for super users**: Implementations may choose to skip filtering entirely for super users or admin roles.

The default implementation (return full list) adds negligible overhead since it returns an already-completed future.
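
One way to parallelize per-item checks with `CompletableFuture.allOf` is sketched below. `ParallelFilterSketch`, `filterParallel`, and the `isAllowed` function are hypothetical; `isAllowed` stands in for whatever async per-resource check a provider has available and is assumed never to complete with `null`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

final class ParallelFilterSketch {
    private ParallelFilterSketch() {}

    /**
     * Starts one async check per resource, waits for all of them, and keeps the
     * resources whose check returned true. Input order is preserved.
     */
    static CompletableFuture<List<String>> filterParallel(
            List<String> resources,
            Function<String, CompletableFuture<Boolean>> isAllowed) {
        // Start every check up front so they can run concurrently.
        List<CompletableFuture<Boolean>> checks = resources.stream()
                .map(isAllowed)
                .collect(Collectors.toList());

        return CompletableFuture.allOf(checks.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> {
                    List<String> allowed = new ArrayList<>();
                    for (int i = 0; i < resources.size(); i++) {
                        // join() does not block: allOf completed, so each check is done.
                        if (checks.get(i).join()) {
                            allowed.add(resources.get(i));
                        }
                    }
                    return allowed;
                });
    }
}
```

If any single check fails, the future returned by `allOf` fails as well, so the whole list request fails rather than returning a partially filtered result. Whether to fail closed like this or to treat a failed check as "deny" is a choice left to the provider.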

# Test Plan

- **Unit tests for `FilterContext`**: Verify construction with and without parent resource, getter behavior.
- **Unit tests for default `filterAsync`**: Verify the default implementation returns the full list unchanged.
- **Unit tests for `AuthorizationService.filterAsync`**:
  - Returns the full list when `authorizationEnabled=false` (provider is never called).
  - Delegates to the provider when `authorizationEnabled=true`.
- **Unit tests with a custom filtering provider**: Register a mock `AuthorizationProvider` that filters based on role and resource type; verify correct filtering for each `ResourceType`.
- **Integration tests for each list endpoint** (clusters, tenants, namespaces, topics):
  - With a no-op filter provider: verify the endpoint returns the same results as before.
  - With a filtering provider: verify the endpoint returns only the permitted subset.
  - Verify that `parentResource` is correctly populated in the `FilterContext` (null for clusters/tenants, tenant name for namespaces, namespace name for topics).
- **Thread safety test**: Verify that a `filterAsync` implementation performing async metadata lookups does not deadlock or block the calling thread.

# Alternatives

## Per-resource-type methods (e.g., `filterTenantsAsync`, `filterNamespacesAsync`)

Using separate methods for each resource type would require adding a new method every time a new filterable resource type is introduced. A single method with `FilterContext` is more extensible.

## Using `ContainerResponseFilter`

The JAX-RS `ContainerResponseFilter` API is synchronous and cannot perform async authorization checks without blocking the calling thread. This leads to thread pool exhaustion and potential deadlocks when the filter needs to access metadata.

## Replacing the existing auth gate with per-item filtering

An alternative design would skip the all-or-nothing LIST permission check and rely solely on `filterAsync` to determine visibility. This was rejected because it changes the existing security model — deployments that rely on the 403 behavior for unauthorized users would silently start returning empty lists instead. The current design is additive: it layers filtering on top of the existing gate without altering its semantics.

# Links

* Mailing List discussion thread:
* Mailing List voting thread: