Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,10 @@ func PeerLine(idx int, peer tsnet.Peer, peerData tsnet.PeerData) []string {
switch peerData.Status {
case tsnet.NotLinked:
// leave uncolored
case tsnet.Connecting:
case tsnet.SentConn:
idxStr = tcolor.Inverse + Color16(tcolor.BrightYellow, idxStr)
case tsnet.ReceivedConn:
idxStr = tcolor.Inverse + Color16(tcolor.BrightBlue, idxStr)
case tsnet.Failed:
idxStr = tcolor.Inverse + Color16(tcolor.BrightRed, idxStr)
case tsnet.Connected:
Expand Down
54 changes: 44 additions & 10 deletions tsnet/tsnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ type ConnectionStatus int
const (
// NotLinked is the initial state once discovered.
NotLinked ConnectionStatus = iota
// Connecting is the state when a connection is being established.
Connecting
// SentConn is the state when a connection is being established.
SentConn
// ReceivedConn is the state when a connection request has been received.
ReceivedConn
// Connected is the state when a connection has been established.
Connected
// Failed is the state when a connection has failed.
Expand All @@ -72,10 +74,16 @@ type Server struct {
cancel context.CancelFunc
wg sync.WaitGroup
Peers *smap.Map[Peer, PeerData]
Sources *smap.Map[Source, Peer] // maps ip,port to peer
idStr string
epoch atomic.Int32 // set to negative when stopped, panics after 2B ticks/if it wraps.
}

type Source struct {
IP string
Port int
}

type Peer struct {
IP string
Name string
Expand All @@ -91,7 +99,11 @@ type PeerData struct {
}

func (c *Config) NewServer() *Server {
return &Server{Config: *c, Peers: smap.New[Peer, PeerData]()}
return &Server{
Config: *c,
Peers: smap.New[Peer, PeerData](),
Sources: smap.New[Source, Peer](),
}
}

func (s *Server) Start(ctx context.Context) error {
Expand Down Expand Up @@ -212,15 +224,19 @@ func (s *Server) runAdv(ctx context.Context) {

func (s *Server) PeersCleanup() {
var toDelete []Peer
var toDeleteSources []Source
now := time.Now()
for peer, data := range s.Peers.All() {
if now.Sub(data.LastSeen) > s.PeerTimeout {
toDelete = append(toDelete, peer)
src := Source{IP: peer.IP, Port: data.Port}
toDeleteSources = append(toDeleteSources, src)
}
}
if len(toDelete) > 0 {
log.Infof("Removing %d expired peers: %v", len(toDelete), toDelete)
s.Peers.Delete(toDelete...)
s.Sources.Delete(toDeleteSources...) // TODO share lock/transaction.
}
}

Expand Down Expand Up @@ -318,6 +334,10 @@ func (s *Server) runMulticastReceive(ctx context.Context) {
if v.Port != data.Port {
log.Infof("Peer %q port changed from %d to %d", peer, v.Port, data.Port)
data.Status = NotLinked
src := Source{IP: peer.IP, Port: v.Port} // old source to delete
s.Sources.Delete(src)
src.Port = data.Port
s.Sources.Set(src, peer)
}
// Update last seen and epoch
s.change(s.Peers.Set(peer, data))
Expand All @@ -330,6 +350,8 @@ func (s *Server) runMulticastReceive(ctx context.Context) {
data.HumanHash = "BAD-PKEY"
}
nv := s.Peers.Set(peer, data)
src := Source{IP: peer.IP, Port: data.Port}
s.Sources.Set(src, peer)
log.S(log.Info, "New peer", log.Any("count", s.Peers.Len()),
log.Any("Peer", peer), log.Any("Data", data))
s.change(nv)
Expand Down Expand Up @@ -456,30 +478,42 @@ func (s *Server) ConnectToPeer(peer Peer) error {
return err
}
// Update status to sent = connecting
peerData.Status = Connecting
peerData.Status = SentConn
s.Peers.Set(peer, peerData)
log.Infof("Connection request sent to %s (%s)", peer.Name, peer.IP)
return nil
}

// handleDirectMessage processes incoming direct connection messages.
func (s *Server) handleDirectMessage(buf []byte, addr *net.UDPAddr) {
func (s *Server) handleDirectMessage(buf []byte, from *net.UDPAddr) {
msgStr := string(buf)

// Try to parse as connection request
var requesterName, targetName string
if n, err := fmt.Sscanf(msgStr, ConnectMessageFormat, &requesterName, &targetName); err == nil && n == 2 {
s.handleConnectionRequest(requesterName, targetName)
s.handleConnectionRequest(from, requesterName, targetName)
return
}

log.Warnf("Unknown direct message format from %v: %q", addr, msgStr)
log.Warnf("Unknown direct message format from %v: %q", from, msgStr)
}

// handleConnectionRequest processes incoming connection requests.
func (s *Server) handleConnectionRequest(requesterName, targetName string) {
log.Infof("Received connection request from %s to %s", requesterName, targetName)

func (s *Server) handleConnectionRequest(from *net.UDPAddr, requesterName, targetName string) {
log.Infof("Received connection request from %v: %v to %v", from, requesterName, targetName)
src := Source{IP: from.IP.String(), Port: from.Port}
peer, exists := s.Sources.Get(src)
if !exists {
log.Errf("Connection request from unknown source %v (not in source to peer map)", src)
return
}
pData, found := s.Peers.Get(peer)
if !found {
log.Errf("Connection request from unknown peer %v (not in discovery map)", peer)
return
}
pData.Status = ReceivedConn
s.Peers.Set(peer, pData)
// Check if the target name matches our name
if targetName != s.Name {
log.Warnf("Connection request target name %q doesn't match our name %q", targetName, s.Name)
Expand Down
11 changes: 9 additions & 2 deletions tsnet/tsnet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func NoMCastOnMacInCI(t *testing.T) {
}
}

func TestPeerDiscovery(t *testing.T) {
func TestPeerDiscovery(t *testing.T) { //nolint:gocognit // it's a test.
NoMCastOnMacInCI(t)

log.SetLogLevel(log.Info) // Set to Debug for more verbose output during test debugging
Expand Down Expand Up @@ -135,11 +135,18 @@ func TestPeerDiscovery(t *testing.T) {

// Check that the connection was created on A's side
connA, exists := serverA.Peers.Get(peerB)
if !exists || connA.Status != tsnet.Connecting {
if !exists || connA.Status != tsnet.SentConn {
t.Fatal("Connection from A to B not found in A's connection map")
}
t.Logf("✓ Connection created on A's side: status %v", connA.Status)

// Check that the connection was received on B's side
connB, exists := serverB.Peers.Get(peerA)
if !exists || connB.Status != tsnet.ReceivedConn {
t.Fatal("Connection from A to B not found in B's connection map")
}
t.Logf("✓ Connection received on B's side: status %v", connB.Status)

t.Log("✓ Test completed successfully!")
}

Expand Down