Guidance on how to run tonic over UnixStream::pair() or stdin/stdout
#2503
-
|
I have been trying for a while to run tonic over The model where we have to provide multiple connections have obviously a strong impedence mismatch with a pair of file descriptors which we are running the wires over. Here's a MWE that I cannot make to work, the thing stops because the server ends after the first yield of the Unix stream because of — I assume — EOF on the child_socket. use std::env;
use nix::fcntl::{FcntlArg, FdFlag};
use tokio::{
net::UnixStream, process::Command
};
use tonic::{transport::Server, Request, Response, Status};
use std::os::fd::BorrowedFd;
pub mod hello {
tonic::include_proto!("hello");
}
use hello::{
greeter_server::{Greeter, GreeterServer},
greeter_client::GreeterClient,
HelloReply, HelloRequest,
};
use std::os::fd::{AsRawFd, FromRawFd};
#[derive(Default)]
struct MyGreeter;
#[tonic::async_trait]
impl Greeter for MyGreeter {
async fn say_hello(
&self,
req: Request<HelloRequest>,
) -> Result<Response<HelloReply>, Status> {
Ok(Response::new(HelloReply {
message: format!("hello {}", req.into_inner().name),
}))
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
if env::var("CHILD").is_ok() {
println!("Running child");
run_child().await?;
} else {
println!("Running parent");
run_parent().await?;
}
Ok(())
}
async fn run_child() -> Result<(), Box<dyn std::error::Error>> {
let std_stream = unsafe {
std::os::unix::net::UnixStream::from_raw_fd(3)
};
std_stream.set_nonblocking(true)?;
let stream = UnixStream::from_std(std_stream)?;
println!("Starting server");
Server::builder()
.add_service(GreeterServer::new(MyGreeter::default()))
.serve_with_incoming(tokio_stream::once(Ok::<tokio::net::UnixStream, std::convert::Infallible>(stream)))
.await?;
println!("Server stopped");
Ok(())
}
async fn run_parent() -> Result<(), Box<dyn std::error::Error>> {
let (parent_sock, child_sock) = std::os::unix::net::UnixStream::pair()?;
let exe = env::current_exe()?;
println!("two unnamed UNIX sockets obtained");
println!("FD3 prepared");
let mut child = unsafe { Command::new(exe)
.env("CHILD", "1")
.pre_exec(move || {
let fd = child_sock.as_raw_fd();
nix::libc::dup2(fd, 3);
let fd3 = BorrowedFd::borrow_raw(3);
nix::fcntl::fcntl(fd3, FcntlArg::F_SETFD(FdFlag::empty()))?;
Ok(())
})
.spawn()? };
println!("Child spawned");
parent_sock.set_nonblocking(true)?;
let mut parent_sock: Option<_> = Some(hyper_util::rt::TokioIo::new(UnixStream::from_std(parent_sock)?));
let channel = tonic::transport::Endpoint::try_from("http://[::]:50051")?
.connect_with_connector(tower::service_fn(move |_| {
println!("Connecting to the FD3");
let io = parent_sock.take().expect("BUG: more than one service called");
async move {
Ok::<_, std::io::Error>(io)
}
}))
.await?;
println!("Creating client");
let mut client = GreeterClient::new(channel);
println!("Sending a request");
let resp = client
.say_hello(Request::new(HelloRequest {
name: "stdin".into(),
}))
.await?;
println!("RESPONSE = {:?}", resp.into_inner().message);
child.wait().await?;
Ok(())
}syntax = "proto3";
package hello;
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply);
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}I know that Golang has gRPC over stdout/stdin for Terraform things, I wonder how we could achieve something similar in Rust. |
Beta Was this translation helpful? Give feedback.
Replies: 1 comment 3 replies
-
|
The issue is that tonic expects a stream of incoming connections, not a single connection. For a stdin/stdout or |
Beta Was this translation helpful? Give feedback.
Sorry, misread the MWE. The likely fix is to chain your
once(Ok(stream))withfutures::stream::pending()so the accept loop never observes end-of-stream. The server task then only exits when you drop it. Glad to hearpending()worked for you.