Skip to content

Commit

Permalink
fix trace
Browse files Browse the repository at this point in the history
  • Loading branch information
guonaihong committed May 23, 2024
1 parent 16e5cae commit d567305
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 51 deletions.
97 changes: 85 additions & 12 deletions client_option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,31 @@ import (
"net/http"
"net/http/httptest"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

"golang.org/x/net/proxy"
)

// // 实现安全的net.Conn
// type safeConn struct {
// net.Conn
// sync.Mutex
// }

// func (s *safeConn) Write(b []byte) (n int, err error) {
// s.Lock()
// defer s.Unlock()
// return s.Conn.Write(b)
// }

// func (s *safeConn) Read(b []byte) (n int, err error) {
// s.Lock()
// defer s.Unlock()
// return s.Conn.Read(b)
// }
func Test_ClientOption(t *testing.T) {
t.Run("ClientOption.WithClientHTTPHeader", func(t *testing.T) {
done := make(chan string, 1)
Expand Down Expand Up @@ -401,21 +419,76 @@ func Test_ClientOption(t *testing.T) {
return
}
defer c2.Close()
done := make(chan struct{})

// done := make(chan struct{})
// newConn = &safeConn{Conn: newConn}
// c2 = &safeConn{Conn: c2}
// go func() {
// _, err = io.Copy(newConn, c2)
// if err != nil {
// t.Error(err)
// return
// }
// close(done)
// }()
// _, err = io.Copy(c2, newConn)
// if err != nil {
// t.Error(err)
// return
// }
// <-done

var (
newConnMu sync.Mutex
c2Mu sync.Mutex
wg sync.WaitGroup
)

wg.Add(2)

go func() {
_, err = io.Copy(newConn, c2)
if err != nil {
t.Error(err)
return
defer wg.Done()
buf := make([]byte, 4096)
for {
n, err := c2.Read(buf)
if err != nil {
if err != io.EOF {
t.Error(err)
}
break
}
newConnMu.Lock()
_, err = newConn.Write(buf[:n])
newConnMu.Unlock()
if err != nil {
t.Error(err)
break
}
}
close(done)
}()
_, err = io.Copy(c2, newConn)
if err != nil {
t.Error(err)
return
}
<-done

go func() {
defer wg.Done()
buf := make([]byte, 4096)
for {
n, err := newConn.Read(buf)
if err != nil {
if err != io.EOF {
t.Error(err)
}
break
}
c2Mu.Lock()
_, err = c2.Write(buf[:n])
c2Mu.Unlock()
if err != nil {
t.Error(err)
break
}
}
}()

wg.Wait()
}()

got := make([]byte, 0, 128)
Expand Down
82 changes: 43 additions & 39 deletions common_options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1698,6 +1698,7 @@ func Test_CommonOption(t *testing.T) {
t.Run("13-15.client: WriteMessageDelay", func(t *testing.T) {
run := int32(0)
data := make(chan string, 1)
recvServer := int32(0)
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
c, err := Upgrade(w, r,
WithServerDecompressAndCompress(),
Expand All @@ -1708,8 +1709,10 @@ func Test_CommonOption(t *testing.T) {
t.Error(err)
return
}
atomic.AddInt32(&run, int32(1))
data <- string(payload)
atomic.AddInt32(&recvServer, int32(1))
if atomic.LoadInt32(&recvServer) == 3 {
c.Close()
}
}))
if err != nil {
t.Error(err)
Expand All @@ -1720,36 +1723,36 @@ func Test_CommonOption(t *testing.T) {
defer ts.Close()

url := strings.ReplaceAll(ts.URL, "http", "ws")
recv := int32(0)
con, err := Dial(url, WithClientDecompressAndCompress(),
WithClientDecompression(),
WithClientMaxDelayWriteDuration(30*time.Millisecond),
WithClientMaxDelayWriteNum(3),
WithClientWindowsParseMode(),
WithClientDelayWriteInitBufferSize(4096),
WithClientOnMessageFunc(func(c *Conn, op Opcode, payload []byte) {
err := c.WriteMessageDelay(op, []byte("hello"))
if err != nil {
t.Error(err)
return
}

err = c.WriteMessageDelay(op, []byte("hello"))
if err != nil {
t.Error(err)
return
}
err = c.WriteMessageDelay(op, []byte("hello"))
if err != nil {
t.Error(err)
return
atomic.AddInt32(&recv, int32(1))
if atomic.LoadInt32(&recv) == 3 {
data <- "hello"
}
}))
if err != nil {
t.Error(err)
}
defer con.Close()

err = con.WriteMessage(Binary, []byte("hello"))
err = con.WriteMessageDelay(Text, []byte("hello"))
if err != nil {
t.Error(err)
return
}

err = con.WriteMessageDelay(Text, []byte("hello"))
if err != nil {
t.Error(err)
return
}
err = con.WriteMessageDelay(Text, []byte("hello"))
if err != nil {
t.Error(err)
return
Expand All @@ -1760,10 +1763,11 @@ func Test_CommonOption(t *testing.T) {
if d != "hello" {
t.Errorf("write message or read message fail:got:%s, need:hello\n", d)
}
run++
case <-time.After(1000 * time.Millisecond):
}
if atomic.LoadInt32(&run) != 1 {
t.Error("not run server:method fail")
t.Errorf("not run server:method fail:%d", atomic.LoadInt32(&run))
}
})

Expand All @@ -1773,14 +1777,13 @@ func Test_CommonOption(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
c, err := Upgrade(w, r,
WithServerDecompressAndCompress(),
// WithServerBufioParseMode(),
WithServerCallbackFunc(nil, func(c *Conn, op Opcode, payload []byte) {
if op != Binary {
if op != Text {
t.Error("opcode error")
}
err := c.WriteMessage(op, payload)
if err != nil {
t.Error(err)
err1 := c.WriteMessage(op, payload)
if err1 != nil {
t.Errorf("c.WriteMessage:%v", err1)
return
}
}, func(c *Conn, err error) {
Expand All @@ -1804,30 +1807,22 @@ func Test_CommonOption(t *testing.T) {
defer ts.Close()

url := strings.ReplaceAll(ts.URL, "http", "ws")
recv := int32(0)
con, err := Dial(url,
WithClientDecompressAndCompress(),
WithClientMaxDelayWriteDuration(30*time.Millisecond),
WithClientMaxDelayWriteNum(3),
WithClientWindowsParseMode(),
WithClientDelayWriteInitBufferSize(4096),
WithClientOnMessageFunc(func(c *Conn, op Opcode, payload []byte) {
if op != Binary {
if op != Text {
t.Error("opcode error")
}
err := c.WriteMessageDelay(op, []byte("hello"))
if err != nil {
t.Error(err)
}
err = c.WriteMessageDelay(op, []byte("hello"))
if err != nil {
t.Error(err)
atomic.AddInt32(&recv, int32(1))
if atomic.LoadInt32(&recv) == 3 {
data <- "hello"
}
err = c.WriteMessageDelay(op, []byte("hello"))
if err != nil {
t.Error(err)
}
data <- "hello"
atomic.AddInt32(&run, int32(1))
// atomic.AddInt32(&run, int32(1))
}))
if err != nil {
t.Error(err)
Expand All @@ -1837,7 +1832,15 @@ func Test_CommonOption(t *testing.T) {
if !con.Compression {
t.Error("not compression:method fail")
}
err = con.WriteMessage(Binary, []byte("hello"))
err = con.WriteMessageDelay(Text, []byte("hello"))
if err != nil {
t.Error(err)
}
err = con.WriteMessageDelay(Text, []byte("hello"))
if err != nil {
t.Error(err)
}
err = con.WriteMessageDelay(Text, []byte("hello"))
if err != nil {
t.Error(err)
}
Expand All @@ -1848,6 +1851,7 @@ func Test_CommonOption(t *testing.T) {
if d != "hello" {
t.Errorf("write message or read message fail:got:%s, need:hello\n", d)
}
run++
case <-time.After(1000 * time.Millisecond):
t.Errorf("write message timeout\n")
}
Expand Down
4 changes: 4 additions & 0 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ import (
"fmt"
"math/rand"
"net/http"
"sync"
"time"
"unsafe"
)

var rng = rand.New(rand.NewSource(time.Now().UnixNano()))

var mu sync.Mutex
var uuid = []byte("258EAFA5-E914-47DA-95CA-C5AB0DC85B11")

func StringToBytes(s string) []byte {
Expand All @@ -50,7 +52,9 @@ func StringToBytes(s string) []byte {
func secWebSocketAccept() string {
// rfc规定是16字节
var key [16]byte
mu.Lock()
rng.Read(key[:])
mu.Unlock()
return base64.StdEncoding.EncodeToString(key[:])
}

Expand Down

0 comments on commit d567305

Please sign in to comment.