aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorCheng Lian <lian.cs.zju@gmail.com>2014-10-13 13:50:27 -0700
committerMichael Armbrust <michael@databricks.com>2014-10-13 13:50:27 -0700
commit9eb49d4134e23a15142fb592d54d920e89bd8786 (patch)
treecdbeae8115304340b3edb55cdd6eb8078b4054ca /sql
parent9d9ca91fef70eca6fc576be9c99aed5d8ce6e68b (diff)
downloadspark-9eb49d4134e23a15142fb592d54d920e89bd8786.tar.gz
spark-9eb49d4134e23a15142fb592d54d920e89bd8786.tar.bz2
spark-9eb49d4134e23a15142fb592d54d920e89bd8786.zip
[SPARK-3809][SQL] Fixes test suites in hive-thriftserver
As scwf pointed out, `HiveThriftServer2Suite` isn't effective anymore after the Thrift server was made a daemon. On the other hand, these test suites were known flaky, PR #2214 tried to fix them but failed because of unknown Jenkins build error. This PR fixes both sets of issues. In this PR, instead of watching `start-thriftserver.sh` output, the test code start a `tail` process to watch the log file. A `Thread.sleep` has to be introduced because the `kill` command used in `stop-thriftserver.sh` is not synchronous. As for the root cause of the mysterious Jenkins build failure. Please refer to [this comment](https://github.com/apache/spark/pull/2675#issuecomment-58464189) below for details. ---- (Copied from PR description of #2214) This PR fixes two issues of `HiveThriftServer2Suite` and brings 1 enhancement: 1. Although metastore, warehouse directories and listening port are randomly chosen, all test cases share the same configuration. Due to parallel test execution, one of the two test case is doomed to fail 2. We caught any exceptions thrown from a test case and print diagnosis information, but forgot to re-throw the exception... 3. When the forked server process ends prematurely (e.g., fails to start), the `serverRunning` promise is completed with a failure, preventing the test code to keep waiting until timeout. So, embarrassingly, this test suite was failing continuously for several days but no one had ever noticed it... Fortunately no bugs in the production code were covered under the hood. Author: Cheng Lian <lian.cs.zju@gmail.com> Author: wangfei <wangfei1@huawei.com> Closes #2675 from liancheng/fix-thriftserver-tests and squashes the following commits: 1c384b7 [Cheng Lian] Minor code cleanup, restore the logging level hack in TestHive.scala 7805c33 [wangfei] reset SPARK_TESTING to avoid loading Log4J configurations in testing class paths af2b5a9 [Cheng Lian] Removes log level hacks from TestHiveContext d116405 [wangfei] make sure that log4j level is INFO ee92a82 [Cheng Lian] Relaxes timeout 7fd6757 [Cheng Lian] Fixes test suites in hive-thriftserver
Diffstat (limited to 'sql')
-rw-r--r--sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala13
-rw-r--r--sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala86
2 files changed, 60 insertions, 39 deletions
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index d68dd090b5..fc97a25be3 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -30,7 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.scalatest.{BeforeAndAfterAll, FunSuite}
-import org.apache.spark.Logging
+import org.apache.spark.{SparkException, Logging}
import org.apache.spark.sql.catalyst.util.getTempFilePath
class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
@@ -62,8 +62,11 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
def captureOutput(source: String)(line: String) {
buffer += s"$source> $line"
+ // If we haven't found all expected answers...
if (next.get() < expectedAnswers.size) {
+ // If another expected answer is found...
if (line.startsWith(expectedAnswers(next.get()))) {
+ // If all expected answers have been found...
if (next.incrementAndGet() == expectedAnswers.size) {
foundAllExpectedAnswers.trySuccess(())
}
@@ -77,7 +80,8 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
Future {
val exitValue = process.exitValue()
- logInfo(s"Spark SQL CLI process exit value: $exitValue")
+ foundAllExpectedAnswers.tryFailure(
+ new SparkException(s"Spark SQL CLI process exit value: $exitValue"))
}
try {
@@ -98,6 +102,7 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
|End CliSuite failure output
|===========================
""".stripMargin, cause)
+ throw cause
} finally {
warehousePath.delete()
metastorePath.delete()
@@ -109,7 +114,7 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
val dataFilePath =
Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
- runCliWithin(1.minute)(
+ runCliWithin(3.minute)(
"CREATE TABLE hive_test(key INT, val STRING);"
-> "OK",
"SHOW TABLES;"
@@ -120,7 +125,7 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
-> "Time taken: ",
"SELECT COUNT(*) FROM hive_test;"
-> "5",
- "DROP TABLE hive_test"
+ "DROP TABLE hive_test;"
-> "Time taken: "
)
}
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
index 38977ff162..e3b4e45a3d 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suite.scala
@@ -17,17 +17,17 @@
package org.apache.spark.sql.hive.thriftserver
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.duration._
-import scala.concurrent.{Await, Future, Promise}
-import scala.sys.process.{Process, ProcessLogger}
-
import java.io.File
import java.net.ServerSocket
import java.sql.{DriverManager, Statement}
import java.util.concurrent.TimeoutException
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
+import scala.concurrent.{Await, Promise}
+import scala.sys.process.{Process, ProcessLogger}
+import scala.util.Try
+
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hive.jdbc.HiveDriver
import org.scalatest.FunSuite
@@ -41,25 +41,25 @@ import org.apache.spark.sql.catalyst.util.getTempFilePath
class HiveThriftServer2Suite extends FunSuite with Logging {
Class.forName(classOf[HiveDriver].getCanonicalName)
- private val listeningHost = "localhost"
- private val listeningPort = {
- // Let the system to choose a random available port to avoid collision with other parallel
- // builds.
- val socket = new ServerSocket(0)
- val port = socket.getLocalPort
- socket.close()
- port
- }
-
- private val warehousePath = getTempFilePath("warehouse")
- private val metastorePath = getTempFilePath("metastore")
- private val metastoreJdbcUri = s"jdbc:derby:;databaseName=$metastorePath;create=true"
-
- def startThriftServerWithin(timeout: FiniteDuration = 30.seconds)(f: Statement => Unit) {
- val serverScript = "../../sbin/start-thriftserver.sh".split("/").mkString(File.separator)
+ def startThriftServerWithin(timeout: FiniteDuration = 1.minute)(f: Statement => Unit) {
+ val startScript = "../../sbin/start-thriftserver.sh".split("/").mkString(File.separator)
+ val stopScript = "../../sbin/stop-thriftserver.sh".split("/").mkString(File.separator)
+
+ val warehousePath = getTempFilePath("warehouse")
+ val metastorePath = getTempFilePath("metastore")
+ val metastoreJdbcUri = s"jdbc:derby:;databaseName=$metastorePath;create=true"
+ val listeningHost = "localhost"
+ val listeningPort = {
+ // Let the system to choose a random available port to avoid collision with other parallel
+ // builds.
+ val socket = new ServerSocket(0)
+ val port = socket.getLocalPort
+ socket.close()
+ port
+ }
val command =
- s"""$serverScript
+ s"""$startScript
| --master local
| --hiveconf hive.root.logger=INFO,console
| --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$metastoreJdbcUri
@@ -68,29 +68,40 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
| --hiveconf ${ConfVars.HIVE_SERVER2_THRIFT_PORT}=$listeningPort
""".stripMargin.split("\\s+").toSeq
- val serverStarted = Promise[Unit]()
+ val serverRunning = Promise[Unit]()
val buffer = new ArrayBuffer[String]()
+ val LOGGING_MARK =
+ s"starting ${HiveThriftServer2.getClass.getCanonicalName.stripSuffix("$")}, logging to "
+ var logTailingProcess: Process = null
+ var logFilePath: String = null
- def captureOutput(source: String)(line: String) {
- buffer += s"$source> $line"
+ def captureLogOutput(line: String): Unit = {
+ buffer += line
if (line.contains("ThriftBinaryCLIService listening on")) {
- serverStarted.success(())
+ serverRunning.success(())
}
}
- val process = Process(command).run(
- ProcessLogger(captureOutput("stdout"), captureOutput("stderr")))
-
- Future {
- val exitValue = process.exitValue()
- logInfo(s"Spark SQL Thrift server process exit value: $exitValue")
+ def captureThriftServerOutput(source: String)(line: String): Unit = {
+ if (line.startsWith(LOGGING_MARK)) {
+ logFilePath = line.drop(LOGGING_MARK.length).trim
+ // Ensure that the log file is created so that the `tail' command won't fail
+ Try(new File(logFilePath).createNewFile())
+ logTailingProcess = Process(s"/usr/bin/env tail -f $logFilePath")
+ .run(ProcessLogger(captureLogOutput, _ => ()))
+ }
}
+ // Resets SPARK_TESTING to avoid loading Log4J configurations in testing class paths
+ Process(command, None, "SPARK_TESTING" -> "0").run(ProcessLogger(
+ captureThriftServerOutput("stdout"),
+ captureThriftServerOutput("stderr")))
+
val jdbcUri = s"jdbc:hive2://$listeningHost:$listeningPort/"
val user = System.getProperty("user.name")
try {
- Await.result(serverStarted.future, timeout)
+ Await.result(serverRunning.future, timeout)
val connection = DriverManager.getConnection(jdbcUri, user, "")
val statement = connection.createStatement()
@@ -122,10 +133,15 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
|End HiveThriftServer2Suite failure output
|=========================================
""".stripMargin, cause)
+ throw cause
} finally {
warehousePath.delete()
metastorePath.delete()
- process.destroy()
+ Process(stopScript).run().exitValue()
+ // The `spark-daemon.sh' script uses kill, which is not synchronous, have to wait for a while.
+ Thread.sleep(3.seconds.toMillis)
+ Option(logTailingProcess).map(_.destroy())
+ Option(logFilePath).map(new File(_).delete())
}
}