From 54859e87c80300a961af0e7cbed05a603889fa19 Mon Sep 17 00:00:00 2001 From: James Bensley Date: Sun, 12 Nov 2023 18:47:15 +0100 Subject: [PATCH] Issues #193 - stream redis data (#196) * Stream data out of redis one key at a time * issue-#193 Add support for streaming in and out of REDIS --------- Co-authored-by: James Bensley --- dnas/dnas/redis_db.py | 184 +++++++++++++++++++++++++++++++------ dnas/scripts/redis_mgmt.py | 38 +++++--- docker/README.md | 9 +- docker/manual_day.sh | 4 + docker/manual_range.sh | 4 + redis/backup_redis.sh | 9 +- tox.ini | 2 +- 7 files changed, 198 insertions(+), 52 deletions(-) diff --git a/dnas/dnas/redis_db.py b/dnas/dnas/redis_db.py index c85b06e..3aa7a38 100644 --- a/dnas/dnas/redis_db.py +++ b/dnas/dnas/redis_db.py @@ -1,10 +1,14 @@ +import base64 +import gzip import json -from typing import Union +import logging +from typing import Iterable, Union from dnas.mrt_stats import mrt_stats from dnas.redis_auth import redis_auth # type: ignore from dnas.twitter_msg import twitter_msg -from redis.client import Redis + +from redis import Redis class redis_db: @@ -13,11 +17,13 @@ class redis_db: """ def __init__(self: "redis_db") -> None: - self.r = Redis( + self.r: Redis = Redis( host=redis_auth.host, port=redis_auth.port, password=redis_auth.password, ) + # Check we have connected: + self.ping() def add_to_queue(self: "redis_db", key: str, json_str: str) -> None: """ @@ -32,7 +38,7 @@ def add_to_queue(self: "redis_db", key: str, json_str: str) -> None: if type(json_str) != str: raise TypeError(f"json_str is not a string: {type(json_str)}") - self.r.lpush(key, json_str) + self.r.lpush(key, redis_db.compress(json_str)) def close(self: "redis_db") -> None: """ @@ -40,6 +46,28 @@ def close(self: "redis_db") -> None: """ self.r.close() + @staticmethod + def compress(data: str) -> str: + """ + Gzip compress the import data (result in compressed binary data) + Return a base85 encoded string of the compressed binary data. 
+ """ + compressed = gzip.compress( + data=bytes(data, encoding="utf-8"), compresslevel=9 + ) + b85 = base64.b85encode(compressed) + return b85.decode("utf-8") + + @staticmethod + def decompress(data: str) -> str: + """ + Take in a base85 encoded string, decompress it to the original gzip + binary data, and then decompress that to the original string + """ + compressed = base64.b85decode(data) + uncompressed = gzip.decompress(compressed) + return uncompressed.decode("utf-8") + def del_from_queue(self: "redis_db", key: str, elem: str) -> None: """ Delete an entry from a list of strings. @@ -52,7 +80,7 @@ def del_from_queue(self: "redis_db", key: str, elem: str) -> None: if type(elem) != str: raise TypeError(f"elem is not a string: {type(elem)}") - self.r.lrem(key, 0, elem) + self.r.lrem(key, 0, redis_db.compress(elem)) def delete(self: "redis_db", key: str) -> int: """ @@ -75,6 +103,81 @@ def from_file(self: "redis_db", filename: str) -> None: with open(filename, "r") as f: self.from_json(f.read()) + def from_file_stream(self: "redis_db", filename: str) -> None: + """ + Restore redis DB from JSON file, loading the content one line at time. 
+ """ + if not filename: + raise ValueError( + f"Missing required arguments: filename={filename}" + ) + + loaded_kvs = 0 + with open(filename, "r") as f: + # First character should be "{" to start the dump + opening = f.read(1) + assert opening == "{" + + end = False + while not end: + # Find the start of the next key + char = "" + while char != '"': + char = f.read(1) + # Found the end of the dump + if char == "}": + end = True + break + if end: + break + # Confirm we didn't scan to the end of the file without a match + assert char == '"' + + # Scan in the key name including quote marks + key = char + char = "" + while char != '"': + char = f.read(1) + key += char + assert char == '"' + + # Find the start of the value + char = "" + while char != "{": + char = f.read(1) + assert char == "{" + dict_depth = 1 + + """ + Scan until the end of this dict. + This could be a dict of dicts, so track that the outer most dict + is "closed" + """ + value = '"{' + while dict_depth != 0: + char = f.read(1) + value += char + if char == "{": + dict_depth += 1 + elif char == "}": + dict_depth -= 1 + assert char == "}" + char = f.read(1) + assert char == '"' + value += char + + # Compile the loaded value as a string + json_str = "{" + key + ": " + value + "}" + # Parse the string to check it's valid + json_dict: dict = json.loads(json_str) + # Load it into REDIS + k = list(json_dict.keys())[0] + v = list(json_dict.values())[0] + self.r.set(k, redis_db.compress(v)) + loaded_kvs += 1 + + logging.info(f"Loaded {loaded_kvs} k/v's from stream") + def from_json(self: "redis_db", json_str: str): """ Restore redis DB from a JSON string @@ -86,7 +189,8 @@ def from_json(self: "redis_db", json_str: str): json_dict = json.loads(json_str) for k in json_dict.keys(): - self.r.set(k, json_dict[k]) + self.r.set(k, redis_db.compress(json_dict[k])) + logging.info(f"Loaded {len(json_dict)} k/v's") def get(self: "redis_db", key: str) -> Union[str, list]: """ @@ -99,13 +203,16 @@ def get(self: 
"redis_db", key: str) -> Union[str, list]: if t == "string": val = self.r.get(key) if val: - return val.decode("utf-8") + return self.decompress(val.decode("utf-8")) else: raise ValueError( f"Couldn't decode data stored under key {key}" ) elif t == "list": - return [x.decode("utf-8") for x in self.r.lrange(key, 0, -1)] + return [ + redis_db.compress(x.decode("utf-8")) + for x in self.r.lrange(key, 0, -1) + ] else: raise TypeError(f"Unknown redis data type stored under {key}: {t}") @@ -136,7 +243,7 @@ def get_queue_msgs(self: "redis_db", key: str) -> list["twitter_msg"]: for msg in db_q: if msg: t_m = twitter_msg() - t_m.from_json(msg.decode("utf-8")) + t_m.from_json(self.decompress(msg.decode("utf-8"))) msgs.append(t_m) return msgs @@ -153,9 +260,12 @@ def get_stats(self: "redis_db", key: str) -> Union[None, "mrt_stats"]: if not json_str: return None else: - mrt_s.from_json(json_str.decode("utf-8")) + mrt_s.from_json(redis_db.decompress(json_str.decode("utf-8"))) return mrt_s + def ping(self: "redis_db") -> None: + assert self.r.ping() + def set_stats(self: "redis_db", key: str, mrt_s: "mrt_stats"): """ Take an MRT stats object, serialise it to JSON, store in Redis. 
@@ -165,7 +275,7 @@ def set_stats(self: "redis_db", key: str, mrt_s: "mrt_stats"): f"Missing required arguments: key={key}, mrt_s={mrt_s}" ) - self.r.set(key, mrt_s.to_json()) + self.r.set(key, redis_db.compress(mrt_s.to_json())) def set_stats_json(self: "redis_db", key: str, json_str: str): """ @@ -179,7 +289,7 @@ def set_stats_json(self: "redis_db", key: str, json_str: str): f"Missing required arguments: json_str={json_str}" ) - self.r.set(key, json_str) + self.r.set(key, redis_db.compress(json_str)) def to_file(self: "redis_db", filename: str): """ @@ -193,31 +303,45 @@ def to_file(self: "redis_db", filename: str): with open(filename, "w") as f: f.write(self.to_json()) + def to_file_stream(self: "redis_db", filename: str): + """ + to_json returns a giant dict of the entire DB which can be serialised + as a JSON string. The DB is now too big for this (the server runs out + of memory). Instead, write the DB to file, one key at a time: + """ + if not filename: + raise ValueError( + f"Missing required arguments: filename={filename}" + ) + + with open(filename, "w") as f: + for line in self.to_json_stream(): + f.write(line) + def to_json(self: "redis_db") -> str: """ Dump the entire redis DB to JSON """ d: dict = {} for k in self.r.keys("*"): - t = self.r.type(k).decode("utf-8") - if t == "string": - val = self.r.get(k) - if val: - d[k.decode("utf-8")] = val.decode("utf-8") - else: - raise ValueError( - f"Couldn't decode data stored under key {k.decode('utf-8')}" - ) - elif t == "list": - d[k.decode("utf-8")] = [ - x.decode("utf-8") for x in self.r.lrange(k, 0, -1) - ] - else: - raise TypeError( - f"Unsupported data type {t} stored under key {k.decode('utf-8')}" - ) + d[k.decode("utf-8")] = self.get(key=k) if d: - return json.dumps(d) + json_str = json.dumps(d) + logging.info(f"Dumped {len(d)} k/v's") + return json_str else: raise ValueError("Database is empty") + + def to_json_stream(self: "redis_db") -> Iterable: + yield ("{") + keys = self.r.keys("*") + for idx, 
key in enumerate(keys): + k = key.decode("utf-8") + d = json.dumps({k: self.get(key=k)}) + if idx == len(keys) - 1: + yield (f"{d[1:-1]}") + else: + yield (f"{d[1:-1]}, ") + yield ("}") + logging.info(f"Dumped {len(keys)} k/v's as stream") diff --git a/dnas/scripts/redis_mgmt.py b/dnas/scripts/redis_mgmt.py index e5cc3cc..265a418 100755 --- a/dnas/scripts/redis_mgmt.py +++ b/dnas/scripts/redis_mgmt.py @@ -33,25 +33,32 @@ def delete(key: str) -> None: logging.info(f"Nothing to delete for {key}") -def dump_json(filename: str) -> None: +def dump_json(filename: str, stream: bool) -> None: """ Dump the entire redis DB to a JSON file. """ if not filename: raise ValueError(f"Missing required arguments: filename={filename}") - rdb.to_file(filename) + if stream: + rdb.to_file_stream(filename=filename) + else: + rdb.to_file(filename=filename) logging.info(f"Written DB dump to {filename}") -def load_json(filename: str) -> None: +def load_json(filename: str, stream: bool) -> None: """ Import a JOSN dump into redis. 
""" if not filename: raise ValueError(f"Missing required arguments: filename={filename}") - rdb.from_file(filename) + if stream: + rdb.from_file_stream(filename=filename) + else: + rdb.from_file(filename=filename) + logging.info(f"Loaded DB dump from {filename}") def parse_args() -> dict: @@ -151,6 +158,13 @@ def parse_args() -> dict: required=False, default=None, ) + parser.add_argument( + "--stream", + help="When dumping/loading from a JSON file, stream the data", + default=False, + action="store_true", + required=False, + ) parser.add_argument( "--wipe", help="Erase the entire redis DB", @@ -273,20 +287,20 @@ def main(): ) if args["dump"]: - dump_json(args["dump"]) + dump_json(filename=args["dump"], stream=args["stream"]) elif args["load"]: - load_json(args["load"]) + load_json(filename=args["load"], stream=args["stream"]) elif args["wipe"]: wipe() if args["daily"]: - print_stats_daily(args["daily"]) + print_stats_daily(ymd=args["daily"]) if args["delete"]: - delete(args["delete"]) + delete(key=args["delete"]) if args["diff"]: - print_stats_diff(args["diff"]) + print_stats_diff(keys=args["diff"]) if args["global"]: print_stats_global() @@ -295,13 +309,13 @@ def main(): print_keys() if args["pprint"]: - pprint_key(args["pprint"]) + pprint_keykey = args["pprint"] if args["print"]: - print_key(args["print"]) + print_key(key=args["print"]) if args["stats"]: - print_stats(args["stats"]) + print_stats(key=args["stats"]) rdb.close() diff --git a/docker/README.md b/docker/README.md index 660e00a..1580146 100644 --- a/docker/README.md +++ b/docker/README.md @@ -2,16 +2,13 @@ ## Building -To build the Redis DB container and DNAS containers use the following: +To pull the latest code version and build both the DNAS containers and the Redis DB container, use the following: ```bash -cd /opt/dnas/ -source venv/bin/activate -cd docker/ -docker-compose build +/opt/dnas/docker/build_dnas.sh ``` -One can run `docker-compose build` to rebuild the Redis and DNAS containers any time.
However this doesn't pull the latest software version from Git. To pull the latest code from the Git repo and rebuild the DNAS container run the build script: `/opt/dnas/docker/build_dnas.sh` +One can run `docker-compose build` to rebuild the Redis and DNAS containers any time. However this doesn't pull the latest software version from Git. ### Build Issues diff --git a/docker/manual_day.sh b/docker/manual_day.sh index ae8a9c2..4aca2a6 100755 --- a/docker/manual_day.sh +++ b/docker/manual_day.sh @@ -13,6 +13,10 @@ set -o pipefail # Error if any command returns a non-zero exist status set -e +# shellcheck disable=SC1091 +source /opt/dnas/venv/bin/activate +cd "/opt/dnas/docker/" + SCRIPTS="/opt/dnas/dnas/scripts" docker-compose run --rm --name tmp_getter dnas_getter -- \ diff --git a/docker/manual_range.sh b/docker/manual_range.sh index 2db24e6..6e32375 100755 --- a/docker/manual_range.sh +++ b/docker/manual_range.sh @@ -24,6 +24,10 @@ then exit 1 fi +# shellcheck disable=SC1091 +source /opt/dnas/venv/bin/activate +cd "/opt/dnas/docker/" + SCRIPTS="/opt/dnas/dnas/scripts" SY="${1}" SM="${2}" diff --git a/redis/backup_redis.sh b/redis/backup_redis.sh index e78d35d..9bf51c4 100755 --- a/redis/backup_redis.sh +++ b/redis/backup_redis.sh @@ -2,14 +2,17 @@ set -eu +# shellcheck disable=SC1091 +source /opt/dnas/venv/bin/activate + datetime=$(date "+%Y-%m-%d") backup_dir="/opt/dnas/redis/backups" backup_file="${backup_dir}/redis-${datetime}.json" mkdir -p "${backup_dir}" chown bensley:bensley "${backup_dir}" -source /opt/dnas/venv/bin/activate cd /opt/dnas/dnas/scripts/ || exit 1 -./redis_mgmt.py --dump "${backup_file}" +./redis_mgmt.py --stream --dump "${backup_file}" +echo "Uncompressed size is: $(ls -lh "${backup_file}")" gzip "${backup_file}" - +echo "Compressed size is: $(ls -lh "${backup_file}.gz")" diff --git a/tox.ini b/tox.ini index 88c454e..1a1fdd2 100644 --- a/tox.ini +++ b/tox.ini @@ -30,7 +30,7 @@ deps = types-redis types-requests changedir = {toxinidir} 
-commands = mypy --config-file mypy.ini ./ +commands = mypy --config-file mypy.ini --exclude venv/ ./ [testenv:pytest] skip_install=true