From ed21476bc0c760616e7e6bb99f6541745fb09595 Mon Sep 17 00:00:00 2001
From: Yin Huai
Date: Sun, 24 May 2015 09:51:37 -0700
Subject: [SPARK-7805] [SQL] Move SQLTestUtils.scala and ParquetTest.scala to src/test

https://issues.apache.org/jira/browse/SPARK-7805

Because `sql/hive`'s tests depend on the test jar of `sql/core`, we do not
need to store `SQLTestUtils` and `ParquetTest` in `src/main`. We should only
add stuff that will be needed by `sql/console` or Python tests (for Python,
we need it in `src/main`, right? davies).

Author: Yin Huai

Closes #6334 from yhuai/SPARK-7805 and squashes the following commits:

af6d0c9 [Yin Huai] mima
b86746a [Yin Huai] Move SQLTestUtils.scala and ParquetTest.scala to src/test.
---
 .../org/apache/spark/sql/parquet/ParquetTest.scala | 102 ---------------------
 .../main/scala/org/apache/spark/sql/test/README.md |   7 ++
 .../org/apache/spark/sql/test/SQLTestUtils.scala   |  88 ------------------
 .../org/apache/spark/sql/parquet/ParquetTest.scala | 102 +++++++++++++++++++++
 .../org/apache/spark/sql/test/SQLTestUtils.scala   |  88 ++++++++++++++++++
 5 files changed, 197 insertions(+), 190 deletions(-)
 delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/test/README.md
 delete mode 100644 sql/core/src/main/scala/org/apache/spark/sql/test/SQLTestUtils.scala
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetTest.scala
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
deleted file mode 100644
index 516ba373f4..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.parquet
-
-import java.io.File
-
-import scala.reflect.ClassTag
-import scala.reflect.runtime.universe.TypeTag
-
-import org.apache.spark.sql.test.SQLTestUtils
-import org.apache.spark.sql.{DataFrame, SaveMode}
-
-/**
- * A helper trait that provides convenient facilities for Parquet testing.
- *
- * NOTE: Considering classes `Tuple1` ... `Tuple22` all extend `Product`, it would be more
- * convenient to use tuples rather than special case classes when writing test cases/suites.
- * Especially, `Tuple1.apply` can be used to easily wrap a single type/value.
- */
-private[sql] trait ParquetTest extends SQLTestUtils {
-  import sqlContext.implicits.{localSeqToDataFrameHolder, rddToDataFrameHolder}
-  import sqlContext.sparkContext
-
-  /**
-   * Writes `data` to a Parquet file, which is then passed to `f` and will be deleted after `f`
-   * returns.
-   */
-  protected def withParquetFile[T <: Product: ClassTag: TypeTag]
-      (data: Seq[T])
-      (f: String => Unit): Unit = {
-    withTempPath { file =>
-      sparkContext.parallelize(data).toDF().write.parquet(file.getCanonicalPath)
-      f(file.getCanonicalPath)
-    }
-  }
-
-  /**
-   * Writes `data` to a Parquet file and reads it back as a [[DataFrame]],
-   * which is then passed to `f`. The Parquet file will be deleted after `f` returns.
-   */
-  protected def withParquetDataFrame[T <: Product: ClassTag: TypeTag]
-      (data: Seq[T])
-      (f: DataFrame => Unit): Unit = {
-    withParquetFile(data)(path => f(sqlContext.read.parquet(path)))
-  }
-
-  /**
-   * Writes `data` to a Parquet file, reads it back as a [[DataFrame]] and registers it as a
-   * temporary table named `tableName`, then calls `f`. The temporary table together with the
-   * Parquet file will be dropped/deleted after `f` returns.
-   */
-  protected def withParquetTable[T <: Product: ClassTag: TypeTag]
-      (data: Seq[T], tableName: String)
-      (f: => Unit): Unit = {
-    withParquetDataFrame(data) { df =>
-      sqlContext.registerDataFrameAsTable(df, tableName)
-      withTempTable(tableName)(f)
-    }
-  }
-
-  protected def makeParquetFile[T <: Product: ClassTag: TypeTag](
-      data: Seq[T], path: File): Unit = {
-    data.toDF().write.mode(SaveMode.Overwrite).parquet(path.getCanonicalPath)
-  }
-
-  protected def makeParquetFile[T <: Product: ClassTag: TypeTag](
-      df: DataFrame, path: File): Unit = {
-    df.write.mode(SaveMode.Overwrite).parquet(path.getCanonicalPath)
-  }
-
-  protected def makePartitionDir(
-      basePath: File,
-      defaultPartitionName: String,
-      partitionCols: (String, Any)*): File = {
-    val partNames = partitionCols.map { case (k, v) =>
-      val valueString = if (v == null || v == "") defaultPartitionName else v.toString
-      s"$k=$valueString"
-    }
-
-    val partDir = partNames.foldLeft(basePath) { (parent, child) =>
-      new File(parent, child)
-    }
-
-    assert(partDir.mkdirs(), s"Couldn't create directory $partDir")
-    partDir
-  }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/test/README.md b/sql/core/src/main/scala/org/apache/spark/sql/test/README.md
new file mode 100644
index 0000000000..d867f181b9
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/test/README.md
@@ -0,0 +1,7 @@
+README
+======
+
+Please do not add any classes in this package unless they are used by `sql/console` or Python tests.
+If you need to create any classes or traits that will be used by tests from both `sql/core` and
+`sql/hive`, you can add them to `src/test` of `sql/core` (tests of `sql/hive`
+depend on the test jar of `sql/core`).
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/test/SQLTestUtils.scala
deleted file mode 100644
index ca66cdc482..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/test/SQLTestUtils.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.test
-
-import java.io.File
-
-import scala.util.Try
-
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.util.Utils
-
-trait SQLTestUtils {
-  val sqlContext: SQLContext
-
-  import sqlContext.{conf, sparkContext}
-
-  protected def configuration = sparkContext.hadoopConfiguration
-
-  /**
-   * Sets all SQL configurations specified in `pairs`, calls `f`, and then restores all SQL
-   * configurations.
-   *
-   * @todo Probably this method should be moved to a more general place
-   */
-  protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
-    val (keys, values) = pairs.unzip
-    val currentValues = keys.map(key => Try(conf.getConf(key)).toOption)
-    (keys, values).zipped.foreach(conf.setConf)
-    try f finally {
-      keys.zip(currentValues).foreach {
-        case (key, Some(value)) => conf.setConf(key, value)
-        case (key, None) => conf.unsetConf(key)
-      }
-    }
-  }
-
-  /**
-   * Generates a temporary path without creating the actual file/directory, then passes it to `f`.
-   * If a file/directory is created there by `f`, it will be deleted after `f` returns.
-   *
-   * @todo Probably this method should be moved to a more general place
-   */
-  protected def withTempPath(f: File => Unit): Unit = {
-    val path = Utils.createTempDir()
-    path.delete()
-    try f(path) finally Utils.deleteRecursively(path)
-  }
-
-  /**
-   * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
-   * returns.
-   *
-   * @todo Probably this method should be moved to a more general place
-   */
-  protected def withTempDir(f: File => Unit): Unit = {
-    val dir = Utils.createTempDir().getCanonicalFile
-    try f(dir) finally Utils.deleteRecursively(dir)
-  }
-
-  /**
-   * Drops temporary table `tableName` after calling `f`.
-   */
-  protected def withTempTable(tableName: String)(f: => Unit): Unit = {
-    try f finally sqlContext.dropTempTable(tableName)
-  }
-
-  /**
-   * Drops table `tableName` after calling `f`.
-   */
-  protected def withTable(tableName: String)(f: => Unit): Unit = {
-    try f finally sqlContext.sql(s"DROP TABLE IF EXISTS $tableName")
-  }
-}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetTest.scala
new file mode 100644
index 0000000000..516ba373f4
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetTest.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import java.io.File
+
+import scala.reflect.ClassTag
+import scala.reflect.runtime.universe.TypeTag
+
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.{DataFrame, SaveMode}
+
+/**
+ * A helper trait that provides convenient facilities for Parquet testing.
+ *
+ * NOTE: Considering classes `Tuple1` ... `Tuple22` all extend `Product`, it would be more
+ * convenient to use tuples rather than special case classes when writing test cases/suites.
+ * Especially, `Tuple1.apply` can be used to easily wrap a single type/value.
+ */
+private[sql] trait ParquetTest extends SQLTestUtils {
+  import sqlContext.implicits.{localSeqToDataFrameHolder, rddToDataFrameHolder}
+  import sqlContext.sparkContext
+
+  /**
+   * Writes `data` to a Parquet file, which is then passed to `f` and will be deleted after `f`
+   * returns.
+   */
+  protected def withParquetFile[T <: Product: ClassTag: TypeTag]
+      (data: Seq[T])
+      (f: String => Unit): Unit = {
+    withTempPath { file =>
+      sparkContext.parallelize(data).toDF().write.parquet(file.getCanonicalPath)
+      f(file.getCanonicalPath)
+    }
+  }
+
+  /**
+   * Writes `data` to a Parquet file and reads it back as a [[DataFrame]],
+   * which is then passed to `f`. The Parquet file will be deleted after `f` returns.
+   */
+  protected def withParquetDataFrame[T <: Product: ClassTag: TypeTag]
+      (data: Seq[T])
+      (f: DataFrame => Unit): Unit = {
+    withParquetFile(data)(path => f(sqlContext.read.parquet(path)))
+  }
+
+  /**
+   * Writes `data` to a Parquet file, reads it back as a [[DataFrame]] and registers it as a
+   * temporary table named `tableName`, then calls `f`. The temporary table together with the
+   * Parquet file will be dropped/deleted after `f` returns.
+   */
+  protected def withParquetTable[T <: Product: ClassTag: TypeTag]
+      (data: Seq[T], tableName: String)
+      (f: => Unit): Unit = {
+    withParquetDataFrame(data) { df =>
+      sqlContext.registerDataFrameAsTable(df, tableName)
+      withTempTable(tableName)(f)
+    }
+  }
+
+  protected def makeParquetFile[T <: Product: ClassTag: TypeTag](
+      data: Seq[T], path: File): Unit = {
+    data.toDF().write.mode(SaveMode.Overwrite).parquet(path.getCanonicalPath)
+  }
+
+  protected def makeParquetFile[T <: Product: ClassTag: TypeTag](
+      df: DataFrame, path: File): Unit = {
+    df.write.mode(SaveMode.Overwrite).parquet(path.getCanonicalPath)
+  }
+
+  protected def makePartitionDir(
+      basePath: File,
+      defaultPartitionName: String,
+      partitionCols: (String, Any)*): File = {
+    val partNames = partitionCols.map { case (k, v) =>
+      val valueString = if (v == null || v == "") defaultPartitionName else v.toString
+      s"$k=$valueString"
+    }
+
+    val partDir = partNames.foldLeft(basePath) { (parent, child) =>
+      new File(parent, child)
+    }
+
+    assert(partDir.mkdirs(), s"Couldn't create directory $partDir")
+    partDir
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
new file mode 100644
index 0000000000..ca66cdc482
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/SQLTestUtils.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.test
+
+import java.io.File
+
+import scala.util.Try
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.util.Utils
+
+trait SQLTestUtils {
+  val sqlContext: SQLContext
+
+  import sqlContext.{conf, sparkContext}
+
+  protected def configuration = sparkContext.hadoopConfiguration
+
+  /**
+   * Sets all SQL configurations specified in `pairs`, calls `f`, and then restores all SQL
+   * configurations.
+   *
+   * @todo Probably this method should be moved to a more general place
+   */
+  protected def withSQLConf(pairs: (String, String)*)(f: => Unit): Unit = {
+    val (keys, values) = pairs.unzip
+    val currentValues = keys.map(key => Try(conf.getConf(key)).toOption)
+    (keys, values).zipped.foreach(conf.setConf)
+    try f finally {
+      keys.zip(currentValues).foreach {
+        case (key, Some(value)) => conf.setConf(key, value)
+        case (key, None) => conf.unsetConf(key)
+      }
+    }
+  }
+
+  /**
+   * Generates a temporary path without creating the actual file/directory, then passes it to `f`.
+   * If a file/directory is created there by `f`, it will be deleted after `f` returns.
+   *
+   * @todo Probably this method should be moved to a more general place
+   */
+  protected def withTempPath(f: File => Unit): Unit = {
+    val path = Utils.createTempDir()
+    path.delete()
+    try f(path) finally Utils.deleteRecursively(path)
+  }
+
+  /**
+   * Creates a temporary directory, which is then passed to `f` and will be deleted after `f`
+   * returns.
+   *
+   * @todo Probably this method should be moved to a more general place
+   */
+  protected def withTempDir(f: File => Unit): Unit = {
+    val dir = Utils.createTempDir().getCanonicalFile
+    try f(dir) finally Utils.deleteRecursively(dir)
+  }
+
+  /**
+   * Drops temporary table `tableName` after calling `f`.
+   */
+  protected def withTempTable(tableName: String)(f: => Unit): Unit = {
+    try f finally sqlContext.dropTempTable(tableName)
+  }
+
+  /**
+   * Drops table `tableName` after calling `f`.
+   */
+  protected def withTable(tableName: String)(f: => Unit): Unit = {
+    try f finally sqlContext.sql(s"DROP TABLE IF EXISTS $tableName")
+  }
+}
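For context, here is a minimal sketch of how a suite under `sql/core`'s `src/test` might mix in the moved `ParquetTest` trait. It assumes the Spark 1.4-era `TestSQLContext` object (still in `src/main`, per the README above) and ScalaTest's `FunSuite`; the suite name and test data are hypothetical, not part of this patch.

```scala
package org.apache.spark.sql.parquet

import org.scalatest.FunSuite

import org.apache.spark.sql.test.TestSQLContext

// Hypothetical suite: ParquetTest supplies withParquetTable and friends;
// SQLTestUtils (which ParquetTest extends) only needs `sqlContext` wired up.
class ParquetRoundTripSuite extends FunSuite with ParquetTest {
  val sqlContext = TestSQLContext

  test("round-trips tuples through a temporary Parquet table") {
    // Tuple1 ... Tuple22 all extend Product, so plain tuples work as rows.
    val data = (1 to 4).map(i => (i, s"val_$i"))
    withParquetTable(data, "t") {
      // Backed by a real Parquet file in a temp dir; both the temp table
      // and the file are cleaned up when this block exits.
      assert(sqlContext.sql("SELECT COUNT(*) FROM t").collect().head.getLong(0) === 4)
    }
  }
}
```

Because each helper releases its resources in a `finally` block, a failed assertion cannot leak temporary tables or files into later tests.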
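Similarly, a sketch of `SQLTestUtils.withSQLConf` on its own, showing its save-and-restore behavior; the suite name and the choice of `spark.sql.shuffle.partitions` as the key are again illustrative. `TestSQLContext` lives in the same package, so no import is needed here.

```scala
package org.apache.spark.sql.test

import org.scalatest.FunSuite

// Hypothetical suite: any SQLContext satisfies SQLTestUtils' requirement;
// TestSQLContext is used for symmetry with the sketch above.
class SQLConfScopingSuite extends FunSuite with SQLTestUtils {
  val sqlContext = TestSQLContext

  test("withSQLConf restores the previous value after the block") {
    val key = "spark.sql.shuffle.partitions"
    withSQLConf(key -> "10") {
      // The override is visible only inside this block.
      assert(sqlContext.getConf(key) === "10")
    }
    // On exit the key reverts to its prior value (or is unset if it had
    // none); withSQLConf restores it in a `finally`, even if the body throws.
  }
}
```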