Re-upload files if they are missing in storage
rzvoncek committed Mar 14, 2024
1 parent b0cc958 commit 3d2b9c9
Showing 11 changed files with 182 additions and 53 deletions.
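
In short, differential backups now start by listing the files actually present in storage (per keyspace and table) and only skip uploading a file when a matching object is really there. Below is a minimal sketch of that decision, not the committed implementation; the names should_upload and matches_cache, and the nested-mapping shape of files_in_storage, are assumptions drawn from the diff that follows.

# Sketch only: the re-upload decision this commit introduces, simplified.
# Assumes files_in_storage maps keyspace -> table -> {file name: cache entry},
# roughly what Storage.list_files_per_table() hands to the backup cache.
import pathlib

def should_upload(src: pathlib.Path, keyspace: str, table: str,
                  files_in_storage: dict, matches_cache) -> bool:
    cached_item = files_in_storage.get(keyspace, {}).get(table, {}).get(src.name)
    if cached_item is None:
        return True   # missing in storage -> (re-)upload it
    if not matches_cache(src, cached_item):
        return True   # same name but different size/hash -> re-upload it
    return False      # already backed up -> skip it and count it as replaced
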
81 changes: 47 additions & 34 deletions medusa/backup_node.py
@@ -20,6 +20,7 @@
import pathlib
import time
import traceback
import typing as t
import psutil

from retrying import retry
@@ -43,13 +44,6 @@ def __init__(self, *, node_backup, differential_mode, enable_md5_checks,
self._backup_name = node_backup.name
self._bucket_name = node_backup.storage.config.bucket_name
self._data_path = node_backup.data_path
self._cached_objects = {
(section['keyspace'], section['columnfamily']): {
self._sanitize_file_path(pathlib.Path(object['path'])): object
for object in section['objects']
}
for section in json.loads(node_backup.manifest)
}
self._differential_mode = differential_mode
else:
self._node_backup_cache_is_differential = False
@@ -81,49 +75,60 @@ def replaced(self):
def backup_name(self):
return self._backup_name

def replace_or_remove_if_cached(self, *, keyspace, columnfamily, srcs):
retained = list()
skipped = list()
@property
def is_differential(self):
return self._differential_mode

def replace_or_remove_if_cached(
self,
*,
files_in_storage: t.Dict[str, t.Dict[str, t.Set[ManifestObject]]],
keyspace: str,
columnfamily: str,
srcs: t.List[pathlib.Path]
):
needs_backup = list()
already_backed_up = list()

path_prefix = self._storage_driver.get_path_prefix(self._data_path)
table_files_in_storage = files_in_storage.get(keyspace, {}).get(columnfamily, {})

for src in srcs:
if src.name in self.NEVER_BACKED_UP:
pass
else:
fqtn = (keyspace, columnfamily)
cached_item = None
if self._storage_provider.lower() == 'google_storage' or self._differential_mode is True:
cached_item = self._cached_objects.get(fqtn, {}).get(self._sanitize_file_path(src))
cached_item: ManifestObject = None

# see if this particular file has already been backed up
if self._differential_mode is True:
cached_item = table_files_in_storage.get(self._sanitize_file_path(src), None)

# if it wasn't, or we now have a file with same name but different size or hash, then back it up
threshold = self._storage_config.multi_part_upload_threshold
if cached_item is None or not self._storage_driver.file_matches_cache(src,
cached_item,
threshold,
self._enable_md5_checks):
# We have no matching object in the cache matching the file
retained.append(src)
if cached_item is None:
needs_backup.append(src)
elif not self._storage_driver.file_matches_cache(src, cached_item, threshold, self._enable_md5_checks):
needs_backup.append(src)
# file was already present in the previous (differential) backup
else:
# File was already present in the previous backup
# If neither this backup nor the cached one is differential, copy the file from the cache
if self._differential_mode is False and self._node_backup_cache_is_differential is False:
prefixed_path = '{}{}'.format(path_prefix, cached_item['path'])
prefixed_path = '{}{}'.format(path_prefix, cached_item.path)
cached_item_path = self._storage_driver.get_cache_path(prefixed_path)
retained.append(cached_item_path)
needs_backup.append(cached_item_path)
# This backup is differential, but the cached one wasn't
# We must re-upload the files according to the differential format
elif self._differential_mode is True and self._node_backup_cache_is_differential is False:
retained.append(src)
needs_backup.append(src)
else:
# in case the backup is differential, we want to rule out files, not copy them from cache
manifest_object = self._make_manifest_object(path_prefix, cached_item)
logging.debug("Skipping upload of {} which was already part of the previous backup"
.format(cached_item['path']))
skipped.append(manifest_object)
logging.debug(
f"Skipping upload of {cached_item.path} which was already part of the previous backup"
)
already_backed_up.append(cached_item)
self._replaced += 1

return retained, skipped

def _make_manifest_object(self, path_prefix, cached_item):
return ManifestObject('{}{}'.format(path_prefix, cached_item['path']), cached_item['size'], cached_item['MD5'])
return needs_backup, already_backed_up


def throttle_backup():
@@ -362,10 +367,18 @@ def update_monitoring(actual_backup_duration, backup_name, monitoring, node_back
def backup_snapshots(storage, manifest, node_backup, node_backup_cache, snapshot):
try:
num_files = 0

if node_backup_cache.is_differential:
logging.info(f'Listing already backed up files for node {node_backup.fqdn}')
files_in_storage = storage.list_files_per_table()
else:
files_in_storage = dict()

for snapshot_path in snapshot.find_dirs():
logging.debug("Backing up {}".format(snapshot_path))
logging.info(f"Backing up {snapshot_path.keyspace}.{snapshot_path.columnfamily}")

(needs_backup, already_backed_up) = node_backup_cache.replace_or_remove_if_cached(
needs_backup, already_backed_up = node_backup_cache.replace_or_remove_if_cached(
files_in_storage=files_in_storage,
keyspace=snapshot_path.keyspace,
columnfamily=snapshot_path.columnfamily,
srcs=list(snapshot_path.list_files()))
28 changes: 28 additions & 0 deletions medusa/storage/__init__.py
@@ -18,13 +18,15 @@
import operator
import pathlib
import re
import typing as t

from retrying import retry

import medusa.index

from medusa.storage.cluster_backup import ClusterBackup
from medusa.storage.node_backup import NodeBackup
from medusa.storage.abstract_storage import ManifestObject
from medusa.storage.google_storage import GoogleStorage
from medusa.storage.local_storage import LocalStorage
from medusa.storage.s3_storage import S3Storage
@@ -444,3 +446,29 @@ def remove_latest_backup_marker(self, fqdn):

def delete_objects(self, objects, concurrent_transfers=None):
self.storage_driver.delete_objects(objects, concurrent_transfers)

@staticmethod
def get_keyspace_and_table(manifest_object: ManifestObject) -> t.Tuple[str, str, ManifestObject]:
p = pathlib.Path(manifest_object.path)
# for 2i tables or the DSE internal folder, merge the table and index name into a single table name
if p.parent.name.startswith('.') or p.parent.name.endswith('nodes'):
keyspace, table, file = p.parent.parent.parent.name, f"{p.parent.parent.name}.{p.parent.name}", p.name
else:
keyspace, table, file = p.parent.parent.name, p.parent.name, p.name
return keyspace, table, manifest_object

def list_files_per_table(self) -> t.Dict[str, t.Dict[str, t.Set[ManifestObject]]]:
if self.config.prefix != '':
prefix = f"{self.config.prefix}/"
else:
prefix = ""
fdns_data_prefix = f"{prefix}{self.config.fqdn}/data/"
all_files: t.List[ManifestObject] = self.storage_driver.list_blobs(prefix=fdns_data_prefix)
keyspace_table_mo_tuples = map(Storage.get_keyspace_and_table, all_files)

files_by_keyspace_and_table = dict()
for ks, ks_files in itertools.groupby(keyspace_table_mo_tuples, lambda t: t[0]):
files_by_keyspace_and_table[ks] = dict()
for tt, t_files in itertools.groupby(ks_files, lambda tf: tf[1]):
files_by_keyspace_and_table[ks][tt] = {tf[2] for tf in t_files}
return files_by_keyspace_and_table
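
For illustration (not part of this commit), here is how the parsing in get_keyspace_and_table resolves a couple of made-up blob paths, and the shape of the mapping list_files_per_table builds from them; the paths and values are hypothetical.

# Sketch only, with made-up paths; mirrors the parsing in Storage.get_keyspace_and_table above.
import pathlib

def keyspace_and_table(path: str):
    # 2i index folders (leading '.') are folded into the table name,
    # otherwise the layout is assumed to be .../keyspace/table/file
    p = pathlib.Path(path)
    if p.parent.name.startswith('.') or p.parent.name.endswith('nodes'):
        return p.parent.parent.parent.name, f"{p.parent.parent.name}.{p.parent.name}"
    return p.parent.parent.name, p.parent.name

print(keyspace_and_table("127.0.0.1/data/medusa/test-abc123/nb-1-big-Data.db"))
# ('medusa', 'test-abc123')
print(keyspace_and_table("127.0.0.1/data/medusa/test-abc123/.test_idx/nb-1-big-Data.db"))
# ('medusa', 'test-abc123.test_idx')
# list_files_per_table then groups the ManifestObject entries into roughly:
# {'medusa': {'test-abc123': {...}, 'test-abc123.test_idx': {...}}}
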
3 changes: 2 additions & 1 deletion medusa/storage/abstract_storage.py
@@ -21,6 +21,7 @@
import hashlib
import io
import logging
import pathlib
import typing as t

from pathlib import Path
@@ -401,7 +402,7 @@ def blob_matches_manifest(blob, object_in_manifest, enable_md5_checks=False):

@staticmethod
@abc.abstractmethod
def file_matches_cache(src, cached_item, threshold=None, enable_md5_checks=False):
def file_matches_cache(src: pathlib.Path, cached_item: ManifestObject, threshold=None, enable_md5_checks=False):
"""
Compares a local file with its entry in the cache of backed up items. This happens when doing an actual backup.
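
As a hedged, self-contained illustration (not part of the commit) of the signature change this diff applies across the storage backends, the cache entry is now a ManifestObject whose attributes (.path, .size, .MD5) replace the old dict-style lookups (cached_item['size']); the namedtuple and the size-only comparison below are stand-ins modelled on the diff, not Medusa's own classes.

# Sketch only: dict-style cache entries give way to ManifestObject attributes.
import collections
import pathlib
import tempfile

# Mirrors the ManifestObject fields used throughout this diff.
ManifestObject = collections.namedtuple('ManifestObject', ['path', 'size', 'MD5'])

def file_matches_cache(src: pathlib.Path, cached_item: ManifestObject) -> bool:
    # Size-only comparison, like LocalStorage.file_matches_cache below.
    return src.stat().st_size == cached_item.size

with tempfile.NamedTemporaryFile(delete=False) as f:
    f.write(b"sstable bytes")
src = pathlib.Path(f.name)
cached = ManifestObject(f"127.0.0.1/data/medusa/test/{src.name}", src.stat().st_size, None)
print(file_matches_cache(src, cached))  # True -> the file can be skipped
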
7 changes: 4 additions & 3 deletions medusa/storage/azure_storage.py
@@ -20,6 +20,7 @@
import json
import logging
import os
import pathlib
import typing as t

from azure.core.credentials import AzureNamedKeyCredential
@@ -233,12 +234,12 @@ def blob_matches_manifest(blob, object_in_manifest, enable_md5_checks=False):
)

@staticmethod
def file_matches_cache(src, cached_item, threshold=None, enable_md5_checks=False):
def file_matches_cache(src: pathlib.Path, cached_item: ManifestObject, threshold=None, enable_md5_checks=False):
return AzureStorage.compare_with_manifest(
actual_size=src.stat().st_size,
size_in_manifest=cached_item['size'],
size_in_manifest=cached_item.size,
actual_hash=AbstractStorage.generate_md5_hash(src) if enable_md5_checks else None,
hash_in_manifest=cached_item['MD5'],
hash_in_manifest=cached_item.MD5,
)

@staticmethod
7 changes: 4 additions & 3 deletions medusa/storage/google_storage.py
@@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pathlib

import aiohttp
import base64
@@ -254,12 +255,12 @@ def blob_matches_manifest(blob, object_in_manifest, enable_md5_checks=False):
)

@staticmethod
def file_matches_cache(src, cached_item, threshold=None, enable_md5_checks=False):
def file_matches_cache(src: pathlib.Path, cached_item: ManifestObject, threshold=None, enable_md5_checks=False):
return GoogleStorage.compare_with_manifest(
actual_size=src.stat().st_size,
size_in_manifest=cached_item['size'],
size_in_manifest=cached_item.size,
actual_hash=AbstractStorage.generate_md5_hash(src) if enable_md5_checks else None,
hash_in_manifest=cached_item['MD5']
hash_in_manifest=cached_item.MD5
)

@staticmethod
5 changes: 3 additions & 2 deletions medusa/storage/local_storage.py
@@ -18,6 +18,7 @@
import io
import logging
import os
import pathlib
import typing as t
from pathlib import Path

@@ -186,10 +187,10 @@ def blob_matches_manifest(blob, object_in_manifest, enable_md5_checks=False):
)

@staticmethod
def file_matches_cache(src, cached_item, threshold=None, enable_md5_checks=False):
def file_matches_cache(src: pathlib.Path, cached_item: ManifestObject, threshold=None, enable_md5_checks=False):
return LocalStorage.compare_with_manifest(
actual_size=src.stat().st_size,
size_in_manifest=cached_item['size']
size_in_manifest=cached_item.size
)

@staticmethod
8 changes: 5 additions & 3 deletions medusa/storage/s3_base_storage.py
@@ -15,6 +15,8 @@
# limitations under the License.
import asyncio
import base64
import pathlib

import boto3
import botocore.session
import concurrent.futures
@@ -426,7 +428,7 @@ def blob_matches_manifest(blob: AbstractBlob, object_in_manifest: dict, enable_m
)

@staticmethod
def file_matches_cache(src, cached_item, threshold=None, enable_md5_checks=False):
def file_matches_cache(src: pathlib.Path, cached_item: ManifestObject, threshold=None, enable_md5_checks=False):

threshold = int(threshold) if threshold else -1

@@ -440,9 +442,9 @@ def file_matches_cache(src, cached_item, threshold=None, enable_md5_checks=False

return S3BaseStorage.compare_with_manifest(
actual_size=src.stat().st_size,
size_in_manifest=cached_item['size'],
size_in_manifest=cached_item.size,
actual_hash=md5_hash,
hash_in_manifest=cached_item['MD5'],
hash_in_manifest=cached_item.MD5,
threshold=threshold
)

4 changes: 3 additions & 1 deletion medusa/storage/s3_rgw.py
@@ -12,7 +12,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pathlib

from medusa.storage.abstract_storage import ManifestObject
from medusa.storage.s3_base_storage import S3BaseStorage
from medusa.storage.s3_storage import S3Storage

@@ -28,7 +30,7 @@ def blob_matches_manifest(blob, object_in_manifest, enable_md5_checks=False):
return S3Storage.blob_matches_manifest(blob, object_in_manifest, enable_md5_checks)

@staticmethod
def file_matches_cache(src, cached_item, threshold=None, enable_md5_checks=False):
def file_matches_cache(src: pathlib.Path, cached_item: ManifestObject, threshold=None, enable_md5_checks=False):
# for S3RGW, we never set threshold so the S3's multipart never happens
return S3Storage.file_matches_cache(src, cached_item, None, enable_md5_checks)

23 changes: 22 additions & 1 deletion tests/integration/features/integration_tests.feature
@@ -850,7 +850,7 @@ Feature: Integration tests
When I load 100 rows in the "medusa.test" table
When I perform a backup in "full" mode of the node named "first_backup" with md5 checks "disabled"
Then I can verify the backup named "first_backup" with md5 checks "enabled" successfully
And I delete a random sstable from backup "first_backup" in the "test" table in keyspace "medusa"
And I delete a random sstable from "full" backup "first_backup" in the "test" table in keyspace "medusa"
Then verifying backup "first_backup" fails
When I delete the backup named "first_backup"
Then I cannot see the backup named "first_backup" when I list the backups
@@ -1093,3 +1093,24 @@ Feature: Integration tests
Examples: S3 storage
| storage | client encryption |
| s3_us_west_oregon | without_client_encryption |

@30
Scenario Outline: Create a differential backup, corrupt it, then fix it by doing another backup, and verify it
Given I have a fresh ccm cluster "<client encryption>" running named "scenario30"
Given I am using "<storage>" as storage provider in ccm cluster "<client encryption>"
When I create the "test" table in keyspace "medusa"
When I load 100 rows in the "medusa.test" table
When I perform a backup in "differential" mode of the node named "first_backup" with md5 checks "disabled"
Then I can verify the backup named "first_backup" with md5 checks "enabled" successfully
And I delete a random sstable from "differential" backup "first_backup" in the "test" table in keyspace "medusa"
Then verifying backup "first_backup" fails
When I perform a backup in "differential" mode of the node named "second_backup" with md5 checks "disabled"
Then I can verify the backup named "first_backup" with md5 checks "enabled" successfully
Then I can verify the backup named "second_backup" with md5 checks "enabled" successfully
When I delete the backup named "first_backup"
Then I cannot see the backup named "first_backup" when I list the backups

@local
Examples: Local storage
| storage | client encryption |
| local | with_client_encryption |
16 changes: 11 additions & 5 deletions tests/integration/features/steps/integration_steps.py
@@ -1401,15 +1401,21 @@ def _the_backup_named_is_incomplete(context, backup_name):
assert not backup.finished


@then(u'I delete a random sstable from backup "{backup_name}" in the "{table}" table in keyspace "{keyspace}"')
def _i_delete_a_random_sstable(context, backup_name, table, keyspace):
@then(u'I delete a random sstable from "{backup_type}" backup "{backup_name}" '
u'in the "{table}" table in keyspace "{keyspace}"')
def _i_delete_a_random_sstable(context, backup_type, backup_name, table, keyspace):
with Storage(config=context.medusa_config.storage) as storage:
path_root = BUCKET_ROOT

fqdn = "127.0.0.1"
path_sstables = "{}/{}{}/{}/data/{}/{}*".format(
path_root, storage.prefix_path, fqdn, backup_name, keyspace, table
)
if backup_type == "full":
path_sstables = "{}/{}{}/{}/data/{}/{}*".format(
path_root, storage.prefix_path, fqdn, backup_name, keyspace, table
)
else:
path_sstables = "{}/{}{}/data/{}/{}*".format(
path_root, storage.prefix_path, fqdn, keyspace, table
)

table_path = glob.glob(path_sstables)[0]
sstable_files = os.listdir(table_path)
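
For illustration only (not part of the diff): with hypothetical values, the two branches above produce globs like the following, reflecting that a differential backup keeps a single data tree per node, while a full backup keeps one per backup name.

# Hypothetical values; only the shape of the two glob patterns matters here.
path_root, prefix, fqdn = "/tmp/medusa_bucket", "", "127.0.0.1"
backup_name, keyspace, table = "first_backup", "medusa", "test"

full_glob = "{}/{}{}/{}/data/{}/{}*".format(path_root, prefix, fqdn, backup_name, keyspace, table)
diff_glob = "{}/{}{}/data/{}/{}*".format(path_root, prefix, fqdn, keyspace, table)
# full_glob -> '/tmp/medusa_bucket/127.0.0.1/first_backup/data/medusa/test*'
# diff_glob -> '/tmp/medusa_bucket/127.0.0.1/data/medusa/test*'
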