diff options
author | Cheng Hao <hao.cheng@intel.com> | 2015-04-23 10:35:22 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-04-23 10:35:22 -0700 |
commit | cc48e6387abdd909921cb58e0588cdf226556bcd (patch) | |
tree | d3c36de282dccd523ee7bba07e12873a6dab117f /sql/hive/src/main | |
parent | 975f53e4f978759db7639cd08498ad8cd0ae2a56 (diff) | |
download | spark-cc48e6387abdd909921cb58e0588cdf226556bcd.tar.gz spark-cc48e6387abdd909921cb58e0588cdf226556bcd.tar.bz2 spark-cc48e6387abdd909921cb58e0588cdf226556bcd.zip |
[SPARK-7044] [SQL] Fix the deadlock in script transformation
Author: Cheng Hao <hao.cheng@intel.com>
Closes #5625 from chenghao-intel/transform and squashes the following commits:
5ec1dd2 [Cheng Hao] fix the deadlock issue in ScriptTransform
Diffstat (limited to 'sql/hive/src/main')
-rw-r--r-- | sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala | 33 |
1 files changed, 21 insertions, 12 deletions
```diff
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index cab0fdd357..3eddda3b28 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -145,20 +145,29 @@ case class ScriptTransformation(
       val dataOutputStream = new DataOutputStream(outputStream)
       val outputProjection = new InterpretedProjection(input, child.output)
 
-      iter
-        .map(outputProjection)
-        .foreach { row =>
-          if (inputSerde == null) {
-            val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
-              ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")
-
-            outputStream.write(data)
-          } else {
-            val writable = inputSerde.serialize(row.asInstanceOf[GenericRow].values, inputSoi)
-            prepareWritable(writable).write(dataOutputStream)
+      // Put the write(output to the pipeline) into a single thread
+      // and keep the collector as remain in the main thread.
+      // otherwise it will causes deadlock if the data size greater than
+      // the pipeline / buffer capacity.
+      new Thread(new Runnable() {
+        override def run(): Unit = {
+          iter
+            .map(outputProjection)
+            .foreach { row =>
+              if (inputSerde == null) {
+                val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
+                  ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")
+
+                outputStream.write(data)
+              } else {
+                val writable = inputSerde.serialize(row.asInstanceOf[GenericRow].values, inputSoi)
+                prepareWritable(writable).write(dataOutputStream)
+              }
           }
+          outputStream.close()
         }
-      outputStream.close()
+      }).start()
+
       iterator
     }
   }
```