author    Tejas Patil <tejasp@fb.com>    2016-05-27 12:05:11 -0700
committer Reynold Xin <rxin@databricks.com>    2016-05-27 12:05:11 -0700
commit    a96e4151a9d429cfaf457c07b4ce174890a3b39b (patch)
tree      569dc53323762261e23aeebda4e964a0280616a7 /sql
parent    b376a4eabc82d622ea26290345c01465af7a628d (diff)
[SPARK-14400][SQL] ScriptTransformation does not fail the job for bad user command
## What changes were proposed in this pull request?

- Refer to the JIRA for the problem: https://issues.apache.org/jira/browse/SPARK-14400
- The fix is to check in `hasNext()` whether the process has exited with a non-zero exit code. This check, together with the existing check for a writer-thread exception, has been moved into a separate method.

## How was this patch tested?

- Ran a job with an incorrect transform script command and verified that the job fails.
- Existing unit tests in `ScriptTransformationSuite`, plus a new unit test.

Author: Tejas Patil <tejasp@fb.com>

Closes #12194 from tejasapatil/script_transform.
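The crux of the fix is detecting a dead subprocess on a JVM without `Process#isAlive()` (Java 8 only). A minimal standalone sketch of that Java 7 workaround, not the patched code itself (`proc` is any `java.lang.Process`):

```scala
// Java 7 workaround: Process#exitValue() throws IllegalThreadStateException
// while the process is still running, so a successful call means it has died.
def exitCodeIfDone(proc: Process): Option[Int] =
  try {
    Some(proc.exitValue()) // returns only once the process has terminated
  } catch {
    case _: IllegalThreadStateException => None // still running
  }
```

When this yields a non-zero exit code, the patch raises a `SparkException` carrying the stderr circular buffer, which is what `checkFailureAndPropagate()` in the diff below does.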
Diffstat (limited to 'sql')
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala                  |  7
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala      | 90
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala | 18
3 files changed, 81 insertions(+), 34 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
index 9fe0e9646e..b29e822add 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala
@@ -231,7 +231,12 @@ object SparkPlanTest {
     }
   }
 
-  private def executePlan(outputPlan: SparkPlan, spark: SQLContext): Seq[Row] = {
+  /**
+   * Runs the plan
+   * @param outputPlan SparkPlan to be executed
+   * @param spark SqlContext used for execution of the plan
+   */
+  def executePlan(outputPlan: SparkPlan, spark: SQLContext): Seq[Row] = {
     val execution = new QueryExecution(spark.sparkSession, null) {
       override lazy val sparkPlan: SparkPlan = outputPlan transform {
         case plan: SparkPlan =>
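This hunk drops the `private` modifier from `executePlan` so suites can run a hand-built physical plan directly. A hedged usage sketch, assuming some plan `plan` and a `SQLContext` such as the `hiveContext` used by the new test further below:

```scala
// Hypothetical call site; `plan` and `hiveContext` are assumed to be in scope.
val rows: Seq[Row] = SparkPlanTest.executePlan(plan, hiveContext)
```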
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index f6e6a75c3e..9e25e1d40c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -32,7 +32,7 @@ import org.apache.hadoop.hive.serde2.AbstractSerDe
 import org.apache.hadoop.hive.serde2.objectinspector._
 import org.apache.hadoop.io.Writable
 
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
@@ -127,45 +127,71 @@ case class ScriptTransformation(
       }
       val mutableRow = new SpecificMutableRow(output.map(_.dataType))
 
+      private def checkFailureAndPropagate(cause: Throwable = null): Unit = {
+        if (writerThread.exception.isDefined) {
+          throw writerThread.exception.get
+        }
+
+        // Checks if the proc is still alive (incase the command ran was bad)
+        // The ideal way to do this is to use Java 8's Process#isAlive()
+        // but it cannot be used because Spark still supports Java 7.
+        // Following is a workaround used to check if a process is alive in Java 7
+        // TODO: Once builds are switched to Java 8, this can be changed
+        try {
+          val exitCode = proc.exitValue()
+          if (exitCode != 0) {
+            logError(stderrBuffer.toString) // log the stderr circular buffer
+            throw new SparkException(s"Subprocess exited with status $exitCode. " +
+              s"Error: ${stderrBuffer.toString}", cause)
+          }
+        } catch {
+          case _: IllegalThreadStateException =>
+            // This means that the process is still alive. Move ahead
+        }
+      }
+
       override def hasNext: Boolean = {
-        if (outputSerde == null) {
-          if (curLine == null) {
-            curLine = reader.readLine()
+        try {
+          if (outputSerde == null) {
             if (curLine == null) {
-              if (writerThread.exception.isDefined) {
-                throw writerThread.exception.get
+              curLine = reader.readLine()
+              if (curLine == null) {
+                checkFailureAndPropagate()
+                return false
               }
-              false
-            } else {
-              true
             }
-          } else {
-            true
-          }
-        } else if (scriptOutputWritable == null) {
-          scriptOutputWritable = reusedWritableObject
+          } else if (scriptOutputWritable == null) {
+            scriptOutputWritable = reusedWritableObject
 
-          if (scriptOutputReader != null) {
-            if (scriptOutputReader.next(scriptOutputWritable) <= 0) {
-              writerThread.exception.foreach(throw _)
-              false
+            if (scriptOutputReader != null) {
+              if (scriptOutputReader.next(scriptOutputWritable) <= 0) {
+                checkFailureAndPropagate()
+                return false
+              }
             } else {
-              true
-            }
-          } else {
-            try {
-              scriptOutputWritable.readFields(scriptOutputStream)
-              true
-            } catch {
-              case _: EOFException =>
-                if (writerThread.exception.isDefined) {
-                  throw writerThread.exception.get
-                }
-                false
+              try {
+                scriptOutputWritable.readFields(scriptOutputStream)
+              } catch {
+                case _: EOFException =>
+                  // This means that the stdout of `proc` (ie. TRANSFORM process) has exhausted.
+                  // Ideally the proc should *not* be alive at this point but
+                  // there can be a lag between EOF being written out and the process
+                  // being terminated. So explicitly waiting for the process to be done.
+                  proc.waitFor()
+                  checkFailureAndPropagate()
+                  return false
+              }
             }
           }
-        } else {
+
           true
+        } catch {
+          case NonFatal(e) =>
+            // If this exception is due to abrupt / unclean termination of `proc`,
+            // then detect it and propagate a better exception message for end users
+            checkFailureAndPropagate(e)
+
+            throw e
         }
       }
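For context, the user-visible symptom from the JIRA: before this patch, a TRANSFORM query with a bad command completed "successfully" with zero rows instead of failing. A hedged repro sketch (session and table names are assumptions, not from the patch):

```scala
// Hypothetical repro of SPARK-14400; `hiveContext` and `some_table` are assumed.
// Pre-patch this silently returned no rows; post-patch it fails with
// "Subprocess exited with status ...".
hiveContext.sql(
  """SELECT TRANSFORM (a)
    |USING 'some_non_existent_command'
    |AS (b STRING)
    |FROM some_table
  """.stripMargin).collect()
```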
@@ -284,7 +310,6 @@ private class ScriptTransformationWriterThread(
           }
         }
       }
-      outputStream.close()
       threwException = false
     } catch {
       case NonFatal(e) =>
@@ -295,6 +320,7 @@ private class ScriptTransformationWriterThread(
       throw e
     } finally {
       try {
+        outputStream.close()
         if (proc.waitFor() != 0) {
           logError(stderrBuffer.toString) // log the stderr circular buffer
         }
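The two writer-thread hunks above move `outputStream.close()` (the child's stdin) from the happy path into the `finally` block: if writing throws, stdin still gets closed, so the subsequent `proc.waitFor()` cannot hang on a child blocked waiting for input. A standalone sketch of the pattern, using `cat` as a stand-in for the transform script (not the actual writer thread):

```scala
// Generic close-in-finally pattern for feeding a child process.
val proc = new ProcessBuilder("cat").start()
val stdin = proc.getOutputStream
try {
  stdin.write("hello\n".getBytes("UTF-8"))
} finally {
  stdin.close() // always close stdin so the child sees EOF and can exit
  if (proc.waitFor() != 0) {
    System.err.println(s"child exited with status ${proc.exitValue()}")
  }
}
```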
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
index 6f80622407..a8e81d7a3c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive.execution
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.scalatest.exceptions.TestFailedException
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
@@ -109,6 +109,22 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
     }
     assert(e.getMessage().contains("intentional exception"))
   }
+
+  test("SPARK-14400 script transformation should fail for bad script command") {
+    val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a")
+
+    val e = intercept[SparkException] {
+      val plan =
+        new ScriptTransformation(
+          input = Seq(rowsDf.col("a").expr),
+          script = "some_non_existent_command",
+          output = Seq(AttributeReference("a", StringType)()),
+          child = rowsDf.queryExecution.sparkPlan,
+          ioschema = serdeIOSchema)
+      SparkPlanTest.executePlan(plan, hiveContext)
+    }
+    assert(e.getMessage.contains("Subprocess exited with status"))
+  }
 }
 
 private case class ExceptionInjectingOperator(child: SparkPlan) extends UnaryExecNode {