author     wangzhenhua <wangzhenhua@huawei.com>    2016-12-24 15:34:44 +0800
committer  Wenchen Fan <wenchen@databricks.com>    2016-12-24 15:34:44 +0800
commit     3cff8161578b65139c9740fed694d4b3c81fa74a (patch)
tree       b957978656c6e855216fcd3964febceadf2379cc /sql/core/src/main
parent     a848f0ba84e37fd95d0f47863ec68326e3296b33 (diff)
[SPARK-18911][SQL] Define CatalogStatistics to interact with metastore and convert it to Statistics in relations
## What changes were proposed in this pull request?

Statistics in a LogicalPlan should use attributes to refer to columns rather than column names, because two columns from two different relations can share the same column name. CatalogTable, however, has no concept of attributes, nor of the broadcast hint carried by Statistics, so putting Statistics inside CatalogTable is confusing. We therefore define a separate statistics structure, CatalogStatistics, in CatalogTable: it is responsible only for interacting with the metastore, and is converted to plan-level Statistics when used in a LogicalPlan.

## How was this patch tested?

Added test cases.

Author: wangzhenhua <wangzhenhua@huawei.com>
Author: Zhenhua Wang <wzh_zju@163.com>

Closes #16323 from wzhfy/nameToAttr.
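To make the split concrete, here is a minimal, self-contained sketch, not the actual Spark source: the stand-in types (Attribute, ColumnStat, Statistics) are simplified assumptions, while the CatalogStatistics fields and the toPlanStats conversion follow what the diff below shows.

    // Simplified stand-ins for catalyst types, for illustration only.
    case class Attribute(name: String, exprId: Long)
    case class ColumnStat(distinctCount: BigInt, nullCount: BigInt)
    case class Statistics(
        sizeInBytes: BigInt,
        rowCount: Option[BigInt] = None,
        attributeStats: Map[Attribute, ColumnStat] = Map.empty)

    // Metastore-facing statistics: keyed by column name, because the
    // metastore knows nothing about attributes or expression IDs.
    case class CatalogStatistics(
        sizeInBytes: BigInt,
        rowCount: Option[BigInt] = None,
        colStats: Map[String, ColumnStat] = Map.empty) {

      // Convert to plan-level Statistics by resolving column names against
      // the relation's output attributes, so plans refer to columns
      // unambiguously even when two relations share a column name.
      def toPlanStats(planOutput: Seq[Attribute]): Statistics = {
        val attrStats = planOutput.flatMap(a => colStats.get(a.name).map(a -> _))
        Statistics(sizeInBytes, rowCount, attrStats.toMap)
      }
    }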
Diffstat (limited to 'sql/core/src/main')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala  | 10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala  |  2
3 files changed, 8 insertions(+), 8 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
index 9dffe3614a..1340c9bece 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
@@ -21,7 +21,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical._
@@ -64,7 +64,7 @@ case class AnalyzeColumnCommand(
AnalyzeColumnCommand.computeColumnStats(sparkSession, tableIdent.table, relation, columnNames)
// We also update table-level stats in order to keep them consistent with column-level stats.
- val statistics = Statistics(
+ val statistics = CatalogStatistics(
sizeInBytes = sizeInBytes,
rowCount = Some(rowCount),
// Newly computed column stats should override the existing ones.
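For orientation, a usage sketch of the command this hunk touches (the table and column names are made up; it assumes an active SparkSession named spark): AnalyzeColumnCommand backs the FOR COLUMNS variant of ANALYZE TABLE, and, as the change shows, it now writes table-level stats back as CatalogStatistics alongside the per-column stats.

    // Hypothetical table/columns; requires an active SparkSession `spark`.
    // Computes per-column stats and refreshes table-level stats together,
    // keeping the two consistent in the metastore.
    spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS FOR COLUMNS id, amount")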
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index 52a8fc88c5..4a994e34af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -25,8 +25,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
-import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.internal.SessionState
@@ -62,9 +61,9 @@ case class AnalyzeTableCommand(
def updateTableStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = {
val oldTotalSize = catalogTable.stats.map(_.sizeInBytes.toLong).getOrElse(0L)
val oldRowCount = catalogTable.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
- var newStats: Option[Statistics] = None
+ var newStats: Option[CatalogStatistics] = None
if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
- newStats = Some(Statistics(sizeInBytes = newTotalSize))
+ newStats = Some(CatalogStatistics(sizeInBytes = newTotalSize))
}
// We only set rowCount when noscan is false, because otherwise:
// 1. when total size is not changed, we don't need to alter the table;
@@ -76,7 +75,8 @@ case class AnalyzeTableCommand(
newStats = if (newStats.isDefined) {
newStats.map(_.copy(rowCount = Some(BigInt(newRowCount))))
} else {
- Some(Statistics(sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount))))
+ Some(CatalogStatistics(
+ sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount))))
}
}
}
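A usage sketch of the two paths through updateTableStats (hypothetical table name, assumes an active SparkSession named spark):

    // NOSCAN refreshes only sizeInBytes (no scan of the data), so rowCount
    // is left untouched; without NOSCAN the table is scanned and rowCount
    // is set, matching the "only set rowCount when noscan is false" logic
    // in the code above.
    spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS NOSCAN")
    spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS")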
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 7c28d48f26..3fd40384d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -73,7 +73,7 @@ case class LogicalRelation(
override lazy val cleanArgs: Seq[Any] = Seq(relation)
@transient override lazy val statistics: Statistics = {
- catalogTable.flatMap(_.stats.map(_.copy(sizeInBytes = relation.sizeInBytes))).getOrElse(
+ catalogTable.flatMap(_.stats.map(_.toPlanStats(output))).getOrElse(
Statistics(sizeInBytes = relation.sizeInBytes))
}
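Why toPlanStats takes the relation's output: a small demonstration, reusing the simplified stand-in types from the sketch near the top of this page, of how the same name-keyed catalog stats are re-keyed onto distinct attributes per relation, which is exactly what name-based Statistics in LogicalPlan could not express.

    // Two relations each expose a column literally named "id";
    // their attributes differ by exprId.
    val ordersOutput = Seq(Attribute("id", exprId = 1L))
    val usersOutput  = Seq(Attribute("id", exprId = 2L))

    val fromMetastore = CatalogStatistics(
      sizeInBytes = 1024,
      rowCount = Some(100),
      colStats = Map("id" -> ColumnStat(distinctCount = 100, nullCount = 0)))

    // Same catalog stats, resolved against each relation's own attributes:
    // the two "id" columns no longer collide in plan-level statistics.
    val ordersStats = fromMetastore.toPlanStats(ordersOutput)
    val usersStats  = fromMetastore.toPlanStats(usersOutput)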