aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCheng Lian <lian.cs.zju@gmail.com>2014-05-05 19:38:59 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-05-05 19:38:59 -0700
commit6d721c5f7131f7c9fe56c524133d70cb37f1222d (patch)
treeeed48d5bbe4b4c5a1a066713a95361870f52dd9a
parent98750a74daf7e2b873da85d2d5067f47e3bbdc4e (diff)
downloadspark-6d721c5f7131f7c9fe56c524133d70cb37f1222d.tar.gz
spark-6d721c5f7131f7c9fe56c524133d70cb37f1222d.tar.bz2
spark-6d721c5f7131f7c9fe56c524133d70cb37f1222d.zip
[SPARK-1678][SPARK-1679] In-memory compression bug fix and made compression configurable, disabled by default
In-memory compression is now configurable in `SparkConf` by the `spark.sql.inMemoryCompression.enabled` property, and is disabled by default. To help code review, the bug fix is in [the first commit](https://github.com/liancheng/spark/commit/d537a367edf0bf24d0b925cc58b21d805ccbc11f), compression configuration is in [the second one](https://github.com/liancheng/spark/commit/4ce09aa8aa820bbbbbaa0f3f084a6cff1d4e6195). Author: Cheng Lian <lian.cs.zju@gmail.com> Closes #608 from liancheng/spark-1678 and squashes the following commits: 66c3a8d [Cheng Lian] Renamed in-memory compression configuration key f8fb3a0 [Cheng Lian] Added assertion for testing .hasNext of various decoder 4ce09aa [Cheng Lian] Made in-memory compression configurable via SparkConf d537a36 [Cheng Lian] Fixed SPARK-1678
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala7
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala17
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala8
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnAccessor.scala4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala12
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala2
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/TestData.scala11
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala30
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala7
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/DictionaryEncodingSuite.scala7
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala7
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/RunLengthEncodingSuite.scala7
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala2
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala2
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala2
17 files changed, 105 insertions, 26 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index e25201a6c1..bfebfa0c28 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -162,8 +162,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
/** Caches the specified table in-memory. */
def cacheTable(tableName: String): Unit = {
val currentTable = catalog.lookupRelation(None, tableName)
+ val useCompression =
+ sparkContext.conf.getBoolean("spark.sql.inMemoryColumnarStorage.compressed", false)
val asInMemoryRelation =
- InMemoryColumnarTableScan(currentTable.output, executePlan(currentTable).executedPlan)
+ InMemoryColumnarTableScan(
+ currentTable.output, executePlan(currentTable).executedPlan, useCompression)
catalog.registerTable(None, tableName, SparkLogicalPlan(asInMemoryRelation))
}
@@ -173,7 +176,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
EliminateAnalysisOperators(catalog.lookupRelation(None, tableName)) match {
// This is kind of a hack to make sure that if this was just an RDD registered as a table,
// we reregister the RDD as a table.
- case SparkLogicalPlan(inMem @ InMemoryColumnarTableScan(_, e: ExistingRdd)) =>
+ case SparkLogicalPlan(inMem @ InMemoryColumnarTableScan(_, e: ExistingRdd, _)) =>
inMem.cachedColumnBuffers.unpersist()
catalog.unregisterTable(None, tableName)
catalog.registerTable(None, tableName, SparkLogicalPlan(e))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
index 048ee66bff..4be048cd74 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
@@ -28,7 +28,7 @@ private[sql] trait ColumnBuilder {
/**
* Initializes with an approximate lower bound on the expected number of elements in this column.
*/
- def initialize(initialSize: Int, columnName: String = "")
+ def initialize(initialSize: Int, columnName: String = "", useCompression: Boolean = false)
/**
* Appends `row(ordinal)` to the column builder.
@@ -55,7 +55,11 @@ private[sql] class BasicColumnBuilder[T <: DataType, JvmType](
protected var buffer: ByteBuffer = _
- override def initialize(initialSize: Int, columnName: String = "") = {
+ override def initialize(
+ initialSize: Int,
+ columnName: String = "",
+ useCompression: Boolean = false) = {
+
val size = if (initialSize == 0) DEFAULT_INITIAL_BUFFER_SIZE else initialSize
this.columnName = columnName
@@ -130,7 +134,12 @@ private[sql] object ColumnBuilder {
}
}
- def apply(typeId: Int, initialSize: Int = 0, columnName: String = ""): ColumnBuilder = {
+ def apply(
+ typeId: Int,
+ initialSize: Int = 0,
+ columnName: String = "",
+ useCompression: Boolean = false): ColumnBuilder = {
+
val builder = (typeId match {
case INT.typeId => new IntColumnBuilder
case LONG.typeId => new LongColumnBuilder
@@ -144,7 +153,7 @@ private[sql] object ColumnBuilder {
case GENERIC.typeId => new GenericColumnBuilder
}).asInstanceOf[ColumnBuilder]
- builder.initialize(initialSize, columnName)
+ builder.initialize(initialSize, columnName, useCompression)
builder
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index 8a24733047..fdf28e1bb1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -20,8 +20,12 @@ package org.apache.spark.sql.columnar
import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, Attribute}
import org.apache.spark.sql.execution.{SparkPlan, LeafNode}
import org.apache.spark.sql.Row
+import org.apache.spark.SparkConf
-private[sql] case class InMemoryColumnarTableScan(attributes: Seq[Attribute], child: SparkPlan)
+private[sql] case class InMemoryColumnarTableScan(
+ attributes: Seq[Attribute],
+ child: SparkPlan,
+ useCompression: Boolean)
extends LeafNode {
override def output: Seq[Attribute] = attributes
@@ -30,7 +34,7 @@ private[sql] case class InMemoryColumnarTableScan(attributes: Seq[Attribute], ch
val output = child.output
val cached = child.execute().mapPartitions { iterator =>
val columnBuilders = output.map { attribute =>
- ColumnBuilder(ColumnType(attribute.dataType).typeId, 0, attribute.name)
+ ColumnBuilder(ColumnType(attribute.dataType).typeId, 0, attribute.name, useCompression)
}.toArray
var row: Row = null
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
index 2a3b6fc1e4..d008806eed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
@@ -40,12 +40,12 @@ private[sql] trait NullableColumnBuilder extends ColumnBuilder {
private var pos: Int = _
private var nullCount: Int = _
- abstract override def initialize(initialSize: Int, columnName: String) {
+ abstract override def initialize(initialSize: Int, columnName: String, useCompression: Boolean) {
nulls = ByteBuffer.allocate(1024)
nulls.order(ByteOrder.nativeOrder())
pos = 0
nullCount = 0
- super.initialize(initialSize, columnName)
+ super.initialize(initialSize, columnName, useCompression)
}
abstract override def appendFrom(row: Row, ordinal: Int) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnAccessor.scala
index 878cb84de1..b4120a3d43 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnAccessor.scala
@@ -32,5 +32,7 @@ private[sql] trait CompressibleColumnAccessor[T <: NativeType] extends ColumnAcc
decoder = CompressionScheme(underlyingBuffer.getInt()).decoder(buffer, columnType)
}
- abstract override def extractSingle(buffer: ByteBuffer): T#JvmType = decoder.next()
+ abstract override def hasNext = super.hasNext || decoder.hasNext
+
+ override def extractSingle(buffer: ByteBuffer): T#JvmType = decoder.next()
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
index 0f808f68f2..4c6675c3c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
@@ -47,7 +47,17 @@ private[sql] trait CompressibleColumnBuilder[T <: NativeType]
import CompressionScheme._
- val compressionEncoders = schemes.filter(_.supports(columnType)).map(_.encoder[T])
+ var compressionEncoders: Seq[Encoder[T]] = _
+
+ abstract override def initialize(initialSize: Int, columnName: String, useCompression: Boolean) {
+ compressionEncoders =
+ if (useCompression) {
+ schemes.filter(_.supports(columnType)).map(_.encoder[T])
+ } else {
+ Seq(PassThrough.encoder)
+ }
+ super.initialize(initialSize, columnName, useCompression)
+ }
protected def isWorthCompressing(encoder: Encoder[T]) = {
encoder.compressionRatio < 0.8
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
index 800009d319..8cf9ec74ca 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala
@@ -157,7 +157,7 @@ private[sql] case object RunLengthEncoding extends CompressionScheme {
currentValue
}
- override def hasNext = buffer.hasRemaining
+ override def hasNext = valueCount < run || buffer.hasRemaining
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index 50124dd407..235a9b1692 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -77,7 +77,7 @@ case class SparkLogicalPlan(alreadyPlanned: SparkPlan)
SparkLogicalPlan(
alreadyPlanned match {
case ExistingRdd(output, rdd) => ExistingRdd(output.map(_.newInstance), rdd)
- case scan @ InMemoryColumnarTableScan(output, child) =>
+ case scan @ InMemoryColumnarTableScan(output, _, _) =>
scan.copy(attributes = output.map(_.newInstance))
case _ => sys.error("Multiple instance of the same relation detected.")
}).asInstanceOf[this.type]
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
index 002b7f0ada..b5973c0f51 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
@@ -73,4 +73,15 @@ object TestData {
ArrayData(Seq(1,2,3), Seq(Seq(1,2,3))) ::
ArrayData(Seq(2,3,4), Seq(Seq(2,3,4))) :: Nil)
arrayData.registerAsTable("arrayData")
+
+ case class StringData(s: String)
+ val repeatedData =
+ TestSQLContext.sparkContext.parallelize(List.fill(2)(StringData("test")))
+ repeatedData.registerAsTable("repeatedData")
+
+ val nullableRepeatedData =
+ TestSQLContext.sparkContext.parallelize(
+ List.fill(2)(StringData(null)) ++
+ List.fill(2)(StringData("test")))
+ nullableRepeatedData.registerAsTable("nullableRepeatedData")
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
index 16a13b8a74..31c5dfba92 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
@@ -28,14 +28,14 @@ class InMemoryColumnarQuerySuite extends QueryTest {
test("simple columnar query") {
val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
- val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
+ val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan, true))
checkAnswer(scan, testData.collect().toSeq)
}
test("projection") {
val plan = TestSQLContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan
- val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
+ val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan, true))
checkAnswer(scan, testData.collect().map {
case Row(key: Int, value: String) => value -> key
@@ -44,9 +44,33 @@ class InMemoryColumnarQuerySuite extends QueryTest {
test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
- val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
+ val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan, true))
checkAnswer(scan, testData.collect().toSeq)
checkAnswer(scan, testData.collect().toSeq)
}
+
+ test("SPARK-1678 regression: compression must not lose repeated values") {
+ checkAnswer(
+ sql("SELECT * FROM repeatedData"),
+ repeatedData.collect().toSeq)
+
+ TestSQLContext.cacheTable("repeatedData")
+
+ checkAnswer(
+ sql("SELECT * FROM repeatedData"),
+ repeatedData.collect().toSeq)
+ }
+
+ test("with null values") {
+ checkAnswer(
+ sql("SELECT * FROM nullableRepeatedData"),
+ nullableRepeatedData.collect().toSeq)
+
+ TestSQLContext.cacheTable("nullableRepeatedData")
+
+ checkAnswer(
+ sql("SELECT * FROM nullableRepeatedData"),
+ nullableRepeatedData.collect().toSeq)
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala
index a754f98f7f..93259a19b9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/BooleanBitSetSuite.scala
@@ -72,7 +72,12 @@ class BooleanBitSetSuite extends FunSuite {
buffer.rewind().position(headerSize + 4)
val decoder = BooleanBitSet.decoder(buffer, BOOLEAN)
- values.foreach(expectResult(_, "Wrong decoded value")(decoder.next()))
+ if (values.nonEmpty) {
+ values.foreach {
+ assert(decoder.hasNext)
+ expectResult(_, "Wrong decoded value")(decoder.next())
+ }
+ }
assert(!decoder.hasNext)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/DictionaryEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/DictionaryEncodingSuite.scala
index eab27987e0..198dcd8819 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/DictionaryEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/DictionaryEncodingSuite.scala
@@ -98,8 +98,11 @@ class DictionaryEncodingSuite extends FunSuite {
val decoder = DictionaryEncoding.decoder(buffer, columnType)
- inputSeq.foreach { i =>
- expectResult(values(i), "Wrong decoded value")(decoder.next())
+ if (inputSeq.nonEmpty) {
+ inputSeq.foreach { i =>
+ assert(decoder.hasNext)
+ expectResult(values(i), "Wrong decoded value")(decoder.next())
+ }
}
assert(!decoder.hasNext)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala
index ce419ca726..46af6e001c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/IntegralDeltaSuite.scala
@@ -96,7 +96,12 @@ class IntegralDeltaSuite extends FunSuite {
buffer.rewind().position(headerSize + 4)
val decoder = scheme.decoder(buffer, columnType)
- input.foreach(expectResult(_, "Wrong decoded value")(decoder.next()))
+ if (input.nonEmpty) {
+ input.foreach{
+ assert(decoder.hasNext)
+ expectResult(_, "Wrong decoded value")(decoder.next())
+ }
+ }
assert(!decoder.hasNext)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/RunLengthEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/RunLengthEncodingSuite.scala
index 89f9b60a43..d3b73ba19d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/RunLengthEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/RunLengthEncodingSuite.scala
@@ -81,8 +81,11 @@ class RunLengthEncodingSuite extends FunSuite {
val decoder = RunLengthEncoding.decoder(buffer, columnType)
- inputSeq.foreach { i =>
- expectResult(values(i), "Wrong decoded value")(decoder.next())
+ if (inputSeq.nonEmpty) {
+ inputSeq.foreach { i =>
+ assert(decoder.hasNext)
+ expectResult(values(i), "Wrong decoded value")(decoder.next())
+ }
}
assert(!decoder.hasNext)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala
index 81bf5e99d1..6d688ea95c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/compression/TestCompressibleColumnBuilder.scala
@@ -38,7 +38,7 @@ object TestCompressibleColumnBuilder {
scheme: CompressionScheme) = {
val builder = new TestCompressibleColumnBuilder(columnStats, columnType, Seq(scheme))
- builder.initialize(0)
+ builder.initialize(0, "", useCompression = true)
builder
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 6c907887db..ba837a274c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -130,7 +130,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
castChildOutput(p, table, child)
case p @ logical.InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan(
- _, HiveTableScan(_, table, _))), _, child, _) =>
+ _, HiveTableScan(_, table, _), _)), _, child, _) =>
castChildOutput(p, table, child)
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index d9a6e0e889..b2157074a4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -45,7 +45,7 @@ private[hive] trait HiveStrategies {
case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) =>
InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
case logical.InsertIntoTable(SparkLogicalPlan(InMemoryColumnarTableScan(
- _, HiveTableScan(_, table, _))), partition, child, overwrite) =>
+ _, HiveTableScan(_, table, _), _)), partition, child, overwrite) =>
InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
case _ => Nil
}