Cannot recover when any changelog topic partition becomes empty (as a result of some retention policy) #597

cristianmatache opened this issue Dec 29, 2023

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

  • if a topic partition is not empty: low watermark = minimum/earliest available offset, high watermark = maximum/latest available offset + 1
  • if a topic partition is empty: low watermark = high watermark

A changelog topic can become empty as a result of a Kafka cleanup policy (i.e., time/size-based retention).
The case where a topic partition is empty is not handled properly in Faust recovery.

The recovery service needs to replay messages from the low watermark (earliest offset) to the high watermark - 1 (latest offset). Faust does this for both the active and the standby partitions. Afterwards, it runs some consistency checks.
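To make the off-by-one concrete, here is a minimal sketch with made-up watermark values (not Faust code) showing what the `value - 1` adjustment quoted below yields for a non-empty versus an empty partition:

```python
from typing import Optional


def faust_adjusted_highwater(high: Optional[int]) -> int:
    # Mirrors the `value - 1 if value is not None else -1` logic quoted below.
    return high - 1 if high is not None else -1


# Non-empty partition: offsets 3..9 are fetchable, low=3, high=10.
low, high = 3, 10
assert faust_adjusted_highwater(high) == 9  # a real, fetchable offset

# Empty partition (all records deleted by retention): low == high == 10.
low, high = 10, 10
assert faust_adjusted_highwater(high) == 9  # offset 9 no longer exists
```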

Active partitions

Let's start with the active partitions:

  • Building highwaters for active partitions:

    ```python
    self.log.dev("Build highwaters for active partitions")
    await self._wait(
        T(self._build_highwaters)(
            consumer, assigned_active_tps, active_highwaters, "active"
        ),
        timeout=self.app.conf.broker_request_timeout,
    )
    ```

    where `_build_highwaters` starts with:

    ```python
    async def _build_highwaters(
        self, consumer: ConsumerT, tps: Set[TP], destination: Counter[TP], title: str
    ) -> None:
        # -- Build highwater
        highwaters = await consumer.highwaters(*tps)
        highwaters = {
            # FIXME the -1 here is because of the way we commit offsets
            tp: value - 1 if value is not None else -1
            for tp, value in highwaters.items()
        }
    ```

    If the partition is empty, `high - 1` does not exist and the recovery will fail. There is even a FIXME in the highwater-building code. In my opinion, it would be better to also fetch the low watermarks and use `-1 if high is None or low == high else high - 1` (see the sketch after this list).
  • Building earliest offsets for active partitions:

    ```python
    await self._wait(
        T(self._build_offsets)(
            consumer, assigned_active_tps, active_offsets, "active"
        ),
        timeout=self.app.conf.broker_request_timeout,
    )
    ```

    Similarly, if the topic partition is empty, `low - 1` (offset `low` after the `+1` adjustment) would not exist. `_build_offsets` starts with:

    ```python
    async def _build_offsets(
        self, consumer: ConsumerT, tps: Set[TP], destination: Counter[TP], title: str
    ) -> None:
        # -- Update offsets
        # Offsets may have been compacted, need to get to the recent ones
        earliest = await consumer.earliest_offsets(*tps)
        # FIXME To be consistent with the offset -1 logic
        earliest = {
            tp: offset - 1 if offset is not None else None
            for tp, offset in earliest.items()
        }
    ```

    In my opinion, the stored value could be `None if offset is None else min(offset, highwaters.get(tp, offset))` (see the sketch after this list).
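A minimal sketch of the two adjustments suggested above (the helper names and the explicit `low`/`high` arguments are mine, not Faust's; this is an illustration, not a patch):

```python
from typing import Mapping, Optional, Tuple

TP = Tuple[str, int]  # (topic, partition) stand-in for faust.types.TP


def adjusted_highwater(low: Optional[int], high: Optional[int]) -> int:
    # An empty partition has low == high, so `high - 1` is not a real offset;
    # report -1 ("nothing to recover") in that case as well.
    return -1 if high is None or low == high else high - 1


def adjusted_earliest(
    tp: TP, offset: Optional[int], highwaters: Mapping[TP, int]
) -> Optional[int]:
    # Cap the stored earliest offset at the already-adjusted highwater so an
    # empty partition cannot produce an earliest offset past the highwater.
    return None if offset is None else min(offset, highwaters.get(tp, offset))
```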

Standby partitions

Moreover, recovering standby partitions has a separate issue in the consistency checks. First, let's look at the sequence of steps for the active partitions so that we can draw a parallel.

Active:

  • Find the latest/max offsets, find the min/earliest offsets, run the consistency checks, and then seek to the offsets:

    ```python
    self.log.dev("Build highwaters for active partitions")
    await self._wait(
        T(self._build_highwaters)(
            consumer, assigned_active_tps, active_highwaters, "active"
        ),
        timeout=self.app.conf.broker_request_timeout,
    )
    self.log.dev("Build offsets for active partitions")
    await self._wait(
        T(self._build_offsets)(
            consumer, assigned_active_tps, active_offsets, "active"
        ),
        timeout=self.app.conf.broker_request_timeout,
    )
    if self.app.conf.recovery_consistency_check:
        for tp in assigned_active_tps:
            if (
                active_offsets[tp]
                and active_highwaters[tp]
                and active_offsets[tp] > active_highwaters[tp]
            ):
                raise ConsistencyError(
                    E_PERSISTED_OFFSET.format(
                        tp,
                        active_offsets[tp],
                        active_highwaters[tp],
                    ),
                )

    self.log.dev("Seek offsets for active partitions")
    await self._wait(
        T(self._seek_offsets)(
            consumer, assigned_active_tps, active_offsets, "active"
        ),
        timeout=self.app.conf.broker_request_timeout,
    )
    ```
Standby:

  • Find the min/earliest offsets, seek to those offsets, find the max/latest offsets, and then run the consistency checks:

    ```python
    self.log.dev("Build offsets for standby partitions")
    await self._wait(
        T(self._build_offsets)(
            consumer, assigned_standby_tps, standby_offsets, "standby"
        ),
        timeout=self.app.conf.broker_request_timeout,
    )

    if standby_tps:
        self.log.info("Starting standby partitions...")
        self.log.dev("Seek standby offsets")
        await self._wait(
            T(self._seek_offsets)(
                consumer, standby_tps, standby_offsets, "standby"
            ),
            timeout=self.app.conf.broker_request_timeout,
        )
        self.log.dev("Build standby highwaters")
        await self._wait(
            T(self._build_highwaters)(
                consumer,
                standby_tps,
                standby_highwaters,
                "standby",
            ),
            timeout=self.app.conf.broker_request_timeout,
        )
        if self.app.conf.recovery_consistency_check:
            for tp in standby_tps:
                if (
                    standby_offsets[tp]
                    and standby_highwaters[tp]
                    and standby_offsets[tp] > standby_highwaters[tp]
                ):
                    raise ConsistencyError(
                        E_PERSISTED_OFFSET.format(
                            tp,
                            standby_offsets[tp],
                            standby_highwaters[tp],
                        ),
                    )
    ```

The problem is that, after seeking, the offsets may be updated asynchronously, so by the time the consistency checks run they may no longer hold.
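To illustrate the race, here is a minimal, self-contained sketch with made-up offsets (my own illustration, not Faust code, and not a fix the project has agreed on): checking against a snapshot of the offsets taken at seek time stays deterministic, while checking the live mapping can fail spuriously once it has been updated concurrently.

```python
from typing import Dict, Optional, Tuple

TP = Tuple[str, int]  # (topic, partition) stand-in for faust.types.TP


def check_consistency(
    offsets: Dict[TP, Optional[int]], highwaters: Dict[TP, Optional[int]]
) -> None:
    # Same shape as the standby check above: a persisted offset beyond the
    # highwater is treated as inconsistent.
    for tp, offset in offsets.items():
        highwater = highwaters.get(tp)
        if offset and highwater and offset > highwater:
            raise ValueError(f"persisted offset {offset} > highwater {highwater} for {tp}")


standby_offsets: Dict[TP, Optional[int]] = {("table-changelog", 0): 41}
snapshot = dict(standby_offsets)  # values actually used for the seek

# ... seek + build highwaters happen here; meanwhile the live mapping moves on ...
standby_offsets[("table-changelog", 0)] = 58  # concurrent update after seeking
standby_highwaters: Dict[TP, Optional[int]] = {("table-changelog", 0): 50}

check_consistency(snapshot, standby_highwaters)  # passes: 41 <= 50
try:
    check_consistency(standby_offsets, standby_highwaters)
except ValueError as exc:
    print(f"live mapping fails the check: {exc}")  # spurious failure: 58 > 50
```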
