From ad58efe47d4e7fe1671c0743ff98fc26191e2584 Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Fri, 16 Feb 2024 23:27:27 +0100 Subject: [PATCH] move RTMP tests into internal/servers/rtmp (#3035) --- internal/core/api_test.go | 20 +- internal/core/core_test.go | 52 --- internal/core/metrics_test.go | 4 +- internal/core/rtmp_server_test.go | 385 -------------------- internal/core/rtsp_server_test.go | 68 ++++ internal/highleveltests/rtsp_server_test.go | 5 +- internal/highleveltests/tests_test.go | 52 --- internal/servers/rtmp/conn.go | 2 +- internal/servers/rtmp/server.go | 8 +- internal/servers/rtmp/server_test.go | 290 +++++++++++++++ internal/staticsources/rtmp/source_test.go | 56 +-- internal/staticsources/rtsp/source_test.go | 56 +-- internal/stream/stream.go | 5 +- internal/stream/stream_format.go | 6 +- internal/test/tls_cert.go | 55 +++ 15 files changed, 446 insertions(+), 618 deletions(-) delete mode 100644 internal/core/rtmp_server_test.go create mode 100644 internal/servers/rtmp/server_test.go create mode 100644 internal/test/tls_cert.go diff --git a/internal/core/api_test.go b/internal/core/api_test.go index 2e797393303..620be4fcd17 100644 --- a/internal/core/api_test.go +++ b/internal/core/api_test.go @@ -149,11 +149,11 @@ func TestAPIPathsList(t *testing.T) { }) t.Run("rtsps session", func(t *testing.T) { - serverCertFpath, err := writeTempFile(serverCert) + serverCertFpath, err := writeTempFile(test.TLSCertPub) require.NoError(t, err) defer os.Remove(serverCertFpath) - serverKeyFpath, err := writeTempFile(serverKey) + serverKeyFpath, err := writeTempFile(test.TLSCertKey) require.NoError(t, err) defer os.Remove(serverKeyFpath) @@ -340,11 +340,11 @@ func TestAPIPathsGet(t *testing.T) { } func TestAPIProtocolListGet(t *testing.T) { - serverCertFpath, err := writeTempFile(serverCert) + serverCertFpath, err := writeTempFile(test.TLSCertPub) require.NoError(t, err) defer os.Remove(serverCertFpath) - serverKeyFpath, err := writeTempFile(serverKey) + serverKeyFpath, err := writeTempFile(test.TLSCertKey) require.NoError(t, err) defer os.Remove(serverKeyFpath) @@ -813,11 +813,11 @@ func TestAPIProtocolListGet(t *testing.T) { } func TestAPIProtocolGetNotFound(t *testing.T) { - serverCertFpath, err := writeTempFile(serverCert) + serverCertFpath, err := writeTempFile(test.TLSCertPub) require.NoError(t, err) defer os.Remove(serverCertFpath) - serverKeyFpath, err := writeTempFile(serverKey) + serverKeyFpath, err := writeTempFile(test.TLSCertKey) require.NoError(t, err) defer os.Remove(serverKeyFpath) @@ -913,11 +913,11 @@ func TestAPIProtocolGetNotFound(t *testing.T) { } func TestAPIProtocolKick(t *testing.T) { - serverCertFpath, err := writeTempFile(serverCert) + serverCertFpath, err := writeTempFile(test.TLSCertPub) require.NoError(t, err) defer os.Remove(serverCertFpath) - serverKeyFpath, err := writeTempFile(serverKey) + serverKeyFpath, err := writeTempFile(test.TLSCertKey) require.NoError(t, err) defer os.Remove(serverKeyFpath) @@ -1060,11 +1060,11 @@ func TestAPIProtocolKick(t *testing.T) { } func TestAPIProtocolKickNotFound(t *testing.T) { - serverCertFpath, err := writeTempFile(serverCert) + serverCertFpath, err := writeTempFile(test.TLSCertPub) require.NoError(t, err) defer os.Remove(serverCertFpath) - serverKeyFpath, err := writeTempFile(serverKey) + serverKeyFpath, err := writeTempFile(test.TLSCertKey) require.NoError(t, err) defer os.Remove(serverKeyFpath) diff --git a/internal/core/core_test.go b/internal/core/core_test.go index ccffec9e6c7..5d08993098c 100644 --- a/internal/core/core_test.go +++ b/internal/core/core_test.go @@ -11,58 +11,6 @@ import ( "github.com/stretchr/testify/require" ) -var serverCert = []byte(`-----BEGIN CERTIFICATE----- -MIIDazCCAlOgAwIBAgIUXw1hEC3LFpTsllv7D3ARJyEq7sIwDQYJKoZIhvcNAQEL -BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM -GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAeFw0yMDEyMTMxNzQ0NThaFw0zMDEy -MTExNzQ0NThaMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEw -HwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwggEiMA0GCSqGSIb3DQEB -AQUAA4IBDwAwggEKAoIBAQDG8DyyS51810GsGwgWr5rjJK7OE1kTTLSNEEKax8Bj -zOyiaz8rA2JGl2VUEpi2UjDr9Cm7nd+YIEVs91IIBOb7LGqObBh1kGF3u5aZxLkv -NJE+HrLVvUhaDobK2NU+Wibqc/EI3DfUkt1rSINvv9flwTFu1qHeuLWhoySzDKEp -OzYxpFhwjVSokZIjT4Red3OtFz7gl2E6OAWe2qoh5CwLYVdMWtKR0Xuw3BkDPk9I -qkQKx3fqv97LPEzhyZYjDT5WvGrgZ1WDAN3booxXF3oA1H3GHQc4m/vcLatOtb8e -nI59gMQLEbnp08cl873bAuNuM95EZieXTHNbwUnq5iybAgMBAAGjUzBRMB0GA1Ud -DgQWBBQBKhJh8eWu0a4au9X/2fKhkFX2vjAfBgNVHSMEGDAWgBQBKhJh8eWu0a4a -u9X/2fKhkFX2vjAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQBj -3aCW0YPKukYgVK9cwN0IbVy/D0C1UPT4nupJcy/E0iC7MXPZ9D/SZxYQoAkdptdO -xfI+RXkpQZLdODNx9uvV+cHyZHZyjtE5ENu/i5Rer2cWI/mSLZm5lUQyx+0KZ2Yu -tEI1bsebDK30msa8QSTn0WidW9XhFnl3gRi4wRdimcQapOWYVs7ih+nAlSvng7NI -XpAyRs8PIEbpDDBMWnldrX4TP6EWYUi49gCp8OUDRREKX3l6Ls1vZ02F34yHIt/7 -7IV/XSKG096bhW+icKBWV0IpcEsgTzPK1J1hMxgjhzIMxGboAeUU+kidthOob6Sd -XQxaORfgM//NzX9LhUPk ------END CERTIFICATE----- -`) - -var serverKey = []byte(`-----BEGIN RSA PRIVATE KEY----- -MIIEogIBAAKCAQEAxvA8skudfNdBrBsIFq+a4ySuzhNZE0y0jRBCmsfAY8zsoms/ -KwNiRpdlVBKYtlIw6/Qpu53fmCBFbPdSCATm+yxqjmwYdZBhd7uWmcS5LzSRPh6y -1b1IWg6GytjVPlom6nPxCNw31JLda0iDb7/X5cExbtah3ri1oaMkswyhKTs2MaRY -cI1UqJGSI0+EXndzrRc+4JdhOjgFntqqIeQsC2FXTFrSkdF7sNwZAz5PSKpECsd3 -6r/eyzxM4cmWIw0+Vrxq4GdVgwDd26KMVxd6ANR9xh0HOJv73C2rTrW/HpyOfYDE -CxG56dPHJfO92wLjbjPeRGYnl0xzW8FJ6uYsmwIDAQABAoIBACi0BKcyQ3HElSJC -kaAao+Uvnzh4yvPg8Nwf5JDIp/uDdTMyIEWLtrLczRWrjGVZYbsVROinP5VfnPTT -kYwkfKINj2u+gC6lsNuPnRuvHXikF8eO/mYvCTur1zZvsQnF5kp4GGwIqr+qoPUP -bB0UMndG1PdpoMryHe+JcrvTrLHDmCeH10TqOwMsQMLHYLkowvxwJWsmTY7/Qr5S -Wm3PPpOcW2i0uyPVuyuv4yD1368fqnqJ8QFsQp1K6QtYsNnJ71Hut1/IoxK/e6hj -5Z+byKtHVtmcLnABuoOT7BhleJNFBksX9sh83jid4tMBgci+zXNeGmgqo2EmaWAb -agQslkECgYEA8B1rzjOHVQx/vwSzDa4XOrpoHQRfyElrGNz9JVBvnoC7AorezBXQ -M9WTHQIFTGMjzD8pb+YJGi3gj93VN51r0SmJRxBaBRh1ZZI9kFiFzngYev8POgD3 -ygmlS3kTHCNxCK/CJkB+/jMBgtPj5ygDpCWVcTSuWlQFphePkW7jaaECgYEA1Blz -ulqgAyJHZaqgcbcCsI2q6m527hVr9pjzNjIVmkwu38yS9RTCgdlbEVVDnS0hoifl -+jVMEGXjF3xjyMvL50BKbQUH+KAa+V4n1WGlnZOxX9TMny8MBjEuSX2+362vQ3BX -4vOlX00gvoc+sY+lrzvfx/OdPCHQGVYzoKCxhLsCgYA07HcviuIAV/HsO2/vyvhp -xF5gTu+BqNUHNOZDDDid+ge+Jre2yfQLCL8VPLXIQW3Jff53IH/PGl+NtjphuLvj -7UDJvgvpZZuymIojP6+2c3gJ3CASC9aR3JBnUzdoE1O9s2eaoMqc4scpe+SWtZYf -3vzSZ+cqF6zrD/Rf/M35IQKBgHTU4E6ShPm09CcoaeC5sp2WK8OevZw/6IyZi78a -r5Oiy18zzO97U/k6xVMy6F+38ILl/2Rn31JZDVJujniY6eSkIVsUHmPxrWoXV1HO -y++U32uuSFiXDcSLarfIsE992MEJLSAynbF1Rsgsr3gXbGiuToJRyxbIeVy7gwzD -94TpAoGAY4/PejWQj9psZfAhyk5dRGra++gYRQ/gK1IIc1g+Dd2/BxbT/RHr05GK -6vwrfjsoRyMWteC1SsNs/CurjfQ/jqCfHNP5XPvxgd5Ec8sRJIiV7V5RTuWJsPu1 -+3K6cnKEyg+0ekYmLertRFIY6SwWmY1fyKgTvxudMcsBY7dC4xs= ------END RSA PRIVATE KEY----- -`) - func writeTempFile(byts []byte) (string, error) { tmpf, err := os.CreateTemp(os.TempDir(), "rtsp-") if err != nil { diff --git a/internal/core/metrics_test.go b/internal/core/metrics_test.go index 02d3f59d94d..d68e7e2f3f5 100644 --- a/internal/core/metrics_test.go +++ b/internal/core/metrics_test.go @@ -42,11 +42,11 @@ func httpPullFile(t *testing.T, hc *http.Client, u string) []byte { } func TestMetrics(t *testing.T) { - serverCertFpath, err := writeTempFile(serverCert) + serverCertFpath, err := writeTempFile(test.TLSCertPub) require.NoError(t, err) defer os.Remove(serverCertFpath) - serverKeyFpath, err := writeTempFile(serverKey) + serverKeyFpath, err := writeTempFile(test.TLSCertKey) require.NoError(t, err) defer os.Remove(serverKeyFpath) diff --git a/internal/core/rtmp_server_test.go b/internal/core/rtmp_server_test.go deleted file mode 100644 index 2fdbc821e3e..00000000000 --- a/internal/core/rtmp_server_test.go +++ /dev/null @@ -1,385 +0,0 @@ -package core //nolint:dupl - -import ( - "context" - "crypto/tls" - "encoding/json" - "net" - "net/http" - "net/url" - "os" - "testing" - "time" - - "github.com/bluenviron/gortsplib/v4/pkg/format" - "github.com/stretchr/testify/require" - - "github.com/bluenviron/mediamtx/internal/protocols/rtmp" - "github.com/bluenviron/mediamtx/internal/test" -) - -type testHTTPAuthenticator struct { - *http.Server -} - -func newTestHTTPAuthenticator(t *testing.T, protocol string, action string) *testHTTPAuthenticator { - firstReceived := false - - ts := &testHTTPAuthenticator{} - - ts.Server = &http.Server{ - Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - require.Equal(t, http.MethodPost, r.Method) - require.Equal(t, "/auth", r.URL.Path) - - var in struct { - IP string `json:"ip"` - User string `json:"user"` - Password string `json:"password"` - Path string `json:"path"` - Protocol string `json:"protocol"` - ID string `json:"id"` - Action string `json:"action"` - Query string `json:"query"` - } - err := json.NewDecoder(r.Body).Decode(&in) - require.NoError(t, err) - - var user string - if action == "publish" { - user = "testpublisher" - } else { - user = "testreader" - } - - if in.IP != "127.0.0.1" || - in.User != user || - in.Password != "testpass" || - in.Path != "teststream" || - in.Protocol != protocol || - (firstReceived && in.ID == "") || - in.Action != action || - (in.Query != "user=testreader&pass=testpass¶m=value" && - in.Query != "user=testpublisher&pass=testpass¶m=value" && - in.Query != "param=value") { - w.WriteHeader(http.StatusBadRequest) - return - } - - firstReceived = true - }), - } - - ln, err := net.Listen("tcp", "127.0.0.1:9120") - require.NoError(t, err) - - go ts.Server.Serve(ln) - - return ts -} - -func (ts *testHTTPAuthenticator) close() { - ts.Server.Shutdown(context.Background()) -} - -func TestRTMPServer(t *testing.T) { - for _, encrypt := range []string{ - "plain", - "tls", - } { - for _, auth := range []string{ - "none", - "internal", - "external", - } { - t.Run("encrypt_"+encrypt+"_auth_"+auth, func(t *testing.T) { - var port string - var conf string - - if encrypt == "plain" { - port = "1935" - - conf = "rtsp: no\n" + - "hls: no\n" - } else { - port = "1936" - - serverCertFpath, err := writeTempFile(serverCert) - require.NoError(t, err) - defer os.Remove(serverCertFpath) - - serverKeyFpath, err := writeTempFile(serverKey) - require.NoError(t, err) - defer os.Remove(serverKeyFpath) - - conf = "rtsp: no\n" + - "hls: no\n" + - "webrtc: no\n" + - "rtmpEncryption: \"yes\"\n" + - "rtmpServerCert: " + serverCertFpath + "\n" + - "rtmpServerKey: " + serverKeyFpath + "\n" - } - - switch auth { - case "none": - conf += "paths:\n" + - " all_others:\n" - - case "internal": - conf += "paths:\n" + - " all_others:\n" + - " publishUser: testpublisher\n" + - " publishPass: testpass\n" + - " publishIPs: [127.0.0.0/16]\n" + - " readUser: testreader\n" + - " readPass: testpass\n" + - " readIPs: [127.0.0.0/16]\n" - - case "external": - conf += "externalAuthenticationURL: http://localhost:9120/auth\n" + - "paths:\n" + - " all_others:\n" - } - - p, ok := newInstance(conf) - require.Equal(t, true, ok) - defer p.Close() - - var a *testHTTPAuthenticator - if auth == "external" { - a = newTestHTTPAuthenticator(t, "rtmp", "publish") - } - - u1, err := url.Parse("rtmp://127.0.0.1:" + port + "/teststream?user=testpublisher&pass=testpass¶m=value") - require.NoError(t, err) - - nconn1, err := func() (net.Conn, error) { - if encrypt == "plain" { - return net.Dial("tcp", u1.Host) - } - return tls.Dial("tcp", u1.Host, &tls.Config{InsecureSkipVerify: true}) - }() - require.NoError(t, err) - defer nconn1.Close() - - conn1, err := rtmp.NewClientConn(nconn1, u1, true) - require.NoError(t, err) - - w, err := rtmp.NewWriter(conn1, test.FormatH264, test.FormatMPEG4Audio) - require.NoError(t, err) - - time.Sleep(500 * time.Millisecond) - - if auth == "external" { - a.close() - a = newTestHTTPAuthenticator(t, "rtmp", "read") - defer a.close() - } - - u2, err := url.Parse("rtmp://127.0.0.1:" + port + "/teststream?user=testreader&pass=testpass¶m=value") - require.NoError(t, err) - - nconn2, err := func() (net.Conn, error) { - if encrypt == "plain" { - return net.Dial("tcp", u2.Host) - } - return tls.Dial("tcp", u2.Host, &tls.Config{InsecureSkipVerify: true}) - }() - require.NoError(t, err) - defer nconn2.Close() - - conn2, err := rtmp.NewClientConn(nconn2, u2, false) - require.NoError(t, err) - - r, err := rtmp.NewReader(conn2) - require.NoError(t, err) - - videoTrack1, audioTrack2 := r.Tracks() - require.Equal(t, test.FormatH264, videoTrack1) - require.Equal(t, test.FormatMPEG4Audio, audioTrack2) - - err = w.WriteH264(0, 0, true, [][]byte{ - {0x05, 0x02, 0x03, 0x04}, // IDR 1 - {0x05, 0x02, 0x03, 0x04}, // IDR 2 - }) - require.NoError(t, err) - - r.OnDataH264(func(pts time.Duration, au [][]byte) { - require.Equal(t, [][]byte{ - test.FormatH264.SPS, - test.FormatH264.PPS, - { // IDR 1 - 0x05, 0x02, 0x03, 0x04, - }, - { // IDR 2 - 0x05, 0x02, 0x03, 0x04, - }, - }, au) - }) - - err = r.Read() - require.NoError(t, err) - }) - } - } -} - -func TestRTMPServerAuthFail(t *testing.T) { - t.Run("publish", func(t *testing.T) { //nolint:dupl - p, ok := newInstance("rtsp: no\n" + - "hls: no\n" + - "webrtc: no\n" + - "paths:\n" + - " all_others:\n" + - " publishUser: testuser2\n" + - " publishPass: testpass\n") - require.Equal(t, true, ok) - defer p.Close() - - u1, err := url.Parse("rtmp://127.0.0.1:1935/teststream?user=testuser&pass=testpass") - require.NoError(t, err) - - nconn1, err := net.Dial("tcp", u1.Host) - require.NoError(t, err) - defer nconn1.Close() - - conn1, err := rtmp.NewClientConn(nconn1, u1, true) - require.NoError(t, err) - - videoTrack := &format.H264{ - PayloadTyp: 96, - SPS: []byte{ - 0x67, 0x64, 0x00, 0x0c, 0xac, 0x3b, 0x50, 0xb0, - 0x4b, 0x42, 0x00, 0x00, 0x03, 0x00, 0x02, 0x00, - 0x00, 0x03, 0x00, 0x3d, 0x08, - }, - PPS: []byte{ - 0x68, 0xee, 0x3c, 0x80, - }, - PacketizationMode: 1, - } - - _, err = rtmp.NewWriter(conn1, videoTrack, nil) - require.NoError(t, err) - - time.Sleep(500 * time.Millisecond) - - u2, err := url.Parse("rtmp://127.0.0.1:1935/teststream") - require.NoError(t, err) - - nconn2, err := net.Dial("tcp", u2.Host) - require.NoError(t, err) - defer nconn2.Close() - - conn2, err := rtmp.NewClientConn(nconn2, u2, false) - require.NoError(t, err) - - _, err = rtmp.NewReader(conn2) - require.EqualError(t, err, "EOF") - }) - - t.Run("publish_external", func(t *testing.T) { - p, ok := newInstance("externalAuthenticationURL: http://localhost:9120/auth\n" + - "paths:\n" + - " all_others:\n") - require.Equal(t, true, ok) - defer p.Close() - - a := newTestHTTPAuthenticator(t, "rtmp", "publish") - defer a.close() - - u1, err := url.Parse("rtmp://127.0.0.1:1935/teststream?user=testuser1&pass=testpass") - require.NoError(t, err) - - nconn1, err := net.Dial("tcp", u1.Host) - require.NoError(t, err) - defer nconn1.Close() - - conn1, err := rtmp.NewClientConn(nconn1, u1, true) - require.NoError(t, err) - - videoTrack := &format.H264{ - PayloadTyp: 96, - SPS: []byte{ - 0x67, 0x64, 0x00, 0x0c, 0xac, 0x3b, 0x50, 0xb0, - 0x4b, 0x42, 0x00, 0x00, 0x03, 0x00, 0x02, 0x00, - 0x00, 0x03, 0x00, 0x3d, 0x08, - }, - PPS: []byte{ - 0x68, 0xee, 0x3c, 0x80, - }, - PacketizationMode: 1, - } - - _, err = rtmp.NewWriter(conn1, videoTrack, nil) - require.NoError(t, err) - - time.Sleep(500 * time.Millisecond) - - u2, err := url.Parse("rtmp://127.0.0.1:1935/teststream") - require.NoError(t, err) - - nconn2, err := net.Dial("tcp", u2.Host) - require.NoError(t, err) - defer nconn2.Close() - - conn2, err := rtmp.NewClientConn(nconn2, u2, false) - require.NoError(t, err) - - _, err = rtmp.NewReader(conn2) - require.EqualError(t, err, "EOF") - }) - - t.Run("read", func(t *testing.T) { //nolint:dupl - p, ok := newInstance("rtsp: no\n" + - "hls: no\n" + - "webrtc: no\n" + - "paths:\n" + - " all_others:\n" + - " readUser: testuser2\n" + - " readPass: testpass\n") - require.Equal(t, true, ok) - defer p.Close() - - u1, err := url.Parse("rtmp://127.0.0.1:1935/teststream") - require.NoError(t, err) - - nconn1, err := net.Dial("tcp", u1.Host) - require.NoError(t, err) - defer nconn1.Close() - - conn1, err := rtmp.NewClientConn(nconn1, u1, true) - require.NoError(t, err) - - videoTrack := &format.H264{ - PayloadTyp: 96, - SPS: []byte{ - 0x67, 0x64, 0x00, 0x0c, 0xac, 0x3b, 0x50, 0xb0, - 0x4b, 0x42, 0x00, 0x00, 0x03, 0x00, 0x02, 0x00, - 0x00, 0x03, 0x00, 0x3d, 0x08, - }, - PPS: []byte{ - 0x68, 0xee, 0x3c, 0x80, - }, - PacketizationMode: 1, - } - - _, err = rtmp.NewWriter(conn1, videoTrack, nil) - require.NoError(t, err) - - time.Sleep(500 * time.Millisecond) - - u2, err := url.Parse("rtmp://127.0.0.1:1935/teststream?user=testuser1&pass=testpass") - require.NoError(t, err) - - nconn2, err := net.Dial("tcp", u2.Host) - require.NoError(t, err) - defer nconn2.Close() - - conn2, err := rtmp.NewClientConn(nconn2, u2, false) - require.NoError(t, err) - - _, err = rtmp.NewReader(conn2) - require.EqualError(t, err, "EOF") - }) -} diff --git a/internal/core/rtsp_server_test.go b/internal/core/rtsp_server_test.go index 672d5172677..276df921b9b 100644 --- a/internal/core/rtsp_server_test.go +++ b/internal/core/rtsp_server_test.go @@ -1,6 +1,10 @@ package core import ( + "context" + "encoding/json" + "net" + "net/http" "testing" "github.com/bluenviron/gortsplib/v4" @@ -10,6 +14,70 @@ import ( "github.com/stretchr/testify/require" ) +type testHTTPAuthenticator struct { + *http.Server +} + +func newTestHTTPAuthenticator(t *testing.T, protocol string, action string) *testHTTPAuthenticator { + firstReceived := false + + ts := &testHTTPAuthenticator{} + + ts.Server = &http.Server{ + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, http.MethodPost, r.Method) + require.Equal(t, "/auth", r.URL.Path) + + var in struct { + IP string `json:"ip"` + User string `json:"user"` + Password string `json:"password"` + Path string `json:"path"` + Protocol string `json:"protocol"` + ID string `json:"id"` + Action string `json:"action"` + Query string `json:"query"` + } + err := json.NewDecoder(r.Body).Decode(&in) + require.NoError(t, err) + + var user string + if action == "publish" { + user = "testpublisher" + } else { + user = "testreader" + } + + if in.IP != "127.0.0.1" || + in.User != user || + in.Password != "testpass" || + in.Path != "teststream" || + in.Protocol != protocol || + (firstReceived && in.ID == "") || + in.Action != action || + (in.Query != "user=testreader&pass=testpass¶m=value" && + in.Query != "user=testpublisher&pass=testpass¶m=value" && + in.Query != "param=value") { + w.WriteHeader(http.StatusBadRequest) + return + } + + firstReceived = true + }), + } + + ln, err := net.Listen("tcp", "127.0.0.1:9120") + require.NoError(t, err) + + go ts.Server.Serve(ln) + + return ts +} + +func (ts *testHTTPAuthenticator) close() { + ts.Server.Shutdown(context.Background()) +} + func TestRTSPServer(t *testing.T) { for _, auth := range []string{ "none", diff --git a/internal/highleveltests/rtsp_server_test.go b/internal/highleveltests/rtsp_server_test.go index 776b69cd685..1f7129a2273 100644 --- a/internal/highleveltests/rtsp_server_test.go +++ b/internal/highleveltests/rtsp_server_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/bluenviron/mediamtx/internal/test" "github.com/stretchr/testify/require" ) @@ -54,11 +55,11 @@ func TestRTSPServerPublishRead(t *testing.T) { proto = "rtsps" port = "8322" - serverCertFpath, err := writeTempFile(serverCert) + serverCertFpath, err := writeTempFile(test.TLSCertPub) require.NoError(t, err) defer os.Remove(serverCertFpath) - serverKeyFpath, err := writeTempFile(serverKey) + serverKeyFpath, err := writeTempFile(test.TLSCertKey) require.NoError(t, err) defer os.Remove(serverKeyFpath) diff --git a/internal/highleveltests/tests_test.go b/internal/highleveltests/tests_test.go index 2f8b24c549d..e2c9472ceb5 100644 --- a/internal/highleveltests/tests_test.go +++ b/internal/highleveltests/tests_test.go @@ -12,58 +12,6 @@ import ( "github.com/bluenviron/mediamtx/internal/core" ) -var serverCert = []byte(`-----BEGIN CERTIFICATE----- -MIIDazCCAlOgAwIBAgIUXw1hEC3LFpTsllv7D3ARJyEq7sIwDQYJKoZIhvcNAQEL -BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM -GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAeFw0yMDEyMTMxNzQ0NThaFw0zMDEy -MTExNzQ0NThaMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEw -HwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwggEiMA0GCSqGSIb3DQEB -AQUAA4IBDwAwggEKAoIBAQDG8DyyS51810GsGwgWr5rjJK7OE1kTTLSNEEKax8Bj -zOyiaz8rA2JGl2VUEpi2UjDr9Cm7nd+YIEVs91IIBOb7LGqObBh1kGF3u5aZxLkv -NJE+HrLVvUhaDobK2NU+Wibqc/EI3DfUkt1rSINvv9flwTFu1qHeuLWhoySzDKEp -OzYxpFhwjVSokZIjT4Red3OtFz7gl2E6OAWe2qoh5CwLYVdMWtKR0Xuw3BkDPk9I -qkQKx3fqv97LPEzhyZYjDT5WvGrgZ1WDAN3booxXF3oA1H3GHQc4m/vcLatOtb8e -nI59gMQLEbnp08cl873bAuNuM95EZieXTHNbwUnq5iybAgMBAAGjUzBRMB0GA1Ud -DgQWBBQBKhJh8eWu0a4au9X/2fKhkFX2vjAfBgNVHSMEGDAWgBQBKhJh8eWu0a4a -u9X/2fKhkFX2vjAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQBj -3aCW0YPKukYgVK9cwN0IbVy/D0C1UPT4nupJcy/E0iC7MXPZ9D/SZxYQoAkdptdO -xfI+RXkpQZLdODNx9uvV+cHyZHZyjtE5ENu/i5Rer2cWI/mSLZm5lUQyx+0KZ2Yu -tEI1bsebDK30msa8QSTn0WidW9XhFnl3gRi4wRdimcQapOWYVs7ih+nAlSvng7NI -XpAyRs8PIEbpDDBMWnldrX4TP6EWYUi49gCp8OUDRREKX3l6Ls1vZ02F34yHIt/7 -7IV/XSKG096bhW+icKBWV0IpcEsgTzPK1J1hMxgjhzIMxGboAeUU+kidthOob6Sd -XQxaORfgM//NzX9LhUPk ------END CERTIFICATE----- -`) - -var serverKey = []byte(`-----BEGIN RSA PRIVATE KEY----- -MIIEogIBAAKCAQEAxvA8skudfNdBrBsIFq+a4ySuzhNZE0y0jRBCmsfAY8zsoms/ -KwNiRpdlVBKYtlIw6/Qpu53fmCBFbPdSCATm+yxqjmwYdZBhd7uWmcS5LzSRPh6y -1b1IWg6GytjVPlom6nPxCNw31JLda0iDb7/X5cExbtah3ri1oaMkswyhKTs2MaRY -cI1UqJGSI0+EXndzrRc+4JdhOjgFntqqIeQsC2FXTFrSkdF7sNwZAz5PSKpECsd3 -6r/eyzxM4cmWIw0+Vrxq4GdVgwDd26KMVxd6ANR9xh0HOJv73C2rTrW/HpyOfYDE -CxG56dPHJfO92wLjbjPeRGYnl0xzW8FJ6uYsmwIDAQABAoIBACi0BKcyQ3HElSJC -kaAao+Uvnzh4yvPg8Nwf5JDIp/uDdTMyIEWLtrLczRWrjGVZYbsVROinP5VfnPTT -kYwkfKINj2u+gC6lsNuPnRuvHXikF8eO/mYvCTur1zZvsQnF5kp4GGwIqr+qoPUP -bB0UMndG1PdpoMryHe+JcrvTrLHDmCeH10TqOwMsQMLHYLkowvxwJWsmTY7/Qr5S -Wm3PPpOcW2i0uyPVuyuv4yD1368fqnqJ8QFsQp1K6QtYsNnJ71Hut1/IoxK/e6hj -5Z+byKtHVtmcLnABuoOT7BhleJNFBksX9sh83jid4tMBgci+zXNeGmgqo2EmaWAb -agQslkECgYEA8B1rzjOHVQx/vwSzDa4XOrpoHQRfyElrGNz9JVBvnoC7AorezBXQ -M9WTHQIFTGMjzD8pb+YJGi3gj93VN51r0SmJRxBaBRh1ZZI9kFiFzngYev8POgD3 -ygmlS3kTHCNxCK/CJkB+/jMBgtPj5ygDpCWVcTSuWlQFphePkW7jaaECgYEA1Blz -ulqgAyJHZaqgcbcCsI2q6m527hVr9pjzNjIVmkwu38yS9RTCgdlbEVVDnS0hoifl -+jVMEGXjF3xjyMvL50BKbQUH+KAa+V4n1WGlnZOxX9TMny8MBjEuSX2+362vQ3BX -4vOlX00gvoc+sY+lrzvfx/OdPCHQGVYzoKCxhLsCgYA07HcviuIAV/HsO2/vyvhp -xF5gTu+BqNUHNOZDDDid+ge+Jre2yfQLCL8VPLXIQW3Jff53IH/PGl+NtjphuLvj -7UDJvgvpZZuymIojP6+2c3gJ3CASC9aR3JBnUzdoE1O9s2eaoMqc4scpe+SWtZYf -3vzSZ+cqF6zrD/Rf/M35IQKBgHTU4E6ShPm09CcoaeC5sp2WK8OevZw/6IyZi78a -r5Oiy18zzO97U/k6xVMy6F+38ILl/2Rn31JZDVJujniY6eSkIVsUHmPxrWoXV1HO -y++U32uuSFiXDcSLarfIsE992MEJLSAynbF1Rsgsr3gXbGiuToJRyxbIeVy7gwzD -94TpAoGAY4/PejWQj9psZfAhyk5dRGra++gYRQ/gK1IIc1g+Dd2/BxbT/RHr05GK -6vwrfjsoRyMWteC1SsNs/CurjfQ/jqCfHNP5XPvxgd5Ec8sRJIiV7V5RTuWJsPu1 -+3K6cnKEyg+0ekYmLertRFIY6SwWmY1fyKgTvxudMcsBY7dC4xs= ------END RSA PRIVATE KEY----- -`) - func writeTempFile(byts []byte) (string, error) { tmpf, err := os.CreateTemp(os.TempDir(), "rtsp-") if err != nil { diff --git a/internal/servers/rtmp/conn.go b/internal/servers/rtmp/conn.go index 0af7e4bc826..f4b4774b9b2 100644 --- a/internal/servers/rtmp/conn.go +++ b/internal/servers/rtmp/conn.go @@ -63,7 +63,7 @@ type conn struct { wg *sync.WaitGroup nconn net.Conn externalCmdPool *externalcmd.Pool - pathManager defs.PathManager + pathManager serverPathManager parent *Server ctx context.Context diff --git a/internal/servers/rtmp/server.go b/internal/servers/rtmp/server.go index 40ec802701a..8f492805618 100644 --- a/internal/servers/rtmp/server.go +++ b/internal/servers/rtmp/server.go @@ -17,6 +17,7 @@ import ( "github.com/bluenviron/mediamtx/internal/externalcmd" "github.com/bluenviron/mediamtx/internal/logger" "github.com/bluenviron/mediamtx/internal/restrictnetwork" + "github.com/bluenviron/mediamtx/internal/stream" ) // ErrConnNotFound is returned when a connection is not found. @@ -50,6 +51,11 @@ type serverAPIConnsKickReq struct { res chan serverAPIConnsKickRes } +type serverPathManager interface { + AddPublisher(req defs.PathAddPublisherReq) (defs.Path, error) + AddReader(req defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) +} + type serverParent interface { logger.Writer } @@ -68,7 +74,7 @@ type Server struct { RunOnConnectRestart bool RunOnDisconnect string ExternalCmdPool *externalcmd.Pool - PathManager defs.PathManager + PathManager serverPathManager Parent serverParent ctx context.Context diff --git a/internal/servers/rtmp/server_test.go b/internal/servers/rtmp/server_test.go new file mode 100644 index 00000000000..7d256c1a584 --- /dev/null +++ b/internal/servers/rtmp/server_test.go @@ -0,0 +1,290 @@ +package rtmp + +import ( + "crypto/tls" + "net" + "net/url" + "os" + "testing" + "time" + + "github.com/bluenviron/gortsplib/v4/pkg/description" + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/mediamtx/internal/asyncwriter" + "github.com/bluenviron/mediamtx/internal/conf" + "github.com/bluenviron/mediamtx/internal/defs" + "github.com/bluenviron/mediamtx/internal/externalcmd" + "github.com/bluenviron/mediamtx/internal/protocols/rtmp" + "github.com/bluenviron/mediamtx/internal/stream" + "github.com/bluenviron/mediamtx/internal/test" + "github.com/bluenviron/mediamtx/internal/unit" + "github.com/stretchr/testify/require" +) + +func writeTempFile(byts []byte) (string, error) { + tmpf, err := os.CreateTemp(os.TempDir(), "rtsp-") + if err != nil { + return "", err + } + defer tmpf.Close() + + _, err = tmpf.Write(byts) + if err != nil { + return "", err + } + + return tmpf.Name(), nil +} + +type dummyPath struct { + stream *stream.Stream + streamCreated chan struct{} +} + +func (p *dummyPath) Name() string { + return "teststream" +} + +func (p *dummyPath) SafeConf() *conf.Path { + return &conf.Path{} +} + +func (p *dummyPath) ExternalCmdEnv() externalcmd.Environment { + return externalcmd.Environment{} +} + +func (p *dummyPath) StartPublisher(req defs.PathStartPublisherReq) (*stream.Stream, error) { + var err error + p.stream, err = stream.New( + 1460, + req.Desc, + true, + test.NilLogger{}, + ) + if err != nil { + return nil, err + } + close(p.streamCreated) + return p.stream, nil +} + +func (p *dummyPath) StopPublisher(_ defs.PathStopPublisherReq) { +} + +func (p *dummyPath) RemovePublisher(_ defs.PathRemovePublisherReq) { +} + +func (p *dummyPath) RemoveReader(_ defs.PathRemoveReaderReq) { +} + +type dummyPathManager struct { + path *dummyPath +} + +func (pm *dummyPathManager) AddPublisher(_ defs.PathAddPublisherReq) (defs.Path, error) { + return pm.path, nil +} + +func (pm *dummyPathManager) AddReader(_ defs.PathAddReaderReq) (defs.Path, *stream.Stream, error) { + return pm.path, pm.path.stream, nil +} + +func TestServerPublish(t *testing.T) { + for _, encrypt := range []string{ + "plain", + "tls", + } { + t.Run(encrypt, func(t *testing.T) { + var serverCertFpath string + var serverKeyFpath string + + if encrypt == "tls" { + var err error + serverCertFpath, err = writeTempFile(test.TLSCertPub) + require.NoError(t, err) + defer os.Remove(serverCertFpath) + + serverKeyFpath, err = writeTempFile(test.TLSCertKey) + require.NoError(t, err) + defer os.Remove(serverKeyFpath) + } + + path := &dummyPath{ + streamCreated: make(chan struct{}), + } + + pathManager := &dummyPathManager{path: path} + + s := &Server{ + Address: "127.0.0.1:1935", + ReadTimeout: conf.StringDuration(10 * time.Second), + WriteTimeout: conf.StringDuration(10 * time.Second), + WriteQueueSize: 512, + IsTLS: encrypt == "tls", + ServerCert: serverCertFpath, + ServerKey: serverKeyFpath, + RTSPAddress: "", + RunOnConnect: "", + RunOnConnectRestart: false, + RunOnDisconnect: "", + ExternalCmdPool: nil, + PathManager: pathManager, + Parent: &test.NilLogger{}, + } + err := s.Initialize() + require.NoError(t, err) + defer s.Close() + + u, err := url.Parse("rtmp://127.0.0.1:1935/teststream?user=testpublisher&pass=testpass¶m=value") + require.NoError(t, err) + + nconn, err := func() (net.Conn, error) { + if encrypt == "plain" { + return net.Dial("tcp", u.Host) + } + return tls.Dial("tcp", u.Host, &tls.Config{InsecureSkipVerify: true}) + }() + require.NoError(t, err) + defer nconn.Close() + + conn, err := rtmp.NewClientConn(nconn, u, true) + require.NoError(t, err) + + w, err := rtmp.NewWriter(conn, test.FormatH264, test.FormatMPEG4Audio) + require.NoError(t, err) + + <-path.streamCreated + + aw := asyncwriter.New(512, &test.NilLogger{}) + + recv := make(chan struct{}) + + path.stream.AddReader(aw, + path.stream.Desc().Medias[0], + path.stream.Desc().Medias[0].Formats[0], + func(u unit.Unit) error { + require.Equal(t, [][]byte{ + test.FormatH264.SPS, + test.FormatH264.PPS, + {5, 2, 3, 4}, + }, u.(*unit.H264).AU) + close(recv) + return nil + }) + + err = w.WriteH264(0, 0, true, [][]byte{ + {5, 2, 3, 4}, + }) + require.NoError(t, err) + + aw.Start() + + <-recv + + aw.Stop() + }) + } +} + +func TestServerRead(t *testing.T) { + for _, encrypt := range []string{ + "plain", + "tls", + } { + t.Run(encrypt, func(t *testing.T) { + var serverCertFpath string + var serverKeyFpath string + + if encrypt == "tls" { + var err error + serverCertFpath, err = writeTempFile(test.TLSCertPub) + require.NoError(t, err) + defer os.Remove(serverCertFpath) + + serverKeyFpath, err = writeTempFile(test.TLSCertKey) + require.NoError(t, err) + defer os.Remove(serverKeyFpath) + } + + testMediaH264 := &description.Media{ + Type: description.MediaTypeVideo, + Formats: []format.Format{test.FormatH264}, + } + + desc := &description.Session{Medias: []*description.Media{testMediaH264}} + + stream, err := stream.New( + 1460, + desc, + true, + test.NilLogger{}, + ) + require.NoError(t, err) + + path := &dummyPath{stream: stream} + + pathManager := &dummyPathManager{path: path} + + s := &Server{ + Address: "127.0.0.1:1935", + ReadTimeout: conf.StringDuration(10 * time.Second), + WriteTimeout: conf.StringDuration(10 * time.Second), + WriteQueueSize: 512, + IsTLS: encrypt == "tls", + ServerCert: serverCertFpath, + ServerKey: serverKeyFpath, + RTSPAddress: "", + RunOnConnect: "", + RunOnConnectRestart: false, + RunOnDisconnect: "", + ExternalCmdPool: nil, + PathManager: pathManager, + Parent: &test.NilLogger{}, + } + err = s.Initialize() + require.NoError(t, err) + defer s.Close() + + u, err := url.Parse("rtmp://127.0.0.1:1935/teststream?user=testreader&pass=testpass¶m=value") + require.NoError(t, err) + + nconn, err := func() (net.Conn, error) { + if encrypt == "plain" { + return net.Dial("tcp", u.Host) + } + return tls.Dial("tcp", u.Host, &tls.Config{InsecureSkipVerify: true}) + }() + require.NoError(t, err) + defer nconn.Close() + + conn, err := rtmp.NewClientConn(nconn, u, false) + require.NoError(t, err) + + r, err := rtmp.NewReader(conn) + require.NoError(t, err) + + videoTrack, _ := r.Tracks() + require.Equal(t, test.FormatH264, videoTrack) + + stream.WriteUnit(desc.Medias[0], desc.Medias[0].Formats[0], &unit.H264{ + Base: unit.Base{ + NTP: time.Time{}, + }, + AU: [][]byte{ + {5, 2, 3, 4}, // IDR + }, + }) + + r.OnDataH264(func(pts time.Duration, au [][]byte) { + require.Equal(t, [][]byte{ + test.FormatH264.SPS, + test.FormatH264.PPS, + {5, 2, 3, 4}, + }, au) + }) + + err = r.Read() + require.NoError(t, err) + }) + } +} diff --git a/internal/staticsources/rtmp/source_test.go b/internal/staticsources/rtmp/source_test.go index c14dd5fd74e..963be165662 100644 --- a/internal/staticsources/rtmp/source_test.go +++ b/internal/staticsources/rtmp/source_test.go @@ -15,58 +15,6 @@ import ( "github.com/bluenviron/mediamtx/internal/test" ) -var serverCert = []byte(`-----BEGIN CERTIFICATE----- -MIIDazCCAlOgAwIBAgIUXw1hEC3LFpTsllv7D3ARJyEq7sIwDQYJKoZIhvcNAQEL -BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM -GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAeFw0yMDEyMTMxNzQ0NThaFw0zMDEy -MTExNzQ0NThaMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEw -HwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwggEiMA0GCSqGSIb3DQEB -AQUAA4IBDwAwggEKAoIBAQDG8DyyS51810GsGwgWr5rjJK7OE1kTTLSNEEKax8Bj -zOyiaz8rA2JGl2VUEpi2UjDr9Cm7nd+YIEVs91IIBOb7LGqObBh1kGF3u5aZxLkv -NJE+HrLVvUhaDobK2NU+Wibqc/EI3DfUkt1rSINvv9flwTFu1qHeuLWhoySzDKEp -OzYxpFhwjVSokZIjT4Red3OtFz7gl2E6OAWe2qoh5CwLYVdMWtKR0Xuw3BkDPk9I -qkQKx3fqv97LPEzhyZYjDT5WvGrgZ1WDAN3booxXF3oA1H3GHQc4m/vcLatOtb8e -nI59gMQLEbnp08cl873bAuNuM95EZieXTHNbwUnq5iybAgMBAAGjUzBRMB0GA1Ud -DgQWBBQBKhJh8eWu0a4au9X/2fKhkFX2vjAfBgNVHSMEGDAWgBQBKhJh8eWu0a4a -u9X/2fKhkFX2vjAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQBj -3aCW0YPKukYgVK9cwN0IbVy/D0C1UPT4nupJcy/E0iC7MXPZ9D/SZxYQoAkdptdO -xfI+RXkpQZLdODNx9uvV+cHyZHZyjtE5ENu/i5Rer2cWI/mSLZm5lUQyx+0KZ2Yu -tEI1bsebDK30msa8QSTn0WidW9XhFnl3gRi4wRdimcQapOWYVs7ih+nAlSvng7NI -XpAyRs8PIEbpDDBMWnldrX4TP6EWYUi49gCp8OUDRREKX3l6Ls1vZ02F34yHIt/7 -7IV/XSKG096bhW+icKBWV0IpcEsgTzPK1J1hMxgjhzIMxGboAeUU+kidthOob6Sd -XQxaORfgM//NzX9LhUPk ------END CERTIFICATE----- -`) - -var serverKey = []byte(`-----BEGIN RSA PRIVATE KEY----- -MIIEogIBAAKCAQEAxvA8skudfNdBrBsIFq+a4ySuzhNZE0y0jRBCmsfAY8zsoms/ -KwNiRpdlVBKYtlIw6/Qpu53fmCBFbPdSCATm+yxqjmwYdZBhd7uWmcS5LzSRPh6y -1b1IWg6GytjVPlom6nPxCNw31JLda0iDb7/X5cExbtah3ri1oaMkswyhKTs2MaRY -cI1UqJGSI0+EXndzrRc+4JdhOjgFntqqIeQsC2FXTFrSkdF7sNwZAz5PSKpECsd3 -6r/eyzxM4cmWIw0+Vrxq4GdVgwDd26KMVxd6ANR9xh0HOJv73C2rTrW/HpyOfYDE -CxG56dPHJfO92wLjbjPeRGYnl0xzW8FJ6uYsmwIDAQABAoIBACi0BKcyQ3HElSJC -kaAao+Uvnzh4yvPg8Nwf5JDIp/uDdTMyIEWLtrLczRWrjGVZYbsVROinP5VfnPTT -kYwkfKINj2u+gC6lsNuPnRuvHXikF8eO/mYvCTur1zZvsQnF5kp4GGwIqr+qoPUP -bB0UMndG1PdpoMryHe+JcrvTrLHDmCeH10TqOwMsQMLHYLkowvxwJWsmTY7/Qr5S -Wm3PPpOcW2i0uyPVuyuv4yD1368fqnqJ8QFsQp1K6QtYsNnJ71Hut1/IoxK/e6hj -5Z+byKtHVtmcLnABuoOT7BhleJNFBksX9sh83jid4tMBgci+zXNeGmgqo2EmaWAb -agQslkECgYEA8B1rzjOHVQx/vwSzDa4XOrpoHQRfyElrGNz9JVBvnoC7AorezBXQ -M9WTHQIFTGMjzD8pb+YJGi3gj93VN51r0SmJRxBaBRh1ZZI9kFiFzngYev8POgD3 -ygmlS3kTHCNxCK/CJkB+/jMBgtPj5ygDpCWVcTSuWlQFphePkW7jaaECgYEA1Blz -ulqgAyJHZaqgcbcCsI2q6m527hVr9pjzNjIVmkwu38yS9RTCgdlbEVVDnS0hoifl -+jVMEGXjF3xjyMvL50BKbQUH+KAa+V4n1WGlnZOxX9TMny8MBjEuSX2+362vQ3BX -4vOlX00gvoc+sY+lrzvfx/OdPCHQGVYzoKCxhLsCgYA07HcviuIAV/HsO2/vyvhp -xF5gTu+BqNUHNOZDDDid+ge+Jre2yfQLCL8VPLXIQW3Jff53IH/PGl+NtjphuLvj -7UDJvgvpZZuymIojP6+2c3gJ3CASC9aR3JBnUzdoE1O9s2eaoMqc4scpe+SWtZYf -3vzSZ+cqF6zrD/Rf/M35IQKBgHTU4E6ShPm09CcoaeC5sp2WK8OevZw/6IyZi78a -r5Oiy18zzO97U/k6xVMy6F+38ILl/2Rn31JZDVJujniY6eSkIVsUHmPxrWoXV1HO -y++U32uuSFiXDcSLarfIsE992MEJLSAynbF1Rsgsr3gXbGiuToJRyxbIeVy7gwzD -94TpAoGAY4/PejWQj9psZfAhyk5dRGra++gYRQ/gK1IIc1g+Dd2/BxbT/RHr05GK -6vwrfjsoRyMWteC1SsNs/CurjfQ/jqCfHNP5XPvxgd5Ec8sRJIiV7V5RTuWJsPu1 -+3K6cnKEyg+0ekYmLertRFIY6SwWmY1fyKgTvxudMcsBY7dC4xs= ------END RSA PRIVATE KEY----- -`) - func writeTempFile(byts []byte) (string, error) { tmpf, err := os.CreateTemp(os.TempDir(), "rtsp-") if err != nil { @@ -93,11 +41,11 @@ func TestSource(t *testing.T) { return net.Listen("tcp", "127.0.0.1:1935") } - serverCertFpath, err := writeTempFile(serverCert) + serverCertFpath, err := writeTempFile(test.TLSCertPub) require.NoError(t, err) defer os.Remove(serverCertFpath) - serverKeyFpath, err := writeTempFile(serverKey) + serverKeyFpath, err := writeTempFile(test.TLSCertKey) require.NoError(t, err) defer os.Remove(serverKeyFpath) diff --git a/internal/staticsources/rtsp/source_test.go b/internal/staticsources/rtsp/source_test.go index be46aeb65f5..420cad5226a 100644 --- a/internal/staticsources/rtsp/source_test.go +++ b/internal/staticsources/rtsp/source_test.go @@ -19,58 +19,6 @@ import ( "github.com/bluenviron/mediamtx/internal/test" ) -var serverCert = []byte(`-----BEGIN CERTIFICATE----- -MIIDazCCAlOgAwIBAgIUXw1hEC3LFpTsllv7D3ARJyEq7sIwDQYJKoZIhvcNAQEL -BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM -GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAeFw0yMDEyMTMxNzQ0NThaFw0zMDEy -MTExNzQ0NThaMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEw -HwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwggEiMA0GCSqGSIb3DQEB -AQUAA4IBDwAwggEKAoIBAQDG8DyyS51810GsGwgWr5rjJK7OE1kTTLSNEEKax8Bj -zOyiaz8rA2JGl2VUEpi2UjDr9Cm7nd+YIEVs91IIBOb7LGqObBh1kGF3u5aZxLkv -NJE+HrLVvUhaDobK2NU+Wibqc/EI3DfUkt1rSINvv9flwTFu1qHeuLWhoySzDKEp -OzYxpFhwjVSokZIjT4Red3OtFz7gl2E6OAWe2qoh5CwLYVdMWtKR0Xuw3BkDPk9I -qkQKx3fqv97LPEzhyZYjDT5WvGrgZ1WDAN3booxXF3oA1H3GHQc4m/vcLatOtb8e -nI59gMQLEbnp08cl873bAuNuM95EZieXTHNbwUnq5iybAgMBAAGjUzBRMB0GA1Ud -DgQWBBQBKhJh8eWu0a4au9X/2fKhkFX2vjAfBgNVHSMEGDAWgBQBKhJh8eWu0a4a -u9X/2fKhkFX2vjAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQBj -3aCW0YPKukYgVK9cwN0IbVy/D0C1UPT4nupJcy/E0iC7MXPZ9D/SZxYQoAkdptdO -xfI+RXkpQZLdODNx9uvV+cHyZHZyjtE5ENu/i5Rer2cWI/mSLZm5lUQyx+0KZ2Yu -tEI1bsebDK30msa8QSTn0WidW9XhFnl3gRi4wRdimcQapOWYVs7ih+nAlSvng7NI -XpAyRs8PIEbpDDBMWnldrX4TP6EWYUi49gCp8OUDRREKX3l6Ls1vZ02F34yHIt/7 -7IV/XSKG096bhW+icKBWV0IpcEsgTzPK1J1hMxgjhzIMxGboAeUU+kidthOob6Sd -XQxaORfgM//NzX9LhUPk ------END CERTIFICATE----- -`) - -var serverKey = []byte(`-----BEGIN RSA PRIVATE KEY----- -MIIEogIBAAKCAQEAxvA8skudfNdBrBsIFq+a4ySuzhNZE0y0jRBCmsfAY8zsoms/ -KwNiRpdlVBKYtlIw6/Qpu53fmCBFbPdSCATm+yxqjmwYdZBhd7uWmcS5LzSRPh6y -1b1IWg6GytjVPlom6nPxCNw31JLda0iDb7/X5cExbtah3ri1oaMkswyhKTs2MaRY -cI1UqJGSI0+EXndzrRc+4JdhOjgFntqqIeQsC2FXTFrSkdF7sNwZAz5PSKpECsd3 -6r/eyzxM4cmWIw0+Vrxq4GdVgwDd26KMVxd6ANR9xh0HOJv73C2rTrW/HpyOfYDE -CxG56dPHJfO92wLjbjPeRGYnl0xzW8FJ6uYsmwIDAQABAoIBACi0BKcyQ3HElSJC -kaAao+Uvnzh4yvPg8Nwf5JDIp/uDdTMyIEWLtrLczRWrjGVZYbsVROinP5VfnPTT -kYwkfKINj2u+gC6lsNuPnRuvHXikF8eO/mYvCTur1zZvsQnF5kp4GGwIqr+qoPUP -bB0UMndG1PdpoMryHe+JcrvTrLHDmCeH10TqOwMsQMLHYLkowvxwJWsmTY7/Qr5S -Wm3PPpOcW2i0uyPVuyuv4yD1368fqnqJ8QFsQp1K6QtYsNnJ71Hut1/IoxK/e6hj -5Z+byKtHVtmcLnABuoOT7BhleJNFBksX9sh83jid4tMBgci+zXNeGmgqo2EmaWAb -agQslkECgYEA8B1rzjOHVQx/vwSzDa4XOrpoHQRfyElrGNz9JVBvnoC7AorezBXQ -M9WTHQIFTGMjzD8pb+YJGi3gj93VN51r0SmJRxBaBRh1ZZI9kFiFzngYev8POgD3 -ygmlS3kTHCNxCK/CJkB+/jMBgtPj5ygDpCWVcTSuWlQFphePkW7jaaECgYEA1Blz -ulqgAyJHZaqgcbcCsI2q6m527hVr9pjzNjIVmkwu38yS9RTCgdlbEVVDnS0hoifl -+jVMEGXjF3xjyMvL50BKbQUH+KAa+V4n1WGlnZOxX9TMny8MBjEuSX2+362vQ3BX -4vOlX00gvoc+sY+lrzvfx/OdPCHQGVYzoKCxhLsCgYA07HcviuIAV/HsO2/vyvhp -xF5gTu+BqNUHNOZDDDid+ge+Jre2yfQLCL8VPLXIQW3Jff53IH/PGl+NtjphuLvj -7UDJvgvpZZuymIojP6+2c3gJ3CASC9aR3JBnUzdoE1O9s2eaoMqc4scpe+SWtZYf -3vzSZ+cqF6zrD/Rf/M35IQKBgHTU4E6ShPm09CcoaeC5sp2WK8OevZw/6IyZi78a -r5Oiy18zzO97U/k6xVMy6F+38ILl/2Rn31JZDVJujniY6eSkIVsUHmPxrWoXV1HO -y++U32uuSFiXDcSLarfIsE992MEJLSAynbF1Rsgsr3gXbGiuToJRyxbIeVy7gwzD -94TpAoGAY4/PejWQj9psZfAhyk5dRGra++gYRQ/gK1IIc1g+Dd2/BxbT/RHr05GK -6vwrfjsoRyMWteC1SsNs/CurjfQ/jqCfHNP5XPvxgd5Ec8sRJIiV7V5RTuWJsPu1 -+3K6cnKEyg+0ekYmLertRFIY6SwWmY1fyKgTvxudMcsBY7dC4xs= ------END RSA PRIVATE KEY----- -`) - func writeTempFile(byts []byte) (string, error) { tmpf, err := os.CreateTemp(os.TempDir(), "rtsp-") if err != nil { @@ -176,11 +124,11 @@ func TestSource(t *testing.T) { s.UDPRTCPAddress = "127.0.0.1:8003" case "tls": - serverCertFpath, err := writeTempFile(serverCert) + serverCertFpath, err := writeTempFile(test.TLSCertPub) require.NoError(t, err) defer os.Remove(serverCertFpath) - serverKeyFpath, err := writeTempFile(serverKey) + serverKeyFpath, err := writeTempFile(test.TLSCertKey) require.NoError(t, err) defer os.Remove(serverKeyFpath) diff --git a/internal/stream/stream.go b/internal/stream/stream.go index a95f1ae9930..e92bffeb4ae 100644 --- a/internal/stream/stream.go +++ b/internal/stream/stream.go @@ -16,7 +16,8 @@ import ( "github.com/bluenviron/mediamtx/internal/unit" ) -type readerFunc func(unit.Unit) error +// ReadFunc is the callback passed to AddReader(). +type ReadFunc func(unit.Unit) error // Stream is a media stream. // It stores tracks, readers and allows to write data to readers. @@ -115,7 +116,7 @@ func (s *Stream) RTSPSStream(server *gortsplib.Server) *gortsplib.ServerStream { } // AddReader adds a reader. -func (s *Stream) AddReader(r *asyncwriter.Writer, medi *description.Media, forma format.Format, cb readerFunc) { +func (s *Stream) AddReader(r *asyncwriter.Writer, medi *description.Media, forma format.Format, cb ReadFunc) { s.mutex.Lock() defer s.mutex.Unlock() diff --git a/internal/stream/stream_format.go b/internal/stream/stream_format.go index 8b8832852bb..84c665a95d3 100644 --- a/internal/stream/stream_format.go +++ b/internal/stream/stream_format.go @@ -25,7 +25,7 @@ func unitSize(u unit.Unit) uint64 { type streamFormat struct { decodeErrLogger logger.Writer proc formatprocessor.Processor - readers map[*asyncwriter.Writer]readerFunc + readers map[*asyncwriter.Writer]ReadFunc } func newStreamFormat( @@ -42,13 +42,13 @@ func newStreamFormat( sf := &streamFormat{ decodeErrLogger: decodeErrLogger, proc: proc, - readers: make(map[*asyncwriter.Writer]readerFunc), + readers: make(map[*asyncwriter.Writer]ReadFunc), } return sf, nil } -func (sf *streamFormat) addReader(r *asyncwriter.Writer, cb readerFunc) { +func (sf *streamFormat) addReader(r *asyncwriter.Writer, cb ReadFunc) { sf.readers[r] = cb } diff --git a/internal/test/tls_cert.go b/internal/test/tls_cert.go new file mode 100644 index 00000000000..e110dc4d4ae --- /dev/null +++ b/internal/test/tls_cert.go @@ -0,0 +1,55 @@ +package test + +// TLSCertPub is the public key of a test certificate. +var TLSCertPub = []byte(`-----BEGIN CERTIFICATE----- +MIIDazCCAlOgAwIBAgIUXw1hEC3LFpTsllv7D3ARJyEq7sIwDQYJKoZIhvcNAQEL +BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM +GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAeFw0yMDEyMTMxNzQ0NThaFw0zMDEy +MTExNzQ0NThaMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEw +HwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwggEiMA0GCSqGSIb3DQEB +AQUAA4IBDwAwggEKAoIBAQDG8DyyS51810GsGwgWr5rjJK7OE1kTTLSNEEKax8Bj +zOyiaz8rA2JGl2VUEpi2UjDr9Cm7nd+YIEVs91IIBOb7LGqObBh1kGF3u5aZxLkv +NJE+HrLVvUhaDobK2NU+Wibqc/EI3DfUkt1rSINvv9flwTFu1qHeuLWhoySzDKEp +OzYxpFhwjVSokZIjT4Red3OtFz7gl2E6OAWe2qoh5CwLYVdMWtKR0Xuw3BkDPk9I +qkQKx3fqv97LPEzhyZYjDT5WvGrgZ1WDAN3booxXF3oA1H3GHQc4m/vcLatOtb8e +nI59gMQLEbnp08cl873bAuNuM95EZieXTHNbwUnq5iybAgMBAAGjUzBRMB0GA1Ud +DgQWBBQBKhJh8eWu0a4au9X/2fKhkFX2vjAfBgNVHSMEGDAWgBQBKhJh8eWu0a4a +u9X/2fKhkFX2vjAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQBj +3aCW0YPKukYgVK9cwN0IbVy/D0C1UPT4nupJcy/E0iC7MXPZ9D/SZxYQoAkdptdO +xfI+RXkpQZLdODNx9uvV+cHyZHZyjtE5ENu/i5Rer2cWI/mSLZm5lUQyx+0KZ2Yu +tEI1bsebDK30msa8QSTn0WidW9XhFnl3gRi4wRdimcQapOWYVs7ih+nAlSvng7NI +XpAyRs8PIEbpDDBMWnldrX4TP6EWYUi49gCp8OUDRREKX3l6Ls1vZ02F34yHIt/7 +7IV/XSKG096bhW+icKBWV0IpcEsgTzPK1J1hMxgjhzIMxGboAeUU+kidthOob6Sd +XQxaORfgM//NzX9LhUPk +-----END CERTIFICATE----- +`) + +// TLSCertKey is the private key of a test certificate. +var TLSCertKey = []byte(`-----BEGIN RSA PRIVATE KEY----- +MIIEogIBAAKCAQEAxvA8skudfNdBrBsIFq+a4ySuzhNZE0y0jRBCmsfAY8zsoms/ +KwNiRpdlVBKYtlIw6/Qpu53fmCBFbPdSCATm+yxqjmwYdZBhd7uWmcS5LzSRPh6y +1b1IWg6GytjVPlom6nPxCNw31JLda0iDb7/X5cExbtah3ri1oaMkswyhKTs2MaRY +cI1UqJGSI0+EXndzrRc+4JdhOjgFntqqIeQsC2FXTFrSkdF7sNwZAz5PSKpECsd3 +6r/eyzxM4cmWIw0+Vrxq4GdVgwDd26KMVxd6ANR9xh0HOJv73C2rTrW/HpyOfYDE +CxG56dPHJfO92wLjbjPeRGYnl0xzW8FJ6uYsmwIDAQABAoIBACi0BKcyQ3HElSJC +kaAao+Uvnzh4yvPg8Nwf5JDIp/uDdTMyIEWLtrLczRWrjGVZYbsVROinP5VfnPTT +kYwkfKINj2u+gC6lsNuPnRuvHXikF8eO/mYvCTur1zZvsQnF5kp4GGwIqr+qoPUP +bB0UMndG1PdpoMryHe+JcrvTrLHDmCeH10TqOwMsQMLHYLkowvxwJWsmTY7/Qr5S +Wm3PPpOcW2i0uyPVuyuv4yD1368fqnqJ8QFsQp1K6QtYsNnJ71Hut1/IoxK/e6hj +5Z+byKtHVtmcLnABuoOT7BhleJNFBksX9sh83jid4tMBgci+zXNeGmgqo2EmaWAb +agQslkECgYEA8B1rzjOHVQx/vwSzDa4XOrpoHQRfyElrGNz9JVBvnoC7AorezBXQ +M9WTHQIFTGMjzD8pb+YJGi3gj93VN51r0SmJRxBaBRh1ZZI9kFiFzngYev8POgD3 +ygmlS3kTHCNxCK/CJkB+/jMBgtPj5ygDpCWVcTSuWlQFphePkW7jaaECgYEA1Blz +ulqgAyJHZaqgcbcCsI2q6m527hVr9pjzNjIVmkwu38yS9RTCgdlbEVVDnS0hoifl ++jVMEGXjF3xjyMvL50BKbQUH+KAa+V4n1WGlnZOxX9TMny8MBjEuSX2+362vQ3BX +4vOlX00gvoc+sY+lrzvfx/OdPCHQGVYzoKCxhLsCgYA07HcviuIAV/HsO2/vyvhp +xF5gTu+BqNUHNOZDDDid+ge+Jre2yfQLCL8VPLXIQW3Jff53IH/PGl+NtjphuLvj +7UDJvgvpZZuymIojP6+2c3gJ3CASC9aR3JBnUzdoE1O9s2eaoMqc4scpe+SWtZYf +3vzSZ+cqF6zrD/Rf/M35IQKBgHTU4E6ShPm09CcoaeC5sp2WK8OevZw/6IyZi78a +r5Oiy18zzO97U/k6xVMy6F+38ILl/2Rn31JZDVJujniY6eSkIVsUHmPxrWoXV1HO +y++U32uuSFiXDcSLarfIsE992MEJLSAynbF1Rsgsr3gXbGiuToJRyxbIeVy7gwzD +94TpAoGAY4/PejWQj9psZfAhyk5dRGra++gYRQ/gK1IIc1g+Dd2/BxbT/RHr05GK +6vwrfjsoRyMWteC1SsNs/CurjfQ/jqCfHNP5XPvxgd5Ec8sRJIiV7V5RTuWJsPu1 ++3K6cnKEyg+0ekYmLertRFIY6SwWmY1fyKgTvxudMcsBY7dC4xs= +-----END RSA PRIVATE KEY----- +`)