Diffstat (limited to 'sql/core/src/test')
-rw-r--r--  sql/core/src/test/resources/log4j.properties                                52
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala           201
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/PlannerSuite.scala             62
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala                75
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala           211
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/TestData.scala                 72
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/TgfSuite.scala                 71
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala  126
8 files changed, 870 insertions, 0 deletions
diff --git a/sql/core/src/test/resources/log4j.properties b/sql/core/src/test/resources/log4j.properties
new file mode 100644
index 0000000000..7bb6789bd3
--- /dev/null
+++ b/sql/core/src/test/resources/log4j.properties
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Set everything to be logged to the file core/target/unit-tests.log
+log4j.rootLogger=DEBUG, CA, FA
+
+# Console Appender
+log4j.appender.CA=org.apache.log4j.ConsoleAppender
+log4j.appender.CA.layout=org.apache.log4j.PatternLayout
+log4j.appender.CA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c: %m%n
+log4j.appender.CA.Threshold = WARN
+
+
+# File Appender
+log4j.appender.FA=org.apache.log4j.FileAppender
+log4j.appender.FA.append=false
+log4j.appender.FA.file=target/unit-tests.log
+log4j.appender.FA.layout=org.apache.log4j.PatternLayout
+log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c{1}: %m%n
+
+# Set the logger level of the File Appender to INFO
+log4j.appender.FA.Threshold = INFO
+
+# Some packages are noisy for no good reason.
+log4j.additivity.org.apache.hadoop.hive.serde2.lazy.LazyStruct=false
+log4j.logger.org.apache.hadoop.hive.serde2.lazy.LazyStruct=OFF
+
+log4j.additivity.org.apache.hadoop.hive.metastore.RetryingHMSHandler=false
+log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=OFF
+
+log4j.additivity.hive.ql.metadata.Hive=false
+log4j.logger.hive.ql.metadata.Hive=OFF
+
+# Parquet logging
+log4j.logger.parquet.hadoop.InternalParquetRecordReader=WARN
+log4j.logger.parquet.hadoop.ParquetInputFormat=WARN
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
new file mode 100644
index 0000000000..37c90a18a0
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DslQuerySuite.scala
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.test._
+
+/* Implicits */
+import TestSQLContext._
+
+class DslQuerySuite extends QueryTest {
+ import TestData._
+
+ test("table scan") {
+ checkAnswer(
+ testData,
+ testData.collect().toSeq)
+ }
+
+ test("agg") {
+ checkAnswer(
+ testData2.groupBy('a)('a, Sum('b)),
+ Seq((1,3),(2,3),(3,3))
+ )
+ }
+
+ test("select *") {
+ checkAnswer(
+ testData.select(Star(None)),
+ testData.collect().toSeq)
+ }
+
+ test("simple select") {
+ checkAnswer(
+ testData.where('key === 1).select('value),
+ Seq(Seq("1")))
+ }
+
+ test("sorting") {
+ checkAnswer(
+ testData2.orderBy('a.asc, 'b.asc),
+ Seq((1,1), (1,2), (2,1), (2,2), (3,1), (3,2)))
+
+ checkAnswer(
+ testData2.orderBy('a.asc, 'b.desc),
+ Seq((1,2), (1,1), (2,2), (2,1), (3,2), (3,1)))
+
+ checkAnswer(
+ testData2.orderBy('a.desc, 'b.desc),
+ Seq((3,2), (3,1), (2,2), (2,1), (1,2), (1,1)))
+
+ checkAnswer(
+ testData2.orderBy('a.desc, 'b.asc),
+ Seq((3,1), (3,2), (2,1), (2,2), (1,1), (1,2)))
+ }
+
+ test("average") {
+ checkAnswer(
+ testData2.groupBy()(Average('a)),
+ 2.0)
+ }
+
+ test("count") {
+ checkAnswer(
+ testData2.groupBy()(Count(1)),
+ testData2.count()
+ )
+ }
+
+ test("null count") {
+ checkAnswer(
+ testData3.groupBy('a)('a, Count('b)),
+ Seq((1,0), (2, 1))
+ )
+
+ checkAnswer(
+ testData3.groupBy()(Count('a), Count('b), Count(1), CountDistinct('a :: Nil), CountDistinct('b :: Nil)),
+ (2, 1, 2, 2, 1) :: Nil
+ )
+ }
+
+ test("inner join where, one match per row") {
+ checkAnswer(
+ upperCaseData.join(lowerCaseData, Inner).where('n === 'N),
+ Seq(
+ (1, "A", 1, "a"),
+ (2, "B", 2, "b"),
+ (3, "C", 3, "c"),
+ (4, "D", 4, "d")
+ ))
+ }
+
+ test("inner join ON, one match per row") {
+ checkAnswer(
+ upperCaseData.join(lowerCaseData, Inner, Some('n === 'N)),
+ Seq(
+ (1, "A", 1, "a"),
+ (2, "B", 2, "b"),
+ (3, "C", 3, "c"),
+ (4, "D", 4, "d")
+ ))
+ }
+
+ test("inner join, where, multiple matches") {
+ val x = testData2.where('a === 1).subquery('x)
+ val y = testData2.where('a === 1).subquery('y)
+ checkAnswer(
+ x.join(y).where("x.a".attr === "y.a".attr),
+ (1,1,1,1) ::
+ (1,1,1,2) ::
+ (1,2,1,1) ::
+ (1,2,1,2) :: Nil
+ )
+ }
+
+ test("inner join, no matches") {
+ val x = testData2.where('a === 1).subquery('x)
+ val y = testData2.where('a === 2).subquery('y)
+ checkAnswer(
+ x.join(y).where("x.a".attr === "y.a".attr),
+ Nil)
+ }
+
+ test("big inner join, 4 matches per row") {
+ val bigData = testData.unionAll(testData).unionAll(testData).unionAll(testData)
+ val bigDataX = bigData.subquery('x)
+ val bigDataY = bigData.subquery('y)
+
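+    // Every key appears 4 times on each side, so the equi-join yields 4 * 4 = 16 rows per key.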
+ checkAnswer(
+ bigDataX.join(bigDataY).where("x.key".attr === "y.key".attr),
+ testData.flatMap(
+ row => Seq.fill(16)((row ++ row).toSeq)).collect().toSeq)
+ }
+
+ test("cartisian product join") {
+ checkAnswer(
+ testData3.join(testData3),
+ (1, null, 1, null) ::
+ (1, null, 2, 2) ::
+ (2, 2, 1, null) ::
+ (2, 2, 2, 2) :: Nil)
+ }
+
+ test("left outer join") {
+ checkAnswer(
+ upperCaseData.join(lowerCaseData, LeftOuter, Some('n === 'N)),
+ (1, "A", 1, "a") ::
+ (2, "B", 2, "b") ::
+ (3, "C", 3, "c") ::
+ (4, "D", 4, "d") ::
+ (5, "E", null, null) ::
+ (6, "F", null, null) :: Nil)
+ }
+
+ test("right outer join") {
+ checkAnswer(
+ lowerCaseData.join(upperCaseData, RightOuter, Some('n === 'N)),
+ (1, "a", 1, "A") ::
+ (2, "b", 2, "B") ::
+ (3, "c", 3, "C") ::
+ (4, "d", 4, "D") ::
+ (null, null, 5, "E") ::
+ (null, null, 6, "F") :: Nil)
+ }
+
+ test("full outer join") {
+ val left = upperCaseData.where('N <= 4).subquery('left)
+ val right = upperCaseData.where('N >= 3).subquery('right)
+
+ checkAnswer(
+ left.join(right, FullOuter, Some("left.N".attr === "right.N".attr)),
+ (1, "A", null, null) ::
+ (2, "B", null, null) ::
+ (3, "C", 3, "C") ::
+ (4, "D", 4, "D") ::
+ (null, null, 5, "E") ::
+ (null, null, 6, "F") :: Nil)
+ }
+}
\ No newline at end of file
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/PlannerSuite.scala
new file mode 100644
index 0000000000..83908edf5a
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/PlannerSuite.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package execution
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.TestData._
+import org.apache.spark.sql.test.TestSQLContext._
+import org.apache.spark.sql.test.TestSQLContext.planner._
+
+class PlannerSuite extends FunSuite {
+
+ test("unions are collapsed") {
+ val query = testData.unionAll(testData).unionAll(testData).logicalPlan
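+    // Two chained unionAll calls create two nested logical Union nodes; the planner
+    // should collapse them into a single physical Union.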
+ val planned = BasicOperators(query).head
+ val logicalUnions = query collect { case u: logical.Union => u}
+ val physicalUnions = planned collect { case u: execution.Union => u}
+
+ assert(logicalUnions.size === 2)
+ assert(physicalUnions.size === 1)
+ }
+
+ test("count is partially aggregated") {
+ val query = testData.groupBy('value)(Count('key)).analyze.logicalPlan
+ val planned = PartialAggregation(query).head
+ val aggregations = planned.collect { case a: Aggregate => a }
+
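+    // Partial aggregation splits the count into a map-side partial Aggregate plus a
+    // final merging Aggregate, so two Aggregate operators are expected in the plan.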
+ assert(aggregations.size === 2)
+ }
+
+ test("count distinct is not partially aggregated") {
+ val query = testData.groupBy('value)(CountDistinct('key :: Nil)).analyze.logicalPlan
+    val planned = PartialAggregation(query)
+ assert(planned.isEmpty)
+ }
+
+ test("mixed aggregates are not partially aggregated") {
+ val query =
+ testData.groupBy('value)(Count('value), CountDistinct('key :: Nil)).analyze.logicalPlan
+ val planned = PartialAggregation(query)
+ assert(planned.isEmpty)
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
new file mode 100644
index 0000000000..728feceded
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.test._
+
+/* Implicits */
+import TestSQLContext._
+
+class QueryTest extends FunSuite {
+  /**
+   * Runs the plan and makes sure the answer matches the expected result.
+   * @param rdd the SchemaRDD to be executed
+   * @param expectedAnswer the expected result, which can be an Any, a Seq[Product], or a Seq[Seq[Any]].
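+   *
+   * For illustration, all three forms are accepted (queries taken from the suites in this package):
+   * {{{
+   *   checkAnswer(sql("SELECT AVG(a) FROM testData2"), 2.0)                       // bare value
+   *   checkAnswer(sql("SELECT * FROM testData2 WHERE a = 1"), Seq((1,1), (1,2)))  // Seq[Product]
+   *   checkAnswer(sql("SELECT value FROM testData WHERE key = 1"), Seq(Seq("1"))) // Seq[Seq[Any]]
+   * }}}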
+ */
+ protected def checkAnswer(rdd: SchemaRDD, expectedAnswer: Any): Unit = {
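+    // Normalize the expected answer into Seq-of-rows form: tuples are expanded into
+    // their fields, and a bare value becomes a single one-column row.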
+ val convertedAnswer = expectedAnswer match {
+ case s: Seq[_] if s.isEmpty => s
+ case s: Seq[_] if s.head.isInstanceOf[Product] &&
+ !s.head.isInstanceOf[Seq[_]] => s.map(_.asInstanceOf[Product].productIterator.toIndexedSeq)
+ case s: Seq[_] => s
+ case singleItem => Seq(Seq(singleItem))
+ }
+
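+    // If the query sorts its output, row order is significant and compared as-is;
+    // otherwise both sides are canonicalized by sorting rows on their string form.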
+ val isSorted = rdd.logicalPlan.collect { case s: logical.Sort => s}.nonEmpty
+ def prepareAnswer(answer: Seq[Any]) = if (!isSorted) answer.sortBy(_.toString) else answer
+ val sparkAnswer = try rdd.collect().toSeq catch {
+ case e: Exception =>
+ fail(
+ s"""
+ |Exception thrown while executing query:
+ |${rdd.logicalPlan}
+ |== Exception ==
+ |$e
+ """.stripMargin)
+ }
+    if (prepareAnswer(convertedAnswer) != prepareAnswer(sparkAnswer)) {
+ fail(s"""
+ |Results do not match for query:
+ |${rdd.logicalPlan}
+ |== Analyzed Plan ==
+ |${rdd.queryExecution.analyzed}
+ |== RDD ==
+ |$rdd
+ |== Results ==
+ |${sideBySide(
+ prepareAnswer(convertedAnswer).map(_.toString),
+ prepareAnswer(sparkAnswer).map(_.toString)).mkString("\n")}
+ """.stripMargin)
+ }
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
new file mode 100644
index 0000000000..5728313d6d
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.test._
+
+/* Implicits */
+import TestSQLContext._
+import TestData._
+
+class SQLQuerySuite extends QueryTest {
+ test("agg") {
+ checkAnswer(
+ sql("SELECT a, SUM(b) FROM testData2 GROUP BY a"),
+ Seq((1,3),(2,3),(3,3))
+ )
+ }
+
+ test("select *") {
+ checkAnswer(
+ sql("SELECT * FROM testData"),
+ testData.collect().toSeq)
+ }
+
+ test("simple select") {
+ checkAnswer(
+ sql("SELECT value FROM testData WHERE key = 1"),
+ Seq(Seq("1")))
+ }
+
+ test("sorting") {
+ checkAnswer(
+ sql("SELECT * FROM testData2 ORDER BY a ASC, b ASC"),
+ Seq((1,1), (1,2), (2,1), (2,2), (3,1), (3,2)))
+
+ checkAnswer(
+ sql("SELECT * FROM testData2 ORDER BY a ASC, b DESC"),
+ Seq((1,2), (1,1), (2,2), (2,1), (3,2), (3,1)))
+
+ checkAnswer(
+ sql("SELECT * FROM testData2 ORDER BY a DESC, b DESC"),
+ Seq((3,2), (3,1), (2,2), (2,1), (1,2), (1,1)))
+
+ checkAnswer(
+ sql("SELECT * FROM testData2 ORDER BY a DESC, b ASC"),
+ Seq((3,1), (3,2), (2,1), (2,2), (1,1), (1,2)))
+ }
+
+ test("average") {
+ checkAnswer(
+ sql("SELECT AVG(a) FROM testData2"),
+ 2.0)
+ }
+
+ test("count") {
+ checkAnswer(
+ sql("SELECT COUNT(*) FROM testData2"),
+ testData2.count()
+ )
+ }
+
+ // No support for primitive nulls yet.
+ ignore("null count") {
+ checkAnswer(
+ sql("SELECT a, COUNT(b) FROM testData3"),
+ Seq((1,0), (2, 1))
+ )
+
+ checkAnswer(
+ testData3.groupBy()(Count('a), Count('b), Count(1), CountDistinct('a :: Nil), CountDistinct('b :: Nil)),
+ (2, 1, 2, 2, 1) :: Nil
+ )
+ }
+
+ test("inner join where, one match per row") {
+ checkAnswer(
+ sql("SELECT * FROM upperCaseData JOIN lowerCaseData WHERE n = N"),
+ Seq(
+ (1, "A", 1, "a"),
+ (2, "B", 2, "b"),
+ (3, "C", 3, "c"),
+ (4, "D", 4, "d")
+ ))
+ }
+
+ test("inner join ON, one match per row") {
+ checkAnswer(
+ sql("SELECT * FROM upperCaseData JOIN lowerCaseData ON n = N"),
+ Seq(
+ (1, "A", 1, "a"),
+ (2, "B", 2, "b"),
+ (3, "C", 3, "c"),
+ (4, "D", 4, "d")
+ ))
+ }
+
+ test("inner join, where, multiple matches") {
+ checkAnswer(
+ sql("""
+ |SELECT * FROM
+ | (SELECT * FROM testData2 WHERE a = 1) x JOIN
+ | (SELECT * FROM testData2 WHERE a = 1) y
+ |WHERE x.a = y.a""".stripMargin),
+ (1,1,1,1) ::
+ (1,1,1,2) ::
+ (1,2,1,1) ::
+ (1,2,1,2) :: Nil
+ )
+ }
+
+ test("inner join, no matches") {
+ checkAnswer(
+ sql(
+ """
+ |SELECT * FROM
+ | (SELECT * FROM testData2 WHERE a = 1) x JOIN
+ | (SELECT * FROM testData2 WHERE a = 2) y
+ |WHERE x.a = y.a""".stripMargin),
+ Nil)
+ }
+
+ test("big inner join, 4 matches per row") {
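+    // testData is unioned with itself three times on each side, so every key appears
+    // four times per side and the equi-join yields 4 * 4 = 16 output rows per key.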
+ checkAnswer(
+ sql(
+ """
+ |SELECT * FROM
+ | (SELECT * FROM testData UNION ALL
+ | SELECT * FROM testData UNION ALL
+ | SELECT * FROM testData UNION ALL
+ | SELECT * FROM testData) x JOIN
+ | (SELECT * FROM testData UNION ALL
+ | SELECT * FROM testData UNION ALL
+ | SELECT * FROM testData UNION ALL
+ | SELECT * FROM testData) y
+ |WHERE x.key = y.key""".stripMargin),
+ testData.flatMap(
+ row => Seq.fill(16)((row ++ row).toSeq)).collect().toSeq)
+ }
+
+ ignore("cartisian product join") {
+ checkAnswer(
+ testData3.join(testData3),
+ (1, null, 1, null) ::
+ (1, null, 2, 2) ::
+ (2, 2, 1, null) ::
+ (2, 2, 2, 2) :: Nil)
+ }
+
+ test("left outer join") {
+ checkAnswer(
+ sql("SELECT * FROM upperCaseData LEFT OUTER JOIN lowerCaseData ON n = N"),
+ (1, "A", 1, "a") ::
+ (2, "B", 2, "b") ::
+ (3, "C", 3, "c") ::
+ (4, "D", 4, "d") ::
+ (5, "E", null, null) ::
+ (6, "F", null, null) :: Nil)
+ }
+
+ test("right outer join") {
+ checkAnswer(
+ sql("SELECT * FROM lowerCaseData RIGHT OUTER JOIN upperCaseData ON n = N"),
+ (1, "a", 1, "A") ::
+ (2, "b", 2, "B") ::
+ (3, "c", 3, "C") ::
+ (4, "d", 4, "D") ::
+ (null, null, 5, "E") ::
+ (null, null, 6, "F") :: Nil)
+ }
+
+ test("full outer join") {
+ checkAnswer(
+ sql(
+ """
+ |SELECT * FROM
+ | (SELECT * FROM upperCaseData WHERE N <= 4) left FULL OUTER JOIN
+ | (SELECT * FROM upperCaseData WHERE N >= 3) right
+ | ON left.N = right.N
+ """.stripMargin),
+ (1, "A", null, null) ::
+ (2, "B", null, null) ::
+ (3, "C", 3, "C") ::
+ (4, "D", 4, "D") ::
+ (null, null, 5, "E") ::
+ (null, null, 6, "F") :: Nil)
+ }
+}
\ No newline at end of file
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
new file mode 100644
index 0000000000..640292571b
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.test._
+
+/* Implicits */
+import TestSQLContext._
+
+object TestData {
+ case class TestData(key: Int, value: String)
+ val testData: SchemaRDD = TestSQLContext.sparkContext.parallelize(
+ (1 to 100).map(i => TestData(i, i.toString)))
+ testData.registerAsTable("testData")
+
+ case class TestData2(a: Int, b: Int)
+ val testData2: SchemaRDD =
+ TestSQLContext.sparkContext.parallelize(
+ TestData2(1, 1) ::
+ TestData2(1, 2) ::
+ TestData2(2, 1) ::
+ TestData2(2, 2) ::
+ TestData2(3, 1) ::
+ TestData2(3, 2) :: Nil
+ )
+ testData2.registerAsTable("testData2")
+
+ // TODO: There is no way to express null primitives as case classes currently...
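+  // As a workaround, rows containing nulls are loaded directly into a catalyst LocalRelation.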
+ val testData3 =
+ logical.LocalRelation('a.int, 'b.int).loadData(
+ (1, null) ::
+ (2, 2) :: Nil
+ )
+
+ case class UpperCaseData(N: Int, L: String)
+ val upperCaseData =
+ TestSQLContext.sparkContext.parallelize(
+ UpperCaseData(1, "A") ::
+ UpperCaseData(2, "B") ::
+ UpperCaseData(3, "C") ::
+ UpperCaseData(4, "D") ::
+ UpperCaseData(5, "E") ::
+ UpperCaseData(6, "F") :: Nil
+ )
+ upperCaseData.registerAsTable("upperCaseData")
+
+ case class LowerCaseData(n: Int, l: String)
+ val lowerCaseData =
+ TestSQLContext.sparkContext.parallelize(
+ LowerCaseData(1, "a") ::
+ LowerCaseData(2, "b") ::
+ LowerCaseData(3, "c") ::
+ LowerCaseData(4, "d") :: Nil
+ )
+ lowerCaseData.registerAsTable("lowerCaseData")
+}
\ No newline at end of file
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TgfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/TgfSuite.scala
new file mode 100644
index 0000000000..08265b7a6a
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TgfSuite.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package execution
+
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.test._
+
+
+import TestSQLContext._
+
+/**
+ * This is an example TGF that uses UnresolvedAttributes 'name and 'age to access specific columns
+ * from the input data. These will be replaced during analysis with specific AttributeReferences
+ * and then bound to specific ordinals during query planning. While TGFs could also access specific
+ * columns using hand-coded ordinals, doing so violates data independence.
+ *
+ * Note: this is only a rough example of how TGFs can be expressed; the final version will likely
+ * involve a lot more sugar for cleaner use in Scala/Java/etc.
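+ *
+ * The `simple tgf example` test in [[TgfSuite]] below shows this generator in use.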
+ */
+case class ExampleTGF(input: Seq[Attribute] = Seq('name, 'age)) extends Generator {
+ def children = input
+ protected def makeOutput() = 'nameAndAge.string :: Nil
+
+ val Seq(nameAttr, ageAttr) = input
+
+ override def apply(input: Row): TraversableOnce[Row] = {
+ val name = nameAttr.apply(input)
+ val age = ageAttr.apply(input).asInstanceOf[Int]
+
+ Iterator(
+ new GenericRow(Array[Any](s"$name is $age years old")),
+ new GenericRow(Array[Any](s"Next year, $name will be ${age + 1} years old")))
+ }
+}
+
+class TgfSuite extends QueryTest {
+ val inputData =
+ logical.LocalRelation('name.string, 'age.int).loadData(
+ ("michael", 29) :: Nil
+ )
+
+ test("simple tgf example") {
+ checkAnswer(
+ inputData.generate(ExampleTGF()),
+ Seq(
+ "michael is 29 years old" :: Nil,
+ "Next year, michael will be 30 years old" :: Nil))
+ }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
new file mode 100644
index 0000000000..8b2ccb52d8
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.catalyst.util.getTempFilePath
+import org.apache.spark.sql.test.TestSQLContext
+
+import org.apache.hadoop.mapreduce.Job
+import org.apache.hadoop.fs.{Path, FileSystem}
+
+import parquet.schema.MessageTypeParser
+import parquet.hadoop.ParquetFileWriter
+import parquet.hadoop.util.ContextUtil
+
+class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll {
+ override def beforeAll() {
+ ParquetTestData.writeFile
+ }
+
+ override def afterAll() {
+ ParquetTestData.testFile.delete()
+ }
+
+ test("Import of simple Parquet file") {
+ val result = getRDD(ParquetTestData.testData).collect()
+ assert(result.size === 15)
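+    // The test file's layout: every third row holds a true boolean and every fifth row
+    // the int value 5; the string/float/double fields are fixed, and the long encodes the index.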
+ result.zipWithIndex.foreach {
+ case (row, index) => {
+        assert(row(0) === (index % 3 == 0), s"boolean field value in line $index did not match")
+ if (index % 5 == 0) assert(row(1) === 5, s"int field value in line $index did not match")
+ assert(row(2) === "abc", s"string field value in line $index did not match")
+ assert(row(3) === (index.toLong << 33), s"long value in line $index did not match")
+ assert(row(4) === 2.5F, s"float field value in line $index did not match")
+ assert(row(5) === 4.5D, s"double field value in line $index did not match")
+ }
+ }
+ }
+
+ test("Projection of simple Parquet file") {
+ val scanner = new ParquetTableScan(
+ ParquetTestData.testData.output,
+ ParquetTestData.testData,
+ None)(TestSQLContext.sparkContext)
+ val projected = scanner.pruneColumns(ParquetTypesConverter
+ .convertToAttributes(MessageTypeParser
+ .parseMessageType(ParquetTestData.subTestSchema)))
+ assert(projected.output.size === 2)
+ val result = projected
+ .execute()
+ .map(_.copy())
+ .collect()
+ result.zipWithIndex.foreach {
+ case (row, index) => {
+ if (index % 3 == 0)
+ assert(row(0) === true, s"boolean field value in line $index did not match (every third row)")
+ else
+ assert(row(0) === false, s"boolean field value in line $index did not match")
+ assert(row(1) === (index.toLong << 33), s"long field value in line $index did not match")
+ assert(row.size === 2, s"number of columns in projection in line $index is incorrect")
+ }
+ }
+ }
+
+ test("Writing metadata from scratch for table CREATE") {
+ val job = new Job()
+ val path = new Path(getTempFilePath("testtable").getCanonicalFile.toURI.toString)
+ val fs: FileSystem = FileSystem.getLocal(ContextUtil.getConfiguration(job))
+ ParquetTypesConverter.writeMetaData(
+ ParquetTestData.testData.output,
+ path,
+ TestSQLContext.sparkContext.hadoopConfiguration)
+ assert(fs.exists(new Path(path, ParquetFileWriter.PARQUET_METADATA_FILE)))
+ val metaData = ParquetTypesConverter.readMetaData(path)
+ assert(metaData != null)
+ ParquetTestData
+ .testData
+ .parquetSchema
+ .checkContains(metaData.getFileMetaData.getSchema) // throws exception if incompatible
+ metaData
+ .getFileMetaData
+ .getSchema
+ .checkContains(ParquetTestData.testData.parquetSchema) // throws exception if incompatible
+ fs.delete(path, true)
+ }
+
+ /**
+ * Computes the given [[ParquetRelation]] and returns its RDD.
+ *
+ * @param parquetRelation The Parquet relation.
+ * @return An RDD of Rows.
+ */
+ private def getRDD(parquetRelation: ParquetRelation): RDD[Row] = {
+ val scanner = new ParquetTableScan(
+ parquetRelation.output,
+ parquetRelation,
+ None)(TestSQLContext.sparkContext)
+ scanner
+ .execute
+ .map(_.copy())
+ }
+}
+