author    Cheng Lian <lian@databricks.com>          2014-11-02 15:14:44 -0800
committer Michael Armbrust <michael@databricks.com> 2014-11-02 15:14:44 -0800
commit    e4b80894bdb72c0acf8832fd48421c546fbc37e6 (patch)
tree      0b61776c1eac4271a7aa3662d3e8bde6402ce3fe /sql
parent    9c0eb57c737dd7d97d2cbd4516ddd2cf5d06e4b2 (diff)
[SPARK-4182][SQL] Fixes ColumnStats classes for boolean, binary and complex data types
`NoopColumnStats` was once used for binary, boolean and complex data types. This `ColumnStats` implementation doesn't return properly shaped column statistics, which causes caching to fail whenever a table contains columns of the aforementioned types. This PR adds `BooleanColumnStats`, `BinaryColumnStats` and `GenericColumnStats`, used for boolean, binary and all complex data types respectively. In addition, `NoopColumnStats` now returns properly shaped column statistics containing the null count and row count, but this class is used for testing purposes only.

Author: Cheng Lian <lian@databricks.com>

Closes #3059 from liancheng/spark-4182 and squashes the following commits:

b398cfd [Cheng Lian] Fixes failed test case
fb3ee85 [Cheng Lian] Fixes SPARK-4182
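To make the failure mode concrete, here is a minimal sketch of the scenario this patch fixes (not part of the commit; the Record case class and table name are hypothetical), written against the Spark 1.1-era test SQLContext:

import org.apache.spark.sql.test.TestSQLContext._

case class Record(flag: Boolean, payload: Array[Byte], tags: Seq[Int])

// createSchemaRDD (imported above) implicitly turns the RDD of case classes
// into a SchemaRDD with boolean, binary and array columns.
val records = sparkContext.parallelize(
  Record(true, Array[Byte](1), Seq(1)) ::
  Record(false, Array[Byte](2), Seq(2)) :: Nil)
records.registerTempTable("records")

// Before this patch, NoopColumnStats produced an empty statistics Row for
// these column types and scanning the cached table failed; with the new
// stats classes the cached scan succeeds.
cacheTable("records")
sql("SELECT * FROM records").count()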
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala             10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala               45
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala  3
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala                       7
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/TestData.scala                            8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala 28
6 files changed, 82 insertions, 19 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
index 300cef15bf..c68dceef3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
@@ -79,8 +79,9 @@ private[sql] class BasicColumnBuilder[T <: DataType, JvmType](
}
private[sql] abstract class ComplexColumnBuilder[T <: DataType, JvmType](
+ columnStats: ColumnStats,
columnType: ColumnType[T, JvmType])
- extends BasicColumnBuilder[T, JvmType](new NoopColumnStats, columnType)
+ extends BasicColumnBuilder[T, JvmType](columnStats, columnType)
with NullableColumnBuilder
private[sql] abstract class NativeColumnBuilder[T <: NativeType](
@@ -91,7 +92,7 @@ private[sql] abstract class NativeColumnBuilder[T <: NativeType](
with AllCompressionSchemes
with CompressibleColumnBuilder[T]
-private[sql] class BooleanColumnBuilder extends NativeColumnBuilder(new NoopColumnStats, BOOLEAN)
+private[sql] class BooleanColumnBuilder extends NativeColumnBuilder(new BooleanColumnStats, BOOLEAN)
private[sql] class IntColumnBuilder extends NativeColumnBuilder(new IntColumnStats, INT)
@@ -112,10 +113,11 @@ private[sql] class DateColumnBuilder extends NativeColumnBuilder(new DateColumnS
private[sql] class TimestampColumnBuilder
extends NativeColumnBuilder(new TimestampColumnStats, TIMESTAMP)
-private[sql] class BinaryColumnBuilder extends ComplexColumnBuilder(BINARY)
+private[sql] class BinaryColumnBuilder extends ComplexColumnBuilder(new BinaryColumnStats, BINARY)
// TODO (lian) Add support for array, struct and map
-private[sql] class GenericColumnBuilder extends ComplexColumnBuilder(GENERIC)
+private[sql] class GenericColumnBuilder
+ extends ComplexColumnBuilder(new GenericColumnStats, GENERIC)
private[sql] object ColumnBuilder {
val DEFAULT_INITIAL_BUFFER_SIZE = 1024 * 1024
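As an aside, a minimal sketch of the builder/stats handshake the constructor change above enables (not patch code; assumes the Spark 1.1-era initialize(initialSize, columnName) signature and must run inside the org.apache.spark.sql package, since the builders are private[sql]):

import org.apache.spark.sql.catalyst.expressions.GenericMutableRow

val builder = new BooleanColumnBuilder
builder.initialize(4, "flag")  // initial row count hint and column name

val row = new GenericMutableRow(1)
Seq(true, false).foreach { v =>
  row.setBoolean(0, v)
  builder.appendFrom(row, 0)
}

builder.build()  // serialized column bytes
// With this patch the builder carries real statistics instead of a
// NoopColumnStats placeholder:
println(builder.columnStats.collectedStatistics)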
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
index b9f9f82700..668efe4a3b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
@@ -70,11 +70,30 @@ private[sql] sealed trait ColumnStats extends Serializable {
def collectedStatistics: Row
}
+/**
+ * A no-op ColumnStats only used for testing purposes.
+ */
private[sql] class NoopColumnStats extends ColumnStats {
+ override def gatherStats(row: Row, ordinal: Int): Unit = super.gatherStats(row, ordinal)
+
+ def collectedStatistics = Row(null, null, nullCount, count, 0L)
+}
- override def gatherStats(row: Row, ordinal: Int): Unit = {}
+private[sql] class BooleanColumnStats extends ColumnStats {
+ protected var upper = false
+ protected var lower = true
- override def collectedStatistics = Row()
+ override def gatherStats(row: Row, ordinal: Int): Unit = {
+ super.gatherStats(row, ordinal)
+ if (!row.isNullAt(ordinal)) {
+ val value = row.getBoolean(ordinal)
+ if (value > upper) upper = value
+ if (value < lower) lower = value
+ sizeInBytes += BOOLEAN.defaultSize
+ }
+ }
+
+ def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes)
}
private[sql] class ByteColumnStats extends ColumnStats {
@@ -229,3 +248,25 @@ private[sql] class TimestampColumnStats extends ColumnStats {
def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes)
}
+
+private[sql] class BinaryColumnStats extends ColumnStats {
+ override def gatherStats(row: Row, ordinal: Int): Unit = {
+ super.gatherStats(row, ordinal)
+ if (!row.isNullAt(ordinal)) {
+ sizeInBytes += BINARY.actualSize(row, ordinal)
+ }
+ }
+
+ def collectedStatistics = Row(null, null, nullCount, count, sizeInBytes)
+}
+
+private[sql] class GenericColumnStats extends ColumnStats {
+ override def gatherStats(row: Row, ordinal: Int): Unit = {
+ super.gatherStats(row, ordinal)
+ if (!row.isNullAt(ordinal)) {
+ sizeInBytes += GENERIC.actualSize(row, ordinal)
+ }
+ }
+
+ def collectedStatistics = Row(null, null, nullCount, count, sizeInBytes)
+}
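The five-field shape these classes now produce is easiest to see directly; a minimal sketch (not patch code, assuming BOOLEAN.defaultSize is 1 byte and that the base trait's gatherStats tallies count and nullCount):

import org.apache.spark.sql.catalyst.expressions.GenericRow

val stats = new BooleanColumnStats
Seq(true, false).foreach { v =>
  stats.gatherStats(new GenericRow(Array[Any](v)), 0)
}
// Expected: Row(false, true, 0, 2, 2L), i.e. (lower, upper, nullCount,
// count, sizeInBytes) -- exactly the shape NoopColumnStats failed to supply.
println(stats.collectedStatistics)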
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index ee63134f56..455b415d9d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -161,6 +161,9 @@ private[sql] case class InMemoryRelation(
}
def cachedColumnBuffers = _cachedColumnBuffers
+
+ override protected def otherCopyArgs: Seq[AnyRef] =
+ Seq(_cachedColumnBuffers, statisticsToBePropagated)
}
private[sql] case class InMemoryColumnarTableScan(
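The otherCopyArgs override added above matters because Catalyst rebuilds nodes through the case-class copy machinery, which only defaults the primary parameter list; _cachedColumnBuffers and statisticsToBePropagated are threaded through otherCopyArgs so copies keep them. A standalone sketch of the underlying Scala behaviour (hypothetical Node class, no Catalyst involved):

// copy() is generated with defaults for the first parameter list only; the
// second list must be threaded through by hand, which is what otherCopyArgs
// does for Catalyst's makeCopy.
case class Node(name: String)(val cache: Seq[String])

val n = Node("scan")(Seq("batch0"))
val copied = n.copy(name = "scan2")(n.cache)
assert(copied.cache == Seq("batch0"))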
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 6befe1b755..6bf439377a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -21,11 +21,12 @@ import java.util.TimeZone
import org.scalatest.BeforeAndAfterAll
-import org.apache.spark.sql.TestData._
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+/* Implicits */
+import org.apache.spark.sql.TestData._
import org.apache.spark.sql.test.TestSQLContext._
class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
@@ -719,7 +720,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
validateMetadata(sql("SELECT * FROM personWithMeta JOIN salary ON id = personId"))
validateMetadata(sql("SELECT name, salary FROM personWithMeta JOIN salary ON id = personId"))
}
-
+
test("SPARK-3371 Renaming a function expression with group by gives error") {
registerFunction("len", (s: String) => s.length)
checkAnswer(
@@ -934,7 +935,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
}
test("SPARK-4154 Query does not work if it has 'not between' in Spark SQL and HQL") {
- checkAnswer(sql("SELECT key FROM testData WHERE key not between 0 and 10 order by key"),
+ checkAnswer(sql("SELECT key FROM testData WHERE key not between 0 and 10 order by key"),
(11 to 100).map(i => Seq(i)))
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
index 836dd17fcc..ef87a23063 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
@@ -177,4 +177,12 @@ object TestData {
Salary(0, 2000.0) ::
Salary(1, 1000.0) :: Nil)
salary.registerTempTable("salary")
+
+ case class ComplexData(m: Map[Int, String], s: TestData, a: Seq[Int], b: Boolean)
+ val complexData =
+ TestSQLContext.sparkContext.parallelize(
+ ComplexData(Map(1 -> "1"), TestData(1, "1"), Seq(1), true)
+ :: ComplexData(Map(2 -> "2"), TestData(2, "2"), Seq(2), false)
+ :: Nil).toSchemaRDD
+ complexData.registerTempTable("complexData")
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
index 9775dd26b7..15903d07df 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
@@ -17,17 +17,18 @@
package org.apache.spark.sql.columnar
+import org.apache.spark.sql.TestData._
import org.apache.spark.sql.catalyst.expressions.Row
-import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.test.TestSQLContext._
import org.apache.spark.sql.{QueryTest, TestData}
import org.apache.spark.storage.StorageLevel.MEMORY_ONLY
class InMemoryColumnarQuerySuite extends QueryTest {
- import org.apache.spark.sql.TestData._
- import org.apache.spark.sql.test.TestSQLContext._
+ // Make sure the tables are loaded.
+ TestData
test("simple columnar query") {
- val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
+ val plan = executePlan(testData.logicalPlan).executedPlan
val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan)
checkAnswer(scan, testData.collect().toSeq)
@@ -42,7 +43,7 @@ class InMemoryColumnarQuerySuite extends QueryTest {
}
test("projection") {
- val plan = TestSQLContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan
+ val plan = executePlan(testData.select('value, 'key).logicalPlan).executedPlan
val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan)
checkAnswer(scan, testData.collect().map {
@@ -51,7 +52,7 @@ class InMemoryColumnarQuerySuite extends QueryTest {
}
test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
- val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
+ val plan = executePlan(testData.logicalPlan).executedPlan
val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan)
checkAnswer(scan, testData.collect().toSeq)
@@ -63,7 +64,7 @@ class InMemoryColumnarQuerySuite extends QueryTest {
sql("SELECT * FROM repeatedData"),
repeatedData.collect().toSeq)
- TestSQLContext.cacheTable("repeatedData")
+ cacheTable("repeatedData")
checkAnswer(
sql("SELECT * FROM repeatedData"),
@@ -75,7 +76,7 @@ class InMemoryColumnarQuerySuite extends QueryTest {
sql("SELECT * FROM nullableRepeatedData"),
nullableRepeatedData.collect().toSeq)
- TestSQLContext.cacheTable("nullableRepeatedData")
+ cacheTable("nullableRepeatedData")
checkAnswer(
sql("SELECT * FROM nullableRepeatedData"),
@@ -87,7 +88,7 @@ class InMemoryColumnarQuerySuite extends QueryTest {
sql("SELECT time FROM timestamps"),
timestamps.collect().toSeq)
- TestSQLContext.cacheTable("timestamps")
+ cacheTable("timestamps")
checkAnswer(
sql("SELECT time FROM timestamps"),
@@ -99,10 +100,17 @@ class InMemoryColumnarQuerySuite extends QueryTest {
sql("SELECT * FROM withEmptyParts"),
withEmptyParts.collect().toSeq)
- TestSQLContext.cacheTable("withEmptyParts")
+ cacheTable("withEmptyParts")
checkAnswer(
sql("SELECT * FROM withEmptyParts"),
withEmptyParts.collect().toSeq)
}
+
+ test("SPARK-4182 Caching complex types") {
+ complexData.cache().count()
+ // Shouldn't throw
+ complexData.count()
+ complexData.unpersist()
+ }
}