-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathobject_removal.py
150 lines (124 loc) · 4.75 KB
/
object_removal.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
"""
This script performs motion compensation on videos to remove moving objects
using macroblock analysis and hierarchical image processing. It saves the output
video and generates a GIF of the processed frames. The script utilizes OpenCV
and NumPy for video manipulation and numerical operations.
"""
import numpy as np
import cv2
import os
import logging
import argparse
from pathlib import Path
from tqdm import tqdm
from utils import (
save_as_gif,
divide_to_macroblocks,
create_upper_levels,
motion_exist,
move_to_low_levels,
rebuild_image,
)
def process_video(input_file, output_path):
    """
    Perform motion compensation to remove objects in motion from a video.

    The first frame of the video is used as the initial background. Every
    following frame is converted to grayscale and compared with the current
    background block by block using the helpers imported from ``utils``;
    blocks reported as moving are overwritten with the corresponding
    background blocks. Each corrected frame is written to
    ``<stem>_no_object.avi`` inside ``output_path`` and, once the input has
    been consumed, all corrected frames are handed to ``save_as_gif`` to
    produce ``<stem>_no_object.gif`` (``<stem>`` is the input file name
    without its extension).

    Failures are logged instead of raised: any ``Exception`` is caught and
    reported with its traceback, and the function returns ``None`` in every
    case. The capture and writer handles are released on every exit path.

    Args:
        input_file: Path to the input video file.
        output_path: Directory in which the output video and GIF are saved.
    """
    logging.info(f"Starting video processing for {input_file}")
    stem = Path(input_file).stem
    output_file = os.path.join(output_path, f"{stem}_no_object.avi")
    output_file_gif = os.path.join(output_path, f"{stem}_no_object.gif")

    try:
        vid = cv2.VideoCapture(input_file)
        out = None
        output_frames = []
        try:
            if not vid.isOpened():
                logging.error(f"Failed to open video file: {input_file}")
                return

            frame_count = int(vid.get(cv2.CAP_PROP_FRAME_COUNT))
            frame_width = int(vid.get(cv2.CAP_PROP_FRAME_WIDTH))
            frame_height = int(vid.get(cv2.CAP_PROP_FRAME_HEIGHT))
            # Keep the fractional part (e.g. 29.97): truncating to an int
            # would make the output play at a slightly different speed.
            fps = vid.get(cv2.CAP_PROP_FPS)
            size = (frame_width, frame_height)

            ret, background = vid.read()
            if not ret:
                logging.error("Failed to read the first frame as background.")
                return
            background = cv2.cvtColor(background, cv2.COLOR_BGR2GRAY)

            # Open the writer only after the first frame was read, so an
            # unreadable input does not leave a stub output file behind.
            # The trailing False is isColor: the frames written are grayscale.
            # NOTE(review): the writer expects frames of exactly `size`, while
            # frames below are rebuilt from frame_height // 16 and
            # frame_width // 16. If rebuild_image returns a smaller frame when
            # the dimensions are not multiples of 16, the writer will not
            # store it -- confirm against utils.rebuild_image.
            out = cv2.VideoWriter(
                output_file, cv2.VideoWriter_fourcc(*"MJPG"), fps, size, False
            )

            # Loop-invariant arguments for rebuild_image.
            height_in_16s = frame_height // 16
            width_in_16s = frame_width // 16

            # One frame was already consumed as the background. Fall back to
            # an open-ended bar when the container reports no frame count.
            remaining = frame_count - 1 if frame_count > 0 else None

            with tqdm(
                total=remaining, desc="Processing frames", unit="frame"
            ) as pbar:
                while vid.isOpened():
                    ret, curr_frame = vid.read()
                    if not ret:
                        break
                    curr_frame = cv2.cvtColor(curr_frame, cv2.COLOR_BGR2GRAY)

                    # Hierarchies for the background and the current frame.
                    hierimg1, hierimg2 = create_upper_levels(
                        background, curr_frame
                    )

                    # First comparison on hierarchy level index 2 with
                    # block-size argument 4 (presumably the coarsest level --
                    # confirm against utils.create_upper_levels).
                    background_macroblocks = divide_to_macroblocks(
                        4, hierimg1[2]
                    )
                    macroblocked_image = divide_to_macroblocks(4, hierimg2[2])
                    movement_blocks = [
                        idx
                        for idx, (bg_block, cur_block) in enumerate(
                            zip(background_macroblocks, macroblocked_image)
                        )
                        if motion_exist(bg_block, cur_block)
                    ]

                    # Refine blocks and motion indices on the lower levels.
                    background_macroblocks, macroblocked_image, movement_blocks = (
                        move_to_low_levels(movement_blocks, hierimg1, hierimg2)
                    )

                    # Replace every moving block with its background block.
                    for block_idx in movement_blocks:
                        macroblocked_image[block_idx] = background_macroblocks[
                            block_idx
                        ]

                    corrected_frame = rebuild_image(
                        height_in_16s,
                        width_in_16s,
                        np.uint8(macroblocked_image),
                    )
                    out.write(corrected_frame)
                    output_frames.append(corrected_frame)

                    # The next iteration compares against the background
                    # rebuilt from the blocks returned by move_to_low_levels.
                    background = rebuild_image(
                        height_in_16s,
                        width_in_16s,
                        np.uint8(background_macroblocks),
                    )
                    pbar.update(1)
        finally:
            # Runs on early return and on errors too, so the handles (and the
            # partially written AVI) are always closed.
            vid.release()
            if out is not None:
                out.release()

        save_as_gif(output_frames, output_file_gif)
        logging.info(f"New video created: {output_file}")
        cv2.destroyAllWindows()
    except Exception as e:
        # logging.exception keeps the original message and adds the traceback.
        logging.exception(f"Error occurred during video processing: {e}")
def parse_arguments():
    """
    Read the script's command-line options.

    Two options are recognised:

    * ``--video_path``: input video file; optional, defaults to
      ``OriginalVideos/formula.mp4``.
    * ``--output_path``: directory for the generated files; mandatory.

    Returns:
        argparse.Namespace: Namespace exposing ``video_path`` and
        ``output_path``.
    """
    # Each entry pairs an option flag with the keyword arguments that
    # describe it; options are registered in the order listed here.
    option_table = (
        (
            "--video_path",
            {
                "type": str,
                "default": "OriginalVideos/formula.mp4",
                "help": "Path to the input video file.",
            },
        ),
        (
            "--output_path",
            {
                "type": str,
                "required": True,
                "help": "Path to the output directory for saving files.",
            },
        ),
    )

    cli = argparse.ArgumentParser(
        description="Motion compensation video encoding and decoding."
    )
    for flag, settings in option_table:
        cli.add_argument(flag, **settings)

    return cli.parse_args()
if __name__ == "__main__":
    args = parse_arguments()

    # Log records carry a timestamp and severity; INFO and above are shown.
    log_format = "%(asctime)s - %(levelname)s - %(message)s"
    logging.basicConfig(level="INFO", format=log_format)

    # Make sure the destination directory exists before processing starts.
    os.makedirs(args.output_path, exist_ok=True)

    try:
        logging.info("Starting video processing...")
        process_video(args.video_path, args.output_path)
        logging.info("Video processing completed.")
    except Exception as err:
        logging.error(f"An error occurred: {err}")