-
Notifications
You must be signed in to change notification settings - Fork 14
/
reactbot.py
375 lines (314 loc) · 17 KB
/
reactbot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
# -*- coding: utf-8 -*-
"""
This module runs basically everything.
Attributes:
VERSION = "2.0.0" (String): Version Number: release.version_num.revision_num
# Config Variables
EMOJI_LIST (List): List of Strings for emojis to be added to announcements
USER_LIST (JSON): List of users in JSON format
ADMIN (List): ["U25PPE8HH", "U262D4BT6", "U0LAMSXUM", "U3EAHHF40"] testing defaults
TTPB (String): Config variable, sets channel for cleverbot integration
GENERAL (String): Config variable, sets channel for general's name
LOG (boolean): Global Variable
LOGC (boolean): Global Variable
.. _Google Python Style Guide:
http://google.github.io/styleguide/pyguide.html
"""
from slackclient import SlackClient
import threading, websocket, json, re, time, codecs, random, logging
import scripts
from bot import Bot
from pb_logging import PBLogger
# Module-level logger used by the ReactBot methods below.
logger = PBLogger('ReactBot')
class ReactBot(Bot):
# Set the name for the logger
# Add custom log handler to logger
def __init__(self, token, bot_name=""):
    """Initialise the parent ``Bot``, prepare the Slack connection, reset state.

    Args:
        token: Slack API token; forwarded to ``Bot.__init__`` and to
            ``connect_to_slack``.
        bot_name: Name forwarded to ``Bot.__init__``. Defaults to "".
    """
    super(ReactBot, self).__init__(token, bot_name)
    self.connect_to_slack(token)
    # Index of the next "No, this is PantherHackers" rebuke (see other_message).
    self.annoyance = 0
def connect_to_slack(self, token):
    """Start an RTM session and build the websocket app that will serve it.

    Stores the ``rtm.start`` response in ``self.BOT_CONN``, refreshes the
    cached workspace data through ``initialize_info``, then stores a
    ``websocket.WebSocketApp`` wired to this object's ``on_*`` callbacks in
    ``self.WEBSOCKET``. The websocket app is only constructed here.

    Args:
        token: Slack API token sent with the ``rtm.start`` call.
    """
    logger.info("Starting RTM connection")
    # The response carries the websocket URL ("url") read further down.
    self.BOT_CONN = self.SLACK_CLIENT.api_call("rtm.start", token=token)

    logger.info("Initializing info")
    self.initialize_info()

    logger.info("Initializing WebSocketApplication")
    rtm_url = self.BOT_CONN["url"]
    # Route the websocket events to the methods of this instance.
    callbacks = {
        "on_message": self.on_message,
        "on_error": self.on_error,
        "on_close": self.on_close,
        "on_open": self.on_open,
    }
    self.WEBSOCKET = websocket.WebSocketApp(rtm_url, **callbacks)
def initialize_info(self):
    """Refresh the workspace data the bot caches at start-up.

    Populates:
        self.USER_LIST: raw response of the ``users.list`` API call.
        self.USER_GREETING: contents of ``user_greeting.txt``.
        self.POLLING_LIST: maps the id of every non-archived public channel
            and private group to a fresh ``["", [], "none"]`` poll entry.
        Bot.GENERAL_CHANNEL: first id resolved for "announcements", falling
            back to "general"; only assigned while it is still "".

    Raises:
        IOError: if ``user_greeting.txt`` cannot be opened.
    """
    # Members may join while the bot is off, so the list is re-fetched on
    # every start. When a database is implemented, cross-reference its user
    # list with this one so new users are added.
    self.USER_LIST = self.SLACK_CLIENT.api_call("users.list")

    # Greeting sent to new members. The handle is not called `file` so the
    # Python 2 builtin of that name is not shadowed.
    with open("user_greeting.txt") as greeting_file:
        self.USER_GREETING = greeting_file.read()

    # Poll state for all channels, keyed by channel id.
    self.POLLING_LIST = {}
    listings = (
        ("channels", self.SLACK_CLIENT.api_call("channels.list", exclude_archived=1)),
        ("groups", self.SLACK_CLIENT.api_call("groups.list", exclude_archived=1)),
    )
    for key, listing in listings:
        if key not in listing:
            # A failed API call has no listing; keep starting up without it.
            logger.warning("Slack response is missing the '" + key + "' list; skipping it")
            continue
        for conversation in listing[key]:
            # Each channel needs its own list object: entries are mutated in place.
            self.POLLING_LIST[conversation["id"]] = ["", [], "none"]

    # Update current General Channel (usually announcements).
    general_ids = Bot.channels_to_ids(self, ["announcements"])
    if not general_ids:
        general_ids = Bot.channels_to_ids(self, ["general"])

    # Equality, not identity: `is ""` only works by interning accident.
    if Bot.GENERAL_CHANNEL == "":
        if general_ids:
            Bot.GENERAL_CHANNEL = general_ids[0]
        else:
            logger.warning("The #general channel has been renamed to a non-default value")
