-
Notifications
You must be signed in to change notification settings - Fork 0
/
bot.py
167 lines (129 loc) · 5.17 KB
/
bot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
from keep_alive import start
start()
import os
# Run the Playwright installer once up front.
os.system("playwright install")
# Check that Playwright is importable; if not, keep re-running the installer
# until the import succeeds. Only ImportError is retried: a bare ``except``
# would also swallow KeyboardInterrupt/SystemExit and make this loop
# impossible to interrupt.
while True:
    try:
        from playwright.sync_api import sync_playwright
        break
    except ImportError:
        os.system("playwright install")
import interactions
from interactions import *
from interactions.ext import prefixed_commands
from interactions.ext.prefixed_commands import prefixed_command, PrefixedContext
from characterai import PyAsyncCAI
import characterai
import asyncio
from dotenv import load_dotenv
# Load environment variables (presumably from a local .env file) before
# reading any configuration.
load_dotenv()

# Key handed to PyAsyncCAI when the client is created in on_ready.
CAiKey = os.getenv("CAIKEY")
# Identifier passed to client.chat.get_chat() to open the conversation.
char = "YntB_ZeqRq2l_aVf2gWDCZl4oBttQzDvhj9cXafWcF8"

bot_intents: Intents = (
    Intents.MESSAGE_CONTENT
    | Intents.GUILD_MEMBERS
    | Intents.GUILDS
    | Intents.DIRECT_MESSAGES
    | Intents.MESSAGES
    | Intents.GUILD_MESSAGES
)
bot = Client(
    send_command_tracebacks=False,
    intents=bot_intents,
    prefix="AI=",
)
prefixed_commands.setup(bot)

# True while the owner has halted AI answering (toggled in on_message_create).
stopped = False
# Queue of messages waiting for a reply; filled by on_message_create and
# drained by the loop in on_ready.
messages = []
# Per-user rate-limit state, keyed by user id:
# {"messages": <count>, "limit": <bool>}.
requests = {}
@listen()
async def on_ready():
    """Set up the Character.AI session, then run the answer/cooldown loop.

    Sets the bot presence, creates the module-level ``client`` and ``chat``
    objects used by ``respond``, and then loops forever:

    * if a message is queued in ``messages``, answer the oldest one and
      drop it from the queue; otherwise sleep for one second;
    * for every entry in ``requests`` whose ``"limit"`` flag is set, wait
      five seconds and clear the flag.
    """
    global client
    global chat
    await bot.change_presence(
        status=Status.IDLE, activity=Activity(type=ActivityType.WATCHING, name="the chat 🔮")
    )
    client = PyAsyncCAI(CAiKey)
    await client.start()
    chat = await client.chat.get_chat(char)
    print(f"{bot.user} has logged in!")
    while True:
        if messages:
            try:
                await compose_answer(messages[0])
            except Exception as exc:
                # One failed reply must not kill the loop, otherwise the bot
                # would silently stop answering everyone.
                print(f"Failed to answer queued message: {exc!r}")
            finally:
                # Always drop the head of the queue so a message that keeps
                # failing cannot block the ones behind it.
                messages.pop(0)
        else:
            await asyncio.sleep(1)
        # Iterate over a snapshot of the keys: on_message_create adds and pops
        # entries while this coroutine is suspended in the sleeps below, and
        # mutating a dict during direct iteration raises RuntimeError.
        for key in list(requests):
            entry = requests.get(key)
            if entry is None or not entry["limit"]:
                # Entry was removed in the meantime, or is not cooling down.
                continue
            await asyncio.sleep(5)
            # Write through the reference obtained before the sleep so a
            # concurrent pop of this key cannot raise KeyError here.
            entry["limit"] = False
async def respond(message, user):
    """Forward ``message`` to the Character.AI chat and return the reply text.

    The text is sent as ``"<user>: <message>"`` (and printed first). If the
    send raises ``characterai.FilterError``, a fixed notice string is
    returned instead of a reply.
    """
    members = chat['participants']
    # Target the first participant unless it is flagged as human, in which
    # case the second participant is used.
    target_index = 1 if members[0]['is_human'] else 0
    tgt = members[target_index]['user']['username']

    prompt = f"{user}: {message}"
    print(prompt)

    try:
        data = await client.chat.send_message(chat["external_id"], tgt, prompt)
    except characterai.FilterError:
        return "NSFW Content Detected, please try again with a different message!"

    first_reply = data["replies"][0]
    return first_reply["text"]
async def compose_answer(message):
    """Show a typing indicator, get the AI reply for ``message``, and post it.

    The reply is produced by ``respond`` from the message content and the
    author's display name, and is sent as a reply to the original message.
    """
    await message.channel.trigger_typing()
    content = message.content
    author_name = message.author.display_name
    answer = await respond(content, author_name)
    await message.reply(answer)
def calc_request(user):
    """Decide whether ``user`` may send another message, updating ``requests``.

    Returns ``True`` when the message is accepted and ``False`` when it is
    rejected. Rules, in order:

    * no entry for the user yet: create one with one message counted and the
      cooldown flag set, and accept;
    * cooldown flag (``"limit"``) still set: reject;
    * five or more messages already counted: reject;
    * otherwise: count the message, set the cooldown flag, and accept.
    """
    entry = requests.get(user.id)
    if entry is None:
        requests[user.id] = {"messages": 1, "limit": True}
        return True

    if entry["limit"]:
        return False
    if entry["messages"] >= 5:
        return False

    entry["messages"] += 1
    entry["limit"] = True
    return True
@listen()
async def on_message_create(ctx):
    """Handle an incoming message: owner halt/resume commands and AI queueing.

    * ``"<@...> halt"`` / ``"<@...> resume"`` from the bot owner toggle the
      module-level ``stopped`` flag and are acknowledged with a reply.
    * Messages written by bots are ignored after the halt check.
    * While not halted, messages in the configured channel or in a DM are
      passed to the rate limiter and either queued or answered with a
      temporary warning.
    """
    global stopped
    message = ctx.message

    if message.content == "<@1039221480157884496> halt" and message.author == bot.owner:
        stopped = True
        await message.reply("AI Answering Halted")

    if message.author.bot:
        return

    if not stopped:
        channel = message.channel
        if channel.id == 1071105901299241091 or channel.type == ChannelType.DM:
            await _queue_or_warn(message)

    if message.content == "<@1039221480157884496> resume" and message.author == bot.owner:
        stopped = False
        await message.reply("AI Answering Resumed")


async def _queue_or_warn(message):
    """Queue ``message`` for an AI answer, or post a short-lived warning."""
    author = message.author

    if calc_request(author):
        messages.append(message)
        # The first message of a window owns the rate-limit entry: it removes
        # the entry again after sixty seconds.
        if requests.get(author.id)["messages"] == 1:
            await asyncio.sleep(60)
            requests.pop(author.id)
        return

    # Rejected: pick the wording based on whether the cooldown flag is set.
    if requests.get(author.id)["limit"]:
        warning = "You are sending too many messages, please wait a little before sending another one"
    else:
        warning = "You are sending too many messages, please wait a minute before sending another one."

    msg = await message.reply(warning)
    await asyncio.sleep(5)
    await msg.delete()
# Read the bot token from the TOKEN environment variable; indexing
# os.environ raises KeyError if the variable is not set.
token = os.environ["TOKEN"]
# Start the bot client with that token.
bot.start(token)