diff options
author | zhichao.li <zhichao.li@intel.com> | 2015-06-11 22:28:28 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2015-06-11 22:28:28 -0700 |
commit | 2dd7f93080ee882afcc2aac1a419802a19a668ce (patch) | |
tree | 25a9891236bb54438deb47b56b8073ee0267770e | |
parent | b9d177c5110cd054fdb9bcbeeb5f4ca9aa645dc1 (diff) | |
download | spark-2dd7f93080ee882afcc2aac1a419802a19a668ce.tar.gz spark-2dd7f93080ee882afcc2aac1a419802a19a668ce.tar.bz2 spark-2dd7f93080ee882afcc2aac1a419802a19a668ce.zip |
[SPARK-7862] [SQL] Fix the deadlock in script transformation for stderr
[Related PR SPARK-7044] (https://github.com/apache/spark/pull/5671)
Author: zhichao.li <zhichao.li@intel.com>
Closes #6404 from zhichao-li/transform and squashes the following commits:
8418c97 [zhichao.li] add comments and remove useless failAfter logic
d9677e1 [zhichao.li] redirect the error desitination to be the same as the current process
-rw-r--r-- | sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala | 7 | ||||
-rw-r--r-- | sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 12 |
2 files changed, 17 insertions, 2 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala index fd623370cc..28792db768 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive.execution import java.io.{BufferedReader, DataInputStream, DataOutputStream, EOFException, InputStreamReader} +import java.lang.ProcessBuilder.Redirect import java.util.Properties import scala.collection.JavaConversions._ @@ -58,6 +59,12 @@ case class ScriptTransformation( child.execute().mapPartitions { iter => val cmd = List("/bin/bash", "-c", script) val builder = new ProcessBuilder(cmd) + // redirectError(Redirect.INHERIT) would consume the error output from buffer and + // then print it to stderr (inherit the target from the current Scala process). + // If without this there would be 2 issues: + // 1) The error msg generated by the script process would be hidden. + // 2) If the error msg is too big to chock up the buffer, the input logic would be hung + builder.redirectError(Redirect.INHERIT) val proc = builder.start() val inputStream = proc.getInputStream val outputStream = proc.getOutputStream diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 8bd4900497..c8e5e24632 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -645,12 +645,20 @@ class SQLQuerySuite extends QueryTest { .queryExecution.analyzed } - test("test script transform") { + test("test script transform for stdout") { val data = (1 to 100000).map { i => (i, i, i) } data.toDF("d1", "d2", "d3").registerTempTable("script_trans") assert(100000 === sql("SELECT TRANSFORM (d1, d2, d3) USING 'cat' AS (a,b,c) FROM script_trans") - .queryExecution.toRdd.count()) + .queryExecution.toRdd.count()) + } + + test("test script transform for stderr") { + val data = (1 to 100000).map { i => (i, i, i) } + data.toDF("d1", "d2", "d3").registerTempTable("script_trans") + assert(0 === + sql("SELECT TRANSFORM (d1, d2, d3) USING 'cat 1>&2' AS (a,b,c) FROM script_trans") + .queryExecution.toRdd.count()) } test("window function: udaf with aggregate expressin") { |