Skip to content

Commit

Permalink
Simplify packet handling with a new association struct.
Browse files Browse the repository at this point in the history
  • Loading branch information
sbruens committed Dec 18, 2024
1 parent 078032c commit 667ba51
Show file tree
Hide file tree
Showing 6 changed files with 296 additions and 305 deletions.
24 changes: 22 additions & 2 deletions caddy/shadowsocks_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,22 @@ package caddy

import (
"container/list"
"errors"
"fmt"
"log/slog"
"net"

"github.com/Jigsaw-Code/outline-sdk/transport"
"github.com/Jigsaw-Code/outline-sdk/transport/shadowsocks"
outline "github.com/Jigsaw-Code/outline-ss-server/service"
"github.com/caddyserver/caddy/v2"
"github.com/mholt/caddy-l4/layer4"

"github.com/Jigsaw-Code/outline-ss-server/internal/slicepool"
outline "github.com/Jigsaw-Code/outline-ss-server/service"
)

const serverUDPBufferSize = 64 * 1024

const ssModuleName = "layer4.handlers.shadowsocks"

func init() {
Expand Down Expand Up @@ -116,7 +121,22 @@ func (h *ShadowsocksHandler) Handle(cx *layer4.Connection, _ layer4.Handler) err
case transport.StreamConn:
h.service.HandleStream(cx.Context, conn)
case net.Conn:
h.service.HandleAssociation(cx)
assoc, err := h.service.NewAssociation(conn)
if err != nil {
return fmt.Errorf("Failed to handle association: %v", err)
}
bufPool := slicepool.MakePool(serverUDPBufferSize)
for {
lazySlice := bufPool.LazySlice()
buf := lazySlice.Acquire()
n, err := conn.Read(buf)
if errors.Is(err, net.ErrClosed) {
lazySlice.Release()
return err
}
pkt := buf[:n]
go assoc.HandlePacket(pkt, lazySlice)
}
default:
return fmt.Errorf("failed to handle unknown connection type: %t", conn)
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/outline-ss-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) {
return err
}
slog.Info("UDP service started.", "address", pc.LocalAddr().String())
go service.PacketServe(pc, ssService.HandleAssociation, s.serverMetrics)
go service.PacketServe(pc, ssService.NewAssociation, s.serverMetrics)
}

