Skip to content

Commit

Permalink
Merge pull request saltstack#65228 from dwoz/issue/master/65226
Browse files Browse the repository at this point in the history
[master] Fix cluster key rotation
  • Loading branch information
dwoz authored Dec 10, 2023
2 parents aa20365 + f582cb2 commit afdb17b
Show file tree
Hide file tree
Showing 17 changed files with 333 additions and 188 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ on:

env:
COLUMNS: 190
CACHE_SEED: SEED-1 # Bump the number to invalidate all caches
CACHE_SEED: SEED-2 # Bump the number to invalidate all caches
RELENV_DATA: "${{ github.workspace }}/.relenv"

permissions:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/nightly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ on:

env:
COLUMNS: 190
CACHE_SEED: SEED-1 # Bump the number to invalidate all caches
CACHE_SEED: SEED-2 # Bump the number to invalidate all caches
RELENV_DATA: "${{ github.workspace }}/.relenv"

permissions:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ on:

env:
COLUMNS: 190
CACHE_SEED: SEED-1 # Bump the number to invalidate all caches
CACHE_SEED: SEED-2 # Bump the number to invalidate all caches
RELENV_DATA: "${{ github.workspace }}/.relenv"

permissions:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/scheduled.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ on:

env:
COLUMNS: 190
CACHE_SEED: SEED-1 # Bump the number to invalidate all caches
CACHE_SEED: SEED-2 # Bump the number to invalidate all caches
RELENV_DATA: "${{ github.workspace }}/.relenv"

permissions:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/staging.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ on:

env:
COLUMNS: 190
CACHE_SEED: SEED-1 # Bump the number to invalidate all caches
CACHE_SEED: SEED-2 # Bump the number to invalidate all caches
RELENV_DATA: "${{ github.workspace }}/.relenv"

permissions:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/templates/layout.yml.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ on:

env:
COLUMNS: 190
CACHE_SEED: SEED-1 # Bump the number to invalidate all caches
CACHE_SEED: SEED-2 # Bump the number to invalidate all caches
RELENV_DATA: "${{ github.workspace }}/.relenv"

<%- endblock env %>
Expand Down
2 changes: 0 additions & 2 deletions salt/channel/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1045,9 +1045,7 @@ async def handle_pool_publish(self, payload, _):
"""
try:
tag, data = salt.utils.event.SaltEvent.unpack(payload)
log.error("recieved event from peer %s %r", tag, data)
if tag.startswith("cluster/peer"):
log.error("Got peer join %r", data)
peer = data["peer_id"]
aes = data["peers"][self.opts["id"]]["aes"]
sig = data["peers"][self.opts["id"]]["sig"]
Expand Down
28 changes: 16 additions & 12 deletions salt/crypt.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import random
import stat
import sys
import tempfile
import time
import traceback
import uuid
Expand Down Expand Up @@ -1620,19 +1621,22 @@ def generate_key_string(cls, key_size=192, **kwargs):
return b64key.replace("\n", "")

@classmethod
def read_or_generate_key(cls, path, key_size=192, remove=False):
if remove:
os.remove(path)
def write_key(cls, path, key_size=192):
directory = pathlib.Path(path).parent
with salt.utils.files.set_umask(0o177):
try:
with salt.utils.files.fopen(path, "r") as fp:
return fp.read()
except FileNotFoundError:
pass
key = cls.generate_key_string(key_size)
with salt.utils.files.fopen(path, "w") as fp:
fp.write(key)
return key
fd, tmp = tempfile.mkstemp(dir=directory, prefix="aes")
os.close(fd)
with salt.utils.files.fopen(tmp, "w") as fp:
fp.write(cls.generate_key_string(key_size))
os.rename(tmp, path)

@classmethod
def read_key(cls, path):
try:
with salt.utils.files.fopen(path, "r") as fp:
return fp.read()
except FileNotFoundError:
pass

@classmethod
def extract_keys(cls, key_string, key_size):
Expand Down
81 changes: 65 additions & 16 deletions salt/master.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import collections
import copy
import ctypes
import functools
import logging
import multiprocessing
import os
Expand Down Expand Up @@ -142,7 +141,6 @@ def get_serial(cls, opts=None, event=None):
def rotate_secrets(
cls, opts=None, event=None, use_lock=True, owner=False, publisher=None
):
log.info("Rotating master AES key")
if opts is None:
opts = {}

Expand Down Expand Up @@ -173,6 +171,41 @@ def rotate_secrets(
log.debug("Pinging all connected minions due to key rotation")
salt.utils.master.ping_all_connected_minions(opts)

@classmethod
def rotate_cluster_secret(
cls, opts=None, event=None, use_lock=True, owner=False, publisher=None
):
log.debug("Rotating cluster AES key")
if opts is None:
opts = {}

if use_lock:
with cls.secrets["cluster_aes"]["secret"].get_lock():
cls.secrets["cluster_aes"][
"secret"
].value = salt.utils.stringutils.to_bytes(
cls.secrets["cluster_aes"]["reload"](remove=owner)
)
else:
cls.secrets["cluster_aes"][
"secret"
].value = salt.utils.stringutils.to_bytes(
cls.secrets["cluster_aes"]["reload"](remove=owner)
)

if event:
event.fire_event(
{f"rotate_cluster_aes_key": True}, tag="rotate_cluster_aes_key"
)

if publisher:
publisher.send_aes_key_event()

if opts.get("ping_on_rotate"):
# Ping all minions to get them to pick up the new key
log.debug("Pinging all connected minions due to key rotation")
salt.utils.master.ping_all_connected_minions(opts)


class Maintenance(salt.utils.process.SignalHandlingProcess):
"""
Expand Down Expand Up @@ -358,7 +391,7 @@ def handle_key_rotate(self, now, drop_file_wait=5):

