Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Minor: [Website] Update publishing date for Our journey at F5 with Apache Arrow (part 2) #377

Merged
merged 1 commit into from
Jun 26, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
layout: post
title: "Our journey at F5 with Apache Arrow (part 2): Adaptive Schemas and Sorting to Optimize Arrow Usage"
date: "2023-06-15 00:00:00"
date: "2023-06-26 00:00:00"
author: Laurent Quérel
categories: [application]
---
Expand Down Expand Up @@ -42,7 +42,7 @@ The following Go Arrow schema definition provides an example of such a schema, i
var (
// Arrow schema for the OTLP Arrow Traces record (without attributes, links, and events).
TracesSchema = arrow.NewSchema([]arrow.Field{
// Nullabe:true means the field is optional, in this case of 16 bit unsigned integers
// Nullabe:true means the field is optional, in this case of 16 bit unsigned integers
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤔 it appears my editor removed trailing spaces automatically as well. I think that is ok

{Name: constants.ID, Type: arrow.PrimitiveTypes.Uint16, Nullable: true},
{Name: constants.Resource, Type: arrow.StructOf([]arrow.Field{
{Name: constants.ID, Type: arrow.PrimitiveTypes.Uint16, Nullable: true},
Expand Down Expand Up @@ -77,11 +77,11 @@ var (
)
```

In this example, Arrow field-level metadata are employed to designate when a field is optional (Nullable: true) or to specify the minimal dictionary encoding applicable to a particular field (Metadata Dictionary8/16/…). Now let’s imagine a scenario utilizing this schema in a straightforward scenario, wherein only a handful of fields are actually in use, and the cardinality of most dictionary-encoded fields is low (i.e., below 2^8). Ideally, we'd want a system capable of dynamically constructing the following simplified schema, which, in essence, is a strict subset of the original schema.
In this example, Arrow field-level metadata are employed to designate when a field is optional (Nullable: true) or to specify the minimal dictionary encoding applicable to a particular field (Metadata Dictionary8/16/…). Now let’s imagine a scenario utilizing this schema in a straightforward scenario, wherein only a handful of fields are actually in use, and the cardinality of most dictionary-encoded fields is low (i.e., below 2^8). Ideally, we'd want a system capable of dynamically constructing the following simplified schema, which, in essence, is a strict subset of the original schema.

```go
var (
// Simplified schema definition generated by the Arrow Record encoder based on
// Simplified schema definition generated by the Arrow Record encoder based on
// the data observed.
TracesSchema = arrow.NewSchema([]arrow.Field{
{Name: constants.ID, Type: arrow.PrimitiveTypes.Uint16, Nullable: true},
Expand All @@ -99,7 +99,7 @@ var (
)
```

Additionally, we desire a system capable of automatically adapting the aforementioned schema if it encounters new fields or existing fields with a cardinality exceeding the size of the current dictionary definition in future batches. In extreme scenarios, if the cardinality of a specific field surpasses a certain threshold, we would prefer the system to automatically revert to the non-dictionary representation (mechanism of dictionary overflow). That is precisely what we will elaborate on in the remainder of this section.
Additionally, we desire a system capable of automatically adapting the aforementioned schema if it encounters new fields or existing fields with a cardinality exceeding the size of the current dictionary definition in future batches. In extreme scenarios, if the cardinality of a specific field surpasses a certain threshold, we would prefer the system to automatically revert to the non-dictionary representation (mechanism of dictionary overflow). That is precisely what we will elaborate on in the remainder of this section.

An overview of the different components and events used to implement this approach is depicted in figure 1.

Expand All @@ -114,32 +114,32 @@ More specifically, the process of the Adaptive Arrow schema component consists o

**Initialization phase**

During the initialization phase, the Arrow Record Encoder reads the annotated Arrow schema (i.e. the reference schema) and generates a collection of transformations. When these transformations are applied to the reference schema, they yield the first minimal Arrow schema that adheres to the constraints depicted by these annotations. In this initial iteration, all optional fields are eliminated, and all dictionary-encoded fields are configured to utilize the smallest encoding as defined by the annotation (only `Dictionary8` in the previous example). These transformations form a tree, reflecting the structure of the reference schema.
During the initialization phase, the Arrow Record Encoder reads the annotated Arrow schema (i.e. the reference schema) and generates a collection of transformations. When these transformations are applied to the reference schema, they yield the first minimal Arrow schema that adheres to the constraints depicted by these annotations. In this initial iteration, all optional fields are eliminated, and all dictionary-encoded fields are configured to utilize the smallest encoding as defined by the annotation (only `Dictionary8` in the previous example). These transformations form a tree, reflecting the structure of the reference schema.

**Feeding phase**

Following the initialization is the feeding phase. Here, the Arrow Record Encoder scans the batch and attempts to store all the fields in an Arrow Record Builder, which is defined by the schema created in the prior step. If a field exists in the data but is not included in the schema, the encoder will trigger a `missing field` event. This process continues until the current batch is completely processed. An additional internal check is conducted on all dictionary-encoded fields in the Arrow Record builder to ensure there’s no dictionary overflow (i.e. more unique entries than the cardinality of the index permits). `Dictionary overflow` events are generated if such a situation is detected. Consequently, by the end, all unknown fields and dictionary overflow would have been detected, or alternatively, no discrepancies would have surfaced if the data aligns perfectly with the schema.
Following the initialization is the feeding phase. Here, the Arrow Record Encoder scans the batch and attempts to store all the fields in an Arrow Record Builder, which is defined by the schema created in the prior step. If a field exists in the data but is not included in the schema, the encoder will trigger a `missing field` event. This process continues until the current batch is completely processed. An additional internal check is conducted on all dictionary-encoded fields in the Arrow Record builder to ensure there’s no dictionary overflow (i.e. more unique entries than the cardinality of the index permits). `Dictionary overflow` events are generated if such a situation is detected. Consequently, by the end, all unknown fields and dictionary overflow would have been detected, or alternatively, no discrepancies would have surfaced if the data aligns perfectly with the schema.

**Corrective phase**

If at least one event has been generated, a corrective phase will be initiated to fix the schema. This optional stage considers all the events generated in the previous stage and adjusts the transformation tree accordingly to align with the observed data. A `missing field` event will remove a NoField transformation for the corresponding field. A `dictionary overflow` event will modify the dictionary transformation to mirror the event (e.g. changing the index type from uint8 to uint16, or if the maximum index size has been reached, the transformation will remove the dictionary-encoding and revert to the original non-dictionary-encoded type). The updated transformation tree is subsequently used to create a new schema and a fresh Arrow Record Builder. This Record Builder is then utilized to replay the preceding feeding phase with the batch that wasn’t processed correctly.

**Routing phase**

Once a Record Builder has been properly fed, an Arrow Record is created, and the system transitions into the routing phase. The router component calculates a schema signature of the record and utilizes this signature to route the record to an existing Arrow stream compatible with the signature, or it initiates a new stream if there is no match.
Once a Record Builder has been properly fed, an Arrow Record is created, and the system transitions into the routing phase. The router component calculates a schema signature of the record and utilizes this signature to route the record to an existing Arrow stream compatible with the signature, or it initiates a new stream if there is no match.

This four-phase process should gradually adapt and stabilize the schema to a structure and definition that is optimized for a specific data stream. Unused fields will never unnecessarily consume memory. Dictionary-encoded fields will be defined with the most optimal index size based on the observed data cardinality, and fields with a cardinality exceeding a certain threshold (defined by configuration) will automatically revert to their non-dictionary-encoded versions.
This four-phase process should gradually adapt and stabilize the schema to a structure and definition that is optimized for a specific data stream. Unused fields will never unnecessarily consume memory. Dictionary-encoded fields will be defined with the most optimal index size based on the observed data cardinality, and fields with a cardinality exceeding a certain threshold (defined by configuration) will automatically revert to their non-dictionary-encoded versions.

To effectively execute this approach, you must ensure that there is a sufficient level of flexibility on the receiver side. It's crucial that your downstream pipeline remains functional even when some fields are missing in the schema or when various dictionary index configurations are employed. While this may not always be feasible without implementing additional transformations upon reception, it proves worthwhile in certain scenarios.

The following results highlight the significant memory usage reduction achieved through the application of various optimization techniques. These results were gathered using a schema akin to the one previously presented. The considerable memory efficiency underscores the effectiveness of this approach.
The following results highlight the significant memory usage reduction achieved through the application of various optimization techniques. These results were gathered using a schema akin to the one previously presented. The considerable memory efficiency underscores the effectiveness of this approach.

<figure style="text-align: center;">
<img src="{{ site.baseurl }}/img/journey-apache-arrow/memory-usage-25k-traces.png" width="100%" class="img-responsive" alt="Fig 2: Comparative analysis of memory usage for different schema optimizations.">
<figcaption>Fig 2: Comparative analysis of memory usage for different schema optimizations.</figcaption>
</figure>

The concept of a transformation tree enables a generalized approach to perform various types of schema optimizations based on the knowledge acquired from the data. This architecture is highly flexible; the current implementation allows for the removal of unused fields, the application of the most specific dictionary encoding, and the optimization of union type variants. In the future, there is potential for introducing additional optimizations that can be expressed as transformations on the initial schema. An implementation of this approach is available [here](https://github.com/f5/otel-arrow-adapter/tree/main/pkg/otel/common/schema).
The concept of a transformation tree enables a generalized approach to perform various types of schema optimizations based on the knowledge acquired from the data. This architecture is highly flexible; the current implementation allows for the removal of unused fields, the application of the most specific dictionary encoding, and the optimization of union type variants. In the future, there is potential for introducing additional optimizations that can be expressed as transformations on the initial schema. An implementation of this approach is available [here](https://github.com/f5/otel-arrow-adapter/tree/main/pkg/otel/common/schema).

## Handling recursive schema definition

Expand Down Expand Up @@ -169,18 +169,18 @@ These Arrow records, also referred to as tables, form a hierarchy with `METRICS`
<figcaption>Fig 4: A simplified entity-relationship diagram representing OTel sum & gauge metrics.</figcaption>
</figure>

The relationship between the primary `METRICS` table and the secondary `RESOURCE_ATTRS`, `SCOPE_ATTRS`, and `NUMBER_DATA_POINTS` tables is established through a unique `id` in the main table and a `parent_id` column in each of the secondary tables. This {id,parent_id} pair represents an overhead that should be minimized to the greatest extent possible post-compression.
The relationship between the primary `METRICS` table and the secondary `RESOURCE_ATTRS`, `SCOPE_ATTRS`, and `NUMBER_DATA_POINTS` tables is established through a unique `id` in the main table and a `parent_id` column in each of the secondary tables. This {id,parent_id} pair represents an overhead that should be minimized to the greatest extent possible post-compression.

To achieve this, the ordering process for the different tables adheres to the hierarchy, starting from the main table down to the leaf. The main table is sorted (by one or multiple columns), and then an incremental id is assigned to each row. This numerical id is stored using delta-encoding, which is implemented on top of Arrow.
To achieve this, the ordering process for the different tables adheres to the hierarchy, starting from the main table down to the leaf. The main table is sorted (by one or multiple columns), and then an incremental id is assigned to each row. This numerical id is stored using delta-encoding, which is implemented on top of Arrow.

The secondary tables directly connected to the main table are sorted using the same principle, but the `parent_id` column is consistently utilized as the last column in the sort statement. Including the `parent_id` column in the sort statement enables the use of a variation of delta encoding. The efficiency of this approach is summarized in the chart below.
The secondary tables directly connected to the main table are sorted using the same principle, but the `parent_id` column is consistently utilized as the last column in the sort statement. Including the `parent_id` column in the sort statement enables the use of a variation of delta encoding. The efficiency of this approach is summarized in the chart below.

<figure style="text-align: center;">
<img src="{{ site.baseurl }}/img/journey-apache-arrow/compressed-message-size.png" width="100%" class="img-responsive" alt="Fig 5: Comparative analysis of compression ratios - OTLP Protocol vs. Two variations of the OTel Arrow Protocol with multivariate metrics stream. (lower percentage is better)">
<figcaption>Fig 5: Comparative analysis of compression ratios - OTLP Protocol vs. Two variations of the OTel Arrow Protocol with multivariate metrics stream. (lower percentage is better)</figcaption>
</figure>

The second column presents the average size of the OTLP batch both pre- and post-ZSTD compression for batches of varying sizes. This column serves as a reference point for the ensuing two columns. The third column displays results for the OTel Arrow protocol without any sorting applied, while the final column showcases results for the OTel Arrow protocol with sorting enabled.
The second column presents the average size of the OTLP batch both pre- and post-ZSTD compression for batches of varying sizes. This column serves as a reference point for the ensuing two columns. The third column displays results for the OTel Arrow protocol without any sorting applied, while the final column showcases results for the OTel Arrow protocol with sorting enabled.

Before compression, the average batch sizes for the two OTel Arrow configurations are predictably similar. However, post-compression, the benefits of sorting each individual table on the compression ratio become immediately apparent. Without sorting, the OTel Arrow protocol exhibits a compression ratio that's 1.40 to 1.67 times better than the reference. When sorting is enabled, the OTel Arrow protocol outperforms the reference by a factor ranging from 4.94 to 7.21 times!

Expand All @@ -192,9 +192,9 @@ Decomposing a complex schema into multiple simpler schemas to enhance sorting ca

This article concludes our two-part series on Apache Arrow, wherein we have explored various strategies to maximize the utility of Apache Arrow within specific contexts. The adaptive schema architecture presented in the second part of this series paves the way for future optimization possibilities. We look forward to seeing what the community can add based on this contribution.

Apache Arrow is an exceptional project, continually enhanced by a thriving ecosystem. However, throughout our exploration, we have noticed certain gaps or points of friction that, if addressed, could significantly enrich the overall experience.
* Designing an efficient Arrow schema can, in some cases, prove to be a challenging task. Having the **ability to collect statistics** at the record level could facilitate this design phase (data distribution per field, dictionary stats, Arrow array sizes before/after compression, and so on). These statistics would also assist in identifying the most effective columns on which to base the record sorting.
* **Native support for recursive schemas** would also increase adoption by simplifying the use of Arrow in complex scenarios. While I'm not aware of any attempts to address this limitation within the Arrow system, it doesn't seem insurmountable and would constitute a valuable extension. This would help reduce the complexity of integrating Arrow with other systems that rely on such recursive definitions.
Apache Arrow is an exceptional project, continually enhanced by a thriving ecosystem. However, throughout our exploration, we have noticed certain gaps or points of friction that, if addressed, could significantly enrich the overall experience.
* Designing an efficient Arrow schema can, in some cases, prove to be a challenging task. Having the **ability to collect statistics** at the record level could facilitate this design phase (data distribution per field, dictionary stats, Arrow array sizes before/after compression, and so on). These statistics would also assist in identifying the most effective columns on which to base the record sorting.
* **Native support for recursive schemas** would also increase adoption by simplifying the use of Arrow in complex scenarios. While I'm not aware of any attempts to address this limitation within the Arrow system, it doesn't seem insurmountable and would constitute a valuable extension. This would help reduce the complexity of integrating Arrow with other systems that rely on such recursive definitions.
* **Harmonizing the support for data types as well as IPC stream capabilities** would also be a major benefit. Predominant client libraries support nested and hierarchical schemas, but their use is limited due to a lack of full support across the rest of the ecosystem. For example, list and/or union types are not well supported by query engines or Parquet bridges. Also, the advanced dictionary support within IPC streams is not consistent across different implementations (i.e. delta dictionaries and replacement dictionaries are not supported by all implementations).
* **Optimizing the support of complex schemas** in terms of memory consumption and compression rate could be improved by natively integrating the concept of the dynamic schema presented in this article.
* **Detecting dictionary overflows** (index level) is not something that is easy to test on the fly. The API could be improved to indicate this overflow as soon as an insertion occurs.
Expand Down