From 1796ccb6a6c28e334b4be71755778b262d216fd9 Mon Sep 17 00:00:00 2001 From: Rick Bernotas Date: Tue, 7 Mar 2017 14:45:30 -0600 Subject: [PATCH] LIVY-322 - subprocess.call() commands in a PySpark snippet can potentially insert raw text into the sys_stdout in the fake_shell main(). This will then fail to be correctly parsed by PythonInterpreter in the sendRequest, as it will trigger a JsonParseException that is not caught. Added code to catch the JsonParseException and then retry reads of stdout until a valid line of JSON is reached, or 100 retries have been attempted. --- .../livy/repl/PythonInterpreter.scala | 31 ++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/repl/src/main/scala/com/cloudera/livy/repl/PythonInterpreter.scala b/repl/src/main/scala/com/cloudera/livy/repl/PythonInterpreter.scala index 2195d0ea1..f6acdbdd3 100644 --- a/repl/src/main/scala/com/cloudera/livy/repl/PythonInterpreter.scala +++ b/repl/src/main/scala/com/cloudera/livy/repl/PythonInterpreter.scala @@ -28,6 +28,7 @@ import scala.collection.mutable.ArrayBuffer import scala.collection.JavaConverters._ import scala.util.control.NonFatal +import com.fasterxml.jackson.core.JsonParseException import org.apache.spark.{SparkConf, SparkContext} import org.json4s.{DefaultFormats, JValue} import org.json4s.JsonAST.JObject @@ -254,7 +255,35 @@ private class PythonInterpreter(process: Process, gatewayServer: GatewayServer, stdin.flush() Option(stdout.readLine()).map { case line => - parse(line) + try { + parse(line) + } + catch { + // LIVY-322 - If a statement puts raw text in the stdout without fake_shell parsing it, + // retry the readLine up to 100 times in an attempt to find the next parsable line of JSON. + case e: JsonParseException => retryRead(100) + } + } + } + + // LIVY-322 - Method for retrying reads from stdout if a JsonParseException is encountered + // due to non-JSON formatted text in the stdout. 
Accepts the maximum number of times + // to retry the read, and recurses until it either finds a parsable line of JSON or + // exhausts the allowed number of retries. If the retries are exhausted, + // an Exception is thrown. + private def retryRead(maxRetries: Int): JValue = { + if (maxRetries > 0) { + try { + parse(stdout.readLine()) + } catch { + case e: JsonParseException => + retryRead(maxRetries - 1) + } + } + else { + throw new Exception( + "Livy is unable to find valid JSON in the response from fake_shell. " + + "Please recreate the session.") } }