Conversation
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
|
I marked this as 0.4, hopefully paho.mqtt new version will be released before our new version and we can get this one in. I'll review this PR asap |
|
Seems like musl doesn't like paho.mqtt? Is there a way to fix it? I see this issue is open here: eclipse-paho/paho.mqtt.rust#57 One thing I wanted with the integration between sdk-rust and mqtt is a crate supporting no_std (given we're also investing a lot of time in supporting no std #94). Is there any particular reason why @sbcd90 you chose paho.mqtt? Did you considered other mqtt clients supporting no_std? |
|
I did evaluate https://github.com/bytebeamio/rumqtt . I can use any other library. |
|
Please let me know if i should try switching the library. |
|
A Paho release went out last week (v0.8). Another (v0.9) is in active development and should be out by the end of the year. The initial purpose for the next release was to get HTTP/S proxy support for websocket connections, but a few other additions popped up, and now we're also going to push for better cross-compiling support and fully static linking. That includes (optional) static OpenSSL linking and getting that musl support stable. |
|
@sbcd90 I think we can go ahead and work on this one, but let's be explicit we're supporting paho, so maybe rename it to |
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
@slinkydeveloper I have set version to |
| } | ||
|
|
||
| impl ConsumerMessageDeserializer { | ||
| fn get_mqtt_headers(message: &Message) -> Result<HashMap<String, Vec<u8>>> { |
There was a problem hiding this comment.
Do we need the result return here? I don't see any failing stmt here
| fn encoding(&self) -> Encoding { | ||
| match ( | ||
| self.headers | ||
| .get("content-type") |
There was a problem hiding this comment.
Can you use the constant in headers?
| headers::MqttVersion::V3_1_1 => { | ||
| StructuredDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?) | ||
| } | ||
| } |
There was a problem hiding this comment.
I may miss some knowledge about mqtt here, but why do we need the headers version? Also, why in V5 only Binary Deserializer is supported (aka only binary mode is supported), more than the general MessageDeserializer?
Also, can you infer the headers version from Message more than providing it manually as parameter?
There was a problem hiding this comment.
now, V5 should allow both structured & binary mode. V3 only structured mode.
slinkydeveloper
left a comment
There was a problem hiding this comment.
So i have some concerns about the whole v3/v5 difference, why did you defined an extension for it? do we need it?
cloudevents-sdk-paho-mqtt/src/lib.rs
Outdated
| //! This library provides Mqtt protocol bindings for CloudEvents | ||
| //! using the [paho.mqtt.rust](https://github.com/eclipse/paho.mqtt.rust) library.\\ |
There was a problem hiding this comment.
Can we have here a sample usage in the docs? (look at actix-web integration module for details on how)
| use cloudevents::{EventBuilderV10, EventBuilder}; | ||
| use cloudevents_sdk_mqtt::{MessageRecord, MessageBuilderExt, MqttVersion, MessageExt}; | ||
|
|
||
| fn consume_v3(broker: &str, topic_name: &str) { |
There was a problem hiding this comment.
Can you make the version just an argument more than 2 separate functions?
| process::exit(1); | ||
| }); | ||
|
|
||
| if let Err(err) = block_on(async { |
There was a problem hiding this comment.
I wonder, can you make the consume_v3 function async in order to push the "async concern" to the outer scope (caller of consume)
| process::exit(1); | ||
| }); | ||
|
|
||
| if let Err(err) = block_on(async { |
| } | ||
| } | ||
|
|
||
| fn produce_v3(broker: &str, topic_name: &str) { |
| .long("mode") | ||
| .help("enter \"consmer\" or \"producer\"") | ||
| .takes_value(true) | ||
| .possible_values(&["consumerV3", "producerV3", "consumerV5", "producerV5"]) |
There was a problem hiding this comment.
Can you make the version a separate arg and choose some default?
| let mut hm = HashMap::new(); | ||
| let prop_iterator = message.properties().iter(PropertyCode::UserProperty); |
There was a problem hiding this comment.
Do we really need this copy? Can't you just access to the user properties in the deserialize_binary method?
| self.headers | ||
| .get(headers::MQTT_VERSION_HEADER) | ||
| .map(|s| String::from_utf8(s.to_vec()).ok()) | ||
| .flatten() | ||
| .map(|s| s.eq(headers::MQTT_V5_BINARY)) | ||
| .unwrap_or(false), |
There was a problem hiding this comment.
So do we expect incoming messages to have this header?
|
|
||
| pub fn from_event(event: Event) -> Result<Self> { | ||
| match event | ||
| .extension(headers::MQTT_VERSION_HEADER) |
There was a problem hiding this comment.
Is this part of the spec?
Hi @slinkydeveloper , currently, v3 supports only structured mode. v5 supports both structured as well as binary mode according to spec. The spec doesnt define how to set this mqtt version info. Earlier, I was passing the flag as a parameter, now I pass it as an extension field. paho-mqtt passes version info as a param. Another approach, we can follow is, ignore the mqtt version, & just trust the Content-Type header. This leaves us with a risk that we're depending on library api behavior a little bit. Kindly let me know according to you which is the best approach. |
|
Ok so one step at the time. MQTT -> CloudEventFirst of all, i think you should remove the headers map you create and you should directly access to the For figuring out v3 vs v5 messages, I think we should simply do best effort: from the paho.mqtt rustdocs i see that you can always get the properties from a message https://docs.rs/paho-mqtt/0.8.0/paho_mqtt/message/struct.Message.html#method.properties and in case the CloudEvent -> MQTTIn this case, we should have an additional input in the serialization that explicits the mqtt version, like an enum Also note that i don't expect the user providing And the implementation would look like: Check out the actix-web, because it's a better one (I see that you probably looked at the Does that makes sense? Did I understood the problems here? |
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
|
Hi @slinkydeveloper , Thanks a lot for your detailed review. Refactored the code accordingly. Can you please have a look now? |
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
slinkydeveloper
left a comment
There was a problem hiding this comment.
To me this PR sounds good, @sbcd90 can you fix MUSL tests? Is paho-mqtt compatible with musl?
@Lazzaretti can you review this PR?
cloudevents-sdk-paho-mqtt/src/lib.rs
Outdated
| //! This library provides Mqtt protocol bindings for CloudEvents | ||
| //! using the [paho.mqtt.rust](https://github.com/eclipse/paho.mqtt.rust) library.\\ |
|
|
||
| match version { | ||
| MQTT_5 => { | ||
| self = self.properties(message_record.headers.clone()); |
There was a problem hiding this comment.
You can avoid clone destructuring message_record
| if let Some(s) = message_record.payload.as_ref() { | ||
| self = self.payload(s.to_vec()); | ||
| } |
There was a problem hiding this comment.
No need to perform this copy, after destructing (as in the above comment) use a pattern matching here so you get ownership of s
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
|
hi @slinkydeveloper , i fixed the remaining issues. |
|
@sbcd90 try to modify the github action here: https://github.com/cloudevents/sdk-rust/blob/master/.github/workflows/rust_tests.yml#L31 adding openssl dev package setup |
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
| serde_json = "^1.0" | ||
| futures = "^0.3" | ||
| tokio = { version = "^0.2", features = ["full"] } | ||
| clap = "2.33.1" No newline at end of file |
There was a problem hiding this comment.
Here you need to add
[workspace]
Like https://github.com/cloudevents/sdk-rust/blob/master/example-projects/warp-example/Cargo.toml
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
|
hi @slinkydeveloper , the builds are fixed now. |
| - Start the MQTT V3 Consumer | ||
|
|
||
| ``` | ||
| run --package <package-name> --bin <binary-name> -- --mode consumerV3 --broker tcp://localhost:1883 --topic test |
There was a problem hiding this comment.
what is run? can we have some more specific examples?
| openssl-sys = "*" | ||
| openssl = { version = "*", features = ["vendored"] } |
There was a problem hiding this comment.
may we should add some version restrictions?
There was a problem hiding this comment.
Good catch! Why in the main toml we need openssl?
| pub fn from_event(event: Event, version: &headers::MqttVersion) -> Result<Self> { | ||
| match version { | ||
| headers::MqttVersion::MQTT_5 => { | ||
| BinaryDeserializer::deserialize_binary(event, MessageRecord::new()) | ||
| } |
There was a problem hiding this comment.
As I understood MQTTv5 should support Binary and Structured Content Mode.
https://github.com/cloudevents/spec/blob/v1.0.1/mqtt-protocol-binding.md#32-structured-content-mode
So I would add a parameter to define the content mode (Binary or Structured) and not the MQTT version directly.
There was a problem hiding this comment.
More than version you would use the Encoding struct? sounds good
No description provided.