Skip to content

Commit

Permalink
Proxy Version 1.2.0
Browse files Browse the repository at this point in the history
* Support for sequential communication with the datalogger.  If 2 or more clients request data simultaneously the requests will be buffered by the proxy and sent sequentially to the datalogger. Requires the `-buffered` flag to be used.
* Datalogger write timeout.
  • Loading branch information
githubDante committed Dec 10, 2024
1 parent 563512f commit 75378d2
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 14 deletions.
12 changes: 8 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,21 @@
```console
go-solarmanV5-proxy 192.168.1.3 12345
```
* `-debug` flag can be used to see what's going on under the hood :sunglasses:
* `-silent` flag will make the proxy completely silent
* `-bcast` enable the broadcast listener/server
* all messages are logged to stdout for now
* Options
* `-debug` flag can be used to see what's going on under the hood :sunglasses:
* `-silent` flag will make the proxy completely silent
* `-bcast` activates a broadcast listener/server
* `-buffered` will activate sequential communication with the datalogger
* all messages are logged to stdout for now
* Data logger configuration (config_hide.html)
![image](img/logger_tcp_srv.png "Config")

All clients then can be connected to port 8899 of the proxy server

When the `-bcast` flag is used the proxy will respond to logger scan requests. All dataloggers currently connected will be listed.

The `-buffered` flag allows much more stable communication with the inverter when 2 or more clients are used.

---
#### Build

Expand Down
81 changes: 76 additions & 5 deletions client/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ import (
"github.com/githubDante/go-solarman-proxy/protocol"
)

const (
writeTimeout = 200 * time.Millisecond // Deadline for socket write operations
)

type loggerBuffer struct {
logger *ClientSolarman
buf []byte
}

// ClientLogger - А data logger connected to the proxy
type ClientLogger struct {
Conn net.Conn
Expand All @@ -20,9 +29,12 @@ type ClientLogger struct {
// Reporting channel for serial numbers
SReporter chan *CommLogger
// Reporting channel on socket disconnection
stoppedCh chan *CommLogger
Running bool
Id uint32
stoppedCh chan *CommLogger
Running bool
Id uint32
waitingForData bool
dataBuffer []*loggerBuffer
bufferWanted bool
}

// NewLoggerClient - Initializes a new data-logger client
Expand Down Expand Up @@ -82,6 +94,7 @@ func (c *ClientLogger) Run() {
}
}

c.waitingForData = false
go c.sendToAll(buffer[:pLen])
}
}
Expand All @@ -97,6 +110,14 @@ func (c *ClientLogger) Stop() {
}
}

// EnableBuffering activates the logger write buffer. All messages will be sent sequentially
//
// The responses from the logger are still broadcasted to all clients
func (c *ClientLogger) EnableBuffering() {
log.LogDebugf("Logger <%p> write buffer activated.\n", c)
c.bufferWanted = true
}

