Skip to content

Commit

Permalink
update vpc flow with flint-s3 based DDL assets and dashboard (opensea…
Browse files Browse the repository at this point in the history
…rch-project#1691)

* update vpc flow with flint-s3 based DDL assets and dashboard

Signed-off-by: YANGDB <[email protected]>

* update MV to use auto sync refresh

Signed-off-by: YANGDB <[email protected]>

* Update server/adaptors/integrations/__data__/repository/aws_vpc_flow/assets/vpc_agg_30min_connections_mv-1.0.0.sql

Co-authored-by: Simeon Widdis <[email protected]>
Signed-off-by: YANGDB <[email protected]>

* Update server/adaptors/integrations/__data__/repository/aws_vpc_flow/assets/vpc_window-agg_60min_dest_ip_total-bytes_mv-1.0.0.sql

Co-authored-by: Simeon Widdis <[email protected]>
Signed-off-by: YANGDB <[email protected]>

* Update server/adaptors/integrations/__data__/repository/aws_vpc_flow/assets/vpc_window-agg_60min_dest_ip_cardinality_mv-1.0.0.sql

Co-authored-by: Simeon Widdis <[email protected]>
Signed-off-by: YANGDB <[email protected]>

* Update server/adaptors/integrations/__data__/repository/aws_vpc_flow/assets/vpc_live_week_mv-1.0.0.sql

Co-authored-by: Simeon Widdis <[email protected]>
Signed-off-by: YANGDB <[email protected]>

* Update server/adaptors/integrations/__data__/repository/aws_vpc_flow/assets/vpc_agg_60min_connections_mv-1.0.0.sql

Co-authored-by: Simeon Widdis <[email protected]>
Signed-off-by: YANGDB <[email protected]>

* remove comments from MV queries

Signed-off-by: YANGDB <[email protected]>

* update to remove HIVE as not supported yet in current EMR version

Signed-off-by: YANGDB <[email protected]>

* update refresh sync rate

Signed-off-by: YANGDB <[email protected]>

* update watermark_delay

Signed-off-by: YANGDB <[email protected]>

* add live only workflow & dashboard
add pre-agg + live workflow & dashboard
add refresh-workflow for the pre-agg queries

Signed-off-by: YANGDB <[email protected]>

* add live only workflow & dashboard
add pre-agg + live workflow & dashboard
add refresh-workflow for the pre-agg queries

Signed-off-by: YANGDB <[email protected]>

* add live all only workflow & dashboard

Signed-off-by: YANGDB <[email protected]>

* correct table name hard coded issue

Signed-off-by: YANGDB <[email protected]>

* add vpc table creation based on CSV format

Signed-off-by: YANGDB <[email protected]>

---------

Signed-off-by: YANGDB <[email protected]>
Co-authored-by: Simeon Widdis <[email protected]>
  • Loading branch information
2 people authored and RyanL1997 committed Apr 18, 2024
1 parent 088ada5 commit eed560d
Show file tree
Hide file tree
Showing 18 changed files with 1,216 additions and 0 deletions.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
 * Source table over VPC flow log records read as JSON from an external location.
 * The table name and the location are template placeholders
 * (presumably filled in by the integration installer; confirm against the caller).
 * time, start_time and end_time are BIGINT; the views built on this table divide
 * them by 1000 before calling from_unixtime, i.e. they treat them as epoch milliseconds.
 */
CREATE EXTERNAL TABLE IF NOT EXISTS {table_name} (
    cloud STRUCT<
        account_uid: STRING,
        region: STRING,
        zone: STRING,
        provider: STRING
    >,
    src_endpoint STRUCT<
        port: INT,
        svc_name: STRING,
        ip: STRING,
        intermediate_ips: ARRAY<STRING>,
        interface_uid: STRING,
        vpc_uid: STRING,
        instance_uid: STRING,
        subnet_uid: STRING
    >,
    dst_endpoint STRUCT<
        port: INT,
        svc_name: STRING,
        ip: STRING,
        intermediate_ips: ARRAY<STRING>,
        interface_uid: STRING,
        vpc_uid: STRING,
        instance_uid: STRING,
        subnet_uid: STRING
    >,
    connection_info STRUCT<
        protocol_num: INT,
        tcp_flags: INT,
        protocol_ver: STRING,
        boundary_id: INT,
        boundary: STRING,
        direction_id: INT,
        direction: STRING
    >,
    traffic STRUCT<
        packets: BIGINT,
        bytes: BIGINT
    >,
    time            BIGINT,
    start_time      BIGINT,
    end_time        BIGINT,
    status_code     STRING,
    severity_id     INT,
    severity        STRING,
    class_name      STRING,
    class_uid       INT,
    category_name   STRING,
    category_uid    INT,
    activity_name   STRING,
    activity_id     INT,
    disposition     STRING,
    disposition_id  INT,
    type_uid        INT,
    type_name       STRING,
    region          STRING,
    accountid       STRING,
    eventday        STRING
)
USING json
LOCATION '{s3_bucket_location}'
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
 * Pre-aggregated connection statistics in 30-minute (1800 s) buckets.
 * start_time is divided by 1000 to get seconds, floored to a multiple of 1800,
 * and the bucket end is the bucket start plus 1799 seconds (inclusive end).
 * Missing service names and direction are reported as 'Unknown'.
 * Created with auto_refresh = false, so it is only populated by an explicit refresh.
 */
CREATE MATERIALIZED VIEW IF NOT EXISTS {table_name}__agg_30_min_connections_mview AS
SELECT
    CAST(from_unixtime((CAST(start_time / 1000 AS BIGINT) DIV 1800) * 1800) AS TIMESTAMP) AS interval_start_time,
    CAST(from_unixtime(((CAST(start_time / 1000 AS BIGINT) DIV 1800) * 1800) + 1799) AS TIMESTAMP) AS interval_end_time,

    status_code AS `aws.vpc.status_code`,
    CAST(COALESCE(connection_info['direction'], 'Unknown') AS STRING) AS `aws.vpc.connection.direction`,
    CAST(COALESCE(src_endpoint.svc_name, 'Unknown') AS STRING) AS `aws.vpc.pkt-src-aws-service`,
    CAST(COALESCE(dst_endpoint.svc_name, 'Unknown') AS STRING) AS `aws.vpc.pkt-dst-aws-service`,

    accountid AS `aws.vpc.account-id`,
    region AS `aws.vpc.region`,

    COUNT(*) AS total_connections,
    SUM(CAST(COALESCE(traffic.bytes, 0) AS BIGINT)) AS total_bytes,
    SUM(CAST(COALESCE(traffic.packets, 0) AS BIGINT)) AS total_packets
FROM
    {table_name}
GROUP BY
    (CAST(start_time / 1000 AS BIGINT) DIV 1800) * 1800,
    region,
    accountid,
    status_code,
    src_endpoint.svc_name,
    dst_endpoint.svc_name,
    connection_info['direction']
ORDER BY
    interval_start_time
WITH (
    auto_refresh = false
)

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/* Manually refreshes the 30-minute connections view, which is created with auto_refresh = false. */
REFRESH MATERIALIZED VIEW {table_name}__agg_30_min_connections_mview
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
 * Pre-aggregated connection statistics in one-hour buckets.
 * start_time is divided by 1000 to get seconds and truncated to the hour;
 * interval_end_time is the bucket start plus one hour.
 * Missing service names and direction are reported as 'Unknown'.
 * Created with auto_refresh = false, so it is only populated by an explicit refresh.
 */
CREATE MATERIALIZED VIEW IF NOT EXISTS {table_name}__agg_60_min_connections_mview AS
SELECT
    date_trunc('hour', from_unixtime(start_time / 1000)) AS interval_start_time,
    date_trunc('hour', from_unixtime(start_time / 1000)) + INTERVAL 1 HOUR AS interval_end_time,

    status_code AS `aws.vpc.status_code`,
    CAST(COALESCE(connection_info['direction'], 'Unknown') AS STRING) AS `aws.vpc.connection.direction`,
    CAST(COALESCE(src_endpoint.svc_name, 'Unknown') AS STRING) AS `aws.vpc.pkt-src-aws-service`,
    CAST(COALESCE(dst_endpoint.svc_name, 'Unknown') AS STRING) AS `aws.vpc.pkt-dst-aws-service`,

    accountid AS `aws.vpc.account-id`,
    region AS `aws.vpc.region`,

    COUNT(*) AS total_connections,
    SUM(CAST(COALESCE(traffic.bytes, 0) AS BIGINT)) AS total_bytes,
    SUM(CAST(COALESCE(traffic.packets, 0) AS BIGINT)) AS total_packets
FROM
    {table_name}
GROUP BY
    date_trunc('hour', from_unixtime(start_time / 1000)),
    region,
    accountid,
    status_code,
    src_endpoint.svc_name,
    dst_endpoint.svc_name,
    connection_info['direction']
ORDER BY
    interval_start_time
WITH (
    auto_refresh = false
)

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/* Manually refreshes the 60-minute connections view, which is created with auto_refresh = false. */
REFRESH MATERIALIZED VIEW {table_name}__agg_60_min_connections_mview
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
 * Auto-refreshing, flattened view of the VPC flow table.
 * Nested struct fields are projected into dotted aws.vpc.* columns; missing values get
 * defaults (0 for numbers, 'Unknown' for names, '0.0.0.0' for addresses).
 * time, start_time and end_time are divided by 1000 (milliseconds to seconds) and
 * exposed as TIMESTAMP columns; interval_start_time repeats start_time.
 * IF NOT EXISTS keeps the statement re-runnable, matching the aggregate views.
 */
CREATE MATERIALIZED VIEW IF NOT EXISTS {table_name}__week_live_mview AS
SELECT
    cloud.account_uid AS `aws.vpc.cloud_account_uid`,
    cloud.region AS `aws.vpc.cloud_region`,
    cloud.zone AS `aws.vpc.cloud_zone`,
    cloud.provider AS `aws.vpc.cloud_provider`,

    CAST(IFNULL(src_endpoint.port, 0) AS LONG) AS `aws.vpc.srcport`,
    CAST(IFNULL(src_endpoint.svc_name, 'Unknown') AS STRING) AS `aws.vpc.pkt-src-aws-service`,
    CAST(IFNULL(src_endpoint.ip, '0.0.0.0') AS STRING) AS `aws.vpc.srcaddr`,
    CAST(IFNULL(src_endpoint.interface_uid, 'Unknown') AS STRING) AS `aws.vpc.src-interface_uid`,
    CAST(IFNULL(src_endpoint.vpc_uid, 'Unknown') AS STRING) AS `aws.vpc.src-vpc_uid`,
    CAST(IFNULL(src_endpoint.instance_uid, 'Unknown') AS STRING) AS `aws.vpc.src-instance_uid`,
    CAST(IFNULL(src_endpoint.subnet_uid, 'Unknown') AS STRING) AS `aws.vpc.src-subnet_uid`,

    CAST(IFNULL(dst_endpoint.port, 0) AS LONG) AS `aws.vpc.dstport`,
    CAST(IFNULL(dst_endpoint.svc_name, 'Unknown') AS STRING) AS `aws.vpc.pkt-dst-aws-service`,
    CAST(IFNULL(dst_endpoint.ip, '0.0.0.0') AS STRING) AS `aws.vpc.dstaddr`,
    CAST(IFNULL(dst_endpoint.interface_uid, 'Unknown') AS STRING) AS `aws.vpc.dst-interface_uid`,
    CAST(IFNULL(dst_endpoint.vpc_uid, 'Unknown') AS STRING) AS `aws.vpc.dst-vpc_uid`,
    CAST(IFNULL(dst_endpoint.instance_uid, 'Unknown') AS STRING) AS `aws.vpc.dst-instance_uid`,
    CAST(IFNULL(dst_endpoint.subnet_uid, 'Unknown') AS STRING) AS `aws.vpc.dst-subnet_uid`,
    /*
     * A destination inside a private range (10/8, 172.16/12, 192.168/16) is labelled
     * 'ingress'; anything else, including a NULL address, is 'egress'.
     * The pattern is anchored with ^ because regexp matches anywhere in the string;
     * without the anchor a public address such as 8.8.10.1 would match the 10. branch.
     */
    CASE
        WHEN regexp(dst_endpoint.ip, '^(10\\.|192\\.168\\.|172\\.(1[6-9]|2[0-9]|3[0-1])\\.)')
            THEN 'ingress'
        ELSE 'egress'
    END AS `aws.vpc.flow-direction`,

    CAST(IFNULL(connection_info['protocol_num'], 0) AS INT) AS `aws.vpc.connection.protocol_num`,
    CAST(IFNULL(connection_info['tcp_flags'], '0') AS STRING) AS `aws.vpc.connection.tcp_flags`,
    CAST(IFNULL(connection_info['protocol_ver'], '0') AS STRING) AS `aws.vpc.connection.protocol_ver`,
    CAST(IFNULL(connection_info['boundary'], 'Unknown') AS STRING) AS `aws.vpc.connection.boundary`,
    CAST(IFNULL(connection_info['direction'], 'Unknown') AS STRING) AS `aws.vpc.connection.direction`,

    CAST(IFNULL(traffic.packets, 0) AS LONG) AS `aws.vpc.packets`,
    CAST(IFNULL(traffic.bytes, 0) AS LONG) AS `aws.vpc.bytes`,

    CAST(FROM_UNIXTIME(time / 1000) AS TIMESTAMP) AS `@timestamp`,
    CAST(FROM_UNIXTIME(start_time / 1000) AS TIMESTAMP) AS `start_time`,
    CAST(FROM_UNIXTIME(start_time / 1000) AS TIMESTAMP) AS `interval_start_time`,
    CAST(FROM_UNIXTIME(end_time / 1000) AS TIMESTAMP) AS `end_time`,
    status_code AS `aws.vpc.status_code`,

    severity AS `aws.vpc.severity`,
    class_name AS `aws.vpc.class_name`,
    category_name AS `aws.vpc.category_name`,
    activity_name AS `aws.vpc.activity_name`,
    disposition AS `aws.vpc.disposition`,
    type_name AS `aws.vpc.type_name`,

    region AS `aws.vpc.region`,
    accountid AS `aws.vpc.account-id`
FROM
    {table_name}
WITH (
    auto_refresh = true,
    refresh_interval = '1 Minute',
    checkpoint_location = '{s3_checkpoint_location}',
    watermark_delay = '10 Second',
    extra_options = '{ "{table_name}": { "maxFilesPerTrigger": "10" }}'
)

Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
 * Manually refreshed, flattened view of the most recent week of VPC flow records.
 * Nested struct fields are projected into dotted aws.vpc.* columns; missing values get
 * defaults (0 for numbers, 'Unknown' for names, '0.0.0.0' for addresses).
 * time, start_time and end_time are divided by 1000 (milliseconds to seconds) and
 * exposed as TIMESTAMP columns; interval_start_time repeats start_time.
 * IF NOT EXISTS keeps the statement re-runnable, matching the aggregate views.
 */
CREATE MATERIALIZED VIEW IF NOT EXISTS {table_name}__week_live_mview AS
SELECT
    cloud.account_uid AS `aws.vpc.cloud_account_uid`,
    cloud.region AS `aws.vpc.cloud_region`,
    cloud.zone AS `aws.vpc.cloud_zone`,
    cloud.provider AS `aws.vpc.cloud_provider`,

    CAST(IFNULL(src_endpoint.port, 0) AS LONG) AS `aws.vpc.srcport`,
    CAST(IFNULL(src_endpoint.svc_name, 'Unknown') AS STRING) AS `aws.vpc.pkt-src-aws-service`,
    CAST(IFNULL(src_endpoint.ip, '0.0.0.0') AS STRING) AS `aws.vpc.srcaddr`,
    CAST(IFNULL(src_endpoint.interface_uid, 'Unknown') AS STRING) AS `aws.vpc.src-interface_uid`,
    CAST(IFNULL(src_endpoint.vpc_uid, 'Unknown') AS STRING) AS `aws.vpc.src-vpc_uid`,
    CAST(IFNULL(src_endpoint.instance_uid, 'Unknown') AS STRING) AS `aws.vpc.src-instance_uid`,
    CAST(IFNULL(src_endpoint.subnet_uid, 'Unknown') AS STRING) AS `aws.vpc.src-subnet_uid`,

    CAST(IFNULL(dst_endpoint.port, 0) AS LONG) AS `aws.vpc.dstport`,
    CAST(IFNULL(dst_endpoint.svc_name, 'Unknown') AS STRING) AS `aws.vpc.pkt-dst-aws-service`,
    CAST(IFNULL(dst_endpoint.ip, '0.0.0.0') AS STRING) AS `aws.vpc.dstaddr`,
    CAST(IFNULL(dst_endpoint.interface_uid, 'Unknown') AS STRING) AS `aws.vpc.dst-interface_uid`,
    CAST(IFNULL(dst_endpoint.vpc_uid, 'Unknown') AS STRING) AS `aws.vpc.dst-vpc_uid`,
    CAST(IFNULL(dst_endpoint.instance_uid, 'Unknown') AS STRING) AS `aws.vpc.dst-instance_uid`,
    CAST(IFNULL(dst_endpoint.subnet_uid, 'Unknown') AS STRING) AS `aws.vpc.dst-subnet_uid`,
    /*
     * A destination inside a private range (10/8, 172.16/12, 192.168/16) is labelled
     * 'ingress'; anything else, including a NULL address, is 'egress'.
     * The pattern is anchored with ^ because regexp matches anywhere in the string;
     * without the anchor a public address such as 8.8.10.1 would match the 10. branch.
     */
    CASE
        WHEN regexp(dst_endpoint.ip, '^(10\\.|192\\.168\\.|172\\.(1[6-9]|2[0-9]|3[0-1])\\.)')
            THEN 'ingress'
        ELSE 'egress'
    END AS `aws.vpc.flow-direction`,

    CAST(IFNULL(connection_info['protocol_num'], 0) AS INT) AS `aws.vpc.connection.protocol_num`,
    CAST(IFNULL(connection_info['tcp_flags'], '0') AS STRING) AS `aws.vpc.connection.tcp_flags`,
    CAST(IFNULL(connection_info['protocol_ver'], '0') AS STRING) AS `aws.vpc.connection.protocol_ver`,
    CAST(IFNULL(connection_info['boundary'], 'Unknown') AS STRING) AS `aws.vpc.connection.boundary`,
    CAST(IFNULL(connection_info['direction'], 'Unknown') AS STRING) AS `aws.vpc.connection.direction`,

    CAST(IFNULL(traffic.packets, 0) AS LONG) AS `aws.vpc.packets`,
    CAST(IFNULL(traffic.bytes, 0) AS LONG) AS `aws.vpc.bytes`,

    CAST(FROM_UNIXTIME(time / 1000) AS TIMESTAMP) AS `@timestamp`,
    CAST(FROM_UNIXTIME(start_time / 1000) AS TIMESTAMP) AS `start_time`,
    CAST(FROM_UNIXTIME(start_time / 1000) AS TIMESTAMP) AS `interval_start_time`,
    CAST(FROM_UNIXTIME(end_time / 1000) AS TIMESTAMP) AS `end_time`,
    status_code AS `aws.vpc.status_code`,

    severity AS `aws.vpc.severity`,
    class_name AS `aws.vpc.class_name`,
    category_name AS `aws.vpc.category_name`,
    activity_name AS `aws.vpc.activity_name`,
    disposition AS `aws.vpc.disposition`,
    type_name AS `aws.vpc.type_name`,

    region AS `aws.vpc.region`,
    accountid AS `aws.vpc.account-id`
FROM
    {table_name}
/* Single-row subquery holding the newest start_time; cross-joined so every row can be compared to it. */
CROSS JOIN
    (
        SELECT MAX(CAST(FROM_UNIXTIME(start_time / 1000) AS TIMESTAMP)) AS max_start_time
        FROM {table_name}
    ) AS latest
WHERE
    /*
     * Keep rows from the 7 days before the newest record. DATE_SUB returns a DATE,
     * so the cutoff falls on a day boundary rather than exactly 7 * 24 hours back.
     */
    CAST(FROM_UNIXTIME(start_time / 1000) AS TIMESTAMP) >= DATE_SUB(latest.max_start_time, 7)
WITH (
    auto_refresh = false
)

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/* Manually refreshes the week-live view; needed for the variant created with auto_refresh = false. */
REFRESH MATERIALIZED VIEW {table_name}__week_live_mview
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
 * Top destination addresses by transferred bytes, per one-hour bucket.
 * Rows are bucketed by hour of start_time (milliseconds divided by 1000), summed per
 * destination address, ranked within each hour, and filtered to rank 50 or better.
 * RANK() gives tied totals the same rank, so an hour can return more than 50 rows.
 * Created with auto_refresh = false, so it is only populated by an explicit refresh.
 */
CREATE MATERIALIZED VIEW IF NOT EXISTS {table_name}__window_agg_60_min_network_ip_bytes_mview AS
WITH hourly_buckets AS (
    SELECT
        date_trunc('hour', from_unixtime(start_time / 1000)) AS interval_start_time,
        CAST(IFNULL(dst_endpoint.ip, '0.0.0.0') AS STRING) AS dstaddr,
        SUM(CAST(IFNULL(traffic.bytes, 0) AS LONG)) AS total_bytes
    FROM
        {table_name}
    GROUP BY
        interval_start_time,
        dstaddr
),
ranked_addresses AS (
    SELECT
        /* Aliased explicitly so the column name does not depend on implicit naming of the CAST. */
        CAST(interval_start_time AS TIMESTAMP) AS interval_start_time,
        dstaddr,
        total_bytes,
        RANK() OVER (PARTITION BY interval_start_time ORDER BY total_bytes DESC) AS bytes_rank
    FROM
        hourly_buckets
)
SELECT
    CAST(interval_start_time AS TIMESTAMP) AS interval_start_time,
    dstaddr,
    total_bytes
FROM
    ranked_addresses
WHERE
    bytes_rank <= 50
ORDER BY
    interval_start_time ASC,
    bytes_rank ASC
WITH (
    auto_refresh = false
)

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/* Manually refreshes the hourly top-destinations-by-bytes view, which is created with auto_refresh = false. */
REFRESH MATERIALIZED VIEW {table_name}__window_agg_60_min_network_ip_bytes_mview
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
 * Top destination addresses by number of flow records, per one-hour bucket.
 * Rows are bucketed by hour of start_time (milliseconds divided by 1000), counted per
 * destination address, ranked within each hour, and filtered to rank 50 or better.
 * RANK() gives tied counts the same rank, so an hour can return more than 50 rows.
 * Created with auto_refresh = false, so it is only populated by an explicit refresh.
 */
CREATE MATERIALIZED VIEW IF NOT EXISTS {table_name}__window_agg_60_min_network_ip_cardinality_mview AS
WITH hourly_buckets AS (
    SELECT
        date_trunc('hour', from_unixtime(start_time / 1000)) AS interval_start_time,
        CAST(IFNULL(dst_endpoint.ip, '0.0.0.0') AS STRING) AS dstaddr,
        COUNT(*) AS total_count
    FROM
        {table_name}
    GROUP BY
        interval_start_time,
        dstaddr
),
ranked_addresses AS (
    SELECT
        /* Aliased explicitly so the column name does not depend on implicit naming of the CAST. */
        CAST(interval_start_time AS TIMESTAMP) AS interval_start_time,
        dstaddr,
        total_count,
        RANK() OVER (PARTITION BY interval_start_time ORDER BY total_count DESC) AS addr_rank
    FROM
        hourly_buckets
)
SELECT
    CAST(interval_start_time AS TIMESTAMP) AS interval_start_time,
    dstaddr,
    total_count
FROM
    ranked_addresses
WHERE
    addr_rank <= 50
ORDER BY
    interval_start_time ASC,
    addr_rank ASC
WITH (
    auto_refresh = false
)

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/* Manually refreshes the hourly top-destinations-by-count view, which is created with auto_refresh = false. */
REFRESH MATERIALIZED VIEW {table_name}__window_agg_60_min_network_ip_cardinality_mview
Loading

0 comments on commit eed560d

Please sign in to comment.