author     Cheng Lian <lian.cs.zju@gmail.com>        2014-04-14 15:22:43 -0700
committer  Patrick Wendell <pwendell@gmail.com>      2014-04-14 15:22:43 -0700
commit     7dbca68e92416ec5f023c8807bb06470c01a6d3a (patch)
tree       b6f2ca90dc70e2b0e0d873211eb50cefc218e7c8 /sql
parent     037fe4d2ba01be5610baa3dd9c5c9d3a5e5e1064 (diff)
[BUGFIX] In-memory columnar storage bug fixes
Fixed several bugs in in-memory columnar storage to make `HiveInMemoryCompatibilitySuite` pass. @rxin @marmbrus It would be reasonable to include `HiveInMemoryCompatibilitySuite` in this PR, but I didn't, since it significantly increases test execution time. What do you think?

**UPDATE** `HiveCompatibilitySuite` has been made to cache tables in memory. `HiveInMemoryCompatibilitySuite` was removed.

Author: Cheng Lian <lian.cs.zju@gmail.com>
Author: Michael Armbrust <michael@databricks.com>

Closes #374 from liancheng/inMemBugFix and squashes the following commits:

6ad6d9b [Cheng Lian] Merged HiveCompatibilitySuite and HiveInMemoryCompatibilitySuite
5bdbfe7 [Cheng Lian] Revert 882c538 & 8426ddc, which introduced regression
882c538 [Cheng Lian] Remove attributes field from InMemoryColumnarTableScan
32cc9ce [Cheng Lian] Code style cleanup
99382bf [Cheng Lian] Enable compression by default
4390bcc [Cheng Lian] Report error for any Throwable in HiveComparisonTest
d1df4fd [Michael Armbrust] Remove test tables that might always get created anyway?
ab9e807 [Michael Armbrust] Fix the logged console version of failed test cases to use the new syntax.
1965123 [Michael Armbrust] Don't use coalesce for gathering all data to a single partition, as it does not work correctly with mutable rows.
e36cdd0 [Michael Armbrust] Spelling.
2d0e168 [Michael Armbrust] Run Hive tests in-memory too.
6360723 [Cheng Lian] Made PreInsertionCasts support SparkLogicalPlan and InMemoryColumnarTableScan
c9b0f6f [Cheng Lian] Let InsertIntoTable support InMemoryColumnarTableScan
9c8fc40 [Cheng Lian] Disable compression by default
e619995 [Cheng Lian] Bug fix: incorrect byte order in CompressionScheme.columnHeaderSize
8426ddc [Cheng Lian] Bug fix: InMemoryColumnarTableScan should cache columns specified by the attributes argument
036cd09 [Cheng Lian] Clean up unused imports
44591a5 [Cheng Lian] Bug fix: NullableColumnAccessor.hasNext must take nulls into account
052bf41 [Cheng Lian] Bug fix: should only gather compressibility info for non-null values
95b3301 [Cheng Lian] Fixed bugs in IntegralDelta
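For context on the UPDATE above, "cache tables in memory" means routing reads through the in-memory columnar store via SQLContext.cacheTable, which is what TestHive's new cacheTables flag toggles per test table (see the TestHive.scala hunk below). A rough sketch against the SQLContext API of that era; the table name `kv` and the data are made up:

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(new SparkContext("local", "cache-sketch"))
    import sqlContext._                       // brings in createSchemaRDD and friends

    case class KV(key: Int, value: String)
    sparkContext.parallelize(Seq(KV(1, "a"), KV(2, "b"))).registerAsTable("kv")

    sqlContext.cacheTable("kv")               // subsequent scans read the in-memory columnar copy
    sql("SELECT key FROM kv").collect()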
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala  |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala  |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala  |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala  |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala  |  20
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala  |  9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala  |  4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala  |  5
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala (renamed from sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala)  |  12
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala  |  4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala  |  15
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala  |  42
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala  |  4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala  |  10
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala  |  16
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala  |  10
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala  |  12
17 files changed, 109 insertions, 66 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 16da7fd92b..91500416ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -99,7 +99,7 @@ class SchemaRDD(
def baseSchemaRDD = this
// =========================================================================================
- // RDD functions: Copy the interal row representation so we present immutable data to users.
+ // RDD functions: Copy the internal row representation so we present immutable data to users.
// =========================================================================================
override def compute(split: Partition, context: TaskContext): Iterator[Row] =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala
index 7d49ab07f7..b7f8826861 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala
@@ -54,4 +54,6 @@ private[sql] trait NullableColumnAccessor extends ColumnAccessor {
pos += 1
}
+
+ abstract override def hasNext = seenNulls < nullCount || super.hasNext
}
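To make the hasNext fix above easier to review: nulls are tracked separately from the underlying non-null column data, so a wrapper that only delegates hasNext under-reports the remaining row count. A stripped-down sketch of the stackable-trait pattern involved (hypothetical names, Option[Int] standing in for a nullable column; the real accessor replays recorded null positions rather than emitting nulls first):

    trait IntAccessor { def hasNext: Boolean; def extractNext(): Option[Int] }

    class NonNullIntAccessor(values: Seq[Int]) extends IntAccessor {
      private val it = values.iterator
      def hasNext = it.hasNext
      def extractNext() = Some(it.next())
    }

    // The nullable layer knows how many nulls the column holds; without adding them
    // to hasNext, iteration stops as soon as the non-null values run out.
    trait NullableIntAccessor extends IntAccessor {
      def nullCount: Int
      private var seenNulls = 0
      abstract override def hasNext = seenNulls < nullCount || super.hasNext
      abstract override def extractNext() =
        if (seenNulls < nullCount) { seenNulls += 1; None } else super.extractNext()
    }

    // new NonNullIntAccessor(Seq(1, 2)) with NullableIntAccessor { val nullCount = 1 }
    // now reports three remaining values (the null plus the two ints), not two.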
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
index fd3b1adf96..0f808f68f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
@@ -65,7 +65,9 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]
abstract override def appendFrom(row: Row, ordinal: Int) {
super.appendFrom(row, ordinal)
- gatherCompressibilityStats(row, ordinal)
+ if (!row.isNullAt(ordinal)) {
+ gatherCompressibilityStats(row, ordinal)
+ }
}
abstract override def build() = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
index c605a8e443..ba1810dd2a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.columnar.compression
-import java.nio.ByteBuffer
+import java.nio.{ByteOrder, ByteBuffer}
import org.apache.spark.sql.catalyst.types.NativeType
import org.apache.spark.sql.columnar.{ColumnType, NativeColumnType}
@@ -84,7 +84,7 @@ private[sql] object CompressionScheme {
}
def columnHeaderSize(columnBuffer: ByteBuffer): Int = {
- val header = columnBuffer.duplicate()
+ val header = columnBuffer.duplicate().order(ByteOrder.nativeOrder)
val nullCount = header.getInt(4)
// Column type ID + null count + null positions
4 + 4 + 4 * nullCount
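The one-line change above matters because java.nio buffers do not carry their byte order over to duplicates: ByteBuffer.duplicate() returns a buffer whose order is reset to BIG_ENDIAN, so header fields written in native order are read back scrambled. A small illustration; the values in the comments assume the explicit LITTLE_ENDIAN order set below:

    import java.nio.{ByteBuffer, ByteOrder}

    val buf = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN)
    buf.putInt(4, 3)                                   // write 3 at offset 4, little-endian

    buf.duplicate().getInt(4)                          // 50331648 -- duplicate() is BIG_ENDIAN
    buf.duplicate().order(buf.order()).getInt(4)       // 3 -- re-apply the order, as the fix does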
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
index e92cf5ac4f..800009d319 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
@@ -397,26 +397,27 @@ private[sql] sealed abstract class IntegralDelta[I <: IntegralType] extends Comp
if (initial) {
initial = false
- prev = value
_compressedSize += 1 + columnType.defaultSize
} else {
val (smallEnough, _) = byteSizedDelta(value, prev)
_compressedSize += (if (smallEnough) 1 else 1 + columnType.defaultSize)
}
+
+ prev = value
}
override def compress(from: ByteBuffer, to: ByteBuffer, columnType: NativeColumnType[I]) = {
to.putInt(typeId)
if (from.hasRemaining) {
- val prev = columnType.extract(from)
-
+ var prev = columnType.extract(from)
to.put(Byte.MinValue)
columnType.append(prev, to)
while (from.hasRemaining) {
val current = columnType.extract(from)
val (smallEnough, delta) = byteSizedDelta(current, prev)
+ prev = current
if (smallEnough) {
to.put(delta)
@@ -443,13 +444,8 @@ private[sql] sealed abstract class IntegralDelta[I <: IntegralType] extends Comp
override def next() = {
val delta = buffer.get()
-
- if (delta > Byte.MinValue) {
- addDelta(prev, delta)
- } else {
- prev = columnType.extract(buffer)
- prev
- }
+ prev = if (delta > Byte.MinValue) addDelta(prev, delta) else columnType.extract(buffer)
+ prev
}
override def hasNext = buffer.hasRemaining
@@ -465,7 +461,7 @@ private[sql] case object IntDelta extends IntegralDelta[IntegerType.type] {
override protected def byteSizedDelta(x: Int, y: Int): (Boolean, Byte) = {
val delta = x - y
- if (delta < Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte)
+ if (math.abs(delta) <= Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte)
}
}
@@ -478,6 +474,6 @@ private[sql] case object LongDelta extends IntegralDelta[LongType.type] {
override protected def byteSizedDelta(x: Long, y: Long): (Boolean, Byte) = {
val delta = x - y
- if (delta < Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte)
+ if (math.abs(delta) <= Byte.MaxValue) (true, delta.toByte) else (false, 0: Byte)
}
}
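For reviewers checking the boundary conditions above: Byte.MinValue (-128) is reserved as the escape marker meaning "a full-width value follows", so a single-byte delta must satisfy |delta| <= 127, and prev has to advance on every element, escaped or not. A compact sketch of the encoding rule (plain Ints and a made-up helper, not the real CompressionScheme API):

    // Returns the encoded stream as a mixed Seq: Byte deltas, plus (marker, full value)
    // pairs for elements whose delta does not fit in one byte.
    def encode(input: Seq[Int]): Seq[Any] = {
      var prev = 0
      input.zipWithIndex.flatMap { case (value, i) =>
        val delta = value - prev
        prev = value                                               // advance even when escaping
        if (i == 0 || math.abs(delta) > Byte.MaxValue) Seq(Byte.MinValue, value)
        else Seq(delta.toByte)
      }
    }

    // encode(Seq(2, 1, 2, 130)) == Seq(-128, 2, -1, 1, -128, 130)
    // Deltas -1 and +1 fit in a byte; +128 does not, so 130 is written escaped.
    // The old `delta < Byte.MaxValue` test wrongly accepted large negative deltas and
    // the -128 value that collides with the escape marker.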
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 450c142c0b..070557e47c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -61,7 +61,14 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
shuffled.map(_._1)
case SinglePartition =>
- child.execute().coalesce(1, shuffle = true)
+ val rdd = child.execute().mapPartitions { iter =>
+ val mutablePair = new MutablePair[Null, Row]()
+ iter.map(r => mutablePair.update(null, r))
+ }
+ val partitioner = new HashPartitioner(1)
+ val shuffled = new ShuffledRDD[Null, Row, MutablePair[Null, Row]](rdd, partitioner)
+ shuffled.setSerializer(new SparkSqlSerializer(new SparkConf(false)))
+ shuffled.map(_._2)
case _ => sys.error(s"Exchange not implemented for $newPartitioning")
// TODO: Handle BroadcastPartitioning.
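The SinglePartition change replaces coalesce(1, shuffle = true) with an explicit ShuffledRDD using SparkSqlSerializer because operators here reuse a single mutable row per partition, and rows must be copied (serialized) before being gathered into one partition. A plain-Scala sketch of the underlying aliasing hazard; MutableRowLike is a made-up stand-in for a mutable row:

    class MutableRowLike(var value: Int)

    // One object reused for the whole iterator, as columnar scans do to avoid allocation:
    val reused = new MutableRowLike(0)
    val rows = Iterator(1, 2, 3).map { i => reused.value = i; reused }

    // Gathering the iterator without copying aliases every slot to the same object:
    rows.toArray.map(_.value)            // Array(3, 3, 3), not Array(1, 2, 3)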
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index daa423cb8e..5d89697db5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -70,8 +70,8 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)
SparkLogicalPlan(
alreadyPlanned match {
case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd)
- case InMemoryColumnarTableScan(output, child) =>
- InMemoryColumnarTableScan(output.map(_.newInstance), child)
+ case scan @ InMemoryColumnarTableScan(output, child) =>
+ scan.copy(attributes = output.map(_.newInstance))
case _ => sys.error("Multiple instance of the same relation detected.")
}).asInstanceOf[this.type]
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 7c6a642278..0331f90272 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -17,11 +17,10 @@
package org.apache.spark.sql
-import org.scalatest.FunSuite
import org.apache.spark.sql.TestData._
-import org.apache.spark.sql.test.TestSQLContext
-import org.apache.spark.sql.execution.SparkLogicalPlan
import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
+import org.apache.spark.sql.execution.SparkLogicalPlan
+import org.apache.spark.sql.test.TestSQLContext
class CachedTableSuite extends QueryTest {
TestData // Load test tables.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
index 2ed4cf2170..16a13b8a74 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
@@ -18,10 +18,11 @@
package org.apache.spark.sql.columnar
import org.apache.spark.sql.{QueryTest, TestData}
+import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.execution.SparkLogicalPlan
import org.apache.spark.sql.test.TestSQLContext
-class ColumnarQuerySuite extends QueryTest {
+class InMemoryColumnarQuerySuite extends QueryTest {
import TestData._
import TestSQLContext._
@@ -32,6 +33,15 @@ class ColumnarQuerySuite extends QueryTest {
checkAnswer(scan, testData.collect().toSeq)
}
+ test("projection") {
+ val plan = TestSQLContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan
+ val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
+
+ checkAnswer(scan, testData.collect().map {
+ case Row(key: Int, value: String) => value -> key
+ }.toSeq)
+ }
+
test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
index 4a21eb6201..35ab14cbc3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
@@ -68,12 +68,16 @@ class NullableColumnAccessorSuite extends FunSuite {
val row = new GenericMutableRow(1)
(0 until 4).foreach { _ =>
+ assert(accessor.hasNext)
accessor.extractTo(row, 0)
assert(row(0) === randomRow(0))
+ assert(accessor.hasNext)
accessor.extractTo(row, 0)
assert(row.isNullAt(0))
}
+
+ assert(!accessor.hasNext)
}
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala
index 1390e5eef6..ce419ca726 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.FunSuite
import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
import org.apache.spark.sql.catalyst.types.IntegralType
import org.apache.spark.sql.columnar._
+import org.apache.spark.sql.columnar.ColumnarTestUtils._
class IntegralDeltaSuite extends FunSuite {
testIntegralDelta(new IntColumnStats, INT, IntDelta)
@@ -63,7 +64,7 @@ class IntegralDeltaSuite extends FunSuite {
} else {
val oneBoolean = columnType.defaultSize
1 + oneBoolean + deltas.map {
- d => if (math.abs(d) < Byte.MaxValue) 1 else 1 + oneBoolean
+ d => if (math.abs(d) <= Byte.MaxValue) 1 else 1 + oneBoolean
}.sum
})
@@ -78,7 +79,7 @@ class IntegralDeltaSuite extends FunSuite {
expectResult(input.head, "The first value is wrong")(columnType.extract(buffer))
(input.tail, deltas).zipped.foreach { (value, delta) =>
- if (delta < Byte.MaxValue) {
+ if (math.abs(delta) <= Byte.MaxValue) {
expectResult(delta, "Wrong delta")(buffer.get())
} else {
expectResult(Byte.MinValue, "Expecting escaping mark here")(buffer.get())
@@ -105,11 +106,17 @@ class IntegralDeltaSuite extends FunSuite {
test(s"$scheme: simple case") {
val input = columnType match {
- case INT => Seq(1: Int, 2: Int, 130: Int)
- case LONG => Seq(1: Long, 2: Long, 130: Long)
+ case INT => Seq(2: Int, 1: Int, 2: Int, 130: Int)
+ case LONG => Seq(2: Long, 1: Long, 2: Long, 130: Long)
}
skeleton(input.map(_.asInstanceOf[I#JvmType]))
}
+
+ test(s"$scheme: long random series") {
+ // Have to workaround with `Any` since no `ClassTag[I#JvmType]` available here.
+ val input = Array.fill[Any](10000)(makeRandomValue(columnType))
+ skeleton(input.map(_.asInstanceOf[I#JvmType]))
+ }
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index fc053c56c0..c36b5878cb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -33,6 +33,8 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.execution.SparkLogicalPlan
+import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
/* Implicit conversions */
import scala.collection.JavaConversions._
@@ -115,23 +117,31 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
case p: LogicalPlan if !p.childrenResolved => p
case p @ InsertIntoTable(table: MetastoreRelation, _, child, _) =>
- val childOutputDataTypes = child.output.map(_.dataType)
- // Only check attributes, not partitionKeys since they are always strings.
- // TODO: Fully support inserting into partitioned tables.
- val tableOutputDataTypes = table.attributes.map(_.dataType)
-
- if (childOutputDataTypes == tableOutputDataTypes) {
- p
- } else {
- // Only do the casting when child output data types differ from table output data types.
- val castedChildOutput = child.output.zip(table.output).map {
- case (input, output) if input.dataType != output.dataType =>
- Alias(Cast(input, output.dataType), input.name)()
- case (input, _) => input
- }
-
- p.copy(child = logical.Project(castedChildOutput, child))
+ castChildOutput(p, table, child)
+
+ case p @ logical.InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan(
+ _, HiveTableScan(_, table, _))), _, child, _) =>
+ castChildOutput(p, table, child)
+ }
+
+ def castChildOutput(p: InsertIntoTable, table: MetastoreRelation, child: LogicalPlan) = {
+ val childOutputDataTypes = child.output.map(_.dataType)
+ // Only check attributes, not partitionKeys since they are always strings.
+ // TODO: Fully support inserting into partitioned tables.
+ val tableOutputDataTypes = table.attributes.map(_.dataType)
+
+ if (childOutputDataTypes == tableOutputDataTypes) {
+ p
+ } else {
+ // Only do the casting when child output data types differ from table output data types.
+ val castedChildOutput = child.output.zip(table.output).map {
+ case (input, output) if input.dataType != output.dataType =>
+ Alias(Cast(input, output.dataType), input.name)()
+ case (input, _) => input
}
+
+ p.copy(child = logical.Project(castedChildOutput, child))
+ }
}
}
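Since castChildOutput is now shared by the Metastore and in-memory branches, a condensed sketch of what it does may help review (hypothetical Col case class and SQL strings purely for illustration; the real rule builds Alias(Cast(...)) expressions and wraps the child in a Project):

    case class Col(name: String, dataType: String)

    // Pair child columns with table columns and cast only the mismatched ones.
    def castToTable(child: Seq[Col], table: Seq[Col]): Seq[String] =
      child.zip(table).map {
        case (in, out) if in.dataType != out.dataType => s"CAST(${in.name} AS ${out.dataType})"
        case (in, _)                                  => in.name
      }

    // castToTable(Seq(Col("key", "string"), Col("value", "string")),
    //             Seq(Col("key", "int"),    Col("value", "string")))
    //   == Seq("CAST(key AS int)", "value")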
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 3ca1d93c11..ac817b21a1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution._
+import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
trait HiveStrategies {
// Possibly being too clever with types here... or not clever enough.
@@ -42,6 +43,9 @@ trait HiveStrategies {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) =>
InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
+ case logical.InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan(
+ _, HiveTableScan(_, table, _))), partition, child, overwrite) =>
+ InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
case _ => Nil
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index 2fea970295..465e5f146f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -160,12 +160,6 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) {
TestTable("src1",
"CREATE TABLE src1 (key INT, value STRING)".cmd,
s"LOAD DATA LOCAL INPATH '${getHiveFile("data/files/kv3.txt")}' INTO TABLE src1".cmd),
- TestTable("dest1",
- "CREATE TABLE IF NOT EXISTS dest1 (key INT, value STRING)".cmd),
- TestTable("dest2",
- "CREATE TABLE IF NOT EXISTS dest2 (key INT, value STRING)".cmd),
- TestTable("dest3",
- "CREATE TABLE IF NOT EXISTS dest3 (key INT, value STRING)".cmd),
TestTable("srcpart", () => {
runSqlHive(
"CREATE TABLE srcpart (key INT, value STRING) PARTITIONED BY (ds STRING, hr STRING)")
@@ -257,6 +251,7 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) {
private val loadedTables = new collection.mutable.HashSet[String]
+ var cacheTables: Boolean = false
def loadTestTable(name: String) {
if (!(loadedTables contains name)) {
// Marks the table as loaded first to prevent infite mutually recursive table loading.
@@ -265,6 +260,9 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) {
val createCmds =
testTables.get(name).map(_.commands).getOrElse(sys.error(s"Unknown test table $name"))
createCmds.foreach(_())
+
+ if (cacheTables)
+ cacheTable(name)
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
index f9b437d435..55a4363af6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
@@ -130,8 +130,7 @@ trait HiveFunctionFactory {
}
}
-abstract class HiveUdf
- extends Expression with Logging with HiveFunctionFactory {
+abstract class HiveUdf extends Expression with Logging with HiveFunctionFactory {
self: Product =>
type UDFType
@@ -146,7 +145,7 @@ abstract class HiveUdf
lazy val functionInfo = getFunctionInfo(name)
lazy val function = createFunction[UDFType](name)
- override def toString = s"${nodeName}#${functionInfo.getDisplayName}(${children.mkString(",")})"
+ override def toString = s"$nodeName#${functionInfo.getDisplayName}(${children.mkString(",")})"
}
case class HiveSimpleUdf(name: String, children: Seq[Expression]) extends HiveUdf {
@@ -202,10 +201,11 @@ case class HiveSimpleUdf(name: String, children: Seq[Expression]) extends HiveUd
}
}
-case class HiveGenericUdf(
- name: String,
- children: Seq[Expression]) extends HiveUdf with HiveInspectors {
+case class HiveGenericUdf(name: String, children: Seq[Expression])
+ extends HiveUdf with HiveInspectors {
+
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF._
+
type UDFType = GenericUDF
@transient
@@ -357,7 +357,7 @@ case class HiveGenericUdaf(
override def toString = s"$nodeName#$name(${children.mkString(",")})"
- def newInstance = new HiveUdafFunction(name, children, this)
+ def newInstance() = new HiveUdafFunction(name, children, this)
}
/**
@@ -435,7 +435,7 @@ case class HiveGenericUdtf(
}
}
- override def toString() = s"$nodeName#$name(${children.mkString(",")})"
+ override def toString = s"$nodeName#$name(${children.mkString(",")})"
}
case class HiveUdafFunction(
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 3cc4562a88..6c91f40d0f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -218,10 +218,7 @@ abstract class HiveComparisonTest
val quotes = "\"\"\""
queryList.zipWithIndex.map {
case (query, i) =>
- s"""
- |val q$i = $quotes$query$quotes.q
- |q$i.stringResult()
- """.stripMargin
+ s"""val q$i = hql($quotes$query$quotes); q$i.collect()"""
}.mkString("\n== Console version of this test ==\n", "\n", "\n")
}
@@ -287,7 +284,6 @@ abstract class HiveComparisonTest
|Error: ${e.getMessage}
|${stackTraceToString(e)}
|$queryString
- |$consoleTestCase
""".stripMargin
stringToFile(
new File(hiveFailedDirectory, testCaseName),
@@ -304,7 +300,7 @@ abstract class HiveComparisonTest
val catalystResults = queryList.zip(hiveResults).map { case (queryString, hive) =>
val query = new TestHive.HiveQLQueryExecution(queryString)
try { (query, prepareAnswer(query, query.stringResult())) } catch {
- case e: Exception =>
+ case e: Throwable =>
val errorMessage =
s"""
|Failed to execute query using catalyst:
@@ -313,8 +309,6 @@ abstract class HiveComparisonTest
|$query
|== HIVE - ${hive.size} row(s) ==
|${hive.mkString("\n")}
- |
- |$consoleTestCase
""".stripMargin
stringToFile(new File(failedDirectory, testCaseName), errorMessage + consoleTestCase)
fail(errorMessage)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index f76e16bc1a..c3cfa3d25a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -17,16 +17,26 @@
package org.apache.spark.sql.hive.execution
+import org.scalatest.BeforeAndAfter
+
import org.apache.spark.sql.hive.TestHive
/**
* Runs the test cases that are included in the hive distribution.
*/
-class HiveCompatibilitySuite extends HiveQueryFileTest {
+class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
// TODO: bundle in jar files... get from classpath
lazy val hiveQueryDir = TestHive.getHiveFile("ql/src/test/queries/clientpositive")
def testCases = hiveQueryDir.listFiles.map(f => f.getName.stripSuffix(".q") -> f)
+ override def beforeAll() {
+ TestHive.cacheTables = true
+ }
+
+ override def afterAll() {
+ TestHive.cacheTables = false
+ }
+
/** A list of tests deemed out of scope currently and thus completely disregarded. */
override def blackList = Seq(
// These tests use hooks that are not on the classpath and thus break all subsequent execution.