diff options
author | Cheng Lian <lian@databricks.com> | 2014-11-02 15:14:44 -0800 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2014-11-02 15:14:44 -0800 |
commit | e4b80894bdb72c0acf8832fd48421c546fbc37e6 (patch) | |
tree | 0b61776c1eac4271a7aa3662d3e8bde6402ce3fe /sql | |
parent | 9c0eb57c737dd7d97d2cbd4516ddd2cf5d06e4b2 (diff) | |
download | spark-e4b80894bdb72c0acf8832fd48421c546fbc37e6.tar.gz spark-e4b80894bdb72c0acf8832fd48421c546fbc37e6.tar.bz2 spark-e4b80894bdb72c0acf8832fd48421c546fbc37e6.zip |
[SPARK-4182][SQL] Fixes ColumnStats classes for boolean, binary and complex data types
`NoopColumnStats` was once used for binary, boolean and complex data types. This `ColumnStats` doesn't return properly shaped column statistics and causes caching failure if a table contains columns of the aforementioned types.
This PR adds `BooleanColumnStats`, `BinaryColumnStats` and `GenericColumnStats`, used for boolean, binary and all complex data types respectively. In addition, `NoopColumnStats` returns properly shaped column statistics containing null count and row count, but this class is now used for testing purpose only.
Author: Cheng Lian <lian@databricks.com>
Closes #3059 from liancheng/spark-4182 and squashes the following commits:
b398cfd [Cheng Lian] Fixes failed test case
fb3ee85 [Cheng Lian] Fixes SPARK-4182
Diffstat (limited to 'sql')
6 files changed, 82 insertions, 19 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala index 300cef15bf..c68dceef3b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala @@ -79,8 +79,9 @@ private[sql] class BasicColumnBuilder[T <: DataType, JvmType]( } private[sql] abstract class ComplexColumnBuilder[T <: DataType, JvmType]( + columnStats: ColumnStats, columnType: ColumnType[T, JvmType]) - extends BasicColumnBuilder[T, JvmType](new NoopColumnStats, columnType) + extends BasicColumnBuilder[T, JvmType](columnStats, columnType) with NullableColumnBuilder private[sql] abstract class NativeColumnBuilder[T <: NativeType]( @@ -91,7 +92,7 @@ private[sql] abstract class NativeColumnBuilder[T <: NativeType]( with AllCompressionSchemes with CompressibleColumnBuilder[T] -private[sql] class BooleanColumnBuilder extends NativeColumnBuilder(new NoopColumnStats, BOOLEAN) +private[sql] class BooleanColumnBuilder extends NativeColumnBuilder(new BooleanColumnStats, BOOLEAN) private[sql] class IntColumnBuilder extends NativeColumnBuilder(new IntColumnStats, INT) @@ -112,10 +113,11 @@ private[sql] class DateColumnBuilder extends NativeColumnBuilder(new DateColumnS private[sql] class TimestampColumnBuilder extends NativeColumnBuilder(new TimestampColumnStats, TIMESTAMP) -private[sql] class BinaryColumnBuilder extends ComplexColumnBuilder(BINARY) +private[sql] class BinaryColumnBuilder extends ComplexColumnBuilder(new BinaryColumnStats, BINARY) // TODO (lian) Add support for array, struct and map -private[sql] class GenericColumnBuilder extends ComplexColumnBuilder(GENERIC) +private[sql] class GenericColumnBuilder + extends ComplexColumnBuilder(new GenericColumnStats, GENERIC) private[sql] object ColumnBuilder { val DEFAULT_INITIAL_BUFFER_SIZE = 1024 * 1024 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala index b9f9f82700..668efe4a3b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala @@ -70,11 +70,30 @@ private[sql] sealed trait ColumnStats extends Serializable { def collectedStatistics: Row } +/** + * A no-op ColumnStats only used for testing purposes. + */ private[sql] class NoopColumnStats extends ColumnStats { + override def gatherStats(row: Row, ordinal: Int): Unit = super.gatherStats(row, ordinal) + + def collectedStatistics = Row(null, null, nullCount, count, 0L) +} - override def gatherStats(row: Row, ordinal: Int): Unit = {} +private[sql] class BooleanColumnStats extends ColumnStats { + protected var upper = false + protected var lower = true - override def collectedStatistics = Row() + override def gatherStats(row: Row, ordinal: Int): Unit = { + super.gatherStats(row, ordinal) + if (!row.isNullAt(ordinal)) { + val value = row.getBoolean(ordinal) + if (value > upper) upper = value + if (value < lower) lower = value + sizeInBytes += BOOLEAN.defaultSize + } + } + + def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes) } private[sql] class ByteColumnStats extends ColumnStats { @@ -229,3 +248,25 @@ private[sql] class TimestampColumnStats extends ColumnStats { def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes) } + +private[sql] class BinaryColumnStats extends ColumnStats { + override def gatherStats(row: Row, ordinal: Int): Unit = { + super.gatherStats(row, ordinal) + if (!row.isNullAt(ordinal)) { + sizeInBytes += BINARY.actualSize(row, ordinal) + } + } + + def collectedStatistics = Row(null, null, nullCount, count, sizeInBytes) +} + +private[sql] class GenericColumnStats extends ColumnStats { + override def gatherStats(row: Row, ordinal: Int): Unit = { + super.gatherStats(row, ordinal) + if (!row.isNullAt(ordinal)) { + sizeInBytes += GENERIC.actualSize(row, ordinal) + } + } + + def collectedStatistics = Row(null, null, nullCount, count, sizeInBytes) +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala index ee63134f56..455b415d9d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala @@ -161,6 +161,9 @@ private[sql] case class InMemoryRelation( } def cachedColumnBuffers = _cachedColumnBuffers + + override protected def otherCopyArgs: Seq[AnyRef] = + Seq(_cachedColumnBuffers, statisticsToBePropagated) } private[sql] case class InMemoryColumnarTableScan( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 6befe1b755..6bf439377a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -21,11 +21,12 @@ import java.util.TimeZone import org.scalatest.BeforeAndAfterAll -import org.apache.spark.sql.TestData._ import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +/* Implicits */ +import org.apache.spark.sql.TestData._ import org.apache.spark.sql.test.TestSQLContext._ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { @@ -719,7 +720,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { validateMetadata(sql("SELECT * FROM personWithMeta JOIN salary ON id = personId")) validateMetadata(sql("SELECT name, salary FROM personWithMeta JOIN salary ON id = personId")) } - + test("SPARK-3371 Renaming a function expression with group by gives error") { registerFunction("len", (s: String) => s.length) checkAnswer( @@ -934,7 +935,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { } test("SPARK-4154 Query does not work if it has 'not between' in Spark SQL and HQL") { - checkAnswer(sql("SELECT key FROM testData WHERE key not between 0 and 10 order by key"), + checkAnswer(sql("SELECT key FROM testData WHERE key not between 0 and 10 order by key"), (11 to 100).map(i => Seq(i))) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala index 836dd17fcc..ef87a23063 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala @@ -177,4 +177,12 @@ object TestData { Salary(0, 2000.0) :: Salary(1, 1000.0) :: Nil) salary.registerTempTable("salary") + + case class ComplexData(m: Map[Int, String], s: TestData, a: Seq[Int], b: Boolean) + val complexData = + TestSQLContext.sparkContext.parallelize( + ComplexData(Map(1 -> "1"), TestData(1, "1"), Seq(1), true) + :: ComplexData(Map(2 -> "2"), TestData(2, "2"), Seq(2), false) + :: Nil).toSchemaRDD + complexData.registerTempTable("complexData") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala index 9775dd26b7..15903d07df 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala @@ -17,17 +17,18 @@ package org.apache.spark.sql.columnar +import org.apache.spark.sql.TestData._ import org.apache.spark.sql.catalyst.expressions.Row -import org.apache.spark.sql.test.TestSQLContext +import org.apache.spark.sql.test.TestSQLContext._ import org.apache.spark.sql.{QueryTest, TestData} import org.apache.spark.storage.StorageLevel.MEMORY_ONLY class InMemoryColumnarQuerySuite extends QueryTest { - import org.apache.spark.sql.TestData._ - import org.apache.spark.sql.test.TestSQLContext._ + // Make sure the tables are loaded. + TestData test("simple columnar query") { - val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan + val plan = executePlan(testData.logicalPlan).executedPlan val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan) checkAnswer(scan, testData.collect().toSeq) @@ -42,7 +43,7 @@ class InMemoryColumnarQuerySuite extends QueryTest { } test("projection") { - val plan = TestSQLContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan + val plan = executePlan(testData.select('value, 'key).logicalPlan).executedPlan val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan) checkAnswer(scan, testData.collect().map { @@ -51,7 +52,7 @@ class InMemoryColumnarQuerySuite extends QueryTest { } test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") { - val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan + val plan = executePlan(testData.logicalPlan).executedPlan val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan) checkAnswer(scan, testData.collect().toSeq) @@ -63,7 +64,7 @@ class InMemoryColumnarQuerySuite extends QueryTest { sql("SELECT * FROM repeatedData"), repeatedData.collect().toSeq) - TestSQLContext.cacheTable("repeatedData") + cacheTable("repeatedData") checkAnswer( sql("SELECT * FROM repeatedData"), @@ -75,7 +76,7 @@ class InMemoryColumnarQuerySuite extends QueryTest { sql("SELECT * FROM nullableRepeatedData"), nullableRepeatedData.collect().toSeq) - TestSQLContext.cacheTable("nullableRepeatedData") + cacheTable("nullableRepeatedData") checkAnswer( sql("SELECT * FROM nullableRepeatedData"), @@ -87,7 +88,7 @@ class InMemoryColumnarQuerySuite extends QueryTest { sql("SELECT time FROM timestamps"), timestamps.collect().toSeq) - TestSQLContext.cacheTable("timestamps") + cacheTable("timestamps") checkAnswer( sql("SELECT time FROM timestamps"), @@ -99,10 +100,17 @@ class InMemoryColumnarQuerySuite extends QueryTest { sql("SELECT * FROM withEmptyParts"), withEmptyParts.collect().toSeq) - TestSQLContext.cacheTable("withEmptyParts") + cacheTable("withEmptyParts") checkAnswer( sql("SELECT * FROM withEmptyParts"), withEmptyParts.collect().toSeq) } + + test("SPARK-4182 Caching complex types") { + complexData.cache().count() + // Shouldn't throw + complexData.count() + complexData.unpersist() + } } |