Skip to content

Commit

Permalink
wip.....
Browse files Browse the repository at this point in the history
  • Loading branch information
aaxelb committed Jan 7, 2025
1 parent 86ef59f commit 5e65e56
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 66 deletions.
16 changes: 9 additions & 7 deletions _TODO_multindex.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ IndexStrategy revamp plan/log:

- update existing classes to dataclasses
- IndexStrategy (each instance represents a current or past version of the strategy, identified by subname)
- name (existing attr)
- subname (new; default from CURRENT_STRATEGY_CHECKSUM)
- IndexStrategy.SpecificIndex (each strategy has a set of indexes with unique short-names)
- strategy_name (rename existing `name` attr for disambig)
- strategy_check (new; default CURRENT_STRATEGY_CHECKSUM but may be parsed from or index name or `indexStrategy` query param)
- IndexStrategy.SpecificIndex
- index_strategy (existing)
- short_indexname (new; unique within a strategy)
- subname (new; unique within a strategy)
- (base SpecificIndex now mainly inward, focused on constructing index names and checking index status)

- move search methods from IndexStrategy.SpecificIndex to IndexStrategy
Expand All @@ -19,6 +19,7 @@ IndexStrategy revamp plan/log:
- for_specific_index
- for_current_index
- SpecificIndex.pls_setup
- SpecificIndex.pls_check_exists
- SpecificIndex.pls_handle_cardsearch
- SpecificIndex.pls_handle_valuesearch
- SpecificIndex.pls_refresh
Expand All @@ -30,16 +31,17 @@ IndexStrategy revamp plan/log:
- add replacement multiindex methods to IndexStrategy (and friends)
- (classmethod) each_existing_index (based on index names from elastic; may be any hex)
- each_named_index (includes non-existent; )
- index_shortname_set
- get_index_by_shortname
- get_index_by_subname
- subnames
- is_current
- pls_setup
- pls_check_exists
- pls_start_keeping_live
- pls_teardown
- pls_handle_cardsearch
- pls_handle_valuesearch
- pls_ensure_fresh
- Elastic8IndexStrategy.index_definitions (abstractmethod)
- Elastic8IndexStrategy.each_index_definition (abstractmethod)
- Elastic8IndexStrategy.each_named_index (based on index_definitions)

- update existing base methods
Expand Down
23 changes: 14 additions & 9 deletions share/admin/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,23 +52,28 @@ def _mappings_url_prefix():


