We just encountered an issue where a duplicate ID in system_pricing_plans.json caused the PricingPlansUpdater to throw an exception (anonymized):
Caught exception in ForkJoinPool
java.lang.IllegalStateException: Duplicate key xxx (attempted merging values PricingPlan{xxx})
at java.base/java.util.stream.Collectors.duplicateKeyException(Unknown Source)
at java.base/java.util.stream.Collectors.lambda$uniqKeysMapAccumulator$1(Unknown Source)
at java.base/java.util.stream.ReduceOps$3ReducingSink.accept(Unknown Source)
at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)
at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(Unknown Source)
at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
at java.base/java.util.stream.ReferencePipeline.collect(Unknown Source)
at org.entur.lamassu.leader.entityupdater.PricingPlansUpdater.update(PricingPlansUpdater.java:51)
at org.entur.lamassu.leader.entityupdater.EntityCachesUpdater.updateEntityCaches(EntityCachesUpdater.java:86)
at org.entur.lamassu.leader.FeedUpdater.receiveV3Update(FeedUpdater.java:285)
at org.entur.lamassu.leader.FeedUpdater.lambda$createSubscription$3(FeedUpdater.java:177)
at org.entur.gbfs.loader.v2.GbfsV2Subscription.update(GbfsV2Subscription.java:131)
at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown Source)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
This code path is reached after the feed caches are updated, so the duplicates are visible in the GBFS API, but the exception prevents the remaining entity updaters from ever running. This was confusing because we then saw data in the GBFS API that was never updated in the GraphQL API. We should consider what strategy we want for these updates. A few options come to mind:
- Transactional updates: Only commit data to Redis after the whole update sequence completes successfully. This gives strong consistency at the expense of added complexity.
- Forgive and forget: Wrap each updater in a try-catch block so that subsequent updaters can still run. This keeps things working even if some non-critical problems appear in the data. But we risk that subsequent updaters that depend on earlier failed updaters will also fail because of missing data. We also risk serving bad data to users.
- Handle duplicates less stringently: Instead of failing on duplicates, log a warning and keep one of them. The downside is that it "fixes" a problem that is still present in the GBFS output.
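To make the "forgive and forget" option concrete, here is a minimal sketch of running updaters in isolation. The `IsolatedUpdaters` class, the `runAll` method, and the idea of modelling each updater as a named `Runnable` are assumptions for illustration only, not how `EntityCachesUpdater` is actually structured.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of lamassu.
class IsolatedUpdaters {

    // Runs every updater in iteration order. A failing updater is logged and
    // recorded, and the remaining updaters still get their turn.
    static List<String> runAll(Map<String, Runnable> updaters) {
        List<String> failed = new ArrayList<>();
        for (Map.Entry<String, Runnable> entry : updaters.entrySet()) {
            try {
                entry.getValue().run();
            } catch (RuntimeException ex) {
                System.err.println("Updater " + entry.getKey() + " failed: " + ex);
                failed.add(entry.getKey());
            }
        }
        return failed;
    }
}
```

Returning the names of the failed updaters lets the caller decide whether a dependent updater should be skipped, which is the main risk with this option.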
For now, I'm leaning towards handling duplicates better, but the more general question is still relevant.
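As a rough sketch of the lenient approach: the stack trace points at the `Collectors.toMap` variant without a merge function, which throws on duplicate keys. Passing a merge function avoids the exception. The `LenientPricingPlans` class and the simplified `PricingPlan` type below are stand-ins for illustration, and keeping the first occurrence is just one possible policy.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.logging.Logger;
import java.util.stream.Collectors;

// Hypothetical helper, not the actual PricingPlansUpdater code.
class LenientPricingPlans {
    private static final Logger LOG = Logger.getLogger(LenientPricingPlans.class.getName());

    // Simplified stand-in for the real PricingPlan entity.
    static final class PricingPlan {
        final String id;
        final String name;

        PricingPlan(String id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Indexes plans by id. On a duplicate id, logs a warning and keeps the
    // first occurrence instead of throwing IllegalStateException.
    static Map<String, PricingPlan> byId(List<PricingPlan> plans) {
        return plans.stream().collect(Collectors.toMap(
            plan -> plan.id,
            Function.identity(),
            (first, duplicate) -> {
                LOG.warning("Duplicate pricing plan id " + first.id + ", keeping first occurrence");
                return first;
            },
            LinkedHashMap::new));
    }
}
```

The warning keeps the upstream data problem visible in the logs even though the update no longer fails.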