diff --git a/cmd/outline-ss-server/main.go b/cmd/outline-ss-server/main.go index 2c1da142..87860df7 100644 --- a/cmd/outline-ss-server/main.go +++ b/cmd/outline-ss-server/main.go @@ -87,7 +87,7 @@ func (s *SSServer) startPort(portNum int) error { port := &ssPort{tcpListener: listener, packetConn: packetConn, cipherList: service.NewCipherList()} authFunc := service.NewShadowsocksStreamAuthenticator(port.cipherList, &s.replayCache, s.m) // TODO: Register initial data metrics at zero. - tcpHandler := service.NewTCPHandler(portNum, authFunc, s.m, tcpReadTimeout) + tcpHandler := service.NewTCPHandler(listener.Addr().String(), authFunc, s.m, tcpReadTimeout) packetHandler := service.NewPacketHandler(s.natTimeout, port.cipherList, s.m) s.ports[portNum] = port go service.StreamServe(service.WrapStreamListener(listener.AcceptTCP), tcpHandler.Handle) diff --git a/cmd/outline-ss-server/metrics.go b/cmd/outline-ss-server/metrics.go index 531c16ba..e95ceeb3 100644 --- a/cmd/outline-ss-server/metrics.go +++ b/cmd/outline-ss-server/metrics.go @@ -18,7 +18,6 @@ import ( "fmt" "net" "net/netip" - "strconv" "sync" "time" @@ -357,8 +356,8 @@ func (m *outlineMetrics) RemoveUDPNatEntry(clientAddr net.Addr, accessKey string } } -func (m *outlineMetrics) AddTCPProbe(status, drainResult string, port int, clientProxyBytes int64) { - m.tcpProbes.WithLabelValues(strconv.Itoa(port), status, drainResult).Observe(float64(clientProxyBytes)) +func (m *outlineMetrics) AddTCPProbe(status, drainResult string, listenerId string, clientProxyBytes int64) { + m.tcpProbes.WithLabelValues(listenerId, status, drainResult).Observe(float64(clientProxyBytes)) } func (m *outlineMetrics) AddTCPCipherSearch(accessKeyFound bool, timeToCipher time.Duration) { diff --git a/cmd/outline-ss-server/metrics_test.go b/cmd/outline-ss-server/metrics_test.go index 353520e4..80e81817 100644 --- a/cmd/outline-ss-server/metrics_test.go +++ b/cmd/outline-ss-server/metrics_test.go @@ -68,7 +68,7 @@ func TestMethodsDontPanic(t *testing.T) { ssMetrics.AddUDPPacketFromTarget(ipInfo, "3", "OK", 10, 20) ssMetrics.AddUDPNatEntry(fakeAddr("127.0.0.1:9"), "key-1") ssMetrics.RemoveUDPNatEntry(fakeAddr("127.0.0.1:9"), "key-1") - ssMetrics.AddTCPProbe("ERR_CIPHER", "eof", 443, proxyMetrics.ClientProxy) + ssMetrics.AddTCPProbe("ERR_CIPHER", "eof", "127.0.0.1:443", proxyMetrics.ClientProxy) ssMetrics.AddTCPCipherSearch(true, 10*time.Millisecond) ssMetrics.AddUDPCipherSearch(true, 10*time.Millisecond) } @@ -164,11 +164,10 @@ func BenchmarkProbe(b *testing.B) { ssMetrics := newPrometheusOutlineMetrics(nil, prometheus.NewRegistry()) status := "ERR_REPLAY" drainResult := "other" - port := 12345 data := metrics.ProxyMetrics{} b.ResetTimer() for i := 0; i < b.N; i++ { - ssMetrics.AddTCPProbe(status, drainResult, port, data.ClientProxy) + ssMetrics.AddTCPProbe(status, drainResult, "127.0.0.1:12345", data.ClientProxy) } } diff --git a/internal/integration_test/integration_test.go b/internal/integration_test/integration_test.go index 4ca2f120..f98319f4 100644 --- a/internal/integration_test/integration_test.go +++ b/internal/integration_test/integration_test.go @@ -133,7 +133,7 @@ func TestTCPEcho(t *testing.T) { const testTimeout = 200 * time.Millisecond testMetrics := &service.NoOpTCPMetrics{} authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, &replayCache, testMetrics) - handler := service.NewTCPHandler(proxyListener.Addr().(*net.TCPAddr).Port, authFunc, testMetrics, testTimeout) + handler := service.NewTCPHandler(proxyListener.Addr().String(), authFunc, testMetrics, testTimeout) handler.SetTargetDialer(&transport.TCPDialer{}) done := make(chan struct{}) go func() { @@ -202,7 +202,7 @@ func TestRestrictedAddresses(t *testing.T) { const testTimeout = 200 * time.Millisecond testMetrics := &statusMetrics{} authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, nil, testMetrics) - handler := service.NewTCPHandler(proxyListener.Addr().(*net.TCPAddr).Port, authFunc, testMetrics, testTimeout) + handler := service.NewTCPHandler(proxyListener.Addr().String(), authFunc, testMetrics, testTimeout) done := make(chan struct{}) go func() { service.StreamServe(service.WrapStreamListener(proxyListener.AcceptTCP), handler.Handle) @@ -384,7 +384,7 @@ func BenchmarkTCPThroughput(b *testing.B) { const testTimeout = 200 * time.Millisecond testMetrics := &service.NoOpTCPMetrics{} authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, nil, testMetrics) - handler := service.NewTCPHandler(proxyListener.Addr().(*net.TCPAddr).Port, authFunc, testMetrics, testTimeout) + handler := service.NewTCPHandler(proxyListener.Addr().String(), authFunc, testMetrics, testTimeout) handler.SetTargetDialer(&transport.TCPDialer{}) done := make(chan struct{}) go func() { @@ -448,7 +448,7 @@ func BenchmarkTCPMultiplexing(b *testing.B) { const testTimeout = 200 * time.Millisecond testMetrics := &service.NoOpTCPMetrics{} authFunc := service.NewShadowsocksStreamAuthenticator(cipherList, &replayCache, testMetrics) - handler := service.NewTCPHandler(proxyListener.Addr().(*net.TCPAddr).Port, authFunc, testMetrics, testTimeout) + handler := service.NewTCPHandler(proxyListener.Addr().String(), authFunc, testMetrics, testTimeout) handler.SetTargetDialer(&transport.TCPDialer{}) done := make(chan struct{}) go func() { diff --git a/service/tcp.go b/service/tcp.go index 85ab9990..2195480b 100644 --- a/service/tcp.go +++ b/service/tcp.go @@ -44,7 +44,7 @@ type TCPMetrics interface { AddOpenTCPConnection(clientInfo ipinfo.IPInfo) AddAuthenticatedTCPConnection(clientAddr net.Addr, accessKey string) AddClosedTCPConnection(clientInfo ipinfo.IPInfo, clientAddr net.Addr, accessKey string, status string, data metrics.ProxyMetrics, duration time.Duration) - AddTCPProbe(status, drainResult string, port int, clientProxyBytes int64) + AddTCPProbe(status, drainResult string, listenerId string, clientProxyBytes int64) } func remoteIP(conn net.Conn) netip.Addr { @@ -162,7 +162,7 @@ func NewShadowsocksStreamAuthenticator(ciphers CipherList, replayCache *ReplayCa } type tcpHandler struct { - port int + listenerId string m TCPMetrics readTimeout time.Duration authenticate StreamAuthenticateFunc @@ -170,9 +170,9 @@ type tcpHandler struct { } // NewTCPService creates a TCPService -func NewTCPHandler(port int, authenticate StreamAuthenticateFunc, m TCPMetrics, timeout time.Duration) TCPHandler { +func NewTCPHandler(listenerId string, authenticate StreamAuthenticateFunc, m TCPMetrics, timeout time.Duration) TCPHandler { return &tcpHandler{ - port: port, + listenerId: listenerId, m: m, readTimeout: timeout, authenticate: authenticate, @@ -375,7 +375,7 @@ func (h *tcpHandler) absorbProbe(clientConn io.ReadCloser, status string, proxyM _, drainErr := io.Copy(io.Discard, clientConn) // drain socket drainResult := drainErrToString(drainErr) logger.Debugf("Drain error: %v, drain result: %v", drainErr, drainResult) - h.m.AddTCPProbe(status, drainResult, h.port, proxyMetrics.ClientProxy) + h.m.AddTCPProbe(status, drainResult, h.listenerId, proxyMetrics.ClientProxy) } func drainErrToString(drainErr error) string { @@ -404,6 +404,6 @@ func (m *NoOpTCPMetrics) GetIPInfo(net.IP) (ipinfo.IPInfo, error) { func (m *NoOpTCPMetrics) AddOpenTCPConnection(clientInfo ipinfo.IPInfo) {} func (m *NoOpTCPMetrics) AddAuthenticatedTCPConnection(clientAddr net.Addr, accessKey string) { } -func (m *NoOpTCPMetrics) AddTCPProbe(status, drainResult string, port int, clientProxyBytes int64) { +func (m *NoOpTCPMetrics) AddTCPProbe(status, drainResult string, listenerId string, clientProxyBytes int64) { } func (m *NoOpTCPMetrics) AddTCPCipherSearch(accessKeyFound bool, timeToCipher time.Duration) {} diff --git a/service/tcp_test.go b/service/tcp_test.go index 1a70ed67..5c3bc9df 100644 --- a/service/tcp_test.go +++ b/service/tcp_test.go @@ -239,7 +239,7 @@ func (m *probeTestMetrics) AddOpenTCPConnection(clientInfo ipinfo.IPInfo) { func (m *probeTestMetrics) AddAuthenticatedTCPConnection(clientAddr net.Addr, accessKey string) { } -func (m *probeTestMetrics) AddTCPProbe(status, drainResult string, port int, clientProxyBytes int64) { +func (m *probeTestMetrics) AddTCPProbe(status, drainResult string, listenerId string, clientProxyBytes int64) { m.mu.Lock() m.probeData = append(m.probeData, clientProxyBytes) m.probeStatus = append(m.probeStatus, status) @@ -281,7 +281,7 @@ func TestProbeRandom(t *testing.T) { require.NoError(t, err, "MakeTestCiphers failed: %v", err) testMetrics := &probeTestMetrics{} authFunc := NewShadowsocksStreamAuthenticator(cipherList, nil, testMetrics) - handler := NewTCPHandler(listener.Addr().(*net.TCPAddr).Port, authFunc, testMetrics, 200*time.Millisecond) + handler := NewTCPHandler(listener.Addr().String(), authFunc, testMetrics, 200*time.Millisecond) done := make(chan struct{}) go func() { StreamServe(WrapStreamListener(listener.AcceptTCP), handler.Handle) @@ -358,7 +358,7 @@ func TestProbeClientBytesBasicTruncated(t *testing.T) { cipher := firstCipher(cipherList) testMetrics := &probeTestMetrics{} authFunc := NewShadowsocksStreamAuthenticator(cipherList, nil, testMetrics) - handler := NewTCPHandler(listener.Addr().(*net.TCPAddr).Port, authFunc, testMetrics, 200*time.Millisecond) + handler := NewTCPHandler(listener.Addr().String(), authFunc, testMetrics, 200*time.Millisecond) handler.SetTargetDialer(makeValidatingTCPStreamDialer(allowAll)) done := make(chan struct{}) go func() { @@ -393,7 +393,7 @@ func TestProbeClientBytesBasicModified(t *testing.T) { cipher := firstCipher(cipherList) testMetrics := &probeTestMetrics{} authFunc := NewShadowsocksStreamAuthenticator(cipherList, nil, testMetrics) - handler := NewTCPHandler(listener.Addr().(*net.TCPAddr).Port, authFunc, testMetrics, 200*time.Millisecond) + handler := NewTCPHandler(listener.Addr().String(), authFunc, testMetrics, 200*time.Millisecond) handler.SetTargetDialer(makeValidatingTCPStreamDialer(allowAll)) done := make(chan struct{}) go func() { @@ -429,7 +429,7 @@ func TestProbeClientBytesCoalescedModified(t *testing.T) { cipher := firstCipher(cipherList) testMetrics := &probeTestMetrics{} authFunc := NewShadowsocksStreamAuthenticator(cipherList, nil, testMetrics) - handler := NewTCPHandler(listener.Addr().(*net.TCPAddr).Port, authFunc, testMetrics, 200*time.Millisecond) + handler := NewTCPHandler(listener.Addr().String(), authFunc, testMetrics, 200*time.Millisecond) handler.SetTargetDialer(makeValidatingTCPStreamDialer(allowAll)) done := make(chan struct{}) go func() { @@ -472,7 +472,7 @@ func TestProbeServerBytesModified(t *testing.T) { cipher := firstCipher(cipherList) testMetrics := &probeTestMetrics{} authFunc := NewShadowsocksStreamAuthenticator(cipherList, nil, testMetrics) - handler := NewTCPHandler(listener.Addr().(*net.TCPAddr).Port, authFunc, testMetrics, 200*time.Millisecond) + handler := NewTCPHandler(listener.Addr().String(), authFunc, testMetrics, 200*time.Millisecond) done := make(chan struct{}) go func() { StreamServe(WrapStreamListener(listener.AcceptTCP), handler.Handle) @@ -503,7 +503,7 @@ func TestReplayDefense(t *testing.T) { testMetrics := &probeTestMetrics{} const testTimeout = 200 * time.Millisecond authFunc := NewShadowsocksStreamAuthenticator(cipherList, &replayCache, testMetrics) - handler := NewTCPHandler(listener.Addr().(*net.TCPAddr).Port, authFunc, testMetrics, testTimeout) + handler := NewTCPHandler(listener.Addr().String(), authFunc, testMetrics, testTimeout) snapshot := cipherList.SnapshotForClientIP(netip.Addr{}) cipherEntry := snapshot[0].Value.(*CipherEntry) cipher := cipherEntry.CryptoKey @@ -582,7 +582,7 @@ func TestReverseReplayDefense(t *testing.T) { testMetrics := &probeTestMetrics{} const testTimeout = 200 * time.Millisecond authFunc := NewShadowsocksStreamAuthenticator(cipherList, &replayCache, testMetrics) - handler := NewTCPHandler(listener.Addr().(*net.TCPAddr).Port, authFunc, testMetrics, testTimeout) + handler := NewTCPHandler(listener.Addr().String(), authFunc, testMetrics, testTimeout) snapshot := cipherList.SnapshotForClientIP(netip.Addr{}) cipherEntry := snapshot[0].Value.(*CipherEntry) cipher := cipherEntry.CryptoKey @@ -653,7 +653,7 @@ func probeExpectTimeout(t *testing.T, payloadSize int) { require.NoError(t, err, "MakeTestCiphers failed: %v", err) testMetrics := &probeTestMetrics{} authFunc := NewShadowsocksStreamAuthenticator(cipherList, nil, testMetrics) - handler := NewTCPHandler(listener.Addr().(*net.TCPAddr).Port, authFunc, testMetrics, testTimeout) + handler := NewTCPHandler(listener.Addr().String(), authFunc, testMetrics, testTimeout) done := make(chan struct{}) go func() {