[Integration-RFC] Flint Integration Planning And Adaptation Tutorial #144

YANG-DB opened this issue Apr 10, 2024 · 4 comments

YANG-DB commented Apr 10, 2024

Flint Integration Planning And Adaptation Tutorial

This tutorial is intended as a general procedure template that integration developers can adapt to transform their OpenSearch index-based integration into an S3 Flint-based integration.

Ingestion Tools

The first step in making an integration Flint (S3) compatible is understanding the ingestion policy. Ingestion can be achieved with many tools, including:

AWS Data Firehose:
This is a fully managed service for delivering real-time streaming data to destinations such as Amazon S3. It's suitable for both proprietary API logs (by sending data to Firehose directly through its API) and standard logs. For AWS ELB logs or Apache logs, you'd typically stream these to Firehose using a log agent or AWS Lambda.

Amazon Kinesis Agent:
A pre-built Java application that offers an easy way to collect and send data to AWS services like Amazon Kinesis Streams, Kinesis Data Firehose, or CloudWatch Logs. You can configure it to monitor log files for various services, including Apache HTTP Server, and forward them to S3.

FluentBit / Data-Prepper:
Both are open-source data collectors for unifying data collection and consumption for better use and understanding of data. They can be used to collect data from different sources, transform it, and send it to various destinations, including S3. They are highly flexible and can work with proprietary log formats as well as common ones like Apache logs.

AWS Lambda:
For more custom ingestion logic, such as preprocessing or filtering before storing in S3, AWS Lambda can be used. Lambda can be triggered by various AWS services (like S3 uploads, CloudWatch Logs, API Gateway for proprietary APIs) to process and ingest logs into S3.

CloudWatch Logs:
While primarily a monitoring and observability service, CloudWatch Logs can be configured to route your logs to S3, either through a subscription filter and AWS Lambda or directly for AWS service logs such as ELB.

Custom API Solutions:
For proprietary APIs, a custom application that ingests logs via API and then uploads them to S3 might be necessary. This can be hosted on EC2 or run as a containerized application on ECS or EKS.

Log Storage Format

The next critical step is determining the optimal storage format for your data within Amazon S3 (a conversion sketch appears at the end of this section).

Raw / CSV

  • Description: Raw or CSV (Comma Separated Values) formats are among the simplest and most widely used data storage formats. They are text-based and human-readable, making them easy for basic viewing and editing.
  • Use Cases: Ideal for simple, flat datasets where complex queries are not required. Well-suited for small datasets that require compatibility with a broad range of tools and systems.
  • Considerations: While easy to use and universally compatible, they might not be the most space or query-efficient for large datasets. They lack support for complex data types and hierarchical data structures.

Parquet

  • Description: Apache Parquet is a columnar storage file format optimized for use with big data processing frameworks. It offers efficient data compression and encoding schemes, leading to significant storage savings and improved query performance over row-oriented data like CSV.
  • Use Cases: Highly recommended for analytic workloads where queries often scan particular columns within large datasets. Parquet is ideal for complex data structures with nested fields.
  • Considerations: It requires compatible query engines like Amazon Athena, Apache Spark, or Presto for data processing.

JSON

  • Description: JSON (JavaScript Object Notation) is a lightweight data-interchange format that's easy for humans to read and write and easy for machines to parse and generate.
  • Use Cases: Suitable for semi-structured data where the schema might evolve over time. Its hierarchical structure is good for document-oriented data similar to the structures used in NoSQL databases.
  • Considerations: JSON files are typically larger than CSV and Parquet.

GZIP / ZIP

  • Description: GZIP and ZIP are file formats used for file compression and decompression. They can be applied to various data formats, including raw, CSV, and JSON, to reduce storage space requirements.
  • Use Cases: Applicable in scenarios where storage cost reduction is a priority. Compression is particularly beneficial for large files and datasets that are not frequently accessed.
  • Considerations: Compressed files must be decompressed before processing, which can add computational overhead.
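
Where the source data already lands in S3 as raw JSON or CSV, it can be rewritten into partitioned Parquet with a CTAS statement. The sketch below is illustrative only: the database, table names, bucket path, and the assumption that `year`/`month`/`day` columns already exist in the source are placeholders to adapt to your integration.

-- Hypothetical conversion of a raw JSON-backed table into partitioned Parquet
-- (table names, bucket path, and partition columns are assumptions)
CREATE TABLE IF NOT EXISTS `logs-db`.apache_parquet
USING parquet
PARTITIONED BY (`year`, `month`, `day`)
LOCATION 's3://my-logs-bucket/apache/parquet/'
AS SELECT * FROM `logs-db`.apache_raw_json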

Table Definition Statement

Once the ingestion process has been established and the log format has been selected, a table definition is needed to allow queries to run on top of S3 using the built-in catalog, whether it is Hive or Glue.

It effectively maps the data stored in S3 to a structured format that query services can understand, allowing for complex analytical queries to be executed efficiently. This includes column names, data types, and potentially partition keys, which are critical for optimizing query performance.

  • **Partition keys are highly relevant when using the Parquet format or any other format that supports partitions** (see the table definition sketch below).
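
As a sketch of such a definition, the following Hive/Glue-style DDL declares a partitioned VPC flow table; the database name, S3 location, and column set are assumptions modeled on the materialized-view examples later in this issue and should be replaced with the integration's actual schema.

-- Hypothetical table definition; adjust columns, types, and the S3 location to your data
CREATE EXTERNAL TABLE IF NOT EXISTS `vpcflow-db`.vpc_flow (
  start_time      BIGINT,                                      -- epoch millis
  status_code     STRING,
  action          STRING,
  vpc_id          STRING,
  accountid       STRING,
  region          STRING,
  connection_info STRUCT<direction: STRING>,
  src_endpoint    STRUCT<ip: STRING, port: INT, svc_name: STRING>,
  dst_endpoint    STRUCT<ip: STRING, port: INT, svc_name: STRING>,
  traffic         STRUCT<bytes: BIGINT, packets: BIGINT>
)
PARTITIONED BY (`year` STRING, `month` STRING, `day` STRING)
STORED AS PARQUET
LOCATION 's3://my-logs-bucket/vpcflow/'

Newly added partitions still need to be registered in the catalog (for example via a Glue crawler, MSCK REPAIR TABLE, or explicit ALTER TABLE ... ADD PARTITION statements) before they become visible to queries.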

Using AWS Glue Data Catalog

  • Overview: AWS Glue Data Catalog is a fully managed schema repository service that makes it easy to discover, understand, and manage data. Once a table is defined in the Glue Data Catalog, query services can run queries against it directly.
  • Benefits: The Glue Data Catalog provides a unified metadata repository across various services, enabling seamless data discovery, query execution, and governance. It also supports schema versioning and schema auto-discovery for some formats, reducing the manual effort required to manage data schemas.
  • Considerations: When defining tables in the Glue Data Catalog, it's important to accurately map the data schema and specify partitions correctly to leverage the full power of data lake analytics. Glue crawlers can be used to automate the discovery and registration of data schemas.

Integrating with Hive Metastore

  • Overview: Integrating S3 data with an external Hive Metastore allows querying S3 data using familiar tools and frameworks that support Hive, such as Apache Spark and Athena.
  • Benefits: This approach leverages existing investments in Hive and allows for a hybrid data lake architecture, combining the scalability and cost-effectiveness of S3 with the robust query capabilities of Hive.

Automated Schema Detection and Table Creation

In the context of Flint (S3) based integrations, manually defining table schemas for large or complex datasets can be cumbersome and prone to errors.

AWS provides a powerful tool, the AWS Glue Crawler, to automate the process of schema detection and table creation, significantly simplifying data management tasks.

AWS Glue Crawler

  • Functionality: AWS Glue Crawlers automatically scan your data stored in Amazon S3 to determine its structure and infer schemas. Upon detection, it creates or updates table definitions in the AWS Glue Data Catalog. This process includes identifying columns, data types, and partitioning structures.
    • Efficiency: Automates the discovery of data schema, reducing the time and effort required for manual schema definition.
    • Accuracy: Minimizes error in schema definition, ensuring that the data catalog accurately reflects the underlying data structure.
    • Dynamic: Useful for data lakes where data structures evolve over time. Glue Crawlers can be scheduled to run at regular intervals, ensuring that the catalog remains up-to-date with the latest schema changes.
  • Use Cases: Ideal for initial dataset explorations, when integrating new data sources, or for datasets that frequently change structure.

Considerations for Using AWS Glue Crawler

  • Data Patterns: While Glue Crawlers are adept at schema detection, they rely on consistent patterns within your data. Highly irregular or complex nested structures may require additional configuration or manual tweaking post-crawl.
  • Crawl Scheduling: Determine the frequency of crawls based on the dynamism of your data. Regularly scheduled crawls ensure that your data catalog remains current but consider the cost implications of frequent crawls.

Best Practices for Automated Schema Detection

  • Initial Manual Review: Even with automation, a manual review of the inferred schema and table definitions can be beneficial, especially for complex datasets, to ensure accuracy and completeness (see the sketch after this list).
  • Data Preparation: Standardizing and cleaning your data before crawling can improve the accuracy of schema detection. This includes structuring files consistently and removing anomalies.
  • Metadata Enhancement: After the crawl, enriching the table definitions with additional metadata (such as descriptions and classifications) can facilitate data discovery and governance within large organizations.
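
For the manual review step, a few catalog statements can confirm what the crawler registered; the sketch below uses Spark SQL syntax and the hypothetical `vpcflow-db`.vpc_flow table from the earlier table definition example.

-- Inspect the inferred schema, storage format, and location
DESCRIBE TABLE EXTENDED `vpcflow-db`.vpc_flow

-- Confirm that partitions were discovered as expected
SHOW PARTITIONS `vpcflow-db`.vpc_flow

-- Spot-check a small sample of rows against the inferred types
SELECT * FROM `vpcflow-db`.vpc_flow LIMIT 10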

Projections and Materialized View Table Definitions

Once a table is defined, it can be used as the base for creating multiple data projections and materialized views that reflect different aspects of the data and allow both visualizing and summarizing the data in an effective and efficient way.

There are many strategies for this phase and we can't review all of them here. To better understand and select a projection strategy, the following considerations should be reviewed:

  • Domain of the data - whether it is Observability / Security or any other specific domain that has its particular entities and relationships, which will determine the dimensions that are important to reflect and project.

  • Partitioning of the data - as mentioned in the previous part, the partitioning determines the way data is collected and stored and therefore directly affects the type of projections and visualizations.

Each direct query will include a list of partitioned fields as part of its WHERE clause.

Each aggregation query will include a list of partitioned fields as part of its GROUP BY clause.

  • Data nature - the data can represent time-series samples or aggregated values of histogram metrics; each of these must be taken into account while planning.

Each aggregation query will include a list of core dimensions (fields) as part of its GROUP BY clause.

The additional dimensions in the aggregation queries are meant to allow simple filtering and grouping without the need to fetch additional data (see the sketch after this list).
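
To make the two query shapes concrete, here is a minimal sketch assuming the partitioned `vpcflow-db`.vpc_flow table sketched earlier; the partition values and field names are illustrative only.

-- Direct query: partition fields appear in the WHERE clause
SELECT src_endpoint.ip, dst_endpoint.ip, traffic.bytes
FROM `vpcflow-db`.vpc_flow
WHERE `year` = '2024' AND `month` = '04' AND `day` = '10'

-- Aggregation query: partition fields plus core dimensions appear in the GROUP BY clause
SELECT `year`, `month`, `day`, region,
       SUM(CAST(IFNULL(traffic.bytes, 0) AS LONG)) AS total_bytes
FROM `vpcflow-db`.vpc_flow
WHERE `year` = '2024' AND `month` = '04'
GROUP BY `year`, `month`, `day`, region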


Index & Dashboard Naming Conventions

Index name should represent the integration source:

  • aws_vpc_flow...
  • aws_cloud_front...
  • apache...

Live queries - should have a live phrase indicating the purpose of this MV query, including the time frame this query works on:

  • aws_vpc_flow_live_week_view
  • aws_cloud_front_live_hour_view

Each dashboard / visualization should be aware of the MV index naming convention and have its index pattern take this into account during the integration installation process.

When a visualization uses this MV query projection it should state it in its description:

  • vpc_live_requests_view_graph

Aggregated queries - should have an agg phrase indicating the purpose of this MV query, including the time frame this query works on and the aggregation type and dimensions:

  • aws_vpc_flow_agg_sum_weekly_requests_and_bytes
  • aws_vpc_flow_agg_avg_daily_http_status

When a visualization uses this MV query projection it should state it in its description:

  • vpc_agg_avg_daily_http_status_pie_chart

MV naming Parts:

  • integration_id: integration identification
  • projection-type: live stream or aggregated projection
  • timeframe: time frame for the live stream or the aggregation
  • projected_fields: the fields being aggregated, or view to indicate that all fields are projected
  • version: integration / MV version

$integration_id$_$projection-type$_$timeframe$_$projected_fields$_$version$

Additional fields important for future considerations:

  • Partition dimensions other than time
  • Aggregated group_by dimensions (fields) attached to each time frame
    • These dimensions help with filtering and composite aggregation on the pre-aggregated data.

The next plan displays an opinionated strategy that is composed of the following parts:
**Domain:** Observability, **Partitioning:** Time, **Data nature:** time-series data points.

Live stream view:

The live stream view is a direct copy of the most recent data stream (weekly or daily, depending on the resolution).
According to the partition nature of the data, the live stream should have the appropriate WHERE clause that selects the correct partition, in addition to the time-based live stream specification:

WHERE 
    ((`year` = 'StartYear' AND `month` >= 'StartMonth' AND `day` >= 'StartDay') OR
     (`year` = 'EndYear' AND `month` <= 'EndMonth' AND `day` <= 'EndDay'))
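
Putting this together, a weekly live-stream MV might look like the sketch below; the projected fields, the OTEL-style aliases, and the placeholder partition bounds are assumptions modeled on the aggregation examples that follow and need to be replaced with the integration's real values.

-- Weekly live view: a direct (non-aggregated) projection of the most recent partitions
-- (field names, aliases, and partition bounds are illustrative)
CREATE MATERIALIZED VIEW IF NOT EXISTS vpcflow_mview_live_week_view AS
    SELECT
        CAST(FROM_UNIXTIME(start_time / 1000) AS TIMESTAMP) AS `@timestamp`,
        status_code AS `aws.vpc.status_code`,
        connection_info.direction AS `aws.vpc.connection.direction`,
        src_endpoint.ip AS `aws.vpc.srcaddr`,
        dst_endpoint.ip AS `aws.vpc.dstaddr`,
        CAST(IFNULL(traffic.bytes, 0) AS LONG) AS `aws.vpc.bytes`,
        CAST(IFNULL(traffic.packets, 0) AS LONG) AS `aws.vpc.packets`
    FROM
        `vpcflow-db`.vpc_flow
    WHERE
        ((`year` = 'StartYear' AND `month` >= 'StartMonth' AND `day` >= 'StartDay') OR
         (`year` = 'EndYear' AND `month` <= 'EndMonth' AND `day` <= 'EndDay'))
WITH (
  auto_refresh = false
)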

Aggregation view of numeric fields:

According to the domain this integration covers, there will be numerous aggregation summaries of numeric data points, such as number of requests, duration of events, average of bytes, and so on.
Each such aggregation is mostly time based and uses the following time bucket strategy:
- Daily
- Hourly
- Weekly

By supporting multi-scale time-based aggregation we can offer multiple granular resolutions while preserving efficient storage and compute processes.

The following is a 60-minute MV query aggregation summary of total bytes and packets.
In addition, it adds the following dimensions for filtering:

  • status_code
  • direction
  • src.svc_name
  • dst.svc_name
  • accountid
  • region

-- One Hour Aggregation MV of VPC connections / bytes / packets
CREATE MATERIALIZED VIEW IF NOT EXISTS vpcflow_mview_60_min_connections AS
    SELECT
        date_trunc('hour', from_unixtime(start_time / 1000)) AS start_time,
        date_trunc('hour', from_unixtime(start_time / 1000)) + INTERVAL 1 HOUR AS end_time,
 
        status_code as `aws.vpc.status_code`,
        -- action as `aws.vpc.action`, (add to groupBy)

        connection_info.direction AS `aws.vpc.connection.direction`,
        src_endpoint.svc_name as `aws.vpc.pkt-src-aws-service`,
        dst_endpoint.svc_name as `aws.vpc.pkt-dst-aws-service`,
                
        -- vpc_id as `aws.vpc.vpc-id`,(add to groupBy)
        accountid as `aws.vpc.account-id`,    
        region as `aws.vpc.region`,
         
        COUNT(*) AS total_connections,
        SUM(CAST(IFNULL(traffic.bytes, 0) AS LONG)) AS total_bytes,
        SUM(CAST(IFNULL(traffic.packets, 0) AS LONG)) AS total_packets
    FROM
        `vpcflow-db`.vpc_flow
    GROUP BY
        date_trunc('hour', from_unixtime(start_time / 1000)), region, accountid, status_code,src_endpoint.svc_name, dst_endpoint.svc_name, connection_info.direction
    ORDER BY
        start_time
WITH (
  auto_refresh = false
)
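
Because the hourly MV already carries the extra dimensions, coarser resolutions can be derived from it without rescanning the raw S3 data. The sketch below assumes the materialized view (or the index backing it) can be queried by name from your query layer, and rolls the hourly buckets up to daily totals per region.

-- Illustrative roll-up from the hourly MV to daily totals per region
SELECT
    date_trunc('day', start_time) AS day_bucket,
    `aws.vpc.region`,
    SUM(total_connections) AS daily_connections,
    SUM(total_bytes)       AS daily_bytes,
    SUM(total_packets)     AS daily_packets
FROM
    vpcflow_mview_60_min_connections
GROUP BY
    date_trunc('day', start_time),
    `aws.vpc.region`
ORDER BY
    day_bucket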

Time window summary of a grouping query

A time-windowed aggregated summary of a group of values is important for identifying trends and anomalies. It allows formulating a baseline to compare against.

The following is a 60-minute MV query aggregation window summary of top destination IPs, ordered by the number of bytes sent:

-- One-hour aggregation time window of top destination IPs by bytes sum, grouped hourly
CREATE MATERIALIZED VIEW IF NOT EXISTS vpcflow_mview_60_min_network_ip_bytes_window AS
WITH hourly_buckets AS (
  SELECT
    date_trunc('hour', from_unixtime(start_time / 1000)) AS hour_bucket,
    dst_endpoint.ip AS dstaddr,
    SUM(CAST(IFNULL(traffic.bytes, 0) AS LONG)) AS total_bytes
  FROM
    `vpcflow-db`.vpc_flow
  GROUP BY
    hour_bucket,
    dstaddr
),
ranked_addresses AS (
  SELECT
    CAST(hour_bucket AS TIMESTAMP) AS hour_bucket,
    dstaddr,
    total_bytes,
    RANK() OVER (PARTITION BY hour_bucket ORDER BY total_bytes DESC) AS bytes_rank
  FROM
    hourly_buckets
)
SELECT
  hour_bucket,
  dstaddr,
  total_bytes
FROM
  ranked_addresses
WHERE
  bytes_rank <= 50
ORDER BY
  hour_bucket ASC,
  bytes_rank ASC
WITH (
  auto_refresh = false
);

SQL support and best practices

  • time/date fields support
CAST(FROM_UNIXTIME(time / 1000) AS TIMESTAMP) AS `@timestamp`
  • time/date fields duration and calculations
date_trunc('hour', from_unixtime(start_time / 1000)) AS start_time,
date_trunc('hour', from_unixtime(start_time / 1000)) + INTERVAL 1 HOUR AS end_time,
  • empty values checks
  CAST(IFNULL(src_endpoint.port, 0) AS LONG)
  • types casting
SUM(CAST(IFNULL(traffic.bytes / 1000, 0) AS LONG)) AS total_kbytes,
  • conditional fields definition
  CASE
        WHEN regexp(dst_endpoint.ip, '(10\\..*)|(192\\.168\\..*)|(172\\.1[6-9]\\..*)|(172\\.2[0-9]\\..*)|(172\\.3[0-1]\\..*)')
        THEN 'ingress'
        ELSE 'egress'
  END AS `aws.vpc.flow-direction`,
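
Taken together, these idioms might appear in a single projection as sketched below; the field names and aliases are borrowed from the VPC flow examples above and are illustrative only.

-- Illustrative projection combining the idioms above (field names and aliases are assumptions)
SELECT
    CAST(FROM_UNIXTIME(start_time / 1000) AS TIMESTAMP) AS `@timestamp`,
    date_trunc('hour', from_unixtime(start_time / 1000)) AS start_hour,
    date_trunc('hour', from_unixtime(start_time / 1000)) + INTERVAL 1 HOUR AS end_hour,
    CAST(IFNULL(src_endpoint.port, 0) AS LONG) AS `aws.vpc.srcport`,
    CAST(IFNULL(traffic.bytes / 1000, 0) AS LONG) AS `aws.vpc.kbytes`,
    CASE
        WHEN regexp(dst_endpoint.ip, '(10\\..*)|(192\\.168\\..*)|(172\\.1[6-9]\\..*)|(172\\.2[0-9]\\..*)|(172\\.3[0-1]\\..*)')
        THEN 'ingress'
        ELSE 'egress'
    END AS `aws.vpc.flow-direction`
FROM
    `vpcflow-db`.vpc_flow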

Swiddis commented Apr 10, 2024

Index name should represent the integration source

It's worth noting that we depend on the existing convention in the code, which is based on the user's table name; changing this would be a code change.


YANG-DB commented Apr 11, 2024

@Swiddis yes - this should be changed ASAP

@seankao-az

ASAP, as in making it to the AOS 2.13 release? Is it feasible at this point?


Swiddis commented Apr 11, 2024

I don't think this is in scope for 2.13. It'd be a hefty change to get all these fields bundled in query construction, and we already depend on the old convention.
