about summary refs log tree commit diff
path: root/sql
diff options
context:
space:
mode:
authorCheng Hao <hao.cheng@intel.com>2015-04-23 10:35:22 -0700
committerReynold Xin <rxin@databricks.com>2015-04-23 10:35:22 -0700
commitcc48e6387abdd909921cb58e0588cdf226556bcd (patch)
treed3c36de282dccd523ee7bba07e12873a6dab117f /sql
parent975f53e4f978759db7639cd08498ad8cd0ae2a56 (diff)
downloadspark-cc48e6387abdd909921cb58e0588cdf226556bcd.tar.gz
spark-cc48e6387abdd909921cb58e0588cdf226556bcd.tar.bz2
spark-cc48e6387abdd909921cb58e0588cdf226556bcd.zip
[SPARK-7044] [SQL] Fix the deadlock in script transformation
Author: Cheng Hao <hao.cheng@intel.com>. Closes #5625 from chenghao-intel/transform and squashes the following commits: 5ec1dd2 [Cheng Hao] fix the deadlock issue in ScriptTransform
Diffstat (limited to 'sql')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala | 33
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 8
2 files changed, 29 insertions, 12 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index cab0fdd357..3eddda3b28 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -145,20 +145,29 @@ case class ScriptTransformation(
val dataOutputStream = new DataOutputStream(outputStream)
val outputProjection = new InterpretedProjection(input, child.output)
- iter
- .map(outputProjection)
- .foreach { row =>
- if (inputSerde == null) {
- val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
- ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")
-
- outputStream.write(data)
- } else {
- val writable = inputSerde.serialize(row.asInstanceOf[GenericRow].values, inputSoi)
- prepareWritable(writable).write(dataOutputStream)
+ // Put the write (output to the pipeline) into a single thread
+ // and keep the collector in the main thread.
+ // Otherwise it will cause a deadlock if the data size is greater than
+ // the pipeline / buffer capacity.
+ new Thread(new Runnable() {
+ override def run(): Unit = {
+ iter
+ .map(outputProjection)
+ .foreach { row =>
+ if (inputSerde == null) {
+ val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
+ ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")
+
+ outputStream.write(data)
+ } else {
+ val writable = inputSerde.serialize(row.asInstanceOf[GenericRow].values, inputSoi)
+ prepareWritable(writable).write(dataOutputStream)
+ }
}
+ outputStream.close()
}
- outputStream.close()
+ }).start()
+
iterator
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 47b4cb9ca6..4f8d0ac0e7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -561,4 +561,12 @@ class SQLQuerySuite extends QueryTest {
sql("select d from dn union all select d * 2 from dn")
.queryExecution.analyzed
}
+
+ test("test script transform") {
+ val data = (1 to 100000).map { i => (i, i, i) }
+ data.toDF("d1", "d2", "d3").registerTempTable("script_trans")
+ assert(100000 ===
+ sql("SELECT TRANSFORM (d1, d2, d3) USING 'cat' AS (a,b,c) FROM script_trans")
+ .queryExecution.toRdd.count())
+ }
}