Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions Sources/AsyncSubjects/AsyncCurrentValueSubject.swift
Original file line number Diff line number Diff line change
Expand Up @@ -91,20 +91,19 @@ public final class AsyncCurrentValueSubject<Element>: AsyncSubject where Element
func handleNewConsumer() -> (iterator: AsyncBufferedChannel<Element>.Iterator, unregister: @Sendable () -> Void) {
let asyncBufferedChannel = AsyncBufferedChannel<Element>()

let (terminalState, current) = self.state.withCriticalRegion { state -> (Termination?, Element) in
(state.terminalState, state.current)
let terminalState = self.state.withCriticalRegion { state -> Termination? in
state.terminalState
}

if let terminalState = terminalState, terminalState.isFinished {
asyncBufferedChannel.finish()
return (asyncBufferedChannel.makeAsyncIterator(), {})
}

asyncBufferedChannel.send(current)

let consumerId = self.state.withCriticalRegion { state -> Int in
state.ids += 1
state.channels[state.ids] = asyncBufferedChannel
asyncBufferedChannel.send(state.current)
return state.ids
Comment on lines 103 to 107
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Avoid resuming consumers while holding subject lock

Calling asyncBufferedChannel.send(state.current) while inside state.withCriticalRegion can synchronously resume an awaiting consumer. If that consumer’s code touches the subject again (e.g., reads value or calls send), it will try to re-enter the same ManagedCriticalState that uses a non‑reentrant os_unfair_lock, leading to a deadlock. This only occurs when a consumer awaits the channel and then re-enters the subject on receipt, but that pattern is common in for‑await loops.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor

@twittemb twittemb Jan 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@codex What do you suggest that solves the initial issue while avoiding re-entrency ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@codex What do you suggest that solves the initial issue while avoiding re-entrency ?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Codex couldn't complete this request. Try again later.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@codex What do you suggest that solves the initial issue while avoiding re-entrency ?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Codex couldn't complete this request. Try again later.

}

Expand Down
7 changes: 3 additions & 4 deletions Sources/AsyncSubjects/AsyncThrowingCurrentValueSubject.swift
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ public final class AsyncThrowingCurrentValueSubject<Element, Failure: Error>: As
) -> (iterator: AsyncThrowingBufferedChannel<Element, Error>.Iterator, unregister: @Sendable () -> Void) {
let asyncBufferedChannel = AsyncThrowingBufferedChannel<Element, Error>()

let (terminalState, current) = self.state.withCriticalRegion { state -> (Termination?, Element) in
(state.terminalState, state.current)
let terminalState = self.state.withCriticalRegion { state -> Termination? in
state.terminalState
}

if let terminalState = terminalState {
Expand All @@ -111,11 +111,10 @@ public final class AsyncThrowingCurrentValueSubject<Element, Failure: Error>: As
return (asyncBufferedChannel.makeAsyncIterator(), {})
}

asyncBufferedChannel.send(current)

let consumerId = self.state.withCriticalRegion { state -> Int in
state.ids += 1
state.channels[state.ids] = asyncBufferedChannel
asyncBufferedChannel.send(state.current)
return state.ids
Comment on lines 114 to 118
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Avoid resuming consumers while holding subject lock

Calling asyncBufferedChannel.send(state.current) while inside state.withCriticalRegion can synchronously resume an awaiting consumer. If that consumer’s code touches the subject again (e.g., reads value or calls send), it will try to re-enter the same ManagedCriticalState that uses a non‑reentrant os_unfair_lock, leading to a deadlock. This only occurs when a consumer awaits the channel and then re-enters the subject on receipt, but that pattern is common in for‑await loops.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@codex What do you suggest that solves the initial issue while avoiding re-entrency ?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@codex is there a real risk of deadlock here ?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Codex Review: Didn't find any major issues. 👍

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

}

Expand Down