From 511f52f8423e151b0d0133baf040d34a0af3d422 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 9 Aug 2016 18:22:14 +0800 Subject: [SPARK-16964][SQL] Remove private[sql] and private[spark] from sql.execution package ## What changes were proposed in this pull request? This package is meant to be internal, and as a result it does not make sense to mark things as private[sql] or private[spark]. It simply makes debugging harder when Spark developers need to inspect the plans at runtime. This patch removes all private[sql] and private[spark] visibility modifiers in org.apache.spark.sql.execution. ## How was this patch tested? N/A - just visibility changes. Author: Reynold Xin Closes #14554 from rxin/remote-private. --- .../apache/spark/sql/execution/CacheManager.scala | 22 ++++++++++---------- .../spark/sql/execution/DataSourceScanExec.scala | 10 ++++----- .../apache/spark/sql/execution/ExistingRDD.scala | 14 ++++++------- .../apache/spark/sql/execution/ExpandExec.scala | 2 +- .../apache/spark/sql/execution/FileRelation.scala | 2 +- .../apache/spark/sql/execution/GenerateExec.scala | 2 +- .../spark/sql/execution/LocalTableScanExec.scala | 4 ++-- .../apache/spark/sql/execution/RowIterator.scala | 2 +- .../apache/spark/sql/execution/SQLExecution.scala | 2 +- .../org/apache/spark/sql/execution/SortExec.scala | 6 ++---- .../org/apache/spark/sql/execution/SparkPlan.scala | 14 ++++++------- .../apache/spark/sql/execution/SparkPlanInfo.scala | 2 +- .../spark/sql/execution/SparkStrategies.scala | 6 ++---- .../spark/sql/execution/UnsafeRowSerializer.scala | 4 ++-- .../sql/execution/WholeStageCodegenExec.scala | 2 +- .../execution/aggregate/HashAggregateExec.scala | 2 +- .../execution/aggregate/SortAggregateExec.scala | 2 +- .../spark/sql/execution/aggregate/udaf.scala | 6 +++--- .../sql/execution/basicPhysicalOperators.scala | 6 +++--- .../sql/execution/columnar/InMemoryRelation.scala | 8 ++++---- .../execution/columnar/InMemoryTableScanExec.scala | 4 ++-- .../spark/sql/execution/command/commands.scala | 4 ++-- .../sql/execution/datasources/BucketingUtils.scala | 2 +- .../execution/datasources/DataSourceStrategy.scala | 10 ++++----- .../execution/datasources/FileSourceStrategy.scala | 7 ++----- .../datasources/InsertIntoDataSourceCommand.scala | 2 +- .../InsertIntoHadoopFsRelationCommand.scala | 2 +- .../execution/datasources/PartitioningUtils.scala | 24 ++++++++++++---------- .../execution/datasources/WriterContainer.scala | 8 ++++---- .../sql/execution/datasources/csv/CSVOptions.scala | 2 +- .../sql/execution/datasources/csv/CSVParser.scala | 4 ++-- .../execution/datasources/csv/CSVRelation.scala | 4 ++-- .../datasources/fileSourceInterfaces.scala | 6 +++--- .../sql/execution/datasources/jdbc/JDBCRDD.scala | 8 ++++---- .../datasources/parquet/ParquetFileFormat.scala | 17 +++++++-------- .../datasources/parquet/ParquetFilters.scala | 2 +- .../datasources/parquet/ParquetOptions.scala | 6 +++--- .../spark/sql/execution/datasources/rules.scala | 6 +++--- .../apache/spark/sql/execution/debug/package.scala | 2 +- .../execution/exchange/BroadcastExchangeExec.scala | 2 +- .../execution/exchange/ExchangeCoordinator.scala | 4 ++-- .../sql/execution/exchange/ShuffleExchange.scala | 9 ++++---- .../execution/joins/BroadcastHashJoinExec.scala | 2 +- .../joins/BroadcastNestedLoopJoinExec.scala | 2 +- .../sql/execution/joins/CartesianProductExec.scala | 5 ++--- .../sql/execution/joins/ShuffledHashJoinExec.scala | 2 +- .../sql/execution/joins/SortMergeJoinExec.scala | 2 +- .../spark/sql/execution/metric/SQLMetrics.scala | 10 ++++----- .../sql/execution/python/ExtractPythonUDFs.scala | 4 ++-- .../sql/execution/r/MapPartitionsRWrapper.scala | 4 ++-- .../spark/sql/execution/stat/FrequentItems.scala | 4 ++-- .../spark/sql/execution/stat/StatFunctions.scala | 8 ++++---- .../execution/streaming/IncrementalExecution.scala | 2 +- .../sql/execution/streaming/StreamExecution.scala | 19 ++++++++--------- .../sql/execution/streaming/StreamProgress.scala | 2 +- .../sql/execution/streaming/state/StateStore.scala | 2 +- .../streaming/state/StateStoreCoordinator.scala | 4 ++-- .../spark/sql/execution/ui/ExecutionPage.scala | 2 +- .../spark/sql/execution/ui/SQLListener.scala | 6 +++--- .../org/apache/spark/sql/execution/ui/SQLTab.scala | 4 ++-- .../spark/sql/execution/ui/SparkPlanGraph.scala | 6 +++--- .../apache/spark/sql/internal/SharedState.scala | 2 -- .../sql/hive/execution/HiveTableScanExec.scala | 2 +- 63 files changed, 170 insertions(+), 177 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index de2503a87a..83b7c779ab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -31,7 +31,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK /** Holds a cached logical plan and its data */ -private[sql] case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation) +case class CachedData(plan: LogicalPlan, cachedRepresentation: InMemoryRelation) /** * Provides support in a SQLContext for caching query results and automatically using these cached @@ -41,7 +41,7 @@ private[sql] case class CachedData(plan: LogicalPlan, cachedRepresentation: InMe * * Internal to Spark SQL. */ -private[sql] class CacheManager extends Logging { +class CacheManager extends Logging { @transient private val cachedData = new scala.collection.mutable.ArrayBuffer[CachedData] @@ -68,13 +68,13 @@ private[sql] class CacheManager extends Logging { } /** Clears all cached tables. */ - private[sql] def clearCache(): Unit = writeLock { + def clearCache(): Unit = writeLock { cachedData.foreach(_.cachedRepresentation.cachedColumnBuffers.unpersist()) cachedData.clear() } /** Checks if the cache is empty. */ - private[sql] def isEmpty: Boolean = readLock { + def isEmpty: Boolean = readLock { cachedData.isEmpty } @@ -83,7 +83,7 @@ private[sql] class CacheManager extends Logging { * Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because * recomputing the in-memory columnar representation of the underlying table is expensive. */ - private[sql] def cacheQuery( + def cacheQuery( query: Dataset[_], tableName: Option[String] = None, storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock { @@ -108,7 +108,7 @@ private[sql] class CacheManager extends Logging { * Tries to remove the data for the given [[Dataset]] from the cache. * No operation, if it's already uncached. */ - private[sql] def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Boolean = writeLock { + def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Boolean = writeLock { val planToCache = query.queryExecution.analyzed val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan)) val found = dataIndex >= 0 @@ -120,17 +120,17 @@ private[sql] class CacheManager extends Logging { } /** Optionally returns cached data for the given [[Dataset]] */ - private[sql] def lookupCachedData(query: Dataset[_]): Option[CachedData] = readLock { + def lookupCachedData(query: Dataset[_]): Option[CachedData] = readLock { lookupCachedData(query.queryExecution.analyzed) } /** Optionally returns cached data for the given [[LogicalPlan]]. */ - private[sql] def lookupCachedData(plan: LogicalPlan): Option[CachedData] = readLock { + def lookupCachedData(plan: LogicalPlan): Option[CachedData] = readLock { cachedData.find(cd => plan.sameResult(cd.plan)) } /** Replaces segments of the given logical plan with cached versions where possible. */ - private[sql] def useCachedData(plan: LogicalPlan): LogicalPlan = { + def useCachedData(plan: LogicalPlan): LogicalPlan = { plan transformDown { case currentFragment => lookupCachedData(currentFragment) @@ -143,7 +143,7 @@ private[sql] class CacheManager extends Logging { * Invalidates the cache of any data that contains `plan`. Note that it is possible that this * function will over invalidate. */ - private[sql] def invalidateCache(plan: LogicalPlan): Unit = writeLock { + def invalidateCache(plan: LogicalPlan): Unit = writeLock { cachedData.foreach { case data if data.plan.collect { case p if p.sameResult(plan) => p }.nonEmpty => data.cachedRepresentation.recache() @@ -155,7 +155,7 @@ private[sql] class CacheManager extends Logging { * Invalidates the cache of any data that contains `resourcePath` in one or more * `HadoopFsRelation` node(s) as part of its logical plan. */ - private[sql] def invalidateCachedPath( + def invalidateCachedPath( sparkSession: SparkSession, resourcePath: String): Unit = writeLock { val (fs, qualifiedPath) = { val path = new Path(resourcePath) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 1e749b3dfc..1a8d0e310a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -38,7 +38,7 @@ import org.apache.spark.sql.sources.{BaseRelation, Filter} import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.util.Utils -private[sql] trait DataSourceScanExec extends LeafExecNode with CodegenSupport { +trait DataSourceScanExec extends LeafExecNode with CodegenSupport { val relation: BaseRelation val metastoreTableIdentifier: Option[TableIdentifier] @@ -48,7 +48,7 @@ private[sql] trait DataSourceScanExec extends LeafExecNode with CodegenSupport { } /** Physical plan node for scanning data from a relation. */ -private[sql] case class RowDataSourceScanExec( +case class RowDataSourceScanExec( output: Seq[Attribute], rdd: RDD[InternalRow], @transient relation: BaseRelation, @@ -57,7 +57,7 @@ private[sql] case class RowDataSourceScanExec( override val metastoreTableIdentifier: Option[TableIdentifier]) extends DataSourceScanExec { - private[sql] override lazy val metrics = + override lazy val metrics = Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) val outputUnsafeRows = relation match { @@ -138,7 +138,7 @@ private[sql] case class RowDataSourceScanExec( * @param dataFilters Data source filters to use for filtering data within partitions. * @param metastoreTableIdentifier */ -private[sql] case class FileSourceScanExec( +case class FileSourceScanExec( @transient relation: HadoopFsRelation, output: Seq[Attribute], outputSchema: StructType, @@ -211,7 +211,7 @@ private[sql] case class FileSourceScanExec( inputRDD :: Nil } - private[sql] override lazy val metrics = + override lazy val metrics = Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index b762c16914..6c4248c60e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -67,7 +67,7 @@ object RDDConversions { } } -private[sql] object ExternalRDD { +object ExternalRDD { def apply[T: Encoder](rdd: RDD[T], session: SparkSession): LogicalPlan = { val externalRdd = ExternalRDD(CatalystSerde.generateObjAttr[T], rdd)(session) @@ -76,7 +76,7 @@ private[sql] object ExternalRDD { } /** Logical plan node for scanning data from an RDD. */ -private[sql] case class ExternalRDD[T]( +case class ExternalRDD[T]( outputObjAttr: Attribute, rdd: RDD[T])(session: SparkSession) extends LeafNode with ObjectProducer with MultiInstanceRelation { @@ -103,11 +103,11 @@ private[sql] case class ExternalRDD[T]( } /** Physical plan node for scanning data from an RDD. */ -private[sql] case class ExternalRDDScanExec[T]( +case class ExternalRDDScanExec[T]( outputObjAttr: Attribute, rdd: RDD[T]) extends LeafExecNode with ObjectProducerExec { - private[sql] override lazy val metrics = Map( + override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) protected override def doExecute(): RDD[InternalRow] = { @@ -128,7 +128,7 @@ private[sql] case class ExternalRDDScanExec[T]( } /** Logical plan node for scanning data from an RDD of InternalRow. */ -private[sql] case class LogicalRDD( +case class LogicalRDD( output: Seq[Attribute], rdd: RDD[InternalRow])(session: SparkSession) extends LeafNode with MultiInstanceRelation { @@ -155,12 +155,12 @@ private[sql] case class LogicalRDD( } /** Physical plan node for scanning data from an RDD of InternalRow. */ -private[sql] case class RDDScanExec( +case class RDDScanExec( output: Seq[Attribute], rdd: RDD[InternalRow], override val nodeName: String) extends LeafExecNode { - private[sql] override lazy val metrics = Map( + override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) protected override def doExecute(): RDD[InternalRow] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala index 4c046f7bdc..d5603b3b00 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala @@ -39,7 +39,7 @@ case class ExpandExec( child: SparkPlan) extends UnaryExecNode with CodegenSupport { - private[sql] override lazy val metrics = Map( + override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) // The GroupExpressions can output data with arbitrary partitioning, so set it diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/FileRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/FileRelation.scala index 7a2a9eed58..a299fed7fd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/FileRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/FileRelation.scala @@ -22,7 +22,7 @@ package org.apache.spark.sql.execution * the list of paths that it returns will be returned to a user who calls `inputPaths` on any * DataFrame that queries this relation. */ -private[sql] trait FileRelation { +trait FileRelation { /** Returns the list of files that will be read when scanning this relation. */ def inputFiles: Array[String] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala index 8b62c5507c..39189a2b0c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala @@ -55,7 +55,7 @@ case class GenerateExec( child: SparkPlan) extends UnaryExecNode { - private[sql] override lazy val metrics = Map( + override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) override def producedAttributes: AttributeSet = AttributeSet(output) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala index f86f42b1f8..556f482f4b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala @@ -26,11 +26,11 @@ import org.apache.spark.sql.execution.metric.SQLMetrics /** * Physical plan node for scanning data from a local collection. */ -private[sql] case class LocalTableScanExec( +case class LocalTableScanExec( output: Seq[Attribute], rows: Seq[InternalRow]) extends LeafExecNode { - private[sql] override lazy val metrics = Map( + override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) private val unsafeRows: Array[InternalRow] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/RowIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/RowIterator.scala index 7462dbc4eb..717ff93eab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/RowIterator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/RowIterator.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.InternalRow * iterator to consume the next row, whereas RowIterator combines these calls into a single * [[advanceNext()]] method. */ -private[sql] abstract class RowIterator { +abstract class RowIterator { /** * Advance this iterator by a single row. Returns `false` if this iterator has no more rows * and `true` otherwise. If this returns `true`, then the new row can be retrieved by calling diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala index 6cb1a44a20..ec07aab359 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart} -private[sql] object SQLExecution { +object SQLExecution { val EXECUTION_ID_KEY = "spark.sql.execution.id" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala index 6db7f45cfd..d8e0675e3e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala @@ -22,11 +22,9 @@ import org.apache.spark.executor.TaskMetrics import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, GenerateUnsafeProjection} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.plans.physical.{Distribution, OrderedDistribution, UnspecifiedDistribution} import org.apache.spark.sql.execution.metric.SQLMetrics -import org.apache.spark.sql.types._ -import org.apache.spark.util.collection.unsafe.sort.RadixSort; /** * Performs (external) sorting. @@ -52,7 +50,7 @@ case class SortExec( private val enableRadixSort = sqlContext.conf.enableRadixSort - override private[sql] lazy val metrics = Map( + override lazy val metrics = Map( "sortTime" -> SQLMetrics.createTimingMetric(sparkContext, "sort time"), "peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"), "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size")) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 045ccc7bd6..79cb40948b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -72,24 +72,24 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ /** * Return all metadata that describes more details of this SparkPlan. */ - private[sql] def metadata: Map[String, String] = Map.empty + def metadata: Map[String, String] = Map.empty /** * Return all metrics containing metrics of this SparkPlan. */ - private[sql] def metrics: Map[String, SQLMetric] = Map.empty + def metrics: Map[String, SQLMetric] = Map.empty /** * Reset all the metrics. */ - private[sql] def resetMetrics(): Unit = { + def resetMetrics(): Unit = { metrics.valuesIterator.foreach(_.reset()) } /** * Return a LongSQLMetric according to the name. */ - private[sql] def longMetric(name: String): SQLMetric = metrics(name) + def longMetric(name: String): SQLMetric = metrics(name) // TODO: Move to `DistributedPlan` /** Specifies how data is partitioned across different nodes in the cluster. */ @@ -395,7 +395,7 @@ object SparkPlan { ThreadUtils.newDaemonCachedThreadPool("subquery", 16)) } -private[sql] trait LeafExecNode extends SparkPlan { +trait LeafExecNode extends SparkPlan { override def children: Seq[SparkPlan] = Nil override def producedAttributes: AttributeSet = outputSet } @@ -407,7 +407,7 @@ object UnaryExecNode { } } -private[sql] trait UnaryExecNode extends SparkPlan { +trait UnaryExecNode extends SparkPlan { def child: SparkPlan override def children: Seq[SparkPlan] = child :: Nil @@ -415,7 +415,7 @@ private[sql] trait UnaryExecNode extends SparkPlan { override def outputPartitioning: Partitioning = child.outputPartitioning } -private[sql] trait BinaryExecNode extends SparkPlan { +trait BinaryExecNode extends SparkPlan { def left: SparkPlan def right: SparkPlan diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala index f84070a0c4..7aa93126fd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala @@ -47,7 +47,7 @@ class SparkPlanInfo( } } -private[sql] object SparkPlanInfo { +private[execution] object SparkPlanInfo { def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = { val children = plan match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 4dfec3ec85..4aaf454285 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.execution -import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD import org.apache.spark.sql.{execution, SaveMode, Strategy} import org.apache.spark.sql.catalyst.InternalRow @@ -43,13 +42,12 @@ import org.apache.spark.sql.streaming.StreamingQuery * writing libraries should instead consider using the stable APIs provided in * [[org.apache.spark.sql.sources]] */ -@DeveloperApi abstract class SparkStrategy extends GenericStrategy[SparkPlan] { override protected def planLater(plan: LogicalPlan): SparkPlan = PlanLater(plan) } -private[sql] case class PlanLater(plan: LogicalPlan) extends LeafExecNode { +case class PlanLater(plan: LogicalPlan) extends LeafExecNode { override def output: Seq[Attribute] = plan.output @@ -58,7 +56,7 @@ private[sql] case class PlanLater(plan: LogicalPlan) extends LeafExecNode { } } -private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { +abstract class SparkStrategies extends QueryPlanner[SparkPlan] { self: SparkPlanner => /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala index 484923428f..8ab553369d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/UnsafeRowSerializer.scala @@ -40,12 +40,12 @@ import org.apache.spark.unsafe.Platform * * @param numFields the number of fields in the row being serialized. */ -private[sql] class UnsafeRowSerializer( +class UnsafeRowSerializer( numFields: Int, dataSize: SQLMetric = null) extends Serializer with Serializable { override def newInstance(): SerializerInstance = new UnsafeRowSerializerInstance(numFields, dataSize) - override private[spark] def supportsRelocationOfSerializedObjects: Boolean = true + override def supportsRelocationOfSerializedObjects: Boolean = true } private class UnsafeRowSerializerInstance( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index ac4c3aae5f..fb57ed7692 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -295,7 +295,7 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co override def outputPartitioning: Partitioning = child.outputPartitioning override def outputOrdering: Seq[SortOrder] = child.outputOrdering - override private[sql] lazy val metrics = Map( + override lazy val metrics = Map( "pipelineTime" -> SQLMetrics.createTimingMetric(sparkContext, WholeStageCodegenExec.PIPELINE_DURATION_METRIC)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala index 54d7340d8a..cfc47aba88 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala @@ -54,7 +54,7 @@ case class HashAggregateExec( child.output ++ aggregateBufferAttributes ++ aggregateAttributes ++ aggregateExpressions.flatMap(_.aggregateFunction.inputAggBufferAttributes) - override private[sql] lazy val metrics = Map( + override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"), "spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala index 00e45256c4..2a81a823c4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala @@ -49,7 +49,7 @@ case class SortAggregateExec( AttributeSet(resultExpressions.diff(groupingExpressions).map(_.toAttribute)) ++ AttributeSet(aggregateBufferAttributes) - override private[sql] lazy val metrics = Map( + override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala index b047bc0641..586e1456ac 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala @@ -204,7 +204,7 @@ sealed trait BufferSetterGetterUtils { /** * A Mutable [[Row]] representing a mutable aggregation buffer. */ -private[sql] class MutableAggregationBufferImpl ( +private[aggregate] class MutableAggregationBufferImpl( schema: StructType, toCatalystConverters: Array[Any => Any], toScalaConverters: Array[Any => Any], @@ -266,7 +266,7 @@ private[sql] class MutableAggregationBufferImpl ( /** * A [[Row]] representing an immutable aggregation buffer. */ -private[sql] class InputAggregationBuffer private[sql] ( +private[aggregate] class InputAggregationBuffer( schema: StructType, toCatalystConverters: Array[Any => Any], toScalaConverters: Array[Any => Any], @@ -319,7 +319,7 @@ private[sql] class InputAggregationBuffer private[sql] ( * The internal wrapper used to hook a [[UserDefinedAggregateFunction]] `udaf` in the * internal aggregation code path. */ -private[sql] case class ScalaUDAF( +case class ScalaUDAF( children: Seq[Expression], udaf: UserDefinedAggregateFunction, mutableAggBufferOffset: Int = 0, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 185c79f899..e6f7081f29 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -102,7 +102,7 @@ case class FilterExec(condition: Expression, child: SparkPlan) } } - private[sql] override lazy val metrics = Map( + override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) override def inputRDDs(): Seq[RDD[InternalRow]] = { @@ -228,7 +228,7 @@ case class SampleExec( child: SparkPlan) extends UnaryExecNode with CodegenSupport { override def output: Seq[Attribute] = child.output - private[sql] override lazy val metrics = Map( + override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) protected override def doExecute(): RDD[InternalRow] = { @@ -317,7 +317,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range) override val output: Seq[Attribute] = range.output - private[sql] override lazy val metrics = Map( + override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) // output attributes should not affect the results diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index 079e122a5a..479934a7af 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -34,7 +34,7 @@ import org.apache.spark.storage.StorageLevel import org.apache.spark.util.CollectionAccumulator -private[sql] object InMemoryRelation { +object InMemoryRelation { def apply( useCompression: Boolean, batchSize: Int, @@ -55,15 +55,15 @@ private[sql] object InMemoryRelation { private[columnar] case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow) -private[sql] case class InMemoryRelation( +case class InMemoryRelation( output: Seq[Attribute], useCompression: Boolean, batchSize: Int, storageLevel: StorageLevel, @transient child: SparkPlan, tableName: Option[String])( - @transient private[sql] var _cachedColumnBuffers: RDD[CachedBatch] = null, - private[sql] val batchStats: CollectionAccumulator[InternalRow] = + @transient var _cachedColumnBuffers: RDD[CachedBatch] = null, + val batchStats: CollectionAccumulator[InternalRow] = child.sqlContext.sparkContext.collectionAccumulator[InternalRow]) extends logical.LeafNode with MultiInstanceRelation { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala index 67a410f539..b86825902a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryTableScanExec.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.types.UserDefinedType -private[sql] case class InMemoryTableScanExec( +case class InMemoryTableScanExec( attributes: Seq[Attribute], predicates: Seq[Expression], @transient relation: InMemoryRelation) @@ -36,7 +36,7 @@ private[sql] case class InMemoryTableScanExec( override protected def innerChildren: Seq[QueryPlan[_]] = Seq(relation) ++ super.innerChildren - private[sql] override lazy val metrics = Map( + override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) override def output: Seq[Attribute] = attributes diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala index 7eaad81a81..cce1489abd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.types._ * A logical command that is executed for its side-effects. `RunnableCommand`s are * wrapped in `ExecutedCommand` during execution. */ -private[sql] trait RunnableCommand extends LogicalPlan with logical.Command { +trait RunnableCommand extends LogicalPlan with logical.Command { override def output: Seq[Attribute] = Seq.empty override def children: Seq[LogicalPlan] = Seq.empty def run(sparkSession: SparkSession): Seq[Row] @@ -45,7 +45,7 @@ private[sql] trait RunnableCommand extends LogicalPlan with logical.Command { * A physical operator that executes the run method of a `RunnableCommand` and * saves the result to prevent multiple executions. */ -private[sql] case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkPlan { +case class ExecutedCommandExec(cmd: RunnableCommand) extends SparkPlan { /** * A concrete command should override this lazy field to wrap up any side effects caused by the * command or any other computation that should be evaluated exactly once. The value of this field diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala index 377b818096..ea4fe9c8ad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BucketingUtils.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.datasources -private[sql] object BucketingUtils { +object BucketingUtils { // The file name of bucketed data should have 3 parts: // 1. some other information in the head of file name // 2. bucket id part, some numbers, starts with "_" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index ed8ccca6de..733ba18528 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -43,7 +43,7 @@ import org.apache.spark.unsafe.types.UTF8String * Replaces generic operations with specific variants that are designed to work with Spark * SQL Data Sources. */ -private[sql] case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] { +case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] { def resolver: Resolver = { if (conf.caseSensitiveAnalysis) { @@ -53,8 +53,8 @@ private[sql] case class DataSourceAnalysis(conf: CatalystConf) extends Rule[Logi } } - // The access modifier is used to expose this method to tests. - private[sql] def convertStaticPartitions( + // Visible for testing. + def convertStaticPartitions( sourceAttributes: Seq[Attribute], providedPartitions: Map[String, Option[String]], targetAttributes: Seq[Attribute], @@ -202,7 +202,7 @@ private[sql] case class DataSourceAnalysis(conf: CatalystConf) extends Rule[Logi * Replaces [[SimpleCatalogRelation]] with data source table if its table property contains data * source information. */ -private[sql] class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] { +class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] { private def readDataSourceTable(sparkSession: SparkSession, table: CatalogTable): LogicalPlan = { val schema = DDLUtils.getSchemaFromTableProperties(table) @@ -242,7 +242,7 @@ private[sql] class FindDataSourceTable(sparkSession: SparkSession) extends Rule[ /** * A Strategy for planning scans over data sources defined using the sources API. */ -private[sql] object DataSourceStrategy extends Strategy with Logging { +object DataSourceStrategy extends Strategy with Logging { def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match { case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _, _)) => pruneFilterProjectRaw( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index 3ac09d99c7..8b36caf6f1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -18,14 +18,11 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.internal.Logging -import org.apache.spark.rdd.RDD import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.{expressions, InternalRow} -import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPartitioning} import org.apache.spark.sql.execution.FileSourceScanExec import org.apache.spark.sql.execution.SparkPlan @@ -52,7 +49,7 @@ import org.apache.spark.sql.execution.SparkPlan * is under the threshold with the addition of the next file, add it. If not, open a new bucket * and add it. Proceed to the next file. */ -private[sql] object FileSourceStrategy extends Strategy with Logging { +object FileSourceStrategy extends Strategy with Logging { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case PhysicalOperation(projects, filters, l @ LogicalRelation(fsRelation: HadoopFsRelation, _, table)) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala index 8549ae96e2..b2ff68a833 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.sources.InsertableRelation /** * Inserts the results of `query` in to a relation that extends [[InsertableRelation]]. */ -private[sql] case class InsertIntoDataSourceCommand( +case class InsertIntoDataSourceCommand( logicalRelation: LogicalRelation, query: LogicalPlan, overwrite: Boolean) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala index b49525c8ce..de822180ab 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala @@ -55,7 +55,7 @@ import org.apache.spark.sql.internal.SQLConf * 4. If all tasks are committed, commit the job, otherwise aborts the job; If any exception is * thrown during job commitment, also aborts the job. */ -private[sql] case class InsertIntoHadoopFsRelationCommand( +case class InsertIntoHadoopFsRelationCommand( outputPath: Path, partitionColumns: Seq[Attribute], bucketSpec: Option[BucketSpec], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index c3561099d6..504464216e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Cast, Literal} import org.apache.spark.sql.types._ +// TODO: We should tighten up visibility of the classes here once we clean up Hive coupling. object PartitionDirectory { def apply(values: InternalRow, path: String): PartitionDirectory = @@ -41,22 +42,23 @@ object PartitionDirectory { * Holds a directory in a partitioned collection of files as well as as the partition values * in the form of a Row. Before scanning, the files at `path` need to be enumerated. */ -private[sql] case class PartitionDirectory(values: InternalRow, path: Path) +case class PartitionDirectory(values: InternalRow, path: Path) -private[sql] case class PartitionSpec( +case class PartitionSpec( partitionColumns: StructType, partitions: Seq[PartitionDirectory]) -private[sql] object PartitionSpec { +object PartitionSpec { val emptySpec = PartitionSpec(StructType(Seq.empty[StructField]), Seq.empty[PartitionDirectory]) } -private[sql] object PartitioningUtils { +object PartitioningUtils { // This duplicates default value of Hive `ConfVars.DEFAULTPARTITIONNAME`, since sql/core doesn't // depend on Hive. - private[sql] val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__" + val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__" - private[sql] case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal]) { + private[datasources] case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal]) + { require(columnNames.size == literals.size) } @@ -83,7 +85,7 @@ private[sql] object PartitioningUtils { * path = "hdfs://:/path/to/partition/a=2/b=world/c=6.28"))) * }}} */ - private[sql] def parsePartitions( + private[datasources] def parsePartitions( paths: Seq[Path], defaultPartitionName: String, typeInference: Boolean, @@ -166,7 +168,7 @@ private[sql] object PartitioningUtils { * hdfs://:/path/to/partition * }}} */ - private[sql] def parsePartition( + private[datasources] def parsePartition( path: Path, defaultPartitionName: String, typeInference: Boolean, @@ -249,7 +251,7 @@ private[sql] object PartitioningUtils { * DoubleType -> StringType * }}} */ - private[sql] def resolvePartitions( + def resolvePartitions( pathsWithPartitionValues: Seq[(Path, PartitionValues)]): Seq[PartitionValues] = { if (pathsWithPartitionValues.isEmpty) { Seq.empty @@ -275,7 +277,7 @@ private[sql] object PartitioningUtils { } } - private[sql] def listConflictingPartitionColumns( + private[datasources] def listConflictingPartitionColumns( pathWithPartitionValues: Seq[(Path, PartitionValues)]): String = { val distinctPartColNames = pathWithPartitionValues.map(_._2.columnNames).distinct @@ -308,7 +310,7 @@ private[sql] object PartitioningUtils { * [[IntegerType]], [[LongType]], [[DoubleType]], [[DecimalType.SYSTEM_DEFAULT]], and * [[StringType]]. */ - private[sql] def inferPartitionColumnValue( + private[datasources] def inferPartitionColumnValue( raw: String, defaultPartitionName: String, typeInference: Boolean): Literal = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala index c801436b0a..447c237e3a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala @@ -41,14 +41,14 @@ import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter /** A container for all the details required when writing to a table. */ -case class WriteRelation( +private[datasources] case class WriteRelation( sparkSession: SparkSession, dataSchema: StructType, path: String, prepareJobForWrite: Job => OutputWriterFactory, bucketSpec: Option[BucketSpec]) -private[sql] abstract class BaseWriterContainer( +private[datasources] abstract class BaseWriterContainer( @transient val relation: WriteRelation, @transient private val job: Job, isAppend: Boolean) @@ -235,7 +235,7 @@ private[sql] abstract class BaseWriterContainer( /** * A writer that writes all of the rows in a partition to a single file. */ -private[sql] class DefaultWriterContainer( +private[datasources] class DefaultWriterContainer( relation: WriteRelation, job: Job, isAppend: Boolean) @@ -294,7 +294,7 @@ private[sql] class DefaultWriterContainer( * done by maintaining a HashMap of open files until `maxFiles` is reached. If this occurs, the * writer externally sorts the remaining rows and then writes out them out one file at a time. */ -private[sql] class DynamicPartitionWriterContainer( +private[datasources] class DynamicPartitionWriterContainer( relation: WriteRelation, job: Job, partitionColumns: Seq[Attribute], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala index 22fb8163b1..10fe541a2c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVOptions.scala @@ -23,7 +23,7 @@ import java.text.SimpleDateFormat import org.apache.spark.internal.Logging import org.apache.spark.sql.execution.datasources.{CompressionCodecs, ParseModes} -private[sql] class CSVOptions(@transient private val parameters: Map[String, String]) +private[csv] class CSVOptions(@transient private val parameters: Map[String, String]) extends Logging with Serializable { private def getChar(paramName: String, default: Char): Char = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala index 13ae76d498..64bdd6f464 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVParser.scala @@ -28,7 +28,7 @@ import org.apache.spark.internal.Logging * * @param params Parameters object */ -private[sql] class CsvReader(params: CSVOptions) { +private[csv] class CsvReader(params: CSVOptions) { private val parser: CsvParser = { val settings = new CsvParserSettings() @@ -65,7 +65,7 @@ private[sql] class CsvReader(params: CSVOptions) { * @param params Parameters object for configuration * @param headers headers for columns */ -private[sql] class LineCsvWriter(params: CSVOptions, headers: Seq[String]) extends Logging { +private[csv] class LineCsvWriter(params: CSVOptions, headers: Seq[String]) extends Logging { private val writerSettings = new CsvWriterSettings private val format = writerSettings.getFormat diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala index c6ba424d86..6b2f9fc61e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala @@ -168,7 +168,7 @@ object CSVRelation extends Logging { } } -private[sql] class CSVOutputWriterFactory(params: CSVOptions) extends OutputWriterFactory { +private[csv] class CSVOutputWriterFactory(params: CSVOptions) extends OutputWriterFactory { override def newInstance( path: String, bucketId: Option[Int], @@ -179,7 +179,7 @@ private[sql] class CSVOutputWriterFactory(params: CSVOptions) extends OutputWrit } } -private[sql] class CsvOutputWriter( +private[csv] class CsvOutputWriter( path: String, dataSchema: StructType, context: TaskAttemptContext, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala index 5ce8350de2..f068779b3e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala @@ -76,7 +76,7 @@ abstract class OutputWriterFactory extends Serializable { * through the [[OutputWriterFactory]] implementation. * @since 2.0.0 */ - private[sql] def newWriter(path: String): OutputWriter = { + def newWriter(path: String): OutputWriter = { throw new UnsupportedOperationException("newInstance with just path not supported") } } @@ -263,7 +263,7 @@ trait FileFormat { * appends partition values to [[InternalRow]]s produced by the reader function [[buildReader]] * returns. */ - private[sql] def buildReaderWithPartitionValues( + def buildReaderWithPartitionValues( sparkSession: SparkSession, dataSchema: StructType, partitionSchema: StructType, @@ -357,7 +357,7 @@ trait FileCatalog { /** * Helper methods for gathering metadata from HDFS. */ -private[sql] object HadoopFsRelation extends Logging { +object HadoopFsRelation extends Logging { /** Checks if we should filter out this path name. */ def shouldFilterOut(pathName: String): Boolean = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala index e267e77c52..6dad8cbef7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRDD.scala @@ -38,11 +38,11 @@ import org.apache.spark.unsafe.types.UTF8String /** * Data corresponding to one partition of a JDBCRDD. */ -private[sql] case class JDBCPartition(whereClause: String, idx: Int) extends Partition { +case class JDBCPartition(whereClause: String, idx: Int) extends Partition { override def index: Int = idx } -private[sql] object JDBCRDD extends Logging { +object JDBCRDD extends Logging { /** * Maps a JDBC type to a Catalyst type. This function is called only when @@ -192,7 +192,7 @@ private[sql] object JDBCRDD extends Logging { * Turns a single Filter into a String representing a SQL expression. * Returns None for an unhandled filter. */ - private[jdbc] def compileFilter(f: Filter): Option[String] = { + def compileFilter(f: Filter): Option[String] = { Option(f match { case EqualTo(attr, value) => s"$attr = ${compileValue(value)}" case EqualNullSafe(attr, value) => @@ -275,7 +275,7 @@ private[sql] object JDBCRDD extends Logging { * driver code and the workers must be able to access the database; the driver * needs to fetch the schema while the workers need to fetch the data. */ -private[sql] class JDBCRDD( +private[jdbc] class JDBCRDD( sc: SparkContext, getConnection: () => Connection, schema: StructType, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index ea32506c09..612a295c0e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -51,7 +51,7 @@ import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ import org.apache.spark.util.SerializableConfiguration -private[sql] class ParquetFileFormat +class ParquetFileFormat extends FileFormat with DataSourceRegister with Logging @@ -268,7 +268,7 @@ private[sql] class ParquetFileFormat true } - override private[sql] def buildReaderWithPartitionValues( + override def buildReaderWithPartitionValues( sparkSession: SparkSession, dataSchema: StructType, partitionSchema: StructType, @@ -424,7 +424,7 @@ private[sql] class ParquetFileFormat * writes the data to the path used to generate the output writer. Callers of this factory * has to ensure which files are to be considered as committed. */ -private[sql] class ParquetOutputWriterFactory( +private[parquet] class ParquetOutputWriterFactory( sqlConf: SQLConf, dataSchema: StructType, hadoopConf: Configuration, @@ -473,7 +473,7 @@ private[sql] class ParquetOutputWriterFactory( * Returns a [[OutputWriter]] that writes data to the give path without using * [[OutputCommitter]]. */ - override private[sql] def newWriter(path: String): OutputWriter = new OutputWriter { + override def newWriter(path: String): OutputWriter = new OutputWriter { // Create TaskAttemptContext that is used to pass on Configuration to the ParquetRecordWriter private val hadoopTaskAttemptId = new TaskAttemptID(new TaskID(new JobID, TaskType.MAP, 0), 0) @@ -520,7 +520,7 @@ private[sql] class ParquetOutputWriterFactory( // NOTE: This class is instantiated and used on executor side only, no need to be serializable. -private[sql] class ParquetOutputWriter( +private[parquet] class ParquetOutputWriter( path: String, bucketId: Option[Int], context: TaskAttemptContext) @@ -558,12 +558,13 @@ private[sql] class ParquetOutputWriter( override def write(row: Row): Unit = throw new UnsupportedOperationException("call writeInternal") - override protected[sql] def writeInternal(row: InternalRow): Unit = recordWriter.write(null, row) + override def writeInternal(row: InternalRow): Unit = recordWriter.write(null, row) override def close(): Unit = recordWriter.close(context) } -private[sql] object ParquetFileFormat extends Logging { + +object ParquetFileFormat extends Logging { /** * If parquet's block size (row group size) setting is larger than the min split size, * we use parquet's block size setting as the min split size. Otherwise, we will create @@ -710,7 +711,7 @@ private[sql] object ParquetFileFormat extends Logging { * distinguish binary and string). This method generates a correct schema by merging Metastore * schema data types and Parquet schema field names. */ - private[sql] def mergeMetastoreParquetSchema( + def mergeMetastoreParquetSchema( metastoreSchema: StructType, parquetSchema: StructType): StructType = { def schemaConflictMessage: String = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 426263fa44..a6e9788097 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.types._ /** * Some utility function to convert Spark data source filters to Parquet filters. */ -private[sql] object ParquetFilters { +private[parquet] object ParquetFilters { private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = { case BooleanType => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala index dd2e915e7b..3eec582714 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetOptions.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.internal.SQLConf /** * Options for the Parquet data source. */ -private[sql] class ParquetOptions( +private[parquet] class ParquetOptions( @transient private val parameters: Map[String, String], @transient private val sqlConf: SQLConf) extends Serializable { @@ -56,8 +56,8 @@ private[sql] class ParquetOptions( } -private[sql] object ParquetOptions { - private[sql] val MERGE_SCHEMA = "mergeSchema" +object ParquetOptions { + val MERGE_SCHEMA = "mergeSchema" // The parquet compression short names private val shortParquetCompressionCodecNames = Map( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index d5b92323d4..c133dda13e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.types.{AtomicType, StructType} /** * Try to replaces [[UnresolvedRelation]]s with [[ResolveDataSource]]. */ -private[sql] class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] { +class ResolveDataSource(sparkSession: SparkSession) extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case u: UnresolvedRelation if u.tableIdentifier.database.isDefined => try { @@ -195,7 +195,7 @@ case class PreprocessDDL(conf: SQLConf) extends Rule[LogicalPlan] { * table. It also does data type casting and field renaming, to make sure that the columns to be * inserted have the correct data type and fields have the correct names. */ -private[sql] case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { +case class PreprocessTableInsertion(conf: SQLConf) extends Rule[LogicalPlan] { private def preprocess( insert: InsertIntoTable, tblName: String, @@ -275,7 +275,7 @@ private[sql] case class PreprocessTableInsertion(conf: SQLConf) extends Rule[Log /** * A rule to do various checks before inserting into or writing to a data source table. */ -private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) +case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog) extends (LogicalPlan => Unit) { def failAnalysis(msg: String): Unit = { throw new AnalysisException(msg) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala index e89f792496..082f97a880 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala @@ -104,7 +104,7 @@ package object debug { } } - private[sql] case class DebugExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport { + case class DebugExec(child: SparkPlan) extends UnaryExecNode with CodegenSupport { def output: Seq[Attribute] = child.output class SetAccumulator[T] extends AccumulatorV2[T, HashSet[T]] { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala index bd0841db7e..a809076de5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala @@ -38,7 +38,7 @@ case class BroadcastExchangeExec( mode: BroadcastMode, child: SparkPlan) extends Exchange { - override private[sql] lazy val metrics = Map( + override lazy val metrics = Map( "dataSize" -> SQLMetrics.createMetric(sparkContext, "data size (bytes)"), "collectTime" -> SQLMetrics.createMetric(sparkContext, "time to collect (ms)"), "buildTime" -> SQLMetrics.createMetric(sparkContext, "time to build (ms)"), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala index 2ea6ee38a9..57da85fa84 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala @@ -79,7 +79,7 @@ import org.apache.spark.sql.execution.{ShuffledRowRDD, SparkPlan} * - post-shuffle partition 1: pre-shuffle partition 2 * - post-shuffle partition 2: pre-shuffle partition 3 and 4 */ -private[sql] class ExchangeCoordinator( +class ExchangeCoordinator( numExchanges: Int, advisoryTargetPostShuffleInputSize: Long, minNumPostShufflePartitions: Option[Int] = None) @@ -112,7 +112,7 @@ private[sql] class ExchangeCoordinator( * Estimates partition start indices for post-shuffle partitions based on * mapOutputStatistics provided by all pre-shuffle stages. */ - private[sql] def estimatePartitionStartIndices( + def estimatePartitionStartIndices( mapOutputStatistics: Array[MapOutputStatistics]): Array[Int] = { // If we have mapOutputStatistics.length < numExchange, it is because we do not submit // a stage when the number of partitions of this dependency is 0. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala index afe0fbea73..7a4a251370 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala @@ -40,7 +40,7 @@ case class ShuffleExchange( child: SparkPlan, @transient coordinator: Option[ExchangeCoordinator]) extends Exchange { - override private[sql] lazy val metrics = Map( + override lazy val metrics = Map( "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size")) override def nodeName: String = { @@ -81,7 +81,8 @@ case class ShuffleExchange( * the partitioning scheme defined in `newPartitioning`. Those partitions of * the returned ShuffleDependency will be the input of shuffle. */ - private[sql] def prepareShuffleDependency(): ShuffleDependency[Int, InternalRow, InternalRow] = { + private[exchange] def prepareShuffleDependency() + : ShuffleDependency[Int, InternalRow, InternalRow] = { ShuffleExchange.prepareShuffleDependency( child.execute(), child.output, newPartitioning, serializer) } @@ -92,7 +93,7 @@ case class ShuffleExchange( * partition start indices array. If this optional array is defined, the returned * [[ShuffledRowRDD]] will fetch pre-shuffle partitions based on indices of this array. */ - private[sql] def preparePostShuffleRDD( + private[exchange] def preparePostShuffleRDD( shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow], specifiedPartitionStartIndices: Option[Array[Int]] = None): ShuffledRowRDD = { // If an array of partition start indices is provided, we need to use this array @@ -194,7 +195,7 @@ object ShuffleExchange { * the partitioning scheme defined in `newPartitioning`. Those partitions of * the returned ShuffleDependency will be the input of shuffle. */ - private[sql] def prepareShuffleDependency( + def prepareShuffleDependency( rdd: RDD[InternalRow], outputAttributes: Seq[Attribute], newPartitioning: Partitioning, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala index 7c194ab726..0f24baacd1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala @@ -45,7 +45,7 @@ case class BroadcastHashJoinExec( right: SparkPlan) extends BinaryExecNode with HashJoin with CodegenSupport { - override private[sql] lazy val metrics = Map( + override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) override def requiredChildDistribution: Seq[Distribution] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala index 4d43765f8f..6a9965f1a2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala @@ -37,7 +37,7 @@ case class BroadcastNestedLoopJoinExec( condition: Option[Expression], withinBroadcastThreshold: Boolean = true) extends BinaryExecNode { - override private[sql] lazy val metrics = Map( + override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) /** BuildRight means the right relation <=> the broadcast relation. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala index 0553086a22..57866df90d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala @@ -34,7 +34,6 @@ import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter * will be much faster than building the right partition for every row in left RDD, it also * materialize the right RDD (in case of the right RDD is nondeterministic). */ -private[spark] class UnsafeCartesianRDD(left : RDD[UnsafeRow], right : RDD[UnsafeRow], numFieldsOfRight: Int) extends CartesianRDD[UnsafeRow, UnsafeRow](left.sparkContext, left, right) { @@ -78,7 +77,7 @@ class UnsafeCartesianRDD(left : RDD[UnsafeRow], right : RDD[UnsafeRow], numField for (x <- rdd1.iterator(partition.s1, context); y <- createIter()) yield (x, y) CompletionIterator[(UnsafeRow, UnsafeRow), Iterator[(UnsafeRow, UnsafeRow)]]( - resultIter, sorter.cleanupResources) + resultIter, sorter.cleanupResources()) } } @@ -89,7 +88,7 @@ case class CartesianProductExec( condition: Option[Expression]) extends BinaryExecNode { override def output: Seq[Attribute] = left.output ++ right.output - override private[sql] lazy val metrics = Map( + override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) protected override def doPrepare(): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala index 0036f9aadc..afb6e5e3dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala @@ -39,7 +39,7 @@ case class ShuffledHashJoinExec( right: SparkPlan) extends BinaryExecNode with HashJoin { - override private[sql] lazy val metrics = Map( + override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), "buildDataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size of build side"), "buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build hash map")) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala index fac6b8de8e..5c9c1e6062 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala @@ -40,7 +40,7 @@ case class SortMergeJoinExec( left: SparkPlan, right: SparkPlan) extends BinaryExecNode with CodegenSupport { - override private[sql] lazy val metrics = Map( + override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) override def output: Seq[Attribute] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala index 9817a56f49..15afa0b1a5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala @@ -55,17 +55,17 @@ class SQLMetric(val metricType: String, initValue: Long = 0L) extends Accumulato override def value: Long = _value // Provide special identifier as metadata so we can tell that this is a `SQLMetric` later - private[spark] override def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = { + override def toInfo(update: Option[Any], value: Option[Any]): AccumulableInfo = { new AccumulableInfo( id, name, update, value, true, true, Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)) } } -private[sql] object SQLMetrics { - private[sql] val SUM_METRIC = "sum" - private[sql] val SIZE_METRIC = "size" - private[sql] val TIMING_METRIC = "timing" +object SQLMetrics { + private val SUM_METRIC = "sum" + private val SIZE_METRIC = "size" + private val TIMING_METRIC = "timing" def createMetric(sc: SparkContext, name: String): SQLMetric = { val acc = new SQLMetric(SUM_METRIC) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala index 829bcae6f9..16e44845d5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.SparkPlan * Extracts all the Python UDFs in logical aggregate, which depends on aggregate expression or * grouping key, evaluate them after aggregate. */ -private[spark] object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] { +object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] { /** * Returns whether the expression could only be evaluated within aggregate. @@ -90,7 +90,7 @@ private[spark] object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] { * This has the limitation that the input to the Python UDF is not allowed include attributes from * multiple child operators. */ -private[spark] object ExtractPythonUDFs extends Rule[SparkPlan] { +object ExtractPythonUDFs extends Rule[SparkPlan] { private def hasPythonUDF(e: Expression): Boolean = { e.find(_.isInstanceOf[PythonUDF]).isDefined diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala index 70539da348..d2178e971e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/r/MapPartitionsRWrapper.scala @@ -21,12 +21,12 @@ import org.apache.spark.api.r._ import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.api.r.SQLUtils._ import org.apache.spark.sql.Row -import org.apache.spark.sql.types.{BinaryType, StructField, StructType} +import org.apache.spark.sql.types.StructType /** * A function wrapper that applies the given R function to each partition. */ -private[sql] case class MapPartitionsRWrapper( +case class MapPartitionsRWrapper( func: Array[Byte], packageNames: Array[Byte], broadcastVars: Array[Broadcast[Object]], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala index b19344f043..b9dbfcf773 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/FrequentItems.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.{Column, DataFrame, Dataset, Row} import org.apache.spark.sql.catalyst.plans.logical.LocalRelation import org.apache.spark.sql.types._ -private[sql] object FrequentItems extends Logging { +object FrequentItems extends Logging { /** A helper class wrapping `MutableMap[Any, Long]` for simplicity. */ private class FreqItemCounter(size: Int) extends Serializable { @@ -79,7 +79,7 @@ private[sql] object FrequentItems extends Logging { * than 1e-4. * @return A Local DataFrame with the Array of frequent items for each column. */ - private[sql] def singlePassFreqItems( + def singlePassFreqItems( df: DataFrame, cols: Seq[String], support: Double): DataFrame = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala index ea58df70b3..50eecb4098 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/stat/StatFunctions.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String -private[sql] object StatFunctions extends Logging { +object StatFunctions extends Logging { import QuantileSummaries.Stats @@ -337,7 +337,7 @@ private[sql] object StatFunctions extends Logging { } /** Calculate the Pearson Correlation Coefficient for the given columns */ - private[sql] def pearsonCorrelation(df: DataFrame, cols: Seq[String]): Double = { + def pearsonCorrelation(df: DataFrame, cols: Seq[String]): Double = { val counts = collectStatisticalData(df, cols, "correlation") counts.Ck / math.sqrt(counts.MkX * counts.MkY) } @@ -407,13 +407,13 @@ private[sql] object StatFunctions extends Logging { * @param cols the column names * @return the covariance of the two columns. */ - private[sql] def calculateCov(df: DataFrame, cols: Seq[String]): Double = { + def calculateCov(df: DataFrame, cols: Seq[String]): Double = { val counts = collectStatisticalData(df, cols, "covariance") counts.cov } /** Generate a table of frequencies for the elements of two columns. */ - private[sql] def crossTabulate(df: DataFrame, col1: String, col2: String): DataFrame = { + def crossTabulate(df: DataFrame, col1: String, col2: String): DataFrame = { val tableName = s"${col1}_$col2" val counts = df.groupBy(col1, col2).agg(count("*")).take(1e6.toInt) if (counts.length == 1e6.toInt) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index 7367c68d0a..05294df267 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.streaming.OutputMode * A variant of [[QueryExecution]] that allows the execution of the given [[LogicalPlan]] * plan incrementally. Possibly preserving state in between each execution. */ -class IncrementalExecution private[sql]( +class IncrementalExecution( sparkSession: SparkSession, logicalPlan: LogicalPlan, val outputMode: OutputMode, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index af2229a46b..66fb5a4bde 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -49,10 +49,10 @@ class StreamExecution( override val id: Long, override val name: String, checkpointRoot: String, - private[sql] val logicalPlan: LogicalPlan, + val logicalPlan: LogicalPlan, val sink: Sink, val trigger: Trigger, - private[sql] val triggerClock: Clock, + val triggerClock: Clock, val outputMode: OutputMode) extends StreamingQuery with Logging { @@ -74,7 +74,7 @@ class StreamExecution( * input source. */ @volatile - private[sql] var committedOffsets = new StreamProgress + var committedOffsets = new StreamProgress /** * Tracks the offsets that are available to be processed, but have not yet be committed to the @@ -102,10 +102,10 @@ class StreamExecution( private var state: State = INITIALIZED @volatile - private[sql] var lastExecution: QueryExecution = null + var lastExecution: QueryExecution = null @volatile - private[sql] var streamDeathCause: StreamingQueryException = null + var streamDeathCause: StreamingQueryException = null /* Get the call site in the caller thread; will pass this into the micro batch thread */ private val callSite = Utils.getCallSite() @@ -115,7 +115,7 @@ class StreamExecution( * [[org.apache.spark.util.UninterruptibleThread]] to avoid potential deadlocks in using * [[HDFSMetadataLog]]. See SPARK-14131 for more details. */ - private[sql] val microBatchThread = + val microBatchThread = new UninterruptibleThread(s"stream execution thread for $name") { override def run(): Unit = { // To fix call site like "run at :0", we bridge the call site from the caller @@ -131,8 +131,7 @@ class StreamExecution( * processing is done. Thus, the Nth record in this log indicated data that is currently being * processed and the N-1th entry indicates which offsets have been durably committed to the sink. */ - private[sql] val offsetLog = - new HDFSMetadataLog[CompositeOffset](sparkSession, checkpointFile("offsets")) + val offsetLog = new HDFSMetadataLog[CompositeOffset](sparkSession, checkpointFile("offsets")) /** Whether the query is currently active or not */ override def isActive: Boolean = state == ACTIVE @@ -159,7 +158,7 @@ class StreamExecution( * Starts the execution. This returns only after the thread has started and [[QueryStarted]] event * has been posted to all the listeners. */ - private[sql] def start(): Unit = { + def start(): Unit = { microBatchThread.setDaemon(true) microBatchThread.start() startLatch.await() // Wait until thread started and QueryStart event has been posted @@ -518,7 +517,7 @@ class StreamExecution( case object TERMINATED extends State } -private[sql] object StreamExecution { +object StreamExecution { private val _nextId = new AtomicLong(0) def nextId: Long = _nextId.getAndIncrement() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala index 405a5f0387..db0bd9e6bc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamProgress.scala @@ -26,7 +26,7 @@ class StreamProgress( val baseMap: immutable.Map[Source, Offset] = new immutable.HashMap[Source, Offset]) extends scala.collection.immutable.Map[Source, Offset] { - private[sql] def toCompositeOffset(source: Seq[Source]): CompositeOffset = { + def toCompositeOffset(source: Seq[Source]): CompositeOffset = { CompositeOffset(source.map(get)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala index 066765324a..a67fdceb3c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala @@ -113,7 +113,7 @@ case class KeyRemoved(key: UnsafeRow) extends StoreUpdate * the store is the active instance. Accordingly, it either keeps it loaded and performs * maintenance, or unloads the store. */ -private[sql] object StateStore extends Logging { +object StateStore extends Logging { val MAINTENANCE_INTERVAL_CONFIG = "spark.sql.streaming.stateStore.maintenanceInterval" val MAINTENANCE_INTERVAL_DEFAULT_SECS = 60 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala index e418217238..d945d7aff2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala @@ -45,7 +45,7 @@ private object StopCoordinator extends StateStoreCoordinatorMessage /** Helper object used to create reference to [[StateStoreCoordinator]]. */ -private[sql] object StateStoreCoordinatorRef extends Logging { +object StateStoreCoordinatorRef extends Logging { private val endpointName = "StateStoreCoordinator" @@ -77,7 +77,7 @@ private[sql] object StateStoreCoordinatorRef extends Logging { * Reference to a [[StateStoreCoordinator]] that can be used to coordinate instances of * [[StateStore]]s across all the executors, and get their locations for job scheduling. */ -private[sql] class StateStoreCoordinatorRef private(rpcEndpointRef: RpcEndpointRef) { +class StateStoreCoordinatorRef private(rpcEndpointRef: RpcEndpointRef) { private[state] def reportActiveInstance( storeId: StateStoreId, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala index 4b4fa126b8..23fc0bd0bc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala @@ -24,7 +24,7 @@ import scala.xml.Node import org.apache.spark.internal.Logging import org.apache.spark.ui.{UIUtils, WebUIPage} -private[sql] class ExecutionPage(parent: SQLTab) extends WebUIPage("execution") with Logging { +class ExecutionPage(parent: SQLTab) extends WebUIPage("execution") with Logging { private val listener = parent.listener diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala index 6e94791901..60f13432d7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLListener.scala @@ -46,14 +46,14 @@ case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long) case class SparkListenerDriverAccumUpdates(executionId: Long, accumUpdates: Seq[(Long, Long)]) extends SparkListenerEvent -private[sql] class SQLHistoryListenerFactory extends SparkHistoryListenerFactory { +class SQLHistoryListenerFactory extends SparkHistoryListenerFactory { override def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener] = { List(new SQLHistoryListener(conf, sparkUI)) } } -private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Logging { +class SQLListener(conf: SparkConf) extends SparkListener with Logging { private val retainedExecutions = conf.getInt("spark.sql.ui.retainedExecutions", 1000) @@ -333,7 +333,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi /** * A [[SQLListener]] for rendering the SQL UI in the history server. */ -private[spark] class SQLHistoryListener(conf: SparkConf, sparkUI: SparkUI) +class SQLHistoryListener(conf: SparkConf, sparkUI: SparkUI) extends SQLListener(conf) { private var sqlTabAttached = false diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala index e8675ce749..d0376af3e3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLTab.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.ui import org.apache.spark.internal.Logging import org.apache.spark.ui.{SparkUI, SparkUITab} -private[sql] class SQLTab(val listener: SQLListener, sparkUI: SparkUI) +class SQLTab(val listener: SQLListener, sparkUI: SparkUI) extends SparkUITab(sparkUI, "SQL") with Logging { val parent = sparkUI @@ -32,6 +32,6 @@ private[sql] class SQLTab(val listener: SQLListener, sparkUI: SparkUI) parent.addStaticHandler(SQLTab.STATIC_RESOURCE_DIR, "/static/sql") } -private[sql] object SQLTab { +object SQLTab { private val STATIC_RESOURCE_DIR = "org/apache/spark/sql/execution/ui/static" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala index 8f5681bfc7..4bb9d6fef4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala @@ -24,7 +24,7 @@ import scala.collection.mutable import org.apache.commons.lang3.StringEscapeUtils import org.apache.spark.sql.execution.{SparkPlanInfo, WholeStageCodegenExec} -import org.apache.spark.sql.execution.metric.SQLMetrics + /** * A graph used for storing information of an executionPlan of DataFrame. @@ -32,7 +32,7 @@ import org.apache.spark.sql.execution.metric.SQLMetrics * Each graph is defined with a set of nodes and a set of edges. Each node represents a node in the * SparkPlan tree, and each edge represents a parent-child relationship between two nodes. */ -private[ui] case class SparkPlanGraph( +case class SparkPlanGraph( nodes: Seq[SparkPlanGraphNode], edges: Seq[SparkPlanGraphEdge]) { def makeDotFile(metrics: Map[Long, String]): String = { @@ -55,7 +55,7 @@ private[ui] case class SparkPlanGraph( } } -private[sql] object SparkPlanGraph { +object SparkPlanGraph { /** * Build a SparkPlanGraph from the root of a SparkPlan tree. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index 6c43fe3177..54aee5e02b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.internal -import org.apache.hadoop.conf.Configuration - import org.apache.spark.SparkContext import org.apache.spark.internal.Logging import org.apache.spark.sql.{SparkSession, SQLContext} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala index cc3e74b4e8..a716a3eab6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala @@ -54,7 +54,7 @@ case class HiveTableScanExec( require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned, "Partition pruning predicates only supported for partitioned tables.") - private[sql] override lazy val metrics = Map( + override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) override def producedAttributes: AttributeSet = outputSet ++ -- cgit v1.2.3