Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parrallel Producer per partition #394

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class ProduceService extends AbstractService {
};
private final String _name;
private final ProduceMetrics _sensors;
private KMBaseProducer _producer;
private ArrayList<KMBaseProducer> producerList = new ArrayList<>();
private final KMPartitioner _partitioner;
private ScheduledExecutorService _produceExecutor;
private final ScheduledExecutorService _handleNewPartitionsExecutor;
Expand All @@ -75,6 +75,7 @@ public class ProduceService extends AbstractService {
private final int _threadsNum;
private final AdminClient _adminClient;
private static final String KEY_SERIALIZER_CLASS = "org.apache.kafka.common.serialization.StringSerializer";
private Properties producerProps;

public ProduceService(Map<String, Object> props, String name) throws Exception {
// TODO: Make values of below fields come from configs
Expand Down Expand Up @@ -131,7 +132,7 @@ public ProduceService(Map<String, Object> props, String name) throws Exception {
}

private void initializeProducer(Map<String, Object> props) throws Exception {
Properties producerProps = new Properties();
producerProps = new Properties();
// Assign default config. This has the lowest priority.
producerProps.put(ProducerConfig.ACKS_CONFIG, "-1");
producerProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "20000");
Expand All @@ -150,8 +151,7 @@ private void initializeProducer(Map<String, Object> props) throws Exception {
props.forEach(producerProps::putIfAbsent);
}

_producer = (KMBaseProducer) Class.forName(_producerClassName).getConstructor(Properties.class).newInstance(producerProps);
LOG.info("{}/ProduceService is initialized.", _name);
LOG.info("{}/ProduceService Properties are initialized.", _name);
}

@Override
Expand Down Expand Up @@ -200,7 +200,10 @@ public synchronized void stop() {
if (_running.compareAndSet(true, false)) {
_produceExecutor.shutdown();
_handleNewPartitionsExecutor.shutdown();
_producer.close();
for (KMBaseProducer producer : producerList) {
producer.close();
}
producerList.clear();
LOG.info("{}/ProduceService stopped.", _name);
}
}
Expand Down Expand Up @@ -228,17 +231,34 @@ public boolean isRunning() {
private class ProduceRunnable implements Runnable {
private final int _partition;
private final String _key;
private KMBaseProducer _producer;
private String producerID;

ProduceRunnable(int partition, String key) {
_partition = partition;
_key = key;
_producer=null;
producerID="";

try{
long threadIdLong = Thread.currentThread().getId();
String threadId=String.valueOf(threadIdLong);
producerID=_producerId+threadId+_partition;
producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, producerID);
LOG.info("{}/ProduceService is initialized, with ID", _producerId+" A "+threadId+" B "+_partition+" C "+producerID);
_producer = (KMBaseProducer) Class.forName(_producerClassName).getConstructor(Properties.class).newInstance(producerProps);
producerList.add(_producer);
} catch (Exception e) {
LOG.error("Failed to initialize Produce : ", e);
throw new IllegalStateException(e);
}
}

public void run() {
try {
long nextIndex = _nextIndexPerPartition.get(_partition).get();
long currMs = System.currentTimeMillis();
String message = Utils.jsonFromFields(_topic, nextIndex, currMs, _producerId, _recordSize);
String message = Utils.jsonFromFields(_topic, nextIndex, currMs, producerID, _recordSize);
BaseProducerRecord record = new BaseProducerRecord(_topic, _partition, _key, message);
RecordMetadata metadata = _producer.send(record, _sync);
_sensors._produceDelay.record(System.currentTimeMillis() - currMs);
Expand Down Expand Up @@ -286,7 +306,11 @@ public void run() {
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
_producer.close();
LOG.info("Stopping Producer Service");
for (KMBaseProducer producer : producerList) {
producer.close();
}
producerList.clear();
try {
initializeProducer(new HashMap<>());
} catch (Exception e) {
Expand Down