// sendToAll packet broadcast to all connected clients
func (c *ClientLogger) sendToAll(data []byte) {
c.lock.Lock()
Expand All @@ -122,6 +143,13 @@ func (c *ClientLogger) sendToAll(data []byte) {

log.LogDebugf("Logger <%p> data sent to all [%d] clients...\n", c, len(c.Clients))
log.LogDebugf("Logger <%p> data: %s\n", c, hex.EncodeToString(data))

if c.pendingInBuffer() {
buf := c.getFromBuffer()
if buf != nil {
c.Send(buf.buf, buf.logger)
}
}
}

func (c *ClientLogger) Add(cl *ClientSolarman) {
Expand All @@ -138,8 +166,24 @@ func (c *ClientLogger) Send(data []byte, from *ClientSolarman) {
if !c.Running {
return
}
if c.waitingForData && c.bufferWanted {
c.addToBuffer(data, from)
return
}
log.LogDebugf("Logger <%p> sending data from <%p>\n", c, from)
c.Conn.Write(data)
c.Conn.SetWriteDeadline(time.Now().Add(writeTimeout))
c.waitingForData = true
_, err := c.Conn.Write(data)
if err != nil {
log.LogErrorf("Cannot communicate with logger <%p>\n", c)
log.LogWarnf("Logger <%p> will be disconnected!\n", c)
c.Stop()
} else {
if c.bufferWanted {
log.LogInfof("Logger <%p> sending complete. Waiting for data [%t]\n", c, c.waitingForData)
}
}

}

// DumpClients drops all ClientSolarman instances associated with the logger and returns them as a slice
Expand All @@ -155,5 +199,32 @@ func (c *ClientLogger) DumpClients() []*ClientSolarman {
// serialProbe send a predefined packet to the datalogger in order to acquire the serial number
func (c *ClientLogger) serialProbe() {
probe := ReadHolding
c.Conn.Write(probe.ToBytes())
c.Conn.SetWriteDeadline(time.Now().Add(writeTimeout))
_, err := c.Conn.Write(probe.ToBytes())
if err != nil {
log.LogErrorf("SerialProbe failed, Cannot communicate with logger <%p>\n", c)
log.LogWarnf("Logger <%p> will be disconnected!\n", c)
c.Stop()
}
}

func (c *ClientLogger) addToBuffer(buffer []byte, logger *ClientSolarman) {
log.LogDebugf("Logger <%p> sending [%d bytes] to write buffer.\n", c, len(buffer))
c.dataBuffer = append(c.dataBuffer, &loggerBuffer{logger: logger, buf: buffer})
}

func (c *ClientLogger) getFromBuffer() *loggerBuffer {
if len(c.dataBuffer) == 0 {
return nil
}
log.LogDebugf("Logger <%p> write buffer len [%d].\n", c, len(c.dataBuffer))
top := c.dataBuffer[0]
c.dataBuffer = c.dataBuffer[1:]
log.LogDebugf("Logger <%p> got [%d bytes] message from buffer. Pending messages [%d].\n",
c, len(top.buf), len(c.dataBuffer))
return top
}

func (c *ClientLogger) pendingInBuffer() bool {
return len(c.dataBuffer) > 0
}
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func main() {
debug := flag.Bool("debug", false, "enable debug logging")
silent := flag.Bool("silent", false, "enable silent mode")
bcast := flag.Bool("bcast", false, "enable the broadcast listener")
buffer := flag.Bool("buffered", false, "enable the logger write buffer (sequential client communication)")
flag.Parse()
args := flag.Args()

Expand All @@ -41,7 +42,7 @@ func main() {
log.EnableSilent()
}
proxy := server.NewProxy(ip, int(port))
err = proxy.Serve(*bcast)
err = proxy.Serve(*bcast, *buffer)
if err != nil {
log.LogErrorf("Proxy start error: %s\n", err.Error())
os.Exit(1)
Expand Down
10 changes: 6 additions & 4 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (s *V5ProxyServer) Wait() {
}

// Serve - Creates listeners and starts the proxy loops
func (s *V5ProxyServer) Serve(enableBroadcast bool) error {
func (s *V5ProxyServer) Serve(enableBroadcast, loggerBuffering bool) error {

var err error
s.clientsL, err = net.Listen("tcp4", fmt.Sprintf("%s:%d", "0.0.0.0", s.ClientsPort))
Expand All @@ -82,7 +82,7 @@ func (s *V5ProxyServer) Serve(enableBroadcast bool) error {
}

log.LogInfof("[Proxy] sockets created. ClientPort [%d] - LoggersPort [%d]\n", 8899, s.LoggersPort)
go s.loggersConn()
go s.loggersConn(loggerBuffering)
go s.clientsConn()
go s.handleBroadcasts()
go s.janitor()
Expand All @@ -95,7 +95,7 @@ func (s *V5ProxyServer) Serve(enableBroadcast bool) error {
}

// loggersConn Connection manager for data logger connections
func (s *V5ProxyServer) loggersConn() {
func (s *V5ProxyServer) loggersConn(enableBuffering bool) {

log.LogInfof("[Loggers-Proxy] waiting for logger connections\n")

Expand All @@ -122,7 +122,9 @@ func (s *V5ProxyServer) loggersConn() {
}
cl := client.NewLoggerClient(conn, s.loggersComm, s.loggerStopped)
s.martians[cl.Id] = cl

if enableBuffering {
cl.EnableBuffering()
}
go cl.Run()
}
}
Expand Down

0 comments on commit 75378d2

Please sign in to comment.