aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala35
1 files changed, 15 insertions, 20 deletions
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index 8a72e9d2ae..e8ffbc5b95 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -18,19 +18,17 @@
package org.apache.spark.sql.hive.thriftserver
+import java.io._
+
import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
-import scala.concurrent.{Await, Future, Promise}
+import scala.concurrent.{Await, Promise}
import scala.sys.process.{Process, ProcessLogger}
-import java.io._
-import java.util.concurrent.atomic.AtomicInteger
-
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.scalatest.{BeforeAndAfterAll, FunSuite}
-import org.apache.spark.{SparkException, Logging}
+import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.util.getTempFilePath
class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
@@ -53,23 +51,20 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
""".stripMargin.split("\\s+").toSeq ++ extraArgs
}
- // AtomicInteger is needed because stderr and stdout of the forked process are handled in
- // different threads.
- val next = new AtomicInteger(0)
+ var next = 0
val foundAllExpectedAnswers = Promise.apply[Unit]()
val queryStream = new ByteArrayInputStream(queries.mkString("\n").getBytes)
val buffer = new ArrayBuffer[String]()
+ val lock = new Object
- def captureOutput(source: String)(line: String) {
+ def captureOutput(source: String)(line: String): Unit = lock.synchronized {
buffer += s"$source> $line"
- // If we haven't found all expected answers...
- if (next.get() < expectedAnswers.size) {
- // If another expected answer is found...
- if (line.startsWith(expectedAnswers(next.get()))) {
- // If all expected answers have been found...
- if (next.incrementAndGet() == expectedAnswers.size) {
- foundAllExpectedAnswers.trySuccess(())
- }
+ // If we haven't found all expected answers and another expected answer comes up...
+ if (next < expectedAnswers.size && line.startsWith(expectedAnswers(next))) {
+ next += 1
+ // If all expected answers have been found...
+ if (next == expectedAnswers.size) {
+ foundAllExpectedAnswers.trySuccess(())
}
}
}
@@ -88,8 +83,8 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
|=======================
|Spark SQL CLI command line: ${command.mkString(" ")}
|
- |Executed query ${next.get()} "${queries(next.get())}",
- |But failed to capture expected output "${expectedAnswers(next.get())}" within $timeout.
+ |Executed query $next "${queries(next)}",
+ |But failed to capture expected output "${expectedAnswers(next)}" within $timeout.
|
|${buffer.mkString("\n")}
|===========================