aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Armbrust <michael@databricks.com>2015-07-14 22:57:39 -0700
committerReynold Xin <rxin@databricks.com>2015-07-14 22:57:39 -0700
commitc6b1a9e74e34267dc198e57a184c41498ca9d6a3 (patch)
tree7b02969c8bab5a1a60fa2c0b57f3667e57575026
parentf23a721c10b64ec5c6768634fc5e9e7b60ee7ca8 (diff)
downloadspark-c6b1a9e74e34267dc198e57a184c41498ca9d6a3.tar.gz
spark-c6b1a9e74e34267dc198e57a184c41498ca9d6a3.tar.bz2
spark-c6b1a9e74e34267dc198e57a184c41498ca9d6a3.zip
Revert SPARK-6910 and SPARK-9027
Revert #7216 and #7386. These patch seems to be causing quite a few test failures: ``` Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.GeneratedMethodAccessor322.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.sql.hive.client.Shim_v0_13.getPartitionsByFilter(HiveShim.scala:351) at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$getPartitionsByFilter$1.apply(ClientWrapper.scala:320) at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$getPartitionsByFilter$1.apply(ClientWrapper.scala:318) at org.apache.spark.sql.hive.client.ClientWrapper$$anonfun$withHiveState$1.apply(ClientWrapper.scala:180) at org.apache.spark.sql.hive.client.ClientWrapper.retryLocked(ClientWrapper.scala:135) at org.apache.spark.sql.hive.client.ClientWrapper.withHiveState(ClientWrapper.scala:172) at org.apache.spark.sql.hive.client.ClientWrapper.getPartitionsByFilter(ClientWrapper.scala:318) at org.apache.spark.sql.hive.client.HiveTable.getPartitions(ClientInterface.scala:78) at org.apache.spark.sql.hive.MetastoreRelation.getHiveQlPartitions(HiveMetastoreCatalog.scala:670) at org.apache.spark.sql.hive.execution.HiveTableScan.doExecute(HiveTableScan.scala:137) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:90) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:90) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:89) at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:164) at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1.apply(Exchange.scala:151) at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:48) ... 85 more Caused by: MetaException(message:Filtering is supported only on partition keys of type string) at org.apache.hadoop.hive.metastore.parser.ExpressionTree$FilterBuilder.setError(ExpressionTree.java:185) at org.apache.hadoop.hive.metastore.parser.ExpressionTree$LeafNode.getJdoFilterPushdownParam(ExpressionTree.java:452) at org.apache.hadoop.hive.metastore.parser.ExpressionTree$LeafNode.generateJDOFilterOverPartitions(ExpressionTree.java:357) at org.apache.hadoop.hive.metastore.parser.ExpressionTree$LeafNode.generateJDOFilter(ExpressionTree.java:279) at org.apache.hadoop.hive.metastore.parser.ExpressionTree$TreeNode.generateJDOFilter(ExpressionTree.java:243) at org.apache.hadoop.hive.metastore.parser.ExpressionTree.generateJDOFilterFragment(ExpressionTree.java:590) at org.apache.hadoop.hive.metastore.ObjectStore.makeQueryFilterString(ObjectStore.java:2417) at org.apache.hadoop.hive.metastore.ObjectStore.getPartitionsViaOrmFilter(ObjectStore.java:2029) at org.apache.hadoop.hive.metastore.ObjectStore.access$500(ObjectStore.java:146) at org.apache.hadoop.hive.metastore.ObjectStore$4.getJdoResult(ObjectStore.java:2332) ``` https://amplab.cs.berkeley.edu/jenkins/view/Spark-QA-Test/job/Spark-Master-Maven-with-YARN/2945/HADOOP_PROFILE=hadoop-2.4,label=centos/testReport/junit/org.apache.spark.sql.hive.execution/SortMergeCompatibilitySuite/auto_sortmerge_join_16/ Author: Michael Armbrust <michael@databricks.com> Closes #7409 from marmbrus/revertMetastorePushdown and squashes the following commits: 92fabd3 [Michael Armbrust] Revert SPARK-6910 and SPARK-9027 5d3bdf2 [Michael Armbrust] Revert "[SPARK-9027] [SQL] Generalize metastore predicate pushdown"
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala58
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala1
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala4
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala11
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala21
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala72
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala7
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala78
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala8
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala2
10 files changed, 44 insertions, 218 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 5bdf68c83f..4b7a782c80 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -301,9 +301,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
val result = if (metastoreRelation.hiveQlTable.isPartitioned) {
val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
val partitionColumnDataTypes = partitionSchema.map(_.dataType)
- // We're converting the entire table into ParquetRelation, so predicates to Hive metastore
- // are empty.
- val partitions = metastoreRelation.getHiveQlPartitions().map { p =>
+ val partitions = metastoreRelation.hiveQlPartitions.map { p =>
val location = p.getLocation
val values = InternalRow.fromSeq(p.getValues.zip(partitionColumnDataTypes).map {
case (rawValue, dataType) => Cast(Literal(rawValue), dataType).eval(null)
@@ -646,6 +644,32 @@ private[hive] case class MetastoreRelation
new Table(tTable)
}
+ @transient val hiveQlPartitions: Seq[Partition] = table.getAllPartitions.map { p =>
+ val tPartition = new org.apache.hadoop.hive.metastore.api.Partition
+ tPartition.setDbName(databaseName)
+ tPartition.setTableName(tableName)
+ tPartition.setValues(p.values)
+
+ val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
+ tPartition.setSd(sd)
+ sd.setCols(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)))
+
+ sd.setLocation(p.storage.location)
+ sd.setInputFormat(p.storage.inputFormat)
+ sd.setOutputFormat(p.storage.outputFormat)
+
+ val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
+ sd.setSerdeInfo(serdeInfo)
+ serdeInfo.setSerializationLib(p.storage.serde)
+
+ val serdeParameters = new java.util.HashMap[String, String]()
+ serdeInfo.setParameters(serdeParameters)
+ table.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
+ p.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
+
+ new Partition(hiveQlTable, tPartition)
+ }
+
@transient override lazy val statistics: Statistics = Statistics(
sizeInBytes = {
val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
@@ -666,34 +690,6 @@ private[hive] case class MetastoreRelation
}
)
- def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
- table.getPartitions(predicates).map { p =>
- val tPartition = new org.apache.hadoop.hive.metastore.api.Partition
- tPartition.setDbName(databaseName)
- tPartition.setTableName(tableName)
- tPartition.setValues(p.values)
-
- val sd = new org.apache.hadoop.hive.metastore.api.StorageDescriptor()
- tPartition.setSd(sd)
- sd.setCols(table.schema.map(c => new FieldSchema(c.name, c.hiveType, c.comment)))
-
- sd.setLocation(p.storage.location)
- sd.setInputFormat(p.storage.inputFormat)
- sd.setOutputFormat(p.storage.outputFormat)
-
- val serdeInfo = new org.apache.hadoop.hive.metastore.api.SerDeInfo
- sd.setSerdeInfo(serdeInfo)
- serdeInfo.setSerializationLib(p.storage.serde)
-
- val serdeParameters = new java.util.HashMap[String, String]()
- serdeInfo.setParameters(serdeParameters)
- table.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
- p.storage.serdeProperties.foreach { case (k, v) => serdeParameters.put(k, v) }
-
- new Partition(hiveQlTable, tPartition)
- }
- }
-
/** Only compare database and tablename, not alias. */
override def sameResult(plan: LogicalPlan): Boolean = {
plan match {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
index a357bb39ca..d08c594151 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveShim.scala
@@ -27,7 +27,6 @@ import scala.reflect.ClassTag
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
-
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.ql.exec.{UDF, Utilities}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 9638a8201e..ed359620a5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -125,7 +125,7 @@ private[hive] trait HiveStrategies {
InterpretedPredicate.create(castedPredicate)
}
- val partitions = relation.getHiveQlPartitions(pruningPredicates).filter { part =>
+ val partitions = relation.hiveQlPartitions.filter { part =>
val partitionValues = part.getValues
var i = 0
while (i < partitionValues.size()) {
@@ -213,7 +213,7 @@ private[hive] trait HiveStrategies {
projectList,
otherPredicates,
identity[Seq[Expression]],
- HiveTableScan(_, relation, pruningPredicates)(hiveContext)) :: Nil
+ HiveTableScan(_, relation, pruningPredicates.reduceLeftOption(And))(hiveContext)) :: Nil
case _ =>
Nil
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala
index 1656587d14..0a1d761a52 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientInterface.scala
@@ -21,7 +21,6 @@ import java.io.PrintStream
import java.util.{Map => JMap}
import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchTableException}
-import org.apache.spark.sql.catalyst.expressions.Expression
private[hive] case class HiveDatabase(
name: String,
@@ -72,12 +71,7 @@ private[hive] case class HiveTable(
def isPartitioned: Boolean = partitionColumns.nonEmpty
- def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] = {
- predicates match {
- case Nil => client.getAllPartitions(this)
- case _ => client.getPartitionsByFilter(this, predicates)
- }
- }
+ def getAllPartitions: Seq[HivePartition] = client.getAllPartitions(this)
// Hive does not support backticks when passing names to the client.
def qualifiedName: String = s"$database.$name"
@@ -138,9 +132,6 @@ private[hive] trait ClientInterface {
/** Returns all partitions for the given table. */
def getAllPartitions(hTable: HiveTable): Seq[HivePartition]
- /** Returns partitions filtered by predicates for the given table. */
- def getPartitionsByFilter(hTable: HiveTable, predicates: Seq[Expression]): Seq[HivePartition]
-
/** Loads a static partition into an existing table. */
def loadPartition(
loadPath: String,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index 8adda54754..53f457ad4f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -17,21 +17,25 @@
package org.apache.spark.sql.hive.client
-import java.io.{File, PrintStream}
-import java.util.{Map => JMap}
+import java.io.{BufferedReader, InputStreamReader, File, PrintStream}
+import java.net.URI
+import java.util.{ArrayList => JArrayList, Map => JMap, List => JList, Set => JSet}
import javax.annotation.concurrent.GuardedBy
import scala.collection.JavaConversions._
import scala.language.reflectiveCalls
import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.metastore.api.Database
import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.metastore.api.{Database, FieldSchema}
import org.apache.hadoop.hive.metastore.{TableType => HTableType}
+import org.apache.hadoop.hive.metastore.api
+import org.apache.hadoop.hive.metastore.api.FieldSchema
+import org.apache.hadoop.hive.ql.metadata
import org.apache.hadoop.hive.ql.metadata.Hive
-import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.hadoop.hive.ql.{Driver, metadata}
+import org.apache.hadoop.hive.ql.processors._
+import org.apache.hadoop.hive.ql.Driver
import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.expressions.Expression
@@ -312,13 +316,6 @@ private[hive] class ClientWrapper(
shim.getAllPartitions(client, qlTable).map(toHivePartition)
}
- override def getPartitionsByFilter(
- hTable: HiveTable,
- predicates: Seq[Expression]): Seq[HivePartition] = withHiveState {
- val qlTable = toQlTable(hTable)
- shim.getPartitionsByFilter(client, qlTable, predicates).map(toHivePartition)
- }
-
override def listTables(dbName: String): Seq[String] = withHiveState {
client.getAllTables(dbName)
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index d12778c758..1fa9d278e2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -31,11 +31,6 @@ import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorFactory}
import org.apache.hadoop.hive.ql.session.SessionState
-import org.apache.hadoop.hive.serde.serdeConstants
-
-import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.{StringType, IntegralType}
/**
* A shim that defines the interface between ClientWrapper and the underlying Hive library used to
@@ -66,8 +61,6 @@ private[client] sealed abstract class Shim {
def getAllPartitions(hive: Hive, table: Table): Seq[Partition]
- def getPartitionsByFilter(hive: Hive, table: Table, predicates: Seq[Expression]): Seq[Partition]
-
def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor
def getDriverResults(driver: Driver): Seq[String]
@@ -116,7 +109,7 @@ private[client] sealed abstract class Shim {
}
-private[client] class Shim_v0_12 extends Shim with Logging {
+private[client] class Shim_v0_12 extends Shim {
private lazy val startMethod =
findStaticMethod(
@@ -203,17 +196,6 @@ private[client] class Shim_v0_12 extends Shim with Logging {
override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] =
getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].toSeq
- override def getPartitionsByFilter(
- hive: Hive,
- table: Table,
- predicates: Seq[Expression]): Seq[Partition] = {
- // getPartitionsByFilter() doesn't support binary comparison ops in Hive 0.12.
- // See HIVE-4888.
- logDebug("Hive 0.12 doesn't support predicate pushdown to metastore. " +
- "Please use Hive 0.13 or higher.")
- getAllPartitions(hive, table)
- }
-
override def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor =
getCommandProcessorMethod.invoke(null, token, conf).asInstanceOf[CommandProcessor]
@@ -285,12 +267,6 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
classOf[Hive],
"getAllPartitionsOf",
classOf[Table])
- private lazy val getPartitionsByFilterMethod =
- findMethod(
- classOf[Hive],
- "getPartitionsByFilter",
- classOf[Table],
- classOf[String])
private lazy val getCommandProcessorMethod =
findStaticMethod(
classOf[CommandProcessorFactory],
@@ -312,52 +288,6 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
override def getAllPartitions(hive: Hive, table: Table): Seq[Partition] =
getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]].toSeq
- /**
- * Converts catalyst expression to the format that Hive's getPartitionsByFilter() expects, i.e.
- * a string that represents partition predicates like "str_key=\"value\" and int_key=1 ...".
- *
- * Unsupported predicates are skipped.
- */
- def convertFilters(table: Table, filters: Seq[Expression]): String = {
- // hive varchar is treated as catalyst string, but hive varchar can't be pushed down.
- val varcharKeys = table.getPartitionKeys
- .filter(col => col.getType.startsWith(serdeConstants.VARCHAR_TYPE_NAME))
- .map(col => col.getName).toSet
-
- filters.collect {
- case op @ BinaryComparison(a: Attribute, Literal(v, _: IntegralType)) =>
- s"${a.name} ${op.symbol} $v"
- case op @ BinaryComparison(Literal(v, _: IntegralType), a: Attribute) =>
- s"$v ${op.symbol} ${a.name}"
-
- case op @ BinaryComparison(a: Attribute, Literal(v, _: StringType))
- if !varcharKeys.contains(a.name) =>
- s"""${a.name} ${op.symbol} "$v""""
- case op @ BinaryComparison(Literal(v, _: StringType), a: Attribute)
- if !varcharKeys.contains(a.name) =>
- s""""$v" ${op.symbol} ${a.name}"""
- }.mkString(" and ")
- }
-
- override def getPartitionsByFilter(
- hive: Hive,
- table: Table,
- predicates: Seq[Expression]): Seq[Partition] = {
-
- // Hive getPartitionsByFilter() takes a string that represents partition
- // predicates like "str_key=\"value\" and int_key=1 ..."
- val filter = convertFilters(table, predicates)
- val partitions =
- if (filter.isEmpty) {
- getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
- } else {
- logDebug(s"Hive metastore filter is '$filter'.")
- getPartitionsByFilterMethod.invoke(hive, table, filter).asInstanceOf[JArrayList[Partition]]
- }
-
- partitions.toSeq
- }
-
override def getCommandProcessor(token: String, conf: HiveConf): CommandProcessor =
getCommandProcessorMethod.invoke(null, Array(token), conf).asInstanceOf[CommandProcessor]
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
index ba7eb15a1c..d33da8242c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
@@ -44,7 +44,7 @@ private[hive]
case class HiveTableScan(
requestedAttributes: Seq[Attribute],
relation: MetastoreRelation,
- partitionPruningPred: Seq[Expression])(
+ partitionPruningPred: Option[Expression])(
@transient val context: HiveContext)
extends LeafNode {
@@ -56,7 +56,7 @@ case class HiveTableScan(
// Bind all partition key attribute references in the partition pruning predicate for later
// evaluation.
- private[this] val boundPruningPred = partitionPruningPred.reduceLeftOption(And).map { pred =>
+ private[this] val boundPruningPred = partitionPruningPred.map { pred =>
require(
pred.dataType == BooleanType,
s"Data type of predicate $pred must be BooleanType rather than ${pred.dataType}.")
@@ -133,8 +133,7 @@ case class HiveTableScan(
protected override def doExecute(): RDD[InternalRow] = if (!relation.hiveQlTable.isPartitioned) {
hadoopReader.makeRDDForTable(relation.hiveQlTable)
} else {
- hadoopReader.makeRDDForPartitionedTable(
- prunePartitions(relation.getHiveQlPartitions(partitionPruningPred)))
+ hadoopReader.makeRDDForPartitionedTable(prunePartitions(relation.hiveQlPartitions))
}
override def output: Seq[Attribute] = attributes
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
deleted file mode 100644
index 0efcf80bd4..0000000000
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/FiltersSuite.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.client
-
-import scala.collection.JavaConversions._
-
-import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.hadoop.hive.serde.serdeConstants
-
-import org.apache.spark.{Logging, SparkFunSuite}
-import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types._
-
-/**
- * A set of tests for the filter conversion logic used when pushing partition pruning into the
- * metastore
- */
-class FiltersSuite extends SparkFunSuite with Logging {
- private val shim = new Shim_v0_13
-
- private val testTable = new org.apache.hadoop.hive.ql.metadata.Table("default", "test")
- private val varCharCol = new FieldSchema()
- varCharCol.setName("varchar")
- varCharCol.setType(serdeConstants.VARCHAR_TYPE_NAME)
- testTable.setPartCols(varCharCol :: Nil)
-
- filterTest("string filter",
- (a("stringcol", StringType) > Literal("test")) :: Nil,
- "stringcol > \"test\"")
-
- filterTest("string filter backwards",
- (Literal("test") > a("stringcol", StringType)) :: Nil,
- "\"test\" > stringcol")
-
- filterTest("int filter",
- (a("intcol", IntegerType) === Literal(1)) :: Nil,
- "intcol = 1")
-
- filterTest("int filter backwards",
- (Literal(1) === a("intcol", IntegerType)) :: Nil,
- "1 = intcol")
-
- filterTest("int and string filter",
- (Literal(1) === a("intcol", IntegerType)) :: (Literal("a") === a("strcol", IntegerType)) :: Nil,
- "1 = intcol and \"a\" = strcol")
-
- filterTest("skip varchar",
- (Literal("") === a("varchar", StringType)) :: Nil,
- "")
-
- private def filterTest(name: String, filters: Seq[Expression], result: String) = {
- test(name){
- val converted = shim.convertFilters(testTable, filters)
- if (converted != result) {
- fail(
- s"Expected filters ${filters.mkString(",")} to convert to '$result' but got '$converted'")
- }
- }
- }
-
- private def a(name: String, dataType: DataType) = AttributeReference(name, dataType)()
-}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 3eb127e23d..d52e162acb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -20,9 +20,7 @@ package org.apache.spark.sql.hive.client
import java.io.File
import org.apache.spark.{Logging, SparkFunSuite}
-import org.apache.spark.sql.catalyst.expressions.{NamedExpression, Literal, AttributeReference, EqualTo}
import org.apache.spark.sql.catalyst.util.quietly
-import org.apache.spark.sql.types.IntegerType
import org.apache.spark.util.Utils
/**
@@ -153,12 +151,6 @@ class VersionsSuite extends SparkFunSuite with Logging {
client.getAllPartitions(client.getTable("default", "src_part"))
}
- test(s"$version: getPartitionsByFilter") {
- client.getPartitionsByFilter(client.getTable("default", "src_part"), Seq(EqualTo(
- AttributeReference("key", IntegerType, false)(NamedExpression.newExprId),
- Literal(1))))
- }
-
test(s"$version: loadPartition") {
client.loadPartition(
emptyDir,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
index e83a7dc77e..de6a41ce5b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala
@@ -151,7 +151,7 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter {
case p @ HiveTableScan(columns, relation, _) =>
val columnNames = columns.map(_.name)
val partValues = if (relation.table.isPartitioned) {
- p.prunePartitions(relation.getHiveQlPartitions()).map(_.getValues)
+ p.prunePartitions(relation.hiveQlPartitions).map(_.getValues)
} else {
Seq.empty
}