diff --git a/pip/pip-458.md b/pip/pip-458.md
new file mode 100644
index 0000000000000..0382b7685af24
--- /dev/null
+++ b/pip/pip-458.md
@@ -0,0 +1,284 @@
+# PIP-458: Add Async Resource List Filtering API to AuthorizationProvider
+
+*Status: Draft*
+
+# Background knowledge
+
+Pulsar's `AuthorizationProvider` is a pluggable interface (`pulsar-broker-common`) that brokers use to make authorization decisions. It exposes async methods for checking permissions on tenants, namespaces, topics, and clusters (e.g., `allowTenantOperationAsync`, `allowNamespaceOperationAsync`). `AuthorizationService` wraps this provider and adds a guard on the `authorizationEnabled` configuration flag before delegating.
+
+Pulsar's Admin REST API exposes list endpoints for clusters, tenants, namespaces, and topics. These endpoints are async — they retrieve data from the metadata store, apply post-processing, and call `asyncResponse.resume()` to return the result. Most list endpoints perform an all-or-nothing authorization check before returning the full list (e.g., `TenantOperation.LIST_TENANTS`, `NamespaceOperation.GET_TOPICS`).
+
+JAX-RS provides a `ContainerResponseFilter` hook that runs after the endpoint returns. Its `filter()` method is synchronous (returns `void`), which makes it unsuitable for any authorization logic that needs to access metadata asynchronously.
+
+# Motivation
+
+Currently, if a user is authorized for a LIST operation, they see **all** resources; otherwise they get a 403. There is no way for an `AuthorizationProvider` to filter list results per-item — for example, only returning tenants or namespaces that the user has access to.
+
+Users who need per-item filtering today must use a `ContainerResponseFilter`. However, because `filter()` is synchronous and `asyncResponse.resume()` may execute on the metadata thread or web executor thread, blocking metadata operations in a response filter can exhaust the thread pool and cause deadlocks.
+
+This PIP proposes adding a default method to `AuthorizationProvider` that allows async per-item filtering of list results, called inside the endpoint method where async execution is natural.
+
+# Goals
+
+## In Scope
+
+- New default method on `AuthorizationProvider` for async resource filtering.
+- A `FilterContext` class to carry resource type and parent resource information.
+- A corresponding delegation method on `AuthorizationService` that respects the `authorizationEnabled` flag.
+- Integration into the list endpoints for clusters, tenants, namespaces, and topics.
+
+## Out of Scope
+
+- Changing the existing authorization check model (the all-or-nothing gate remains).
+- Providing a built-in filtering implementation in `PulsarAuthorizationProvider` (this PIP only adds the extension point).
+
+# High Level Design
+
+A new default method `filterAsync` is added to the `AuthorizationProvider` interface. It accepts a `FilterContext` (resource type + optional parent resource), the list of resource names, the user's role, and authentication data. It returns a `CompletableFuture<List<String>>` containing the filtered list.
+
+The default implementation returns the full list unchanged, so existing `AuthorizationProvider` implementations continue to work without modification.
+
+Each list endpoint (`getClusters`, `getTenants`, `getTenantNamespaces`, `getTopics`) inserts a `.thenCompose(resources -> authorizationService.filterAsync(...))` step into its existing async chain, after the list is retrieved and before `asyncResponse.resume()`.
+
+`AuthorizationService` wraps the call with the standard `authorizationEnabled` check — when authorization is disabled, the filtering step is skipped entirely.
+
+# Detailed Design
+
+## Design & Implementation Details
+
+### Interaction with existing authorization gates
+
+The existing all-or-nothing authorization check (e.g., `TenantOperation.LIST_TENANTS`, `NamespaceOperation.GET_TOPICS`) remains unchanged. `filterAsync` is invoked *after* the user passes the existing gate:
+
+- A user who fails the LIST permission check still receives a 403 — `filterAsync` is never called.
+- A user who passes the LIST permission check will have their results filtered by `filterAsync`.
+
+This design keeps the existing security model intact. Replacing the gate with pure per-item filtering would change the security semantics (403 → empty list) and is out of scope.
+
+### Integration into list endpoints
+
+The `filterAsync` method will be called in the async chain of each list endpoint, after the list is retrieved from the metadata store and before `asyncResponse.resume()`:
+
+**TenantsBase.getTenants():**
+```java
+validateBothSuperUserAndTenantOperation(null, TenantOperation.LIST_TENANTS)
+    .thenCompose(__ -> tenantResources().listTenantsAsync())
+    .thenCompose(tenants -> authorizationService.filterAsync(
+        new FilterContext(ResourceType.TENANT),
+        tenants, clientAppId(), clientAuthData()))
+    .thenAccept(filtered -> {
+        List<String> deepCopy = new ArrayList<>(filtered);
+        deepCopy.sort(null);
+        asyncResponse.resume(deepCopy);
+    })
+```
+
+**ClustersBase.getClusters():**
+
+Note: The clusters list endpoint currently does not perform an authorization check. This PIP does not add one — `filterAsync` is still called to allow the provider to filter cluster names if desired.
+
+```java
+clusterResources().listAsync()
+    .thenApply(clusters -> clusters.stream()
+        .filter(cluster -> !Constants.GLOBAL_CLUSTER.equals(cluster))
+        .collect(Collectors.toList()))
+    .thenCompose(clusters -> authorizationService.filterAsync(
+        new FilterContext(ResourceType.CLUSTER),
+        clusters, clientAppId(), clientAuthData()))
+    .thenAccept(filtered -> asyncResponse.resume(new LinkedHashSet<>(filtered)))
+```
+
+**NamespacesBase.internalGetTenantNamespaces():**
+```java
+validateTenantOperationAsync(tenant, TenantOperation.LIST_NAMESPACES)
+    .thenCompose(__ -> tenantResources().tenantExistsAsync(tenant))
+    .thenCompose(existed -> {
+        if (!existed) {
+            throw new RestException(Status.NOT_FOUND, "Tenant not found");
+        }
+        return tenantResources().getListOfNamespacesAsync(tenant);
+    })
+    .thenCompose(namespaces -> authorizationService.filterAsync(
+        new FilterContext(ResourceType.NAMESPACE, tenant),
+        namespaces, clientAppId(), clientAuthData()))
+```
+
+**Namespaces.getTopics():**
+```java
+validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GET_TOPICS)
+    .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
+    .thenCompose(policies -> internalGetListOfTopics(response, policies, mode))
+    .thenApply(topics -> filterSystemTopic(topics, includeSystemTopic))
+    .thenCompose(topics -> authorizationService.filterAsync(
+        new FilterContext(ResourceType.TOPIC, namespaceName.toString()),
+        topics, clientAppId(), clientAuthData()))
+    .thenAccept(response::resume)
+```
+
+## Public-facing Changes
+
+### Public API
+
+#### New `ResourceType` enum
+
+```java
+public enum ResourceType {
+    CLUSTER,
+    TENANT,
+    NAMESPACE,
+    TOPIC
+}
+```
+
+#### New `FilterContext` class
+
+```java
+public class FilterContext {
+    private final ResourceType resourceType;
+    /**
+     * The parent resource under which the listed resources reside.
+     *
+     */
+    private final String parentResource;
+
+    public FilterContext(ResourceType resourceType) {
+        this(resourceType, null);
+    }
+
+    public FilterContext(ResourceType resourceType, String parentResource) {
+        this.resourceType = resourceType;
+        this.parentResource = parentResource;
+    }
+
+    public ResourceType getResourceType() {
+        return resourceType;
+    }
+
+    public String getParentResource() {
+        return parentResource;
+    }
+}
+```
+
+#### Resource name formats
+
+The `resources` list passed to `filterAsync` uses the same format as the corresponding list endpoint's response:
+
+| ResourceType | Format | Example |
+|---|---|---|
+| `CLUSTER` | Short cluster name | `"us-east-1"` |
+| `TENANT` | Short tenant name | `"my-tenant"` |
+| `NAMESPACE` | `{tenant}/{namespace}` | `"my-tenant/my-namespace"` |
+| `TOPIC` | Full topic URL | `"persistent://my-tenant/my-ns/my-topic"` |
+
+Implementations should be prepared to handle these formats when parsing resource names.
+
+#### New default method on `AuthorizationProvider`
+
+```java
+/**
+ * Filter a list of resources based on authorization.
+ *
+ * <p>Called after a list operation (e.g., list tenants, list namespaces) to allow
+ * the authorization provider to filter results per-item. The default implementation
+ * returns the full list without filtering.
+ *
+ * <p>Implementations that perform per-item authorization checks should batch or
+ * parallelize checks where possible to avoid serializing N sequential RPCs, which
+ * could significantly increase latency for large resource lists.
+ *
+ * @param context the filter context containing resource type and parent resource
+ * @param resources the list of resource names to filter
+ * @param role the role requesting the list
+ * @param authData authentication data for the role
+ * @return a CompletableFuture containing the filtered list of resource names
+ */
+default CompletableFuture<List<String>> filterAsync(
+        FilterContext context, List<String> resources, String role,
+        AuthenticationDataSource authData) {
+    return CompletableFuture.completedFuture(resources);
+}
+```
+
+The default implementation returns the full list (no filtering), preserving backward compatibility. Custom `AuthorizationProvider` implementations can override this to implement per-item authorization filtering.
+
+#### New delegation method on `AuthorizationService`
+
+```java
+public CompletableFuture<List<String>> filterAsync(
+        FilterContext context, List<String> resources, String role,
+        AuthenticationDataSource authData) {
+    if (!this.conf.isAuthorizationEnabled()) {
+        return CompletableFuture.completedFuture(resources);
+    }
+    return provider.filterAsync(context, resources, role, authData);
+}
+```
+
+# Security Considerations
+
+- **No weakening of existing checks**: The all-or-nothing LIST permission gate remains. `filterAsync` adds an additional layer of filtering; it cannot grant access to resources that the existing gate would deny.
+- **Multi-tenancy**: By design, `filterAsync` enables stricter tenant isolation — providers can ensure that one tenant cannot see another tenant's namespaces or topics, even if the caller has LIST permission.
+- **Default is permissive**: The default no-op implementation returns the full list. Deployments that need filtering must explicitly opt in by providing a custom `AuthorizationProvider`.
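
For illustration only, the per-item filtering that a custom provider's `filterAsync` override might perform can be sketched in plain Java, without any Pulsar dependency. Everything in this sketch is a hypothetical stand-in: `FilterAsyncSketch`, `filterParallel`, and the `check` function (which represents whatever async permission lookup a provider uses) are not part of this proposal or of Pulsar. It shows one possible way to start all checks up front and combine them with `CompletableFuture.allOf`, so that N resources do not turn into N sequential lookups.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch: none of these names are Pulsar APIs.
final class FilterAsyncSketch {

    /**
     * Keeps only the resources for which the async check yields true.
     * "check" stands in for a provider-specific permission lookup (an assumption).
     */
    static CompletableFuture<List<String>> filterParallel(
            List<String> resources,
            Function<String, CompletableFuture<Boolean>> check) {
        // Start every check before waiting on any of them, so they can overlap.
        List<CompletableFuture<Boolean>> decisions = new ArrayList<>(resources.size());
        for (String resource : resources) {
            decisions.add(check.apply(resource));
        }
        // allOf completes once all checks are done; if any check fails,
        // the returned future completes exceptionally.
        return CompletableFuture
                .allOf(decisions.toArray(new CompletableFuture<?>[0]))
                .thenApply(ignored -> {
                    List<String> permitted = new ArrayList<>();
                    for (int i = 0; i < resources.size(); i++) {
                        // join() does not block here: every future is already complete.
                        if (decisions.get(i).join()) {
                            permitted.add(resources.get(i));
                        }
                    }
                    return permitted;
                });
    }

    public static void main(String[] args) {
        // Hypothetical ACL: the caller may see only these two tenants.
        Set<String> visible = Set.of("tenant-a", "tenant-c");
        List<String> filtered = filterParallel(
                List.of("tenant-a", "tenant-b", "tenant-c"),
                name -> CompletableFuture.supplyAsync(() -> visible.contains(name)))
                .join();
        System.out.println(filtered);
    }
}
```

With the hypothetical ACL above, `main` prints `[tenant-a, tenant-c]`; the input order is preserved because results are collected by index. A real override would receive the `FilterContext`, role, and authentication data described earlier and would replace `check` with its own lookup.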
+
+# Backward & Forward Compatibility
+
+## Upgrade
+
+No special steps required. The new method is a `default` method on the `AuthorizationProvider` interface, returning the full list by default. Existing custom implementations continue to work without changes.
+
+## Downgrade / Rollback
+
+No special steps required. Rolling back to a version without `filterAsync` simply removes the filtering step — list endpoints return their full, unfiltered results as before.
+
+## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations
+
+No impact. The filtering is applied at the REST API layer in each broker independently. It does not affect replication state, topic metadata, or cross-cluster communication.
+
+# Performance Considerations
+
+The `filterAsync` method is invoked on every list request, so implementations should be mindful of performance:
+
+- **Batch authorization checks**: Rather than issuing N sequential authorization RPCs for N resources, implementations should parallelize checks (e.g., using `CompletableFuture.allOf`) or use a batch API if available.
+- **Caching**: For deployments with stable ACLs, caching authorization decisions with a short TTL can significantly reduce latency.
+- **Short-circuit for super users**: Implementations may choose to skip filtering entirely for super users or admin roles.
+
+The default implementation (return full list) adds negligible overhead since it returns an already-completed future.
+
+# Test Plan
+
+- **Unit tests for `FilterContext`**: Verify construction with and without parent resource, getter behavior.
+- **Unit tests for default `filterAsync`**: Verify the default implementation returns the full list unchanged.
+- **Unit tests for `AuthorizationService.filterAsync`**:
+  - Returns the full list when `authorizationEnabled=false` (provider is never called).
+  - Delegates to the provider when `authorizationEnabled=true`.
+- **Unit tests with a custom filtering provider**: Register a mock `AuthorizationProvider` that filters based on role and resource type; verify correct filtering for each `ResourceType`.
+- **Integration tests for each list endpoint** (clusters, tenants, namespaces, topics):
+  - With a no-op filter provider: verify the endpoint returns the same results as before.
+  - With a filtering provider: verify the endpoint returns only the permitted subset.
+  - Verify that `parentResource` is correctly populated in the `FilterContext` (null for clusters/tenants, tenant name for namespaces, namespace name for topics).
+- **Thread safety test**: Verify that a `filterAsync` implementation performing async metadata lookups does not deadlock or block the calling thread.
+
+# Alternatives
+
+## Per-resource-type methods (e.g., `filterTenantsAsync`, `filterNamespacesAsync`)
+
+Using separate methods for each resource type would require adding a new method every time a new filterable resource type is introduced. A single method with `FilterContext` is more extensible.
+
+## Using `ContainerResponseFilter`
+
+The JAX-RS `ContainerResponseFilter` API is synchronous and cannot perform async authorization checks without blocking the calling thread. This leads to thread pool exhaustion and potential deadlocks when the filter needs to access metadata.
+
+## Replacing the existing auth gate with per-item filtering
+
+An alternative design would skip the all-or-nothing LIST permission check and rely solely on `filterAsync` to determine visibility. This was rejected because it changes the existing security model — deployments that rely on the 403 behavior for unauthorized users would silently start returning empty lists instead. The current design is additive: it layers filtering on top of the existing gate without altering its semantics.
+
+# Links
+
+* Mailing List discussion thread:
+* Mailing List voting thread: