Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jt400: tests are not cleaning after themselves and parallel run fails #6001

Merged
merged 1 commit into from
Apr 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 23 additions & 1 deletion integration-tests/jt400/README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,26 @@ $Env:JT400_KEYED_QUEUE="#lkeyedqueue_if_not_TESTKEYED.DTAQ"
$Env:JT400_MESSAGE_QUEUE="#messagequeue_if_not_TESTMSGQ.MSGQ"
$Env:JT400_MESSAGE_REPLYTO_QUEUE="#messagequeueinquiry_if_not_REPLYMSGQ.MSGQ"
$Env:JT400_USER_SPACE="#userspace_if_not_PROGCALL"
```
```

=== Clear queues after unexpected failures

If tests finishes without unexpected failure, tests are taking care of clearing the data.
In some cases data might stay written into the real server if test fails unexpectedly.
This state should might alter following executions.

To force full clear (of each queue) can be achieved by add ing parameter
```
-Dcq.jt400.clear-all=true
```
Be aware that with `-Dcq.jt400.clear-all=true`, the tests can not successfully finish in parallel run.

Usage of clear queues parameter is *strongly* suggested during development


==== Parallel runs and locking

Simple locking mechanism is implemented for the test to allow parallel executions.

Whenever test is started, new entry is written into keyed data queue `JT400_KEYED_QUEUE` with the key `cq.jt400.global-lock` and entry is removed after the run.
Tests are able to clear this lock even if previous execution fails unexpectedly.
5 changes: 5 additions & 0 deletions integration-tests/jt400/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.quarkus.component.jt400.it;

import jakarta.inject.Singleton;

