Skip to content

Commit

Permalink
add ip resolution
Browse files Browse the repository at this point in the history
  • Loading branch information
noboruma committed Jun 11, 2024
1 parent a597934 commit 89e8c26
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 6 deletions.
9 changes: 8 additions & 1 deletion deepfence_server/handler/agent_report.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
jsoniter "github.com/json-iterator/go"

"github.com/deepfence/ThreatMapper/deepfence_server/ingesters"
"github.com/deepfence/ThreatMapper/deepfence_server/model"
"github.com/deepfence/ThreatMapper/deepfence_server/pkg/scope/report"
"github.com/deepfence/ThreatMapper/deepfence_utils/controls"
"github.com/deepfence/ThreatMapper/deepfence_utils/directory"
Expand Down Expand Up @@ -40,7 +41,13 @@ func getAgentReportIngester(ctx context.Context) (*ingesters.Ingester[report.Com
if has {
return ing.(*ingesters.Ingester[report.CompressedReport]), nil
}
newEntry, err := ingesters.NewNeo4jCollector(ctx)

token, err := model.SimpleFetchLicense(ctx)
if err != nil {
log.Error().Err(err).Msg("Continuing without license key")
}

newEntry, err := ingesters.NewNeo4jCollector(ctx, token)
if err != nil {
return nil, err
}
Expand Down
120 changes: 115 additions & 5 deletions deepfence_server/ingesters/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,19 @@ package ingesters
import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/deepfence/ThreatMapper/deepfence_server/pkg/constants"
"github.com/deepfence/ThreatMapper/deepfence_server/pkg/scope/report"
"github.com/deepfence/ThreatMapper/deepfence_utils/directory"
"github.com/deepfence/ThreatMapper/deepfence_utils/log"
Expand Down Expand Up @@ -485,7 +490,7 @@ func NewReportIngestionData() ReportIngestionData {
}
}

func prepareNeo4jIngestion(rpt *report.Report, resolvers *EndpointResolversCache, buf *bytes.Buffer) ReportIngestionData {
func prepareNeo4jIngestion(rpt *report.Report, resolvers *EndpointResolversCache, buf *bytes.Buffer, token string) ReportIngestionData {

res := ReportIngestionData{
Hosts: make([]map[string]interface{}, 0, len(rpt.Host)),
Expand Down Expand Up @@ -620,6 +625,7 @@ func prepareNeo4jIngestion(rpt *report.Report, resolvers *EndpointResolversCache
}
}
}
connections = resolveCloudService(connections, token)
res.EndpointEdgesBatch = connections2maps(connections, buf)

processEdgesBatch := map[string][]string{}
Expand Down Expand Up @@ -1038,11 +1044,11 @@ func (nc *neo4jIngester) runDBPusher(
log.Info().Msgf("runDBPusher ended")
}

func (nc *neo4jIngester) runPreparer() {
func (nc *neo4jIngester) runPreparer(token string) {
var buf bytes.Buffer
for rpt := range nc.preparersInput {
r := computeResolvers(&rpt, &buf)
data := prepareNeo4jIngestion(&rpt, nc.resolvers, &buf)
data := prepareNeo4jIngestion(&rpt, nc.resolvers, &buf, token)
select {
case nc.batcher <- data:
nc.resolversUpdate <- r
Expand All @@ -1052,7 +1058,7 @@ func (nc *neo4jIngester) runPreparer() {
}
}

func NewNeo4jCollector(ctx context.Context) (Ingester[report.CompressedReport], error) {
func NewNeo4jCollector(ctx context.Context, token string) (Ingester[report.CompressedReport], error) {
rdb, err := newEndpointResolversCache(ctx)

if err != nil {
Expand Down Expand Up @@ -1112,7 +1118,7 @@ func NewNeo4jCollector(ctx context.Context) (Ingester[report.CompressedReport],
}()

for i := 0; i < preparerWorkersNum; i++ {
go nc.runPreparer()
go nc.runPreparer(token)
}

go func() {
Expand Down Expand Up @@ -1332,3 +1338,107 @@ func GetPushBack(ctx context.Context, driver neo4j.DriverWithContext) (int32, er
}
return int32(rec.Values[0].(int64)), nil
}

const (
threatIntelResolverURL = "https://threat-intel.deepfence.io/threat-intel"
)

type CloudInfo struct {
Type string `json:"type"`
Region string `json:"region"`
Provider string `json:"provider"`
}

func (ci *CloudInfo) NodeID() string {
return fmt.Sprintf("%s-%s", ci.Provider, ci.Type)
}

type IPResponse struct {
Infos []CloudInfo `json:"Infos"`
}

type IPRequest struct {
IPv4s []string `json:"ipv4s"`
IPv6s []string `json:"ipv6s"`
}

func requestCloudInfo(ctx context.Context, strIps []string, token string) ([]CloudInfo, error) {
// check if token is present
var infos []CloudInfo

bodyReq := IPRequest{
IPv4s: strIps,
}
b, err := json.Marshal(bodyReq)
if err != nil {
return infos, err
}

req, err := http.NewRequestWithContext(ctx, http.MethodGet, threatIntelResolverURL+"/cloud-ips", bytes.NewReader(b))
if err != nil {
return infos, err
}

req.Header.Set("x-license-key", token)

q := req.URL.Query()
q.Add("version", constants.Version)
q.Add("product", utils.Project)
req.URL.RawQuery = q.Encode()

log.Info().Msgf("query threatintel at %s", req.URL.String())

tr := http.DefaultTransport.(*http.Transport).Clone()
tr.TLSClientConfig = &tls.Config{
InsecureSkipVerify: true,
}
hc := http.Client{
Timeout: 1 * time.Minute,
Transport: tr,
}
resp, err := hc.Do(req)
if err != nil {
log.Error().Err(err).Msg("failed http request")
return infos, err
}

if resp.StatusCode != http.StatusOK {
return infos, fmt.Errorf("%d invaid response code", resp.StatusCode)
}

body, err := io.ReadAll(resp.Body)
if err != nil {
log.Error().Err(err).Msg("failed read response body")
return infos, err
}
defer resp.Body.Close()

var res IPResponse
if err := json.Unmarshal(body, &res); err != nil {
log.Error().Err(err).Msg("failed to decode response body")
return infos, err
}

return res.Infos, nil
}

func resolveCloudService(connections []Connection, token string) []Connection {
ips := []string{}
for i := range connections {
if connections[i].rightIP != nil {
ips = append(ips, *connections[i].rightIP)
}
}
if len(ips) == 0 {
return connections
}
infos, err := requestCloudInfo(context.Background(), ips, token)
if err != nil || len(connections) != len(infos) {
log.Error().Err(err).Msgf("issue fetching cloud infos %d/%d", len(infos), len(connections))
return connections
}
for i := range infos {
connections[i].destination = infos[i].NodeID()
}
return connections
}
14 changes: 14 additions & 0 deletions deepfence_server/model/license.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/http"
"time"

"github.com/deepfence/ThreatMapper/deepfence_utils/directory"
"github.com/deepfence/ThreatMapper/deepfence_utils/log"
postgresqlDb "github.com/deepfence/ThreatMapper/deepfence_utils/postgresql/postgresql-db"
"github.com/deepfence/ThreatMapper/deepfence_utils/utils"
Expand Down Expand Up @@ -371,3 +372,16 @@ func formatDate(dt time.Time) string {
func parseDate(dateString string) (time.Time, error) {
return time.Parse(DateLayout1, dateString)
}

// FetchLicense gets license key from database
func SimpleFetchLicense(ctx context.Context) (string, error) {
pgClient, err := directory.PostgresClient(ctx)
if err != nil {
return "", err
}
license, err := GetLicense(ctx, pgClient)
if err != nil {
return "", err
}
return license.LicenseKey, nil
}

0 comments on commit 89e8c26

Please sign in to comment.