about summary refs log tree commit diff
path: root/sql/catalyst/src
diff options
context:
space:
mode:
author: wangzhenhua <wangzhenhua@huawei.com> 2016-12-24 15:34:44 +0800
committer: Wenchen Fan <wenchen@databricks.com> 2016-12-24 15:34:44 +0800
commit: 3cff8161578b65139c9740fed694d4b3c81fa74a (patch)
tree: b957978656c6e855216fcd3964febceadf2379cc /sql/catalyst/src
parent: a848f0ba84e37fd95d0f47863ec68326e3296b33 (diff)
downloadspark-3cff8161578b65139c9740fed694d4b3c81fa74a.tar.gz
spark-3cff8161578b65139c9740fed694d4b3c81fa74a.tar.bz2
spark-3cff8161578b65139c9740fed694d4b3c81fa74a.zip
[SPARK-18911][SQL] Define CatalogStatistics to interact with metastore and convert it to Statistics in relations
## What changes were proposed in this pull request?

Statistics in LogicalPlan should use attributes to refer to columns rather than column names, because two columns from two relations can have the same column name. But CatalogTable doesn't have the concepts of attribute or broadcast hint in Statistics. Therefore, putting Statistics in CatalogTable is confusing. We define a different statistic structure in CatalogTable, which is only responsible for interacting with the metastore, and is converted to statistics in LogicalPlan when it is used.

## How was this patch tested?

Added test cases.

Author: wangzhenhua <wangzhenhua@huawei.com>
Author: Zhenhua Wang <wzh_zju@163.com>

Closes #16323 from wzhfy/nameToAttr.
Diffstat (limited to 'sql/catalyst/src')
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala | 34
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala | 4
2 files changed, 33 insertions, 5 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 5b5378c09e..24d75ab02c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -21,8 +21,8 @@ import java.util.Date
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal}
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Cast, Literal}
+import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.types.{StructField, StructType}
@@ -171,7 +171,7 @@ case class CatalogTable(
createTime: Long = System.currentTimeMillis,
lastAccessTime: Long = -1,
properties: Map[String, String] = Map.empty,
- stats: Option[Statistics] = None,
+ stats: Option[CatalogStatistics] = None,
viewOriginalText: Option[String] = None,
viewText: Option[String] = None,
comment: Option[String] = None,
@@ -247,6 +247,34 @@ case class CatalogTable(
}
+/**
+ * This class of statistics is used in [[CatalogTable]] to interact with metastore.
+ * We define this new class instead of directly using [[Statistics]] here because there are no
+ * concepts of attributes or broadcast hint in catalog.
+ */
+case class CatalogStatistics(
+ sizeInBytes: BigInt,
+ rowCount: Option[BigInt] = None,
+ colStats: Map[String, ColumnStat] = Map.empty) {
+
+ /**
+ * Convert [[CatalogStatistics]] to [[Statistics]], and match column stats to attributes based
+ * on column names.
+ */
+ def toPlanStats(planOutput: Seq[Attribute]): Statistics = {
+ val matched = planOutput.flatMap(a => colStats.get(a.name).map(a -> _))
+ Statistics(sizeInBytes = sizeInBytes, rowCount = rowCount,
+ attributeStats = AttributeMap(matched))
+ }
+
+ /** Readable string representation for the CatalogStatistics. */
+ def simpleString: String = {
+ val rowCountString = if (rowCount.isDefined) s", ${rowCount.get} rows" else ""
+ s"$sizeInBytes bytes$rowCountString"
+ }
+}
+
+
case class CatalogTableType private(name: String)
object CatalogTableType {
val EXTERNAL = new CatalogTableType("EXTERNAL")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
index 465fbab571..91404d4bb8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
@@ -41,13 +41,13 @@ import org.apache.spark.sql.types._
* @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it
* defaults to the product of children's `sizeInBytes`.
* @param rowCount Estimated number of rows.
- * @param colStats Column-level statistics.
+ * @param attributeStats Statistics for Attributes.
* @param isBroadcastable If true, output is small enough to be used in a broadcast join.
*/
case class Statistics(
sizeInBytes: BigInt,
rowCount: Option[BigInt] = None,
- colStats: Map[String, ColumnStat] = Map.empty,
+ attributeStats: AttributeMap[ColumnStat] = AttributeMap(Nil),
isBroadcastable: Boolean = false) {
override def toString: String = "Statistics(" + simpleString + ")"