Using offsets

App Development, 8 posts, 905 views, 11 likes, last activity Oct 2022

perbergmanOP
Dec 2020

Hi!

I want to be crash-safe so I save the last consumed offset processed in an event listener.
How do I calculate the next offset to start reading from, as I don’t want to reprocess the event already handled?

stefanobaghino-da
Dec 2020

In general, you can’t:

The format of absolute offsets is opaque to the client: no client-side transformation of an offset is guaranteed to return a meaningful offset.

(source: Ledger API Reference — DAML SDK 1.7.0 documentation)

But the good news is that the starting offset you pass in requests is exclusive, meaning that you can safely memorize the last offset you received and use it to ask for events following the one for which you saved a valid offset. Note that the ending offset (if provided) is inclusive instead.
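
To make the exclusive/inclusive distinction concrete, here is a minimal sketch that models it with a hypothetical in-memory ledger. `OffsetRangeSketch`, `Event` and `read` are illustrative names and not part of the Daml bindings. Offsets are only ever compared for equality, since their format is opaque.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for a ledger stream; not the Daml bindings.
class OffsetRangeSketch {
    // An event tagged with the opaque offset assigned to it.
    static final class Event {
        final String offset;
        final String payload;

        Event(String offset, String payload) {
            this.offset = offset;
            this.payload = payload;
        }
    }

    // Returns the payloads of all events strictly after `beginExclusive`
    // (null means "from the ledger begin") up to and including
    // `endInclusive` (null means "no upper bound").
    static List<String> read(List<Event> ledger, String beginExclusive, String endInclusive) {
        List<String> out = new ArrayList<>();
        boolean started = (beginExclusive == null);
        for (Event e : ledger) {
            if (started) {
                out.add(e.payload);
                if (e.offset.equals(endInclusive)) {
                    break; // the event at the end offset is still delivered
                }
            } else if (e.offset.equals(beginExclusive)) {
                started = true; // the event at the begin offset itself is skipped
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> ledger = List.of(
                new Event("00a1", "create"),
                new Event("00c7", "exercise"),
                new Event("00f3", "archive"));
        // "00a1" was the last offset we saved, so "create" is not delivered again.
        System.out.println(read(ledger, "00a1", "00f3")); // prints [exercise, archive]
    }
}
```

So passing the last saved offset back unchanged is enough to avoid reprocessing; no arithmetic on the offset is needed.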

I’ll have a better look at the documentation, but it appears that this is in fact poorly documented.

Thanks for the good question!

stefanobaghino-da
Dec 2020

I created this PR to document it. If you believe there are other parts of the documentation that can be improved in this regard, please add your review. :bowing_man:

perbergman
Dec 2020

Thanks!

Works perfectly fine, as long as one understands the threading model in rxjava :slight_smile:

“If you have no problems, buy a goat.”

Per

Tamas_Kalcza
Oct 2022

I suppose this means that the offset in a Transaction is not a ledger offset and cannot be used like this: new LedgerOffset.Absolute(x.getOffset()). Am I right?

If so, how can I get the ledger offsets in a reliable way? I’m using the TransactionsClient to get the stream of transactions.

perbergman
Oct 2022

I do it like this (and others do the same):


// Resume after the last saved offset, or from the ledger begin on the first run.
var currentOffset = state.getOffset();
var lo = (currentOffset == null) ? LedgerOffset.LedgerBegin.getInstance() : new LedgerOffset.Absolute(currentOffset);

transactionsClient.getTransactions(lo, transactionFilter, isLedgerVerbose, tokenVault.getSystemContext().getBearer())
        .subscribe(t -> {
            log.info("PROCESSOR got transaction in thread " + Thread.currentThread() + " with " + t.getEvents().size() + " events, offset " + t.getOffset());
            t.getEvents().forEach(e -> {
                JsonElement je = gson.toJsonTree(e);
                kafkaServiceImpl.send(je);
            });
            // Save the offset only after every event of the transaction has been handled.
            state.setOffset(t.getOffset());
        }, e -> {
            log.error("flow failure, restart", e);
            this.runIndefinitely(); // restart
        });

Tamas_Kalcza
Oct 2022

I get that; in fact, I have something like this as well. I guess the real question is: does this mean that Transaction returns an offset? If so, why is the return type not LedgerOffset?

perbergman
Oct 2022

Yes, the transaction returns an offset. I haven’t checked the exact type.

state.setOffset(t.getOffset());
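
The `state` object is not shown in this thread. As a rough sketch of what a crash-safe version could look like, the class below keeps the offset as an opaque string in a file, which is an assumption based on the snippet passing the saved value straight to `new LedgerOffset.Absolute(...)`. `FileOffsetStore` and its file layout are hypothetical, not part of the Daml bindings.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;

// Hypothetical file-backed stand-in for the `state` object used above.
class FileOffsetStore {
    private final Path file;

    FileOffsetStore(Path file) {
        this.file = file;
    }

    // Returns the last saved offset, or null if nothing has been saved yet.
    String getOffset() {
        try {
            if (!Files.exists(file)) {
                return null;
            }
            String s = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
            return s.isEmpty() ? null : s;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Writes to a temporary sibling file first, then renames it over the real
    // file, so a crash mid-write should not leave a half-written offset behind.
    // Whether the rename replaces an existing file atomically is platform
    // dependent (it does on typical POSIX file systems).
    void setOffset(String offset) {
        try {
            Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
            Files.write(tmp, offset.getBytes(StandardCharsets.UTF_8));
            Files.move(tmp, file, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path p = Paths.get(System.getProperty("java.io.tmpdir"), "offset-" + System.nanoTime() + ".txt");
        FileOffsetStore store = new FileOffsetStore(p);
        System.out.println(store.getOffset()); // prints null on the first run
        store.setOffset("00a1");
        System.out.println(store.getOffset()); // prints 00a1
    }
}
```

With a store like this, saving the offset after the events were sent means a crash in between replays that one transaction on restart, so the downstream consumer should tolerate duplicates.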