From bbe306494480f80c3191c57dd90bb0ec7587266d Mon Sep 17 00:00:00 2001 From: caffix Date: Mon, 18 Nov 2024 10:14:21 -0500 Subject: [PATCH] updates to the cache implementation --- cache/edge.go | 163 +++++++++++++++++++++++++++++++++-- repository/sqlrepo/entity.go | 2 + repository/sqlrepo/tag.go | 4 + 3 files changed, 161 insertions(+), 8 deletions(-) diff --git a/cache/edge.go b/cache/edge.go index 68f1722..d273873 100644 --- a/cache/edge.go +++ b/cache/edge.go @@ -21,27 +21,174 @@ func (c *Cache) CreateEdge(edge *types.Edge) (*types.Edge, error) { return nil, err } - c.appendToDBQueue(func() { - _, _ = c.db.CreateEdge(edge) - }) + sub, err := c.cache.FindEntityById(e.FromEntity.ID) + if err != nil { + return nil, err + } + + obj, err := c.cache.FindEntityById(e.ToEntity.ID) + if err != nil { + return nil, err + } + + if tag, found := c.checkCacheEdgeTag(edge, "cache_create_edge"); !found { + if last, err := time.Parse("2006-01-02 15:04:05", tag.Property.Value()); err == nil && time.Now().Add(-1*c.freq).After(last) { + _ = c.cache.DeleteEdgeTag(tag.ID) + _ = c.createCacheEdgeTag(edge, "cache_create_edge") + + c.appendToDBQueue(func() { + s, err := c.db.FindEntityByContent(sub.Asset, time.Time{}) + if err != nil || len(s) != 1 { + return + } + + o, err := c.db.FindEntityByContent(obj.Asset, time.Time{}) + if err != nil || len(o) != 1 { + return + } + + _, _ = c.db.CreateEdge(&types.Edge{ + CreatedAt: edge.CreatedAt, + LastSeen: edge.LastSeen, + Relation: e.Relation, + FromEntity: s[0], + ToEntity: o[0], + }) + }) + } + } return e, nil } // IncomingEdges implements the Repository interface. func (c *Cache) IncomingEdges(entity *types.Entity, since time.Time, labels ...string) ([]*types.Edge, error) { - c.Lock() - defer c.Unlock() + var dbquery bool + + if since.IsZero() || since.Before(c.start) { + c.Lock() + if _, found := c.checkCacheEntityTag(entity, "cache_incoming_edges"); !found { + dbquery = true + } + c.Unlock() + } + + if dbquery { + var dberr error + var dbedges []*types.Edge + + done := make(chan struct{}, 1) + c.appendToDBQueue(func() { + defer func() { done <- struct{}{} }() + + if e, err := c.db.FindEntityByContent(entity.Asset, time.Time{}); err == nil && len(e) == 1 { + dbedges, dberr = c.db.IncomingEdges(e[0], since, labels...) + + for i, edge := range dbedges { + if e, err := c.db.FindEntityById(edge.ToEntity.ID); err == nil && e != nil { + dbedges[i].ToEntity = e + } + } + } + }) + <-done + close(done) + + c.Lock() + defer c.Unlock() + + _ = c.createCacheEntityTag(entity, "cache_incoming_edges") + + if dberr == nil && len(dbedges) > 0 { + for _, edge := range dbedges { + e, err := c.cache.CreateEntity(&types.Entity{ + CreatedAt: edge.ToEntity.CreatedAt, + LastSeen: edge.ToEntity.LastSeen, + Asset: edge.ToEntity.Asset, + }) + + if err == nil && e != nil { + _, _ = c.cache.CreateEdge(&types.Edge{ + CreatedAt: edge.CreatedAt, + LastSeen: edge.LastSeen, + Relation: edge.Relation, + FromEntity: entity, + ToEntity: e, + }) + } + } + } + } else { + c.Lock() + defer c.Unlock() + } return c.cache.IncomingEdges(entity, since, labels...) } // OutgoingEdges implements the Repository interface. func (c *Cache) OutgoingEdges(entity *types.Entity, since time.Time, labels ...string) ([]*types.Edge, error) { - c.Lock() - defer c.Unlock() + var dbquery bool + + if since.IsZero() || since.Before(c.start) { + c.Lock() + if _, found := c.checkCacheEntityTag(entity, "cache_outgoing_edges"); !found { + dbquery = true + } + c.Unlock() + } - return c.cache.OutgoingEdges(entity, since, labels...) + if dbquery { + var dberr error + var dbedges []*types.Edge + + done := make(chan struct{}, 1) + c.appendToDBQueue(func() { + defer func() { done <- struct{}{} }() + + if e, err := c.db.FindEntityByContent(entity.Asset, time.Time{}); err == nil && len(e) == 1 { + dbedges, dberr = c.db.IncomingEdges(e[0], since, labels...) + + for i, edge := range dbedges { + if e, err := c.db.FindEntityById(edge.ToEntity.ID); err == nil && e != nil { + dbedges[i].ToEntity = e + } + } + } + }) + <-done + close(done) + + c.Lock() + defer c.Unlock() + + _ = c.createCacheEntityTag(entity, "cache_outgoing_edges") + + if dberr == nil && len(dbedges) > 0 { + for _, edge := range dbedges { + e, err := c.cache.CreateEntity(&types.Entity{ + CreatedAt: edge.ToEntity.CreatedAt, + LastSeen: edge.ToEntity.LastSeen, + Asset: edge.ToEntity.Asset, + }) + + if err == nil && e != nil { + _, _ = c.cache.CreateEdge(&types.Edge{ + CreatedAt: edge.CreatedAt, + LastSeen: edge.LastSeen, + Relation: edge.Relation, + FromEntity: entity, + ToEntity: e, + }) + } + } + } + } else { + c.Lock() + defer c.Unlock() + } + + return c.cache.IncomingEdges(entity, since, labels...) } // DeleteEdge implements the Repository interface. diff --git a/repository/sqlrepo/entity.go b/repository/sqlrepo/entity.go index 844552b..911f334 100644 --- a/repository/sqlrepo/entity.go +++ b/repository/sqlrepo/entity.go @@ -36,6 +36,8 @@ func (sql *sqlRepository) CreateEntity(input *types.Entity) (*types.Entity, erro if input.Asset.AssetType() == e.Asset.AssetType() { if id, err := strconv.ParseUint(e.ID, 10, 64); err == nil { entity.ID = id + entity.CreatedAt = e.CreatedAt + entity.UpdatedAt = time.Now().UTC() } } } else { diff --git a/repository/sqlrepo/tag.go b/repository/sqlrepo/tag.go index 161dc9b..7b5bf4e 100644 --- a/repository/sqlrepo/tag.go +++ b/repository/sqlrepo/tag.go @@ -40,6 +40,8 @@ func (sql *sqlRepository) CreateEntityTag(entity *types.Entity, input *types.Ent if input.Property.PropertyType() == t.Property.PropertyType() && input.Property.Value() == t.Property.Value() { if id, err := strconv.ParseUint(t.ID, 10, 64); err == nil { tag.ID = id + tag.CreatedAt = t.CreatedAt + tag.UpdatedAt = time.Now().UTC() break } } @@ -211,6 +213,8 @@ func (sql *sqlRepository) CreateEdgeTag(edge *types.Edge, input *types.EdgeTag) if input.Property.PropertyType() == t.Property.PropertyType() && input.Property.Value() == t.Property.Value() { if id, err := strconv.ParseUint(t.ID, 10, 64); err == nil { tag.ID = id + tag.CreatedAt = t.CreatedAt + tag.UpdatedAt = time.Now().UTC() break } }