-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathwebui.py
257 lines (209 loc) · 9.24 KB
/
webui.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
import os
from zipfile import ZipFile
import json
import copy
from loguru import logger
from magic_pdf.pipe.UNIPipe import UNIPipe
from magic_pdf.pipe.OCRPipe import OCRPipe
from magic_pdf.pipe.TXTPipe import TXTPipe
from magic_pdf.rw.DiskReaderWriter import DiskReaderWriter
import magic_pdf.model as model_config
model_config.__use_inside_model__ = True
import gradio as gr
import time
from gradio_pdf import PDF
from pdf2image import convert_from_path
import threading
from magic_pdf.data.data_reader_writer import FileBasedDataWriter
# 创建一个全局变量来存储日志信息
log_messages = []
def init_model():
from magic_pdf.model.doc_analyze_by_custom_model import ModelSingleton
try:
model_manager = ModelSingleton()
model_manager.get_model(False, False)
logger.info(f"txt_model init final")
model_manager.get_model(True, False)
logger.info(f"ocr_model init final")
return 0
except Exception as e:
logger.exception(e)
return -1
gr.set_static_paths(paths=[".temp/","static/"])
def json_md_dump(
pipe,
md_writer,
pdf_name,
content_list,
md_content,
):
# 写入模型结果到 model.json
orig_model_list = copy.deepcopy(pipe.model_list)
md_writer.write_string(
f"model.json",
json.dumps(orig_model_list, ensure_ascii=False, indent=4)
)
# 写入中间结果到 middle.json
md_writer.write_string(
f"middle.json",
json.dumps(pipe.pdf_mid_data, ensure_ascii=False, indent=4),
)
# text文本结果写入到 conent_list.json
md_writer.write_string(
f"content_list.json",
json.dumps(content_list, ensure_ascii=False, indent=4),
)
# 写入结果到 .md 文件中
md_writer.write_string(
f"content.md",
md_content
)
return f"{pdf_name}.md"
def pdf_parse_main(
pdf_path: str,
progress=gr.Progress(),
parse_method: str = 'auto',
model_json_path: str = None,
is_json_md_dump: bool = True,
output_dir: str = None
):
"""
执行从 pdf 转换到 json、md 的过程,输出 md 和 json 文件到 pdf 文件所在的目录
:param pdf_path: .pdf 文件的路径,可以是相对路径,也可以是绝对路径
:param parse_method: 解析方法, 共 auto、ocr、txt 三种,默认 auto,如果效果不好,可以尝试 ocr
:param model_json_path: 已经存在的模型数据文件,如果为空则使用内置模型,pdf 和 model_json 务必对应
:param is_json_md_dump: 是否将解析后的数据写入到 .json 和 .md 文件中,默认 True,会将不同阶段的数据写入到不同的 .json 文件中(共3个.json文件),md内容会保存到 .md 文件中
:param output_dir: 输出结果的目录地址,会生成一个以 pdf 文件名命名的文件夹并保存所有结果
"""
progress(0, desc="正在启动任务...")
logger.info("任务开始处理了")
# 定义一个日志处理器函数,将日志信息添加到全局变量中
def log_to_textbox(message):
progress(1, desc=message)
# 配置 loguru 日志记录
logger.add(log_to_textbox, format="{time} {level} {message}", level="INFO")
try:
pdf_name = os.path.basename(pdf_path).split(".")[0]
pdf_path_parent = os.path.dirname(pdf_path)
if output_dir:
output_path = os.path.join(output_dir, pdf_name)
else:
output_path = os.path.join(pdf_path_parent, pdf_name)
output_image_path = os.path.join(output_path, 'images')
# 获取图片的父路径,为的是以相对路径保存到 .md 和 conent_list.json 文件中
image_path_parent = os.path.basename(output_image_path)
pdf_bytes = open(pdf_path, "rb").read() # 读取 pdf 文件的二进制数据
if model_json_path:
# 读取已经被模型解析后的pdf文件的 json 原始数据,list 类型
model_json = json.loads(open(model_json_path, "r", encoding="utf-8").read())
else:
model_json = []
# 执行解析步骤
# image_writer = DiskReaderWriter(output_image_path)
image_writer, md_writer = FileBasedDataWriter(output_image_path), FileBasedDataWriter(output_path)
# 选择解析方式
# jso_useful_key = {"_pdf_type": "", "model_list": model_json}
# pipe = UNIPipe(pdf_bytes, jso_useful_key, image_writer)
if parse_method == "auto":
jso_useful_key = {"_pdf_type": "", "model_list": model_json}
pipe = UNIPipe(pdf_bytes, jso_useful_key, image_writer)
elif parse_method == "txt":
pipe = TXTPipe(pdf_bytes, model_json, image_writer)
elif parse_method == "ocr":
pipe = OCRPipe(pdf_bytes, model_json, image_writer)
else:
logger.error("unknown parse method, only auto, ocr, txt allowed")
exit(1)
# 执行分类
pipe.pipe_classify()
# 如果没有传入模型数据,则使用内置模型解析
if not model_json:
if model_config.__use_inside_model__:
pipe.pipe_analyze() # 解析
else:
logger.error("need model list input")
exit(1)
# 执行解析
pipe.pipe_parse()
# 保存 text 和 md 格式的结果
content_list = pipe.pipe_mk_uni_format(image_path_parent, drop_mode="none")
md_content = pipe.pipe_mk_markdown(image_path_parent, drop_mode="none")
if is_json_md_dump:
json_md_dump(pipe, md_writer, pdf_name, content_list, md_content)
# 输出md_content
return [md_content,os.path.join(output_path, f"{pdf_name}.md")]
except Exception as e:
logger.exception(e)
# 定义一个函数来获取最新的日志信息
def get_logs():
return "\n".join(log_messages)
def zip_files_and_dirs(files_to_zip, zip_file_path):
# 创建一个ZipFile对象
with ZipFile(zip_file_path, 'w') as zip_object:
for item in files_to_zip:
if os.path.isfile(item):
# 如果是文件,直接添加到压缩文件
zip_object.write(item, os.path.basename(item))
elif os.path.isdir(item):
# 如果是目录,遍历目录并添加到压缩文件
for folder_name, sub_folders, file_names in os.walk(item):
for filename in file_names:
file_path = os.path.join(folder_name, filename)
# 使用相对路径来存储文件在压缩包中的路径
relative_path = os.path.relpath(file_path, item)
zip_object.write(file_path, os.path.join(os.path.basename(item), relative_path))
else:
print(f"警告: {item} 既不是文件也不是目录,跳过此项。")
def export_zip(base_path):
# 获得父级路径
parent_path = os.path.dirname(base_path)
# 获得文件名
file_name = os.path.basename(base_path)
# image_dir
images_dir = os.path.join(parent_path, "images")
# 压缩文件
# 定义要压缩的文件和目标压缩文件的路径
files_to_zip = [base_path, images_dir]
zip_file_path =os.path.join(parent_path, f"{file_name}.zip")
zip_files_and_dirs(files_to_zip, zip_file_path)
return zip_file_path
def pdf_parse( pdf_path: str,
progress=gr.Progress()):
# 文件迁移到脚本目录的.temp
file_name = os.path.basename(pdf_path)
pdf_name = file_name.split(".")[0]
target_pdf_path = os.path.join(os.path.dirname(__file__), ".temp",file_name )
# 复制文件到脚本目录的.temp
with open(target_pdf_path, "wb") as f:
f.write(open(pdf_path, "rb").read())
# 开始解析
[markdown_content,file_path] = pdf_parse_main(target_pdf_path,progress)
# 替换markdown_content的所有图片,增加 /file=相对路径
markdown_content = markdown_content.replace("![](", "![](/file=.temp/" + pdf_name+"/")
return [markdown_content,file_path]
with gr.Blocks() as demo:
with gr.Row(equal_height=True):
with gr.Column():
pdf_input = PDF(label="上传PDF文档",interactive=True)
#pdf_input = gr.File(label="上传PDF文档", file_types=["pdf"])
extract_button = gr.Button("开始抽取")
with gr.Column():
# 保存文件地址,用于后期打包
base_path = gr.State("")
export_button = gr.Button("打包下载")
download_output = gr.File(label="导出")
markdown_output = gr.Markdown(label="识别结果")
extract_button.click(
pdf_parse,
inputs=[pdf_input],
outputs=[markdown_output , base_path]
)
export_button.click(
export_zip,
inputs=[base_path],
outputs=[download_output]
)
logger.info(f"waiting for model init")
model_init = init_model()
logger.info(f"model_init: {model_init}")
demo.queue().launch(inbrowser=True,allowed_paths=["./temp"])