This repository has been archived by the owner on Jun 6, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathconvert_mysql.py
146 lines (123 loc) · 5.28 KB
/
convert_mysql.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import csv
import re
import dask.dataframe as dd
import pandas as pd
from dask.diagnostics import ProgressBar
from tqdm import tqdm
from constants import (
DEBUG_MYSQL_CSV,
DEBUG_MYSQL_LOG,
DEBUG_POSTGRESQL_CSV,
PG_LOG_DTYPES,
)
def convert_mysql_general_log_to_mysql_csv(mysql_log_path, output_csv_path):
"""
Convert the general log into CSV format.
By default, the MySQL general log format is not set up for read_csv.
There is a general lack of clear delineation between different queries,
even vim chokes trying to open some of the log files.
This converts the annoying input into slightly cleaner CSV,
with headers
TODO(WAN): Write a wrapper around IOBase to avoid writing an intermediate file.
Parameters
----------
mysql_log_path : Path
The MySQL general log to be converted.
output_csv_path : Path
The output destination for the CSVfied MySQL query log.
"""
# Regexes for recognizing various MySQL query log constructs.
regex_header = re.compile(r"[\s\S]*Time\s+Id\s+Command\s+Argument")
regex_date_id = re.compile(r"^(\d+.*)Z(\d+)")
regex_time_id_command_argument = re.compile(
r"(\d+.*)Z(\d+) (Connect|Init DB|Query|Quit|Statistics)\t([\s\S]*)"
)
with open(output_csv_path, "w", encoding="utf-8") as output_csv:
writer = csv.writer(output_csv, lineterminator="\n", quoting=csv.QUOTE_ALL)
def buffer_to_line(buffer):
# Join the buffer.
joined_buf = "\n".join(buffer)
# PostgreSQL vs MySQL things.
joined_buf = joined_buf.replace("'", "'")
# TODO(WAN): Normalize the query? But valid queries not guaranteed, e.g., log-raw.
return joined_buf
def write_line(line):
# Parse the line into a (time, id, command, argument)
match = regex_time_id_command_argument.match(line)
if match is None:
assert regex_header.match(line) is not None, f"Bad line: {line}"
# If control flow reaches this point, the line is hopefully junk.
else:
writer.writerow(match.groups())
writer.writerow(["Time", "Id", "Command", "Argument"])
buffer = []
# num_lines is wasteful, but eh, progress tracking is nice.
with open(mysql_log_path, "r", encoding="latin-1") as dummy_file:
num_lines = sum(1 for _ in dummy_file)
with tqdm(
open(mysql_log_path, "r", encoding="latin-1"), total=num_lines
) as mysql_log:
# Iterate over each line in the query log as delimited by \n.
# Note that this is not a complete log entry,
# because query strings can contain \n's as well.
for line in mysql_log:
# First, remove any trailing \n's.
line = line.rstrip("\n")
# If there is no date, this is _probably_ part of the previous line.
# TODO(WAN): Except for when it isn't.
if regex_date_id.match(line) is None:
# Continuation of previous line.
buffer.append(line)
continue
# Otherwise, finish the current line and initialize the next.
write_line(buffer_to_line(buffer))
buffer = [line]
write_line(buffer_to_line(buffer))
def convert_mysql_csv_to_postgresql_csv(mysql_csv_path, output_csv_path):
"""
Convert a CSVfied MySQL query log to a PostgreSQL csvlog.
Parameters
----------
mysql_log_path : Path
The CSVfied MySQL general log to be converted.
output_csv_path : Path
The output destination for the PostgreSQL query log.
"""
# blocksize=None is necessary. dask defaults to chunking at \n boundaries,
# but since our queries can contain \n tokens, we can't let it do that and
# must live without the parallelism.
mysql_df = dd.read_csv(mysql_csv_path, blocksize=None)
def augment(df):
thread_id = df["Id"].iloc[0]
df = df.sort_values(["Time"])
# TODO(WAN): Right now, we assume autocommit=1. But maybe we can parse this out.
df["session_id"] = thread_id
df["session_line_num"] = range(df.shape[0])
df["virtual_transaction_id"] = [
f"AAC/{thread_id}/{n}" for n in range(df.shape[0])
]
df = df.drop(columns=["Id"])
# TODO(WAN): This is kind of an abuse of PostgreSQL portal names.
df["message"] = "execute " + df["Command"] + ": " + df["Argument"]
df = df.drop(columns=["Command", "Argument"])
df = df.rename(columns={"Time": "log_time"})
for key in PG_LOG_DTYPES:
if key not in df.columns:
df[key] = pd.NA
return df[PG_LOG_DTYPES.keys()]
postgresql_df = mysql_df.groupby("Id").apply(augment, meta=PG_LOG_DTYPES)
postgresql_df = postgresql_df.sort_values("log_time")
postgresql_df.to_csv(
output_csv_path,
single_file=True,
index=False,
header=False,
quoting=csv.QUOTE_ALL,
)
def main():
pbar = ProgressBar()
pbar.register()
convert_mysql_general_log_to_mysql_csv(DEBUG_MYSQL_LOG, DEBUG_MYSQL_CSV)
convert_mysql_csv_to_postgresql_csv(DEBUG_MYSQL_CSV, DEBUG_POSTGRESQL_CSV)
if __name__ == "__main__":
main()