Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support of animated sticker #78

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
*.pyc
__pycache__
*.egg-info
build/

node_modules
web/lib/import-map.json
web/packs/*.json

*.session
/*.json
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ pillow
telethon
cryptg
python-magic
lottie[all]
267 changes: 250 additions & 17 deletions sticker/lib/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,262 @@
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from functools import partial
from io import BytesIO
import numpy as np
import os.path
import subprocess
import json
import tempfile
import mimetypes

from PIL import Image
try:
import magic
except ImportError:
print("[Warning] Magic is not installed, using file extensions to guess mime types")
magic = None
from PIL import Image, ImageSequence, ImageFilter

from . import matrix

open_utf8 = partial(open, encoding='UTF-8')

def convert_image(data: bytes) -> (bytes, int, int):
image: Image.Image = Image.open(BytesIO(data)).convert("RGBA")
new_file = BytesIO()
image.save(new_file, "png")
w, h = image.size
if w > 256 or h > 256:
# Set the width and height to lower values so clients wouldn't show them as huge images
if w > h:
h = int(h / (w / 256))
w = 256

def guess_mime(data: bytes) -> str:
mime = None
if magic:
try:
return magic.Magic(mime=True).from_buffer(data)
except Exception:
pass
else:
with tempfile.NamedTemporaryFile() as temp:
temp.write(data)
temp.close()
mime, _ = mimetypes.guess_type(temp.name)
return mime or "image/png"


def _video_to_webp(data: bytes) -> bytes:
mime = guess_mime(data)
ext = mimetypes.guess_extension(mime)
with tempfile.NamedTemporaryFile(suffix=ext) as video:
video.write(data)
video.flush()
with tempfile.NamedTemporaryFile(suffix=".webp") as webp:
print(".", end="", flush=True)
ffmpeg_encoder_args = []
if mime == "video/webm":
encode = subprocess.run(["ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=codec_name", "-of", "default=nokey=1:noprint_wrappers=1", video.name], capture_output=True, text=True).stdout.strip()
ffmpeg_encoder = None
if encode == "vp8":
ffmpeg_encoder = "libvpx"
elif encode == "vp9":
ffmpeg_encoder = "libvpx-vp9"
if ffmpeg_encoder:
ffmpeg_encoder_args = ["-c:v", ffmpeg_encoder]
result = subprocess.run(["ffmpeg", "-y", "-threads", "auto", *ffmpeg_encoder_args, "-i", video.name, "-lossless", "1", webp.name],
capture_output=True)
if result.returncode != 0:
raise RuntimeError(f"Run ffmpeg failed with code {result.returncode}, Error occurred:\n{result.stderr}")
webp.seek(0)
return webp.read()


def video_to_webp(data: bytes) -> bytes:
mime = guess_mime(data)
ext = mimetypes.guess_extension(mime)
# run ffmpeg to fix duration
with tempfile.NamedTemporaryFile(suffix=ext) as temp:
temp.write(data)
temp.flush()
with tempfile.NamedTemporaryFile(suffix=ext) as temp_fixed:
print(".", end="", flush=True)
result = subprocess.run(["ffmpeg", "-y", "-threads", "auto", "-i", temp.name, "-codec", "copy", temp_fixed.name],
capture_output=True)
if result.returncode != 0:
raise RuntimeError(f"Run ffmpeg failed with code {result.returncode}, Error occurred:\n{result.stderr}")
temp_fixed.seek(0)
data = temp_fixed.read()
return _video_to_webp(data)


def process_frame(frame):
"""
Process GIF frame, repair edges, ensure no white or semi-transparent pixels, while keeping color information intact.
"""
frame = frame.convert('RGBA')

# Decompose Alpha channel
alpha = frame.getchannel('A')

# Process Alpha channel with threshold, remove semi-transparent pixels
# Threshold can be adjusted as needed (0-255), 128 is the middle value
threshold = 128
alpha = alpha.point(lambda x: 255 if x >= threshold else 0)

# Process Alpha channel with MinFilter, remove edge noise
alpha = alpha.filter(ImageFilter.MinFilter(3))

# Process Alpha channel with MaxFilter, repair edges
alpha = alpha.filter(ImageFilter.MaxFilter(3))

# Apply processed Alpha channel back to image
frame.putalpha(alpha)

return frame


def webp_to_others(data: bytes, mimetype: str) -> bytes:
format = mimetypes.guess_extension(mimetype)[1:]
print(format)
with Image.open(BytesIO(data)) as webp:
with BytesIO() as img:
print(".", end="", flush=True)
webp.info.pop('background', None)

if mimetype == "image/gif":
frames = []
duration = [100, ]

for frame in ImageSequence.Iterator(webp):
frame = process_frame(frame)
frames.append(frame)
duration.append(frame.info.get('duration', duration[-1]))

frames[0].save(img, format=format, save_all=True, lossless=True, quality=100, method=6,
append_images=frames[1:], loop=0, duration=duration[1:], disposal=2)
else:
webp.save(img, format=format, lossless=True, quality=100, method=6)

img.seek(0)
return img.read()


def is_uniform_animated_webp(data: bytes) -> bool:
with Image.open(BytesIO(data)) as img:
if img.n_frames <= 1:
return True

img_iter = ImageSequence.Iterator(img)
first_frame = np.array(img_iter[0].convert("RGBA"))

for frame in img_iter:
current_frame = np.array(frame.convert("RGBA"))
if not np.array_equal(first_frame, current_frame):
return False

return True


def webp_to_gif_or_png(data: bytes) -> bytes:
with Image.open(BytesIO(data)) as image:
# check if the webp is animated
is_animated = getattr(image, "is_animated", False)
if is_animated and not is_uniform_animated_webp(data):
return webp_to_others(data, "image/gif")
else:
w = int(w / (h / 256))
h = 256
return new_file.getvalue(), w, h
# convert to png
return webp_to_others(data, "image/png")


def opermize_gif(data: bytes) -> bytes:
with tempfile.NamedTemporaryFile() as gif:
gif.write(data)
gif.flush()
# use gifsicle to optimize gif
result = subprocess.run(["gifsicle", "--batch", "--optimize=3", "--colors=256", gif.name],
capture_output=True)
if result.returncode != 0:
raise RuntimeError(f"Run gifsicle failed with code {result.returncode}, Error occurred:\n{result.stderr}")
gif.seek(0)
return gif.read()


def _convert_image(data: bytes, mimetype: str) -> (bytes, int, int):
with Image.open(BytesIO(data)) as image:
with BytesIO() as new_file:
# Determine if the image is a GIF
is_animated = getattr(image, "is_animated", False)
if is_animated:
frames = [frame.convert("RGBA") for frame in ImageSequence.Iterator(image)]
# Save the new GIF
frames[0].save(
new_file,
format='GIF',
save_all=True,
append_images=frames[1:],
loop=image.info.get('loop', 0), # Default loop to 0 if not present
duration=image.info.get('duration', 100), # Set a default duration if not present
disposal=image.info.get('disposal', 2) # Default to disposal method 2 (restore to background)
)
# Get the size of the first frame to determine resizing
w, h = frames[0].size
else:
suffix = mimetypes.guess_extension(mimetype)
if suffix:
suffix = suffix[1:]
image = image.convert("RGBA")
image.save(new_file, format=suffix)
w, h = image.size
if w > 256 or h > 256:
# Set the width and height to lower values so clients wouldn't show them as huge images
if w > h:
h = int(h / (w / 256))
w = 256
else:
w = int(w / (h / 256))
h = 256
return new_file.getvalue(), w, h


def _convert_sticker(data: bytes) -> (bytes, str, int, int):
mimetype = guess_mime(data)
if mimetype.startswith("video/"):
xz-dev marked this conversation as resolved.
Show resolved Hide resolved
data = video_to_webp(data)
print(".", end="", flush=True)
elif mimetype.startswith("application/gzip"):
print(".", end="", flush=True)
# unzip file
import gzip
with gzip.open(BytesIO(data), "rb") as f:
data = f.read()
mimetype = guess_mime(data)
suffix = mimetypes.guess_extension(mimetype)
with tempfile.NamedTemporaryFile(suffix=suffix) as temp:
temp.write(data)
with tempfile.NamedTemporaryFile(suffix=".webp") as gif:
# run lottie_convert.py input output
print(".", end="", flush=True)
import subprocess
cmd = ["lottie_convert.py", temp.name, gif.name]
result = subprocess.run(cmd, capture_output=True, text=True)
retcode = result.returncode
if retcode != 0:
raise RuntimeError(f"Run {cmd} failed with code {retcode}, Error occurred:\n{result.stderr}")
gif.seek(0)
data = gif.read()
mimetype = guess_mime(data)
if mimetype == "image/webp":
data = webp_to_gif_or_png(data)
mimetype = guess_mime(data)
rlt = _convert_image(data, mimetype)
data = rlt[0]
if mimetype == "image/gif":
print(".", end="", flush=True)
data = opermize_gif(data)
return data, mimetype, rlt[1], rlt[2]


def convert_sticker(data: bytes) -> (bytes, str, int, int):
try:
return _convert_sticker(data)
except Exception as e:
mimetype = guess_mime(data)
print(f"Error converting image, mimetype: {mimetype}")
ext = mimetypes.guess_extension(mimetype)
with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as temp:
temp.write(data)
print(f"Saved to {temp.name}")
raise e


def add_to_index(name: str, output_dir: str) -> None:
Expand All @@ -57,23 +290,23 @@ def add_to_index(name: str, output_dir: str) -> None:


def make_sticker(mxc: str, width: int, height: int, size: int,
body: str = "") -> matrix.StickerInfo:
mimetype: str, body: str = "") -> matrix.StickerInfo:
return {
"body": body,
"url": mxc,
"info": {
"w": width,
"h": height,
"size": size,
"mimetype": "image/png",
"mimetype": mimetype,

# Element iOS compatibility hack
"thumbnail_url": mxc,
"thumbnail_info": {
"w": width,
"h": height,
"size": size,
"mimetype": "image/png",
"mimetype": mimetype,
},
},
"msgtype": "m.sticker",
Expand Down
6 changes: 3 additions & 3 deletions sticker/pack.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,11 @@ async def upload_sticker(file: str, directory: str, old_stickers: Dict[str, matr
}
print(f".. using existing upload")
else:
image_data, width, height = util.convert_image(image_data)
image_data, mimetype, width, height = util.convert_sticker(image_data)
print(".", end="", flush=True)
mxc = await matrix.upload(image_data, "image/png", file)
mxc = await matrix.upload(image_data, mimetype, file)
print(".", end="", flush=True)
sticker = util.make_sticker(mxc, width, height, len(image_data), name)
sticker = util.make_sticker(mxc, width, height, len(image_data), mimetype, name)
sticker["id"] = sticker_id
print(" uploaded", flush=True)
return sticker
Expand Down
6 changes: 3 additions & 3 deletions sticker/stickerimport.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ async def reupload_document(client: TelegramClient, document: Document) -> matri
print(f"Reuploading {document.id}", end="", flush=True)
data = await client.download_media(document, file=bytes)
print(".", end="", flush=True)
data, width, height = util.convert_image(data)
data, mimetype, width, height = util.convert_sticker(data)
print(".", end="", flush=True)
mxc = await matrix.upload(data, "image/png", f"{document.id}.png")
mxc = await matrix.upload(data, mimetype, f"{document.id}.png")
print(".", flush=True)
return util.make_sticker(mxc, width, height, len(data))
return util.make_sticker(mxc, width, height, len(data), mimetype)


def add_meta(document: Document, info: matrix.StickerInfo, pack: StickerSetFull) -> None:
Expand Down