Skip to content

Commit

Permalink
Recreate memory leak behavior reported in [Issue 341](reactor/reactor…
Browse files Browse the repository at this point in the history
  • Loading branch information
basking2 committed Apr 24, 2023
0 parents commit e703308
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 0 deletions.
38 changes: 38 additions & 0 deletions Reactor Kafka 341/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
target/
!.mvn/wrapper/maven-wrapper.jar
!**/src/main/**/target/
!**/src/test/**/target/

### IntelliJ IDEA ###
.idea/modules.xml
.idea/jarRepositories.xml
.idea/compiler.xml
.idea/libraries/
*.iws
*.iml
*.ipr

### Eclipse ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache

### NetBeans ###
/nbproject/private/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/
build/
!**/src/main/**/build/
!**/src/test/**/build/

### VS Code ###
.vscode/

### Mac OS ###
.DS_Store
7 changes: 7 additions & 0 deletions Reactor Kafka 341/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Usage

```shell

mvn package
java -Xmx300m -jar target target/reactor-kafka-341-1.0-SNAPSHOT.jar
```
60 changes: 60 additions & 0 deletions Reactor Kafka 341/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.github.basking2</groupId>
<artifactId>reactor-kafka-341</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.4.1</version>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>ReactorKafka341</mainClass>
</transformer>
</transformers>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

<dependencies>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
<version>1.3.16</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.13</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.1.2</version>
</dependency>
</dependencies>

</project>
46 changes: 46 additions & 0 deletions Reactor Kafka 341/src/main/java/ReactorKafka341.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

public class ReactorKafka341 {
public static void main(final String[] args) {

while (true) {
final KafkaSender<Integer, String> sender = sender();

final Flux<SenderRecord<Integer, String, String>> flux = Flux
.just(((int)(10000*Math.random()))+"")
.map(event -> SenderRecord.<Integer, String, String>create("topic", null, null, null, event, event))
.publishOn(Schedulers.parallel());
;

sender
.send(flux)
.doFinally(signalType -> sender.close())
.blockLast(Duration.ofSeconds(30))
;
}
}

private static KafkaSender<Integer, String> sender() {
final Map<String, Object> properties = new HashMap<>();

properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

final SenderOptions<Integer, String> opts = SenderOptions.create(properties);
opts.stopOnError(false);

return KafkaSender.create(opts);
}
}

0 comments on commit e703308

Please sign in to comment.