Skip to content

Commit

Permalink
Merge pull request #43 from jwbensley/issue-26
Browse files Browse the repository at this point in the history
Not really fixing issue #26 - this is a temporary hack
  • Loading branch information
jwbensley authored Mar 7, 2022
2 parents 22ccadb + 6ede7b6 commit c08ef2b
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 60 deletions.
4 changes: 3 additions & 1 deletion dnas/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ class config:
LOG_TWITTER = os.path.join(LOG_DIR, "gen_tweets.log")



"""
The time format used for generating new timestamps and parsing existing
timestamps. The format below is the same format used by the MRT archives.
Expand All @@ -50,6 +49,9 @@ class config:
# Base dir to save MRT files to
DL_DIR = os.path.join(BASE_DIR, "downloads/")

# Temporary directory to split MRT files into
SPLIT_DIR = "/tmp/" # Set to None to disable

"""
If the machine running this code is in a different timezone to the
route-views MRT archives, an additional offset in hours is required.
Expand Down
65 changes: 31 additions & 34 deletions dnas/mrt_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,20 +62,28 @@ def parse_rib_dump(filename):
f"Missing required arguments: filename={filename}."
)

orig_filename = '_'.join(filename.split("_")[:-1])
if not orig_filename:
# If parsing a chunk of an MRT file, try to work out the orig filename
if cfg.SPLIT_DIR:
orig_filename = '_'.join(filename.split("_")[:-1])
else:
# Else, assume parsing a full MRT file
orig_filename = filename
file_ts = mrt_parser.get_timestamp(orig_filename)

rib_data = mrt_stats()
rib_data.timestamp = file_ts
if orig_filename not in rib_data.file_list:
rib_data.file_list.append(orig_filename)
rib_data.file_list.append(orig_filename)

if cfg.SPLIT_DIR:
mrt_entries = mrtparse.Reader(
os.path.join(cfg.SPLIT_DIR, os.path.basename(filename))
)
else:
mrt_entries = mrtparse.Reader(filename)

mrt_entries = mrtparse.Reader(filename)
for idx, mrt_e in enumerate(mrt_entries):
if "prefix" not in mrt_e.data:
continue #############################################
continue #### FIX ME - Skip the peer table record at the start?

ts = mrt_parser.posix_to_ts(
next(iter(mrt_e.data["timestamp"].items()))[0]
Expand Down Expand Up @@ -216,19 +224,26 @@ def parse_upd_dump(filename):
upd_prefix = {}
advt_per_origin_asn = {}
upd_peer_asn = {}

orig_filename = '_'.join(filename.split("_")[:-1])
if not orig_filename:

# If parsing a chunk of an MRT file, try to work out the orig filename
if cfg.SPLIT_DIR:
orig_filename = '_'.join(filename.split("_")[:-1])
else:
# Else, assume parsing a full MRT file
orig_filename = filename

file_ts = mrt_parser.get_timestamp(orig_filename)

upd_stats = mrt_stats()
upd_stats.timestamp = file_ts

if orig_filename not in upd_stats.file_list:
upd_stats.file_list.append(orig_filename)
upd_stats.file_list.append(os.path.basename(orig_filename))

mrt_entries = mrtparse.Reader(filename)
if cfg.SPLIT_DIR:
mrt_entries = mrtparse.Reader(
os.path.join(cfg.SPLIT_DIR, os.path.basename(filename))
)
else:
mrt_entries = mrtparse.Reader(filename)
for idx, mrt_e in enumerate(mrt_entries):

ts = mrt_parser.posix_to_ts(
Expand Down Expand Up @@ -369,30 +384,12 @@ def parse_upd_dump(filename):
) for prefix in prefixes
]

##### upd_stats should always be emtpy so remove this after tests are added !!!!!!!!!!!!
"""
if len(longest_as_path[0].as_path) == len(upd_stats.longest_as_path[0].as_path):
s_paths = [mrt_e.as_path for mrt_e in upd_stats.longest_as_path]
u_paths = [mrt_e.as_path for mrt_e in longest_as_path]
for u_path in u_paths:
if u_path not in s_paths:
upd_stats.longest_as_path.extend(u_path)
elif len(longest_as_path[0].as_path) > len(upd_stats.longest_as_path[0].as_path):
upd_stats.longest_as_path = longest_as_path.copy()
"""

################## FIX ME - REMOVE "if" upd_stats is empty
if len(longest_as_path[0].as_path) > len(upd_stats.longest_as_path[0].as_path):
upd_stats.longest_as_path = longest_as_path.copy()

