From e0396f5717a9925c558fe95ce0b27580a0d2e0f2 Mon Sep 17 00:00:00 2001 From: Thomas Legris Date: Tue, 11 Jun 2024 14:20:56 +0900 Subject: [PATCH 1/4] add cloud services ingester --- deepfence_utils/utils/constants.go | 4 +- deepfence_worker/cronjobs/service_resolver.go | 200 ++++++++++++++++++ deepfence_worker/cronscheduler/scheduler.go | 9 +- deepfence_worker/worker.go | 2 + 4 files changed, 213 insertions(+), 2 deletions(-) create mode 100644 deepfence_worker/cronjobs/service_resolver.go diff --git a/deepfence_utils/utils/constants.go b/deepfence_utils/utils/constants.go index c45e53a2fd..73e510644f 100644 --- a/deepfence_utils/utils/constants.go +++ b/deepfence_utils/utils/constants.go @@ -67,7 +67,8 @@ const ( UpdateLicenseTask = "update_license" ReportLicenseUsageTask = "report_license_usage" - ThreatIntelUpdateTask = "threat_intel_update" + ThreatIntelUpdateTask = "threat_intel_update" + CloudServicesUpdateTask = "cloud_services_update" ) const ( @@ -238,6 +239,7 @@ var Tasks = []string{ ReportLicenseUsageTask, ThreatIntelUpdateTask, + CloudServicesUpdateTask, } type ReportType string diff --git a/deepfence_worker/cronjobs/service_resolver.go b/deepfence_worker/cronjobs/service_resolver.go new file mode 100644 index 0000000000..560b49a15b --- /dev/null +++ b/deepfence_worker/cronjobs/service_resolver.go @@ -0,0 +1,200 @@ +package cronjobs + +import ( + "bytes" + "context" + "crypto/tls" + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + "github.com/deepfence/ThreatMapper/deepfence_utils/directory" + "github.com/deepfence/ThreatMapper/deepfence_utils/log" + "github.com/deepfence/ThreatMapper/deepfence_utils/utils" + wutils "github.com/deepfence/ThreatMapper/deepfence_worker/utils" + "github.com/hibiken/asynq" + "github.com/neo4j/neo4j-go-driver/v5/neo4j" +) + +const ( + threatIntelResolverURL = "https://threat-intel.deepfence.io/threat-intel" +) + +type CloudInfo struct { + Type string `json:"type"` + Region string `json:"region"` + Provider string 
`json:"provider"` +} + +type IPResponse struct { + Infos []CloudInfo `json:"Infos"` +} + +type ServicesResponse struct { + Infos []string `json:"infos"` +} + +type IPRequest struct { + IPv4s []string `json:"ipv4s"` + IPv6s []string `json:"ipv6s"` +} + +func requestCloudInfo(ctx context.Context, strIps []string) ([]CloudInfo, error) { + // check if token is present + var infos []CloudInfo + token, err := FetchLicense(ctx) + if err != nil || token == "" { + log.Error().Err(err).Msg("token is required to access threat intel") + return infos, err + } + + bodyReq := IPRequest{ + IPv4s: strIps, + } + b, err := json.Marshal(bodyReq) + if err != nil { + return infos, err + } + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, threatIntelResolverURL+"/cloud-ips", bytes.NewReader(b)) + if err != nil { + return infos, err + } + + req.Header.Set("x-license-key", token) + + q := req.URL.Query() + q.Add("version", wutils.Version) + q.Add("product", utils.Project) + req.URL.RawQuery = q.Encode() + + log.Info().Msgf("query threatintel at %s", req.URL.String()) + + tr := http.DefaultTransport.(*http.Transport).Clone() + tr.TLSClientConfig = &tls.Config{ + InsecureSkipVerify: true, + } + hc := http.Client{ + Timeout: 1 * time.Minute, + Transport: tr, + } + resp, err := hc.Do(req) + if err != nil { + log.Error().Err(err).Msg("failed http request") + return infos, err + } + defer resp.Body.Close() // close on every return path, including non-200 + + if resp.StatusCode != http.StatusOK { + return infos, fmt.Errorf("%d invalid response code", resp.StatusCode) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + log.Error().Err(err).Msg("failed read response body") + return infos, err + } + + var res IPResponse + if err := json.Unmarshal(body, &res); err != nil { + log.Error().Err(err).Msg("failed to decode response body") + return infos, err + } + + return res.Infos, nil +} + +func requestCloudServices(ctx context.Context) ([]string, error) { + // check if token is present + var infos []string + token, err := 
FetchLicense(ctx) + if err != nil || token == "" { + log.Error().Err(err).Msg("token is required to access threat intel") + return infos, err + } + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, threatIntelResolverURL+"/cloud-services", nil) + if err != nil { + return infos, err + } + + req.Header.Set("x-license-key", token) + + q := req.URL.Query() + q.Add("version", wutils.Version) + q.Add("product", utils.Project) + req.URL.RawQuery = q.Encode() + + log.Info().Msgf("query threatintel at %s", req.URL.String()) + + tr := http.DefaultTransport.(*http.Transport).Clone() + tr.TLSClientConfig = &tls.Config{ + InsecureSkipVerify: true, + } + hc := http.Client{ + Timeout: 1 * time.Minute, + Transport: tr, + } + resp, err := hc.Do(req) + if err != nil { + log.Error().Err(err).Msg("failed http request") + return infos, err + } + defer resp.Body.Close() // close on every return path, including non-200 + + if resp.StatusCode != http.StatusOK { + return infos, fmt.Errorf("%d invalid response code", resp.StatusCode) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + log.Error().Err(err).Msg("failed read response body") + return infos, err + } + + var res ServicesResponse + if err := json.Unmarshal(body, &res); err != nil { + log.Error().Err(err).Msg("failed to decode response body") + return infos, err + } + + return res.Infos, nil +} + +func IngestServiceNodes(ctx context.Context, task *asynq.Task) error { + + log := log.WithCtx(ctx) + + log.Info().Msgf("Ingest cloud services nodes") + defer log.Info().Msgf("Ingest cloud services nodes done") + + nc, err := directory.Neo4jClient(ctx) + if err != nil { + return err + } + + session := nc.NewSession(ctx, neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite}) + defer session.Close(ctx) + + infos, err := requestCloudServices(ctx) + if err != nil { + return err + } + + if _, err = session.Run(ctx, + `UNWIND $batch as row + MERGE (n:Node{node_id:row}) + ON CREATE SET n.pseudo = true, + n.cloud_provider = 'internet', + n.cloud_region = 'internet', 
+ n.active = false, + n.node_name = row`, + map[string]interface{}{"batch": infos}, + ); err != nil { + return err + } + + return nil +} diff --git a/deepfence_worker/cronscheduler/scheduler.go b/deepfence_worker/cronscheduler/scheduler.go index b9b910f747..18e8f47477 100644 --- a/deepfence_worker/cronscheduler/scheduler.go +++ b/deepfence_worker/cronscheduler/scheduler.go @@ -363,6 +363,13 @@ func (s *Scheduler) addCronJobs(ctx context.Context) error { } jobIDs = append(jobIDs, jobID) + jobID, err = s.cron.AddFunc("@every 12h", + s.enqueueTask(namespace, utils.CloudServicesUpdateTask, true, utils.CritialTaskOpts()...)) + if err != nil { + return err + } + jobIDs = append(jobIDs, jobID) + s.jobs.CronJobs[namespace] = CronJobs{jobIDs: jobIDs} return nil @@ -398,8 +405,8 @@ func (s *Scheduler) startInitJobs(ctx context.Context) error { s.enqueueTask(namespace, utils.CachePostureProviders, true, utils.CritialTaskOpts()...)() s.enqueueTask(namespace, utils.RedisRewriteAOF, true, utils.CritialTaskOpts()...)() s.enqueueTask(namespace, utils.AsynqDeleteAllArchivedTasks, true, utils.CritialTaskOpts()...)() - s.enqueueTask(namespace, utils.ThreatIntelUpdateTask, false, utils.CritialTaskOpts()...)() + s.enqueueTask(namespace, utils.CloudServicesUpdateTask, false, utils.CritialTaskOpts()...)() return nil } diff --git a/deepfence_worker/worker.go b/deepfence_worker/worker.go index 92c1a32786..b74289bea7 100644 --- a/deepfence_worker/worker.go +++ b/deepfence_worker/worker.go @@ -238,6 +238,8 @@ func NewWorker(ns directory.NamespaceID, cfg wtils.Config) (Worker, context.Canc worker.AddRetryableHandler(utils.ThreatIntelUpdateTask, cronjobs.FetchThreatIntel) + worker.AddRetryableHandler(utils.CloudServicesUpdateTask, cronjobs.IngestServiceNodes) + worker.AddRetryableHandler(utils.DeleteCloudAccounts, scans.DeleteCloudAccounts) return worker, cancel, nil From 405089db1d8fbdc44ce2c38c89aed9e9c79f5513 Mon Sep 17 00:00:00 2001 From: Thomas Legris Date: Tue, 11 Jun 2024 16:32:43 +0900 
Subject: [PATCH 2/4] add ip resolution --- deepfence_server/handler/agent_report.go | 9 +- deepfence_server/ingesters/agent.go | 120 ++++++++++++++++++++++- deepfence_server/model/license.go | 14 +++ 3 files changed, 137 insertions(+), 6 deletions(-) diff --git a/deepfence_server/handler/agent_report.go b/deepfence_server/handler/agent_report.go index ceb1905363..2c91ae8117 100644 --- a/deepfence_server/handler/agent_report.go +++ b/deepfence_server/handler/agent_report.go @@ -13,6 +13,7 @@ import ( jsoniter "github.com/json-iterator/go" "github.com/deepfence/ThreatMapper/deepfence_server/ingesters" + "github.com/deepfence/ThreatMapper/deepfence_server/model" "github.com/deepfence/ThreatMapper/deepfence_server/pkg/scope/report" "github.com/deepfence/ThreatMapper/deepfence_utils/controls" "github.com/deepfence/ThreatMapper/deepfence_utils/directory" @@ -40,7 +41,13 @@ func getAgentReportIngester(ctx context.Context) (*ingesters.Ingester[report.Com if has { return ing.(*ingesters.Ingester[report.CompressedReport]), nil } - newEntry, err := ingesters.NewNeo4jCollector(ctx) + + token, err := model.SimpleFetchLicense(ctx) + if err != nil { + log.Error().Err(err).Msg("Continuing without license key") + } + + newEntry, err := ingesters.NewNeo4jCollector(ctx, token) if err != nil { return nil, err } diff --git a/deepfence_server/ingesters/agent.go b/deepfence_server/ingesters/agent.go index 35ade77a05..edb6ec0831 100644 --- a/deepfence_server/ingesters/agent.go +++ b/deepfence_server/ingesters/agent.go @@ -3,7 +3,11 @@ package ingesters import ( "bytes" "context" + "crypto/tls" + "encoding/json" "fmt" + "io" + "net/http" "os" "strconv" "strings" @@ -11,6 +15,7 @@ import ( "sync/atomic" "time" + "github.com/deepfence/ThreatMapper/deepfence_server/pkg/constants" "github.com/deepfence/ThreatMapper/deepfence_server/pkg/scope/report" "github.com/deepfence/ThreatMapper/deepfence_utils/directory" "github.com/deepfence/ThreatMapper/deepfence_utils/log" @@ -485,7 +490,7 @@ func 
NewReportIngestionData() ReportIngestionData { } } -func prepareNeo4jIngestion(rpt *report.Report, resolvers *EndpointResolversCache, buf *bytes.Buffer) ReportIngestionData { +func prepareNeo4jIngestion(rpt *report.Report, resolvers *EndpointResolversCache, buf *bytes.Buffer, token string) ReportIngestionData { res := ReportIngestionData{ Hosts: make([]map[string]interface{}, 0, len(rpt.Host)), @@ -620,6 +625,7 @@ func prepareNeo4jIngestion(rpt *report.Report, resolvers *EndpointResolversCache } } } + connections = resolveCloudService(connections, token) res.EndpointEdgesBatch = connections2maps(connections, buf) processEdgesBatch := map[string][]string{} @@ -1038,11 +1044,11 @@ func (nc *neo4jIngester) runDBPusher( log.Info().Msgf("runDBPusher ended") } -func (nc *neo4jIngester) runPreparer() { +func (nc *neo4jIngester) runPreparer(token string) { var buf bytes.Buffer for rpt := range nc.preparersInput { r := computeResolvers(&rpt, &buf) - data := prepareNeo4jIngestion(&rpt, nc.resolvers, &buf) + data := prepareNeo4jIngestion(&rpt, nc.resolvers, &buf, token) select { case nc.batcher <- data: nc.resolversUpdate <- r @@ -1052,7 +1058,7 @@ func (nc *neo4jIngester) runPreparer() { } } -func NewNeo4jCollector(ctx context.Context) (Ingester[report.CompressedReport], error) { +func NewNeo4jCollector(ctx context.Context, token string) (Ingester[report.CompressedReport], error) { rdb, err := newEndpointResolversCache(ctx) if err != nil { @@ -1112,7 +1118,7 @@ func NewNeo4jCollector(ctx context.Context) (Ingester[report.CompressedReport], }() for i := 0; i < preparerWorkersNum; i++ { - go nc.runPreparer() + go nc.runPreparer(token) } go func() { @@ -1332,3 +1338,107 @@ func GetPushBack(ctx context.Context, driver neo4j.DriverWithContext) (int32, er } return int32(rec.Values[0].(int64)), nil } + +const ( + threatIntelResolverURL = "https://threat-intel.deepfence.io/threat-intel" +) + +type CloudInfo struct { + Type string `json:"type"` + Region string `json:"region"` + 
Provider string `json:"provider"` +} + +func (ci *CloudInfo) NodeID() string { + return fmt.Sprintf("%s-%s", ci.Provider, ci.Type) +} + +type IPResponse struct { + Infos []CloudInfo `json:"Infos"` +} + +type IPRequest struct { + IPv4s []string `json:"ipv4s"` + IPv6s []string `json:"ipv6s"` +} + +func requestCloudInfo(ctx context.Context, strIps []string, token string) ([]CloudInfo, error) { + // token is supplied by the caller and sent as the x-license-key header + var infos []CloudInfo + + bodyReq := IPRequest{ + IPv4s: strIps, + } + b, err := json.Marshal(bodyReq) + if err != nil { + return infos, err + } + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, threatIntelResolverURL+"/cloud-ips", bytes.NewReader(b)) + if err != nil { + return infos, err + } + + req.Header.Set("x-license-key", token) + + q := req.URL.Query() + q.Add("version", constants.Version) + q.Add("product", utils.Project) + req.URL.RawQuery = q.Encode() + + log.Info().Msgf("query threatintel at %s", req.URL.String()) + + tr := http.DefaultTransport.(*http.Transport).Clone() + tr.TLSClientConfig = &tls.Config{ + InsecureSkipVerify: true, + } + hc := http.Client{ + Timeout: 1 * time.Minute, + Transport: tr, + } + resp, err := hc.Do(req) + if err != nil { + log.Error().Err(err).Msg("failed http request") + return infos, err + } + defer resp.Body.Close() // close on every return path, including non-200 + + if resp.StatusCode != http.StatusOK { + return infos, fmt.Errorf("%d invalid response code", resp.StatusCode) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + log.Error().Err(err).Msg("failed read response body") + return infos, err + } + + var res IPResponse + if err := json.Unmarshal(body, &res); err != nil { + log.Error().Err(err).Msg("failed to decode response body") + return infos, err + } + + return res.Infos, nil +} + +func resolveCloudService(connections []Connection, token string) []Connection { + ips := []string{} + for i := range connections { + if connections[i].rightIP != nil { + ips = append(ips, *connections[i].rightIP) + } + } + if len(ips) == 0 
{ + return connections + } + infos, err := requestCloudInfo(context.Background(), ips, token) + if err != nil || len(connections) != len(infos) { + log.Error().Err(err).Msgf("issue fetching cloud infos %d/%d", len(infos), len(connections)) + return connections + } + for i := range infos { + connections[i].destination = infos[i].NodeID() + } + return connections +} diff --git a/deepfence_server/model/license.go b/deepfence_server/model/license.go index 0dc7f251cb..1233d56abe 100644 --- a/deepfence_server/model/license.go +++ b/deepfence_server/model/license.go @@ -9,6 +9,7 @@ import ( "net/http" "time" + "github.com/deepfence/ThreatMapper/deepfence_utils/directory" "github.com/deepfence/ThreatMapper/deepfence_utils/log" postgresqlDb "github.com/deepfence/ThreatMapper/deepfence_utils/postgresql/postgresql-db" "github.com/deepfence/ThreatMapper/deepfence_utils/utils" @@ -371,3 +372,16 @@ func formatDate(dt time.Time) string { func parseDate(dateString string) (time.Time, error) { return time.Parse(DateLayout1, dateString) } + +// SimpleFetchLicense gets the license key from the database. +func SimpleFetchLicense(ctx context.Context) (string, error) { + pgClient, err := directory.PostgresClient(ctx) + if err != nil { + return "", err + } + license, err := GetLicense(ctx, pgClient) + if err != nil { + return "", err + } + return license.LicenseKey, nil +} From 6240fd5dd4da28cebb86a07a913b280d5ba50825 Mon Sep 17 00:00:00 2001 From: Thomas Legris Date: Tue, 11 Jun 2024 19:20:42 +0900 Subject: [PATCH 3/4] add external service to graph --- deepfence_agent/plugins/YaraHunter | 2 +- deepfence_agent/plugins/cloud-scanner | 2 +- deepfence_agent/plugins/yara-rules | 2 +- deepfence_server/handler/topology.go | 10 +++- deepfence_server/ingesters/agent.go | 8 +-- .../reporters/graph/topology_reporter.go | 53 +++++++++++++++++-- deepfence_worker/cronjobs/neo4j.go | 3 +- golang_deepfence_sdk | 2 +- 8 files changed, 69 insertions(+), 13 deletions(-) diff --git a/deepfence_agent/plugins/YaraHunter 
b/deepfence_agent/plugins/YaraHunter index c729ccb17f..395346ecce 160000 --- a/deepfence_agent/plugins/YaraHunter +++ b/deepfence_agent/plugins/YaraHunter @@ -1 +1 @@ -Subproject commit c729ccb17f3611052c271084e1bdfa84b55edcc5 +Subproject commit 395346ecce9946ce1a276f45b9fa7ad04ce9bfe2 diff --git a/deepfence_agent/plugins/cloud-scanner b/deepfence_agent/plugins/cloud-scanner index 9e300c1678..c25aedcfdb 160000 --- a/deepfence_agent/plugins/cloud-scanner +++ b/deepfence_agent/plugins/cloud-scanner @@ -1 +1 @@ -Subproject commit 9e300c167816acfd1326ae15614b320c92374c9e +Subproject commit c25aedcfdbc09a9deae9f40cdea9367a6b6ca658 diff --git a/deepfence_agent/plugins/yara-rules b/deepfence_agent/plugins/yara-rules index e5f8c8b78e..8217b51893 160000 --- a/deepfence_agent/plugins/yara-rules +++ b/deepfence_agent/plugins/yara-rules @@ -1 +1 @@ -Subproject commit e5f8c8b78e82bcc2f56a95ed77ed19d8dc8f6a66 +Subproject commit 8217b518934b556ee7f56e6e5fc3e05be8c8d9fa diff --git a/deepfence_server/handler/topology.go b/deepfence_server/handler/topology.go index 18c8259d65..87d06a6653 100644 --- a/deepfence_server/handler/topology.go +++ b/deepfence_server/handler/topology.go @@ -278,7 +278,7 @@ func graphToSummaries( return res } - for cp, crs := range graph.CloudServices { + for cp, crs := range graph.InternalCloudServices { for _, crStub := range crs { cr := string(crStub.ID) nodes[cr] = detailed.NodeSummary{ @@ -293,6 +293,14 @@ func graphToSummaries( nodes["in-the-internet"] = inboundInternetNode nodes["out-the-internet"] = outboundInternetNode + for _, n := range graph.ExternalCloudServices { + nodes[string(n.ID)] = detailed.NodeSummary{ + ID: string(n.ID), + Label: n.Name, + ImmediateParentID: "", + Type: "pseudo", + } + } for h, n := range graph.Processes { for _, idStub := range n { diff --git a/deepfence_server/ingesters/agent.go b/deepfence_server/ingesters/agent.go index edb6ec0831..9c75562825 100644 --- a/deepfence_server/ingesters/agent.go +++ 
b/deepfence_server/ingesters/agent.go @@ -1424,21 +1424,23 @@ func requestCloudInfo(ctx context.Context, strIps []string, token string) ([]Clo func resolveCloudService(connections []Connection, token string) []Connection { ips := []string{} + ids := []int{} for i := range connections { if connections[i].rightIP != nil { ips = append(ips, *connections[i].rightIP) + ids = append(ids, i) } } if len(ips) == 0 { return connections } infos, err := requestCloudInfo(context.Background(), ips, token) - if err != nil || len(connections) != len(infos) { - log.Error().Err(err).Msgf("issue fetching cloud infos %d/%d", len(infos), len(connections)) + if err != nil || len(ids) != len(infos) { + log.Error().Err(err).Msgf("issue fetching cloud infos %d/%d", len(infos), len(ids)) return connections } for i := range infos { - connections[i].destination = infos[i].NodeID() + connections[ids[i]].destination = infos[i].NodeID() } return connections } diff --git a/deepfence_server/reporters/graph/topology_reporter.go b/deepfence_server/reporters/graph/topology_reporter.go index fef9a53d5f..d7aa34fcb3 100644 --- a/deepfence_server/reporters/graph/topology_reporter.go +++ b/deepfence_server/reporters/graph/topology_reporter.go @@ -436,7 +436,7 @@ func extractResourceNodeIds(ids []interface{}) []NodeID { return res } -func (ntp *neo4jTopologyReporter) GetCloudServices( +func (ntp *neo4jTopologyReporter) GetInternalCloudServices( ctx context.Context, tx neo4j.ExplicitTransaction, cloudProvider []string, @@ -491,6 +491,46 @@ func (ntp *neo4jTopologyReporter) GetCloudServices( } +func (ntp *neo4jTopologyReporter) GetExternalCloudServices( + ctx context.Context, + tx neo4j.ExplicitTransaction, + cloudProvider []string, + cloudRegions []string, + fieldfilters mo.Option[reporters.FieldsFilters]) ([]NodeStub, error) { + + ctx, span := telemetry.NewSpan(ctx, "toploogy", "get-cloud-services") + defer span.End() + + res := []NodeStub{} + + r, err := tx.Run(ctx, ` + MATCH (n:Node) -[:CONNECTS]- (:Node) 
+ WHERE n.cloud_provider = "internet" + RETURN DISTINCT n.node_id`, + map[string]interface{}{}, + ) + + if err != nil { + return res, err + } + records, err := r.Collect(ctx) + + if err != nil { + return res, err + } + + for _, record := range records { + nodeID := record.Values[0].(string) + res = append(res, + NodeStub{ + ID: NodeID(nodeID), + Name: nodeID, + }) + } + return res, nil + +} + func (ntp *neo4jTopologyReporter) GetPublicCloudResources( ctx context.Context, tx neo4j.ExplicitTransaction, @@ -921,8 +961,9 @@ type RenderedGraph struct { Connections []ConnectionSummary `json:"connections" required:"true"` // PublicCloudResources map[NodeID][]ResourceStub `json:"public-cloud-resources" required:"true"` // NonPublicCloudResources map[NodeID][]ResourceStub `json:"non-public-cloud-resources" required:"true"` - CloudServices map[NodeID][]ResourceStub `json:"cloud-services" required:"true"` - SkippedConnections bool `json:"skipped_connections" required:"true"` + InternalCloudServices map[NodeID][]ResourceStub `json:"cloud-services" required:"true"` + ExternalCloudServices []NodeStub `json:"external-cloud-services" required:"true"` + SkippedConnections bool `json:"skipped_connections" required:"true"` } type TopologyFilters struct { @@ -1239,7 +1280,11 @@ func (ntp *neo4jTopologyReporter) getGraph(ctx context.Context, filters Topology if err != nil { return res, err } - res.CloudServices, err = ntp.GetCloudServices(ctx, tx, cloudFilter, regionFilter, mo.None[reporters.FieldsFilters]()) + res.InternalCloudServices, err = ntp.GetInternalCloudServices(ctx, tx, cloudFilter, regionFilter, mo.None[reporters.FieldsFilters]()) + if err != nil { + return res, err + } + res.ExternalCloudServices, err = ntp.GetExternalCloudServices(ctx, tx, cloudFilter, regionFilter, mo.None[reporters.FieldsFilters]()) if err != nil { return res, err } diff --git a/deepfence_worker/cronjobs/neo4j.go b/deepfence_worker/cronjobs/neo4j.go index 1479f7470c..deb22a37a5 100644 --- 
a/deepfence_worker/cronjobs/neo4j.go +++ b/deepfence_worker/cronjobs/neo4j.go @@ -780,6 +780,7 @@ func LinkNodes(ctx context.Context, task *asynq.Task) error { AND NOT n.cloud_provider IS NULL AND NOT n.cloud_region IS NULL AND NOT n.node_id IN ["in-the-internet", "out-the-internet", "`+ConsoleAgentId+`"] + AND NOT n.cloud_provider = 'internet' WITH n LIMIT 50000 MERGE (cp:CloudProvider{node_id: n.cloud_provider}) MERGE (cr:CloudRegion{node_id: n.cloud_region}) @@ -816,7 +817,7 @@ func LinkNodes(ctx context.Context, task *asynq.Task) error { WHERE NOT exists((n) -[:ALIAS]-> ()) MERGE (t:ImageTag{node_id: n.docker_image_name + "_" + n.docker_image_tag}) MERGE (n) -[a:ALIAS]-> (t) - SET t.updated_at = TIMESTAMP(), + SET t.updated_at = TIMESTAMP(), a.updated_at = TIMESTAMP()`, map[string]interface{}{}, txConfig); err != nil { return err diff --git a/golang_deepfence_sdk b/golang_deepfence_sdk index d55cb83c33..226fd1e1e6 160000 --- a/golang_deepfence_sdk +++ b/golang_deepfence_sdk @@ -1 +1 @@ -Subproject commit d55cb83c3387857543eb9a0d656bc09b6cc84e59 +Subproject commit 226fd1e1e6bb7b9f4e179d6f738bb7cd9c55dd72 From fa0884e8b9b14f85757efa0f137d2c9b76ef9953 Mon Sep 17 00:00:00 2001 From: Thomas Legris Date: Wed, 12 Jun 2024 18:10:52 +0900 Subject: [PATCH 4/4] add reverse DNS resolution --- deepfence_server/ingesters/agent.go | 31 ++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/deepfence_server/ingesters/agent.go b/deepfence_server/ingesters/agent.go index 9c75562825..85e7577a03 100644 --- a/deepfence_server/ingesters/agent.go +++ b/deepfence_server/ingesters/agent.go @@ -7,6 +7,7 @@ import ( "encoding/json" "fmt" "io" + "net" "net/http" "os" "strconv" @@ -414,6 +415,7 @@ type Connection struct { localPort int leftIP *string rightIP *string + resolved bool } func connections2maps(connections []Connection, buf *bytes.Buffer) []map[string]interface{} { @@ -625,7 +627,8 @@ func prepareNeo4jIngestion(rpt *report.Report, resolvers 
*EndpointResolversCache } } } - connections = resolveCloudService(connections, token) + connections = resolveCloudServices(connections, token) + connections = resolveReverseDNS(connections) res.EndpointEdgesBatch = connections2maps(connections, buf) processEdgesBatch := map[string][]string{} @@ -768,7 +771,8 @@ func (nc *neo4jIngester) PushToDBSeq(ctx context.Context, batches ReportIngestio if _, err := tx.Run(ctx, ` UNWIND $batch as row MATCH (n:Node{node_id: row.source}) - MATCH (m:Node{node_id: row.destination}) + MERGE (m:Node{node_id: row.destination}) + ON CREATE SET m.pseudo = true, m.active = true, m.updated_at = TIMESTAMP(), m.cloud_region = 'internet', m.cloud_provider = 'internet' MERGE (n)-[r:CONNECTS]->(m) WITH n, r, m, row.pids as rpids UNWIND rpids as pids @@ -1422,9 +1426,9 @@ func requestCloudInfo(ctx context.Context, strIps []string, token string) ([]Clo return res.Infos, nil } -func resolveCloudService(connections []Connection, token string) []Connection { +func resolveCloudServices(connections []Connection, token string) []Connection { ips := []string{} - ids := []int{} + ids := []int{} for i := range connections { if connections[i].rightIP != nil { ips = append(ips, *connections[i].rightIP) @@ -1440,7 +1444,24 @@ func resolveCloudService(connections []Connection, token string) []Connection { return connections } for i := range infos { - connections[ids[i]].destination = infos[i].NodeID() + if !strings.Contains(infos[i].NodeID(), "unknown") { + connections[ids[i]].destination = infos[i].NodeID() + connections[ids[i]].resolved = true + } + } + return connections +} + +func resolveReverseDNS(connections []Connection) []Connection { + for i := range connections { + if connections[i].rightIP != nil && !connections[i].resolved { + names, err := net.LookupAddr(*connections[i].rightIP) + if err != nil || len(names) == 0 { + log.Error().Err(err).Msgf("Reverse DNS failed for %s", *connections[i].rightIP) + continue + } + connections[i].destination = 
fmt.Sprintf("out-%s", strings.TrimSuffix(names[0], ".")) // drop the root dot of the PTR name + } } return connections }