
[Feature Request]: Add take(n) convenience method to PCollection #37429

@shaheeramjad

Description

What would you like to happen?

Currently, to get N elements from a PCollection, users need to use Top.Of() or Sample.FixedSizeGlobally(), which are more complex than this common use case requires. Additionally, both transforms emit a single list (a one-element PCollection) rather than individual elements, so an extra FlatMap step is needed to unpack it.

Current approach (complex):

```python
import apache_beam as beam

# pcoll is an existing PCollection.

# Using Top.Of - returns a list, needs flattening.
# With a constant key all elements tie, so Beam does not guarantee which 10 are kept.
first_10 = pcoll | beam.transforms.combiners.Top.Of(10, key=lambda x: 0)
first_10_flat = first_10 | beam.FlatMap(lambda x: x)

# Using Sample - non-deterministic and also returns a list
first_10 = pcoll | beam.transforms.combiners.Sample.FixedSizeGlobally(10)
first_10_flat = first_10 | beam.FlatMap(lambda x: x)
```

Desired approach (simple):

```python
# Simple and intuitive
first_10 = pcoll | beam.take(10)

# Or as a method
first_10 = pcoll.take(10)
```

I would like to add a take(n) convenience method to PCollection that:

  1. Takes N elements - Uses Top.Of() internally with a constant key function (a PCollection has no ordering, so which N elements are returned is not guaranteed)
  2. Returns individual elements - Automatically flattens the list returned by Top.Of()
  3. Preserves type hints - Maintains the element type of the input PCollection
  4. Provides both function and method syntax - Can be used as beam.take(n) or pcoll.take(n)

This would simplify common debugging, testing, and prototyping scenarios where users need to inspect or work with a small subset of their data.

Use cases:

  • Debugging: Quickly inspect a sample of pipeline data
  • Testing: Test pipelines with limited data subsets
  • Prototyping: Build and validate pipelines with small datasets
  • Validation: Sample data for quality checks

Implementation approach:

  • Add a Take transform class in transforms/util.py that wraps Top.Of() with a constant key
  • Add a take() convenience function
  • Add a take() method to the PCollection class
  • Automatically flatten the list result from Top.Of() to return individual elements
  • Add comprehensive tests and documentation

This follows the same pattern as other convenience transforms like Filter(), Map(), and FlatMap(), making the API more consistent and user-friendly.

Issue Priority

Priority: 2 (default / most feature requests should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner

Metadata

Assignees

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests
