Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace permission config with DB tables #348

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .mypy.ini
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[mypy]
plugins = sqlmypy
namespace_packages = True
python_version = 3.8
python_version = 3.6
warn_unused_configs = True
strict = False
strict_optional = False
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Expand youtube.py error information
- Handle 'a' vs 'an' in drinks plugin
- Apply rate limiting to regex hooks
- Replaced dynamic permissions config with DB based system
### Fixed
- Fixed config reloading
- Fix matching exception in horoscope test
Expand Down
7 changes: 0 additions & 7 deletions cloudbot/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,3 @@ def load_config(self):

self.update(data)
logger.debug("Config loaded from file.")

def save_config(self):
"""saves the contents of the config dict to the config file"""
with self.path.open("w", encoding="utf-8") as f:
json.dump(self, f, indent=4)

logger.info("Config saved to file.")
287 changes: 170 additions & 117 deletions cloudbot/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
from typing import Optional

from irclib.util.compare import match_mask
from sqlalchemy import Boolean, Column, ForeignKey, String
from sqlalchemy.orm import relationship

from cloudbot.util import database
from cloudbot.util.database import Session

logger = logging.getLogger("cloudbot")

Expand All @@ -10,6 +15,47 @@
backdoor: Optional[str] = None


class Group(database.Base):
__tablename__ = "perm_group"

name = Column(String, nullable=False, primary_key=True)
members = relationship("GroupMember", back_populates="group", uselist=True)
perms = relationship(
"GroupPermission", back_populates="group", uselist=True
)

config = Column(Boolean, default=False)

def is_member(self, mask):
for member in self.members:
if match_mask(mask.lower(), member.mask):
return True

return False


class GroupMember(database.Base):
__tablename__ = "group_member"

group_id = Column(String, ForeignKey(Group.name), primary_key=True)
group = relationship(Group, back_populates="members", uselist=False)

mask = Column(String, primary_key=True, nullable=False)

config = Column(Boolean, default=False)


class GroupPermission(database.Base):
__tablename__ = "group_perm"

group_id = Column(String, ForeignKey(Group.name), primary_key=True)
group = relationship(Group, back_populates="perms", uselist=False)

name = Column(String, primary_key=True, nullable=False)

config = Column(Boolean, default=False)


class PermissionManager:
def __init__(self, conn):
logger.info(
Expand All @@ -18,75 +64,72 @@ def __init__(self, conn):
conn.name,
)

# stuff
self.name = conn.name
self.config = conn.config

self.group_perms = {}
self.group_users = {}
self.perm_users = {}
session = Session()
Group.__table__.create(session.bind, checkfirst=True)
GroupPermission.__table__.create(session.bind, checkfirst=True)
GroupMember.__table__.create(session.bind, checkfirst=True)

self.reload()

def reload(self):
self.group_perms = {}
self.group_users = {}
self.perm_users = {}
logger.info(
"[%s|permissions] Reloading permissions for %s.",
self.name,
self.name,
)
groups = self.config.get("permissions", {})
# work out the permissions and users each group has
for key, value in groups.items():
if not key.islower():
logger.warning(
"[%s|permissions] Warning! Non-lower-case group %r in "
"config. This will cause problems when setting "
"permissions using the bot's permissions commands",
self.name,
key,
session = Session()

updated = []
for group_id, data in self.config.get("permissions", {}).items():
group = self.get_group(group_id)
if not group:
group = Group(name=group_id)
session.add(group)

group.config = True
updated.append(group)

for user in data["users"]:
member = session.get(
GroupMember, {"group_id": group_id, "mask": user}
)
key = key.lower()
self.group_perms[key] = []
self.group_users[key] = []
for permission in value["perms"]:
self.group_perms[key].append(permission.lower())
for user in value["users"]:
self.group_users[key].append(user.lower())

for group, users in self.group_users.items():
group_perms = self.group_perms[group]
for perm in group_perms:
if self.perm_users.get(perm) is None:
self.perm_users[perm] = []
self.perm_users[perm].extend(users)

logger.debug(
"[%s|permissions] Group permissions: %s",
self.name,
self.group_perms,
)
logger.debug(
"[%s|permissions] Group users: %s", self.name, self.group_users
)
logger.debug(
"[%s|permissions] Permission users: %s", self.name, self.perm_users
)
if not member:
member = GroupMember(group_id=group_id, mask=user)
session.add(member)

def has_perm_mask(self, user_mask, perm, notice=True):
if backdoor:
if match_mask(user_mask.lower(), backdoor.lower()):
return True
member.config = True
updated.append(member)

if not perm.lower() in self.perm_users:
# no one has access
return False
for perm in data["perms"]:
binding = session.get(
GroupPermission, {"group_id": group_id, "name": perm}
)
if not binding:
binding = GroupPermission(group_id=group_id, name=perm)
session.add(binding)

binding.config = True
updated.append(binding)

session.commit()

for item in session.query(GroupMember).filter_by(config=True).all():
if item not in updated:
session.delete(item)

allowed_users = self.perm_users[perm.lower()]
for item in session.query(GroupPermission).filter_by(config=True).all():
if item not in updated:
session.delete(item)

for item in session.query(Group).filter_by(config=True).all():
if item not in updated:
session.delete(item)

session.commit()

def has_perm_mask(self, user_mask, perm, notice=True):
if backdoor and match_mask(user_mask.lower(), backdoor.lower()):
return True

for allowed_mask in allowed_users:
for allowed_mask in self.get_perm_users(perm):
if match_mask(user_mask.lower(), allowed_mask):
if notice:
logger.info(
Expand All @@ -95,101 +138,111 @@ def has_perm_mask(self, user_mask, perm, notice=True):
user_mask,
perm,
)

return True

return False

def get_perm_users(self, perm):
session = Session()
member_masks = (
session.query(GroupMember.mask)
.filter(
GroupMember.group_id.in_(
session.query(GroupPermission.group_id).filter(
GroupPermission.name == perm
)
)
)
.all()
)

return [item[0] for item in member_masks]

def get_groups(self):
return set().union(self.group_perms.keys(), self.group_users.keys())
return Session().query(Group).all()

def get_group_permissions(self, name):
group = self.get_group(name)
if not group:
return []

def get_group_permissions(self, group):
return self.group_perms.get(group.lower())
return [perm.name for perm in group.perms]

def get_group_users(self, group):
return self.group_users.get(group.lower())
def get_group_users(self, name):
group = self.get_group(name)
if not group:
return []

return [member.mask for member in group.members]

def get_user_permissions(self, user_mask):
permissions = set()
for permission, users in self.perm_users.items():
for mask_to_check in users:
if match_mask(user_mask.lower(), mask_to_check):
permissions.add(permission)
return permissions
return {
perm.name
for group in self.get_user_groups(user_mask)
for perm in group.perms
}

def get_user_groups(self, user_mask):
groups = []
for group, users in self.group_users.items():
for mask_to_check in users:
if match_mask(user_mask.lower(), mask_to_check):
groups.append(group)
continue
return groups
return [
group for group in self.get_groups() if group.is_member(user_mask)
]

def get_group(self, group_id):
return Session().get(Group, group_id)

def group_exists(self, group):
"""
Checks whether a group exists
"""
return group.lower() in self.group_perms
return self.get_group(group) is not None

def user_in_group(self, user_mask, group):
def user_in_group(self, user_mask, group_id):
"""
Checks whether a user is matched by any masks in a given group
"""
users = self.group_users.get(group.lower())
if not users:
group = self.get_group(group_id)
if not group:
return False
for mask_to_check in users:
if match_mask(user_mask.lower(), mask_to_check):
return True
return False

def remove_group_user(self, group, user_mask):
return group.is_member(user_mask)

def remove_group_user(self, group_id, user_mask):
"""
Removes all users that match user_mask from group. Returns a list of user masks removed from the group.
Use permission_manager.reload() to make this change take affect.
Use bot.config.save_config() to save this change to file.
"""
masks_removed = []
group = self.get_group(group_id)
if not group:
return []

config_groups = self.config.get("permissions", {})
masks_removed = []

for mask_to_check in list(self.group_users[group.lower()]):
session = Session()
for member in group.members:
mask_to_check = member.mask
if match_mask(user_mask.lower(), mask_to_check):
masks_removed.append(mask_to_check)
# We're going to act like the group keys are all lowercase.
# The user has been warned (above) if they aren't.
# Okay, maybe a warning, but no support.
if group not in config_groups:
logger.warning(
"[%s|permissions] Can't remove user from group due to"
" upper-case group names!",
self.name,
)
continue
config_group = config_groups.get(group)
config_users = config_group.get("users")
config_users.remove(mask_to_check)
session.delete(member)

Session().commit()

return masks_removed

def add_user_to_group(self, user_mask, group):
def add_user_to_group(self, user_mask, group_id):
"""
Adds user to group. Returns whether this actually did anything.
Use permission_manager.reload() to make this change take affect.
Use bot.config.save_config() to save this change to file.
"""
if self.user_in_group(user_mask, group):
if self.user_in_group(user_mask, group_id):
return False
# We're going to act like the group keys are all lowercase.
# The user has been warned (above) if they aren't.
groups = self.config.setdefault("permissions", {})
if group in groups:
group_dict = groups.get(group)
users = group_dict["users"]
users.append(user_mask)
else:
# create the group
group_dict = {"users": [user_mask], "perms": []}
groups[group] = group_dict

group = self.get_group(group_id)
session = Session()
if not group:
group = Group(name=group_id)
session.add(group)

group.members.append(GroupMember(mask=user_mask))

session.commit()

return True
2 changes: 1 addition & 1 deletion cloudbot/util/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@


def configure(bind: Engine = None) -> None:
metadata.bind = bind
metadata.bind = bind # type: ignore[assignment]
close_all_sessions()
Session.remove()
Session.configure(bind=bind)
Loading