From a5b5b936596ceb45f5f5b68bf1d6368534fb9470 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Wed, 19 Aug 2015 11:21:46 +0800
Subject: [SPARK-9939] [SQL] Resorts to Java process API in CliSuite,
 HiveSparkSubmitSuite and HiveThriftServer2 test suites

The Scala process API has a known bug ([SI-8768] [1]), which may be the reason
why several test suites that fork sub-processes are flaky.

This PR replaces the Scala process API with the Java process API in `CliSuite`,
`HiveSparkSubmitSuite`, and the `HiveThriftServer2` related test suites to see
whether it fixes these flaky tests.

[1]: https://issues.scala-lang.org/browse/SI-8768

Author: Cheng Lian

Closes #8168 from liancheng/spark-9939/use-java-process-api.
---
 .../apache/spark/sql/test/ProcessTestUtils.scala   | 37 ++++++++++
 sql/hive-thriftserver/pom.xml                      |  7 ++
 .../spark/sql/hive/thriftserver/CliSuite.scala     | 64 +++++++----------
 .../thriftserver/HiveThriftServer2Suites.scala     | 83 ++++++++++++----------
 .../spark/sql/hive/HiveSparkSubmitSuite.scala      | 49 +++++++++----
 5 files changed, 149 insertions(+), 91 deletions(-)
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/test/ProcessTestUtils.scala

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/ProcessTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/ProcessTestUtils.scala
new file mode 100644
index 0000000000..152c9c8459
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/ProcessTestUtils.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.test
+
+import java.io.{IOException, InputStream}
+
+import scala.sys.process.BasicIO
+
+object ProcessTestUtils {
+  class ProcessOutputCapturer(stream: InputStream, capture: String => Unit) extends Thread {
+    this.setDaemon(true)
+
+    override def run(): Unit = {
+      try {
+        BasicIO.processFully(capture)(stream)
+      } catch { case _: IOException =>
+        // Ignores the IOException thrown when the process terminates, which closes the input
+        // stream abruptly.
+      }
+    }
+  }
+}
diff --git a/sql/hive-thriftserver/pom.xml b/sql/hive-thriftserver/pom.xml
index 2dfbcb2425..3566c87dd2 100644
--- a/sql/hive-thriftserver/pom.xml
+++ b/sql/hive-thriftserver/pom.xml
@@ -86,6 +86,13 @@
       <artifactId>selenium-java</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <type>test-jar</type>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index 121b3e077f..e59a14ec00 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -18,18 +18,19 @@
 package org.apache.spark.sql.hive.thriftserver
 
 import java.io._
+import java.sql.Timestamp
+import java.util.Date
 
 import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
 import scala.concurrent.{Await, Promise}
-import scala.sys.process.{Process, ProcessLogger}
-import scala.util.Failure
 
+import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.{Logging, SparkFunSuite}
 import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SparkFunSuite}
 
 /**
  * A test suite for the `spark-sql` CLI tool. Note that all test cases share the same temporary
@@ -70,6 +71,9 @@ class CliSuite extends SparkFunSuite with BeforeAndAfter with Logging {
     queriesAndExpectedAnswers: (String, String)*): Unit = {
 
     val (queries, expectedAnswers) = queriesAndExpectedAnswers.unzip
+    // Explicitly adds ENTER for each statement to make sure they are actually entered into the CLI.
+    val queriesString = queries.map(_ + "\n").mkString
+
     val command = {
       val cliScript = "../../bin/spark-sql".split("/").mkString(File.separator)
       val jdbcUrl = s"jdbc:derby:;databaseName=$metastorePath;create=true"
@@ -83,13 +87,14 @@ class CliSuite extends SparkFunSuite with BeforeAndAfter with Logging {
 
     var next = 0
     val foundAllExpectedAnswers = Promise.apply[Unit]()
-    // Explicitly adds ENTER for each statement to make sure they are actually entered into the CLI.
-    val queryStream = new ByteArrayInputStream(queries.map(_ + "\n").mkString.getBytes)
     val buffer = new ArrayBuffer[String]()
     val lock = new Object
 
     def captureOutput(source: String)(line: String): Unit = lock.synchronized {
-      buffer += s"$source> $line"
+      // This test suite sometimes gets extremely slow for unknown reasons on Jenkins. Here we
+      // add a timestamp to provide more diagnostic information.
+      buffer += s"${new Timestamp(new Date().getTime)} - $source> $line"
+
+      // If we haven't found all expected answers and another expected answer comes up...
       if (next < expectedAnswers.size && line.startsWith(expectedAnswers(next))) {
         next += 1
@@ -98,48 +103,27 @@ class CliSuite extends SparkFunSuite with BeforeAndAfter with Logging {
           foundAllExpectedAnswers.trySuccess(())
         }
       } else {
-        errorResponses.foreach( r => {
+        errorResponses.foreach { r =>
           if (line.startsWith(r)) {
             foundAllExpectedAnswers.tryFailure(
               new RuntimeException(s"Failed with error line '$line'"))
-        }})
-      }
-    }
-
-    // Searching expected output line from both stdout and stderr of the CLI process
-    val process = (Process(command, None) #< queryStream).run(
-      ProcessLogger(captureOutput("stdout"), captureOutput("stderr")))
-
-    // catch the output value
-    class exitCodeCatcher extends Runnable {
-      var exitValue = 0
-
-      override def run(): Unit = {
-        try {
-          exitValue = process.exitValue()
-        } catch {
-          case rte: RuntimeException =>
-            // ignored as it will get triggered when the process gets destroyed
-            logDebug("Ignoring exception while waiting for exit code", rte)
-        }
-        if (exitValue != 0) {
-          // process exited: fail fast
-          foundAllExpectedAnswers.tryFailure(
-            new RuntimeException(s"Failed with exit code $exitValue"))
+          }
         }
       }
     }
-    // spin off the code catche thread. No attempt is made to kill this
-    // as it will exit once the launched process terminates.
-    val codeCatcherThread = new Thread(new exitCodeCatcher())
-    codeCatcherThread.start()
+
+    val process = new ProcessBuilder(command: _*).start()
+
+    val stdinWriter = new OutputStreamWriter(process.getOutputStream)
+    stdinWriter.write(queriesString)
+    stdinWriter.flush()
+    stdinWriter.close()
+
+    new ProcessOutputCapturer(process.getInputStream, captureOutput("stdout")).start()
+    new ProcessOutputCapturer(process.getErrorStream, captureOutput("stderr")).start()
 
     try {
-      Await.ready(foundAllExpectedAnswers.future, timeout)
-      foundAllExpectedAnswers.future.value match {
-        case Some(Failure(t)) => throw t
-        case _ =>
-      }
+      Await.result(foundAllExpectedAnswers.future, timeout)
     } catch { case cause: Throwable =>
       val message =
         s"""
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 17e7044c46..ded42bca99 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -22,10 +22,9 @@ import java.net.URL
 import java.sql.{Date, DriverManager, SQLException, Statement}
 
 import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration._
 import scala.concurrent.{Await, Promise, future}
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.sys.process.{Process, ProcessLogger}
 import scala.util.{Random, Try}
 
 import com.google.common.base.Charsets.UTF_8
@@ -38,11 +37,12 @@ import org.apache.hive.service.cli.thrift.TCLIService.Client
 import org.apache.hive.service.cli.thrift.ThriftCLIServiceClient
 import org.apache.thrift.protocol.TBinaryProtocol
 import org.apache.thrift.transport.TSocket
-import org.scalatest.{Ignore, BeforeAndAfterAll}
+import org.scalatest.BeforeAndAfterAll
 
-import org.apache.spark.{Logging, SparkFunSuite}
 import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
 import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SparkFunSuite}
 
 object TestData {
   def getTestDataFilePath(name: String): URL = {
@@ -53,7 +53,6 @@ object TestData {
   val smallKvWithNull = getTestDataFilePath("small_kv_with_null.txt")
 }
 
-@Ignore // SPARK-9606
 class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
   override def mode: ServerMode.Value = ServerMode.binary
 
@@ -380,7 +379,6 @@ class HiveThriftBinaryServerSuite extends HiveThriftJdbcTest {
   }
 }
 
-@Ignore // SPARK-9606
 class HiveThriftHttpServerSuite extends HiveThriftJdbcTest {
   override def mode: ServerMode.Value = ServerMode.http
 
@@ -484,7 +482,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
       val tempLog4jConf = Utils.createTempDir().getCanonicalPath
 
       Files.write(
-        """log4j.rootCategory=INFO, console
+        """log4j.rootCategory=DEBUG, console
           |log4j.appender.console=org.apache.log4j.ConsoleAppender
           |log4j.appender.console.target=System.err
           |log4j.appender.console.layout=org.apache.log4j.PatternLayout
@@ -493,7 +491,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
         new File(s"$tempLog4jConf/log4j.properties"),
         UTF_8)
 
-      tempLog4jConf // + File.pathSeparator + sys.props("java.class.path")
+      tempLog4jConf
     }
 
     s"""$startScript
@@ -521,7 +519,7 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
    */
   val THRIFT_HTTP_SERVICE_LIVE = "Started ThriftHttpCLIService in http"
 
-  val SERVER_STARTUP_TIMEOUT = 1.minute
+  val SERVER_STARTUP_TIMEOUT = 3.minutes
 
   private def startThriftServer(port: Int, attempt: Int) = {
     warehousePath = Utils.createTempDir()
@@ -543,17 +541,22 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
 
     logInfo(s"Trying to start HiveThriftServer2: port=$port, mode=$mode, attempt=$attempt")
 
-    val env = Seq(
-      // Disables SPARK_TESTING to exclude log4j.properties in test directories.
-      "SPARK_TESTING" -> "0",
-      // Points SPARK_PID_DIR to SPARK_HOME, otherwise only 1 Thrift server instance can be started
-      // at a time, which is not Jenkins friendly.
-      "SPARK_PID_DIR" -> pidDir.getCanonicalPath)
-
-    logPath = Process(command, None, env: _*).lines.collectFirst {
-      case line if line.contains(LOG_FILE_MARK) => new File(line.drop(LOG_FILE_MARK.length))
-    }.getOrElse {
-      throw new RuntimeException("Failed to find HiveThriftServer2 log file.")
+    logPath = {
+      val lines = Utils.executeAndGetOutput(
+        command = command,
+        extraEnvironment = Map(
+          // Disables SPARK_TESTING to exclude log4j.properties in test directories.
+          "SPARK_TESTING" -> "0",
+          // Points SPARK_PID_DIR to SPARK_HOME, otherwise only 1 Thrift server instance can be
+          // started at a time, which is not Jenkins friendly.
+          "SPARK_PID_DIR" -> pidDir.getCanonicalPath),
+        redirectStderr = true)
+
+      lines.split("\n").collectFirst {
+        case line if line.contains(LOG_FILE_MARK) => new File(line.drop(LOG_FILE_MARK.length))
+      }.getOrElse {
+        throw new RuntimeException("Failed to find HiveThriftServer2 log file.")
+      }
     }
 
     val serverStarted = Promise[Unit]()
@@ -561,30 +564,36 @@ abstract class HiveThriftServer2Test extends SparkFunSuite with BeforeAndAfterAl
     // Ensures that the following "tail" command won't fail.
     logPath.createNewFile()
 
     val successLines = Seq(THRIFT_BINARY_SERVICE_LIVE, THRIFT_HTTP_SERVICE_LIVE)
-    val failureLines = Seq("HiveServer2 is stopped", "Exception in thread", "Error:")
-    logTailingProcess =
+
+    logTailingProcess = {
+      val command = s"/usr/bin/env tail -n +0 -f ${logPath.getCanonicalPath}".split(" ")
       // Using "-n +0" to make sure all lines in the log file are checked.
-      Process(s"/usr/bin/env tail -n +0 -f ${logPath.getCanonicalPath}").run(ProcessLogger(
-        (line: String) => {
-          diagnosisBuffer += line
-          successLines.foreach(r => {
-            if (line.contains(r)) {
-              serverStarted.trySuccess(())
-            }
-          })
-          failureLines.foreach(r => {
-            if (line.contains(r)) {
-              serverStarted.tryFailure(new RuntimeException(s"Failed with output '$line'"))
-            }
-          })
-        }))
+      val builder = new ProcessBuilder(command: _*)
+      val captureOutput = (line: String) => diagnosisBuffer.synchronized {
+        diagnosisBuffer += line
+
+        successLines.foreach { r =>
+          if (line.contains(r)) {
+            serverStarted.trySuccess(())
+          }
+        }
+      }
+
+      val process = builder.start()
+
+      new ProcessOutputCapturer(process.getInputStream, captureOutput).start()
+      new ProcessOutputCapturer(process.getErrorStream, captureOutput).start()
+      process
+    }
 
     Await.result(serverStarted.future, SERVER_STARTUP_TIMEOUT)
   }
 
   private def stopThriftServer(): Unit = {
     // The `spark-daemon.sh' script uses kill, which is not synchronous, have to wait for a while.
-    Process(stopScript, None, "SPARK_PID_DIR" -> pidDir.getCanonicalPath).run().exitValue()
+    Utils.executeAndGetOutput(
+      command = Seq(stopScript),
+      extraEnvironment = Map("SPARK_PID_DIR" -> pidDir.getCanonicalPath))
     Thread.sleep(3.seconds.toMillis)
 
     warehousePath.delete()
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index 0c29646114..dc2d85f486 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -18,9 +18,10 @@
 package org.apache.spark.sql.hive
 
 import java.io.File
+import java.sql.Timestamp
+import java.util.Date
 
 import scala.collection.mutable.ArrayBuffer
-import scala.sys.process.{Process, ProcessLogger}
 
 import org.scalatest.Matchers
 import org.scalatest.concurrent.Timeouts
@@ -30,6 +31,7 @@ import org.scalatest.time.SpanSugar._
 import org.apache.spark._
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext}
+import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
 import org.apache.spark.sql.types.DecimalType
 import org.apache.spark.util.{ResetSystemProperties, Utils}
 
@@ -39,6 +41,8 @@ import org.apache.spark.util.{ResetSystemProperties, Utils}
 class HiveSparkSubmitSuite
   extends SparkFunSuite
   with Matchers
+  // This test suite sometimes gets extremely slow for unknown reasons on Jenkins. Here we
+  // add a timestamp to provide more diagnostic information.
   with ResetSystemProperties
   with Timeouts {
 
@@ -110,28 +114,44 @@ class HiveSparkSubmitSuite
     val history = ArrayBuffer.empty[String]
     val commands = Seq("./bin/spark-submit") ++ args
     val commandLine = commands.mkString("'", "' '", "'")
-    val process = Process(
-      commands,
-      new File(sparkHome),
-      "SPARK_TESTING" -> "1",
-      "SPARK_HOME" -> sparkHome
-    ).run(ProcessLogger(
+
+    val builder = new ProcessBuilder(commands: _*).directory(new File(sparkHome))
+    val env = builder.environment()
+    env.put("SPARK_TESTING", "1")
+    env.put("SPARK_HOME", sparkHome)
+
+    def captureOutput(source: String)(line: String): Unit = {
+      // This test suite has some weird behaviors when executed on Jenkins:
+      //
+      //   1. Sometimes it gets extremely slow for unknown reasons on Jenkins. Here we add a
+      //      timestamp to provide more diagnostic information.
+      //   2. Log lines are not correctly redirected to unit-tests.log as expected, so here we print
+      //      them out for debugging purposes.
+      val logLine = s"${new Timestamp(new Date().getTime)} - $source> $line"
       // scalastyle:off println
-      (line: String) => { println(s"stdout> $line"); history += s"out> $line"},
-      (line: String) => { println(s"stderr> $line"); history += s"err> $line" }
+      println(logLine)
       // scalastyle:on println
-    ))
+      history += logLine
+    }
+
+    val process = builder.start()
+    new ProcessOutputCapturer(process.getInputStream, captureOutput("stdout")).start()
+    new ProcessOutputCapturer(process.getErrorStream, captureOutput("stderr")).start()
 
     try {
-      val exitCode = failAfter(180.seconds) { process.exitValue() }
+      val exitCode = failAfter(180.seconds) { process.waitFor() }
       if (exitCode != 0) {
         // include logs in output. Note that logging is async and may not have completed
         // at the time this exception is raised
         Thread.sleep(1000)
         val historyLog = history.mkString("\n")
-        fail(s"$commandLine returned with exit code $exitCode." +
-          s" See the log4j logs for more detail." +
-          s"\n$historyLog")
+        fail {
+          s"""spark-submit returned with exit code $exitCode.
+             |Command line: $commandLine
+             |
+             |$historyLog
+           """.stripMargin
+        }
       }
     } catch {
       case to: TestFailedDueToTimeoutException =>
@@ -263,6 +283,7 @@ object SPARK_9757 extends QueryTest with Logging {
     val hiveContext = new TestHiveContext(sparkContext)
 
     import hiveContext.implicits._
+    import org.apache.spark.sql.functions._
 
     val dir = Utils.createTempDir()
--
cgit v1.2.3
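
Note on the pattern this patch adopts: children are forked with java.lang.ProcessBuilder and each
output stream is drained on a dedicated daemon thread, instead of going through scala.sys.process.
Reduced to a standalone sketch, it looks roughly like the following. `OutputCapturer` and `Demo`
are hypothetical names used only for illustration, and `scala.io.Source.getLines` stands in for the
`BasicIO.processFully` call that `ProcessTestUtils` uses:

    import java.io.{IOException, InputStream}

    import scala.io.Source

    // Illustrative stand-in for ProcessTestUtils.ProcessOutputCapturer: drains one
    // output stream of a child process on a daemon thread, feeding each line to
    // the `capture` callback.
    class OutputCapturer(stream: InputStream, capture: String => Unit) extends Thread {
      setDaemon(true)

      override def run(): Unit = {
        try {
          // Blocks until the stream is exhausted, emitting one callback per line.
          Source.fromInputStream(stream).getLines().foreach(capture)
        } catch {
          case _: IOException => // The stream may close abruptly when the child exits.
        }
      }
    }

    object Demo {
      def main(args: Array[String]): Unit = {
        // Fork the child with the Java process API instead of scala.sys.process.
        val process = new ProcessBuilder("echo", "hello").start()

        // Drain stdout and stderr on separate threads so a full pipe buffer can
        // never block the child.
        new OutputCapturer(process.getInputStream, line => println(s"stdout> $line")).start()
        new OutputCapturer(process.getErrorStream, line => println(s"stderr> $line")).start()

        // waitFor() blocks until the child terminates and returns its exit code.
        val exitCode = process.waitFor()
        assert(exitCode == 0, s"child exited with code $exitCode")
      }
    }

The capturer threads are daemons so the JVM can still exit if a child never closes its streams;
when several capturers share one callback, as in HiveThriftServer2Suites above, the callback body
is synchronized.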