Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pom.xml
pom.xml.asc
*.jar
*.class
/.lein-*
*.lein-*
/.nrepl-port
.hgignore
.hg/
Expand Down
2 changes: 1 addition & 1 deletion modules/grpc-client/src/protojure/grpc/client/utils.clj
Original file line number Diff line number Diff line change
Expand Up @@ -54,5 +54,5 @@ resolves to a decoded result when successful. Used in remote procedure calls wi
| **ch** | _core.async/channel_ | A core.async channel expected to carry the response data |
"
[client params ch]
(-> (grpc/invoke client params)
(-> (grpc/invoke client (assoc params :unary? true))
Copy link
Member

@ghaskins ghaskins Feb 20, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about this: I'm not sure that we should make this specific to unary. If something is malformed, its probably best to fail the call regardless of unary or streaming. Silently dropping is probably both wrong and confusing regardless of the type. Thoughts?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went back and forth on that… I think I did recall that if an exception is thrown in the streaming case, the pipeline breaks — but I can’t remember if we went back to make that more resilient. I’ll check and get back on this PR after I’m done traveling this evening.

My main concern was whether we’d want to fail a client call out entirely for a single bad message in the streaming case — I think we’d ideally try to gracefully continue in the streaming case, but I will check to see whether or not thats how we’ve implemented the client currently.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In going back to test this without the :unary? check on the encode-promise, the test suite hangs on my local machine. I'd need to dig in further to understand why (I'm not certain if its a bug with the change to a p/all waiting on the pipeline to report, or a race condition.

When you get a moment, is there anything that jumps out to you about this change that would cause UTs to hang if the change is applied to streaming calls as well?

(p/then (fn [_] (take ch)))))
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
[{:keys [f] :as input} codecs content-coding max-frame-size]
(when (some? input)
(let [input-ch (:ch input)
output-ch (async/chan 16)]
(lpm/encode f input-ch output-ch {:codecs codecs :content-coding content-coding :max-frame-size max-frame-size})
output-ch)))
output-ch (async/chan 16)
encode-promise (lpm/encode f input-ch output-ch {:codecs codecs :content-coding content-coding :max-frame-size max-frame-size})]
[output-ch encode-promise])))

(defn- codecs-to-accept [codecs]
(clojure.string/join "," (cons "identity" (keys codecs))))
Expand Down Expand Up @@ -142,8 +142,8 @@
(deftype Http2Provider [context uri codecs content-coding max-frame-size input-buffer-size metadata]
api/Provider

(invoke [_ {:keys [input output] :as params}]
(let [input-ch (input-pipeline input codecs content-coding max-frame-size)
(invoke [_ {:keys [input output unary?] :as params}]
(let [[input-ch encode-promise] (input-pipeline input codecs content-coding max-frame-size)
meta-ch (async/chan 32)
output-ch (when (some? output) (async/chan (max 32
(/ input-buffer-size max-frame-size))))]
Expand All @@ -155,7 +155,10 @@
(safe-close! output-ch)
(async/close! meta-ch)
(safe-close! input-ch)
(throw ex))))])))
(throw ex))))
;;FIXME Exception handling and propagation from the encode and decode pipelines can be further improved
;; For streaming use cases, we do not want to fail for one bad input, for unary we do
(if unary? encode-promise nil)])))
(p/then (fn [[_ status]]
(log/trace "GRPC completed:" status)
status)
Expand Down
2 changes: 1 addition & 1 deletion test/project.clj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(defproject io.github.protojure/test "2.0.2-SNAPSHOT"
(defproject io.github.protojure/test "2.0.7-SNAPSHOT"
:description "Test harness for protojure libs"
:url "http://github.com/protojure/lib"
:license {:name "Apache License 2.0"
Expand Down
13 changes: 13 additions & 0 deletions test/test/protojure/grpc_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,19 @@
(p/then (fn [{:keys [message] :as result}]
(is (= message "Hello, World"))))))))

(deftest unary-grpc-encoding-exception-check
(testing "Check that an error in encoding a protobuf propagates out of pipeline"
(let [input (async/chan 1)
output (async/chan 16)
client (:grpc-client @test-env)
desc {:service "example.hello.Greeter"
:method "SayHello"
:input {:f new-HelloRequest :ch input}
:output {:f pb->HelloReply :ch output}}]

(is (thrown? Exception @(-> (client.utils/send-unary-params input {:name nil})
(p/then (fn [_] (client.utils/invoke-unary client desc output)))))))))

;;Note there are qualitative differences between this nil & error check and the below grpc-failing-status-check test
;; this test is run against an endpoint registrered in the pedestal interceptor stack, and so exercises
;; protojure.pedestal.interceptors.grpc pedestal interceptor, where the aforementioned below test does not
Expand Down