3 files changed, 81 insertions, 34 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
index 9fe0e9646e..b29e822add 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
@@ -231,7 +231,12 @@ object SparkPlanTest {
     }
   }
 
-  private def executePlan(outputPlan: SparkPlan, spark: SQLContext): Seq[Row] = {
+  /**
+   * Runs the plan.
+   * @param outputPlan SparkPlan to be executed
+   * @param spark SQLContext used for execution of the plan
+   */
+  def executePlan(outputPlan: SparkPlan, spark: SQLContext): Seq[Row] = {
     val execution = new QueryExecution(spark.sparkSession, null) {
       override lazy val sparkPlan: SparkPlan = outputPlan transform {
         case plan: SparkPlan =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index f6e6a75c3e..9e25e1d40c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -32,7 +32,7 @@ import org.apache.hadoop.hive.serde2.AbstractSerDe
 import org.apache.hadoop.hive.serde2.objectinspector._
 import org.apache.hadoop.io.Writable
 
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
@@ -127,45 +127,71 @@ case class ScriptTransformation(
       }
 
       val mutableRow = new SpecificMutableRow(output.map(_.dataType))
 
+      private def checkFailureAndPropagate(cause: Throwable = null): Unit = {
+        if (writerThread.exception.isDefined) {
+          throw writerThread.exception.get
+        }
+
+        // Checks if the proc is still alive (in case the command run was bad).
+        // The ideal way to do this is to use Java 8's Process#isAlive(),
+        // but it cannot be used because Spark still supports Java 7.
+        // The following is a workaround to check if a process is alive in Java 7.
+        // TODO: Once builds are switched to Java 8, this can be changed.
+        try {
+          val exitCode = proc.exitValue()
+          if (exitCode != 0) {
+            logError(stderrBuffer.toString) // log the stderr circular buffer
+            throw new SparkException(s"Subprocess exited with status $exitCode. " +
+              s"Error: ${stderrBuffer.toString}", cause)
+          }
+        } catch {
+          case _: IllegalThreadStateException =>
+            // This means that the process is still alive. Move ahead.
+        }
+      }
+
       override def hasNext: Boolean = {
-        if (outputSerde == null) {
-          if (curLine == null) {
-            curLine = reader.readLine()
+        try {
+          if (outputSerde == null) {
             if (curLine == null) {
-              if (writerThread.exception.isDefined) {
-                throw writerThread.exception.get
+              curLine = reader.readLine()
+              if (curLine == null) {
+                checkFailureAndPropagate()
+                return false
               }
-              false
-            } else {
-              true
             }
-          } else {
-            true
-          }
-        } else if (scriptOutputWritable == null) {
-          scriptOutputWritable = reusedWritableObject
+          } else if (scriptOutputWritable == null) {
+            scriptOutputWritable = reusedWritableObject
 
-          if (scriptOutputReader != null) {
-            if (scriptOutputReader.next(scriptOutputWritable) <= 0) {
-              writerThread.exception.foreach(throw _)
-              false
+            if (scriptOutputReader != null) {
+              if (scriptOutputReader.next(scriptOutputWritable) <= 0) {
+                checkFailureAndPropagate()
+                return false
+              }
             } else {
-              true
-            }
-          } else {
-            try {
-              scriptOutputWritable.readFields(scriptOutputStream)
-              true
-            } catch {
-              case _: EOFException =>
-                if (writerThread.exception.isDefined) {
-                  throw writerThread.exception.get
-                }
-                false
+              try {
+                scriptOutputWritable.readFields(scriptOutputStream)
+              } catch {
+                case _: EOFException =>
+                  // This means that the stdout of `proc` (i.e. the TRANSFORM process) has been
+                  // exhausted. Ideally the proc should *not* be alive at this point, but
+                  // there can be a lag between EOF being written out and the process
+                  // being terminated, so explicitly wait for the process to be done.
+                  proc.waitFor()
+                  checkFailureAndPropagate()
+                  return false
+              }
             }
           }
-        } else {
+          true
+        } catch {
+          case NonFatal(e) =>
+            // If this exception is due to abrupt / unclean termination of `proc`,
+            // then detect it and propagate a better exception message for end users.
+            checkFailureAndPropagate(e)
+
+            throw e
         }
       }
@@ -284,7 +310,6 @@ private class ScriptTransformationWriterThread(
           }
         }
       }
-      outputStream.close()
       threwException = false
     } catch {
       case NonFatal(e) =>
@@ -295,6 +320,7 @@
         throw e
     } finally {
       try {
+        outputStream.close()
        if (proc.waitFor() != 0) {
          logError(stderrBuffer.toString) // log the stderr circular buffer
        }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
index 6f80622407..a8e81d7a3c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive.execution
 import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
 import org.scalatest.exceptions.TestFailedException
 
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
@@ -109,6 +109,22 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
     }
     assert(e.getMessage().contains("intentional exception"))
   }
+
+  test("SPARK-14400 script transformation should fail for bad script command") {
+    val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a")
+
+    val e = intercept[SparkException] {
+      val plan =
+        new ScriptTransformation(
+          input = Seq(rowsDf.col("a").expr),
+          script = "some_non_existent_command",
+          output = Seq(AttributeReference("a", StringType)()),
+          child = rowsDf.queryExecution.sparkPlan,
+          ioschema = serdeIOSchema)
+      SparkPlanTest.executePlan(plan, hiveContext)
+    }
+    assert(e.getMessage.contains("Subprocess exited with status"))
+  }
 }
 
 private case class ExceptionInjectingOperator(child: SparkPlan) extends UnaryExecNode {
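
Note on the liveness check introduced in checkFailureAndPropagate(): before Java 8's Process#isAlive(), the only portable probe was calling Process#exitValue(), which throws IllegalThreadStateException while the child is still running. Below is a minimal standalone Scala sketch of that pattern, not part of the patch; the object and the isAlive helper are hypothetical names for illustration, and the example assumes a Unix-like `sleep` command is on the PATH.

// Pre-Java-8 liveness probe: exitValue() throws IllegalThreadStateException
// while the process is still running, so the exception doubles as "alive".
object ProcessLivenessSketch {
  def isAlive(proc: java.lang.Process): Boolean = {
    try {
      proc.exitValue() // returns only once the process has terminated
      false
    } catch {
      case _: IllegalThreadStateException => true // still running
    }
  }

  def main(args: Array[String]): Unit = {
    val proc = new java.lang.ProcessBuilder("sleep", "1").start()
    println(s"alive after start: ${isAlive(proc)}")   // expected: true
    proc.waitFor()                                    // block until exit
    println(s"alive after waitFor: ${isAlive(proc)}") // expected: false
    println(s"exit status: ${proc.exitValue()}")      // expected: 0
  }
}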
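Likewise, the ScriptTransformationWriterThread hunks move outputStream.close() out of the try body and into the finally block: if writing to the child's stdin throws midway, the stream is still closed, the child sees EOF and can exit, and the subsequent proc.waitFor() cannot hang. A hedged sketch of that cleanup ordering follows; feedProcess is a hypothetical standalone helper, not Spark code.

// Close the child's stdin in `finally` so it always sees EOF, even when
// writing fails partway; only then wait for the child to terminate.
def feedProcess(proc: java.lang.Process, lines: Seq[String]): Int = {
  val stdin = proc.getOutputStream
  try {
    lines.foreach(l => stdin.write((l + "\n").getBytes("UTF-8")))
  } finally {
    stdin.close() // guarantees EOF for the child, on success or failure
  }
  proc.waitFor() // safe: the child can drain its input and exit
}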