Skip to content

Commit

Permalink
issue-23
Browse files Browse the repository at this point in the history
  • Loading branch information
mabulgu committed Jul 4, 2020
1 parent 3f21822 commit 837580d
Show file tree
Hide file tree
Showing 24 changed files with 174 additions and 76 deletions.
21 changes: 17 additions & 4 deletions kfk/commons.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,18 @@ def delete_last_applied_configuration(resource_dict):
def add_resource_kv_config(config, dict_part):
if type(config) is tuple:
for config_str in config:
# TODO: exception here
config_arr = config_str.split('=')
config_arr = get_kv_config_arr(config_str)
dict_part[config_arr[0]] = convert_string_to_type(config_arr[1])
else:
# TODO: exception here
config_arr = config.split('=')
config_arr = get_kv_config_arr(config)
dict_part[config_arr[0]] = convert_string_to_type(config_arr[1])


def get_kv_config_arr(config_str):
# TODO: exception here
return config_str.split('=')


def delete_resource_config(config, dict_part):
if type(config) is tuple:
for config_str in config:
Expand Down Expand Up @@ -64,3 +67,13 @@ def create_temp_file(content):
temp_file.write(content)
temp_file.flush()
return temp_file


def transfer_file_to_container(source_file_path, dest_file_path, container, pod, namespace):
os.system(Kubectl().cp(source_file_path, "{namespace}/{pod}:" + dest_file_path).container(container).build().format(
namespace=namespace, pod=pod))


class SafeDict(dict):
def __missing__(self, key):
return '{' + key + '}'
21 changes: 21 additions & 0 deletions kfk/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import platform
import sys
import os

from pathlib import Path

BASE_FOLDER = ".strimzi-kafka-cli"
BASE_PATH = (str(Path.home()) + "/" + BASE_FOLDER) if os.environ.get(
'STRIMZI_KAFKA_CLI_BASE_PATH') is None else os.environ.get('STRIMZI_KAFKA_CLI_BASE_PATH')
STRIMZI_VERSION = "0.18.0" if os.environ.get('STRIMZI_KAFKA_CLI_STRIMZI_VERSION') is None else os.environ.get('STRIMZI_KAFKA_CLI_STRIMZI_VERSION')
STRIMZI_PATH = (BASE_PATH + "/strimzi-{version}".format(version=STRIMZI_VERSION)) if os.environ.get(
'STRIMZI_KAFKA_CLI_STRIMZI_PATH') is None else os.environ.get('STRIMZI_KAFKA_CLI_STRIMZI_PATH')
STRIMZI_RELEASE_URL = "https://github.com/strimzi/strimzi-kafka-operator/releases/download/{version}/strimzi-{version}.tar.gz".format(
version=STRIMZI_VERSION)
KUBECTL_VERSION = "v1.18.0" if os.environ.get('STRIMZI_KAFKA_CLI_KUBECTL_VERSION') is None else os.environ.get('STRIMZI_KAFKA_CLI_KUBECTL_VERSION')
KUBECTL = "kubectl" if platform.system().lower() != "windows" else "kubectl.exe"
KUBECTL_PATH = (BASE_PATH + "/" + KUBECTL) if os.environ.get(
'STRIMZI_KAFKA_CLI_KUBECTL_PATH') is None else os.environ.get('STRIMZI_KAFKA_CLI_KUBECTL_PATH')
PROCESSOR_TYPE = "amd64" if sys.maxsize > 2 ** 32 else "386"
KUBECTL_RELEASE_URL = "https://storage.googleapis.com/kubernetes-release/release/{version}/bin/{operating_system}/{processor_type}/{kubectl}".format(
version=KUBECTL_VERSION, operating_system=platform.system().lower(), processor_type=PROCESSOR_TYPE, kubectl=KUBECTL)
58 changes: 48 additions & 10 deletions kfk/console_command.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,70 @@
import click
import os
import ntpath

from kfk.command import kfk
from kfk.kubectl_command_builder import Kubectl
from kfk.config import *
from kfk.commons import get_kv_config_arr, transfer_file_to_container, SafeDict
from kfk.constants import *


