From 7dbca68e92416ec5f023c8807bb06470c01a6d3a Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Mon, 14 Apr 2014 15:22:43 -0700 Subject: [BUGFIX] In-memory columnar storage bug fixes Fixed several bugs of in-memory columnar storage to make `HiveInMemoryCompatibilitySuite` pass. @rxin @marmbrus It is reasonable to include `HiveInMemoryCompatibilitySuite` in this PR, but I didn't, since it significantly increases test execution time. What do you think? **UPDATE** `HiveCompatibilitySuite` has been made to cache tables in memory. `HiveInMemoryCompatibilitySuite` was removed. Author: Cheng Lian Author: Michael Armbrust Closes #374 from liancheng/inMemBugFix and squashes the following commits: 6ad6d9b [Cheng Lian] Merged HiveCompatibilitySuite and HiveInMemoryCompatibilitySuite 5bdbfe7 [Cheng Lian] Revert 882c538 & 8426ddc, which introduced regression 882c538 [Cheng Lian] Remove attributes field from InMemoryColumnarTableScan 32cc9ce [Cheng Lian] Code style cleanup 99382bf [Cheng Lian] Enable compression by default 4390bcc [Cheng Lian] Report error for any Throwable in HiveComparisonTest d1df4fd [Michael Armbrust] Remove test tables that might always get created anyway? ab9e807 [Michael Armbrust] Fix the logged console version of failed test cases to use the new syntax. 1965123 [Michael Armbrust] Don't use coalesce for gathering all data to a single partition, as it does not work correctly with mutable rows. e36cdd0 [Michael Armbrust] Spelling. 2d0e168 [Michael Armbrust] Run Hive tests in-memory too. 6360723 [Cheng Lian] Made PreInsertionCasts support SparkLogicalPlan and InMemoryColumnarTableScan c9b0f6f [Cheng Lian] Let InsertIntoTable support InMemoryColumnarTableScan 9c8fc40 [Cheng Lian] Disable compression by default e619995 [Cheng Lian] Bug fix: incorrect byte order in CompressionScheme.columnHeaderSize 8426ddc [Cheng Lian] Bug fix: InMemoryColumnarTableScan should cache columns specified by the attributes argument 036cd09 [Cheng Lian] Clean up unused imports 44591a5 [Cheng Lian] Bug fix: NullableColumnAccessor.hasNext must take nulls into account 052bf41 [Cheng Lian] Bug fix: should only gather compressibility info for non-null values 95b3301 [Cheng Lian] Fixed bugs in IntegralDelta --- .../scala/org/apache/spark/sql/SchemaRDD.scala | 2 +- .../sql/columnar/NullableColumnAccessor.scala | 2 + .../compression/CompressibleColumnBuilder.scala | 4 +- .../columnar/compression/CompressionScheme.scala | 4 +- .../columnar/compression/compressionSchemes.scala | 20 ++++----- .../org/apache/spark/sql/execution/Exchange.scala | 9 +++- .../org/apache/spark/sql/execution/SparkPlan.scala | 4 +- .../org/apache/spark/sql/CachedTableSuite.scala | 5 +-- .../spark/sql/columnar/ColumnarQuerySuite.scala | 42 ----------------- .../sql/columnar/InMemoryColumnarQuerySuite.scala | 52 ++++++++++++++++++++++ .../sql/columnar/NullableColumnAccessorSuite.scala | 4 ++ .../columnar/compression/IntegralDeltaSuite.scala | 15 +++++-- .../spark/sql/hive/HiveMetastoreCatalog.scala | 42 ++++++++++------- .../org/apache/spark/sql/hive/HiveStrategies.scala | 4 ++ .../scala/org/apache/spark/sql/hive/TestHive.scala | 10 ++--- .../scala/org/apache/spark/sql/hive/hiveUdfs.scala | 16 +++---- .../sql/hive/execution/HiveComparisonTest.scala | 10 +---- .../hive/execution/HiveCompatibilitySuite.scala | 12 ++++- 18 files changed, 150 insertions(+), 107 deletions(-) delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala create mode 100644 
sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala index 16da7fd92b..91500416ee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala @@ -99,7 +99,7 @@ class SchemaRDD( def baseSchemaRDD = this // ========================================================================================= - // RDD functions: Copy the interal row representation so we present immutable data to users. + // RDD functions: Copy the internal row representation so we present immutable data to users. // ========================================================================================= override def compute(split: Partition, context: TaskContext): Iterator[Row] = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala index 7d49ab07f7..b7f8826861 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala @@ -54,4 +54,6 @@ private[sql] trait NullableColumnAccessor extends ColumnAccessor { pos += 1 } + + abstract override def hasNext = seenNulls < nullCount || super.hasNext } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala index fd3b1adf96..0f808f68f2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala @@ -65,7 +65,9 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType] abstract override def appendFrom(row: Row, ordinal: Int) { super.appendFrom(row, ordinal) - gatherCompressibilityStats(row, ordinal) + if (!row.isNullAt(ordinal)) { + gatherCompressibilityStats(row, ordinal) + } } abstract override def build() = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala index c605a8e443..ba1810dd2a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.columnar.compression -import java.nio.ByteBuffer +import java.nio.{ByteOrder, ByteBuffer} import org.apache.spark.sql.catalyst.types.NativeType import org.apache.spark.sql.columnar.{ColumnType, NativeColumnType} @@ -84,7 +84,7 @@ private[sql] object CompressionScheme { } def columnHeaderSize(columnBuffer: ByteBuffer): Int = { - val header = columnBuffer.duplicate() + val header = columnBuffer.duplicate().order(ByteOrder.nativeOrder) val nullCount = header.getInt(4) // Column type ID + null count + null positions 4 + 4 + 4 * nullCount diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala index e92cf5ac4f..800009d319 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala @@ -397,26 +397,27 @@ private[sql] sealed abstract class IntegralDelta[I <: IntegralType] extends Comp if (initial) { initial = false - prev = value _compressedSize += 1 + columnType.defaultSize } else { val (smallEnough, _) = byteSizedDelta(value, prev) _compressedSize += (if (smallEnough) 1 else 1 + columnType.defaultSize) } + + prev = value } override def compress(from: ByteBuffer, to: ByteBuffer, columnType: NativeColumnType[I]) = { to.putInt(typeId) if (from.hasRemaining) { - val prev = columnType.extract(from) - + var prev = columnType.extract(from) to.put(Byte.MinValue) columnType.append(prev, to) while (from.hasRemaining) { val current = columnType.extract(from) val (smallEnough, delta) = byteSizedDelta(current, prev) + prev = current if (smallEnough) { to.put(delta) @@ -443,13 +444,8 @@ private[sql] sealed abstract class IntegralDelta[I <: IntegralType] extends Comp override def next() = { val delta = buffer.get() - - if (delta > Byte.MinValue) { - addDelta(prev, delta) - } else { - prev = columnType.extract(buffer) - prev - } + prev = if (delta > Byte.MinValue) addDelta(prev, delta) else columnType.extract(buffer) + prev } override def hasNext = buffer.hasRemaining @@ -465,7 +461,7 @@ private[sql] case object IntDelta extends IntegralDelta[IntegerType.type] { override protected def byteSizedDelta(x: Int, y: Int): (Boolean, Byte) = { val delta = x - y - if (delta < Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte) + if (math.abs(delta) <= Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte) } } @@ -478,6 +474,6 @@ private[sql] case object LongDelta extends IntegralDelta[LongType.type] { override protected def byteSizedDelta(x: Long, y: Long): (Boolean, Byte) = { val delta = x - y - if (delta < Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte) + if (math.abs(delta) <= Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 450c142c0b..070557e47c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -61,7 +61,14 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una shuffled.map(_._1) case SinglePartition => - child.execute().coalesce(1, shuffle = true) + val rdd = child.execute().mapPartitions { iter => + val mutablePair = new MutablePair[Null, Row]() + iter.map(r => mutablePair.update(null, r)) + } + val partitioner = new HashPartitioner(1) + val shuffled = new ShuffledRDD[Null, Row, MutablePair[Null, Row]](rdd, partitioner) + shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false))) + shuffled.map(_._2) case _ => sys.error(s"Exchange not implemented for $newPartitioning") // TODO: Handle BroadcastPartitioning. 
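The `SinglePartition` branch of `Exchange` above (commit 1965123) stops using `coalesce(1, shuffle = true)` and instead shuffles serialized copies of the rows, because the rows handed out by a partition iterator are mutable and reused. Below is a minimal plain-Scala sketch, not Spark code: `MutableRow` and `scan` are illustrative stand-ins that show the aliasing problem a plain coalesce runs into, and the copy-before-buffering behaviour that shuffling serialized rows restores.

```scala
// Illustrative only: why buffering reused mutable row objects without copying
// them corrupts results. Spark SQL reuses one mutable row per partition, so any
// operator that materializes rows (e.g. gathering them into one partition) must
// copy or serialize them first.
object MutableRowAliasing {
  final class MutableRow(var key: Int = 0)   // stand-in, not Spark's row class

  // An iterator that updates and hands back the same object, as a columnar scan does.
  private def scan(reused: MutableRow): Iterator[MutableRow] =
    Iterator.tabulate(3) { i => reused.key = i; reused }

  def main(args: Array[String]): Unit = {
    // Buggy: materializing the iterator keeps three references to one object.
    val aliased = scan(new MutableRow()).toList
    println(aliased.map(_.key))   // List(2, 2, 2) -- every slot sees the last write

    // Fixed: copy each row before it is buffered, which is the effect the Exchange
    // change gets by shuffling serialized copies instead of coalescing raw iterators.
    val copied = scan(new MutableRow()).map(r => new MutableRow(r.key)).toList
    println(copied.map(_.key))    // List(0, 1, 2)
  }
}
```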
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index daa423cb8e..5d89697db5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -70,8 +70,8 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan) SparkLogicalPlan( alreadyPlanned match { case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd) - case InMemoryColumnarTableScan(output, child) => - InMemoryColumnarTableScan(output.map(_.newInstance), child) + case scan @ InMemoryColumnarTableScan(output, child) => + scan.copy(attributes = output.map(_.newInstance)) case _ => sys.error("Multiple instance of the same relation detected.") }).asInstanceOf[this.type] } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 7c6a642278..0331f90272 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -17,11 +17,10 @@ package org.apache.spark.sql -import org.scalatest.FunSuite import org.apache.spark.sql.TestData._ -import org.apache.spark.sql.test.TestSQLContext -import org.apache.spark.sql.execution.SparkLogicalPlan import org.apache.spark.sql.columnar.InMemoryColumnarTableScan +import org.apache.spark.sql.execution.SparkLogicalPlan +import org.apache.spark.sql.test.TestSQLContext class CachedTableSuite extends QueryTest { TestData // Load test tables. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala deleted file mode 100644 index 2ed4cf2170..0000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.columnar - -import org.apache.spark.sql.{QueryTest, TestData} -import org.apache.spark.sql.execution.SparkLogicalPlan -import org.apache.spark.sql.test.TestSQLContext - -class ColumnarQuerySuite extends QueryTest { - import TestData._ - import TestSQLContext._ - - test("simple columnar query") { - val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan - val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan)) - - checkAnswer(scan, testData.collect().toSeq) - } - - test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") { - val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan - val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan)) - - checkAnswer(scan, testData.collect().toSeq) - checkAnswer(scan, testData.collect().toSeq) - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala new file mode 100644 index 0000000000..16a13b8a74 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.columnar + +import org.apache.spark.sql.{QueryTest, TestData} +import org.apache.spark.sql.catalyst.expressions.Row +import org.apache.spark.sql.execution.SparkLogicalPlan +import org.apache.spark.sql.test.TestSQLContext + +class InMemoryColumnarQuerySuite extends QueryTest { + import TestData._ + import TestSQLContext._ + + test("simple columnar query") { + val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan + val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan)) + + checkAnswer(scan, testData.collect().toSeq) + } + + test("projection") { + val plan = TestSQLContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan + val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan)) + + checkAnswer(scan, testData.collect().map { + case Row(key: Int, value: String) => value -> key + }.toSeq) + } + + test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") { + val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan + val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan)) + + checkAnswer(scan, testData.collect().toSeq) + checkAnswer(scan, testData.collect().toSeq) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala index 4a21eb6201..35ab14cbc3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala @@ -68,12 +68,16 @@ class NullableColumnAccessorSuite extends FunSuite { val row = new GenericMutableRow(1) (0 until 4).foreach { _ => + assert(accessor.hasNext) accessor.extractTo(row, 0) assert(row(0) === randomRow(0)) + assert(accessor.hasNext) accessor.extractTo(row, 0) assert(row.isNullAt(0)) } + + assert(!accessor.hasNext) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala index 1390e5eef6..ce419ca726 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala @@ -22,6 +22,7 @@ import org.scalatest.FunSuite import org.apache.spark.sql.catalyst.expressions.GenericMutableRow import org.apache.spark.sql.catalyst.types.IntegralType import org.apache.spark.sql.columnar._ +import org.apache.spark.sql.columnar.ColumnarTestUtils._ class IntegralDeltaSuite extends FunSuite { testIntegralDelta(new IntColumnStats, INT, IntDelta) @@ -63,7 +64,7 @@ class IntegralDeltaSuite extends FunSuite { } else { val oneBoolean = columnType.defaultSize 1 + oneBoolean + deltas.map { - d => if (math.abs(d) < Byte.MaxValue) 1 else 1 + oneBoolean + d => if (math.abs(d) <= Byte.MaxValue) 1 else 1 + oneBoolean }.sum }) @@ -78,7 +79,7 @@ class IntegralDeltaSuite extends FunSuite { expectResult(input.head, "The first value is wrong")(columnType.extract(buffer)) (input.tail, deltas).zipped.foreach { (value, delta) => - if (delta < Byte.MaxValue) { + if (math.abs(delta) <= Byte.MaxValue) { expectResult(delta, "Wrong delta")(buffer.get()) } else { expectResult(Byte.MinValue, "Expecting escaping mark here")(buffer.get()) @@ -105,11 +106,17 @@ class IntegralDeltaSuite extends FunSuite { 
test(s"$scheme: simple case") { val input = columnType match { - case INT => Seq(1: Int, 2: Int, 130: Int) - case LONG => Seq(1: Long, 2: Long, 130: Long) + case INT => Seq(2: Int, 1: Int, 2: Int, 130: Int) + case LONG => Seq(2: Long, 1: Long, 2: Long, 130: Long) } skeleton(input.map(_.asInstanceOf[I#JvmType])) } + + test(s"$scheme: long random series") { + // Have to workaround with `Any` since no `ClassTag[I#JvmType]` available here. + val input = Array.fill[Any](10000)(makeRandomValue(columnType)) + skeleton(input.map(_.asInstanceOf[I#JvmType])) + } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index fc053c56c0..c36b5878cb 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -33,6 +33,8 @@ import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.catalyst.types._ +import org.apache.spark.sql.execution.SparkLogicalPlan +import org.apache.spark.sql.columnar.InMemoryColumnarTableScan /* Implicit conversions */ import scala.collection.JavaConversions._ @@ -115,23 +117,31 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging { case p: LogicalPlan if !p.childrenResolved => p case p @ InsertIntoTable(table: MetastoreRelation, _, child, _) => - val childOutputDataTypes = child.output.map(_.dataType) - // Only check attributes, not partitionKeys since they are always strings. - // TODO: Fully support inserting into partitioned tables. - val tableOutputDataTypes = table.attributes.map(_.dataType) - - if (childOutputDataTypes == tableOutputDataTypes) { - p - } else { - // Only do the casting when child output data types differ from table output data types. - val castedChildOutput = child.output.zip(table.output).map { - case (input, output) if input.dataType != output.dataType => - Alias(Cast(input, output.dataType), input.name)() - case (input, _) => input - } - - p.copy(child = logical.Project(castedChildOutput, child)) + castChildOutput(p, table, child) + + case p @ logical.InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan( + _, HiveTableScan(_, table, _))), _, child, _) => + castChildOutput(p, table, child) + } + + def castChildOutput(p: InsertIntoTable, table: MetastoreRelation, child: LogicalPlan) = { + val childOutputDataTypes = child.output.map(_.dataType) + // Only check attributes, not partitionKeys since they are always strings. + // TODO: Fully support inserting into partitioned tables. + val tableOutputDataTypes = table.attributes.map(_.dataType) + + if (childOutputDataTypes == tableOutputDataTypes) { + p + } else { + // Only do the casting when child output data types differ from table output data types. 
+ val castedChildOutput = child.output.zip(table.output).map { + case (input, output) if input.dataType != output.dataType => + Alias(Cast(input, output.dataType), input.name)() + case (input, _) => input } + + p.copy(child = logical.Project(castedChildOutput, child)) + } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 3ca1d93c11..ac817b21a1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.planning._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution._ +import org.apache.spark.sql.columnar.InMemoryColumnarTableScan trait HiveStrategies { // Possibly being too clever with types here... or not clever enough. @@ -42,6 +43,9 @@ trait HiveStrategies { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) => InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil + case logical.InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan( + _, HiveTableScan(_, table, _))), partition, child, overwrite) => + InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil case _ => Nil } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala index 2fea970295..465e5f146f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala @@ -160,12 +160,6 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) { TestTable("src1", "CREATE TABLE src1 (key INT, value STRING)".cmd, s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv3.txt")}' INTO TABLE src1".cmd), - TestTable("dest1", - "CREATE TABLE IF NOT EXISTS dest1 (key INT, value STRING)".cmd), - TestTable("dest2", - "CREATE TABLE IF NOT EXISTS dest2 (key INT, value STRING)".cmd), - TestTable("dest3", - "CREATE TABLE IF NOT EXISTS dest3 (key INT, value STRING)".cmd), TestTable("srcpart", () => { runSqlHive( "CREATE TABLE srcpart (key INT, value STRING) PARTITIONED BY (ds STRING, hr STRING)") @@ -257,6 +251,7 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) { private val loadedTables = new collection.mutable.HashSet[String] + var cacheTables: Boolean = false def loadTestTable(name: String) { if (!(loadedTables contains name)) { // Marks the table as loaded first to prevent infite mutually recursive table loading. 
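Both Hive-side changes in this area (the extra `PreInsertionCasts` case and the extra `HiveStrategies` case) rely on the same idea: once a Hive table is cached, its relation is wrapped in `SparkLogicalPlan(InMemoryColumnarTableScan(..., HiveTableScan(...)))`, and an `INSERT INTO` that targets it must be matched through that wrapper to reach the underlying metastore table. A rough sketch of that see-through pattern match, using simplified stand-in case classes (none of these are the real Catalyst types):

```scala
object CachedTableInsert {
  // Simplified stand-ins for the real plan nodes.
  sealed trait Plan
  case class MetastoreRelation(tableName: String) extends Plan
  case class HiveTableScan(table: MetastoreRelation) extends Plan
  case class InMemoryColumnarTableScan(child: Plan) extends Plan
  case class SparkLogicalPlan(planned: Plan) extends Plan
  case class InsertIntoTable(target: Plan, child: Plan) extends Plan

  /** Resolve the Hive table an INSERT writes to, whether or not it has been cached. */
  def targetTable(plan: Plan): Option[MetastoreRelation] = plan match {
    case InsertIntoTable(t: MetastoreRelation, _) => Some(t)
    case InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan(HiveTableScan(t))), _) =>
      Some(t)
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    val dest  = MetastoreRelation("dest1")
    val query = MetastoreRelation("src")   // stands in for the SELECT being inserted
    val cachedDest = SparkLogicalPlan(InMemoryColumnarTableScan(HiveTableScan(dest)))
    println(targetTable(InsertIntoTable(dest, query)))        // Some(MetastoreRelation(dest1))
    println(targetTable(InsertIntoTable(cachedDest, query)))  // Some(MetastoreRelation(dest1))
  }
}
```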
@@ -265,6 +260,9 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) { val createCmds = testTables.get(name).map(_.commands).getOrElse(sys.error(s"Unknown test table $name")) createCmds.foreach(_()) + + if (cacheTables) + cacheTable(name) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala index f9b437d435..55a4363af6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala @@ -130,8 +130,7 @@ trait HiveFunctionFactory { } } -abstract class HiveUdf - extends Expression with Logging with HiveFunctionFactory { +abstract class HiveUdf extends Expression with Logging with HiveFunctionFactory { self: Product => type UDFType @@ -146,7 +145,7 @@ abstract class HiveUdf lazy val functionInfo = getFunctionInfo(name) lazy val function = createFunction[UDFType](name) - override def toString = s"${nodeName}#${functionInfo.getDisplayName}(${children.mkString(",")})" + override def toString = s"$nodeName#${functionInfo.getDisplayName}(${children.mkString(",")})" } case class HiveSimpleUdf(name: String, children: Seq[Expression]) extends HiveUdf { @@ -202,10 +201,11 @@ case class HiveSimpleUdf(name: String, children: Seq[Expression]) extends HiveUd } } -case class HiveGenericUdf( - name: String, - children: Seq[Expression]) extends HiveUdf with HiveInspectors { +case class HiveGenericUdf(name: String, children: Seq[Expression]) + extends HiveUdf with HiveInspectors { + import org.apache.hadoop.hive.ql.udf.generic.GenericUDF._ + type UDFType = GenericUDF @transient @@ -357,7 +357,7 @@ case class HiveGenericUdaf( override def toString = s"$nodeName#$name(${children.mkString(",")})" - def newInstance = new HiveUdafFunction(name, children, this) + def newInstance() = new HiveUdafFunction(name, children, this) } /** @@ -435,7 +435,7 @@ case class HiveGenericUdtf( } } - override def toString() = s"$nodeName#$name(${children.mkString(",")})" + override def toString = s"$nodeName#$name(${children.mkString(",")})" } case class HiveUdafFunction( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala index 3cc4562a88..6c91f40d0f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala @@ -218,10 +218,7 @@ abstract class HiveComparisonTest val quotes = "\"\"\"" queryList.zipWithIndex.map { case (query, i) => - s""" - |val q$i = $quotes$query$quotes.q - |q$i.stringResult() - """.stripMargin + s"""val q$i = hql($quotes$query$quotes); q$i.collect()""" }.mkString("\n== Console version of this test ==\n", "\n", "\n") } @@ -287,7 +284,6 @@ abstract class HiveComparisonTest |Error: ${e.getMessage} |${stackTraceToString(e)} |$queryString - |$consoleTestCase """.stripMargin stringToFile( new File(hiveFailedDirectory, testCaseName), @@ -304,7 +300,7 @@ abstract class HiveComparisonTest val catalystResults = queryList.zip(hiveResults).map { case (queryString, hive) => val query = new TestHive.HiveQLQueryExecution(queryString) try { (query, prepareAnswer(query, query.stringResult())) } catch { - case e: Exception => + case e: Throwable => val errorMessage = s""" |Failed to execute query using catalyst: @@ -313,8 +309,6 @@ abstract class HiveComparisonTest |$query |== 
HIVE - ${hive.size} row(s) == |${hive.mkString("\n")} - | - |$consoleTestCase """.stripMargin stringToFile(new File(failedDirectory, testCaseName), errorMessage + consoleTestCase) fail(errorMessage) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index f76e16bc1a..c3cfa3d25a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -17,16 +17,26 @@ package org.apache.spark.sql.hive.execution +import org.scalatest.BeforeAndAfter + import org.apache.spark.sql.hive.TestHive /** * Runs the test cases that are included in the hive distribution. */ -class HiveCompatibilitySuite extends HiveQueryFileTest { +class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { // TODO: bundle in jar files... get from classpath lazy val hiveQueryDir = TestHive.getHiveFile("ql/src/test/queries/clientpositive") def testCases = hiveQueryDir.listFiles.map(f => f.getName.stripSuffix(".q") -> f) + override def beforeAll() { + TestHive.cacheTables = true + } + + override def afterAll() { + TestHive.cacheTables = false + } + /** A list of tests deemed out of scope currently and thus completely disregarded. */ override def blackList = Seq( // These tests use hooks that are not on the classpath and thus break all subsequent execution. -- cgit v1.2.3
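Several of the fixes above (052bf41, 95b3301, and the two `byteSizedDelta` changes) concern the byte-sized delta scheme behind `IntDelta`/`LongDelta`: a value is stored as a one-byte delta from the previous value when it fits, with `Byte.MinValue` reserved as an escape marker announcing a full-width value. The following is a self-contained sketch of that scheme in plain Scala, not the Spark builder/decoder classes, with the two corrected behaviours: the fit test uses `math.abs(delta) <= Byte.MaxValue`, and `prev` advances for every input value.

```scala
import java.nio.ByteBuffer

object ByteSizedDelta {
  def compress(values: Seq[Int]): ByteBuffer = {
    val buf = ByteBuffer.allocate(values.length * 5)   // worst case: every value escaped
    var prev = 0
    var first = true
    values.foreach { v =>
      if (first) {
        first = false
        buf.put(Byte.MinValue).putInt(v)   // the first value is always written in full
      } else {
        val delta = v - prev
        // Byte.MinValue (-128) is the escape marker, so only deltas in [-127, 127]
        // may be stored as a single byte -- the abs/<= comparison fixed above.
        if (math.abs(delta) <= Byte.MaxValue) buf.put(delta.toByte)
        else buf.put(Byte.MinValue).putInt(v)
      }
      prev = v                             // advance for *every* value, not just escaped ones
    }
    buf.flip()
    buf
  }

  def decompress(buf: ByteBuffer): Seq[Int] = {
    val out = Seq.newBuilder[Int]
    var prev = 0
    while (buf.hasRemaining) {
      val b = buf.get()
      prev = if (b > Byte.MinValue) prev + b else buf.getInt()
      out += prev
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val input = Seq(2, 1, 2, 130, 131, 3)  // mixes small and escaped deltas
    assert(decompress(compress(input)) == input)
    println("round-trips correctly")
  }
}
```

The escape marker is also why the fit test cannot accept every delta representable as a `Byte`: a delta of -128 would be indistinguishable from the marker itself, so it must be written in full.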