forked from ZKeeer/WeChatAssistant
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrevocation.py
170 lines (150 loc) · 7.17 KB
/
revocation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# -*-encoding:utf-8-*-
import os
import random
import re
import shutil
import time
import itchat
class Revocation:
msg_store = {}
def __init__(self):
if not os.path.exists("./Cache/"):
os.mkdir("./Cache/")
if not os.path.exists("./Revocation/"):
os.mkdir("./Revocation/")
# ClearTimeOutMsg用于清理消息字典,把超时消息清理掉
# 为减少资源占用,此函数只在有新消息动态时调用
def ClearTimeOutMsg(self):
"""
清理msg_store中超时的消息
:return: 无
"""
if self.msg_store.__len__() > 0:
for msgid in list(self.msg_store): # 由于字典在遍历过程中不能删除元素,故使用此方法
if time.time() - self.msg_store.get(msgid, None)["msg_time"] > 300.0: # 超时两分钟
item = self.msg_store.pop(msgid)
# 可下载类消息,并删除相关文件
if item['msg_type'] in ['Picture', 'Recording', 'Video', 'Attachment']:
try:
os.remove("./Cache/" + item['msg_content'])
except BaseException as e:
pass
def GetOldMsg(self, msg):
"""
从存储的消息中查找撤回到消息
:param msg: 微信消息
:return: 无
"""
old_msg_id = ""
old_msg = {}
itchat.search_chatrooms()
if re.search(r"\!\[CDATA\[.*撤回了一条消息\]\]", msg['Content']):
if re.search("\<msgid\>(.*?)\<\/msgid\>", msg['Content']):
old_msg_id = re.search("\<msgid\>(.*?)\<\/msgid\>", msg['Content']).group(1)
elif re.search("\;msgid\>\;(.*?)\<", msg['Content']):
old_msg_id = re.search("\;msgid\>\;(.*?)\<", msg['Content']).group(1)
old_msg = self.msg_store.get(old_msg_id, {})
return old_msg_id, old_msg
def GetMsgFrom(self, msg):
"""
获取消息来源:联系人和群名
:param msg: 微信消息
:return: 无
"""
msg_group = r""
if itchat.search_friends(userName=msg['FromUserName']):
if itchat.search_friends(userName=msg['FromUserName'])['RemarkName']:
msg_from = itchat.search_friends(userName=msg['FromUserName'])['RemarkName'] # 消息发送人备注
elif itchat.search_friends(userName=msg['FromUserName'])['NickName']: # 消息发送人昵称
msg_from = itchat.search_friends(userName=msg['FromUserName'])['NickName'] # 消息发送人昵称
else:
msg_from = r"读取好友失败"
else:
msg_from = msg.get('ActualNickName', "")
if not msg_from:
msg_from = itchat.search_friends(userName=msg['FromUserName'])
if itchat.search_chatrooms(userName=msg['FromUserName']):
msg_group += r"[{}] ".format(itchat.search_chatrooms(userName=msg['FromUserName'])['NickName'])
return msg_from, msg_group
def SaveMsg(self, msg):
"""
储存所有接收到的消息,以便撤回时查找
:param msg: 微信消息
:return: 无
"""
msg_id = msg['MsgId'] # 消息ID
msg_time = msg['CreateTime'] # 消息时间
msg_from, msg_group = self.GetMsgFrom(msg) # 消息来源
msg_type = msg['Type'] # 消息类型
msg_content = None # 根据消息类型不同,消息内容不同
msg_url = None # 分享类消息有url
# 图片 语音 附件 视频,可下载消息将内容下载暂存到当前目录
if msg['Type'] == 'Text':
msg_content = msg['Text']
elif msg['Type'] == 'Picture':
msg_content = msg['FileName']
msg['Text'](msg['FileName'])
if os.path.exists("./Cache/{}".format(msg_content)):
msg_content_modify = msg_content.replace('.', '-{}.'.format(random.choice(range(0, 100))))
os.rename(msg_content, msg_content_modify)
msg_content = msg_content_modify
shutil.move(msg_content, r"./Cache/")
elif msg['Type'] == 'Card':
msg_content = r"{}的名片".format(msg['RecommendInfo']['NickName'])
elif msg['Type'] == 'Map':
x, y, location = re.search("<location x=\"(.*?)\" y=\"(.*?)\".*label=\"(.*?)\".*",
msg['OriContent']).group(1, 2, 3)
if location:
msg_content = location
else:
msg_content = r"纬度->{} 经度->{}".format(x, y)
elif msg['Type'] == 'Sharing':
msg_content = msg['Text']
msg_url = msg['Url']
elif msg['Type'] == 'Recording':
msg_content = msg['FileName']
msg['Text'](msg['FileName'])
shutil.move(msg_content, r"./Cache/")
elif msg['Type'] == 'Attachment':
msg_content = msg['FileName']
msg['Text'](msg['FileName'])
shutil.move(msg_content, r"./Cache/")
elif msg['Type'] == 'Video':
msg_content = msg['FileName']
msg['Text'](msg['FileName'])
shutil.move(msg_content, r"./Cache/")
elif msg['Type'] == 'Friends':
msg_content = msg['Text']
self.msg_store.update(
{msg_id: {"msg_from": msg_from, "msg_time": msg_time, "msg_type": msg_type,
"msg_content": msg_content, "msg_url": msg_url, "msg_group": msg_group}})
def GetMsgToSend(self, old_msg, msg_time_to_user):
msg_send = "{0}{1}{0}{2}".format("=" * 6, "撤回消息", "\n")
msg_send += "时间: {0}{1}谁: {2}{1}".format(
msg_time_to_user, "\n", old_msg.get('msg_from', None))
if old_msg.get('msg_group', None):
msg_send += "群组: {}{}".format(old_msg['msg_group'], "\n")
msg_send += "类型: {0}{1}内容: {2}{1}".format(old_msg.get('msg_type', None), "\n", old_msg.get('msg_content', None))
if old_msg['msg_type'] == "Sharing":
msg_send += r"网址: {}{}".format(old_msg.get('msg_url', None), "\n")
elif old_msg['msg_type'] in ['Picture', 'Recording', 'Video', 'Attachment']:
msg_send += r"存储: Revocation文件夹中{}使用命令: 查看文件[{}]".format("\n", old_msg.get('msg_content', None))
shutil.move(r"./Cache/{}".format(old_msg['msg_content']), r"./Revocation/")
return msg_send
def Revocation(self, msg):
"""
监听撤回消息通知
:param msg: 微信消息
:return: 无
"""
mytime = time.localtime()
msg_time_touser = time.strftime("%Y/%m/%d %H:%M:%S", mytime)
# 创建可下载消息内容的存放文件夹,并将暂存在当前目录的文件移动到该文件中
if not os.path.exists("./Revocation/"):
os.mkdir("./Revocation/")
msg_id, old_msg = self.GetOldMsg(msg)
if old_msg:
msg_send = self.GetMsgToSend(old_msg, msg_time_touser)
# 将撤回消息的通知以及细节发送到文件助手
itchat.send(msg_send, toUserName='filehelper')
# self.msg_store.pop(msg_id)