From c642c9716e463ee33f41a30ab5a0d4e6380f855d Mon Sep 17 00:00:00 2001 From: Konstantin Tcepliaev Date: Thu, 5 Mar 2020 14:54:06 +0000 Subject: [PATCH] support asynchronous responses to unary requests. --- src/grpcbox_stream.erl | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/src/grpcbox_stream.erl b/src/grpcbox_stream.erl index 76f71ba..2302da5 100644 --- a/src/grpcbox_stream.erl +++ b/src/grpcbox_stream.erl @@ -8,6 +8,7 @@ -export([send/2, send/3, + unary_reply/2, send_headers/2, add_headers/2, add_trailers/2, @@ -50,6 +51,7 @@ resp_trailers=[] :: list(), headers_sent=false :: boolean(), trailers_sent=false :: boolean(), + async_unary=false :: boolean(), unary_interceptor :: fun() | undefined, stream_interceptor :: fun() | undefined, stream_id :: stream_id(), @@ -260,6 +262,8 @@ handle_unary(Ctx, Message, State=#state{unary_interceptor=UnaryInterceptor, {ok, Response, Ctx2} -> State1 = from_ctx(Ctx2), send(false, Response, State1); + {noreply, Ctx2} -> + (from_ctx(Ctx2))#state{async_unary=true}; E={grpc_error, _} -> throw(E) end. @@ -268,6 +272,9 @@ on_end_stream(State) -> on_end_stream_(State), {ok, State}. + +on_end_stream_(#state{async_unary=true}) -> + ok; on_end_stream_(#state{input_ref=Ref, callback_pid=Pid, method=#method{input={_Input, true}, @@ -365,10 +372,16 @@ ctx(#state{handler=Pid}) -> ctx(#state{handler=Pid}, Ctx) -> h2_stream:call(Pid, {ctx, Ctx}). +unary_reply(Message, Ctx) -> + #state{handler=Pid} = from_ctx(Ctx), + h2_stream:call(Pid, {unary_reply, Message}). + handle_call(ctx, State=#state{ctx=Ctx}) -> {ok, Ctx, State}; handle_call({ctx, Ctx}, State) -> - {ok, ok, State#state{ctx=Ctx}}. + {ok, ok, State#state{ctx=Ctx}}; +handle_call({unary_reply, Message}, State) -> + {ok, ok, end_stream(send(false, Message, State))}. handle_info({add_headers, Headers}, State) -> update_headers(Headers, State);