Skip to content

Commit

Permalink
Raw Vpc schema integration (1.0.0 parquet ) (#1853)
Browse files Browse the repository at this point in the history
* revert default vpc flow logs integration into standard vpc schema

Signed-off-by: YANGDB <[email protected]>

* update sample queries


---------

Signed-off-by: YANGDB <[email protected]>
  • Loading branch information
YANG-DB authored May 29, 2024
1 parent dd77fdd commit b3fb03e
Show file tree
Hide file tree
Showing 19 changed files with 86 additions and 450 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"license": "Apache-2.0",
"type": "logs_vpc",
"labels": ["Observability", "Logs", "AWS", "Cloud", "Flint S3"],
"author": "Haidong Wang",
"author": "OpenSearch",
"sourceUrl": "https://github.com/opensearch-project/dashboards-observability/tree/main/server/adaptors/integrations/__data__/repository/aws_vpc_flow/info",
"workflows": [
{
Expand All @@ -26,18 +26,6 @@
"label": "Dashboards & Visualizations For Flint Integrations using live queries",
"description": "Dashboards and visualizations aligned with Flint S3 datasource ",
"enabled_by_default": false
},
{
"name": "flint-pre-agg-dashboards",
"label": "Dashboards & Visualizations For Flint Integrations using pre-aggregated queries",
"description": "This step creates the MV pre-aggregated queries without running them, in order to actually update their data select the following `flint-pre-agg-refresh` workflow option ",
"enabled_by_default": false
},
{
"name": "flint-pre-agg-refresh",
"label": "Refreshing and populate the pre-aggregated projections ",
"description": "This step populate the pre-aggregated projections by enabling the REFRESH command to run, this step depends on selection of the previous `flint-pre-agg-dashboards` step",
"enabled_by_default": false
}
],
"statics": {
Expand Down Expand Up @@ -76,11 +64,11 @@
],
"assets": [
{
"name": "create_table_parquet_vpc",
"name": "create_table_vpc_schema",
"version": "1.0.0",
"extension": "sql",
"type": "query",
"workflows": ["flint-live-dashboards","flint-pre-agg-dashboards"]
"workflows": ["flint-live-dashboards"]
},
{
"name": "example_queries",
Expand Down Expand Up @@ -110,75 +98,12 @@
"type": "savedObjectBundle",
"workflows": ["flint-live-dashboards"]
},
{
"name": "aws_vpc_flow_flint-pre_agg",
"version": "1.0.0",
"extension": "ndjson",
"type": "savedObjectBundle",
"workflows": ["flint-pre-agg-dashboards"]
},
{
"name": "vpc_live_all_mv",
{
"name": "aws_vpc_live_stream_mv_schema",
"version": "1.0.0",
"extension": "sql",
"type": "query",
"workflows": ["flint-live-dashboards"]
},
{
"name": "vpc_live_week_mv",
"version": "1.0.0",
"extension": "sql",
"type": "query",
"workflows": ["flint-pre-agg-dashboards"]
},
{
"name": "vpc_agg_60min_connections_mv",
"version": "1.0.0",
"extension": "sql",
"type": "query",
"workflows": ["flint-pre-agg-dashboards"]
},
{
"name": "vpc_window-agg_60min_dest_ip_cardinality_mv",
"version": "1.0.0",
"extension": "sql",
"type": "query",
"workflows": ["flint-pre-agg-dashboards"]
},
{
"name": "vpc_window-agg_60min_dest_ip_total-bytes_mv",
"version": "1.0.0",
"extension": "sql",
"type": "query",
"workflows": ["flint-pre-agg-dashboards"]
},
{
"name": "vpc_live_week_refresh",
"version": "1.0.0",
"extension": "sql",
"type": "query",
"workflows": ["flint-pre-agg-refresh"]
},
{
"name": "vpc_agg_60min_connections_refresh",
"version": "1.0.0",
"extension": "sql",
"type": "query",
"workflows": ["flint-pre-agg-refresh"]
},
{
"name": "vpc_window-agg_60min_dest_ip_cardinality_refresh",
"version": "1.0.0",
"extension": "sql",
"type": "query",
"workflows": ["flint-pre-agg-refresh"]
},
{
"name": "vpc_window-agg_60min_dest_ip_total-bytes_refresh",
"version": "1.0.0",
"extension": "sql",
"type": "query",
"workflows": ["flint-pre-agg-refresh"]
}
],
"sampleData": {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
CREATE MATERIALIZED VIEW {table_name}__live_mview AS
SELECT
CAST(IFNULL(srcport, 0) AS LONG) AS `aws.vpc.srcport`,
CAST(IFNULL(pkt_srcaddr, 'Unknown') AS STRING) AS `aws.vpc.pkt-src-aws-service`,
CAST(IFNULL(srcaddr, '0.0.0.0') AS STRING) AS `aws.vpc.srcaddr`,
CAST(IFNULL(interface_id, 'Unknown') AS STRING) AS `aws.vpc.src-interface_uid`,
CAST(IFNULL(vpc_id, 'Unknown') AS STRING) AS `aws.vpc.src-vpc_uid`,
CAST(IFNULL(instance_id, 'Unknown') AS STRING) AS `aws.vpc.src-instance_uid`,
CAST(IFNULL(subnet_id, 'Unknown') AS STRING) AS `aws.vpc.src-subnet_uid`,
CAST(IFNULL(dstport, 0) AS LONG) AS `aws.vpc.dstport`,
CAST(IFNULL(pkt_dstaddr, 'Unknown') AS STRING) AS `aws.vpc.pkt-dst-aws-service`,
CAST(IFNULL(dstaddr, '0.0.0.0') AS STRING) AS `aws.vpc.dstaddr`,
CAST(IFNULL(flow_direction, 'Unknown') AS STRING) AS `aws.vpc.flow-direction`,
CAST(IFNULL(tcp_flags, '0') AS STRING) AS `aws.vpc.connection.tcp_flags`,
CAST(IFNULL(packets, 0) AS LONG) AS `aws.vpc.packets`,
CAST(IFNULL(bytes, 0) AS LONG) AS `aws.vpc.bytes`,
CAST(FROM_UNIXTIME(start ) AS TIMESTAMP) AS `@timestamp`,
CAST(FROM_UNIXTIME(start ) AS TIMESTAMP) AS `start_time`,
CAST(FROM_UNIXTIME(start ) AS TIMESTAMP) AS `interval_start_time`,
CAST(FROM_UNIXTIME(`end` ) AS TIMESTAMP) AS `end_time`,
CAST(IFNULL(log_status, 'Unknown') AS STRING) AS `aws.vpc.status_code`,
CAST(IFNULL(version, 0) AS LONG) AS `aws.vpc.version`,
CAST(IFNULL(type, 'Unknown') AS STRING) AS `aws.vpc.type_name`,
CAST(IFNULL(traffic_path, 0) AS LONG) AS `aws.vpc.traffic_path`,
CAST(IFNULL(az_id, 'Unknown') AS STRING) AS `aws.vpc.az_id`,
CAST(IFNULL(action, 'Unknown') AS STRING) AS `aws.vpc.action`,
CAST(IFNULL(region, 'Unknown') AS STRING) AS `aws.vpc.region`,
CAST(IFNULL(account_id, 'Unknown') AS STRING) AS `aws.vpc.account-id`,
CAST(IFNULL(sublocation_type, 'Unknown') AS STRING) AS `aws.vpc.sublocation_type`,
CAST(IFNULL(sublocation_id, 'Unknown') AS STRING) AS `aws.vpc.sublocation_id`

FROM
{table_name}
WITH (
auto_refresh = true,
refresh_interval = '15 Minute',
checkpoint_location = '{s3_checkpoint_location}',
watermark_delay = '1 Minute',
extra_options = '{ "{table_name}": { "maxFilesPerTrigger": "10" }}'
)
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
CREATE SKIPPING INDEX ON {table_name} (
accountid BLOOM_FILTER,
account_id BLOOM_FILTER,
region VALUE_SET,
severity_id VALUE_SET,
`src_endpoint.ip` BLOOM_FILTER,
`dst_endpoint.ip` BLOOM_FILTER,
`src_endpoint.svc_name` VALUE_SET,
`dst_endpoint.svc_name` VALUE_SET,
`traffic.bytes` MIN_MAX
srcaddr BLOOM_FILTER,
dstaddr BLOOM_FILTER,
pkt_src_aws_service VALUE_SET,
pkt_dst_aws_service VALUE_SET,
bytes MIN_MAX
) WITH (
auto_refresh = true,
refresh_interval = '15 Minutes',
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
CREATE EXTERNAL TABLE IF NOT EXISTS {table_name} (
version int,
account_id string,
interface_id string,
srcaddr string,
dstaddr string,
srcport int,
dstport int,
protocol bigint,
packets bigint,
bytes bigint,
start bigint,
`end` bigint,
action string,
log_status string,
vpc_id string,
subnet_id string,
instance_id string,
tcp_flags int,
type string,
pkt_srcaddr string,
pkt_dstaddr string,
region string,
az_id string,
sublocation_type string,
sublocation_id string,
pkt_src_aws_service string,
pkt_dst_aws_service string,
flow_direction string,
traffic_path int
) USING parquet
LOCATION '{s3_bucket_location}'
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{"attributes":{"createdTimeMs":1713289099101,"savedQuery":{"data_sources":"[{\"name\":\"mys3\",\"type\":\"s3glue\",\"label\":\"mys3\",\"value\":\"mys3\"}]","description":"","name":"agg_60_min_connections_view","query":"SELECT date_trunc('hour', from_unixtime(start_time / 1000)) AS interval_start_time, date_trunc('hour', from_unixtime(start_time / 1000)) + INTERVAL 1 HOUR AS interval_end_time, accountid as `aws.vpc.account-id`, region as `aws.vpc.region`, COUNT(*) AS total_connections, SUM(CAST(IFNULL(traffic.bytes, 0) AS LONG)) AS total_bytes, SUM(CAST(IFNULL(traffic.packets, 0) AS LONG)) AS total_packets FROM {table_name} GROUP BY date_trunc('hour', from_unixtime(start_time / 1000)), region, accountid\n","query_lang":"SQL","selected_date_range":{"end":"now","start":"now-15m","text":""},"selected_fields":{"text":"","tokens":[]},"selected_timestamp":{"name":"","type":"timestamp"}},"title":"Hourly count connections summary","version":1},"id":"1d07d010-fc18-11ee-99c9-43e5dbd0692c","references":[],"type":"observability-search","updated_at":"2024-04-16T17:52:30.414Z","version":"WzI3NTEsMV0="}
{"attributes":{"createdTimeMs":1713293044079,"savedQuery":{"data_sources":"[{\"name\":\"mys3\",\"type\":\"s3glue\",\"label\":\"mys3\",\"value\":\"mys3\"}]","description":"","name":"window_agg_60_min_network_ip_bytes","query":"WITH hourly_buckets AS ( SELECT date_trunc('hour', from_unixtime(start_time / 1000)) AS interval_start_time, CAST(IFNULL(dst_endpoint.ip, '0.0.0.0') AS STRING) AS dstaddr, SUM(CAST(IFNULL(traffic.bytes, 0) AS LONG)) AS total_bytes FROM {table_name} GROUP BY interval_start_time, dstaddr ), ranked_addresses AS ( SELECT CAST(interval_start_time AS TIMESTAMP), dstaddr, total_bytes, RANK() OVER (PARTITION BY interval_start_time ORDER BY total_bytes DESC) AS bytes_rank FROM hourly_buckets ) SELECT CAST(interval_start_time AS TIMESTAMP), dstaddr, total_bytes FROM ranked_addresses WHERE bytes_rank <= 50 ORDER BY interval_start_time ASC, bytes_rank ASC","query_lang":"SQL","selected_date_range":{"end":"now","start":"now-15m","text":""},"selected_fields":{"text":"","tokens":[]},"selected_timestamp":{"name":"","type":"timestamp"}},"title":"window hourly network ip bytes summary","version":1},"id":"4c6b8820-fc21-11ee-ab45-d3075d0510e6","references":[],"type":"observability-search","updated_at":"2024-04-16T18:44:47.956Z","version":"WzI4MzAsMV0="}
{"attributes":{"createdTimeMs":1713290175184,"savedQuery":{"data_sources":"[{\"name\":\"mys3\",\"type\":\"s3glue\",\"label\":\"mys3\",\"value\":\"mys3\"}]","description":"","name":"TopCommonErrorServicesQuery","query":"SELECT src_endpoint.svc_name AS source_service, dst_endpoint.svc_name AS destination_service, COUNT(*) AS error_count FROM {table_name} WHERE severity_id >= 4 GROUP BY src_endpoint.svc_name, dst_endpoint.svc_name ORDER BY error_count DESC LIMIT 10;\n","query_lang":"SQL","selected_date_range":{"end":"now","start":"now-15m","text":""},"selected_fields":{"text":"","tokens":[]},"selected_timestamp":{"name":"","type":"timestamp"}},"title":"Top 10 pairs of errored source / destination services","version":1},"id":"9e6a9b40-fc1a-11ee-99c9-43e5dbd0692c","references":[],"type":"observability-search","updated_at":"2024-04-16T17:56:15.220Z","version":"WzI3NTIsMV0="}
{"attributes":{"createdTimeMs":1713290175184,"savedQuery":{"data_sources":"[{\"name\":\"mys3\",\"type\":\"s3glue\",\"label\":\"mys3\",\"value\":\"mys3\"}]","description":"","name":"TopCommonErrorServicesQuery","query":"SELECT pkt_src_aws_service AS source_service, pkt_dst_aws_service AS destination_service, COUNT(*) AS error_count FROM {table_name} where log_status IN ('SKIPDATA', 'RETIREDDATA') GROUP BY pkt_src_aws_service, pkt_dst_aws_service ORDER BY error_count DESC LIMIT 10","query_lang":"SQL","selected_date_range":{"end":"now","start":"now-15m","text":""},"selected_fields":{"text":"","tokens":[]},"selected_timestamp":{"name":"","type":"timestamp"}},"title":"Top 10 pairs of errored source / destination services","version":1},"id":"9e6a9b40-fc1a-11ee-99c9-43e5dbd0692c","references":[],"type":"observability-search","updated_at":"2024-04-16T17:56:15.220Z","version":"WzI3NTIsMV0="}
{"attributes":{"createdTimeMs":1713290175184,"savedQuery":{"data_sources":"[{\"name\":\"mys3\",\"type\":\"s3glue\",\"label\":\"mys3\",\"value\":\"mys3\"}]","description":"","name":"HourAggRequestsAndBytes","query":"SELECT date_trunc('hour', FROM_UNIXTIME(CAST(IFNULL(start, 0) AS LONG))) AS interval_start_time, CAST(IFNULL(pkt_srcaddr, 'Unknown') AS STRING) AS `aws.vpc.pkt-src-aws-service`, CAST(IFNULL(srcaddr, '0.0.0.0') AS STRING) AS `aws.vpc.srcaddr`, CAST(IFNULL(pkt_dstaddr, 'Unknown') AS STRING) AS `aws.vpc.pkt-dst-aws-service`, CAST(IFNULL(dstaddr, '0.0.0.0') AS STRING) AS `aws.vpc.dstaddr`, CAST(IFNULL(action, 'Unknown') AS STRING) AS `aws.vpc.action`, CAST(IFNULL(region, 'Unknown') AS STRING) AS `aws.vpc.region`, CAST(IFNULL(account_id, 'Unknown') AS STRING) AS `aws.vpc.account-id`, CAST(IFNULL(log_status, 'Unknown') AS STRING) AS `aws.vpc.status_code`, CAST(IFNULL(flow_direction, 'Unknown') AS STRING) AS `aws.vpc.connection.direction`, COUNT(*) AS total_connections, SUM(CAST(IFNULL(bytes, 0) AS LONG)) AS total_bytes, SUM(CAST(IFNULL(packets, 0) AS LONG)) AS total_packets FROM `zero_etl_walkthrough`.`default`.`amazon_vpc_flow` GROUP BY date_trunc('hour', FROM_UNIXTIME(CAST(IFNULL(start, 0) AS LONG))), pkt_srcaddr, srcaddr, pkt_dstaddr, dstaddr, action, region, account_id, log_status, flow_direction","query_lang":"SQL","selected_date_range":{"end":"now","start":"now-15m","text":""},"selected_fields":{"text":"","tokens":[]},"selected_timestamp":{"name":"","type":"timestamp"}},"title":"Hour aggregation by requests and bytes sum ","version":1},"id":"9e6a9b40-fc1a-11ee-99c9-43e5dbd9992c","references":[],"type":"observability-search","updated_at":"2024-04-16T17:56:15.220Z","version":"WzI3NTIsMV0="}
{"attributes":{"createdTimeMs":1713290175184,"savedQuery":{"data_sources":"[{\"name\":\"mys3\",\"type\":\"s3glue\",\"label\":\"mys3\",\"value\":\"mys3\"}]","description":"","name":"HourWindowTopIpByCardinality","query":" WITH hourly_buckets AS (SELECT date_trunc('hour', FROM_UNIXTIME(CAST(IFNULL(start, 0) AS LONG))) AS interval_start_time, CAST(IFNULL(dstaddr, '0.0.0.0') AS STRING) AS dstaddr, SUM(CAST(IFNULL(bytes, 0) AS LONG)) AS total_bytes FROM {table_name} GROUP BY interval_start_time, dstaddr), ranked_addresses AS (SELECT CAST(interval_start_time AS TIMESTAMP), dstaddr, total_bytes, RANK() OVER (PARTITION BY interval_start_time ORDER BY total_bytes DESC) AS bytes_rank FROM hourly_buckets) SELECT CAST(interval_start_time AS TIMESTAMP), dstaddr, total_bytes FROM ranked_addresses WHERE bytes_rank <= 50 ORDER BY interval_start_time ASC, bytes_rank ASC","query_lang":"SQL","selected_date_range":{"end":"now","start":"now-15m","text":""},"selected_fields":{"text":"","tokens":[]},"selected_timestamp":{"name":"","type":"timestamp"}},"title":"Hour window of top IP by cardinality ","version":1},"id":"9e6add40-fc1a-11ee-99c9-43e5dbd9992c","references":[],"type":"observability-search","updated_at":"2024-04-16T17:56:15.220Z","version":"WzI3NTIsMV0="}
{"exportedCount":7,"missingRefCount":0,"missingReferences":[]}

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit b3fb03e

Please sign in to comment.