Description
I am trying to use org.springframework.cloud:spring-cloud-function-rsocket on the server side, with:
@Bean
Function<String, Fixture> fixtures() {
return name -> Fixture.newBuilder().setName(name).build();
}
@Bean
RSocketStrategiesCustomizer strategiesCustomizer() {
return strategies -> strategies
.encoder(new org.springframework.http.codec.protobuf.ProtobufEncoder())
.decoder(new org.springframework.http.codec.protobuf.ProtobufDecoder());
}
where Fixture is a Protobuf-generated message class.
At runtime, the following exception is thrown:
org.springframework.messaging.handler.invocation.MethodArgumentResolutionException: Could not resolve method parameter at index 0 in public abstract R java.util.function.Function.apply(T): Cannot decode to [java.lang.Object]GenericMessage [payload=FluxPeekFuseable, headers={dataBufferFactory=NettyDataBufferFactory (PooledByteBufAllocator(directByDefault: true)), rsocketRequester=org.springframework.messaging.rsocket.DefaultRSocketRequester@83f96b6, rsocketResponse=null, lookupDestination=fixtures, contentType=application/octet-stream, rsocketFrameType=REQUEST_STREAM}], failedMessage=GenericMessage [payload=FluxPeekFuseable, headers={dataBufferFactory=NettyDataBufferFactory (PooledByteBufAllocator(directByDefault: true)), rsocketRequester=org.springframework.messaging.rsocket.DefaultRSocketRequester@83f96b6, rsocketResponse=null, lookupDestination=fixtures, contentType=application/octet-stream, rsocketFrameType=REQUEST_STREAM}]
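For what it's worth, the message names the raw `Function.apply(T)` method and a decode target of `java.lang.Object`, so my guess (an assumption on my part, not something I have confirmed in the Spring Cloud Function code) is that the `String` input type of the function never reaches the decoder. The plain-Java sketch below only illustrates why that could happen with a lambda-based bean: reflection on `Function.apply` yields `Object`, and a lambda's class does not record its type arguments, whereas an anonymous class does. `ErasureSketch` and its nested `Fixture` class are hypothetical stand-ins, not the generated Protobuf type or any Spring API.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

public class ErasureSketch {

    // Hypothetical stand-in for the generated Protobuf class from the report.
    public static final class Fixture {
        final String name;
        Fixture(String name) { this.name = name; }
    }

    // Parameter type of Function.apply as plain reflection sees it (erased).
    public static Class<?> applyParameterType() throws NoSuchMethodException {
        Method apply = Function.class.getMethod("apply", Object.class);
        return apply.getParameterTypes()[0];
    }

    // True if the object's class still records Function<T, R> type arguments.
    public static boolean keepsTypeArguments(Function<?, ?> fn) {
        for (Type type : fn.getClass().getGenericInterfaces()) {
            if (type instanceof ParameterizedType) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) throws Exception {
        // Same shape as the fixtures() bean: a lambda.
        Function<String, Fixture> lambda = name -> new Fixture(name);

        // Equivalent anonymous class, which carries generic signature metadata.
        Function<String, Fixture> anonymous = new Function<String, Fixture>() {
            @Override
            public Fixture apply(String name) {
                return new Fixture(name);
            }
        };

        System.out.println("apply parameter type: " + applyParameterType());
        System.out.println("lambda keeps type arguments: " + keepsTypeArguments(lambda));
        System.out.println("anonymous class keeps type arguments: " + keepsTypeArguments(anonymous));
    }
}
```

If this is the cause, the framework would have to take the input type from the bean method's declared return type (`Function<String, Fixture>`) rather than from the function instance. I don't know whether the RSocket integration does that.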
The client is fine: it works perfectly against a server-side implementation that uses @RSocketExchange:
public interface ProtobufService {
@RSocketExchange("fixtures")
Flux<Fixture> fixtures(String request);
}
@Controller
public class ProtobufController implements ProtobufService {
public Flux<Fixture> fixtures(String request) {
return Flux.just(Fixture.newBuilder().setName(request).build()).log();
}
}
Client:
public interface Protobuf {
@RSocketExchange("fixtures")
Flux<Fixture> fixtures(@Payload String request);
}
@Bean
RSocketRequester requester(RSocketRequester.Builder builder) {
return builder
.rsocketConnector(configurer -> configurer.resume(new Resume()))
.rsocketStrategies(
configurer -> configurer
.encoder(new ProtobufEncoder())
.decoder(new ProtobufDecoder()))
.dataMimeType(MimeTypeUtils.APPLICATION_OCTET_STREAM)
.tcp("127.0.0.1", 2222);
}
@Bean
@SuppressWarnings("null")
RSocketServiceProxyFactory factory(RSocketRequester requester) {
return RSocketServiceProxyFactory.builder(requester).build();
}
@Bean
Protobuf services(RSocketRequester requester, RSocketServiceProxyFactory factory) {
return factory.createClient(Protobuf.class);
}
@Bean
Disposable protobuf(Protobuf services) {
return Flux.just("one", "two", "three", "four", "five")
.flatMap(services::fixtures)
.log("client-protobuf")
.subscribe();
}
Any ideas?