author     Cheng Lian <lian@databricks.com>    2015-08-19 11:21:46 +0800
committer  Cheng Lian <lian@databricks.com>    2015-08-19 11:21:46 +0800
commit     a5b5b936596ceb45f5f5b68bf1d6368534fb9470 (patch)
tree       714e05b09611e74b87d73bd2f32b2cc648702944 /sql/hive-thriftserver
parent     90273eff9604439a5a5853077e232d34555c67d7 (diff)
[SPARK-9939] [SQL] Resorts to Java process API in CliSuite, HiveSparkSubmitSuite and HiveThriftServer2 test suites
The Scala process API has a known bug ([SI-8768] [1]), which may be the reason why several test suites that fork sub-processes are flaky. This PR replaces the Scala process API with the Java process API in `CliSuite`, `HiveSparkSubmitSuite`, and the `HiveThriftServer2` related test suites to see whether it fixes these flaky tests.

[1]: https://issues.scala-lang.org/browse/SI-8768

Author: Cheng Lian <lian@databricks.com>

Closes #8168 from liancheng/spark-9939/use-java-process-api.
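For context, a minimal sketch (not part of this commit; placeholder `echo` command) contrasting the two APIs involved: the `scala.sys.process` style the suites move away from, and the `java.lang.ProcessBuilder` style they move to.

    // Old style: scala.sys.process, the API affected by SI-8768.
    import scala.sys.process.{Process, ProcessLogger}

    val oldExit = Process(Seq("echo", "hello")).run(
      ProcessLogger(out => println(s"stdout> $out"), err => println(s"stderr> $err"))
    ).exitValue()

    // New style: java.lang.ProcessBuilder, as used by the rewritten suites.
    import java.io.{BufferedReader, InputStreamReader}

    val process = new ProcessBuilder("echo", "hello").start()
    val reader = new BufferedReader(new InputStreamReader(process.getInputStream))
    Iterator.continually(reader.readLine()).takeWhile(_ != null).foreach(println)
    val exitCode = process.waitFor()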
Diffstat (limited to 'sql/hive-thriftserver')
-rw-r--r--  sql/hive-thriftserver/pom.xml | 7
-rw-r--r--  sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala | 64
-rw-r--r--  sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala | 83
3 files changed, 77 insertions(+), 77 deletions(-)
diff --git a/sql/hive-thriftserver/pom.xml b/sql/hive-thriftserver/pom.xml
index 2dfbcb2425..3566c87dd2 100644
--- a/sql/hive-thriftserver/pom.xml
+++ b/sql/hive-thriftserver/pom.xml
@@ -86,6 +86,13 @@
<artifactId>selenium-java</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ <type>test-jar</type>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index 121b3e077f..e59a14ec00 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -18,18 +18,19 @@
package org.apache.spark.sql.hive.thriftserver
import java.io._
+import java.sql.Timestamp
+import java.util.Date
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.concurrent.{Await, Promise}
-import scala.sys.process.{Process, ProcessLogger}
-import scala.util.Failure
+import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.scalatest.BeforeAndAfter
-import org.apache.spark.{Logging, SparkFunSuite}
import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SparkFunSuite}
/**
* A test suite for the `spark-sql` CLI tool. Note that all test cases share the same temporary
@@ -70,6 +71,9 @@ class CliSuite extends SparkFunSuite with BeforeAndAfter with Logging {
queriesAndExpectedAnswers: (String, String)*): Unit = {
val (queries, expectedAnswers) = queriesAndExpectedAnswers.unzip
+ // Explicitly adds ENTER for each statement to make sure they are actually entered into the CLI.
+ val queriesString = queries.map(_ + "\n").mkString
+
val command = {
val cliScript = "../../bin/spark-sql".split("/").mkString(File.separator)
val jdbcUrl = s"jdbc:derby:;databaseName=$metastorePath;create=true"
@@ -83,13 +87,14 @@ class CliSuite extends SparkFunSuite with BeforeAndAfter with Logging {
var next = 0
val foundAllExpectedAnswers = Promise.apply[Unit]()
- // Explicitly adds ENTER for each statement to make sure they are actually entered into the CLI.
- val queryStream = new ByteArrayInputStream(queries.map(_ + "\n").mkString.getBytes)
val buffer = new ArrayBuffer[String]()
val lock = new Object
def captureOutput(source: String)(line: String): Unit = lock.synchronized {
- buffer += s"$source> $line"
+ // This test suite sometimes gets extremely slow out of unknown reason on Jenkins. Here we
+ // add a timestamp to provide more diagnosis information.
+ buffer += s"${new Timestamp(new Date().getTime)} - $source> $line"
+
// If we haven't found all expected answers and another expected answer comes up...
if (next < expectedAnswers.size && line.startsWith(expectedAnswers(next))) {
next += 1
@@ -98,48 +103,27 @@ class CliSuite extends SparkFunSuite with BeforeAndAfter with Logging {
foundAllExpectedAnswers.trySuccess(())
}
} else {
- errorResponses.foreach( r => {
+ errorResponses.foreach { r =>
if (line.startsWith(r)) {
foundAllExpectedAnswers.tryFailure(
new RuntimeException(s"Failed with error line '$line'"))
- }})
- }
- }
-
- // Searching expected output line from both stdout and stderr of the CLI process
- val process = (Process(command, None) #< queryStream).run(
- ProcessLogger(captureOutput("stdout"), captureOutput("stderr")))
-
- // catch the output value
- class exitCodeCatcher extends Runnable {
- var exitValue = 0
-
- override def run(): Unit = {
- try {
- exitValue = process.exitValue()
- } catch {
- case rte: RuntimeException =>
- // ignored as it will get triggered when the process gets destroyed
- logDebug("Ignoring exception while waiting for exit code", rte)
- }
- if (exitValue != 0) {
- // process exited: fail fast
- foundAllExpectedAnswers.tryFailure(
- new RuntimeException(s"Failed with exit code $exitValue"))
+ }
}
}
}
- // spin off the code catche thread. No attempt is made to kill this
- // as it will exit once the launched process terminates.
- val codeCatcherThread = new Thread(new exitCodeCatcher())
- codeCatcherThread.start()
+
+ val process = new ProcessBuilder(command: _*).start()
+
+ val stdinWriter = new OutputStreamWriter(process.getOutputStream)
+ stdinWriter.write(queriesString)
+ stdinWriter.flush()
+ stdinWriter.close()
+
+ new ProcessOutputCapturer(process.getInputStream, captureOutput("stdout")).start()
+ new ProcessOutputCapturer(process.getErrorStream, captureOutput("stderr")).start()
try {
- Await.ready(foundAllExpectedAnswers.future, timeout)
- foundAllExpectedAnswers.future.value match {
- case Some(Failure(t)) => throw t
- case _ =>
- }
+ Await.result(foundAllExpectedAnswers.future, timeout)
} catch { case cause: Throwable =>
val message =
s"""
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 17e7044c46..ded42bca99 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -22,10 +22,9 @@ import java.net.URL
import java.sql.{Date, DriverManager, SQLException, Statement}
import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Promise, future}
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.sys.process.{Process, ProcessLogger}
import scala.util.{Random, Try}
import com.google.common.base.Charsets.UTF_8
@@ -38,11 +37,12 @@ import org.apache.hive.service.cli.thrift.TCLIService.Client
import org.apache.hive.service.cli.thrift.ThriftCLIServiceClient
import org.apache.thrift.protocol.TBinaryProtocol
import org.apache.thrift.transport.TSocket
-import org.scalatest.{Ignore, BeforeAndAfterAll}
+import org.scalatest.BeforeAndAfterAll
-import org.apache.spark.{Logging, SparkFunSuite}
import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SparkFunSuite}
object TestData {
def getTestDataFilePath(name: String): URL = {
@@ -53,7 +53,6 @@ object TestData {
val smallKvWithNull = getTestDataFilePath("small_kv_with_null.txt")
}
-@Ignore // SPARK-9606
class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
override def mode: ServerMode.Value = ServerMode.binary
@@ -380,7 +379,6 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
}
}
-@Ignore // SPARK-9606
class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
override def mode: ServerMode.Value = ServerMode.http
@@ -484,7 +482,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAll
val tempLog4jConf = Utils.createTempDir().getCanonicalPath
Files.write(
- """log4j.rootCategory=INFO, console
+ """log4j.rootCategory=DEBUG, console
|log4j.appender.console=org.apache.log4j.ConsoleAppender
|log4j.appender.console.target=System.err
|log4j.appender.console.layout=org.apache.log4j.PatternLayout
@@ -493,7 +491,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAll
new File(s"$tempLog4jConf/log4j.properties"),
UTF_8)
- tempLog4jConf // + File.pathSeparator + sys.props("java.class.path")
+ tempLog4jConf
}
s"""$startScript
@@ -521,7 +519,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAll
*/
val THRIFT_HTTP_SERVICE_LIVE = "Started ThriftHttpCLIService in http"
- val SERVER_STARTUP_TIMEOUT = 1.minute
+ val SERVER_STARTUP_TIMEOUT = 3.minutes
private def startThriftServer(port: Int, attempt: Int) = {
warehousePath = Utils.createTempDir()
@@ -543,17 +541,22 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAll
logInfo(s"Trying to start HiveThriftServer2: port=$port, mode=$mode, attempt=$attempt")
- val env = Seq(
- // Disables SPARK_TESTING to exclude log4j.properties in test directories.
- "SPARK_TESTING" -> "0",
- // Points SPARK_PID_DIR to SPARK_HOME, otherwise only 1 Thrift server instance can be started
- // at a time, which is not Jenkins friendly.
- "SPARK_PID_DIR" -> pidDir.getCanonicalPath)
-
- logPath = Process(command, None, env: _*).lines.collectFirst {
- case line if line.contains(LOG_FILE_MARK) => new File(line.drop(LOG_FILE_MARK.length))
- }.getOrElse {
- throw new RuntimeException("Failed to find HiveThriftServer2 log file.")
+ logPath = {
+ val lines = Utils.executeAndGetOutput(
+ command = command,
+ extraEnvironment = Map(
+ // Disables SPARK_TESTING to exclude log4j.properties in test directories.
+ "SPARK_TESTING" -> "0",
+ // Points SPARK_PID_DIR to SPARK_HOME, otherwise only 1 Thrift server instance can be
+ // started at a time, which is not Jenkins friendly.
+ "SPARK_PID_DIR" -> pidDir.getCanonicalPath),
+ redirectStderr = true)
+
+ lines.split("\n").collectFirst {
+ case line if line.contains(LOG_FILE_MARK) => new File(line.drop(LOG_FILE_MARK.length))
+ }.getOrElse {
+ throw new RuntimeException("Failed to find HiveThriftServer2 log file.")
+ }
}
val serverStarted = Promise[Unit]()
@@ -561,30 +564,36 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAll
// Ensures that the following "tail" command won't fail.
logPath.createNewFile()
val successLines = Seq(THRIFT_BINARY_SERVICE_LIVE, THRIFT_HTTP_SERVICE_LIVE)
- val failureLines = Seq("HiveServer2 is stopped", "Exception in thread", "Error:")
- logTailingProcess =
+
+ logTailingProcess = {
+ val command = s"/usr/bin/env tail -n +0 -f ${logPath.getCanonicalPath}".split(" ")
// Using "-n +0" to make sure all lines in the log file are checked.
- Process(s"/usr/bin/env tail -n +0 -f ${logPath.getCanonicalPath}").run(ProcessLogger(
- (line: String) => {
- diagnosisBuffer += line
- successLines.foreach(r => {
- if (line.contains(r)) {
- serverStarted.trySuccess(())
- }
- })
- failureLines.foreach(r => {
- if (line.contains(r)) {
- serverStarted.tryFailure(new RuntimeException(s"Failed with output '$line'"))
- }
- })
- }))
+ val builder = new ProcessBuilder(command: _*)
+ val captureOutput = (line: String) => diagnosisBuffer.synchronized {
+ diagnosisBuffer += line
+
+ successLines.foreach { r =>
+ if (line.contains(r)) {
+ serverStarted.trySuccess(())
+ }
+ }
+ }
+
+ val process = builder.start()
+
+ new ProcessOutputCapturer(process.getInputStream, captureOutput).start()
+ new ProcessOutputCapturer(process.getErrorStream, captureOutput).start()
+ process
+ }
Await.result(serverStarted.future, SERVER_STARTUP_TIMEOUT)
}
private def stopThriftServer(): Unit = {
// The `spark-daemon.sh' script uses kill, which is not synchronous, have to wait for a while.
- Process(stopScript, None, "SPARK_PID_DIR" -> pidDir.getCanonicalPath).run().exitValue()
+ Utils.executeAndGetOutput(
+ command = Seq(stopScript),
+ extraEnvironment = Map("SPARK_PID_DIR" -> pidDir.getCanonicalPath))
Thread.sleep(3.seconds.toMillis)
warehousePath.delete()
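Taken together, the startup-detection pattern used above boils down to the following self-contained sketch (assumptions: a hypothetical log path, the HTTP-mode marker string shown in the diff, and the `ProcessOutputCapturer` sketched earlier):

    import java.io.File
    import scala.concurrent.{Await, Promise}
    import scala.concurrent.duration._

    val serverStarted = Promise[Unit]()
    val logFile = new File("/tmp/thriftserver.log") // hypothetical path

    // "-n +0" makes tail emit lines written before it started, so no
    // startup marker can be missed.
    val tail = new ProcessBuilder(
      "/usr/bin/env", "tail", "-n", "+0", "-f", logFile.getCanonicalPath).start()

    new ProcessOutputCapturer(tail.getInputStream, line => {
      if (line.contains("Started ThriftHttpCLIService in http")) {
        serverStarted.trySuccess(())
      }
    }).start()

    // Bound the wait; Await.result rethrows failures and times out otherwise.
    Await.result(serverStarted.future, 3.minutes)
    tail.destroy()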