Conversation
The EPICS archiver sends one point at a time, which is too slow for the plotting app (the plotting app does a lot of work each time it gets a packet of data). This stream combinator buffers up a large number of points to amortize the cost of updating the graph.
Since we're only ever going to instantiate one instance of this stream type, using generic parameters won't cause code bloat. It does, however, let me instantiate a max payload of 4, for instance, when writing unit tests.
Due to inexperience, I was boxing each stream that was built: each combinator I created added another heap allocation and required a dereference when pulling elements. With the help of an AI, the stream APIs have been refactored to use generics and `impl Stream<>` monomorphization to create combined streams that don't require heap allocations (besides any underlying vectors they might use in their state).
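As a rough illustration of the buffering idea only (not this PR's actual combinator, which does more), a size-based batching stream can be built generically so a small payload can be injected in tests; all names here are illustrative:

use futures::{Stream, StreamExt};

// Batch up to `max` ready points from the upstream before yielding them,
// so the plotter redraws once per batch instead of once per point.
// `ready_chunks` is the off-the-shelf futures combinator that does the
// size-based grouping; the real combinator in this PR has extra behavior.
fn buffered_points<S, T>(upstream: S, max: usize) -> impl Stream<Item = Vec<T>>
where
    S: Stream<Item = T>,
{
    upstream.ready_chunks(max)
}

In a unit test, calling this with max = 4 over a futures::stream::iter of known points makes the batch boundaries easy to assert.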
Before merging, I need to make one more change.
I'll check back when the new changes are added, but here are my initial comments:

1.) I don't think I know enough about the context to give as good a review as I'd like on the strategies employed here. My gut reaction is I'm a little worried we're implementing a lot of our own streams, when there are some out-of-the-box options that would (I think) require less long-term maintenance, and certainly decrease cognitive overhead. Again, that is a naive reaction as I don't know all the details, so don't take that as me recommending we actually go that route. Only bringing it up in case there was a way to make use of them; I'm having good luck returning an instance of

2.) Sort of related to 1.), but the

3.) Final thought, also building on 1.) and 2.), is more architectural. It looks like these changes are made to knit together different streams of data so we can shoot them back to clients as a single stream. That appears to involve a good amount of logic. Question I want to raise is, "How 'heavy' do we want

Overall, I've no objection to these changes going in! Just a reminder about the caveat from 1.), where I don't have insights into the motivations or context for these changes, so bear that in mind. Good stuff here!
beauremus left a comment
I agree with @jacob-curley-fnal's comments.
I'm willing to approve, but this is a lot of change; I'd like more eyes on it and, ideally, some high-level discussion about the structures.
Let me know how you want to proceed @rneswold.
    device: &str, start_time: f64, end_time: f64,
) -> Result<impl Stream<Item = global::DataReply> + Send + 'static + Unpin>
{
    const BASE_URL: &str =
Is this temporary?
We want config like this defined in the K8s config rather than in code, via an env var.
Also, no SSL.
Do both these get resolved when we move to gRPC?
I would imagine that, when PIP-II gets closer to commissioning, they'll have a pool of archivers. This is mostly proof-of-concept. Most of the archived data is spotty and there aren't many PVs registered.
Eventually we'll have the production endpoint and I'll generalize the configuration for it.
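When that generalization happens, a minimal sketch of the env-var approach the reviewer mentions could look like this (the variable name EPICS_ARCHIVER_URL and the local fallback are hypothetical, not anything defined in this PR):

use std::env;

// Read the archiver endpoint from the environment (set through the K8s
// config) instead of hard-coding it, with a fallback only for local runs.
fn archiver_base_url() -> String {
    env::var("EPICS_ARCHIVER_URL")
        .unwrap_or_else(|_| "http://localhost:17665/retrieval".to_string())
}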
        .timestamp_opt(seconds, nanoseconds)
        .earliest()
        .unwrap_or_else(|| {
            Utc.timestamp_millis_opt(0).unwrap() // Default to epoch start
Should we log a warning in this case so we can more easily count the number of failures?
Maybe? It's set up to return Jan 1, 1970. Any client should be suspicious of that date. I'm not sure a log message improves anything.
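If we did want the failure count, a minimal sketch of the warn-on-fallback variant (the surrounding function name and signature here are made up; the chrono calls mirror the ones in the diff):

use chrono::{DateTime, TimeZone, Utc};
use tracing::warn;

// Same fallback to the epoch as the current code, but count the failures
// by emitting a warning with the offending values.
fn to_utc(seconds: i64, nanoseconds: u32) -> DateTime<Utc> {
    Utc.timestamp_opt(seconds, nanoseconds)
        .earliest()
        .unwrap_or_else(|| {
            warn!(seconds, nanoseconds, "invalid timestamp; defaulting to epoch start");
            Utc.timestamp_millis_opt(0).unwrap()
        })
}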
    }
#[instrument(name = "EPICS_ARCH", skip(device, start_time, end_time))]
async fn epics_archived_data(
I realize that the implementation of archived_data is quite different, but I expect them to share a function signature or a HistoricalData abstract class that they both implement.
This function returns a DataStream, just like our ACNET archived_data method does.
And the signatures are almost the same. The ACNET one uses the Context argument to obtain the DPM gRPC endpoint. The archiver does a single HTTP request so it doesn't need to preserve any state in Context<>.
    task::{Context, Poll},
};
use tracing::warn;
This is tough to grok.
I'm a bit concerned about absurd user historical requests bringing a pod to its knees due to buffering.
I think we could leverage concurrency and separation of concerns a bit more to make this simpler to understand. (concurrency == simpler 🤣)
Here's what I worked with copilot to mock up:
- Stream processing:
use futures::{Stream, StreamExt}; // StreamExt is needed for .next()
use tokio::sync::mpsc;
use tokio::task;

async fn process_archived_data(
    archived: impl Stream<Item = global::DataReply> + Send + 'static + Unpin,
    tx: mpsc::Sender<(i32, global::DataReply)>,
) {
    tokio::pin!(archived);
    while let Some(reply) = archived.next().await {
        if tx.send((reply.ref_id, reply)).await.is_err() {
            break; // Receiver dropped
        }
    }
}

async fn process_live_data(
    live: impl Stream<Item = global::DataReply> + Send + 'static + Unpin,
    tx: mpsc::Sender<(i32, global::DataReply)>,
) {
    tokio::pin!(live);
    while let Some(reply) = live.next().await {
        if tx.send((reply.ref_id, reply)).await.is_err() {
            break; // Receiver dropped
        }
    }
}
- Merging
async fn merge_streams(
    rx: mpsc::Receiver<(i32, global::DataReply)>,
) -> impl Stream<Item = global::DataReply> {
    tokio_stream::wrappers::ReceiverStream::new(rx)
        // futures::StreamExt::filter_map, since the closure is async
        .filter_map(|(_ref_id, reply)| async move {
            // Apply any filtering or ordering logic here
            Some(reply)
        })
}
- Execution
async fn data_merge(
    archived: impl Stream<Item = global::DataReply> + Send + 'static + Unpin,
    live: impl Stream<Item = global::DataReply> + Send + 'static + Unpin,
) -> impl Stream<Item = global::DataReply> {
    let (tx, rx) = mpsc::channel(100);

    // Spawn tasks for processing streams
    task::spawn(process_archived_data(archived, tx.clone()));
    task::spawn(process_live_data(live, tx));

    // Merge the streams
    merge_streams(rx).await
}
You've clearly thought about this a lot more than me. So don't let me hold this up, but I hope this plants some seeds.
I would have used the combinators in futures::StreamExt if they met our needs, but they don't. For instance, your merge would have both streams write to the mpsc channel as the data comes in. My implementation returns the points in timestamp order so all the archive data gets returned first. Otherwise all our clients would need to sort by timestamp.
If I used .chain() to force one stream to finish before the other, then DPM could time out if the archived data takes too long, so I wanted to pull and buffer DPM data so it didn't drop the subscription.
I'll try to clean some of this up, but I feel these data streams do things the standard combinators don't, mainly because:
- We want to sort incoming data from multiple streams using the timestamp
- DPM sends an empty array to indicate end-of-archived-data but doesn't close the stream
- We're trying to return plot data in 15 Hz batches so that all channels get drawn together
- Our stream of live data contains data from multiple devices, so we need to keep track of each device's state
These are the sorts of problems the standard combinators don't solve.
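For what it's worth, here is a minimal sketch of just the "pull and buffer the live DPM data while the archive drains" part, assuming futures, tokio, and tokio_stream are available; it ignores the timestamp sorting, the empty-array end-of-archive marker, and the 15 Hz batching, and the unbounded buffer is only for brevity:

use futures::{Stream, StreamExt};
use tokio::sync::mpsc;
use tokio_stream::wrappers::UnboundedReceiverStream;

// Keep servicing the live subscription by copying its items into a buffer,
// while the consumer drains the archived stream first; the buffered live
// data follows once the archive is exhausted.
fn archive_then_live<A, L, T>(archived: A, live: L) -> impl Stream<Item = T>
where
    A: Stream<Item = T> + Send + 'static,
    L: Stream<Item = T> + Send + 'static,
    T: Send + 'static,
{
    let (tx, rx) = mpsc::unbounded_channel();

    // Forward live items as they arrive so DPM never sees an idle client.
    tokio::spawn(async move {
        futures::pin_mut!(live);
        while let Some(item) = live.next().await {
            if tx.send(item).is_err() {
                break; // consumer went away
            }
        }
    });

    archived.chain(UnboundedReceiverStream::new(rx))
}

In the real code, the final chain step would instead be the timestamp-ordered merge and batching described above.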
The gRPC API returns a
Starting a background task to put stream entries into a
However, making the code easier to understand is a worthy goal. I appreciate everyone's thoughts and comments!
We could Zoom or meet in the Cooler on a day that Beau and I are onsite to go through it in more detail.
Code Coverage Report - 1,487 of 3,627 lines covered ( ⛔ 41.00%)
This should make it reach 100% coverage.
Code Coverage Report - 1,555 of 3,693 lines covered ( ⛔ 42.11%)
Code Coverage Report - 1,564 of 3,700 lines covered ( ⛔ 42.27%)
This should make the coverage 100%.
Code Coverage Report - 1,565 of 3,700 lines covered ( ⛔ 42.30%)
This adds a 2-second timeout for connecting to the clock service. Before this, if the clock service wasn't running, it would take a long time before the request would time out.
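For illustration only, the connect-with-timeout pattern might look like this (the actual clock-service client isn't shown in this PR excerpt, so the TcpStream and address handling here are stand-ins):

use std::time::Duration;
use tokio::{net::TcpStream, time::timeout};

// Bound the connect attempt to 2 seconds so a missing clock service fails
// fast instead of hanging until the OS gives up.
async fn connect_to_clock(addr: &str) -> std::io::Result<TcpStream> {
    match timeout(Duration::from_secs(2), TcpStream::connect(addr)).await {
        Ok(conn) => conn,
        Err(_) => Err(std::io::Error::new(
            std::io::ErrorKind::TimedOut,
            "timed out connecting to the clock service",
        )),
    }
}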
The EPICS Archiver Appliance uses HTTP/1.1 to return the data. Initially, the GraphQL resolver had to wait for the whole document to be delivered before it could be parsed and sent on to the client. The appliance returns the document in chunked format. With the help of AI, this commit does a little hackery with the incoming stream of chunks: it parses the data as the chunks arrive, which greatly speeds up the plotting of the data.
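As a rough sketch of the idea (assuming reqwest with its "stream" feature; parse_chunk is a placeholder for whatever incremental parsing the resolver actually does):

use futures::StreamExt;

// Consume the chunked HTTP/1.1 response as chunks arrive instead of waiting
// for the whole document, feeding each chunk to an incremental parser.
async fn stream_archiver_response(url: &str) -> reqwest::Result<()> {
    let response = reqwest::get(url).await?;
    let body = response.bytes_stream();
    futures::pin_mut!(body);

    while let Some(chunk) = body.next().await {
        let chunk = chunk?; // bytes::Bytes
        parse_chunk(&chunk); // placeholder for the incremental parser
    }
    Ok(())
}

fn parse_chunk(_bytes: &[u8]) {
    // placeholder: feed bytes into an incremental JSON parser
}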
@cnlklink and I are documenting some minor issues with the latest commit, so there will probably be a future commit to fix them. I also want to add a few unit tests for the new code. So it's not ready to merge yet.
This PR adds support to the `acceleratorData` subscription to retrieve historical PV data from the Archiver Appliance.