From 0c3fcfc90ac1276c2f39461df813d9d0a9fb8b0d Mon Sep 17 00:00:00 2001
From: Georgi Petrov <32372905+G-D-Petrov@users.noreply.github.com>
Date: Tue, 14 Jan 2025 17:50:13 +0200
Subject: [PATCH] 2048 add support for numpy 2 (#2050)
#### Reference Issues/PRs
#2048
#### What does this implement or fix?
There are quite a few distinct changes, as most of the work was in the CI and
testing.
I will try to separate the different changes here:
- NumPy 2 support:
- [support for the new C++ interface](https://github.com/man-group/ArcticDB/pull/2050/files#diff-d23d2e3a46aa783e6c27de2ff06123f6149cda2727b09eb48e0755bc29a3ede2)
- [small change to how `==` and `is` are evaluated](https://github.com/man-group/ArcticDB/pull/2050/files#diff-8d6759b6a2a11f86f70ee321e943d6c8e7898d4419ed281ad5a7358426a49a9a) (see the sketch right after this list)
- [added some more checks during
normalization](https://github.com/man-group/ArcticDB/pull/2050/files#diff-fa52ad937e8dc0daf708927a99994e7264fd9cb62557326919c023696808859eL121)
- changes to tests to support the new NumPy conventions (all of the changes
to the pytests; a version-gate sketch is included under "Any other comments?" below)
- Pybind11 2.13 support (needed for NumPy 2):
- removed Python 3.6 and added 3.12 (relevant changes
[here](https://github.com/man-group/ArcticDB/pull/2050/files#diff-5c3fa597431eda03ac3339ae6bf7f05e1a50d6fc7333679ec38e21b337cb6721L30),
[here
](https://github.com/man-group/ArcticDB/pull/2050/files#diff-ea07d77332dfd2427efe1d2148bec3d99c7dd3c06160f7975d5ad53069b9083f),
and
[here](https://github.com/man-group/ArcticDB/pull/2050/files#diff-ea07d77332dfd2427efe1d2148bec3d99c7dd3c06160f7975d5ad53069b9083f))
- Added the new version of pybind11 and made changes to support the new
interface:
- [changed
PythonUtils](https://github.com/man-group/ArcticDB/pull/2050/files#diff-57acdaae55064408141a79c555b1fe7c4f2d4a6ce7f7b291e814b36abe4bdbce)
to add more info about the Python executable/libs needed for the newer
version of pybind11
- Changes to support new pybind11 interfaces (relevant changes
[here](https://github.com/man-group/ArcticDB/pull/2050/files#diff-9357e7e6906b72682dbae796b1a310682b3f7b334e3bba0cfee71c08f2e8cf57),
[here](https://github.com/man-group/ArcticDB/pull/2050/files#diff-dc9806fb1c292ea747ea6f0d333206c084159dcf6fc94919b98bfce3fd90d8c8),
[here](https://github.com/man-group/ArcticDB/pull/2050/files#diff-02770184046bcf181939ea70f927d1f835687e57fbcc843a71a500684674b8e9),
[here](https://github.com/man-group/ArcticDB/pull/2050/files#diff-87b188a9b8a3a1ab16c10f975faa03e7f33e42347eb3019505b240432a4502e6),
[here](https://github.com/man-group/ArcticDB/pull/2050/files#diff-6caec3c44be32d3a881cccbff1f03bbf8aedcdbf65e6e0c2c963e34bc675a246),
[here](https://github.com/man-group/ArcticDB/pull/2050/files#diff-6c2d76c22148e822d651baf5e8c3c8e23528d7360afe98d3ba085104e46237db),
[here](https://github.com/man-group/ArcticDB/pull/2050/files#diff-c7f1753eda666a2b8fd8c840f0ed2be21b809132b58ea6adbb14f49afb34039f)
and
[here](https://github.com/man-group/ArcticDB/pull/2050/files#diff-ed781e679ed39eaca3b7b13549967958654ab47eb42f920bfa5535933a1c097b))
- Changes for testing (also added tickets for these):
- [Changed the persistent tests to use http instead of https, because the STS is broken on Windows](https://github.com/man-group/ArcticDB/pull/2050/files#diff-9c13e1c7a3ca0bbad7e24e48944e815de7bfe7ed57584ce31b0549eb7be6c94dL3)
- #2080
- [Changed the persistent tests to use only encoding version 1, so that the encoding_version is not written to the cfg, because older versions cannot read it](https://github.com/man-group/ArcticDB/pull/2050/files#diff-11d84f04de731d90230614eaaeb91eb376a0b00061ce90dacbef195db6125005)
- #2077
- [Skipped the storage lock test because it is flaky](https://github.com/man-group/ArcticDB/pull/2050/files#diff-f0c82624218a7a1ad65345a6e6ae054e689288ded7b0670999c5eabb9a1c89d9)
- #2079
- [Changed the finalize staged data tests to not run against real S3, because they are too slow and time out](https://github.com/man-group/ArcticDB/pull/2050/files#diff-67a5a1e0b68a1442a89e31f917eeaf651954e0b389d15c0ce0a6e7fd80706574)
- #2078
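
To make the `==`/`is` point above concrete, here is a minimal, self-contained sketch (illustrative only, not code lifted from this PR) of the two patterns the `_normalization.py` and `_store.py` changes move to:

```python
import numpy as np

arr = np.array(["a", None, "b"], dtype=object)

# Finding None entries needs an elementwise comparison: `arr is None` is an identity
# check on the whole array object and is always False here, so positions are located
# with `==` instead.
none_positions = np.where(arr == None)[0]  # noqa: E711 - elementwise on purpose
assert list(none_positions) == [1]

# Similarly, `columns == []` is ambiguous when `columns` is an ndarray (it yields an
# elementwise result), so emptiness is checked explicitly instead.
columns = np.array([])
is_columns_empty = columns is None or (
    isinstance(columns, (list, np.ndarray)) and len(columns) == 0
)
assert is_columns_empty
```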
I've also tested it against the full suite of tests, including the persistent
ones (run
[here](https://github.com/man-group/ArcticDB/actions/runs/12388677078/job/34581688721)).
**N.B.:** We need to continue using the pybind11 submodule instead of getting
it from vcpkg, because the latest versions of pybind11, specifically the ones
that introduce NumPy 2 support, are not yet available in its vcpkg
distribution.
#### Any other comments?
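Many of the pytest updates simply gate NumPy-2-specific expectations on the installed NumPy version; the PR adds an `IS_NUMPY_TWO` flag to `python/arcticdb/util/_versions.py` for this. A minimal sketch of that kind of gate (the `expected_repr` example is illustrative, not taken from the test suite):

```python
import numpy as np
from packaging import version

NUMPY_VERSION = version.parse(np.__version__)
IS_NUMPY_TWO = version.Version("2.0") <= NUMPY_VERSION < version.Version("3.0")

# NumPy 2 changed scalar reprs (NEP 51): repr(np.int64(1)) is "np.int64(1)" rather
# than "1", which is the kind of convention change the pytest updates account for.
expected_repr = "np.int64(1)" if IS_NUMPY_TWO else "1"
assert repr(np.int64(1)) == expected_repr
```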
#### Checklist
Checklist for code changes...
- [ ] Have you updated the relevant docstrings, documentation and
copyright notice?
- [ ] Is this contribution tested against [all ArcticDB's
features](../docs/mkdocs/docs/technical/contributing.md)?
- [ ] Do all exceptions introduced raise appropriate [error
messages](https://docs.arcticdb.io/error_messages/)?
- [ ] Are API changes highlighted in the PR description?
- [ ] Is the PR labelled as enhancement or bug so it appears in
autogenerated release notes?
---------
Co-authored-by: phoebusm
---
.../action.yml | 2 +-
.github/workflows/build.yml | 19 +-
.github/workflows/persistent_storage.yml | 6 +
.gitignore | 1 +
.../requirements-compatibility-py311.txt | 2 +
.../requirements-compatibility-py36.txt | 7 -
build_tooling/transform_asv_results.py | 8 +-
cpp/CMake/PythonUtils.cmake | 9 +-
cpp/arcticdb/CMakeLists.txt | 4 +-
cpp/arcticdb/entity/native_tensor.hpp | 4 +-
cpp/arcticdb/pipeline/frame_utils.hpp | 7 +-
cpp/arcticdb/python/python_handlers.cpp | 7 +-
cpp/arcticdb/python/python_module.cpp | 14 +-
cpp/arcticdb/python/python_strings.cpp | 4 +-
cpp/arcticdb/python/python_strings.hpp | 32 +-
.../python/python_to_tensor_frame.cpp | 32 +-
cpp/arcticdb/python/python_utils.hpp | 43 ++-
cpp/arcticdb/util/gil_safe_py_none.cpp | 26 ++
cpp/arcticdb/util/gil_safe_py_none.hpp | 31 ++
cpp/third_party/pybind11 | 2 +-
environment_unix.yml | 9 +-
pyproject.toml | 8 +
python/arcticdb/util/_versions.py | 5 +
python/arcticdb/util/test.py | 7 +-
.../arcticdb/version_store/_normalization.py | 106 +++++--
python/arcticdb/version_store/_store.py | 170 ++++++-----
.../hypothesis/arcticdb/test_sort_merge.py | 2 +-
.../tests/integration/arcticdb/test_arctic.py | 1 +
.../arcticdb/test_persistent_storage.py | 7 +-
.../integration/arcticdb/test_storage_lock.py | 6 +-
.../version_store/test_basic_version_store.py | 14 +-
.../arcticdb/version_store/test_mem_leaks.py | 289 +++++++++---------
.../test_stress_finalize_staged_data.py | 74 ++---
.../arcticdb/version_store/test_append.py | 19 +-
.../arcticdb/version_store/test_filtering.py | 4 +-
python/tests/util/mark.py | 7 +-
python/tests/util/storage_test.py | 2 +-
setup.cfg | 3 +-
38 files changed, 564 insertions(+), 429 deletions(-)
create mode 100644 build_tooling/requirements-compatibility-py311.txt
delete mode 100644 build_tooling/requirements-compatibility-py36.txt
create mode 100644 cpp/arcticdb/util/gil_safe_py_none.cpp
create mode 100644 cpp/arcticdb/util/gil_safe_py_none.hpp
diff --git a/.github/actions/set_persistent_storage_env_vars/action.yml b/.github/actions/set_persistent_storage_env_vars/action.yml
index 1c7c5e5cf3..48f93799a3 100644
--- a/.github/actions/set_persistent_storage_env_vars/action.yml
+++ b/.github/actions/set_persistent_storage_env_vars/action.yml
@@ -2,7 +2,7 @@ name: 'Set Persistent storages env variables'
description: 'Set the necessary variables for Persistent storage tests'
inputs:
bucket: {default: 'arcticdb-ci-test-bucket-02', type: string, description: The name of the S3 bucket that we will test against}
- endpoint: {default: 'https://s3.eu-west-1.amazonaws.com', type: string, description: The address of the S3 endpoint}
+ endpoint: {default: 'http://s3.eu-west-1.amazonaws.com', type: string, description: The address of the S3 endpoint}
region: {default: 'eu-west-1', type: string, description: The S3 region of the bucket}
aws_access_key: {required: true, type: string, description: The value for the AWS Access key}
aws_secret_key: {required: true, type: string, description: The value for the AWS Secret key}
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 29a79511b8..0043661b54 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -36,7 +36,7 @@ jobs:
uses: ./.github/workflows/cibw_docker_image.yml
permissions: {packages: write}
with:
- cibuildwheel_ver: "2.12.1"
+ cibuildwheel_ver: "2.21.3"
force_update: false
common_config:
@@ -163,24 +163,23 @@ jobs:
strategy:
fail-fast: false
matrix:
- python3: ${{fromJson(vars.LINUX_PYTHON_VERSIONS || '[6, 7, 8, 9, 10, 11]')}}
+ python3: ${{fromJson(vars.LINUX_PYTHON_VERSIONS || '[7, 8, 9, 10, 11, 12]')}}
include:
- python_deps_ids: [""]
matrix_override: ${{fromJson(needs.common_config.outputs.linux_matrix)}}
pytest_xdist_mode: "--dist worksteal"
- - python3: 6
- python_deps_ids: ["", -compat36]
- matrix_override:
- - ${{fromJson(needs.common_config.outputs.linux_matrix)[0]}}
- - python_deps_id: -compat36
- python_deps: requirements-compatibility-py36.txt
- pytest_xdist_mode: "" # worksteal Not supported on Python 3.6
- python3: 8
python_deps_ids: ["", -compat38]
matrix_override:
- ${{fromJson(needs.common_config.outputs.linux_matrix)[0]}}
- python_deps_id: -compat38
python_deps: requirements-compatibility-py38.txt
+ - python3: 11
+ python_deps_ids: ["", -compat311]
+ matrix_override:
+ - ${{fromJson(needs.common_config.outputs.linux_matrix)[0]}}
+ - python_deps_id: -compat311
+ python_deps: requirements-compatibility-py311.txt
name: 3.${{matrix.python3}} Linux
uses: ./.github/workflows/build_steps.yml
secrets: inherit
@@ -212,7 +211,7 @@ jobs:
strategy:
fail-fast: false
matrix:
- python3: ${{fromJson(vars.WINDOWS_PYTHON_VERSIONS || '[7, 8, 9, 10, 11]')}}
+ python3: ${{fromJson(vars.WINDOWS_PYTHON_VERSIONS || '[7, 8, 9, 10, 11, 12]')}}
name: 3.${{matrix.python3}} Windows
uses: ./.github/workflows/build_steps.yml
secrets: inherit
diff --git a/.github/workflows/persistent_storage.yml b/.github/workflows/persistent_storage.yml
index 1aed985597..0355307216 100644
--- a/.github/workflows/persistent_storage.yml
+++ b/.github/workflows/persistent_storage.yml
@@ -40,6 +40,12 @@ jobs:
- name: Checkout
uses: actions/checkout@v3.3.0
+ # The latest version of arcticdb will no longer publish wheels for Python 3.6
+ # So we skip the seed if we are testing the latest version with Python 3.6
+ - name: Skip if unsupported
+ if: inputs.arcticdb_version == 'latest' && inputs.python3 == '6'
+ run: exit 0
+
- name: Select Python (Linux)
if: matrix.os == 'linux'
run: echo /opt/python/${{env.python_impl_name}}*/bin >> $GITHUB_PATH
diff --git a/.gitignore b/.gitignore
index 0d6fc84887..1fb834155b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,6 +4,7 @@ __pycache__/
.mypy_cache
.hypothesis/
*.egg-info/
+python/venvs/
.vscode/
.vs/
diff --git a/build_tooling/requirements-compatibility-py311.txt b/build_tooling/requirements-compatibility-py311.txt
new file mode 100644
index 0000000000..96867ac468
--- /dev/null
+++ b/build_tooling/requirements-compatibility-py311.txt
@@ -0,0 +1,2 @@
+# Makes sure we are able to use Numpy 1
+numpy<2
diff --git a/build_tooling/requirements-compatibility-py36.txt b/build_tooling/requirements-compatibility-py36.txt
deleted file mode 100644
index fa4d6564d7..0000000000
--- a/build_tooling/requirements-compatibility-py36.txt
+++ /dev/null
@@ -1,7 +0,0 @@
-# The oldest tested deps, for Python 3.6
-pandas~=0.22
-numpy~=1.14
-protobuf~=3.8
-attrs~=19.2
-msgpack~=0.5
-prometheus_client~=0.14
diff --git a/build_tooling/transform_asv_results.py b/build_tooling/transform_asv_results.py
index a464f846e4..bcfe977d13 100644
--- a/build_tooling/transform_asv_results.py
+++ b/build_tooling/transform_asv_results.py
@@ -37,9 +37,7 @@ def df_to_asv_json(results_df: pd.DataFrame):
1 basic_functions.BasicFunctions.time_read_batch [[0.2775141046000044, 0.597266279600126, 0.379... ... {'': 515.5997927188873, ' pd.DataFrame:
results_list = []
for test_name, test_results in data["results"].items():
- flattened_data = pd.json_normalize(
- {"test_name": test_name, "results": str(test_results)}
- )
+ flattened_data = pd.json_normalize({"test_name": test_name, "results": str(test_results)})
flattened_data["commit_hash"] = data["commit_hash"]
flattened_data["env_name"] = data["env_name"]
flattened_data["date"] = data["date"]
diff --git a/cpp/CMake/PythonUtils.cmake b/cpp/CMake/PythonUtils.cmake
index c472d468da..0def4b5ba1 100644
--- a/cpp/CMake/PythonUtils.cmake
+++ b/cpp/CMake/PythonUtils.cmake
@@ -1,4 +1,3 @@
-
# Helpers
function(_python_utils_dump_vars_targets _target _props)
if(TARGET ${_target})
@@ -102,7 +101,7 @@ endfunction()
# Enhanced FindPython
if(DEFINED ARCTICDB_FIND_PYTHON_DEV_MODE)
- message("Using supplied ARCTICDB_FIND_PYTHON_DEV_MODE=${ARCTICDB_FIND_PYTHON_DEV_MODE}.")
+ message(STATUS "Using supplied ARCTICDB_FIND_PYTHON_DEV_MODE=${ARCTICDB_FIND_PYTHON_DEV_MODE}.")
if("${ARCTICDB_FIND_PYTHON_DEV_MODE}" STREQUAL "pybind11")
set(ARCTICDB_FIND_PYTHON_DEV_MODE "")
endif()
@@ -125,6 +124,7 @@ if(ARCTICDB_FIND_PYTHON_DEV_MODE)
if(NOT Python_EXECUTABLE AND NOT Python_ROOT_DIR AND NOT Python_LIBRARY)
# FindPython searches the PATH environment last, but that's arguably the only correct place it should look
find_program(Python_EXECUTABLE NAMES python3 python NAMES_PER_DIR REQUIRED NO_CMAKE_SYSTEM_PATH)
+ set(PYTHON_EXECUTABLE ${Python_EXECUTABLE} CACHE FILEPATH "Python executable found by FindPython")
else()
set(Python_FIND_STRATEGY LOCATION)
endif()
@@ -132,6 +132,8 @@ if(ARCTICDB_FIND_PYTHON_DEV_MODE)
# Let CMake find Python without telling it the BUILD_PYTHON_VERSION we wanted. This way we know third-party stuff that
# is not aware of BUILD_PYTHON_VERSION is going to find the same thing
find_package(Python 3 COMPONENTS Interpreter ${ARCTICDB_FIND_PYTHON_DEV_MODE} REQUIRED)
+ set(PYTHON_INCLUDE_DIRS ${Python_INCLUDE_DIRS})
+
python_utils_dump_vars_if_enabled("After our FindPython before any third-party:")
if(DEFINED Python_FIND_ABI)
@@ -146,7 +148,7 @@ if(ARCTICDB_FIND_PYTHON_DEV_MODE)
MAP_IMPORTED_CONFIG_RELWITHDEBINFO ";RELEASE"
)
endif()
-
+ set(PYBIND11_FINDPYTHON OFF)
else()
set(ARCTICDB_PYTHON_PREFIX PYTHON)
python_utils_check_include_dirs("supplied")
@@ -158,3 +160,4 @@ else()
set(PYBIND11_FINDPYTHON OFF)
endif()
+
diff --git a/cpp/arcticdb/CMakeLists.txt b/cpp/arcticdb/CMakeLists.txt
index 9b88cf8acf..967fc08129 100644
--- a/cpp/arcticdb/CMakeLists.txt
+++ b/cpp/arcticdb/CMakeLists.txt
@@ -62,7 +62,7 @@ else()
add_compile_definitions(_LIBCPP_DISABLE_AVAILABILITY)
SET(HIDE_LINKED_SYMBOLS OFF)
-
+
find_package(pybind11 REQUIRED)
find_package(PCRE REQUIRED)
find_package(Libevent REQUIRED)
@@ -381,6 +381,7 @@ set(arcticdb_srcs
util/lazy.hpp
util/type_traits.hpp
util/variant.hpp
+ util/gil_safe_py_none.hpp
version/de_dup_map.hpp
version/op_log.hpp
version/schema_checks.hpp
@@ -515,6 +516,7 @@ set(arcticdb_srcs
util/type_handler.cpp
version/key_block.hpp
version/key_block.cpp
+ util/gil_safe_py_none.cpp
version/local_versioned_engine.cpp
version/op_log.cpp
version/snapshot.cpp
diff --git a/cpp/arcticdb/entity/native_tensor.hpp b/cpp/arcticdb/entity/native_tensor.hpp
index 74cc9f99c6..4a47bb40fe 100644
--- a/cpp/arcticdb/entity/native_tensor.hpp
+++ b/cpp/arcticdb/entity/native_tensor.hpp
@@ -119,14 +119,16 @@ struct NativeTensor {
[[nodiscard]] auto expanded_dim() const { return expanded_dim_; }
template
const T *ptr_cast(size_t pos) const {
+ util::check(ptr != nullptr, "Unexpected null ptr in NativeTensor");
const bool dimension_condition = ndim() == 1;
const bool elsize_condition = elsize_ != 0;
- const bool strides_condition = strides_[0] % elsize_ == 0;
+ const bool strides_condition = (elsize_condition) && (strides_[0] % elsize_ == 0);
util::warn(dimension_condition, "Cannot safely ptr_cast matrices in NativeTensor");
util::warn(elsize_condition, "Cannot safely ptr_cast when elsize_ is zero in NativeTensor");
util::warn(strides_condition,
"Cannot safely ptr_cast to type of size {} when strides ({}) is not a multiple of elsize ({}) in NativeTensor with dtype {}",
sizeof(T), strides_[0], elsize_, data_type());
+
int64_t signed_pos = pos;
if (dimension_condition && elsize_condition && strides_condition) {
signed_pos *= strides_[0] / elsize_;
diff --git a/cpp/arcticdb/pipeline/frame_utils.hpp b/cpp/arcticdb/pipeline/frame_utils.hpp
index 125156dab0..cc7151ad44 100644
--- a/cpp/arcticdb/pipeline/frame_utils.hpp
+++ b/cpp/arcticdb/pipeline/frame_utils.hpp
@@ -20,9 +20,10 @@
#include
#include
#include
-#include
#include
#include
+#include
+#include
namespace arcticdb {
@@ -135,7 +136,7 @@ std::optional aggregator_set_data(
if (!c_style)
ptr_data = flatten_tensor(flattened_buffer, rows_to_write, tensor, slice_num, regular_slice_size);
- auto none = py::none{};
+ auto none = GilSafePyNone::instance();
std::variant wrapper_or_error;
// GIL will be acquired if there is a string that is not pure ASCII/UTF-8
// In this case a PyObject will be allocated by convert::py_unicode_to_buffer
@@ -147,7 +148,7 @@ std::optional aggregator_set_data(
auto out_ptr = reinterpret_cast(column.buffer().data());
auto& string_pool = agg.segment().string_pool();
for (size_t s = 0; s < rows_to_write; ++s, ++ptr_data) {
- if (*ptr_data == none.ptr()) {
+ if (*ptr_data == none->ptr()) {
*out_ptr++ = not_a_string();
} else if(is_py_nan(*ptr_data)){
*out_ptr++ = nan_placeholder();
diff --git a/cpp/arcticdb/python/python_handlers.cpp b/cpp/arcticdb/python/python_handlers.cpp
index f6e754c42a..319ff6153d 100644
--- a/cpp/arcticdb/python/python_handlers.cpp
+++ b/cpp/arcticdb/python/python_handlers.cpp
@@ -25,19 +25,18 @@ namespace arcticdb {
static inline PyObject** fill_with_none(PyObject** ptr_dest, size_t count, SpinLock& spin_lock) {
- auto none = py::none();
+ auto none = GilSafePyNone::instance();
for(auto i = 0U; i < count; ++i)
- *ptr_dest++ = none.ptr();
+ *ptr_dest++ = none->ptr();
spin_lock.lock();
for(auto i = 0U; i < count; ++i)
- none.inc_ref();
+ Py_INCREF(none->ptr());
spin_lock.unlock();
return ptr_dest;
}
static inline PyObject** fill_with_none(ChunkedBuffer& buffer, size_t offset, size_t count, SpinLock& spin_lock) {
- auto none = py::none();
auto dest = buffer.ptr_cast(offset, count * sizeof(PyObject*));
return fill_with_none(dest, count, spin_lock);
}
diff --git a/cpp/arcticdb/python/python_module.cpp b/cpp/arcticdb/python/python_module.cpp
index 474e74bdeb..ad4b05330e 100644
--- a/cpp/arcticdb/python/python_module.cpp
+++ b/cpp/arcticdb/python/python_module.cpp
@@ -30,6 +30,7 @@
#include
#include
#include
+#include
#include
#include
@@ -213,26 +214,26 @@ void register_error_code_ecosystem(py::module& m, py::exceptionptr();
}
increment_none_refcount(diff, none);
diff --git a/cpp/arcticdb/python/python_strings.hpp b/cpp/arcticdb/python/python_strings.hpp
index 43c92316a7..d80f4c745b 100644
--- a/cpp/arcticdb/python/python_strings.hpp
+++ b/cpp/arcticdb/python/python_strings.hpp
@@ -16,6 +16,7 @@
#include
#include
#include
+#include
namespace arcticdb {
@@ -77,12 +78,6 @@ class DynamicStringReducer {
Py_INCREF(obj);
}
- [[nodiscard]] std::unique_ptr get_py_none() {
- std::lock_guard lock(handler_data_.spin_lock());
- auto none = std::make_unique(py::none{});
- return none;
- }
-
auto get_unique_counts(
const Column& column
) {
@@ -103,7 +98,7 @@ class DynamicStringReducer {
std::pair write_strings_to_destination(
size_t num_rows,
const Column& source_column,
- std::unique_ptr& none,
+ std::shared_ptr& none,
const ankerl::unordered_dense::map py_strings,
const std::optional& sparse_map) {
std::pair counts;
@@ -129,7 +124,7 @@ class DynamicStringReducer {
auto py_strings = assign_python_strings(unique_counts, has_type_conversion, string_pool);
ARCTICDB_SUBSAMPLE(WriteStringsToColumn, 0)
- auto none = get_py_none();
+ auto none = GilSafePyNone::instance();
auto [none_count, nan_count] = write_strings_to_destination(num_rows, source_column, none, py_strings, sparse_map);
increment_none_refcount(none_count, none);
increment_nan_refcount(nan_count);
@@ -182,7 +177,7 @@ class DynamicStringReducer {
}
}
}
- auto none = get_py_none();
+ auto none = GilSafePyNone::instance();
auto [none_count, nan_count] = write_strings_to_destination(num_rows, source_column, none, allocated, source_column.opt_sparse_map());
increment_none_refcount(none_count, none);
increment_nan_refcount(nan_count);
@@ -211,27 +206,24 @@ class DynamicStringReducer {
return py_strings;
}
- void increment_none_refcount(size_t none_count, py::none& none) {
+ void increment_none_refcount(size_t none_count, std::shared_ptr& none) {
+ util::check(none, "Got null pointer to py::none in increment_none_refcount");
std::lock_guard lock(handler_data_.spin_lock());
- for(auto i = 0u; i < none_count; ++i)
- Py_INCREF(none.ptr());
- }
-
- void increment_none_refcount(size_t none_count, std::unique_ptr& none) {
- util::check(static_cast(none), "Got null pointer to py::none in increment_none_refcount");
- increment_none_refcount(none_count, *none);
+ for(auto i = 0u; i < none_count; ++i) {
+ Py_INCREF(none->ptr());
+ }
}
void increment_nan_refcount(size_t none_count) {
std::lock_guard lock(handler_data_.spin_lock());
for(auto i = 0u; i < none_count; ++i)
- handler_data_.py_nan_->inc_ref();
+ Py_INCREF(handler_data_.py_nan_->ptr());
}
std::pair write_strings_to_column_dense(
size_t ,
const Column& source_column,
- const std::unique_ptr& none,
+ const std::shared_ptr& none,
const ankerl::unordered_dense::map& py_strings) {
auto data = source_column.data();
auto src = data.cbegin>, IteratorType::REGULAR, IteratorDensity::DENSE>();
@@ -256,7 +248,7 @@ class DynamicStringReducer {
std::pair write_strings_to_column_sparse(
size_t num_rows,
const Column& source_column,
- const std::unique_ptr& none,
+ const std::shared_ptr& none,
const ankerl::unordered_dense::map& py_strings,
const util::BitSet& sparse_map
) {
diff --git a/cpp/arcticdb/python/python_to_tensor_frame.cpp b/cpp/arcticdb/python/python_to_tensor_frame.cpp
index 105b1ad93d..f1f9fb386e 100644
--- a/cpp/arcticdb/python/python_to_tensor_frame.cpp
+++ b/cpp/arcticdb/python/python_to_tensor_frame.cpp
@@ -56,6 +56,19 @@ std::variant pystring_to_buffer(PyObject *
return {ValueType::BYTES, 8, 1};
}
+// Parse an array descriptor into type and elsize
+static std::tuple parse_array_descriptor(PyObject* obj) {
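+    // NumPy 2 changed the PyArray_Descr (dtype) layout, so pick the descriptor proxy that matches the runtime's C API version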
+ if (pybind11::detail::npy_api::get().PyArray_RUNTIME_VERSION_ < 0x12) {
+ ARCTICDB_DEBUG(log::version(), "Using numpy 1 API to get array descriptor");
+ auto descr = pybind11::detail::array_descriptor1_proxy(obj);
+ return {descr->kind, descr->elsize};
+ } else {
+ ARCTICDB_DEBUG(log::version(), "Using numpy 2 API to get array descriptor");
+ auto descr = pybind11::detail::array_descriptor2_proxy(obj);
+ return {descr->kind, descr->elsize};
+ }
+}
+
/// @brief Determine the type for column composed of arrays
/// In case column is composed of arrays all arrays must have the same element type. This iterates until it finds the
/// first non-empty array and returns its type.
@@ -71,12 +84,11 @@ std::variant pystring_to_buffer(PyObject *
}
const auto arr = pybind11::detail::array_proxy(*begin);
normalization::check(arr->nd == 1, "Only one dimensional arrays are supported in columns.");
- const auto descr = pybind11::detail::array_descriptor_proxy(arr->descr);
+
const ssize_t element_count = arr->dimensions[0];
if(element_count != 0) {
- ValueType value_type = get_value_type(descr->kind);
- const uint8_t value_bytes = static_cast(descr->elsize);
- return {value_type, value_bytes, 2};
+ const auto [kind, val_bytes] = parse_array_descriptor(arr->descr);
+ return {get_value_type(kind), static_cast(val_bytes), 2};
}
begin++;
}
@@ -115,14 +127,14 @@ NativeTensor obj_to_tensor(PyObject *ptr, bool empty_types) {
auto& api = pybind11::detail::npy_api::get();
util::check(api.PyArray_Check_(ptr), "Expected Python array");
const auto arr = pybind11::detail::array_proxy(ptr);
- const auto descr = pybind11::detail::array_descriptor_proxy(arr->descr);
+ const auto[kind, elsize] = parse_array_descriptor(arr->descr);
auto ndim = arr->nd;
const ssize_t size = ndim == 1 ? arr->dimensions[0] : arr->dimensions[0] * arr->dimensions[1];
// In Pandas < 2, empty series dtype is `"float"`, but as of Pandas 2.0, empty series dtype is `"object"`
// The Normalizer in Python cast empty `"float"` series to `"object"` so `EMPTY` is used here.
// See: https://github.com/man-group/ArcticDB/pull/1049
- auto val_type = size == 0 && empty_types ? ValueType::EMPTY : get_value_type(descr->kind);
- auto val_bytes = static_cast(descr->elsize);
+ auto val_type = size == 0 && empty_types ? ValueType::EMPTY : get_value_type(kind);
+ auto val_bytes = static_cast(elsize);
const int64_t element_count = ndim == 1 ? int64_t(arr->dimensions[0]) : int64_t(arr->dimensions[0]) * int64_t(arr->dimensions[1]);
const auto c_style = arr->strides[0] == val_bytes;
@@ -173,7 +185,7 @@ NativeTensor obj_to_tensor(PyObject *ptr, bool empty_types) {
if(current_object != end)
sample = *current_object;
}
- if (empty && descr->kind == 'O') {
+ if (empty && kind == 'O') {
val_type = empty_types ? ValueType::EMPTY : ValueType::UTF_DYNAMIC;
} else if(all_nans || is_unicode(sample)){
val_type = ValueType::UTF_DYNAMIC;
@@ -194,11 +206,11 @@ NativeTensor obj_to_tensor(PyObject *ptr, bool empty_types) {
// and we can't use `val_bytes` to get this information since some dtype have another `elsize` than 8.
const SizeBits size_bits = is_empty_type(val_type) ? SizeBits::S64 : get_size_bits(val_bytes);
const auto dt = combine_data_type(val_type, size_bits);
- const int64_t nbytes = element_count * descr->elsize;
+ const int64_t nbytes = element_count * elsize;
const void* data = nbytes ? arr->data : nullptr;
const std::array strides = {arr->strides[0], arr->nd > 1 ? arr->strides[1] : 0};
const std::array shapes = {arr->dimensions[0], arr->nd > 1 ? arr->dimensions[1] : 0};
- return {nbytes, arr->nd, strides.data(), shapes.data(), dt, descr->elsize, data, ndim};
+ return {nbytes, arr->nd, strides.data(), shapes.data(), dt, elsize, data, ndim};
}
std::shared_ptr py_ndf_to_frame(
diff --git a/cpp/arcticdb/python/python_utils.hpp b/cpp/arcticdb/python/python_utils.hpp
index 4c4ea638f0..f24228b1cb 100644
--- a/cpp/arcticdb/python/python_utils.hpp
+++ b/cpp/arcticdb/python/python_utils.hpp
@@ -16,6 +16,7 @@
#include
#include
#include
+#include
namespace py = pybind11;
@@ -104,14 +105,14 @@ inline void prefill_with_none(
SpinLock& spin_lock,
IncrementRefCount inc_ref_count = IncrementRefCount::ON) {
std::lock_guard lock(spin_lock);
- auto none = py::none();
+ auto none = GilSafePyNone::instance();
for (auto i = 0U; i < num_rows; ++i)
- *ptr_dest++ = none.ptr();
+ *ptr_dest++ = none->ptr();
if(inc_ref_count == IncrementRefCount::ON) {
auto none_count = num_rows - sparse_count;
for (auto j = 0U; j < none_count; ++j)
- none.inc_ref();
+ Py_INCREF(none->ptr());
}
spin_lock.unlock();
}
@@ -155,8 +156,13 @@ PyClass & add_repr(PyClass & py_class){
}
inline py::object &pd_Timestamp() {
- static py::object T = py::module::import("pandas").attr("Timestamp");
- return T;
+ PYBIND11_CONSTINIT static py::gil_safe_call_once_and_store storage;
+ auto &imported_obj = storage // Do NOT make this `static`!
+ .call_once_and_store_result([]() {
+ return py::module_::import("pandas").attr("Timestamp");
+ })
+ .get_stored();
+ return imported_obj;
}
inline bool from_pd_timestamp(const py::object &o, timestamp &ts) {
@@ -169,8 +175,13 @@ inline bool from_pd_timestamp(const py::object &o, timestamp &ts) {
}
inline py::object &dt_datetime() {
- static py::object T = py::module::import("datetime").attr("datetime");
- return T;
+ PYBIND11_CONSTINIT static py::gil_safe_call_once_and_store storage;
+ auto &imported_obj = storage // Do NOT make this `static`!
+ .call_once_and_store_result([]() {
+ return py::module_::import("datetime").attr("datetime");
+ })
+ .get_stored();
+ return imported_obj;
}
inline bool from_datetime(const py::object &o, timestamp &ts) {
@@ -182,8 +193,13 @@ inline bool from_datetime(const py::object &o, timestamp &ts) {
}
inline py::object &np_datetime64() {
- static py::object T = py::module::import("numpy").attr("datetime64");
- return T;
+ PYBIND11_CONSTINIT static py::gil_safe_call_once_and_store storage;
+ auto &imported_obj = storage // Do NOT make this `static`!
+ .call_once_and_store_result([]() {
+ return py::module_::import("numpy").attr("datetime64");
+ })
+ .get_stored();
+ return imported_obj;
}
inline bool from_dt64(const py::object &o, timestamp &ts) {
@@ -271,8 +287,13 @@ inline std::vector named_aggregators_from_dict(const std::unord
}
inline auto pd_to_offset(std::string_view rule) {
- static py::object to_offset = py::module::import("pandas").attr("tseries").attr("frequencies").attr("to_offset");
- return to_offset(rule).attr("nanos").cast();
+ PYBIND11_CONSTINIT static py::gil_safe_call_once_and_store storage;
+ auto &imported_obj = storage // Do NOT make this `static`!
+ .call_once_and_store_result([]() {
+ return py::module_::import("pandas").attr("tseries").attr("frequencies").attr("to_offset");
+ })
+ .get_stored();
+ return imported_obj(rule).attr("nanos").cast();
}
} // namespace arcticdb::python_util
diff --git a/cpp/arcticdb/util/gil_safe_py_none.cpp b/cpp/arcticdb/util/gil_safe_py_none.cpp
new file mode 100644
index 0000000000..aad75ce518
--- /dev/null
+++ b/cpp/arcticdb/util/gil_safe_py_none.cpp
@@ -0,0 +1,26 @@
+/* Copyright 2023 Man Group Operations Limited
+ *
+ * Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
+ *
+ * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
+ */
+
+
+#include
+#include
+
+using namespace arcticdb;
+
+std::shared_ptr GilSafePyNone::instance_;
+std::once_flag GilSafePyNone::init_flag_;
+
+
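+// Construct the shared py::none while holding the GIL; per the rationale in the header,
+// callers may then bump its refcount with Py_INCREF without re-acquiring the GIL.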
+void GilSafePyNone::init(){
+ pybind11::gil_scoped_acquire gil_lock;
+ instance_ = std::make_shared();
+};
+
+std::shared_ptr GilSafePyNone::instance(){
+ std::call_once(init_flag_, &init);
+ return instance_;
+};
\ No newline at end of file
diff --git a/cpp/arcticdb/util/gil_safe_py_none.hpp b/cpp/arcticdb/util/gil_safe_py_none.hpp
new file mode 100644
index 0000000000..f66485e2a4
--- /dev/null
+++ b/cpp/arcticdb/util/gil_safe_py_none.hpp
@@ -0,0 +1,31 @@
+/* Copyright 2023 Man Group Operations Limited
+ *
+ * Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
+ *
+ * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
+ */
+
+
+/* This class is introduced purely to selectively bypass the GIL check introduced in pybind11 v2.11.0.
+ * None in Python is a global static, therefore the GIL is not needed.
+ * For other objects (e.g. nan) it is fine as long as the GIL is held during allocation and deallocation.
+ * To bypass the check globally we could instead define PYBIND11_NO_ASSERT_GIL_HELD_INCREF_DECREF to disable it everywhere.
+ */
+
+#pragma once
+
+#include
+#include
+#include
+
+namespace arcticdb {
+class GilSafePyNone {
+private:
+ static std::shared_ptr instance_;
+ static std::once_flag init_flag_;
+ static void init();
+public:
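+    // Returns the lazily-initialised shared py::none; it is constructed under the GIL on first use.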
+ static std::shared_ptr instance();
+};
+
+} //namespace arcticdb
\ No newline at end of file
diff --git a/cpp/third_party/pybind11 b/cpp/third_party/pybind11
index 80dc998efc..a2e59f0e70 160000
--- a/cpp/third_party/pybind11
+++ b/cpp/third_party/pybind11
@@ -1 +1 @@
-Subproject commit 80dc998efced8ceb2be59756668a7e90e8bef917
+Subproject commit a2e59f0e7065404b44dfe92a28aca47ba1378dc4
diff --git a/environment_unix.yml b/environment_unix.yml
index 92535b81f3..3026955524 100644
--- a/environment_unix.yml
+++ b/environment_unix.yml
@@ -23,9 +23,7 @@ dependencies:
# TODO: Fix builds for missing symbols.
- libmongocxx <3.9
- zstd
- # TODO: pybind 2.11.X became stricter regarding the handling of reference counts
- # See: https://github.com/pybind/pybind11/issues/4748#issuecomment-1639445403
- - pybind11 <2.11
+ - pybind11
- pcre
- cyrus-sasl
- aws-sdk-cpp >=1.11.405
@@ -56,9 +54,10 @@ dependencies:
# Build dependencies for tests
- libarrow
# Python dependences
- - python
+ # Python 3.13+ is not yet supported
+ - python <3.13
- packaging
- - numpy <2
+ - numpy
- pandas
- attrs
- boto3
diff --git a/pyproject.toml b/pyproject.toml
index 4a00f801c4..94d7e5c162 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -25,6 +25,9 @@ build = "cp*-win_amd64"
before-all = "bash {project}/build_tooling/prep_cpp_build.sh"
before-build = "set"
+[tool.ruff]
+line-length = 120
+
[tool.black]
line-length = 120
target_version = ['py36', 'py37', 'py38', 'py39', 'py310', 'py311']
@@ -43,3 +46,8 @@ exclude = '''
| static
)/
'''
+
+[tool.pytest.ini_options]
+markers = [
+ "pipeline",
+]
\ No newline at end of file
diff --git a/python/arcticdb/util/_versions.py b/python/arcticdb/util/_versions.py
index fcfcd09fbf..571f17532e 100644
--- a/python/arcticdb/util/_versions.py
+++ b/python/arcticdb/util/_versions.py
@@ -5,10 +5,15 @@
As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
"""
+
import pandas as pd
+import numpy as np
from packaging import version
PANDAS_VERSION = version.parse(pd.__version__)
CHECK_FREQ_VERSION = version.Version("1.1")
IS_PANDAS_ZERO = PANDAS_VERSION < version.Version("1.0")
IS_PANDAS_TWO = PANDAS_VERSION >= version.Version("2.0")
+IS_NUMPY_TWO = (version.parse(np.__version__) >= version.Version("2.0")) and (
+ version.parse(np.__version__) < version.Version("3.0")
+)
diff --git a/python/arcticdb/util/test.py b/python/arcticdb/util/test.py
index fdc922665c..202571d4d6 100644
--- a/python/arcticdb/util/test.py
+++ b/python/arcticdb/util/test.py
@@ -352,8 +352,11 @@ def random_integers(size, dtype):
platform_int_info = np.iinfo("int_")
iinfo = np.iinfo(dtype)
return np.random.randint(
- max(iinfo.min, platform_int_info.min), min(iinfo.max, platform_int_info.max), size=size
- ).astype(dtype)
+ max(iinfo.min, platform_int_info.min),
+ min(iinfo.max, platform_int_info.max),
+ size=size,
+ dtype=dtype
+ )
def get_wide_dataframe(size=10000, seed=0):
diff --git a/python/arcticdb/version_store/_normalization.py b/python/arcticdb/version_store/_normalization.py
index f3a5688990..c33c24f62c 100644
--- a/python/arcticdb/version_store/_normalization.py
+++ b/python/arcticdb/version_store/_normalization.py
@@ -5,6 +5,7 @@
As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
"""
+
import copy
import datetime
from datetime import timedelta
@@ -21,7 +22,12 @@
from arcticc.pb2.descriptors_pb2 import UserDefinedMetadata, NormalizationMetadata, MsgPackSerialization
from arcticc.pb2.storage_pb2 import VersionStoreConfig
from collections import Counter
-from arcticdb.exceptions import ArcticNativeException, ArcticDbNotYetImplemented, NormalizationException, SortingException
+from arcticdb.exceptions import (
+ ArcticNativeException,
+ ArcticDbNotYetImplemented,
+ NormalizationException,
+ SortingException,
+)
from arcticdb.supported_types import DateRangeInput, time_types as supported_time_types
from arcticdb.util._versions import IS_PANDAS_TWO, IS_PANDAS_ZERO
from arcticdb.version_store.read_result import ReadResult
@@ -237,12 +243,16 @@ def _to_primitive(arr, arr_name, dynamic_strings, string_max_len=None, coerce_co
if isinstance(sample, Timestamp):
# If we have pd.Timestamp as the sample, then:
# - 1: check they all have the same timezone
- tz_matches = np.vectorize(lambda element: pd.isna(element) or (isinstance(element, pd.Timestamp) and element.tz == sample.tz))
+ tz_matches = np.vectorize(
+ lambda element: pd.isna(element) or (isinstance(element, pd.Timestamp) and element.tz == sample.tz)
+ )
if not (tz_matches(arr)).all():
- raise NormalizationException(f"Failed to normalize column {arr_name}: first non-null element found is a "
- f"Timestamp with timezone '{sample.tz}', but one or more subsequent elements "
- f"are either not Timestamps or have differing timezones, neither of which is "
- f"supported.")
+ raise NormalizationException(
+ f"Failed to normalize column {arr_name}: first non-null element found is a "
+ f"Timestamp with timezone '{sample.tz}', but one or more subsequent elements "
+ f"are either not Timestamps or have differing timezones, neither of which is "
+ f"supported."
+ )
# - 2: try and clean up all NaNs inside it.
log.debug("Removing all NaNs from column: {} of type datetime64", arr_name)
return arr.astype(DTN64_DTYPE)
@@ -261,14 +271,16 @@ def _to_primitive(arr, arr_name, dynamic_strings, string_max_len=None, coerce_co
f"Do you have mixed dtypes in your column?"
)
- # Pick any unwanted data conversions (e.g. np.NaN to 'nan') or None to the string 'None'
+ # Pick any unwanted data conversions (e.g. np.nan to 'nan') or None to the string 'None'
if np.array_equal(arr, casted_arr):
return casted_arr
else:
if None in arr:
+                # In NumPy 2+, checks like arr is None are not supported due to being ambiguous,
+                # so we have to use arr == None instead
raise ArcticDbNotYetImplemented(
"You have a None object in the numpy array at positions={} Column type={} for column={} "
- "which cannot be normalized.".format(np.where(arr is None)[0], arr.dtype, arr_name)
+ "which cannot be normalized.".format(np.where(arr == None), arr.dtype, arr_name)
)
else:
raise ArcticDbNotYetImplemented(
@@ -301,7 +313,9 @@ def _from_tz_timestamp(ts, tz):
_range_index_props_are_public = hasattr(RangeIndex, "start")
-def _normalize_single_index(index, index_names, index_norm, dynamic_strings=None, string_max_len=None, empty_types=False):
+def _normalize_single_index(
+ index, index_names, index_norm, dynamic_strings=None, string_max_len=None, empty_types=False
+):
# index: pd.Index or np.ndarray -> np.ndarray
index_tz = None
is_empty = len(index) == 0
@@ -569,7 +583,9 @@ def _index_to_records(self, df, pd_norm, dynamic_strings, string_max_len, empty_
is_categorical = len(df.select_dtypes(include="category").columns) > 0
index = DatetimeIndex([]) if IS_PANDAS_TWO and empty_df and not is_categorical else index
- return _normalize_single_index(index, list(index.names), index_norm, dynamic_strings, string_max_len, empty_types=empty_types)
+ return _normalize_single_index(
+ index, list(index.names), index_norm, dynamic_strings, string_max_len, empty_types=empty_types
+ )
def _index_from_records(self, item, norm_meta):
# type: (NormalizationMetadata.Pandas, _SUPPORTED_NATIVE_RETURN_TYPES, Bool)->Union[Index, DatetimeIndex, MultiIndex]
@@ -599,13 +615,15 @@ class SeriesNormalizer(_PandasNormalizer):
def __init__(self):
self._df_norm = DataFrameNormalizer()
- def normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, empty_types=False, **kwargs):
+ def normalize(
+ self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, empty_types=False, **kwargs
+ ):
df, norm = self._df_norm.normalize(
item.to_frame(),
dynamic_strings=dynamic_strings,
string_max_len=string_max_len,
coerce_columns=coerce_columns,
- empty_types=empty_types
+ empty_types=empty_types,
)
norm.series.CopyFrom(norm.df)
if item.name is not None:
@@ -708,7 +726,11 @@ def gen_blocks():
# TODO: Remove the casting after empty types become the only option
# Issue: https://github.com/man-group/ArcticDB/issues/1562
block = make_block(values=a.reshape((1, _len)), placement=(column_placement_in_block,))
- yield block.astype(np.float64) if _len == 0 and block.dtype == np.dtype('object') and not IS_PANDAS_TWO else block
+ yield (
+ block.astype(np.float64)
+ if _len == 0 and block.dtype == np.dtype("object") and not IS_PANDAS_TWO
+ else block
+ )
column_placement_in_block += 1
if cols is None or len(cols) == 0:
@@ -759,14 +781,16 @@ def denormalize(self, item, norm_meta):
# We cast it back to "float" so that it matches Pandas 1.0 default for empty series.
# Moreover, we explicitly provide the index otherwise Pandas 0.X overrides it for a RangeIndex
empty_columns_names = (
- [] if data is None else
- [
- name for name, np_array in data.items()
+ []
+ if data is None
+ else [
+ name
+ for name, np_array in data.items()
if np_array.dtype in OBJECT_TOKENS and len(df[name]) == 0
]
)
for column_name in empty_columns_names:
- df[column_name] = pd.Series([], index=index, dtype='float64')
+ df[column_name] = pd.Series([], index=index, dtype="float64")
else:
if index is not None:
df = self.df_without_consolidation(columns, index, item, n_indexes, data)
@@ -866,7 +890,9 @@ def _denormalize_multi_index(df: pd.DataFrame, norm_meta: NormalizationMetadata.
return df
- def normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, empty_types=False, **kwargs):
+ def normalize(
+ self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, empty_types=False, **kwargs
+ ):
# type: (DataFrame, Optional[int])->NormalizedInput
norm_meta = NormalizationMetadata()
norm_meta.df.common.mark = True
@@ -933,6 +959,7 @@ class MsgPackNormalizer(Normalizer):
"""
Fall back plan for the time being to store arbitrary data
"""
+
def __init__(self, cfg=None):
self.strict_mode = cfg.strict_mode if cfg is not None else False
@@ -1038,7 +1065,9 @@ def write(obj):
class TimeFrameNormalizer(Normalizer):
- def normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, empty_types=False, **kwargs):
+ def normalize(
+ self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, empty_types=False, **kwargs
+ ):
norm_meta = NormalizationMetadata()
norm_meta.ts.mark = True
index_norm = norm_meta.ts.common.index
@@ -1116,7 +1145,9 @@ def __init__(self, fallback_normalizer=None, use_norm_failure_handler_known_type
self.msg_pack_denorm = MsgPackNormalizer() # must exist for deserialization
self.fallback_normalizer = fallback_normalizer
- def _normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, empty_types=False, **kwargs):
+ def _normalize(
+ self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, empty_types=False, **kwargs
+ ):
normalizer = self.get_normalizer_for_type(item)
if not normalizer:
@@ -1166,7 +1197,14 @@ def get_normalizer_for_type(self, item):
return None
def normalize(
- self, item, string_max_len=None, pickle_on_failure=False, dynamic_strings=False, coerce_columns=None, empty_types=False, **kwargs
+ self,
+ item,
+ string_max_len=None,
+ pickle_on_failure=False,
+ dynamic_strings=False,
+ coerce_columns=None,
+ empty_types=False,
+ **kwargs,
):
"""
:param item: Item to be normalized to something Arctic Native understands.
@@ -1217,11 +1255,11 @@ def denormalize(self, item, norm_meta):
normalize = _NORMALIZER.normalize
denormalize = _NORMALIZER.denormalize
-_MAX_USER_DEFINED_META = 16 << 20 # 16MB
-_WARN_USER_DEFINED_META = 8 << 20 # 8MB
+_MAX_USER_DEFINED_META = 16 << 20 # 16MB
+_WARN_USER_DEFINED_META = 8 << 20 # 8MB
-_MAX_RECURSIVE_METASTRUCT = 16 << 20 # 16MB
-_WARN_RECURSIVE_METASTRUCT = 8 << 20 # 8MB
+_MAX_RECURSIVE_METASTRUCT = 16 << 20 # 16MB
+_WARN_RECURSIVE_METASTRUCT = 8 << 20 # 8MB
def _init_msgpack_metadata():
@@ -1247,9 +1285,11 @@ def normalize_metadata(metadata: Any) -> UserDefinedMetadata:
packed = _msgpack_metadata._msgpack_packb(metadata)
size = len(packed)
if size > _MAX_USER_DEFINED_META:
- raise ArcticDbNotYetImplemented(f'User defined metadata cannot exceed {_MAX_USER_DEFINED_META}B')
+ raise ArcticDbNotYetImplemented(f"User defined metadata cannot exceed {_MAX_USER_DEFINED_META}B")
if size > _WARN_USER_DEFINED_META:
- log.warn(f'User defined metadata is above warning size ({_WARN_USER_DEFINED_META}B), metadata cannot exceed {_MAX_USER_DEFINED_META}B. Current size: {size}B.')
+ log.warn(
+ f"User defined metadata is above warning size ({_WARN_USER_DEFINED_META}B), metadata cannot exceed {_MAX_USER_DEFINED_META}B. Current size: {size}B."
+ )
udm = UserDefinedMetadata()
udm.inline_payload = packed
@@ -1261,9 +1301,13 @@ def normalize_recursive_metastruct(metastruct: Dict[Any, Any]) -> UserDefinedMet
packed = _msgpack_metadata._msgpack_packb(metastruct)
size = len(packed)
if size > _MAX_RECURSIVE_METASTRUCT:
- raise ArcticDbNotYetImplemented(f'Recursively normalized data normalization metadata cannot exceed {_MAX_RECURSIVE_METASTRUCT}B')
+ raise ArcticDbNotYetImplemented(
+ f"Recursively normalized data normalization metadata cannot exceed {_MAX_RECURSIVE_METASTRUCT}B"
+ )
if size > _WARN_RECURSIVE_METASTRUCT:
- log.warn(f'Recursively normalized data normalization metadata is above warning size ({_WARN_RECURSIVE_METASTRUCT}B), cannot exceed {_MAX_RECURSIVE_METASTRUCT}B. Current size: {size}B.')
+ log.warn(
+ f"Recursively normalized data normalization metadata is above warning size ({_WARN_RECURSIVE_METASTRUCT}B), cannot exceed {_MAX_RECURSIVE_METASTRUCT}B. Current size: {size}B."
+ )
udm = UserDefinedMetadata()
udm.inline_payload = packed
@@ -1289,6 +1333,7 @@ def denormalize_dataframe(ret):
return DataFrameNormalizer().denormalize(frame_data, read_result.norm.df)
+
def normalize_dataframe(df, **kwargs):
return DataFrameNormalizer().normalize(df, **kwargs)
@@ -1324,8 +1369,7 @@ def _strip_tz(s, e):
if not getattr(data, "timezone", None):
start, end = _strip_tz(start, end)
data = data[
- start.to_pydatetime(warn=False)
- - timedelta(microseconds=1) : end.to_pydatetime(warn=False)
+ start.to_pydatetime(warn=False) - timedelta(microseconds=1) : end.to_pydatetime(warn=False)
+ timedelta(microseconds=1)
]
return data
diff --git a/python/arcticdb/version_store/_store.py b/python/arcticdb/version_store/_store.py
index b34912c040..4a45c24fb7 100644
--- a/python/arcticdb/version_store/_store.py
+++ b/python/arcticdb/version_store/_store.py
@@ -5,11 +5,13 @@
As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
"""
+
import copy
import datetime
import os
import sys
import pandas as pd
+import numpy as np
import pytz
import re
import itertools
@@ -69,8 +71,9 @@
_from_tz_timestamp,
restrict_data_to_date_range_only,
normalize_dt_range_to_ts,
- _denormalize_single_index
+ _denormalize_single_index,
)
+
TimeSeriesType = Union[pd.DataFrame, pd.Series]
from arcticdb.util._versions import PANDAS_VERSION
from packaging.version import Version
@@ -216,9 +219,10 @@ class NativeVersionStore:
"""
_warned_about_list_version_latest_only_and_snapshot: bool = False
- norm_failure_options_msg_write = \
- "Setting the pickle_on_failure parameter to True will allow the object to be written. However, many " \
+ norm_failure_options_msg_write = (
+ "Setting the pickle_on_failure parameter to True will allow the object to be written. However, many "
"operations (such as date_range filtering and column selection) will not work on pickled data."
+ )
norm_failure_options_msg_append = "Data must be normalizable to be appended to existing data."
norm_failure_options_msg_update = "Data must be normalizable to be used to update existing data."
@@ -256,12 +260,10 @@ def _initialize(self, library, env, lib_cfg, custom_normalizer, open_mode, nativ
self._open_mode = open_mode
self._native_cfg = native_cfg
-
@classmethod
def create_store_from_lib_config(cls, lib_cfg, env, open_mode=OpenMode.DELETE):
lib = cls.create_lib_from_lib_config(lib_cfg, env, open_mode)
return cls(library=lib, lib_cfg=lib_cfg, env=env, open_mode=open_mode)
-
@staticmethod
def create_library_config(cfg, env, lib_name, encoding_version=EncodingVersion.V1):
@@ -276,7 +278,9 @@ def create_store_from_config(
cls, cfg, env, lib_name, open_mode=OpenMode.DELETE, encoding_version=EncodingVersion.V1
):
protobuf_cfg, native_cfg = NativeVersionStore.get_environment_cfg_and_native_cfg_from_tuple(cfg)
- lib_cfg = NativeVersionStore.create_library_config(protobuf_cfg, env, lib_name, encoding_version=encoding_version)
+ lib_cfg = NativeVersionStore.create_library_config(
+ protobuf_cfg, env, lib_name, encoding_version=encoding_version
+ )
lib = cls.create_lib_from_config((protobuf_cfg, native_cfg), env, lib_cfg.lib_desc.name, open_mode)
return cls(library=lib, lib_cfg=lib_cfg, env=env, open_mode=open_mode, native_cfg=native_cfg)
@@ -294,14 +298,12 @@ def create_lib_from_config(cfg, env, lib_name, open_mode=OpenMode.DELETE):
lib_idx = _LibraryIndex.create_from_resolver(env, cfg_resolver)
return lib_idx.get_library(lib_name, _OpenMode(open_mode), native_cfg)
-
@staticmethod
def get_environment_cfg_and_native_cfg_from_tuple(cfgs):
if isinstance(cfgs, tuple):
return cfgs
else:
return cfgs, None
-
def __setstate__(self, state):
lib_cfg = LibraryConfig()
@@ -348,15 +350,15 @@ def get_backing_store(self):
return backing_store
def _try_normalize(
- self,
- symbol,
- dataframe,
- metadata,
- pickle_on_failure,
- dynamic_strings,
- coerce_columns,
- norm_failure_options_msg="",
- **kwargs
+ self,
+ symbol,
+ dataframe,
+ metadata,
+ pickle_on_failure,
+ dynamic_strings,
+ coerce_columns,
+ norm_failure_options_msg="",
+ **kwargs,
):
dynamic_schema = self.resolve_defaults(
"dynamic_schema", self._lib_cfg.lib_desc.version.write_options, False, **kwargs
@@ -382,7 +384,9 @@ def _try_normalize(
)
except ArcticDbNotYetImplemented as ex:
log.debug(f"ArcticDbNotYetImplemented: data: \n{dataframe}, metadata: {metadata}")
- raise ArcticDbNotYetImplemented(f"Not supported: normalizing, symbol: {symbol}, Reason: {ex}, {norm_failure_options_msg}")
+ raise ArcticDbNotYetImplemented(
+ f"Not supported: normalizing, symbol: {symbol}, Reason: {ex}, {norm_failure_options_msg}"
+ )
except Exception as ex:
log.debug(f"ArcticNativeException: data: \n{dataframe}, metadata: {metadata}")
raise ArcticNativeException(f"Error while normalizing symbol={symbol}, {ex}")
@@ -476,13 +480,14 @@ def resolve_defaults(param_name, proto_cfg, global_default, existing_value=None,
return global_default
def stage(
- self,
- symbol: str,
- data: Any,
- validate_index: bool = False,
- sort_on_index: bool = False,
- sort_columns: List[str] = None,
- **kwargs):
+ self,
+ symbol: str,
+ data: Any,
+ validate_index: bool = False,
+ sort_on_index: bool = False,
+ sort_columns: List[str] = None,
+ **kwargs,
+ ):
norm_failure_options_msg = kwargs.get("norm_failure_options_msg", self.norm_failure_options_msg_write)
_handle_categorical_columns(symbol, data, True)
udm, item, norm_meta = self._try_normalize(
@@ -829,9 +834,7 @@ def update(
update_query = _PythonVersionStoreUpdateQuery()
dynamic_strings = self._resolve_dynamic_strings(kwargs)
proto_cfg = self._lib_cfg.lib_desc.version.write_options
- dynamic_schema = self.resolve_defaults(
- "dynamic_schema", proto_cfg, False, **kwargs
- )
+ dynamic_schema = self.resolve_defaults("dynamic_schema", proto_cfg, False, **kwargs)
coerce_columns = kwargs.get("coerce_columns", None)
prune_previous_version = self.resolve_defaults(
@@ -1069,7 +1072,7 @@ def batch_read_metadata(
Reads the metadata for multiple symbols in a batch fashion. This is more efficient than making multiple
`read_metadata` calls in succession as some constant-time operations can be executed only once rather than once
for each element of `symbols`.
-
+
If a `symbol` or its `as_of` in the query does not exist then the symbol will not be present in the resulting dict.
Consider using `Library#read_metadata_batch` instead, which has improved error handling behaviour.
@@ -1125,7 +1128,7 @@ def _batch_read_metadata_to_versioned_items(self, symbols, as_ofs, include_error
version=vitem.version,
metadata=meta,
host=self.env,
- timestamp=vitem.timestamp
+ timestamp=vitem.timestamp,
)
)
return meta_items
@@ -1181,7 +1184,7 @@ def batch_read_metadata_multi(
version=vitem.version,
metadata=meta,
host=self.env,
- timestamp=vitem.timestamp
+ timestamp=vitem.timestamp,
)
return results_dict
@@ -1195,7 +1198,7 @@ def _convert_thin_cxx_item_to_python(self, cxx_versioned_item, metadata) -> Vers
version=cxx_versioned_item.version,
metadata=metadata,
host=self.env,
- timestamp=cxx_versioned_item.timestamp
+ timestamp=cxx_versioned_item.timestamp,
)
def batch_write(
@@ -1530,7 +1533,7 @@ def batch_restore_version(
version=result.version.version,
metadata=meta,
host=self.env,
- timestamp=result.version.timestamp
+ timestamp=result.version.timestamp,
)
for result, meta in zip(read_results, metadatas)
]
@@ -1686,9 +1689,9 @@ def _postprocess_df_with_only_rowcount_idx(self, read_result, row_range):
stop = index_meta.start + read_result.frame_data.row_count * step
index = pd.RangeIndex(start=index_meta.start, stop=stop, step=step)
if row_range:
- index=index[row_range[0]:row_range[1]]
+ index = index[row_range[0] : row_range[1]]
elif PANDAS_VERSION < Version("2.0.0"):
- index = pd.RangeIndex(start=0, stop=0,step=1)
+ index = pd.RangeIndex(start=0, stop=0, step=1)
else:
index = pd.DatetimeIndex([])
meta = denormalize_user_metadata(read_result.udm, self._normalizer)
@@ -1699,11 +1702,16 @@ def _postprocess_df_with_only_rowcount_idx(self, read_result, row_range):
version=read_result.version.version,
metadata=meta,
host=self.env,
- timestamp=read_result.version.timestamp
+ timestamp=read_result.version.timestamp,
)
def _resolve_empty_columns(self, columns, implement_read_index):
- if not implement_read_index and columns == []:
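+        # `columns == []` is an elementwise comparison (and ambiguous) when columns is an ndarray, so check emptiness explicitly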
+ is_columns_empty = (
+ columns is None
+ or (isinstance(columns, list) and len(columns) == 0)
+ or (isinstance(columns, np.ndarray) and columns.size == 0)
+ )
+ if not implement_read_index and is_columns_empty:
columns = None
return columns
@@ -1841,15 +1849,17 @@ def _read_dataframe(self, symbol, version_query, read_query, read_options):
def _post_process_dataframe(self, read_result, read_query, implement_read_index=False, head=None, tail=None):
index_type = read_result.norm.df.common.WhichOneof("index_type")
- index_is_rowcount = (index_type == "index" and
- not read_result.norm.df.common.index.is_physically_stored and
- len(read_result.frame_data.index_columns) == 0)
+ index_is_rowcount = (
+ index_type == "index"
+ and not read_result.norm.df.common.index.is_physically_stored
+ and len(read_result.frame_data.index_columns) == 0
+ )
if implement_read_index and read_query.columns == [] and index_is_rowcount:
row_range = None
if head:
- row_range=(0, head)
+ row_range = (0, head)
elif tail:
- row_range=(-tail, None)
+ row_range = (-tail, None)
elif read_query.row_filter is not None:
row_range = self._compute_filter_start_end_row(read_result, read_query)
return self._postprocess_df_with_only_rowcount_idx(read_result, row_range)
@@ -1878,7 +1888,7 @@ def _post_process_dataframe(self, read_result, read_query, implement_read_index=
version=vitem.version,
metadata=vitem.metadata,
host=vitem.host,
- timestamp=vitem.timestamp
+ timestamp=vitem.timestamp,
)
return vitem
@@ -1955,7 +1965,7 @@ def restore_version(self, symbol: str, as_of: Optional[VersionQueryInput] = None
version=read_result.version.version,
metadata=meta,
host=self.env,
- timestamp=read_result.version.timestamp
+ timestamp=read_result.version.timestamp,
)
def list_symbols_with_incomplete_data(self) -> List[str]:
@@ -1992,7 +2002,7 @@ def compact_incomplete(
metadata: Optional[Any] = None,
prune_previous_version: Optional[bool] = None,
validate_index: bool = False,
- delete_staged_data_on_failure: bool = False
+ delete_staged_data_on_failure: bool = False,
) -> VersionedItem:
"""
Compact previously written un-indexed chunks of data, produced by a tick collector or parallel
@@ -2022,11 +2032,11 @@ def compact_incomplete(
update operations. This requires that the indexes of the incomplete segments are non-overlapping with each
other, and, in the case of append=True, fall after the last index value in the previous version.
delete_staged_data_on_failure : bool, default=False
- Determines the handling of staged data when an exception occurs during the execution of the
+ Determines the handling of staged data when an exception occurs during the execution of the
``compact_incomplete`` function.
- If set to True, all staged data for the specified symbol will be deleted if an exception occurs.
- - If set to False, the staged data will be retained and will be used in subsequent calls to
+ - If set to False, the staged data will be retained and will be used in subsequent calls to
``compact_incomplete``.
To manually delete staged data, use the ``remove_incomplete`` function.
@@ -2040,7 +2050,15 @@ def compact_incomplete(
)
udm = normalize_metadata(metadata)
vit = self.version_store.compact_incomplete(
- symbol, append, convert_int_to_float, via_iteration, sparsify, udm, prune_previous_version, validate_index, delete_staged_data_on_failure
+ symbol,
+ append,
+ convert_int_to_float,
+ via_iteration,
+ sparsify,
+ udm,
+ prune_previous_version,
+ validate_index,
+ delete_staged_data_on_failure,
)
return self._convert_thin_cxx_item_to_python(vit, metadata)
@@ -2077,7 +2095,7 @@ def _adapt_read_res(self, read_result: ReadResult) -> VersionedItem:
version=read_result.version.version,
metadata=meta,
host=self.env,
- timestamp=read_result.version.timestamp
+ timestamp=read_result.version.timestamp,
)
def list_versions(
@@ -2129,7 +2147,9 @@ def list_versions(
List of dictionaries describing the discovered versions in the library.
"""
if iterate_on_failure:
- log.warning("The iterate_on_failure argument is deprecated and will soon be removed. It's safe to remove since it doesn't change behavior.")
+ log.warning(
+ "The iterate_on_failure argument is deprecated and will soon be removed. It's safe to remove since it doesn't change behavior."
+ )
if latest_only and snapshot and not NativeVersionStore._warned_about_list_version_latest_only_and_snapshot:
log.warning("latest_only has no effect when snapshot is specified")
@@ -2246,7 +2266,7 @@ def snapshot(
and versions written to the library post snapshot creation.
``NoSuchVersionException`` will be thrown if no symbols exist in the library
-
+
Parameters
----------
snap_name : `str`
@@ -2342,9 +2362,7 @@ def delete(self, symbol: str, date_range: Optional[DateRangeInput] = None, **kwa
"""
if date_range is not None:
proto_cfg = self._lib_cfg.lib_desc.version.write_options
- dynamic_schema = self.resolve_defaults(
- "dynamic_schema", proto_cfg, False, **kwargs
- )
+ dynamic_schema = self.resolve_defaults("dynamic_schema", proto_cfg, False, **kwargs)
# All other methods use prune_previous_version, but also support prune_previous_versions here in case
# anyone is relying on it
prune_previous_versions = _assume_false("prune_previous_versions", kwargs)
@@ -2441,12 +2459,11 @@ def has_symbol(
True if the symbol exists as_of the specified revision, False otherwise.
"""
if iterate_on_failure:
- log.warning("The iterate_on_failure argument is deprecated and will soon be removed. It's safe to remove since it doesn't change behavior.")
+ log.warning(
+ "The iterate_on_failure argument is deprecated and will soon be removed. It's safe to remove since it doesn't change behavior."
+ )
- return (
- self._find_version(symbol, as_of=as_of, raise_on_missing=False)
- is not None
- )
+ return self._find_version(symbol, as_of=as_of, raise_on_missing=False) is not None
def column_names(self, symbol: str, as_of: Optional[VersionQueryInput] = None) -> List[str]:
"""
@@ -2533,7 +2550,7 @@ def read_metadata(self, symbol: str, as_of: Optional[VersionQueryInput] = None,
version=version_item.version,
metadata=meta,
host=self.env,
- timestamp=version_item.timestamp
+ timestamp=version_item.timestamp,
)
def get_type(self) -> str:
@@ -2711,11 +2728,11 @@ def open_mode(self):
return self._open_mode
def _process_info(
- self,
- symbol: str,
- dit,
- as_of: VersionQueryInput,
- date_range_ns_precision: bool,
+ self,
+ symbol: str,
+ dit,
+ as_of: VersionQueryInput,
+ date_range_ns_precision: bool,
) -> Dict[str, Any]:
timeseries_descriptor = dit.timeseries_descriptor
columns = [f.name for f in timeseries_descriptor.fields]
@@ -2764,7 +2781,9 @@ def _process_info(
if timeseries_descriptor.normalization.df.has_synthetic_columns:
columns = pd.RangeIndex(0, len(columns))
- date_range = self._get_time_range_from_ts(timeseries_descriptor, dit.start_index, dit.end_index, date_range_ns_precision)
+ date_range = self._get_time_range_from_ts(
+ timeseries_descriptor, dit.start_index, dit.end_index, date_range_ns_precision
+ )
last_update = datetime64(dit.creation_ts, "ns")
return {
"col_names": {"columns": columns, "index": index, "index_dtype": index_dtype},
@@ -2779,12 +2798,7 @@ def _process_info(
"sorted": sorted_value_name(timeseries_descriptor.sorted),
}
- def get_info(
- self,
- symbol: str,
- version: Optional[VersionQueryInput] = None,
- **kwargs
- ) -> Dict[str, Any]:
+ def get_info(self, symbol: str, version: Optional[VersionQueryInput] = None, **kwargs) -> Dict[str, Any]:
"""
Returns descriptive data for `symbol`.
@@ -2933,11 +2947,11 @@ def is_symbol_fragmented(self, symbol: str, segment_size: Optional[int] = None)
return self.version_store.is_symbol_fragmented(symbol, segment_size)
def defragment_symbol_data(
- self,
- symbol: str,
- segment_size: Optional[int] = None,
- prune_previous_versions: Optional[bool] = None,
- **kwargs,
+ self,
+ symbol: str,
+ segment_size: Optional[int] = None,
+ prune_previous_versions: Optional[bool] = None,
+ **kwargs,
) -> VersionedItem:
"""
Compacts fragmented segments by merging row-sliced segments (https://docs.arcticdb.io/technical/on_disk_storage/#data-layer).
@@ -3016,7 +3030,7 @@ def defragment_symbol_data(
metadata=None,
data=None,
host=self.env,
- timestamp=result.timestamp
+ timestamp=result.timestamp,
)
def library(self):
diff --git a/python/tests/hypothesis/arcticdb/test_sort_merge.py b/python/tests/hypothesis/arcticdb/test_sort_merge.py
index 2417d1bf08..f524f712a3 100644
--- a/python/tests/hypothesis/arcticdb/test_sort_merge.py
+++ b/python/tests/hypothesis/arcticdb/test_sort_merge.py
@@ -93,7 +93,7 @@ def merge_and_sort_segment_list(segment_list, int_columns_in_df=None):
# it will become float column and the missing values will be NaN.
int_columns_in_df = int_columns_in_df if int_columns_in_df else []
for col in int_columns_in_df:
- merged[col] = merged[col].replace(np.NaN, 0)
+ merged[col] = merged[col].replace(np.nan, 0)
merged.sort_index(inplace=True)
return merged
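The `np.NaN` -> `np.nan` renames throughout the tests are needed because NumPy 2.0 removed the `np.NaN` alias (along with `np.Inf` and similar); `np.nan` is the spelling that works on both NumPy 1.x and 2.x. For example:

```python
import numpy as np

x = np.nan          # valid on both NumPy 1.x and 2.x
assert x != x       # NaN never compares equal to itself

# On NumPy 2.x, accessing the removed alias fails:
#   np.NaN  -> AttributeError (use np.nan instead)
```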
diff --git a/python/tests/integration/arcticdb/test_arctic.py b/python/tests/integration/arcticdb/test_arctic.py
index e1d31f60ad..f1c30a8bbc 100644
--- a/python/tests/integration/arcticdb/test_arctic.py
+++ b/python/tests/integration/arcticdb/test_arctic.py
@@ -107,6 +107,7 @@ def test_s3_no_ssl_verification(monkeypatch, s3_no_ssl_storage, client_cert_file
@REAL_S3_TESTS_MARK
+@pytest.mark.skip(reason="This test is not stable")
def test_s3_sts_auth(real_s3_sts_storage):
ac = Arctic(real_s3_sts_storage.arctic_uri)
lib = ac.create_library("test")
diff --git a/python/tests/integration/arcticdb/test_persistent_storage.py b/python/tests/integration/arcticdb/test_persistent_storage.py
index 4830669b8e..54a905e7cb 100644
--- a/python/tests/integration/arcticdb/test_persistent_storage.py
+++ b/python/tests/integration/arcticdb/test_persistent_storage.py
@@ -18,8 +18,11 @@
@pytest.fixture(params=[pytest.param("REAL_S3", marks=REAL_S3_TESTS_MARK)])
-def shared_persistent_arctic_client(real_s3_storage_without_clean_up, encoding_version):
- return real_s3_storage_without_clean_up.create_arctic(encoding_version=encoding_version)
+# Only test with encoding version 0 for now,
+# because there is a problem when older versions try to read configs with a written encoding version
+# def shared_persistent_arctic_client(real_s3_storage_without_clean_up, encoding_version):
+def shared_persistent_arctic_client(real_s3_storage_without_clean_up):
+ return real_s3_storage_without_clean_up.create_arctic()
# TODO: Add a check if the real storage tests are enabled
diff --git a/python/tests/integration/arcticdb/test_storage_lock.py b/python/tests/integration/arcticdb/test_storage_lock.py
index b3294943c0..6226cc8a52 100644
--- a/python/tests/integration/arcticdb/test_storage_lock.py
+++ b/python/tests/integration/arcticdb/test_storage_lock.py
@@ -1,6 +1,7 @@
import pandas as pd
import numpy as np
import pytest
+import sys
from arcticdb_ext.tools import ReliableStorageLock, ReliableStorageLockManager
from tests.util.mark import PERSISTENT_STORAGE_TESTS_ENABLED, REAL_S3_TESTS_MARK
@@ -23,6 +24,7 @@ def slow_increment_task(lib, symbol, sleep_time, lock_manager, lock):
lock_manager.free_lock_guard()
+@pytest.mark.skip(reason="This test is flaky, skip it temporarily")
@pytest.mark.parametrize("num_processes,max_sleep", [(100, 1), (5, 20)])
@REAL_S3_TESTS_MARK
def test_many_increments(real_s3_version_store, num_processes, max_sleep):
@@ -31,11 +33,11 @@ def test_many_increments(real_s3_version_store, num_processes, max_sleep):
symbol = "counter"
lib.version_store.force_delete_symbol(symbol)
lib.write(symbol, init_df)
- lock = ReliableStorageLock("test_lock", lib._library, 10*one_sec)
+ lock = ReliableStorageLock("test_lock", lib._library, 10 * one_sec)
lock_manager = ReliableStorageLockManager()
processes = [
- Process(target=slow_increment_task, args=(lib, symbol, 0 if i%2==0 else max_sleep, lock_manager, lock))
+ Process(target=slow_increment_task, args=(lib, symbol, 0 if i % 2 == 0 else max_sleep, lock_manager, lock))
for i in range(num_processes)
]
for p in processes:
diff --git a/python/tests/integration/arcticdb/version_store/test_basic_version_store.py b/python/tests/integration/arcticdb/version_store/test_basic_version_store.py
index 92ad6e9a4c..c2c6d3dda8 100644
--- a/python/tests/integration/arcticdb/version_store/test_basic_version_store.py
+++ b/python/tests/integration/arcticdb/version_store/test_basic_version_store.py
@@ -61,10 +61,12 @@ def assert_equal_value(data, expected):
expected = expected.reindex(sorted(expected.columns), axis=1)
assert_frame_equal(received, expected)
+
def assert_equal(received, expected):
assert_frame_equal(received, expected)
assert received.equals(expected)
+
def test_simple_flow(basic_store_no_symbol_list, symbol):
df = sample_dataframe()
modified_df = pd.DataFrame({"col": [1, 2, 3, 4]})
@@ -248,7 +250,7 @@ def test_empty_symbol_name_2(object_version_store):
@pytest.mark.parametrize(
- "method", ("write", "append", "update", "write_metadata" ,"batch_write", "batch_append", "batch_write_metadata")
+ "method", ("write", "append", "update", "write_metadata", "batch_write", "batch_append", "batch_write_metadata")
)
def test_empty_symbol_name(lmdb_version_store_v1, method):
first_arg = [""] if method.startswith("batch_") else ""
@@ -1711,7 +1713,7 @@ def test_dataframe_with_NaN_in_timestamp_column(basic_store):
def test_dataframe_with_nan_and_nat_in_timestamp_column(basic_store):
- df_with_NaN_mixed_in_ts = pd.DataFrame({"col": [pd.Timestamp("now"), pd.NaT, np.NaN]})
+ df_with_NaN_mixed_in_ts = pd.DataFrame({"col": [pd.Timestamp("now"), pd.NaT, np.nan]})
basic_store.write("mixed_nan", df_with_NaN_mixed_in_ts)
returned_df = basic_store.read("mixed_nan").data
# NaN will now be converted to NaT
@@ -1719,16 +1721,16 @@ def test_dataframe_with_nan_and_nat_in_timestamp_column(basic_store):
def test_dataframe_with_nan_and_nat_only(basic_store):
- df_with_nan_and_nat_only = pd.DataFrame({"col": [pd.NaT, pd.NaT, np.NaN]}) # Sample will be pd.NaT
+ df_with_nan_and_nat_only = pd.DataFrame({"col": [pd.NaT, pd.NaT, np.nan]}) # Sample will be pd.NaT
basic_store.write("nan_nat", df_with_nan_and_nat_only)
assert_equal(basic_store.read("nan_nat").data, pd.DataFrame({"col": [pd.NaT, pd.NaT, pd.NaT]}))
def test_coercion_to_float(basic_store):
lib = basic_store
- df = pd.DataFrame({"col": [np.NaN, "1", np.NaN]})
+ df = pd.DataFrame({"col": [np.nan, "1", np.nan]})
# col is now an Object column with all NaNs
- df["col"][1] = np.NaN
+ df["col"][1] = np.nan
assert df["col"].dtype == np.object_
@@ -2350,7 +2352,7 @@ def test_batch_read_row_range(lmdb_version_store_v1, use_row_range_clause):
for idx, sym in enumerate(result_dict.keys()):
df = result_dict[sym].data
row_range = row_ranges[idx]
- assert_equal(df, dfs[idx].iloc[row_range[0]:row_range[1]])
+ assert_equal(df, dfs[idx].iloc[row_range[0] : row_range[1]])
def test_batch_read_columns(basic_store_tombstone_and_sync_passive):
diff --git a/python/tests/stress/arcticdb/version_store/test_mem_leaks.py b/python/tests/stress/arcticdb/version_store/test_mem_leaks.py
index c9b8b4fd97..a261518541 100644
--- a/python/tests/stress/arcticdb/version_store/test_mem_leaks.py
+++ b/python/tests/stress/arcticdb/version_store/test_mem_leaks.py
@@ -27,14 +27,15 @@
from arcticdb.version_store.library import Library, ReadRequest
from arcticdb.version_store.processing import QueryBuilder
from arcticdb.version_store._store import NativeVersionStore
-from tests.util.mark import MACOS, SLOW_TESTS_MARK, WINDOWS, MEMRAY_SUPPORTED, MEMRAY_TESTS_MARK, SKIP_CONDA_MARK
+from tests.util.mark import MACOS, SLOW_TESTS_MARK, WINDOWS, MEMRAY_SUPPORTED, MEMRAY_TESTS_MARK, SKIP_CONDA_MARK
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("Memory_tests")
-#region HELPER functions for non-memray tests
+# region HELPER functions for non-memray tests
+
def nice_bytes_str(bytes):
return f" {bytes / (1024 * 1024):.2f}MB/[{bytes}] "
@@ -135,12 +136,12 @@ def check_process_memory_leaks(
print(f" Maximum growth so far: {nice_bytes_str(max(mem_growth_each_iter))}")
print(f" Number of times there was 50% drop in memory: {count_drops(mem_growth_each_iter, 0.5)}")
- assert (
- max_total_mem_lost_threshold_bytes >= process_growth
- ), f"Memory of the process grew more than defined threshold: {nice_bytes_str(process_growth)} (specified: {nice_bytes_str(max_total_mem_lost_threshold_bytes)} )"
- assert (
- max_machine_memory_percentage >= mem_per
- ), f"Machine utilized more memory than specified threshold :{mem_per}% (specified {max_machine_memory_percentage}%)"
+ assert max_total_mem_lost_threshold_bytes >= process_growth, (
+ f"Memory of the process grew more than defined threshold: {nice_bytes_str(process_growth)} (specified: {nice_bytes_str(max_total_mem_lost_threshold_bytes)} )"
+ )
+ assert max_machine_memory_percentage >= mem_per, (
+ f"Machine utilized more memory than specified threshold :{mem_per}% (specified {max_machine_memory_percentage}%)"
+ )
print(
"The process assessment finished within expectations. Total consumed additional mem is bellow threshold: ",
@@ -161,7 +162,7 @@ def grow_exp(df_to_grow: pd.DataFrame, num_times_xx2: int):
return df_to_grow
-def generate_big_dataframe(rows: int = 1000000, num_exp_time_growth: int=5) -> pd.DataFrame:
+def generate_big_dataframe(rows: int = 1000000, num_exp_time_growth: int = 5) -> pd.DataFrame:
"""
A quick and time-efficient way to generate a very large dataframe.
The first parameter will be passed to get_sample_dataframe() so that a dataframe
@@ -177,9 +178,11 @@ def generate_big_dataframe(rows: int = 1000000, num_exp_time_growth: int=5) -> p
logger.info(f"Generation took : {time.time() - st}")
return df
-#endregion
-#region HELPER functions for memray tests
+# endregion
+
+# region HELPER functions for memray tests
+
def construct_df_querybuilder_tests(size: int) -> pd.DataFrame:
df = get_sample_dataframe(size)
@@ -197,28 +200,18 @@ def query_filter_then_groupby_with_aggregations() -> QueryBuilder:
return (
q[q["bool"]]
.groupby("uint8")
- .agg({"uint32": "mean",
- "int32": "sum",
- "strings": "count",
- "float64": "sum",
- "float32": "min",
- "int16": "max"})
+ .agg({"uint32": "mean", "int32": "sum", "strings": "count", "float64": "sum", "float32": "min", "int16": "max"})
)
+
def query_no_filter_only_groupby_with_aggregations() -> QueryBuilder:
"""
groupby composite aggregation query for QueryBuilder memory tests.
The query basically will do aggregation of half of dataframe
"""
q = QueryBuilder()
- return (
- q.groupby("uint8")
- .agg({"uint32": "mean",
- "int32": "sum",
- "strings": "count",
- "float64": "sum",
- "float32": "min",
- "int16": "max"})
+ return q.groupby("uint8").agg(
+ {"uint32": "mean", "int32": "sum", "strings": "count", "float64": "sum", "float32": "min", "int16": "max"}
)
@@ -231,12 +224,7 @@ def query_filter_impossible_cond_groupby_with_aggregations_for_whole_frame() ->
return (
q[q["strings"] != "QASDFGH"]
.groupby("int16")
- .agg({"uint32": "mean",
- "int32": "sum",
- "strings": "count",
- "float64": "sum",
- "float32": "min",
- "int16": "max"})
+ .agg({"uint32": "mean", "int32": "sum", "strings": "count", "float64": "sum", "float32": "min", "int16": "max"})
)
@@ -259,16 +247,18 @@ def query_resample_minutes() -> QueryBuilder:
"""
q = QueryBuilder()
return q.resample("min").agg(
- {"int8" : "min",
- "int16" : "max",
- "int32" : "first",
- "int64" : "last",
- "uint64" : "sum",
- "float32" : "mean",
- "float64" : "sum",
- "strings" : "count",
- "bool" : "sum"}
- )
+ {
+ "int8": "min",
+ "int16": "max",
+ "int32": "first",
+ "int64": "last",
+ "uint64": "sum",
+ "float32": "mean",
+ "float64": "sum",
+ "strings": "count",
+ "bool": "sum",
+ }
+ )
def query_row_range_57percent(size: int) -> QueryBuilder:
@@ -280,16 +270,16 @@ def query_row_range_57percent(size: int) -> QueryBuilder:
start_percentage = random.uniform(0.01, 1.0 - percentage_rows_returned)
result_size_rows = int(0.57 * size)
q = QueryBuilder()
- a = random.randint(0,int((size-1) * start_percentage))
+ a = random.randint(0, int((size - 1) * start_percentage))
b = a + result_size_rows
logger.info(f" GENERATED ROW RANGE {a} - {b}")
- return q.row_range( (a, b) )
+ return q.row_range((a, b))
def query_date_range_57percent(start: pd.Timestamp, end: pd.Timestamp) -> QueryBuilder:
"""
Date range query for QueryBuilder memory tests
- Will generate random date range that will return
+ Will generate a random date range that always returns
the specified percentage of rows
"""
percentage_rows_returned = 0.57
@@ -301,7 +291,7 @@ def query_date_range_57percent(start: pd.Timestamp, end: pd.Timestamp) -> QueryB
percent_duration = total_duration * percentage_rows_returned
b = a + percent_duration
logger.info(f" GENERATED DATE RANGE {a} - {b}")
- return q.date_range( (a,b))
+ return q.date_range((a, b))
def print_info(data: pd.DataFrame, q: QueryBuilder):
@@ -313,16 +303,19 @@ def gen_random_date(start: pd.Timestamp, end: pd.Timestamp):
"""
Returns random timestamp from specified period
"""
- date_range = pd.date_range(start=start, end=end, freq='s')
+ date_range = pd.date_range(start=start, end=end, freq="s")
return random.choice(date_range)
-#endregion
-#region TESTS non-memray type - "guessing" memory leak through series of repetitions
+# endregion
+
+# region TESTS non-memray type - "guessing" memory leak through series of repetitions
+
-@SKIP_CONDA_MARK # Conda CI runner doesn't have enough storage to perform these stress tests
-@pytest.mark.skipif(WINDOWS,
- reason="Not enough storage on Windows runners, due to large Win OS footprint and less free mem")
+@SKIP_CONDA_MARK # Conda CI runner doesn't have enough storage to perform these stress tests
+@pytest.mark.skipif(
+ WINDOWS, reason="Not enough storage on Windows runners, due to large Win OS footprint and less free mem"
+)
@pytest.mark.skipif(MACOS, reason="Problem on MacOs most probably similar to WINDOWS")
def test_mem_leak_read_all_arctic_lib(arctic_library_lmdb_100gb):
lib: adb.Library = arctic_library_lmdb_100gb
@@ -361,10 +354,12 @@ def proc_to_examine():
check_process_memory_leaks(proc_to_examine, 20, max_mem_bytes, 80.0)
-@pytest.mark.skipif(WINDOWS,
- reason="Not enough storage on Windows runners, due to large Win OS footprint and less free mem")
+
+@pytest.mark.skipif(
+ WINDOWS, reason="Not enough storage on Windows runners, due to large Win OS footprint and less free mem"
+)
@pytest.mark.skipif(MACOS, reason="Problem on MacOs most probably similar to WINDOWS")
-@SKIP_CONDA_MARK # Conda CI runner doesn't have enough storage to perform these stress tests
+@SKIP_CONDA_MARK # Conda CI runner doesn't have enough storage to perform these stress tests
def test_mem_leak_querybuilder_standard(arctic_library_lmdb_100gb):
"""
This test uses old approach with iterations.
@@ -385,13 +380,15 @@ def test_mem_leak_querybuilder_standard(arctic_library_lmdb_100gb):
gc.collect()
def proc_to_examine():
- queries = [query_filter_then_groupby_with_aggregations(),
- query_filter_impossible_cond_groupby_with_aggregations_for_whole_frame(),
- query_no_filter_only_groupby_with_aggregations(),
- query_apply_clause_only(random_string(10)),
- query_resample_minutes(),
- query_row_range_57percent(size),
- query_date_range_57percent(start_date, end_date)]
+ queries = [
+ query_filter_then_groupby_with_aggregations(),
+ query_filter_impossible_cond_groupby_with_aggregations_for_whole_frame(),
+ query_no_filter_only_groupby_with_aggregations(),
+ query_apply_clause_only(random_string(10)),
+ query_resample_minutes(),
+ query_row_range_57percent(size),
+ query_date_range_57percent(start_date, end_date),
+ ]
for q in queries:
data: pd.DataFrame = lib.read(symbol, query_builder=q).data
print_info(data, q)
@@ -405,7 +402,7 @@ def proc_to_examine():
check_process_memory_leaks(proc_to_examine, 10, max_mem_bytes, 80.0)
-@SKIP_CONDA_MARK # Conda CI runner doesn't have enough storage to perform these stress tests
+@SKIP_CONDA_MARK # Conda CI runner doesn't have enough storage to perform these stress tests
def test_mem_leak_read_all_native_store(lmdb_version_store_very_big_map):
lib: NativeVersionStore = lmdb_version_store_very_big_map
@@ -428,20 +425,23 @@ def proc_to_examine():
check_process_memory_leaks(proc_to_examine, 20, max_mem_bytes, 80.0)
-#endregion
-#region TESTS pytest-memray type for memory limit and leaks
+# endregion
+
+# region TESTS pytest-memray type for memory limit and leaks
## NOTE: Currently tests can be executed on Python >= 3.8 only
@pytest.fixture
# NOTE: for now we run only V1 encoding as test is very slow
-def library_with_symbol(arctic_library_lmdb, only_test_encoding_version_v1) -> Generator[Tuple[Library, pd.DataFrame, str], None, None]:
+def library_with_symbol(
+ arctic_library_lmdb, only_test_encoding_version_v1
+) -> Generator[Tuple[Library, pd.DataFrame, str], None, None]:
"""
- As memray instruments memory, we need to take out
- everything not relevant from mem leak measurement out of
- test, and place it in setup of the test - in other words in the fixture
+ As memray instruments memory, we need to move
+ everything not relevant to the mem leak measurement
+ out of the test and into its setup - in other words, into the fixture.
Otherwise, memray instruments that code and it results in much slower execution
as well as mixing results of memory leaks - are they in what we test - for example
read() or in construct_df_querybuilder_tests() ? or in other code?
@@ -452,15 +452,18 @@ def library_with_symbol(arctic_library_lmdb, only_test_encoding_version_v1) -> G
lib.write(symbol, df)
yield (lib, df, symbol)
+
@pytest.fixture
# NOTE: for now we run only V1 encoding as test is very slow
-def library_with_tiny_symbol(arctic_library_lmdb, only_test_encoding_version_v1) -> Generator[Tuple[Library, pd.DataFrame, str], None, None]:
+def library_with_tiny_symbol(
+ arctic_library_lmdb, only_test_encoding_version_v1
+) -> Generator[Tuple[Library, pd.DataFrame, str], None, None]:
"""
- As memray instruments memory, we need to take out
- everything not relevant from mem leak measurement out of
- test, and place it in setup of the test - in other words in the fixture
+ As memray instruments memory, we need to move
+ everything not relevant to the mem leak measurement
+ out of the test and into its setup - in other words, into the fixture.
Otherwise, memray instruments that code and it results in much slower execution
- as well as mixing results of memory leaks
+ as well as mixing results of memory leaks
"""
lib: Library = arctic_library_lmdb
symbol = "test"
@@ -468,11 +471,12 @@ def library_with_tiny_symbol(arctic_library_lmdb, only_test_encoding_version_v1)
lib.write(symbol, df)
yield (lib, df, symbol)
-def mem_query(lib: Library, df: pd.DataFrame, num_repetitions:int=1, read_batch:bool=False):
+
+def mem_query(lib: Library, df: pd.DataFrame, num_repetitions: int = 1, read_batch: bool = False):
"""
This is the function where we test different types
of queries against a large dataframe. Later this
- function will be used for memory limit and memory leaks
+ function will be used for memory limit and memory leaks
tests
"""
size = df.shape[0]
@@ -484,46 +488,47 @@ def mem_query(lib: Library, df: pd.DataFrame, num_repetitions:int=1, read_batch:
del df
gc.collect()
- queries = [query_filter_then_groupby_with_aggregations(),
- query_no_filter_only_groupby_with_aggregations(),
- query_filter_impossible_cond_groupby_with_aggregations_for_whole_frame(),
- query_apply_clause_only(random_string(10)),
- query_resample_minutes(),
- query_row_range_57percent(size),
- query_date_range_57percent(start_date, end_date)]
-
+ queries = [
+ query_filter_then_groupby_with_aggregations(),
+ query_no_filter_only_groupby_with_aggregations(),
+ query_filter_impossible_cond_groupby_with_aggregations_for_whole_frame(),
+ query_apply_clause_only(random_string(10)),
+ query_resample_minutes(),
+ query_row_range_57percent(size),
+ query_date_range_57percent(start_date, end_date),
+ ]
+
for rep in range(num_repetitions):
- logger.info(f"REPETITION : {rep}")
- if (read_batch):
- logger.info("RUN read_batch() tests")
- read_requests = [ReadRequest(symbol=symbol,
- query_builder=query
- ) for query in queries]
- results_read = lib.read_batch(read_requests)
- cnt = 0
- for result in results_read:
- assert not result.data is None
- if (num_repetitions < 20):
- print_info(result.data, queries[cnt])
- cnt += 1
- del read_requests, results_read
- else:
- logger.info("RUN read() tests")
- for q in queries:
- data: pd.DataFrame = lib.read(symbol, query_builder=q).data
- lib.read_batch
- if (num_repetitions < 20):
- print_info(data, q)
- del data
- gc.collect()
+ logger.info(f"REPETITION : {rep}")
+ if read_batch:
+ logger.info("RUN read_batch() tests")
+ read_requests = [ReadRequest(symbol=symbol, query_builder=query) for query in queries]
+ results_read = lib.read_batch(read_requests)
+ cnt = 0
+ for result in results_read:
+ assert result.data is not None
+ if num_repetitions < 20:
+ print_info(result.data, queries[cnt])
+ cnt += 1
+ del read_requests, results_read
+ else:
+ logger.info("RUN read() tests")
+ for q in queries:
+ data: pd.DataFrame = lib.read(symbol, query_builder=q).data
+ lib.read_batch
+ if num_repetitions < 20:
+ print_info(data, q)
+ del data
+ gc.collect()
del lib, queries
gc.collect()
+
def test_mem_leak_queries_correctness_precheck(library_with_tiny_symbol):
"""
This test does precheck to confirm queries work more or less
- If it fails then perhaps there was a problem with
+ If it fails then perhaps there was a problem with
QueryBuilder functionality.
All checks are based on the size of the dataframe expected to be returned
by a query; no equality checks are done
@@ -539,37 +544,29 @@ def test_mem_leak_queries_correctness_precheck(library_with_tiny_symbol):
lib.write(symbol, df)
- data: pd.DataFrame = lib.read(symbol,
- query_builder=query_filter_impossible_cond_groupby_with_aggregations_for_whole_frame()
- ).data
- assert df.shape[0] == data.shape[0]
-
- data: pd.DataFrame = lib.read(symbol,
- query_builder=query_row_range_57percent(size)
- ).data
+ data: pd.DataFrame = lib.read(
+ symbol, query_builder=query_filter_impossible_cond_groupby_with_aggregations_for_whole_frame()
+ ).data
+ assert len(df.int16.unique()) == data.shape[0]
+
+ data: pd.DataFrame = lib.read(symbol, query_builder=query_row_range_57percent(size)).data
assert df.shape[0] < data.shape[0] * 2
-
- data: pd.DataFrame = lib.read(symbol,
- query_builder=query_date_range_57percent(start_date, end_date)
- ).data
+
+ data: pd.DataFrame = lib.read(symbol, query_builder=query_date_range_57percent(start_date, end_date)).data
assert df.shape[0] < data.shape[0] * 2
- data: pd.DataFrame = lib.read(symbol,
- query_builder=query_apply_clause_only(random_string(10))
- ).data
+ data: pd.DataFrame = lib.read(symbol, query_builder=query_apply_clause_only(random_string(10))).data
assert len(df.columns.to_list()) <= data.shape[0] * 2
assert 200 < data.shape[0]
- data: pd.DataFrame = lib.read(symbol,
- query_builder=query_no_filter_only_groupby_with_aggregations()
- ).data
+ data: pd.DataFrame = lib.read(symbol, query_builder=query_no_filter_only_groupby_with_aggregations()).data
# groupby column becomes index
assert sorted(list(df["uint8"].unique())) == sorted(list(data.index.unique()))
-if MEMRAY_SUPPORTED:
+if MEMRAY_SUPPORTED:
##
- ## PYTEST-MEMRAY integration is available only from ver 3.8 on
+ ## PYTEST-MEMRAY integration is available only from ver 3.8 on
##
from pytest_memray import Stack
@@ -596,16 +593,14 @@ def is_relevant(stack: Stack) -> bool:
pass
return True
-
@MEMRAY_TESTS_MARK
- @pytest.mark.limit_leaks(location_limit = "50 KB" if not MACOS else "60 KB",
- filter_fn = is_relevant)
+ @pytest.mark.limit_leaks(location_limit="50 KB" if not MACOS else "60 KB", filter_fn=is_relevant)
## Unfortunately it is not possible to xfail memray tests. Instead:
## - log an issue for investigation and analysis - to fix the leak, or to filter out the stack frame
## - meanwhile, increase the 'location limit' so that the test continues to run and the risk of larger mem leaks is reduced
## - leave a mark like the one below noting that the code is under investigation, with the issue number for traceability
## - https://man312219.monday.com/boards/7852509418/pulses/8078461031
- #@pytest.mark.skip(reason = "read() memory leaks Monday#8078461031")
+ # @pytest.mark.skip(reason = "read() memory leaks Monday#8078461031")
def test_mem_leak_querybuilder_read_memray(library_with_symbol):
"""
Test to capture memory leaks >= of specified number
@@ -620,14 +615,13 @@ def test_mem_leak_querybuilder_read_memray(library_with_symbol):
@SLOW_TESTS_MARK
@MEMRAY_TESTS_MARK
- @pytest.mark.limit_leaks(location_limit = "160 KB" if not MACOS else "200 KB",
- filter_fn = is_relevant)
+ @pytest.mark.limit_leaks(location_limit="160 KB" if not MACOS else "200 KB", filter_fn=is_relevant)
## Unfortunately it is not possible to xfail memray tests. Instead:
## - log an issue for investigation and analysis - to fix the leak, or to filter out the stack frame
## - meanwhile, increase the 'location limit' so that the test continues to run and the risk of larger mem leaks is reduced
## - leave a mark like the one below noting that the code is under investigation, with the issue number for traceability
## - https://man312219.monday.com/boards/7852509418/pulses/8067881190
- #@pytest.mark.skip(reason = "read() memory leaks Monday#8067881190")
+ # @pytest.mark.skip(reason = "read() memory leaks Monday#8067881190")
def test_mem_leak_querybuilder_read_manyrepeats_memray(library_with_tiny_symbol):
"""
Test to capture memory leaks >= of specified number
@@ -638,18 +632,17 @@ def test_mem_leak_querybuilder_read_manyrepeats_memray(library_with_tiny_symbol)
what we must exclude from calculation
"""
(lib, df, symbol) = library_with_tiny_symbol
- mem_query(lib, df, num_repetitions=125)
+ mem_query(lib, df, num_repetitions=125)
@SLOW_TESTS_MARK
@MEMRAY_TESTS_MARK
- @pytest.mark.limit_leaks(location_limit = "160 KB" if not MACOS else "200 KB",
- filter_fn = is_relevant)
+ @pytest.mark.limit_leaks(location_limit="160 KB" if not MACOS else "200 KB", filter_fn=is_relevant)
## Unfortunately it is not possible to xfail memray tests. Instead:
## - log an issue for investigation and analysis - to fix the leak, or to filter out the stack frame
## - meanwhile, increase the 'location limit' so that the test continues to run and the risk of larger mem leaks is reduced
## - leave a mark like the one below noting that the code is under investigation, with the issue number for traceability
## - https://man312219.monday.com/boards/7852509418/pulses/8067881190
- #@pytest.mark.skip(reason = "read() memory leaks Monday#8067881190")
+ # @pytest.mark.skip(reason = "read() memory leaks Monday#8067881190")
def test_mem_leak_querybuilder_read_batch_manyrepeats_memray(library_with_tiny_symbol):
"""
Test to capture memory leaks >= of specified number
@@ -660,11 +653,10 @@ def test_mem_leak_querybuilder_read_batch_manyrepeats_memray(library_with_tiny_s
what we must exclude from calculation
"""
(lib, df, symbol) = library_with_tiny_symbol
- mem_query(lib, df, num_repetitions=125, read_batch=True)
-
+ mem_query(lib, df, num_repetitions=125, read_batch=True)
@MEMRAY_TESTS_MARK
- @pytest.mark.limit_leaks(location_limit = "25 KB", filter_fn = is_relevant)
+ @pytest.mark.limit_leaks(location_limit="25 KB", filter_fn=is_relevant)
def test_mem_leak_querybuilder_read_batch_memray(library_with_symbol):
"""
Test to capture memory leaks >= of specified number
@@ -677,7 +669,6 @@ def test_mem_leak_querybuilder_read_batch_memray(library_with_symbol):
(lib, df, symbol) = library_with_symbol
mem_query(lib, df, read_batch=True)
-
@MEMRAY_TESTS_MARK
@pytest.mark.limit_memory("490 MB")
def test_mem_limit_querybuilder_read_memray(library_with_symbol):
@@ -704,12 +695,11 @@ def test_mem_limit_querybuilder_read_batch_memray(library_with_symbol):
(lib, df, symbol) = library_with_symbol
mem_query(lib, df, True)
-
@pytest.fixture
- def library_with_big_symbol_(arctic_library_lmdb) -> Generator[Tuple[Library,str], None, None]:
+ def library_with_big_symbol_(arctic_library_lmdb) -> Generator[Tuple[Library, str], None, None]:
"""
- As memray instruments memory, we need to take out
- everything not relevant from mem leak measurement out of
+ As memray instruments memory, we need to move
+ everything not relevant to the mem leak measurement out of the
test, so that it does as little work as possible
"""
lib: Library = arctic_library_lmdb
@@ -719,9 +709,8 @@ def library_with_big_symbol_(arctic_library_lmdb) -> Generator[Tuple[Library,str
del df
yield (lib, symbol)
-
@MEMRAY_TESTS_MARK
- @pytest.mark.limit_leaks(location_limit = "30 KB", filter_fn = is_relevant)
+ @pytest.mark.limit_leaks(location_limit="30 KB", filter_fn=is_relevant)
def test_mem_leak_read_all_arctic_lib_memray(library_with_big_symbol_):
"""
This is a new version of the initial test that reads the whole
@@ -731,8 +720,8 @@ def test_mem_leak_read_all_arctic_lib_memray(library_with_big_symbol_):
(lib, symbol) = library_with_big_symbol_
logger.info("Test starting")
st = time.time()
- data : pd.DataFrame = lib.read(symbol).data
+ data: pd.DataFrame = lib.read(symbol).data
del data
logger.info(f"Test took : {time.time() - st}")
- gc.collect()
\ No newline at end of file
+ gc.collect()
diff --git a/python/tests/stress/arcticdb/version_store/test_stress_finalize_staged_data.py b/python/tests/stress/arcticdb/version_store/test_stress_finalize_staged_data.py
index 93210ce039..3e3737f8d4 100644
--- a/python/tests/stress/arcticdb/version_store/test_stress_finalize_staged_data.py
+++ b/python/tests/stress/arcticdb/version_store/test_stress_finalize_staged_data.py
@@ -25,11 +25,14 @@
# Uncomment for logging
# set_log_level(default_level="DEBUG", console_output=False, file_output_path="/tmp/arcticdb.log")
-def generate_chunk_sizes(number_chunks:np.uint32, min_rows:np.uint32=100, max_rows:np.uint32=10000) -> List[np.uint32]:
+
+def generate_chunk_sizes(
+ number_chunks: np.uint32, min_rows: np.uint32 = 100, max_rows: np.uint32 = 10000
+) -> List[np.uint32]:
return np.random.randint(min_rows, max_rows, number_chunks, dtype=np.uint32)
-class Results:
+class Results:
def __init__(self):
self.options = None
self.iteration = None
@@ -37,56 +40,57 @@ def __init__(self):
self.total_rows_finalized = 0
self.finalization_time = None
- def __str__(self):
- return f"Options: {self.options}\nIteration: {self.iteration}\n# staged chunks: {self.number_staged_chunks}\ntotal rows finalized: {self.total_rows_finalized}\ntime for finalization (s): {self.finalization_time}"
-
-
+ def __str__(self):
+ return f"Options: {self.options}\nIteration: {self.iteration}\n# staged chunks: {self.number_staged_chunks}\ntotal rows finalized: {self.total_rows_finalized}\ntime for finalization (s): {self.finalization_time}"
-def finalize_monotonic_unique_chunks(ac_library, iterations):
+@SLOW_TESTS_MARK
+@SKIP_CONDA_MARK # Conda CI runner doesn't have enough storage to perform these stress tests
+@pytest.mark.skipif(sys.platform == "win32", reason="Not enough storage on Windows runners")
+def test_finalize_monotonic_unique_chunks(arctic_library_lmdb):
"""
- The test is designed to staged thousands of chunks with variable chunk size.
- To experiment on local computer you can move up to 20k number of chunks approx 10k each
+ The test is designed to stage thousands of chunks of variable size.
+ To experiment on a local computer you can go up to roughly 20k chunks of approx 10k rows each
- For stress testing this number is reduced due to github runner HDD size - 16 GB only
+ For stress testing this number is reduced due to the GitHub runner HDD size - only 16 GB
- On local disk you must use "arctic_library_lmdb" fixture as it sets 100 GB limit.
- If you use "basic_arctic_library" you might end with much more space taken eating all your space
- if you want to experiment with more number of chunks
+ On local disk you must use the "arctic_library_lmdb" fixture as it sets a 100 GB limit.
+ If you use "basic_arctic_library" you might end up taking much more space, eating all of your disk,
+ if you experiment with a larger number of chunks
"""
options = [
- {"chunks_descending" : True, "finalization_mode" : StagedDataFinalizeMethod.APPEND},
- {"chunks_descending" : True, "finalization_mode" : StagedDataFinalizeMethod.WRITE},
- {"chunks_descending" : False, "finalization_mode" : StagedDataFinalizeMethod.WRITE},
- {"chunks_descending" : False, "finalization_mode" : StagedDataFinalizeMethod.APPEND},
- ]
+ {"chunks_descending": True, "finalization_mode": StagedDataFinalizeMethod.APPEND},
+ {"chunks_descending": True, "finalization_mode": StagedDataFinalizeMethod.WRITE},
+ {"chunks_descending": False, "finalization_mode": StagedDataFinalizeMethod.WRITE},
+ {"chunks_descending": False, "finalization_mode": StagedDataFinalizeMethod.APPEND},
+ ]
# Will hold the results after each iteration (instance of Results class)
results = []
- lib : Library = ac_library
+ lib: Library = arctic_library_lmdb
total_start_time = time.time()
# We would need to generate as fast as possible kind of random
- # dataframes. To do that we build a large cache and will
+ # dataframes. To do that we build a large cache and will
# sample rows from there as we need to run as fast as we can
cachedDF = CachedDFGenerator(250000)
total_number_rows_all_iterations: int = 0
# This will serve us as a counter and at the same time it provides unique index for each row
- total_number_rows: TimestampNumber = TimestampNumber(0, cachedDF.TIME_UNIT) # Synchronize index frequency
- INITIAL_TIMESTAMP: TimestampNumber = TimestampNumber(0, cachedDF.TIME_UNIT) # Synchronize index frequency
- symbol="staged"
+ total_number_rows: TimestampNumber = TimestampNumber(0, cachedDF.TIME_UNIT) # Synchronize index frequency
+ INITIAL_TIMESTAMP: TimestampNumber = TimestampNumber(0, cachedDF.TIME_UNIT) # Synchronize index frequency
+ symbol = "staged"
num_rows_initially = 99999
print(f"Writing to symbol initially {num_rows_initially} rows")
df = cachedDF.generate_dataframe_timestamp_indexed(num_rows_initially, total_number_rows, cachedDF.TIME_UNIT)
cnt = 0
- for iter in iterations :
+ for iter in [500, 1000, 1500, 2000]:
res = Results()
total_number_rows = INITIAL_TIMESTAMP + num_rows_initially
@@ -98,12 +102,11 @@ def finalize_monotonic_unique_chunks(ac_library, iterations):
print(f"Chunks to stage {len(chunk_list)} ")
stage_chunks(lib, symbol, cachedDF, total_number_rows, chunk_list, options[cnt % 4]["chunks_descending"])
- if (options[cnt % 4]["finalization_mode"] == StagedDataFinalizeMethod.APPEND):
+ if options[cnt % 4]["finalization_mode"] == StagedDataFinalizeMethod.APPEND:
total_number_rows = total_number_rows + sum(chunk_list)
else:
total_number_rows = INITIAL_TIMESTAMP + sum(chunk_list)
-
print("--" * 50)
print(f"STAGED ROWS {total_number_rows.get_value()} after iteration {cnt}")
print(f"SYMBOL ACTUAL ROWS before finalization - {lib._nvs.get_num_rows(symbol)} ")
@@ -128,7 +131,7 @@ def finalize_monotonic_unique_chunks(ac_library, iterations):
results.append(res)
- total_number_rows.to_zero() # next iteration start from 0
+ total_number_rows.to_zero() # next iteration start from 0
for res in results:
print("_" * 100)
@@ -136,20 +139,3 @@ def finalize_monotonic_unique_chunks(ac_library, iterations):
total_time = time.time() - total_start_time
print("TOTAL TIME: ", total_time)
-
-
-@SLOW_TESTS_MARK
-@SKIP_CONDA_MARK # Conda CI runner doesn't have enough storage to perform these stress tests
-@pytest.mark.skipif(sys.platform == "win32", reason="Not enough storage on Windows runners")
-def test_finalize_monotonic_unique_chunks_lmdb(lmdb_library):
- finalize_monotonic_unique_chunks(lmdb_library, [500, 1000, 1500, 2000])
-
-
-@SLOW_TESTS_MARK
-@SKIP_CONDA_MARK # Conda CI runner doesn't have enough storage to perform these stress tests
-@pytest.mark.skipif(sys.platform == "win32", reason="Not enough storage on Windows runners")
-@REAL_S3_TESTS_MARK
-def test_finalize_monotonic_unique_chunks_realS3(real_s3_library):
- finalize_monotonic_unique_chunks(real_s3_library, [50, 100, 150, 200])
-
-
diff --git a/python/tests/unit/arcticdb/version_store/test_append.py b/python/tests/unit/arcticdb/version_store/test_append.py
index 1ab81dd4c4..955f6fb550 100644
--- a/python/tests/unit/arcticdb/version_store/test_append.py
+++ b/python/tests/unit/arcticdb/version_store/test_append.py
@@ -170,17 +170,6 @@ def test_append_snapshot_delete(lmdb_version_store):
assert_frame_equal(lmdb_version_store.read(symbol, as_of="my_snap").data, df1)
-def _random_integers(size, dtype):
- # We do not generate integers outside the int64 range
- platform_int_info = np.iinfo("int_")
- iinfo = np.iinfo(dtype)
- return np.random.randint(
- max(iinfo.min, platform_int_info.min),
- min(iinfo.max, platform_int_info.max),
- size=size,
- ).astype(dtype)
-
-
def test_append_out_of_order_throws(lmdb_version_store):
lib: NativeVersionStore = lmdb_version_store
lib.write("a", pd.DataFrame({"c": [1, 2, 3]}, index=pd.date_range(0, periods=3)))
@@ -197,8 +186,8 @@ def test_append_out_of_order_and_sort(lmdb_version_store_ignore_order, prune_pre
dtidx = pd.date_range("1970-01-01", periods=num_rows)
test = pd.DataFrame(
{
- "uint8": _random_integers(num_rows, np.uint8),
- "uint32": _random_integers(num_rows, np.uint32),
+ "uint8": random_integers(num_rows, np.uint8),
+ "uint32": random_integers(num_rows, np.uint32),
},
index=dtidx,
)
@@ -268,8 +257,8 @@ def test_upsert_with_delete(lmdb_version_store_big_map):
dtidx = pd.date_range("1970-01-01", periods=num_rows)
test = pd.DataFrame(
{
- "uint8": _random_integers(num_rows, np.uint8),
- "uint32": _random_integers(num_rows, np.uint32),
+ "uint8": random_integers(num_rows, np.uint8),
+ "uint32": random_integers(num_rows, np.uint32),
},
index=dtidx,
)
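The deleted local `_random_integers` helper is replaced by a shared `random_integers` utility, presumably imported from the common test helpers, with the same intent. For reference, an equivalent standalone sketch that keeps the sampled values inside both the requested dtype's range and the platform's default integer range:

```python
import numpy as np

def random_integers(size, dtype):
    # Clip the sampling bounds so values fit both the requested dtype and the
    # platform's default integer type (no values outside the int64 range).
    platform_int_info = np.iinfo(np.int_)
    iinfo = np.iinfo(dtype)
    return np.random.randint(
        max(iinfo.min, platform_int_info.min),
        min(iinfo.max, platform_int_info.max),
        size=size,
    ).astype(dtype)

sample = random_integers(5, np.uint8)
assert sample.dtype == np.uint8 and sample.shape == (5,)
```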
diff --git a/python/tests/unit/arcticdb/version_store/test_filtering.py b/python/tests/unit/arcticdb/version_store/test_filtering.py
index 9afa82dc87..dde7f2cd02 100644
--- a/python/tests/unit/arcticdb/version_store/test_filtering.py
+++ b/python/tests/unit/arcticdb/version_store/test_filtering.py
@@ -32,7 +32,7 @@
generic_filter_test_strings,
generic_filter_test_nans,
)
-from arcticdb.util._versions import IS_PANDAS_TWO, PANDAS_VERSION
+from arcticdb.util._versions import IS_PANDAS_TWO, PANDAS_VERSION, IS_NUMPY_TWO
pytestmark = pytest.mark.pipeline
@@ -1185,7 +1185,7 @@ def test_filter_column_not_present_dynamic(lmdb_version_store_dynamic_schema_v1)
lib.write(symbol, df)
vit = lib.read(symbol, query_builder=q)
- if IS_PANDAS_TWO and sys.platform.startswith("win32"):
+ if (not IS_NUMPY_TWO) and (IS_PANDAS_TWO and sys.platform.startswith("win32")):
# Pandas 2.0.0 changed the behavior of Index creation from numpy arrays:
# "Previously, all indexes created from numpy numeric arrays were forced to 64-bit.
# Now, for example, Index(np.array([1, 2, 3])) will be int32 on 32-bit systems,
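The extra `not IS_NUMPY_TWO` guard reflects a platform default that changed in NumPy 2.0: the default integer type on Windows is now 64-bit, so the Pandas 2 behaviour of preserving the array dtype in the Index no longer yields int32 there. A hedged illustration of the distinction (platform-dependent, for explanation only):

```python
import numpy as np
import pandas as pd

arr = np.array([1, 2, 3])

# NumPy 1.x on Windows: default integer is 32-bit (C long), so Pandas 2
# builds an int32 Index from it.
# NumPy 2.x: the default integer is 64-bit on Windows as well, so the
# Index comes out as int64 and the old special case no longer applies.
print(arr.dtype, pd.Index(arr).dtype)
```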
diff --git a/python/tests/util/mark.py b/python/tests/util/mark.py
index ffd1d96841..6e72d99788 100644
--- a/python/tests/util/mark.py
+++ b/python/tests/util/mark.py
@@ -5,6 +5,7 @@
As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
"""
+
import os
import sys
import pytest
@@ -73,8 +74,10 @@
)
VENV_COMPAT_TESTS_MARK = pytest.mark.skipif(
- MACOS_CONDA_BUILD,
- reason="Skipping compatibility tests because macOS conda builds don't have an available PyPi arcticdb version"
+ MACOS_CONDA_BUILD
+ or sys.version.startswith("3.12")
+ or sys.version.startswith("3.13"), # Waiting for https://github.com/man-group/ArcticDB/issues/2008
+ reason="Skipping compatibility tests because macOS conda builds don't have an available PyPi arcticdb version",
)
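The added skip condition relies on `sys.version` starting with the dotted version string (e.g. "3.12.1 (main, ...)"), so `startswith("3.12")` works; an equivalent, slightly more explicit form uses `sys.version_info`:

```python
import sys

# Same intent as sys.version.startswith("3.12") or sys.version.startswith("3.13"),
# expressed against the structured version tuple.
is_py312_or_313 = sys.version_info[:2] in ((3, 12), (3, 13))
print(is_py312_or_313)
```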
diff --git a/python/tests/util/storage_test.py b/python/tests/util/storage_test.py
index b50a122a1f..4cc7f00f36 100644
--- a/python/tests/util/storage_test.py
+++ b/python/tests/util/storage_test.py
@@ -137,7 +137,7 @@ def verify_library(ac):
def is_strategy_branch_valid_format(input_string):
- pattern = r"^(linux|windows)_cp3(6|7|8|9|10|11).*$"
+ pattern = r"^(linux|windows)_cp3(6|7|8|9|10|11|12|13).*$"
match = re.match(pattern, input_string)
return bool(match)
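The widened pattern now also accepts strategy branch names built for CPython 3.12 and 3.13. A quick sanity check of the updated regex:

```python
import re

pattern = r"^(linux|windows)_cp3(6|7|8|9|10|11|12|13).*$"

assert re.match(pattern, "linux_cp312-build")
assert re.match(pattern, "windows_cp313_build")
assert not re.match(pattern, "macos_cp312_build")
assert not re.match(pattern, "linux_cp35_build")
```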
diff --git a/setup.cfg b/setup.cfg
index 413292bc3f..85f5b6e0b1 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -34,13 +34,12 @@ install_requires =
# conda-forge's package and feedstock browser.
#
# See: https://conda-forge.org/feedstock-outputs/
- numpy <2 # To guard against numpy v2 when it gets released https://pythonspeed.com/articles/numpy-2/
+ numpy
pandas
attrs
dataclasses ; python_version < '3.7'
protobuf >=3.5.0.post1 # Per https://github.com/grpc/grpc/blob/v1.45.3/requirements.txt
msgpack >=0.5.0 # msgpack 0.5.0 is required for strict_types argument, needed for correct pickling fallback
-
pyyaml
packaging
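With the `numpy <2` pin gone, version-dependent behaviour is handled by branching on the installed NumPy major version, which is what the `IS_NUMPY_TWO` flag imported in the filtering tests above is for. A minimal sketch of how such a flag can be derived (the real definition lives in `arcticdb.util._versions` and may differ; names here are assumptions):

```python
import numpy as np
from packaging.version import Version

# Hypothetical reimplementation of an IS_NUMPY_TWO-style flag.
NUMPY_VERSION = Version(np.__version__)
IS_NUMPY_TWO = NUMPY_VERSION >= Version("2.0.0")

print(np.__version__, IS_NUMPY_TWO)
```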