-rw-r--r--  core/src/main/scala/org/apache/spark/TestUtils.scala                                        11
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala                                25
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala           15
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala         4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala            21
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala             36
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala 12
7 files changed, 78 insertions, 46 deletions
diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala
index b5b201409a..fd0477541e 100644
--- a/core/src/main/scala/org/apache/spark/TestUtils.scala
+++ b/core/src/main/scala/org/apache/spark/TestUtils.scala
@@ -20,7 +20,6 @@ package org.apache.spark
import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream}
import java.net.{URI, URL}
import java.nio.charset.StandardCharsets
-import java.nio.file.Paths
import java.util.Arrays
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.jar.{JarEntry, JarOutputStream}
@@ -28,6 +27,8 @@ import java.util.jar.{JarEntry, JarOutputStream}
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
+import scala.sys.process.{Process, ProcessLogger}
+import scala.util.Try
import com.google.common.io.{ByteStreams, Files}
import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
@@ -185,6 +186,14 @@ private[spark] object TestUtils {
assert(spillListener.numSpilledStages == 0, s"expected $identifier to not spill, but did")
}
+ /**
+ * Test if a command is available.
+ */
+ def testCommandAvailable(command: String): Boolean = {
+ val attempt = Try(Process(command).run(ProcessLogger(_ => ())).exitValue())
+ attempt.isSuccess && attempt.get == 0
+ }
+
}
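
For reference, the relocated helper is fully self-contained. A minimal standalone sketch of its behavior, assuming only the Scala standard library (the wrapper object and main method are hypothetical, added here just to make the snippet runnable):

import scala.sys.process.{Process, ProcessLogger}
import scala.util.Try

object CommandCheck {
  // Launch the command, discard its output, and wait for it to exit.
  // A command that cannot be started at all makes the Try fail; one that
  // starts but exits non-zero fails the second condition.
  def testCommandAvailable(command: String): Boolean = {
    val attempt = Try(Process(command).run(ProcessLogger(_ => ())).exitValue())
    attempt.isSuccess && attempt.get == 0
  }

  def main(args: Array[String]): Unit = {
    println(testCommandAvailable("cat"))                       // true on most Unix systems
    println(testCommandAvailable("some_nonexistent_command"))  // false
  }
}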
diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
index 287ae6ff6e..1a0eb250e7 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -21,8 +21,6 @@ import java.io.File
import scala.collection.Map
import scala.io.Codec
-import scala.sys.process._
-import scala.util.Try
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
@@ -39,7 +37,7 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
}
test("basic pipe") {
- assume(testCommandAvailable("cat"))
+ assume(TestUtils.testCommandAvailable("cat"))
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
val piped = nums.pipe(Seq("cat"))
@@ -53,7 +51,7 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
}
test("basic pipe with tokenization") {
- assume(testCommandAvailable("wc"))
+ assume(TestUtils.testCommandAvailable("wc"))
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
// verify that both RDD.pipe(command: String) and RDD.pipe(command: String, env) work good
@@ -66,7 +64,7 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
}
test("failure in iterating over pipe input") {
- assume(testCommandAvailable("cat"))
+ assume(TestUtils.testCommandAvailable("cat"))
val nums =
sc.makeRDD(Array(1, 2, 3, 4), 2)
.mapPartitionsWithIndex((index, iterator) => {
@@ -86,7 +84,7 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
}
test("advanced pipe") {
- assume(testCommandAvailable("cat"))
+ assume(TestUtils.testCommandAvailable("cat"))
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
val bl = sc.broadcast(List("0"))
@@ -147,7 +145,7 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
}
test("pipe with env variable") {
- assume(testCommandAvailable(envCommand))
+ assume(TestUtils.testCommandAvailable(envCommand))
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
val piped = nums.pipe(s"$envCommand MY_TEST_ENV", Map("MY_TEST_ENV" -> "LALALA"))
val c = piped.collect()
@@ -159,7 +157,7 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
}
test("pipe with process which cannot be launched due to bad command") {
- assume(!testCommandAvailable("some_nonexistent_command"))
+ assume(!TestUtils.testCommandAvailable("some_nonexistent_command"))
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
val command = Seq("some_nonexistent_command")
val piped = nums.pipe(command)
@@ -170,7 +168,7 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
}
test("pipe with process which is launched but fails with non-zero exit status") {
- assume(testCommandAvailable("cat"))
+ assume(TestUtils.testCommandAvailable("cat"))
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
val command = Seq("cat", "nonexistent_file")
val piped = nums.pipe(command)
@@ -181,7 +179,7 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
}
test("basic pipe with separate working directory") {
- assume(testCommandAvailable("cat"))
+ assume(TestUtils.testCommandAvailable("cat"))
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
val piped = nums.pipe(Seq("cat"), separateWorkingDir = true)
val c = piped.collect()
@@ -208,13 +206,8 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
testExportInputFile("mapreduce_map_input_file")
}
- def testCommandAvailable(command: String): Boolean = {
- val attempt = Try(Process(command).run(ProcessLogger(_ => ())).exitValue())
- attempt.isSuccess && attempt.get == 0
- }
-
def testExportInputFile(varName: String) {
- assume(testCommandAvailable(envCommand))
+ assume(TestUtils.testCommandAvailable(envCommand))
val nums = new HadoopRDD(sc, new JobConf(), classOf[TextInputFormat], classOf[LongWritable],
classOf[Text], 2) {
override def getPartitions: Array[Partition] = Array(generateFakeHadoopPartition())
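
The guards above lean on ScalaTest's assume, which cancels a test rather than failing it when the precondition does not hold. A hedged sketch of the pattern, assuming a plain ScalaTest FunSuite (the suite name and package are hypothetical; the package declaration matters because TestUtils is private[spark]):

package org.apache.spark.sketch  // any subpackage of org.apache.spark can see TestUtils

import org.scalatest.FunSuite

import org.apache.spark.TestUtils

class PipeGuardSketch extends FunSuite {
  test("pipe through cat") {
    // If `cat` is missing, assume() throws TestCanceledException and the
    // test is reported as canceled instead of failed.
    assume(TestUtils.testCommandAvailable("cat"))
    // ... exercise RDD.pipe(Seq("cat")) against a small RDD here ...
  }
}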
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala
index 557ea44d1c..fe171a6ee8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/LogicalPlanToSQLSuite.scala
@@ -23,6 +23,7 @@ import java.nio.file.{Files, NoSuchFileException, Paths}
import scala.io.Source
import scala.util.control.NonFatal
+import org.apache.spark.TestUtils
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -576,6 +577,8 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
}
test("script transformation - schemaless") {
+ assume(TestUtils.testCommandAvailable("/bin/bash"))
+
checkSQL("SELECT TRANSFORM (a, b, c, d) USING 'cat' FROM parquet_t2",
"script_transformation_1")
checkSQL("SELECT TRANSFORM (*) USING 'cat' FROM parquet_t2",
@@ -583,11 +586,15 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
}
test("script transformation - alias list") {
+ assume(TestUtils.testCommandAvailable("/bin/bash"))
+
checkSQL("SELECT TRANSFORM (a, b, c, d) USING 'cat' AS (d1, d2, d3, d4) FROM parquet_t2",
"script_transformation_alias_list")
}
test("script transformation - alias list with type") {
+ assume(TestUtils.testCommandAvailable("/bin/bash"))
+
checkSQL(
"""FROM
|(FROM parquet_t1 SELECT TRANSFORM(key, value) USING 'cat' AS (thing1 int, thing2 string)) t
@@ -597,6 +604,8 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
}
test("script transformation - row format delimited clause with only one format property") {
+ assume(TestUtils.testCommandAvailable("/bin/bash"))
+
checkSQL(
"""SELECT TRANSFORM (key) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
|USING 'cat' AS (tKey) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
@@ -606,6 +615,8 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
}
test("script transformation - row format delimited clause with multiple format properties") {
+ assume(TestUtils.testCommandAvailable("/bin/bash"))
+
checkSQL(
"""SELECT TRANSFORM (key)
|ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\t'
@@ -617,6 +628,8 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
}
test("script transformation - row format serde clauses with SERDEPROPERTIES") {
+ assume(TestUtils.testCommandAvailable("/bin/bash"))
+
checkSQL(
"""SELECT TRANSFORM (key, value)
|ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
@@ -630,6 +643,8 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
}
test("script transformation - row format serde clauses without SERDEPROPERTIES") {
+ assume(TestUtils.testCommandAvailable("/bin/bash"))
+
checkSQL(
"""SELECT TRANSFORM (key, value)
|ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
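
These suites probe for /bin/bash rather than for cat because, as far as the script transformation code path goes, Spark hands the USING '...' command to a shell. A hedged sketch of that launch shape (an illustration of the mechanism, not the production code):

// The script string from USING 'cat' is passed to bash -c, so bash itself
// must be present even when the script being run is just `cat`.
val script = "cat"
val builder = new java.lang.ProcessBuilder("/bin/bash", "-c", script)
val proc = builder.start()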
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 13ceed7c79..05a15166f8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -226,7 +226,8 @@ abstract class HiveComparisonTest
testCaseName: String,
sql: String,
reset: Boolean = true,
- tryWithoutResettingFirst: Boolean = false) {
+ tryWithoutResettingFirst: Boolean = false,
+ skip: Boolean = false) {
// testCaseName must not contain ':', which is not allowed to appear in a filename of Windows
assert(!testCaseName.contains(":"))
@@ -255,6 +256,7 @@ abstract class HiveComparisonTest
}
test(testCaseName) {
+ assume(!skip)
logDebug(s"=== HIVE TEST: $testCaseName ===")
val sqlWithoutComment =
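
One nuance of the new flag: its value is computed once, when the suite class is constructed, while the assume(!skip) guard runs inside the generated test body at execution time. A condensed sketch of the shape this gives createQueryTest (the reset parameters and comparison machinery are elided):

protected def createQueryTest(
    testCaseName: String,
    sql: String,
    skip: Boolean = false): Unit = {
  test(testCaseName) {
    assume(!skip)  // canceled, not failed, when the required command is absent
    // ... run `sql` against Hive and Spark SQL and compare the answers ...
  }
}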
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index e5b23dafcf..2ae66d1b2f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -27,7 +27,7 @@ import scala.util.Try
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.scalatest.BeforeAndAfter
-import org.apache.spark.SparkFiles
+import org.apache.spark.{SparkFiles, TestUtils}
import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.catalyst.parser.ParseException
@@ -389,13 +389,15 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
}
createQueryTest("transform",
- "SELECT TRANSFORM (key) USING 'cat' AS (tKey) FROM src")
+ "SELECT TRANSFORM (key) USING 'cat' AS (tKey) FROM src",
+ skip = !TestUtils.testCommandAvailable("/bin/bash"))
createQueryTest("schema-less transform",
"""
|SELECT TRANSFORM (key, value) USING 'cat' FROM src;
|SELECT TRANSFORM (*) USING 'cat' FROM src;
- """.stripMargin)
+ """.stripMargin,
+ skip = !TestUtils.testCommandAvailable("/bin/bash"))
val delimiter = "'\t'"
@@ -403,19 +405,22 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
s"""
|SELECT TRANSFORM (key) ROW FORMAT DELIMITED FIELDS TERMINATED BY ${delimiter}
|USING 'cat' AS (tKey) ROW FORMAT DELIMITED FIELDS TERMINATED BY ${delimiter} FROM src;
- """.stripMargin.replaceAll("\n", " "))
+ """.stripMargin.replaceAll("\n", " "),
+ skip = !TestUtils.testCommandAvailable("/bin/bash"))
createQueryTest("transform with custom field delimiter2",
s"""
|SELECT TRANSFORM (key, value) ROW FORMAT DELIMITED FIELDS TERMINATED BY ${delimiter}
|USING 'cat' ROW FORMAT DELIMITED FIELDS TERMINATED BY ${delimiter} FROM src;
- """.stripMargin.replaceAll("\n", " "))
+ """.stripMargin.replaceAll("\n", " "),
+ skip = !TestUtils.testCommandAvailable("/bin/bash"))
createQueryTest("transform with custom field delimiter3",
s"""
|SELECT TRANSFORM (*) ROW FORMAT DELIMITED FIELDS TERMINATED BY ${delimiter}
|USING 'cat' ROW FORMAT DELIMITED FIELDS TERMINATED BY ${delimiter} FROM src;
- """.stripMargin.replaceAll("\n", " "))
+ """.stripMargin.replaceAll("\n", " "),
+ skip = !TestUtils.testCommandAvailable("/bin/bash"))
createQueryTest("transform with SerDe",
"""
@@ -423,9 +428,11 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
|'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
|USING 'cat' AS (tKey, tValue) ROW FORMAT SERDE
|'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe' FROM src;
- """.stripMargin.replaceAll(System.lineSeparator(), " "))
+ """.stripMargin.replaceAll(System.lineSeparator(), " "),
+ skip = !TestUtils.testCommandAvailable("/bin/bash"))
test("transform with SerDe2") {
+ assume(TestUtils.testCommandAvailable("/bin/bash"))
sql("CREATE TABLE small_src(key INT, value STRING)")
sql("INSERT OVERWRITE TABLE small_src SELECT key, value FROM src LIMIT 10")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index f47cf4a9c6..953e29127f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -17,16 +17,14 @@
package org.apache.spark.sql.hive.execution
-import java.io.{File, PrintWriter}
+import java.io.File
import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}
-import scala.sys.process.{Process, ProcessLogger}
-import scala.util.Try
-
import com.google.common.io.Files
import org.apache.hadoop.fs.Path
+import org.apache.spark.TestUtils
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, FunctionRegistry, NoSuchPartitionException}
@@ -85,18 +83,17 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
test("script") {
+ assume(TestUtils.testCommandAvailable("/bin/bash"))
+ assume(TestUtils.testCommandAvailable("echo | sed"))
val scriptFilePath = getTestResourcePath("test_script.sh")
- if (testCommandAvailable("bash") && testCommandAvailable("echo | sed")) {
- val df = Seq(("x1", "y1", "z1"), ("x2", "y2", "z2")).toDF("c1", "c2", "c3")
- df.createOrReplaceTempView("script_table")
- val query1 = sql(
- s"""
- |SELECT col1 FROM (from(SELECT c1, c2, c3 FROM script_table) tempt_table
- |REDUCE c1, c2, c3 USING 'bash $scriptFilePath' AS
- |(col1 STRING, col2 STRING)) script_test_table""".stripMargin)
- checkAnswer(query1, Row("x1_y1") :: Row("x2_y2") :: Nil)
- }
- // else skip this test
+ val df = Seq(("x1", "y1", "z1"), ("x2", "y2", "z2")).toDF("c1", "c2", "c3")
+ df.createOrReplaceTempView("script_table")
+ val query1 = sql(
+ s"""
+ |SELECT col1 FROM (from(SELECT c1, c2, c3 FROM script_table) tempt_table
+ |REDUCE c1, c2, c3 USING 'bash $scriptFilePath' AS
+ |(col1 STRING, col2 STRING)) script_test_table""".stripMargin)
+ checkAnswer(query1, Row("x1_y1") :: Row("x2_y2") :: Nil)
}
test("UDTF") {
@@ -1070,12 +1067,14 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
test("Star Expansion - script transform") {
+ assume(TestUtils.testCommandAvailable("/bin/bash"))
val data = (1 to 100000).map { i => (i, i, i) }
data.toDF("d1", "d2", "d3").createOrReplaceTempView("script_trans")
assert(100000 === sql("SELECT TRANSFORM (*) USING 'cat' FROM script_trans").count())
}
test("test script transform for stdout") {
+ assume(TestUtils.testCommandAvailable("/bin/bash"))
val data = (1 to 100000).map { i => (i, i, i) }
data.toDF("d1", "d2", "d3").createOrReplaceTempView("script_trans")
assert(100000 ===
@@ -1083,6 +1082,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
test("test script transform for stderr") {
+ assume(TestUtils.testCommandAvailable("/bin/bash"))
val data = (1 to 100000).map { i => (i, i, i) }
data.toDF("d1", "d2", "d3").createOrReplaceTempView("script_trans")
assert(0 ===
@@ -1090,6 +1090,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
}
test("test script transform data type") {
+ assume(TestUtils.testCommandAvailable("/bin/bash"))
val data = (1 to 5).map { i => (i, i) }
data.toDF("key", "value").createOrReplaceTempView("test")
checkAnswer(
@@ -2012,9 +2013,4 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
)
}
}
-
- def testCommandAvailable(command: String): Boolean = {
- val attempt = Try(Process(command).run(ProcessLogger(_ => ())).exitValue())
- attempt.isSuccess && attempt.get == 0
- }
}
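
The rewrite of the "script" test above also changes the failure mode: the old if-guard silently counted as a pass on machines without bash, while assume surfaces an explicit cancellation. A before/after sketch of the pattern, with the test body elided:

// Before: the body is skipped silently and the test still reports as passed.
test("script") {
  if (testCommandAvailable("bash") && testCommandAvailable("echo | sed")) {
    // ... run the query and check the answer ...
  }
  // else skip this test
}

// After: a missing command cancels the test, which is visible in the report.
test("script") {
  assume(TestUtils.testCommandAvailable("/bin/bash"))
  assume(TestUtils.testCommandAvailable("echo | sed"))
  // ... run the query and check the answer ...
}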
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
index 0e837766e2..d3475a79a7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive.execution
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
import org.scalatest.exceptions.TestFailedException
-import org.apache.spark.{SparkException, TaskContext}
+import org.apache.spark.{SparkException, TaskContext, TestUtils}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
@@ -50,6 +50,8 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
)
test("cat without SerDe") {
+ assume(TestUtils.testCommandAvailable("/bin/bash"))
+
val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a")
checkAnswer(
rowsDf,
@@ -64,6 +66,8 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
}
test("cat with LazySimpleSerDe") {
+ assume(TestUtils.testCommandAvailable("/bin/bash"))
+
val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a")
checkAnswer(
rowsDf,
@@ -78,6 +82,8 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
}
test("script transformation should not swallow errors from upstream operators (no serde)") {
+ assume(TestUtils.testCommandAvailable("/bin/bash"))
+
val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a")
val e = intercept[TestFailedException] {
checkAnswer(
@@ -95,6 +101,8 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
}
test("script transformation should not swallow errors from upstream operators (with serde)") {
+ assume(TestUtils.testCommandAvailable("/bin/bash"))
+
val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a")
val e = intercept[TestFailedException] {
checkAnswer(
@@ -112,6 +120,8 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
}
test("SPARK-14400 script transformation should fail for bad script command") {
+ assume(TestUtils.testCommandAvailable("/bin/bash"))
+
val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a")
val e = intercept[SparkException] {