if to_rotate:
if self.opts.get("cluster_id", None):
SMaster.rotate_secrets(
SMaster.rotate_cluster_secret(
self.opts, self.event, owner=True, publisher=self.ipc_publisher
)
else:
Expand Down Expand Up @@ -714,6 +747,20 @@ def _pre_flight(self):
log.critical("Master failed pre flight checks, exiting\n")
sys.exit(salt.defaults.exitcodes.EX_GENERIC)

def read_or_generate_key(self, remove=False, fs_wait=0.1):
"""
Used to manage a cluster aes session key file.
"""
path = os.path.join(self.opts["cluster_pki_dir"], ".aes")
if remove:
os.remove(path)
key = salt.crypt.Crypticle.read_key(path)
if key:
return key
salt.crypt.Crypticle.write_key(path)
time.sleep(fs_wait)
return salt.crypt.Crypticle.read_key(path)

def start(self):
"""
Turn on the master server components
Expand All @@ -731,22 +778,18 @@ def start(self):
# signal handlers
with salt.utils.process.default_signals(signal.SIGINT, signal.SIGTERM):
if self.opts["cluster_id"]:
keypath = os.path.join(self.opts["cluster_pki_dir"], ".aes")
cluster_keygen = functools.partial(
salt.crypt.Crypticle.read_or_generate_key,
keypath,
)
# Setup the secrets here because the PubServerChannel may need
# them as well.
SMaster.secrets["cluster_aes"] = {
"secret": multiprocessing.Array(
ctypes.c_char, salt.utils.stringutils.to_bytes(cluster_keygen())
ctypes.c_char,
salt.utils.stringutils.to_bytes(self.read_or_generate_key()),
),
"serial": multiprocessing.Value(
ctypes.c_longlong,
lock=False, # We'll use the lock from 'secret'
),
"reload": cluster_keygen,
"reload": self.read_or_generate_key,
}

SMaster.secrets["aes"] = {
Expand Down Expand Up @@ -779,7 +822,7 @@ def start(self):
ipc_publisher.pre_fork(self.process_manager)
self.process_manager.add_process(
EventMonitor,
args=[self.opts],
args=[self.opts, ipc_publisher],
name="EventMonitor",
)

Expand Down Expand Up @@ -908,19 +951,19 @@ class EventMonitor(salt.utils.process.SignalHandlingProcess):
- Handle key rotate events.
"""

def __init__(self, opts, channels=None, name="EventMonitor"):
def __init__(self, opts, ipc_publisher, channels=None, name="EventMonitor"):
super().__init__(name=name)
self.opts = opts
if channels is None:
channels = []
self.channels = channels
self.ipc_publisher = ipc_publisher

async def handle_event(self, package):
"""
Event handler for publish forwarder
"""
tag, data = salt.utils.event.SaltEvent.unpack(package)
log.debug("Event monitor got event %s %r", tag, data)
if tag.startswith("salt/job") and tag.endswith("/publish"):
peer_id = data.pop("__peer_id", None)
if peer_id:
Expand All @@ -937,9 +980,15 @@ async def handle_event(self, package):
for chan in self.channels:
tasks.append(asyncio.create_task(chan.publish(data)))
await asyncio.gather(*tasks)
elif tag == "rotate_aes_key":
log.debug("Event monitor recieved rotate aes key event, rotating key.")
SMaster.rotate_secrets(self.opts, owner=False)
elif tag == "rotate_cluster_aes_key":
peer_id = data.pop("__peer_id", None)
if peer_id:
log.debug("Rotating AES session key")
SMaster.rotate_cluster_secret(
self.opts, owner=False, publisher=self.ipc_publisher
)
else:
log.trace("Ignore tag %s", tag)

def run(self):
io_loop = tornado.ioloop.IOLoop()
Expand Down
Empty file.
Loading

0 comments on commit afdb17b

Please sign in to comment.