author     zhichao.li <zhichao.li@intel.com>    2015-06-11 22:28:28 -0700
committer  Michael Armbrust <michael@databricks.com>    2015-06-11 22:28:28 -0700
commit     2dd7f93080ee882afcc2aac1a419802a19a668ce
tree       25a9891236bb54438deb47b56b8073ee0267770e
parent     b9d177c5110cd054fdb9bcbeeb5f4ca9aa645dc1
[SPARK-7862] [SQL] Fix the deadlock in script transformation for stderr
[Related PR SPARK-7044] (https://github.com/apache/spark/pull/5671)

Author: zhichao.li <zhichao.li@intel.com>

Closes #6404 from zhichao-li/transform and squashes the following commits:

8418c97 [zhichao.li] add comments and remove useless failAfter logic
d9677e1 [zhichao.li] redirect the error destination to be the same as the current process
 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala |  7 +++++++
 sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala        | 12 ++++++++++--
 2 files changed, 17 insertions(+), 2 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index fd623370cc..28792db768 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive.execution
 
 import java.io.{BufferedReader, DataInputStream, DataOutputStream, EOFException, InputStreamReader}
+import java.lang.ProcessBuilder.Redirect
 import java.util.Properties
 
 import scala.collection.JavaConversions._
@@ -58,6 +59,12 @@ case class ScriptTransformation(
     child.execute().mapPartitions { iter =>
       val cmd = List("/bin/bash", "-c", script)
       val builder = new ProcessBuilder(cmd)
+      // redirectError(Redirect.INHERIT) makes the script's stderr inherit the destination
+      // of the current (parent) process's stderr, so error output is printed as it is produced.
+      // Without this there would be 2 issues:
+      // 1) The error msg generated by the script process would be hidden.
+      // 2) If the error msg is big enough to fill up the pipe buffer, the input logic would hang.
+      builder.redirectError(Redirect.INHERIT)
       val proc = builder.start()
       val inputStream = proc.getInputStream
       val outputStream = proc.getOutputStream
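For context, a minimal self-contained sketch of the deadlock described in the comment above (not part of the patch; the object name and the bash command are illustrative assumptions): a script that writes more to stderr than an OS pipe buffer holds will block on that write if nobody drains the pipe, and the parent then hangs reading stdout. With redirectError(Redirect.INHERIT) the child's stderr goes straight to the parent's stderr and the run completes.

import java.io.{BufferedReader, InputStreamReader}
import java.lang.ProcessBuilder.Redirect

// Hypothetical standalone driver, not Spark code.
object StderrDeadlockSketch {
  def main(args: Array[String]): Unit = {
    // Write ~600 KB of numbers to stderr and nothing to stdout,
    // like a chatty transform script.
    val cmd = List("/bin/bash", "-c", "seq 1 100000 1>&2")
    val builder = new ProcessBuilder(cmd: _*)

    // The fix this commit applies in ScriptTransformation;
    // comment this line out to reproduce the hang.
    builder.redirectError(Redirect.INHERIT)

    val proc = builder.start()
    val stdout = new BufferedReader(new InputStreamReader(proc.getInputStream))
    var line = stdout.readLine() // blocks forever if the unread stderr pipe fills up
    while (line != null) {
      println(line)
      line = stdout.readLine()
    }
    println(s"exit code: ${proc.waitFor()}")
  }
}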
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 8bd4900497..c8e5e24632 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -645,12 +645,20 @@ class SQLQuerySuite extends QueryTest {
       .queryExecution.analyzed
   }
 
-  test("test script transform") {
+  test("test script transform for stdout") {
     val data = (1 to 100000).map { i => (i, i, i) }
     data.toDF("d1", "d2", "d3").registerTempTable("script_trans")
     assert(100000 ===
       sql("SELECT TRANSFORM (d1, d2, d3) USING 'cat' AS (a,b,c) FROM script_trans")
-      .queryExecution.toRdd.count())
+        .queryExecution.toRdd.count())
+  }
+
+  test("test script transform for stderr") {
+    val data = (1 to 100000).map { i => (i, i, i) }
+    data.toDF("d1", "d2", "d3").registerTempTable("script_trans")
+    assert(0 ===
+      sql("SELECT TRANSFORM (d1, d2, d3) USING 'cat 1>&2' AS (a,b,c) FROM script_trans")
+        .queryExecution.toRdd.count())
   }
 
   test("window function: udaf with aggregate expressin") {