Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increase combiner request batch size #299

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions clients/graphql/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,6 @@ func makeRequest(ctx context.Context, client string, req *graphql.Request, heade
}
defer httpResp.Body.Close()

if httpResp.StatusCode == http.StatusInternalServerError {
return nil, ErrResponseSizeTooBig
}

if httpResp.StatusCode != http.StatusOK {
var respBody []byte
respBody, err = io.ReadAll(httpResp.Body)
Expand Down
145 changes: 77 additions & 68 deletions clients/graphql/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ func createGetAlertsQuery(inputs []*AlertsInput) (string, map[string]interface{}
variables := make(map[string]interface{})
var queryBuilder strings.Builder

// Define fragment
queryBuilder.WriteString(getAlertsFragment)

// Define the operation with necessary variables
queryBuilder.WriteString("query getAlerts(")
for i := range inputs {
Expand Down Expand Up @@ -294,73 +297,79 @@ pageInfo {
}
}
alerts {
alertId
addresses
contracts {
name
projectId
}
createdAt
description
hash
metadata
name
projects {
id
}
protocol
scanNodeCount
severity
source {
transactionHash
bot {
chainIds
createdAt
description
developer
docReference
enabled
id
image
name
reference
repository
projects
scanNodes
version
}
block {
number
hash
timestamp
chainId
}
sourceAlert {
hash
botId
timestamp
chainId
}
}
alertDocumentType
findingType
relatedAlerts
chainId
labels {
label
confidence
entity
entityType
remove
metadata
uniqueKey
embedding
}
addressBloomFilter {
bitset
itemCount
k
m
}
...alertDetails
}
`

const getAlertsFragment = `
fragment alertDetails on Alert {
alertId
addresses
contracts {
name
projectId
}
createdAt
description
hash
metadata
name
projects {
id
}
protocol
scanNodeCount
severity
source {
transactionHash
bot {
chainIds
createdAt
description
developer
docReference
enabled
id
image
name
reference
repository
projects
scanNodes
version
}
block {
number
hash
timestamp
chainId
}
sourceAlert {
hash
botId
timestamp
chainId
}
}
alertDocumentType
findingType
relatedAlerts
chainId
labels {
label
confidence
entity
entityType
remove
metadata
uniqueKey
embedding
}
addressBloomFilter {
bitset
itemCount
k
m
}
}
`
142 changes: 73 additions & 69 deletions clients/graphql/models_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,78 @@ func Test_createGetAlertsQuery(t *testing.T) {
assert.Equal(t, variable.AlertId, testInput.AlertId)
}

const mockExpectedQuery = `query getAlerts($input0: AlertsInput) {alerts0: alerts(input: $input0) {
const mockExpectedQuery = `
fragment alertDetails on Alert {
alertId
addresses
contracts {
name
projectId
}
createdAt
description
hash
metadata
name
projects {
id
}
protocol
scanNodeCount
severity
source {
transactionHash
bot {
chainIds
createdAt
description
developer
docReference
enabled
id
image
name
reference
repository
projects
scanNodes
version
}
block {
number
hash
timestamp
chainId
}
sourceAlert {
hash
botId
timestamp
chainId
}
}
alertDocumentType
findingType
relatedAlerts
chainId
labels {
label
confidence
entity
entityType
remove
metadata
uniqueKey
embedding
}
addressBloomFilter {
bitset
itemCount
k
m
}
}
query getAlerts($input0: AlertsInput) {alerts0: alerts(input: $input0) {
pageInfo {
hasNextPage
endCursor {
Expand All @@ -26,73 +97,6 @@ pageInfo {
}
}
alerts {
alertId
addresses
contracts {
name
projectId
}
createdAt
description
hash
metadata
name
projects {
id
}
protocol
scanNodeCount
severity
source {
transactionHash
bot {
chainIds
createdAt
description
developer
docReference
enabled
id
image
name
reference
repository
projects
scanNodes
version
}
block {
number
hash
timestamp
chainId
}
sourceAlert {
hash
botId
timestamp
chainId
}
}
alertDocumentType
findingType
relatedAlerts
chainId
labels {
label
confidence
entity
entityType
remove
metadata
uniqueKey
embedding
}
addressBloomFilter {
bitset
itemCount
k
m
}
...alertDetails
}
}}`
9 changes: 7 additions & 2 deletions feeds/combiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var (
)

const (
DefaultBatchSize = 10
DefaultBatchSize = 50
)

type cfHandler struct {
Expand Down Expand Up @@ -129,6 +129,7 @@ type CombinerFeedConfig struct {
Start uint64
End uint64
CombinerCachePath string
BatchSize int
}

func (cf *combinerFeed) Start() {
Expand Down Expand Up @@ -437,6 +438,10 @@ func NewCombinerFeedWithClient(ctx context.Context, cfg CombinerFeedConfig, clie
rateLimit = time.NewTicker(time.Millisecond * time.Duration(cfg.QueryInterval))
}

if cfg.BatchSize == 0 {
cfg.BatchSize = DefaultBatchSize
}

bf := &combinerFeed{
maxAlertAge: time.Minute * 20,
ctx: ctx,
Expand All @@ -446,7 +451,7 @@ func NewCombinerFeedWithClient(ctx context.Context, cfg CombinerFeedConfig, clie
botSubscriptions: []*domain.CombinerBotSubscription{},
cfg: cfg,
combinerCache: c,
batchSize: DefaultBatchSize,
batchSize: cfg.BatchSize,
}

return bf, nil
Expand Down
3 changes: 3 additions & 0 deletions feeds/combiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func Test_combinerFeed_Start(t *testing.T) {
cf, err := NewCombinerFeedWithClient(
ctx, CombinerFeedConfig{
QueryInterval: rate,
BatchSize: 10,
}, successfulMockClient,
)

Expand Down Expand Up @@ -99,6 +100,7 @@ func Test_combinerFeed_Start(t *testing.T) {
cfTooBig, err := NewCombinerFeedWithClient(
ctxTooBig, CombinerFeedConfig{
QueryInterval: rate,
BatchSize: 10,
}, responseTooBigClient,
)
assert.NoError(t, err)
Expand Down Expand Up @@ -141,6 +143,7 @@ func Test_combinerFeed_Start(t *testing.T) {
cfUnauth, err := NewCombinerFeedWithClient(
ctxUnauth, CombinerFeedConfig{
QueryInterval: rate,
BatchSize: 10,
}, unauthClient,
)
assert.NoError(t, err)
Expand Down
Loading