From 01c3987c902e3c607d843bbc32b8a972f86e0095 Mon Sep 17 00:00:00 2001 From: nmcdonnell-kx Date: Mon, 31 Oct 2022 18:00:59 +0000 Subject: [PATCH 1/4] * Support multithreaded use of arrowkdb with peach * Upgrade build to use libarrow/libparquet 9.0.0 * Add support for reading Parquet files with row groups --- CMakeLists.txt | 1 + README.md | 69 +++++++----------------------- docs/reference.md | 89 +++++++++++++++++++++++++++++++++++++- q/arrowkdb.q | 10 ++++- src/ArrowKdb.cpp | 16 +++++++ src/ArrowKdb.h | 7 +++ src/GenericStore.h | 18 ++++++++ src/TableData.cpp | 104 ++++++++++++++++++++++++++++++++++++++++++++- src/TableData.h | 29 +++++++++++++ travis_setup.sh | 21 +++------ 10 files changed, 293 insertions(+), 71 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 4c55a07..de789c4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -21,6 +21,7 @@ message(STATUS "C API : ${ARROW_INSTALL}") include_directories ( ${ARROW_INSTALL}/include ${_VCPKG_ROOT_DIR}/installed/${VCPKG_TARGET_TRIPLET}/include # where arrow has been installed using vcpkg + C:/Git/vcpkg/installed/x64-windows-static/include ${CMAKE_BINARY_DIR} # For 'k.h', downloaded below ) diff --git a/README.md b/README.md index 20a8041..c9adc8e 100644 --- a/README.md +++ b/README.md @@ -42,7 +42,7 @@ Conversely, Arrow is an in-memory format meant for direct and efficient use for ### Requirements - kdb+ ≥ 3.5 64-bit (Linux/MacOS/Windows) -- Apache Arrow ≥ 2.0.0 +- Apache Arrow = 9.0.0 (or ≥ 2.0.0 if building `arrowkdb` from source) - C++11 or later - CMake ≥ 3.1.3 @@ -54,65 +54,33 @@ Conversely, Arrow is an in-memory format meant for direct and efficient use for Follow the instructions [here](https://arrow.apache.org/install/#c-and-glib-c-packages-for-debian-gnulinux-ubuntu-and-centos) to install `libarrow-dev` and `libparquet-dev` from Apache's APT or Yum repositories. -#### MacOS - -Follow the instructions [here](https://arrow.apache.org/install/#c-and-glib-c-packages-on-homebrew) to install `apache-arrow` using Homebrew. - -#### Windows (using `vcpkg`) - -A `vcpkg` installation of Arrow is available as described [here](https://arrow.apache.org/install/#c-package-on-vcpkg). This requires installation the of the `x64-windows` triplet for Arrow then copying the `vcpkg` installed DLLs (Arrow, Parquet and compression libs) to the `%QHOME%\w64` directory: - -```bash -C:\Git> git clone https://github.com/Microsoft/vcpkg.git -C:\Git> cd vcpkg -C:\Git\vcpkg> bootstrap-vcpkg.bat -C:\Git\vcpkg> vcpkg integrate install -C:\Git\vcpkg> vcpkg install arrow:x64-windows -C:\Git\vcpkg> copy C:\Git\vcpkg\installed\x64-windows\bin\*.dll %QHOME%\w64 -``` - -#### Windows (building Arrow from source) - -It is also possible to build Arrow from source. Full details are provided [here](https://arrow.apache.org/docs/developers/cpp/windows.html) but the basic steps are as follows: - -##### Snappy - -First download and build snappy which is required by Parquet. From a Visual Studio command prompt: +Note: If using the packaged version of `arrowkdb` you should install version 9.0.0 of both: ```bash -C:\Git> git clone https://github.com/google/snappy.git -C:\Git> cd snappy +sudo apt install -y -V libarrow-dev=9.0.0-1 +sudo apt install -y -V libparquet-dev=9.0.0-1 ``` -Create an install directory and set an environment variable to this directory (substituting the correct absolute path as appropriate). 
This environment variable is used again later when building Arrow:
+#### MacOS
 
-```bash
-C:\Git\snappy> mkdir install
-C:\Git\snappy> set SNAPPY_INSTALL=C:\Git\snappy\install
-```
+Follow the instructions [here](https://arrow.apache.org/install/#c-and-glib-c-packages-on-homebrew) to install `apache-arrow` using Homebrew.
 
-Create the CMake build directory and generate the build files (this will default to using the Visual Studio CMake generator when run from a VS command prompt):
+#### Windows
 
-```bash
-C:\Git\snappy> mkdir build
-C:\Git\snappy> cd build
-C:\Git\snappy\build> cmake -DCMAKE_INSTALL_PREFIX=%SNAPPY_INSTALL% -DSNAPPY_BUILD_BENCHMARKS:BOOL=0 -DSNAPPY_BUILD_TESTS:BOOL=0 ..
-```
+On Windows it is necessary to build Arrow from source. Full details are provided [here](https://arrow.apache.org/docs/developers/cpp/windows.html) but the basic steps are as follows.
 
-Build and install snappy:
+From a Visual Studio command prompt, clone the Arrow source from github:
 
 ```bash
-C:\Git\snappy\build> cmake --build . --config Release
-C:\Git\snappy\build> cmake --build . --config Release --target install
+C:\Git> git clone https://github.com/apache/arrow.git
+C:\Git> cd arrow
 ```
 
-##### Arrow
-
-From a Visual Studio command prompt, clone the Arrow source from github:
+Switch to the 9.0.0 tag:
 
 ```bash
-C:\Git> git clone https://github.com/apache/arrow.git
-C:\Git> cd arrow\cpp
+C:\Git\arrow> git checkout refs/tags/apache-arrow-9.0.0 --
+C:\Git\arrow> cd cpp
 ```
 
 Create an install directory and set an environment variable to this directory (substituting the correct absolute path as appropriate). This environment variable is used again later when building `arrowkdb`:
 
 ```bash
 C:\Git\arrow\cpp> mkdir install
 C:\Git\arrow\cpp> set ARROW_INSTALL=C:\Git\arrow\cpp\install
 ```
 
@@ -127,7 +95,7 @@ Create the CMake build directory and generate the build files (this will default
 ```bash
 C:\Git\arrow\cpp> mkdir build
 C:\Git\arrow\cpp> cd build
-C:\Git\arrow\cpp\build> cmake .. -DARROW_PARQUET=ON -DARROW_WITH_SNAPPY=ON -DARROW_BUILD_STATIC=OFF -DSnappy_LIB=%SNAPPY_INSTALL%\lib\snappy.lib -DSnappy_INCLUDE_DIR=%SNAPPY_INSTALL%\include -DCMAKE_INSTALL_PREFIX=%ARROW_INSTALL%
+C:\Git\arrow\cpp\build> cmake .. -DARROW_PARQUET=ON -DARROW_WITH_SNAPPY=ON -DARROW_BUILD_STATIC=OFF -DARROW_COMPUTE=OFF -DARROW_DEPENDENCY_USE_SHARED=OFF -DCMAKE_INSTALL_PREFIX=%ARROW_INSTALL%
 ```
 
 Build and install Arrow:
@@ -154,7 +122,7 @@ It is recommended that a user install this interface through a release. This is
 2. Download a release from [here](https://github.com/KxSystems/arrowkdb/releases) for your system architecture.
 3. Install script `arrowkdb.q` to `$QHOME`, and binary file `lib/arrowkdb.(so|dll)` to `$QHOME/[mlw](64)`, by executing the following from the Release directory:
 
-```
+```bash
 ## Linux/MacOS
 chmod +x install.sh && ./install.sh
 
@@ -187,9 +155,6 @@ cd build
 ## Linux/MacOS
 cmake ..
 
-## Windows (using the vcpkg Arrow installation)
-cmake .. -DCMAKE_TOOLCHAIN_FILE=C:/Git/vcpkg/scripts/buildsystems/vcpkg.cmake
-
 ## Windows (using the Arrow installation which was built from source as above)
 cmake .. -DARROW_INSTALL=%ARROW_INSTALL%
 ```
 
@@ -216,8 +181,6 @@ Documentation outlining the functionality available for this interface can be fo
 
 ## Status
 
-**Warning: This interface is currently a pre-release alpha and subject to non-backwards compatible changes without notice.**
-
 The arrowkdb interface is provided here under an Apache 2.0 license.
 
 If you find issues with the interface or have feature requests, please consider raising an issue [here](https://github.com/KxSystems/arrowkdb/issues).
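As a quick sanity check after either install route above, the interface can be loaded and queried from a q session. A minimal sketch, assuming the install step placed `arrowkdb.q` and the `arrowkdb` shared library under `QHOME` as described (the exact `buildInfo` output varies by platform and build):

```q
q)\l arrowkdb.q
q).arrowkdb.util.buildInfo[]    / version, git description and compiler
```

From this patch series onwards, loading the script also runs `util.init[]`, which turns on symbol locking (`setm(1)`) so the interface can be used safely from secondary threads.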
diff --git a/docs/reference.md b/docs/reference.md
index 7366a71..4a042e5 100644
--- a/docs/reference.md
+++ b/docs/reference.md
@@ -2232,7 +2232,94 @@ q)read_table~table
 1b
 ```
 
-## Arrow IPC files
+### `pq.readParquetNumRowGroups`
+
+*Read the number of row groups used by a Parquet file*
+
+```syntax
+.arrowkdb.pq.readParquetNumRowGroups[parquet_file]
+```
+
+Where `parquet_file` is a string containing the Parquet file name
+
+returns the number of row groups
+
+```q
+q)table:([]a:10000000#0;b:10000000#1)
+q).arrowkdb.pq.writeParquetFromTable["file.parquet";table;::]
+q).arrowkdb.pq.readParquetNumRowGroups["file.parquet"]
+10i
+```
+
+### `pq.readParquetRowGroups`
+
+*Read a set of row groups from a Parquet file into an Arrow table then convert to a kdb+ mixed list of array data*
+
+```syntax
+.arrowkdb.pq.readParquetRowGroups[parquet_file;row_groups;columns;options]
+```
+
+Where:
+
+- `parquet_file` is a string containing the Parquet file name
+- `row_groups` is an integer list (6h) of row group indices to read, or generic null (::) to read all row groups
+- `columns` is an integer list (6h) of column indices to read, or generic null (::) to read all columns
+- `options` is a kdb+ dictionary of options or generic null (::) to use defaults. Dictionary key must be an 11h list. Values list can be 7h, 11h or mixed list of -7|-11|4h.
+
+returns the array data
+
+Supported options:
+
+- `PARQUET_MULTITHREADED_READ` - Flag indicating whether the Parquet reader should run in multithreaded mode. This can improve performance by processing multiple columns in parallel. Long, default 0.
+- `USE_MMAP` - Flag indicating whether the Parquet file should be memory mapped in. This can improve performance on systems which support mmap. Long, default 0.
+- `DECIMAL128_AS_DOUBLE` - Flag indicating whether to override the default type mapping for the Arrow decimal128 datatype and instead represent it as a double (9h). Long, default 0.
+
+```q
+q)table:([]a:10000000#0;b:10000000#1)
+q).arrowkdb.pq.writeParquetFromTable["file.parquet";table;::]
+q).arrowkdb.pq.readParquetRowGroups["file.parquet";1 2i;enlist 0i;::]
+0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0..
+q)count .arrowkdb.pq.readParquetRowGroups["file.parquet";1 2i;enlist 0i;::]
+1
+q)count first .arrowkdb.pq.readParquetRowGroups["file.parquet";1 2i;enlist 0i;::]
+2097152
+```
+
+### `pq.readParquetRowGroupsToTable`
+
+*Read a set of row groups from a Parquet file into an Arrow table then convert to a kdb+ table*
+
+```syntax
+.arrowkdb.pq.readParquetRowGroupsToTable[parquet_file;row_groups;columns;options]
+```
+
+Where:
+
+- `parquet_file` is a string containing the Parquet file name
+- `row_groups` is an integer list (6h) of row group indices to read, or generic null (::) to read all row groups
+- `columns` is an integer list (6h) of column indices to read, or generic null (::) to read all columns
+- `options` is a kdb+ dictionary of options or generic null (::) to use defaults. Dictionary key must be an 11h list. Values list can be 7h, 11h or mixed list of -7|-11|4h.
+
+returns the kdb+ table
+
+Supported options:
+
+- `PARQUET_MULTITHREADED_READ` - Flag indicating whether the Parquet reader should run in multithreaded mode. This can improve performance by processing multiple columns in parallel. Long, default 0.
+- `USE_MMAP` - Flag indicating whether the Parquet file should be memory mapped in. This can improve performance on systems which support mmap. Long, default 0.
+- `DECIMAL128_AS_DOUBLE` - Flag indicating whether to override the default type mapping for the Arrow decimal128 datatype and instead represent it as a double (9h). Long, default 0.
+
+```q
+q)table:([]a:10000000#0;b:10000000#1)
+q).arrowkdb.pq.writeParquetFromTable["file.parquet";table;::]
+q)meta .arrowkdb.pq.readParquetRowGroupsToTable["file.parquet";1 2i;enlist 0i;::]
+c| t f a
+-| -----
+a| j
+q)count .arrowkdb.pq.readParquetRowGroupsToTable["file.parquet";1 2i;enlist 0i;::]
+2097152
+```
+
+## Arrow IPC files
 
 ### `ipc.writeArrow`
 
diff --git a/q/arrowkdb.q b/q/arrowkdb.q
index 7dc5027..8fd01f4 100644
--- a/q/arrowkdb.q
+++ b/q/arrowkdb.q
@@ -114,6 +114,9 @@ pq.readParquetSchema:`arrowkdb 2:(`readParquetSchema;1);
 pq.readParquetData:`arrowkdb 2:(`readParquetData;2);
 pq.readParquetToTable:{[filename;options] flip (fd.fieldName each sc.schemaFields[pq.readParquetSchema[filename]])!(pq.readParquetData[filename;options])};
 pq.readParquetColumn:`arrowkdb 2:(`readParquetColumn;3);
+pq.readParquetNumRowGroups:`arrowkdb 2:(`readParquetNumRowGroups;1);
+pq.readParquetRowGroups:`arrowkdb 2:(`readParquetRowGroups;4);
+pq.readParquetRowGroupsToTable:{[filename;row_groups;columns;options] flip (fd.fieldName each sc.schemaFields[pq.readParquetSchema[filename]](columns))!(pq.readParquetRowGroups[filename;row_groups;columns;options])};
 
 
 // arrow files
@@ -134,8 +137,13 @@ ipc.parseArrowToTable:{[serialized;options] flip (fd.fieldName each sc.schemaFie
 
 // utils
 util.buildInfo:`arrowkdb 2:(`buildInfo;1);
+util.init:`arrowkdb 2:(`init;1);
 
 
 // testing
 ts.writeReadArray:`arrowkdb 2:(`writeReadArray;3);
-ts.writeReadTable:`arrowkdb 2:(`writeReadTable;3);
\ No newline at end of file
+ts.writeReadTable:`arrowkdb 2:(`writeReadTable;3);
+
+
+// initialise
+util.init[];
diff --git a/src/ArrowKdb.cpp b/src/ArrowKdb.cpp
index 75c2d4b..505ae7b 100644
--- a/src/ArrowKdb.cpp
+++ b/src/ArrowKdb.cpp
@@ -8,6 +8,9 @@
 #include "HelperFunctions.h"
 #include "ArrowKdb.h"
 
+#include "DatatypeStore.h"
+#include "FieldStore.h"
+#include "SchemaStore.h"
 
 // Main is only used for profiling on windows with arrowkdb.exe
@@ -77,3 +80,16 @@ EXP K buildInfo(K unused)
 
   return xD(keys, values);
 }
+
+EXP K init(K unused)
+{
+  // Turn on symbol locking
+  setm(1);
+
+  // Create the singletons
+  kx::arrowkdb::GetDatatypeStore();
+  kx::arrowkdb::GetFieldStore();
+  kx::arrowkdb::GetSchemaStore();
+
+  return (K)0;
+}
diff --git a/src/ArrowKdb.h b/src/ArrowKdb.h
index 01a6b9f..9bf5ac7 100644
--- a/src/ArrowKdb.h
+++ b/src/ArrowKdb.h
@@ -20,6 +20,13 @@ extern "C"
    * version, shared object version, git description and compiler used.
   */
  EXP K buildInfo(K unused);
+
+  /**
+   * @brief Initialise the library
+   * @param unused
+   * @return null
+   */
+  EXP K init(K unused);
 }
 
 #endif // __ARROW_KDB_H__
diff --git a/src/GenericStore.h b/src/GenericStore.h
index 8dc26d6..bd85ce5 100644
--- a/src/GenericStore.h
+++ b/src/GenericStore.h
@@ -3,6 +3,8 @@
 
 #include 
 #include 
+#include <mutex>
+#include <shared_mutex>
 
 #include 
 #include 
@@ -29,6 +31,7 @@ class GenericStore
   long counter; // incremented before an object is added
 
   // Forward and reverse lookup maps between the identifiers and their objects
+  std::shared_timed_mutex mutex;
   std::map<long, T> forward_lookup;
   std::map<T, long> reverse_lookup;
 
@@ -75,6 +78,9 @@
    */
   long Add(T value)
   {
+    // Get write lock
+    std::unique_lock<std::shared_timed_mutex>(mutex);
+
     if (auto equal = FindEqual(value))
       return equal;
 
@@ -96,6 +102,9 @@
    */
   bool Remove(long value_id)
   {
+    // Get write lock
+    std::unique_lock<std::shared_timed_mutex>(mutex);
+
     auto lookup = forward_lookup.find(value_id);
     if (lookup == forward_lookup.end())
       return false;
@@ -119,6 +128,9 @@
    */
   T Find(long value_id)
   {
+    // Get read lock
+    std::shared_lock<std::shared_timed_mutex>(mutex);
+
     auto lookup = forward_lookup.find(value_id);
     if (lookup == forward_lookup.end())
       return T();
@@ -135,6 +147,9 @@
    */
   long ReverseFind(T value)
   {
+    // Get read lock
+    std::shared_lock<std::shared_timed_mutex>(mutex);
+
     // Reverse lookup is only used internally by the interface so insert the
     // object if it's not already present. This avoids having to add this logic
     // into all the calling functions.
@@ -152,6 +167,9 @@
    */
   const std::vector<long> List(void)
   {
+    // Get read lock
+    std::shared_lock<std::shared_timed_mutex>(mutex);
+
     std::vector<long> result;
     for (auto it : forward_lookup)
       result.push_back(it.first);
diff --git a/src/TableData.cpp b/src/TableData.cpp
index cf04159..8cbeb73 100644
--- a/src/TableData.cpp
+++ b/src/TableData.cpp
@@ -213,6 +213,27 @@ K readParquetSchema(K parquet_file)
   KDB_EXCEPTION_CATCH;
 }
 
+K readParquetNumRowGroups(K parquet_file)
+{
+  KDB_EXCEPTION_TRY;
+
+  if (!kx::arrowkdb::IsKdbString(parquet_file))
+    return krr((S)"parquet_file not 11h or 0 of 10h");
+
+  std::shared_ptr<arrow::io::ReadableFile> infile;
+  PARQUET_ASSIGN_OR_THROW(
+    infile,
+    arrow::io::ReadableFile::Open(kx::arrowkdb::GetKdbString(parquet_file),
+      arrow::default_memory_pool()));
+
+  std::unique_ptr<parquet::arrow::FileReader> reader;
+  PARQUET_THROW_NOT_OK(parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
+
+  return ki(reader->num_row_groups());
+
+  KDB_EXCEPTION_CATCH;
+}
+
 K readParquetData(K parquet_file, K options)
 {
   KDB_EXCEPTION_TRY;
@@ -301,6 +322,87 @@ K readParquetColumn(K parquet_file, K column_index, K options)
   KDB_EXCEPTION_CATCH;
 }
 
+K readParquetRowGroups(K parquet_file, K row_groups, K columns, K options)
+{
+  KDB_EXCEPTION_TRY;
+
+  if (!kx::arrowkdb::IsKdbString(parquet_file))
+    return krr((S)"parquet_file not 11h or 0 of 10h");
+  if (row_groups->t != 101 && row_groups->t != KI)
+    return krr((S)"row_groups not 101h or 6h");
+  if (columns->t != 101 && columns->t != KI)
+    return krr((S)"columns not 101h or 6h");
+
+  // Function to convert a KI to vector<int>
+  auto ki_to_vector_int = [](K ki, std::vector<int>& vec) {
+    for (auto i = 0; i < ki->n; ++i)
+      vec.push_back(kI(ki)[i]);
+  };
+
+  // Convert row_groups and columns
+  std::vector<int> rows;
+  if (row_groups->t == KI)
+    ki_to_vector_int(row_groups, rows);
+  std::vector<int> cols;
+  if (columns->t == KI)
+    ki_to_vector_int(columns, cols);
+
+  // Parse the options
+  auto read_options = kx::arrowkdb::KdbOptions(options, kx::arrowkdb::Options::string_options, kx::arrowkdb::Options::int_options);
+
+  // Use multi threading
+  int64_t parquet_multithreaded_read = 0;
+  read_options.GetIntOption(kx::arrowkdb::Options::PARQUET_MULTITHREADED_READ, parquet_multithreaded_read);
+
+  // Use memmap
+  int64_t use_mmap = 0;
+  read_options.GetIntOption(kx::arrowkdb::Options::USE_MMAP, use_mmap);
+
+  // Type mapping overrides
+  kx::arrowkdb::TypeMappingOverride type_overrides{ read_options };
+
+  std::shared_ptr<arrow::io::RandomAccessFile> infile;
+  if (use_mmap) {
+    PARQUET_ASSIGN_OR_THROW(
+      infile,
+      arrow::io::MemoryMappedFile::Open(kx::arrowkdb::GetKdbString(parquet_file),
+        arrow::io::FileMode::READ));
+  } else {
+    PARQUET_ASSIGN_OR_THROW(
+      infile,
+      arrow::io::ReadableFile::Open(kx::arrowkdb::GetKdbString(parquet_file),
+        arrow::default_memory_pool()));
+  }
+
+  std::unique_ptr<parquet::arrow::FileReader> reader;
+  PARQUET_THROW_NOT_OK(parquet::arrow::OpenFile(infile, arrow::default_memory_pool(), &reader));
+
+  reader->set_use_threads(parquet_multithreaded_read);
+
+  std::shared_ptr<arrow::Table> table;
+  if (row_groups->t == 101 && columns->t == 101)
+    PARQUET_THROW_NOT_OK(reader->ReadTable(&table));
+  else if (row_groups->t == 101)
+    PARQUET_THROW_NOT_OK(reader->ReadTable(cols, &table));
+  else if (columns->t == 101)
+    PARQUET_THROW_NOT_OK(reader->ReadRowGroups(rows, &table));
+  else
+    PARQUET_THROW_NOT_OK(reader->ReadRowGroups(rows, cols, &table));
+
+  const auto schema = table->schema();
+  SchemaContainsNullable(schema);
+  const auto col_num = schema->num_fields();
+  K data = ktn(0, col_num);
+  for (auto i = 0; i < col_num; ++i) {
+    auto chunked_array = table->column(i);
+    kK(data)[i] = kx::arrowkdb::ReadChunkedArray(chunked_array, type_overrides);
+  }
+
+  return data;
+
+  KDB_EXCEPTION_CATCH;
+}
+
 K writeArrow(K arrow_file, K schema_id, K array_data, K options)
 {
   KDB_EXCEPTION_TRY;
@@ -536,7 +638,7 @@ K parseArrowData(K char_array, K options)
 
   // Get all the record batches in advance
   std::vector<std::shared_ptr<arrow::RecordBatch>> all_batches;
-  PARQUET_THROW_NOT_OK(reader->ReadAll(&all_batches));
+  PARQUET_ASSIGN_OR_THROW(all_batches, reader->ToRecordBatches());
 
   // Created a chunked array for each column by amalgamating each column's
   // arrays across all batches
diff --git a/src/TableData.h b/src/TableData.h
index 06d19d7..1f837ee 100644
--- a/src/TableData.h
+++ b/src/TableData.h
@@ -157,6 +157,35 @@
    */
   EXP K readParquetColumn(K parquet_file, K column_index, K options);
 
+  /**
+   * @brief Reads the number of row groups used by a parquet file
+   *
+   * @param parquet_file String name of the parquet file to read
+   * @return Number of row groups as a -6h
+   */
+  EXP K readParquetNumRowGroups(K parquet_file);
+
+  /**
+   * @brief Reads a set of row groups from a parquet file
+   *
+   * Supported options:
+   *
+   * DECIMAL128_AS_DOUBLE (long) - Flag indicating whether to override the
+   * default type mapping for the arrow decimal128 datatype and instead
+   * represent it as a double (9h). Default 0.
+   *
+   * @param parquet_file String name of the parquet file to read
+   * @param row_groups Integer list (6h) of row groups indicies to read, or
+   * generic null (::) to read all row groups
+   * @param columns Integer list (6h) of column indicies to read, or
+   * generic null (::) to read all columns
+   * @param options Dictionary of options or generic null (::) to use
+   * defaults. Dictionary key must be an 11h list. Values list can be 7h, 11h or
+   * mixed list of -7|-11|4h.
+ * @return Mixed list of arrow array objects + */ + EXP K readParquetRowGroups(K parquet_file, K row_groups, K columns, K options); + /** * @brief Creates an arrow IPC record batch file with the specified arrow * schema and populates it from a mixed list of arrow array objects. diff --git a/travis_setup.sh b/travis_setup.sh index a94d9f6..1d8f241 100644 --- a/travis_setup.sh +++ b/travis_setup.sh @@ -7,8 +7,8 @@ if [[ "$TRAVIS_OS_NAME" == "linux" ]]; then wget https://apache.jfrog.io/artifactory/arrow/$(lsb_release --id --short | tr 'A-Z' 'a-z')/apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb sudo apt install -y -V ./apache-arrow-apt-source-latest-$(lsb_release --codename --short).deb sudo apt update - sudo apt install -y -V libarrow-dev - sudo apt install -y -V libparquet-dev + sudo apt install -y -V libarrow-dev=9.0.0-1 + sudo apt install -y -V libparquet-dev=9.0.0-1 elif [[ "$TRAVIS_OS_NAME" == "osx" ]]; then brew install apache-arrow mkdir -p cbuild/install @@ -18,23 +18,14 @@ elif [[ "$TRAVIS_OS_NAME" == "windows" ]]; then mkdir -p cbuild/install export ARROW_INSTALL=$(pwd)/cbuild/install cd cbuild - # Build and install snappy - git clone https://github.com/google/snappy.git - cd snappy - mkdir install - export SNAPPY_INSTALL=$(pwd)/install - mkdir build - cd build - cmake -G "Visual Studio 15 2017 Win64" -DCMAKE_INSTALL_PREFIX=$SNAPPY_INSTALL -DSNAPPY_BUILD_BENCHMARKS:BOOL=0 -DSNAPPY_BUILD_TESTS:BOOL=0 .. - cmake --build . --config Release - cmake --build . --config Release --target install - cd ../.. # Build and install arrow git clone https://github.com/apache/arrow.git - cd arrow/cpp + cd arrow + git checkout refs/tags/apache-arrow-9.0.0 -- + cd cpp mkdir build cd build - cmake -G "Visual Studio 15 2017 Win64" -DARROW_PARQUET=ON -DARROW_WITH_SNAPPY=ON -DARROW_BUILD_STATIC=OFF -DSnappy_LIB=$SNAPPY_INSTALL/lib/snappy.lib -DSnappy_INCLUDE_DIR=$SNAPPY_INSTALL/include -DCMAKE_INSTALL_PREFIX=$ARROW_INSTALL .. + cmake -G "Visual Studio 15 2017 Win64" -DARROW_PARQUET=ON -DARROW_WITH_SNAPPY=ON -DARROW_BUILD_STATIC=OFF -DARROW_COMPUTE=OFF -DARROW_DEPENDENCY_USE_SHARED=OFF -DCMAKE_INSTALL_PREFIX=$ARROW_INSTALL .. cmake --build . --config Release cmake --build . --config Release --target install cd ../.. 
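Before the follow-up commits, it is worth sketching how the pieces of this first patch fit together: `pq.readParquetNumRowGroups` sizes the file, the row-group readers accept explicit row-group indices, and the mutex-guarded stores plus `util.init` are what make driving those readers from `peach` safe. The following is a minimal sketch rather than part of the patch, assuming a q session started with secondary threads (e.g. `q -s 8`) and the `file.parquet` created in the documentation examples above:

```q
q)n:.arrowkdb.pq.readParquetNumRowGroups["file.parquet"]
q)/ read each row group into a table on a secondary thread, then join the parts
q)t:raze {.arrowkdb.pq.readParquetRowGroupsToTable["file.parquet";enlist x;::;::]} peach "i"$til n
q)count t
10000000
```

The same shape also gives bounded-memory batch processing of files too large to read in one call: replace the `raze` with whatever per-batch aggregation is required.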
From e01a665855de3efdaaf56280e00a03f582168ade Mon Sep 17 00:00:00 2001
From: nmcdonnell-kx 
Date: Mon, 31 Oct 2022 19:03:39 +0000
Subject: [PATCH 2/4] Linux fixes

---
 CMakeLists.txt      |  2 +-
 src/GenericStore.h  | 10 +++++-----
 src/SchemaStore.cpp |  4 ++--
 3 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index de789c4..29f623d 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -6,7 +6,7 @@ cmake_minimum_required(VERSION 3.1.3)
 project(arrowkdb CXX)
 
 set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -DKXVER=3")
-set(CMAKE_CXX_STANDARD 11)
+set(CMAKE_CXX_STANDARD 14)
 set(CMAKE_CXX_STANDARD_REQUIRED ON)
 set(CMAKE_CXX_EXTENSIONS OFF)
 
diff --git a/src/GenericStore.h b/src/GenericStore.h
index bd85ce5..0bbd6e2 100644
--- a/src/GenericStore.h
+++ b/src/GenericStore.h
@@ -79,7 +79,7 @@ class GenericStore
   long Add(T value)
   {
     // Get write lock
-    std::unique_lock<std::shared_timed_mutex>(mutex);
+    std::unique_lock<std::shared_timed_mutex> lock(mutex);
 
     if (auto equal = FindEqual(value))
       return equal;
@@ -103,7 +103,7 @@ class GenericStore
   bool Remove(long value_id)
   {
     // Get write lock
-    std::unique_lock<std::shared_timed_mutex>(mutex);
+    std::unique_lock<std::shared_timed_mutex> lock(mutex);
 
     auto lookup = forward_lookup.find(value_id);
     if (lookup == forward_lookup.end())
@@ -129,7 +129,7 @@ class GenericStore
   T Find(long value_id)
   {
     // Get read lock
-    std::shared_lock<std::shared_timed_mutex>(mutex);
+    std::shared_lock<std::shared_timed_mutex> lock(mutex);
 
     auto lookup = forward_lookup.find(value_id);
     if (lookup == forward_lookup.end())
@@ -148,7 +148,7 @@ class GenericStore
   long ReverseFind(T value)
   {
     // Get read lock
-    std::shared_lock<std::shared_timed_mutex>(mutex);
+    std::shared_lock<std::shared_timed_mutex> lock(mutex);
 
     // Reverse lookup is only used internally by the interface so insert the
     // object if it's not already present. This avoids having to add this logic
@@ -168,7 +168,7 @@ class GenericStore
   const std::vector<long> List(void)
   {
     // Get read lock
-    std::shared_lock<std::shared_timed_mutex>(mutex);
+    std::shared_lock<std::shared_timed_mutex> lock(mutex);
 
     std::vector<long> result;
     for (auto it : forward_lookup)
diff --git a/src/SchemaStore.cpp b/src/SchemaStore.cpp
index 6ad27d2..b95a1b9 100644
--- a/src/SchemaStore.cpp
+++ b/src/SchemaStore.cpp
@@ -84,7 +84,7 @@ K schemaFields(K schema_id)
 
   auto fields = schema->fields();
   K k_fields = ktn(KI, fields.size());
-  for (auto i = 0; i < fields.size(); ++i)
+  for (auto i = 0ul; i < fields.size(); ++i)
     kI(k_fields)[i] = kx::arrowkdb::GetFieldStore()->ReverseFind(fields[i]);
 
   return k_fields;
@@ -145,7 +145,7 @@ K inferSchema(K table)
   K k_array_data = kK(dict)[1];
   assert(k_array_data->n == field_names.size());
   arrow::FieldVector fields;
-  for (auto i = 0; i < field_names.size(); ++i) {
+  for (auto i = 0ul; i < field_names.size(); ++i) {
     auto datatype = kx::arrowkdb::GetArrowType(kK(k_array_data)[i]);
 
     // Construct each arrow field
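The GenericStore change in this commit is subtle enough to deserve a standalone illustration. The patch-1 guards compiled but never held the mutex: without a variable name, the statement declares (or immediately destroys) a lock object instead of locking the scope. A minimal, self-contained sketch of the pitfall and the fix, with illustrative names rather than arrowkdb code:

```cpp
#include <mutex>
#include <shared_mutex>

std::shared_timed_mutex mutex;
int shared_state = 0;

void broken() {
  // BUG: parsed as the declaration of a default-constructed std::unique_lock
  // named `mutex`, shadowing the global; the real mutex is never locked
  std::unique_lock<std::shared_timed_mutex>(mutex);
  ++shared_state;  // runs unprotected
}

void fixed() {
  // A named guard locks here and unlocks when `lock` goes out of scope
  std::unique_lock<std::shared_timed_mutex> lock(mutex);
  ++shared_state;  // runs under the write lock
}

int main() {
  broken();
  fixed();
}
```

This is also why the commit bumps the project to C++14: `std::shared_timed_mutex` and `std::shared_lock` are C++14 features.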
From b9d52e20f9c7b61a5df47b1afc03a65e46b59ef1 Mon Sep 17 00:00:00 2001
From: nmcdonnell-kx 
Date: Mon, 31 Oct 2022 19:32:23 +0000
Subject: [PATCH 3/4] * Support latest v2 parquet file formats * Upgrade xcode version

---
 .travis.yml       |  2 +-
 README.md         |  2 +-
 docs/reference.md |  4 ++--
 src/TableData.cpp |  9 +++++++++
 src/TableData.h   | 19 ++++++++++++++-----
 5 files changed, 27 insertions(+), 9 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 3237bc0..7deb2e9 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -4,7 +4,7 @@ jobs:
     os: linux
   - dist: focal
     os: linux
-  - osx_image: xcode10.2
+  - osx_image: xcode12.5
    os: osx
   - os: windows
    language: c
diff --git a/README.md b/README.md
index c9adc8e..d48f31e 100644
--- a/README.md
+++ b/README.md
@@ -43,7 +43,7 @@ Conversely, Arrow is an in-memory format meant for direct and efficient use for
 
 - kdb+ ≥ 3.5 64-bit (Linux/MacOS/Windows)
 - Apache Arrow = 9.0.0 (or ≥ 2.0.0 if building `arrowkdb` from source)
-- C++11 or later
+- C++14 or later
 - CMake ≥ 3.1.3
 
diff --git a/docs/reference.md b/docs/reference.md
index 4a042e5..e150c04 100644
--- a/docs/reference.md
+++ b/docs/reference.md
@@ -2061,7 +2061,7 @@ The mixed list of Arrow array data should be ordered in schema field number and
 Supported options:
 
 - `PARQUET_CHUNK_SIZE` - Controls the approximate size of encoded data pages within a column chunk. Long, default 1MB.
-- `PARQUET_VERSION` - Select the Parquet format version, either `V1.0` or `V2.0`. `V2.0` is more fully featured but may be incompatible with older Parquet implementations. String, default `V1.0`
+- `PARQUET_VERSION` - Select the Parquet format version: `V1.0`, `V2.0`, `V2.4`, `V2.6` or `V2.LATEST`. Later versions are more fully featured but may be incompatible with older Parquet implementations. String, default `V1.0`
 - `DECIMAL128_AS_DOUBLE` - Flag indicating whether to override the default type mapping for the Arrow decimal128 datatype and instead represent it as a double (9h). Long, default 0.
 
 ??? warning "The Parquet format is compressed and designed for maximum space efficiency which may cause a performance overhead compared to Arrow. Parquet is also less fully featured than Arrow which can result in schema limitations"
@@ -2099,7 +2099,7 @@ returns generic null on success
 Supported options:
 
 - `PARQUET_CHUNK_SIZE` - Controls the approximate size of encoded data pages within a column chunk. Long, default 1MB.
-- `PARQUET_VERSION` - Select the Parquet format version, either `V1.0` or `V2.0`. `V2.0` is more fully featured but may be incompatible with older Parquet implementations. String, default `V1.0`
+- `PARQUET_VERSION` - Select the Parquet format version: `V1.0`, `V2.0`, `V2.4`, `V2.6` or `V2.LATEST`. Later versions are more fully featured but may be incompatible with older Parquet implementations. String, default `V1.0`
 
 ??? warning "Inferred schemas only support a subset of the Arrow datatypes and are considerably less flexible than creating them with the datatype/field/schema constructors"
 
diff --git a/src/TableData.cpp b/src/TableData.cpp
index 8cbeb73..0deeb88 100644
--- a/src/TableData.cpp
+++ b/src/TableData.cpp
@@ -157,6 +157,15 @@ K writeParquet(K parquet_file, K schema_id, K array_data, K options)
   if (parquet_version == "V2.0") {
     parquet_props_builder.version(parquet::ParquetVersion::PARQUET_2_0);
     parquet_props_builder.data_page_version(parquet::ParquetDataPageVersion::V2);
+  } else if (parquet_version == "V2.4") {
+    parquet_props_builder.version(parquet::ParquetVersion::PARQUET_2_4);
+    parquet_props_builder.data_page_version(parquet::ParquetDataPageVersion::V2);
+  } else if (parquet_version == "V2.6") {
+    parquet_props_builder.version(parquet::ParquetVersion::PARQUET_2_6);
+    parquet_props_builder.data_page_version(parquet::ParquetDataPageVersion::V2);
+  } else if (parquet_version == "V2.LATEST") {
+    parquet_props_builder.version(parquet::ParquetVersion::PARQUET_2_LATEST);
+    parquet_props_builder.data_page_version(parquet::ParquetDataPageVersion::V2);
   } else {
     // Not using v2.0 so map timestamp(ns) to timestamp(us) with truncation
     arrow_props_builder.coerce_timestamps(arrow::TimeUnit::MICRO);
diff --git a/src/TableData.h b/src/TableData.h
index 1f837ee..35f42b3 100644
--- a/src/TableData.h
+++ b/src/TableData.h
@@ -87,9 +87,10 @@ extern "C"
    * PARQUET_CHUNK_SIZE (long) - Controls the approximate size of encoded data
    * pages within a column chunk. Default 1MB
    *
-   * PARQUET_VERSION (string) - Selects the Parquet format version, either
-   * `V1.0` or `V2.0`. `V2.0` is more fully featured but may be incompatible
-   * with older Parquet implementations. Default `V1.0`
+   * PARQUET_VERSION (string) - Selects the Parquet format version: `V1.0`,
+   * `V2.0`, `V2.4`, `V2.6` or `V2.LATEST`. Later versions are more fully
+   * featured but may be incompatible with older Parquet implementations.
+   * Default `V1.0`
    *
    * DECIMAL128_AS_DOUBLE (long) - Flag indicating whether to override the
    * default type mapping for the arrow decimal128 datatype and instead
@@ -170,14 +171,22 @@
    *
    * Supported options:
    *
+   * PARQUET_MULTITHREADED_READ (long) - Flag indicating whether the parquet
+   * reader should run in multithreaded mode. This can improve performance by
+   * processing multiple columns in parallel. Default 0
+   *
+   * USE_MMAP (long) - Flag indicating whether the parquet file should be memory
+   * mapped in. This can improve performance on systems which support mmap.
+   * Default 0
+   *
    * DECIMAL128_AS_DOUBLE (long) - Flag indicating whether to override the
    * default type mapping for the arrow decimal128 datatype and instead
    * represent it as a double (9h). Default 0.
    *
    * @param parquet_file String name of the parquet file to read
-   * @param row_groups Integer list (6h) of row groups indicies to read, or
+   * @param row_groups Integer list (6h) of row group indices to read, or
    * generic null (::) to read all row groups
-   * @param columns Integer list (6h) of column indicies to read, or
+   * @param columns Integer list (6h) of column indices to read, or
    * generic null (::) to read all columns
    * @param options Dictionary of options or generic null (::) to use
    * defaults. Dictionary key must be an 11h list. Values list can be 7h, 11h or
    * mixed list of -7|-11|4h.

From 501e497729bfa3e071ae3a518f2aeeb72f399a9b Mon Sep 17 00:00:00 2001
From: nmcdonnell-kx 
Date: Tue, 1 Nov 2022 13:56:51 +0000
Subject: [PATCH 4/4] Bug fix

---
 README.md          |  2 +-
 docs/reference.md  |  5 +++++
 src/GenericStore.h | 44 +++++++++++++++++++++++++++++---------------
 3 files changed, 35 insertions(+), 16 deletions(-)

diff --git a/README.md b/README.md
index d48f31e..bd271ef 100644
--- a/README.md
+++ b/README.md
@@ -42,7 +42,7 @@ Conversely, Arrow is an in-memory format meant for direct and efficient use for
 ### Requirements
 
 - kdb+ ≥ 3.5 64-bit (Linux/MacOS/Windows)
-- Apache Arrow = 9.0.0 (or ≥ 2.0.0 if building `arrowkdb` from source)
+- Apache Arrow = 9.0.0 (or ≥ 6.0.0 if building `arrowkdb` from source)
 - C++14 or later
 - CMake ≥ 3.1.3
 
diff --git a/docs/reference.md b/docs/reference.md
index e150c04..d33c10f 100644
--- a/docs/reference.md
+++ b/docs/reference.md
@@ -156,6 +156,11 @@ These functions are exposed within the `.arrowkdb` namespace, allowing users to
   kdb+ list
 [pq.readParquetToTable](#pqreadparquettotable)  Read an Arrow table from a Parquet file and convert to a kdb+ table
+[pq.readParquetNumRowGroups](#pqreadparquetnumrowgroups)  Read the number of row groups used by a Parquet file
+[pq.readParquetRowGroups](#pqreadparquetrowgroups)  Read a set of row groups from a Parquet file into an Arrow
+  table then convert to a kdb+ mixed list of array data
+[pq.readParquetRowGroupsToTable](#pqreadparquetrowgroupstotable)  Read a set of row groups from a Parquet file into an Arrow
+  table then convert to a kdb+ table
 
 [Arrow IPC files](#arrow-ipc-files)
 [ipc.writeArrow](#ipcwritearrow)  Convert a kdb+ mixed list of array data to an Arrow table
diff --git a/src/GenericStore.h b/src/GenericStore.h
index 0bbd6e2..4e15d21 100644
--- a/src/GenericStore.h
+++ b/src/GenericStore.h
@@ -55,6 +55,29 @@ class GenericStore
     return 0;
   }
 
+  /**
+   * @brief Adds an arrow object to the lookup maps. If an existing equal
+   * object is already present it will return the identifier for that instead.
+   * This avoids polluting the store with multiple equal objects.
+   *
+   * @param value Arrow object to add
+   * @return Identifier for that object
+   */
+  long AddInternal(T value)
+  {
+    if (auto equal = FindEqual(value))
+      return equal;
+
+    // Add forward lookup: long > value
+    long value_id = ++counter;
+    forward_lookup[value_id] = value;
+
+    // Add reverse lookup: value > long
+    reverse_lookup[value] = value_id;
+
+    return value_id;
+  }
+
 public:
   /**
    * @brief Returns the singleton instance, constructing it if not already existing
 
   long Add(T value)
   {
     // Get write lock
     std::unique_lock<std::shared_timed_mutex> lock(mutex);
 
-    if (auto equal = FindEqual(value))
-      return equal;
-
-    // Add forward lookup: long > value
-    long value_id = ++counter;
-    forward_lookup[value_id] = value;
-
-    // Add reverse lookup: value > long
-    reverse_lookup[value] = value_id;
-
-    return value_id;
+    return AddInternal(value);
   }
 
   long ReverseFind(T value)
   {
-    // Get read lock
-    std::shared_lock<std::shared_timed_mutex> lock(mutex);
+    // Get write lock
+    std::unique_lock<std::shared_timed_mutex> lock(mutex);
 
     // Reverse lookup is only used internally by the interface so insert the
     // object if it's not already present. This avoids having to add this logic
     // into all the calling functions.
     auto lookup = reverse_lookup.find(value);
-    if (lookup == reverse_lookup.end())
-      return Add(value);
+    if (lookup == reverse_lookup.end()) {
+      return AddInternal(value);
+    } else
       return lookup->second;
   }
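A closing note on the shape of this last fix: `ReverseFind` may insert, so it now takes the write lock up front, but it can then no longer call the public `Add`, which would try to re-acquire the same non-recursive mutex and deadlock. Hoisting the unlocked body into a private `AddInternal` is the standard way out. A condensed sketch of the pattern with simplified types, not the actual templated arrowkdb class:

```cpp
#include <map>
#include <mutex>
#include <shared_mutex>

// Public methods take the lock exactly once, then delegate to private
// helpers that assume the lock is already held.
class Store {
  std::shared_timed_mutex mutex;
  std::map<long, int> forward_lookup;
  long counter = 0;

  // Caller must hold the write lock
  long AddInternal(int value) {
    forward_lookup[++counter] = value;
    return counter;
  }

public:
  long Add(int value) {
    std::unique_lock<std::shared_timed_mutex> lock(mutex);
    return AddInternal(value);
  }

  long ReverseFind(int value) {
    // Write lock up front, since a lookup miss turns into an insert
    std::unique_lock<std::shared_timed_mutex> lock(mutex);
    for (const auto& kv : forward_lookup)
      if (kv.second == value)
        return kv.first;
    return AddInternal(value);  // calling Add() here would self-deadlock
  }
};

int main() {
  Store s;
  long id = s.Add(42);
  return s.ReverseFind(42) == id ? 0 : 1;
}
```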