How to safely cancel a gRPC ledger stream?
I’m using the java bindings, specifically the following stub to request completions from the ledger:
What I would like to know is how I can safely cancel this stream (without shutting down the entire gRPC channel).
I think this is a general question that applies to any stub that takes an io.grpc.stub.StreamObserver.
Is there a specific reason why you are using the raw gRPC-generated code instead of the Java bindings?
As for raw gRPC-generated code, I would recommend you have a look at the gRPC docs here; there’s also a pointer to an example.
Hi, sorry for the late reply.
Is there a specific reason why you are using the raw gRPC-generated code instead of the Java bindings?
I’m using a different streaming library, and I’ve had a mediocre experience with RX before. Also, I think RX1 has reached end-of-life already.
Thanks for the link; I will try and implement that.
I’m using a different streaming library, and I’ve had a mediocre experience with RX before.
What streaming library are you using and what advantages does it have over RxJava?
Also, I think RX1 has reached end-of-life already.
Indeed, RxJava 2 (which is used by the Java bindings) is not actively developed anymore. The current plan is to move to RxJava 3 in a way that minimizes disruption and keeps the transition smooth for existing users.
Currently using fs2. It uses some Scala-specific language features, so it won’t work in Java, I think. But the design is quite different:
- No side-effects, so it’s easier to reason about
- It’s pull-based (rather than push-based), making working with back-pressure much easier
- It has good concurrency primitives.
The previous experience I had with rx is that it was good for asynchronous programming, but it was lacking as soon as you started working with concurrency. But tbh that was RX1, quite a while ago.
Got it, thank you!
I would recommend you have a look at the gRPC docs here,
As pointed out by stefano, there is an example of how to do this in Java. The key is that in the Daml API we provide a ResponseObserver which doesn’t have a cancel method, but it can be cast into a ClientCallStreamObserver, as explained in this comment:
grpc/grpc-java/blob/41552bfd9a48da780fddacecd99348ea27217676/examples/src/main/java/io/grpc/examples/cancellation/CancellationClient.java#L153-L154
// Unary and server-streaming stub methods don't return a StreamObserver. For such RPCs, you can
// use ClientResponseObserver to get the ClientCallStreamObserver. For example:
This provides access to a cancel() method, which does what we want.
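For anyone landing here later, a minimal sketch of that approach using grpc-java's `ClientResponseObserver`. The class name `CancellableObserver` and the cancellation message are my own and purely illustrative; only `ClientResponseObserver`, `ClientCallStreamObserver`, `beforeStart` and `cancel(String, Throwable)` come from grpc-java. It assumes grpc-stub is on the classpath.

```java
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;

// Hypothetical helper (not part of grpc-java or the Daml bindings).
// Pass an instance as the response observer of a server-streaming call;
// gRPC then hands us the ClientCallStreamObserver via beforeStart().
final class CancellableObserver<ReqT, RespT> implements ClientResponseObserver<ReqT, RespT> {

    // Set by gRPC before the call starts; volatile because cancel()
    // may be invoked from a different thread.
    private volatile ClientCallStreamObserver<ReqT> call;

    @Override
    public void beforeStart(ClientCallStreamObserver<ReqT> requestStream) {
        this.call = requestStream;
    }

    @Override
    public void onNext(RespT value) {
        // Handle each streamed response here.
    }

    @Override
    public void onError(Throwable t) {
        // After cancel(), gRPC reports the call as failed with status CANCELLED.
    }

    @Override
    public void onCompleted() {
        // The server closed the stream normally.
    }

    // Cancels only this call; the underlying channel stays open.
    // Does nothing if the call has not been started yet.
    void cancel() {
        ClientCallStreamObserver<ReqT> c = call;
        if (c != null) {
            c.cancel("Cancelled by client", null);
        }
    }
}
```

You would pass an instance wherever the generated async stub expects a `StreamObserver` for responses, keep a reference to it, and call `cancel()` when you no longer want the stream. After cancelling, expect `onError` rather than `onCompleted`.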