Re-upload files if they are missing in storage
rzvoncek committed Mar 15, 2024
1 parent 36cc3fd commit ba2ea38
Showing 12 changed files with 272 additions and 168 deletions.
226 changes: 99 additions & 127 deletions medusa/backup_node.py
@@ -20,6 +20,7 @@
import pathlib
import time
import traceback
import typing as t
import psutil

from retrying import retry
@@ -29,103 +30,10 @@
from medusa.cassandra_utils import Cassandra
from medusa.index import add_backup_start_to_index, add_backup_finish_to_index, set_latest_backup_in_index
from medusa.monitoring import Monitoring
from medusa.storage import Storage, format_bytes_str
from medusa.storage import Storage, format_bytes_str, NodeBackup
from medusa.storage.abstract_storage import ManifestObject


class NodeBackupCache(object):
NEVER_BACKED_UP = ['manifest.json', 'schema.cql']

def __init__(self, *, node_backup, differential_mode, enable_md5_checks,
storage_driver, storage_provider, storage_config):
if node_backup:
self._node_backup_cache_is_differential = node_backup.is_differential
self._backup_name = node_backup.name
self._bucket_name = node_backup.storage.config.bucket_name
self._data_path = node_backup.data_path
self._cached_objects = {
(section['keyspace'], section['columnfamily']): {
self._sanitize_file_path(pathlib.Path(object['path'])): object
for object in section['objects']
}
for section in json.loads(node_backup.manifest)
}
self._differential_mode = differential_mode
else:
self._node_backup_cache_is_differential = False
self._backup_name = None
self._bucket_name = None
self._data_path = ''
self._cached_objects = {}
self._differential_mode = False
self._replaced = 0
self._storage_driver = storage_driver
self._storage_provider = storage_provider
self._storage_config = storage_config
self._enable_md5_checks = enable_md5_checks

def _sanitize_file_path(self, path):
# Secondary indexes are stored as subdirectories to the base table, starting with a dot.
# In order to avoid mixing 2i sstables with the base table sstables, the file name isn't enough
# to perform the comparison on differential backups. We need to retain the subdir name for 2i tables.
if path.parts[-2].startswith('.'):
return os.path.join(path.parts[-2], path.parts[-1])
else:
return path.name
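# Illustration added for clarity (not part of the original code), with made-up paths:
# a base-table SSTable keeps only its file name, while a secondary-index SSTable keeps
# the dotted index directory too, so 2i files never collide with base-table files:
#   .../ks1/t1-abc/nb-1-big-Data.db                  -> 'nb-1-big-Data.db'
#   .../ks1/t1-abc/.t1_by_name_idx/nb-1-big-Data.db  -> '.t1_by_name_idx/nb-1-big-Data.db'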

@property
def replaced(self):
return self._replaced

@property
def backup_name(self):
return self._backup_name

def replace_or_remove_if_cached(self, *, keyspace, columnfamily, srcs):
retained = list()
skipped = list()
path_prefix = self._storage_driver.get_path_prefix(self._data_path)
for src in srcs:
if src.name in self.NEVER_BACKED_UP:
pass
else:
fqtn = (keyspace, columnfamily)
cached_item = None
if self._storage_provider.lower() == 'google_storage' or self._differential_mode is True:
cached_item = self._cached_objects.get(fqtn, {}).get(self._sanitize_file_path(src))

threshold = self._storage_config.multi_part_upload_threshold
if cached_item is None or not self._storage_driver.file_matches_cache(src,
cached_item,
threshold,
self._enable_md5_checks):
# We have no object in the cache matching the file
retained.append(src)
else:
# File was already present in the previous backup
# In case neither this backup nor the cached backup is differential, copy from cache
if self._differential_mode is False and self._node_backup_cache_is_differential is False:
prefixed_path = '{}{}'.format(path_prefix, cached_item['path'])
cached_item_path = self._storage_driver.get_cache_path(prefixed_path)
retained.append(cached_item_path)
# This backup is differential, but the cached one wasn't
# We must re-upload the files according to the differential format
elif self._differential_mode is True and self._node_backup_cache_is_differential is False:
retained.append(src)
else:
# in case the backup is differential, we want to rule out files, not copy them from cache
manifest_object = self._make_manifest_object(path_prefix, cached_item)
logging.debug("Skipping upload of {} which was already part of the previous backup"
.format(cached_item['path']))
skipped.append(manifest_object)
self._replaced += 1

return retained, skipped

def _make_manifest_object(self, path_prefix, cached_item):
return ManifestObject('{}{}'.format(path_prefix, cached_item['path']), cached_item['size'], cached_item['MD5'])


def throttle_backup():
"""
Makes sure to only use idle IO for backups
@@ -198,7 +106,7 @@ def handle_backup(config, backup_name_arg, stagger_time, enable_md5_checks_flag,

logging.debug("Done with backup, returning backup result information")
return (info["actual_backup_duration"], info["actual_start_time"], info["end_time"],
info["node_backup"], info["node_backup_cache"], info["num_files"],
info["node_backup"], info["num_files"], info["num_replaced"],
info["start_time"], info["backup_name"])

except Exception as e:
@@ -250,20 +158,21 @@ def start_backup(storage, node_backup, cassandra, differential_mode, stagger_tim
# Perform the actual backup
actual_start = datetime.datetime.now()
enable_md5 = enable_md5_checks_flag or medusa.utils.evaluate_boolean(config.checks.enable_md5_checks)
num_files, node_backup_cache = do_backup(
cassandra, node_backup, storage, differential_mode, enable_md5, config, backup_name)
num_files, num_replaced = do_backup(
cassandra, node_backup, storage, enable_md5, backup_name
)
end = datetime.datetime.now()
actual_backup_duration = end - actual_start

print_backup_stats(actual_backup_duration, actual_start, end, node_backup, node_backup_cache, num_files, start)
print_backup_stats(actual_backup_duration, actual_start, end, node_backup, num_files, num_replaced, start)
update_monitoring(actual_backup_duration, backup_name, monitoring, node_backup)
return {
"actual_backup_duration": actual_backup_duration,
"actual_start_time": actual_start,
"end_time": end,
"node_backup": node_backup,
"node_backup_cache": node_backup_cache,
"num_files": num_files,
"num_replaced": num_replaced,
"start_time": start,
"backup_name": backup_name
}
@@ -286,39 +195,31 @@ def get_server_type_and_version(cassandra):
return server_type, release_version


def do_backup(cassandra, node_backup, storage, differential_mode, enable_md5_checks,
config, backup_name):
# Load last backup as a cache
node_backup_cache = NodeBackupCache(
node_backup=storage.latest_node_backup(fqdn=config.storage.fqdn),
differential_mode=differential_mode,
enable_md5_checks=enable_md5_checks,
storage_driver=storage.storage_driver,
storage_provider=storage.storage_provider,
storage_config=config.storage
)
def do_backup(cassandra, node_backup, storage, enable_md5_checks, backup_name):

# the Cassandra snapshot we use defines __exit__, which cleans up the snapshot,
# so even if an exception is thrown, a new snapshot will be created on the next run
# this is not ideal and we will use just one snapshot in the future
logging.info('Creating snapshot')
with cassandra.create_snapshot(backup_name) as snapshot:
manifest = []
num_files = backup_snapshots(storage, manifest, node_backup, node_backup_cache, snapshot)
num_files, num_replaced = backup_snapshots(storage, manifest, node_backup, snapshot, enable_md5_checks)

if node_backup.is_dse:
logging.info('Creating DSE snapshot')
with cassandra.create_dse_snapshot(backup_name) as snapshot:
num_files += backup_snapshots(storage, manifest, node_backup, node_backup_cache, snapshot)
dse_num_files, dse_replaced = backup_snapshots(storage, manifest, node_backup, snapshot, enable_md5_checks)
num_files += dse_num_files
num_replaced += dse_replaced

logging.info('Updating backup index')
node_backup.manifest = json.dumps(manifest)
add_backup_finish_to_index(storage, node_backup)
set_latest_backup_in_index(storage, node_backup)
return num_files, node_backup_cache
return num_files, num_replaced
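The comment near the top of do_backup relies on cassandra.create_snapshot returning a context manager whose __exit__ clears the snapshot. A minimal sketch of that pattern, using hypothetical names rather than Medusa's actual Cassandra wrapper:

import logging

class SnapshotSketch:
    """Illustrative only: the snapshot is cleared even if the backup body raises."""

    def __init__(self, tag):
        self.tag = tag

    def __enter__(self):
        logging.info('Creating snapshot %s', self.tag)   # e.g. nodetool snapshot -t <tag>
        return self

    def __exit__(self, exc_type, exc_value, tb):
        logging.info('Clearing snapshot %s', self.tag)   # e.g. nodetool clearsnapshot -t <tag>
        return False                                      # never swallow the backup failure

# with SnapshotSketch('medusa-backup1') as snapshot:
#     ...  # upload SSTables; the snapshot is removed on both success and failure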


def print_backup_stats(actual_backup_duration, actual_start, end, node_backup, node_backup_cache, num_files, start):
def print_backup_stats(actual_backup_duration, actual_start, end, node_backup, num_files, num_replaced, start):
logging.info('Backup done')

logging.info("""- Started: {:%Y-%m-%d %H:%M:%S}
@@ -334,13 +235,13 @@ def print_backup_stats(actual_backup_duration, actual_start, end, node_backup, n
))

logging.info('- {} files copied from host'.format(
num_files - node_backup_cache.replaced
num_files - num_replaced
))

if node_backup_cache.backup_name is not None:
if node_backup.name is not None:
logging.info('- {} copied from previous backup ({})'.format(
node_backup_cache.replaced,
node_backup_cache.backup_name
num_replaced,
node_backup.name
))


@@ -359,40 +260,111 @@ def update_monitoring(actual_backup_duration, backup_name, monitoring, node_back
logging.debug('Done emitting metrics')


def backup_snapshots(storage, manifest, node_backup, node_backup_cache, snapshot):
def backup_snapshots(storage, manifest, node_backup, snapshot, enable_md5_checks):
try:
num_files = 0
for snapshot_path in snapshot.find_dirs():
logging.debug("Backing up {}".format(snapshot_path))
replaced = 0
multipart_threshold = storage.config.multi_part_upload_threshold

if node_backup.is_differential:
logging.info(f'Listing already backed up files for node {node_backup.fqdn}')
files_in_storage = storage.list_files_per_table()
else:
files_in_storage = dict()

(needs_backup, already_backed_up) = node_backup_cache.replace_or_remove_if_cached(
for snapshot_path in snapshot.find_dirs():
fqtn = f"{snapshot_path.keyspace}.{snapshot_path.columnfamily}"
logging.info(f"Backing up {fqtn}")

needs_backup, needs_reupload, already_backed_up = check_already_uploaded(
storage=storage,
node_backup=node_backup,
files_in_storage=files_in_storage,
multipart_threshold=multipart_threshold,
enable_md5_checks=enable_md5_checks,
keyspace=snapshot_path.keyspace,
columnfamily=snapshot_path.columnfamily,
srcs=list(snapshot_path.list_files()))

replaced += len(already_backed_up)
num_files += len(needs_backup) + len(already_backed_up)

dst_path = str(node_backup.datapath(keyspace=snapshot_path.keyspace,
columnfamily=snapshot_path.columnfamily))
dst_path = str(node_backup.datapath(
keyspace=snapshot_path.keyspace,
columnfamily=snapshot_path.columnfamily)
)
logging.debug("Snapshot destination path: {}".format(dst_path))

manifest_objects = list()
if len(needs_backup) > 0:
manifest_objects += storage.storage_driver.upload_blobs(needs_backup, dst_path)

if len(needs_reupload) > 0:
logging.info(
f"Re-uploading {len(needs_reupload)} files in {fqtn}"
)
storage.storage_driver.upload_blobs(needs_reupload, dst_path)

# Reintroducing already backed up objects in the manifest in differential mode
for obj in already_backed_up:
manifest_objects.append(obj)
if len(already_backed_up) > 0 and node_backup.is_differential:
logging.info(
f"Skipping upload of {len(already_backed_up)} files in {fqtn} because they are already in storage"
)
for obj in already_backed_up:
manifest_objects.append(obj)

manifest.append(make_manifest_object(node_backup.fqdn, snapshot_path, manifest_objects, storage))

return num_files
return num_files, replaced
except Exception as e:
logging.error('Error occurred during backup: {}'.format(str(e)))
traceback.print_exc()
raise e


def check_already_uploaded(
storage: Storage,
node_backup: NodeBackup,
multipart_threshold: int,
enable_md5_checks: bool,
files_in_storage: t.Dict[str, t.Dict[str, t.Dict[str, ManifestObject]]],
keyspace: str,
columnfamily: str,
srcs: t.List[pathlib.Path]
) -> t.Tuple[t.List[pathlib.Path], t.List[pathlib.Path], t.List[ManifestObject]]:

NEVER_BACKED_UP = ['manifest.json', 'schema.cql']
needs_backup = list()
needs_reupload = list()
already_backed_up = list()

# in full mode we always upload everything
# TODO make it so that we copy in-storage
if node_backup.is_differential is False:
return [src for src in srcs if src.name not in NEVER_BACKED_UP], needs_reupload, already_backed_up

table_files_in_storage = files_in_storage.get(keyspace, {}).get(columnfamily, {})

for src in srcs:
if src.name in NEVER_BACKED_UP:
continue
else:
item_in_storage = table_files_in_storage.get(src.name, None)
# object is not in storage
if item_in_storage is None:
needs_backup.append(src)
continue
# object is in storage but with different size or digest
storage_driver = storage.storage_driver
if not storage_driver.file_matches_storage(src, item_in_storage, multipart_threshold, enable_md5_checks):
needs_reupload.append(src)
continue
# object is in storage with correct size and digest
already_backed_up.append(item_in_storage)

return needs_backup, needs_reupload, already_backed_up
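To make the three buckets concrete, here is a small self-contained sketch (not Medusa code; the file names, sizes and digests are made up, and a plain size comparison stands in for storage_driver.file_matches_storage):

import pathlib

NEVER_BACKED_UP = ['manifest.json', 'schema.cql']
srcs = [pathlib.Path(name) for name in
        ['nb-1-big-Data.db', 'nb-2-big-Data.db', 'nb-3-big-Data.db', 'manifest.json']]
# what the storage listing knows about: name -> size of the already uploaded object
in_storage = {'nb-1-big-Data.db': 1024, 'nb-2-big-Data.db': 512}
# what is on disk in the snapshot: name -> size
local_sizes = {'nb-1-big-Data.db': 1024, 'nb-2-big-Data.db': 2048,
               'nb-3-big-Data.db': 4096, 'manifest.json': 10}

needs_backup, needs_reupload, already_backed_up = [], [], []
for src in srcs:
    if src.name in NEVER_BACKED_UP:
        continue                              # never uploaded per table
    stored_size = in_storage.get(src.name)
    if stored_size is None:
        needs_backup.append(src)              # not in storage at all
    elif stored_size != local_sizes[src.name]:
        needs_reupload.append(src)            # in storage but wrong size -> re-upload
    else:
        already_backed_up.append(src)         # healthy copy already in storage
                                              # (the real code keeps the ManifestObject)

print(needs_backup)        # [PosixPath('nb-3-big-Data.db')]
print(needs_reupload)      # [PosixPath('nb-2-big-Data.db')]
print(already_backed_up)   # [PosixPath('nb-1-big-Data.db')]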


def make_manifest_object(fqdn, snapshot_path, manifest_objects, storage):
return {
'keyspace': snapshot_path.keyspace,
29 changes: 29 additions & 0 deletions medusa/storage/__init__.py
@@ -18,13 +18,15 @@
import operator
import pathlib
import re
import typing as t

from retrying import retry

import medusa.index

from medusa.storage.cluster_backup import ClusterBackup
from medusa.storage.node_backup import NodeBackup
from medusa.storage.abstract_storage import ManifestObject, AbstractBlob
from medusa.storage.google_storage import GoogleStorage
from medusa.storage.local_storage import LocalStorage
from medusa.storage.s3_storage import S3Storage
@@ -444,3 +446,30 @@ def remove_latest_backup_marker(self, fqdn):

def delete_objects(self, objects, concurrent_transfers=None):
self.storage_driver.delete_objects(objects, concurrent_transfers)

@staticmethod
def get_keyspace_and_table(manifest_object: ManifestObject) -> t.Tuple[str, str, ManifestObject]:
p = pathlib.Path(manifest_object.path)
# For 2i tables or the DSE internal folder, we merge the table and index names into a new table name
if p.parent.name.startswith('.') or p.parent.name.endswith('nodes'):
keyspace, table = p.parent.parent.parent.name, f"{p.parent.parent.name}.{p.parent.name}"
else:
keyspace, table = p.parent.parent.name, p.parent.name
return keyspace, table, manifest_object
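A quick illustration of the path splitting above, assuming the medusa package is importable and using made-up object paths that follow the <fqdn>/data/<keyspace>/<table>/ layout used by list_files_per_table below:

from medusa.storage import Storage
from medusa.storage.abstract_storage import ManifestObject

regular = ManifestObject('host1/data/ks1/t1-abc123/nb-1-big-Data.db', 1024, 'md5==')
index_2i = ManifestObject('host1/data/ks1/t1-abc123/.t1_by_name_idx/nb-1-big-Data.db', 1024, 'md5==')

print(Storage.get_keyspace_and_table(regular)[:2])    # ('ks1', 't1-abc123')
print(Storage.get_keyspace_and_table(index_2i)[:2])   # ('ks1', 't1-abc123..t1_by_name_idx')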

def list_files_per_table(self) -> t.Dict[str, t.Dict[str, t.Dict[str, ManifestObject]]]:
if self.config.prefix != '':
prefix = f"{self.config.prefix}/"
else:
prefix = ""
fdns_data_prefix = f"{prefix}{self.config.fqdn}/data/"
all_blobs: t.List[AbstractBlob] = self.storage_driver.list_blobs(prefix=fdns_data_prefix)
all_files = [ManifestObject(blob.name, blob.size, blob.hash) for blob in all_blobs]
keyspace_table_mo_tuples = map(Storage.get_keyspace_and_table, all_files)

files_by_keyspace_and_table = dict()
for ks, ks_files in itertools.groupby(keyspace_table_mo_tuples, lambda t: t[0]):
files_by_keyspace_and_table[ks] = dict()
for tt, t_files in itertools.groupby(ks_files, lambda tf: tf[1]):
files_by_keyspace_and_table[ks][tt] = {pathlib.Path(tf[2].path).name: tf[2] for tf in t_files}
return files_by_keyspace_and_table
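For reference, a sketch (with made-up names) of the nested mapping this method returns; check_already_uploaded in backup_node.py then looks files up per keyspace and table by file name:

# {
#     'ks1': {
#         't1-abc123': {
#             'nb-1-big-Data.db': ManifestObject('host1/data/ks1/t1-abc123/nb-1-big-Data.db', 1024, '...'),
#             'nb-1-big-Index.db': ManifestObject('host1/data/ks1/t1-abc123/nb-1-big-Index.db', 128, '...'),
#         },
#         't1-abc123..t1_by_name_idx': {...},
#     },
# }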