Skip to content

Commit

Permalink
updates to the cache implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
caffix committed Nov 18, 2024
1 parent fa1e376 commit bbe3064
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 8 deletions.
163 changes: 155 additions & 8 deletions cache/edge.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,174 @@ func (c *Cache) CreateEdge(edge *types.Edge) (*types.Edge, error) {
return nil, err
}

c.appendToDBQueue(func() {
_, _ = c.db.CreateEdge(edge)
})
sub, err := c.cache.FindEntityById(e.FromEntity.ID)
if err != nil {
return nil, err
}

obj, err := c.cache.FindEntityById(e.ToEntity.ID)
if err != nil {
return nil, err
}

if tag, found := c.checkCacheEdgeTag(edge, "cache_create_edge"); !found {
if last, err := time.Parse("2006-01-02 15:04:05", tag.Property.Value()); err == nil && time.Now().Add(-1*c.freq).After(last) {
_ = c.cache.DeleteEdgeTag(tag.ID)
_ = c.createCacheEdgeTag(edge, "cache_create_edge")

c.appendToDBQueue(func() {
s, err := c.db.FindEntityByContent(sub.Asset, time.Time{})
if err != nil || len(s) != 1 {
return
}

o, err := c.db.FindEntityByContent(obj.Asset, time.Time{})
if err != nil || len(o) != 1 {
return
}

_, _ = c.db.CreateEdge(&types.Edge{
CreatedAt: edge.CreatedAt,
LastSeen: edge.LastSeen,
Relation: e.Relation,
FromEntity: s[0],
ToEntity: o[0],
})
})
}
}

return e, nil
}

// IncomingEdges implements the Repository interface.
func (c *Cache) IncomingEdges(entity *types.Entity, since time.Time, labels ...string) ([]*types.Edge, error) {
c.Lock()
defer c.Unlock()
var dbquery bool

if since.IsZero() || since.Before(c.start) {
c.Lock()
if _, found := c.checkCacheEntityTag(entity, "cache_incoming_edges"); !found {
dbquery = true
}
c.Unlock()
}

if dbquery {
var dberr error
var dbedges []*types.Edge

done := make(chan struct{}, 1)
c.appendToDBQueue(func() {
defer func() { done <- struct{}{} }()

if e, err := c.db.FindEntityByContent(entity.Asset, time.Time{}); err == nil && len(e) == 1 {
dbedges, dberr = c.db.IncomingEdges(e[0], since, labels...)

for i, edge := range dbedges {
if e, err := c.db.FindEntityById(edge.ToEntity.ID); err == nil && e != nil {
dbedges[i].ToEntity = e
}
}
}
})
<-done
close(done)

c.Lock()
defer c.Unlock()

_ = c.createCacheEntityTag(entity, "cache_incoming_edges")

if dberr == nil && len(dbedges) > 0 {
for _, edge := range dbedges {
e, err := c.cache.CreateEntity(&types.Entity{
CreatedAt: edge.ToEntity.CreatedAt,
LastSeen: edge.ToEntity.LastSeen,
Asset: edge.ToEntity.Asset,
})

if err == nil && e != nil {
_, _ = c.cache.CreateEdge(&types.Edge{
CreatedAt: edge.CreatedAt,
LastSeen: edge.LastSeen,
Relation: edge.Relation,
FromEntity: entity,
ToEntity: e,
})
}
}
}
} else {
c.Lock()
defer c.Unlock()
}

return c.cache.IncomingEdges(entity, since, labels...)
}

// OutgoingEdges implements the Repository interface.
func (c *Cache) OutgoingEdges(entity *types.Entity, since time.Time, labels ...string) ([]*types.Edge, error) {
c.Lock()
defer c.Unlock()
var dbquery bool

if since.IsZero() || since.Before(c.start) {
c.Lock()
if _, found := c.checkCacheEntityTag(entity, "cache_outgoing_edges"); !found {
dbquery = true
}
c.Unlock()
}

return c.cache.OutgoingEdges(entity, since, labels...)
if dbquery {
var dberr error
var dbedges []*types.Edge

done := make(chan struct{}, 1)
c.appendToDBQueue(func() {
defer func() { done <- struct{}{} }()

if e, err := c.db.FindEntityByContent(entity.Asset, time.Time{}); err == nil && len(e) == 1 {
dbedges, dberr = c.db.IncomingEdges(e[0], since, labels...)

for i, edge := range dbedges {
if e, err := c.db.FindEntityById(edge.ToEntity.ID); err == nil && e != nil {
dbedges[i].ToEntity = e
}
}
}
})
<-done
close(done)

c.Lock()
defer c.Unlock()

_ = c.createCacheEntityTag(entity, "cache_outgoing_edges")

if dberr == nil && len(dbedges) > 0 {
for _, edge := range dbedges {
e, err := c.cache.CreateEntity(&types.Entity{
CreatedAt: edge.ToEntity.CreatedAt,
LastSeen: edge.ToEntity.LastSeen,
Asset: edge.ToEntity.Asset,
})

if err == nil && e != nil {
_, _ = c.cache.CreateEdge(&types.Edge{
CreatedAt: edge.CreatedAt,
LastSeen: edge.LastSeen,
Relation: edge.Relation,
FromEntity: entity,
ToEntity: e,
})
}
}
}
} else {
c.Lock()
defer c.Unlock()
}

return c.cache.IncomingEdges(entity, since, labels...)
}

// DeleteEdge implements the Repository interface.
Expand Down
2 changes: 2 additions & 0 deletions repository/sqlrepo/entity.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ func (sql *sqlRepository) CreateEntity(input *types.Entity) (*types.Entity, erro
if input.Asset.AssetType() == e.Asset.AssetType() {
if id, err := strconv.ParseUint(e.ID, 10, 64); err == nil {
entity.ID = id
entity.CreatedAt = e.CreatedAt
entity.UpdatedAt = time.Now().UTC()
}
}
} else {
Expand Down
4 changes: 4 additions & 0 deletions repository/sqlrepo/tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ func (sql *sqlRepository) CreateEntityTag(entity *types.Entity, input *types.Ent
if input.Property.PropertyType() == t.Property.PropertyType() && input.Property.Value() == t.Property.Value() {
if id, err := strconv.ParseUint(t.ID, 10, 64); err == nil {
tag.ID = id
tag.CreatedAt = t.CreatedAt
tag.UpdatedAt = time.Now().UTC()
break
}
}
Expand Down Expand Up @@ -211,6 +213,8 @@ func (sql *sqlRepository) CreateEdgeTag(edge *types.Edge, input *types.EdgeTag)
if input.Property.PropertyType() == t.Property.PropertyType() && input.Property.Value() == t.Property.Value() {
if id, err := strconv.ParseUint(t.ID, 10, 64); err == nil {
tag.ID = id
tag.CreatedAt = t.CreatedAt
tag.UpdatedAt = time.Now().UTC()
break
}
}
Expand Down

0 comments on commit bbe3064

Please sign in to comment.