Remove zookeeper config as admin client is not using it #386

Open · wants to merge 1 commit into base: master
8 changes: 4 additions & 4 deletions README.md
@@ -102,7 +102,7 @@ Xinfra Monitor supports Apache Kafka 0.8 to 2.0:
 <code>./bin/xinfra-monitor-start.sh config/xinfra-monitor.properties</code>. The default
 xinfra-monitor.properties in the repo provides an simple example of how to
 monitor a single cluster. You probably need to change the value of
-<code>zookeeper.connect</code> and <code>bootstrap.servers</code> to point to your cluster.
+<code>bootstrap.servers</code> to point to your cluster.
 </li>
 <br />
 <li> The full list of configs and their documentation can be found in the code of
@@ -165,12 +165,12 @@ whether messages can be properly produced to and consumed from this cluster.
 See Service Overview wiki for how these metrics are derived.

 ```
-$ ./bin/single-cluster-monitor.sh --topic test --broker-list localhost:9092 --zookeeper localhost:2181
+$ ./bin/single-cluster-monitor.sh --topic test --broker-list localhost:9092
 ```

 ### Run MultiClusterMonitor app to monitor a pipeline of Kafka clusters connected by MirrorMaker
-Edit `config/multi-cluster-monitor.properties` to specify the right broker and
-zookeeper url as suggested by the comment in the properties file
+Edit `config/multi-cluster-monitor.properties` to specify the right broker
+as suggested by the comment in the properties file

 Metrics `produce-availability-avg` and `consume-availability-avg` demonstrate
 whether messages can be properly produced to the source cluster and consumed
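For context (not part of the diff): the monitor's cluster access now goes through Kafka's AdminClient, which is configured from `bootstrap.servers` alone and never opens a ZooKeeper session. A minimal sketch under that assumption — the topic name and broker address below are placeholders, not values taken from this PR:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;

public class AdminClientOnlyExample {
  public static void main(String[] args) throws Exception {
    // The AdminClient is configured entirely from bootstrap.servers;
    // no zookeeper.connect setting is needed anywhere.
    Properties props = new Properties();
    props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    try (AdminClient adminClient = AdminClient.create(props)) {
      // Describe the monitor topic directly via the brokers.
      TopicDescription description = adminClient
          .describeTopics(Collections.singleton("xinfra-monitor-topic"))
          .all().get()
          .get("xinfra-monitor-topic");
      System.out.println("Partitions: " + description.partitions().size());
    }
  }
}
```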
1 change: 0 additions & 1 deletion build.gradle
@@ -45,7 +45,6 @@ allprojects {
 compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.8.2'
 compile 'org.apache.commons:commons-lang3:3.12.0'
 compile 'com.linkedin.avroutil1:helper-all:0.2.118'
-compile 'org.apache.zookeeper:zookeeper:3.8.0'
 testCompile 'org.mockito:mockito-core:2.24.0'
 testCompile 'org.testng:testng:6.8.8'
 }
6 changes: 1 addition & 5 deletions config/multi-cluster-monitor.properties
@@ -7,7 +7,7 @@
 # an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 # This properties file specifies an example configure to monitor a pipeline of Kafka clusters.
-# User probably needs to change zookeeper.connect and bootstrap.servers to point to respective clusters.
+# User probably needs to change bootstrap.servers to point to respective clusters.
 # More clusters can be added in the map for "topic.management.config.per.cluster" to reference
 # each cluster in the pipeline. The "produce.service.props" should use the first cluster and
 # the "consume.service.props" should use the last cluster in the pipeline.
@@ -20,15 +20,13 @@
 "class.name": "com.linkedin.kmf.apps.MultiClusterMonitor",
 "topic": "kafka-monitor-topic",
 "produce.service.props": {
-"zookeeper.connect": "localhost:2181/first_cluster",
 "bootstrap.servers": "localhost:9092",
 "produce.record.delay.ms": 100,
 "produce.producer.props": {
 "client.id": "kafka-monitor-client-id"
 }
 },
 "consume.service.props": {
-"zookeeper.connect": "localhost:2181/last_cluster",
 "bootstrap.servers": "localhost:9095",
 "consume.latency.sla.ms": "20000",
 "consume.consumer.props": {
@@ -39,7 +37,6 @@
 "topic.management.props.per.cluster" : {
 "first-cluster" : {
 "bootstrap.servers": "localhost:9092",
-"zookeeper.connect": "localhost:2181/first_cluster",
 "topic-management.topicCreationEnabled": true,
 "topic-management.replicationFactor" : 1,
 "topic-management.partitionsToBrokersRatio" : 2.0,
@@ -50,7 +47,6 @@

 "last-cluster" : {
 "bootstrap.servers": "localhost:9095",
-"zookeeper.connect": "localhost:2181/last_cluster",
 "topic-management.topicCreationEnabled": true,
 "topic-management.replicationFactor" : 1,
 "topic-management.partitionsToBrokersRatio" : 2.0,
6 changes: 0 additions & 6 deletions config/xinfra-monitor.properties
@@ -46,7 +46,6 @@
 "single-cluster-monitor": {
 "class.name": "com.linkedin.xinfra.monitor.apps.SingleClusterMonitor",
 "topic": "xinfra-monitor-topic",
-"zookeeper.connect": "localhost:2181",
 "bootstrap.servers": "localhost:9092,localhost:9093",
 "request.timeout.ms": 9000,
 "produce.record.delay.ms": 100,
@@ -72,7 +71,6 @@

 "offset-commit-service": {
 "class.name": "com.linkedin.xinfra.monitor.services.OffsetCommitService",
-"zookeeper.connect": "localhost:2181",
 "bootstrap.servers": "localhost:9092,localhost:9093",
 "consumer.props": {
 "group.id": "target-consumer-group"
@@ -122,7 +120,6 @@

 "cluster-topic-manipulation-service":{
 "class.name":"com.linkedin.xinfra.monitor.services.ClusterTopicManipulationService",
-"zookeeper.connect": "localhost:2181",
 "bootstrap.servers":"localhost:9092,localhost:9093",
 "topic": "xinfra-monitor-topic"
 },
@@ -131,7 +128,6 @@
 # "produce-service": {
 # "class.name": "com.linkedin.kmf.services.ProduceService",
 # "topic": "xinfra-monitor-topic",
-# "zookeeper.connect": "localhost:2181",
 # "bootstrap.servers": "localhost:9092",
 # "consume.latency.sla.ms": "20000",
 # "consume.consumer.props": {
@@ -142,7 +138,6 @@
 # "consume-service": {
 # "class.name": "com.linkedin.kmf.services.ConsumeService",
 # "topic": "xinfra-monitor-topic",
-# "zookeeper.connect": "localhost:2181",
 # "bootstrap.servers": "localhost:9092",
 # "consume.latency.sla.ms": "20000",
 # "consume.consumer.props": {
@@ -166,7 +161,6 @@
 "reporter-kafka-service": {
 "class.name": "com.linkedin.xinfra.monitor.services.KafkaMetricsReporterService",
 "report.interval.sec": 3,
-"zookeeper.connect": "localhost:2181",
 "bootstrap.servers": "localhost:9092",
 "topic": "xinfra-monitor-topic-metrics",
 "report.kafka.topic.replication.factor": 1,
@@ -188,14 +188,6 @@ private static ArgumentParser argParser() {
 .dest("brokerList")
 .help("Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");

-parser.addArgument("--zookeeper")
-.action(net.sourceforge.argparse4j.impl.Arguments.store())
-.required(true)
-.type(String.class)
-.metavar("HOST:PORT")
-.dest("zkConnect")
-.help("The connection string for the zookeeper connection in the form host:port");
-
 parser.addArgument("--record-size")
 .action(net.sourceforge.argparse4j.impl.Arguments.store())
 .required(false)
@@ -330,7 +322,6 @@ public static void main(String[] args) throws Exception {
 Namespace res = parser.parseArgs(args);
 Map<String, Object> props = new HashMap<>();
 // produce service config
-props.put(ProduceServiceConfig.ZOOKEEPER_CONNECT_CONFIG, res.getString("zkConnect"));
 props.put(ProduceServiceConfig.BOOTSTRAP_SERVERS_CONFIG, res.getString("brokerList"));
 if (res.getString("producerClassName") != null)
 props.put(ProduceServiceConfig.PRODUCER_CLASS_CONFIG, res.getString("producerClassName"));
2 changes: 0 additions & 2 deletions src/main/java/com/linkedin/xinfra/monitor/common/Utils.java
@@ -55,8 +55,6 @@
 */
 public class Utils {
 private static final Logger LOG = LoggerFactory.getLogger(Utils.class);
-public static final int ZK_CONNECTION_TIMEOUT_MS = 30_000;
-public static final int ZK_SESSION_TIMEOUT_MS = 30_000;
 private static final long LIST_PARTITION_REASSIGNMENTS_TIMEOUT_MS = 60000L;
 private static final int LIST_PARTITION_REASSIGNMENTS_MAX_ATTEMPTS = 3;
 private static final String LIST_PARTITION_REASSIGNMENTS_TIMEOUT_MS_CONFIG = "list.partition.reassignment.timeout.ms";
@@ -67,7 +67,6 @@ public class ClusterTopicManipulationService implements Service {

 private final ClusterTopicManipulationMetrics _clusterTopicManipulationMetrics;
 private final TopicFactory _topicFactory;
-private final String _zkConnect;

 public ClusterTopicManipulationService(String name, AdminClient adminClient, Map<String, Object> props)
 throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException,
@@ -96,7 +95,6 @@ public ClusterTopicManipulationService(String name, AdminClient adminClient, Map
 TopicManagementServiceConfig.TOPIC_FACTORY_PROPS_CONFIG) : new HashMap();

 _clusterTopicManipulationMetrics = new ClusterTopicManipulationMetrics(metrics, tags);
-_zkConnect = config.getString(TopicManagementServiceConfig.ZOOKEEPER_CONNECT_CONFIG);
 _topicFactory =
 (TopicFactory) Class.forName(topicFactoryClassName).getConstructor(Map.class).newInstance(topicFactoryConfig);
 }
@@ -33,7 +33,7 @@ public class ConsumerFactoryImpl implements ConsumerFactory {
 private final int _latencyPercentileMaxMs;
 private final int _latencyPercentileGranularityMs;
 private static final String[] NON_OVERRIDABLE_PROPERTIES =
-new String[] {ConsumeServiceConfig.BOOTSTRAP_SERVERS_CONFIG, ConsumeServiceConfig.ZOOKEEPER_CONNECT_CONFIG};
+new String[] {ConsumeServiceConfig.BOOTSTRAP_SERVERS_CONFIG};
 private final int _latencySlaMs;
 private static AdminClient adminClient;
 private static final Logger LOG = LoggerFactory.getLogger(ConsumerFactoryImpl.class);
@@ -46,7 +46,6 @@ public ConsumerFactoryImpl(Map<String, Object> props) throws Exception {
 ? (Map) props.get(ConsumeServiceConfig.CONSUMER_PROPS_CONFIG) : new HashMap<>();
 ConsumeServiceConfig config = new ConsumeServiceConfig(props);
 _topic = config.getString(ConsumeServiceConfig.TOPIC_CONFIG);
-String zkConnect = config.getString(ConsumeServiceConfig.ZOOKEEPER_CONNECT_CONFIG);
 String brokerList = config.getString(ConsumeServiceConfig.BOOTSTRAP_SERVERS_CONFIG);
 String consumerClassName = config.getString(ConsumeServiceConfig.CONSUMER_CLASS_CONFIG);
 _latencySlaMs = config.getInt(ConsumeServiceConfig.LATENCY_SLA_MS_CONFIG);
@@ -72,7 +71,6 @@ public ConsumerFactoryImpl(Map<String, Object> props) throws Exception {

 /* Assign config specified for ConsumeService. */
 consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
-consumerProps.put(CommonServiceConfig.ZOOKEEPER_CONNECT_CONFIG, zkConnect);

 /* Assign config specified for consumer. This has the highest priority. */
 consumerProps.putAll(consumerPropsOverride);
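For illustration only — the helper below is hypothetical and not part of this change — the precedence after the edit can be thought of as: user-supplied `consumer.props` may override anything except `bootstrap.servers`, and `zookeeper.connect` is no longer injected at all.

```java
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

final class ConsumerPropsSketch {
  // Hypothetical helper sketching the intended precedence.
  static Properties mergeConsumerProps(String brokerList, Map<String, Object> overrides) {
    Properties consumerProps = new Properties();
    // User-supplied consumer.props are applied first ...
    consumerProps.putAll(overrides);
    // ... then the single remaining non-overridable property is applied last, so it
    // always wins; nothing ZooKeeper-related is set anymore.
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
    return consumerProps;
  }
}
```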
@@ -243,7 +243,6 @@ public void run() {

 @SuppressWarnings("FieldCanBeLocal")
 static class TopicManagementHelper {
-private final String _zkConnect;
 private final int _replicationFactor;
 private final double _minPartitionsToBrokersRatio;
 private final int _minPartitionNum;
@@ -270,7 +269,6 @@ static class TopicManagementHelper {
 _topicAddPartitionEnabled = config.getBoolean(TopicManagementServiceConfig.TOPIC_ADD_PARTITION_ENABLED_CONFIG);
 _topicReassignPartitionAndElectLeaderEnabled = config.getBoolean(TopicManagementServiceConfig.TOPIC_REASSIGN_PARTITION_AND_ELECT_LEADER_ENABLED_CONFIG);
 _topic = config.getString(TopicManagementServiceConfig.TOPIC_CONFIG);
-_zkConnect = config.getString(TopicManagementServiceConfig.ZOOKEEPER_CONNECT_CONFIG);
 _replicationFactor = config.getInt(TopicManagementServiceConfig.TOPIC_REPLICATION_FACTOR_CONFIG);
 _minPartitionsToBrokersRatio = config.getDouble(TopicManagementServiceConfig.PARTITIONS_TO_BROKERS_RATIO_CONFIG);
 _minPartitionNum = config.getInt(TopicManagementServiceConfig.MIN_PARTITION_NUM_CONFIG);
@@ -296,18 +294,18 @@ static class TopicManagementHelper {
 }

 private void logConfigurationValues() {
-LOGGER.info("TopicManagementHelper for cluster with Zookeeper connect {} is configured with " +
+LOGGER.info("TopicManagementHelper for cluster with bootstrap servers {} is configured with " +
 "[topic={}, topicCreationEnabled={}, topicAddPartitionEnabled={}, " +
 "topicReassignPartitionAndElectLeaderEnabled={}, minPartitionsToBrokersRatio={}, " +
-"minPartitionNum={}]", _zkConnect, _topic, _topicCreationEnabled, _topicAddPartitionEnabled,
+"minPartitionNum={}]", _bootstrapServers, _topic, _topicCreationEnabled, _topicAddPartitionEnabled,
 _topicReassignPartitionAndElectLeaderEnabled, _minPartitionsToBrokersRatio, _minPartitionNum);
 }

 @SuppressWarnings("unchecked")
 void maybeCreateTopic() throws Exception {
 if (!_topicCreationEnabled) {
-LOGGER.info("Topic creation is not enabled for {} in a cluster with Zookeeper URL {}. " +
-"Refer to config: {}", _topic, _zkConnect, TopicManagementServiceConfig.TOPIC_CREATION_ENABLED_CONFIG);
+LOGGER.info("Topic creation is not enabled for {} in a cluster with bootstrap servers {}. " +
+"Refer to config: {}", _topic, _bootstrapServers, TopicManagementServiceConfig.TOPIC_CREATION_ENABLED_CONFIG);
 return;
 }
 NewTopic newTopic = new NewTopic(_topic, minPartitionNum(), (short) _replicationFactor);
@@ -328,8 +326,8 @@ int minPartitionNum() throws InterruptedException, ExecutionException {
 void maybeAddPartitions(final int requiredMinPartitionNum)
 throws ExecutionException, InterruptedException, CancellationException, TimeoutException {
 if (!_topicAddPartitionEnabled) {
-LOGGER.info("Adding partition to {} topic is not enabled in a cluster with Zookeeper URL {}. " +
-"Refer to config: {}", _topic, _zkConnect, TopicManagementServiceConfig.TOPIC_ADD_PARTITION_ENABLED_CONFIG);
+LOGGER.info("Adding partition to {} topic is not enabled in a cluster with bootstrap servers {}. " +
+"Refer to config: {}", _topic, _bootstrapServers, TopicManagementServiceConfig.TOPIC_ADD_PARTITION_ENABLED_CONFIG);
 return;
 }
 Map<String, KafkaFuture<TopicDescription>> kafkaFutureMap =
@@ -447,8 +445,8 @@ private Set<Node> getAvailableBrokers() throws ExecutionException, InterruptedEx

 void maybeReassignPartitionAndElectLeader() throws ExecutionException, InterruptedException, TimeoutException {
 if (!_topicReassignPartitionAndElectLeaderEnabled) {
-LOGGER.info("Reassign partition and elect leader to {} topic is not enabled in a cluster with Zookeeper URL {}. " +
-"Refer to config: {}", _topic, _zkConnect, TopicManagementServiceConfig.TOPIC_REASSIGN_PARTITION_AND_ELECT_LEADER_ENABLED_CONFIG);
+LOGGER.info("Reassign partition and elect leader to {} topic is not enabled in a cluster with bootstrap servers {}. " +
+"Refer to config: {}", _topic, _bootstrapServers, TopicManagementServiceConfig.TOPIC_REASSIGN_PARTITION_AND_ELECT_LEADER_ENABLED_CONFIG);
 return;
 }
 List<TopicPartitionInfo> partitionInfoList =
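For context (not part of the diff): the helper's topic management already goes through the AdminClient, which is why the log messages above switch from a ZooKeeper URL to the bootstrap servers. A minimal sketch of the equivalent AdminClient calls, with the topic name, partition count, and replication factor below as placeholder assumptions:

```java
import java.util.Collections;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.clients.admin.NewTopic;

final class TopicManagementSketch {
  static void createAndGrow(AdminClient adminClient) throws Exception {
    // Create the monitor topic with 1 partition and replication factor 1.
    NewTopic newTopic = new NewTopic("xinfra-monitor-topic", 1, (short) 1);
    adminClient.createTopics(Collections.singleton(newTopic)).all().get();

    // Later, raise the partition count to 4 — the AdminClient counterpart of the
    // ZooKeeper-era partition expansion.
    adminClient.createPartitions(
        Collections.singletonMap("xinfra-monitor-topic", NewPartitions.increaseTo(4)))
        .all().get();
  }
}
```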
@@ -61,7 +61,6 @@ public Service createService() throws JsonProcessingException {
 @SuppressWarnings("unchecked")
 private Properties prepareConfigs(Map<String, Object> props) {

-String zkConnect = (String) props.get(CommonServiceConfig.ZOOKEEPER_CONNECT_CONFIG);
 String brokerList = (String) props.get(CommonServiceConfig.BOOTSTRAP_SERVERS_CONFIG);

 Properties consumerProps = new Properties();
@@ -70,7 +69,6 @@ private Properties prepareConfigs(Map<String, Object> props) {
 consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
 consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
 consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
-consumerProps.put(CommonServiceConfig.ZOOKEEPER_CONNECT_CONFIG, zkConnect);

 Map<String, String> customProps = (Map<String, String>) props.get(CommonServiceConfig.CONSUMER_PROPS_CONFIG);
 if (customProps != null) {
@@ -50,8 +50,7 @@
 public class ProduceService extends AbstractService {
 private static final Logger LOG = LoggerFactory.getLogger(ProduceService.class);
 private static final String[] NON_OVERRIDABLE_PROPERTIES = new String[]{
-ProduceServiceConfig.BOOTSTRAP_SERVERS_CONFIG,
-ProduceServiceConfig.ZOOKEEPER_CONNECT_CONFIG
+ProduceServiceConfig.BOOTSTRAP_SERVERS_CONFIG
 };
 private final String _name;
 private final ProduceMetrics _sensors;
@@ -17,9 +17,6 @@ public class CommonServiceConfig {
 public static final String CONSUMER_PROPS_CONFIG = "consumer.props";
 public static final String CONSUMER_PROPS_DOC = "consumer props";

-public static final String ZOOKEEPER_CONNECT_CONFIG = "zookeeper.connect";
-public static final String ZOOKEEPER_CONNECT_DOC = "Zookeeper connect string.";
-
 public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
 public static final String BOOTSTRAP_SERVERS_DOC = CommonClientConfigs.BOOTSTRAP_SERVERS_DOC;

@@ -19,9 +19,6 @@ public class ConsumeServiceConfig extends AbstractConfig {

 private static final ConfigDef CONFIG;

-public static final String ZOOKEEPER_CONNECT_CONFIG = CommonServiceConfig.ZOOKEEPER_CONNECT_CONFIG;
-public static final String ZOOKEEPER_CONNECT_DOC = CommonServiceConfig.ZOOKEEPER_CONNECT_DOC;
-
 public static final String BOOTSTRAP_SERVERS_CONFIG = CommonServiceConfig.BOOTSTRAP_SERVERS_CONFIG;
 public static final String BOOTSTRAP_SERVERS_DOC = CommonServiceConfig.BOOTSTRAP_SERVERS_DOC;

@@ -48,11 +45,7 @@ public class ConsumeServiceConfig extends AbstractConfig {
 + "as the fraction of messages that are either lost or whose delivery latency exceeds this value";

 static {
-CONFIG = new ConfigDef().define(ZOOKEEPER_CONNECT_CONFIG,
-ConfigDef.Type.STRING,
-ConfigDef.Importance.HIGH,
-ZOOKEEPER_CONNECT_DOC)
-.define(BOOTSTRAP_SERVERS_CONFIG,
+CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
 ConfigDef.Type.STRING,
 ConfigDef.Importance.HIGH,
 BOOTSTRAP_SERVERS_DOC)
@@ -27,9 +27,6 @@ public class KafkaMetricsReporterServiceConfig extends AbstractConfig {
 public static final String REPORT_INTERVAL_SEC_CONFIG = CommonServiceConfig.REPORT_INTERVAL_SEC_CONFIG;
 public static final String REPORT_INTERVAL_SEC_DOC = CommonServiceConfig.REPORT_INTERVAL_SEC_DOC;

-public static final String ZOOKEEPER_CONNECT_CONFIG = CommonServiceConfig.ZOOKEEPER_CONNECT_CONFIG;
-public static final String ZOOKEEPER_CONNECT_DOC = CommonServiceConfig.ZOOKEEPER_CONNECT_DOC;
-
 public static final String BOOTSTRAP_SERVERS_CONFIG = CommonServiceConfig.BOOTSTRAP_SERVERS_CONFIG;
 public static final String BOOTSTRAP_SERVERS_DOC = CommonServiceConfig.BOOTSTRAP_SERVERS_DOC;

@@ -50,10 +47,6 @@ public class KafkaMetricsReporterServiceConfig extends AbstractConfig {
 1,
 ConfigDef.Importance.LOW,
 REPORT_INTERVAL_SEC_DOC)
-.define(ZOOKEEPER_CONNECT_CONFIG,
-ConfigDef.Type.STRING,
-ConfigDef.Importance.HIGH,
-ZOOKEEPER_CONNECT_DOC)
 .define(BOOTSTRAP_SERVERS_CONFIG,
 ConfigDef.Type.STRING,
 ConfigDef.Importance.HIGH,