Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

external access to kafka topics #13911

Draft
wants to merge 11 commits into
base: 2.4/dev
Choose a base branch
from
13 changes: 13 additions & 0 deletions salt/firewall/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ firewall:
elasticsearch_rest: []
endgame: []
eval: []
external_kafka_data: []
external_suricata: []
fleet: []
heavynode: []
Expand Down Expand Up @@ -470,6 +471,9 @@ firewall:
endgame:
portgroups:
- endgame
external_kafka_data:
portgroups:
- kafka_data
external_suricata:
portgroups:
- external_suricata
Expand Down Expand Up @@ -665,6 +669,9 @@ firewall:
endgame:
portgroups:
- endgame
external_kafka_data:
portgroups:
- kafka_data
external_suricata:
portgroups:
- external_suricata
Expand Down Expand Up @@ -864,6 +871,9 @@ firewall:
endgame:
portgroups:
- endgame
external_kafka_data:
portgroups:
- kafka_data
external_suricata:
portgroups:
- external_suricata
Expand Down Expand Up @@ -1337,6 +1347,9 @@ firewall:
endgame:
portgroups:
- endgame
external_kafka_data:
portgroups:
- kafka_data
receiver:
portgroups: []
customhostgroup0:
Expand Down
57 changes: 37 additions & 20 deletions salt/firewall/map.jinja
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
{% from 'vars/globals.map.jinja' import GLOBALS %}
{% from 'docker/docker.map.jinja' import DOCKER %}
{% import_yaml 'firewall/defaults.yaml' as FIREWALL_DEFAULT %}
{% if GLOBALS.pipeline == 'KAFKA' %}
{% set KAFKA_LOGSTASH = salt['pillar.get']('kafka:logstash', []) %}
{% set KAFKA_CONTROLLERS = salt['pillar.get']('kafka:controllers', []) %}
{% endif %}

