Skip to content

Commit

Permalink
IO unset read/write (#3)
Browse files Browse the repository at this point in the history
* Update gosec build step

* IO can unset reads/writes

* Close() unsets reads+writes and deregisters the IO slot

* AsyncAdapter exposes its IO Slot
  • Loading branch information
sergiu128 authored Sep 24, 2024
1 parent afad11c commit 4e1ba28
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 6 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/gosec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ jobs:
- name: Checkout Source
uses: actions/checkout@v3
- name: Run Gosec Security Scanner
uses: securego/gosec@v2.15.0
uses: securego/gosec@v2.21.0
with:
args: '-no-fail -fmt=sarif -out=results.sarif -exclude-dir=examples -exclude-dir=stress_test -exclude-dir=other -exclude-dir=docs -exclude-dir=tests -exclude-dir=benchmark ./...'
- name: Upload SARIF file
uses: github/codeql-action/upload-sarif@v2
uses: github/codeql-action/upload-sarif@v3
with:
sarif_file: results.sarif
7 changes: 6 additions & 1 deletion async_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,8 @@ func (a *AsyncAdapter) Close() error {
return io.EOF
}

_ = a.ioc.poller.Del(&a.slot)
_ = a.ioc.UnsetReadWrite(&a.slot)
a.ioc.Deregister(&a.slot)

return syscall.Close(a.slot.Fd)
}
Expand Down Expand Up @@ -245,3 +246,7 @@ func (a *AsyncAdapter) cancelWrites() {
func (a *AsyncAdapter) RawFd() int {
return a.slot.Fd
}

func (a *AsyncAdapter) Slot() *internal.Slot {
return &a.slot
}
4 changes: 2 additions & 2 deletions file.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,10 @@ func (f *file) Close() error {
return io.EOF
}

err := f.ioc.poller.Del(&f.slot)
if err != nil {
if err := f.ioc.UnsetReadWrite(&f.slot); err != nil {
return err
}
f.ioc.Deregister(&f.slot)

return syscall.Close(f.slot.Fd)
}
Expand Down
25 changes: 25 additions & 0 deletions io.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,14 +78,39 @@ func (ioc *IO) Deregister(slot *internal.Slot) {
}
}

// SetRead tells the kernel to notify us when reads can be made on the provided IO slot. If successful, this call must
// be succeeded by Register(slot).
//
// It is safe to call this method multiple times.
func (ioc *IO) SetRead(slot *internal.Slot) error {
return ioc.poller.SetRead(slot)
}

// UnsetRead tells the kernel to not notify us anymore when reads can be made on the provided IO slot. Since the
// underlying platform-specific poller already unsets a read before dispatching it, callers must only use this method
// they want to cancel a currently-scheduled read. For example, when an error occurs outside of an AsyncRead call and
// the underlying file descriptor must be closed. In that case, this call must be succeeded by Deregister(slot).
//
// It is safe to call this method multiple times.
func (ioc *IO) UnsetRead(slot *internal.Slot) error {
return ioc.poller.DelRead(slot)
}

// Like SetRead but for writes.
func (ioc *IO) SetWrite(slot *internal.Slot) error {
return ioc.poller.SetWrite(slot)
}

// Like UnsetRead but for writes.
func (ioc *IO) UnsetWrite(slot *internal.Slot) error {
return ioc.poller.DelWrite(slot)
}

// UnsetRead and UnsetWrite in a single call.
func (ioc *IO) UnsetReadWrite(slot *internal.Slot) error {
return ioc.poller.Del(slot)
}

// Run runs the event processing loop.
func (ioc *IO) Run() error {
for {
Expand Down
32 changes: 32 additions & 0 deletions io_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,38 @@ func TestIOPending(t *testing.T) {
}
}

func TestSetUnsetRead(t *testing.T) {
ioc := MustIO()
defer ioc.Close()

pipe, err := internal.NewPipe()
if err != nil {
t.Fatal(err)
}

if pipe.ReadFd() != pipe.Slot().Fd {
t.Fatal("pipe must be identified by its read end file descriptor")
}

for i := 0; i < 10; i++ {
if err := ioc.SetRead(pipe.Slot()); err != nil {
t.Fatal(err)
}
}

for i := 0; i < 100; i++ {
if err := ioc.UnsetRead(pipe.Slot()); err != nil {
t.Fatal(err)
}
}

for i := 0; i < 100; i++ {
if err := ioc.UnsetReadWrite(pipe.Slot()); err != nil {
t.Fatal(err)
}
}
}

func BenchmarkPollOne(b *testing.B) {
ioc := MustIO()
defer ioc.Close()
Expand Down
3 changes: 2 additions & 1 deletion listen_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ func (l *listener) accept() (Conn, error) {
}

func (l *listener) Close() error {
_ = l.ioc.poller.Del(&l.slot)
_ = l.ioc.UnsetReadWrite(&l.slot)
l.ioc.Deregister(&l.slot)
return syscall.Close(l.slot.Fd)
}

Expand Down
2 changes: 2 additions & 0 deletions multicast/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,8 @@ func (p *UDPPeer) LocalAddr() *net.UDPAddr {
func (p *UDPPeer) Close() error {
if !p.closed {
p.closed = true
_ = p.ioc.UnsetReadWrite(&p.slot)
p.ioc.Deregister(&p.slot)
return p.socket.Close()
}
return nil
Expand Down
2 changes: 2 additions & 0 deletions packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ func (c *packetConn) getWriteHandler(b []byte, to net.Addr, cb AsyncWriteCallbac

func (c *packetConn) Close() error {
atomic.StoreUint32(&c.closed, 1)
_ = c.ioc.UnsetReadWrite(&c.slot)
c.ioc.Deregister(&c.slot)
return syscall.Close(c.slot.Fd)
}

Expand Down

0 comments on commit 4e1ba28

Please sign in to comment.