refactor: split UDP serving from handling of packets
This will allow us to re-use the handling of packets in the Caddy
server, where serving is handled separately.
sbruens committed Nov 18, 2024
1 parent ff61c9f commit 56aa501
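
The caller-side effect of the split, taken from the cmd/outline-ss-server/main.go diff below: instead of handing the whole net.PacketConn to the Shadowsocks service, callers now pass a per-packet handler to a shared serving loop.

// Before: the service owned the UDP read loop.
go ssService.HandlePacket(pc)

// After: a generic serving loop reads datagrams and dispatches each one
// to the service's per-packet handler, so a server with its own serving
// machinery (like Caddy) can drive the same handler itself.
go service.PacketServe(pc, ssService.HandlePacket)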
Showing 5 changed files with 162 additions and 118 deletions.
4 changes: 2 additions & 2 deletions cmd/outline-ss-server/main.go
@@ -238,7 +238,7 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) {
 			return err
 		}
 		slog.Info("UDP service started.", "address", pc.LocalAddr().String())
-		go ssService.HandlePacket(pc)
+		go service.PacketServe(pc, ssService.HandlePacket)
 	}
 
 	for _, serviceConfig := range config.Services {
@@ -271,7 +271,7 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) {
 				return err
 			}
 			slog.Info("UDP service started.", "address", pc.LocalAddr().String())
-			go ssService.HandlePacket(pc)
+			go service.PacketServe(pc, ssService.HandlePacket)
 			}
 		}
 		totalCipherCount += len(serviceConfig.Keys)
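The service.PacketServe implementation itself is in the one changed file whose diff does not render on this page. Purely as a hypothetical sketch of the shape such a loop takes — inferred from the new HandlePacket(conn net.Conn, pkt []byte) signature in service/shadowsocks.go below, not from the actual code — with boundConn and packetServeSketch being invented names:

package main

import (
	"errors"
	"net"
)

// boundConn adapts a PacketConn plus a fixed peer address to net.Conn so
// the per-packet handler can reply with Write. Illustrative helper only.
type boundConn struct {
	net.PacketConn
	peer net.Addr
}

func (c *boundConn) Read(p []byte) (int, error) {
	n, _, err := c.PacketConn.ReadFrom(p)
	return n, err
}
func (c *boundConn) Write(p []byte) (int, error) { return c.PacketConn.WriteTo(p, c.peer) }
func (c *boundConn) RemoteAddr() net.Addr        { return c.peer }

// packetServeSketch shows the serving/handling split: it owns the read
// loop and hands each datagram to the handler. Not the commit's code.
func packetServeSketch(clientConn net.PacketConn, handle func(conn net.Conn, pkt []byte)) {
	buf := make([]byte, 65536) // largest possible UDP payload
	for {
		n, addr, err := clientConn.ReadFrom(buf)
		if err != nil {
			if errors.Is(err, net.ErrClosed) {
				return // listener closed: stop serving
			}
			continue // transient read error: keep serving
		}
		// NB: a real loop would copy buf if the handler retains it.
		handle(&boundConn{PacketConn: clientConn, peer: addr}, buf[:n])
	}
}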
62 changes: 36 additions & 26 deletions internal/integration_test/integration_test.go
@@ -17,6 +17,7 @@ package integration_test
 import (
 	"bytes"
 	"context"
+	"errors"
 	"fmt"
 	"io"
 	"net"
@@ -104,6 +105,9 @@ func startUDPEchoServer(t testing.TB) (*net.UDPConn, *sync.WaitGroup) {
 		for {
 			n, clientAddr, err := conn.ReadFromUDP(buf)
 			if err != nil {
+				if errors.Is(err, net.ErrClosed) {
+					return
+				}
 				t.Logf("Failed to read from UDP conn: %v", err)
 				return
 			}
@@ -268,16 +272,25 @@ type fakeUDPConnMetrics struct {
 	clientAddr net.Addr
 	accessKey  string
 	up, down   []udpRecord
+	mu         sync.Mutex
 }
 
+var _ service.UDPConnMetrics = (*fakeUDPConnMetrics)(nil)
+
 func (m *fakeUDPConnMetrics) AddPacketFromClient(status string, clientProxyBytes, proxyTargetBytes int64) {
+	fmt.Println("===>AddPacketFromClient")
+	m.mu.Lock()
+	defer m.mu.Unlock()
 	m.up = append(m.up, udpRecord{m.clientAddr, m.accessKey, status, clientProxyBytes, proxyTargetBytes})
 }
 
 func (m *fakeUDPConnMetrics) AddPacketFromTarget(status string, targetProxyBytes, proxyClientBytes int64) {
+	fmt.Println("===>AddPacketFromTarget")
+	m.mu.Lock()
+	defer m.mu.Unlock()
 	m.down = append(m.down, udpRecord{m.clientAddr, m.accessKey, status, targetProxyBytes, proxyClientBytes})
 }
 
 func (m *fakeUDPConnMetrics) RemoveNatEntry() {
 	// Not tested because it requires waiting for a long timeout.
 }
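A note on the hunk above: the new mu field guards the up/down slices because, once serving is split from handling, the metrics callbacks fire on whatever goroutine the serving loop runs the handler on, concurrently with the test goroutine's assertions (which now lock before reading; see the TestUDPEcho hunk below). A minimal, self-contained sketch of the same locked-append pattern, with invented names, of the kind go test -race would vet:

package main

import "sync"

// recorder mirrors the locked-append pattern fakeUDPConnMetrics adopts:
// writers run on serving goroutines, readers on the test goroutine.
type recorder struct {
	mu      sync.Mutex
	records []int64
}

func (r *recorder) add(v int64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.records = append(r.records, v)
}

// snapshot copies under the lock so callers can inspect without racing.
func (r *recorder) snapshot() []int64 {
	r.mu.Lock()
	defer r.mu.Unlock()
	return append([]int64(nil), r.records...)
}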
@@ -315,7 +328,7 @@ func TestUDPEcho(t *testing.T) {
 	proxy.SetTargetIPValidator(allowAll)
 	done := make(chan struct{})
 	go func() {
-		proxy.Handle(proxyConn)
+		service.PacketServe(proxyConn, proxy.Handle)
 		done <- struct{}{}
 	}()
 
@@ -361,29 +374,26 @@ func TestUDPEcho(t *testing.T) {
 	snapshot := cipherList.SnapshotForClientIP(netip.Addr{})
 	keyID := snapshot[0].Value.(*service.CipherEntry).ID
 
-	if len(testMetrics.connMetrics) != 1 {
-		t.Errorf("Wrong NAT count: %d", len(testMetrics.connMetrics))
-	}
-	if len(testMetrics.connMetrics[0].up) != 1 {
-		t.Errorf("Wrong number of packets sent: %v", testMetrics.connMetrics[0].up)
-	} else {
-		record := testMetrics.connMetrics[0].up[0]
-		require.Equal(t, conn.LocalAddr(), record.clientAddr, "Bad upstream metrics")
-		require.Equal(t, keyID, record.accessKey, "Bad upstream metrics")
-		require.Equal(t, "OK", record.status, "Bad upstream metrics")
-		require.Greater(t, record.in, record.out, "Bad upstream metrics")
-		require.Equal(t, int64(N), record.out, "Bad upstream metrics")
-	}
-	if len(testMetrics.connMetrics[0].down) != 1 {
-		t.Errorf("Wrong number of packets received: %v", testMetrics.connMetrics[0].down)
-	} else {
-		record := testMetrics.connMetrics[0].down[0]
-		require.Equal(t, conn.LocalAddr(), record.clientAddr, "Bad downstream metrics")
-		require.Equal(t, keyID, record.accessKey, "Bad downstream metrics")
-		require.Equal(t, "OK", record.status, "Bad downstream metrics")
-		require.Greater(t, record.out, record.in, "Bad downstream metrics")
-		require.Equal(t, int64(N), record.in, "Bad downstream metrics")
-	}
+	require.Lenf(t, testMetrics.connMetrics, 1, "Wrong NAT count")
+
+	testMetrics.connMetrics[0].mu.Lock()
+	defer testMetrics.connMetrics[0].mu.Unlock()
+
+	require.Lenf(t, testMetrics.connMetrics[0].up, 1, "Wrong number of packets sent")
+	record := testMetrics.connMetrics[0].up[0]
+	require.Equal(t, conn.LocalAddr(), record.clientAddr, "Bad upstream metrics")
+	require.Equal(t, keyID, record.accessKey, "Bad upstream metrics")
+	require.Equal(t, "OK", record.status, "Bad upstream metrics")
+	require.Greater(t, record.in, record.out, "Bad upstream metrics")
+	require.Equal(t, int64(N), record.out, "Bad upstream metrics")
+
+	require.Lenf(t, testMetrics.connMetrics[0].down, 1, "Wrong number of packets received")
+	record = testMetrics.connMetrics[0].down[0]
+	require.Equal(t, conn.LocalAddr(), record.clientAddr, "Bad downstream metrics")
+	require.Equal(t, keyID, record.accessKey, "Bad downstream metrics")
+	require.Equal(t, "OK", record.status, "Bad downstream metrics")
+	require.Greater(t, record.out, record.in, "Bad downstream metrics")
+	require.Equal(t, int64(N), record.in, "Bad downstream metrics")
 }
 
 func BenchmarkTCPThroughput(b *testing.B) {
@@ -548,7 +558,7 @@ func BenchmarkUDPEcho(b *testing.B) {
 	proxy.SetTargetIPValidator(allowAll)
 	done := make(chan struct{})
 	go func() {
-		proxy.Handle(server)
+		service.PacketServe(server, proxy.Handle)
 		done <- struct{}{}
 	}()
 
@@ -592,7 +602,7 @@ func BenchmarkUDPManyKeys(b *testing.B) {
 	proxy.SetTargetIPValidator(allowAll)
 	done := make(chan struct{})
 	go func() {
-		proxy.Handle(proxyConn)
+		service.PacketServe(proxyConn, proxy.Handle)
 		done <- struct{}{}
 	}()
 
6 changes: 3 additions & 3 deletions service/shadowsocks.go
@@ -44,7 +44,7 @@ type ServiceMetrics interface {
 
 type Service interface {
 	HandleStream(ctx context.Context, conn transport.StreamConn)
-	HandlePacket(conn net.PacketConn)
+	HandlePacket(conn net.Conn, pkt []byte)
 }
 
 // Option is a Shadowsocks service constructor option.
@@ -137,8 +137,8 @@ func (s *ssService) HandleStream(ctx context.Context, conn transport.StreamConn) {
 }
 
 // HandlePacket handles a Shadowsocks packet connection.
-func (s *ssService) HandlePacket(conn net.PacketConn) {
-	s.ph.Handle(conn)
+func (s *ssService) HandlePacket(conn net.Conn, pkt []byte) {
+	s.ph.Handle(conn, pkt)
 }
 
 type ssConnMetrics struct {
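This interface change is what enables the reuse described in the commit message: HandlePacket now takes a single packet and a net.Conn to reply on, so any server that owns its own UDP accept loop can feed packets to the service directly. A hypothetical sketch of that, reusing the illustrative boundConn adapter from the earlier sketch; serveFromOwnLoop is an invented name:

// Hypothetical: a server with its own UDP accept loop (the Caddy case
// from the commit message) calls HandlePacket per packet and never uses
// service.PacketServe. ss is any implementation of the new Service
// interface in service/shadowsocks.go.
func serveFromOwnLoop(pc net.PacketConn, ss service.Service) error {
	buf := make([]byte, 65536)
	for {
		n, addr, err := pc.ReadFrom(buf)
		if err != nil {
			return err // caller decides how to handle listener errors
		}
		ss.HandlePacket(&boundConn{PacketConn: pc, peer: addr}, buf[:n])
	}
}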