def on_message(self, ws, message):
    """Websocket callback: handle one RTM payload on its own thread.

    Args:
        ws: The websocket app that delivered the payload (unused).
        message: Raw payload, passed unchanged to ``on_message_thread``.
    """
    # Forget workers that already finished; otherwise self.THREADS grows by
    # one entry per received event for as long as the bot runs.
    # NOTE(review): assumes self.THREADS is a plain list (this file only
    # appends to it) -- confirm against Bot.
    self.THREADS[:] = [t for t in self.THREADS if t.is_alive()]

    worker = threading.Thread(target=self.on_message_thread, args=(message,))
    self.THREADS.append(worker)
    worker.start()
# Worker-thread entry point: decodes one raw RTM payload, logs its event type,
# greets users who join the general channel, and passes "message" events to the
# reaction and command handlers.
#   message: raw payload handed over by on_message.
# NOTE(review): indentation is missing from this copy of the file, so the
# nesting of the statements below (in particular the bare `return` right after
# the poll loop) cannot be confirmed from here -- verify against a clean copy
# before restructuring.
def on_message_thread(self, message):
# NOTE(review): `.decode` plus the `unicode` builtin make this Python 2 only and
# presumably expect a byte string from the websocket library -- confirm.
s = message.decode('utf-8')
message_json = json.loads(unicode(s))
# Log a readable event name, e.g. "member_joined_channel" -> "Member Joined Channel".
logger.info(message_json["type"].replace("_"," ").title())
# Send the stored greeting as a direct message when someone joins the general channel.
# NOTE(review): the bare except hides every failure here, including errors
# raised inside message_user, not only events without a "channel" key.
try:
if Bot.GENERAL_CHANNEL == message_json["channel"]:
if "member_joined_channel" == message_json["type"]:
self.message_user(message_json["user"], self.USER_GREETING)
except:
pass
if "message" == message_json["type"]:
if "subtype" in message_json:
if message_json["subtype"] == "bot_message":
# Polling logic: a bot message containing POLL_BEGIN starts a poll in its channel.
if "POLL_BEGIN" in message_json["text"]:
# Slot 0 of the channel's poll entry remembers the timestamp of the poll message.
self.POLLING_LIST[message_json["channel"]][0] = message_json["ts"]
# Slot 1 holds the poll options; one reaction is added per option.
# NOTE(review): slot 1 starts as an empty list in initialize_info but is indexed
# by its own keys here, so presumably a script replaces it with a mapping -- confirm.
temp_options = self.POLLING_LIST[message_json["channel"]][1]
for key in temp_options:
# NOTE(review): emoji_reaction gets (message_json, emoji) here but
# (channel, ts, emoji) in react_announcement -- one of the call shapes is
# presumably wrong; check Bot.emoji_reaction.
Bot.emoji_reaction(self, message_json, temp_options[key].strip(":"))
return
# Announcement reactions
self.react_announcement(message_json)
# General commands: the handlers are tried in order and the first one that
# reports the message as handled ends processing.
# Disabled check (note that the method in this file is named riyans_denial):
# if riyansDenial(message_json):
# return
if self.other_message(message_json):
return
if self.command_message(message_json):
return
if self.admin_message(message_json):
return
def on_error(self, ws, error):
    """Websocket callback: log a connection error instead of dropping it.

    Args:
        ws: The websocket app reporting the error (unused).
        error: The error object supplied by the websocket library.
    """
    logger.error("Websocket error: %r" % (error,))
def on_close(self, ws, close_status_code=None, close_msg=None):
    """Websocket callback invoked when the connection closes; does nothing.

    Args:
        ws: The websocket app that closed (unused).
        close_status_code: Close code supplied by websocket-client releases
            that invoke ``on_close`` with three arguments; ignored.
        close_msg: Close reason supplied alongside the code; ignored.
    """
    return
