diff options
Diffstat (limited to 'sql/core')
24 files changed, 361 insertions, 353 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 221782ee8f..d4290fee0a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -712,13 +712,13 @@ class SQLContext private[sql]( } /** - * :: Experimental :: - * Creates a [[Dataset]] with a single [[LongType]] column named `id`, containing elements - * in an range from `start` to `end` (exclusive) with an step value. - * - * @since 2.0.0 - * @group dataset - */ + * :: Experimental :: + * Creates a [[Dataset]] with a single [[LongType]] column named `id`, containing elements + * in an range from `start` to `end` (exclusive) with an step value. + * + * @since 2.0.0 + * @group dataset + */ @Experimental def range(start: Long, end: Long, step: Long): Dataset[java.lang.Long] = { range(start, end, step, numPartitions = sparkContext.defaultParallelism) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index f3478a873a..124ec09efd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -109,9 +109,10 @@ private[sql] class CacheManager extends Logging { cachedData.remove(dataIndex) } - /** Tries to remove the data for the given [[Dataset]] from the cache - * if it's cached - */ + /** + * Tries to remove the data for the given [[Dataset]] from the cache + * if it's cached + */ private[sql] def tryUncacheQuery( query: Dataset[_], blocking: Boolean = true): Boolean = writeLock { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index b1b3d4ac81..ff19d1be1c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -84,8 +84,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ private[sql] def metrics: Map[String, SQLMetric[_, _]] = Map.empty /** - * Reset all the metrics. - */ + * Reset all the metrics. + */ private[sql] def resetMetrics(): Unit = { metrics.valuesIterator.foreach(_.reset()) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala index 9bdf611f6e..9f539c4929 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala @@ -31,8 +31,8 @@ import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetrics} import org.apache.spark.sql.internal.SQLConf /** - * An interface for those physical operators that support codegen. - */ + * An interface for those physical operators that support codegen. + */ trait CodegenSupport extends SparkPlan { /** Prefix used in the current operator's variable names. */ @@ -46,10 +46,10 @@ trait CodegenSupport extends SparkPlan { } /** - * Creates a metric using the specified name. - * - * @return name of the variable representing the metric - */ + * Creates a metric using the specified name. + * + * @return name of the variable representing the metric + */ def metricTerm(ctx: CodegenContext, name: String): String = { val metric = ctx.addReferenceObj(name, longMetric(name)) val value = ctx.freshName("metricValue") @@ -59,25 +59,25 @@ trait CodegenSupport extends SparkPlan { } /** - * Whether this SparkPlan support whole stage codegen or not. - */ + * Whether this SparkPlan support whole stage codegen or not. + */ def supportCodegen: Boolean = true /** - * Which SparkPlan is calling produce() of this one. It's itself for the first SparkPlan. - */ + * Which SparkPlan is calling produce() of this one. It's itself for the first SparkPlan. + */ protected var parent: CodegenSupport = null /** - * Returns all the RDDs of InternalRow which generates the input rows. - * - * Note: right now we support up to two RDDs. - */ + * Returns all the RDDs of InternalRow which generates the input rows. + * + * Note: right now we support up to two RDDs. + */ def upstreams(): Seq[RDD[InternalRow]] /** - * Returns Java source code to process the rows from upstream. - */ + * Returns Java source code to process the rows from upstream. + */ final def produce(ctx: CodegenContext, parent: CodegenSupport): String = { this.parent = parent ctx.freshNamePrefix = variablePrefix @@ -89,28 +89,28 @@ trait CodegenSupport extends SparkPlan { } /** - * Generate the Java source code to process, should be overridden by subclass to support codegen. - * - * doProduce() usually generate the framework, for example, aggregation could generate this: - * - * if (!initialized) { - * # create a hash map, then build the aggregation hash map - * # call child.produce() - * initialized = true; - * } - * while (hashmap.hasNext()) { - * row = hashmap.next(); - * # build the aggregation results - * # create variables for results - * # call consume(), which will call parent.doConsume() + * Generate the Java source code to process, should be overridden by subclass to support codegen. + * + * doProduce() usually generate the framework, for example, aggregation could generate this: + * + * if (!initialized) { + * # create a hash map, then build the aggregation hash map + * # call child.produce() + * initialized = true; + * } + * while (hashmap.hasNext()) { + * row = hashmap.next(); + * # build the aggregation results + * # create variables for results + * # call consume(), which will call parent.doConsume() * if (shouldStop()) return; - * } - */ + * } + */ protected def doProduce(ctx: CodegenContext): String /** - * Consume the generated columns or row from current SparkPlan, call it's parent's doConsume(). - */ + * Consume the generated columns or row from current SparkPlan, call it's parent's doConsume(). + */ final def consume(ctx: CodegenContext, outputVars: Seq[ExprCode], row: String = null): String = { val inputVars = if (row != null) { @@ -158,9 +158,9 @@ trait CodegenSupport extends SparkPlan { } /** - * Returns source code to evaluate all the variables, and clear the code of them, to prevent - * them to be evaluated twice. - */ + * Returns source code to evaluate all the variables, and clear the code of them, to prevent + * them to be evaluated twice. + */ protected def evaluateVariables(variables: Seq[ExprCode]): String = { val evaluate = variables.filter(_.code != "").map(_.code.trim).mkString("\n") variables.foreach(_.code = "") @@ -168,9 +168,9 @@ trait CodegenSupport extends SparkPlan { } /** - * Returns source code to evaluate the variables for required attributes, and clear the code - * of evaluated variables, to prevent them to be evaluated twice.. - */ + * Returns source code to evaluate the variables for required attributes, and clear the code + * of evaluated variables, to prevent them to be evaluated twice.. + */ protected def evaluateRequiredVariables( attributes: Seq[Attribute], variables: Seq[ExprCode], @@ -194,18 +194,18 @@ trait CodegenSupport extends SparkPlan { def usedInputs: AttributeSet = references /** - * Generate the Java source code to process the rows from child SparkPlan. - * - * This should be override by subclass to support codegen. - * - * For example, Filter will generate the code like this: - * - * # code to evaluate the predicate expression, result is isNull1 and value2 - * if (isNull1 || !value2) continue; - * # call consume(), which will call parent.doConsume() - * - * Note: A plan can either consume the rows as UnsafeRow (row), or a list of variables (input). - */ + * Generate the Java source code to process the rows from child SparkPlan. + * + * This should be override by subclass to support codegen. + * + * For example, Filter will generate the code like this: + * + * # code to evaluate the predicate expression, result is isNull1 and value2 + * if (isNull1 || !value2) continue; + * # call consume(), which will call parent.doConsume() + * + * Note: A plan can either consume the rows as UnsafeRow (row), or a list of variables (input). + */ def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { throw new UnsupportedOperationException } @@ -213,11 +213,11 @@ trait CodegenSupport extends SparkPlan { /** - * InputAdapter is used to hide a SparkPlan from a subtree that support codegen. - * - * This is the leaf node of a tree with WholeStageCodegen, is used to generate code that consumes - * an RDD iterator of InternalRow. - */ + * InputAdapter is used to hide a SparkPlan from a subtree that support codegen. + * + * This is the leaf node of a tree with WholeStageCodegen, is used to generate code that consumes + * an RDD iterator of InternalRow. + */ case class InputAdapter(child: SparkPlan) extends UnaryNode with CodegenSupport { override def output: Seq[Attribute] = child.output @@ -260,33 +260,33 @@ object WholeStageCodegen { } /** - * WholeStageCodegen compile a subtree of plans that support codegen together into single Java - * function. - * - * Here is the call graph of to generate Java source (plan A support codegen, but plan B does not): - * - * WholeStageCodegen Plan A FakeInput Plan B - * ========================================================================= - * - * -> execute() - * | - * doExecute() ---------> upstreams() -------> upstreams() ------> execute() - * | - * +-----------------> produce() - * | - * doProduce() -------> produce() - * | - * doProduce() - * | - * doConsume() <--------- consume() - * | - * doConsume() <-------- consume() - * - * SparkPlan A should override doProduce() and doConsume(). - * - * doCodeGen() will create a CodeGenContext, which will hold a list of variables for input, - * used to generated code for BoundReference. - */ + * WholeStageCodegen compile a subtree of plans that support codegen together into single Java + * function. + * + * Here is the call graph of to generate Java source (plan A support codegen, but plan B does not): + * + * WholeStageCodegen Plan A FakeInput Plan B + * ========================================================================= + * + * -> execute() + * | + * doExecute() ---------> upstreams() -------> upstreams() ------> execute() + * | + * +-----------------> produce() + * | + * doProduce() -------> produce() + * | + * doProduce() + * | + * doConsume() <--------- consume() + * | + * doConsume() <-------- consume() + * + * SparkPlan A should override doProduce() and doConsume(). + * + * doCodeGen() will create a CodeGenContext, which will hold a list of variables for input, + * used to generated code for BoundReference. + */ case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSupport { override def output: Seq[Attribute] = child.output @@ -422,8 +422,8 @@ case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSup /** - * Find the chained plans that support codegen, collapse them together as WholeStageCodegen. - */ + * Find the chained plans that support codegen, collapse them together as WholeStageCodegen. + */ case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] { private def supportCodegen(e: Expression): Boolean = e match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala index 7d0567842c..806089196c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala @@ -444,8 +444,8 @@ private[execution] final case class RangeBoundOrdering( } /** - * The interface of row buffer for a partition - */ + * The interface of row buffer for a partition + */ private[execution] abstract class RowBuffer { /** Number of rows. */ @@ -462,8 +462,8 @@ private[execution] abstract class RowBuffer { } /** - * A row buffer based on ArrayBuffer (the number of rows is limited) - */ + * A row buffer based on ArrayBuffer (the number of rows is limited) + */ private[execution] class ArrayRowBuffer(buffer: ArrayBuffer[UnsafeRow]) extends RowBuffer { private[this] var cursor: Int = -1 @@ -493,8 +493,8 @@ private[execution] class ArrayRowBuffer(buffer: ArrayBuffer[UnsafeRow]) extends } /** - * An external buffer of rows based on UnsafeExternalSorter - */ + * An external buffer of rows based on UnsafeExternalSorter + */ private[execution] class ExternalRowBuffer(sorter: UnsafeExternalSorter, numFields: Int) extends RowBuffer { @@ -654,12 +654,16 @@ private[execution] final class SlidingWindowFunctionFrame( /** The rows within current sliding window. */ private[this] val buffer = new util.ArrayDeque[InternalRow]() - /** Index of the first input row with a value greater than the upper bound of the current - * output row. */ + /** + * Index of the first input row with a value greater than the upper bound of the current + * output row. + */ private[this] var inputHighIndex = 0 - /** Index of the first input row with a value equal to or greater than the lower bound of the - * current output row. */ + /** + * Index of the first input row with a value equal to or greater than the lower bound of the + * current output row. + */ private[this] var inputLowIndex = 0 /** Prepare the frame for calculating a new partition. Reset all variables. */ @@ -763,8 +767,10 @@ private[execution] final class UnboundedPrecedingWindowFunctionFrame( /** The next row from `input`. */ private[this] var nextRow: InternalRow = null - /** Index of the first input row with a value greater than the upper bound of the current - * output row. */ + /** + * Index of the first input row with a value greater than the upper bound of the current + * output row. + */ private[this] var inputIndex = 0 /** Prepare the frame for calculating a new partition. */ @@ -819,8 +825,10 @@ private[execution] final class UnboundedFollowingWindowFunctionFrame( /** Rows of the partition currently being processed. */ private[this] var input: RowBuffer = null - /** Index of the first input row with a value equal to or greater than the lower bound of the - * current output row. */ + /** + * Index of the first input row with a value equal to or greater than the lower bound of the + * current output row. + */ private[this] var inputIndex = 0 /** Prepare the frame for calculating a new partition. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala index 15627a7004..042c731901 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala @@ -47,17 +47,17 @@ abstract class AggregationIterator( /////////////////////////////////////////////////////////////////////////// /** - * The following combinations of AggregationMode are supported: - * - Partial - * - PartialMerge (for single distinct) - * - Partial and PartialMerge (for single distinct) - * - Final - * - Complete (for SortBasedAggregate with functions that does not support Partial) - * - Final and Complete (currently not used) - * - * TODO: AggregateMode should have only two modes: Update and Merge, AggregateExpression - * could have a flag to tell it's final or not. - */ + * The following combinations of AggregationMode are supported: + * - Partial + * - PartialMerge (for single distinct) + * - Partial and PartialMerge (for single distinct) + * - Final + * - Complete (for SortBasedAggregate with functions that does not support Partial) + * - Final and Complete (currently not used) + * + * TODO: AggregateMode should have only two modes: Update and Merge, AggregateExpression + * could have a flag to tell it's final or not. + */ { val modes = aggregateExpressions.map(_.mode).distinct.toSet require(modes.size <= 2, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala index 8f974980bb..de1491d357 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala @@ -46,9 +46,9 @@ class SortBasedAggregationIterator( newMutableProjection) { /** - * Creates a new aggregation buffer and initializes buffer values - * for all aggregate functions. - */ + * Creates a new aggregation buffer and initializes buffer values + * for all aggregate functions. + */ private def newBuffer: MutableRow = { val bufferSchema = aggregateFunctions.flatMap(_.aggBufferAttributes) val bufferRowSize: Int = bufferSchema.length diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala index 7c215d1b96..60027edc7c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala @@ -266,8 +266,8 @@ case class TungstenAggregate( private var sorterTerm: String = _ /** - * This is called by generated Java class, should be public. - */ + * This is called by generated Java class, should be public. + */ def createHashMap(): UnsafeFixedWidthAggregationMap = { // create initialized aggregate buffer val initExpr = declFunctions.flatMap(f => f.initialValues) @@ -286,15 +286,15 @@ case class TungstenAggregate( } /** - * This is called by generated Java class, should be public. - */ + * This is called by generated Java class, should be public. + */ def createUnsafeJoiner(): UnsafeRowJoiner = { GenerateUnsafeRowJoiner.create(groupingKeySchema, bufferSchema) } /** - * Called by generated Java class to finish the aggregate and return a KVIterator. - */ + * Called by generated Java class to finish the aggregate and return a KVIterator. + */ def finishAggregate( hashMap: UnsafeFixedWidthAggregationMap, sorter: UnsafeKVExternalSorter): KVIterator[UnsafeRow, UnsafeRow] = { @@ -372,8 +372,8 @@ case class TungstenAggregate( } /** - * Generate the code for output. - */ + * Generate the code for output. + */ private def generateResultCode( ctx: CodegenContext, keyTerm: String, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala index f3514cd14c..159fdc99dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala @@ -168,10 +168,10 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag]( private[this] var reader: RecordReader[Void, V] = null /** - * If the format is ParquetInputFormat, try to create the optimized RecordReader. If this - * fails (for example, unsupported schema), try with the normal reader. - * TODO: plumb this through a different way? - */ + * If the format is ParquetInputFormat, try to create the optimized RecordReader. If this + * fails (for example, unsupported schema), try with the normal reader. + * TODO: plumb this through a different way? + */ if (enableVectorizedParquetReader && format.getClass.getName == "org.apache.parquet.hadoop.ParquetInputFormat") { val parquetReader: VectorizedParquetRecordReader = new VectorizedParquetRecordReader() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala index 797f740dc5..ea843a1013 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala @@ -33,11 +33,11 @@ import org.apache.spark.unsafe.types.UTF8String private[csv] object CSVInferSchema { /** - * Similar to the JSON schema inference - * 1. Infer type of each row - * 2. Merge row types to find common type - * 3. Replace any null types with string type - */ + * Similar to the JSON schema inference + * 1. Infer type of each row + * 2. Merge row types to find common type + * 3. Replace any null types with string type + */ def infer( tokenRdd: RDD[Array[String]], header: Array[String], @@ -75,9 +75,9 @@ private[csv] object CSVInferSchema { } /** - * Infer type of string field. Given known type Double, and a string "1", there is no - * point checking if it is an Int, as the final type must be Double or higher. - */ + * Infer type of string field. Given known type Double, and a string "1", there is no + * point checking if it is an Int, as the final type must be Double or higher. + */ def inferField(typeSoFar: DataType, field: String, nullValue: String = ""): DataType = { if (field == null || field.isEmpty || field == nullValue) { typeSoFar @@ -142,9 +142,9 @@ private[csv] object CSVInferSchema { private val numericPrecedence: IndexedSeq[DataType] = HiveTypeCoercion.numericPrecedence /** - * Copied from internal Spark api - * [[org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion]] - */ + * Copied from internal Spark api + * [[org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion]] + */ val findTightestCommonType: (DataType, DataType) => Option[DataType] = { case (t1, t2) if t1 == t2 => Some(t1) case (NullType, t1) => Some(t1) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala index c0d6f6fbf7..34fcbdf871 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala @@ -38,8 +38,8 @@ import org.apache.spark.util.SerializableConfiguration import org.apache.spark.util.collection.BitSet /** - * Provides access to CSV data from pure SQL statements. - */ + * Provides access to CSV data from pure SQL statements. + */ class DefaultSource extends FileFormat with DataSourceRegister { override def shortName(): String = "csv" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala index 877e159fbd..2e88d588be 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala @@ -51,11 +51,11 @@ case class DescribeCommand( } /** - * Used to represent the operation of create table using a data source. + * Used to represent the operation of create table using a data source. * - * @param allowExisting If it is true, we will do nothing when the table already exists. - * If it is false, an exception will be thrown - */ + * @param allowExisting If it is true, we will do nothing when the table already exists. + * If it is false, an exception will be thrown + */ case class CreateTableUsing( tableIdent: TableIdentifier, userSpecifiedSchema: Option[StructType], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala index 0ed1ed41b0..41e566c27b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala @@ -122,8 +122,8 @@ case class BroadcastHashJoin( } /** - * Returns a tuple of Broadcast of HashedRelation and the variable name for it. - */ + * Returns a tuple of Broadcast of HashedRelation and the variable name for it. + */ private def prepareBroadcast(ctx: CodegenContext): (Broadcast[HashedRelation], String) = { // create a name for HashedRelation val broadcastRelation = buildPlan.executeBroadcast[HashedRelation]() @@ -139,9 +139,9 @@ case class BroadcastHashJoin( } /** - * Returns the code for generating join key for stream side, and expression of whether the key - * has any null in it or not. - */ + * Returns the code for generating join key for stream side, and expression of whether the key + * has any null in it or not. + */ private def genStreamSideJoinKey( ctx: CodegenContext, input: Seq[ExprCode]): (ExprCode, String) = { @@ -160,8 +160,8 @@ case class BroadcastHashJoin( } /** - * Generates the code for variable of build side. - */ + * Generates the code for variable of build side. + */ private def genBuildSideVars(ctx: CodegenContext, matched: String): Seq[ExprCode] = { ctx.currentVars = null ctx.INPUT_ROW = matched @@ -188,8 +188,8 @@ case class BroadcastHashJoin( } /** - * Generates the code for Inner join. - */ + * Generates the code for Inner join. + */ private def codegenInner(ctx: CodegenContext, input: Seq[ExprCode]): String = { val (broadcastRelation, relationTerm) = prepareBroadcast(ctx) val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input) @@ -254,8 +254,8 @@ case class BroadcastHashJoin( /** - * Generates the code for left or right outer join. - */ + * Generates the code for left or right outer join. + */ private def codegenOuter(ctx: CodegenContext, input: Seq[ExprCode]): String = { val (broadcastRelation, relationTerm) = prepareBroadcast(ctx) val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala index fb65b50da8..edb4c5a16f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala @@ -28,10 +28,10 @@ import org.apache.spark.util.CompletionIterator import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter /** - * An optimized CartesianRDD for UnsafeRow, which will cache the rows from second child RDD, - * will be much faster than building the right partition for every row in left RDD, it also - * materialize the right RDD (in case of the right RDD is nondeterministic). - */ + * An optimized CartesianRDD for UnsafeRow, which will cache the rows from second child RDD, + * will be much faster than building the right partition for every row in left RDD, it also + * materialize the right RDD (in case of the right RDD is nondeterministic). + */ private[spark] class UnsafeCartesianRDD(left : RDD[UnsafeRow], right : RDD[UnsafeRow], numFieldsOfRight: Int) extends CartesianRDD[UnsafeRow, UnsafeRow](left.sparkContext, left, right) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala index 5f42d07273..c298b7dee0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala @@ -64,10 +64,10 @@ trait HashJoin { } /** - * Try to rewrite the key as LongType so we can use getLong(), if they key can fit with a long. - * - * If not, returns the original expressions. - */ + * Try to rewrite the key as LongType so we can use getLong(), if they key can fit with a long. + * + * If not, returns the original expressions. + */ def rewriteKeyExpr(keys: Seq[Expression]): Seq[Expression] = { var keyExpr: Expression = null var width = 0 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala index dc4793e85a..91c470d187 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala @@ -38,20 +38,20 @@ import org.apache.spark.util.collection.CompactBuffer */ private[execution] sealed trait HashedRelation { /** - * Returns matched rows. - */ + * Returns matched rows. + */ def get(key: InternalRow): Seq[InternalRow] /** - * Returns matched rows for a key that has only one column with LongType. - */ + * Returns matched rows for a key that has only one column with LongType. + */ def get(key: Long): Seq[InternalRow] = { throw new UnsupportedOperationException } /** - * Returns the size of used memory. - */ + * Returns the size of used memory. + */ def getMemorySize: Long = 1L // to make the test happy /** @@ -77,20 +77,20 @@ private[execution] sealed trait HashedRelation { } /** - * Interface for a hashed relation that have only one row per key. - * - * We should call getValue() for better performance. - */ + * Interface for a hashed relation that have only one row per key. + * + * We should call getValue() for better performance. + */ private[execution] trait UniqueHashedRelation extends HashedRelation { /** - * Returns the matched single row. - */ + * Returns the matched single row. + */ def getValue(key: InternalRow): InternalRow /** - * Returns the matched single row with key that have only one column of LongType. - */ + * Returns the matched single row with key that have only one column of LongType. + */ def getValue(key: Long): InternalRow = { throw new UnsupportedOperationException } @@ -345,8 +345,8 @@ private[joins] object UnsafeHashedRelation { } /** - * An interface for a hashed relation that the key is a Long. - */ + * An interface for a hashed relation that the key is a Long. + */ private[joins] trait LongHashedRelation extends HashedRelation { override def get(key: InternalRow): Seq[InternalRow] = { get(key.getLong(0)) @@ -396,26 +396,26 @@ private[joins] final class UniqueLongHashedRelation( } /** - * A relation that pack all the rows into a byte array, together with offsets and sizes. - * - * All the bytes of UnsafeRow are packed together as `bytes`: - * - * [ Row0 ][ Row1 ][] ... [ RowN ] - * - * With keys: - * - * start start+1 ... start+N - * - * `offsets` are offsets of UnsafeRows in the `bytes` - * `sizes` are the numbers of bytes of UnsafeRows, 0 means no row for this key. - * - * For example, two UnsafeRows (24 bytes and 32 bytes), with keys as 3 and 5 will stored as: - * - * start = 3 - * offsets = [0, 0, 24] - * sizes = [24, 0, 32] - * bytes = [0 - 24][][24 - 56] - */ + * A relation that pack all the rows into a byte array, together with offsets and sizes. + * + * All the bytes of UnsafeRow are packed together as `bytes`: + * + * [ Row0 ][ Row1 ][] ... [ RowN ] + * + * With keys: + * + * start start+1 ... start+N + * + * `offsets` are offsets of UnsafeRows in the `bytes` + * `sizes` are the numbers of bytes of UnsafeRows, 0 means no row for this key. + * + * For example, two UnsafeRows (24 bytes and 32 bytes), with keys as 3 and 5 will stored as: + * + * start = 3 + * offsets = [0, 0, 24] + * sizes = [24, 0, 32] + * bytes = [0 - 24][][24 - 56] + */ private[joins] final class LongArrayRelation( private var numFields: Int, private var start: Long, @@ -483,8 +483,8 @@ private[joins] final class LongArrayRelation( } /** - * Create hashed relation with key that is long. - */ + * Create hashed relation with key that is long. + */ private[joins] object LongHashedRelation { val DENSE_FACTOR = 0.2 diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala index 60bd8ea39a..0e7b2f2f31 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala @@ -256,9 +256,9 @@ case class SortMergeJoin( } /** - * Generate a function to scan both left and right to find a match, returns the term for - * matched one row from left side and buffered rows from right side. - */ + * Generate a function to scan both left and right to find a match, returns the term for + * matched one row from left side and buffered rows from right side. + */ private def genScanner(ctx: CodegenContext): (String, String) = { // Create class member for next row from both sides. val leftRow = ctx.freshName("leftRow") @@ -341,12 +341,12 @@ case class SortMergeJoin( } /** - * Creates variables for left part of result row. - * - * In order to defer the access after condition and also only access once in the loop, - * the variables should be declared separately from accessing the columns, we can't use the - * codegen of BoundReference here. - */ + * Creates variables for left part of result row. + * + * In order to defer the access after condition and also only access once in the loop, + * the variables should be declared separately from accessing the columns, we can't use the + * codegen of BoundReference here. + */ private def createLeftVars(ctx: CodegenContext, leftRow: String): Seq[ExprCode] = { ctx.INPUT_ROW = leftRow left.output.zipWithIndex.map { case (a, i) => @@ -370,9 +370,9 @@ case class SortMergeJoin( } /** - * Creates the variables for right part of result row, using BoundReference, since the right - * part are accessed inside the loop. - */ + * Creates the variables for right part of result row, using BoundReference, since the right + * part are accessed inside the loop. + */ private def createRightVar(ctx: CodegenContext, rightRow: String): Seq[ExprCode] = { ctx.INPUT_ROW = rightRow right.output.zipWithIndex.map { case (a, i) => @@ -381,12 +381,12 @@ case class SortMergeJoin( } /** - * Splits variables based on whether it's used by condition or not, returns the code to create - * these variables before the condition and after the condition. - * - * Only a few columns are used by condition, then we can skip the accessing of those columns - * that are not used by condition also filtered out by condition. - */ + * Splits variables based on whether it's used by condition or not, returns the code to create + * these variables before the condition and after the condition. + * + * Only a few columns are used by condition, then we can skip the accessing of those columns + * that are not used by condition also filtered out by condition. + */ private def splitVarsByCondition( attributes: Seq[Attribute], variables: Seq[ExprCode]): (String, String) = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index 998eb82de1..8ece3c971a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -468,10 +468,10 @@ private[state] class HDFSBackedStateStoreProvider( } /** - * Clean up old snapshots and delta files that are not needed any more. It ensures that last - * few versions of the store can be recovered from the files, so re-executed RDD operations - * can re-apply updates on the past versions of the store. - */ + * Clean up old snapshots and delta files that are not needed any more. It ensures that last + * few versions of the store can be recovered from the files, so re-executed RDD operations + * can re-apply updates on the past versions of the store. + */ private[state] def cleanup(): Unit = { try { val files = fetchFiles() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala index 24a01f5be1..012b125d6b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala @@ -45,8 +45,8 @@ private[ui] case class SparkPlanGraph( } /** - * All the SparkPlanGraphNodes, including those inside of WholeStageCodegen. - */ + * All the SparkPlanGraphNodes, including those inside of WholeStageCodegen. + */ val allNodes: Seq[SparkPlanGraphNode] = { nodes.flatMap { case cluster: SparkPlanGraphCluster => cluster.nodes :+ cluster diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index baf947d037..da58ba2add 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -332,95 +332,94 @@ object functions { } /** - * Aggregate function: returns the first value in a group. - * - * The function by default returns the first values it sees. It will return the first non-null - * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. - * - * @group agg_funcs - * @since 2.0.0 - */ + * Aggregate function: returns the first value in a group. + * + * The function by default returns the first values it sees. It will return the first non-null + * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. + * + * @group agg_funcs + * @since 2.0.0 + */ def first(e: Column, ignoreNulls: Boolean): Column = withAggregateFunction { new First(e.expr, Literal(ignoreNulls)) } /** - * Aggregate function: returns the first value of a column in a group. - * - * The function by default returns the first values it sees. It will return the first non-null - * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. - * - * @group agg_funcs - * @since 2.0.0 - */ + * Aggregate function: returns the first value of a column in a group. + * + * The function by default returns the first values it sees. It will return the first non-null + * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. + * + * @group agg_funcs + * @since 2.0.0 + */ def first(columnName: String, ignoreNulls: Boolean): Column = { first(Column(columnName), ignoreNulls) } /** - * Aggregate function: returns the first value in a group. - * - * The function by default returns the first values it sees. It will return the first non-null - * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. - * - * @group agg_funcs - * @since 1.3.0 - */ + * Aggregate function: returns the first value in a group. + * + * The function by default returns the first values it sees. It will return the first non-null + * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. + * + * @group agg_funcs + * @since 1.3.0 + */ def first(e: Column): Column = first(e, ignoreNulls = false) /** - * Aggregate function: returns the first value of a column in a group. - * - * The function by default returns the first values it sees. It will return the first non-null - * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. - * - * @group agg_funcs - * @since 1.3.0 - */ + * Aggregate function: returns the first value of a column in a group. + * + * The function by default returns the first values it sees. It will return the first non-null + * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. + * + * @group agg_funcs + * @since 1.3.0 + */ def first(columnName: String): Column = first(Column(columnName)) - /** - * Aggregate function: indicates whether a specified column in a GROUP BY list is aggregated - * or not, returns 1 for aggregated or 0 for not aggregated in the result set. - * - * @group agg_funcs - * @since 2.0.0 - */ + * Aggregate function: indicates whether a specified column in a GROUP BY list is aggregated + * or not, returns 1 for aggregated or 0 for not aggregated in the result set. + * + * @group agg_funcs + * @since 2.0.0 + */ def grouping(e: Column): Column = Column(Grouping(e.expr)) /** - * Aggregate function: indicates whether a specified column in a GROUP BY list is aggregated - * or not, returns 1 for aggregated or 0 for not aggregated in the result set. - * - * @group agg_funcs - * @since 2.0.0 - */ + * Aggregate function: indicates whether a specified column in a GROUP BY list is aggregated + * or not, returns 1 for aggregated or 0 for not aggregated in the result set. + * + * @group agg_funcs + * @since 2.0.0 + */ def grouping(columnName: String): Column = grouping(Column(columnName)) /** - * Aggregate function: returns the level of grouping, equals to - * - * (grouping(c1) << (n-1)) + (grouping(c2) << (n-2)) + ... + grouping(cn) - * - * Note: the list of columns should match with grouping columns exactly, or empty (means all the - * grouping columns). - * - * @group agg_funcs - * @since 2.0.0 - */ + * Aggregate function: returns the level of grouping, equals to + * + * (grouping(c1) << (n-1)) + (grouping(c2) << (n-2)) + ... + grouping(cn) + * + * Note: the list of columns should match with grouping columns exactly, or empty (means all the + * grouping columns). + * + * @group agg_funcs + * @since 2.0.0 + */ def grouping_id(cols: Column*): Column = Column(GroupingID(cols.map(_.expr))) /** - * Aggregate function: returns the level of grouping, equals to - * - * (grouping(c1) << (n-1)) + (grouping(c2) << (n-2)) + ... + grouping(cn) - * - * Note: the list of columns should match with grouping columns exactly. - * - * @group agg_funcs - * @since 2.0.0 - */ + * Aggregate function: returns the level of grouping, equals to + * + * (grouping(c1) << (n-1)) + (grouping(c2) << (n-2)) + ... + grouping(cn) + * + * Note: the list of columns should match with grouping columns exactly. + * + * @group agg_funcs + * @since 2.0.0 + */ def grouping_id(colName: String, colNames: String*): Column = { grouping_id((Seq(colName) ++ colNames).map(n => Column(n)) : _*) } @@ -442,51 +441,51 @@ object functions { def kurtosis(columnName: String): Column = kurtosis(Column(columnName)) /** - * Aggregate function: returns the last value in a group. - * - * The function by default returns the last values it sees. It will return the last non-null - * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. - * - * @group agg_funcs - * @since 2.0.0 - */ + * Aggregate function: returns the last value in a group. + * + * The function by default returns the last values it sees. It will return the last non-null + * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. + * + * @group agg_funcs + * @since 2.0.0 + */ def last(e: Column, ignoreNulls: Boolean): Column = withAggregateFunction { new Last(e.expr, Literal(ignoreNulls)) } /** - * Aggregate function: returns the last value of the column in a group. - * - * The function by default returns the last values it sees. It will return the last non-null - * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. - * - * @group agg_funcs - * @since 2.0.0 - */ + * Aggregate function: returns the last value of the column in a group. + * + * The function by default returns the last values it sees. It will return the last non-null + * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. + * + * @group agg_funcs + * @since 2.0.0 + */ def last(columnName: String, ignoreNulls: Boolean): Column = { last(Column(columnName), ignoreNulls) } /** - * Aggregate function: returns the last value in a group. - * - * The function by default returns the last values it sees. It will return the last non-null - * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. - * - * @group agg_funcs - * @since 1.3.0 - */ + * Aggregate function: returns the last value in a group. + * + * The function by default returns the last values it sees. It will return the last non-null + * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. + * + * @group agg_funcs + * @since 1.3.0 + */ def last(e: Column): Column = last(e, ignoreNulls = false) /** - * Aggregate function: returns the last value of the column in a group. - * - * The function by default returns the last values it sees. It will return the last non-null - * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. - * - * @group agg_funcs - * @since 1.3.0 - */ + * Aggregate function: returns the last value of the column in a group. + * + * The function by default returns the last values it sees. It will return the last non-null + * value it sees when ignoreNulls is set to true. If all values are null, then null is returned. + * + * @group agg_funcs + * @since 1.3.0 + */ def last(columnName: String): Column = last(Column(columnName), ignoreNulls = false) /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index e8834d052c..14e14710f6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -152,19 +152,19 @@ trait StreamSinkProvider { @DeveloperApi trait CreatableRelationProvider { /** - * Creates a relation with the given parameters based on the contents of the given - * DataFrame. The mode specifies the expected behavior of createRelation when - * data already exists. - * Right now, there are three modes, Append, Overwrite, and ErrorIfExists. - * Append mode means that when saving a DataFrame to a data source, if data already exists, - * contents of the DataFrame are expected to be appended to existing data. - * Overwrite mode means that when saving a DataFrame to a data source, if data already exists, - * existing data is expected to be overwritten by the contents of the DataFrame. - * ErrorIfExists mode means that when saving a DataFrame to a data source, - * if data already exists, an exception is expected to be thrown. - * - * @since 1.3.0 - */ + * Creates a relation with the given parameters based on the contents of the given + * DataFrame. The mode specifies the expected behavior of createRelation when + * data already exists. + * Right now, there are three modes, Append, Overwrite, and ErrorIfExists. + * Append mode means that when saving a DataFrame to a data source, if data already exists, + * contents of the DataFrame are expected to be appended to existing data. + * Overwrite mode means that when saving a DataFrame to a data source, if data already exists, + * existing data is expected to be overwritten by the contents of the DataFrame. + * ErrorIfExists mode means that when saving a DataFrame to a data source, + * if data already exists, an exception is expected to be thrown. + * + * @since 1.3.0 + */ def createRelation( sqlContext: SQLContext, mode: SaveMode, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index 854a662cc4..d160f8ab8c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -286,8 +286,8 @@ abstract class QueryTest extends PlanTest { } /** - * Asserts that a given [[Dataset]] does not have missing inputs in all the analyzed plans. - */ + * Asserts that a given [[Dataset]] does not have missing inputs in all the analyzed plans. + */ def assertEmptyMissingInput(query: Dataset[_]): Unit = { assert(query.queryExecution.analyzed.missingInput.isEmpty, s"The analyzed logical plan has missing inputs: ${query.queryExecution.analyzed}") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala index 55906793c0..289e1b6db9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala @@ -32,10 +32,10 @@ import org.apache.spark.unsafe.map.BytesToBytesMap import org.apache.spark.util.Benchmark /** - * Benchmark to measure whole stage codegen performance. - * To run this: - * build/sbt "sql/test-only *BenchmarkWholeStageCodegen" - */ + * Benchmark to measure whole stage codegen performance. + * To run this: + * build/sbt "sql/test-only *BenchmarkWholeStageCodegen" + */ class BenchmarkWholeStageCodegen extends SparkFunSuite { lazy val conf = new SparkConf().setMaster("local[1]").setAppName("benchmark") .set("spark.sql.shuffle.partitions", "1") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVParserSuite.scala index dc54883277..aaeecef5f3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVParserSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVParserSuite.scala @@ -20,8 +20,8 @@ package org.apache.spark.sql.execution.datasources.csv import org.apache.spark.SparkFunSuite /** - * test cases for StringIteratorReader - */ + * test cases for StringIteratorReader + */ class CSVParserSuite extends SparkFunSuite { private def readAll(iter: Iterator[String]) = { |