@click.option('-n', '--namespace', help='Namespace to use', required=True)
@click.option('-c', '--cluster', help='Cluster to use', required=True)
@click.option('--topic', help='Topic Name', required=True)
@click.option('--from-beginning', help='Consumes messages from beginning', is_flag=True)
@click.option('--consumer.config', 'consumer_config', help='Consumer config properties file.')
@click.option('--topic', help='Topic Name', required=True)
@kfk.command()
def console_consumer(topic, cluster, from_beginning, namespace):
def console_consumer(topic, consumer_config, from_beginning, cluster, namespace):
"""The console consumer is a tool that reads data from Kafka and outputs it to standard output."""
native_command = "bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic {" \
native_command = "bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:{port} --topic {" \
"topic} {from_beginning}"
pod = cluster + "-kafka-0"
container = "kafka"
if consumer_config is not None:
native_command = apply_client_config_from_file(native_command, consumer_config, "--consumer-property",
container, pod, namespace)
print(native_command)
os.system(
Kubectl().exec("-it", "{cluster}-kafka-0").container("kafka").namespace(namespace).exec_command(
native_command).build().format(cluster=cluster, topic=topic,
Kubectl().exec("-it", pod).container(container).namespace(namespace).exec_command(
native_command).build().format(port=KAFKA_PORT, topic=topic,
from_beginning=(from_beginning and '--from-beginning' or '')))


@click.option('-n', '--namespace', help='Namespace to use', required=True)
@click.option('-c', '--cluster', help='Cluster to use', required=True)
@click.option('--producer.config', 'producer_config', help='Producer config properties file.')
@click.option('--topic', help='Topic Name', required=True)
@kfk.command()
def console_producer(topic, cluster, namespace):
def console_producer(topic, producer_config, cluster, namespace):
"""The console producer is a tool that reads data from standard input and publish it to Kafka."""
native_command = "bin/kafka-console-producer.sh --broker-list my-cluster-kafka-brokers:9092 --topic {topic}"
native_command = "bin/kafka-console-producer.sh --broker-list my-cluster-kafka-brokers:{port} --topic {topic}"
pod = cluster + "-kafka-0"
container = "kafka"
if producer_config is not None:
native_command = apply_client_config_from_file(native_command, producer_config, "--producer-property",
container, pod, namespace)
os.system(
Kubectl().exec("-it", "{cluster}-kafka-0").container("kafka").namespace(namespace).exec_command(
native_command).build().format(cluster=cluster, topic=topic))
Kubectl().exec("-it", pod).container(container).namespace(namespace).exec_command(
native_command).build().format(port=KAFKA_PORT, topic=topic))


def apply_client_config_from_file(native_command, config_file_path, property_flag, container, pod, namespace):
port = KAFKA_PORT
delete_file_command = ""
with open(config_file_path) as file:
for cnt, producer_property in enumerate(file):
producer_property = producer_property.strip()
if "security.protocol" in producer_property:
producer_property_arr = get_kv_config_arr(producer_property)
if producer_property_arr[1] == KAFKA_SSL:
port = KAFKA_SECURE_PORT
if "ssl.truststore.location" in producer_property or "ssl.keystore.location" in producer_property:
producer_property_arr = get_kv_config_arr(producer_property)
file_path = producer_property_arr[1]
file_name = ntpath.basename(file_path)
new_file_path = "/tmp/" + file_name
transfer_file_to_container(file_path, new_file_path, container, pod, namespace)
producer_property = producer_property_arr[0] + "=" + new_file_path
delete_file_command = delete_file_command + "rm -rf" + SPACE + new_file_path + SEMICOLON
native_command = native_command + SPACE + property_flag + SPACE + producer_property
return native_command.format_map(SafeDict(port=port)) + SEMICOLON + delete_file_command
26 changes: 5 additions & 21 deletions kfk/constants.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,6 @@
import platform
import sys
import os

from pathlib import Path

BASE_FOLDER = ".strimzi-kafka-cli"
BASE_PATH = (str(Path.home()) + "/" + BASE_FOLDER) if os.environ.get(
'STRIMZI_KAFKA_CLI_BASE_PATH') is None else os.environ.get('STRIMZI_KAFKA_CLI_BASE_PATH')
STRIMZI_VERSION = "0.18.0" if os.environ.get('STRIMZI_KAFKA_CLI_STRIMZI_VERSION') is None else os.environ.get('STRIMZI_KAFKA_CLI_STRIMZI_VERSION')
STRIMZI_PATH = (BASE_PATH + "/strimzi-{version}".format(version=STRIMZI_VERSION)) if os.environ.get(
'STRIMZI_KAFKA_CLI_STRIMZI_PATH') is None else os.environ.get('STRIMZI_KAFKA_CLI_STRIMZI_PATH')
STRIMZI_RELEASE_URL = "https://github.com/strimzi/strimzi-kafka-operator/releases/download/{version}/strimzi-{version}.tar.gz".format(
version=STRIMZI_VERSION)
KUBECTL_VERSION = "v1.18.0" if os.environ.get('STRIMZI_KAFKA_CLI_KUBECTL_VERSION') is None else os.environ.get('STRIMZI_KAFKA_CLI_KUBECTL_VERSION')
KUBECTL = "kubectl" if platform.system().lower() != "windows" else "kubectl.exe"
KUBECTL_PATH = (BASE_PATH + "/" + KUBECTL) if os.environ.get(
'STRIMZI_KAFKA_CLI_KUBECTL_PATH') is None else os.environ.get('STRIMZI_KAFKA_CLI_KUBECTL_PATH')
PROCESSOR_TYPE = "amd64" if sys.maxsize > 2 ** 32 else "386"
KUBECTL_RELEASE_URL = "https://storage.googleapis.com/kubernetes-release/release/{version}/bin/{operating_system}/{processor_type}/{kubectl}".format(
version=KUBECTL_VERSION, operating_system=platform.system().lower(), processor_type=PROCESSOR_TYPE, kubectl=KUBECTL)
SPACE = " "
SEMICOLON = ";"
AMPERSAND = "&"
KAFKA_PORT = "9092"
KAFKA_SECURE_PORT = "9093"
KAFKA_SSL = "SSL"
9 changes: 7 additions & 2 deletions kfk/kubectl_command_builder.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from kfk.constants import SPACE, KUBECTL_PATH
from kfk.config import KUBECTL_PATH
from kfk.constants import SPACE


