Commit
Merge pull request #14 from ctrliq/devel
Sync to Upstream 23.3.1
cigamit authored Nov 8, 2023
2 parents 704ae6d + 86ee6bf commit e64900b
Showing 46 changed files with 476 additions and 457 deletions.
5 changes: 5 additions & 0 deletions .pip-tools.toml
@@ -0,0 +1,5 @@
[tool.pip-tools]
resolver = "backtracking"
allow-unsafe = true
strip-extras = true
quiet = true
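
These keys mirror pip-compile's CLI flags. As a hedged aside (not part of the commit), a minimal sketch that loads and inspects the new config, assuming Python 3.11+ for the stdlib tomllib module:

```python
# Illustrative only: parse .pip-tools.toml and show the flags it implies.
import tomllib

with open(".pip-tools.toml", "rb") as f:
    cfg = tomllib.load(f)["tool"]["pip-tools"]

# Roughly equivalent CLI (our reading of the keys):
#   pip-compile --resolver=backtracking --allow-unsafe --strip-extras --quiet
print(cfg)  # {'resolver': 'backtracking', 'allow-unsafe': True, ...}
```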
2 changes: 1 addition & 1 deletion awx/api/templates/instance_install_bundle/requirements.yml
@@ -1,4 +1,4 @@
---
collections:
- name: ansible.receptor
version: 2.0.0
version: 2.0.2
19 changes: 10 additions & 9 deletions awx/main/conf.py
@@ -694,16 +694,18 @@
category_slug='logging',
)
register(
'LOG_AGGREGATOR_MAX_DISK_USAGE_GB',
'LOG_AGGREGATOR_ACTION_QUEUE_SIZE',
field_class=fields.IntegerField,
default=1,
default=131072,
min_value=1,
label=_('Maximum disk persistence for external log aggregation (in GB)'),
label=_('Maximum number of messages that can be stored in the log action queue'),
help_text=_(
'Amount of data to store (in gigabytes) during an outage of '
'the external log aggregator (defaults to 1). '
'Equivalent to the rsyslogd queue.maxdiskspace setting for main_queue. '
'Notably, this is used for the rsyslogd main queue (for input messages).'
'Defines how large the rsyslog action queue can grow in number of messages '
'stored. This can have an impact on memory utilization. When the queue '
'reaches 75% of this number, the queue will start writing to disk '
'(queue.highWatermark in rsyslog). When it reaches 90%, NOTICE, INFO, and '
'DEBUG messages will start to be discarded (queue.discardMark with '
'queue.discardSeverity=5).'
),
category=_('Logging'),
category_slug='logging',
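
To make the watermark arithmetic in that help_text concrete, here is a small illustrative sketch (not part of the commit; the 75%/90% figures and rsyslog parameter names come straight from the text above):

```python
# Illustrative only: derive the rsyslog thresholds implied by
# LOG_AGGREGATOR_ACTION_QUEUE_SIZE (default 131072 messages).
queue_size = 131072

high_watermark = int(queue_size * 0.75)  # queue.highWatermark -> start spilling to disk
discard_mark = int(queue_size * 0.90)    # queue.discardMark -> drop NOTICE/INFO/DEBUG
discard_severity = 5                     # queue.discardSeverity=5 (NOTICE)

print(high_watermark, discard_mark)  # 98304 117964
```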
@@ -718,8 +720,7 @@
'Amount of data to store (in gigabytes) if an rsyslog action takes time '
'to process an incoming message (defaults to 1). '
'Equivalent to the rsyslogd queue.maxdiskspace setting on the action (e.g. omhttp). '
'Like LOG_AGGREGATOR_MAX_DISK_USAGE_GB, it stores files in the directory specified '
'by LOG_AGGREGATOR_MAX_DISK_USAGE_PATH.'
'It stores files in the directory specified by LOG_AGGREGATOR_MAX_DISK_USAGE_PATH.'
),
category=_('Logging'),
category_slug='logging',
41 changes: 29 additions & 12 deletions awx/main/credential_plugins/dsv.py
@@ -2,25 +2,28 @@

from django.conf import settings
from django.utils.translation import gettext_lazy as _
from thycotic.secrets.vault import SecretsVault

from delinea.secrets.vault import PasswordGrantAuthorizer, SecretsVault

dsv_inputs = {
'fields': [
{
'id': 'tenant',
'label': _('Tenant'),
'help_text': _('The tenant e.g. "ex" when the URL is https://ex.secretservercloud.com'),
'help_text': _('The tenant e.g. "ex" when the URL is https://ex.secretsvaultcloud.com'),
'type': 'string',
},
{
'id': 'tld',
'label': _('Top-level Domain (TLD)'),
'help_text': _('The TLD of the tenant e.g. "com" when the URL is https://ex.secretservercloud.com'),
'choices': ['ca', 'com', 'com.au', 'com.sg', 'eu'],
'help_text': _('The TLD of the tenant e.g. "com" when the URL is https://ex.secretsvaultcloud.com'),
'choices': ['ca', 'com', 'com.au', 'eu'],
'default': 'com',
},
{'id': 'client_id', 'label': _('Client ID'), 'type': 'string'},
{
'id': 'client_id',
'label': _('Client ID'),
'type': 'string',
},
{
'id': 'client_secret',
'label': _('Client Secret'),
@@ -51,12 +54,26 @@
'id': 'url_template',
'label': _('URL template'),
'type': 'string',
'default': 'https://{}.secretsvaultcloud.{}/v1',
'default': 'https://{}.secretsvaultcloud.{}',
}
)

dsv_plugin = CredentialPlugin(
'Thycotic DevOps Secrets Vault',
dsv_inputs,
lambda **kwargs: SecretsVault(**{k: v for (k, v) in kwargs.items() if k in [field['id'] for field in dsv_inputs['fields']]}).get_secret(kwargs['path'])['data'][kwargs['secret_field']], # fmt: skip
)

def dsv_backend(**kwargs):
tenant_name = kwargs['tenant']
tenant_tld = kwargs.get('tld', 'com')
tenant_url_template = kwargs.get('url_template', 'https://{}.secretsvaultcloud.{}')
client_id = kwargs['client_id']
client_secret = kwargs['client_secret']
secret_path = kwargs['path']
secret_field = kwargs['secret_field']

tenant_url = tenant_url_template.format(tenant_name, tenant_tld.strip("."))

authorizer = PasswordGrantAuthorizer(tenant_url, client_id, client_secret)
dsv_secret = SecretsVault(tenant_url, authorizer).get_secret(secret_path)

return dsv_secret['data'][secret_field]


dsv_plugin = CredentialPlugin(name='Thycotic DevOps Secrets Vault', inputs=dsv_inputs, backend=dsv_backend)
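
A hedged usage sketch of the new backend (the credential values below are placeholders; the keyword names match the plugin inputs and the kwargs consumed in dsv_backend above):

```python
# Hypothetical invocation of dsv_backend; real values come from the
# credential inputs configured in AWX.
secret_value = dsv_backend(
    tenant='ex',                      # -> https://ex.secretsvaultcloud.com
    tld='com',
    client_id='my-client-id',
    client_secret='my-client-secret',
    path='/devops/ci',                # secret path in DevOps Secrets Vault
    secret_field='password',          # key to pull out of the secret's data
)
```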
7 changes: 5 additions & 2 deletions awx/main/dispatch/control.py
@@ -37,8 +37,11 @@ def status(self, *args, **kwargs):
def running(self, *args, **kwargs):
return self.control_with_reply('running', *args, **kwargs)

def cancel(self, task_ids, *args, **kwargs):
return self.control_with_reply('cancel', *args, extra_data={'task_ids': task_ids}, **kwargs)
def cancel(self, task_ids, with_reply=True):
if with_reply:
return self.control_with_reply('cancel', extra_data={'task_ids': task_ids})
else:
self.control({'control': 'cancel', 'task_ids': task_ids, 'reply_to': None}, extra_data={'task_ids': task_ids})

def schedule(self, *args, **kwargs):
return self.control_with_reply('schedule', *args, **kwargs)
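
A hedged sketch of the two cancel modes this change enables (class and constructor arguments are assumed from the call site in unified_jobs.py further down; the task id is a placeholder):

```python
# Illustrative only: cancel with confirmation vs. fire-and-forget.
from awx.main.dispatch.control import Control

ctl = Control('dispatcher', 'awx-task-1')    # service name, target host (assumed)
confirmed = ctl.cancel(['task-uuid'])        # waits for a reply (with_reply=True)
ctl.cancel(['task-uuid'], with_reply=False)  # no reply expected; safe inside a transaction
```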
5 changes: 3 additions & 2 deletions awx/main/dispatch/worker/base.py
@@ -89,8 +89,9 @@ def control(self, body):
if task_ids and not msg:
logger.info(f'Could not locate running tasks to cancel with ids={task_ids}')

with pg_bus_conn() as conn:
conn.notify(reply_queue, json.dumps(msg))
if reply_queue is not None:
with pg_bus_conn() as conn:
conn.notify(reply_queue, json.dumps(msg))
elif control == 'reload':
for worker in self.pool.workers:
worker.quit()
6 changes: 5 additions & 1 deletion awx/main/management/commands/cleanup_activitystream.py
@@ -24,6 +24,9 @@ class Command(BaseCommand):
def add_arguments(self, parser):
parser.add_argument('--days', dest='days', type=int, default=90, metavar='N', help='Remove activity stream events more than N days old')
parser.add_argument('--dry-run', dest='dry_run', action='store_true', default=False, help='Dry run mode (show items that would be removed)')
parser.add_argument(
'--batch-size', dest='batch_size', type=int, default=500, metavar='X', help='Remove activity stream events in batch of X events. Defaults to 500.'
)

def init_logging(self):
log_levels = dict(enumerate([logging.ERROR, logging.INFO, logging.DEBUG, 0]))
@@ -48,7 +51,7 @@ def cleanup_activitystream(self):
else:
pks_to_delete.add(asobj.pk)
# Cleanup objects in batches instead of deleting each one individually.
if len(pks_to_delete) >= 500:
if len(pks_to_delete) >= self.batch_size:
ActivityStream.objects.filter(pk__in=pks_to_delete).delete()
n_deleted_items += len(pks_to_delete)
pks_to_delete.clear()
@@ -63,4 +66,5 @@ def handle(self, *args, **options):
self.days = int(options.get('days', 30))
self.cutoff = now() - datetime.timedelta(days=self.days)
self.dry_run = bool(options.get('dry_run', False))
self.batch_size = int(options.get('batch_size', 500))
self.cleanup_activitystream()
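
A hedged way to exercise the new option from Python, via Django's call_command (the option names mirror the dest values registered above; the numbers are illustrative):

```python
# Dry-run the cleanup in 1000-event batches.
from django.core.management import call_command

call_command('cleanup_activitystream', days=30, batch_size=1000, dry_run=True)
```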
120 changes: 77 additions & 43 deletions awx/main/management/commands/cleanup_jobs.py
@@ -9,6 +9,7 @@


# Django
from django.apps import apps
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction, connection
from django.db.models import Min, Max
@@ -150,6 +151,9 @@ class Command(BaseCommand):
def add_arguments(self, parser):
parser.add_argument('--days', dest='days', type=int, default=90, metavar='N', help='Remove jobs/updates executed more than N days ago. Defaults to 90.')
parser.add_argument('--dry-run', dest='dry_run', action='store_true', default=False, help='Dry run mode (show items that would be removed)')
parser.add_argument(
'--batch-size', dest='batch_size', type=int, default=100000, metavar='X', help='Remove jobs in batch of X jobs. Defaults to 100000.'
)
parser.add_argument('--jobs', dest='only_jobs', action='store_true', default=False, help='Remove jobs')
parser.add_argument('--ad-hoc-commands', dest='only_ad_hoc_commands', action='store_true', default=False, help='Remove ad hoc commands')
parser.add_argument('--project-updates', dest='only_project_updates', action='store_true', default=False, help='Remove project updates')
@@ -195,39 +199,58 @@ def cleanup_workflow_jobs_partition(self):
delete_meta.delete_jobs()
return (delete_meta.jobs_no_delete_count, delete_meta.jobs_to_delete_count)

def _handle_unpartitioned_events(self, model, pk_list):
"""
If unpartitioned job events remain, it will cascade those from jobs in pk_list
if the unpartitioned table is no longer necessary, it will drop the table
"""
def has_unpartitioned_table(self, model):
tblname = unified_job_class_to_event_table_name(model)
rel_name = model().event_parent_key
with connection.cursor() as cursor:
cursor.execute(f"SELECT 1 FROM pg_tables WHERE tablename = '_unpartitioned_{tblname}';")
row = cursor.fetchone()
if row is None:
self.logger.debug(f'Unpartitioned table for {rel_name} does not exist, you are fully migrated')
return
if pk_list:
with connection.cursor() as cursor:
pk_list_csv = ','.join(map(str, pk_list))
cursor.execute(f"DELETE FROM _unpartitioned_{tblname} WHERE {rel_name} IN ({pk_list_csv})")
return False
return True

def _delete_unpartitioned_table(self, model):
"If the unpartitioned table is no longer necessary, it will drop the table"
tblname = unified_job_class_to_event_table_name(model)
if not self.has_unpartitioned_table(model):
self.logger.debug(f'Table _unpartitioned_{tblname} does not exist, you are fully migrated.')
return

with connection.cursor() as cursor:
# same as UnpartitionedJobEvent.objects.aggregate(Max('created'))
cursor.execute(f'SELECT MAX("_unpartitioned_{tblname}"."created") FROM "_unpartitioned_{tblname}"')
cursor.execute(f'SELECT MAX("_unpartitioned_{tblname}"."created") FROM "_unpartitioned_{tblname}";')
row = cursor.fetchone()
last_created = row[0]
if last_created:
self.logger.info(f'Last event created in _unpartitioned_{tblname} was {last_created.isoformat()}')
else:
self.logger.info(f'Table _unpartitioned_{tblname} has no events in it')
if (last_created is None) or (last_created < self.cutoff):
self.logger.warning(f'Dropping table _unpartitioned_{tblname} since no records are newer than {self.cutoff}')
cursor.execute(f'DROP TABLE _unpartitioned_{tblname}')

def cleanup_jobs(self):
batch_size = 100000
if last_created:
self.logger.info(f'Last event created in _unpartitioned_{tblname} was {last_created.isoformat()}')
else:
self.logger.info(f'Table _unpartitioned_{tblname} has no events in it')

if (last_created is None) or (last_created < self.cutoff):
self.logger.warning(
f'Dropping table _unpartitioned_{tblname} since no records are newer than {self.cutoff}\n'
'WARNING - this will happen in a separate transaction so a failure will not roll back prior cleanup'
)
with connection.cursor() as cursor:
cursor.execute(f'DROP TABLE _unpartitioned_{tblname};')

def _delete_unpartitioned_events(self, model, pk_list):
"If unpartitioned job events remain, it will cascade those from jobs in pk_list"
tblname = unified_job_class_to_event_table_name(model)
rel_name = model().event_parent_key

# Bail if the unpartitioned table does not exist anymore
if not self.has_unpartitioned_table(model):
return

# Table still exists, delete individual unpartitioned events
if pk_list:
with connection.cursor() as cursor:
self.logger.debug(f'Deleting {len(pk_list)} events from _unpartitioned_{tblname}, use a longer cleanup window to delete the table.')
pk_list_csv = ','.join(map(str, pk_list))
cursor.execute(f"DELETE FROM _unpartitioned_{tblname} WHERE {rel_name} IN ({pk_list_csv});")

def cleanup_jobs(self):
# Hack to avoid doing N+1 queries as each item in the Job query set does
# an individual query to get the underlying UnifiedJob.
Job.polymorphic_super_sub_accessors_replaced = True
@@ -242,13 +265,14 @@ def cleanup_jobs(self):
deleted = 0
info = qs.aggregate(min=Min('id'), max=Max('id'))
if info['min'] is not None:
for start in range(info['min'], info['max'] + 1, batch_size):
qs_batch = qs.filter(id__gte=start, id__lte=start + batch_size)
for start in range(info['min'], info['max'] + 1, self.batch_size):
qs_batch = qs.filter(id__gte=start, id__lte=start + self.batch_size)
pk_list = qs_batch.values_list('id', flat=True)

_, results = qs_batch.delete()
deleted += results['main.Job']
self._handle_unpartitioned_events(Job, pk_list)
# Avoid dropping the job event table in case we have interacted with it already
self._delete_unpartitioned_events(Job, pk_list)

return skipped, deleted
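
The id-range batching above distills to the following standalone sketch (editor-added; qs stands for any queryset with an integer primary key, and batch_size mirrors the new --batch-size option):

```python
# Illustrative only: walk [min, max] in fixed-size id windows so each
# DELETE touches a bounded set of rows.
from django.db.models import Min, Max

def delete_in_batches(qs, batch_size=100000):
    deleted = 0
    info = qs.aggregate(min=Min('id'), max=Max('id'))
    if info['min'] is not None:
        for start in range(info['min'], info['max'] + 1, batch_size):
            _, results = qs.filter(id__gte=start, id__lte=start + batch_size).delete()
            deleted += sum(results.values())
    return deleted
```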

@@ -271,7 +295,7 @@ def cleanup_ad_hoc_commands(self):
deleted += 1

if not self.dry_run:
self._handle_unpartitioned_events(AdHocCommand, pk_list)
self._delete_unpartitioned_events(AdHocCommand, pk_list)

skipped += AdHocCommand.objects.filter(created__gte=self.cutoff).count()
return skipped, deleted
@@ -299,7 +323,7 @@ def cleanup_project_updates(self):
deleted += 1

if not self.dry_run:
self._handle_unpartitioned_events(ProjectUpdate, pk_list)
self._delete_unpartitioned_events(ProjectUpdate, pk_list)

skipped += ProjectUpdate.objects.filter(created__gte=self.cutoff).count()
return skipped, deleted
@@ -327,7 +351,7 @@ def cleanup_inventory_updates(self):
deleted += 1

if not self.dry_run:
self._handle_unpartitioned_events(InventoryUpdate, pk_list)
self._delete_unpartitioned_events(InventoryUpdate, pk_list)

skipped += InventoryUpdate.objects.filter(created__gte=self.cutoff).count()
return skipped, deleted
@@ -351,7 +375,7 @@ def cleanup_management_jobs(self):
deleted += 1

if not self.dry_run:
self._handle_unpartitioned_events(SystemJob, pk_list)
self._delete_unpartitioned_events(SystemJob, pk_list)

skipped += SystemJob.objects.filter(created__gte=self.cutoff).count()
return skipped, deleted
@@ -396,12 +420,12 @@ def cleanup_notifications(self):
skipped += Notification.objects.filter(created__gte=self.cutoff).count()
return skipped, deleted

@transaction.atomic
def handle(self, *args, **options):
self.verbosity = int(options.get('verbosity', 1))
self.init_logging()
self.days = int(options.get('days', 90))
self.dry_run = bool(options.get('dry_run', False))
self.batch_size = int(options.get('batch_size', 100000))
try:
self.cutoff = now() - datetime.timedelta(days=self.days)
except OverflowError:
@@ -423,19 +447,29 @@ def handle(self, *args, **options):
del s.receivers[:]
s.sender_receivers_cache.clear()

for m in model_names:
if m not in models_to_cleanup:
continue
with transaction.atomic():
for m in models_to_cleanup:
skipped, deleted = getattr(self, 'cleanup_%s' % m)()

skipped, deleted = getattr(self, 'cleanup_%s' % m)()
func = getattr(self, 'cleanup_%s_partition' % m, None)
if func:
skipped_partition, deleted_partition = func()
skipped += skipped_partition
deleted += deleted_partition

func = getattr(self, 'cleanup_%s_partition' % m, None)
if func:
skipped_partition, deleted_partition = func()
skipped += skipped_partition
deleted += deleted_partition
if self.dry_run:
self.logger.log(99, '%s: %d would be deleted, %d would be skipped.', m.replace('_', ' '), deleted, skipped)
else:
self.logger.log(99, '%s: %d deleted, %d skipped.', m.replace('_', ' '), deleted, skipped)

if self.dry_run:
self.logger.log(99, '%s: %d would be deleted, %d would be skipped.', m.replace('_', ' '), deleted, skipped)
else:
self.logger.log(99, '%s: %d deleted, %d skipped.', m.replace('_', ' '), deleted, skipped)
# Deleting unpartitioned tables cannot be done in same transaction as updates to related tables
if not self.dry_run:
with transaction.atomic():
for m in models_to_cleanup:
unified_job_class_name = m[:-1].title().replace('Management', 'System').replace('_', '')
unified_job_class = apps.get_model('main', unified_job_class_name)
try:
unified_job_class().event_class
except (NotImplementedError, AttributeError):
continue # no need to run this for models without events
self._delete_unpartitioned_table(unified_job_class)
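
The class-name derivation in that last loop is easiest to see with concrete inputs (editor-added illustration; same string pipeline as the code above):

```python
# How cleanup option names map to unified job model names.
for m in ('jobs', 'project_updates', 'management_jobs'):
    name = m[:-1].title().replace('Management', 'System').replace('_', '')
    print(f'{m} -> {name}')
# jobs -> Job
# project_updates -> ProjectUpdate
# management_jobs -> SystemJob
```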
5 changes: 5 additions & 0 deletions awx/main/models/unified_jobs.py
@@ -1439,6 +1439,11 @@ def cancel_dispatcher_process(self):
if not self.celery_task_id:
return
canceled = []
if not connection.get_autocommit():
# this condition is purpose-written for the task manager, when it cancels jobs in workflows
ControlDispatcher('dispatcher', self.controller_node).cancel([self.celery_task_id], with_reply=False)
return True # task manager itself needs to act under assumption that cancel was received

try:
# Use control and reply mechanism to cancel and obtain confirmation
timeout = 5
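
The autocommit guard above is the key point of this hunk: inside the task manager's open transaction, blocking on a pg_notify reply could stall, so the cancel is sent fire-and-forget. A hedged restatement (the import alias is assumed; controller_node and task_id are placeholders):

```python
# Illustrative only: mirror the guard in cancel_dispatcher_process().
from django.db import connection
from awx.main.dispatch.control import Control as ControlDispatcher  # alias assumed

def cancel_task(controller_node, task_id):
    if not connection.get_autocommit():
        # Already inside a transaction (the task manager case): don't
        # block waiting for a reply that may never be processed here.
        ControlDispatcher('dispatcher', controller_node).cancel([task_id], with_reply=False)
        return True  # act as if the cancel was received
    return bool(ControlDispatcher('dispatcher', controller_node).cancel([task_id]))
```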