-
-
Notifications
You must be signed in to change notification settings - Fork 692
fix(android,ios): add thread safety to listener operations #3157
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,154 @@ | ||
| package com.margelo.nitro.iap | ||
|
|
||
| import org.junit.Test | ||
| import org.junit.Assert.* | ||
| import java.util.concurrent.CyclicBarrier | ||
| import java.util.concurrent.atomic.AtomicInteger | ||
| import java.util.concurrent.atomic.AtomicReference | ||
| import kotlin.concurrent.thread | ||
|
|
||
| /** | ||
| * Thread safety tests for the synchronized + snapshot listener pattern | ||
| * used in HybridRnIap to prevent ConcurrentModificationException. | ||
| * | ||
| * Addresses Issue #3150 where purchase events were silently lost | ||
| * due to concurrent listener access. | ||
| */ | ||
| class ListenerThreadSafetyTest { | ||
|
|
||
| @Test | ||
| fun `concurrent add and snapshot iterate does not throw`() { | ||
| val listeners = mutableListOf<(String) -> Unit>() | ||
| val callCount = AtomicInteger(0) | ||
| val errorRef = AtomicReference<Throwable?>(null) | ||
| val iterations = 500 | ||
| val barrier = CyclicBarrier(2) | ||
|
|
||
| val adder = thread { | ||
| barrier.await() | ||
| repeat(iterations) { | ||
| synchronized(listeners) { listeners.add { callCount.incrementAndGet() } } | ||
| } | ||
| } | ||
|
|
||
| val sender = thread { | ||
| barrier.await() | ||
| repeat(iterations) { | ||
| val snapshot = synchronized(listeners) { ArrayList(listeners) } | ||
| snapshot.forEach { | ||
| try { | ||
| it("event") | ||
| } catch (e: Throwable) { | ||
| errorRef.compareAndSet(null, e) | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| adder.join(5000) | ||
| sender.join(5000) | ||
| assertFalse("Adder thread did not finish", adder.isAlive) | ||
| assertFalse("Sender thread did not finish", sender.isAlive) | ||
| assertNull("Should not throw ConcurrentModificationException", errorRef.get()) | ||
| } | ||
|
|
||
| @Test | ||
| fun `concurrent add, remove, and iterate is safe`() { | ||
| val listeners = mutableListOf<(String) -> Unit>() | ||
| val errorRef = AtomicReference<Throwable?>(null) | ||
| val barrier = CyclicBarrier(3) | ||
|
|
||
| val adder = thread { | ||
| barrier.await() | ||
| repeat(200) { | ||
| synchronized(listeners) { listeners.add { _ -> } } | ||
| } | ||
| } | ||
|
|
||
| val remover = thread { | ||
| barrier.await() | ||
| repeat(200) { | ||
| synchronized(listeners) { | ||
| if (listeners.isNotEmpty()) listeners.removeAt(0) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| val sender = thread { | ||
| barrier.await() | ||
| repeat(200) { | ||
| val snapshot = synchronized(listeners) { ArrayList(listeners) } | ||
| snapshot.forEach { | ||
| try { | ||
| it("event") | ||
| } catch (e: Throwable) { | ||
| errorRef.compareAndSet(null, e) | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| adder.join(5000) | ||
| remover.join(5000) | ||
| sender.join(5000) | ||
| assertFalse("Adder thread did not finish", adder.isAlive) | ||
| assertFalse("Remover thread did not finish", remover.isAlive) | ||
| assertFalse("Sender thread did not finish", sender.isAlive) | ||
| assertNull("Concurrent access should be safe with snapshot pattern", errorRef.get()) | ||
| } | ||
|
|
||
| @Test | ||
| fun `snapshot delivers to all registered listeners`() { | ||
| val listeners = mutableListOf<(String) -> Unit>() | ||
| val results = mutableListOf<String>() | ||
|
|
||
| synchronized(listeners) { | ||
| listeners.add { results.add("listener1:$it") } | ||
| listeners.add { results.add("listener2:$it") } | ||
| } | ||
|
|
||
| val snapshot = synchronized(listeners) { ArrayList(listeners) } | ||
| snapshot.forEach { it("event") } | ||
|
|
||
| assertEquals(2, results.size) | ||
| assertTrue(results.contains("listener1:event")) | ||
| assertTrue(results.contains("listener2:event")) | ||
| } | ||
|
|
||
| @Test | ||
| fun `synchronized clear removes all listeners`() { | ||
| val listeners = mutableListOf<(String) -> Unit>() | ||
| synchronized(listeners) { | ||
| listeners.add { _ -> } | ||
| listeners.add { _ -> } | ||
| } | ||
|
|
||
| synchronized(listeners) { listeners.clear() } | ||
|
|
||
| val snapshot = synchronized(listeners) { ArrayList(listeners) } | ||
| assertTrue("Listeners should be empty after clear", snapshot.isEmpty()) | ||
| } | ||
|
|
||
| @Test | ||
| fun `snapshot is isolated from subsequent modifications`() { | ||
| val listeners = mutableListOf<(String) -> Unit>() | ||
| val results = mutableListOf<String>() | ||
|
|
||
| synchronized(listeners) { | ||
| listeners.add { results.add("original:$it") } | ||
| } | ||
|
|
||
| // Take snapshot before adding more listeners | ||
| val snapshot = synchronized(listeners) { ArrayList(listeners) } | ||
|
|
||
| // Add another listener after snapshot | ||
| synchronized(listeners) { | ||
| listeners.add { results.add("added-after:$it") } | ||
| } | ||
|
|
||
| // Only the original listener should be in the snapshot | ||
| snapshot.forEach { it("event") } | ||
| assertEquals(1, results.size) | ||
| assertEquals("original:event", results[0]) | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,7 +23,9 @@ class HybridRnIap: HybridRnIapSpec { | |
| private var deliveredPurchaseEventOrder: [String] = [] | ||
| private let purchaseEventDedupLimit = 128 | ||
| private var purchasePayloadById: [String: [String: Any]] = [:] | ||
|
|
||
| // Thread safety lock for listener arrays and error dedup state | ||
| private let listenerLock = NSLock() | ||
|
|
||
| // MARK: - Initialization | ||
|
|
||
| override init() { | ||
|
|
@@ -851,8 +853,8 @@ class HybridRnIap: HybridRnIapSpec { | |
| } | ||
|
|
||
| func addPromotedProductListenerIOS(listener: @escaping (NitroProduct) -> Void) throws { | ||
| promotedProductListeners.append(listener) | ||
| listenerLock.withLock { promotedProductListeners.append(listener) } | ||
|
|
||
| // If a promoted product is already available from OpenIAP, notify immediately | ||
| Task { | ||
| RnIapLog.payload("promotedProductListenerIOS.fetch", nil) | ||
|
|
@@ -864,33 +866,27 @@ class HybridRnIap: HybridRnIapSpec { | |
| } | ||
| } | ||
| } | ||
|
|
||
| func removePromotedProductListenerIOS(listener: @escaping (NitroProduct) -> Void) throws { | ||
| // Note: In Swift, comparing closures is not straightforward, so we'll clear all listeners | ||
| // In a real implementation, you might want to use a unique identifier for each listener | ||
| promotedProductListeners.removeAll() | ||
| listenerLock.withLock { promotedProductListeners.removeAll() } | ||
| } | ||
|
coderabbitai[bot] marked this conversation as resolved.
|
||
|
|
||
| // MARK: - Event Listener Methods | ||
|
|
||
| func addPurchaseUpdatedListener(listener: @escaping (NitroPurchase) -> Void) throws { | ||
| purchaseUpdatedListeners.append(listener) | ||
| listenerLock.withLock { purchaseUpdatedListeners.append(listener) } | ||
| } | ||
|
|
||
| func addPurchaseErrorListener(listener: @escaping (NitroPurchaseResult) -> Void) throws { | ||
| purchaseErrorListeners.append(listener) | ||
| listenerLock.withLock { purchaseErrorListeners.append(listener) } | ||
| } | ||
|
|
||
| func removePurchaseUpdatedListener(listener: @escaping (NitroPurchase) -> Void) throws { | ||
| // Note: This is a limitation of Swift closures - we can't easily remove by reference | ||
| // For now, we'll just clear all listeners when requested | ||
| purchaseUpdatedListeners.removeAll() | ||
| listenerLock.withLock { purchaseUpdatedListeners.removeAll() } | ||
| } | ||
|
Comment on lines
884
to
886
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This method, along with the other The JS layer and the newly added tests ( To properly resolve this, each listener needs to be tracked individually. Since Swift closures cannot be directly compared, a common approach is to associate each listener with a unique identifier. For example, listeners could be wrapped in a class that conforms to Alternative approach:
Owner
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch. The The JS layer manages individual listener mapping via However, in Swift, closures passed through the Nitro bridge create new wrapper instances on each call, so the native
That said, this is a valid limitation for advanced multi-listener use cases. A token/ID-based approach would require Nitro interface changes and is better addressed in a separate PR. Filed as a follow-up consideration. |
||
|
|
||
| func removePurchaseErrorListener(listener: @escaping (NitroPurchaseResult) -> Void) throws { | ||
| // Note: This is a limitation of Swift closures - we can't easily remove by reference | ||
| // For now, we'll just clear all listeners when requested | ||
| purchaseErrorListeners.removeAll() | ||
| listenerLock.withLock { purchaseErrorListeners.removeAll() } | ||
| } | ||
|
|
||
| // MARK: - Private Helper Methods | ||
|
|
@@ -954,20 +950,22 @@ class HybridRnIap: HybridRnIapSpec { | |
| RnIapLog.result("fetchProducts", payloads) | ||
| if let payload = payloads.first { | ||
| let nitro = RnIapHelper.convertProductDictionary(payload) | ||
| let snapshot = self.listenerLock.withLock { Array(self.promotedProductListeners) } | ||
| await MainActor.run { | ||
| for listener in self.promotedProductListeners { listener(nitro) } | ||
| for listener in snapshot { listener(nitro) } | ||
| } | ||
| } | ||
| } catch { | ||
| RnIapLog.failure("promotedProductListenerIOS", error: error) | ||
| let id = productId | ||
| let snapshot = self.listenerLock.withLock { Array(self.promotedProductListeners) } | ||
| await MainActor.run { | ||
| var minimal = NitroProduct() | ||
| minimal.id = id | ||
| minimal.title = id | ||
| minimal.type = "inapp" | ||
| minimal.platform = .ios | ||
| for listener in self.promotedProductListeners { listener(minimal) } | ||
| for listener in snapshot { listener(minimal) } | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -992,44 +990,59 @@ class HybridRnIap: HybridRnIapSpec { | |
| ] | ||
| let eventKey = keyComponents.joined(separator: "#") | ||
|
|
||
| if deliveredPurchaseEventKeys.contains(eventKey) { | ||
| RnIapLog.warn("Duplicate purchase update skipped for \(purchase.productId)") | ||
| return | ||
| var isDuplicate = false | ||
| let snapshot: [(NitroPurchase) -> Void] = listenerLock.withLock { | ||
| if deliveredPurchaseEventKeys.contains(eventKey) { | ||
| isDuplicate = true | ||
| return [] | ||
| } | ||
|
|
||
| deliveredPurchaseEventKeys.insert(eventKey) | ||
| deliveredPurchaseEventOrder.append(eventKey) | ||
| if deliveredPurchaseEventOrder.count > purchaseEventDedupLimit, let removed = deliveredPurchaseEventOrder.first { | ||
| deliveredPurchaseEventOrder.removeFirst() | ||
| deliveredPurchaseEventKeys.remove(removed) | ||
| } | ||
|
|
||
| return Array(purchaseUpdatedListeners) | ||
| } | ||
|
|
||
| deliveredPurchaseEventKeys.insert(eventKey) | ||
| deliveredPurchaseEventOrder.append(eventKey) | ||
| if deliveredPurchaseEventOrder.count > purchaseEventDedupLimit, let removed = deliveredPurchaseEventOrder.first { | ||
| deliveredPurchaseEventOrder.removeFirst() | ||
| deliveredPurchaseEventKeys.remove(removed) | ||
| if isDuplicate { | ||
| RnIapLog.warn("Duplicate purchase update skipped for \(purchase.productId)") | ||
| return | ||
| } | ||
|
|
||
| for listener in purchaseUpdatedListeners { | ||
| for listener in snapshot { | ||
| listener(purchase) | ||
| } | ||
| } | ||
|
|
||
| private func sendPurchaseError(_ error: NitroPurchaseResult, productId: String? = nil) { | ||
| let now = Date().timeIntervalSince1970 | ||
| let dedupIdentifier = productId | ||
| ?? (error.purchaseToken?.isEmpty == false ? error.purchaseToken : nil) | ||
| ?? (error.message.isEmpty ? nil : error.message) | ||
| let currentKey = RnIapHelper.makeErrorDedupKey(code: error.code, productId: dedupIdentifier) | ||
| // Dedup only when the exact same error is emitted almost simultaneously. | ||
| let withinWindow = (now - lastPurchaseErrorTimestamp) < 0.15 | ||
| if currentKey == lastPurchaseErrorKey && withinWindow { | ||
| return | ||
| } | ||
|
|
||
| lastPurchaseErrorKey = currentKey | ||
| lastPurchaseErrorTimestamp = now | ||
| // Protect error dedup state since sendPurchaseError is called from multiple threads | ||
| let shouldSkip: Bool = listenerLock.withLock { | ||
| let now = Date().timeIntervalSince1970 | ||
| let withinWindow = (now - lastPurchaseErrorTimestamp) < 0.15 | ||
| if currentKey == lastPurchaseErrorKey && withinWindow { | ||
| return true | ||
| } | ||
| lastPurchaseErrorKey = currentKey | ||
| lastPurchaseErrorTimestamp = now | ||
| return false | ||
| } | ||
| if shouldSkip { return } | ||
|
|
||
| // Ensure we never leak SKU via purchaseToken | ||
| var sanitized = error | ||
| if let pid = productId, sanitized.purchaseToken == pid { | ||
| sanitized.purchaseToken = nil | ||
| } | ||
| for listener in purchaseErrorListeners { | ||
| let snapshot = listenerLock.withLock { Array(purchaseErrorListeners) } | ||
| for listener in snapshot { | ||
| listener(sanitized) | ||
| } | ||
| } | ||
|
|
@@ -1067,15 +1080,17 @@ class HybridRnIap: HybridRnIapSpec { | |
| RnIapLog.result("endConnection", result as Any) | ||
| } | ||
|
|
||
| // Clear event listeners | ||
| purchaseUpdatedListeners.removeAll() | ||
| purchaseErrorListeners.removeAll() | ||
| promotedProductListeners.removeAll() | ||
| deliveredPurchaseEventKeys.removeAll() | ||
| deliveredPurchaseEventOrder.removeAll() | ||
| purchasePayloadById.removeAll() | ||
| lastPurchaseErrorKey = nil | ||
| lastPurchaseErrorTimestamp = 0 | ||
| // Clear event listeners, error dedup state, and delivery state (thread-safe) | ||
| listenerLock.withLock { | ||
| purchaseUpdatedListeners.removeAll() | ||
| purchaseErrorListeners.removeAll() | ||
| promotedProductListeners.removeAll() | ||
| lastPurchaseErrorKey = nil | ||
| lastPurchaseErrorTimestamp = 0 | ||
| deliveredPurchaseEventKeys.removeAll() | ||
| deliveredPurchaseEventOrder.removeAll() | ||
| purchasePayloadById.removeAll() | ||
| } | ||
|
coderabbitai[bot] marked this conversation as resolved.
|
||
| } | ||
|
|
||
| func deepLinkToSubscriptionsAndroid(options: NitroDeepLinkOptionsAndroid) throws -> Promise<Void> { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.