def _index_status_by_strategy():
backfill_by_indexname: dict[str, IndexBackfill] = {
backfill.specific_indexname: backfill
for backfill in (
_backfill_by_checksum: dict[str, IndexBackfill] = {
_backfill.strategy_checksum: _backfill
for _backfill in (
IndexBackfill.objects
.filter(index_strategy_name__in=index_strategy.all_index_strategies().keys())
)
}
status_by_strategy = {}
_messenger = IndexMessenger()
for _index_strategy in index_strategy.all_index_strategies().values():
current_index = _index_strategy.for_current_index()
_current_backfill = _backfill_by_checksum.get(
str(_index_strategy.CURRENT_STRATEGY_CHECKSUM),
)
status_by_strategy[_index_strategy.name] = {
'current': {
'status': current_index.pls_get_status(),
'status': [
_index.pls_get_status()
for _index in _index_strategy.each_current_index()
],
'backfill': _serialize_backfill(
current_index,
backfill_by_indexname.get(current_index.indexname),
_backfill_by_checksum.get(current_index.indexname),
),
},
'prior': sorted((
Expand All @@ -91,14 +96,14 @@ def _index_status_by_strategy():


def _serialize_backfill(
specific_index: index_strategy.IndexStrategy.SpecificIndex,
strategy: index_strategy.IndexStrategy,
backfill: IndexBackfill | None,
):
if not specific_index.is_current:
if not strategy.is_current:
return {}
if not backfill:
return {
'can_start_backfill': specific_index.pls_check_exists(),
'can_start_backfill': strategy.pls_check_exists(),
}
return {
'backfill_status': backfill.backfill_status,
Expand Down
8 changes: 4 additions & 4 deletions share/models/index_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class IndexBackfill(models.Model):
)
backfill_status = models.TextField(choices=BACKFILL_STATUS_CHOICES, default=INITIAL)
index_strategy_name = models.TextField(unique=True)
specific_indexname = models.TextField()
strategy_checksum = models.TextField()
error_type = models.TextField(blank=True)
error_message = models.TextField(blank=True)
error_context = models.TextField(blank=True)
Expand Down Expand Up @@ -77,13 +77,13 @@ def mutex(self):
def pls_start(self, index_strategy):
with self.mutex() as locked_self:
assert locked_self.index_strategy_name == index_strategy.name
current_index = index_strategy.for_current_index()
if locked_self.specific_indexname == current_index.indexname:
_current_checksum = str(index_strategy.CURRENT_STRATEGY_CHECKSUM)
if locked_self.strategy_checksum == _current_checksum:
# what is "current" has not changed -- should be INITIAL
assert locked_self.backfill_status == IndexBackfill.INITIAL
else:
# what is "current" has changed! disregard backfill_status
locked_self.specific_indexname = current_index.indexname
locked_self.strategy_checksum = _current_checksum
locked_self.backfill_status = IndexBackfill.INITIAL
locked_self.__update_error(None)
try:
Expand Down
7 changes: 4 additions & 3 deletions share/search/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,12 @@ def _the_loop_itself(self):
def _raise_if_backfill_noncurrent(self):
if self.message_type.is_backfill:
index_backfill = self.index_strategy.get_or_create_backfill()
if index_backfill.specific_indexname != self.index_strategy.current_indexname:
_current_checksum = str(self.index_strategy.CURRENT_STRATEGY_CHECKSUM)
if index_backfill.strategy_checksum != _current_checksum:
raise exceptions.DaemonSetupError(
'IndexerDaemon observes conflicting currence:'
f'\n\tIndexBackfill (from database) says current is "{index_backfill.specific_indexname}"'
f'\n\tIndexStrategy (from static code) says current is "{self.index_strategy.current_indexname}"'
f'\n\tIndexBackfill (from database) says current is "{index_backfill.strategy_checksum}"'
f'\n\tIndexStrategy (from static code) says current is "{_current_checksum}"'
'\n\t(may be the daemon is running old code -- will die and retry,'
' but if this keeps happening you may need to reset backfill_status'
' to INITIAL and restart the backfill)'
Expand Down
4 changes: 2 additions & 2 deletions share/search/index_strategy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def _iter_all_index_strategies():
yield TrovesearchDenormIndexStrategy(name='trovesearch_denorm')


def parse_strategy_request(self, requested_strategy_name: str) -> IndexStrategy:
def parse_strategy_request(requested_strategy_name: str) -> IndexStrategy:
(_strategyname, *_etc) = requested_strategy_name.split(_INDEXNAME_DELIM)
try:
_strategy = get_index_strategy(
Expand Down Expand Up @@ -74,7 +74,7 @@ def get_specific_index(indexname_or_strategyname: str, *, for_search=False) -> I
except IndexStrategyError:
for _index_strategy in all_index_strategies().values():
try:
return _index_strategy.get_index_by_shortname(indexname_or_strategyname)
return _index_strategy.get_index_by_subname(indexname_or_strategyname)
except IndexStrategyError:
pass
raise IndexStrategyError(f'unrecognized name "{indexname_or_strategyname}"')
Expand Down
29 changes: 15 additions & 14 deletions share/search/index_strategy/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@
CardsearchHandle,
ValuesearchHandle,
)
from . import _indexnames as indexnames


logger = logging.getLogger(__name__)


_INDEXNAME_DELIM = '__' # used to separate indexnames into a list of meaningful values


@dataclasses.dataclass
class IndexStrategy(abc.ABC):
'''an abstraction for indexes in different places and ways.
Expand All @@ -46,13 +44,13 @@ class IndexStrategy(abc.ABC):
CURRENT_STRATEGY_CHECKSUM: typing.ClassVar[ChecksumIri] # set on subclasses to protect against accidents

name: str
subname: str = '' # if unspecified, uses current
subname: str = '' # if unspecified, uses current checksum

def __post_init__(self):
if _INDEXNAME_DELIM in self.name:
raise IndexStrategyError(f'strategy name may not contain "{_INDEXNAME_DELIM}" (got "{self.name}")')
indexnames.raise_if_invalid_indexname_part(self.name)
if not self.subname:
self.subname = self.CURRENT_STRATEGY_CHECKSUM.hexdigest
indexnames.raise_if_invalid_indexname_part(self.subname)

@property
def nonurgent_messagequeue_name(self) -> str:
Expand All @@ -64,8 +62,7 @@ def urgent_messagequeue_name(self) -> str:

@property
def indexname_prefix(self) -> str:
# note: ends with _INDEXNAME_DELIM
return _INDEXNAME_DELIM.join((self.name, self.subname, ''))
return indexnames.combine_indexname_parts(self.name, self.subname)

@property
def indexname_wildcard(self) -> str:
Expand All @@ -78,10 +75,10 @@ def is_current(self) -> bool:
@functools.cached_property
def all_current_indexnames(self) -> tuple[str, ...]:
self.assert_strategy_is_current()
_single_indexname = ''.join((
_single_indexname = indexnames.combine_indexname_parts(
self.indexname_prefix,
self.CURRENT_STRATEGY_CHECKSUM.hexdigest,
))
)
return (_single_indexname,)

def assert_message_type(self, message_type: messages.MessageType):
Expand All @@ -106,11 +103,12 @@ def assert_strategy_is_current(self):
def with_hex(self, subname: str):
return dataclasses.replace(self, subname=subname)

def get_index_by_shortname(self, shortname: str) -> typing.Self.SpecificIndex:
def get_index_by_subname(self, shortname: str) -> IndexStrategy.SpecificIndex:
return self.SpecificIndex(self, shortname) # type: ignore[abstract]

def for_current_index(self) -> IndexStrategy.SpecificIndex:
return self.get_index_by_shortname(self.current_indexname)
def each_current_index(self) -> typing.Iterator[IndexStrategy.SpecificIndex]:
for _subname in self.:
yield self.get_index_by_subname(_subname)

def get_or_create_backfill(self):
(index_backfill, _) = IndexBackfill.objects.get_or_create(index_strategy_name=self.name)
Expand Down Expand Up @@ -177,7 +175,10 @@ def is_current(self) -> bool:

@property
def indexname(self) -> str:
return f'{self.index_strategy.indexname_prefix}{self.short_indexname}'
return indexnames.combine_indexname_parts(
self.index_strategy.indexname_prefix,
self.short_indexname,
)

def pls_setup(self, *, skip_backfill=False):
assert self.is_current, 'cannot setup a non-current index'
Expand Down
21 changes: 21 additions & 0 deletions share/search/index_strategy/_indexnames.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from share.search.exceptions import IndexStrategyError


INDEXNAME_DELIM = '__' # used to separate indexnames into a list of meaningful values


def is_valid_indexname_part(indexname_part: str) -> bool:
return bool(INDEXNAME_DELIM not in indexname_part)


def raise_if_invalid_indexname_part(indexname_part: str) -> None:
if INDEXNAME_DELIM in indexname_part:
raise IndexStrategyError(f'name may not contain "{INDEXNAME_DELIM}" (got "{indexname_part}")')


def combine_indexname_parts(*indexname_parts: str) -> str:
return INDEXNAME_DELIM.join(indexname_parts)


def parse_indexname_parts(name: str) -> list[str]:
return name.split(INDEXNAME_DELIM)
20 changes: 10 additions & 10 deletions share/search/index_strategy/elastic8.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,8 @@ class IndexDefinition:
# abstract methods for subclasses to implement
@abc.abstractmethod
@classmethod
def index_definitions(cls) -> typing.Iterator[IndexDefinition]:
...

def each_named_index(self):
for _index_def in self.each_index_definition():
yield self.get_index_by_shortname('iris')
def index_definitions(cls) -> dict[str, IndexDefinition]:
raise NotImplementedError

@abc.abstractmethod
def index_settings(self):
Expand Down Expand Up @@ -135,6 +131,10 @@ def compute_strategy_checksum(self):
},
)

def each_named_index(self):
for _subname, _index_def in self.index_definitions().items():
yield self.get_index_by_subname('iris')

# abstract method from IndexStrategy
def each_existing_index(self):
indexname_set = set(
Expand All @@ -144,7 +144,7 @@ def each_existing_index(self):
)
indexname_set.add(self.current_indexname)
for indexname in indexname_set:
yield self.get_index_by_shortname(indexname)
yield self.get_index_by_subname(indexname)

# abstract method from IndexStrategy
def pls_handle_messages_chunk(self, messages_chunk):
Expand Down Expand Up @@ -208,7 +208,7 @@ def pls_make_default_for_searching(self, specific_index: IndexStrategy.SpecificI
def pls_get_default_for_searching(self) -> IndexStrategy.SpecificIndex:
# a SpecificIndex for an alias will work fine for searching, but
# will error if you try to invoke lifecycle hooks
return self.get_index_by_shortname(self._alias_for_searching)
return self.get_index_by_subname(self._alias_for_searching)

# override from IndexStrategy
def pls_mark_backfill_complete(self):
Expand All @@ -218,11 +218,11 @@ def pls_mark_backfill_complete(self):

@property
def _alias_for_searching(self):
return f'{self.indexname_prefix}search'
return f'{self.indexname_prefix}__search'

@property
def _alias_for_keeping_live(self):
return f'{self.indexname_prefix}live'
return f'{self.indexname_prefix}__live'

def _elastic_actions_with_index(self, messages_chunk, indexnames, action_tracker: _ActionTracker):
if not indexnames:
Expand Down
24 changes: 7 additions & 17 deletions share/search/index_strategy/trovesearch_denorm.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,28 +60,18 @@ class TrovesearchDenormIndexStrategy(Elastic8IndexStrategy):
)

@classmethod
def each_index_definition(cls) -> typing.Iterator[Elastic8IndexStrategy.IndexDefiniton]:
def each_index_definition(cls) -> dict[str, Elastic8IndexStrategy.IndexDefiniton]:
yield Elastic8IndexStrategy.IndexDefinition(
subname='card',
settings=cls._index_settings(),
mappings=cls._card_index_mappings(),
)
yield strategy.SpecificIndex(
index_strategy=strategy,
shortname='card',
elastic8_index_settings=cls._index_settings(),
elastic8_index_mappings=cls._card_index_mappings(),
)
yield self.SpecificIndex(
index_strategy=strategy,
shortname='iris',
elastic8_index_settings=cls._index_settings(),
elastic8_index_mappings=cls._iris_index_mappings(),
)

yield Elastic8IndexDefinition(
yield Elastic8IndexStrategy.IndexDefinition(
subname='value',
settings=cls._index_settings(),
mappings=cls._card_index_mappings(),
mappings=cls._value_index_mappings(),
)

# abstract method from IndexStrategy
@property
def supported_message_types(self):
Expand All @@ -95,7 +85,7 @@ def supported_message_types(self):
def backfill_message_type(self):
return messages.MessageType.BACKFILL_INDEXCARD

def each_index(self) -> typing.Iterator[TrovesearchDenormIndexStrategy.SpecificIndex]:
def each_index(self) -> Iterator[TrovesearchDenormIndexStrategy.SpecificIndex]:
yield self.SpecificIndex(
index_strategy=self,
)
Expand Down

0 comments on commit 5e65e56

Please sign in to comment.