author     Michael Armbrust <michael@databricks.com>  2015-08-12 17:07:29 -0700
committer  Michael Armbrust <michael@databricks.com>  2015-08-12 17:07:29 -0700
commit     660e6dcff8125b83cc73dbe00c90cbe58744bc66 (patch)
tree       eb2cdf02de009c87b73691641602845c9469515f /sql
parent     fc1c7fd66e64ccea53b31cd2fbb98bc6d307329c (diff)
[SPARK-9449] [SQL] Include MetastoreRelation's inputFiles
Author: Michael Armbrust <michael@databricks.com>

Closes #8119 from marmbrus/metastoreInputFiles.
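
In practical terms, this change makes DataFrame.inputFiles work for Hive metastore tables as well, not only for file-based data sources. A minimal usage sketch (the table name "logs" is hypothetical, and a Hive-enabled SQLContext is assumed):

    // Sketch only: "logs" stands in for an existing metastore table.
    // Before this patch, inputFiles only covered HadoopFsRelation-backed plans;
    // with MetastoreRelation implementing FileRelation, Hive tables are covered too.
    val df = sqlContext.table("logs")
    df.inputFiles.foreach(println)   // partition locations, or the table location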
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala                  10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/FileRelation.scala     28
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala          6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala             26
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala  16
5 files changed, 66 insertions(+), 20 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 27b994f1f0..c466d9e6cb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -34,10 +34,10 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, _}
+import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser}
-import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD, SQLExecution}
+import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, FileRelation, LogicalRDD, SQLExecution}
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
import org.apache.spark.sql.sources.HadoopFsRelation
@@ -1560,8 +1560,10 @@ class DataFrame private[sql](
*/
def inputFiles: Array[String] = {
val files: Seq[String] = logicalPlan.collect {
- case LogicalRelation(fsBasedRelation: HadoopFsRelation) =>
- fsBasedRelation.paths.toSeq
+ case LogicalRelation(fsBasedRelation: FileRelation) =>
+ fsBasedRelation.inputFiles
+ case fr: FileRelation =>
+ fr.inputFiles
}.flatten
files.toSet.toArray
}
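
The collected paths are de-duplicated through files.toSet.toArray, so plans that share inputs do not double-count files. A small sketch of that behaviour (assumes a Parquet dataset has already been written at `path`):

    // Sketch: a DataFrame unioned with itself still reports each file once.
    val df = sqlContext.read.parquet(path)   // `path` is an assumed existing directory
    val unioned = df.unionAll(df)
    assert(unioned.inputFiles.toSet == df.inputFiles.toSet)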
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/FileRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/FileRelation.scala
new file mode 100644
index 0000000000..7a2a9eed58
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/FileRelation.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+/**
+ * An interface for relations that are backed by files. When a class implements this interface,
+ * the list of paths that it returns will be returned to a user who calls `inputFiles` on any
+ * DataFrame that queries this relation.
+ */
+private[sql] trait FileRelation {
+ /** Returns the list of files that will be read when scanning this relation. */
+ def inputFiles: Array[String]
+}
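
Because the trait is private[sql], only relations inside the sql package can mix it in. A minimal illustrative implementation (StaticFileRelation is a made-up name, not part of this patch):

    package org.apache.spark.sql.execution

    // Hypothetical example of satisfying the FileRelation contract:
    // report a fixed list of files that a scan of this relation would read.
    private[sql] class StaticFileRelation(paths: Seq[String]) extends FileRelation {
      override def inputFiles: Array[String] = paths.toArray
    }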
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 2f8417a48d..b3b326fe61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -31,7 +31,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
-import org.apache.spark.sql.execution.RDDConversions
+import org.apache.spark.sql.execution.{FileRelation, RDDConversions}
import org.apache.spark.sql.execution.datasources.{PartitioningUtils, PartitionSpec, Partition}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql._
@@ -406,7 +406,7 @@ abstract class OutputWriter {
*/
@Experimental
abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[PartitionSpec])
- extends BaseRelation with Logging {
+ extends BaseRelation with FileRelation with Logging {
override def toString: String = getClass.getSimpleName + paths.mkString("[", ",", "]")
@@ -516,6 +516,8 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
*/
def paths: Array[String]
+ override def inputFiles: Array[String] = cachedLeafStatuses().map(_.getPath.toString).toArray
+
/**
* Partition columns. Can be either defined by [[userDefinedPartitionColumns]] or automatically
* discovered. Note that they should always be nullable.
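
Note the distinction this override introduces: `paths` are the root locations the user supplied, while `inputFiles` enumerates the concrete leaf files discovered under them (from the cached leaf statuses). A rough sketch, assuming a Parquet dataset already exists at the hypothetical path "/data/events":

    // paths      -> the root directory the user gave, e.g. "/data/events"
    // inputFiles -> the actual leaf files under it, e.g. ".../part-r-00000-*.parquet"
    val df = sqlContext.read.parquet("/data/events")
    df.inputFiles.foreach(println)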
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index adbd95197d..2feec29955 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -485,21 +485,23 @@ class DataFrameSuite extends QueryTest with SQLTestUtils {
}
test("inputFiles") {
- val fakeRelation1 = new ParquetRelation(Array("/my/path", "/my/other/path"),
- Some(testData.schema), None, Map.empty)(sqlContext)
- val df1 = DataFrame(sqlContext, LogicalRelation(fakeRelation1))
- assert(df1.inputFiles.toSet == fakeRelation1.paths.toSet)
+ withTempDir { dir =>
+ val df = Seq((1, 22)).toDF("a", "b")
- val fakeRelation2 = new JSONRelation(
- None, 1, Some(testData.schema), None, None, Array("/json/path"))(sqlContext)
- val df2 = DataFrame(sqlContext, LogicalRelation(fakeRelation2))
- assert(df2.inputFiles.toSet == fakeRelation2.paths.toSet)
+ val parquetDir = new File(dir, "parquet").getCanonicalPath
+ df.write.parquet(parquetDir)
+ val parquetDF = sqlContext.read.parquet(parquetDir)
+ assert(parquetDF.inputFiles.nonEmpty)
- val unionDF = df1.unionAll(df2)
- assert(unionDF.inputFiles.toSet == fakeRelation1.paths.toSet ++ fakeRelation2.paths)
+ val jsonDir = new File(dir, "json").getCanonicalPath
+ df.write.json(jsonDir)
+ val jsonDF = sqlContext.read.json(jsonDir)
+ assert(jsonDF.inputFiles.nonEmpty)
- val filtered = df1.filter("false").unionAll(df2.intersect(df2))
- assert(filtered.inputFiles.toSet == fakeRelation1.paths.toSet ++ fakeRelation2.paths)
+ val unioned = jsonDF.unionAll(parquetDF).inputFiles.sorted
+ val allFiles = (jsonDF.inputFiles ++ parquetDF.inputFiles).toSet.toArray.sorted
+ assert(unioned === allFiles)
+ }
}
ignore("show") {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index ac9aaed19d..5e5497837a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.{InternalRow, SqlParser, TableIdentifier}
-import org.apache.spark.sql.execution.datasources
+import org.apache.spark.sql.execution.{FileRelation, datasources}
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource}
import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
@@ -739,7 +739,7 @@ private[hive] case class MetastoreRelation
(databaseName: String, tableName: String, alias: Option[String])
(val table: HiveTable)
(@transient sqlContext: SQLContext)
- extends LeafNode with MultiInstanceRelation {
+ extends LeafNode with MultiInstanceRelation with FileRelation {
override def equals(other: Any): Boolean = other match {
case relation: MetastoreRelation =>
@@ -888,6 +888,18 @@ private[hive] case class MetastoreRelation
/** An attribute map for determining the ordinal for non-partition columns. */
val columnOrdinals = AttributeMap(attributes.zipWithIndex)
+ override def inputFiles: Array[String] = {
+ val partLocations = table.getPartitions(Nil).map(_.storage.location).toArray
+ if (partLocations.nonEmpty) {
+ partLocations
+ } else {
+ Array(
+ table.location.getOrElse(
+ sys.error(s"Could not get the location of ${table.qualifiedName}.")))
+ }
+ }
+
+
override def newInstance(): MetastoreRelation = {
MetastoreRelation(databaseName, tableName, alias)(table)(sqlContext)
}
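
For a partitioned Hive table, inputFiles therefore returns the partition storage locations; for an unpartitioned table it falls back to the single table location. A hedged sketch against a Hive-enabled context (table names and the partition are examples only):

    // Hypothetical tables; requires a HiveContext.
    sqlContext.sql("CREATE TABLE part_t (x INT) PARTITIONED BY (p INT)")
    sqlContext.sql("ALTER TABLE part_t ADD PARTITION (p = 1)")
    sqlContext.table("part_t").inputFiles   // Array(<location of partition p=1>)

    sqlContext.sql("CREATE TABLE flat_t (x INT)")
    sqlContext.table("flat_t").inputFiles   // Array(<table location of flat_t>)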