def on_open(self, ws):
    """Websocket callback invoked when the connection opens; does nothing.

    Args:
        ws: The websocket app that opened (unused).
    """
    return None
# message_json Message
# Sends a message to the same channel that message_json originates from
# Provide a username and icon URL to override the default ones
def response_message(self, message_json, l, username=None, icon_url=None):
    """Post each text in ``l`` to the channel ``message_json`` came from.

    Args:
        message_json: Event dict; only its "channel" entry is read.
        l: Iterable of message texts, posted in order. A single string is
            posted as one message instead of one message per character.
        username: Overrides ``self.BOT_NAME`` when not None.
        icon_url: Overrides ``self.BOT_ICON_URL`` when not None.
    """
    # (bytes, unicode) covers every string type on Python 2 and Python 3.
    if isinstance(l, (bytes, type(u""))):
        l = [l]
    if username is None:
        username = self.BOT_NAME
    if icon_url is None:
        icon_url = self.BOT_ICON_URL

    for text in l:
        self.SLACK_CLIENT.api_call(
            "chat.postMessage",
            channel=message_json["channel"],
            text=text,
            username=username,
            icon_url=icon_url
        )
        # One log entry per posted message.
        logger.debug("Message sent")
# Command Messages are messages that begin with the `!` prefix
# Returns True if a message_json or trigger was used in this method
# Handles "!"-prefixed commands by dispatching to the matching entry of
# self.COMMANDS_LIST.
#   message_json: event dict; its "text" and "channel" entries are read here.
#   Returns True when the message was treated as a command (error replies
#   included) and False when it is not a well-formed "!command".
# NOTE(review): indentation is missing from this copy of the file, so block
# nesting below cannot be confirmed -- e.g. whether SLACK_CLIENT is appended
# only for "poll", and whether the special_condition check belongs to the
# `else` branch. Verify against a clean copy before restructuring.
def command_message(self, message_json):
# Checks if message starts with an exclamation point, and does the respective task
if message_json["text"].startswith("!"):
# Checks if the message is longer than a single character
if len(message_json["text"]) <= 1:
return False
# Put all ! command parameters into an array
args = message_json["text"].split()
# Command name: first token without its "!" prefix, lower-cased.
command_string = args[0][1:].lower()
args.pop(0) # gets rid of the command
# Anything that is not purely alphabetic (digits, a second "!", punctuation)
# is not treated as a command.
if not command_string.isalpha():
return False
if command_string == "version":
self.response_message(message_json, [self.VERSION])
return True
# "talk" is compared against the channel named by self.TTPB; other channels
# get a pointer to it.
# NOTE(review): ch[0] raises IndexError if channels_to_ids returns an empty list.
if command_string == "talk":
ch = Bot.channels_to_ids(self, [self.TTPB])
c = ch[0]
if message_json["channel"] != c:
self.response_message(message_json, ["Talk to me in #" + self.TTPB])
return True
# list that contains the message_json and args for all methods
method_args = []
method_args.append(message_json)
if command_string == "poll":
# NOTE(review): every other use in this file spells this attribute POLLING_LIST;
# unless Bot defines a separate lower-case `polling_list`, this lookup raises
# AttributeError -- confirm.
method_args.append(self.polling_list[message_json["channel"]])
method_args.append(self.SLACK_CLIENT)
if command_string == "pugbomb":
# pb_cooldown is not defined in this file; presumably provided by Bot.
method_args.append(self.pb_cooldown)
# The user's arguments are only passed when there are any.
if len(args) > 0:
method_args.append(args)
# This is in a try statement since it is checking if a module exists with the command_string name,
# It makes the try statement that was previously around the `called_function` section below much smaller,
# and also less likely to skip an error that should be printed to the console.
# NOTE(review): the bare except still hides every error raised in this try body.
try:
# Check if the command is an admin command using the script's self-declaration method
check_admin_function = getattr(self.COMMANDS_LIST[command_string], "is_admin_command")
if check_admin_function():
self.response_message(message_json, ["Sorry, admin commands may only be used with the $ symbol (ie. `$admin`)"])
return True
except:
# If it fails, outputs that no command was found or syntax was broken, since all scripts must follow this convention.
self.response_message(message_json, ["You seem to have used a function that doesnt exist, or used it incorrectly. See `!help` for a list of functions and parameters"])
return True
# Finds the command with the name matching the text given, and executes it, assumed to exist because of above check
called_function = getattr(self.COMMANDS_LIST[command_string], "run")
script_response = called_function(*method_args)
# NOTE(review): `is 0` is an identity test against an int literal; `== 0` is
# the comparison presumably intended.
if script_response.status_code is 0:
self.response_message(message_json, script_response.messages_to_send)
else:
# Non-zero status: the module that ran builds the replacement response to send.
error_cleanup = getattr(script_response.module_called, "error_cleanup")
script_response = error_cleanup(script_response.status_code)
self.response_message(message_json, script_response.messages_to_send)
# A response may ask for a follow-up hook on its module; the hook receives this bot.
if script_response.special_condition:
special_condition_function = getattr(script_response.module_called, "special_condition")
special_condition_function(self)
return True
return False
# Admin Messages are messages that begin with the `$` prefix
# Returns True if a message_json or trigger was used in this method
def admin_message(self, message_json):
    """Handle ``$``-prefixed admin commands.

    The command name is the first token without its ``$`` prefix,
    lower-cased. It is looked up in ``self.COMMANDS_LIST``, and the script
    must declare itself an admin command. The sender must be reported as
    ``is_admin`` by the ``users.info`` API call. The script's ``run`` is
    called with ``(message_json, args, self.SLACK_CLIENT, self,
    self.response_message)``.

    Args:
        message_json: Event dict; its "text" and "user" entries are read.

    Returns:
        bool: True when the message was consumed (a reply was sent or the
        script ran); False when it is not a well-formed ``$command``.
    """
    text = message_json.get("text") or ""
    # A lone "$" is not a command.
    if not text.startswith("$") or len(text) <= 1:
        return False

    args = text.split()
    command_string = args.pop(0)[1:].lower()
    # Anything that is not purely alphabetic (digits, a second "$",
    # punctuation) is not treated as a command.
    if not command_string.isalpha():
        return False

    # Only the lookup and the self-declaration call are guarded, so a failure
    # while sending a reply is no longer mistaken for an unknown command.
    try:
        script = self.COMMANDS_LIST[command_string]
        is_admin_command = script.is_admin_command()
    except Exception:
        # No such script, or it does not follow the script convention.
        self.response_message(message_json, ["You seem to have used a function that doesnt exist, or used it incorrectly. See `!help` for a list of functions and parameters"])
        return True
    if not is_admin_command:
        self.response_message(message_json, ["Sorry, normal commands should be used with the `!` prefix (ie `!coin`)"])
        return True

    # Fail closed: a message without a sender, or a users.info response
    # without a "user" entry, is treated as coming from a non-admin.
    user_id = message_json.get("user")
    user_info = {}
    if user_id:
        user_info = self.SLACK_CLIENT.api_call("users.info", user=user_id)
    if user_info.get("user", {}).get("is_admin") is not True:
        self.response_message(message_json, ["You don't seem to be an authorized user to use these commands."])
        return True

    script_response = script.run(
        message_json, args, self.SLACK_CLIENT, self, self.response_message
    )
    # Equality, not identity: `is 0` only works by small-int caching accident.
    if script_response.status_code != 0:
        # The module that ran builds the replacement response for its error.
        script_response = script_response.module_called.error_cleanup(
            script_response.status_code
        )
    self.response_message(message_json, script_response.messages_to_send)

    # NOTE(review): applied to the success response and to the error-cleanup
    # response alike; the original nesting is not recoverable from the
    # unindented copy of this file -- confirm.
    if script_response.special_condition:
        script_response.module_called.special_condition(self)
    return True
