From 5e65e5627239ecc14c9fe20dc7d22f68e5c1660d Mon Sep 17 00:00:00 2001 From: abram axel booth Date: Tue, 7 Jan 2025 13:59:54 -0500 Subject: [PATCH] wip..... --- _TODO_multindex.txt | 16 +++++----- share/admin/search.py | 23 +++++++++------ share/models/index_backfill.py | 8 ++--- share/search/daemon.py | 7 +++-- share/search/index_strategy/__init__.py | 4 +-- share/search/index_strategy/_base.py | 29 ++++++++++--------- share/search/index_strategy/_indexnames.py | 21 ++++++++++++++ share/search/index_strategy/elastic8.py | 20 ++++++------- .../index_strategy/trovesearch_denorm.py | 24 +++++---------- 9 files changed, 86 insertions(+), 66 deletions(-) create mode 100644 share/search/index_strategy/_indexnames.py diff --git a/_TODO_multindex.txt b/_TODO_multindex.txt index 97ed79d49..6c160ac63 100644 --- a/_TODO_multindex.txt +++ b/_TODO_multindex.txt @@ -3,11 +3,11 @@ IndexStrategy revamp plan/log: - update existing classes to dataclasses - IndexStrategy (each instance represents a current or past version of the strategy, identified by subname) - - name (existing attr) - - subname (new; default from CURRENT_STRATEGY_CHECKSUM) - - IndexStrategy.SpecificIndex (each strategy has a set of indexes with unique short-names) + - strategy_name (rename existing `name` attr for disambig) + - strategy_check (new; default CURRENT_STRATEGY_CHECKSUM but may be parsed from or index name or `indexStrategy` query param) + - IndexStrategy.SpecificIndex - index_strategy (existing) - - short_indexname (new; unique within a strategy) + - subname (new; unique within a strategy) - (base SpecificIndex now mainly inward, focused on constructing index names and checking index status) - move search methods from IndexStrategy.SpecificIndex to IndexStrategy @@ -19,6 +19,7 @@ IndexStrategy revamp plan/log: - for_specific_index - for_current_index - SpecificIndex.pls_setup + - SpecificIndex.pls_check_exists - SpecificIndex.pls_handle_cardsearch - SpecificIndex.pls_handle_valuesearch - SpecificIndex.pls_refresh @@ -30,16 +31,17 @@ IndexStrategy revamp plan/log: - add replacement multiindex methods to IndexStrategy (and friends) - (classmethod) each_existing_index (based on index names from elastic; may be any hex) - each_named_index (includes non-existent; ) - - index_shortname_set - - get_index_by_shortname + - get_index_by_subname + - subnames - is_current - pls_setup + - pls_check_exists - pls_start_keeping_live - pls_teardown - pls_handle_cardsearch - pls_handle_valuesearch - pls_ensure_fresh - - Elastic8IndexStrategy.index_definitions (abstractmethod) + - Elastic8IndexStrategy.each_index_definition (abstractmethod) - Elastic8IndexStrategy.each_named_index (based on index_definitions) - update existing base methods diff --git a/share/admin/search.py b/share/admin/search.py index 4f22304c6..b635fb525 100644 --- a/share/admin/search.py +++ b/share/admin/search.py @@ -52,9 +52,9 @@ def _mappings_url_prefix(): def _index_status_by_strategy(): - backfill_by_indexname: dict[str, IndexBackfill] = { - backfill.specific_indexname: backfill - for backfill in ( + _backfill_by_checksum: dict[str, IndexBackfill] = { + _backfill.strategy_checksum: _backfill + for _backfill in ( IndexBackfill.objects .filter(index_strategy_name__in=index_strategy.all_index_strategies().keys()) ) @@ -62,13 +62,18 @@ def _index_status_by_strategy(): status_by_strategy = {} _messenger = IndexMessenger() for _index_strategy in index_strategy.all_index_strategies().values(): - current_index = _index_strategy.for_current_index() + _current_backfill = _backfill_by_checksum.get( + str(_index_strategy.CURRENT_STRATEGY_CHECKSUM), + ) status_by_strategy[_index_strategy.name] = { 'current': { - 'status': current_index.pls_get_status(), + 'status': [ + _index.pls_get_status() + for _index in _index_strategy.each_current_index() + ], 'backfill': _serialize_backfill( current_index, - backfill_by_indexname.get(current_index.indexname), + _backfill_by_checksum.get(current_index.indexname), ), }, 'prior': sorted(( @@ -91,14 +96,14 @@ def _index_status_by_strategy(): def _serialize_backfill( - specific_index: index_strategy.IndexStrategy.SpecificIndex, + strategy: index_strategy.IndexStrategy, backfill: IndexBackfill | None, ): - if not specific_index.is_current: + if not strategy.is_current: return {} if not backfill: return { - 'can_start_backfill': specific_index.pls_check_exists(), + 'can_start_backfill': strategy.pls_check_exists(), } return { 'backfill_status': backfill.backfill_status, diff --git a/share/models/index_backfill.py b/share/models/index_backfill.py index c8e92ffed..6b3f6fdba 100644 --- a/share/models/index_backfill.py +++ b/share/models/index_backfill.py @@ -47,7 +47,7 @@ class IndexBackfill(models.Model): ) backfill_status = models.TextField(choices=BACKFILL_STATUS_CHOICES, default=INITIAL) index_strategy_name = models.TextField(unique=True) - specific_indexname = models.TextField() + strategy_checksum = models.TextField() error_type = models.TextField(blank=True) error_message = models.TextField(blank=True) error_context = models.TextField(blank=True) @@ -77,13 +77,13 @@ def mutex(self): def pls_start(self, index_strategy): with self.mutex() as locked_self: assert locked_self.index_strategy_name == index_strategy.name - current_index = index_strategy.for_current_index() - if locked_self.specific_indexname == current_index.indexname: + _current_checksum = str(index_strategy.CURRENT_STRATEGY_CHECKSUM) + if locked_self.strategy_checksum == _current_checksum: # what is "current" has not changed -- should be INITIAL assert locked_self.backfill_status == IndexBackfill.INITIAL else: # what is "current" has changed! disregard backfill_status - locked_self.specific_indexname = current_index.indexname + locked_self.strategy_checksum = _current_checksum locked_self.backfill_status = IndexBackfill.INITIAL locked_self.__update_error(None) try: diff --git a/share/search/daemon.py b/share/search/daemon.py index 1fa7cce23..35f4b83d8 100644 --- a/share/search/daemon.py +++ b/share/search/daemon.py @@ -232,11 +232,12 @@ def _the_loop_itself(self): def _raise_if_backfill_noncurrent(self): if self.message_type.is_backfill: index_backfill = self.index_strategy.get_or_create_backfill() - if index_backfill.specific_indexname != self.index_strategy.current_indexname: + _current_checksum = str(self.index_strategy.CURRENT_STRATEGY_CHECKSUM) + if index_backfill.strategy_checksum != _current_checksum: raise exceptions.DaemonSetupError( 'IndexerDaemon observes conflicting currence:' - f'\n\tIndexBackfill (from database) says current is "{index_backfill.specific_indexname}"' - f'\n\tIndexStrategy (from static code) says current is "{self.index_strategy.current_indexname}"' + f'\n\tIndexBackfill (from database) says current is "{index_backfill.strategy_checksum}"' + f'\n\tIndexStrategy (from static code) says current is "{_current_checksum}"' '\n\t(may be the daemon is running old code -- will die and retry,' ' but if this keeps happening you may need to reset backfill_status' ' to INITIAL and restart the backfill)' diff --git a/share/search/index_strategy/__init__.py b/share/search/index_strategy/__init__.py index c55220a5c..12299877d 100644 --- a/share/search/index_strategy/__init__.py +++ b/share/search/index_strategy/__init__.py @@ -43,7 +43,7 @@ def _iter_all_index_strategies(): yield TrovesearchDenormIndexStrategy(name='trovesearch_denorm') -def parse_strategy_request(self, requested_strategy_name: str) -> IndexStrategy: +def parse_strategy_request(requested_strategy_name: str) -> IndexStrategy: (_strategyname, *_etc) = requested_strategy_name.split(_INDEXNAME_DELIM) try: _strategy = get_index_strategy( @@ -74,7 +74,7 @@ def get_specific_index(indexname_or_strategyname: str, *, for_search=False) -> I except IndexStrategyError: for _index_strategy in all_index_strategies().values(): try: - return _index_strategy.get_index_by_shortname(indexname_or_strategyname) + return _index_strategy.get_index_by_subname(indexname_or_strategyname) except IndexStrategyError: pass raise IndexStrategyError(f'unrecognized name "{indexname_or_strategyname}"') diff --git a/share/search/index_strategy/_base.py b/share/search/index_strategy/_base.py index 6eb1e7ae9..117f0bff7 100644 --- a/share/search/index_strategy/_base.py +++ b/share/search/index_strategy/_base.py @@ -18,14 +18,12 @@ CardsearchHandle, ValuesearchHandle, ) +from . import _indexnames as indexnames logger = logging.getLogger(__name__) -_INDEXNAME_DELIM = '__' # used to separate indexnames into a list of meaningful values - - @dataclasses.dataclass class IndexStrategy(abc.ABC): '''an abstraction for indexes in different places and ways. @@ -46,13 +44,13 @@ class IndexStrategy(abc.ABC): CURRENT_STRATEGY_CHECKSUM: typing.ClassVar[ChecksumIri] # set on subclasses to protect against accidents name: str - subname: str = '' # if unspecified, uses current + subname: str = '' # if unspecified, uses current checksum def __post_init__(self): - if _INDEXNAME_DELIM in self.name: - raise IndexStrategyError(f'strategy name may not contain "{_INDEXNAME_DELIM}" (got "{self.name}")') + indexnames.raise_if_invalid_indexname_part(self.name) if not self.subname: self.subname = self.CURRENT_STRATEGY_CHECKSUM.hexdigest + indexnames.raise_if_invalid_indexname_part(self.subname) @property def nonurgent_messagequeue_name(self) -> str: @@ -64,8 +62,7 @@ def urgent_messagequeue_name(self) -> str: @property def indexname_prefix(self) -> str: - # note: ends with _INDEXNAME_DELIM - return _INDEXNAME_DELIM.join((self.name, self.subname, '')) + return indexnames.combine_indexname_parts(self.name, self.subname) @property def indexname_wildcard(self) -> str: @@ -78,10 +75,10 @@ def is_current(self) -> bool: @functools.cached_property def all_current_indexnames(self) -> tuple[str, ...]: self.assert_strategy_is_current() - _single_indexname = ''.join(( + _single_indexname = indexnames.combine_indexname_parts( self.indexname_prefix, self.CURRENT_STRATEGY_CHECKSUM.hexdigest, - )) + ) return (_single_indexname,) def assert_message_type(self, message_type: messages.MessageType): @@ -106,11 +103,12 @@ def assert_strategy_is_current(self): def with_hex(self, subname: str): return dataclasses.replace(self, subname=subname) - def get_index_by_shortname(self, shortname: str) -> typing.Self.SpecificIndex: + def get_index_by_subname(self, shortname: str) -> IndexStrategy.SpecificIndex: return self.SpecificIndex(self, shortname) # type: ignore[abstract] - def for_current_index(self) -> IndexStrategy.SpecificIndex: - return self.get_index_by_shortname(self.current_indexname) + def each_current_index(self) -> typing.Iterator[IndexStrategy.SpecificIndex]: + for _subname in self.: + yield self.get_index_by_subname(_subname) def get_or_create_backfill(self): (index_backfill, _) = IndexBackfill.objects.get_or_create(index_strategy_name=self.name) @@ -177,7 +175,10 @@ def is_current(self) -> bool: @property def indexname(self) -> str: - return f'{self.index_strategy.indexname_prefix}{self.short_indexname}' + return indexnames.combine_indexname_parts( + self.index_strategy.indexname_prefix, + self.short_indexname, + ) def pls_setup(self, *, skip_backfill=False): assert self.is_current, 'cannot setup a non-current index' diff --git a/share/search/index_strategy/_indexnames.py b/share/search/index_strategy/_indexnames.py new file mode 100644 index 000000000..3517a6d01 --- /dev/null +++ b/share/search/index_strategy/_indexnames.py @@ -0,0 +1,21 @@ +from share.search.exceptions import IndexStrategyError + + +INDEXNAME_DELIM = '__' # used to separate indexnames into a list of meaningful values + + +def is_valid_indexname_part(indexname_part: str) -> bool: + return bool(INDEXNAME_DELIM not in indexname_part) + + +def raise_if_invalid_indexname_part(indexname_part: str) -> None: + if INDEXNAME_DELIM in indexname_part: + raise IndexStrategyError(f'name may not contain "{INDEXNAME_DELIM}" (got "{indexname_part}")') + + +def combine_indexname_parts(*indexname_parts: str) -> str: + return INDEXNAME_DELIM.join(indexname_parts) + + +def parse_indexname_parts(name: str) -> list[str]: + return name.split(INDEXNAME_DELIM) diff --git a/share/search/index_strategy/elastic8.py b/share/search/index_strategy/elastic8.py index 40a7101dd..20443b8fd 100644 --- a/share/search/index_strategy/elastic8.py +++ b/share/search/index_strategy/elastic8.py @@ -61,12 +61,8 @@ class IndexDefinition: # abstract methods for subclasses to implement @abc.abstractmethod @classmethod - def index_definitions(cls) -> typing.Iterator[IndexDefinition]: - ... - - def each_named_index(self): - for _index_def in self.each_index_definition(): - yield self.get_index_by_shortname('iris') + def index_definitions(cls) -> dict[str, IndexDefinition]: + raise NotImplementedError @abc.abstractmethod def index_settings(self): @@ -135,6 +131,10 @@ def compute_strategy_checksum(self): }, ) + def each_named_index(self): + for _subname, _index_def in self.index_definitions().items(): + yield self.get_index_by_subname('iris') + # abstract method from IndexStrategy def each_existing_index(self): indexname_set = set( @@ -144,7 +144,7 @@ def each_existing_index(self): ) indexname_set.add(self.current_indexname) for indexname in indexname_set: - yield self.get_index_by_shortname(indexname) + yield self.get_index_by_subname(indexname) # abstract method from IndexStrategy def pls_handle_messages_chunk(self, messages_chunk): @@ -208,7 +208,7 @@ def pls_make_default_for_searching(self, specific_index: IndexStrategy.SpecificI def pls_get_default_for_searching(self) -> IndexStrategy.SpecificIndex: # a SpecificIndex for an alias will work fine for searching, but # will error if you try to invoke lifecycle hooks - return self.get_index_by_shortname(self._alias_for_searching) + return self.get_index_by_subname(self._alias_for_searching) # override from IndexStrategy def pls_mark_backfill_complete(self): @@ -218,11 +218,11 @@ def pls_mark_backfill_complete(self): @property def _alias_for_searching(self): - return f'{self.indexname_prefix}search' + return f'{self.indexname_prefix}__search' @property def _alias_for_keeping_live(self): - return f'{self.indexname_prefix}live' + return f'{self.indexname_prefix}__live' def _elastic_actions_with_index(self, messages_chunk, indexnames, action_tracker: _ActionTracker): if not indexnames: diff --git a/share/search/index_strategy/trovesearch_denorm.py b/share/search/index_strategy/trovesearch_denorm.py index 2bbf6b4d0..6e2dcff07 100644 --- a/share/search/index_strategy/trovesearch_denorm.py +++ b/share/search/index_strategy/trovesearch_denorm.py @@ -60,28 +60,18 @@ class TrovesearchDenormIndexStrategy(Elastic8IndexStrategy): ) @classmethod - def each_index_definition(cls) -> typing.Iterator[Elastic8IndexStrategy.IndexDefiniton]: + def each_index_definition(cls) -> dict[str, Elastic8IndexStrategy.IndexDefiniton]: yield Elastic8IndexStrategy.IndexDefinition( + subname='card', settings=cls._index_settings(), mappings=cls._card_index_mappings(), ) - yield strategy.SpecificIndex( - index_strategy=strategy, - shortname='card', - elastic8_index_settings=cls._index_settings(), - elastic8_index_mappings=cls._card_index_mappings(), - ) - yield self.SpecificIndex( - index_strategy=strategy, - shortname='iris', - elastic8_index_settings=cls._index_settings(), - elastic8_index_mappings=cls._iris_index_mappings(), - ) - - yield Elastic8IndexDefinition( + yield Elastic8IndexStrategy.IndexDefinition( + subname='value', settings=cls._index_settings(), - mappings=cls._card_index_mappings(), + mappings=cls._value_index_mappings(), ) + # abstract method from IndexStrategy @property def supported_message_types(self): @@ -95,7 +85,7 @@ def supported_message_types(self): def backfill_message_type(self): return messages.MessageType.BACKFILL_INDEXCARD - def each_index(self) -> typing.Iterator[TrovesearchDenormIndexStrategy.SpecificIndex]: + def each_index(self) -> Iterator[TrovesearchDenormIndexStrategy.SpecificIndex]: yield self.SpecificIndex( index_strategy=self, )