for _, serviceConfig := range config.Services {
Expand Down Expand Up @@ -271,7 +271,7 @@ func (s *OutlineServer) runConfig(config Config) (func() error, error) {
return err
}
slog.Info("UDP service started.", "address", pc.LocalAddr().String())
go service.PacketServe(pc, ssService.HandleAssociation, s.serverMetrics)
go service.PacketServe(pc, ssService.NewAssociation, s.serverMetrics)
}
}
totalCipherCount += len(serviceConfig.Keys)
Expand Down
23 changes: 12 additions & 11 deletions internal/integration_test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,16 +318,14 @@ func TestUDPEcho(t *testing.T) {
if err != nil {
t.Fatal(err)
}
proxy := service.NewAssociationHandler(time.Hour, cipherList, &fakeShadowsocksMetrics{})
proxy := service.NewPacketHandler(time.Hour, cipherList, &fakeShadowsocksMetrics{})

proxy.SetTargetIPValidator(allowAll)
done := make(chan struct{})
natMetrics := &natTestMetrics{}
associationMetrics := &fakeUDPAssocationMetrics{}
go func() {
service.PacketServe(proxyConn, func(conn net.Conn) { proxy.Handle(conn, associationMetrics) }, natMetrics)
done <- struct{}{}
}()
go service.PacketServe(proxyConn, func(conn net.Conn) (service.Association, error) {
return proxy.NewAssociation(conn, associationMetrics)
}, natMetrics)

cryptoKey, err := shadowsocks.NewEncryptionKey(shadowsocks.CHACHA20IETFPOLY1305, secrets[0])
require.NoError(t, err)
Expand Down Expand Up @@ -366,7 +364,6 @@ func TestUDPEcho(t *testing.T) {
echoConn.Close()
echoRunning.Wait()
proxyConn.Close()
<-done
// Verify that the expected metrics were reported.
snapshot := cipherList.SnapshotForClientIP(netip.Addr{})
keyID := snapshot[0].Value.(*service.CipherEntry).ID
Expand Down Expand Up @@ -549,11 +546,13 @@ func BenchmarkUDPEcho(b *testing.B) {
if err != nil {
b.Fatal(err)
}
proxy := service.NewAssociationHandler(time.Hour, cipherList, &fakeShadowsocksMetrics{})
proxy := service.NewPacketHandler(time.Hour, cipherList, &fakeShadowsocksMetrics{})
proxy.SetTargetIPValidator(allowAll)
done := make(chan struct{})
go func() {
service.PacketServe(server, func(conn net.Conn) { proxy.Handle(conn, &service.NoOpUDPAssocationMetrics{}) }, &natTestMetrics{})
service.PacketServe(server, func(conn net.Conn) (service.Association, error) {
return proxy.NewAssociation(conn, nil)
}, &natTestMetrics{})
done <- struct{}{}
}()

Expand Down Expand Up @@ -593,11 +592,13 @@ func BenchmarkUDPManyKeys(b *testing.B) {
if err != nil {
b.Fatal(err)
}
proxy := service.NewAssociationHandler(time.Hour, cipherList, &fakeShadowsocksMetrics{})
proxy := service.NewPacketHandler(time.Hour, cipherList, &fakeShadowsocksMetrics{})
proxy.SetTargetIPValidator(allowAll)
done := make(chan struct{})
go func() {
service.PacketServe(proxyConn, func(conn net.Conn) { proxy.Handle(conn, &service.NoOpUDPAssocationMetrics{}) }, &natTestMetrics{})
service.PacketServe(proxyConn, func(conn net.Conn) (service.Association, error) {
return proxy.NewAssociation(conn, nil)
}, &natTestMetrics{})
done <- struct{}{}
}()

Expand Down
12 changes: 6 additions & 6 deletions service/shadowsocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type ServiceMetrics interface {

type Service interface {
HandleStream(ctx context.Context, conn transport.StreamConn)
HandleAssociation(conn net.Conn)
NewAssociation(conn net.Conn) (Association, error)
}

// Option is a Shadowsocks service constructor option.
Expand All @@ -58,7 +58,7 @@ type ssService struct {
replayCache *ReplayCache

sh StreamHandler
ah AssociationHandler
ph PacketHandler
}

// NewShadowsocksService creates a new Shadowsocks service.
Expand All @@ -85,8 +85,8 @@ func NewShadowsocksService(opts ...Option) (Service, error) {
)
s.sh.SetLogger(s.logger)

s.ah = NewAssociationHandler(s.natTimeout, s.ciphers, &ssConnMetrics{ServiceMetrics: s.metrics, proto: "udp"})
s.ah.SetLogger(s.logger)
s.ph = NewPacketHandler(s.natTimeout, s.ciphers, &ssConnMetrics{ServiceMetrics: s.metrics, proto: "udp"})
s.ph.SetLogger(s.logger)

return s, nil
}
Expand Down Expand Up @@ -137,12 +137,12 @@ func (s *ssService) HandleStream(ctx context.Context, conn transport.StreamConn)
}

// HandleAssociation handles a Shadowsocks packet-based assocation.
func (s *ssService) HandleAssociation(conn net.Conn) {
func (s *ssService) NewAssociation(conn net.Conn) (Association, error) {
var metrics UDPAssocationMetrics
if s.metrics != nil {
metrics = s.metrics.AddOpenUDPAssociation(conn)
}
s.ah.Handle(conn, metrics)
return s.ph.NewAssociation(conn, metrics)
}

type ssConnMetrics struct {
Expand Down
Loading

0 comments on commit 667ba51

Please sign in to comment.