Skip to content

Commit

Permalink
Added endpoint to process XMessage.
Browse files Browse the repository at this point in the history
  • Loading branch information
chinmoy12c committed Dec 1, 2023
1 parent bd721d1 commit 607b0ab
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 41 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.uci.transformer.controllers;

import com.uci.transformer.odk.ODKConsumerReactive;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.ws.rs.BadRequestException;

@RestController()
public class XMessageController {

@Autowired
private ODKConsumerReactive odkConsumerReactive;

@PostMapping("/xmsg/processXMessage")
public void processXMessage(@RequestBody String xMessage) {
if (xMessage == null || xMessage.isEmpty() || xMessage.isBlank()) {
throw new BadRequestException();
}
odkConsumerReactive.processMessage(xMessage);
}
}
83 changes: 42 additions & 41 deletions src/main/java/com/uci/transformer/odk/ODKConsumerReactive.java
Original file line number Diff line number Diff line change
Expand Up @@ -163,47 +163,7 @@ public void onMessage() {
.doOnNext(new Consumer<ReceiverRecord<String, String>>() {
@Override
public void accept(ReceiverRecord<String, String> stringMessage) {
final long startTime = System.currentTimeMillis();
final Date startDateTime = new Date();
try {
XMessage msg = XMessageParser.parse(new ByteArrayInputStream(stringMessage.value().getBytes()));
logTimeTaken(startTime, 1);
Mono.just(msg)
.flatMap(message -> transform(message))
.subscribeOn(Schedulers.parallel())
.subscribe(transformedMessage -> {
long endTime = System.currentTimeMillis();
long duration = (endTime - startTime);
log.info("Total time spent in processing form: " + duration + ". Start: " + startDateTime + ". End: " + new Date());
logTimeTaken(startTime, 2);
if (transformedMessage != null) {
try {
if (transformedMessage.getTransformers() != null && transformedMessage.getTransformers().get(0) != null
&& transformedMessage.getTransformers().get(0).getMetaData() != null && transformedMessage.getTransformers().get(0).getMetaData().get("type") != null
&& transformedMessage.getTransformers().get(0).getMetaData().get("type").equals("generic")) {
log.info("CP-04" + transformedMessage.toXML());
kafkaProducer.send(genericTransformer, transformedMessage.toXML());

} else {
log.info("CP-05");
kafkaProducer.send(processOutboundTopic, transformedMessage.toXML());
}
} catch (JAXBException e) {
log.error("An error occured : " + e.getMessage());
e.printStackTrace();
}
}
});
} catch (JAXBException e) {
log.error("An error occured : " + e.getMessage());
e.printStackTrace();
} catch (NullPointerException e) {
log.error("An error occured : " + e.getMessage() + " at line no : " + e.getStackTrace()[0].getLineNumber()
+ " in class : " + e.getStackTrace()[0].getClassName());
} catch (Exception e) {
log.error("An error occured : " + e.getMessage());
e.printStackTrace();
}
processMessage(stringMessage.value());
}
})
.doOnError(new Consumer<Throwable>() {
Expand All @@ -216,6 +176,47 @@ public void accept(Throwable e) {

}

public void processMessage(String stringMessage) {
final long startTime = System.currentTimeMillis();
final Date startDateTime = new Date();
try {
XMessage msg = XMessageParser.parse(new ByteArrayInputStream(stringMessage.getBytes()));
logTimeTaken(startTime, 1);
Mono.just(msg)
.flatMap(this::transform)
.subscribeOn(Schedulers.parallel())
.subscribe(transformedMessage -> {
long endTime = System.currentTimeMillis();
long duration = (endTime - startTime);
log.info("Total time spent in processing form: " + duration + ". Start: " + startDateTime + ". End: " + new Date());
logTimeTaken(startTime, 2);
if (transformedMessage != null) {
try {
if (transformedMessage.getTransformers() != null && transformedMessage.getTransformers().get(0) != null
&& transformedMessage.getTransformers().get(0).getMetaData() != null && transformedMessage.getTransformers().get(0).getMetaData().get("type") != null
&& transformedMessage.getTransformers().get(0).getMetaData().get("type").equals("generic")) {
log.info("CP-04" + transformedMessage.toXML());
kafkaProducer.send(genericTransformer, transformedMessage.toXML());

} else {
log.info("CP-05");
kafkaProducer.send(processOutboundTopic, transformedMessage.toXML());
}
} catch (JAXBException e) {
log.error("An error occured : " + e.getMessage());
e.printStackTrace();
}
}
});
} catch (NullPointerException e) {
log.error("An error occured : " + e.getMessage() + " at line no : " + e.getStackTrace()[0].getLineNumber()
+ " in class : " + e.getStackTrace()[0].getClassName());
} catch (Exception e) {
log.error("An error occured : " + e.getMessage());
e.printStackTrace();
}
}

@Override
public Mono<XMessage> transform(XMessage xMessage) {
ArrayList<Transformer> transformers = xMessage.getTransformers();
Expand Down

0 comments on commit 607b0ab

Please sign in to comment.