From b06173d5fb287a6087fc4485d61e842a0f001128 Mon Sep 17 00:00:00 2001 From: Souyama Date: Thu, 4 Dec 2025 18:45:59 +0530 Subject: [PATCH 1/4] feat: implement protocol recovery and connection handling in serial transporters --- asciiclient.go | 78 +++++++++++++++++++++++++++++++++++++++++------ rtuclient.go | 82 +++++++++++++++++++++++++++++++++++++++----------- serial.go | 16 ++++++++-- 3 files changed, 147 insertions(+), 29 deletions(-) diff --git a/asciiclient.go b/asciiclient.go index a310372..b05bae1 100644 --- a/asciiclient.go +++ b/asciiclient.go @@ -9,6 +9,8 @@ import ( "context" "encoding/hex" "fmt" + "io" + "syscall" "time" ) @@ -181,17 +183,76 @@ func (mb *asciiSerialTransporter) Send(ctx context.Context, aduRequest []byte) ( mb.lastActivity = time.Now() mb.startCloseTimer() - // Send the request - mb.logf("modbus: send % x\n", aduRequest) - if _, err = mb.port.Write(aduRequest); err != nil { + connDeadline := time.Now().Add(mb.Timeout) + linkRecoveryDeadline := time.Now().Add(mb.LinkRecoveryTimeout) + + for { + // Send the request + mb.logf("modbus: send % x\n", aduRequest) + if _, err = mb.port.Write(aduRequest); err != nil { + if err == io.EOF || err == io.ErrUnexpectedEOF || err == syscall.ECONNRESET { + if time.Now().After(linkRecoveryDeadline) { + err = fmt.Errorf("modbus: link recovery timeout reached: %w", err) + return + } + // reconnect on connection reset + mb.logf("modbus: connection reset, reconnecting") + if cerr := mb.close(); cerr != nil { + mb.logf("modbus: error closing connection: %v", cerr) + return + } + if cerr := mb.connect(ctx); cerr != nil { + mb.logf("modbus: error reconnecting: %v", cerr) + return + } + // retry the communication + continue + } + + return + } + // Get the response + aduResponse, err = readAscii(mb.port, connDeadline) + mb.logf("modbus: recv % x\n", aduResponse) + if err != nil { + if err == io.EOF || err == io.ErrUnexpectedEOF || err == syscall.ECONNRESET { + if time.Now().After(linkRecoveryDeadline) { + err = fmt.Errorf("modbus: link recovery timeout reached: %w", err) + return + } + // reconnect on connection reset + mb.logf("modbus: connection reset, reconnecting") + if cerr := mb.close(); cerr != nil { + mb.logf("modbus: error closing connection: %v", cerr) + return + } + if cerr := mb.connect(ctx); cerr != nil { + mb.logf("modbus: error reconnecting: %v", cerr) + return + } + // retry the communication + continue + } + // Unknown error + mb.logf("modbus: read error: %v", err) + return + } + return } - // Get the response +} + +func readAscii(r io.Reader, deadline time.Time) ([]byte, error) { var n, length int var data [asciiMaxSize]byte + var err error + for { - if n, err = mb.port.Read(data[length:]); err != nil { - return + if time.Now().After(deadline) { + return nil, context.DeadlineExceeded + } + if n, err = r.Read(data[length:]); err != nil { + return nil, err } length += n if length >= asciiMaxSize || n == 0 { @@ -204,9 +265,8 @@ func (mb *asciiSerialTransporter) Send(ctx context.Context, aduRequest []byte) ( } } } - aduResponse = data[:length] - mb.logf("modbus: recv % x\n", aduResponse) - return + + return data[:length], nil } // writeHex encodes byte to string in hexadecimal, e.g. 0xA5 => "A5" diff --git a/rtuclient.go b/rtuclient.go index e1bf12d..abb1907 100644 --- a/rtuclient.go +++ b/rtuclient.go @@ -7,9 +7,9 @@ package modbus import ( "context" "encoding/binary" - "errors" "fmt" "io" + "syscall" "time" ) @@ -168,7 +168,7 @@ func readIncrementally(slaveID, functionCode byte, r io.Reader, deadline time.Ti for { if time.Now().After(deadline) { // Possible that serialport may spew data - return nil, errors.New("failed to read from serial port within deadline") + return nil, context.DeadlineExceeded } if _, err := io.ReadAtLeast(r, buf, 1); err != nil { @@ -264,24 +264,72 @@ func (mb *rtuSerialTransporter) Send(ctx context.Context, aduRequest []byte) (ad mb.lastActivity = time.Now() mb.startCloseTimer() - // Send the request - mb.logf("modbus: send % x\n", aduRequest) - if _, err = mb.port.Write(aduRequest); err != nil { + connDeadline := time.Now().Add(mb.Timeout) + linkRecoveryDeadline := time.Now().Add(mb.LinkRecoveryTimeout) + + for { + // Send the request + mb.logf("modbus: send % x\n", aduRequest) + if _, err = mb.port.Write(aduRequest); err != nil { + if err == io.EOF || err == io.ErrUnexpectedEOF || err == syscall.ECONNRESET { + if time.Now().After(linkRecoveryDeadline) { + err = fmt.Errorf("modbus: link recovery timeout reached: %w", err) + return + } + // reconnect on connection reset + mb.logf("modbus: connection reset, reconnecting") + if cerr := mb.close(); cerr != nil { + mb.logf("modbus: error closing connection: %v", cerr) + return + } + if cerr := mb.connect(ctx); cerr != nil { + mb.logf("modbus: error reconnecting: %v", cerr) + return + } + // retry the communication + continue + } + + return + } + // function := aduRequest[1] + // functionFail := aduRequest[1] & 0x80 + bytesToRead := calculateResponseLength(aduRequest) + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(mb.calculateDelay(len(aduRequest) + bytesToRead)): + } + + aduResponse, err = readIncrementally(aduRequest[0], aduRequest[1], mb.port, connDeadline) + mb.logf("modbus: recv % x\n", aduResponse[:]) + + if err != nil { + if err == io.EOF || err == io.ErrUnexpectedEOF || err == syscall.ECONNRESET { + if time.Now().After(linkRecoveryDeadline) { + err = fmt.Errorf("modbus: link recovery timeout reached: %w", err) + return + } + // reconnect on connection reset + mb.logf("modbus: connection reset, reconnecting") + if cerr := mb.close(); cerr != nil { + mb.logf("modbus: error closing connection: %v", cerr) + return + } + if cerr := mb.connect(ctx); cerr != nil { + mb.logf("modbus: error reconnecting: %v", cerr) + return + } + // retry the communication + continue + } + // Unknown error + mb.logf("modbus: read error: %v", err) + return + } return } - // function := aduRequest[1] - // functionFail := aduRequest[1] & 0x80 - bytesToRead := calculateResponseLength(aduRequest) - select { - case <-ctx.Done(): - return nil, ctx.Err() - case <-time.After(mb.calculateDelay(len(aduRequest) + bytesToRead)): - } - data, err := readIncrementally(aduRequest[0], aduRequest[1], mb.port, time.Now().Add(mb.Config.Timeout)) - mb.logf("modbus: recv % x\n", data[:]) - aduResponse = data - return } // calculateDelay roughly calculates time needed for the next frame. diff --git a/serial.go b/serial.go index 5ad9692..98f4f8d 100644 --- a/serial.go +++ b/serial.go @@ -25,8 +25,13 @@ type serialPort struct { // Serial port configuration. serial.Config - Logger Logger + Logger Logger + // IdleTimeout is the duration to close the connection when no activity. IdleTimeout time.Duration + // Silent period after successful connection + ConnectDelay time.Duration + // Recovery timeout if the connection is lost + LinkRecoveryTimeout time.Duration mu sync.Mutex // port is platform-dependent data structure for serial port. @@ -52,9 +57,14 @@ func (mb *serialPort) connect(ctx context.Context) error { if mb.port == nil { port, err := serial.Open(&mb.Config) if err != nil { - return fmt.Errorf("could not open %s: %w", mb.Config.Address, err) + return fmt.Errorf("could not open %s: %w", mb.Address, err) } mb.port = port + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(mb.ConnectDelay): //silent period + } } return nil } @@ -103,6 +113,6 @@ func (mb *serialPort) closeIdle() { if idle := time.Since(mb.lastActivity); idle >= mb.IdleTimeout { mb.logf("modbus: closing connection due to idle timeout: %v", idle) - mb.close() + _ = mb.close() } } From 81d1a8b867d70c4c68c1c32b82d916d305aada06 Mon Sep 17 00:00:00 2001 From: Souyama Date: Thu, 4 Dec 2025 19:17:21 +0530 Subject: [PATCH 2/4] Lint issue fix --- asciiclient.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/asciiclient.go b/asciiclient.go index b05bae1..bc78051 100644 --- a/asciiclient.go +++ b/asciiclient.go @@ -212,7 +212,7 @@ func (mb *asciiSerialTransporter) Send(ctx context.Context, aduRequest []byte) ( return } // Get the response - aduResponse, err = readAscii(mb.port, connDeadline) + aduResponse, err = readASCII(mb.port, connDeadline) mb.logf("modbus: recv % x\n", aduResponse) if err != nil { if err == io.EOF || err == io.ErrUnexpectedEOF || err == syscall.ECONNRESET { @@ -242,7 +242,7 @@ func (mb *asciiSerialTransporter) Send(ctx context.Context, aduRequest []byte) ( } } -func readAscii(r io.Reader, deadline time.Time) ([]byte, error) { +func readASCII(r io.Reader, deadline time.Time) ([]byte, error) { var n, length int var data [asciiMaxSize]byte var err error From 8fb2118fcc3b1f94298d87b2f47161757d5d9cae Mon Sep 17 00:00:00 2001 From: Souyama Date: Sun, 7 Dec 2025 10:59:38 +0530 Subject: [PATCH 3/4] feat: add PTY tests for RTUSerialTransporter functionality --- rtu_transport_test.go | 117 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 117 insertions(+) create mode 100644 rtu_transport_test.go diff --git a/rtu_transport_test.go b/rtu_transport_test.go new file mode 100644 index 0000000..acb1778 --- /dev/null +++ b/rtu_transport_test.go @@ -0,0 +1,117 @@ +//go:build darwin || linux || freebsd || openbsd || netbsd +// +build darwin linux freebsd openbsd netbsd + +// Copyright 2014 Quoc-Viet Nguyen. All rights reserved. +// This software may be modified and distributed under the terms +// of the BSD license. See the LICENSE file for details. + +package modbus + +import ( + "bytes" + "context" + "fmt" + "os" + "syscall" + "testing" + "time" + "unsafe" +) + +// openPTY opens a PTY pair and returns the master file and the slave path. +func openPTY() (master *os.File, slavePath string, err error) { + master, err = os.OpenFile("/dev/ptmx", os.O_RDWR, 0) + if err != nil { + return nil, "", err + } + + // unlockpt + var unlock int32 + // TIOCSPTLCK + _, _, errno := syscall.Syscall(syscall.SYS_IOCTL, master.Fd(), syscall.TIOCSPTLCK, uintptr(unsafe.Pointer(&unlock))) + if errno != 0 { + master.Close() + return nil, "", errno + } + + // ptsname + var ptyno int32 + // TIOCGPTN + _, _, errno = syscall.Syscall(syscall.SYS_IOCTL, master.Fd(), syscall.TIOCGPTN, uintptr(unsafe.Pointer(&ptyno))) + if errno != 0 { + master.Close() + return nil, "", errno + } + + slavePath = fmt.Sprintf("/dev/pts/%d", ptyno) + return master, slavePath, nil +} + +func TestRTUSerialTransporter_Send_PTY(t *testing.T) { + master, slavePath, err := openPTY() + if err != nil { + t.Skipf("Skipping PTY test: %v", err) + } + defer master.Close() + + // Request: 01 03 00 00 00 01 84 0A (Read Holding Registers) + // Response: 01 03 02 00 00 B8 44 + req := []byte{0x01, 0x03, 0x00, 0x00, 0x00, 0x01, 0x84, 0x0A} + resp := []byte{0x01, 0x03, 0x02, 0x00, 0x00, 0xB8, 0x44} + + transporter := &rtuSerialTransporter{} + transporter.Address = slavePath + transporter.BaudRate = 19200 + transporter.Timeout = 1 * time.Second + + // Start a goroutine to read request and write response to master + go func() { + buf := make([]byte, 1024) + n, err := master.Read(buf) + if err != nil { + return + } + if !bytes.Equal(buf[:n], req) { + // t.Errorf would be racy here, just log or ignore + return + } + // Write response + _, err = master.Write(resp) + if err != nil { + t.Errorf("Failed to write response: %v", err) + } + }() + + ctx := context.Background() + aduResponse, err := transporter.Send(ctx, req) + if err != nil { + t.Fatalf("Send failed: %v", err) + } + + if !bytes.Equal(aduResponse, resp) { + t.Errorf("Expected response %x, got %x", resp, aduResponse) + } +} + +func TestRTUSerialTransporter_Timeout_PTY(t *testing.T) { + master, slavePath, err := openPTY() + if err != nil { + t.Skipf("Skipping PTY test: %v", err) + } + defer master.Close() + + req := []byte{0x01, 0x03, 0x00, 0x00, 0x00, 0x01, 0x84, 0x0A} + + transporter := &rtuSerialTransporter{} + transporter.Address = slavePath + transporter.BaudRate = 19200 + transporter.Timeout = 100 * time.Millisecond + + // Don't write anything to master + + ctx := context.Background() + _, err = transporter.Send(ctx, req) + if err == nil { + t.Fatal("Expected timeout error, got nil") + } +} From ea9f4665a0b01338d4ad10b35b230c6c1f4ed00e Mon Sep 17 00:00:00 2001 From: Souyama Date: Sun, 7 Dec 2025 11:05:33 +0530 Subject: [PATCH 4/4] feat: add PTY tests for ASCIISerialTransporter functionality --- ascii_transport_test.go | 89 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 ascii_transport_test.go diff --git a/ascii_transport_test.go b/ascii_transport_test.go new file mode 100644 index 0000000..b07ccde --- /dev/null +++ b/ascii_transport_test.go @@ -0,0 +1,89 @@ +//go:build darwin || linux || freebsd || openbsd || netbsd +// +build darwin linux freebsd openbsd netbsd + +// Copyright 2014 Quoc-Viet Nguyen. All rights reserved. +// This software may be modified and distributed under the terms +// of the BSD license. See the LICENSE file for details. + +package modbus + +import ( + "bytes" + "context" + "testing" + "time" +) + +func TestASCIISerialTransporter_Send_PTY(t *testing.T) { + master, slavePath, err := openPTY() + if err != nil { + t.Skipf("Skipping PTY test: %v", err) + } + defer master.Close() + + // Request: 01 03 00 00 00 01 (Read Holding Registers) + // ASCII: :010300000001FB\r\n + reqASCII := []byte(":010300000001FB\r\n") + + // Response: 01 03 02 00 00 + // ASCII: :0103020000FA\r\n + respASCII := []byte(":0103020000FA\r\n") + + transporter := &asciiSerialTransporter{} + transporter.Address = slavePath + transporter.BaudRate = 19200 + transporter.Timeout = 1 * time.Second + transporter.IdleTimeout = serialIdleTimeout + + // Start a goroutine to read request and write response to master + go func() { + buf := make([]byte, 1024) + n, err := master.Read(buf) + if err != nil { + return + } + if !bytes.Equal(buf[:n], reqASCII) { + // t.Errorf would be racy here, just log or ignore + return + } + // Write response + _, err = master.Write(respASCII) + if err != nil { + t.Errorf("Failed to write response: %v", err) + } + }() + + ctx := context.Background() + aduResponse, err := transporter.Send(ctx, reqASCII) + if err != nil { + t.Fatalf("Send failed: %v", err) + } + + if !bytes.Equal(aduResponse, respASCII) { + t.Errorf("Expected response %s, got %s", respASCII, aduResponse) + } +} + +func TestASCIISerialTransporter_Timeout_PTY(t *testing.T) { + master, slavePath, err := openPTY() + if err != nil { + t.Skipf("Skipping PTY test: %v", err) + } + defer master.Close() + + reqASCII := []byte(":010300000001FB\r\n") + + transporter := &asciiSerialTransporter{} + transporter.Address = slavePath + transporter.BaudRate = 19200 + transporter.Timeout = 100 * time.Millisecond + transporter.IdleTimeout = serialIdleTimeout + + // Don't write anything to master + + ctx := context.Background() + _, err = transporter.Send(ctx, reqASCII) + if err == nil { + t.Fatal("Expected timeout error, got nil") + } +}