Skip to content

Commit

Permalink
performance improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
caffix committed Dec 4, 2024
1 parent ba05937 commit a526899
Show file tree
Hide file tree
Showing 9 changed files with 417 additions and 401 deletions.
54 changes: 0 additions & 54 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package cache
import (
"time"

"github.com/caffix/queue"
"github.com/owasp-amass/asset-db/repository"
"github.com/owasp-amass/asset-db/types"
"github.com/owasp-amass/open-asset-model/property"
Expand All @@ -16,23 +15,18 @@ import (
type Cache struct {
start time.Time
freq time.Duration
done chan struct{}
cache repository.Repository
db repository.Repository
queue queue.Queue
}

func New(cache, database repository.Repository, freq time.Duration) (*Cache, error) {
c := &Cache{
start: time.Now(),
freq: freq,
done: make(chan struct{}, 1),
cache: cache,
db: database,
queue: queue.NewQueue(),
}

go c.processDBCallbacks()
return c, nil
}

Expand All @@ -43,15 +37,6 @@ func (c *Cache) StartTime() time.Time {

// Close implements the Repository interface.
func (c *Cache) Close() error {
close(c.done)

for {
if c.queue.Empty() {
break
}
time.Sleep(2 * time.Second)
}

return c.cache.Close()
}

Expand All @@ -60,45 +45,6 @@ func (c *Cache) GetDBType() string {
return c.db.GetDBType()
}

func (c *Cache) appendToDBQueue(callback func()) {
select {
case <-c.done:
return
default:
}
c.queue.Append(callback)
}

func (c *Cache) processDBCallbacks() {
t := time.NewTicker(100 * time.Millisecond)
defer t.Stop()
loop:
for {
select {
case <-c.done:
break loop
case <-c.queue.Signal():
if element, ok := c.queue.Next(); ok {
if callback, success := element.(func()); success {
callback()
}
}
case <-t.C:
if element, ok := c.queue.Next(); ok {
if callback, success := element.(func()); success {
callback()
}
}
}
}
// execute the remaining callbacks in the queue
c.queue.Process(func(data interface{}) {
if callback, success := data.(func()); success {
callback()
}
})
}

func (c *Cache) createCacheEntityTag(entity *types.Entity, name string, since time.Time) error {
_, err := c.cache.CreateEntityProperty(entity, &property.SimpleProperty{
PropertyName: name,
Expand Down
116 changes: 49 additions & 67 deletions cache/edge.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,28 +34,26 @@ func (c *Cache) CreateEdge(edge *types.Edge) (*types.Edge, error) {
}
_ = c.createCacheEdgeTag(e, "cache_create_edge", time.Now())

c.appendToDBQueue(func() {
s, err := c.db.FindEntityByContent(sub.Asset, time.Time{})
if err != nil || len(s) != 1 {
return
}
s, err := c.db.FindEntityByContent(sub.Asset, time.Time{})
if err != nil || len(s) != 1 {
return nil, err
}

o, err := c.db.FindEntityByContent(obj.Asset, time.Time{})
if err != nil || len(o) != 1 {
return
}
o, err := c.db.FindEntityByContent(obj.Asset, time.Time{})
if err != nil || len(o) != 1 {
return nil, err
}

_, _ = c.db.CreateEdge(&types.Edge{
CreatedAt: edge.CreatedAt,
LastSeen: edge.LastSeen,
Relation: e.Relation,
FromEntity: s[0],
ToEntity: o[0],
})
_, _ = c.db.CreateEdge(&types.Edge{
CreatedAt: edge.CreatedAt,
LastSeen: edge.LastSeen,
Relation: e.Relation,
FromEntity: s[0],
ToEntity: o[0],
})
}

return e, nil
return e, err
}

// FindEdgeById implements the Repository interface.
Expand All @@ -81,22 +79,15 @@ func (c *Cache) IncomingEdges(entity *types.Entity, since time.Time, labels ...s
var dberr error
var dbedges []*types.Edge

done := make(chan struct{}, 1)
c.appendToDBQueue(func() {
defer func() { done <- struct{}{} }()
if e, err := c.db.FindEntityByContent(entity.Asset, time.Time{}); err == nil && len(e) == 1 {
dbedges, dberr = c.db.IncomingEdges(e[0], since)

if e, err := c.db.FindEntityByContent(entity.Asset, time.Time{}); err == nil && len(e) == 1 {
dbedges, dberr = c.db.IncomingEdges(e[0], since)

for i, edge := range dbedges {
if e, err := c.db.FindEntityById(edge.ToEntity.ID); err == nil && e != nil {
dbedges[i].ToEntity = e
}
for i, edge := range dbedges {
if e, err := c.db.FindEntityById(edge.ToEntity.ID); err == nil && e != nil {
dbedges[i].ToEntity = e
}
}
})
<-done
close(done)
}

if dberr == nil && len(dbedges) > 0 {
for _, edge := range dbedges {
Expand Down Expand Up @@ -140,22 +131,15 @@ func (c *Cache) OutgoingEdges(entity *types.Entity, since time.Time, labels ...s
var dberr error
var dbedges []*types.Edge

done := make(chan struct{}, 1)
c.appendToDBQueue(func() {
defer func() { done <- struct{}{} }()

if e, err := c.db.FindEntityByContent(entity.Asset, time.Time{}); err == nil && len(e) == 1 {
dbedges, dberr = c.db.OutgoingEdges(e[0], since)
if e, err := c.db.FindEntityByContent(entity.Asset, time.Time{}); err == nil && len(e) == 1 {
dbedges, dberr = c.db.OutgoingEdges(e[0], since)

for i, edge := range dbedges {
if e, err := c.db.FindEntityById(edge.ToEntity.ID); err == nil && e != nil {
dbedges[i].ToEntity = e
}
for i, edge := range dbedges {
if e, err := c.db.FindEntityById(edge.ToEntity.ID); err == nil && e != nil {
dbedges[i].ToEntity = e
}
}
})
<-done
close(done)
}

if dberr == nil && len(dbedges) > 0 {
for _, edge := range dbedges {
Expand Down Expand Up @@ -202,33 +186,31 @@ func (c *Cache) DeleteEdge(id string) error {
return err
}

c.appendToDBQueue(func() {
s, err := c.db.FindEntityByContent(sub.Asset, time.Time{})
if err != nil || len(s) != 1 {
return
}
s, err := c.db.FindEntityByContent(sub.Asset, time.Time{})
if err != nil || len(s) != 1 {
return err
}

o, err := c.db.FindEntityByContent(obj.Asset, time.Time{})
if err != nil || len(o) != 1 {
return
}
o, err := c.db.FindEntityByContent(obj.Asset, time.Time{})
if err != nil || len(o) != 1 {
return err
}

edges, err := c.db.OutgoingEdges(s[0], time.Time{}, edge.Relation.Label())
if err != nil || len(edges) == 0 {
return
}
edges, err := c.db.OutgoingEdges(s[0], time.Time{}, edge.Relation.Label())
if err != nil || len(edges) == 0 {
return err
}

var target *types.Edge
for _, e := range edges {
if e.ToEntity.ID == o[0].ID && reflect.DeepEqual(e.Relation, edge.Relation) {
target = e
break
}
var target *types.Edge
for _, e := range edges {
if e.ToEntity.ID == o[0].ID && reflect.DeepEqual(e.Relation, edge.Relation) {
target = e
break
}
if target != nil {
_ = c.db.DeleteEdge(target.ID)
}
})
}
if target != nil {
err = c.db.DeleteEdge(target.ID)
}

return nil
return err
}
Loading

0 comments on commit a526899

Please sign in to comment.