Diffstat (limited to 'sql/core/src')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 14
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala | 7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala | 172
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala | 36
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala | 22
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala | 16
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala | 22
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala | 22
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala | 76
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala | 36
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala | 8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/functions.scala | 191
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala | 26
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala | 8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVParserSuite.scala | 4
24 files changed, 361 insertions, 353 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 221782ee8f..d4290fee0a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -712,13 +712,13 @@ class SQLContext private[sql](
}
/**
- * :: Experimental ::
- * Creates a [[Dataset]] with a single [[LongType]] column named `id`, containing elements
- * in an range from `start` to `end` (exclusive) with an step value.
- *
- * @since 2.0.0
- * @group dataset
- */
+ * :: Experimental ::
+ * Creates a [[Dataset]] with a single [[LongType]] column named `id`, containing elements
+ * in an range from `start` to `end` (exclusive) with an step value.
+ *
+ * @since 2.0.0
+ * @group dataset
+ */
@Experimental
def range(start: Long, end: Long, step: Long): Dataset[java.lang.Long] = {
range(start, end, step, numPartitions = sparkContext.defaultParallelism)
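
A quick usage sketch of the range(start, end, step) API documented above (assuming a SQLContext instance named `sqlContext`; the values are illustrative):

    // Creates a Dataset with a single LongType column named `id` holding 0, 2, 4, 6, 8;
    // the range is [start, end) stepped by `step`, partitioned by the default parallelism.
    val ds = sqlContext.range(0, 10, 2)
    ds.show()
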
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index f3478a873a..124ec09efd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -109,9 +109,10 @@ private[sql] class CacheManager extends Logging {
cachedData.remove(dataIndex)
}
- /** Tries to remove the data for the given [[Dataset]] from the cache
- * if it's cached
- */
+ /**
+ * Tries to remove the data for the given [[Dataset]] from the cache
+ * if it's cached
+ */
private[sql] def tryUncacheQuery(
query: Dataset[_],
blocking: Boolean = true): Boolean = writeLock {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index b1b3d4ac81..ff19d1be1c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -84,8 +84,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
private[sql] def metrics: Map[String, SQLMetric[_, _]] = Map.empty
/**
- * Reset all the metrics.
- */
+ * Reset all the metrics.
+ */
private[sql] def resetMetrics(): Unit = {
metrics.valuesIterator.foreach(_.reset())
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
index 9bdf611f6e..9f539c4929 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
@@ -31,8 +31,8 @@ import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetrics}
import org.apache.spark.sql.internal.SQLConf
/**
- * An interface for those physical operators that support codegen.
- */
+ * An interface for those physical operators that support codegen.
+ */
trait CodegenSupport extends SparkPlan {
/** Prefix used in the current operator's variable names. */
@@ -46,10 +46,10 @@ trait CodegenSupport extends SparkPlan {
}
/**
- * Creates a metric using the specified name.
- *
- * @return name of the variable representing the metric
- */
+ * Creates a metric using the specified name.
+ *
+ * @return name of the variable representing the metric
+ */
def metricTerm(ctx: CodegenContext, name: String): String = {
val metric = ctx.addReferenceObj(name, longMetric(name))
val value = ctx.freshName("metricValue")
@@ -59,25 +59,25 @@ trait CodegenSupport extends SparkPlan {
}
/**
- * Whether this SparkPlan support whole stage codegen or not.
- */
+ * Whether this SparkPlan support whole stage codegen or not.
+ */
def supportCodegen: Boolean = true
/**
- * Which SparkPlan is calling produce() of this one. It's itself for the first SparkPlan.
- */
+ * Which SparkPlan is calling produce() of this one. It's itself for the first SparkPlan.
+ */
protected var parent: CodegenSupport = null
/**
- * Returns all the RDDs of InternalRow which generates the input rows.
- *
- * Note: right now we support up to two RDDs.
- */
+ * Returns all the RDDs of InternalRow which generates the input rows.
+ *
+ * Note: right now we support up to two RDDs.
+ */
def upstreams(): Seq[RDD[InternalRow]]
/**
- * Returns Java source code to process the rows from upstream.
- */
+ * Returns Java source code to process the rows from upstream.
+ */
final def produce(ctx: CodegenContext, parent: CodegenSupport): String = {
this.parent = parent
ctx.freshNamePrefix = variablePrefix
@@ -89,28 +89,28 @@ trait CodegenSupport extends SparkPlan {
}
/**
- * Generate the Java source code to process, should be overridden by subclass to support codegen.
- *
- * doProduce() usually generate the framework, for example, aggregation could generate this:
- *
- * if (!initialized) {
- * # create a hash map, then build the aggregation hash map
- * # call child.produce()
- * initialized = true;
- * }
- * while (hashmap.hasNext()) {
- * row = hashmap.next();
- * # build the aggregation results
- * # create variables for results
- * # call consume(), which will call parent.doConsume()
+ * Generate the Java source code to process, should be overridden by subclass to support codegen.
+ *
+ * doProduce() usually generate the framework, for example, aggregation could generate this:
+ *
+ * if (!initialized) {
+ * # create a hash map, then build the aggregation hash map
+ * # call child.produce()
+ * initialized = true;
+ * }
+ * while (hashmap.hasNext()) {
+ * row = hashmap.next();
+ * # build the aggregation results
+ * # create variables for results
+ * # call consume(), which will call parent.doConsume()
* if (shouldStop()) return;
- * }
- */
+ * }
+ */
protected def doProduce(ctx: CodegenContext): String
/**
- * Consume the generated columns or row from current SparkPlan, call it's parent's doConsume().
- */
+ * Consume the generated columns or row from current SparkPlan, call it's parent's doConsume().
+ */
final def consume(ctx: CodegenContext, outputVars: Seq[ExprCode], row: String = null): String = {
val inputVars =
if (row != null) {
@@ -158,9 +158,9 @@ trait CodegenSupport extends SparkPlan {
}
/**
- * Returns source code to evaluate all the variables, and clear the code of them, to prevent
- * them to be evaluated twice.
- */
+ * Returns source code to evaluate all the variables, and clear the code of them, to prevent
+ * them to be evaluated twice.
+ */
protected def evaluateVariables(variables: Seq[ExprCode]): String = {
val evaluate = variables.filter(_.code != "").map(_.code.trim).mkString("\n")
variables.foreach(_.code = "")
@@ -168,9 +168,9 @@ trait CodegenSupport extends SparkPlan {
}
/**
- * Returns source code to evaluate the variables for required attributes, and clear the code
- * of evaluated variables, to prevent them to be evaluated twice..
- */
+ * Returns source code to evaluate the variables for required attributes, and clear the code
+ * of evaluated variables, to prevent them to be evaluated twice..
+ */
protected def evaluateRequiredVariables(
attributes: Seq[Attribute],
variables: Seq[ExprCode],
@@ -194,18 +194,18 @@ trait CodegenSupport extends SparkPlan {
def usedInputs: AttributeSet = references
/**
- * Generate the Java source code to process the rows from child SparkPlan.
- *
- * This should be override by subclass to support codegen.
- *
- * For example, Filter will generate the code like this:
- *
- * # code to evaluate the predicate expression, result is isNull1 and value2
- * if (isNull1 || !value2) continue;
- * # call consume(), which will call parent.doConsume()
- *
- * Note: A plan can either consume the rows as UnsafeRow (row), or a list of variables (input).
- */
+ * Generate the Java source code to process the rows from child SparkPlan.
+ *
+ * This should be override by subclass to support codegen.
+ *
+ * For example, Filter will generate the code like this:
+ *
+ * # code to evaluate the predicate expression, result is isNull1 and value2
+ * if (isNull1 || !value2) continue;
+ * # call consume(), which will call parent.doConsume()
+ *
+ * Note: A plan can either consume the rows as UnsafeRow (row), or a list of variables (input).
+ */
def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
throw new UnsupportedOperationException
}
@@ -213,11 +213,11 @@ trait CodegenSupport extends SparkPlan {
/**
- * InputAdapter is used to hide a SparkPlan from a subtree that support codegen.
- *
- * This is the leaf node of a tree with WholeStageCodegen, is used to generate code that consumes
- * an RDD iterator of InternalRow.
- */
+ * InputAdapter is used to hide a SparkPlan from a subtree that support codegen.
+ *
+ * This is the leaf node of a tree with WholeStageCodegen, is used to generate code that consumes
+ * an RDD iterator of InternalRow.
+ */
case class InputAdapter(child: SparkPlan) extends UnaryNode with CodegenSupport {
override def output: Seq[Attribute] = child.output
@@ -260,33 +260,33 @@ object WholeStageCodegen {
}
/**
- * WholeStageCodegen compile a subtree of plans that support codegen together into single Java
- * function.
- *
- * Here is the call graph of to generate Java source (plan A support codegen, but plan B does not):
- *
- * WholeStageCodegen Plan A FakeInput Plan B
- * =========================================================================
- *
- * -> execute()
- * |
- * doExecute() ---------> upstreams() -------> upstreams() ------> execute()
- * |
- * +-----------------> produce()
- * |
- * doProduce() -------> produce()
- * |
- * doProduce()
- * |
- * doConsume() <--------- consume()
- * |
- * doConsume() <-------- consume()
- *
- * SparkPlan A should override doProduce() and doConsume().
- *
- * doCodeGen() will create a CodeGenContext, which will hold a list of variables for input,
- * used to generated code for BoundReference.
- */
+ * WholeStageCodegen compile a subtree of plans that support codegen together into single Java
+ * function.
+ *
+ * Here is the call graph of to generate Java source (plan A support codegen, but plan B does not):
+ *
+ * WholeStageCodegen Plan A FakeInput Plan B
+ * =========================================================================
+ *
+ * -> execute()
+ * |
+ * doExecute() ---------> upstreams() -------> upstreams() ------> execute()
+ * |
+ * +-----------------> produce()
+ * |
+ * doProduce() -------> produce()
+ * |
+ * doProduce()
+ * |
+ * doConsume() <--------- consume()
+ * |
+ * doConsume() <-------- consume()
+ *
+ * SparkPlan A should override doProduce() and doConsume().
+ *
+ * doCodeGen() will create a CodeGenContext, which will hold a list of variables for input,
+ * used to generated code for BoundReference.
+ */
case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSupport {
override def output: Seq[Attribute] = child.output
@@ -422,8 +422,8 @@ case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSup
/**
- * Find the chained plans that support codegen, collapse them together as WholeStageCodegen.
- */
+ * Find the chained plans that support codegen, collapse them together as WholeStageCodegen.
+ */
case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] {
private def supportCodegen(e: Expression): Boolean = e match {
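
A hedged illustration of the collapsing rule above (assuming a Spark 2.0-era `sqlContext`; the query is illustrative): when every chained operator supports codegen, CollapseCodegenStages compiles them into a single WholeStageCodegen node, visible as one stage in the physical plan.

    // Filter and Project both support codegen, so they end up inside one
    // WholeStageCodegen node; explain() prints the resulting physical plan.
    sqlContext.range(0, 1000).filter("id > 100").selectExpr("id + 1").explain()
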
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
index 7d0567842c..806089196c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
@@ -444,8 +444,8 @@ private[execution] final case class RangeBoundOrdering(
}
/**
- * The interface of row buffer for a partition
- */
+ * The interface of row buffer for a partition
+ */
private[execution] abstract class RowBuffer {
/** Number of rows. */
@@ -462,8 +462,8 @@ private[execution] abstract class RowBuffer {
}
/**
- * A row buffer based on ArrayBuffer (the number of rows is limited)
- */
+ * A row buffer based on ArrayBuffer (the number of rows is limited)
+ */
private[execution] class ArrayRowBuffer(buffer: ArrayBuffer[UnsafeRow]) extends RowBuffer {
private[this] var cursor: Int = -1
@@ -493,8 +493,8 @@ private[execution] class ArrayRowBuffer(buffer: ArrayBuffer[UnsafeRow]) extends
}
/**
- * An external buffer of rows based on UnsafeExternalSorter
- */
+ * An external buffer of rows based on UnsafeExternalSorter
+ */
private[execution] class ExternalRowBuffer(sorter: UnsafeExternalSorter, numFields: Int)
extends RowBuffer {
@@ -654,12 +654,16 @@ private[execution] final class SlidingWindowFunctionFrame(
/** The rows within current sliding window. */
private[this] val buffer = new util.ArrayDeque[InternalRow]()
- /** Index of the first input row with a value greater than the upper bound of the current
- * output row. */
+ /**
+ * Index of the first input row with a value greater than the upper bound of the current
+ * output row.
+ */
private[this] var inputHighIndex = 0
- /** Index of the first input row with a value equal to or greater than the lower bound of the
- * current output row. */
+ /**
+ * Index of the first input row with a value equal to or greater than the lower bound of the
+ * current output row.
+ */
private[this] var inputLowIndex = 0
/** Prepare the frame for calculating a new partition. Reset all variables. */
@@ -763,8 +767,10 @@ private[execution] final class UnboundedPrecedingWindowFunctionFrame(
/** The next row from `input`. */
private[this] var nextRow: InternalRow = null
- /** Index of the first input row with a value greater than the upper bound of the current
- * output row. */
+ /**
+ * Index of the first input row with a value greater than the upper bound of the current
+ * output row.
+ */
private[this] var inputIndex = 0
/** Prepare the frame for calculating a new partition. */
@@ -819,8 +825,10 @@ private[execution] final class UnboundedFollowingWindowFunctionFrame(
/** Rows of the partition currently being processed. */
private[this] var input: RowBuffer = null
- /** Index of the first input row with a value equal to or greater than the lower bound of the
- * current output row. */
+ /**
+ * Index of the first input row with a value equal to or greater than the lower bound of the
+ * current output row.
+ */
private[this] var inputIndex = 0
/** Prepare the frame for calculating a new partition. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
index 15627a7004..042c731901 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
@@ -47,17 +47,17 @@ abstract class AggregationIterator(
///////////////////////////////////////////////////////////////////////////
/**
- * The following combinations of AggregationMode are supported:
- * - Partial
- * - PartialMerge (for single distinct)
- * - Partial and PartialMerge (for single distinct)
- * - Final
- * - Complete (for SortBasedAggregate with functions that does not support Partial)
- * - Final and Complete (currently not used)
- *
- * TODO: AggregateMode should have only two modes: Update and Merge, AggregateExpression
- * could have a flag to tell it's final or not.
- */
+ * The following combinations of AggregationMode are supported:
+ * - Partial
+ * - PartialMerge (for single distinct)
+ * - Partial and PartialMerge (for single distinct)
+ * - Final
+ * - Complete (for SortBasedAggregate with functions that does not support Partial)
+ * - Final and Complete (currently not used)
+ *
+ * TODO: AggregateMode should have only two modes: Update and Merge, AggregateExpression
+ * could have a flag to tell it's final or not.
+ */
{
val modes = aggregateExpressions.map(_.mode).distinct.toSet
require(modes.size <= 2,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
index 8f974980bb..de1491d357 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
@@ -46,9 +46,9 @@ class SortBasedAggregationIterator(
newMutableProjection) {
/**
- * Creates a new aggregation buffer and initializes buffer values
- * for all aggregate functions.
- */
+ * Creates a new aggregation buffer and initializes buffer values
+ * for all aggregate functions.
+ */
private def newBuffer: MutableRow = {
val bufferSchema = aggregateFunctions.flatMap(_.aggBufferAttributes)
val bufferRowSize: Int = bufferSchema.length
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index 7c215d1b96..60027edc7c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -266,8 +266,8 @@ case class TungstenAggregate(
private var sorterTerm: String = _
/**
- * This is called by generated Java class, should be public.
- */
+ * This is called by generated Java class, should be public.
+ */
def createHashMap(): UnsafeFixedWidthAggregationMap = {
// create initialized aggregate buffer
val initExpr = declFunctions.flatMap(f => f.initialValues)
@@ -286,15 +286,15 @@ case class TungstenAggregate(
}
/**
- * This is called by generated Java class, should be public.
- */
+ * This is called by generated Java class, should be public.
+ */
def createUnsafeJoiner(): UnsafeRowJoiner = {
GenerateUnsafeRowJoiner.create(groupingKeySchema, bufferSchema)
}
/**
- * Called by generated Java class to finish the aggregate and return a KVIterator.
- */
+ * Called by generated Java class to finish the aggregate and return a KVIterator.
+ */
def finishAggregate(
hashMap: UnsafeFixedWidthAggregationMap,
sorter: UnsafeKVExternalSorter): KVIterator[UnsafeRow, UnsafeRow] = {
@@ -372,8 +372,8 @@ case class TungstenAggregate(
}
/**
- * Generate the code for output.
- */
+ * Generate the code for output.
+ */
private def generateResultCode(
ctx: CodegenContext,
keyTerm: String,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
index f3514cd14c..159fdc99dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
@@ -168,10 +168,10 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
private[this] var reader: RecordReader[Void, V] = null
/**
- * If the format is ParquetInputFormat, try to create the optimized RecordReader. If this
- * fails (for example, unsupported schema), try with the normal reader.
- * TODO: plumb this through a different way?
- */
+ * If the format is ParquetInputFormat, try to create the optimized RecordReader. If this
+ * fails (for example, unsupported schema), try with the normal reader.
+ * TODO: plumb this through a different way?
+ */
if (enableVectorizedParquetReader &&
format.getClass.getName == "org.apache.parquet.hadoop.ParquetInputFormat") {
val parquetReader: VectorizedParquetRecordReader = new VectorizedParquetRecordReader()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
index 797f740dc5..ea843a1013 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
@@ -33,11 +33,11 @@ import org.apache.spark.unsafe.types.UTF8String
private[csv] object CSVInferSchema {
/**
- * Similar to the JSON schema inference
- * 1. Infer type of each row
- * 2. Merge row types to find common type
- * 3. Replace any null types with string type
- */
+ * Similar to the JSON schema inference
+ * 1. Infer type of each row
+ * 2. Merge row types to find common type
+ * 3. Replace any null types with string type
+ */
def infer(
tokenRdd: RDD[Array[String]],
header: Array[String],
@@ -75,9 +75,9 @@ private[csv] object CSVInferSchema {
}
/**
- * Infer type of string field. Given known type Double, and a string "1", there is no
- * point checking if it is an Int, as the final type must be Double or higher.
- */
+ * Infer type of string field. Given known type Double, and a string "1", there is no
+ * point checking if it is an Int, as the final type must be Double or higher.
+ */
def inferField(typeSoFar: DataType, field: String, nullValue: String = ""): DataType = {
if (field == null || field.isEmpty || field == nullValue) {
typeSoFar
@@ -142,9 +142,9 @@ private[csv] object CSVInferSchema {
private val numericPrecedence: IndexedSeq[DataType] = HiveTypeCoercion.numericPrecedence
/**
- * Copied from internal Spark api
- * [[org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion]]
- */
+ * Copied from internal Spark api
+ * [[org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion]]
+ */
val findTightestCommonType: (DataType, DataType) => Option[DataType] = {
case (t1, t2) if t1 == t2 => Some(t1)
case (NullType, t1) => Some(t1)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
index c0d6f6fbf7..34fcbdf871 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
@@ -38,8 +38,8 @@ import org.apache.spark.util.SerializableConfiguration
import org.apache.spark.util.collection.BitSet
/**
- * Provides access to CSV data from pure SQL statements.
- */
+ * Provides access to CSV data from pure SQL statements.
+ */
class DefaultSource extends FileFormat with DataSourceRegister {
override def shortName(): String = "csv"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 877e159fbd..2e88d588be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -51,11 +51,11 @@ case class DescribeCommand(
}
/**
- * Used to represent the operation of create table using a data source.
+ * Used to represent the operation of create table using a data source.
*
- * @param allowExisting If it is true, we will do nothing when the table already exists.
- * If it is false, an exception will be thrown
- */
+ * @param allowExisting If it is true, we will do nothing when the table already exists.
+ * If it is false, an exception will be thrown
+ */
case class CreateTableUsing(
tableIdent: TableIdentifier,
userSpecifiedSchema: Option[StructType],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index 0ed1ed41b0..41e566c27b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -122,8 +122,8 @@ case class BroadcastHashJoin(
}
/**
- * Returns a tuple of Broadcast of HashedRelation and the variable name for it.
- */
+ * Returns a tuple of Broadcast of HashedRelation and the variable name for it.
+ */
private def prepareBroadcast(ctx: CodegenContext): (Broadcast[HashedRelation], String) = {
// create a name for HashedRelation
val broadcastRelation = buildPlan.executeBroadcast[HashedRelation]()
@@ -139,9 +139,9 @@ case class BroadcastHashJoin(
}
/**
- * Returns the code for generating join key for stream side, and expression of whether the key
- * has any null in it or not.
- */
+ * Returns the code for generating join key for stream side, and expression of whether the key
+ * has any null in it or not.
+ */
private def genStreamSideJoinKey(
ctx: CodegenContext,
input: Seq[ExprCode]): (ExprCode, String) = {
@@ -160,8 +160,8 @@ case class BroadcastHashJoin(
}
/**
- * Generates the code for variable of build side.
- */
+ * Generates the code for variable of build side.
+ */
private def genBuildSideVars(ctx: CodegenContext, matched: String): Seq[ExprCode] = {
ctx.currentVars = null
ctx.INPUT_ROW = matched
@@ -188,8 +188,8 @@ case class BroadcastHashJoin(
}
/**
- * Generates the code for Inner join.
- */
+ * Generates the code for Inner join.
+ */
private def codegenInner(ctx: CodegenContext, input: Seq[ExprCode]): String = {
val (broadcastRelation, relationTerm) = prepareBroadcast(ctx)
val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
@@ -254,8 +254,8 @@ case class BroadcastHashJoin(
/**
- * Generates the code for left or right outer join.
- */
+ * Generates the code for left or right outer join.
+ */
private def codegenOuter(ctx: CodegenContext, input: Seq[ExprCode]): String = {
val (broadcastRelation, relationTerm) = prepareBroadcast(ctx)
val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index fb65b50da8..edb4c5a16f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -28,10 +28,10 @@ import org.apache.spark.util.CompletionIterator
import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
/**
- * An optimized CartesianRDD for UnsafeRow, which will cache the rows from second child RDD,
- * will be much faster than building the right partition for every row in left RDD, it also
- * materialize the right RDD (in case of the right RDD is nondeterministic).
- */
+ * An optimized CartesianRDD for UnsafeRow, which will cache the rows from second child RDD,
+ * will be much faster than building the right partition for every row in left RDD, it also
+ * materialize the right RDD (in case of the right RDD is nondeterministic).
+ */
private[spark]
class UnsafeCartesianRDD(left : RDD[UnsafeRow], right : RDD[UnsafeRow], numFieldsOfRight: Int)
extends CartesianRDD[UnsafeRow, UnsafeRow](left.sparkContext, left, right) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index 5f42d07273..c298b7dee0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -64,10 +64,10 @@ trait HashJoin {
}
/**
- * Try to rewrite the key as LongType so we can use getLong(), if they key can fit with a long.
- *
- * If not, returns the original expressions.
- */
+ * Try to rewrite the key as LongType so we can use getLong(), if they key can fit with a long.
+ *
+ * If not, returns the original expressions.
+ */
def rewriteKeyExpr(keys: Seq[Expression]): Seq[Expression] = {
var keyExpr: Expression = null
var width = 0
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index dc4793e85a..91c470d187 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -38,20 +38,20 @@ import org.apache.spark.util.collection.CompactBuffer
*/
private[execution] sealed trait HashedRelation {
/**
- * Returns matched rows.
- */
+ * Returns matched rows.
+ */
def get(key: InternalRow): Seq[InternalRow]
/**
- * Returns matched rows for a key that has only one column with LongType.
- */
+ * Returns matched rows for a key that has only one column with LongType.
+ */
def get(key: Long): Seq[InternalRow] = {
throw new UnsupportedOperationException
}
/**
- * Returns the size of used memory.
- */
+ * Returns the size of used memory.
+ */
def getMemorySize: Long = 1L // to make the test happy
/**
@@ -77,20 +77,20 @@ private[execution] sealed trait HashedRelation {
}
/**
- * Interface for a hashed relation that have only one row per key.
- *
- * We should call getValue() for better performance.
- */
+ * Interface for a hashed relation that have only one row per key.
+ *
+ * We should call getValue() for better performance.
+ */
private[execution] trait UniqueHashedRelation extends HashedRelation {
/**
- * Returns the matched single row.
- */
+ * Returns the matched single row.
+ */
def getValue(key: InternalRow): InternalRow
/**
- * Returns the matched single row with key that have only one column of LongType.
- */
+ * Returns the matched single row with key that have only one column of LongType.
+ */
def getValue(key: Long): InternalRow = {
throw new UnsupportedOperationException
}
@@ -345,8 +345,8 @@ private[joins] object UnsafeHashedRelation {
}
/**
- * An interface for a hashed relation that the key is a Long.
- */
+ * An interface for a hashed relation that the key is a Long.
+ */
private[joins] trait LongHashedRelation extends HashedRelation {
override def get(key: InternalRow): Seq[InternalRow] = {
get(key.getLong(0))
@@ -396,26 +396,26 @@ private[joins] final class UniqueLongHashedRelation(
}
/**
- * A relation that pack all the rows into a byte array, together with offsets and sizes.
- *
- * All the bytes of UnsafeRow are packed together as `bytes`:
- *
- * [ Row0 ][ Row1 ][] ... [ RowN ]
- *
- * With keys:
- *
- * start start+1 ... start+N
- *
- * `offsets` are offsets of UnsafeRows in the `bytes`
- * `sizes` are the numbers of bytes of UnsafeRows, 0 means no row for this key.
- *
- * For example, two UnsafeRows (24 bytes and 32 bytes), with keys as 3 and 5 will stored as:
- *
- * start = 3
- * offsets = [0, 0, 24]
- * sizes = [24, 0, 32]
- * bytes = [0 - 24][][24 - 56]
- */
+ * A relation that pack all the rows into a byte array, together with offsets and sizes.
+ *
+ * All the bytes of UnsafeRow are packed together as `bytes`:
+ *
+ * [ Row0 ][ Row1 ][] ... [ RowN ]
+ *
+ * With keys:
+ *
+ * start start+1 ... start+N
+ *
+ * `offsets` are offsets of UnsafeRows in the `bytes`
+ * `sizes` are the numbers of bytes of UnsafeRows, 0 means no row for this key.
+ *
+ * For example, two UnsafeRows (24 bytes and 32 bytes), with keys as 3 and 5 will stored as:
+ *
+ * start = 3
+ * offsets = [0, 0, 24]
+ * sizes = [24, 0, 32]
+ * bytes = [0 - 24][][24 - 56]
+ */
private[joins] final class LongArrayRelation(
private var numFields: Int,
private var start: Long,
@@ -483,8 +483,8 @@ private[joins] final class LongArrayRelation(
}
/**
- * Create hashed relation with key that is long.
- */
+ * Create hashed relation with key that is long.
+ */
private[joins] object LongHashedRelation {
val DENSE_FACTOR = 0.2
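
To make the LongArrayRelation layout described above concrete, here is a small self-contained sketch (illustrative only, not Spark code) of the lookup that layout implies:

    // For a key k in [start, start + N], the row bytes live at offsets(k - start) with
    // length sizes(k - start); a size of 0 means there is no row stored for that key.
    def lookupRow(key: Long, start: Long, offsets: Array[Int], sizes: Array[Int],
        bytes: Array[Byte]): Option[Array[Byte]] = {
      val idx = (key - start).toInt
      if (idx < 0 || idx >= sizes.length || sizes(idx) == 0) None
      else Some(java.util.Arrays.copyOfRange(bytes, offsets(idx), offsets(idx) + sizes(idx)))
    }
    // With the example from the comment (start = 3, offsets = [0, 0, 24], sizes = [24, 0, 32]),
    // lookupRow(5, 3, offsets, sizes, bytes) returns the 32-byte row stored at offset 24.
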
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
index 60bd8ea39a..0e7b2f2f31 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
@@ -256,9 +256,9 @@ case class SortMergeJoin(
}
/**
- * Generate a function to scan both left and right to find a match, returns the term for
- * matched one row from left side and buffered rows from right side.
- */
+ * Generate a function to scan both left and right to find a match, returns the term for
+ * matched one row from left side and buffered rows from right side.
+ */
private def genScanner(ctx: CodegenContext): (String, String) = {
// Create class member for next row from both sides.
val leftRow = ctx.freshName("leftRow")
@@ -341,12 +341,12 @@ case class SortMergeJoin(
}
/**
- * Creates variables for left part of result row.
- *
- * In order to defer the access after condition and also only access once in the loop,
- * the variables should be declared separately from accessing the columns, we can't use the
- * codegen of BoundReference here.
- */
+ * Creates variables for left part of result row.
+ *
+ * In order to defer the access after condition and also only access once in the loop,
+ * the variables should be declared separately from accessing the columns, we can't use the
+ * codegen of BoundReference here.
+ */
private def createLeftVars(ctx: CodegenContext, leftRow: String): Seq[ExprCode] = {
ctx.INPUT_ROW = leftRow
left.output.zipWithIndex.map { case (a, i) =>
@@ -370,9 +370,9 @@ case class SortMergeJoin(
}
/**
- * Creates the variables for right part of result row, using BoundReference, since the right
- * part are accessed inside the loop.
- */
+ * Creates the variables for right part of result row, using BoundReference, since the right
+ * part are accessed inside the loop.
+ */
private def createRightVar(ctx: CodegenContext, rightRow: String): Seq[ExprCode] = {
ctx.INPUT_ROW = rightRow
right.output.zipWithIndex.map { case (a, i) =>
@@ -381,12 +381,12 @@ case class SortMergeJoin(
}
/**
- * Splits variables based on whether it's used by condition or not, returns the code to create
- * these variables before the condition and after the condition.
- *
- * Only a few columns are used by condition, then we can skip the accessing of those columns
- * that are not used by condition also filtered out by condition.
- */
+ * Splits variables based on whether it's used by condition or not, returns the code to create
+ * these variables before the condition and after the condition.
+ *
+ * Only a few columns are used by condition, then we can skip the accessing of those columns
+ * that are not used by condition also filtered out by condition.
+ */
private def splitVarsByCondition(
attributes: Seq[Attribute],
variables: Seq[ExprCode]): (String, String) = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 998eb82de1..8ece3c971a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -468,10 +468,10 @@ private[state] class HDFSBackedStateStoreProvider(
}
/**
- * Clean up old snapshots and delta files that are not needed any more. It ensures that last
- * few versions of the store can be recovered from the files, so re-executed RDD operations
- * can re-apply updates on the past versions of the store.
- */
+ * Clean up old snapshots and delta files that are not needed any more. It ensures that last
+ * few versions of the store can be recovered from the files, so re-executed RDD operations
+ * can re-apply updates on the past versions of the store.
+ */
private[state] def cleanup(): Unit = {
try {
val files = fetchFiles()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
index 24a01f5be1..012b125d6b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
@@ -45,8 +45,8 @@ private[ui] case class SparkPlanGraph(
}
/**
- * All the SparkPlanGraphNodes, including those inside of WholeStageCodegen.
- */
+ * All the SparkPlanGraphNodes, including those inside of WholeStageCodegen.
+ */
val allNodes: Seq[SparkPlanGraphNode] = {
nodes.flatMap {
case cluster: SparkPlanGraphCluster => cluster.nodes :+ cluster
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index baf947d037..da58ba2add 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -332,95 +332,94 @@ object functions {
}
/**
- * Aggregate function: returns the first value in a group.
- *
- * The function by default returns the first values it sees. It will return the first non-null
- * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
- *
- * @group agg_funcs
- * @since 2.0.0
- */
+ * Aggregate function: returns the first value in a group.
+ *
+ * The function by default returns the first values it sees. It will return the first non-null
+ * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
+ *
+ * @group agg_funcs
+ * @since 2.0.0
+ */
def first(e: Column, ignoreNulls: Boolean): Column = withAggregateFunction {
new First(e.expr, Literal(ignoreNulls))
}
/**
- * Aggregate function: returns the first value of a column in a group.
- *
- * The function by default returns the first values it sees. It will return the first non-null
- * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
- *
- * @group agg_funcs
- * @since 2.0.0
- */
+ * Aggregate function: returns the first value of a column in a group.
+ *
+ * The function by default returns the first values it sees. It will return the first non-null
+ * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
+ *
+ * @group agg_funcs
+ * @since 2.0.0
+ */
def first(columnName: String, ignoreNulls: Boolean): Column = {
first(Column(columnName), ignoreNulls)
}
/**
- * Aggregate function: returns the first value in a group.
- *
- * The function by default returns the first values it sees. It will return the first non-null
- * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
- *
- * @group agg_funcs
- * @since 1.3.0
- */
+ * Aggregate function: returns the first value in a group.
+ *
+ * The function by default returns the first values it sees. It will return the first non-null
+ * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
+ *
+ * @group agg_funcs
+ * @since 1.3.0
+ */
def first(e: Column): Column = first(e, ignoreNulls = false)
/**
- * Aggregate function: returns the first value of a column in a group.
- *
- * The function by default returns the first values it sees. It will return the first non-null
- * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
- *
- * @group agg_funcs
- * @since 1.3.0
- */
+ * Aggregate function: returns the first value of a column in a group.
+ *
+ * The function by default returns the first values it sees. It will return the first non-null
+ * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
+ *
+ * @group agg_funcs
+ * @since 1.3.0
+ */
def first(columnName: String): Column = first(Column(columnName))
-
/**
- * Aggregate function: indicates whether a specified column in a GROUP BY list is aggregated
- * or not, returns 1 for aggregated or 0 for not aggregated in the result set.
- *
- * @group agg_funcs
- * @since 2.0.0
- */
+ * Aggregate function: indicates whether a specified column in a GROUP BY list is aggregated
+ * or not, returns 1 for aggregated or 0 for not aggregated in the result set.
+ *
+ * @group agg_funcs
+ * @since 2.0.0
+ */
def grouping(e: Column): Column = Column(Grouping(e.expr))
/**
- * Aggregate function: indicates whether a specified column in a GROUP BY list is aggregated
- * or not, returns 1 for aggregated or 0 for not aggregated in the result set.
- *
- * @group agg_funcs
- * @since 2.0.0
- */
+ * Aggregate function: indicates whether a specified column in a GROUP BY list is aggregated
+ * or not, returns 1 for aggregated or 0 for not aggregated in the result set.
+ *
+ * @group agg_funcs
+ * @since 2.0.0
+ */
def grouping(columnName: String): Column = grouping(Column(columnName))
/**
- * Aggregate function: returns the level of grouping, equals to
- *
- * (grouping(c1) << (n-1)) + (grouping(c2) << (n-2)) + ... + grouping(cn)
- *
- * Note: the list of columns should match with grouping columns exactly, or empty (means all the
- * grouping columns).
- *
- * @group agg_funcs
- * @since 2.0.0
- */
+ * Aggregate function: returns the level of grouping, equals to
+ *
+ * (grouping(c1) << (n-1)) + (grouping(c2) << (n-2)) + ... + grouping(cn)
+ *
+ * Note: the list of columns should match with grouping columns exactly, or empty (means all the
+ * grouping columns).
+ *
+ * @group agg_funcs
+ * @since 2.0.0
+ */
def grouping_id(cols: Column*): Column = Column(GroupingID(cols.map(_.expr)))
/**
- * Aggregate function: returns the level of grouping, equals to
- *
- * (grouping(c1) << (n-1)) + (grouping(c2) << (n-2)) + ... + grouping(cn)
- *
- * Note: the list of columns should match with grouping columns exactly.
- *
- * @group agg_funcs
- * @since 2.0.0
- */
+ * Aggregate function: returns the level of grouping, equals to
+ *
+ * (grouping(c1) << (n-1)) + (grouping(c2) << (n-2)) + ... + grouping(cn)
+ *
+ * Note: the list of columns should match with grouping columns exactly.
+ *
+ * @group agg_funcs
+ * @since 2.0.0
+ */
def grouping_id(colName: String, colNames: String*): Column = {
grouping_id((Seq(colName) ++ colNames).map(n => Column(n)) : _*)
}
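
A hedged usage sketch of grouping and grouping_id, using a hypothetical DataFrame `df` with columns `department`, `group` and `salary` (the names are illustrative, not from this diff):

    import org.apache.spark.sql.functions._
    // Under cube/rollup, grouping() is 1 when the column was aggregated away and 0 when it
    // was grouped on; grouping_id() packs those bits as (grouping(c1) << 1) + grouping(c2)
    // for two grouping columns, matching the formula in the comment above.
    df.cube("department", "group")
      .agg(grouping("department"), grouping("group"), grouping_id(), sum("salary"))
      .show()
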
@@ -442,51 +441,51 @@ object functions {
def kurtosis(columnName: String): Column = kurtosis(Column(columnName))
/**
- * Aggregate function: returns the last value in a group.
- *
- * The function by default returns the last values it sees. It will return the last non-null
- * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
- *
- * @group agg_funcs
- * @since 2.0.0
- */
+ * Aggregate function: returns the last value in a group.
+ *
+ * The function by default returns the last values it sees. It will return the last non-null
+ * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
+ *
+ * @group agg_funcs
+ * @since 2.0.0
+ */
def last(e: Column, ignoreNulls: Boolean): Column = withAggregateFunction {
new Last(e.expr, Literal(ignoreNulls))
}
/**
- * Aggregate function: returns the last value of the column in a group.
- *
- * The function by default returns the last values it sees. It will return the last non-null
- * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
- *
- * @group agg_funcs
- * @since 2.0.0
- */
+ * Aggregate function: returns the last value of the column in a group.
+ *
+ * The function by default returns the last values it sees. It will return the last non-null
+ * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
+ *
+ * @group agg_funcs
+ * @since 2.0.0
+ */
def last(columnName: String, ignoreNulls: Boolean): Column = {
last(Column(columnName), ignoreNulls)
}
/**
- * Aggregate function: returns the last value in a group.
- *
- * The function by default returns the last values it sees. It will return the last non-null
- * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
- *
- * @group agg_funcs
- * @since 1.3.0
- */
+ * Aggregate function: returns the last value in a group.
+ *
+ * The function by default returns the last values it sees. It will return the last non-null
+ * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
+ *
+ * @group agg_funcs
+ * @since 1.3.0
+ */
def last(e: Column): Column = last(e, ignoreNulls = false)
/**
- * Aggregate function: returns the last value of the column in a group.
- *
- * The function by default returns the last values it sees. It will return the last non-null
- * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
- *
- * @group agg_funcs
- * @since 1.3.0
- */
+ * Aggregate function: returns the last value of the column in a group.
+ *
+ * The function by default returns the last values it sees. It will return the last non-null
+ * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
+ *
+ * @group agg_funcs
+ * @since 1.3.0
+ */
def last(columnName: String): Column = last(Column(columnName), ignoreNulls = false)
/**
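
A hedged usage sketch of first/last with ignoreNulls, using a hypothetical DataFrame `df` with columns `k` and `v` (names are illustrative):

    import org.apache.spark.sql.functions.{first, last}
    // first/last return the first/last value seen per group; with ignoreNulls = true they
    // skip nulls and return null only if every value in the group is null.
    df.groupBy("k")
      .agg(first("v", ignoreNulls = true), last("v", ignoreNulls = true))
      .show()
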
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index e8834d052c..14e14710f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -152,19 +152,19 @@ trait StreamSinkProvider {
@DeveloperApi
trait CreatableRelationProvider {
/**
- * Creates a relation with the given parameters based on the contents of the given
- * DataFrame. The mode specifies the expected behavior of createRelation when
- * data already exists.
- * Right now, there are three modes, Append, Overwrite, and ErrorIfExists.
- * Append mode means that when saving a DataFrame to a data source, if data already exists,
- * contents of the DataFrame are expected to be appended to existing data.
- * Overwrite mode means that when saving a DataFrame to a data source, if data already exists,
- * existing data is expected to be overwritten by the contents of the DataFrame.
- * ErrorIfExists mode means that when saving a DataFrame to a data source,
- * if data already exists, an exception is expected to be thrown.
- *
- * @since 1.3.0
- */
+ * Creates a relation with the given parameters based on the contents of the given
+ * DataFrame. The mode specifies the expected behavior of createRelation when
+ * data already exists.
+ * Right now, there are three modes, Append, Overwrite, and ErrorIfExists.
+ * Append mode means that when saving a DataFrame to a data source, if data already exists,
+ * contents of the DataFrame are expected to be appended to existing data.
+ * Overwrite mode means that when saving a DataFrame to a data source, if data already exists,
+ * existing data is expected to be overwritten by the contents of the DataFrame.
+ * ErrorIfExists mode means that when saving a DataFrame to a data source,
+ * if data already exists, an exception is expected to be thrown.
+ *
+ * @since 1.3.0
+ */
def createRelation(
sqlContext: SQLContext,
mode: SaveMode,
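
The save modes listed in the comment above are the ones a DataFrameWriter hands to createRelation; a hedged usage sketch with a hypothetical output path:

    import org.apache.spark.sql.SaveMode
    // Pick one mode when saving: Append adds to existing data, Overwrite replaces it,
    // and ErrorIfExists (the default) throws if data is already present at the path.
    df.write.format("parquet").mode(SaveMode.Append).save("/tmp/example-output")
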
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index 854a662cc4..d160f8ab8c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -286,8 +286,8 @@ abstract class QueryTest extends PlanTest {
}
/**
- * Asserts that a given [[Dataset]] does not have missing inputs in all the analyzed plans.
- */
+ * Asserts that a given [[Dataset]] does not have missing inputs in all the analyzed plans.
+ */
def assertEmptyMissingInput(query: Dataset[_]): Unit = {
assert(query.queryExecution.analyzed.missingInput.isEmpty,
s"The analyzed logical plan has missing inputs: ${query.queryExecution.analyzed}")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
index 55906793c0..289e1b6db9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
@@ -32,10 +32,10 @@ import org.apache.spark.unsafe.map.BytesToBytesMap
import org.apache.spark.util.Benchmark
/**
- * Benchmark to measure whole stage codegen performance.
- * To run this:
- * build/sbt "sql/test-only *BenchmarkWholeStageCodegen"
- */
+ * Benchmark to measure whole stage codegen performance.
+ * To run this:
+ * build/sbt "sql/test-only *BenchmarkWholeStageCodegen"
+ */
class BenchmarkWholeStageCodegen extends SparkFunSuite {
lazy val conf = new SparkConf().setMaster("local[1]").setAppName("benchmark")
.set("spark.sql.shuffle.partitions", "1")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVParserSuite.scala
index dc54883277..aaeecef5f3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVParserSuite.scala
@@ -20,8 +20,8 @@ package org.apache.spark.sql.execution.datasources.csv
import org.apache.spark.SparkFunSuite
/**
- * test cases for StringIteratorReader
- */
+ * test cases for StringIteratorReader
+ */
class CSVParserSuite extends SparkFunSuite {
private def readAll(iter: Iterator[String]) = {