IBIResearch/BidirectionalChannels.jl

BidirectionalChannels

This package provides a lightweight, duplex channel abstraction for task-to-task communication. A BidirectionalChannel represents one endpoint; constructing a second BidirectionalChannel from the first yields the opposite endpoint that shares the same underlying channels.
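To make the "opposite endpoint" idea concrete, here is a minimal sketch built only on Base's Channel. It is not the package's source: the names EndpointSketch and opposite are hypothetical, chosen for illustration. An endpoint holds an inbound and an outbound channel, and its opposite holds the same two channels with the roles swapped.

```julia
# Hypothetical sketch, NOT the package's implementation.
# An endpoint is a pair of channels: one to read from, one to write to.
struct EndpointSketch{T}
    inbound::Channel{T}    # messages this endpoint reads
    outbound::Channel{T}   # messages this endpoint writes
end

# Fresh endpoint backed by two buffered channels of size `sz`.
EndpointSketch{T}(sz::Integer) where {T} =
    EndpointSketch{T}(Channel{T}(sz), Channel{T}(sz))

# The opposite endpoint reuses the same channels with the roles swapped.
opposite(e::EndpointSketch{T}) where {T} =
    EndpointSketch{T}(e.outbound, e.inbound)

# Writing goes to the outbound side, reading comes from the inbound side.
Base.put!(e::EndpointSketch, v) = put!(e.outbound, v)
Base.take!(e::EndpointSketch) = take!(e.inbound)

a = EndpointSketch{Int}(2)
b = opposite(a)

put!(a, 3)       # a -> b
x = take!(b)     # b reads what a wrote
put!(b, 42)      # b -> a
y = take!(a)     # a reads what b wrote
```

Because a.outbound and b.inbound are the same object, nothing is copied between the endpoints; the two sides simply view one pair of buffers from opposite directions.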

Installation

Within Julia, use the package manager:

using Pkg
Pkg.add("BidirectionalChannels")

Usage

Create an endpoint and its opposite, then exchange messages from both sides:

using BidirectionalChannels

# Create endpoint A with buffer size 2
A = BidirectionalChannel{Int}(2)

# Create the opposite endpoint B (shares A's underlying channels)
B = BidirectionalChannel(A)

# Two tasks exchanging messages
tA = @async begin
    put!(A, 3)                # A -> B
    val_from_B = take!(A)     # A <- B
    @info "A received" val_from_B
end

tB = @async begin
    val_from_A = take!(B)     # B <- A
    @info "B received" val_from_A
    put!(B, 3 * 14)           # B -> A
end

# Wait for both tasks to finish before closing
wait(tA)
wait(tB)

# Close both directions
close(A)
isopen(B) # false

You can set the buffer size and choose the underlying channel implementation, which must subtype AbstractChannel{T}:

A = BidirectionalChannel{String}(32, Channel)
B = BidirectionalChannel(A)

It's also possible to use RemoteChannel from Distributed:

using Distributed
addprocs(1)  # ensure worker 2 exists

@everywhere using BidirectionalChannels

# Build an endpoint where:
# - the inbound channel lives on process 1 (main)
# - the outbound channel lives on worker 2
A = BidirectionalChannel() do
    in  = RemoteChannel(() -> Channel{Vector{Int}}(2), 1)  # A.in on pid 1
    out = RemoteChannel(() -> Channel{Vector{Int}}(2), 2)  # A.out on pid 2
    (in, out)
end

# Opposite endpoint (B.in on pid 2, B.out on pid 1)
B = BidirectionalChannel(A)

# On worker 2: receive an array, mutate it in place, send it back
tB = @spawnat 2 begin
    @info "Worker $(myid()) waiting for array"
    arr = take!(B)                 # receives from B.in (pid 2)
    @info "Worker $(myid()) received" arr

    # In-place mutations
    push!(arr, 99)
    arr .*= 10
    arr[1] = -arr[1]

    @info "Worker $(myid()) sending mutated" arr
    put!(B, arr)                   # sends via B.out (pid 1)
    nothing
end

# Main process: send an array, then receive the mutated copy
orig = [1, 2, 3]
@info "Main (pid=$(myid())) sending orig" orig
put!(A, orig)                      # A.out (pid 2)

@info "Main (pid=$(myid())) still has orig (unchanged locally)" orig
mutated = take!(A)                 # A.in (pid 1)
@info "Main (pid=$(myid())) received mutated" mutated
@info "orig is unchanged" orig

fetch(tB)                          # wait for worker 2 to finish
close(A)
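
In the distributed example, orig stays unchanged on the main process because values are serialized when they cross process boundaries, so the worker mutates its own copy. A plain in-process Channel hands over the same object instead. The sketch below uses only Base's Channel to show the difference; it assumes (without having checked the package source) that a BidirectionalChannel backed by Channel behaves the same way within a single process.

```julia
# In-process channels pass references, not copies.
ch = Channel{Vector{Int}}(1)

orig = [1, 2, 3]
put!(ch, orig)
received = take!(ch)
push!(received, 99)            # also mutates `orig`
shared = received === orig     # same object on both sides

# Send a copy when the receiver should not be able to affect the sender.
put!(ch, copy(orig))
isolated = take!(ch)
push!(isolated, 100)           # `orig` is not affected this time
close(ch)
```

If a consumer task mutates what it receives, sending copy(value) (or deepcopy for nested data) keeps the sender's data isolated in the single-process case.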

Interface

  • take!(bi) reads from the inbound side.
  • put!(bi, value) writes to the outbound side.
  • isready(bi) checks inbound readiness.
  • isfull(bi) checks outbound fullness.
  • isopen(bi) checks that both directions are open.
  • wait(bi) waits on inbound readiness/closure.
  • close(bi) closes both directions.

Note: BidirectionalChannel(existing) is the canonical way to create the opposite endpoint. Any additional endpoints created from the same source share the same underlying channels and buffers, so a message taken by one endpoint is no longer available to the others.
