RxJava Error Handling - How to add error callback

App Development | 15 posts | 4,539 views | 1 like | Last activity Nov 2022
liav (OP)
Mar 2022

Hi there,

While coding an RxJava-based program I’m facing an exception whose message recommends better error handling:


io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | java.lang.RuntimeException: java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: NOT_FOUND: CONTRACT_NOT_FOUND(11,d8368d0f): Contract could not be found with id 0054ff9fd1c537ece135de287c8fa14ff169ec9d6e16796ff7fe40562b6fb5d441
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
	at io.reactivex.internal.subscribers.LambdaSubscriber.onError(LambdaSubscriber.java:79)
	at io.reactivex.internal.subscribers.LambdaSubscriber.onNext(LambdaSubscriber.java:69)
	at io.reactivex.internal.operators.flowable.FlowableFlattenIterable$FlattenIterableSubscriber.drain(FlowableFlattenIterable.java:312)
	at io.reactivex.internal.operators.flowable.FlowableFlattenIterable$FlattenIterableSubscriber.onNext(FlowableFlattenIterable.java:174)
	at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:68)
	at com.daml.grpc.adapter.client.rs.BufferingResponseObserver.lambda$onNext$2(BufferingResponseObserver.java:60)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: io.grpc.StatusRuntimeException: NOT_FOUND: CONTRACT_NOT_FOUND(11,d8368d0f): Contract could not be found with id 0054ff9fd1c537ece135de287c8fa14ff169ec9d6e16796ff7fe40562b6fb5d441
	at io.reactivex.internal.util.ExceptionHelper.wrapOrThrow(ExceptionHelper.java:46)

The thing is that the docs explain how to add callbacks to the “Observe” and “Subscribe” methods (as recommended above), but in my case I’m not explicitly subscribing or observing. Instead, I’m fetching the transactions and iterating over them like this:

private void processTransaction(Transaction tx) {
    logger.info("processTransaction: {}", tx.toString());

    List<Event> exerciseEvents = tx.getEvents();

    List<Command> exerciseCommands = exerciseEvents.stream()
            .filter(e -> e instanceof CreatedEvent)
            .map(e -> (CreatedEvent) e)
            .flatMap(e -> processEvent(tx.getWorkflowId(), e))
            .collect(Collectors.toList());

    if (!exerciseCommands.isEmpty()) {
        client.getCommandClient().submitAndWait(
                tx.getWorkflowId(), APPLICATION_ID, UUID.randomUUID().toString(), tpaPartyId, exerciseCommands)
                .blockingGet();
    }
}

So how can I still enhance my flow to catch and handle such an exception?

stefanobaghino-da
Mar 2022

I think there are many different concepts shown (both implicitly and explicitly) in the code you are sharing. I’ll go through them separately:

  1. The processTransaction method takes a transaction as its input. This transaction comes from a service that lets you query the Ledger API. The result of that call is returned to you as an RxJava stream, on which you can use the methods described in the documentation you linked in your post.
  2. The body of the processTransaction method only uses methods that either operate on a Java stream (not to be confused with an RxJava stream) or invoke a blocking method on an RxJava stream (the blockingGet in the third-to-last line of the snippet you shared). In this case, exception handling works as it would in any Java application: wrap the relevant portion of code in an appropriate try ... catch block.
  3. If you were to use the output of submitAndWait in a non-blocking fashion (i.e. without calling blockingGet), its result type is Single, which you can think of as an RxJava stream that ends after a single element is emitted. As such, you could use the same exception-handling methods that RxJava provides.
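Point 2 can be sketched with JDK types only. In the sketch below, `submit` is a hypothetical stand-in for a blocking call such as the one ending in blockingGet; it is not the Ledger API. It throws the same kind of nesting that appears in the stack trace (a RuntimeException wrapping an ExecutionException wrapping the real failure), and the `rootCause` helper, also an illustrative name, walks the cause chain so the catch block can look at the innermost exception.

```java
import java.util.concurrent.ExecutionException;

final class BlockingCallErrors {

    // Walk the cause chain down to the innermost exception.
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    // Hypothetical stand-in for a blocking ledger call that fails.
    static String submit() {
        Throwable failure = new IllegalStateException("NOT_FOUND: CONTRACT_NOT_FOUND");
        throw new RuntimeException(new ExecutionException(failure));
    }

    // Ordinary try/catch around the blocking call; returns a description of the outcome.
    static String submitSafely() {
        try {
            return submit();
        } catch (RuntimeException e) {
            return "failed: " + rootCause(e).getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitSafely());
    }
}
```

In a real application the catch block would decide what to do with the failure (log it, skip the transaction, or rethrow) instead of returning a string.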

One final note: our example uses blockingGet for simplicity, but it’s probably not something you want to do in a production application, as RxJava is designed for you to always work in this “stream processing” world where nothing ever stops. Do not take that example as a model for how to use reactive stream libraries like RxJava. I would recommend you keep learning more about RxJava through their documentation and use the accepted patterns they suggest.

liav
Mar 2022

Thanks @stefanobaghino-da ,

Just to make sure I follow you: your advice in #1 is to add one of those error handlers, as shown below?

stefanobaghino-da
Mar 2022

It depends on what you need to do. If you need a simple callback, doOnError will do. But it depends entirely on what kind of exception you want to handle and how you expect the application to respond in such a case. The documentation you linked is a good source of information. Some experimentation can help you understand what the different exception handling methods do and how they can help you implement the exception handling logic you need.

liav
Mar 2022

Hi @stefanobaghino-da ,

Thanks to the help of @cocreature I have successfully coded an application that acts as an observer by subscribing to the Ledger transaction stream.

However, within the first few seconds, after all the transactions have been streamed to my end by the observable (Ledger), I encounter an exception (below) that terminates the stream and prevents my program from doing the one thing it was designed to do: listen for new Ledger contracts (of a certain template).

I’m trying to temporarily swallow the error in order to prevent the stream from breaking. For that I added the callback below, but it never gets called. I followed the docs but can’t seem to find a lead.

io.reactivex.exceptions.OnErrorNotImplementedException: The exception was not handled due to missing onError handler in the subscribe() method call. Further reading: https://github.com/ReactiveX/RxJava/wiki/Error-Handling | io.grpc.StatusRuntimeException: INTERNAL: RST_STREAM closed stream. HTTP/2 error code: INTERNAL_ERROR
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:704)
	at io.reactivex.internal.functions.Functions$OnErrorMissingConsumer.accept(Functions.java:701)
	at io.reactivex.internal.subscribers.LambdaSubscriber.onError(LambdaSubscriber.java:79)
	at io.reactivex.internal.operators.flowable.FlowableFlattenIterable$FlattenIterableSubscriber.checkTerminated(FlowableFlattenIterable.java:395)
	at io.reactivex.internal.operators.flowable.FlowableFlattenIterable$FlattenIterableSubscriber.drain(FlowableFlattenIterable.java:255)
	at io.reactivex.internal.operators.flowable.FlowableFlattenIterable$FlattenIterableSubscriber.onError(FlowableFlattenIterable.java:181)
	at io.reactivex.internal.subscribers.BasicFuseableSubscriber.onError(BasicFuseableSubscriber.java:101)
	at com.daml.grpc.adapter.client.rs.BufferingResponseObserver.lambda$onError$3(BufferingResponseObserver.java:81)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: io.grpc.StatusRuntimeException: INTERNAL: RST_STREAM closed stream. HTTP/2 error code: INTERNAL_ERROR
	at io.grpc.Status.asRuntimeException(Status.java:535)
17:41:03.366 [client-0] ERROR c.d.g.a.SingleThreadExecutionSequencer - Unhandled exception in SingleThreadExecutionSequencer

My code:

public void runIndefinitely() {

    Flowable<Transaction> transactions = client.getTransactionsClient().getTransactions(
            LedgerOffset.LedgerBegin.getInstance(),
            new FiltersByParty(Collections.singletonMap(tpaPartyId, NoFilter.instance)),
            true, DownloadPolicyPagesFromLedger.tpaAccessToken);

    transactions.doOnError(throwable -> handleErrors(throwable) );

    transactions.onExceptionResumeNext(throwable -> System.err.println("do nothing"));

    transactions.forEach(this::processTransaction);
}

private void handleErrors(Throwable throwable) {
    System.err.println("Error Occurred on transaction Observable: " + throwable.getMessage());
}

Leonid_Rozenberg
Mar 2022
liav:

https://github.com/ReactiveX/RxJava/wiki/Error-Handling

The documentation for OnErrorNotImplementedException states that:

This indicates that an Observable tried to call its observer’s onError() method, but that no such method existed. You can eliminate this by either fixing the Observable so that it no longer reaches an error condition, by implementing an onError handler in the observer, or by intercepting the onError notification before it reaches the observer by using one of the operators described elsewhere on this page.

I wonder if you’re using the right error handler.

liav
Mar 2022

Hi @Leonid_Rozenberg

After some learning, and as I wrote on Slack, the reason the doOnError callback was not being invoked is that I hadn’t chained it to the returned instance of the transaction stream. This callback, however, doesn’t process or swallow the exception; it acts as an aspect that lets the client “do something” when the error passes through.

The onExceptionResumeNext operator will swallow the exception, but the stream still terminates.
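The pitfall described above (calling an operator and discarding the instance it returns) is not specific to RxJava. As a rough JDK-only analogy, CompletableFuture behaves the same way: `whenComplete` observes an error without handling it, much like doOnError, while `exceptionally` replaces the error with a fallback value, and each only affects the stage it returns. The class and method names below are illustrative, and the sketch uses neither RxJava nor the Ledger API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

final class ChainingDemo {

    // A source that has already failed, standing in for a broken stream.
    static CompletableFuture<String> failedSource() {
        CompletableFuture<String> source = new CompletableFuture<>();
        source.completeExceptionally(new IllegalStateException("RST_STREAM closed stream"));
        return source;
    }

    // Discarding the returned stage: the recovery never applies to the source itself.
    static boolean unchainedStillFails() {
        CompletableFuture<String> source = failedSource();
        source.exceptionally(t -> "fallback"); // result ignored on purpose
        return source.isCompletedExceptionally();
    }

    // Chaining: observe the error as a side effect, then recover from it.
    static String chainedRecovers(AtomicBoolean observed) {
        return failedSource()
                .whenComplete((value, error) -> observed.set(error != null))
                .exceptionally(t -> "fallback")
                .join();
    }

    public static void main(String[] args) {
        System.out.println(unchainedStillFails());
        System.out.println(chainedRecovers(new AtomicBoolean()));
    }
}
```

The same shape applies to the RxJava code earlier in the thread: the value returned by each operator call is the one that has to be subscribed to.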

According to gRPC docs this is a server side issue.

INTERNAL: RST_STREAM closed stream. HTTP/2 error code: INTERNAL_ERROR

After debugging my client I saw the following, which might point to the reason why the Ledger observable shut down the stream (no more elements):

[image not preserved]

How do I gain access to the Ledger logs in order to investigate the reason?

gRPC forum on this exact error

gRPC status codes (check out #13)

liav
Mar 2022

Anything? Anyone?

stefanobaghino-da
Mar 2022
liav:

How do I gain access to the Ledger logs in order to investigate the reason?

Are you using Daml Hub or do you operate your own participant node attached to some ledger?

liav
Mar 2022

Hi @stefanobaghino-da , I’m using Daml Hub

dtanabe
Mar 2022

Apologies for the delay; there appears to be an issue with gRPC connections that causes them to be closed after one minute. We’re currently looking into this and will update this post with more information.

You should definitely also implement retry logic in your code, though, as over any network, it’s possible for any number of reasons for services to temporarily lose connectivity with each other.
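A minimal sketch of such retry logic, using only the JDK, might look like the following. The `Retry` class, its method names, and the delay values are all illustrative assumptions and not part of the Daml bindings. In a streaming client, the action would be whatever re-establishes the connection.

```java
import java.util.concurrent.Callable;

final class Retry {

    // Delay to wait after the given 1-based attempt:
    // baseMillis doubled once per earlier attempt, capped at maxMillis.
    static long backoffMillis(int attempt, long baseMillis, long maxMillis) {
        long delay = baseMillis;
        for (int i = 1; i < attempt && delay < maxMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMillis);
    }

    // Calls the action until it succeeds or maxAttempts is exhausted.
    static <T> T withRetry(Callable<T> action, int maxAttempts, long baseMillis, long maxMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    pause(backoffMillis(attempt, baseMillis, maxMillis));
                }
            }
        }
        throw new IllegalStateException("gave up after " + maxAttempts + " attempts", last);
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            throw new IllegalStateException("interrupted while waiting to retry", e);
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = withRetry(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("UNAVAILABLE");
            }
            return "connected after " + calls[0] + " attempts";
        }, 5, 10, 1000);
        System.out.println(result);
    }
}
```

The cap on the delay keeps a long outage from pushing the wait time up indefinitely. Whether every failure should be retried, or only some error codes, is a separate decision that this sketch leaves out.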

Ravi_Bhupatani
Jul 2022

Hi @dtanabe
I am also looking for a solution to gRPC server errors that completely disconnect Daml CompletionStreams/TransactionStreams at the client.

I noticed that gRPC has the following retry mechanism implemented.

github.com/grpc/proposal/blob/6070a6b5cd1c014e0b5be54701ca07b8fb1c128c/A6-client-retries.md

gRPC Retry Design
----
* Author(s): [Noah Eisen](https://github.com/ncteisen) and [Eric Gribkoff](https://github.com/ericgribkoff)
* Approver: a11r
* Status: Implemented
* Implemented in: Java, .NET, Go except hedging, and C-Core except hedging
* Last updated: 2022-02-14
* Discussion at: https://groups.google.com/forum/#!topic/grpc-io/zzHIICbwTZE

Can the Daml library be configured to retry as described here?

Or is there an alternative mechanism we can use with the Daml Java Ledger APIs?

Diptesh_Chakraborty
Nov 2022

What was the final conclusion of the error callback?

transactionsTrees.subscribe(tx -> processTransactionTree(tx,partyId), e -> processError(e));

Even with the error callback passed to subscribe as above, I am getting a similar error: io.reactivex.exceptions.OnErrorNotImplementedException.

Do I need to call transactionsTrees.doOnError(Consumer<? super Throwable> onError)?

stefanobaghino-da
Nov 2022

I think it makes sense to process errors as part of a method devoted to that. Apart from doOnError, there are more operators available for you to handle errors in specific ways. You can read more about them here.

Diptesh_Chakraborty
Nov 2022

transactionsTrees
    .retryWhen(retryHandler -> retryHandler.flatMap(err -> observeRetries(err, partyId)))
    .subscribe(tx -> processTransactionTree(tx, partyId), e -> processError(e));

I have not been able to get observeRetries invoked when the publisher/Flowable returns UNAVAILABLE (status code 503). My intention is to choose the retry strategy based on error codes.

Do I need to append onErrorResumeNext in order to trap the publisher error and retry?