# Other Messages are messages that don't follow standard conventions (such as "Hey PantherBot!")
# Returns True if a message_json or trigger was used in this method
def other_message(self, message_json):
    """Handle free-form triggers that are neither ``!`` nor ``$`` commands.

    For messages without a subtype the lower-cased text is checked against:
      * "panther hackers" (one or more spaces) on the first line -> the next
        rebuke in a three-step cycle tracked by ``self.annoyance``, sent
        with a matching avatar;
      * "hey pantherbot" -> a greeting using the sender's first name;
      * "pantherbot ping" -> "PONG";
      * ":rip: pantherbot" / "rip pantherbot" -> ":rip:".
    Messages with a subtype are consumed only when they are leave notices.

    Args:
        message_json: Event dict; "subtype", "text" and "user" are read.

    Returns:
        bool: True when the message was handled here, False otherwise
        (including when sending a reply failed).
    """
    if "subtype" in message_json:
        # Leave notices are reported as handled so no later handler runs on them.
        return message_json["subtype"] in ("channel_leave", "group_leave")

    message_txt = (message_json.get("text") or "").lower()
    try:
        # The text is matched as-is: wrapping it in str() raised on non-ASCII
        # text under Python 2 and skipped every trigger for such messages.
        if re.match(".*panther +hackers.*", message_txt):
            # Responses in increasing intensity, each with its own avatar.
            rebukes = ["No, this is PantherHackers.", "_No,_ this is PantherHackers.", "_*NO, THIS IS PANTHERHACKERS!!*_"]
            reaction_image_urls = ["https://s14.postimg.org/od6weheap/annoyance_level_0.png", "https://s14.postimg.org/zcs3q3k5d/annoyance_level_1.png", "https://s14.postimg.org/4vc8yjp2p/annoyance_level_2.png"]
            # Start over once the last rebuke has been used.
            if self.annoyance >= len(rebukes):
                self.annoyance = 0
            level = self.annoyance
            self.annoyance += 1  # escalate for the next trigger
            self.response_message(message_json, [rebukes[level]], icon_url=reaction_image_urls[level])
            return True

        if message_txt == "hey pantherbot":
            # Look up the user who said hey.
            # TODO make this use USER_LIST
            temp_user = self.SLACK_CLIENT.api_call(
                "users.info",
                user=message_json["user"]
            )
            logger.info("We did it reddit")
            self.response_message(message_json, ["Hello, " + temp_user["user"]["profile"]["first_name"] + "! :tada:"])
            return True

        if message_txt == "pantherbot ping":
            self.response_message(message_json, ["PONG"])
            return True

        if message_txt in (":rip: pantherbot", "rip pantherbot"):
            self.response_message(message_json, [":rip:"])
            return True
    except Exception as error:
        # Best effort: a failed lookup or reply must not stop the remaining
        # handlers, but the real cause is logged.
        logger.error("Error while handling a message in other_message: %r" % (error,))

    # No response was necessary.
    return False
