From 38c4b814728bf2741669d14fce5adb2bcbb697e6 Mon Sep 17 00:00:00 2001 From: Roman Date: Tue, 12 Nov 2024 01:36:39 +0400 Subject: [PATCH] Move --- grid.go | 228 ++++++++++++++++++++++++++++++++++----------------- view.go | 118 ++++++++++++++++++++------ view_test.go | 149 +++++++++++++++++++++++++-------- 3 files changed, 359 insertions(+), 136 deletions(-) diff --git a/grid.go b/grid.go index 7d7cf06..eed5451 100644 --- a/grid.go +++ b/grid.go @@ -34,8 +34,12 @@ func NewGridOf[T comparable](width, height int16) *Grid[T] { pages: pages, pageWidth: width, pageHeight: height, - observers: pubsub[T]{}, Size: At(width*3, height*3), + observers: pubsub[T]{ + tmp: sync.Pool{ + New: func() any { return make(map[Observer[T]]struct{}, 4) }, + }, + }, } // Function to calculate a point based on the index @@ -242,48 +246,60 @@ func (p *page[T]) Unlock() { // ---------------------------------- Mutations ---------------------------------- // writeTile stores the tile and return whether tile is observed or not -func (p *page[T]) writeTile(grid *Grid[T], idx uint8, tile Value) { - value := p.tileAt(idx) - for !atomic.CompareAndSwapUint32(&p.tiles[idx], uint32(value), uint32(tile)) { - value = p.tileAt(idx) +func (p *page[T]) writeTile(grid *Grid[T], idx uint8, after Value) { + before := p.tileAt(idx) + for !atomic.CompareAndSwapUint32(&p.tiles[idx], uint32(before), uint32(after)) { + before = p.tileAt(idx) } // If observed, notify the observers of the tile if p.IsObserved() { - grid.observers.Notify(p.point, &Update[T]{ - Point: pointOf(p.point, idx), - Old: value, - New: tile, - }) + at := pointOf(p.point, idx) + grid.observers.Notify1(&Update[T]{ + Old: UpdateState[T]{ + Point: at, + Value: before, + }, + New: UpdateState[T]{ + Point: at, + Value: after, + }, + }, p.point, at) } } // mergeTile atomically merges the tile bits given a function func (p *page[T]) mergeTile(grid *Grid[T], idx uint8, fn func(Value) Value) Value { - value := p.tileAt(idx) - merge := fn(value) + before := p.tileAt(idx) + after := fn(before) // Swap, if we're not able to re-merge again - for !atomic.CompareAndSwapUint32(&p.tiles[idx], uint32(value), uint32(merge)) { - value = p.tileAt(idx) - merge = fn(value) + for !atomic.CompareAndSwapUint32(&p.tiles[idx], uint32(before), uint32(after)) { + before = p.tileAt(idx) + after = fn(before) } // If observed, notify the observers of the tile if p.IsObserved() { - grid.observers.Notify(p.point, &Update[T]{ - Point: pointOf(p.point, idx), - Old: value, - New: merge, - }) + at := pointOf(p.point, idx) + grid.observers.Notify1(&Update[T]{ + Old: UpdateState[T]{ + Point: at, + Value: before, + }, + New: UpdateState[T]{ + Point: at, + Value: after, + }, + }, p.point, at) } // Return the merged tile data - return merge + return after } // addObject adds object to the set -func (p *page[T]) addObject(grid *Grid[T], idx uint8, object T) { +func (p *page[T]) addObject(idx uint8, object T) (value uint32) { p.Lock() // Lazily initialize the map, as most pages might not have anything stored @@ -293,38 +309,20 @@ func (p *page[T]) addObject(grid *Grid[T], idx uint8, object T) { } p.state[object] = uint8(idx) + value = p.tileAt(idx) p.Unlock() - - // If observed, notify the observers of the tile - if p.IsObserved() { - value := p.tileAt(idx) - grid.observers.Notify(p.point, &Update[T]{ - Point: pointOf(p.point, idx), - Old: value, - New: value, - Add: object, - }) - } + return } // delObject removes the object from the set -func (p *page[T]) delObject(grid *Grid[T], idx uint8, object T) { +func (p *page[T]) delObject(idx uint8, object T) (value uint32) { p.Lock() if p.state != nil { delete(p.state, object) } + value = p.tileAt(idx) p.Unlock() - - // If observed, notify the observers of the tile - if p.IsObserved() { - value := p.tileAt(idx) - grid.observers.Notify(p.point, &Update[T]{ - Point: pointOf(p.point, idx), - Old: value, - New: value, - Del: object, - }) - } + return } // ---------------------------------- Tile Cursor ---------------------------------- @@ -337,28 +335,33 @@ type Tile[T comparable] struct { } // Count returns number of objects at the current tile. -func (c Tile[T]) Count() (count int) { - c.data.Lock() - defer c.data.Unlock() - for _, idx := range c.data.state { - if idx == uint8(c.idx) { +func (t Tile[T]) Count() (count int) { + t.data.Lock() + defer t.data.Unlock() + for _, idx := range t.data.state { + if idx == uint8(t.idx) { count++ } } return } +// Point returns the point of the tile +func (t Tile[T]) Point() Point { + return pointOf(t.data.point, t.idx) +} + // Value reads the tile information -func (c Tile[T]) Value() Value { - return c.data.tileAt(c.idx) +func (t Tile[T]) Value() Value { + return t.data.tileAt(t.idx) } // Range iterates over all of the objects in the set -func (c Tile[T]) Range(fn func(T) error) error { - c.data.Lock() - defer c.data.Unlock() - for v, idx := range c.data.state { - if idx == uint8(c.idx) { +func (t Tile[T]) Range(fn func(T) error) error { + t.data.Lock() + defer t.data.Unlock() + for v, idx := range t.data.state { + if idx == uint8(t.idx) { if err := fn(v); err != nil { return err } @@ -368,42 +371,115 @@ func (c Tile[T]) Range(fn func(T) error) error { } // Observers iterates over all views observing this tile -func (c Tile[T]) Observers(fn func(view Observer[T])) { - if !c.data.IsObserved() { - return +func (t Tile[T]) Observers(fn func(view Observer[T])) { + if t.data.IsObserved() { + t.grid.observers.Each1(fn, t.data.point, t.Point()) } - - c.grid.observers.Each(c.data.point, func(sub Observer[T]) { - if view, ok := sub.(Observer[T]); ok { - fn(view) - } - }) } // Add adds object to the set -func (c Tile[T]) Add(v T) { - c.data.addObject(c.grid, c.idx, v) +func (t Tile[T]) Add(v T) { + value := t.data.addObject(t.idx, v) + + // If observed, notify the observers of the tile + if t.data.IsObserved() { + at := t.Point() + t.grid.observers.Notify1(&Update[T]{ + Old: UpdateState[T]{ + Point: at, + Value: value, + Add: v, + }, + New: UpdateState[T]{ + Point: at, + Value: value, + Add: v, + }, + }, t.data.point, at) + } } // Del removes the object from the set -func (c Tile[T]) Del(v T) { - c.data.delObject(c.grid, c.idx, v) +func (t Tile[T]) Del(v T) { + value := t.data.delObject(t.idx, v) + + // If observed, notify the observers of the tile + if t.data.IsObserved() { + at := t.Point() + t.grid.observers.Notify1(&Update[T]{ + Old: UpdateState[T]{ + Point: at, + Value: value, + Del: v, + }, + New: UpdateState[T]{ + Point: at, + Value: value, + Del: v, + }, + }, t.data.point, at) + } +} + +// Move moves an object from the current tile to the destination tile. +func (t Tile[T]) Move(v T, dst Point) bool { + d, ok := t.grid.At(dst.X, dst.Y) + if !ok { + return false + } + + // Move the object from the source to the destination + tv := t.data.delObject(d.idx, v) + dv := d.data.addObject(d.idx, v) + if !t.data.IsObserved() && !d.data.IsObserved() { + return true + } + + // Prepare the update notification + update := &Update[T]{ + Old: UpdateState[T]{ + Point: t.Point(), + Value: tv, + Del: v, + }, + New: UpdateState[T]{ + Point: d.Point(), + Value: dv, + Add: v, + }, + } + + switch { + case t.data == d.data || !d.data.IsObserved(): + t.grid.observers.Notify1(update, t.data.point, t.Point()) + case !t.data.IsObserved(): + t.grid.observers.Notify1(update, d.data.point, d.Point()) + default: + t.grid.observers.Notify2(update, [2]Point{ + t.data.point, + d.data.point, + }, [2]Point{ + t.Point(), + d.Point(), + }) + } + return true } // Write updates the entire tile value. -func (c Tile[T]) Write(tile Value) { - c.data.writeTile(c.grid, c.idx, tile) +func (t Tile[T]) Write(tile Value) { + t.data.writeTile(t.grid, t.idx, tile) } // Merge atomically merges the tile by applying a merging function. -func (c Tile[T]) Merge(merge func(Value) Value) Value { - return c.data.mergeTile(c.grid, c.idx, merge) +func (t Tile[T]) Merge(merge func(Value) Value) Value { + return t.data.mergeTile(t.grid, t.idx, merge) } // Mask updates the bits of tile. The bits are specified by the mask. The bits // that need to be updated should be flipped on in the mask. -func (c Tile[T]) Mask(tile, mask Value) Value { - return c.data.mergeTile(c.grid, c.idx, func(value Value) Value { +func (t Tile[T]) Mask(tile, mask Value) Value { + return t.data.mergeTile(t.grid, t.idx, func(value Value) Value { return (value &^ mask) | (tile & mask) }) } diff --git a/view.go b/view.go index 12494f5..3a0ed03 100644 --- a/view.go +++ b/view.go @@ -15,13 +15,18 @@ type Observer[T comparable] interface { onUpdate(*Update[T]) } +type UpdateState[T comparable] struct { + Point // The point of the tile + Value // The value of the tile + Add T // An object was added to the tile + Del T // An object was removed from the tile +} + // Update represents a tile update notification. type Update[T comparable] struct { - Point // The tile location - Old Value // Old tile value - New Value // New tile value - Add T // An object was added to the tile - Del T // An object was removed from the tile + Old UpdateState[T] // Old tile + value + New UpdateState[T] // New tile + value + } var _ Observer[string] = (*View[string, string])(nil) @@ -155,46 +160,112 @@ func (v *View[S, T]) Close() error { // onUpdate occurs when a tile has updated. func (v *View[S, T]) onUpdate(ev *Update[T]) { - if v.Viewport().Contains(ev.Point) { - v.Inbox <- *ev // (copy) - } + v.Inbox <- *ev // (copy) } // ----------------------------------------------------------------------------- // Pubsub represents a publish/subscribe layer for observers. type pubsub[T comparable] struct { - m sync.Map + m sync.Map // Concurrent map of observers + tmp sync.Pool // Temporary observer sets for notifications } // Notify notifies listeners of an update that happened. -func (p *pubsub[T]) Notify(page Point, ev *Update[T]) { - if v, ok := p.m.Load(page.Integer()); ok { - v.(*observers[T]).Notify(ev) - } +func (p *pubsub[T]) Notify1(ev *Update[T], page, at Point) { + p.Each1(func(sub Observer[T]) { + sub.onUpdate(ev) + }, page, at) +} + +// Notify notifies listeners of an update that happened. +func (p *pubsub[T]) Notify2(ev *Update[T], pages, locs [2]Point) { + p.Each2(func(sub Observer[T]) { + sub.onUpdate(ev) + }, pages, locs) } // Each iterates over each observer in a page -func (p *pubsub[T]) Each(page Point, fn func(sub Observer[T])) { +func (p *pubsub[T]) Each1(fn func(sub Observer[T]), page, at Point) { if v, ok := p.m.Load(page.Integer()); ok { - v.(*observers[T]).Each(fn) + v.(*observers[T]).Each(func(sub Observer[T]) { + if sub.Viewport().Contains(at) { + fn(sub) + } + }) + } +} + +// Each2 iterates over each observer in a page +func (p *pubsub[T]) Each2(fn func(sub Observer[T]), pages, locs [2]Point) { + targets := p.tmp.Get().(map[Observer[T]]struct{}) + clear(targets) + defer p.tmp.Put(targets) + + // Collect all observers from all pages + for _, page := range pages { + if v, ok := p.m.Load(page.Integer()); ok { + v.(*observers[T]).Each(func(sub Observer[T]) { + targets[sub] = struct{}{} + }) + } + } + + // Invoke the callback for each observer, once + for sub := range targets { + if sub.Viewport().Contains(locs[0]) || sub.Viewport().Contains(locs[1]) { + fn(sub) + } } } +/* +// Each iterates over each observer in a page +func (p *pubsub[T]) Each(fn func(sub Observer[T]), pages ...Point) { + switch len(pages) { + + // Single page: directly invoke the callback + case 1: + if v, ok := p.m.Load(pages[0].Integer()); ok { + v.(*observers[T]).Each(fn) + } + + // Multiple pages: merge distinct observers and invoke the callback + default: + targets := p.tmp.Get().(map[Observer[T]]struct{}) + clear(targets) + defer p.tmp.Put(targets) + + // Collect all observers from all pages + for _, page := range pages { + if v, ok := p.m.Load(page.Integer()); ok { + v.(*observers[T]).Each(func(sub Observer[T]) { + targets[sub] = struct{}{} + }) + } + } + + // Invoke the callback for each observer, once + for sub := range targets { + fn(sub) + } + } +}*/ + // Subscribe registers an event listener on a system -func (p *pubsub[T]) Subscribe(at Point, sub Observer[T]) bool { - if v, ok := p.m.Load(at.Integer()); ok { +func (p *pubsub[T]) Subscribe(page Point, sub Observer[T]) bool { + if v, ok := p.m.Load(page.Integer()); ok { return v.(*observers[T]).Subscribe(sub) } // Slow path - v, _ := p.m.LoadOrStore(at.Integer(), newObservers[T]()) + v, _ := p.m.LoadOrStore(page.Integer(), newObservers[T]()) return v.(*observers[T]).Subscribe(sub) } // Unsubscribe deregisters an event listener from a system -func (p *pubsub[T]) Unsubscribe(at Point, sub Observer[T]) bool { - if v, ok := p.m.Load(at.Integer()); ok { +func (p *pubsub[T]) Unsubscribe(page Point, sub Observer[T]) bool { + if v, ok := p.m.Load(page.Integer()); ok { return v.(*observers[T]).Unsubscribe(sub) } return false @@ -216,13 +287,6 @@ func newObservers[T comparable]() *observers[T] { } } -// Notify notifies listeners of an update that happened. -func (s *observers[T]) Notify(ev *Update[T]) { - s.Each(func(sub Observer[T]) { - sub.onUpdate(ev) - }) -} - // Each iterates over each observer func (s *observers[T]) Each(fn func(sub Observer[T])) { if s == nil { diff --git a/view_test.go b/view_test.go index 98e4baa..4c3b496 100644 --- a/view_test.go +++ b/view_test.go @@ -5,7 +5,6 @@ package tile import ( "testing" - "time" "github.com/stretchr/testify/assert" ) @@ -82,7 +81,7 @@ func TestView(t *testing.T) { before := cursor.Value() v.WriteAt(5, 5, Value(55)) update := <-v.Inbox - assert.Equal(t, At(5, 5), update.Point) + assert.Equal(t, At(5, 5), update.New.Point) assert.NotEqual(t, before, update.New) // Merge a tile in view, but with zero mask (won't do anything) @@ -90,8 +89,8 @@ func TestView(t *testing.T) { before = cursor.Value() v.MergeAt(5, 5, Value(66), Value(0)) // zero mask update = <-v.Inbox - assert.Equal(t, At(5, 5), update.Point) - assert.Equal(t, before, update.New) + assert.Equal(t, At(5, 5), update.New.Point) + assert.Equal(t, before, update.New.Value) // Close the view assert.NoError(t, v.Close()) @@ -99,6 +98,7 @@ func TestView(t *testing.T) { assert.Equal(t, 0, len(v.Inbox)) } +/* func TestObservers(t *testing.T) { ev := newObservers[uint32]() assert.NotNil(t, ev) @@ -128,20 +128,12 @@ func TestObservers(t *testing.T) { ev.Notify(&Update[uint32]{Point: At(2, 0)}) assert.Equal(t, 6, count) } +*/ -func TestObserversNil(t *testing.T) { - assert.NotPanics(t, func() { - var ev *observers[uint32] - ev.Notify(&Update[uint32]{Point: At(1, 0)}) - }) -} - -func TestStateUpdates(t *testing.T) { +func TestUpdates_Simple(t *testing.T) { m := mapFrom("300x300.png") - - // Create a new view c := counter(0) - v := NewView[string, string](m, "view 1") + v := NewView(m, "view 1") v.Resize(NewRect(0, 0, 10, 10), c.count) assert.NotNil(t, v) @@ -151,34 +143,56 @@ func TestStateUpdates(t *testing.T) { cursor, _ := v.At(5, 5) cursor.Write(Value(0xF0)) assert.Equal(t, Update[string]{ - Point: At(5, 5), - New: Value(0xF0), + Old: UpdateState[string]{ + Point: At(5, 5), + }, + New: UpdateState[string]{ + Point: At(5, 5), + Value: Value(0xF0), + }, }, <-v.Inbox) // Add an object to an observed tile cursor.Add("A") assert.Equal(t, Update[string]{ - Point: At(5, 5), - Old: Value(0xF0), - New: Value(0xF0), - Add: "A", + Old: UpdateState[string]{ + Point: At(5, 5), + Value: Value(0xF0), + Add: "A", + }, + New: UpdateState[string]{ + Point: At(5, 5), + Value: Value(0xF0), + Add: "A", + }, }, <-v.Inbox) // Delete an object from an observed tile cursor.Del("A") assert.Equal(t, Update[string]{ - Point: At(5, 5), - Old: Value(0xF0), - New: Value(0xF0), - Del: "A", + Old: UpdateState[string]{ + Point: At(5, 5), + Value: Value(0xF0), + Del: "A", + }, + New: UpdateState[string]{ + Point: At(5, 5), + Value: Value(0xF0), + Del: "A", + }, }, <-v.Inbox) // Mask a tile in view cursor.Mask(0xFF, 0x0F) assert.Equal(t, Update[string]{ - Point: At(5, 5), - Old: Value(0xF0), - New: Value(0xFF), + Old: UpdateState[string]{ + Point: At(5, 5), + Value: Value(0xF0), + }, + New: UpdateState[string]{ + Point: At(5, 5), + Value: Value(0xFF), + }, }, <-v.Inbox) // Merge a tile in view @@ -186,18 +200,87 @@ func TestStateUpdates(t *testing.T) { return 0xAA }) assert.Equal(t, Update[string]{ - Point: At(5, 5), - Old: Value(0xFF), - New: Value(0xAA), + Old: UpdateState[string]{ + Point: At(5, 5), + Value: Value(0xFF), + }, + New: UpdateState[string]{ + Point: At(5, 5), + Value: Value(0xAA), + }, }, <-v.Inbox) } -func TestObservers_MoveIncremental(t *testing.T) { +func TestMove_Within(t *testing.T) { + m := mapFrom("300x300.png") + c := counter(0) + v := NewView(m, "view 1") + v.Resize(NewRect(0, 0, 10, 10), c.count) + + // Add an object to an observed tile. This should only fire once since + // both the old and new states are the observed by the view. + cursor, _ := v.At(5, 5) + cursor.Move("A", At(6, 6)) + assert.Equal(t, Update[string]{ + Old: UpdateState[string]{ + Point: At(5, 5), + Del: "A", + }, + New: UpdateState[string]{ + Point: At(6, 6), + Add: "A", + }, + }, <-v.Inbox) +} + +func TestMove_Incoming(t *testing.T) { + m := mapFrom("300x300.png") + c := counter(0) + v := NewView(m, "view 1") + v.Resize(NewRect(0, 0, 10, 10), c.count) + + // Add an object to an observed tile from outside the view. + cursor, _ := v.At(20, 20) + cursor.Move("A", At(5, 5)) + assert.Equal(t, Update[string]{ + Old: UpdateState[string]{ + Point: At(20, 20), + Del: "A", + }, + New: UpdateState[string]{ + Point: At(5, 5), + Add: "A", + }, + }, <-v.Inbox) +} + +func TestMove_Outgoing(t *testing.T) { + m := mapFrom("300x300.png") + c := counter(0) + v := NewView(m, "view 1") + v.Resize(NewRect(0, 0, 10, 10), c.count) + + // Move an object from an observed tile outside of the view. + cursor, _ := v.At(5, 5) + cursor.Move("A", At(20, 20)) + assert.Equal(t, Update[string]{ + Old: UpdateState[string]{ + Point: At(5, 5), + Del: "A", + }, + New: UpdateState[string]{ + Point: At(20, 20), + Add: "A", + }, + }, <-v.Inbox) +} + +func TestView_MoveTo(t *testing.T) { m := mapFrom("300x300.png") // Create a new view c := counter(0) - v := NewView[string, string](m, "view 1") + v := NewView(m, "view 1") v.Resize(NewRect(10, 10, 12, 12), c.count) assert.NotNil(t, v)