Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: sync.pool for datagram struct #91

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 30 additions & 22 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,29 +43,33 @@ type Server struct {
datagramPool sync.Pool
}

//NewServer returns a new Server
// NewServer returns a new Server
func NewServer() *Server {
return &Server{tlsPeerNameFunc: defaultTlsPeerName, datagramPool: sync.Pool{
New: func() interface{} {
return make([]byte, 65536)
return &Server{
tlsPeerNameFunc: defaultTlsPeerName,
datagramPool: sync.Pool{
New: func() interface{} {
return DatagramMessage{
message: make([]byte, 65536),
client: "",
}
},
},
},

datagramChannelSize: datagramChannelBufferSize,
}
}

//Sets the syslog format (RFC3164 or RFC5424 or RFC6587)
// Sets the syslog format (RFC3164 or RFC5424 or RFC6587)
func (s *Server) SetFormat(f format.Format) {
s.format = f
}

//Sets the handler, this handler with receive every syslog entry
// Sets the handler, this handler with receive every syslog entry
func (s *Server) SetHandler(handler Handler) {
s.handler = handler
}

//Sets the connection timeout for TCP connections, in milliseconds
// Sets the connection timeout for TCP connections, in milliseconds
func (s *Server) SetTimeout(millseconds int64) {
s.readTimeoutMilliseconds = millseconds
}
Expand All @@ -89,7 +93,7 @@ func defaultTlsPeerName(tlsConn *tls.Conn) (tlsPeer string, ok bool) {
return cn, true
}

//Configure the server for listen on an UDP addr
// Configure the server for listen on an UDP addr
func (s *Server) ListenUDP(addr string) error {
udpAddr, err := net.ResolveUDPAddr("udp", addr)
if err != nil {
Expand All @@ -106,7 +110,7 @@ func (s *Server) ListenUDP(addr string) error {
return nil
}

//Configure the server for listen on an unix socket
// Configure the server for listen on an unix socket
func (s *Server) ListenUnixgram(addr string) error {
unixAddr, err := net.ResolveUnixAddr("unixgram", addr)
if err != nil {
Expand All @@ -123,7 +127,7 @@ func (s *Server) ListenUnixgram(addr string) error {
return nil
}

//Configure the server for listen on a TCP addr
// Configure the server for listen on a TCP addr
func (s *Server) ListenTCP(addr string) error {
tcpAddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
Expand All @@ -140,7 +144,7 @@ func (s *Server) ListenTCP(addr string) error {
return nil
}

//Configure the server for listen on a TCP addr for TLS
// Configure the server for listen on a TCP addr for TLS
func (s *Server) ListenTCPTLS(addr string, config *tls.Config) error {
listener, err := tls.Listen("tcp", addr, config)
if err != nil {
Expand All @@ -152,7 +156,7 @@ func (s *Server) ListenTCPTLS(addr string, config *tls.Config) error {
return nil
}

//Starts the server, all the go routines goes to live
// Starts the server, all the go routines goes to live
func (s *Server) Boot() error {
if s.format == nil {
return errors.New("please set a valid format")
Expand Down Expand Up @@ -278,12 +282,12 @@ func (s *Server) parser(line []byte, client string, tlsPeer string) {
s.handler.Handle(logParts, int64(len(line)), err)
}

//Returns the last error
// Returns the last error
func (s *Server) GetLastError() error {
return s.lastError
}

//Kill the server
// Kill the server
func (s *Server) Kill() error {
for _, connection := range s.connections {
err := connection.Close()
Expand All @@ -308,7 +312,7 @@ func (s *Server) Kill() error {
return nil
}

//Waits until the server stops
// Waits until the server stops
func (s *Server) Wait() {
s.wait.Wait()
}
Expand All @@ -333,18 +337,20 @@ func (s *Server) goReceiveDatagrams(packetconn net.PacketConn) {
go func() {
defer s.wait.Done()
for {
buf := s.datagramPool.Get().([]byte)
n, addr, err := packetconn.ReadFrom(buf)
msg := s.datagramPool.Get().(DatagramMessage)
n, addr, err := packetconn.ReadFrom(msg.message)
if err == nil {
// Ignore trailing control characters and NULs
for ; (n > 0) && (buf[n-1] < 32); n-- {
for ; (n > 0) && (msg.message[n-1] < 32); n-- {
}
if n > 0 {
var address string
if addr != nil {
address = addr.String()
}
s.datagramChannel <- DatagramMessage{buf[:n], address}
msg.client = address
msg.message = msg.message[:n]
s.datagramChannel <- msg
}
} else {
// there has been an error. Either the server has been killed
Expand Down Expand Up @@ -379,7 +385,9 @@ func (s *Server) goParseDatagrams() {
} else {
s.parser(msg.message, msg.client, "")
}
s.datagramPool.Put(msg.message[:cap(msg.message)])

msg.message = msg.message[:cap(msg.message)] // reset
s.datagramPool.Put(msg)
}
}
}()
Expand Down