def message_user(self, user_id, text):
    """Send ``text`` to a user through a direct-message channel.

    Calls the ``im.open`` API for ``user_id`` and passes the channel id from
    its response to ``self.send_msg`` with ``is_id=True``.

    Args:
        user_id: Slack id of the user to message.
        text: Message body.
    """
    dm_json = self.SLACK_CLIENT.api_call("im.open", user=user_id)
    dm_channel_id = dm_json["channel"]["id"]
    self.send_msg(dm_channel_id, text, is_id=True)
# Reacts to all messages posted in the GENERAL channel with a pre-defined list of emojis
def react_announcement(self, message_json):
    """React to a message posted in the general channel.

    Adds the "pantherbot" reaction followed by up to three distinct emojis
    drawn at random from ``self.EMOJI_LIST``. Does nothing when no general
    channel is configured or the message was posted elsewhere.

    Args:
        message_json: Event dict; its "channel" and "ts" entries are read.
    """
    channel = message_json.get("channel")
    if not self.GENERAL_CHANNEL or channel != self.GENERAL_CHANNEL:
        return

    timestamp = message_json["ts"]
    self.emoji_reaction(channel, timestamp, "pantherbot")

    # Sampling without replacement never repeats an emoji and, unlike popping
    # three random indexes, copes with a list shorter than three entries.
    pool = list(self.EMOJI_LIST)
    for emoji in random.sample(pool, min(3, len(pool))):
        self.emoji_reaction(channel, timestamp, emoji)
def riyans_denial(self, message_json):
    """Refuse bot commands from one hard-coded user.

    When user "U0LJJ7413" sends a ``!`` or ``$`` command, "hey pantherbot"
    or "pantherbot ping", the bot answers "No." instead.

    Args:
        message_json: Event dict; "user" and "text" are read. Events that
            lack either key are simply not refused.

    Returns:
        bool: True when the refusal was sent, False otherwise.
    """
    # Exact id comparison; the former substring test had the same effect for
    # real Slack ids but raised KeyError on events without a "user".
    if message_json.get("user") != "U0LJJ7413":
        return False

    text = message_json.get("text") or ""
    if text.startswith(("!", "$")) or text.lower() in ("hey pantherbot", "pantherbot ping"):
        self.response_message(message_json, ["No."])
        return True
    return False