Skip to content

Commit

Permalink
Implementing a general priority lock
Browse files Browse the repository at this point in the history
  • Loading branch information
mohanson committed Aug 26, 2024
1 parent 85f4794 commit e6d3c53
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 41 deletions.
4 changes: 2 additions & 2 deletions protocol/czar/err.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

// Err is an object that will only store an error once.
type Err struct {
mux sync.Mutex // Guards following
mux *sync.Mutex // Guards following
err error
sig chan struct{}
}
Expand Down Expand Up @@ -36,7 +36,7 @@ func (e *Err) Sig() <-chan struct{} {

func NewErr() *Err {
return &Err{
mux: sync.Mutex{},
mux: &sync.Mutex{},
err: nil,
sig: make(chan struct{}),
}
Expand Down
70 changes: 31 additions & 39 deletions protocol/czar/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ type Stream struct {
rch chan []byte
rer *Err
wer *Err
wmu sync.Mutex
zo0 sync.Once
zo1 sync.Once
}
Expand All @@ -28,9 +27,10 @@ func (s *Stream) Close() error {
s.rer.Put(io.ErrClosedPipe)
s.wer.Put(io.ErrClosedPipe)
s.zo0.Do(func() {
s.wmu.Lock()
s.mux.Write(0, []byte{s.idx, 0x02, 0x00, 0x00})
s.wmu.Unlock()
s.mux.pri.H(func() error {
s.mux.con.Write([]byte{s.idx, 0x02, 0x00, 0x00})
return nil
})
})
return nil
}
Expand All @@ -40,9 +40,10 @@ func (s *Stream) Esolc() error {
s.rer.Put(io.EOF)
s.wer.Put(io.ErrClosedPipe)
s.zo0.Do(func() {
s.wmu.Lock()
s.mux.Write(0, []byte{s.idx, 0x02, 0x01, 0x00})
s.wmu.Unlock()
s.mux.pri.H(func() error {
s.mux.con.Write([]byte{s.idx, 0x02, 0x01, 0x00})
return nil
})
})
s.zo1.Do(func() {
s.idp.Put(s.idx)
Expand Down Expand Up @@ -98,18 +99,20 @@ func (s *Stream) Write(p []byte) (int, error) {
binary.BigEndian.PutUint16(b[2:4], uint16(l))
copy(b[4:], p[:l])
p = p[l:]
s.wmu.Lock()
if err := s.wer.Get(); err != nil {
s.wmu.Unlock()
return n, err
}
_, err := s.mux.Write(1, b[:4+l])
err := s.mux.pri.M(func() error {
if err := s.wer.Get(); err != nil {
return err
}
_, err := s.mux.con.Write(b[:4+l])
if err != nil {
s.wer.Put(err)
return err
}
return nil
})
if err != nil {
s.wer.Put(err)
s.wmu.Unlock()
return n, err
}
s.wmu.Unlock()
n += l
}
}
Expand All @@ -124,7 +127,6 @@ func NewStream(idx uint8, mux *Mux) *Stream {
rch: make(chan []byte, 32),
rer: NewErr(),
wer: NewErr(),
wmu: sync.Mutex{},
zo0: sync.Once{},
zo1: sync.Once{},
}
Expand All @@ -144,10 +146,9 @@ type Mux struct {
ach chan *Stream
con net.Conn
idp *Sip
pri *Priority
rer *Err
usb []*Stream
wm0 sync.Mutex
wm1 sync.Mutex
}

// Accept is used to block until the next available stream is ready to be accepted.
Expand All @@ -163,17 +164,23 @@ func (m *Mux) Close() error {

// Open is used to create a new stream as a net.Conn.
func (m *Mux) Open() (*Stream, error) {
idx, err := m.idp.Get()
var (
err error
idx uint8
stm *Stream
)
idx, err = m.idp.Get()
if err != nil {
return nil, err
}
cnt, err := m.Write(0, []byte{idx, 0x00, 0x00, 0x00})
err = m.pri.H(func() error {
return doa.Err(m.con.Write([]byte{idx, 0x00, 0x00, 0x00}))
})
if err != nil {
m.idp.Put(idx)
return nil, err
}
doa.Doa(cnt == 4)
stm := NewStream(idx, m)
stm = NewStream(idx, m)
stm.idp = m.idp
m.usb[idx] = stm
return stm, nil
Expand Down Expand Up @@ -242,30 +249,15 @@ func (m *Mux) Recv() {
close(m.ach)
}

// Write writes data to the connection. The code implements a simple priority write using two locks.
func (m *Mux) Write(priority int, b []byte) (int, error) {
if priority >= 1 {
m.wm1.Lock()
defer m.wm1.Unlock()
}
if priority >= 0 {
m.wm0.Lock()
defer m.wm0.Unlock()
}
n, err := m.con.Write(b)
return n, err
}

// NewMux returns a new Mux.
func NewMux(conn net.Conn) *Mux {
mux := &Mux{
ach: make(chan *Stream),
con: conn,
idp: nil,
pri: NewPriority(),
rer: NewErr(),
usb: make([]*Stream, 256),
wm0: sync.Mutex{},
wm1: sync.Mutex{},
}
return mux
}
Expand Down
48 changes: 48 additions & 0 deletions protocol/czar/priority.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package czar

import (
"sync"
)

// Priority implement a lock with three priorities.
type Priority struct {
l *sync.Mutex
m *sync.Mutex
h *sync.Mutex
}

// H executes function f with 0 priority.
func (p *Priority) H(f func() error) error {
p.h.Lock()
defer p.h.Unlock()
return f()
}

// H executes function f with 1 priority.
func (p *Priority) M(f func() error) error {
p.m.Lock()
defer p.m.Unlock()
p.h.Lock()
defer p.h.Unlock()
return f()
}

// H executes function f with 2 priority.
func (p *Priority) L(f func() error) error {
p.l.Lock()
defer p.l.Unlock()
p.m.Lock()
defer p.m.Unlock()
p.h.Lock()
defer p.h.Unlock()
return f()
}

// NewPriority returns a new Priority.
func NewPriority() *Priority {
return &Priority{
l: &sync.Mutex{},
m: &sync.Mutex{},
h: &sync.Mutex{},
}
}
18 changes: 18 additions & 0 deletions protocol/czar/priority_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package czar

import (
"testing"
)

func TestPriority(t *testing.T) {
pri := NewPriority()
pri.H(func() error {
return nil
})
pri.M(func() error {
return nil
})
pri.L(func() error {
return nil
})
}

0 comments on commit e6d3c53

Please sign in to comment.