
Issues #193 - stream redis data (#196)
* Stream data out of redis one key at a time

* issue-#193 Add support for streaming in and out of REDIS

---------

Co-authored-by: James Bensley <[email protected]>
jwbensley and James Bensley authored Nov 12, 2023
1 parent 71be790 commit 54859e8
Showing 7 changed files with 198 additions and 52 deletions.
184 changes: 154 additions & 30 deletions dnas/dnas/redis_db.py
@@ -1,10 +1,14 @@
import base64
import gzip
import json
from typing import Union
import logging
from typing import Iterable, Union

from dnas.mrt_stats import mrt_stats
from dnas.redis_auth import redis_auth # type: ignore
from dnas.twitter_msg import twitter_msg
from redis.client import Redis

from redis import Redis


class redis_db:
@@ -13,11 +17,13 @@ class redis_db:
"""

def __init__(self: "redis_db") -> None:
self.r = Redis(
self.r: Redis = Redis(
host=redis_auth.host,
port=redis_auth.port,
password=redis_auth.password,
)
# Check we have connected:
self.ping()

def add_to_queue(self: "redis_db", key: str, json_str: str) -> None:
"""
@@ -32,14 +38,36 @@ def add_to_queue(self: "redis_db", key: str, json_str: str) -> None:
if type(json_str) != str:
raise TypeError(f"json_str is not a string: {type(json_str)}")

self.r.lpush(key, json_str)
self.r.lpush(key, redis_db.compress(json_str))

def close(self: "redis_db") -> None:
"""
Close the redis connection.
"""
self.r.close()

@staticmethod
def compress(data: str) -> str:
"""
Gzip compress the input data (resulting in compressed binary data).
Return a base85 encoded string of the compressed binary data.
"""
compressed = gzip.compress(
data=bytes(data, encoding="utf-8"), compresslevel=9
)
b85 = base64.b85encode(compressed)
return b85.decode("utf-8")

@staticmethod
def decompress(data: str) -> str:
"""
Take in a base85 encoded string, decompress it to the original gzip
binary data, and then decompress that to the original string
"""
compressed = base64.b85decode(data)
uncompressed = gzip.decompress(compressed)
return uncompressed.decode("utf-8")

def del_from_queue(self: "redis_db", key: str, elem: str) -> None:
"""
Delete an entry from a list of strings.
@@ -52,7 +80,7 @@ def del_from_queue(self: "redis_db", key: str, elem: str) -> None:
if type(elem) != str:
raise TypeError(f"elem is not a string: {type(elem)}")

self.r.lrem(key, 0, elem)
self.r.lrem(key, 0, redis_db.compress(elem))

def delete(self: "redis_db", key: str) -> int:
"""
@@ -75,6 +103,81 @@ def from_file(self: "redis_db", filename: str) -> None:
with open(filename, "r") as f:
self.from_json(f.read())

def from_file_stream(self: "redis_db", filename: str) -> None:
"""
Restore the redis DB from a JSON file, loading the content one key at a time.
"""
if not filename:
raise ValueError(
f"Missing required arguments: filename={filename}"
)

loaded_kvs = 0
with open(filename, "r") as f:
# First character should be "{" to start the dump
opening = f.read(1)
assert opening == "{"

end = False
while not end:
# Find the start of the next key
char = ""
while char != '"':
char = f.read(1)
# Found the end of the dump
if char == "}":
end = True
break
if end:
break
# Confirm we didn't scan to the end of the file without a match
assert char == '"'

# Scan in the key name including quote marks
key = char
char = ""
while char != '"':
char = f.read(1)
key += char
assert char == '"'

# Find the start of the value
char = ""
while char != "{":
char = f.read(1)
assert char == "{"
dict_depth = 1

"""
Scan until the end of this dict.
This could be a dict of dicts, so track when the outermost dict
is "closed"
"""
value = '"{'
while dict_depth != 0:
char = f.read(1)
value += char
if char == "{":
dict_depth += 1
elif char == "}":
dict_depth -= 1
assert char == "}"
char = f.read(1)
assert char == '"'
value += char

# Compile the loaded value as a string
json_str = "{" + key + ": " + value + "}"
# Parse the string to check it's valid
json_dict: dict = json.loads(json_str)
# Load it into REDIS
k = list(json_dict.keys())[0]
v = list(json_dict.values())[0]
self.r.set(k, redis_db.compress(v))
loaded_kvs += 1

logging.info(f"Loaded {loaded_kvs} k/v's from stream")

def from_json(self: "redis_db", json_str: str):
"""
Restore redis DB from a JSON string
@@ -86,7 +189,8 @@ def from_json(self: "redis_db", json_str: str):

json_dict = json.loads(json_str)
for k in json_dict.keys():
self.r.set(k, json_dict[k])
self.r.set(k, redis_db.compress(json_dict[k]))
logging.info(f"Loaded {len(json_dict)} k/v's")

def get(self: "redis_db", key: str) -> Union[str, list]:
"""
@@ -99,13 +203,16 @@ def get(self: "redis_db", key: str) -> Union[str, list]:
if t == "string":
val = self.r.get(key)
if val:
return val.decode("utf-8")
return self.decompress(val.decode("utf-8"))
else:
raise ValueError(
f"Couldn't decode data stored under key {key}"
)
elif t == "list":
return [x.decode("utf-8") for x in self.r.lrange(key, 0, -1)]
return [
redis_db.decompress(x.decode("utf-8"))
for x in self.r.lrange(key, 0, -1)
]
else:
raise TypeError(f"Unknown redis data type stored under {key}: {t}")

@@ -136,7 +243,7 @@ def get_queue_msgs(self: "redis_db", key: str) -> list["twitter_msg"]:
for msg in db_q:
if msg:
t_m = twitter_msg()
t_m.from_json(msg.decode("utf-8"))
t_m.from_json(self.decompress(msg.decode("utf-8")))
msgs.append(t_m)

return msgs
@@ -153,9 +260,12 @@ def get_stats(self: "redis_db", key: str) -> Union[None, "mrt_stats"]:
if not json_str:
return None
else:
mrt_s.from_json(json_str.decode("utf-8"))
mrt_s.from_json(redis_db.decompress(json_str.decode("utf-8")))
return mrt_s

def ping(self: "redis_db") -> None:
assert self.r.ping()

def set_stats(self: "redis_db", key: str, mrt_s: "mrt_stats"):
"""
Take an MRT stats object, serialise it to JSON, store in Redis.
@@ -165,7 +275,7 @@ def set_stats(self: "redis_db", key: str, mrt_s: "mrt_stats"):
f"Missing required arguments: key={key}, mrt_s={mrt_s}"
)

self.r.set(key, mrt_s.to_json())
self.r.set(key, redis_db.compress(mrt_s.to_json()))

def set_stats_json(self: "redis_db", key: str, json_str: str):
"""
@@ -179,7 +289,7 @@ def set_stats_json(self: "redis_db", key: str, json_str: str):
f"Missing required arguments: json_str={json_str}"
)

self.r.set(key, json_str)
self.r.set(key, redis_db.compress(json_str))

def to_file(self: "redis_db", filename: str):
"""
@@ -193,31 +303,45 @@ def to_file(self: "redis_db", filename: str):
with open(filename, "w") as f:
f.write(self.to_json())

def to_file_stream(self: "redis_db", filename: str):
"""
to_json returns a giant dict of the entire DB which can be serialised
as a JSON string. The DB is now too big for this (the server runs out
of memory). Instead, write the DB to the file one key at a time.
"""
if not filename:
raise ValueError(
f"Missing required arguments: filename={filename}"
)

with open(filename, "w") as f:
for line in self.to_json_stream():
f.write(line)

def to_json(self: "redis_db") -> str:
"""
Dump the entire redis DB to JSON
"""
d: dict = {}
for k in self.r.keys("*"):
t = self.r.type(k).decode("utf-8")
if t == "string":
val = self.r.get(k)
if val:
d[k.decode("utf-8")] = val.decode("utf-8")
else:
raise ValueError(
f"Couldn't decode data stored under key {k.decode('utf-8')}"
)
elif t == "list":
d[k.decode("utf-8")] = [
x.decode("utf-8") for x in self.r.lrange(k, 0, -1)
]
else:
raise TypeError(
f"Unsupported data type {t} stored under key {k.decode('utf-8')}"
)
d[k.decode("utf-8")] = self.get(key=k)

if d:
return json.dumps(d)
json_str = json.dumps(d)
logging.info(f"Dumped {len(d)} k/v's")
return json_str
else:
raise ValueError("Database is empty")

def to_json_stream(self: "redis_db") -> Iterable:
yield ("{")
keys = self.r.keys("*")
for idx, key in enumerate(keys):
k = key.decode("utf-8")
d = json.dumps({k: self.get(key=k)})
if idx == len(keys) - 1:
yield (f"{d[1:-1]}")
else:
yield (f"{d[1:-1]}, ")
yield ("}")
logging.info(f"Dumped {len(keys)} k/v's as stream")
38 changes: 26 additions & 12 deletions dnas/scripts/redis_mgmt.py
@@ -33,25 +33,32 @@ def delete(key: str) -> None:
logging.info(f"Nothing to delete for {key}")


def dump_json(filename: str) -> None:
def dump_json(filename: str, stream: bool) -> None:
"""
Dump the entire redis DB to a JSON file.
"""
if not filename:
raise ValueError(f"Missing required arguments: filename={filename}")

rdb.to_file(filename)
if stream:
rdb.to_file_stream(filename=filename)
else:
rdb.to_file(filename=filename)
logging.info(f"Written DB dump to {filename}")


def load_json(filename: str) -> None:
def load_json(filename: str, stream: bool) -> None:
"""
Import a JSON dump into redis.
"""
if not filename:
raise ValueError(f"Missing required arguments: filename={filename}")

rdb.from_file(filename)
if stream:
rdb.from_file_stream(filename=filename)
else:
rdb.from_file(filename=filename)
logging.info(f"Loaded DB dump from {filename}")


def parse_args() -> dict:
@@ -151,6 +158,13 @@ def parse_args() -> dict:
required=False,
default=None,
)
parser.add_argument(
"--stream",
help="When dumping/loading from a JSON file, stream the data",
default=False,
action="store_true",
required=False,
)
parser.add_argument(
"--wipe",
help="Erase the entire redis DB",
@@ -273,20 +287,20 @@ def main():
)

if args["dump"]:
dump_json(args["dump"])
dump_json(filename=args["dump"], stream=args["stream"])
elif args["load"]:
load_json(args["load"])
load_json(filename=args["load"], stream=args["stream"])
elif args["wipe"]:
wipe()

if args["daily"]:
print_stats_daily(args["daily"])
print_stats_daily(ymd=args["daily"])

if args["delete"]:
delete(args["delete"])
delete(key=args["delete"])

if args["diff"]:
print_stats_diff(args["diff"])
print_stats_diff(keys=args["diff"])

if args["global"]:
print_stats_global()
@@ -295,13 +309,13 @@
print_keys()

if args["pprint"]:
pprint_key(args["pprint"])
pprint_key(key=args["pprint"])

if args["print"]:
print_key(args["print"])
print_key(key=args["print"])

if args["stats"]:
print_stats(args["stats"])
print_stats(key=args["stats"])

rdb.close()


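As a usage note (not part of the commit), the streaming paths are selected in redis_mgmt.py with the new --stream flag alongside --dump or --load. The equivalent calls on redis_db look roughly like the sketch below; the file path is hypothetical, and a reachable Redis instance configured via redis_auth is assumed:

# Sketch only: streamed dump and restore, one key at a time, without building
# the whole DB as a single in-memory JSON string.
from dnas.redis_db import redis_db

rdb = redis_db()  # connects and pings Redis using the redis_auth settings
rdb.to_file_stream(filename="/tmp/redis_dump.json")    # streamed dump
rdb.from_file_stream(filename="/tmp/redis_dump.json")  # streamed restore
rdb.close()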