Issue-#132 Add --no-compression option
James Bensley committed Nov 12, 2023
1 parent 54859e8 commit d9fde95
Showing 4 changed files with 84 additions and 60 deletions.
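
In short: this commit replaces the special-purpose set_stats()/set_stats_json() helpers with a generic redis_db.set(), and threads a compression keyword argument (default True, matching the previous behaviour) through the get/set and dump/load paths; redis_mgmt.py exposes it as a new --no-compression option. A minimal sketch of the resulting API, assuming the dnas.redis_db import path and that redis_db() connects to a local Redis with its defaults (neither is confirmed by this diff):

from dnas.redis_db import redis_db  # assumed import path

rdb = redis_db()  # assumed: connects to a local Redis by default

# Store and fetch a value with compression (the default, matching
# the behaviour before this commit):
rdb.set(key="example", value='{"foo": "bar"}')
print(rdb.get(key="example"))

# The same round trip with compression disabled, i.e. what the new
# --no-compression CLI option selects:
rdb.set(key="example", value='{"foo": "bar"}', compression=False)
print(rdb.get(key="example", compression=False))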
91 changes: 50 additions & 41 deletions dnas/dnas/redis_db.py
@@ -91,7 +91,9 @@ def delete(self: "redis_db", key: str) -> int:

return self.r.delete(key)

def from_file(self: "redis_db", filename: str) -> None:
def from_file(
self: "redis_db", filename: str, compression: bool = True
) -> None:
"""
Restore redis DB from JSON file.
"""
@@ -103,7 +105,9 @@ def from_file(self: "redis_db", filename: str) -> None:
with open(filename, "r") as f:
self.from_json(f.read())

def from_file_stream(self: "redis_db", filename: str) -> None:
def from_file_stream(
self: "redis_db", filename: str, compression: bool = True
) -> None:
"""
Restore redis DB from JSON file, loading the content one line at a time.
"""
@@ -173,12 +177,12 @@ def from_file_stream(self: "redis_db", filename: str) -> None:
# Load it into REDIS
k = list(json_dict.keys())[0]
v = list(json_dict.values())[0]
self.r.set(k, redis_db.compress(v))
self.set(key=k, value=v, compression=compression)
loaded_kvs += 1

logging.info(f"Loaded {loaded_kvs} k/v's from stream")
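
The hunk above shows each parsed line being written back through the new set(); the parsing itself is elided by the diff. A self-contained sketch of how one streamed '"key": value,' line could be turned back into a dict (hypothetical helper, not from this commit; the real from_file_stream also handles the "{" and "}" framing lines):

import json

def parse_stream_line(line: str) -> dict:
    # Re-wrap a single '"key": value,' line in braces so it parses as
    # a one-entry JSON object.
    return json.loads("{" + line.rstrip().rstrip(",") + "}")

assert parse_stream_line('"a": [1, 2],') == {"a": [1, 2]}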

def from_json(self: "redis_db", json_str: str):
def from_json(self: "redis_db", json_str: str, compression: bool = True):
"""
Restore redis DB from a JSON string
"""
@@ -189,10 +193,12 @@ def from_json(self: "redis_db", json_str: str):

json_dict = json.loads(json_str)
for k in json_dict.keys():
self.r.set(k, redis_db.compress(json_dict[k]))
self.set(key=k, value=json_dict[k], compression=compression)
logging.info(f"Loaded {len(json_dict)} k/v's")

def get(self: "redis_db", key: str) -> Union[str, list]:
def get(
self: "redis_db", key: str, compression: bool = True
) -> Union[str, list]:
"""
Return the value stored in "key" from Redis
"""
@@ -203,16 +209,22 @@ def get(self: "redis_db", key: str) -> Union[str, list]:
if t == "string":
val = self.r.get(key)
if val:
return self.decompress(val.decode("utf-8"))
if compression:
return redis_db.decompress(val.decode("utf-8"))
else:
return val.decode("utf-8")
else:
raise ValueError(
f"Couldn't decode data stored under key {key}"
)
elif t == "list":
return [
redis_db.compress(x.decode("utf-8"))
for x in self.r.lrange(key, 0, -1)
]
if compression:
return [
redis_db.decompress(x.decode("utf-8"))
for x in self.r.lrange(key, 0, -1)
]
else:
return [x.decode("utf-8") for x in self.r.lrange(key, 0, -1)]
else:
raise TypeError(f"Unknown redis data type stored under {key}: {t}")
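
get() and set() lean on the redis_db.compress()/redis_db.decompress() class helpers, which sit outside this diff. A minimal standalone sketch of what such helpers might look like, assuming zlib with a base64 text encoding (an assumption — the real implementations may differ):

import base64
import zlib

def compress(data: str) -> str:
    # Deflate, then base64-encode so the result stays JSON-safe text.
    return base64.b64encode(zlib.compress(data.encode("utf-8"))).decode("ascii")

def decompress(data: str) -> str:
    # Reverse of compress(): base64-decode, then inflate.
    return zlib.decompress(base64.b64decode(data)).decode("utf-8")

assert decompress(compress('{"foo": "bar"}')) == '{"foo": "bar"}'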

@@ -248,50 +260,45 @@ def get_queue_msgs(self: "redis_db", key: str) -> list["twitter_msg"]:

return msgs

def get_stats(self: "redis_db", key: str) -> Union[None, "mrt_stats"]:
def get_stats(
self: "redis_db", key: str, compression: bool = True
) -> Union[None, "mrt_stats"]:
"""
Fetch MRT stats JSON from Redis and return it as an MRT stats object.
"""
if not key:
raise ValueError(f"Missing required arguments: key={key}")

mrt_s = mrt_stats()
json_str = self.r.get(key)
json_str = self.get(key, compression=compression)
assert type(json_str) == str
if not json_str:
return None
else:
mrt_s.from_json(redis_db.decompress(json_str.decode("utf-8")))
mrt_s.from_json(json_str)
return mrt_s

def ping(self: "redis_db") -> None:
assert self.r.ping()

def set_stats(self: "redis_db", key: str, mrt_s: "mrt_stats"):
"""
Take an MRT stats object, serialise it to JSON, store in Redis.
"""
if not key or not mrt_s:
raise ValueError(
f"Missing required arguments: key={key}, mrt_s={mrt_s}"
)

self.r.set(key, redis_db.compress(mrt_s.to_json()))

def set_stats_json(self: "redis_db", key: str, json_str: str):
def set(self: "redis_db", key: str, value: str, compression: bool = True):
"""
Take JSON serialisation of an MRT stats object, and store in Redis.
Take a key and a string and store it in Redis.
"""
if not key:
raise ValueError(f"Missing required arguments: key={key}")

if not json_str:
if not key or not value:
raise ValueError(
f"Missing required arguments: json_str={json_str}"
f"Missing required arguments: key={key}, value={value}"
)

self.r.set(key, redis_db.compress(json_str))
if compression:
self.r.set(key, redis_db.compress(value))
else:
self.r.set(key, value)

def to_file(self: "redis_db", filename: str):
def to_file(self: "redis_db", filename: str, compression: bool = True):
"""
Dump the entire redis DB to a JSON file.
"""
@@ -301,9 +308,11 @@ def to_file(self: "redis_db", filename: str):
)

with open(filename, "w") as f:
f.write(self.to_json())
f.write(self.to_json(compression=compression))

def to_file_stream(self: "redis_db", filename: str):
def to_file_stream(
self: "redis_db", filename: str, compression: bool = True
):
"""
to_json returns a giant dict of the entire DB which can be serialised
as a JSON string. The DB is now too big for this (the server runs out
@@ -315,16 +324,16 @@ def to_file_stream(self: "redis_db", filename: str):
)

with open(filename, "w") as f:
for line in self.to_json_stream():
for line in self.to_json_stream(compression=compression):
f.write(line)

def to_json(self: "redis_db") -> str:
def to_json(self: "redis_db", compression: bool = True) -> str:
"""
Dump the entire redis DB to JSON
"""
d: dict = {}
for k in self.r.keys("*"):
d[k.decode("utf-8")] = self.get(key=k)
d[k.decode("utf-8")] = self.get(key=k, compression=compression)

if d:
json_str = json.dumps(d)
@@ -333,12 +342,12 @@ def to_json(self: "redis_db") -> str:
else:
raise ValueError("Database is empty")

def to_json_stream(self: "redis_db") -> Iterable:
def to_json_stream(self: "redis_db", compression: bool = True) -> Iterable:
yield ("{")
keys = self.r.keys("*")
for idx, key in enumerate(keys):
k = key.decode("utf-8")
d = json.dumps({k: self.get(key=k)})
d = json.dumps({k: self.get(key=k, compression=compression)})
if idx == len(keys) - 1:
yield (f"{d[1:-1]}")
else:
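
to_json_stream() emits the dump one key/value pair at a time, so the whole DB never has to be materialised as a single dict (the memory problem the to_file_stream docstring describes). The same pattern in a self-contained form, illustrative only, with no live Redis needed:

import json
from typing import Iterable

def dict_to_json_stream(d: dict) -> Iterable[str]:
    # Yield one JSON object a pair at a time, mirroring to_json_stream().
    yield "{"
    items = list(d.items())
    for idx, (k, v) in enumerate(items):
        pair = json.dumps({k: v})[1:-1]  # strip the outer braces
        yield pair if idx == len(items) - 1 else f"{pair},"
    yield "}"

assert json.loads("".join(dict_to_json_stream({"a": 1, "b": 2}))) == {"a": 1, "b": 2}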
4 changes: 2 additions & 2 deletions dnas/scripts/parse_mrts.py
@@ -353,14 +353,14 @@ def parse_files(filelist: list[str], args: dict) -> None:
elif file not in day_stats.file_list:
logging.info(f"Added {file} to {day_key} file list")
day_stats.file_list.append(file)
rdb.set_stats(day_key, day_stats)
rdb.set(day_key, day_stats.to_json())

else:
if arch:
mrt_s.add_archive(arch.NAME)
else:
logging.warning(f"Unable to add archive name to stats object")
rdb.set_stats(day_key, mrt_s)
rdb.set(day_key, mrt_s.to_json())
logging.info(f"Created new entry {day_key} from {file}")

if args["remove"]:
37 changes: 26 additions & 11 deletions dnas/scripts/redis_mgmt.py
@@ -33,31 +33,31 @@ def delete(key: str) -> None:
logging.info(f"Nothing to delete for {key}")


def dump_json(filename: str, stream: bool) -> None:
def dump_json(filename: str, compression: bool, stream: bool) -> None:
"""
Dump the entire redis DB to a JSON file.
"""
if not filename:
raise ValueError(f"Missing required arguments: filename={filename}")

if stream:
rdb.to_file_stream(filename=filename)
rdb.to_file_stream(compression=compression, filename=filename)
else:
rdb.to_file(filename=filename)
rdb.to_file(compression=compression, filename=filename)
logging.info(f"Written DB dump to {filename}")


def load_json(filename: str, stream: bool) -> None:
def load_json(filename: str, compression: bool, stream: bool) -> None:
"""
Import a JSON dump into redis.
"""
if not filename:
raise ValueError(f"Missing required arguments: filename={filename}")

if stream:
rdb.from_file_stream(filename=filename)
rdb.from_file_stream(compression=compression, filename=filename)
else:
rdb.from_file(filename=filename)
rdb.from_file(compression=compression, filename=filename)
logging.info(f"Loaded DB dump from {filename}")


@@ -136,6 +136,13 @@ def parse_args() -> dict:
required=False,
default=None,
)
parser.add_argument(
"--no-compression",
help="Disable compression when dumping/loading a JSON file.",
default=False,
action="store_true",
required=False,
)
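
As wired up in main() below, the flag is simply inverted into the compression keyword (compression=not args["no_compression"]), so an uncompressed dump might be taken with something like python3 redis_mgmt.py --dump /tmp/redis.json --no-compression --stream (a hypothetical invocation; the exact entry point is not shown in this diff).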
parser.add_argument(
"--pprint",
help="Pretty print the value stored in redis at the given key.",
@@ -287,9 +294,17 @@ def main():
)

if args["dump"]:
dump_json(filename=args["dump"], stream=args["stream"])
dump_json(
filename=args["dump"],
compression=not args["no_compression"],
stream=args["stream"],
)
elif args["load"]:
load_json(filename=args["load"], stream=args["stream"])
load_json(
filename=args["load"],
compression=not args["no_compression"],
stream=args["stream"],
)
elif args["wipe"]:
wipe()

@@ -308,12 +323,12 @@ def main():
if args["keys"]:
print_keys()

if args["pprint"]:
pprint_key(key=args["pprint"])

if args["print"]:
print_key(key=args["print"])

if args["pprint"]:
pprint_key(args["pprint"])

if args["stats"]:
print_stats(key=args["stats"])

12 changes: 6 additions & 6 deletions dnas/scripts/stats.py
@@ -98,12 +98,12 @@ def gen_day_stats(
f"No existing global stats obj for day {ymd}, "
f"storing compiled stats under {day_key}"
)
rdb.set_stats(day_key, day_stats)
rdb.set(day_key, day_stats.to_json())
else:
logging.debug(f"Retrieved existing day stats from {day_key}")
if db_day_stats.merge(day_stats):
db_day_stats.merge_archives(day_stats)
rdb.set_stats(day_key, db_day_stats)
rdb.set(day_key, db_day_stats.to_json())
logging.info(
f"Merged {ymd} stats with existing day stats under "
f"{day_key}"
@@ -155,7 +155,7 @@ def gen_diff(ymd: str) -> None:
)
else:
logging.info(f"Storing new diff stats for {ymd} under {diff_key}")
rdb.set_stats(diff_key, new_diff)
rdb.set(diff_key, new_diff.to_json())

else:
if new_diff.is_empty():
@@ -166,7 +166,7 @@
logging.info(
f"Overwritten existing diff for {ymd} under {diff_key}"
)
rdb.set_stats(diff_key, new_diff)
rdb.set(diff_key, new_diff.to_json())

rdb.close()

@@ -342,14 +342,14 @@ def upd_global_with_day(ymd: str) -> None:
f"No existing global stats in redis, creating new entry with day "
f"stats for {ymd}"
)
rdb.set_stats(global_key, day_stats)
rdb.set(global_key, day_stats.to_json())

# Else there are global stats and day stats to merge
else:
if global_stats.merge(day_stats):
global_stats.merge_archives(day_stats)
logging.info(f"Global stats merged with day stats from {ymd}")
rdb.set_stats(global_key, global_stats)
rdb.set(global_key, global_stats.to_json())
else:
logging.info(
f"No update to global stats with day stats from {ymd}"
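
Across parse_mrts.py and stats.py, the migration away from the removed helper follows one pattern:

# Before this commit (set_stats() removed from redis_db.py above):
rdb.set_stats(day_key, day_stats)

# After: callers serialise explicitly; set() applies compression
# (on by default):
rdb.set(day_key, day_stats.to_json())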
