aboutsummaryrefslogtreecommitdiff
path: root/sql/hive-thriftserver
diff options
context:
space:
mode:
authorCheng Lian <lian@databricks.com>2014-11-02 15:15:52 -0800
committerMichael Armbrust <michael@databricks.com>2014-11-02 15:15:52 -0800
commit495a132031ae002c787371f2fd0ba4be2437e7c8 (patch)
treeb4afbcdb461f7153e38569c8ebd2d608e448fca3 /sql/hive-thriftserver
parente4b80894bdb72c0acf8832fd48421c546fbc37e6 (diff)
downloadspark-495a132031ae002c787371f2fd0ba4be2437e7c8.tar.gz
spark-495a132031ae002c787371f2fd0ba4be2437e7c8.tar.bz2
spark-495a132031ae002c787371f2fd0ba4be2437e7c8.zip
[SQL] Fixes race condition in CliSuite
`CliSuite` has been flaky for a while, this PR tries to improve this situation by fixing a race condition in `CliSuite`. The `captureOutput` function is used to capture both stdout and stderr output of the forked external process in two background threads and search for expected strings, but wasn't been properly synchronized before. Author: Cheng Lian <lian@databricks.com> Closes #3060 from liancheng/fix-cli-suite and squashes the following commits: a70569c [Cheng Lian] Fixes race condition in CliSuite
Diffstat (limited to 'sql/hive-thriftserver')
-rw-r--r--sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala35
1 files changed, 15 insertions, 20 deletions
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index 8a72e9d2ae..e8ffbc5b95 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -18,19 +18,17 @@
package org.apache.spark.sql.hive.thriftserver
+import java.io._
+
import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
-import scala.concurrent.{Await, Future, Promise}
+import scala.concurrent.{Await, Promise}
import scala.sys.process.{Process, ProcessLogger}
-import java.io._
-import java.util.concurrent.atomic.AtomicInteger
-
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.scalatest.{BeforeAndAfterAll, FunSuite}
-import org.apache.spark.{SparkException, Logging}
+import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.util.getTempFilePath
class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
@@ -53,23 +51,20 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
""".stripMargin.split("\\s+").toSeq ++ extraArgs
}
- // AtomicInteger is needed because stderr and stdout of the forked process are handled in
- // different threads.
- val next = new AtomicInteger(0)
+ var next = 0
val foundAllExpectedAnswers = Promise.apply[Unit]()
val queryStream = new ByteArrayInputStream(queries.mkString("\n").getBytes)
val buffer = new ArrayBuffer[String]()
+ val lock = new Object
- def captureOutput(source: String)(line: String) {
+ def captureOutput(source: String)(line: String): Unit = lock.synchronized {
buffer += s"$source> $line"
- // If we haven't found all expected answers...
- if (next.get() < expectedAnswers.size) {
- // If another expected answer is found...
- if (line.startsWith(expectedAnswers(next.get()))) {
- // If all expected answers have been found...
- if (next.incrementAndGet() == expectedAnswers.size) {
- foundAllExpectedAnswers.trySuccess(())
- }
+ // If we haven't found all expected answers and another expected answer comes up...
+ if (next < expectedAnswers.size && line.startsWith(expectedAnswers(next))) {
+ next += 1
+ // If all expected answers have been found...
+ if (next == expectedAnswers.size) {
+ foundAllExpectedAnswers.trySuccess(())
}
}
}
@@ -88,8 +83,8 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
|=======================
|Spark SQL CLI command line: ${command.mkString(" ")}
|
- |Executed query ${next.get()} "${queries(next.get())}",
- |But failed to capture expected output "${expectedAnswers(next.get())}" within $timeout.
+ |Executed query $next "${queries(next)}",
+ |But failed to capture expected output "${expectedAnswers(next)}" within $timeout.
|
|${buffer.mkString("\n")}
|===========================