
Initial support of querying sdk worker status #3

Open

xinyu-liu-glean wants to merge 2 commits into timmy-2.59 from sdk-worker-status

Conversation


@xinyu-liu-glean xinyu-liu-glean commented Jun 4, 2025

This patch adds the WorkerStatus gRPC server on the taskManager side, which allows the Python SDK worker to connect and report status. It also creates an HTTP server inside the taskManager so we can query an endpoint to get a stack dump as well as a heap dump.

This feature is gated by the --enable_worker_status option. We will add configs to enable this option.
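
For illustration, here is a minimal sketch of the kind of HTTP dump endpoint described above, using the JDK's built-in com.sun.net.httpserver. The class name, port parameter, and /stackdump path are assumptions for this sketch, not the names used in the patch:

```java
import com.sun.net.httpserver.HttpServer;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: an HTTP server inside the taskManager that returns a
// stack dump of all live threads when its endpoint is queried.
public final class StatusHttpServer {
  public static HttpServer start(int port) throws Exception {
    HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
    server.createContext("/stackdump", exchange -> {
      // Collect a stack trace for every live thread in this JVM.
      StringBuilder dump = new StringBuilder();
      Thread.getAllStackTraces().forEach((thread, frames) -> {
        dump.append(thread).append('\n');
        for (StackTraceElement frame : frames) {
          dump.append("    at ").append(frame).append('\n');
        }
      });
      byte[] body = dump.toString().getBytes(StandardCharsets.UTF_8);
      exchange.sendResponseHeaders(200, body.length);
      try (OutputStream out = exchange.getResponseBody()) {
        out.write(body);
      }
    });
    server.start(); // serves until server.stop(...) is called
    return server;
  }
}
```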


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make the review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

  • Build python source distribution and wheels
  • Python tests
  • Java tests
  • Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

Comment on lines +54 to +57
if (DefaultJobBundleFactory.getEnableWorkerStatus(jobInfo)) {
  SdkWorkerStatusServer.create();
}


Just for learning's sake: can you explain why we initialize the server here?

My understanding is this: FlinkExecutableStageContextFactory is called upon initialization of an ExecutableStageDoFnOperator on the taskManager, i.e., each time the taskManager loads an operator (which encapsulates a series of Beam transformations, i.e., an executable stage).

SdkWorkerStatusServer is a wrapper that invokes BeamWorkerStatusGrpcService, which is connected to gRPC clients on the SDK harnesses and will use those connections to fetch and aggregate their statuses. (Instrumenting the status gRPC service is something that needs to be done in future PRs as well.)

The changes in DefaultJobBundleFactory below are to initialize the resources that the FlinkExecutableStageContextFactory will rely on (since the job bundle is initialized before the executable stage operators).

Is that right?
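
As a side note on the fetch-and-aggregate step described above, a minimal sketch might look like the following. The getAllWorkerStatuses call and its signature are assumptions about BeamWorkerStatusGrpcService for this sketch, not confirmed from this PR:

```java
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.fnexecution.status.BeamWorkerStatusGrpcService;

// Hypothetical sketch: join every connected SDK harness's status report
// into a single dump, keyed by worker id.
public final class WorkerStatusAggregator {
  public static String aggregate(BeamWorkerStatusGrpcService service)
      throws InterruptedException {
    // One entry per connected SDK harness (assumed API shape).
    Map<String, String> statuses = service.getAllWorkerStatuses(10, TimeUnit.SECONDS);
    StringBuilder report = new StringBuilder();
    statuses.forEach((workerId, status) ->
        report.append("--- worker ").append(workerId).append(" ---\n")
            .append(status)
            .append('\n'));
    return report.toString();
  }
}
```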

@xinyu-liu-glean xinyu-liu-glean (Author) commented Jun 4, 2025


Good question. The server is started during the creation of the ExecutableStageContext.Factory for this jobId. Based on the javadoc above ("This map should only ever have a single element..."), there should be only a single factory, so the server should be created once. I also added some logic in the server to be safe. Since this server is only used when we run the Beam Flink portable runner, I added it to the FlinkExecutableStageContextFactory instead of the common portability parts such as DefaultJobBundleFactory or DefaultExecutableStageContext.

The changes in DefaultJobBundleFactory below are to create the WorkerStatus gRPC server so we can fetch the worker status from there. Once the status gRPC URL is set in the provision info, the Python worker side should automatically connect to the status server. I am going to test this out to verify we can get worker status from the server endpoint.
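
To illustrate the "created once" safety logic mentioned above, a minimal sketch could use an atomic guard like this. The class and method names mirror the snippet under review, but the body is an assumption for this sketch:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: even if create() is reached by more than one
// factory initialization, only the first call starts the server.
public final class SdkWorkerStatusServer {
  private static final AtomicBoolean started = new AtomicBoolean(false);

  public static void create() {
    // compareAndSet flips false -> true exactly once across all threads.
    if (!started.compareAndSet(false, true)) {
      return; // server already started by an earlier factory creation
    }
    startGrpcAndHttpServers();
  }

  private static void startGrpcAndHttpServers() {
    // ... bind the status gRPC service and start the HTTP dump endpoint ...
  }
}
```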

@steve-scio steve-scio commented Jun 4, 2025


Thanks! One follow-up question: if the boundary between DefaultJobBundleFactory vs. ExecutableStageContextFactory is that the former is for all jobs while the latter is for portability-framework jobs only, then theoretically today, for non-portability jobs, we'd be creating the gRPC server but never using it. Is that right? (Non-blocking, given we don't use this custom fork for Java jobs at the moment.)

@xinyu-liu-glean xinyu-liu-glean (Author) commented Jun 4, 2025


My experience is that DefaultJobBundleFactory is only used for the portability framework too. The difference is that DefaultJobBundleFactory is used across runners, while FlinkExecutableStageContextFactory is only used by Flink. For Java pipelines, Flink uses a different runner (FlinkRunner) instead of FlinkPipelineRunner (portability). All open source runners have both a Java runner and a portability runner. I am not sure about Dataflow though. Do you know whether Google converged on a single (portable) runner?

The cost of adding the gRPC server and HTTP server without much traffic should be pretty small. I am wondering whether we can enable this by default so we can do profiling anytime. Right now I use a pipeline option to gate it, but it seems pretty cumbersome to pass in the option and restart the pipeline.
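
For reference, gating a feature like this behind a Beam pipeline option typically looks like the sketch below. The interface name is hypothetical; the accessor matches the getEnableWorkerStatus call seen in the snippet above:

```java
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

// Hypothetical sketch: the option defaults to off, so the status servers
// are only started when the flag is passed explicitly at submission time.
public interface WorkerStatusOptions extends PipelineOptions {
  @Description("Start the SDK worker status gRPC/HTTP servers on the taskManager.")
  @Default.Boolean(false)
  boolean getEnableWorkerStatus();

  void setEnableWorkerStatus(boolean value);
}
```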

@steve-scio

Adding @timmy-xiao-glean; you might be interested in this stuff.
