aboutsummaryrefslogtreecommitdiff
path: root/sql/hive/src/main
diff options
context:
space:
mode:
authorCheng Hao <hao.cheng@intel.com>2015-04-23 10:35:22 -0700
committerReynold Xin <rxin@databricks.com>2015-04-23 10:35:22 -0700
commitcc48e6387abdd909921cb58e0588cdf226556bcd (patch)
treed3c36de282dccd523ee7bba07e12873a6dab117f /sql/hive/src/main
parent975f53e4f978759db7639cd08498ad8cd0ae2a56 (diff)
downloadspark-cc48e6387abdd909921cb58e0588cdf226556bcd.tar.gz
spark-cc48e6387abdd909921cb58e0588cdf226556bcd.tar.bz2
spark-cc48e6387abdd909921cb58e0588cdf226556bcd.zip
[SPARK-7044] [SQL] Fix the deadlock in script transformation
Author: Cheng Hao <hao.cheng@intel.com> Closes #5625 from chenghao-intel/transform and squashes the following commits: 5ec1dd2 [Cheng Hao] fix the deadlock issue in ScriptTransform
Diffstat (limited to 'sql/hive/src/main')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala33
1 file changed, 21 insertions, 12 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index cab0fdd357..3eddda3b28 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -145,20 +145,29 @@ case class ScriptTransformation(
val dataOutputStream = new DataOutputStream(outputStream)
val outputProjection = new InterpretedProjection(input, child.output)
- iter
- .map(outputProjection)
- .foreach { row =>
- if (inputSerde == null) {
- val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
- ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")
-
- outputStream.write(data)
- } else {
- val writable = inputSerde.serialize(row.asInstanceOf[GenericRow].values, inputSoi)
- prepareWritable(writable).write(dataOutputStream)
+ // Put the write (output to the pipeline) into a single thread
+ // and keep the collector in the main thread;
+ // otherwise it will cause a deadlock if the data size is greater than
+ // the pipeline / buffer capacity.
+ new Thread(new Runnable() {
+ override def run(): Unit = {
+ iter
+ .map(outputProjection)
+ .foreach { row =>
+ if (inputSerde == null) {
+ val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
+ ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")
+
+ outputStream.write(data)
+ } else {
+ val writable = inputSerde.serialize(row.asInstanceOf[GenericRow].values, inputSoi)
+ prepareWritable(writable).write(dataOutputStream)
+ }
}
+ outputStream.close()
}
- outputStream.close()
+ }).start()
+
iterator
}
}