From a5b5b936596ceb45f5f5b68bf1d6368534fb9470 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 19 Aug 2015 11:21:46 +0800 Subject: [SPARK-9939] [SQL] Resorts to Java process API in CliSuite, HiveSparkSubmitSuite and HiveThriftServer2 test suites The Scala process API has a known bug ([SI-8768] [1]), which may be the reason why several test suites which fork sub-processes are flaky. This PR replaces the Scala process API with the Java process API in `CliSuite`, `HiveSparkSubmitSuite`, and `HiveThriftServer2` related test suites to see whether it fixes these flaky tests. [1]: https://issues.scala-lang.org/browse/SI-8768 Author: Cheng Lian Closes #8168 from liancheng/spark-9939/use-java-process-api. --- .../spark/sql/hive/thriftserver/CliSuite.scala | 64 +++++++---------- .../thriftserver/HiveThriftServer2Suites.scala | 83 ++++++++++++---------- 2 files changed, 70 insertions(+), 77 deletions(-) (limited to 'sql/hive-thriftserver/src') diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala index 121b3e077f..e59a14ec00 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala @@ -18,18 +18,19 @@ package org.apache.spark.sql.hive.thriftserver import java.io._ +import java.sql.Timestamp +import java.util.Date import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ import scala.concurrent.{Await, Promise} -import scala.sys.process.{Process, ProcessLogger} -import scala.util.Failure +import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.scalatest.BeforeAndAfter -import org.apache.spark.{Logging, SparkFunSuite} import org.apache.spark.util.Utils +import org.apache.spark.{Logging, SparkFunSuite} /** * 
A test suite for the `spark-sql` CLI tool. Note that all test cases share the same temporary @@ -70,6 +71,9 @@ class CliSuite extends SparkFunSuite with BeforeAndAfter with Logging { queriesAndExpectedAnswers: (String, String)*): Unit = { val (queries, expectedAnswers) = queriesAndExpectedAnswers.unzip + // Explicitly adds ENTER for each statement to make sure they are actually entered into the CLI. + val queriesString = queries.map(_ + "\n").mkString + val command = { val cliScript = "../../bin/spark-sql".split("/").mkString(File.separator) val jdbcUrl = s"jdbc:derby:;databaseName=$metastorePath;create=true" @@ -83,13 +87,14 @@ class CliSuite extends SparkFunSuite with BeforeAndAfter with Logging { var next = 0 val foundAllExpectedAnswers = Promise.apply[Unit]() - // Explicitly adds ENTER for each statement to make sure they are actually entered into the CLI. - val queryStream = new ByteArrayInputStream(queries.map(_ + "\n").mkString.getBytes) val buffer = new ArrayBuffer[String]() val lock = new Object def captureOutput(source: String)(line: String): Unit = lock.synchronized { - buffer += s"$source> $line" + // This test suite sometimes gets extremely slow for unknown reasons on Jenkins. Here we + // add a timestamp to provide more diagnostic information. + buffer += s"${new Timestamp(new Date().getTime)} - $source> $line" + // If we haven't found all expected answers and another expected answer comes up... 
if (next < expectedAnswers.size && line.startsWith(expectedAnswers(next))) { next += 1 @@ -98,48 +103,27 @@ class CliSuite extends SparkFunSuite with BeforeAndAfter with Logging { foundAllExpectedAnswers.trySuccess(()) } } else { - errorResponses.foreach( r => { + errorResponses.foreach { r => if (line.startsWith(r)) { foundAllExpectedAnswers.tryFailure( new RuntimeException(s"Failed with error line '$line'")) - }}) - } - } - - // Searching expected output line from both stdout and stderr of the CLI process - val process = (Process(command, None) #< queryStream).run( - ProcessLogger(captureOutput("stdout"), captureOutput("stderr"))) - - // catch the output value - class exitCodeCatcher extends Runnable { - var exitValue = 0 - - override def run(): Unit = { - try { - exitValue = process.exitValue() - } catch { - case rte: RuntimeException => - // ignored as it will get triggered when the process gets destroyed - logDebug("Ignoring exception while waiting for exit code", rte) - } - if (exitValue != 0) { - // process exited: fail fast - foundAllExpectedAnswers.tryFailure( - new RuntimeException(s"Failed with exit code $exitValue")) + } } } } - // spin off the code catche thread. No attempt is made to kill this - // as it will exit once the launched process terminates. 
- val codeCatcherThread = new Thread(new exitCodeCatcher()) - codeCatcherThread.start() + + val process = new ProcessBuilder(command: _*).start() + + val stdinWriter = new OutputStreamWriter(process.getOutputStream) + stdinWriter.write(queriesString) + stdinWriter.flush() + stdinWriter.close() + + new ProcessOutputCapturer(process.getInputStream, captureOutput("stdout")).start() + new ProcessOutputCapturer(process.getErrorStream, captureOutput("stderr")).start() try { - Await.ready(foundAllExpectedAnswers.future, timeout) - foundAllExpectedAnswers.future.value match { - case Some(Failure(t)) => throw t - case _ => - } + Await.result(foundAllExpectedAnswers.future, timeout) } catch { case cause: Throwable => val message = s""" diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala index 17e7044c46..ded42bca99 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala @@ -22,10 +22,9 @@ import java.net.URL import java.sql.{Date, DriverManager, SQLException, Statement} import scala.collection.mutable.ArrayBuffer +import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.duration._ import scala.concurrent.{Await, Promise, future} -import scala.concurrent.ExecutionContext.Implicits.global -import scala.sys.process.{Process, ProcessLogger} import scala.util.{Random, Try} import com.google.common.base.Charsets.UTF_8 @@ -38,11 +37,12 @@ import org.apache.hive.service.cli.thrift.TCLIService.Client import org.apache.hive.service.cli.thrift.ThriftCLIServiceClient import org.apache.thrift.protocol.TBinaryProtocol import org.apache.thrift.transport.TSocket -import org.scalatest.{Ignore, BeforeAndAfterAll} +import 
org.scalatest.BeforeAndAfterAll -import org.apache.spark.{Logging, SparkFunSuite} import org.apache.spark.sql.hive.HiveContext +import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer import org.apache.spark.util.Utils +import org.apache.spark.{Logging, SparkFunSuite} object TestData { def getTestDataFilePath(name: String): URL = { @@ -53,7 +53,6 @@ object TestData { val smallKvWithNull = getTestDataFilePath("small_kv_with_null.txt") } -@Ignore // SPARK-9606 class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { override def mode: ServerMode.Value = ServerMode.binary @@ -380,7 +379,6 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest { } } -@Ignore // SPARK-9606 class HiveThriftHttpServerSuite extends HiveThriftJdbcTest { override def mode: ServerMode.Value = ServerMode.http @@ -484,7 +482,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl val tempLog4jConf = Utils.createTempDir().getCanonicalPath Files.write( - """log4j.rootCategory=INFO, console + """log4j.rootCategory=DEBUG, console |log4j.appender.console=org.apache.log4j.ConsoleAppender |log4j.appender.console.target=System.err |log4j.appender.console.layout=org.apache.log4j.PatternLayout @@ -493,7 +491,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl new File(s"$tempLog4jConf/log4j.properties"), UTF_8) - tempLog4jConf // + File.pathSeparator + sys.props("java.class.path") + tempLog4jConf } s"""$startScript @@ -521,7 +519,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl */ val THRIFT_HTTP_SERVICE_LIVE = "Started ThriftHttpCLIService in http" - val SERVER_STARTUP_TIMEOUT = 1.minute + val SERVER_STARTUP_TIMEOUT = 3.minutes private def startThriftServer(port: Int, attempt: Int) = { warehousePath = Utils.createTempDir() @@ -543,17 +541,22 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl logInfo(s"Trying to start HiveThriftServer2: 
port=$port, mode=$mode, attempt=$attempt") - val env = Seq( - // Disables SPARK_TESTING to exclude log4j.properties in test directories. - "SPARK_TESTING" -> "0", - // Points SPARK_PID_DIR to SPARK_HOME, otherwise only 1 Thrift server instance can be started - // at a time, which is not Jenkins friendly. - "SPARK_PID_DIR" -> pidDir.getCanonicalPath) - - logPath = Process(command, None, env: _*).lines.collectFirst { - case line if line.contains(LOG_FILE_MARK) => new File(line.drop(LOG_FILE_MARK.length)) - }.getOrElse { - throw new RuntimeException("Failed to find HiveThriftServer2 log file.") + logPath = { + val lines = Utils.executeAndGetOutput( + command = command, + extraEnvironment = Map( + // Disables SPARK_TESTING to exclude log4j.properties in test directories. + "SPARK_TESTING" -> "0", + // Points SPARK_PID_DIR to SPARK_HOME, otherwise only 1 Thrift server instance can be + // started at a time, which is not Jenkins friendly. + "SPARK_PID_DIR" -> pidDir.getCanonicalPath), + redirectStderr = true) + + lines.split("\n").collectFirst { + case line if line.contains(LOG_FILE_MARK) => new File(line.drop(LOG_FILE_MARK.length)) + }.getOrElse { + throw new RuntimeException("Failed to find HiveThriftServer2 log file.") + } } val serverStarted = Promise[Unit]() @@ -561,30 +564,36 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl // Ensures that the following "tail" command won't fail. logPath.createNewFile() val successLines = Seq(THRIFT_BINARY_SERVICE_LIVE, THRIFT_HTTP_SERVICE_LIVE) - val failureLines = Seq("HiveServer2 is stopped", "Exception in thread", "Error:") - logTailingProcess = + + logTailingProcess = { + val command = s"/usr/bin/env tail -n +0 -f ${logPath.getCanonicalPath}".split(" ") // Using "-n +0" to make sure all lines in the log file are checked. 
- Process(s"/usr/bin/env tail -n +0 -f ${logPath.getCanonicalPath}").run(ProcessLogger( - (line: String) => { - diagnosisBuffer += line - successLines.foreach(r => { - if (line.contains(r)) { - serverStarted.trySuccess(()) - } - }) - failureLines.foreach(r => { - if (line.contains(r)) { - serverStarted.tryFailure(new RuntimeException(s"Failed with output '$line'")) - } - }) - })) + val builder = new ProcessBuilder(command: _*) + val captureOutput = (line: String) => diagnosisBuffer.synchronized { + diagnosisBuffer += line + + successLines.foreach { r => + if (line.contains(r)) { + serverStarted.trySuccess(()) + } + } + } + + val process = builder.start() + + new ProcessOutputCapturer(process.getInputStream, captureOutput).start() + new ProcessOutputCapturer(process.getErrorStream, captureOutput).start() + process + } Await.result(serverStarted.future, SERVER_STARTUP_TIMEOUT) } private def stopThriftServer(): Unit = { // The `spark-daemon.sh' script uses kill, which is not synchronous, have to wait for a while. - Process(stopScript, None, "SPARK_PID_DIR" -> pidDir.getCanonicalPath).run().exitValue() + Utils.executeAndGetOutput( + command = Seq(stopScript), + extraEnvironment = Map("SPARK_PID_DIR" -> pidDir.getCanonicalPath)) Thread.sleep(3.seconds.toMillis) warehousePath.delete() -- cgit v1.2.3