Skip to content

Commit

Permalink
Merge pull request #530 from MohamedSabthar/event-stream-fixes
Browse files Browse the repository at this point in the history
Handle Ballerina errors when calling byte stream next method in native side
  • Loading branch information
MohamedSabthar authored Jul 24, 2024
2 parents d4e9f7a + d8b390d commit f193322
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ public static Object getStreamEntryRecord(BObject entityObj, long inputArraySize
} while (arraySize > 0);
bytes = output.toByteArray();
}
} catch (IOException ex) {
} catch (RuntimeException | IOException ex) {
return IOUtils.createError(IOConstants.ErrorCode.GenericError,
"Error occurred while reading stream:" + ex.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import static io.ballerina.stdlib.mime.util.MimeConstants.MULTIPART_AS_PRIMARY_TYPE;
import static io.ballerina.stdlib.mime.util.MimeConstants.TEXT_EVENT_STREAM;
import static io.ballerina.stdlib.mime.util.MimeUtil.isNotNullAndEmpty;
import static io.ballerina.stdlib.mime.util.MimeUtil.removeJavaExceptionPrefix;

/**
* Entity body related operations are included here.
Expand Down Expand Up @@ -397,7 +398,18 @@ public void notifySuccess(Object result) {
latch.countDown();
return;
}
writeContentPart((BMap) result, outputStream);
if (result instanceof BError error) {
entity.addNativeData(ENTITY_BYTE_STREAM, null);
this.notifyFailure(error);
}
try {
writeContentPart((BMap) result, outputStream);
} catch (Exception e) {
latch.countDown();
throw ErrorCreator.createError(StringUtils.fromString(
"Error occurred while writing the stream content: "
+ removeJavaExceptionPrefix(e.getMessage())));
}
writeContent(env, entity, outputStream, iteratorObj, latch);
}

Expand All @@ -407,7 +419,7 @@ public void notifyFailure(BError bError) {
throw ErrorCreator.createError(StringUtils.fromString(
"Error occurred while streaming content: " + bError.getMessage()));
}
}, null, null);
}, null, null, new Object[]{});
}

/**
Expand Down Expand Up @@ -435,7 +447,18 @@ public void notifySuccess(Object result) {
EntityBodyHandler.closeMessageOutputStream(outputStream);
return;
}
writeContentPart((BMap) result, outputStream);
if (result instanceof BError error) {
entity.addNativeData(ENTITY_BYTE_STREAM, null);
this.notifyFailure(error);
}
try {
writeContentPart((BMap) result, outputStream);
} catch (Exception e) {
EntityBodyHandler.closeMessageOutputStream(outputStream);
throw ErrorCreator.createError(StringUtils.fromString(
"Error occurred while writing the stream content: "
+ removeJavaExceptionPrefix(e.getMessage())));
}
writeEvent(env, entity, outputStream, iteratorObj);
}

Expand All @@ -453,7 +476,7 @@ private static void closeMessageOutputStream(OutputStream messageOutputStream) {
if (messageOutputStream != null) {
messageOutputStream.close();
}
} catch (IOException e) {
} catch (Exception e) {
log.error("Couldn't close message output stream", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import java.nio.charset.StandardCharsets;
import java.util.Enumeration;
import java.util.Random;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static io.ballerina.stdlib.mime.util.MimeConstants.ASSIGNMENT;
import static io.ballerina.stdlib.mime.util.MimeConstants.BODY_PARTS;
Expand Down Expand Up @@ -547,4 +549,11 @@ public static Module getMimePackage() {
}

private MimeUtil() {}

public static String removeJavaExceptionPrefix(String errorMessage) {
String prefixRegex = "^(?:[a-z]+\\.)+[A-Za-z]+Exception:";
Pattern pattern = Pattern.compile(prefixRegex);
Matcher matcher = pattern.matcher(errorMessage);
return matcher.find() ? matcher.replaceFirst("").trim() : errorMessage;
}
}

0 comments on commit f193322

Please sign in to comment.