author    Cheng Hao <hao.cheng@intel.com>    2015-06-29 12:46:33 -0700
committer Michael Armbrust <michael@databricks.com>    2015-06-29 12:46:33 -0700
commit    c6ba2ea341ad23de265d870669b25e6a41f461e5 (patch)
tree      4ca43bb0a07ea2cb81eba05acca728761dbf040f /sql
parent    637b4eedad84dcff1769454137a64ac70c7f2397 (diff)
[SPARK-7862] [SQL] Disable the error message redirect to stderr
This is a follow-up of #6404: ScriptTransformation prints the error msg into stderr directly, which can be a disaster for the application log.

Author: Cheng Hao <hao.cheng@intel.com>

Closes #6882 from chenghao-intel/verbose and squashes the following commits:

bfedd77 [Cheng Hao] revert the write
76ff46b [Cheng Hao] update the CircularBuffer
692b19e [Cheng Hao] check the process exitValue for ScriptTransform
47e0970 [Cheng Hao] Use the RedirectThread instead
1de771d [Cheng Hao] naming the threads in ScriptTransformation
8536e81 [Cheng Hao] disable the error message redirection for stderr
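In short, the fix replaces builder.redirectError(Redirect.INHERIT) with a dedicated consumer thread that drains the script's stderr into a bounded circular buffer, which is logged only if the process exits non-zero. A minimal, self-contained Scala sketch of that pattern; the names (StderrCaptureDemo, stderr-drainer) and the unbounded ByteArrayOutputStream are illustrative stand-ins, not Spark's API:

import java.io.{ByteArrayOutputStream, InputStream}

object StderrCaptureDemo {
  def main(args: Array[String]): Unit = {
    // A child that writes to stderr and fails, mimicking a misbehaving transform script.
    val proc = new ProcessBuilder("/bin/bash", "-c", "echo oops >&2; exit 1").start()
    val stderrBuf = new ByteArrayOutputStream() // stand-in for a bounded CircularBuffer

    // Drain stderr on its own thread so the child never blocks on a full pipe.
    val drainer = new Thread(new Runnable {
      override def run(): Unit = {
        val in: InputStream = proc.getErrorStream
        val chunk = new Array[Byte](1024)
        var n = in.read(chunk)
        while (n != -1) {
          stderrBuf.write(chunk, 0, n)
          n = in.read(chunk)
        }
      }
    }, "stderr-drainer")
    drainer.start()

    // Surface the captured stderr only when the process actually fails.
    if (proc.waitFor() != 0) {
      drainer.join()
      System.err.println(s"script failed, captured stderr:\n$stderrBuf")
    }
  }
}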
Diffstat (limited to 'sql')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala           | 29
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala | 51
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala        |  2
3 files changed, 36 insertions(+), 46 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index 4c708cec57..cbd2bf6b5e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -22,6 +22,8 @@ import java.net.URI
import java.util.{ArrayList => JArrayList, Map => JMap, List => JList, Set => JSet}
import javax.annotation.concurrent.GuardedBy
+import org.apache.spark.util.CircularBuffer
+
import scala.collection.JavaConversions._
import scala.language.reflectiveCalls
@@ -66,32 +68,7 @@ private[hive] class ClientWrapper(
with Logging {
// Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur.
- private val outputBuffer = new java.io.OutputStream {
- var pos: Int = 0
- var buffer = new Array[Int](10240)
- def write(i: Int): Unit = {
- buffer(pos) = i
- pos = (pos + 1) % buffer.size
- }
-
- override def toString: String = {
- val (end, start) = buffer.splitAt(pos)
- val input = new java.io.InputStream {
- val iterator = (start ++ end).iterator
-
- def read(): Int = if (iterator.hasNext) iterator.next() else -1
- }
- val reader = new BufferedReader(new InputStreamReader(input))
- val stringBuilder = new StringBuilder
- var line = reader.readLine()
- while(line != null) {
- stringBuilder.append(line)
- stringBuilder.append("\n")
- line = reader.readLine()
- }
- stringBuilder.toString()
- }
- }
+ private val outputBuffer = new CircularBuffer()
private val shim = version match {
case hive.v12 => new Shim_v0_12()
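The CircularBuffer that replaces the inline stream above was extracted into org.apache.spark.util, so only its call site appears in this diff. A sketch of an equivalent class, reconstructed from the deleted code rather than from Spark's actual source (the name RingBufferStream is illustrative):

import java.io.{BufferedReader, InputStream, InputStreamReader, OutputStream}

// Fixed-size ring that keeps only the most recent `sizeInBytes` bytes written to it.
// Faithful to the deleted inline code, including its quirk of replaying the
// zero-initialized slots of a buffer that has not wrapped around yet.
class RingBufferStream(sizeInBytes: Int = 10240) extends OutputStream {
  private var pos = 0
  private val buffer = new Array[Int](sizeInBytes)

  override def write(i: Int): Unit = {
    buffer(pos) = i
    pos = (pos + 1) % buffer.length
  }

  // Replay the ring in chronological order: the slice from `pos` onward is oldest.
  override def toString: String = {
    val (end, start) = buffer.splitAt(pos)
    val input: InputStream = new InputStream {
      private val iterator = (start ++ end).iterator
      override def read(): Int = if (iterator.hasNext) iterator.next() else -1
    }
    val reader = new BufferedReader(new InputStreamReader(input))
    Iterator.continually(reader.readLine()).takeWhile(_ != null).map(_ + "\n").mkString
  }
}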
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index 611888055d..b967e191c5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive.HiveShim._
import org.apache.spark.sql.hive.{HiveContext, HiveInspectors}
import org.apache.spark.sql.types.DataType
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{CircularBuffer, RedirectThread, Utils}
/**
* Transforms the input by forking and running the specified script.
@@ -59,15 +59,13 @@ case class ScriptTransformation(
child.execute().mapPartitions { iter =>
val cmd = List("/bin/bash", "-c", script)
val builder = new ProcessBuilder(cmd)
- // redirectError(Redirect.INHERIT) would consume the error output from buffer and
- // then print it to stderr (inherit the target from the current Scala process).
- // If without this there would be 2 issues:
+ // We need to start threads connected to the process pipeline:
// 1) The error msg generated by the script process would be hidden.
// 2) If the error msg is big enough to choke up the buffer, the input logic would hang
- builder.redirectError(Redirect.INHERIT)
val proc = builder.start()
val inputStream = proc.getInputStream
val outputStream = proc.getOutputStream
+ val errorStream = proc.getErrorStream
val reader = new BufferedReader(new InputStreamReader(inputStream))
val (outputSerde, outputSoi) = ioschema.initOutputSerDe(output)
@@ -152,29 +150,43 @@ case class ScriptTransformation(
val dataOutputStream = new DataOutputStream(outputStream)
val outputProjection = new InterpretedProjection(input, child.output)
+ // TODO make the 2048 configurable?
+ val stderrBuffer = new CircularBuffer(2048)
+ // Consume the error stream from the pipeline, otherwise it will be blocked if
+ // the pipeline is full.
+ new RedirectThread(errorStream, // input stream from the pipeline
+ stderrBuffer, // output to a circular buffer
+ "Thread-ScriptTransformation-STDERR-Consumer").start()
+
// Put the write (output to the pipeline) into a single thread
// and keep the collector in the main thread;
// otherwise it will cause a deadlock if the data size is greater than
// the pipeline / buffer capacity.
new Thread(new Runnable() {
override def run(): Unit = {
- iter
- .map(outputProjection)
- .foreach { row =>
- if (inputSerde == null) {
- val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
- ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")
-
- outputStream.write(data)
- } else {
- val writable = inputSerde.serialize(
- row.asInstanceOf[GenericInternalRow].values, inputSoi)
- prepareWritable(writable).write(dataOutputStream)
+ Utils.tryWithSafeFinally {
+ iter
+ .map(outputProjection)
+ .foreach { row =>
+ if (inputSerde == null) {
+ val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
+ ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")
+
+ outputStream.write(data)
+ } else {
+ val writable = inputSerde.serialize(
+ row.asInstanceOf[GenericInternalRow].values, inputSoi)
+ prepareWritable(writable).write(dataOutputStream)
+ }
+ }
+ outputStream.close()
+ } {
+ if (proc.waitFor() != 0) {
+ logError(stderrBuffer.toString) // log the stderr circular buffer
}
}
- outputStream.close()
}
- }).start()
+ }, "Thread-ScriptTransformation-Feed").start()
iterator
}
@@ -278,3 +290,4 @@ case class HiveScriptIOSchema (
}
}
}
+
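RedirectThread, like CircularBuffer, is an existing org.apache.spark.util helper; this diff only shows its call site. What such a stream pump has to do is small enough to sketch. An approximation under that assumption (StreamPump is an illustrative name, not Spark's class):

import java.io.{InputStream, OutputStream}

// Daemon thread that copies everything from `in` to `out` until EOF.
class StreamPump(in: InputStream, out: OutputStream, name: String) extends Thread(name) {
  setDaemon(true)

  override def run(): Unit = {
    val buf = new Array[Byte](1024)
    var n = in.read(buf)
    while (n != -1) {
      out.write(buf, 0, n)
      n = in.read(buf)
    }
  }
}

Pumping proc.getErrorStream into the 2048-byte circular buffer this way keeps the child from stalling on a full stderr pipe, while the noisy output stays out of the application log unless proc.waitFor() reports a failure.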
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index f0aad8dbbe..9f7e58f890 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -653,7 +653,7 @@ class SQLQuerySuite extends QueryTest {
.queryExecution.toRdd.count())
}
- ignore("test script transform for stderr") {
+ test("test script transform for stderr") {
val data = (1 to 100000).map { i => (i, i, i) }
data.toDF("d1", "d2", "d3").registerTempTable("script_trans")
assert(0 ===