class Kubectl:
Expand Down Expand Up @@ -31,7 +32,11 @@ def exec(self, flag, pod_name):
return self

def exec_command(self, command):
self.cmd_str = self.cmd_str + SPACE + "--" + SPACE + command
self.cmd_str = self.cmd_str + SPACE + "--" + SPACE + "bash -c" + SPACE + "\"" + command + "\""
return self

def cp(self, source_path, destination_path):
self.cmd_str = self.cmd_str + SPACE + "cp" + SPACE + source_path + SPACE + destination_path
return self

def kafkas(self, *vals):
Expand Down
2 changes: 1 addition & 1 deletion kfk/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import tarfile
import ssl

from kfk.constants import *
from kfk.config import *
from pathlib import Path

ssl._create_default_https_context = ssl._create_unverified_context
Expand Down
4 changes: 2 additions & 2 deletions kfk/topics_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from kfk.option_extensions import NotRequiredIf, RequiredIf
from kfk.commons import print_missing_options_for_command, delete_last_applied_configuration, resource_exists, \
get_resource_as_file, add_resource_kv_config, delete_resource_config, create_temp_file, print_resource_found_msg
from kfk.constants import *
from kfk.config import *
from kfk.kubectl_command_builder import Kubectl


Expand Down Expand Up @@ -40,7 +40,7 @@ def topics(topic, list, create, partitions, replication_factor, describe, output
Kubectl().get().kafkatopics().label("strimzi.io/cluster={cluster}").namespace(namespace).build().format(
cluster=cluster))
elif create:
with open(r'{strimzi_path}/examples/topic/kafka-topic.yaml'.format(strimzi_path=STRIMZI_PATH).format(
with open('{strimzi_path}/examples/topic/kafka-topic.yaml'.format(strimzi_path=STRIMZI_PATH).format(
version=STRIMZI_VERSION)) as file:
topic_dict = yaml.full_load(file)

Expand Down
4 changes: 2 additions & 2 deletions kfk/users_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from kfk.commons import print_missing_options_for_command, resource_exists, get_resource_as_file, \
delete_last_applied_configuration, add_resource_kv_config, delete_resource_config, create_temp_file, \
print_resource_found_msg
from kfk.constants import *
from kfk.config import *
from kfk.kubectl_command_builder import Kubectl


Expand Down Expand Up @@ -64,7 +64,7 @@ def list_option(cluster, namespace):


def create_option(user, authentication_type, quota, cluster, namespace):
with open(r'{strimzi_path}/examples/user/kafka-user.yaml'.format(strimzi_path=STRIMZI_PATH).format(
with open('{strimzi_path}/examples/user/kafka-user.yaml'.format(strimzi_path=STRIMZI_PATH).format(
version=STRIMZI_VERSION)) as file:
user_dict = yaml.full_load(file)

Expand Down
2 changes: 1 addition & 1 deletion kfk/version_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import pkg_resources

from kfk.command import kfk
from kfk.constants import KUBECTL_VERSION, STRIMZI_VERSION
from kfk.config import KUBECTL_VERSION, STRIMZI_VERSION


@kfk.command()
Expand Down
5 changes: 5 additions & 0 deletions tests/files/client.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
security.protocol=SSL
ssl.truststore.location=~/Desktop/truststore.jks
ssl.truststore.password=123456
ssl.keystore.location=~/Desktop/user.p12
ssl.keystore.password=123456
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
40 changes: 36 additions & 4 deletions tests/test_console_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,32 @@ def setUp(self):

@mock.patch('kfk.console_command.os')
def test_console_consumer(self, mock_os):
from_beginning = False
result = self.runner.invoke(kfk, ['console-consumer', '--topic', self.topic, '-c', self.cluster, '-n',
self.namespace])
assert result.exit_code == 0

native_command = "bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic {" \
"topic} {from_beginning}"
"topic} "
mock_os.system.assert_called_with(
Kubectl().exec("-it", "{cluster}-kafka-0").container("kafka").namespace(self.namespace).exec_command(
native_command).build().format(cluster=self.cluster, topic=self.topic,
from_beginning=(from_beginning and '--from-beginning' or '')))
native_command).build().format(cluster=self.cluster, topic=self.topic))

@mock.patch('kfk.console_command.transfer_file_to_container')
@mock.patch('kfk.console_command.os')
def test_console_consumer_with_consumer_config(self, mock_os, mock_transfer_file_to_container):
result = self.runner.invoke(kfk, ['console-consumer', '--topic', self.topic, '--consumer.config',
'files/client.properties', '-c', self.cluster, '-n', self.namespace])
assert result.exit_code == 0

native_command = "bin/kafka-console-consumer.sh --bootstrap-server {cluster}-kafka-bootstrap:9093 --topic {" \
"topic} --consumer-property security.protocol=SSL --consumer-property " \
"ssl.truststore.location=/tmp/truststore.jks --consumer-property " \
"ssl.truststore.password=123456 --consumer-property ssl.keystore.location=/tmp/user.p12 " \
"--consumer-property ssl.keystore.password=123456;rm -rf /tmp/truststore.jks;rm -rf " \
"/tmp/user.p12;"
mock_os.system.assert_called_with(
Kubectl().exec("-it", "{cluster}-kafka-0").container("kafka").namespace(self.namespace).exec_command(
native_command).build().format(cluster=self.cluster, topic=self.topic))

@mock.patch('kfk.console_command.os')
def test_console_consumer_with_from_beginning(self, mock_os):
Expand All @@ -49,3 +64,20 @@ def test_console_producer(self, mock_os):
mock_os.system.assert_called_with(
Kubectl().exec("-it", "{cluster}-kafka-0").container("kafka").namespace(self.namespace).exec_command(
native_command).build().format(cluster=self.cluster, topic=self.topic))

@mock.patch('kfk.console_command.transfer_file_to_container')
@mock.patch('kfk.console_command.os')
def test_console_producer_with_producer_config(self, mock_os, mock_transfer_file_to_container):
result = self.runner.invoke(kfk, ['console-producer', '--topic', self.topic, '--producer.config',
'files/client.properties', '-c', self.cluster, '-n', self.namespace])
assert result.exit_code == 0
native_command = "bin/kafka-console-producer.sh --broker-list {cluster}-kafka-brokers:9093 --topic {topic} " \
"--producer-property security.protocol=SSL --producer-property " \
"ssl.truststore.location=/tmp/truststore.jks --producer-property " \
"ssl.truststore.password=123456 --producer-property ssl.keystore.location=/tmp/user.p12 " \
"--producer-property ssl.keystore.password=123456;rm -rf /tmp/truststore.jks;rm -rf " \
"/tmp/user.p12;"

mock_os.system.assert_called_with(
Kubectl().exec("-it", "{cluster}-kafka-0").container("kafka").namespace(self.namespace).exec_command(
native_command).build().format(cluster=self.cluster, topic=self.topic))
4 changes: 2 additions & 2 deletions tests/test_kubectl_cmd_builder.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from unittest import TestCase
from kfk.kubectl_command_builder import Kubectl
from kfk.constants import KUBECTL_PATH
from kfk.config import KUBECTL_PATH


class TestKubectl(TestCase):
Expand All @@ -23,7 +23,7 @@ def test_exec(self):
self.assertEqual(
Kubectl().exec("-it", "test-pod").container("test-container").namespace("test-namespace").exec_command(
"echo 'test'").build(),
"{kubectl} exec -it test-pod -c test-container -n test-namespace -- echo 'test'".format(kubectl=KUBECTL_PATH))
"{kubectl} exec -it test-pod -c test-container -n test-namespace -- bash -c \"echo 'test'\"".format(kubectl=KUBECTL_PATH))

def test_kafkas_all(self):
self.assertEqual(Kubectl().get().kafkas().build(), "{kubectl} get kafkas".format(kubectl=KUBECTL_PATH))
Expand Down
Loading

0 comments on commit 837580d

Please sign in to comment.