diff options
author | Cheng Hao <hao.cheng@intel.com> | 2015-06-29 12:46:33 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2015-06-29 12:46:33 -0700 |
commit | c6ba2ea341ad23de265d870669b25e6a41f461e5 (patch) | |
tree | 4ca43bb0a07ea2cb81eba05acca728761dbf040f /core/src | |
parent | 637b4eedad84dcff1769454137a64ac70c7f2397 (diff) | |
download | spark-c6ba2ea341ad23de265d870669b25e6a41f461e5.tar.gz spark-c6ba2ea341ad23de265d870669b25e6a41f461e5.tar.bz2 spark-c6ba2ea341ad23de265d870669b25e6a41f461e5.zip |
[SPARK-7862] [SQL] Disable the error message redirect to stderr
This is a follow up of #6404, the ScriptTransformation prints the error msg into stderr directly, probably be a disaster for application log.
Author: Cheng Hao <hao.cheng@intel.com>
Closes #6882 from chenghao-intel/verbose and squashes the following commits:
bfedd77 [Cheng Hao] revert the write
76ff46b [Cheng Hao] update the CircularBuffer
692b19e [Cheng Hao] check the process exitValue for ScriptTransform
47e0970 [Cheng Hao] Use the RedirectThread instead
1de771d [Cheng Hao] naming the threads in ScriptTransformation
8536e81 [Cheng Hao] disable the error message redirection for stderr
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/Utils.scala | 33 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/util/UtilsSuite.scala | 8 |
2 files changed, 41 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 19157af5b6..a7fc749a2b 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2333,3 +2333,36 @@ private[spark] class RedirectThread( } } } + +/** + * An [[OutputStream]] that will store the last 10 kilobytes (by default) written to it + * in a circular buffer. The current contents of the buffer can be accessed using + * the toString method. + */ +private[spark] class CircularBuffer(sizeInBytes: Int = 10240) extends java.io.OutputStream { + var pos: Int = 0 + var buffer = new Array[Int](sizeInBytes) + + def write(i: Int): Unit = { + buffer(pos) = i + pos = (pos + 1) % buffer.length + } + + override def toString: String = { + val (end, start) = buffer.splitAt(pos) + val input = new java.io.InputStream { + val iterator = (start ++ end).iterator + + def read(): Int = if (iterator.hasNext) iterator.next() else -1 + } + val reader = new BufferedReader(new InputStreamReader(input)) + val stringBuilder = new StringBuilder + var line = reader.readLine() + while (line != null) { + stringBuilder.append(line) + stringBuilder.append("\n") + line = reader.readLine() + } + stringBuilder.toString() + } +} diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index a61ea3918f..baa4c661cc 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -673,4 +673,12 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging { assert(!Utils.isInDirectory(nullFile, parentDir)) assert(!Utils.isInDirectory(nullFile, childFile3)) } + + test("circular buffer") { + val buffer = new CircularBuffer(25) + val stream = new java.io.PrintStream(buffer, true, "UTF-8") + + stream.println("test circular test circular test circular test circular test circular") + assert(buffer.toString === "t circular test circular\n") + } } |