{# add our ip to self #}
{% do FIREWALL_DEFAULT.firewall.hostgroups.self.append(GLOBALS.node_ip) %}
Expand All @@ -19,27 +23,40 @@
{% endif %}

{# Only add Kafka firewall items when Kafka enabled #}
{% set role = GLOBALS.role.split('-')[1] %}

{% if GLOBALS.pipeline == 'KAFKA' and role in ['manager', 'managersearch', 'standalone'] %}
{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups[role].portgroups.append('kafka_controller') %}
{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups.receiver.portgroups.append('kafka_controller') %}
{% endif %}

{% if GLOBALS.pipeline == 'KAFKA' and role == 'receiver' %}
{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups.self.portgroups.append('kafka_controller') %}
{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups.standalone.portgroups.append('kafka_controller') %}
{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups.manager.portgroups.append('kafka_controller') %}
{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups.managersearch.portgroups.append('kafka_controller') %}
{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups.receiver.portgroups.append('kafka_controller') %}
{% endif %}

{% if GLOBALS.pipeline == 'KAFKA' and role in ['manager', 'managersearch', 'standalone', 'receiver'] %}
{% for r in ['manager', 'managersearch', 'standalone', 'receiver', 'fleet', 'idh', 'sensor', 'searchnode','heavynode', 'elastic_agent_endpoint', 'desktop'] %}
{% if FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups[r] is defined %}
{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups[r].portgroups.append('kafka_data') %}
{% if GLOBALS.pipeline == 'KAFKA' %}
{% set role = GLOBALS.role.split('-')[1] %}
{% if role in ['manager', 'managersearch', 'standalone'] %}
{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups[role].portgroups.append('kafka_controller') %}
{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups.receiver.portgroups.append('kafka_controller') %}
{% endif %}
{% if role == 'receiver' %}
{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups.self.portgroups.append('kafka_controller') %}
{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups.standalone.portgroups.append('kafka_controller') %}
{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups.manager.portgroups.append('kafka_controller') %}
{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups.managersearch.portgroups.append('kafka_controller') %}
{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups.receiver.portgroups.append('kafka_controller') %}
{% endif %}
{% if role in ['manager', 'managersearch', 'standalone', 'receiver'] %}
{% for r in ['manager', 'managersearch', 'standalone', 'receiver', 'fleet', 'idh', 'sensor', 'searchnode','heavynode', 'elastic_agent_endpoint', 'desktop', 'self'] %}
{% if FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups[r] is defined %}
{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups[r].portgroups.append('kafka_data') %}
{# Remove redis port #}
{% if 'redis' in FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups[r].portgroups %}
{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups[r].portgroups.remove('redis') %}
{% endif %}
{# Check if logstash was manually enabled for this node before removing logstash port #}
{% if GLOBALS.hostname not in KAFKA_LOGSTASH %}
{% if 'elastic_agent_data' in FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups[r].portgroups %}
{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups[r].portgroups.remove('elastic_agent_data') %}
{% endif %}
{% endif %}
{% endif %}
{% endfor %}
{# Remove external_kafka_data portgroup from any kafka node that isn't a broker #}
{% if GLOBALS.hostname in KAFKA_CONTROLLERS %}
{% do FIREWALL_DEFAULT.firewall.role[role].chain["DOCKER-USER"].hostgroups.pop('external_kafka_data') %}
{% endif %}
{% endfor %}
{% endif %}
{% endif %}

{% set FIREWALL_MERGED = salt['pillar.get']('firewall', FIREWALL_DEFAULT.firewall, merge=True) %}
1 change: 1 addition & 0 deletions salt/firewall/soc_firewall.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ firewall:
elasticsearch_rest: *hostgroupsettingsadv
endgame: *hostgroupsettingsadv
eval: *hostgroupsettings
external_kafka_data: *hostgroupsettingsadv
external_suricata: *hostgroupsettings
fleet: *hostgroupsettings
heavynode: *hostgroupsettings
Expand Down
5 changes: 3 additions & 2 deletions salt/kafka/config.map.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
{% set KAFKA_NODES_PILLAR = salt['pillar.get']('kafka:nodes') %}
{% set KAFKA_PASSWORD = salt['pillar.get']('kafka:config:password') %}
{% set KAFKA_TRUSTPASS = salt['pillar.get']('kafka:config:trustpass') %}
{% set DEFAULT_BROKER_LISTENER = 'BROKER://' + GLOBALS.node_ip + ':9092' %}

{# Create list of KRaft controllers #}
{% set controllers = [] %}
Expand All @@ -28,7 +29,7 @@
{# Generate server.properties for 'broker' , 'controller', 'broker,controller' node types
anything above this line is a configuration needed for ALL Kafka nodes #}
{% if node_type == 'broker' %}
{% do KAFKAMERGED.config.broker.update({'advertised_x_listeners': 'BROKER://'+ GLOBALS.node_ip +':9092' }) %}
{% do KAFKAMERGED.config.broker.update({'advertised_x_listeners': DEFAULT_BROKER_LISTENER }) %}
{% do KAFKAMERGED.config.broker.update({'controller_x_quorum_x_voters': kafka_controller_quorum_voters }) %}
{% do KAFKAMERGED.config.broker.update({'node_x_id': salt['pillar.get']('kafka:nodes:'+ GLOBALS.hostname +':nodeid') }) %}
{% do KAFKAMERGED.config.broker.update({'ssl_x_keystore_x_password': KAFKA_PASSWORD }) %}
Expand All @@ -50,7 +51,7 @@

{# Kafka nodes of this type are not recommended for use outside of development / testing. #}
{% if node_type == 'broker,controller' %}
{% do KAFKAMERGED.config.broker.update({'advertised_x_listeners': 'BROKER://'+ GLOBALS.node_ip +':9092' }) %}
{% do KAFKAMERGED.config.broker.update({'advertised_x_listeners': DEFAULT_BROKER_LISTENER }) %}
{% do KAFKAMERGED.config.broker.update({'controller_x_listener_x_names': KAFKAMERGED.config.controller.controller_x_listener_x_names }) %}
{% do KAFKAMERGED.config.broker.update({'controller_x_quorum_x_voters': kafka_controller_quorum_voters }) %}
{% do KAFKAMERGED.config.broker.update({'node_x_id': salt['pillar.get']('kafka:nodes:'+ GLOBALS.hostname +':nodeid') }) %}
Expand Down
26 changes: 26 additions & 0 deletions salt/kafka/config.sls
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
{% from 'allowed_states.map.jinja' import allowed_states %}
{% if sls.split('.')[0] in allowed_states %}
{% from 'vars/globals.map.jinja' import GLOBALS %}
{% set kafka_external_certs = salt['pillar.get']('kafka:config:external') %}

kafka_group:
group.present:
Expand Down Expand Up @@ -69,6 +70,31 @@ kafka_kraft_{{sc}}_properties:
- show_changes: False
{% endfor %}

{% if GLOBALS.is_manager and kafka_external_certs %}
{% for external, values in kafka_external_certs.items() %}
custom_cert_dir_{{ external }}:
file.directory:
- name: /opt/so/conf/kafka/{{ external }}
- user: 939
- group: 939
- makedirs: True

custom_cert_{{ external }}_properties:
file.managed:
- source: salt://kafka/etc/external.properties.jinja
- name: /opt/so/conf/kafka/{{ external }}/{{ values.name }}.properties
- template: jinja
- mode: 600
- user: 939
- group: 939
- makedirs: True
- show_changes: False
- defaults:
external: {{ external }}
values: {{ values }}
{% endfor %}
{% endif %}

reset_quorum_on_changes:
cmd.run:
- name: rm -f /nsm/kafka/data/__cluster_metadata-0/quorum-state
Expand Down
79 changes: 74 additions & 5 deletions salt/kafka/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ kafka:
password:
trustpass:
broker:
advertised_x_listeners:
auto_x_create_x_topics_x_enable: true
controller_x_quorum_x_voters:
controller_x_quorum_x_request_x_timeout_x_ms: 3000
default_x_replication_x_factor: 1
inter_x_broker_x_listener_x_name: BROKER
listeners: BROKER://0.0.0.0:9092
Expand Down Expand Up @@ -42,12 +41,11 @@ kafka:
ssl_x_truststore_x_location: /etc/pki/kafka-truststore.jks
ssl_x_truststore_x_type: JKS
ssl_x_truststore_x_password:
ssl_x_keystore_x_location: /etc/pki/kafka.p12
ssl_x_keystore_x_location: /etc/pki/kafka-client.p12
ssl_x_keystore_x_type: PKCS12
ssl_x_keystore_x_password:
controller:
controller_x_listener_x_names: CONTROLLER
controller_x_quorum_x_voters:
listeners: CONTROLLER://0.0.0.0:9093
listener_x_security_x_protocol_x_map: CONTROLLER:SSL
log_x_dirs: /nsm/kafka/data
Expand All @@ -61,4 +59,75 @@ kafka:
ssl_x_keystore_x_password:
ssl_x_truststore_x_location: /etc/pki/kafka-truststore.jks
ssl_x_truststore_x_type: JKS
ssl_x_truststore_x_password:
ssl_x_truststore_x_password:
external:
custom001:
enabled: false
name: custom001
password: default_placeholder
hostname: localhost
ip: 127.0.0.1
days_valid: 365
custom002:
enabled: false
name: custom002
password: default_placeholder
hostname: localhost
ip: 127.0.0.1
days_valid: 365
custom003:
enabled: false
name: custom003
password: default_placeholder
hostname: localhost
ip: 127.0.0.1
days_valid: 365
custom004:
enabled: false
name: custom004
password: default_placeholder
hostname: localhost
ip: 127.0.0.1
days_valid: 365
custom005:
enabled: false
name: custom005
password: default_placeholder
hostname: localhost
ip: 127.0.0.1
days_valid: 365
custom006:
enabled: false
name: custom006
password: default_placeholder
hostname: localhost
ip: 127.0.0.1
days_valid: 365
custom007:
enabled: false
name: custom007
password: default_placeholder
hostname: localhost
ip: 127.0.0.1
days_valid: 365
custom008:
enabled: false
name: custom008
password: default_placeholder
hostname: localhost
ip: 127.0.0.1
days_valid: 365
custom009:
enabled: false
name: custom009
password: default_placeholder
hostname: localhost
ip: 127.0.0.1
days_valid: 365
custom010:
enabled: false
name: custom010
password: default_placeholder
hostname: localhost
ip: 127.0.0.1
days_valid: 365
3 changes: 2 additions & 1 deletion salt/kafka/enabled.sls
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,12 @@ so-kafka:
{% endfor %}
- binds:
- /etc/pki/kafka.p12:/etc/pki/kafka.p12:ro
- /etc/pki/kafka-client.p12:/etc/pki/kafka-client.p12:ro
- /opt/so/conf/kafka/kafka-truststore.jks:/etc/pki/kafka-truststore.jks:ro
- /nsm/kafka/data/:/nsm/kafka/data/:rw
- /opt/so/log/kafka:/opt/kafka/logs/:rw
- /opt/so/conf/kafka/server.properties:/opt/kafka/config/kraft/server.properties:ro
- /opt/so/conf/kafka/client.properties:/opt/kafka/config/kraft/client.properties
- /opt/so/conf/kafka/client.properties:/opt/kafka/config/kraft/client.properties:ro
- watch:
{% for sc in ['server', 'client'] %}
- file: kafka_kraft_{{sc}}_properties
Expand Down
14 changes: 14 additions & 0 deletions salt/kafka/etc/external.properties.jinja
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{# Copyright Security Onion Solutions LLC and/or licensed to Security Onion Solutions LLC under one
or more contributor license agreements. Licensed under the Elastic License 2.0 as shown at
https://securityonion.net/license; you may not use this file except in compliance with the
Elastic License 2.0. #}

{%- from 'kafka/config.map.jinja' import KAFKACLIENT with context %}
{%- set custom_key_path = "/opt/so/conf/kafka/" ~ external ~ "/" ~ values.name ~ ".p12" %}
{%- set custom_trust_path = "/opt/so/conf/kafka/" ~ external ~ "/kafka-truststore.jks" -%}

{%- do KAFKACLIENT.update({'ssl_x_keystore_x_password': values.password }) %}
{%- do KAFKACLIENT.update({'ssl_x_keystore_x_location': custom_key_path }) %}
{%- do KAFKACLIENT.update({'ssl_x_truststore_x_location': custom_trust_path }) -%}

{{ KAFKACLIENT | yaml(False) | replace("_x_", ".") }}
Loading
Loading