diff --git a/.github/actions/set_persistent_storage_env_vars/action.yml b/.github/actions/set_persistent_storage_env_vars/action.yml index 1c7c5e5cf3..48f93799a3 100644 --- a/.github/actions/set_persistent_storage_env_vars/action.yml +++ b/.github/actions/set_persistent_storage_env_vars/action.yml @@ -2,7 +2,7 @@ name: 'Set Persistent storages env variables' description: 'Set the necessary variables for Persistent storage tests' inputs: bucket: {default: 'arcticdb-ci-test-bucket-02', type: string, description: The name of the S3 bucket that we will test against} - endpoint: {default: 'https://s3.eu-west-1.amazonaws.com', type: string, description: The address of the S3 endpoint} + endpoint: {default: 'http://s3.eu-west-1.amazonaws.com', type: string, description: The address of the S3 endpoint} region: {default: 'eu-west-1', type: string, description: The S3 region of the bucket} aws_access_key: {required: true, type: string, description: The value for the AWS Access key} aws_secret_key: {required: true, type: string, description: The value for the AWS Secret key} diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 29a79511b8..0043661b54 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -36,7 +36,7 @@ jobs: uses: ./.github/workflows/cibw_docker_image.yml permissions: {packages: write} with: - cibuildwheel_ver: "2.12.1" + cibuildwheel_ver: "2.21.3" force_update: false common_config: @@ -163,24 +163,23 @@ jobs: strategy: fail-fast: false matrix: - python3: ${{fromJson(vars.LINUX_PYTHON_VERSIONS || '[6, 7, 8, 9, 10, 11]')}} + python3: ${{fromJson(vars.LINUX_PYTHON_VERSIONS || '[7, 8, 9, 10, 11, 12]')}} include: - python_deps_ids: [""] matrix_override: ${{fromJson(needs.common_config.outputs.linux_matrix)}} pytest_xdist_mode: "--dist worksteal" - - python3: 6 - python_deps_ids: ["", -compat36] - matrix_override: - - ${{fromJson(needs.common_config.outputs.linux_matrix)[0]}} - - python_deps_id: -compat36 - python_deps: requirements-compatibility-py36.txt - pytest_xdist_mode: "" # worksteal Not supported on Python 3.6 - python3: 8 python_deps_ids: ["", -compat38] matrix_override: - ${{fromJson(needs.common_config.outputs.linux_matrix)[0]}} - python_deps_id: -compat38 python_deps: requirements-compatibility-py38.txt + - python3: 11 + python_deps_ids: ["", -compat311] + matrix_override: + - ${{fromJson(needs.common_config.outputs.linux_matrix)[0]}} + - python_deps_id: -compat311 + python_deps: requirements-compatibility-py311.txt name: 3.${{matrix.python3}} Linux uses: ./.github/workflows/build_steps.yml secrets: inherit @@ -212,7 +211,7 @@ jobs: strategy: fail-fast: false matrix: - python3: ${{fromJson(vars.WINDOWS_PYTHON_VERSIONS || '[7, 8, 9, 10, 11]')}} + python3: ${{fromJson(vars.WINDOWS_PYTHON_VERSIONS || '[7, 8, 9, 10, 11, 12]')}} name: 3.${{matrix.python3}} Windows uses: ./.github/workflows/build_steps.yml secrets: inherit diff --git a/.github/workflows/persistent_storage.yml b/.github/workflows/persistent_storage.yml index 1aed985597..0355307216 100644 --- a/.github/workflows/persistent_storage.yml +++ b/.github/workflows/persistent_storage.yml @@ -40,6 +40,12 @@ jobs: - name: Checkout uses: actions/checkout@v3.3.0 + # The latest version of arcticdb will no longer publish wheels for Python 3.6 + # So we skip the seed if we are testing the latest version with Python 3.6 + - name: Skip if unsupported + if: inputs.arcticdb_version == 'latest' && inputs.python3 == '6' + run: exit 0 + - name: Select Python (Linux) if: matrix.os == 'linux' 
run: echo /opt/python/${{env.python_impl_name}}*/bin >> $GITHUB_PATH diff --git a/.gitignore b/.gitignore index 0d6fc84887..1fb834155b 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ __pycache__/ .mypy_cache .hypothesis/ *.egg-info/ +python/venvs/ .vscode/ .vs/ diff --git a/build_tooling/requirements-compatibility-py311.txt b/build_tooling/requirements-compatibility-py311.txt new file mode 100644 index 0000000000..96867ac468 --- /dev/null +++ b/build_tooling/requirements-compatibility-py311.txt @@ -0,0 +1,2 @@ +# Makes sure we are able to use Numpy 1 +numpy<2 diff --git a/build_tooling/requirements-compatibility-py36.txt b/build_tooling/requirements-compatibility-py36.txt deleted file mode 100644 index fa4d6564d7..0000000000 --- a/build_tooling/requirements-compatibility-py36.txt +++ /dev/null @@ -1,7 +0,0 @@ -# The oldest tested deps, for Python 3.6 -pandas~=0.22 -numpy~=1.14 -protobuf~=3.8 -attrs~=19.2 -msgpack~=0.5 -prometheus_client~=0.14 diff --git a/build_tooling/transform_asv_results.py b/build_tooling/transform_asv_results.py index a464f846e4..bcfe977d13 100644 --- a/build_tooling/transform_asv_results.py +++ b/build_tooling/transform_asv_results.py @@ -37,9 +37,7 @@ def df_to_asv_json(results_df: pd.DataFrame): 1 basic_functions.BasicFunctions.time_read_batch [[0.2775141046000044, 0.597266279600126, 0.379... ... {'': 515.5997927188873, ' pd.DataFrame: results_list = [] for test_name, test_results in data["results"].items(): - flattened_data = pd.json_normalize( - {"test_name": test_name, "results": str(test_results)} - ) + flattened_data = pd.json_normalize({"test_name": test_name, "results": str(test_results)}) flattened_data["commit_hash"] = data["commit_hash"] flattened_data["env_name"] = data["env_name"] flattened_data["date"] = data["date"] diff --git a/cpp/CMake/PythonUtils.cmake b/cpp/CMake/PythonUtils.cmake index c472d468da..0def4b5ba1 100644 --- a/cpp/CMake/PythonUtils.cmake +++ b/cpp/CMake/PythonUtils.cmake @@ -1,4 +1,3 @@ - # Helpers function(_python_utils_dump_vars_targets _target _props) if(TARGET ${_target}) @@ -102,7 +101,7 @@ endfunction() # Enhanced FindPython if(DEFINED ARCTICDB_FIND_PYTHON_DEV_MODE) - message("Using supplied ARCTICDB_FIND_PYTHON_DEV_MODE=${ARCTICDB_FIND_PYTHON_DEV_MODE}.") + message(STATUS "Using supplied ARCTICDB_FIND_PYTHON_DEV_MODE=${ARCTICDB_FIND_PYTHON_DEV_MODE}.") if("${ARCTICDB_FIND_PYTHON_DEV_MODE}" STREQUAL "pybind11") set(ARCTICDB_FIND_PYTHON_DEV_MODE "") endif() @@ -125,6 +124,7 @@ if(ARCTICDB_FIND_PYTHON_DEV_MODE) if(NOT Python_EXECUTABLE AND NOT Python_ROOT_DIR AND NOT Python_LIBRARY) # FindPython searches the PATH environment last, but that's arguably the only correct place it should look find_program(Python_EXECUTABLE NAMES python3 python NAMES_PER_DIR REQUIRED NO_CMAKE_SYSTEM_PATH) + set(PYTHON_EXECUTABLE ${Python_EXECUTABLE} CACHE FILEPATH "Python executable found by FindPython") else() set(Python_FIND_STRATEGY LOCATION) endif() @@ -132,6 +132,8 @@ if(ARCTICDB_FIND_PYTHON_DEV_MODE) # Let CMake find Python without telling it the BUILD_PYTHON_VERSION we wanted. 
This way we know third-party stuff that # is not aware of BUILD_PYTHON_VERSION is going to find the same thing find_package(Python 3 COMPONENTS Interpreter ${ARCTICDB_FIND_PYTHON_DEV_MODE} REQUIRED) + set(PYTHON_INCLUDE_DIRS ${Python_INCLUDE_DIRS}) + python_utils_dump_vars_if_enabled("After our FindPython before any third-party:") if(DEFINED Python_FIND_ABI) @@ -146,7 +148,7 @@ if(ARCTICDB_FIND_PYTHON_DEV_MODE) MAP_IMPORTED_CONFIG_RELWITHDEBINFO ";RELEASE" ) endif() - + set(PYBIND11_FINDPYTHON OFF) else() set(ARCTICDB_PYTHON_PREFIX PYTHON) python_utils_check_include_dirs("supplied") @@ -158,3 +160,4 @@ else() set(PYBIND11_FINDPYTHON OFF) endif() + diff --git a/cpp/arcticdb/CMakeLists.txt b/cpp/arcticdb/CMakeLists.txt index 9b88cf8acf..967fc08129 100644 --- a/cpp/arcticdb/CMakeLists.txt +++ b/cpp/arcticdb/CMakeLists.txt @@ -62,7 +62,7 @@ else() add_compile_definitions(_LIBCPP_DISABLE_AVAILABILITY) SET(HIDE_LINKED_SYMBOLS OFF) - + find_package(pybind11 REQUIRED) find_package(PCRE REQUIRED) find_package(Libevent REQUIRED) @@ -381,6 +381,7 @@ set(arcticdb_srcs util/lazy.hpp util/type_traits.hpp util/variant.hpp + util/gil_safe_py_none.hpp version/de_dup_map.hpp version/op_log.hpp version/schema_checks.hpp @@ -515,6 +516,7 @@ set(arcticdb_srcs util/type_handler.cpp version/key_block.hpp version/key_block.cpp + util/gil_safe_py_none.cpp version/local_versioned_engine.cpp version/op_log.cpp version/snapshot.cpp diff --git a/cpp/arcticdb/entity/native_tensor.hpp b/cpp/arcticdb/entity/native_tensor.hpp index 74cc9f99c6..4a47bb40fe 100644 --- a/cpp/arcticdb/entity/native_tensor.hpp +++ b/cpp/arcticdb/entity/native_tensor.hpp @@ -119,14 +119,16 @@ struct NativeTensor { [[nodiscard]] auto expanded_dim() const { return expanded_dim_; } template const T *ptr_cast(size_t pos) const { + util::check(ptr != nullptr, "Unexpected null ptr in NativeTensor"); const bool dimension_condition = ndim() == 1; const bool elsize_condition = elsize_ != 0; - const bool strides_condition = strides_[0] % elsize_ == 0; + const bool strides_condition = (elsize_condition) && (strides_[0] % elsize_ == 0); util::warn(dimension_condition, "Cannot safely ptr_cast matrices in NativeTensor"); util::warn(elsize_condition, "Cannot safely ptr_cast when elsize_ is zero in NativeTensor"); util::warn(strides_condition, "Cannot safely ptr_cast to type of size {} when strides ({}) is not a multiple of elsize ({}) in NativeTensor with dtype {}", sizeof(T), strides_[0], elsize_, data_type()); + int64_t signed_pos = pos; if (dimension_condition && elsize_condition && strides_condition) { signed_pos *= strides_[0] / elsize_; diff --git a/cpp/arcticdb/pipeline/frame_utils.hpp b/cpp/arcticdb/pipeline/frame_utils.hpp index 125156dab0..cc7151ad44 100644 --- a/cpp/arcticdb/pipeline/frame_utils.hpp +++ b/cpp/arcticdb/pipeline/frame_utils.hpp @@ -20,9 +20,10 @@ #include #include #include -#include #include #include +#include +#include namespace arcticdb { @@ -135,7 +136,7 @@ std::optional aggregator_set_data( if (!c_style) ptr_data = flatten_tensor(flattened_buffer, rows_to_write, tensor, slice_num, regular_slice_size); - auto none = py::none{}; + auto none = GilSafePyNone::instance(); std::variant wrapper_or_error; // GIL will be acquired if there is a string that is not pure ASCII/UTF-8 // In this case a PyObject will be allocated by convert::py_unicode_to_buffer @@ -147,7 +148,7 @@ std::optional aggregator_set_data( auto out_ptr = reinterpret_cast(column.buffer().data()); auto& string_pool = agg.segment().string_pool(); for (size_t s = 0; s < 
rows_to_write; ++s, ++ptr_data) { - if (*ptr_data == none.ptr()) { + if (*ptr_data == none->ptr()) { *out_ptr++ = not_a_string(); } else if(is_py_nan(*ptr_data)){ *out_ptr++ = nan_placeholder(); diff --git a/cpp/arcticdb/python/python_handlers.cpp b/cpp/arcticdb/python/python_handlers.cpp index f6e754c42a..319ff6153d 100644 --- a/cpp/arcticdb/python/python_handlers.cpp +++ b/cpp/arcticdb/python/python_handlers.cpp @@ -25,19 +25,18 @@ namespace arcticdb { static inline PyObject** fill_with_none(PyObject** ptr_dest, size_t count, SpinLock& spin_lock) { - auto none = py::none(); + auto none = GilSafePyNone::instance(); for(auto i = 0U; i < count; ++i) - *ptr_dest++ = none.ptr(); + *ptr_dest++ = none->ptr(); spin_lock.lock(); for(auto i = 0U; i < count; ++i) - none.inc_ref(); + Py_INCREF(none->ptr()); spin_lock.unlock(); return ptr_dest; } static inline PyObject** fill_with_none(ChunkedBuffer& buffer, size_t offset, size_t count, SpinLock& spin_lock) { - auto none = py::none(); auto dest = buffer.ptr_cast(offset, count * sizeof(PyObject*)); return fill_with_none(dest, count, spin_lock); } diff --git a/cpp/arcticdb/python/python_module.cpp b/cpp/arcticdb/python/python_module.cpp index 474e74bdeb..ad4b05330e 100644 --- a/cpp/arcticdb/python/python_module.cpp +++ b/cpp/arcticdb/python/python_module.cpp @@ -30,6 +30,7 @@ #include #include #include +#include #include #include @@ -213,26 +214,26 @@ void register_error_code_ecosystem(py::module& m, py::exceptionptr(); } increment_none_refcount(diff, none); diff --git a/cpp/arcticdb/python/python_strings.hpp b/cpp/arcticdb/python/python_strings.hpp index 43c92316a7..d80f4c745b 100644 --- a/cpp/arcticdb/python/python_strings.hpp +++ b/cpp/arcticdb/python/python_strings.hpp @@ -16,6 +16,7 @@ #include #include #include +#include namespace arcticdb { @@ -77,12 +78,6 @@ class DynamicStringReducer { Py_INCREF(obj); } - [[nodiscard]] std::unique_ptr get_py_none() { - std::lock_guard lock(handler_data_.spin_lock()); - auto none = std::make_unique(py::none{}); - return none; - } - auto get_unique_counts( const Column& column ) { @@ -103,7 +98,7 @@ class DynamicStringReducer { std::pair write_strings_to_destination( size_t num_rows, const Column& source_column, - std::unique_ptr& none, + std::shared_ptr& none, const ankerl::unordered_dense::map py_strings, const std::optional& sparse_map) { std::pair counts; @@ -129,7 +124,7 @@ class DynamicStringReducer { auto py_strings = assign_python_strings(unique_counts, has_type_conversion, string_pool); ARCTICDB_SUBSAMPLE(WriteStringsToColumn, 0) - auto none = get_py_none(); + auto none = GilSafePyNone::instance(); auto [none_count, nan_count] = write_strings_to_destination(num_rows, source_column, none, py_strings, sparse_map); increment_none_refcount(none_count, none); increment_nan_refcount(nan_count); @@ -182,7 +177,7 @@ class DynamicStringReducer { } } } - auto none = get_py_none(); + auto none = GilSafePyNone::instance(); auto [none_count, nan_count] = write_strings_to_destination(num_rows, source_column, none, allocated, source_column.opt_sparse_map()); increment_none_refcount(none_count, none); increment_nan_refcount(nan_count); @@ -211,27 +206,24 @@ class DynamicStringReducer { return py_strings; } - void increment_none_refcount(size_t none_count, py::none& none) { + void increment_none_refcount(size_t none_count, std::shared_ptr& none) { + util::check(none, "Got null pointer to py::none in increment_none_refcount"); std::lock_guard lock(handler_data_.spin_lock()); - for(auto i = 0u; i < none_count; ++i) - 
Py_INCREF(none.ptr()); - } - - void increment_none_refcount(size_t none_count, std::unique_ptr& none) { - util::check(static_cast(none), "Got null pointer to py::none in increment_none_refcount"); - increment_none_refcount(none_count, *none); + for(auto i = 0u; i < none_count; ++i) { + Py_INCREF(none->ptr()); + } } void increment_nan_refcount(size_t none_count) { std::lock_guard lock(handler_data_.spin_lock()); for(auto i = 0u; i < none_count; ++i) - handler_data_.py_nan_->inc_ref(); + Py_INCREF(handler_data_.py_nan_->ptr()); } std::pair write_strings_to_column_dense( size_t , const Column& source_column, - const std::unique_ptr& none, + const std::shared_ptr& none, const ankerl::unordered_dense::map& py_strings) { auto data = source_column.data(); auto src = data.cbegin>, IteratorType::REGULAR, IteratorDensity::DENSE>(); @@ -256,7 +248,7 @@ class DynamicStringReducer { std::pair write_strings_to_column_sparse( size_t num_rows, const Column& source_column, - const std::unique_ptr& none, + const std::shared_ptr& none, const ankerl::unordered_dense::map& py_strings, const util::BitSet& sparse_map ) { diff --git a/cpp/arcticdb/python/python_to_tensor_frame.cpp b/cpp/arcticdb/python/python_to_tensor_frame.cpp index 105b1ad93d..f1f9fb386e 100644 --- a/cpp/arcticdb/python/python_to_tensor_frame.cpp +++ b/cpp/arcticdb/python/python_to_tensor_frame.cpp @@ -56,6 +56,19 @@ std::variant pystring_to_buffer(PyObject * return {ValueType::BYTES, 8, 1}; } +// Parse an array descriptor into type and elsize +static std::tuple parse_array_descriptor(PyObject* obj) { + if (pybind11::detail::npy_api::get().PyArray_RUNTIME_VERSION_ < 0x12) { + ARCTICDB_DEBUG(log::version(), "Using numpy 1 API to get array descriptor"); + auto descr = pybind11::detail::array_descriptor1_proxy(obj); + return {descr->kind, descr->elsize}; + } else { + ARCTICDB_DEBUG(log::version(), "Using numpy 2 API to get array descriptor"); + auto descr = pybind11::detail::array_descriptor2_proxy(obj); + return {descr->kind, descr->elsize}; + } +} + /// @brief Determine the type for column composed of arrays /// In case column is composed of arrays all arrays must have the same element type. This iterates until if finds the /// first non-empty array and returns its type. @@ -71,12 +84,11 @@ std::variant pystring_to_buffer(PyObject * } const auto arr = pybind11::detail::array_proxy(*begin); normalization::check(arr->nd == 1, "Only one dimensional arrays are supported in columns."); - const auto descr = pybind11::detail::array_descriptor_proxy(arr->descr); + const ssize_t element_count = arr->dimensions[0]; if(element_count != 0) { - ValueType value_type = get_value_type(descr->kind); - const uint8_t value_bytes = static_cast(descr->elsize); - return {value_type, value_bytes, 2}; + const auto [kind, val_bytes] = parse_array_descriptor(arr->descr); + return {get_value_type(kind), static_cast(val_bytes), 2}; } begin++; } @@ -115,14 +127,14 @@ NativeTensor obj_to_tensor(PyObject *ptr, bool empty_types) { auto& api = pybind11::detail::npy_api::get(); util::check(api.PyArray_Check_(ptr), "Expected Python array"); const auto arr = pybind11::detail::array_proxy(ptr); - const auto descr = pybind11::detail::array_descriptor_proxy(arr->descr); + const auto[kind, elsize] = parse_array_descriptor(arr->descr); auto ndim = arr->nd; const ssize_t size = ndim == 1 ? 
arr->dimensions[0] : arr->dimensions[0] * arr->dimensions[1]; // In Pandas < 2, empty series dtype is `"float"`, but as of Pandas 2.0, empty series dtype is `"object"` // The Normalizer in Python cast empty `"float"` series to `"object"` so `EMPTY` is used here. // See: https://github.com/man-group/ArcticDB/pull/1049 - auto val_type = size == 0 && empty_types ? ValueType::EMPTY : get_value_type(descr->kind); - auto val_bytes = static_cast(descr->elsize); + auto val_type = size == 0 && empty_types ? ValueType::EMPTY : get_value_type(kind); + auto val_bytes = static_cast(elsize); const int64_t element_count = ndim == 1 ? int64_t(arr->dimensions[0]) : int64_t(arr->dimensions[0]) * int64_t(arr->dimensions[1]); const auto c_style = arr->strides[0] == val_bytes; @@ -173,7 +185,7 @@ NativeTensor obj_to_tensor(PyObject *ptr, bool empty_types) { if(current_object != end) sample = *current_object; } - if (empty && descr->kind == 'O') { + if (empty && kind == 'O') { val_type = empty_types ? ValueType::EMPTY : ValueType::UTF_DYNAMIC; } else if(all_nans || is_unicode(sample)){ val_type = ValueType::UTF_DYNAMIC; @@ -194,11 +206,11 @@ NativeTensor obj_to_tensor(PyObject *ptr, bool empty_types) { // and we can't use `val_bytes` to get this information since some dtype have another `elsize` than 8. const SizeBits size_bits = is_empty_type(val_type) ? SizeBits::S64 : get_size_bits(val_bytes); const auto dt = combine_data_type(val_type, size_bits); - const int64_t nbytes = element_count * descr->elsize; + const int64_t nbytes = element_count * elsize; const void* data = nbytes ? arr->data : nullptr; const std::array strides = {arr->strides[0], arr->nd > 1 ? arr->strides[1] : 0}; const std::array shapes = {arr->dimensions[0], arr->nd > 1 ? arr->dimensions[1] : 0}; - return {nbytes, arr->nd, strides.data(), shapes.data(), dt, descr->elsize, data, ndim}; + return {nbytes, arr->nd, strides.data(), shapes.data(), dt, elsize, data, ndim}; } std::shared_ptr py_ndf_to_frame( diff --git a/cpp/arcticdb/python/python_utils.hpp b/cpp/arcticdb/python/python_utils.hpp index 4c4ea638f0..f24228b1cb 100644 --- a/cpp/arcticdb/python/python_utils.hpp +++ b/cpp/arcticdb/python/python_utils.hpp @@ -16,6 +16,7 @@ #include #include #include +#include namespace py = pybind11; @@ -104,14 +105,14 @@ inline void prefill_with_none( SpinLock& spin_lock, IncrementRefCount inc_ref_count = IncrementRefCount::ON) { std::lock_guard lock(spin_lock); - auto none = py::none(); + auto none = GilSafePyNone::instance(); for (auto i = 0U; i < num_rows; ++i) - *ptr_dest++ = none.ptr(); + *ptr_dest++ = none->ptr(); if(inc_ref_count == IncrementRefCount::ON) { auto none_count = num_rows - sparse_count; for (auto j = 0U; j < none_count; ++j) - none.inc_ref(); + Py_INCREF(none->ptr()); } spin_lock.unlock(); } @@ -155,8 +156,13 @@ PyClass & add_repr(PyClass & py_class){ } inline py::object &pd_Timestamp() { - static py::object T = py::module::import("pandas").attr("Timestamp"); - return T; + PYBIND11_CONSTINIT static py::gil_safe_call_once_and_store storage; + auto &imported_obj = storage // Do NOT make this `static`! 
+ .call_once_and_store_result([]() { + return py::module_::import("pandas").attr("Timestamp"); + }) + .get_stored(); + return imported_obj; } inline bool from_pd_timestamp(const py::object &o, timestamp &ts) { @@ -169,8 +175,13 @@ inline bool from_pd_timestamp(const py::object &o, timestamp &ts) { } inline py::object &dt_datetime() { - static py::object T = py::module::import("datetime").attr("datetime"); - return T; + PYBIND11_CONSTINIT static py::gil_safe_call_once_and_store storage; + auto &imported_obj = storage // Do NOT make this `static`! + .call_once_and_store_result([]() { + return py::module_::import("datetime").attr("datetime"); + }) + .get_stored(); + return imported_obj; } inline bool from_datetime(const py::object &o, timestamp &ts) { @@ -182,8 +193,13 @@ inline bool from_datetime(const py::object &o, timestamp &ts) { } inline py::object &np_datetime64() { - static py::object T = py::module::import("numpy").attr("datetime64"); - return T; + PYBIND11_CONSTINIT static py::gil_safe_call_once_and_store storage; + auto &imported_obj = storage // Do NOT make this `static`! + .call_once_and_store_result([]() { + return py::module_::import("numpy").attr("datetime64"); + }) + .get_stored(); + return imported_obj; } inline bool from_dt64(const py::object &o, timestamp &ts) { @@ -271,8 +287,13 @@ inline std::vector named_aggregators_from_dict(const std::unord } inline auto pd_to_offset(std::string_view rule) { - static py::object to_offset = py::module::import("pandas").attr("tseries").attr("frequencies").attr("to_offset"); - return to_offset(rule).attr("nanos").cast(); + PYBIND11_CONSTINIT static py::gil_safe_call_once_and_store storage; + auto &imported_obj = storage // Do NOT make this `static`! + .call_once_and_store_result([]() { + return py::module_::import("pandas").attr("tseries").attr("frequencies").attr("to_offset"); + }) + .get_stored(); + return imported_obj(rule).attr("nanos").cast(); } } // namespace arcticdb::python_util diff --git a/cpp/arcticdb/util/gil_safe_py_none.cpp b/cpp/arcticdb/util/gil_safe_py_none.cpp new file mode 100644 index 0000000000..aad75ce518 --- /dev/null +++ b/cpp/arcticdb/util/gil_safe_py_none.cpp @@ -0,0 +1,26 @@ +/* Copyright 2023 Man Group Operations Limited + * + * Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt. + * + * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. + */ + + +#include +#include + +using namespace arcticdb; + +std::shared_ptr GilSafePyNone::instance_; +std::once_flag GilSafePyNone::init_flag_; + + +void GilSafePyNone::init(){ + pybind11::gil_scoped_acquire gil_lock; + instance_ = std::make_shared(); +}; + +std::shared_ptr GilSafePyNone::instance(){ + std::call_once(init_flag_, &init); + return instance_; +}; \ No newline at end of file diff --git a/cpp/arcticdb/util/gil_safe_py_none.hpp b/cpp/arcticdb/util/gil_safe_py_none.hpp new file mode 100644 index 0000000000..f66485e2a4 --- /dev/null +++ b/cpp/arcticdb/util/gil_safe_py_none.hpp @@ -0,0 +1,31 @@ +/* Copyright 2023 Man Group Operations Limited + * + * Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt. + * + * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. 
+ */ + + +/* The class is introduced purely to selectively bypass the GIL check introduced in pybind11 v2.11.0. + * None in python is a global static. Therefore GIL is not needed. + * For other object, it's fine as long as GIL is held during allocation and deallocation, e.g. nan + * To bypass the check, we could define PYBIND11_NO_ASSERT_GIL_HELD_INCREF_DECREF to globally disable the check + */ + +#pragma once + +#include +#include +#include + +namespace arcticdb { +class GilSafePyNone { +private: + static std::shared_ptr instance_; + static std::once_flag init_flag_; + static void init(); +public: + static std::shared_ptr instance(); +}; + +} //namespace arcticdb \ No newline at end of file diff --git a/cpp/third_party/pybind11 b/cpp/third_party/pybind11 index 80dc998efc..a2e59f0e70 160000 --- a/cpp/third_party/pybind11 +++ b/cpp/third_party/pybind11 @@ -1 +1 @@ -Subproject commit 80dc998efced8ceb2be59756668a7e90e8bef917 +Subproject commit a2e59f0e7065404b44dfe92a28aca47ba1378dc4 diff --git a/environment_unix.yml b/environment_unix.yml index 92535b81f3..3026955524 100644 --- a/environment_unix.yml +++ b/environment_unix.yml @@ -23,9 +23,7 @@ dependencies: # TODO: Fix builds for missing symbols. - libmongocxx <3.9 - zstd - # TODO: pybind 2.11.X became stricter regarding the handling of reference counts - # See: https://github.com/pybind/pybind11/issues/4748#issuecomment-1639445403 - - pybind11 <2.11 + - pybind11 - pcre - cyrus-sasl - aws-sdk-cpp >=1.11.405 @@ -56,9 +54,10 @@ dependencies: # Build dependencies for tests - libarrow # Python dependences - - python + # Python 3.13+ is not yet supported + - python <3.13 - packaging - - numpy <2 + - numpy - pandas - attrs - boto3 diff --git a/pyproject.toml b/pyproject.toml index 4a00f801c4..94d7e5c162 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,9 @@ build = "cp*-win_amd64" before-all = "bash {project}/build_tooling/prep_cpp_build.sh" before-build = "set" +[tool.ruff] +line-length = 120 + [tool.black] line-length = 120 target_version = ['py36', 'py37', 'py38', 'py39', 'py310', 'py311'] @@ -43,3 +46,8 @@ exclude = ''' | static )/ ''' + +[tool.pytest.ini_options] +markers = [ + "pipeline", +] \ No newline at end of file diff --git a/python/arcticdb/util/_versions.py b/python/arcticdb/util/_versions.py index fcfcd09fbf..571f17532e 100644 --- a/python/arcticdb/util/_versions.py +++ b/python/arcticdb/util/_versions.py @@ -5,10 +5,15 @@ As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. 
""" + import pandas as pd +import numpy as np from packaging import version PANDAS_VERSION = version.parse(pd.__version__) CHECK_FREQ_VERSION = version.Version("1.1") IS_PANDAS_ZERO = PANDAS_VERSION < version.Version("1.0") IS_PANDAS_TWO = PANDAS_VERSION >= version.Version("2.0") +IS_NUMPY_TWO = (version.parse(np.__version__) >= version.Version("2.0")) and ( + version.parse(np.__version__) < version.Version("3.0") +) diff --git a/python/arcticdb/util/test.py b/python/arcticdb/util/test.py index fdc922665c..202571d4d6 100644 --- a/python/arcticdb/util/test.py +++ b/python/arcticdb/util/test.py @@ -352,8 +352,11 @@ def random_integers(size, dtype): platform_int_info = np.iinfo("int_") iinfo = np.iinfo(dtype) return np.random.randint( - max(iinfo.min, platform_int_info.min), min(iinfo.max, platform_int_info.max), size=size - ).astype(dtype) + max(iinfo.min, platform_int_info.min), + min(iinfo.max, platform_int_info.max), + size=size, + dtype=dtype + ) def get_wide_dataframe(size=10000, seed=0): diff --git a/python/arcticdb/version_store/_normalization.py b/python/arcticdb/version_store/_normalization.py index f3a5688990..c33c24f62c 100644 --- a/python/arcticdb/version_store/_normalization.py +++ b/python/arcticdb/version_store/_normalization.py @@ -5,6 +5,7 @@ As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. """ + import copy import datetime from datetime import timedelta @@ -21,7 +22,12 @@ from arcticc.pb2.descriptors_pb2 import UserDefinedMetadata, NormalizationMetadata, MsgPackSerialization from arcticc.pb2.storage_pb2 import VersionStoreConfig from collections import Counter -from arcticdb.exceptions import ArcticNativeException, ArcticDbNotYetImplemented, NormalizationException, SortingException +from arcticdb.exceptions import ( + ArcticNativeException, + ArcticDbNotYetImplemented, + NormalizationException, + SortingException, +) from arcticdb.supported_types import DateRangeInput, time_types as supported_time_types from arcticdb.util._versions import IS_PANDAS_TWO, IS_PANDAS_ZERO from arcticdb.version_store.read_result import ReadResult @@ -237,12 +243,16 @@ def _to_primitive(arr, arr_name, dynamic_strings, string_max_len=None, coerce_co if isinstance(sample, Timestamp): # If we have pd.Timestamp as the sample, then: # - 1: check they all have the same timezone - tz_matches = np.vectorize(lambda element: pd.isna(element) or (isinstance(element, pd.Timestamp) and element.tz == sample.tz)) + tz_matches = np.vectorize( + lambda element: pd.isna(element) or (isinstance(element, pd.Timestamp) and element.tz == sample.tz) + ) if not (tz_matches(arr)).all(): - raise NormalizationException(f"Failed to normalize column {arr_name}: first non-null element found is a " - f"Timestamp with timezone '{sample.tz}', but one or more subsequent elements " - f"are either not Timestamps or have differing timezones, neither of which is " - f"supported.") + raise NormalizationException( + f"Failed to normalize column {arr_name}: first non-null element found is a " + f"Timestamp with timezone '{sample.tz}', but one or more subsequent elements " + f"are either not Timestamps or have differing timezones, neither of which is " + f"supported." + ) # - 2: try and clean up all NaNs inside it. 
log.debug("Removing all NaNs from column: {} of type datetime64", arr_name) return arr.astype(DTN64_DTYPE) @@ -261,14 +271,16 @@ def _to_primitive(arr, arr_name, dynamic_strings, string_max_len=None, coerce_co f"Do you have mixed dtypes in your column?" ) - # Pick any unwanted data conversions (e.g. np.NaN to 'nan') or None to the string 'None' + # Pick any unwanted data conversions (e.g. np.nan to 'nan') or None to the string 'None' if np.array_equal(arr, casted_arr): return casted_arr else: if None in arr: + # in >Numpy2, checks like arr is None are not support due to being ambiguous + # so we have to use arr == None raise ArcticDbNotYetImplemented( "You have a None object in the numpy array at positions={} Column type={} for column={} " - "which cannot be normalized.".format(np.where(arr is None)[0], arr.dtype, arr_name) + "which cannot be normalized.".format(np.where(arr == None), arr.dtype, arr_name) ) else: raise ArcticDbNotYetImplemented( @@ -301,7 +313,9 @@ def _from_tz_timestamp(ts, tz): _range_index_props_are_public = hasattr(RangeIndex, "start") -def _normalize_single_index(index, index_names, index_norm, dynamic_strings=None, string_max_len=None, empty_types=False): +def _normalize_single_index( + index, index_names, index_norm, dynamic_strings=None, string_max_len=None, empty_types=False +): # index: pd.Index or np.ndarray -> np.ndarray index_tz = None is_empty = len(index) == 0 @@ -569,7 +583,9 @@ def _index_to_records(self, df, pd_norm, dynamic_strings, string_max_len, empty_ is_categorical = len(df.select_dtypes(include="category").columns) > 0 index = DatetimeIndex([]) if IS_PANDAS_TWO and empty_df and not is_categorical else index - return _normalize_single_index(index, list(index.names), index_norm, dynamic_strings, string_max_len, empty_types=empty_types) + return _normalize_single_index( + index, list(index.names), index_norm, dynamic_strings, string_max_len, empty_types=empty_types + ) def _index_from_records(self, item, norm_meta): # type: (NormalizationMetadata.Pandas, _SUPPORTED_NATIVE_RETURN_TYPES, Bool)->Union[Index, DatetimeIndex, MultiIndex] @@ -599,13 +615,15 @@ class SeriesNormalizer(_PandasNormalizer): def __init__(self): self._df_norm = DataFrameNormalizer() - def normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, empty_types=False, **kwargs): + def normalize( + self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, empty_types=False, **kwargs + ): df, norm = self._df_norm.normalize( item.to_frame(), dynamic_strings=dynamic_strings, string_max_len=string_max_len, coerce_columns=coerce_columns, - empty_types=empty_types + empty_types=empty_types, ) norm.series.CopyFrom(norm.df) if item.name is not None: @@ -708,7 +726,11 @@ def gen_blocks(): # TODO: Remove the casting after empty types become the only option # Issue: https://github.com/man-group/ArcticDB/issues/1562 block = make_block(values=a.reshape((1, _len)), placement=(column_placement_in_block,)) - yield block.astype(np.float64) if _len == 0 and block.dtype == np.dtype('object') and not IS_PANDAS_TWO else block + yield ( + block.astype(np.float64) + if _len == 0 and block.dtype == np.dtype("object") and not IS_PANDAS_TWO + else block + ) column_placement_in_block += 1 if cols is None or len(cols) == 0: @@ -759,14 +781,16 @@ def denormalize(self, item, norm_meta): # We cast it back to "float" so that it matches Pandas 1.0 default for empty series. 
# Moreover, we explicitly provide the index otherwise Pandas 0.X overrides it for a RangeIndex empty_columns_names = ( - [] if data is None else - [ - name for name, np_array in data.items() + [] + if data is None + else [ + name + for name, np_array in data.items() if np_array.dtype in OBJECT_TOKENS and len(df[name]) == 0 ] ) for column_name in empty_columns_names: - df[column_name] = pd.Series([], index=index, dtype='float64') + df[column_name] = pd.Series([], index=index, dtype="float64") else: if index is not None: df = self.df_without_consolidation(columns, index, item, n_indexes, data) @@ -866,7 +890,9 @@ def _denormalize_multi_index(df: pd.DataFrame, norm_meta: NormalizationMetadata. return df - def normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, empty_types=False, **kwargs): + def normalize( + self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, empty_types=False, **kwargs + ): # type: (DataFrame, Optional[int])->NormalizedInput norm_meta = NormalizationMetadata() norm_meta.df.common.mark = True @@ -933,6 +959,7 @@ class MsgPackNormalizer(Normalizer): """ Fall back plan for the time being to store arbitrary data """ + def __init__(self, cfg=None): self.strict_mode = cfg.strict_mode if cfg is not None else False @@ -1038,7 +1065,9 @@ def write(obj): class TimeFrameNormalizer(Normalizer): - def normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, empty_types=False, **kwargs): + def normalize( + self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, empty_types=False, **kwargs + ): norm_meta = NormalizationMetadata() norm_meta.ts.mark = True index_norm = norm_meta.ts.common.index @@ -1116,7 +1145,9 @@ def __init__(self, fallback_normalizer=None, use_norm_failure_handler_known_type self.msg_pack_denorm = MsgPackNormalizer() # must exist for deserialization self.fallback_normalizer = fallback_normalizer - def _normalize(self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, empty_types=False, **kwargs): + def _normalize( + self, item, string_max_len=None, dynamic_strings=False, coerce_columns=None, empty_types=False, **kwargs + ): normalizer = self.get_normalizer_for_type(item) if not normalizer: @@ -1166,7 +1197,14 @@ def get_normalizer_for_type(self, item): return None def normalize( - self, item, string_max_len=None, pickle_on_failure=False, dynamic_strings=False, coerce_columns=None, empty_types=False, **kwargs + self, + item, + string_max_len=None, + pickle_on_failure=False, + dynamic_strings=False, + coerce_columns=None, + empty_types=False, + **kwargs, ): """ :param item: Item to be normalized to something Arctic Native understands. 
@@ -1217,11 +1255,11 @@ def denormalize(self, item, norm_meta): normalize = _NORMALIZER.normalize denormalize = _NORMALIZER.denormalize -_MAX_USER_DEFINED_META = 16 << 20 # 16MB -_WARN_USER_DEFINED_META = 8 << 20 # 8MB +_MAX_USER_DEFINED_META = 16 << 20 # 16MB +_WARN_USER_DEFINED_META = 8 << 20 # 8MB -_MAX_RECURSIVE_METASTRUCT = 16 << 20 # 16MB -_WARN_RECURSIVE_METASTRUCT = 8 << 20 # 8MB +_MAX_RECURSIVE_METASTRUCT = 16 << 20 # 16MB +_WARN_RECURSIVE_METASTRUCT = 8 << 20 # 8MB def _init_msgpack_metadata(): @@ -1247,9 +1285,11 @@ def normalize_metadata(metadata: Any) -> UserDefinedMetadata: packed = _msgpack_metadata._msgpack_packb(metadata) size = len(packed) if size > _MAX_USER_DEFINED_META: - raise ArcticDbNotYetImplemented(f'User defined metadata cannot exceed {_MAX_USER_DEFINED_META}B') + raise ArcticDbNotYetImplemented(f"User defined metadata cannot exceed {_MAX_USER_DEFINED_META}B") if size > _WARN_USER_DEFINED_META: - log.warn(f'User defined metadata is above warning size ({_WARN_USER_DEFINED_META}B), metadata cannot exceed {_MAX_USER_DEFINED_META}B. Current size: {size}B.') + log.warn( + f"User defined metadata is above warning size ({_WARN_USER_DEFINED_META}B), metadata cannot exceed {_MAX_USER_DEFINED_META}B. Current size: {size}B." + ) udm = UserDefinedMetadata() udm.inline_payload = packed @@ -1261,9 +1301,13 @@ def normalize_recursive_metastruct(metastruct: Dict[Any, Any]) -> UserDefinedMet packed = _msgpack_metadata._msgpack_packb(metastruct) size = len(packed) if size > _MAX_RECURSIVE_METASTRUCT: - raise ArcticDbNotYetImplemented(f'Recursively normalized data normalization metadata cannot exceed {_MAX_RECURSIVE_METASTRUCT}B') + raise ArcticDbNotYetImplemented( + f"Recursively normalized data normalization metadata cannot exceed {_MAX_RECURSIVE_METASTRUCT}B" + ) if size > _WARN_RECURSIVE_METASTRUCT: - log.warn(f'Recursively normalized data normalization metadata is above warning size ({_WARN_RECURSIVE_METASTRUCT}B), cannot exceed {_MAX_RECURSIVE_METASTRUCT}B. Current size: {size}B.') + log.warn( + f"Recursively normalized data normalization metadata is above warning size ({_WARN_RECURSIVE_METASTRUCT}B), cannot exceed {_MAX_RECURSIVE_METASTRUCT}B. Current size: {size}B." + ) udm = UserDefinedMetadata() udm.inline_payload = packed @@ -1289,6 +1333,7 @@ def denormalize_dataframe(ret): return DataFrameNormalizer().denormalize(frame_data, read_result.norm.df) + def normalize_dataframe(df, **kwargs): return DataFrameNormalizer().normalize(df, **kwargs) @@ -1324,8 +1369,7 @@ def _strip_tz(s, e): if not getattr(data, "timezone", None): start, end = _strip_tz(start, end) data = data[ - start.to_pydatetime(warn=False) - - timedelta(microseconds=1) : end.to_pydatetime(warn=False) + start.to_pydatetime(warn=False) - timedelta(microseconds=1) : end.to_pydatetime(warn=False) + timedelta(microseconds=1) ] return data diff --git a/python/arcticdb/version_store/_store.py b/python/arcticdb/version_store/_store.py index b34912c040..4a45c24fb7 100644 --- a/python/arcticdb/version_store/_store.py +++ b/python/arcticdb/version_store/_store.py @@ -5,11 +5,13 @@ As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. 
""" + import copy import datetime import os import sys import pandas as pd +import numpy as np import pytz import re import itertools @@ -69,8 +71,9 @@ _from_tz_timestamp, restrict_data_to_date_range_only, normalize_dt_range_to_ts, - _denormalize_single_index + _denormalize_single_index, ) + TimeSeriesType = Union[pd.DataFrame, pd.Series] from arcticdb.util._versions import PANDAS_VERSION from packaging.version import Version @@ -216,9 +219,10 @@ class NativeVersionStore: """ _warned_about_list_version_latest_only_and_snapshot: bool = False - norm_failure_options_msg_write = \ - "Setting the pickle_on_failure parameter to True will allow the object to be written. However, many " \ + norm_failure_options_msg_write = ( + "Setting the pickle_on_failure parameter to True will allow the object to be written. However, many " "operations (such as date_range filtering and column selection) will not work on pickled data." + ) norm_failure_options_msg_append = "Data must be normalizable to be appended to existing data." norm_failure_options_msg_update = "Data must be normalizable to be used to update existing data." @@ -256,12 +260,10 @@ def _initialize(self, library, env, lib_cfg, custom_normalizer, open_mode, nativ self._open_mode = open_mode self._native_cfg = native_cfg - @classmethod def create_store_from_lib_config(cls, lib_cfg, env, open_mode=OpenMode.DELETE): lib = cls.create_lib_from_lib_config(lib_cfg, env, open_mode) return cls(library=lib, lib_cfg=lib_cfg, env=env, open_mode=open_mode) - @staticmethod def create_library_config(cfg, env, lib_name, encoding_version=EncodingVersion.V1): @@ -276,7 +278,9 @@ def create_store_from_config( cls, cfg, env, lib_name, open_mode=OpenMode.DELETE, encoding_version=EncodingVersion.V1 ): protobuf_cfg, native_cfg = NativeVersionStore.get_environment_cfg_and_native_cfg_from_tuple(cfg) - lib_cfg = NativeVersionStore.create_library_config(protobuf_cfg, env, lib_name, encoding_version=encoding_version) + lib_cfg = NativeVersionStore.create_library_config( + protobuf_cfg, env, lib_name, encoding_version=encoding_version + ) lib = cls.create_lib_from_config((protobuf_cfg, native_cfg), env, lib_cfg.lib_desc.name, open_mode) return cls(library=lib, lib_cfg=lib_cfg, env=env, open_mode=open_mode, native_cfg=native_cfg) @@ -294,14 +298,12 @@ def create_lib_from_config(cfg, env, lib_name, open_mode=OpenMode.DELETE): lib_idx = _LibraryIndex.create_from_resolver(env, cfg_resolver) return lib_idx.get_library(lib_name, _OpenMode(open_mode), native_cfg) - @staticmethod def get_environment_cfg_and_native_cfg_from_tuple(cfgs): if isinstance(cfgs, tuple): return cfgs else: return cfgs, None - def __setstate__(self, state): lib_cfg = LibraryConfig() @@ -348,15 +350,15 @@ def get_backing_store(self): return backing_store def _try_normalize( - self, - symbol, - dataframe, - metadata, - pickle_on_failure, - dynamic_strings, - coerce_columns, - norm_failure_options_msg="", - **kwargs + self, + symbol, + dataframe, + metadata, + pickle_on_failure, + dynamic_strings, + coerce_columns, + norm_failure_options_msg="", + **kwargs, ): dynamic_schema = self.resolve_defaults( "dynamic_schema", self._lib_cfg.lib_desc.version.write_options, False, **kwargs @@ -382,7 +384,9 @@ def _try_normalize( ) except ArcticDbNotYetImplemented as ex: log.debug(f"ArcticDbNotYetImplemented: data: \n{dataframe}, metadata: {metadata}") - raise ArcticDbNotYetImplemented(f"Not supported: normalizing, symbol: {symbol}, Reason: {ex}, {norm_failure_options_msg}") + raise ArcticDbNotYetImplemented( + f"Not 
supported: normalizing, symbol: {symbol}, Reason: {ex}, {norm_failure_options_msg}" + ) except Exception as ex: log.debug(f"ArcticNativeException: data: \n{dataframe}, metadata: {metadata}") raise ArcticNativeException(f"Error while normalizing symbol={symbol}, {ex}") @@ -476,13 +480,14 @@ def resolve_defaults(param_name, proto_cfg, global_default, existing_value=None, return global_default def stage( - self, - symbol: str, - data: Any, - validate_index: bool = False, - sort_on_index: bool = False, - sort_columns: List[str] = None, - **kwargs): + self, + symbol: str, + data: Any, + validate_index: bool = False, + sort_on_index: bool = False, + sort_columns: List[str] = None, + **kwargs, + ): norm_failure_options_msg = kwargs.get("norm_failure_options_msg", self.norm_failure_options_msg_write) _handle_categorical_columns(symbol, data, True) udm, item, norm_meta = self._try_normalize( @@ -829,9 +834,7 @@ def update( update_query = _PythonVersionStoreUpdateQuery() dynamic_strings = self._resolve_dynamic_strings(kwargs) proto_cfg = self._lib_cfg.lib_desc.version.write_options - dynamic_schema = self.resolve_defaults( - "dynamic_schema", proto_cfg, False, **kwargs - ) + dynamic_schema = self.resolve_defaults("dynamic_schema", proto_cfg, False, **kwargs) coerce_columns = kwargs.get("coerce_columns", None) prune_previous_version = self.resolve_defaults( @@ -1069,7 +1072,7 @@ def batch_read_metadata( Reads the metadata for multiple symbols in a batch fashion. This is more efficient than making multiple `read_metadata` calls in succession as some constant-time operations can be executed only once rather than once for each element of `symbols`. - + If a `symbol` or its `as_of` in the query does not exist then the symbol will not be present in the resulting dict. Consider using `Library#read_metadata_batch` instead, which has improved error handling behaviour. 
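
A minimal usage sketch of the behaviour described in the docstring above, assuming a `NativeVersionStore` instance named `store` and hypothetical symbol names (not part of the patch): symbols or versions that do not exist are simply omitted from the returned dict rather than raising.

    metadata_items = store.batch_read_metadata(["existing_sym", "missing_sym"])
    assert "missing_sym" not in metadata_items      # absent symbols are silently dropped
    vitem = metadata_items["existing_sym"]          # VersionedItem carrying metadata only
    user_metadata = vitem.metadata
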
@@ -1125,7 +1128,7 @@ def _batch_read_metadata_to_versioned_items(self, symbols, as_ofs, include_error version=vitem.version, metadata=meta, host=self.env, - timestamp=vitem.timestamp + timestamp=vitem.timestamp, ) ) return meta_items @@ -1181,7 +1184,7 @@ def batch_read_metadata_multi( version=vitem.version, metadata=meta, host=self.env, - timestamp=vitem.timestamp + timestamp=vitem.timestamp, ) return results_dict @@ -1195,7 +1198,7 @@ def _convert_thin_cxx_item_to_python(self, cxx_versioned_item, metadata) -> Vers version=cxx_versioned_item.version, metadata=metadata, host=self.env, - timestamp=cxx_versioned_item.timestamp + timestamp=cxx_versioned_item.timestamp, ) def batch_write( @@ -1530,7 +1533,7 @@ def batch_restore_version( version=result.version.version, metadata=meta, host=self.env, - timestamp=result.version.timestamp + timestamp=result.version.timestamp, ) for result, meta in zip(read_results, metadatas) ] @@ -1686,9 +1689,9 @@ def _postprocess_df_with_only_rowcount_idx(self, read_result, row_range): stop = index_meta.start + read_result.frame_data.row_count * step index = pd.RangeIndex(start=index_meta.start, stop=stop, step=step) if row_range: - index=index[row_range[0]:row_range[1]] + index = index[row_range[0] : row_range[1]] elif PANDAS_VERSION < Version("2.0.0"): - index = pd.RangeIndex(start=0, stop=0,step=1) + index = pd.RangeIndex(start=0, stop=0, step=1) else: index = pd.DatetimeIndex([]) meta = denormalize_user_metadata(read_result.udm, self._normalizer) @@ -1699,11 +1702,16 @@ def _postprocess_df_with_only_rowcount_idx(self, read_result, row_range): version=read_result.version.version, metadata=meta, host=self.env, - timestamp=read_result.version.timestamp + timestamp=read_result.version.timestamp, ) def _resolve_empty_columns(self, columns, implement_read_index): - if not implement_read_index and columns == []: + is_columns_empty = ( + columns is None + or (isinstance(columns, list) and len(columns) == 0) + or (isinstance(columns, np.ndarray) and columns.size == 0) + ) + if not implement_read_index and is_columns_empty: columns = None return columns @@ -1841,15 +1849,17 @@ def _read_dataframe(self, symbol, version_query, read_query, read_options): def _post_process_dataframe(self, read_result, read_query, implement_read_index=False, head=None, tail=None): index_type = read_result.norm.df.common.WhichOneof("index_type") - index_is_rowcount = (index_type == "index" and - not read_result.norm.df.common.index.is_physically_stored and - len(read_result.frame_data.index_columns) == 0) + index_is_rowcount = ( + index_type == "index" + and not read_result.norm.df.common.index.is_physically_stored + and len(read_result.frame_data.index_columns) == 0 + ) if implement_read_index and read_query.columns == [] and index_is_rowcount: row_range = None if head: - row_range=(0, head) + row_range = (0, head) elif tail: - row_range=(-tail, None) + row_range = (-tail, None) elif read_query.row_filter is not None: row_range = self._compute_filter_start_end_row(read_result, read_query) return self._postprocess_df_with_only_rowcount_idx(read_result, row_range) @@ -1878,7 +1888,7 @@ def _post_process_dataframe(self, read_result, read_query, implement_read_index= version=vitem.version, metadata=vitem.metadata, host=vitem.host, - timestamp=vitem.timestamp + timestamp=vitem.timestamp, ) return vitem @@ -1955,7 +1965,7 @@ def restore_version(self, symbol: str, as_of: Optional[VersionQueryInput] = None version=read_result.version.version, metadata=meta, host=self.env, - 
timestamp=read_result.version.timestamp + timestamp=read_result.version.timestamp, ) def list_symbols_with_incomplete_data(self) -> List[str]: @@ -1992,7 +2002,7 @@ def compact_incomplete( metadata: Optional[Any] = None, prune_previous_version: Optional[bool] = None, validate_index: bool = False, - delete_staged_data_on_failure: bool = False + delete_staged_data_on_failure: bool = False, ) -> VersionedItem: """ Compact previously written un-indexed chunks of data, produced by a tick collector or parallel @@ -2022,11 +2032,11 @@ def compact_incomplete( update operations. This requires that the indexes of the incomplete segments are non-overlapping with each other, and, in the case of append=True, fall after the last index value in the previous version. delete_staged_data_on_failure : bool, default=False - Determines the handling of staged data when an exception occurs during the execution of the + Determines the handling of staged data when an exception occurs during the execution of the ``compact_incomplete`` function. - If set to True, all staged data for the specified symbol will be deleted if an exception occurs. - - If set to False, the staged data will be retained and will be used in subsequent calls to + - If set to False, the staged data will be retained and will be used in subsequent calls to ``compact_incomplete``. To manually delete staged data, use the ``remove_incomplete`` function. @@ -2040,7 +2050,15 @@ def compact_incomplete( ) udm = normalize_metadata(metadata) vit = self.version_store.compact_incomplete( - symbol, append, convert_int_to_float, via_iteration, sparsify, udm, prune_previous_version, validate_index, delete_staged_data_on_failure + symbol, + append, + convert_int_to_float, + via_iteration, + sparsify, + udm, + prune_previous_version, + validate_index, + delete_staged_data_on_failure, ) return self._convert_thin_cxx_item_to_python(vit, metadata) @@ -2077,7 +2095,7 @@ def _adapt_read_res(self, read_result: ReadResult) -> VersionedItem: version=read_result.version.version, metadata=meta, host=self.env, - timestamp=read_result.version.timestamp + timestamp=read_result.version.timestamp, ) def list_versions( @@ -2129,7 +2147,9 @@ def list_versions( List of dictionaries describing the discovered versions in the library. """ if iterate_on_failure: - log.warning("The iterate_on_failure argument is deprecated and will soon be removed. It's safe to remove since it doesn't change behavior.") + log.warning( + "The iterate_on_failure argument is deprecated and will soon be removed. It's safe to remove since it doesn't change behavior." + ) if latest_only and snapshot and not NativeVersionStore._warned_about_list_version_latest_only_and_snapshot: log.warning("latest_only has no effect when snapshot is specified") @@ -2246,7 +2266,7 @@ def snapshot( and versions written to the library post snapshot creation. 
``NoSuchVersionException`` will be thrown if no symbol exist in the library - + Parameters ---------- snap_name : `str` @@ -2342,9 +2362,7 @@ def delete(self, symbol: str, date_range: Optional[DateRangeInput] = None, **kwa """ if date_range is not None: proto_cfg = self._lib_cfg.lib_desc.version.write_options - dynamic_schema = self.resolve_defaults( - "dynamic_schema", proto_cfg, False, **kwargs - ) + dynamic_schema = self.resolve_defaults("dynamic_schema", proto_cfg, False, **kwargs) # All other methods use prune_previous_version, but also support prune_previous_versions here in case # anyone is relying on it prune_previous_versions = _assume_false("prune_previous_versions", kwargs) @@ -2441,12 +2459,11 @@ def has_symbol( True if the symbol exists as_of the specified revision, False otherwise. """ if iterate_on_failure: - log.warning("The iterate_on_failure argument is deprecated and will soon be removed. It's safe to remove since it doesn't change behavior.") + log.warning( + "The iterate_on_failure argument is deprecated and will soon be removed. It's safe to remove since it doesn't change behavior." + ) - return ( - self._find_version(symbol, as_of=as_of, raise_on_missing=False) - is not None - ) + return self._find_version(symbol, as_of=as_of, raise_on_missing=False) is not None def column_names(self, symbol: str, as_of: Optional[VersionQueryInput] = None) -> List[str]: """ @@ -2533,7 +2550,7 @@ def read_metadata(self, symbol: str, as_of: Optional[VersionQueryInput] = None, version=version_item.version, metadata=meta, host=self.env, - timestamp=version_item.timestamp + timestamp=version_item.timestamp, ) def get_type(self) -> str: @@ -2711,11 +2728,11 @@ def open_mode(self): return self._open_mode def _process_info( - self, - symbol: str, - dit, - as_of: VersionQueryInput, - date_range_ns_precision: bool, + self, + symbol: str, + dit, + as_of: VersionQueryInput, + date_range_ns_precision: bool, ) -> Dict[str, Any]: timeseries_descriptor = dit.timeseries_descriptor columns = [f.name for f in timeseries_descriptor.fields] @@ -2764,7 +2781,9 @@ def _process_info( if timeseries_descriptor.normalization.df.has_synthetic_columns: columns = pd.RangeIndex(0, len(columns)) - date_range = self._get_time_range_from_ts(timeseries_descriptor, dit.start_index, dit.end_index, date_range_ns_precision) + date_range = self._get_time_range_from_ts( + timeseries_descriptor, dit.start_index, dit.end_index, date_range_ns_precision + ) last_update = datetime64(dit.creation_ts, "ns") return { "col_names": {"columns": columns, "index": index, "index_dtype": index_dtype}, @@ -2779,12 +2798,7 @@ def _process_info( "sorted": sorted_value_name(timeseries_descriptor.sorted), } - def get_info( - self, - symbol: str, - version: Optional[VersionQueryInput] = None, - **kwargs - ) -> Dict[str, Any]: + def get_info(self, symbol: str, version: Optional[VersionQueryInput] = None, **kwargs) -> Dict[str, Any]: """ Returns descriptive data for `symbol`. 
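
A short sketch of how the dictionary built by `_process_info` above (and returned by `get_info`) might be consumed; `store` and the symbol name are hypothetical, and only keys visible in this patch are referenced.

    info = store.get_info("my_symbol")
    print(info["sorted"])                     # sortedness name from sorted_value_name(...)
    print(info["col_names"]["columns"])       # column names from the timeseries descriptor
    print(info["col_names"]["index_dtype"])   # dtype of the stored index
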
@@ -2933,11 +2947,11 @@ def is_symbol_fragmented(self, symbol: str, segment_size: Optional[int] = None) return self.version_store.is_symbol_fragmented(symbol, segment_size) def defragment_symbol_data( - self, - symbol: str, - segment_size: Optional[int] = None, - prune_previous_versions: Optional[bool] = None, - **kwargs, + self, + symbol: str, + segment_size: Optional[int] = None, + prune_previous_versions: Optional[bool] = None, + **kwargs, ) -> VersionedItem: """ Compacts fragmented segments by merging row-sliced segments (https://docs.arcticdb.io/technical/on_disk_storage/#data-layer). @@ -3016,7 +3030,7 @@ def defragment_symbol_data( metadata=None, data=None, host=self.env, - timestamp=result.timestamp + timestamp=result.timestamp, ) def library(self): diff --git a/python/tests/hypothesis/arcticdb/test_sort_merge.py b/python/tests/hypothesis/arcticdb/test_sort_merge.py index 2417d1bf08..f524f712a3 100644 --- a/python/tests/hypothesis/arcticdb/test_sort_merge.py +++ b/python/tests/hypothesis/arcticdb/test_sort_merge.py @@ -93,7 +93,7 @@ def merge_and_sort_segment_list(segment_list, int_columns_in_df=None): # it will become float column and the missing values will be NaN. int_columns_in_df = int_columns_in_df if int_columns_in_df else [] for col in int_columns_in_df: - merged[col] = merged[col].replace(np.NaN, 0) + merged[col] = merged[col].replace(np.nan, 0) merged.sort_index(inplace=True) return merged diff --git a/python/tests/integration/arcticdb/test_arctic.py b/python/tests/integration/arcticdb/test_arctic.py index e1d31f60ad..f1c30a8bbc 100644 --- a/python/tests/integration/arcticdb/test_arctic.py +++ b/python/tests/integration/arcticdb/test_arctic.py @@ -107,6 +107,7 @@ def test_s3_no_ssl_verification(monkeypatch, s3_no_ssl_storage, client_cert_file @REAL_S3_TESTS_MARK +@pytest.mark.skip(reason="This test is not stable") def test_s3_sts_auth(real_s3_sts_storage): ac = Arctic(real_s3_sts_storage.arctic_uri) lib = ac.create_library("test") diff --git a/python/tests/integration/arcticdb/test_persistent_storage.py b/python/tests/integration/arcticdb/test_persistent_storage.py index 4830669b8e..54a905e7cb 100644 --- a/python/tests/integration/arcticdb/test_persistent_storage.py +++ b/python/tests/integration/arcticdb/test_persistent_storage.py @@ -18,8 +18,11 @@ @pytest.fixture(params=[pytest.param("REAL_S3", marks=REAL_S3_TESTS_MARK)]) -def shared_persistent_arctic_client(real_s3_storage_without_clean_up, encoding_version): - return real_s3_storage_without_clean_up.create_arctic(encoding_version=encoding_version) +# Only test with encoding version 0 (a.k.a.) 
for now +# because there is a problem when older versions try to read configs with a written encoding version +# def shared_persistent_arctic_client(real_s3_storage_without_clean_up, encoding_version): +def shared_persistent_arctic_client(real_s3_storage_without_clean_up): + return real_s3_storage_without_clean_up.create_arctic() # TODO: Add a check if the real storage tests are enabled diff --git a/python/tests/integration/arcticdb/test_storage_lock.py b/python/tests/integration/arcticdb/test_storage_lock.py index b3294943c0..6226cc8a52 100644 --- a/python/tests/integration/arcticdb/test_storage_lock.py +++ b/python/tests/integration/arcticdb/test_storage_lock.py @@ -1,6 +1,7 @@ import pandas as pd import numpy as np import pytest +import sys from arcticdb_ext.tools import ReliableStorageLock, ReliableStorageLockManager from tests.util.mark import PERSISTENT_STORAGE_TESTS_ENABLED, REAL_S3_TESTS_MARK @@ -23,6 +24,7 @@ def slow_increment_task(lib, symbol, sleep_time, lock_manager, lock): lock_manager.free_lock_guard() +@pytest.mark.skip(reason="This test is flaky, skip it temporarily") @pytest.mark.parametrize("num_processes,max_sleep", [(100, 1), (5, 20)]) @REAL_S3_TESTS_MARK def test_many_increments(real_s3_version_store, num_processes, max_sleep): @@ -31,11 +33,11 @@ def test_many_increments(real_s3_version_store, num_processes, max_sleep): symbol = "counter" lib.version_store.force_delete_symbol(symbol) lib.write(symbol, init_df) - lock = ReliableStorageLock("test_lock", lib._library, 10*one_sec) + lock = ReliableStorageLock("test_lock", lib._library, 10 * one_sec) lock_manager = ReliableStorageLockManager() processes = [ - Process(target=slow_increment_task, args=(lib, symbol, 0 if i%2==0 else max_sleep, lock_manager, lock)) + Process(target=slow_increment_task, args=(lib, symbol, 0 if i % 2 == 0 else max_sleep, lock_manager, lock)) for i in range(num_processes) ] for p in processes: diff --git a/python/tests/integration/arcticdb/version_store/test_basic_version_store.py b/python/tests/integration/arcticdb/version_store/test_basic_version_store.py index 92ad6e9a4c..c2c6d3dda8 100644 --- a/python/tests/integration/arcticdb/version_store/test_basic_version_store.py +++ b/python/tests/integration/arcticdb/version_store/test_basic_version_store.py @@ -61,10 +61,12 @@ def assert_equal_value(data, expected): expected = expected.reindex(sorted(expected.columns), axis=1) assert_frame_equal(received, expected) + def assert_equal(received, expected): assert_frame_equal(received, expected) assert received.equals(expected) + def test_simple_flow(basic_store_no_symbol_list, symbol): df = sample_dataframe() modified_df = pd.DataFrame({"col": [1, 2, 3, 4]}) @@ -248,7 +250,7 @@ def test_empty_symbol_name_2(object_version_store): @pytest.mark.parametrize( - "method", ("write", "append", "update", "write_metadata" ,"batch_write", "batch_append", "batch_write_metadata") + "method", ("write", "append", "update", "write_metadata", "batch_write", "batch_append", "batch_write_metadata") ) def test_empty_symbol_name(lmdb_version_store_v1, method): first_arg = [""] if method.startswith("batch_") else "" @@ -1711,7 +1713,7 @@ def test_dataframe_with_NaN_in_timestamp_column(basic_store): def test_dataframe_with_nan_and_nat_in_timestamp_column(basic_store): - df_with_NaN_mixed_in_ts = pd.DataFrame({"col": [pd.Timestamp("now"), pd.NaT, np.NaN]}) + df_with_NaN_mixed_in_ts = pd.DataFrame({"col": [pd.Timestamp("now"), pd.NaT, np.nan]}) basic_store.write("mixed_nan", df_with_NaN_mixed_in_ts) returned_df = 
basic_store.read("mixed_nan").data # NaN will now be converted to NaT @@ -1719,16 +1721,16 @@ def test_dataframe_with_nan_and_nat_in_timestamp_column(basic_store): def test_dataframe_with_nan_and_nat_only(basic_store): - df_with_nan_and_nat_only = pd.DataFrame({"col": [pd.NaT, pd.NaT, np.NaN]}) # Sample will be pd.NaT + df_with_nan_and_nat_only = pd.DataFrame({"col": [pd.NaT, pd.NaT, np.nan]}) # Sample will be pd.NaT basic_store.write("nan_nat", df_with_nan_and_nat_only) assert_equal(basic_store.read("nan_nat").data, pd.DataFrame({"col": [pd.NaT, pd.NaT, pd.NaT]})) def test_coercion_to_float(basic_store): lib = basic_store - df = pd.DataFrame({"col": [np.NaN, "1", np.NaN]}) + df = pd.DataFrame({"col": [np.nan, "1", np.nan]}) # col is now an Object column with all NaNs - df["col"][1] = np.NaN + df["col"][1] = np.nan assert df["col"].dtype == np.object_ @@ -2350,7 +2352,7 @@ def test_batch_read_row_range(lmdb_version_store_v1, use_row_range_clause): for idx, sym in enumerate(result_dict.keys()): df = result_dict[sym].data row_range = row_ranges[idx] - assert_equal(df, dfs[idx].iloc[row_range[0]:row_range[1]]) + assert_equal(df, dfs[idx].iloc[row_range[0] : row_range[1]]) def test_batch_read_columns(basic_store_tombstone_and_sync_passive): diff --git a/python/tests/stress/arcticdb/version_store/test_mem_leaks.py b/python/tests/stress/arcticdb/version_store/test_mem_leaks.py index c9b8b4fd97..a261518541 100644 --- a/python/tests/stress/arcticdb/version_store/test_mem_leaks.py +++ b/python/tests/stress/arcticdb/version_store/test_mem_leaks.py @@ -27,14 +27,15 @@ from arcticdb.version_store.library import Library, ReadRequest from arcticdb.version_store.processing import QueryBuilder from arcticdb.version_store._store import NativeVersionStore -from tests.util.mark import MACOS, SLOW_TESTS_MARK, WINDOWS, MEMRAY_SUPPORTED, MEMRAY_TESTS_MARK, SKIP_CONDA_MARK +from tests.util.mark import MACOS, SLOW_TESTS_MARK, WINDOWS, MEMRAY_SUPPORTED, MEMRAY_TESTS_MARK, SKIP_CONDA_MARK logging.basicConfig(level=logging.INFO) logger = logging.getLogger("Memory_tests") -#region HELPER functions for non-memray tests +# region HELPER functions for non-memray tests + def nice_bytes_str(bytes): return f" {bytes / (1024 * 1024):.2f}MB/[{bytes}] " @@ -135,12 +136,12 @@ def check_process_memory_leaks( print(f" Maximum growth so far: {nice_bytes_str(max(mem_growth_each_iter))}") print(f" Number of times there was 50% drop in memory: {count_drops(mem_growth_each_iter, 0.5)}") - assert ( - max_total_mem_lost_threshold_bytes >= process_growth - ), f"Memory of the process grew more than defined threshold: {nice_bytes_str(process_growth)} (specified: {nice_bytes_str(max_total_mem_lost_threshold_bytes)} )" - assert ( - max_machine_memory_percentage >= mem_per - ), f"Machine utilized more memory than specified threshold :{mem_per}% (specified {max_machine_memory_percentage}%)" + assert max_total_mem_lost_threshold_bytes >= process_growth, ( + f"Memory of the process grew more than defined threshold: {nice_bytes_str(process_growth)} (specified: {nice_bytes_str(max_total_mem_lost_threshold_bytes)} )" + ) + assert max_machine_memory_percentage >= mem_per, ( + f"Machine utilized more memory than specified threshold :{mem_per}% (specified {max_machine_memory_percentage}%)" + ) print( "The process assessment finished within expectations. 
Total consumed additional mem is bellow threshold: ", @@ -161,7 +162,7 @@ def grow_exp(df_to_grow: pd.DataFrame, num_times_xx2: int): return df_to_grow -def generate_big_dataframe(rows: int = 1000000, num_exp_time_growth: int=5) -> pd.DataFrame: +def generate_big_dataframe(rows: int = 1000000, num_exp_time_growth: int = 5) -> pd.DataFrame: """ A quick and time efficient wat to generate very large dataframe. The first parameter will be passed to get_sample_dataframe() so that a dataframe @@ -177,9 +178,11 @@ def generate_big_dataframe(rows: int = 1000000, num_exp_time_growth: int=5) -> p logger.info(f"Generation took : {time.time() - st}") return df -#endregion -#region HELPER functions for memray tests +# endregion + +# region HELPER functions for memray tests + def construct_df_querybuilder_tests(size: int) -> pd.DataFrame: df = get_sample_dataframe(size) @@ -197,28 +200,18 @@ def query_filter_then_groupby_with_aggregations() -> QueryBuilder: return ( q[q["bool"]] .groupby("uint8") - .agg({"uint32": "mean", - "int32": "sum", - "strings": "count", - "float64": "sum", - "float32": "min", - "int16": "max"}) + .agg({"uint32": "mean", "int32": "sum", "strings": "count", "float64": "sum", "float32": "min", "int16": "max"}) ) + def query_no_filter_only_groupby_with_aggregations() -> QueryBuilder: """ groupby composite aggregation query for QueryBuilder memory tests. The query basically will do aggregation of half of dataframe """ q = QueryBuilder() - return ( - q.groupby("uint8") - .agg({"uint32": "mean", - "int32": "sum", - "strings": "count", - "float64": "sum", - "float32": "min", - "int16": "max"}) + return q.groupby("uint8").agg( + {"uint32": "mean", "int32": "sum", "strings": "count", "float64": "sum", "float32": "min", "int16": "max"} ) @@ -231,12 +224,7 @@ def query_filter_impossible_cond_groupby_with_aggregations_for_whole_frame() -> return ( q[q["strings"] != "QASDFGH"] .groupby("int16") - .agg({"uint32": "mean", - "int32": "sum", - "strings": "count", - "float64": "sum", - "float32": "min", - "int16": "max"}) + .agg({"uint32": "mean", "int32": "sum", "strings": "count", "float64": "sum", "float32": "min", "int16": "max"}) ) @@ -259,16 +247,18 @@ def query_resample_minutes() -> QueryBuilder: """ q = QueryBuilder() return q.resample("min").agg( - {"int8" : "min", - "int16" : "max", - "int32" : "first", - "int64" : "last", - "uint64" : "sum", - "float32" : "mean", - "float64" : "sum", - "strings" : "count", - "bool" : "sum"} - ) + { + "int8": "min", + "int16": "max", + "int32": "first", + "int64": "last", + "uint64": "sum", + "float32": "mean", + "float64": "sum", + "strings": "count", + "bool": "sum", + } + ) def query_row_range_57percent(size: int) -> QueryBuilder: @@ -280,16 +270,16 @@ def query_row_range_57percent(size: int) -> QueryBuilder: start_percentage = random.uniform(0.01, 1.0 - percentage_rows_returned) result_size_rows = int(0.57 * size) q = QueryBuilder() - a = random.randint(0,int((size-1) * start_percentage)) + a = random.randint(0, int((size - 1) * start_percentage)) b = a + result_size_rows logger.info(f" GENERATED ROW RANGE {a} - {b}") - return q.row_range( (a, b) ) + return q.row_range((a, b)) def query_date_range_57percent(start: pd.Timestamp, end: pd.Timestamp) -> QueryBuilder: """ Date range query for QueryBuilder memory tests - Will generate random date range that will return + Will generate random date range that will return always the specified percentage rows """ percentage_rows_returned = 0.57 @@ -301,7 +291,7 @@ def query_date_range_57percent(start: 
pd.Timestamp, end: pd.Timestamp) -> QueryB percent_duration = total_duration * percentage_rows_returned b = a + percent_duration logger.info(f" GENERATED DATE RANGE {a} - {b}") - return q.date_range( (a,b)) + return q.date_range((a, b)) def print_info(data: pd.DataFrame, q: QueryBuilder): @@ -313,16 +303,19 @@ def gen_random_date(start: pd.Timestamp, end: pd.Timestamp): """ Returns random timestamp from specified period """ - date_range = pd.date_range(start=start, end=end, freq='s') + date_range = pd.date_range(start=start, end=end, freq="s") return random.choice(date_range) -#endregion -#region TESTS non-memray type - "guessing" memory leak through series of repetitions +# endregion + +# region TESTS non-memray type - "guessing" memory leak through series of repetitions + -@SKIP_CONDA_MARK # Conda CI runner doesn't have enough storage to perform these stress tests -@pytest.mark.skipif(WINDOWS, - reason="Not enough storage on Windows runners, due to large Win OS footprint and less free mem") +@SKIP_CONDA_MARK # Conda CI runner doesn't have enough storage to perform these stress tests +@pytest.mark.skipif( + WINDOWS, reason="Not enough storage on Windows runners, due to large Win OS footprint and less free mem" +) @pytest.mark.skipif(MACOS, reason="Problem on MacOs most probably similar to WINDOWS") def test_mem_leak_read_all_arctic_lib(arctic_library_lmdb_100gb): lib: adb.Library = arctic_library_lmdb_100gb @@ -361,10 +354,12 @@ def proc_to_examine(): check_process_memory_leaks(proc_to_examine, 20, max_mem_bytes, 80.0) -@pytest.mark.skipif(WINDOWS, - reason="Not enough storage on Windows runners, due to large Win OS footprint and less free mem") + +@pytest.mark.skipif( + WINDOWS, reason="Not enough storage on Windows runners, due to large Win OS footprint and less free mem" +) @pytest.mark.skipif(MACOS, reason="Problem on MacOs most probably similar to WINDOWS") -@SKIP_CONDA_MARK # Conda CI runner doesn't have enough storage to perform these stress tests +@SKIP_CONDA_MARK # Conda CI runner doesn't have enough storage to perform these stress tests def test_mem_leak_querybuilder_standard(arctic_library_lmdb_100gb): """ This test uses old approach with iterations. 
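The iteration-based ("guessing") leak tests above all follow the same recipe: run the workload many times, force garbage collection after each pass, and fail if the process's resident memory keeps growing past a budget. A minimal, self-contained sketch of that recipe using psutil directly, rather than the check_process_memory_leaks helper defined in this module; the workload, repetition count and threshold below are illustrative only:

import gc

import psutil


def assert_no_rss_growth(workload, repetitions: int, max_growth_bytes: int) -> None:
    """Run `workload` repeatedly and fail if resident memory keeps climbing."""
    process = psutil.Process()  # the current (test) process
    baseline = process.memory_info().rss
    growth = 0
    for _ in range(repetitions):
        workload()
        gc.collect()  # drop Python-level garbage so it is not mistaken for a leak
        growth = process.memory_info().rss - baseline
    # Growth accumulated over all repetitions must stay inside the allowed budget.
    assert growth <= max_growth_bytes, f"RSS grew by {growth} bytes (limit {max_growth_bytes})"


# Hypothetical usage against an already-populated library:
# assert_no_rss_growth(lambda: lib.read("big_symbol").data, repetitions=20, max_growth_bytes=350 * 1024 * 1024)

The real helper additionally tracks intermediate memory drops and machine-wide memory pressure, but the core mechanism is the same.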
@@ -385,13 +380,15 @@ def test_mem_leak_querybuilder_standard(arctic_library_lmdb_100gb): gc.collect() def proc_to_examine(): - queries = [query_filter_then_groupby_with_aggregations(), - query_filter_impossible_cond_groupby_with_aggregations_for_whole_frame(), - query_no_filter_only_groupby_with_aggregations(), - query_apply_clause_only(random_string(10)), - query_resample_minutes(), - query_row_range_57percent(size), - query_date_range_57percent(start_date, end_date)] + queries = [ + query_filter_then_groupby_with_aggregations(), + query_filter_impossible_cond_groupby_with_aggregations_for_whole_frame(), + query_no_filter_only_groupby_with_aggregations(), + query_apply_clause_only(random_string(10)), + query_resample_minutes(), + query_row_range_57percent(size), + query_date_range_57percent(start_date, end_date), + ] for q in queries: data: pd.DataFrame = lib.read(symbol, query_builder=q).data print_info(data, q) @@ -405,7 +402,7 @@ def proc_to_examine(): check_process_memory_leaks(proc_to_examine, 10, max_mem_bytes, 80.0) -@SKIP_CONDA_MARK # Conda CI runner doesn't have enough storage to perform these stress tests +@SKIP_CONDA_MARK # Conda CI runner doesn't have enough storage to perform these stress tests def test_mem_leak_read_all_native_store(lmdb_version_store_very_big_map): lib: NativeVersionStore = lmdb_version_store_very_big_map @@ -428,20 +425,23 @@ def proc_to_examine(): check_process_memory_leaks(proc_to_examine, 20, max_mem_bytes, 80.0) -#endregion -#region TESTS pytest-memray type for memory limit and leaks +# endregion + +# region TESTS pytest-memray type for memory limit and leaks ## NOTE: Currently tests can be executed on Python >= 3.8 only @pytest.fixture # NOTE: for now we run only V1 encoding as test is very slow -def library_with_symbol(arctic_library_lmdb, only_test_encoding_version_v1) -> Generator[Tuple[Library, pd.DataFrame, str], None, None]: +def library_with_symbol( + arctic_library_lmdb, only_test_encoding_version_v1 +) -> Generator[Tuple[Library, pd.DataFrame, str], None, None]: """ - As memray instruments memory, we need to take out - everything not relevant from mem leak measurement out of - test, and place it in setup of the test - in other words in the fixture + As memray instruments memory, we need to take out + everything not relevant from mem leak measurement out of + test, and place it in setup of the test - in other words in the fixture Otherwise, memray instruments that code and it results in much slower execution as well as mixing results of memory leaks - are they in what we test - for example read() or in construct_df_querybuilder_tests() ? or in other code? 
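The memray-marked tests that follow depend on the split described in the docstring above: every expensive but irrelevant step (building the dataframe, the initial write) lives in a fixture, so the instrumented test body contains only the code whose allocations are actually under scrutiny. A sketch of that shape, assuming pytest-memray is installed; the fixture name, symbol name, dataframe contents and leak limit are illustrative, not the repository's actual values:

import pandas as pd
import pytest


@pytest.fixture
def prepared_symbol(arctic_library_lmdb):
    # Setup cost (dataframe generation and the write) stays outside the measured test body.
    lib = arctic_library_lmdb
    df = pd.DataFrame({"col": range(100_000)})
    lib.write("leak_probe", df)
    yield lib, "leak_probe"


@pytest.mark.limit_leaks(location_limit="50 KB")  # pytest-memray marker, used the same way in the tests below
def test_read_does_not_leak(prepared_symbol):
    lib, symbol = prepared_symbol
    data = lib.read(symbol).data  # only the read path is instrumented by memray
    del data

Keeping the write in the fixture also prevents memray from attributing the setup's allocations to the read being measured, which is exactly the concern the docstring spells out.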
@@ -452,15 +452,18 @@ def library_with_symbol(arctic_library_lmdb, only_test_encoding_version_v1) -> G lib.write(symbol, df) yield (lib, df, symbol) + @pytest.fixture # NOTE: for now we run only V1 encoding as test is very slow -def library_with_tiny_symbol(arctic_library_lmdb, only_test_encoding_version_v1) -> Generator[Tuple[Library, pd.DataFrame, str], None, None]: +def library_with_tiny_symbol( + arctic_library_lmdb, only_test_encoding_version_v1 +) -> Generator[Tuple[Library, pd.DataFrame, str], None, None]: """ - As memray instruments memory, we need to take out - everything not relevant from mem leak measurement out of - test, and place it in setup of the test - in other words in the fixture + As memray instruments memory, we need to take out + everything not relevant from mem leak measurement out of + test, and place it in setup of the test - in other words in the fixture Otherwise, memray instruments that code and it results in much slower execution - as well as mixing results of memory leaks + as well as mixing results of memory leaks """ lib: Library = arctic_library_lmdb symbol = "test" @@ -468,11 +471,12 @@ def library_with_tiny_symbol(arctic_library_lmdb, only_test_encoding_version_v1) lib.write(symbol, df) yield (lib, df, symbol) -def mem_query(lib: Library, df: pd.DataFrame, num_repetitions:int=1, read_batch:bool=False): + +def mem_query(lib: Library, df: pd.DataFrame, num_repetitions: int = 1, read_batch: bool = False): """ This is the function where we test different types of queries against a large dataframe. Later this - function will be used for memory limit and memory leaks + function will be used for memory limit and memory leaks tests """ size = df.shape[0] @@ -484,46 +488,47 @@ def mem_query(lib: Library, df: pd.DataFrame, num_repetitions:int=1, read_batch: del df gc.collect() - queries = [query_filter_then_groupby_with_aggregations(), - query_no_filter_only_groupby_with_aggregations(), - query_filter_impossible_cond_groupby_with_aggregations_for_whole_frame(), - query_apply_clause_only(random_string(10)), - query_resample_minutes(), - query_row_range_57percent(size), - query_date_range_57percent(start_date, end_date)] - + queries = [ + query_filter_then_groupby_with_aggregations(), + query_no_filter_only_groupby_with_aggregations(), + query_filter_impossible_cond_groupby_with_aggregations_for_whole_frame(), + query_apply_clause_only(random_string(10)), + query_resample_minutes(), + query_row_range_57percent(size), + query_date_range_57percent(start_date, end_date), + ] + for rep in range(num_repetitions): - logger.info(f"REPETITION : {rep}") - if (read_batch): - logger.info("RUN read_batch() tests") - read_requests = [ReadRequest(symbol=symbol, - query_builder=query - ) for query in queries] - results_read = lib.read_batch(read_requests) - cnt = 0 - for result in results_read: - assert not result.data is None - if (num_repetitions < 20): - print_info(result.data, queries[cnt]) - cnt += 1 - del read_requests, results_read - else: - logger.info("RUN read() tests") - for q in queries: - data: pd.DataFrame = lib.read(symbol, query_builder=q).data - lib.read_batch - if (num_repetitions < 20): - print_info(data, q) - del data - gc.collect() + logger.info(f"REPETITION : {rep}") + if read_batch: + logger.info("RUN read_batch() tests") + read_requests = [ReadRequest(symbol=symbol, query_builder=query) for query in queries] + results_read = lib.read_batch(read_requests) + cnt = 0 + for result in results_read: + assert not result.data is None + if num_repetitions < 20: + 
print_info(result.data, queries[cnt]) + cnt += 1 + del read_requests, results_read + else: + logger.info("RUN read() tests") + for q in queries: + data: pd.DataFrame = lib.read(symbol, query_builder=q).data + lib.read_batch + if num_repetitions < 20: + print_info(data, q) + del data + gc.collect() del lib, queries gc.collect() + def test_mem_leak_queries_correctness_precheck(library_with_tiny_symbol): """ This test does precheck to confirm queries work more or less - If it fails then perhaps there was a problem with + If it fails then perhaps there was a problem with QueryBuilder functionality. All checks are based on size of expected dataframe returned by a queryno equality checks @@ -539,37 +544,29 @@ def test_mem_leak_queries_correctness_precheck(library_with_tiny_symbol): lib.write(symbol, df) - data: pd.DataFrame = lib.read(symbol, - query_builder=query_filter_impossible_cond_groupby_with_aggregations_for_whole_frame() - ).data - assert df.shape[0] == data.shape[0] - - data: pd.DataFrame = lib.read(symbol, - query_builder=query_row_range_57percent(size) - ).data + data: pd.DataFrame = lib.read( + symbol, query_builder=query_filter_impossible_cond_groupby_with_aggregations_for_whole_frame() + ).data + assert len(df.int16.unique()) == data.shape[0] + + data: pd.DataFrame = lib.read(symbol, query_builder=query_row_range_57percent(size)).data assert df.shape[0] < data.shape[0] * 2 - - data: pd.DataFrame = lib.read(symbol, - query_builder=query_date_range_57percent(start_date, end_date) - ).data + + data: pd.DataFrame = lib.read(symbol, query_builder=query_date_range_57percent(start_date, end_date)).data assert df.shape[0] < data.shape[0] * 2 - data: pd.DataFrame = lib.read(symbol, - query_builder=query_apply_clause_only(random_string(10)) - ).data + data: pd.DataFrame = lib.read(symbol, query_builder=query_apply_clause_only(random_string(10))).data assert len(df.columns.to_list()) <= data.shape[0] * 2 assert 200 < data.shape[0] - data: pd.DataFrame = lib.read(symbol, - query_builder=query_no_filter_only_groupby_with_aggregations() - ).data + data: pd.DataFrame = lib.read(symbol, query_builder=query_no_filter_only_groupby_with_aggregations()).data # groupby column becomes index assert sorted(list(df["uint8"].unique())) == sorted(list(data.index.unique())) -if MEMRAY_SUPPORTED: +if MEMRAY_SUPPORTED: ## - ## PYTEST-MEMRAY integration is available only from ver 3.8 on + ## PYTEST-MEMRAY integration is available only from ver 3.8 on ## from pytest_memray import Stack @@ -596,16 +593,14 @@ def is_relevant(stack: Stack) -> bool: pass return True - @MEMRAY_TESTS_MARK - @pytest.mark.limit_leaks(location_limit = "50 KB" if not MACOS else "60 KB", - filter_fn = is_relevant) + @pytest.mark.limit_leaks(location_limit="50 KB" if not MACOS else "60 KB", filter_fn=is_relevant) ## Unfortunately it is not possible to xfail memray tests. 
Instead: ## - log issue for investigation and analysis = to fix leak, or filter out stack frame ## - increase meantime the 'location limit' so that test continue to run and reduce risk of larger mem leaks ## - leave some mark like bellow that code is subject to issue investigation with number of the issue for traceability ## - https://man312219.monday.com/boards/7852509418/pulses/8078461031 - #@pytest.mark.skip(reason = "read() memory leaks Monday#8078461031") + # @pytest.mark.skip(reason = "read() memory leaks Monday#8078461031") def test_mem_leak_querybuilder_read_memray(library_with_symbol): """ Test to capture memory leaks >= of specified number @@ -620,14 +615,13 @@ def test_mem_leak_querybuilder_read_memray(library_with_symbol): @SLOW_TESTS_MARK @MEMRAY_TESTS_MARK - @pytest.mark.limit_leaks(location_limit = "160 KB" if not MACOS else "200 KB", - filter_fn = is_relevant) + @pytest.mark.limit_leaks(location_limit="160 KB" if not MACOS else "200 KB", filter_fn=is_relevant) ## Unfortunately it is not possible to xfail memray tests. Instead: ## - log issue for investigation and analysis = to fix leak, or filter out stack frame ## - increase meantime the 'location limit' so that test continue to run and reduce risk of larger mem leaks ## - leave some mark like bellow that code is subject to issue investigation with number of the issue for traceability ## - https://man312219.monday.com/boards/7852509418/pulses/8067881190 - #@pytest.mark.skip(reason = "read() memory leaks Monday#8067881190") + # @pytest.mark.skip(reason = "read() memory leaks Monday#8067881190") def test_mem_leak_querybuilder_read_manyrepeats_memray(library_with_tiny_symbol): """ Test to capture memory leaks >= of specified number @@ -638,18 +632,17 @@ def test_mem_leak_querybuilder_read_manyrepeats_memray(library_with_tiny_symbol) what we must exclude from calculation """ (lib, df, symbol) = library_with_tiny_symbol - mem_query(lib, df, num_repetitions=125) + mem_query(lib, df, num_repetitions=125) @SLOW_TESTS_MARK @MEMRAY_TESTS_MARK - @pytest.mark.limit_leaks(location_limit = "160 KB" if not MACOS else "200 KB", - filter_fn = is_relevant) + @pytest.mark.limit_leaks(location_limit="160 KB" if not MACOS else "200 KB", filter_fn=is_relevant) ## Unfortunately it is not possible to xfail memray tests. 
Instead: ## - log issue for investigation and analysis = to fix leak, or filter out stack frame ## - increase meantime the 'location limit' so that test continue to run and reduce risk of larger mem leaks ## - leave some mark like bellow that code is subject to issue investigation with number of the issue for traceability ## - https://man312219.monday.com/boards/7852509418/pulses/8067881190 - #@pytest.mark.skip(reason = "read() memory leaks Monday#8067881190") + # @pytest.mark.skip(reason = "read() memory leaks Monday#8067881190") def test_mem_leak_querybuilder_read_batch_manyrepeats_memray(library_with_tiny_symbol): """ Test to capture memory leaks >= of specified number @@ -660,11 +653,10 @@ def test_mem_leak_querybuilder_read_batch_manyrepeats_memray(library_with_tiny_s what we must exclude from calculation """ (lib, df, symbol) = library_with_tiny_symbol - mem_query(lib, df, num_repetitions=125, read_batch=True) - + mem_query(lib, df, num_repetitions=125, read_batch=True) @MEMRAY_TESTS_MARK - @pytest.mark.limit_leaks(location_limit = "25 KB", filter_fn = is_relevant) + @pytest.mark.limit_leaks(location_limit="25 KB", filter_fn=is_relevant) def test_mem_leak_querybuilder_read_batch_memray(library_with_symbol): """ Test to capture memory leaks >= of specified number @@ -677,7 +669,6 @@ def test_mem_leak_querybuilder_read_batch_memray(library_with_symbol): (lib, df, symbol) = library_with_symbol mem_query(lib, df, read_batch=True) - @MEMRAY_TESTS_MARK @pytest.mark.limit_memory("490 MB") def test_mem_limit_querybuilder_read_memray(library_with_symbol): @@ -704,12 +695,11 @@ def test_mem_limit_querybuilder_read_batch_memray(library_with_symbol): (lib, df, symbol) = library_with_symbol mem_query(lib, df, True) - @pytest.fixture - def library_with_big_symbol_(arctic_library_lmdb) -> Generator[Tuple[Library,str], None, None]: + def library_with_big_symbol_(arctic_library_lmdb) -> Generator[Tuple[Library, str], None, None]: """ - As memray instruments memory, we need to take out - everything not relevant from mem leak measurement out of + As memray instruments memory, we need to take out + everything not relevant from mem leak measurement out of test, so it works as less as possible """ lib: Library = arctic_library_lmdb @@ -719,9 +709,8 @@ def library_with_big_symbol_(arctic_library_lmdb) -> Generator[Tuple[Library,str del df yield (lib, symbol) - @MEMRAY_TESTS_MARK - @pytest.mark.limit_leaks(location_limit = "30 KB", filter_fn = is_relevant) + @pytest.mark.limit_leaks(location_limit="30 KB", filter_fn=is_relevant) def test_mem_leak_read_all_arctic_lib_memray(library_with_big_symbol_): """ This is a new version of the initial test that reads the whole @@ -731,8 +720,8 @@ def test_mem_leak_read_all_arctic_lib_memray(library_with_big_symbol_): (lib, symbol) = library_with_big_symbol_ logger.info("Test starting") st = time.time() - data : pd.DataFrame = lib.read(symbol).data + data: pd.DataFrame = lib.read(symbol).data del data logger.info(f"Test took : {time.time() - st}") - gc.collect() \ No newline at end of file + gc.collect() diff --git a/python/tests/stress/arcticdb/version_store/test_stress_finalize_staged_data.py b/python/tests/stress/arcticdb/version_store/test_stress_finalize_staged_data.py index 93210ce039..3e3737f8d4 100644 --- a/python/tests/stress/arcticdb/version_store/test_stress_finalize_staged_data.py +++ b/python/tests/stress/arcticdb/version_store/test_stress_finalize_staged_data.py @@ -25,11 +25,14 @@ # Uncomment for logging # set_log_level(default_level="DEBUG", 
console_output=False, file_output_path="/tmp/arcticdb.log") -def generate_chunk_sizes(number_chunks:np.uint32, min_rows:np.uint32=100, max_rows:np.uint32=10000) -> List[np.uint32]: + +def generate_chunk_sizes( + number_chunks: np.uint32, min_rows: np.uint32 = 100, max_rows: np.uint32 = 10000 +) -> List[np.uint32]: return np.random.randint(min_rows, max_rows, number_chunks, dtype=np.uint32) -class Results: +class Results: def __init__(self): self.options = None self.iteration = None @@ -37,56 +40,57 @@ def __init__(self): self.total_rows_finalized = 0 self.finalization_time = None - def __str__(self): - return f"Options: {self.options}\nIteration: {self.iteration}\n# staged chunks: {self.number_staged_chunks}\ntotal rows finalized: {self.total_rows_finalized}\ntime for finalization (s): {self.finalization_time}" - - + def __str__(self): + return f"Options: {self.options}\nIteration: {self.iteration}\n# staged chunks: {self.number_staged_chunks}\ntotal rows finalized: {self.total_rows_finalized}\ntime for finalization (s): {self.finalization_time}" -def finalize_monotonic_unique_chunks(ac_library, iterations): +@SLOW_TESTS_MARK +@SKIP_CONDA_MARK # Conda CI runner doesn't have enough storage to perform these stress tests +@pytest.mark.skipif(sys.platform == "win32", reason="Not enough storage on Windows runners") +def test_finalize_monotonic_unique_chunks(arctic_library_lmdb): """ - The test is designed to staged thousands of chunks with variable chunk size. - To experiment on local computer you can move up to 20k number of chunks approx 10k each + The test is designed to staged thousands of chunks with variable chunk size. + To experiment on local computer you can move up to 20k number of chunks approx 10k each - For stress testing this number is reduced due to github runner HDD size - 16 GB only + For stress testing this number is reduced due to github runner HDD size - 16 GB only - On local disk you must use "arctic_library_lmdb" fixture as it sets 100 GB limit. - If you use "basic_arctic_library" you might end with much more space taken eating all your space - if you want to experiment with more number of chunks + On local disk you must use "arctic_library_lmdb" fixture as it sets 100 GB limit. + If you use "basic_arctic_library" you might end with much more space taken eating all your space + if you want to experiment with more number of chunks """ options = [ - {"chunks_descending" : True, "finalization_mode" : StagedDataFinalizeMethod.APPEND}, - {"chunks_descending" : True, "finalization_mode" : StagedDataFinalizeMethod.WRITE}, - {"chunks_descending" : False, "finalization_mode" : StagedDataFinalizeMethod.WRITE}, - {"chunks_descending" : False, "finalization_mode" : StagedDataFinalizeMethod.APPEND}, - ] + {"chunks_descending": True, "finalization_mode": StagedDataFinalizeMethod.APPEND}, + {"chunks_descending": True, "finalization_mode": StagedDataFinalizeMethod.WRITE}, + {"chunks_descending": False, "finalization_mode": StagedDataFinalizeMethod.WRITE}, + {"chunks_descending": False, "finalization_mode": StagedDataFinalizeMethod.APPEND}, + ] # Will hold the results after each iteration (instance of Results class) results = [] - lib : Library = ac_library + lib: Library = arctic_library_lmdb total_start_time = time.time() # We would need to generate as fast as possible kind of random - # dataframes. To do that we build a large cache and will + # dataframes. 
To do that we build a large cache and will # sample rows from there as we need to run as fast as we can cachedDF = CachedDFGenerator(250000) total_number_rows_all_iterations: int = 0 # This will serve us as a counter and at the same time it provides unique index for each row - total_number_rows: TimestampNumber = TimestampNumber(0, cachedDF.TIME_UNIT) # Synchronize index frequency - INITIAL_TIMESTAMP: TimestampNumber = TimestampNumber(0, cachedDF.TIME_UNIT) # Synchronize index frequency - symbol="staged" + total_number_rows: TimestampNumber = TimestampNumber(0, cachedDF.TIME_UNIT) # Synchronize index frequency + INITIAL_TIMESTAMP: TimestampNumber = TimestampNumber(0, cachedDF.TIME_UNIT) # Synchronize index frequency + symbol = "staged" num_rows_initially = 99999 print(f"Writing to symbol initially {num_rows_initially} rows") df = cachedDF.generate_dataframe_timestamp_indexed(num_rows_initially, total_number_rows, cachedDF.TIME_UNIT) cnt = 0 - for iter in iterations : + for iter in [500, 1000, 1500, 2000]: res = Results() total_number_rows = INITIAL_TIMESTAMP + num_rows_initially @@ -98,12 +102,11 @@ def finalize_monotonic_unique_chunks(ac_library, iterations): print(f"Chunks to stage {len(chunk_list)} ") stage_chunks(lib, symbol, cachedDF, total_number_rows, chunk_list, options[cnt % 4]["chunks_descending"]) - if (options[cnt % 4]["finalization_mode"] == StagedDataFinalizeMethod.APPEND): + if options[cnt % 4]["finalization_mode"] == StagedDataFinalizeMethod.APPEND: total_number_rows = total_number_rows + sum(chunk_list) else: total_number_rows = INITIAL_TIMESTAMP + sum(chunk_list) - print("--" * 50) print(f"STAGED ROWS {total_number_rows.get_value()} after iteration {cnt}") print(f"SYMBOL ACTUAL ROWS before finalization - {lib._nvs.get_num_rows(symbol)} ") @@ -128,7 +131,7 @@ def finalize_monotonic_unique_chunks(ac_library, iterations): results.append(res) - total_number_rows.to_zero() # next iteration start from 0 + total_number_rows.to_zero() # next iteration start from 0 for res in results: print("_" * 100) @@ -136,20 +139,3 @@ def finalize_monotonic_unique_chunks(ac_library, iterations): total_time = time.time() - total_start_time print("TOTAL TIME: ", total_time) - - -@SLOW_TESTS_MARK -@SKIP_CONDA_MARK # Conda CI runner doesn't have enough storage to perform these stress tests -@pytest.mark.skipif(sys.platform == "win32", reason="Not enough storage on Windows runners") -def test_finalize_monotonic_unique_chunks_lmdb(lmdb_library): - finalize_monotonic_unique_chunks(lmdb_library, [500, 1000, 1500, 2000]) - - -@SLOW_TESTS_MARK -@SKIP_CONDA_MARK # Conda CI runner doesn't have enough storage to perform these stress tests -@pytest.mark.skipif(sys.platform == "win32", reason="Not enough storage on Windows runners") -@REAL_S3_TESTS_MARK -def test_finalize_monotonic_unique_chunks_realS3(real_s3_library): - finalize_monotonic_unique_chunks(real_s3_library, [50, 100, 150, 200]) - - diff --git a/python/tests/unit/arcticdb/version_store/test_append.py b/python/tests/unit/arcticdb/version_store/test_append.py index 1ab81dd4c4..955f6fb550 100644 --- a/python/tests/unit/arcticdb/version_store/test_append.py +++ b/python/tests/unit/arcticdb/version_store/test_append.py @@ -170,17 +170,6 @@ def test_append_snapshot_delete(lmdb_version_store): assert_frame_equal(lmdb_version_store.read(symbol, as_of="my_snap").data, df1) -def _random_integers(size, dtype): - # We do not generate integers outside the int64 range - platform_int_info = np.iinfo("int_") - iinfo = np.iinfo(dtype) - return np.random.randint( 
- max(iinfo.min, platform_int_info.min), - min(iinfo.max, platform_int_info.max), - size=size, - ).astype(dtype) - - def test_append_out_of_order_throws(lmdb_version_store): lib: NativeVersionStore = lmdb_version_store lib.write("a", pd.DataFrame({"c": [1, 2, 3]}, index=pd.date_range(0, periods=3))) @@ -197,8 +186,8 @@ def test_append_out_of_order_and_sort(lmdb_version_store_ignore_order, prune_pre dtidx = pd.date_range("1970-01-01", periods=num_rows) test = pd.DataFrame( { - "uint8": _random_integers(num_rows, np.uint8), - "uint32": _random_integers(num_rows, np.uint32), + "uint8": random_integers(num_rows, np.uint8), + "uint32": random_integers(num_rows, np.uint32), }, index=dtidx, ) @@ -268,8 +257,8 @@ def test_upsert_with_delete(lmdb_version_store_big_map): dtidx = pd.date_range("1970-01-01", periods=num_rows) test = pd.DataFrame( { - "uint8": _random_integers(num_rows, np.uint8), - "uint32": _random_integers(num_rows, np.uint32), + "uint8": random_integers(num_rows, np.uint8), + "uint32": random_integers(num_rows, np.uint32), }, index=dtidx, ) diff --git a/python/tests/unit/arcticdb/version_store/test_filtering.py b/python/tests/unit/arcticdb/version_store/test_filtering.py index 9afa82dc87..dde7f2cd02 100644 --- a/python/tests/unit/arcticdb/version_store/test_filtering.py +++ b/python/tests/unit/arcticdb/version_store/test_filtering.py @@ -32,7 +32,7 @@ generic_filter_test_strings, generic_filter_test_nans, ) -from arcticdb.util._versions import IS_PANDAS_TWO, PANDAS_VERSION +from arcticdb.util._versions import IS_PANDAS_TWO, PANDAS_VERSION, IS_NUMPY_TWO pytestmark = pytest.mark.pipeline @@ -1185,7 +1185,7 @@ def test_filter_column_not_present_dynamic(lmdb_version_store_dynamic_schema_v1) lib.write(symbol, df) vit = lib.read(symbol, query_builder=q) - if IS_PANDAS_TWO and sys.platform.startswith("win32"): + if (not IS_NUMPY_TWO) and (IS_PANDAS_TWO and sys.platform.startswith("win32")): # Pandas 2.0.0 changed the behavior of Index creation from numpy arrays: # "Previously, all indexes created from numpy numeric arrays were forced to 64-bit. # Now, for example, Index(np.array([1, 2, 3])) will be int32 on 32-bit systems, diff --git a/python/tests/util/mark.py b/python/tests/util/mark.py index ffd1d96841..6e72d99788 100644 --- a/python/tests/util/mark.py +++ b/python/tests/util/mark.py @@ -5,6 +5,7 @@ As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. 
""" + import os import sys import pytest @@ -73,8 +74,10 @@ ) VENV_COMPAT_TESTS_MARK = pytest.mark.skipif( - MACOS_CONDA_BUILD, - reason="Skipping compatibility tests because macOS conda builds don't have an available PyPi arcticdb version" + MACOS_CONDA_BUILD + or sys.version.startswith("3.12") + or sys.version.startswith("3.13"), # Waiting for https://github.com/man-group/ArcticDB/issues/2008 + reason="Skipping compatibility tests because macOS conda builds don't have an available PyPi arcticdb version", ) diff --git a/python/tests/util/storage_test.py b/python/tests/util/storage_test.py index b50a122a1f..4cc7f00f36 100644 --- a/python/tests/util/storage_test.py +++ b/python/tests/util/storage_test.py @@ -137,7 +137,7 @@ def verify_library(ac): def is_strategy_branch_valid_format(input_string): - pattern = r"^(linux|windows)_cp3(6|7|8|9|10|11).*$" + pattern = r"^(linux|windows)_cp3(6|7|8|9|10|11|12|13).*$" match = re.match(pattern, input_string) return bool(match) diff --git a/setup.cfg b/setup.cfg index 413292bc3f..85f5b6e0b1 100644 --- a/setup.cfg +++ b/setup.cfg @@ -34,13 +34,12 @@ install_requires = # conda-forge's package and feedstock browser. # # See: https://conda-forge.org/feedstock-outputs/ - numpy <2 # To guard against numpy v2 when it gets released https://pythonspeed.com/articles/numpy-2/ + numpy pandas attrs dataclasses ; python_version < '3.7' protobuf >=3.5.0.post1 # Per https://github.com/grpc/grpc/blob/v1.45.3/requirements.txt msgpack >=0.5.0 # msgpack 0.5.0 is required for strict_types argument, needed for correct pickling fallback - pyyaml packaging