@Singleton
public class InquiryMessageHolder {

private String messageText;

private boolean processed = false;

public String getMessageText() {
return messageText;
}

public void setMessageText(String messageText) {
this.messageText = messageText;
}

public boolean isProcessed() {
return processed;
}

public void setProcessed(boolean processed) {
this.processed = processed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,19 @@
*/
package org.apache.camel.quarkus.component.jt400.it;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import com.ibm.as400.access.AS400;
import com.ibm.as400.access.MessageQueue;
import com.ibm.as400.access.QueuedMessage;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.QueryParam;
import jakarta.ws.rs.core.MediaType;
Expand Down Expand Up @@ -80,6 +80,9 @@ public class Jt400Resource {
@Inject
CamelContext context;

@Inject
InquiryMessageHolder inquiryMessageHolder;

@Path("/dataQueue/read/")
@POST
@Produces(MediaType.APPLICATION_JSON)
Expand All @@ -101,7 +104,7 @@ public Response keyedDataQueueRead(String key, @QueryParam("format") String form
Exchange ex = consumerTemplate.receive(getUrlForLibrary(suffix.toString()));

if ("binary".equals(format)) {
return generateResponse(new String(ex.getIn().getBody(byte[].class), Charset.forName("Cp037")), ex);
return generateResponse(new String(ex.getIn().getBody(byte[].class), StandardCharsets.UTF_8), ex);
}
return generateResponse(ex.getIn().getBody(String.class), ex);

Expand All @@ -112,61 +115,85 @@ public Response keyedDataQueueRead(String key, @QueryParam("format") String form
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.TEXT_PLAIN)
public Response keyedDataQueueWrite(@QueryParam("key") String key,
@QueryParam("searchType") String searchType,
@QueryParam("format") String format,
String data) {
String _format = Optional.ofNullable(format).orElse("text");
boolean keyed = key != null;
StringBuilder suffix = new StringBuilder();
Map<String, Object> headers = new HashMap<>();
String msg;

if (keyed) {
suffix.append(jt400KeyedQueue).append("?keyed=true");
suffix.append(jt400KeyedQueue).append("?keyed=true").append("&format=").append(_format);
headers.put(Jt400Endpoint.KEY, key);
msg = "Hello From KDQ: " + data;
} else {
suffix.append(jt400LifoQueue);
suffix.append(jt400LifoQueue).append("?format=").append(_format);
msg = "Hello From DQ: " + data;
}

Object ex = producerTemplate.requestBodyAndHeaders(
getUrlForLibrary(suffix.toString()),
"Hello " + data,
headers);
return Response.ok().entity(ex).build();
Object retVal;
if ("binary".equals(format)) {
byte[] result = (byte[]) producerTemplate.requestBodyAndHeaders(
getUrlForLibrary(suffix.toString()),
("Hello (bin) " + data).getBytes(StandardCharsets.UTF_8),
headers);
retVal = new String(result, StandardCharsets.UTF_8);
} else {
retVal = producerTemplate.requestBodyAndHeaders(
getUrlForLibrary(suffix.toString()),
msg,
headers);
}

return Response.ok().entity(retVal).build();
}

@Path("/client/inquiryMessage/write/")
@POST
@Path("/route/start/{route}")
@GET
@Produces(MediaType.TEXT_PLAIN)
public Response clientInquiryMessageWrite(String data) throws Exception {
Jt400Endpoint jt400Endpoint = context.getEndpoint(getUrlForLibrary(jt400MessageReplyToQueue), Jt400Endpoint.class);
AS400 as400 = jt400Endpoint.getConfiguration().getConnection();
//send inquiry message (with the same client as is used in the component, to avoid `CPF2451 Message queue TESTMSGQ is allocated to another job`.
MessageQueue queue = new MessageQueue(as400, jt400Endpoint.getConfiguration().getObjectPath());
try {
queue.sendInquiry(data, "/QSYS.LIB/" + jt400Library + ".LIB/" + jt400MessageReplyToQueue);
} catch (Exception e) {
return Response.status(500).entity(e.getMessage()).build();
public Response startRoute(@PathParam("route") String routeName) throws Exception {
if (context.getRouteController().getRouteStatus(routeName).isStartable()) {
context.getRouteController().startRoute(routeName);
}
return Response.ok().build();

return Response.ok().entity(context.getRouteController().getRouteStatus(routeName).isStarted()).build();
}

@Path("/client/queuedMessage/read")
@POST
@Path("/route/stop/{route}")
@GET
@Produces(MediaType.TEXT_PLAIN)
public Response clientQueuedMessageRead(String queueName) throws Exception {
public Response stopRoute(@PathParam("route") String routeName) throws Exception {
if (context.getRouteController().getRouteStatus(routeName).isStoppable()) {
context.getRouteController().stopRoute(routeName);
}
boolean resp = context.getRouteController().getRouteStatus(routeName).isStopped();

//stop component to avoid CPF2451 Message queue REPLYMSGQ is allocated to another job.
Jt400Endpoint jt400Endpoint = context.getEndpoint(getUrlForLibrary(jt400MessageReplyToQueue), Jt400Endpoint.class);
jt400Endpoint.close();

Jt400Endpoint jt400Endpoint = context.getEndpoint(getUrlForLibrary(queueName), Jt400Endpoint.class);
AS400 as400 = jt400Endpoint.getConfiguration().getConnection();
//send inquiry message (with the same client as is used in the component, to avoid `CPF2451 Message queue TESTMSGQ is allocated to another job`.
MessageQueue queue = new MessageQueue(as400, jt400Endpoint.getConfiguration().getObjectPath());
QueuedMessage message = queue.receive(null);
return Response.ok().entity(resp).build();
}

return Response.ok().entity(message != null ? message.getText() : "").build();
@Path("/inquiryMessageSetExpected")
@POST
public void inquiryMessageSetExpected(String msg) {
inquiryMessageHolder.setMessageText(msg);
}

@Path("/inquiryMessageProcessed")
@GET
@Produces(MediaType.TEXT_PLAIN)
public String inquiryMessageProcessed() {
return String.valueOf(inquiryMessageHolder.isProcessed());
}

@Path("/messageQueue/write/")
@POST
@Produces(MediaType.TEXT_PLAIN)
public Response messageQueueWrite(String data) {
Object ex = producerTemplate.requestBody(getUrlForLibrary(jt400MessageQueue), "Hello " + data);
Object ex = producerTemplate.requestBody(getUrlForLibrary(jt400MessageQueue), "Hello from MQ: " + data);

return Response.ok().entity(ex).build();
}
Expand All @@ -176,7 +203,7 @@ public Response messageQueueWrite(String data) {
@Produces(MediaType.APPLICATION_JSON)
public Response messageQueueRead(@QueryParam("queue") String queue) {
Exchange ex = consumerTemplate
.receive(getUrlForLibrary(queue == null ? jt400MessageQueue : queue));
.receive(getUrlForLibrary(queue == null ? jt400MessageQueue : queue) + "?messageAction=SAME");

return generateResponse(ex.getIn().getBody(String.class), ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@

import com.ibm.as400.access.AS400Message;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jt400.Jt400Constants;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.jboss.logging.Logger;

@ApplicationScoped
public class Jt400Routes extends RouteBuilder {
private static final Logger LOGGER = Logger.getLogger(Jt400Routes.class);

@ConfigProperty(name = "cq.jt400.library")
String jt400Library;
Expand All @@ -40,16 +43,31 @@ public class Jt400Routes extends RouteBuilder {
@ConfigProperty(name = "cq.jt400.message-replyto-queue")
String jt400MessageReplyToQueue;

@Inject
InquiryMessageHolder inquiryMessageHolder;

@Override
public void configure() throws Exception {
from(getUrlForLibrary(jt400MessageReplyToQueue + "?sendingReply=true"))
.id("inquiryRoute")
//route has tobe stopped to avoid "CPF2451 Message queue REPLYMSGQ is allocated to another job."
.autoStartup(false)
.choice()
.when(header(Jt400Constants.MESSAGE_TYPE).isEqualTo(AS400Message.INQUIRY))
.process((exchange) -> {
String reply = "reply to: " + exchange.getIn().getBody(String.class);
String msg = exchange.getIn().getBody(String.class);
LOGGER.debug(
"Inquiry route: received '" + msg + "' (expecting '" + inquiryMessageHolder.getMessageText()
+ "')");
if (inquiryMessageHolder.getMessageText() != null && !inquiryMessageHolder.getMessageText().equals(msg)) {
throw new IllegalStateException(
"Intentional! Current exchange is not triggered by current test process, therefore ignoring the exchange");
}
String reply = "reply to: " + msg;
exchange.getIn().setBody(reply);
})
.to(getUrlForLibrary(jt400MessageReplyToQueue));
.to(getUrlForLibrary(jt400MessageReplyToQueue))
.process(e -> inquiryMessageHolder.setProcessed(true));
}

private String getUrlForLibrary(String suffix) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,4 @@ cq.jt400.user-space=${JT400_USER_SPACE:PROGCALL}
cq.jt400.message-queue=${JT400_MESSAGE_QUEUE:TESTMSGQ.MSGQ}
cq.jt400.message-replyto-queue=${JT400_MESSAGE_REPLYTO_QUEUE:REPLYMSGQ.MSGQ}
cq.jt400.keyed-queue=${JT400_KEYED_QUEUE:TESTKEYED.DTAQ}
cq.jt400.lifo-queue=${JT400_LIFO_QUEUE:TESTLIFO.DTAQ}
cq.jt400.lifo-queue=${JT400_LIFO_QUEUE:TESTLIFO.DTAQ}
Loading
Loading