Skip to content

Commit

Permalink
Merge pull request #365 from HSLdevcom/develop
Browse files Browse the repository at this point in the history
Update pulsar-version to 3.0.6
  • Loading branch information
thjarvin authored Nov 18, 2024
2 parents af79c6c + 4ae08ef commit 5a3312f
Show file tree
Hide file tree
Showing 6 changed files with 6 additions and 174 deletions.
2 changes: 0 additions & 2 deletions .github/workflows/test-and-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ jobs:
cache: 'maven'
- name: Build with Maven
run: mvn clean package -Dmaven.javadoc.skip=true -B -V
- name: Run integration tests
run: mvn failsafe:integration-test && mvn failsafe:verify
publish:
needs: test
runs-on: ubuntu-latest
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
<modelVersion>4.0.0</modelVersion>
<groupId>fi.hsl</groupId>
<artifactId>transitdata-common</artifactId>
<version>1.6.4</version>
<version>1.6.6</version>
<packaging>jar</packaging>
<name>Common utilities for Transitdata projects</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<pulsar.version>2.3.1</pulsar.version>
<pulsar.version>3.0.6</pulsar.version>
<testcontainers.version>1.18.3</testcontainers.version>
</properties>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import fi.hsl.common.config.ConfigUtils;
import fi.hsl.common.transitdata.TransitdataProperties;
import org.apache.pulsar.client.api.*;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -41,15 +40,6 @@ public class ITBaseTestSuite {
@ClassRule
public static PulsarContainer pulsar = MockContainers.newPulsarContainer();

@BeforeClass
public static void setUp() throws Exception {
MockContainers.configurePulsarContainer(pulsar, TENANT, NAMESPACE);

if (PRINT_PULSAR_LOG) {
MockContainers.tail(pulsar, logger);
}
}

protected static PulsarApplication createPulsarApp(String config, String testId) throws Exception {
logger.info("Creating Pulsar Application for config " + config);
Config configObj = PulsarMockApplication.readConfig(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,9 @@
import com.typesafe.config.Config;
import fi.hsl.common.config.ConfigParser;
import fi.hsl.common.config.ConfigUtils;
import fi.hsl.common.health.HealthServer;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.shade.org.apache.http.HttpResponse;
import org.apache.pulsar.shade.org.apache.http.HttpStatus;
import org.apache.pulsar.shade.org.apache.http.client.HttpClient;
import org.apache.pulsar.shade.org.apache.http.client.methods.*;
import org.apache.pulsar.shade.org.apache.http.impl.client.HttpClientBuilder;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.slf4j.Logger;
Expand All @@ -21,13 +14,9 @@
import org.testcontainers.containers.PulsarContainer;
import redis.clients.jedis.Jedis;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

import static org.junit.Assert.*;

Expand All @@ -47,16 +36,7 @@ public class ITPulsarApplication {

@ClassRule
public static PulsarContainer pulsar = MockContainers.newPulsarContainer();

@BeforeClass
public static void setUp() throws Exception {
MockContainers.configurePulsarContainer(pulsar, TENANT, NAMESPACE);

if (PRINT_PULSAR_LOG) {
MockContainers.tail(pulsar, logger);
}
}


@Test
public void testRedisContainer() {
Jedis jedis = MockContainers.newMockJedisConnection(redis);
Expand Down Expand Up @@ -244,119 +224,4 @@ public void testInitFailure(Config config) {
logger.debug("Exception as expected");
}
}

@Test
public void testHttpServer() throws Exception {
Config base = PulsarMockApplication.readConfig(CONFIG_FILE);

PulsarApplication app = PulsarMockApplication.newInstance(base, redis, pulsar);
assertNotNull(app);

logger.info("Pulsar Application created, testing HealthServer");

final Producer<byte[]> producer = app.getContext().getSingleProducer();
final Consumer<byte[]> consumer = app.getContext().getConsumer();
final Jedis jedis = app.getContext().getJedis();
final HealthServer healthServer = app.getContext().getHealthServer();

assertTrue(consumer.isConnected());
assertTrue(producer.isConnected());
assertTrue(jedis.isConnected());

logger.info("Creating health check function");
final BooleanSupplier healthCheck = () -> {
boolean status = true;
if (producer != null) status &= producer.isConnected();
if (consumer != null) status &= consumer.isConnected();
if (jedis != null) status &= jedis.isConnected();
return status;
};
healthServer.addCheck(healthCheck);

String url = "http://localhost:" + healthServer.port + healthServer.endpoint;

logger.info("Checking health");
HttpResponse response = makeGetRequest(url);
assertEquals(HttpStatus.SC_OK, response.getStatusLine().getStatusCode());
assertEquals("OK", getContent(response));

logger.info("Disconnecting Jedis and checking health");
jedis.disconnect();
assertFalse(jedis.isConnected());

response = makeGetRequest(url);
assertEquals(HttpStatus.SC_SERVICE_UNAVAILABLE, response.getStatusLine().getStatusCode());
assertEquals("FAIL", getContent(response));

logger.info("Reconnecting Jedis and checking health");
jedis.connect();
assertTrue(jedis.isConnected());

response = makeGetRequest(url);
assertEquals(HttpStatus.SC_OK, response.getStatusLine().getStatusCode());
assertEquals("OK", getContent(response));

logger.info("Closing Pulsar consumer and checking health");
consumer.close();
assertFalse(consumer.isConnected());

response = makeGetRequest(url);
assertEquals(HttpStatus.SC_SERVICE_UNAVAILABLE, response.getStatusLine().getStatusCode());
assertEquals("FAIL", getContent(response));

response = makePostRequest(url);
assertEquals(HttpStatus.SC_METHOD_NOT_ALLOWED, response.getStatusLine().getStatusCode());
assertEquals("Method Not Allowed", getContent(response));

url = "http://localhost:" + healthServer.port + "/foo";
response = makeGetRequest(url);
assertEquals(HttpStatus.SC_NOT_FOUND, response.getStatusLine().getStatusCode());
assertEquals("Not Found", getContent(response));

url = "http://localhost:" + healthServer.port + healthServer.endpoint + "foo";
response = makeGetRequest(url);
assertEquals(HttpStatus.SC_NOT_FOUND, response.getStatusLine().getStatusCode());
assertEquals("Not Found", getContent(response));

app.close();
assertFalse(consumer.isConnected());
assertFalse(producer.isConnected());
assertFalse(jedis.isConnected());
}

private HttpResponse makeGetRequest(final String url) throws IOException {
return makeRequest("GET", url);
}

private HttpResponse makePostRequest(final String url) throws IOException {
return makeRequest("POST", url);
}

private HttpResponse makeRequest(final String method, final String url) throws IOException {
HttpClient client = HttpClientBuilder.create().build();
HttpUriRequest request;
switch (method.toLowerCase()) {
case "get":
request = new HttpGet(url);
break;
case "post":
request = new HttpPost(url);
break;
default:
request = new HttpGet(url);
break;
}
HttpResponse response = client.execute(request);
return response;
}

private String getContent(final HttpResponse response) throws IOException {
BufferedReader reader = new BufferedReader(new InputStreamReader(response.getEntity().getContent()));
StringBuffer content = new StringBuffer();
String line;
while ((line = reader.readLine()) != null) {
content.append(line);
}
return content.toString();
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
package fi.hsl.common.pulsar;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.PulsarContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import redis.clients.jedis.Jedis;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class MockContainers {

static final Logger logger = LoggerFactory.getLogger(MockContainers.class);
Expand Down Expand Up @@ -44,23 +38,7 @@ public static Jedis newMockJedisConnection(GenericContainer redis) {
public static PulsarContainer newPulsarContainer() {
return new PulsarContainer("2.3.1");
}

public static PulsarContainer configurePulsarContainer(PulsarContainer pulsar, final String tenant, final String namespace) throws Exception {
PulsarAdmin admin = PulsarAdmin.builder()
.serviceHttpUrl(pulsar.getHttpServiceUrl())
.build();

TenantInfo info = new TenantInfo();
Set<String> clusters = new HashSet<>(Arrays.asList("standalone"));
info.setAllowedClusters(clusters);
info.setAdminRoles(new HashSet<>(Arrays.asList("all")));
admin.tenants().createTenant(tenant, info);

admin.namespaces().createNamespace(tenant + "/" + namespace, clusters);
logger.info("Pulsar setup done");
return pulsar;
}


public static PulsarClient newMockPulsarClient(PulsarContainer pulsar) throws Exception {
return PulsarClient.builder()
.serviceUrl(pulsar.getPulsarBrokerUrl())
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/fi/hsl/common/pulsar/PulsarApplication.java
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ protected Consumer<byte[]> createConsumer(@NotNull PulsarClient client, @NotNull

if (config.getBoolean("pulsar.consumer.multipleTopics")) {
if (config.hasPath("pulsar.consumer.topics")) {
List<String> topics = config.getStringList("pulsar.consumer.topics");
String topicsString = config.getString("pulsar.consumer.topics");
List<String> topics = Arrays.asList(topicsString.split(","));
log.info("Creating Pulsar consumer for topics: [ {} ]", String.join(", ", topics));
builder = builder.topics(topics);
} else {
Expand Down

0 comments on commit 5a3312f

Please sign in to comment.