diff --git a/README.md b/README.md index b7b8a26..256c484 100644 --- a/README.md +++ b/README.md @@ -22,10 +22,12 @@ ```console go-solarmanV5-proxy 192.168.1.3 12345 ``` - * `-debug` flag can be used to see what's going on under the hood :sunglasses: - * `-silent` flag will make the proxy completely silent - * `-bcast` enable the broadcast listener/server - * all messages are logged to stdout for now + * Options + * `-debug` flag can be used to see what's going on under the hood :sunglasses: + * `-silent` flag will make the proxy completely silent + * `-bcast` activates a broadcast listener/server + * `-buffered` will activate sequential communication with the datalogger + * all messages are logged to stdout for now * Data logger configuration (config_hide.html) ![image](img/logger_tcp_srv.png "Config") @@ -33,6 +35,8 @@ All clients then can be connected to port 8899 of the proxy server When the `-bcast` flag is used the proxy will respond to logger scan requests. All dataloggers currently connected will be listed. +The `-buffered` flag allows much more stable communication with the inverter when 2 or more clients are used. + --- #### Build diff --git a/client/logger.go b/client/logger.go index 5f1909e..c46b9ec 100644 --- a/client/logger.go +++ b/client/logger.go @@ -10,6 +10,15 @@ import ( "github.com/githubDante/go-solarman-proxy/protocol" ) +const ( + writeTimeout = 200 * time.Millisecond // Deadline for socket write operations +) + +type loggerBuffer struct { + logger *ClientSolarman + buf []byte +} + // ClientLogger - А data logger connected to the proxy type ClientLogger struct { Conn net.Conn @@ -20,9 +29,12 @@ type ClientLogger struct { // Reporting channel for serial numbers SReporter chan *CommLogger // Reporting channel on socket disconnection - stoppedCh chan *CommLogger - Running bool - Id uint32 + stoppedCh chan *CommLogger + Running bool + Id uint32 + waitingForData bool + dataBuffer []*loggerBuffer + bufferWanted bool } // NewLoggerClient - Initializes a new data-logger client @@ -82,6 +94,7 @@ func (c *ClientLogger) Run() { } } + c.waitingForData = false go c.sendToAll(buffer[:pLen]) } } @@ -97,6 +110,14 @@ func (c *ClientLogger) Stop() { } } +// EnableBuffering activates the logger write buffer. All messages will be sent sequentially +// +// The responses from the logger are still broadcasted to all clients +func (c *ClientLogger) EnableBuffering() { + log.LogDebugf("Logger <%p> write buffer activated.\n", c) + c.bufferWanted = true +} + // sendToAll packet broadcast to all connected clients func (c *ClientLogger) sendToAll(data []byte) { c.lock.Lock() @@ -122,6 +143,13 @@ func (c *ClientLogger) sendToAll(data []byte) { log.LogDebugf("Logger <%p> data sent to all [%d] clients...\n", c, len(c.Clients)) log.LogDebugf("Logger <%p> data: %s\n", c, hex.EncodeToString(data)) + + if c.pendingInBuffer() { + buf := c.getFromBuffer() + if buf != nil { + c.Send(buf.buf, buf.logger) + } + } } func (c *ClientLogger) Add(cl *ClientSolarman) { @@ -138,8 +166,24 @@ func (c *ClientLogger) Send(data []byte, from *ClientSolarman) { if !c.Running { return } + if c.waitingForData && c.bufferWanted { + c.addToBuffer(data, from) + return + } log.LogDebugf("Logger <%p> sending data from <%p>\n", c, from) - c.Conn.Write(data) + c.Conn.SetWriteDeadline(time.Now().Add(writeTimeout)) + c.waitingForData = true + _, err := c.Conn.Write(data) + if err != nil { + log.LogErrorf("Cannot communicate with logger <%p>\n", c) + log.LogWarnf("Logger <%p> will be disconnected!\n", c) + c.Stop() + } else { + if c.bufferWanted { + log.LogInfof("Logger <%p> sending complete. Waiting for data [%t]\n", c, c.waitingForData) + } + } + } // DumpClients drops all ClientSolarman instances associated with the logger and returns them as a slice @@ -155,5 +199,32 @@ func (c *ClientLogger) DumpClients() []*ClientSolarman { // serialProbe send a predefined packet to the datalogger in order to acquire the serial number func (c *ClientLogger) serialProbe() { probe := ReadHolding - c.Conn.Write(probe.ToBytes()) + c.Conn.SetWriteDeadline(time.Now().Add(writeTimeout)) + _, err := c.Conn.Write(probe.ToBytes()) + if err != nil { + log.LogErrorf("SerialProbe failed, Cannot communicate with logger <%p>\n", c) + log.LogWarnf("Logger <%p> will be disconnected!\n", c) + c.Stop() + } +} + +func (c *ClientLogger) addToBuffer(buffer []byte, logger *ClientSolarman) { + log.LogDebugf("Logger <%p> sending [%d bytes] to write buffer.\n", c, len(buffer)) + c.dataBuffer = append(c.dataBuffer, &loggerBuffer{logger: logger, buf: buffer}) +} + +func (c *ClientLogger) getFromBuffer() *loggerBuffer { + if len(c.dataBuffer) == 0 { + return nil + } + log.LogDebugf("Logger <%p> write buffer len [%d].\n", c, len(c.dataBuffer)) + top := c.dataBuffer[0] + c.dataBuffer = c.dataBuffer[1:] + log.LogDebugf("Logger <%p> got [%d bytes] message from buffer. Pending messages [%d].\n", + c, len(top.buf), len(c.dataBuffer)) + return top +} + +func (c *ClientLogger) pendingInBuffer() bool { + return len(c.dataBuffer) > 0 } diff --git a/main.go b/main.go index 920d300..2628453 100644 --- a/main.go +++ b/main.go @@ -20,6 +20,7 @@ func main() { debug := flag.Bool("debug", false, "enable debug logging") silent := flag.Bool("silent", false, "enable silent mode") bcast := flag.Bool("bcast", false, "enable the broadcast listener") + buffer := flag.Bool("buffered", false, "enable the logger write buffer (sequential client communication)") flag.Parse() args := flag.Args() @@ -41,7 +42,7 @@ func main() { log.EnableSilent() } proxy := server.NewProxy(ip, int(port)) - err = proxy.Serve(*bcast) + err = proxy.Serve(*bcast, *buffer) if err != nil { log.LogErrorf("Proxy start error: %s\n", err.Error()) os.Exit(1) diff --git a/server/server.go b/server/server.go index c2e81d2..26ce2a2 100644 --- a/server/server.go +++ b/server/server.go @@ -68,7 +68,7 @@ func (s *V5ProxyServer) Wait() { } // Serve - Creates listeners and starts the proxy loops -func (s *V5ProxyServer) Serve(enableBroadcast bool) error { +func (s *V5ProxyServer) Serve(enableBroadcast, loggerBuffering bool) error { var err error s.clientsL, err = net.Listen("tcp4", fmt.Sprintf("%s:%d", "0.0.0.0", s.ClientsPort)) @@ -82,7 +82,7 @@ func (s *V5ProxyServer) Serve(enableBroadcast bool) error { } log.LogInfof("[Proxy] sockets created. ClientPort [%d] - LoggersPort [%d]\n", 8899, s.LoggersPort) - go s.loggersConn() + go s.loggersConn(loggerBuffering) go s.clientsConn() go s.handleBroadcasts() go s.janitor() @@ -95,7 +95,7 @@ func (s *V5ProxyServer) Serve(enableBroadcast bool) error { } // loggersConn Connection manager for data logger connections -func (s *V5ProxyServer) loggersConn() { +func (s *V5ProxyServer) loggersConn(enableBuffering bool) { log.LogInfof("[Loggers-Proxy] waiting for logger connections\n") @@ -122,7 +122,9 @@ func (s *V5ProxyServer) loggersConn() { } cl := client.NewLoggerClient(conn, s.loggersComm, s.loggerStopped) s.martians[cl.Id] = cl - + if enableBuffering { + cl.EnableBuffering() + } go cl.Run() } }