"""
if len(longest_comm_set[0].comm_set) == len(upd_stats.longest_comm_set[0].comm_set):
s_comms = [mrt_e.comm_set for mrt_e in upd_stats.longest_comm_set]
u_comms = [mrt_e.comm_set for mrt_e in longest_comm_set]
for u_comm in u_comms:
if u_comm not in s_comms:
upd_stats.longest_comm_set.extend(u_comm)
elif len(longest_comm_set[0].comm_set) > len(upd_stats.longest_comm_set[0].comm_set):
upd_stats.longest_comm_set = longest_comm_set.copy()
"""
################## FIX ME - REMOVE "if" upd_stats is empty
if len(longest_comm_set[0].comm_set) > len(upd_stats.longest_comm_set[0].comm_set):
upd_stats.longest_comm_set = longest_comm_set.copy()

Expand Down
47 changes: 27 additions & 20 deletions dnas/mrt_splitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def __iter__(self):

def __next__(self):
"""
Move to the next entri in the MRT file.
Move to the next entry in the MRT file.
"""
mrt_entry = bytearray(self.f.read(12))

Expand All @@ -81,50 +81,57 @@ def __next__(self):
for i in mrt_entry[-4:]:
length = (length << 8) + i

###logging.debug(f"MRT entry length is {length}")
mrt_entry += bytes(self.f.read(length))
self.data = mrt_entry

return self

def split(self, no_of_chunks):
def split(self, no_chunks=None, outdir=None):
"""
Split the MRT data into N equal chunks written to disk.
Return the total number of MRT entries and a list of file names.
Return the total number of MRT entries and the list of chunk filenames.
"""

if not self.f:
raise AttributeError("No MRT file is currently open")

if (not no_of_chunks or
not isinstance(no_of_chunks, int) or
no_of_chunks < 1):
if (not no_chunks or
not isinstance(no_chunks, int) or
no_chunks < 1):
raise ValueError(
f"Number of chunks to split MRT file into must be a positive "
f"integer, not {no_of_chunks}"
f"integer, not {no_chunks}"
)

# If no output dir is specified, write to the input directory:
if not outdir:
outdir = os.path.dirname(self.filename)

# Skip the peer table which is the first entry in the RIB dump
mrt_a = mrt_archives()
if mrt_a.is_rib_from_filename(self.filename):
next(self)

chunk_names = []
chunk_files = []
for i in range(0, no_of_chunks):
chunk_filenames = []
chunk_fds = []
for i in range(0, no_chunks):
chunk_name = self.filename + "_" + str(i)
chunk_names.append(chunk_name)
logging.debug(f"Opening {chunk_name} for output")
f = open(chunk_name, "wb")
chunk_files.append(f)
chunk_filenames.append(chunk_name)
chunk_outpath = os.path.join(
outdir,
os.path.basename(self.filename) + "_" + str(i)
)
logging.debug(f"Opening {chunk_outpath} for output")
f = open(chunk_outpath, "wb")
chunk_fds.append(f)

for idx, entry in enumerate(self):
chunk_files[idx % no_of_chunks].write(entry.data)
chunk_fds[idx % no_chunks].write(entry.data)

for i in range(0, len(chunk_files)):
chunk_files[i].close()
for i in range(0, len(chunk_fds)):
chunk_fds[i].close()

total = idx + 1
logging.debug(f"Split {total} mrt_entries into {no_of_chunks} files.")
logging.debug(f"Split {total} mrt_entries into {no_chunks} files.")

return total, chunk_names
return total, chunk_filenames
20 changes: 15 additions & 5 deletions scripts/parse_mrts.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,10 @@ def parse_file(filename=None, keep_chunks=False):
logging.info(f"Processing {filename}...")

splitter = mrt_splitter(filename)
num_entries, file_chunks = splitter.split(no_cpu)
num_entries, file_chunks = splitter.split(
no_chunks=no_cpu,
outdir=cfg.SPLIT_DIR
)
try:
splitter.close()
except StopIteration:
Expand All @@ -150,9 +153,16 @@ def parse_file(filename=None, keep_chunks=False):
mrt_chunks = Pool.map(mrt_parser.parse_upd_dump, file_chunks)
Pool.close()

for i in range(0, len(file_chunks)):
if not keep_chunks:
os.remove(file_chunks[i])
if not keep_chunks:
for i in range(0, len(file_chunks)):
if cfg.SPLIT_DIR:
os.remove(
os.path.join(
cfg.SPLIT_DIR, os.path.basename(file_chunks[i])
)
)
else:
os.remove(file_chunks[i])

mrt_s = mrt_stats()
for chunk in mrt_chunks:
Expand Down Expand Up @@ -416,7 +426,7 @@ def main():
f"{logging.getLevelName(logging.getLogger().getEffectiveLevel())}"
)

if (not args["rib"] and not args["update"]):
if (not args["rib"] and not args["update"] and not args["single"]):
raise ValueError(
"At least one of --rib and/or --update must be specified!"
)
Expand Down

0 comments on commit c08ef2b

Please sign in to comment.