
Added gRPC interface definition for data cache #46

Open

amolfnal wants to merge 4 commits into main from data-cache

Conversation

@amolfnal:

Added gRPC interface for data cache (Kafka)

amolfnal requested a review from cnlklink on November 17, 2025, 19:27.
@beauremus (Contributor) left a comment:

Should we consider off-the-shelf solutions?
https://github.com/aklivity/zilla
https://github.com/mailgun/kafka-pixy

rpc CreateTopic(CreateTopicRequest) returns (CreateTopicResponse);

// Send a single message to a topic
rpc Produce(ProduceRequest) returns (ProduceResponse);
Contributor:

Our data logger process/service is likely a good example of why a streaming producer is a good idea.

Author (@amolfnal):

Streaming has been added to the gRPC interface.

rpc Produce(ProduceRequest) returns (ProduceResponse);

// Stream messages from a topic (server streaming)
rpc Consume(ConsumeRequest) returns (stream ConsumeResponse);
Contributor:

I'm not familiar with Kafka standard operations. Is there a way to request multiple topics? This seems to imply a socket connection per device, which I think is a single device at a single rate.

@jacob-curley-fnal (Contributor), Nov 17, 2025:

Most implementations of a kafka consumer support one-consumer-many-topics setups.

Seems like the question is where we want the complexity to emerge. If each consumer is on one topic, we could have data from a single consumer be streamed to any requesting external client, and thereby limit the maximum number of active consumers. Allowing arbitrary combinations of topics means it gets harder to reuse consumers for different clients.

On the other hand, we have the concern you bring up, of a single client now needing many separate connections to listen on many topics, instead of a few connections.

But there's also the question of why we'd want data from many topics to be mangled into one stream. Usually a topic contains a specific kind of data. The data can come from many sources, but the idea is that each topic is its own little pool of things that can be operated on or reasoned about in the same way. Allowing consumers to stream many topics back in one connection kinda breaks this pattern, making it harder to know what data we're getting, and forcing external clients to implement a bunch of logic to disambiguate it.

@amolfnal (Author), Nov 17, 2025:

Normally, a consumer has the ability to consume messages from multiple topics. We can make an interface that consumes messages from different topics with a single gRPC call. Is this the requirement?
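
For illustration, a multi-topic request could look something like this sketch (the message name, field, and rpc here are assumptions, not part of this PR):

// Hypothetical sketch: subscribe to several topics with one call.
message MultiConsumeRequest {
    repeated string topics = 1; // one or more topic names
}

// The server streams back messages from any of the requested topics.
rpc ConsumeMany(MultiConsumeRequest) returns (stream ConsumeResponse);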


message ConsumeRequest {
string topic = 1;
string group_id = 2;
Contributor:

What is group_id? It doesn't mirror the ProduceRequest.

@jacob-curley-fnal (Contributor), Nov 17, 2025:

This is a kafka-ism. Each consumer of a topic can optionally be in a group. If in a group, kafka will spread consumers across the partitions for the topic as evenly as possible. If there are fewer consumers in the group than partitions, some consumers will get multiple partitions. If there are more consumers than partitions, some consumers will not get any data at all. Probably better to not use groups unless we're sure we're ok with some client only getting some of the messages from a topic.

EDIT: Wanted to clarify: if a consumer is not in a group, or if it is the only consumer in its group, it will get all the partitions of a topic (unless the consumer has been configured to listen on a specific subset of partitions, which is also a possibility). And you can have as many consumers listening to a topic as you want. They only get throttled when in the same group.

Author (@amolfnal):

Regarding group_id: I should not have included it (thank you, Beau, nice catch). It is Kafka-specific and may not be present if we change from Kafka to something else, so I will remove it.
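
The trimmed request would then look something like this (assuming no replacement field is added):

// ConsumeRequest with the Kafka-specific group_id removed.
message ConsumeRequest {
    string topic = 1; // topic to stream from
}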

@rneswold (Contributor) left a comment:

I'm guilty of not putting enough comments in my .proto files. But a lot of the fields in these messages have very generic names: topic, key, name, group_id, etc. (maybe these are recognized in the Kafka world?). Some comments would help explain their purpose.

For instance, in the response messages, you have a success field and a message field. Is the message field to describe an error if success is false? If so, what happens if success is true, but an error message is sent?

If those fields are paired like that, I'd get rid of them and make one optional field: errMessage. If the error message is missing, then it's success. If it's there, something went wrong. There's no way to have an invalid state.

If they're not tied together, then comments would be nice to understand why they're needed and what they indicate.
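
A minimal sketch of that single-optional-field pattern (the message name here is hypothetical, for illustration only):

// Hypothetical response: absence of err_message means success;
// presence means something went wrong. No invalid state is possible.
message ExampleResponse {
    optional string err_message = 1;
}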

@jacob-curley-fnal (Contributor):

I guess this is more for @beauremus , though maybe you also have an answer Amol, but what's the gain from putting Kafka behind a gRPC service, again? I'd heard an argument about "it keeps all our internal services speaking gRPC", but is that all?

For example, we have several services that know how to speak Postgres, and that seems to work ok. In fact, we had a whole meeting where it was explicitly determined that we would have many Postgres-fluent services instead of one Postgres-Service-To-Rule-Them-All.

I see the move to put Kafka behind a service in a similar vein. Out of the box, Kafka is designed to handle enterprise-scale messaging, at thousands or even millions of messages a second. Seems like a big waste to hide that capability behind another network hop, just so we can say we limited the number of things that need to know how to talk to Kafka.

Not here to say there aren't good reasons out there, but I haven't heard anything concrete of what exactly is motivating this architectural decision, when we've had a similar discussion in the past about Postgres that went the other way. As an alternative idea, what if we wrote up a library for talking to Postgres and another for talking to Kafka? Microservices that needed a connection to one or the other could just add the library as a dependency. It would keep each microservice in charge of its own data connections, rather than demanding everyone goes through a central service. But it still buys us the benefit of only writing the nitty-gritty implementation of Postgres/Kafka logic once. And we avoid accidentally creating a bottleneck in the control system that doesn't need to be there.

Again, just wanted to throw this out there as a means of sparking discussion - not meant to be construed as a demand for going any particular direction. Appreciate anyone who takes the time to engage!

@rneswold (Contributor):

but what's the gain from putting Kafka behind a gRPC service, again?

Very true. Our services should use gRPC. But we're not trying to gRPC-ize the APIs of products. All gRPC services that need Postgres can use it directly. All gRPC services that leverage Kafka should use it directly. The gRPC APIs are providing a control system service -- not an alternate, generic API for these products.

@cnlklink:

I think I agree with @rneswold and @jacob-curley-fnal. The benefit of our own gRPC layer wrapping Kafka, as I see it, is that it keeps us de-coupled from Kafka as a technology choice. We could swap out Kafka for another message queue technology at a later date, should we choose, without needing to modify all of the downstream services. But TBH, I don't think the cost-benefit is there. This API would need to scale at the same level as Kafka - is that a reasonable expectation? How much effort would that take? Instead, given that Kafka is a mature and popular open-source product, we should just embrace its API.

@cnlklink:

And, as @jacob-curley-fnal suggests, I think a better way to mitigate the risk of abandoning Kafka in the future would be to use native adapters rather than a microservice.

@rneswold (Contributor):

The benefit of our own gRPC layer wrapping Kafka that I see is it keeps us de-coupled from Kafka as a technology choice.

But we're not de-coupled if the gRPC API uses Kafka terms and data structures. Because if we move away from Kafka, then we're trying to make our Kafka-compatible gRPC API fit whatever new backend we choose.

@amolfnal (Author):

Thank you all for sharing your opinions on this.
There are many motivations behind the gRPC adoption:

  1. A unified communication mechanism for the microservices.
  2. An abstraction layer that allows adopting another technology, like Kafka, which can satisfy the data cache requirement.
  3. Controlled functionality of the framework.
    For example, FaaS has the capability to let a user create many environments. However, this can make the FaaS service hard to host: if we have 1000 users and each user creates 10 environments, then there will be 10000 x 3 pods/environment = 30000 pods, which we do not have the capacity to host on our K8S cluster. That's why I am thinking of not giving environment-creation functionality to the user, for two reasons:
    3.1. Many users host the same environment under different names (unoptimized utilization of computing resources).
    3.2. Accountability for what is getting installed on the containers.
  4. These are all open-source technologies.
    Therefore, there is a risk that the system could be damaged if the open-source code is compromised. If I kill the gRPC server, there will not be any communication to or from the data cache, which can protect the other modules of the central service. I agree that we are creating a bottleneck. However, it can be mitigated by hosting a proper gRPC server that satisfies the control system requirements and scales up/down depending on workload. Moreover, I believe the K8S cluster is (or is going to be) interconnected with a 100 GB/s network.
  5. Customization.
    I am also planning to include one gRPC call that pushes the same message to multiple Kafka topics, and a consumer that can consume messages from different topics (I know of one Java program that takes two properties/values, adds them together, and pushes the result to another property). These can be implemented with a single gRPC call instead of two Kafka native API calls; see the sketch below.
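
A fan-out call like the one described in point 5 might look something like this sketch (names and fields are hypothetical, not part of this PR):

// Hypothetical fan-out: push one payload to several topics in a single call.
message PushManyRequest {
    repeated string topics = 1; // destination topics
    bytes payload = 2; // message body to duplicate across the topics
}

message PushManyResponse {
    optional string err_message = 1; // absent on success
}

rpc PushMany(PushManyRequest) returns (PushManyResponse);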

@jacob-curley-fnal (Contributor):

Thanks for your comments, Amol! I think there's a good amount to talk about, so we might benefit from getting folks in a room. I've been working on a graphQL -> Kafka endpoint as part of extapi-acsys, specifically for grabbing alarms. I know you've been working on data caching for device readings. This raises the question: are we talking about one Kafka instance servicing all the control system's needs, or will there be several instances for different purposes? That might impact how we decide to talk to it, or it might not. Either way, I had a whole long thing written out, but it might be more efficient to have a meeting and make sure everyone's on the same page. Thoughts?

@cnlklink:

@jacob-curley-fnal I like the idea of having a meeting to discuss this too.

@amolfnal (Author):

Definitely, I would love to join the application team meeting to discuss the interfaces.
I have already asked @mguzman04 and @beauremus for a meeting.

@jacob-curley-fnal (Contributor) left a comment:

Ran into some issues when attempting to call the service from the alarms server. Would be really nice to use lists when we're expecting messages with multiple instances of a single field, rather than clients having to know to parse a comma-delimited string.

}

message PushMQsRequest {
string mq_names = 1; // list of topics
Contributor:

Use repeated here, so clients can provide a list of names.
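
For instance, a sketch of the suggested change:

message PushMQsRequest {
    repeated string mq_names = 1; // list of topics, no comma-delimited parsing needed
}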

}

message ConsumersResponse {
string keys = 1; // associated key or keys
Contributor:

Use repeated here, so clients can parse the list of keys


message ConsumersResponse {
string keys = 1; // associated key or keys
bytes values = 2; // response
Contributor:

Use repeated here, so clients can parse the list of messages.

message ConsumersResponse {
string keys = 1; // associated key or keys
bytes values = 2; // response
string offsets = 3; // offset or many offsets
Contributor:

repeated int32 or repeated int64 would be better here

string keys = 1; // associated key or keys
bytes values = 2; // response
string offsets = 3; // offset or many offsets
string partitions = 4; // partition or many partitions
Contributor:

Use repeated for multiple objects returned
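
Taken together, these suggestions would reshape ConsumersResponse into something like this sketch:

message ConsumersResponse {
    repeated string keys = 1; // associated keys
    repeated bytes values = 2; // message payloads
    repeated int64 offsets = 3; // one offset per message
    repeated int32 partitions = 4; // one partition per message
}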

Comment on lines +59 to +67
message PullMQRequest {
string mq_name = 1; // topic name
int32 timeout_seconds = 2; // optional, stop after secs (0 = no timeout)
}

message PullMQsRequest {
string mq_names = 1; // list of topics
int32 timeout_seconds = 2; // optional, stop after secs (0 = no timeout)
}
Contributor:

Might be nice to have an option to specify a starting offset. If the connection to the gRPC server drops, I don't want to get all the messages again when I reconnect.
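
One possible shape (the start_offset field is an assumption, for illustration):

message PullMQRequest {
    string mq_name = 1; // topic name
    int32 timeout_seconds = 2; // optional, stop after secs (0 = no timeout)
    optional int64 start_offset = 3; // hypothetical: resume from this offset on reconnect
}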

Contributor:

Thinking about this more, I'm seeing that the ConsumersResponse object has the partitions and offsets, and those are a little confusing to me. Is the intention that each message will correspond to an index in the lists? So index 1 in the keys field will line up with index 1 in the values, offsets, and partitions fields?

If the above is true, may I suggest a much clearer option? Use this structure instead:

message DataMessage { // Or some other name that makes sense to you, doesn't need to be DataMessage necessarily
    string key = 1;
    bytes value = 2;
    uint64 offset = 3;
    uint32 partition = 4;
}
message ConsumersResponse {
    repeated DataMessage messages = 1;
}

@jacob-curley-fnal (Contributor), Feb 5, 2026:

But now I have a further question: are clients expected to keep track of the partitions that messages come from? If the network drops my connection to the server, am I going to have to specify the latest offset for all the partitions I know about, if I want to avoid pulling down messages I've already seen? And is this exposing too much of Kafka's guts to the clients? As a client, I'd like to see topics as a singular stream, and not need to care about partitions.

Author (@amolfnal):

Actually, I want to keep it generalized so that if/when Kafka gets replaced, we don't need to replace PullMQRequest. However, I like the offset idea: regardless of the MQ (Redis, Kafka, or RabbitMQ), an offset can be generalized to fetch a message at a position in the stream. Let me update the code.

Contributor:

So, this gets to the heart of the problem. I think we're hiding Kafka at too high of a level.

By hiding it inside a singleton service like this, we have N clients using 1 Kafka connection, with new clients joining the stream at arbitrary times. The service now needs to track which client has seen what messages, or the client has to track offsets (and then the service will need to map the client's offset to the order in which messages came back from the different partitions; partition 0 and partition 1 can both have a message at offset 5, but they'll be different messages). This is a boatload of complexity being distributed into the control system, just to avoid using Kafka's out-of-the-box, performant, enterprise-scale solution (groups)!

If, instead, we hide Kafka inside a library that each client depends on, and each client is therefore responsible for its own connection to Kafka, then we can use Kafka's group mechanism to let it handle reconnecting from the correct position for us. The business logic of the clients still won't know they're talking to Kafka, as that detail is hidden in the library. We can now have our cake and eat it too. Yes, this does add a little more cost to converting to a different platform than Kafka, but we'd just update the details in the library and push it to all the dependent applications. Automated tools like Dependabot will make this very easy.

Author (@amolfnal):

Let me be clear once again: we are not going to support native calls to any central service (including Kafka).
