author     Dongjoon Hyun <dongjoon@apache.org>    2016-04-02 17:50:40 -0700
committer  Reynold Xin <rxin@databricks.com>      2016-04-02 17:50:40 -0700
commit     4a6e78abd9d5edc4a5092738dff0006bbe202a89 (patch)
tree       5ecbee86bb057139128b65b0f99405c51e637e38
parent     f705037617d55bb479ec60bcb1e55c736224be94 (diff)
[MINOR][DOCS] Use multi-line JavaDoc comments in Scala code.
## What changes were proposed in this pull request?

This PR aims to convert all Scala-style multiline comments into Java-style multiline comments in the Scala code. (All comment-only changes over 77 files: +786 lines, −747 lines.)

## How was this patch tested?

Manual.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #12130 from dongjoon-hyun/use_multiine_javadoc_comments.
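For illustration (this snippet is editorial, not part of the patch), the `SparkContext.makeRDD` hunk further below shows the shape of the change: only the doc-comment layout moves from the Scala style to the Java/Javadoc style, with no code changes.

```scala
// Before: Scala-style multiline comment, text starts on the opening line
// and the block closes on the last text line.
/** Distribute a local Scala collection to form an RDD, with one or more
 * location preferences (hostnames of Spark nodes) for each object.
 * Create a new partition for each collection item. */

// After: Java-style (Javadoc) multiline comment, the form this PR standardizes on.
/**
 * Distribute a local Scala collection to form an RDD, with one or more
 * location preferences (hostnames of Spark nodes) for each object.
 * Create a new partition for each collection item.
 */
```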
-rw-r--r-- core/src/main/scala/org/apache/spark/FutureAction.scala | 14
-rw-r--r-- core/src/main/scala/org/apache/spark/SSLOptions.scala | 57
-rw-r--r-- core/src/main/scala/org/apache/spark/SparkContext.scala | 42
-rw-r--r-- core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala | 8
-rw-r--r-- core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala | 60
-rw-r--r-- core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala | 2
-rw-r--r-- core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala | 10
-rw-r--r-- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 4
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala | 24
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala | 12
-rw-r--r-- core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala | 4
-rw-r--r-- core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala | 6
-rw-r--r-- core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala | 20
-rw-r--r-- core/src/main/scala/org/apache/spark/util/Utils.scala | 17
-rw-r--r-- core/src/test/scala/org/apache/spark/Smuggle.scala | 46
-rw-r--r-- core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala | 24
-rw-r--r-- examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala | 4
-rw-r--r-- examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala | 20
-rw-r--r-- examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala | 4
-rw-r--r-- examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala | 4
-rw-r--r-- examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala | 4
-rw-r--r-- examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala | 4
-rw-r--r-- examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala | 23
-rw-r--r-- examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala | 21
-rw-r--r-- external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala | 15
-rw-r--r-- external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala | 15
-rw-r--r-- graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala | 10
-rw-r--r-- graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala | 18
-rw-r--r-- mllib/src/main/scala/org/apache/spark/ml/feature/ElementwiseProduct.scala | 6
-rw-r--r-- mllib/src/main/scala/org/apache/spark/mllib/api/python/GaussianMixtureModelWrapper.scala | 8
-rw-r--r-- mllib/src/main/scala/org/apache/spark/mllib/api/python/Word2VecModelWrapper.scala | 4
-rw-r--r-- mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala | 16
-rw-r--r-- mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala | 4
-rw-r--r-- repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala | 21
-rw-r--r-- repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkImports.scala | 5
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala | 24
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 20
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala | 6
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala | 26
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/grouping.scala | 18
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala | 4
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala | 40
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala | 4
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala | 28
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala | 4
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala | 6
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala | 14
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 14
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala | 7
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala | 4
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala | 172
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala | 36
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala | 22
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala | 6
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala | 16
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala | 8
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala | 22
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala | 4
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala | 8
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala | 22
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala | 8
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala | 8
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala | 76
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala | 36
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala | 8
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala | 4
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/functions.scala | 191
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala | 26
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala | 4
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala | 8
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVParserSuite.scala | 4
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala | 7
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala | 7
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala | 23
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala | 7
-rw-r--r-- tools/src/main/scala/org/apache/spark/tools/GenerateMIMAIgnore.scala | 9
-rw-r--r-- yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala | 16
77 files changed, 786 insertions, 747 deletions
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index 2a8220ff40..ce11772a6d 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -146,16 +146,16 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:
/**
- * Handle via which a "run" function passed to a [[ComplexFutureAction]]
- * can submit jobs for execution.
- */
+ * Handle via which a "run" function passed to a [[ComplexFutureAction]]
+ * can submit jobs for execution.
+ */
@DeveloperApi
trait JobSubmitter {
/**
- * Submit a job for execution and return a FutureAction holding the result.
- * This is a wrapper around the same functionality provided by SparkContext
- * to enable cancellation.
- */
+ * Submit a job for execution and return a FutureAction holding the result.
+ * This is a wrapper around the same functionality provided by SparkContext
+ * to enable cancellation.
+ */
def submitJob[T, U, R](
rdd: RDD[T],
processPartition: Iterator[T] => U,
diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala
index 30db6ccbf4..719905a2c9 100644
--- a/core/src/main/scala/org/apache/spark/SSLOptions.scala
+++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala
@@ -132,34 +132,35 @@ private[spark] case class SSLOptions(
private[spark] object SSLOptions extends Logging {
- /** Resolves SSLOptions settings from a given Spark configuration object at a given namespace.
- *
- * The following settings are allowed:
- * $ - `[ns].enabled` - `true` or `false`, to enable or disable SSL respectively
- * $ - `[ns].keyStore` - a path to the key-store file; can be relative to the current directory
- * $ - `[ns].keyStorePassword` - a password to the key-store file
- * $ - `[ns].keyPassword` - a password to the private key
- * $ - `[ns].keyStoreType` - the type of the key-store
- * $ - `[ns].needClientAuth` - whether SSL needs client authentication
- * $ - `[ns].trustStore` - a path to the trust-store file; can be relative to the current
- * directory
- * $ - `[ns].trustStorePassword` - a password to the trust-store file
- * $ - `[ns].trustStoreType` - the type of trust-store
- * $ - `[ns].protocol` - a protocol name supported by a particular Java version
- * $ - `[ns].enabledAlgorithms` - a comma separated list of ciphers
- *
- * For a list of protocols and ciphers supported by particular Java versions, you may go to
- * [[https://blogs.oracle.com/java-platform-group/entry/diagnosing_tls_ssl_and_https Oracle
- * blog page]].
- *
- * You can optionally specify the default configuration. If you do, for each setting which is
- * missing in SparkConf, the corresponding setting is used from the default configuration.
- *
- * @param conf Spark configuration object where the settings are collected from
- * @param ns the namespace name
- * @param defaults the default configuration
- * @return [[org.apache.spark.SSLOptions]] object
- */
+ /**
+ * Resolves SSLOptions settings from a given Spark configuration object at a given namespace.
+ *
+ * The following settings are allowed:
+ * $ - `[ns].enabled` - `true` or `false`, to enable or disable SSL respectively
+ * $ - `[ns].keyStore` - a path to the key-store file; can be relative to the current directory
+ * $ - `[ns].keyStorePassword` - a password to the key-store file
+ * $ - `[ns].keyPassword` - a password to the private key
+ * $ - `[ns].keyStoreType` - the type of the key-store
+ * $ - `[ns].needClientAuth` - whether SSL needs client authentication
+ * $ - `[ns].trustStore` - a path to the trust-store file; can be relative to the current
+ * directory
+ * $ - `[ns].trustStorePassword` - a password to the trust-store file
+ * $ - `[ns].trustStoreType` - the type of trust-store
+ * $ - `[ns].protocol` - a protocol name supported by a particular Java version
+ * $ - `[ns].enabledAlgorithms` - a comma separated list of ciphers
+ *
+ * For a list of protocols and ciphers supported by particular Java versions, you may go to
+ * [[https://blogs.oracle.com/java-platform-group/entry/diagnosing_tls_ssl_and_https Oracle
+ * blog page]].
+ *
+ * You can optionally specify the default configuration. If you do, for each setting which is
+ * missing in SparkConf, the corresponding setting is used from the default configuration.
+ *
+ * @param conf Spark configuration object where the settings are collected from
+ * @param ns the namespace name
+ * @param defaults the default configuration
+ * @return [[org.apache.spark.SSLOptions]] object
+ */
def parse(conf: SparkConf, ns: String, defaults: Option[SSLOptions] = None): SSLOptions = {
val enabled = conf.getBoolean(s"$ns.enabled", defaultValue = defaults.exists(_.enabled))
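As a hedged illustration of the settings documented in the ScalaDoc above (not part of this patch): with the `spark.ssl` namespace, the documented `[ns].*` keys become ordinary `SparkConf` entries, and Spark resolves them internally through the `parse` signature shown above. Paths, passwords, and cipher names below are placeholders.

```scala
import org.apache.spark.SparkConf

// Hypothetical SSL configuration under the "spark.ssl" namespace; all values are placeholders.
val conf = new SparkConf()
  .set("spark.ssl.enabled", "true")
  .set("spark.ssl.keyStore", "/path/to/keystore.jks")
  .set("spark.ssl.keyStorePassword", "changeit")
  .set("spark.ssl.protocol", "TLSv1.2")
  .set("spark.ssl.enabledAlgorithms", "TLS_RSA_WITH_AES_128_CBC_SHA")

// Inside Spark (SSLOptions is private[spark]), these are resolved roughly as:
// val sslOptions = SSLOptions.parse(conf, "spark.ssl", defaults = None)
```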
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d7cb253d69..4b3264cbf5 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -773,9 +773,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
parallelize(seq, numSlices)
}
- /** Distribute a local Scala collection to form an RDD, with one or more
- * location preferences (hostnames of Spark nodes) for each object.
- * Create a new partition for each collection item. */
+ /**
+ * Distribute a local Scala collection to form an RDD, with one or more
+ * location preferences (hostnames of Spark nodes) for each object.
+ * Create a new partition for each collection item.
+ */
def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {
assertNotStopped()
val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
@@ -1095,14 +1097,15 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
new NewHadoopRDD(this, fClass, kClass, vClass, jconf)
}
- /** Get an RDD for a Hadoop SequenceFile with given key and value types.
- *
- * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
- * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
- * operation will create many references to the same object.
- * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
- * copy them using a `map` function.
- */
+ /**
+ * Get an RDD for a Hadoop SequenceFile with given key and value types.
+ *
+ * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+ * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
+ * operation will create many references to the same object.
+ * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
+ * copy them using a `map` function.
+ */
def sequenceFile[K, V](path: String,
keyClass: Class[K],
valueClass: Class[V],
@@ -1113,14 +1116,15 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions)
}
- /** Get an RDD for a Hadoop SequenceFile with given key and value types.
- *
- * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
- * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
- * operation will create many references to the same object.
- * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
- * copy them using a `map` function.
- * */
+ /**
+ * Get an RDD for a Hadoop SequenceFile with given key and value types.
+ *
+ * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+ * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle
+ * operation will create many references to the same object.
+ * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
+ * copy them using a `map` function.
+ */
def sequenceFile[K, V](
path: String,
keyClass: Class[K],
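A minimal sketch of the caveat in the ScalaDoc above: copy the reused Hadoop `Writable`s with a `map` before caching or aggregating. It assumes a live `SparkContext` and an existing SequenceFile path (both placeholders).

```scala
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

def readCounts(sc: SparkContext, path: String): RDD[(String, Int)] =
  sc.sequenceFile(path, classOf[Text], classOf[IntWritable])
    .map { case (k, v) => (k.toString, v.get) } // copy out of the reused Writable objects
    .cache()                                    // safe to cache only after copying
```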
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index e080f91f50..2897272a8b 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -461,10 +461,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
fromRDD(rdd.partitionBy(partitioner))
/**
- * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
- * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
- * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
- */
+ * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
+ * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
+ * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
+ */
def join[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (V, W)] =
fromRDD(rdd.join(other, partitioner))
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index d362c40b7a..dfd91ae338 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -295,13 +295,14 @@ class JavaSparkContext(val sc: SparkContext)
new JavaRDD(sc.binaryRecords(path, recordLength))
}
- /** Get an RDD for a Hadoop SequenceFile with given key and value types.
- *
- * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
- * record, directly caching the returned RDD will create many references to the same object.
- * If you plan to directly cache Hadoop writable objects, you should first copy them using
- * a `map` function.
- * */
+ /**
+ * Get an RDD for a Hadoop SequenceFile with given key and value types.
+ *
+ * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+ * record, directly caching the returned RDD will create many references to the same object.
+ * If you plan to directly cache Hadoop writable objects, you should first copy them using
+ * a `map` function.
+ */
def sequenceFile[K, V](path: String,
keyClass: Class[K],
valueClass: Class[V],
@@ -312,13 +313,14 @@ class JavaSparkContext(val sc: SparkContext)
new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minPartitions))
}
- /** Get an RDD for a Hadoop SequenceFile.
- *
- * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
- * record, directly caching the returned RDD will create many references to the same object.
- * If you plan to directly cache Hadoop writable objects, you should first copy them using
- * a `map` function.
- */
+ /**
+ * Get an RDD for a Hadoop SequenceFile.
+ *
+ * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+ * record, directly caching the returned RDD will create many references to the same object.
+ * If you plan to directly cache Hadoop writable objects, you should first copy them using
+ * a `map` function.
+ */
def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]):
JavaPairRDD[K, V] = {
implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
@@ -411,13 +413,14 @@ class JavaSparkContext(val sc: SparkContext)
new JavaHadoopRDD(rdd.asInstanceOf[HadoopRDD[K, V]])
}
- /** Get an RDD for a Hadoop file with an arbitrary InputFormat.
- *
- * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
- * record, directly caching the returned RDD will create many references to the same object.
- * If you plan to directly cache Hadoop writable objects, you should first copy them using
- * a `map` function.
- */
+ /**
+ * Get an RDD for a Hadoop file with an arbitrary InputFormat.
+ *
+ * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+ * record, directly caching the returned RDD will create many references to the same object.
+ * If you plan to directly cache Hadoop writable objects, you should first copy them using
+ * a `map` function.
+ */
def hadoopFile[K, V, F <: InputFormat[K, V]](
path: String,
inputFormatClass: Class[F],
@@ -431,13 +434,14 @@ class JavaSparkContext(val sc: SparkContext)
new JavaHadoopRDD(rdd.asInstanceOf[HadoopRDD[K, V]])
}
- /** Get an RDD for a Hadoop file with an arbitrary InputFormat
- *
- * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
- * record, directly caching the returned RDD will create many references to the same object.
- * If you plan to directly cache Hadoop writable objects, you should first copy them using
- * a `map` function.
- */
+ /**
+ * Get an RDD for a Hadoop file with an arbitrary InputFormat
+ *
+ * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each
+ * record, directly caching the returned RDD will create many references to the same object.
+ * If you plan to directly cache Hadoop writable objects, you should first copy them using
+ * a `map` function.
+ */
def hadoopFile[K, V, F <: InputFormat[K, V]](
path: String,
inputFormatClass: Class[F],
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
index a4efafcb27..cba4aaffe2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -29,7 +29,7 @@ import org.apache.spark.launcher.WorkerCommandBuilder
import org.apache.spark.util.Utils
/**
- ** Utilities for running commands with the spark classpath.
+ * Utilities for running commands with the spark classpath.
*/
private[deploy]
object CommandUtils extends Logging {
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index e5ebc63082..7bc1eb0436 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -29,10 +29,12 @@ import org.apache.spark.serializer.Serializer
import org.apache.spark.util.collection.{CompactBuffer, ExternalAppendOnlyMap}
import org.apache.spark.util.Utils
-/** The references to rdd and splitIndex are transient because redundant information is stored
- * in the CoGroupedRDD object. Because CoGroupedRDD is serialized separately from
- * CoGroupPartition, if rdd and splitIndex aren't transient, they'll be included twice in the
- * task closure. */
+/**
+ * The references to rdd and splitIndex are transient because redundant information is stored
+ * in the CoGroupedRDD object. Because CoGroupedRDD is serialized separately from
+ * CoGroupPartition, if rdd and splitIndex aren't transient, they'll be included twice in the
+ * task closure.
+ */
private[spark] case class NarrowCoGroupSplitDep(
@transient rdd: RDD[_],
@transient splitIndex: Int,
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index f96551c793..4a0a2199ef 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -255,8 +255,8 @@ abstract class RDD[T: ClassTag](
}
/**
- * Returns the number of partitions of this RDD.
- */
+ * Returns the number of partitions of this RDD.
+ */
@Since("1.6.0")
final def getNumPartitions: Int = partitions.length
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 90b1813750..50b452c72f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -295,12 +295,12 @@ private[spark] class CoarseMesosSchedulerBackend(
}
/**
- * Launches executors on accepted offers, and declines unused offers. Executors are launched
- * round-robin on offers.
- *
- * @param d SchedulerDriver
- * @param offers Mesos offers that match attribute constraints
- */
+ * Launches executors on accepted offers, and declines unused offers. Executors are launched
+ * round-robin on offers.
+ *
+ * @param d SchedulerDriver
+ * @param offers Mesos offers that match attribute constraints
+ */
private def handleMatchedOffers(d: SchedulerDriver, offers: Buffer[Offer]): Unit = {
val tasks = buildMesosTasks(offers)
for (offer <- offers) {
@@ -336,12 +336,12 @@ private[spark] class CoarseMesosSchedulerBackend(
}
/**
- * Returns a map from OfferIDs to the tasks to launch on those offers. In order to maximize
- * per-task memory and IO, tasks are round-robin assigned to offers.
- *
- * @param offers Mesos offers that match attribute constraints
- * @return A map from OfferID to a list of Mesos tasks to launch on that offer
- */
+ * Returns a map from OfferIDs to the tasks to launch on those offers. In order to maximize
+ * per-task memory and IO, tasks are round-robin assigned to offers.
+ *
+ * @param offers Mesos offers that match attribute constraints
+ * @return A map from OfferID to a list of Mesos tasks to launch on that offer
+ */
private def buildMesosTasks(offers: Buffer[Offer]): Map[OfferID, List[MesosTaskInfo]] = {
// offerID -> tasks
val tasks = new HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index c41fa58607..73bd4c58e1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -453,12 +453,12 @@ private[spark] class MesosClusterScheduler(
}
/**
- * Escape args for Unix-like shells, unless already quoted by the user.
- * Based on: http://www.gnu.org/software/bash/manual/html_node/Double-Quotes.html
- * and http://www.grymoire.com/Unix/Quote.html
- * @param value argument
- * @return escaped argument
- */
+ * Escape args for Unix-like shells, unless already quoted by the user.
+ * Based on: http://www.gnu.org/software/bash/manual/html_node/Double-Quotes.html
+ * and http://www.grymoire.com/Unix/Quote.html
+ * @param value argument
+ * @return escaped argument
+ */
private[scheduler] def shellEscape(value: String): String = {
val WrappedInQuotes = """^(".+"|'.+')$""".r
val ShellSpecialChars = (""".*([ '<>&|\?\*;!#\\(\)"$`]).*""").r
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 9a12a61f2f..35f914355d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -148,8 +148,8 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
}
/**
- * Signal that the scheduler has registered with Mesos.
- */
+ * Signal that the scheduler has registered with Mesos.
+ */
protected def markRegistered(): Unit = {
registerLatch.countDown()
}
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
index 76fd249fbd..364fad664e 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala
@@ -54,9 +54,9 @@ private[spark] trait ShuffleManager {
context: TaskContext): ShuffleReader[K, C]
/**
- * Remove a shuffle's metadata from the ShuffleManager.
- * @return true if the metadata removed successfully, otherwise false.
- */
+ * Remove a shuffle's metadata from the ShuffleManager.
+ * @return true if the metadata removed successfully, otherwise false.
+ */
def unregisterShuffle(shuffleId: Int): Boolean
/**
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index df38d11e43..99be4de065 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -455,16 +455,16 @@ private[spark] class MemoryStore(
}
/**
- * Try to evict blocks to free up a given amount of space to store a particular block.
- * Can fail if either the block is bigger than our memory or it would require replacing
- * another block from the same RDD (which leads to a wasteful cyclic replacement pattern for
- * RDDs that don't fit into memory that we want to avoid).
- *
- * @param blockId the ID of the block we are freeing space for, if any
- * @param space the size of this block
- * @param memoryMode the type of memory to free (on- or off-heap)
- * @return the amount of memory (in bytes) freed by eviction
- */
+ * Try to evict blocks to free up a given amount of space to store a particular block.
+ * Can fail if either the block is bigger than our memory or it would require replacing
+ * another block from the same RDD (which leads to a wasteful cyclic replacement pattern for
+ * RDDs that don't fit into memory that we want to avoid).
+ *
+ * @param blockId the ID of the block we are freeing space for, if any
+ * @param space the size of this block
+ * @param memoryMode the type of memory to free (on- or off-heap)
+ * @return the amount of memory (in bytes) freed by eviction
+ */
private[spark] def evictBlocksToFreeSpace(
blockId: Option[BlockId],
space: Long,
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 73768ff4c8..50bcf85805 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -256,10 +256,11 @@ private[spark] object Utils extends Logging {
dir
}
- /** Copy all data from an InputStream to an OutputStream. NIO way of file stream to file stream
- * copying is disabled by default unless explicitly set transferToEnabled as true,
- * the parameter transferToEnabled should be configured by spark.file.transferTo = [true|false].
- */
+ /**
+ * Copy all data from an InputStream to an OutputStream. NIO way of file stream to file stream
+ * copying is disabled by default unless explicitly set transferToEnabled as true,
+ * the parameter transferToEnabled should be configured by spark.file.transferTo = [true|false].
+ */
def copyStream(in: InputStream,
out: OutputStream,
closeStreams: Boolean = false,
@@ -1564,9 +1565,11 @@ private[spark] object Utils extends Logging {
else -1
}
- /** Returns the system properties map that is thread-safe to iterator over. It gets the
- * properties which have been set explicitly, as well as those for which only a default value
- * has been defined. */
+ /**
+ * Returns the system properties map that is thread-safe to iterator over. It gets the
+ * properties which have been set explicitly, as well as those for which only a default value
+ * has been defined.
+ */
def getSystemProperties: Map[String, String] = {
System.getProperties.stringPropertyNames().asScala
.map(key => (key, System.getProperty(key))).toMap
diff --git a/core/src/test/scala/org/apache/spark/Smuggle.scala b/core/src/test/scala/org/apache/spark/Smuggle.scala
index 9f0a1b4c25..9d9217ea1b 100644
--- a/core/src/test/scala/org/apache/spark/Smuggle.scala
+++ b/core/src/test/scala/org/apache/spark/Smuggle.scala
@@ -24,16 +24,16 @@ import scala.collection.mutable
import scala.language.implicitConversions
/**
- * Utility wrapper to "smuggle" objects into tasks while bypassing serialization.
- * This is intended for testing purposes, primarily to make locks, semaphores, and
- * other constructs that would not survive serialization available from within tasks.
- * A Smuggle reference is itself serializable, but after being serialized and
- * deserialized, it still refers to the same underlying "smuggled" object, as long
- * as it was deserialized within the same JVM. This can be useful for tests that
- * depend on the timing of task completion to be deterministic, since one can "smuggle"
- * a lock or semaphore into the task, and then the task can block until the test gives
- * the go-ahead to proceed via the lock.
- */
+ * Utility wrapper to "smuggle" objects into tasks while bypassing serialization.
+ * This is intended for testing purposes, primarily to make locks, semaphores, and
+ * other constructs that would not survive serialization available from within tasks.
+ * A Smuggle reference is itself serializable, but after being serialized and
+ * deserialized, it still refers to the same underlying "smuggled" object, as long
+ * as it was deserialized within the same JVM. This can be useful for tests that
+ * depend on the timing of task completion to be deterministic, since one can "smuggle"
+ * a lock or semaphore into the task, and then the task can block until the test gives
+ * the go-ahead to proceed via the lock.
+ */
class Smuggle[T] private(val key: Symbol) extends Serializable {
def smuggledObject: T = Smuggle.get(key)
}
@@ -41,13 +41,13 @@ class Smuggle[T] private(val key: Symbol) extends Serializable {
object Smuggle {
/**
- * Wraps the specified object to be smuggled into a serialized task without
- * being serialized itself.
- *
- * @param smuggledObject
- * @tparam T
- * @return Smuggle wrapper around smuggledObject.
- */
+ * Wraps the specified object to be smuggled into a serialized task without
+ * being serialized itself.
+ *
+ * @param smuggledObject
+ * @tparam T
+ * @return Smuggle wrapper around smuggledObject.
+ */
def apply[T](smuggledObject: T): Smuggle[T] = {
val key = Symbol(UUID.randomUUID().toString)
lock.writeLock().lock()
@@ -72,12 +72,12 @@ object Smuggle {
}
/**
- * Implicit conversion of a Smuggle wrapper to the object being smuggled.
- *
- * @param smuggle the wrapper to unpack.
- * @tparam T
- * @return the smuggled object represented by the wrapper.
- */
+ * Implicit conversion of a Smuggle wrapper to the object being smuggled.
+ *
+ * @param smuggle the wrapper to unpack.
+ * @tparam T
+ * @return the smuggled object represented by the wrapper.
+ */
implicit def unpackSmuggledObject[T](smuggle : Smuggle[T]): T = smuggle.smuggledObject
}
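A hedged usage sketch of the test helper documented above (assumes a `SparkContext` named `sc` inside a local-mode test; the driver-side release is only hinted at in the comment).

```scala
import java.util.concurrent.Semaphore

// Wrap a non-serializable semaphore so every task sees the same instance.
val gate = Smuggle(new Semaphore(0))

val blocked = sc.parallelize(1 to 2, 2).map { i =>
  gate.smuggledObject.acquire() // each task blocks until the test opens the gate
  i
}
// The test would later call gate.smuggledObject.release(2) and then collect `blocked`.
```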
diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
index 3d1a0e9795..99d5b496bc 100644
--- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -78,18 +78,18 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
}
/**
- * Simulate the part of [[MemoryStore.evictBlocksToFreeSpace]] that releases storage memory.
- *
- * This is a significant simplification of the real method, which actually drops existing
- * blocks based on the size of each block. Instead, here we simply release as many bytes
- * as needed to ensure the requested amount of free space. This allows us to set up the
- * test without relying on the [[org.apache.spark.storage.BlockManager]], which brings in
- * many other dependencies.
- *
- * Every call to this method will set a global variable, [[evictBlocksToFreeSpaceCalled]], that
- * records the number of bytes this is called with. This variable is expected to be cleared
- * by the test code later through [[assertEvictBlocksToFreeSpaceCalled]].
- */
+ * Simulate the part of [[MemoryStore.evictBlocksToFreeSpace]] that releases storage memory.
+ *
+ * This is a significant simplification of the real method, which actually drops existing
+ * blocks based on the size of each block. Instead, here we simply release as many bytes
+ * as needed to ensure the requested amount of free space. This allows us to set up the
+ * test without relying on the [[org.apache.spark.storage.BlockManager]], which brings in
+ * many other dependencies.
+ *
+ * Every call to this method will set a global variable, [[evictBlocksToFreeSpaceCalled]], that
+ * records the number of bytes this is called with. This variable is expected to be cleared
+ * by the test code later through [[assertEvictBlocksToFreeSpaceCalled]].
+ */
private def evictBlocksToFreeSpaceAnswer(mm: MemoryManager): Answer[Long] = {
new Answer[Long] {
override def answer(invocation: InvocationOnMock): Long = {
diff --git a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
index 3da5236745..af5a815f6e 100644
--- a/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala
@@ -21,8 +21,8 @@ package org.apache.spark.examples
import org.apache.spark.{SparkConf, SparkContext}
/**
- * Usage: BroadcastTest [slices] [numElem] [blockSize]
- */
+ * Usage: BroadcastTest [slices] [numElem] [blockSize]
+ */
object BroadcastTest {
def main(args: Array[String]) {
diff --git a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
index 743fc13db7..7bf023667d 100644
--- a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
@@ -25,16 +25,16 @@ import scala.io.Source._
import org.apache.spark.{SparkConf, SparkContext}
/**
- * Simple test for reading and writing to a distributed
- * file system. This example does the following:
- *
- * 1. Reads local file
- * 2. Computes word count on local file
- * 3. Writes local file to a DFS
- * 4. Reads the file back from the DFS
- * 5. Computes word count on the file using Spark
- * 6. Compares the word count results
- */
+ * Simple test for reading and writing to a distributed
+ * file system. This example does the following:
+ *
+ * 1. Reads local file
+ * 2. Computes word count on local file
+ * 3. Writes local file to a DFS
+ * 4. Reads the file back from the DFS
+ * 5. Computes word count on the file using Spark
+ * 6. Compares the word count results
+ */
object DFSReadWriteTest {
private var localFilePath: File = new File(".")
diff --git a/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala
index 08b6c717d4..4db229b5de 100644
--- a/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/GroupByTest.scala
@@ -23,8 +23,8 @@ import java.util.Random
import org.apache.spark.{SparkConf, SparkContext}
/**
- * Usage: GroupByTest [numMappers] [numKVPairs] [KeySize] [numReducers]
- */
+ * Usage: GroupByTest [numMappers] [numKVPairs] [KeySize] [numReducers]
+ */
object GroupByTest {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("GroupBy Test")
diff --git a/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala b/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala
index 134c3d1d63..3eb0c27723 100644
--- a/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/MultiBroadcastTest.scala
@@ -22,8 +22,8 @@ import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
/**
- * Usage: MultiBroadcastTest [slices] [numElem]
- */
+ * Usage: MultiBroadcastTest [slices] [numElem]
+ */
object MultiBroadcastTest {
def main(args: Array[String]) {
diff --git a/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala
index 7c09664c2f..ec07e6323e 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SimpleSkewedGroupByTest.scala
@@ -23,8 +23,8 @@ import java.util.Random
import org.apache.spark.{SparkConf, SparkContext}
/**
- * Usage: SimpleSkewedGroupByTest [numMappers] [numKVPairs] [valSize] [numReducers] [ratio]
- */
+ * Usage: SimpleSkewedGroupByTest [numMappers] [numKVPairs] [valSize] [numReducers] [ratio]
+ */
object SimpleSkewedGroupByTest {
def main(args: Array[String]) {
diff --git a/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala b/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala
index d498af9c39..8e4c2b6229 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SkewedGroupByTest.scala
@@ -23,8 +23,8 @@ import java.util.Random
import org.apache.spark.{SparkConf, SparkContext}
/**
- * Usage: GroupByTest [numMappers] [numKVPairs] [KeySize] [numReducers]
- */
+ * Usage: GroupByTest [numMappers] [numKVPairs] [KeySize] [numReducers]
+ */
object SkewedGroupByTest {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("GroupBy Test")
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
index 50216b9bd4..0ddd065f0d 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewGenerator.scala
@@ -38,17 +38,18 @@ object PageView extends Serializable {
}
// scalastyle:off
-/** Generates streaming events to simulate page views on a website.
- *
- * This should be used in tandem with PageViewStream.scala. Example:
- *
- * To run the generator
- * `$ bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewGenerator 44444 10`
- * To process the generated stream
- * `$ bin/run-example \
- * org.apache.spark.examples.streaming.clickstream.PageViewStream errorRatePerZipCode localhost 44444`
- *
- */
+/**
+ * Generates streaming events to simulate page views on a website.
+ *
+ * This should be used in tandem with PageViewStream.scala. Example:
+ *
+ * To run the generator
+ * `$ bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewGenerator 44444 10`
+ * To process the generated stream
+ * `$ bin/run-example \
+ * org.apache.spark.examples.streaming.clickstream.PageViewStream errorRatePerZipCode localhost 44444`
+ *
+ */
// scalastyle:on
object PageViewGenerator {
val pages = Map("http://foo.com/" -> .7,
diff --git a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala
index 773a2e5fc2..1ba093f57b 100644
--- a/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/streaming/clickstream/PageViewStream.scala
@@ -22,16 +22,17 @@ import org.apache.spark.examples.streaming.StreamingExamples
import org.apache.spark.streaming.{Seconds, StreamingContext}
// scalastyle:off
-/** Analyses a streaming dataset of web page views. This class demonstrates several types of
- * operators available in Spark streaming.
- *
- * This should be used in tandem with PageViewStream.scala. Example:
- * To run the generator
- * `$ bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewGenerator 44444 10`
- * To process the generated stream
- * `$ bin/run-example \
- * org.apache.spark.examples.streaming.clickstream.PageViewStream errorRatePerZipCode localhost 44444`
- */
+/**
+ * Analyses a streaming dataset of web page views. This class demonstrates several types of
+ * operators available in Spark streaming.
+ *
+ * This should be used in tandem with PageViewStream.scala. Example:
+ * To run the generator
+ * `$ bin/run-example org.apache.spark.examples.streaming.clickstream.PageViewGenerator 44444 10`
+ * To process the generated stream
+ * `$ bin/run-example \
+ * org.apache.spark.examples.streaming.clickstream.PageViewStream errorRatePerZipCode localhost 44444`
+ */
// scalastyle:on
object PageViewStream {
def main(args: Array[String]) {
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 7dc9606913..6e7c3f358e 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -185,13 +185,14 @@ class FlumeReceiver(
override def preferredLocation: Option[String] = Option(host)
- /** A Netty Pipeline factory that will decompress incoming data from
- * and the Netty client and compress data going back to the client.
- *
- * The compression on the return is required because Flume requires
- * a successful response to indicate it can remove the event/batch
- * from the configured channel
- */
+ /**
+ * A Netty Pipeline factory that will decompress incoming data from
+ * and the Netty client and compress data going back to the client.
+ *
+ * The compression on the return is required because Flume requires
+ * a successful response to indicate it can remove the event/batch
+ * from the configured channel
+ */
private[streaming]
class CompressionChannelPipelineFactory extends ChannelPipelineFactory {
def getPipeline(): ChannelPipeline = {
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala
index a660d2a00c..02917becf0 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDDPartition.scala
@@ -19,13 +19,14 @@ package org.apache.spark.streaming.kafka
import org.apache.spark.Partition
-/** @param topic kafka topic name
- * @param partition kafka partition id
- * @param fromOffset inclusive starting offset
- * @param untilOffset exclusive ending offset
- * @param host preferred kafka host, i.e. the leader at the time the rdd was created
- * @param port preferred kafka host's port
- */
+/**
+ * @param topic kafka topic name
+ * @param partition kafka partition id
+ * @param fromOffset inclusive starting offset
+ * @param untilOffset exclusive ending offset
+ * @param host preferred kafka host, i.e. the leader at the time the rdd was created
+ * @param port preferred kafka host's port
+ */
private[kafka]
class KafkaRDDPartition(
val index: Int,
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
index a783fe305f..868658dfe5 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -415,11 +415,11 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
}
/**
- * Compute the connected component membership of each vertex and return a graph with the vertex
- * value containing the lowest vertex id in the connected component containing that vertex.
- *
- * @see [[org.apache.spark.graphx.lib.ConnectedComponents$#run]]
- */
+ * Compute the connected component membership of each vertex and return a graph with the vertex
+ * value containing the lowest vertex id in the connected component containing that vertex.
+ *
+ * @see [[org.apache.spark.graphx.lib.ConnectedComponents$#run]]
+ */
def connectedComponents(): Graph[VertexId, ED] = {
ConnectedComponents.run(graph)
}
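A minimal sketch of the operator documented above (assumes a `SparkContext` named `sc`; the tiny edge list is purely illustrative).

```scala
import org.apache.spark.graphx.{Edge, Graph}

// Two disconnected pairs of vertices: {1, 2} and {3, 4}.
val edges = sc.parallelize(Seq(Edge(1L, 2L, ()), Edge(3L, 4L, ())))
val graph = Graph.fromEdges(edges, defaultValue = 0)

// Each vertex ends up labeled with the lowest vertex id in its component.
val cc = graph.connectedComponents()
cc.vertices.collect().sorted.foreach(println) // (1,1), (2,1), (3,3), (4,3)
```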
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
index 137c512c99..4e9b13162e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ConnectedComponents.scala
@@ -60,15 +60,15 @@ object ConnectedComponents {
} // end of connectedComponents
/**
- * Compute the connected component membership of each vertex and return a graph with the vertex
- * value containing the lowest vertex id in the connected component containing that vertex.
- *
- * @tparam VD the vertex attribute type (discarded in the computation)
- * @tparam ED the edge attribute type (preserved in the computation)
- * @param graph the graph for which to compute the connected components
- * @return a graph with vertex attributes containing the smallest vertex in each
- * connected component
- */
+ * Compute the connected component membership of each vertex and return a graph with the vertex
+ * value containing the lowest vertex id in the connected component containing that vertex.
+ *
+ * @tparam VD the vertex attribute type (discarded in the computation)
+ * @tparam ED the edge attribute type (preserved in the computation)
+ * @param graph the graph for which to compute the connected components
+ * @return a graph with vertex attributes containing the smallest vertex in each
+ * connected component
+ */
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = {
run(graph, Int.MaxValue)
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/ElementwiseProduct.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/ElementwiseProduct.scala
index 2c7ffdb7ba..1b0a9a12e8 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/ElementwiseProduct.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/ElementwiseProduct.scala
@@ -38,9 +38,9 @@ class ElementwiseProduct(override val uid: String)
def this() = this(Identifiable.randomUID("elemProd"))
/**
- * the vector to multiply with input vectors
- * @group param
- */
+ * the vector to multiply with input vectors
+ * @group param
+ */
val scalingVec: Param[Vector] = new Param(this, "scalingVec", "vector for hadamard product")
/** @group setParam */
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/GaussianMixtureModelWrapper.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/GaussianMixtureModelWrapper.scala
index a689b09341..364d5eea08 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/GaussianMixtureModelWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/GaussianMixtureModelWrapper.scala
@@ -24,15 +24,15 @@ import org.apache.spark.mllib.clustering.GaussianMixtureModel
import org.apache.spark.mllib.linalg.{Vector, Vectors}
/**
- * Wrapper around GaussianMixtureModel to provide helper methods in Python
- */
+ * Wrapper around GaussianMixtureModel to provide helper methods in Python
+ */
private[python] class GaussianMixtureModelWrapper(model: GaussianMixtureModel) {
val weights: Vector = Vectors.dense(model.weights)
val k: Int = weights.size
/**
- * Returns gaussians as a List of Vectors and Matrices corresponding each MultivariateGaussian
- */
+ * Returns gaussians as a List of Vectors and Matrices corresponding each MultivariateGaussian
+ */
val gaussians: Array[Byte] = {
val modelGaussians = model.gaussians.map { gaussian =>
Array[Any](gaussian.mu, gaussian.sigma)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/Word2VecModelWrapper.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/Word2VecModelWrapper.scala
index 073f03e16f..05273c3434 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/Word2VecModelWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/Word2VecModelWrapper.scala
@@ -27,8 +27,8 @@ import org.apache.spark.mllib.feature.Word2VecModel
import org.apache.spark.mllib.linalg.{Vector, Vectors}
/**
- * Wrapper around Word2VecModel to provide helper methods in Python
- */
+ * Wrapper around Word2VecModel to provide helper methods in Python
+ */
private[python] class Word2VecModelWrapper(model: Word2VecModel) {
def transform(word: String): Vector = {
model.transform(word)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
index 6e571fe35a..8c09b69b3c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala
@@ -123,14 +123,18 @@ sealed trait Matrix extends Serializable {
@Since("1.4.0")
def toString(maxLines: Int, maxLineWidth: Int): String = toBreeze.toString(maxLines, maxLineWidth)
- /** Map the values of this matrix using a function. Generates a new matrix. Performs the
- * function on only the backing array. For example, an operation such as addition or
- * subtraction will only be performed on the non-zero values in a `SparseMatrix`. */
+ /**
+ * Map the values of this matrix using a function. Generates a new matrix. Performs the
+ * function on only the backing array. For example, an operation such as addition or
+ * subtraction will only be performed on the non-zero values in a `SparseMatrix`.
+ */
private[spark] def map(f: Double => Double): Matrix
- /** Update all the values of this matrix using the function f. Performed in-place on the
- * backing array. For example, an operation such as addition or subtraction will only be
- * performed on the non-zero values in a `SparseMatrix`. */
+ /**
+ * Update all the values of this matrix using the function f. Performed in-place on the
+ * backing array. For example, an operation such as addition or subtraction will only be
+ * performed on the non-zero values in a `SparseMatrix`.
+ */
private[mllib] def update(f: Double => Double): Matrix
/**
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala
index e8f4422fd4..84764963b5 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala
@@ -81,8 +81,8 @@ class StreamingLinearRegressionWithSGD private[mllib] (
}
/**
- * Set the number of iterations of gradient descent to run per update. Default: 50.
- */
+ * Set the number of iterations of gradient descent to run per update. Default: 50.
+ */
@Since("1.1.0")
def setNumIterations(numIterations: Int): this.type = {
this.algorithm.optimizer.setNumIterations(numIterations)
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 67a616dc15..c5dc6ba221 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -797,9 +797,11 @@ class SparkILoop(
// echo("Switched " + (if (old) "off" else "on") + " result printing.")
}
- /** Run one command submitted by the user. Two values are returned:
- * (1) whether to keep running, (2) the line to record for replay,
- * if any. */
+ /**
+ * Run one command submitted by the user. Two values are returned:
+ * (1) whether to keep running, (2) the line to record for replay,
+ * if any.
+ */
private[repl] def command(line: String): Result = {
if (line startsWith ":") {
val cmd = line.tail takeWhile (x => !x.isWhitespace)
@@ -841,12 +843,13 @@ class SparkILoop(
}
import paste.{ ContinueString, PromptString }
- /** Interpret expressions starting with the first line.
- * Read lines until a complete compilation unit is available
- * or until a syntax error has been seen. If a full unit is
- * read, go ahead and interpret it. Return the full string
- * to be recorded for replay, if any.
- */
+ /**
+ * Interpret expressions starting with the first line.
+ * Read lines until a complete compilation unit is available
+ * or until a syntax error has been seen. If a full unit is
+ * read, go ahead and interpret it. Return the full string
+ * to be recorded for replay, if any.
+ */
private def interpretStartingWith(code: String): Option[String] = {
// signal completion non-completion input has been received
in.completion.resetVerbosity()
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkImports.scala b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkImports.scala
index 1d0fe10d3d..f22776592c 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkImports.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkImports.scala
@@ -118,8 +118,9 @@ private[repl] trait SparkImports {
case class ReqAndHandler(req: Request, handler: MemberHandler) { }
def reqsToUse: List[ReqAndHandler] = {
- /** Loop through a list of MemberHandlers and select which ones to keep.
- * 'wanted' is the set of names that need to be imported.
+ /**
+ * Loop through a list of MemberHandlers and select which ones to keep.
+ * 'wanted' is the set of names that need to be imported.
*/
def select(reqs: List[ReqAndHandler], wanted: Set[Name]): List[ReqAndHandler] = {
// Single symbol imports might be implicits! See bug #1752. Rather than
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
index 1f20e26354..e0bfe3c32f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
@@ -140,27 +140,27 @@ object Encoders {
def STRING: Encoder[java.lang.String] = ExpressionEncoder()
/**
- * An encoder for nullable decimal type.
- * @since 1.6.0
- */
+ * An encoder for nullable decimal type.
+ * @since 1.6.0
+ */
def DECIMAL: Encoder[java.math.BigDecimal] = ExpressionEncoder()
/**
- * An encoder for nullable date type.
- * @since 1.6.0
- */
+ * An encoder for nullable date type.
+ * @since 1.6.0
+ */
def DATE: Encoder[java.sql.Date] = ExpressionEncoder()
/**
- * An encoder for nullable timestamp type.
- * @since 1.6.0
- */
+ * An encoder for nullable timestamp type.
+ * @since 1.6.0
+ */
def TIMESTAMP: Encoder[java.sql.Timestamp] = ExpressionEncoder()
/**
- * An encoder for arrays of bytes.
- * @since 1.6.1
- */
+ * An encoder for arrays of bytes.
+ * @since 1.6.1
+ */
def BINARY: Encoder[Array[Byte]] = ExpressionEncoder()
/**
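Editor's note: a hedged usage sketch of the encoders documented above; `sqlContext` is assumed to be an existing SQLContext and is not part of this diff.

```scala
// Hedged sketch: build strongly typed Datasets with the primitive Encoders.
import java.math.BigDecimal
import org.apache.spark.sql.Encoders

val decimals = sqlContext.createDataset(
  Seq(new BigDecimal("1.10"), new BigDecimal("2.20")))(Encoders.DECIMAL)

val blobs = sqlContext.createDataset(
  Seq(Array[Byte](1, 2, 3), Array[Byte](4, 5)))(Encoders.BINARY)

decimals.show()
blobs.show()
```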
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 05e2b9a447..a6e317ebf0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -733,9 +733,9 @@ class Analyzer(
}
/**
- * Add the missing attributes into projectList of Project/Window or aggregateExpressions of
- * Aggregate.
- */
+ * Add the missing attributes into projectList of Project/Window or aggregateExpressions of
+ * Aggregate.
+ */
private def addMissingAttr(plan: LogicalPlan, missingAttrs: AttributeSet): LogicalPlan = {
if (missingAttrs.isEmpty) {
return plan
@@ -767,9 +767,9 @@ class Analyzer(
}
/**
- * Resolve the expression on a specified logical plan and it's child (recursively), until
- * the expression is resolved or meet a non-unary node or Subquery.
- */
+ * Resolve the expression on a specified logical plan and its child (recursively), until
+ * the expression is resolved or a non-unary node or Subquery is reached.
+ */
@tailrec
private def resolveExpressionRecursively(expr: Expression, plan: LogicalPlan): Expression = {
val resolved = resolveExpression(expr, plan)
@@ -1398,8 +1398,8 @@ class Analyzer(
}
/**
- * Check and add order to [[AggregateWindowFunction]]s.
- */
+ * Check and add order to [[AggregateWindowFunction]]s.
+ */
object ResolveWindowOrder extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case logical: LogicalPlan => logical transformExpressions {
@@ -1489,8 +1489,8 @@ object EliminateSubqueryAliases extends Rule[LogicalPlan] {
}
/**
- * Removes [[Union]] operators from the plan if it just has one child.
- */
+ * Removes [[Union]] operators from the plan if it just has one child.
+ */
object EliminateUnions extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case Union(children) if children.size == 1 => children.head
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
index 053e612f3e..354311c5e7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
@@ -136,9 +136,9 @@ object UnsafeProjection {
}
/**
- * Same as other create()'s but allowing enabling/disabling subexpression elimination.
- * TODO: refactor the plumbing and clean this up.
- */
+ * Same as the other create()s, but allows enabling/disabling subexpression elimination.
+ * TODO: refactor the plumbing and clean this up.
+ */
def create(
exprs: Seq[Expression],
inputSchema: Seq[Attribute],
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index cd490dd676..b64d3eea49 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -58,10 +58,10 @@ class CodegenContext {
val references: mutable.ArrayBuffer[Any] = new mutable.ArrayBuffer[Any]()
/**
- * Add an object to `references`, create a class member to access it.
- *
- * Returns the name of class member.
- */
+ * Add an object to `references`, create a class member to access it.
+ *
+ * Returns the name of class member.
+ */
def addReferenceObj(name: String, obj: Any, className: String = null): String = {
val term = freshName(name)
val idx = references.length
@@ -72,9 +72,9 @@ class CodegenContext {
}
/**
- * Holding a list of generated columns as input of current operator, will be used by
- * BoundReference to generate code.
- */
+ * Holds the list of generated columns used as input to the current operator; it will be used
+ * by BoundReference to generate code.
+ */
var currentVars: Seq[ExprCode] = null
/**
@@ -169,14 +169,14 @@ class CodegenContext {
final var INPUT_ROW = "i"
/**
- * The map from a variable name to it's next ID.
- */
+ * The map from a variable name to its next ID.
+ */
private val freshNameIds = new mutable.HashMap[String, Int]
freshNameIds += INPUT_ROW -> 1
/**
- * A prefix used to generate fresh name.
- */
+ * A prefix used to generate fresh names.
+ */
var freshNamePrefix = ""
/**
@@ -234,8 +234,8 @@ class CodegenContext {
}
/**
- * Update a column in MutableRow from ExprCode.
- */
+ * Update a column in MutableRow from ExprCode.
+ */
def updateColumn(
row: String,
dataType: DataType,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/grouping.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/grouping.scala
index 437e417266..3be761c867 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/grouping.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/grouping.scala
@@ -22,8 +22,8 @@ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.types._
/**
- * A placeholder expression for cube/rollup, which will be replaced by analyzer
- */
+ * A placeholder expression for cube/rollup, which will be replaced by analyzer
+ */
trait GroupingSet extends Expression with CodegenFallback {
def groupByExprs: Seq[Expression]
@@ -43,9 +43,9 @@ case class Cube(groupByExprs: Seq[Expression]) extends GroupingSet {}
case class Rollup(groupByExprs: Seq[Expression]) extends GroupingSet {}
/**
- * Indicates whether a specified column expression in a GROUP BY list is aggregated or not.
- * GROUPING returns 1 for aggregated or 0 for not aggregated in the result set.
- */
+ * Indicates whether a specified column expression in a GROUP BY list is aggregated or not.
+ * GROUPING returns 1 for aggregated or 0 for not aggregated in the result set.
+ */
case class Grouping(child: Expression) extends Expression with Unevaluable {
override def references: AttributeSet = AttributeSet(VirtualColumn.groupingIdAttribute :: Nil)
override def children: Seq[Expression] = child :: Nil
@@ -54,10 +54,10 @@ case class Grouping(child: Expression) extends Expression with Unevaluable {
}
/**
- * GroupingID is a function that computes the level of grouping.
- *
- * If groupByExprs is empty, it means all grouping expressions in GroupingSets.
- */
+ * GroupingID is a function that computes the level of grouping.
+ *
+ * If groupByExprs is empty, it means all grouping expressions in GroupingSets.
+ */
case class GroupingID(groupByExprs: Seq[Expression]) extends Expression with Unevaluable {
override def references: AttributeSet = AttributeSet(VirtualColumn.groupingIdAttribute :: Nil)
override def children: Seq[Expression] = groupByExprs
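Editor's note: these expressions back the SQL `GROUPING`/`GROUPING_ID` functions. A hedged SQL-level sketch follows; the table and column names are illustrative only and not from this diff.

```scala
// Hedged sketch: how Grouping/GroupingID typically surface in SQL.
sqlContext.sql("""
  SELECT dept, city, GROUPING(dept) AS g_dept, GROUPING_ID(dept, city) AS gid, SUM(salary)
  FROM employees
  GROUP BY dept, city WITH ROLLUP
""").show()
```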
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
index e8a3e129b4..eb8dc1423a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala
@@ -467,8 +467,8 @@ object Murmur3HashFunction extends InterpretedHashFunction {
}
/**
- * Print the result of an expression to stderr (used for debugging codegen).
- */
+ * Print the result of an expression to stderr (used for debugging codegen).
+ */
case class PrintToStderr(child: Expression) extends UnaryExpression {
override def dataType: DataType = child.dataType
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index a5ab390c76..69b09bcb35 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -31,9 +31,9 @@ import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.types._
/**
- * Abstract class all optimizers should inherit of, contains the standard batches (extending
- * Optimizers can override this.
- */
+ * Abstract class that all optimizers should inherit from; it contains the standard batches
+ * (extending optimizers can override this).
+ */
abstract class Optimizer extends RuleExecutor[LogicalPlan] {
def batches: Seq[Batch] = {
// Technically some of the rules in Finish Analysis are not optimizer rules and belong more
@@ -111,11 +111,11 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
}
/**
- * Non-abstract representation of the standard Spark optimizing strategies
- *
- * To ensure extendability, we leave the standard rules in the abstract optimizer rules, while
- * specific rules go to the subclasses
- */
+ * Non-abstract representation of the standard Spark optimizing strategies.
+ *
+ * To ensure extensibility, we keep the standard rules in the abstract Optimizer, while
+ * specific rules go into the subclasses.
+ */
object DefaultOptimizer extends Optimizer
/**
@@ -962,21 +962,21 @@ object PushPredicateThroughAggregate extends Rule[LogicalPlan] with PredicateHel
}
/**
- * Reorder the joins and push all the conditions into join, so that the bottom ones have at least
- * one condition.
- *
- * The order of joins will not be changed if all of them already have at least one condition.
- */
+ * Reorder the joins and push all the conditions into the joins, so that the bottom ones have
+ * at least one condition.
+ *
+ * The order of joins will not be changed if all of them already have at least one condition.
+ */
object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
/**
- * Join a list of plans together and push down the conditions into them.
- *
- * The joined plan are picked from left to right, prefer those has at least one join condition.
- *
- * @param input a list of LogicalPlans to join.
- * @param conditions a list of condition for join.
- */
+ * Join a list of plans together and push down the conditions into them.
+ *
+ * The joined plans are picked from left to right, preferring those that have at least one
+ * join condition.
+ *
+ * @param input a list of LogicalPlans to join.
+ * @param conditions a list of conditions for the joins.
+ */
@tailrec
def createOrderedJoin(input: Seq[LogicalPlan], conditions: Seq[Expression]): LogicalPlan = {
assert(input.size >= 2)
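Editor's note: a hedged, self-contained sketch of the greedy idea described in the comment above, using plain strings in place of LogicalPlans; it only mimics the shape of createOrderedJoin and is not the Catalyst code itself.

```scala
// Hedged sketch: pick the next plan that shares a condition with what has been
// joined so far, so each join in the chain ends up with at least one condition.
def orderGreedily(plans: Seq[String], conditions: Set[(String, String)]): Seq[String] = {
  def connected(done: Seq[String], p: String): Boolean =
    done.exists(d => conditions((d, p)) || conditions((p, d)))

  (1 until plans.size).foldLeft(Seq(plans.head)) { (done, _) =>
    val remaining = plans.filterNot(done.contains)
    done :+ remaining.find(connected(done, _)).getOrElse(remaining.head)
  }
}

// orderGreedily(Seq("t1", "t2", "t3"), Set("t1" -> "t3", "t3" -> "t2"))
// => Seq("t1", "t3", "t2"): t2 is deferred until a condition links it in.
```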
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index c350f3049f..8541b1f7c6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -1430,8 +1430,8 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
}
/**
- * Create a [[StructType]] from a sequence of [[StructField]]s.
- */
+ * Create a [[StructType]] from a sequence of [[StructField]]s.
+ */
protected def createStructType(ctx: ColTypeListContext): StructType = {
StructType(Option(ctx).toSeq.flatMap(visitColTypeList))
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 28d2c445b1..6f35d87ebb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -140,20 +140,20 @@ object ExtractEquiJoinKeys extends Logging with PredicateHelper {
}
/**
- * A pattern that collects the filter and inner joins.
- *
- * Filter
- * |
- * inner Join
- * / \ ----> (Seq(plan0, plan1, plan2), conditions)
- * Filter plan2
- * |
- * inner join
- * / \
- * plan0 plan1
- *
- * Note: This pattern currently only works for left-deep trees.
- */
+ * A pattern that collects the filter and inner joins.
+ *
+ * Filter
+ * |
+ * inner Join
+ * / \ ----> (Seq(plan0, plan1, plan2), conditions)
+ * Filter plan2
+ * |
+ * inner join
+ * / \
+ * plan0 plan1
+ *
+ * Note: This pattern currently only works for left-deep trees.
+ */
object ExtractFiltersAndInnerJoins extends PredicateHelper {
// flatten all inner joins, which are next to each other
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 22a4461e66..609a33e2f1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -122,8 +122,8 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
AttributeSet(children.flatMap(_.asInstanceOf[QueryPlan[PlanType]].output))
/**
- * The set of all attributes that are produced by this node.
- */
+ * The set of all attributes that are produced by this node.
+ */
def producedAttributes: AttributeSet = AttributeSet.empty
/**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
index be9f1ffa22..d449088498 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala
@@ -76,9 +76,9 @@ case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution {
}
/**
- * Represents data where tuples are broadcasted to every node. It is quite common that the
- * entire set of tuples is transformed into different data structure.
- */
+ * Represents data where tuples are broadcasted to every node. It is quite common that the
+ * entire set of tuples is transformed into a different data structure.
+ */
case class BroadcastDistribution(mode: BroadcastMode) extends Distribution
/**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala
index 7e3da6bea7..6e5672ddc3 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerExtendableSuite.scala
@@ -23,21 +23,21 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
/**
- * This is a test for SPARK-7727 if the Optimizer is kept being extendable
- */
+ * This is a test for SPARK-7727, checking that the Optimizer remains extendable.
+ */
class OptimizerExtendableSuite extends SparkFunSuite {
/**
- * Dummy rule for test batches
- */
+ * Dummy rule for test batches
+ */
object DummyRule extends Rule[LogicalPlan] {
def apply(p: LogicalPlan): LogicalPlan = p
}
/**
- * This class represents a dummy extended optimizer that takes the batches of the
- * Optimizer and adds custom ones.
- */
+ * This class represents a dummy extended optimizer that takes the batches of the
+ * Optimizer and adds custom ones.
+ */
class ExtendedOptimizer extends Optimizer {
// rules set to DummyRule, would not be executed anyways
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 221782ee8f..d4290fee0a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -712,13 +712,13 @@ class SQLContext private[sql](
}
/**
- * :: Experimental ::
- * Creates a [[Dataset]] with a single [[LongType]] column named `id`, containing elements
- * in an range from `start` to `end` (exclusive) with an step value.
- *
- * @since 2.0.0
- * @group dataset
- */
+ * :: Experimental ::
+ * Creates a [[Dataset]] with a single [[LongType]] column named `id`, containing elements
+ * in a range from `start` to `end` (exclusive) with a step value.
+ *
+ * @since 2.0.0
+ * @group dataset
+ */
@Experimental
def range(start: Long, end: Long, step: Long): Dataset[java.lang.Long] = {
range(start, end, step, numPartitions = sparkContext.defaultParallelism)
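Editor's note: a hedged usage sketch of the variant documented above; `sqlContext` is assumed to exist.

```scala
// Hedged sketch: ids 0, 10, 20, ..., 90 as a Dataset[java.lang.Long].
val ds = sqlContext.range(0L, 100L, 10L)
ds.show()
println(ds.count())   // 10
```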
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index f3478a873a..124ec09efd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -109,9 +109,10 @@ private[sql] class CacheManager extends Logging {
cachedData.remove(dataIndex)
}
- /** Tries to remove the data for the given [[Dataset]] from the cache
- * if it's cached
- */
+ /**
+ * Tries to remove the data for the given [[Dataset]] from the cache
+ * if it is cached.
+ */
private[sql] def tryUncacheQuery(
query: Dataset[_],
blocking: Boolean = true): Boolean = writeLock {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index b1b3d4ac81..ff19d1be1c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -84,8 +84,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
private[sql] def metrics: Map[String, SQLMetric[_, _]] = Map.empty
/**
- * Reset all the metrics.
- */
+ * Reset all the metrics.
+ */
private[sql] def resetMetrics(): Unit = {
metrics.valuesIterator.foreach(_.reset())
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
index 9bdf611f6e..9f539c4929 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
@@ -31,8 +31,8 @@ import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetrics}
import org.apache.spark.sql.internal.SQLConf
/**
- * An interface for those physical operators that support codegen.
- */
+ * An interface for those physical operators that support codegen.
+ */
trait CodegenSupport extends SparkPlan {
/** Prefix used in the current operator's variable names. */
@@ -46,10 +46,10 @@ trait CodegenSupport extends SparkPlan {
}
/**
- * Creates a metric using the specified name.
- *
- * @return name of the variable representing the metric
- */
+ * Creates a metric using the specified name.
+ *
+ * @return name of the variable representing the metric
+ */
def metricTerm(ctx: CodegenContext, name: String): String = {
val metric = ctx.addReferenceObj(name, longMetric(name))
val value = ctx.freshName("metricValue")
@@ -59,25 +59,25 @@ trait CodegenSupport extends SparkPlan {
}
/**
- * Whether this SparkPlan support whole stage codegen or not.
- */
+ * Whether this SparkPlan supports whole stage codegen or not.
+ */
def supportCodegen: Boolean = true
/**
- * Which SparkPlan is calling produce() of this one. It's itself for the first SparkPlan.
- */
+ * Which SparkPlan is calling produce() of this one. It's itself for the first SparkPlan.
+ */
protected var parent: CodegenSupport = null
/**
- * Returns all the RDDs of InternalRow which generates the input rows.
- *
- * Note: right now we support up to two RDDs.
- */
+ * Returns all the RDDs of InternalRow which generate the input rows.
+ *
+ * Note: right now we support up to two RDDs.
+ */
def upstreams(): Seq[RDD[InternalRow]]
/**
- * Returns Java source code to process the rows from upstream.
- */
+ * Returns Java source code to process the rows from upstream.
+ */
final def produce(ctx: CodegenContext, parent: CodegenSupport): String = {
this.parent = parent
ctx.freshNamePrefix = variablePrefix
@@ -89,28 +89,28 @@ trait CodegenSupport extends SparkPlan {
}
/**
- * Generate the Java source code to process, should be overridden by subclass to support codegen.
- *
- * doProduce() usually generate the framework, for example, aggregation could generate this:
- *
- * if (!initialized) {
- * # create a hash map, then build the aggregation hash map
- * # call child.produce()
- * initialized = true;
- * }
- * while (hashmap.hasNext()) {
- * row = hashmap.next();
- * # build the aggregation results
- * # create variables for results
- * # call consume(), which will call parent.doConsume()
+ * Generate the Java source code to process rows; this should be overridden by subclasses to
+ * support codegen.
+ *
+ * doProduce() usually generates the framework; for example, aggregation could generate this:
+ *
+ * if (!initialized) {
+ * # create a hash map, then build the aggregation hash map
+ * # call child.produce()
+ * initialized = true;
+ * }
+ * while (hashmap.hasNext()) {
+ * row = hashmap.next();
+ * # build the aggregation results
+ * # create variables for results
+ * # call consume(), which will call parent.doConsume()
* if (shouldStop()) return;
- * }
- */
+ * }
+ */
protected def doProduce(ctx: CodegenContext): String
/**
- * Consume the generated columns or row from current SparkPlan, call it's parent's doConsume().
- */
+ * Consume the generated columns or row from the current SparkPlan and call its parent's
+ * doConsume().
+ */
final def consume(ctx: CodegenContext, outputVars: Seq[ExprCode], row: String = null): String = {
val inputVars =
if (row != null) {
@@ -158,9 +158,9 @@ trait CodegenSupport extends SparkPlan {
}
/**
- * Returns source code to evaluate all the variables, and clear the code of them, to prevent
- * them to be evaluated twice.
- */
+ * Returns source code to evaluate all the variables and clear their code, to prevent them
+ * from being evaluated twice.
+ */
protected def evaluateVariables(variables: Seq[ExprCode]): String = {
val evaluate = variables.filter(_.code != "").map(_.code.trim).mkString("\n")
variables.foreach(_.code = "")
@@ -168,9 +168,9 @@ trait CodegenSupport extends SparkPlan {
}
/**
- * Returns source code to evaluate the variables for required attributes, and clear the code
- * of evaluated variables, to prevent them to be evaluated twice..
- */
+ * Returns source code to evaluate the variables for the required attributes, and clear the
+ * code of the evaluated variables, to prevent them from being evaluated twice.
+ */
protected def evaluateRequiredVariables(
attributes: Seq[Attribute],
variables: Seq[ExprCode],
@@ -194,18 +194,18 @@ trait CodegenSupport extends SparkPlan {
def usedInputs: AttributeSet = references
/**
- * Generate the Java source code to process the rows from child SparkPlan.
- *
- * This should be override by subclass to support codegen.
- *
- * For example, Filter will generate the code like this:
- *
- * # code to evaluate the predicate expression, result is isNull1 and value2
- * if (isNull1 || !value2) continue;
- * # call consume(), which will call parent.doConsume()
- *
- * Note: A plan can either consume the rows as UnsafeRow (row), or a list of variables (input).
- */
+ * Generate the Java source code to process the rows from the child SparkPlan.
+ *
+ * This should be overridden by subclasses to support codegen.
+ *
+ * For example, Filter will generate the code like this:
+ *
+ * # code to evaluate the predicate expression, result is isNull1 and value2
+ * if (isNull1 || !value2) continue;
+ * # call consume(), which will call parent.doConsume()
+ *
+ * Note: A plan can either consume the rows as UnsafeRow (row), or a list of variables (input).
+ */
def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
throw new UnsupportedOperationException
}
@@ -213,11 +213,11 @@ trait CodegenSupport extends SparkPlan {
/**
- * InputAdapter is used to hide a SparkPlan from a subtree that support codegen.
- *
- * This is the leaf node of a tree with WholeStageCodegen, is used to generate code that consumes
- * an RDD iterator of InternalRow.
- */
+ * InputAdapter is used to hide a SparkPlan from a subtree that supports codegen.
+ *
+ * This is the leaf node of a tree with WholeStageCodegen; it is used to generate code that
+ * consumes an RDD iterator of InternalRow.
+ */
case class InputAdapter(child: SparkPlan) extends UnaryNode with CodegenSupport {
override def output: Seq[Attribute] = child.output
@@ -260,33 +260,33 @@ object WholeStageCodegen {
}
/**
- * WholeStageCodegen compile a subtree of plans that support codegen together into single Java
- * function.
- *
- * Here is the call graph of to generate Java source (plan A support codegen, but plan B does not):
- *
- * WholeStageCodegen Plan A FakeInput Plan B
- * =========================================================================
- *
- * -> execute()
- * |
- * doExecute() ---------> upstreams() -------> upstreams() ------> execute()
- * |
- * +-----------------> produce()
- * |
- * doProduce() -------> produce()
- * |
- * doProduce()
- * |
- * doConsume() <--------- consume()
- * |
- * doConsume() <-------- consume()
- *
- * SparkPlan A should override doProduce() and doConsume().
- *
- * doCodeGen() will create a CodeGenContext, which will hold a list of variables for input,
- * used to generated code for BoundReference.
- */
+ * WholeStageCodegen compiles a subtree of plans that support codegen together into a single
+ * Java function.
+ *
+ * Here is the call graph for generating the Java source (plan A supports codegen, but plan B
+ * does not):
+ *
+ * WholeStageCodegen Plan A FakeInput Plan B
+ * =========================================================================
+ *
+ * -> execute()
+ * |
+ * doExecute() ---------> upstreams() -------> upstreams() ------> execute()
+ * |
+ * +-----------------> produce()
+ * |
+ * doProduce() -------> produce()
+ * |
+ * doProduce()
+ * |
+ * doConsume() <--------- consume()
+ * |
+ * doConsume() <-------- consume()
+ *
+ * SparkPlan A should override doProduce() and doConsume().
+ *
+ * doCodeGen() will create a CodeGenContext, which will hold a list of variables for input,
+ * used to generate code for BoundReference.
+ */
case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSupport {
override def output: Seq[Attribute] = child.output
@@ -422,8 +422,8 @@ case class WholeStageCodegen(child: SparkPlan) extends UnaryNode with CodegenSup
/**
- * Find the chained plans that support codegen, collapse them together as WholeStageCodegen.
- */
+ * Find the chained plans that support codegen and collapse them together into a
+ * WholeStageCodegen.
+ */
case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] {
private def supportCodegen(e: Expression): Boolean = e match {
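Editor's note: the easiest way to observe the collapsing described above is through the explained plan. A hedged sketch; `sqlContext` is assumed to exist, and the exact rendering of WholeStageCodegen nodes varies by Spark version.

```scala
// Hedged sketch: when spark.sql.codegen.wholeStage is enabled, chained
// codegen-capable operators are collapsed into WholeStageCodegen nodes.
val df = sqlContext.range(0, 1000).filter("id > 100").selectExpr("id + 1 AS id1")
df.explain()
// df.queryExecution.executedPlan   // the plan after CollapseCodegenStages has run
```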
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
index 7d0567842c..806089196c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
@@ -444,8 +444,8 @@ private[execution] final case class RangeBoundOrdering(
}
/**
- * The interface of row buffer for a partition
- */
+ * The interface of row buffer for a partition
+ */
private[execution] abstract class RowBuffer {
/** Number of rows. */
@@ -462,8 +462,8 @@ private[execution] abstract class RowBuffer {
}
/**
- * A row buffer based on ArrayBuffer (the number of rows is limited)
- */
+ * A row buffer based on ArrayBuffer (the number of rows is limited)
+ */
private[execution] class ArrayRowBuffer(buffer: ArrayBuffer[UnsafeRow]) extends RowBuffer {
private[this] var cursor: Int = -1
@@ -493,8 +493,8 @@ private[execution] class ArrayRowBuffer(buffer: ArrayBuffer[UnsafeRow]) extends
}
/**
- * An external buffer of rows based on UnsafeExternalSorter
- */
+ * An external buffer of rows based on UnsafeExternalSorter
+ */
private[execution] class ExternalRowBuffer(sorter: UnsafeExternalSorter, numFields: Int)
extends RowBuffer {
@@ -654,12 +654,16 @@ private[execution] final class SlidingWindowFunctionFrame(
/** The rows within current sliding window. */
private[this] val buffer = new util.ArrayDeque[InternalRow]()
- /** Index of the first input row with a value greater than the upper bound of the current
- * output row. */
+ /**
+ * Index of the first input row with a value greater than the upper bound of the current
+ * output row.
+ */
private[this] var inputHighIndex = 0
- /** Index of the first input row with a value equal to or greater than the lower bound of the
- * current output row. */
+ /**
+ * Index of the first input row with a value equal to or greater than the lower bound of the
+ * current output row.
+ */
private[this] var inputLowIndex = 0
/** Prepare the frame for calculating a new partition. Reset all variables. */
@@ -763,8 +767,10 @@ private[execution] final class UnboundedPrecedingWindowFunctionFrame(
/** The next row from `input`. */
private[this] var nextRow: InternalRow = null
- /** Index of the first input row with a value greater than the upper bound of the current
- * output row. */
+ /**
+ * Index of the first input row with a value greater than the upper bound of the current
+ * output row.
+ */
private[this] var inputIndex = 0
/** Prepare the frame for calculating a new partition. */
@@ -819,8 +825,10 @@ private[execution] final class UnboundedFollowingWindowFunctionFrame(
/** Rows of the partition currently being processed. */
private[this] var input: RowBuffer = null
- /** Index of the first input row with a value equal to or greater than the lower bound of the
- * current output row. */
+ /**
+ * Index of the first input row with a value equal to or greater than the lower bound of the
+ * current output row.
+ */
private[this] var inputIndex = 0
/** Prepare the frame for calculating a new partition. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
index 15627a7004..042c731901 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggregationIterator.scala
@@ -47,17 +47,17 @@ abstract class AggregationIterator(
///////////////////////////////////////////////////////////////////////////
/**
- * The following combinations of AggregationMode are supported:
- * - Partial
- * - PartialMerge (for single distinct)
- * - Partial and PartialMerge (for single distinct)
- * - Final
- * - Complete (for SortBasedAggregate with functions that does not support Partial)
- * - Final and Complete (currently not used)
- *
- * TODO: AggregateMode should have only two modes: Update and Merge, AggregateExpression
- * could have a flag to tell it's final or not.
- */
+ * The following combinations of AggregationMode are supported:
+ * - Partial
+ * - PartialMerge (for single distinct)
+ * - Partial and PartialMerge (for single distinct)
+ * - Final
+ * - Complete (for SortBasedAggregate with functions that do not support Partial)
+ * - Final and Complete (currently not used)
+ *
+ * TODO: AggregateMode should have only two modes: Update and Merge, AggregateExpression
+ * could have a flag to tell whether it is final or not.
+ */
{
val modes = aggregateExpressions.map(_.mode).distinct.toSet
require(modes.size <= 2,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
index 8f974980bb..de1491d357 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
@@ -46,9 +46,9 @@ class SortBasedAggregationIterator(
newMutableProjection) {
/**
- * Creates a new aggregation buffer and initializes buffer values
- * for all aggregate functions.
- */
+ * Creates a new aggregation buffer and initializes buffer values
+ * for all aggregate functions.
+ */
private def newBuffer: MutableRow = {
val bufferSchema = aggregateFunctions.flatMap(_.aggBufferAttributes)
val bufferRowSize: Int = bufferSchema.length
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index 7c215d1b96..60027edc7c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -266,8 +266,8 @@ case class TungstenAggregate(
private var sorterTerm: String = _
/**
- * This is called by generated Java class, should be public.
- */
+ * This is called by the generated Java class, so it should be public.
+ */
def createHashMap(): UnsafeFixedWidthAggregationMap = {
// create initialized aggregate buffer
val initExpr = declFunctions.flatMap(f => f.initialValues)
@@ -286,15 +286,15 @@ case class TungstenAggregate(
}
/**
- * This is called by generated Java class, should be public.
- */
+ * This is called by the generated Java class, so it should be public.
+ */
def createUnsafeJoiner(): UnsafeRowJoiner = {
GenerateUnsafeRowJoiner.create(groupingKeySchema, bufferSchema)
}
/**
- * Called by generated Java class to finish the aggregate and return a KVIterator.
- */
+ * Called by generated Java class to finish the aggregate and return a KVIterator.
+ */
def finishAggregate(
hashMap: UnsafeFixedWidthAggregationMap,
sorter: UnsafeKVExternalSorter): KVIterator[UnsafeRow, UnsafeRow] = {
@@ -372,8 +372,8 @@ case class TungstenAggregate(
}
/**
- * Generate the code for output.
- */
+ * Generate the code for output.
+ */
private def generateResultCode(
ctx: CodegenContext,
keyTerm: String,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
index f3514cd14c..159fdc99dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
@@ -168,10 +168,10 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
private[this] var reader: RecordReader[Void, V] = null
/**
- * If the format is ParquetInputFormat, try to create the optimized RecordReader. If this
- * fails (for example, unsupported schema), try with the normal reader.
- * TODO: plumb this through a different way?
- */
+ * If the format is ParquetInputFormat, try to create the optimized RecordReader. If this
+ * fails (for example, unsupported schema), try with the normal reader.
+ * TODO: plumb this through a different way?
+ */
if (enableVectorizedParquetReader &&
format.getClass.getName == "org.apache.parquet.hadoop.ParquetInputFormat") {
val parquetReader: VectorizedParquetRecordReader = new VectorizedParquetRecordReader()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
index 797f740dc5..ea843a1013 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVInferSchema.scala
@@ -33,11 +33,11 @@ import org.apache.spark.unsafe.types.UTF8String
private[csv] object CSVInferSchema {
/**
- * Similar to the JSON schema inference
- * 1. Infer type of each row
- * 2. Merge row types to find common type
- * 3. Replace any null types with string type
- */
+ * Similar to the JSON schema inference
+ * 1. Infer type of each row
+ * 2. Merge row types to find common type
+ * 3. Replace any null types with string type
+ */
def infer(
tokenRdd: RDD[Array[String]],
header: Array[String],
@@ -75,9 +75,9 @@ private[csv] object CSVInferSchema {
}
/**
- * Infer type of string field. Given known type Double, and a string "1", there is no
- * point checking if it is an Int, as the final type must be Double or higher.
- */
+ * Infer type of string field. Given known type Double, and a string "1", there is no
+ * point checking if it is an Int, as the final type must be Double or higher.
+ */
def inferField(typeSoFar: DataType, field: String, nullValue: String = ""): DataType = {
if (field == null || field.isEmpty || field == nullValue) {
typeSoFar
@@ -142,9 +142,9 @@ private[csv] object CSVInferSchema {
private val numericPrecedence: IndexedSeq[DataType] = HiveTypeCoercion.numericPrecedence
/**
- * Copied from internal Spark api
- * [[org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion]]
- */
+ * Copied from the internal Spark API
+ * [[org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion]]
+ */
val findTightestCommonType: (DataType, DataType) => Option[DataType] = {
case (t1, t2) if t1 == t2 => Some(t1)
case (NullType, t1) => Some(t1)
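Editor's note: a hedged usage sketch of when this inference runs: reading CSV with `inferSchema` enabled. The path and options are placeholders, not from this diff.

```scala
// Hedged sketch: CSVInferSchema.infer decides the column types reported here.
val people = sqlContext.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/path/to/people.csv")

people.printSchema()
```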
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
index c0d6f6fbf7..34fcbdf871 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/DefaultSource.scala
@@ -38,8 +38,8 @@ import org.apache.spark.util.SerializableConfiguration
import org.apache.spark.util.collection.BitSet
/**
- * Provides access to CSV data from pure SQL statements.
- */
+ * Provides access to CSV data from pure SQL statements.
+ */
class DefaultSource extends FileFormat with DataSourceRegister {
override def shortName(): String = "csv"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 877e159fbd..2e88d588be 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -51,11 +51,11 @@ case class DescribeCommand(
}
/**
- * Used to represent the operation of create table using a data source.
+ * Used to represent the operation of create table using a data source.
*
- * @param allowExisting If it is true, we will do nothing when the table already exists.
- * If it is false, an exception will be thrown
- */
+ * @param allowExisting If it is true, we will do nothing when the table already exists.
+ * If it is false, an exception will be thrown
+ */
case class CreateTableUsing(
tableIdent: TableIdentifier,
userSpecifiedSchema: Option[StructType],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
index 0ed1ed41b0..41e566c27b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoin.scala
@@ -122,8 +122,8 @@ case class BroadcastHashJoin(
}
/**
- * Returns a tuple of Broadcast of HashedRelation and the variable name for it.
- */
+ * Returns a tuple of Broadcast of HashedRelation and the variable name for it.
+ */
private def prepareBroadcast(ctx: CodegenContext): (Broadcast[HashedRelation], String) = {
// create a name for HashedRelation
val broadcastRelation = buildPlan.executeBroadcast[HashedRelation]()
@@ -139,9 +139,9 @@ case class BroadcastHashJoin(
}
/**
- * Returns the code for generating join key for stream side, and expression of whether the key
- * has any null in it or not.
- */
+ * Returns the code for generating the join key for the stream side, and an expression that
+ * indicates whether the key has any null in it or not.
+ */
private def genStreamSideJoinKey(
ctx: CodegenContext,
input: Seq[ExprCode]): (ExprCode, String) = {
@@ -160,8 +160,8 @@ case class BroadcastHashJoin(
}
/**
- * Generates the code for variable of build side.
- */
+ * Generates the code for variable of build side.
+ */
private def genBuildSideVars(ctx: CodegenContext, matched: String): Seq[ExprCode] = {
ctx.currentVars = null
ctx.INPUT_ROW = matched
@@ -188,8 +188,8 @@ case class BroadcastHashJoin(
}
/**
- * Generates the code for Inner join.
- */
+ * Generates the code for Inner join.
+ */
private def codegenInner(ctx: CodegenContext, input: Seq[ExprCode]): String = {
val (broadcastRelation, relationTerm) = prepareBroadcast(ctx)
val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
@@ -254,8 +254,8 @@ case class BroadcastHashJoin(
/**
- * Generates the code for left or right outer join.
- */
+ * Generates the code for left or right outer join.
+ */
private def codegenOuter(ctx: CodegenContext, input: Seq[ExprCode]): String = {
val (broadcastRelation, relationTerm) = prepareBroadcast(ctx)
val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
index fb65b50da8..edb4c5a16f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProduct.scala
@@ -28,10 +28,10 @@ import org.apache.spark.util.CompletionIterator
import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
/**
- * An optimized CartesianRDD for UnsafeRow, which will cache the rows from second child RDD,
- * will be much faster than building the right partition for every row in left RDD, it also
- * materialize the right RDD (in case of the right RDD is nondeterministic).
- */
+ * An optimized CartesianRDD for UnsafeRow, which caches the rows from the second child RDD.
+ * This is much faster than building the right partition for every row in the left RDD; it also
+ * materializes the right RDD (in case the right RDD is nondeterministic).
+ */
private[spark]
class UnsafeCartesianRDD(left : RDD[UnsafeRow], right : RDD[UnsafeRow], numFieldsOfRight: Int)
extends CartesianRDD[UnsafeRow, UnsafeRow](left.sparkContext, left, right) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index 5f42d07273..c298b7dee0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -64,10 +64,10 @@ trait HashJoin {
}
/**
- * Try to rewrite the key as LongType so we can use getLong(), if they key can fit with a long.
- *
- * If not, returns the original expressions.
- */
+ * Try to rewrite the key as LongType so we can use getLong(), if the key can fit in a long.
+ *
+ * If not, returns the original expressions.
+ */
def rewriteKeyExpr(keys: Seq[Expression]): Seq[Expression] = {
var keyExpr: Expression = null
var width = 0
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index dc4793e85a..91c470d187 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -38,20 +38,20 @@ import org.apache.spark.util.collection.CompactBuffer
*/
private[execution] sealed trait HashedRelation {
/**
- * Returns matched rows.
- */
+ * Returns matched rows.
+ */
def get(key: InternalRow): Seq[InternalRow]
/**
- * Returns matched rows for a key that has only one column with LongType.
- */
+ * Returns matched rows for a key that has only one column with LongType.
+ */
def get(key: Long): Seq[InternalRow] = {
throw new UnsupportedOperationException
}
/**
- * Returns the size of used memory.
- */
+ * Returns the size of used memory.
+ */
def getMemorySize: Long = 1L // to make the test happy
/**
@@ -77,20 +77,20 @@ private[execution] sealed trait HashedRelation {
}
/**
- * Interface for a hashed relation that have only one row per key.
- *
- * We should call getValue() for better performance.
- */
+ * Interface for a hashed relation that has only one row per key.
+ *
+ * We should call getValue() for better performance.
+ */
private[execution] trait UniqueHashedRelation extends HashedRelation {
/**
- * Returns the matched single row.
- */
+ * Returns the matched single row.
+ */
def getValue(key: InternalRow): InternalRow
/**
- * Returns the matched single row with key that have only one column of LongType.
- */
+ * Returns the matched single row for a key that has only one column of LongType.
+ */
def getValue(key: Long): InternalRow = {
throw new UnsupportedOperationException
}
@@ -345,8 +345,8 @@ private[joins] object UnsafeHashedRelation {
}
/**
- * An interface for a hashed relation that the key is a Long.
- */
+ * An interface for a hashed relation whose key is a Long.
+ */
private[joins] trait LongHashedRelation extends HashedRelation {
override def get(key: InternalRow): Seq[InternalRow] = {
get(key.getLong(0))
@@ -396,26 +396,26 @@ private[joins] final class UniqueLongHashedRelation(
}
/**
- * A relation that pack all the rows into a byte array, together with offsets and sizes.
- *
- * All the bytes of UnsafeRow are packed together as `bytes`:
- *
- * [ Row0 ][ Row1 ][] ... [ RowN ]
- *
- * With keys:
- *
- * start start+1 ... start+N
- *
- * `offsets` are offsets of UnsafeRows in the `bytes`
- * `sizes` are the numbers of bytes of UnsafeRows, 0 means no row for this key.
- *
- * For example, two UnsafeRows (24 bytes and 32 bytes), with keys as 3 and 5 will stored as:
- *
- * start = 3
- * offsets = [0, 0, 24]
- * sizes = [24, 0, 32]
- * bytes = [0 - 24][][24 - 56]
- */
+ * A relation that packs all the rows into a byte array, together with offsets and sizes.
+ *
+ * All the bytes of UnsafeRow are packed together as `bytes`:
+ *
+ * [ Row0 ][ Row1 ][] ... [ RowN ]
+ *
+ * With keys:
+ *
+ * start start+1 ... start+N
+ *
+ * `offsets` are offsets of UnsafeRows in the `bytes`
+ * `sizes` are the numbers of bytes of UnsafeRows, 0 means no row for this key.
+ *
+ * For example, two UnsafeRows (24 bytes and 32 bytes), with keys 3 and 5, will be stored as:
+ *
+ * start = 3
+ * offsets = [0, 0, 24]
+ * sizes = [24, 0, 32]
+ * bytes = [0 - 24][][24 - 56]
+ */
private[joins] final class LongArrayRelation(
private var numFields: Int,
private var start: Long,
@@ -483,8 +483,8 @@ private[joins] final class LongArrayRelation(
}
/**
- * Create hashed relation with key that is long.
- */
+ * Create hashed relation with key that is long.
+ */
private[joins] object LongHashedRelation {
val DENSE_FACTOR = 0.2
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
index 60bd8ea39a..0e7b2f2f31 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoin.scala
@@ -256,9 +256,9 @@ case class SortMergeJoin(
}
/**
- * Generate a function to scan both left and right to find a match, returns the term for
- * matched one row from left side and buffered rows from right side.
- */
+ * Generate a function to scan both the left and the right side to find a match, returning
+ * the terms for the matched row from the left side and the buffered rows from the right side.
+ */
private def genScanner(ctx: CodegenContext): (String, String) = {
// Create class member for next row from both sides.
val leftRow = ctx.freshName("leftRow")
@@ -341,12 +341,12 @@ case class SortMergeJoin(
}
/**
- * Creates variables for left part of result row.
- *
- * In order to defer the access after condition and also only access once in the loop,
- * the variables should be declared separately from accessing the columns, we can't use the
- * codegen of BoundReference here.
- */
+ * Creates variables for the left part of the result row.
+ *
+ * In order to defer the access until after the condition and also access each column only once
+ * in the loop, the variables should be declared separately from accessing the columns; we can't
+ * use the codegen of BoundReference here.
+ */
private def createLeftVars(ctx: CodegenContext, leftRow: String): Seq[ExprCode] = {
ctx.INPUT_ROW = leftRow
left.output.zipWithIndex.map { case (a, i) =>
@@ -370,9 +370,9 @@ case class SortMergeJoin(
}
/**
- * Creates the variables for right part of result row, using BoundReference, since the right
- * part are accessed inside the loop.
- */
+ * Creates the variables for the right part of the result row, using BoundReference, since the
+ * right part is accessed inside the loop.
+ */
private def createRightVar(ctx: CodegenContext, rightRow: String): Seq[ExprCode] = {
ctx.INPUT_ROW = rightRow
right.output.zipWithIndex.map { case (a, i) =>
@@ -381,12 +381,12 @@ case class SortMergeJoin(
}
/**
- * Splits variables based on whether it's used by condition or not, returns the code to create
- * these variables before the condition and after the condition.
- *
- * Only a few columns are used by condition, then we can skip the accessing of those columns
- * that are not used by condition also filtered out by condition.
- */
+ * Splits the variables based on whether they are used by the condition or not, returning the
+ * code to create these variables before the condition and after the condition.
+ *
+ * Since only a few columns are used by the condition, we can skip accessing the columns that
+ * are not used by the condition for rows that are filtered out by the condition.
+ */
private def splitVarsByCondition(
attributes: Seq[Attribute],
variables: Seq[ExprCode]): (String, String) = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 998eb82de1..8ece3c971a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -468,10 +468,10 @@ private[state] class HDFSBackedStateStoreProvider(
}
/**
- * Clean up old snapshots and delta files that are not needed any more. It ensures that last
- * few versions of the store can be recovered from the files, so re-executed RDD operations
- * can re-apply updates on the past versions of the store.
- */
+ * Clean up old snapshots and delta files that are not needed any more. It ensures that the last
+ * few versions of the store can be recovered from the files, so re-executed RDD operations
+ * can re-apply updates on the past versions of the store.
+ */
private[state] def cleanup(): Unit = {
try {
val files = fetchFiles()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
index 24a01f5be1..012b125d6b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
@@ -45,8 +45,8 @@ private[ui] case class SparkPlanGraph(
}
/**
- * All the SparkPlanGraphNodes, including those inside of WholeStageCodegen.
- */
+ * All the SparkPlanGraphNodes, including those inside of WholeStageCodegen.
+ */
val allNodes: Seq[SparkPlanGraphNode] = {
nodes.flatMap {
case cluster: SparkPlanGraphCluster => cluster.nodes :+ cluster
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index baf947d037..da58ba2add 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -332,95 +332,94 @@ object functions {
}
/**
- * Aggregate function: returns the first value in a group.
- *
- * The function by default returns the first values it sees. It will return the first non-null
- * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
- *
- * @group agg_funcs
- * @since 2.0.0
- */
+ * Aggregate function: returns the first value in a group.
+ *
+ * The function by default returns the first value it sees. It will return the first non-null
+ * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
+ *
+ * @group agg_funcs
+ * @since 2.0.0
+ */
def first(e: Column, ignoreNulls: Boolean): Column = withAggregateFunction {
new First(e.expr, Literal(ignoreNulls))
}
/**
- * Aggregate function: returns the first value of a column in a group.
- *
- * The function by default returns the first values it sees. It will return the first non-null
- * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
- *
- * @group agg_funcs
- * @since 2.0.0
- */
+ * Aggregate function: returns the first value of a column in a group.
+ *
+ * The function by default returns the first value it sees. It will return the first non-null
+ * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
+ *
+ * @group agg_funcs
+ * @since 2.0.0
+ */
def first(columnName: String, ignoreNulls: Boolean): Column = {
first(Column(columnName), ignoreNulls)
}
/**
- * Aggregate function: returns the first value in a group.
- *
- * The function by default returns the first values it sees. It will return the first non-null
- * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
- *
- * @group agg_funcs
- * @since 1.3.0
- */
+ * Aggregate function: returns the first value in a group.
+ *
+ * The function by default returns the first value it sees. It will return the first non-null
+ * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
+ *
+ * @group agg_funcs
+ * @since 1.3.0
+ */
def first(e: Column): Column = first(e, ignoreNulls = false)
/**
- * Aggregate function: returns the first value of a column in a group.
- *
- * The function by default returns the first values it sees. It will return the first non-null
- * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
- *
- * @group agg_funcs
- * @since 1.3.0
- */
+ * Aggregate function: returns the first value of a column in a group.
+ *
+ * The function by default returns the first values it sees. It will return the first non-null
+ * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
+ *
+ * @group agg_funcs
+ * @since 1.3.0
+ */
def first(columnName: String): Column = first(Column(columnName))
-
/**
- * Aggregate function: indicates whether a specified column in a GROUP BY list is aggregated
- * or not, returns 1 for aggregated or 0 for not aggregated in the result set.
- *
- * @group agg_funcs
- * @since 2.0.0
- */
+ * Aggregate function: indicates whether a specified column in a GROUP BY list is aggregated
+ * or not, returns 1 for aggregated or 0 for not aggregated in the result set.
+ *
+ * @group agg_funcs
+ * @since 2.0.0
+ */
def grouping(e: Column): Column = Column(Grouping(e.expr))
/**
- * Aggregate function: indicates whether a specified column in a GROUP BY list is aggregated
- * or not, returns 1 for aggregated or 0 for not aggregated in the result set.
- *
- * @group agg_funcs
- * @since 2.0.0
- */
+ * Aggregate function: indicates whether a specified column in a GROUP BY list is aggregated
+ * or not, returns 1 for aggregated or 0 for not aggregated in the result set.
+ *
+ * @group agg_funcs
+ * @since 2.0.0
+ */
def grouping(columnName: String): Column = grouping(Column(columnName))
/**
- * Aggregate function: returns the level of grouping, equals to
- *
- * (grouping(c1) << (n-1)) + (grouping(c2) << (n-2)) + ... + grouping(cn)
- *
- * Note: the list of columns should match with grouping columns exactly, or empty (means all the
- * grouping columns).
- *
- * @group agg_funcs
- * @since 2.0.0
- */
+ * Aggregate function: returns the level of grouping, equals to
+ *
+ * (grouping(c1) << (n-1)) + (grouping(c2) << (n-2)) + ... + grouping(cn)
+ *
+ * Note: the list of columns should match the grouping columns exactly, or be empty (meaning all
+ * the grouping columns).
+ *
+ * @group agg_funcs
+ * @since 2.0.0
+ */
def grouping_id(cols: Column*): Column = Column(GroupingID(cols.map(_.expr)))
/**
- * Aggregate function: returns the level of grouping, equals to
- *
- * (grouping(c1) << (n-1)) + (grouping(c2) << (n-2)) + ... + grouping(cn)
- *
- * Note: the list of columns should match with grouping columns exactly.
- *
- * @group agg_funcs
- * @since 2.0.0
- */
+ * Aggregate function: returns the level of grouping, equals to
+ *
+ * (grouping(c1) << (n-1)) + (grouping(c2) << (n-2)) + ... + grouping(cn)
+ *
+ * Note: the list of columns should match the grouping columns exactly.
+ *
+ * @group agg_funcs
+ * @since 2.0.0
+ */
def grouping_id(colName: String, colNames: String*): Column = {
grouping_id((Seq(colName) ++ colNames).map(n => Column(n)) : _*)
}
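To make the docs above concrete, here is a minimal usage sketch; the sqlContext handle and the toy dept/salary data are assumptions for illustration, not part of this patch. grouping and grouping_id are only meaningful under cube or rollup.
import org.apache.spark.sql.functions._
// Hypothetical input: one row per employee with a department and a salary.
val df = sqlContext.createDataFrame(Seq(
  ("eng", 100), ("eng", 120), ("sales", 90)
)).toDF("dept", "salary")
// first() keeps one (non-null, when ignoreNulls = true) salary per group;
// grouping()/grouping_id() flag which output rows are the rolled-up ones.
df.cube("dept")
  .agg(first("salary", ignoreNulls = true), grouping("dept"), grouping_id("dept"))
  .show()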
@@ -442,51 +441,51 @@ object functions {
def kurtosis(columnName: String): Column = kurtosis(Column(columnName))
/**
- * Aggregate function: returns the last value in a group.
- *
- * The function by default returns the last values it sees. It will return the last non-null
- * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
- *
- * @group agg_funcs
- * @since 2.0.0
- */
+ * Aggregate function: returns the last value in a group.
+ *
+ * The function by default returns the last values it sees. It will return the last non-null
+ * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
+ *
+ * @group agg_funcs
+ * @since 2.0.0
+ */
def last(e: Column, ignoreNulls: Boolean): Column = withAggregateFunction {
new Last(e.expr, Literal(ignoreNulls))
}
/**
- * Aggregate function: returns the last value of the column in a group.
- *
- * The function by default returns the last values it sees. It will return the last non-null
- * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
- *
- * @group agg_funcs
- * @since 2.0.0
- */
+ * Aggregate function: returns the last value of the column in a group.
+ *
+ * The function by default returns the last values it sees. It will return the last non-null
+ * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
+ *
+ * @group agg_funcs
+ * @since 2.0.0
+ */
def last(columnName: String, ignoreNulls: Boolean): Column = {
last(Column(columnName), ignoreNulls)
}
/**
- * Aggregate function: returns the last value in a group.
- *
- * The function by default returns the last values it sees. It will return the last non-null
- * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
- *
- * @group agg_funcs
- * @since 1.3.0
- */
+ * Aggregate function: returns the last value in a group.
+ *
+ * The function by default returns the last values it sees. It will return the last non-null
+ * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
+ *
+ * @group agg_funcs
+ * @since 1.3.0
+ */
def last(e: Column): Column = last(e, ignoreNulls = false)
/**
- * Aggregate function: returns the last value of the column in a group.
- *
- * The function by default returns the last values it sees. It will return the last non-null
- * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
- *
- * @group agg_funcs
- * @since 1.3.0
- */
+ * Aggregate function: returns the last value of the column in a group.
+ *
+ * The function by default returns the last values it sees. It will return the last non-null
+ * value it sees when ignoreNulls is set to true. If all values are null, then null is returned.
+ *
+ * @group agg_funcs
+ * @since 1.3.0
+ */
def last(columnName: String): Column = last(Column(columnName), ignoreNulls = false)
/**
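A matching sketch for last, again with a hypothetical sqlContext and toy data:
import org.apache.spark.sql.functions._
// score is nullable; Option[Int] maps to a nullable integer column.
val events = sqlContext.createDataFrame(Seq(
  ("user1", Some(1)), ("user1", None), ("user2", Some(5))
)).toDF("user", "score")
// With ignoreNulls = true, null scores are skipped when picking the last value per user.
events.groupBy("user").agg(last("score", ignoreNulls = true)).show()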
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index e8834d052c..14e14710f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -152,19 +152,19 @@ trait StreamSinkProvider {
@DeveloperApi
trait CreatableRelationProvider {
/**
- * Creates a relation with the given parameters based on the contents of the given
- * DataFrame. The mode specifies the expected behavior of createRelation when
- * data already exists.
- * Right now, there are three modes, Append, Overwrite, and ErrorIfExists.
- * Append mode means that when saving a DataFrame to a data source, if data already exists,
- * contents of the DataFrame are expected to be appended to existing data.
- * Overwrite mode means that when saving a DataFrame to a data source, if data already exists,
- * existing data is expected to be overwritten by the contents of the DataFrame.
- * ErrorIfExists mode means that when saving a DataFrame to a data source,
- * if data already exists, an exception is expected to be thrown.
- *
- * @since 1.3.0
- */
+ * Creates a relation with the given parameters based on the contents of the given
+ * DataFrame. The mode specifies the expected behavior of createRelation when
+ * data already exists.
+ * Right now, there are three modes: Append, Overwrite, and ErrorIfExists.
+ * Append mode means that when saving a DataFrame to a data source, if data already exists,
+ * contents of the DataFrame are expected to be appended to existing data.
+ * Overwrite mode means that when saving a DataFrame to a data source, if data already exists,
+ * existing data is expected to be overwritten by the contents of the DataFrame.
+ * ErrorIfExists mode means that when saving a DataFrame to a data source,
+ * if data already exists, an exception is expected to be thrown.
+ *
+ * @since 1.3.0
+ */
def createRelation(
sqlContext: SQLContext,
mode: SaveMode,
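For end users these save modes surface through DataFrameWriter rather than a hand-written CreatableRelationProvider; a hedged sketch, with a hypothetical DataFrame df and output path:
import org.apache.spark.sql.SaveMode
// Append adds to existing data, Overwrite replaces it.
df.write.mode(SaveMode.Append).parquet("/tmp/example-output")
df.write.mode(SaveMode.Overwrite).parquet("/tmp/example-output")
// ErrorIfExists would throw here, since the path now exists.
// df.write.mode(SaveMode.ErrorIfExists).parquet("/tmp/example-output")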
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index 854a662cc4..d160f8ab8c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -286,8 +286,8 @@ abstract class QueryTest extends PlanTest {
}
/**
- * Asserts that a given [[Dataset]] does not have missing inputs in all the analyzed plans.
- */
+ * Asserts that a given [[Dataset]] does not have missing inputs in all the analyzed plans.
+ */
def assertEmptyMissingInput(query: Dataset[_]): Unit = {
assert(query.queryExecution.analyzed.missingInput.isEmpty,
s"The analyzed logical plan has missing inputs: ${query.queryExecution.analyzed}")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
index 55906793c0..289e1b6db9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala
@@ -32,10 +32,10 @@ import org.apache.spark.unsafe.map.BytesToBytesMap
import org.apache.spark.util.Benchmark
/**
- * Benchmark to measure whole stage codegen performance.
- * To run this:
- * build/sbt "sql/test-only *BenchmarkWholeStageCodegen"
- */
+ * Benchmark to measure whole stage codegen performance.
+ * To run this:
+ * build/sbt "sql/test-only *BenchmarkWholeStageCodegen"
+ */
class BenchmarkWholeStageCodegen extends SparkFunSuite {
lazy val conf = new SparkConf().setMaster("local[1]").setAppName("benchmark")
.set("spark.sql.shuffle.partitions", "1")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVParserSuite.scala
index dc54883277..aaeecef5f3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVParserSuite.scala
@@ -20,8 +20,8 @@ package org.apache.spark.sql.execution.datasources.csv
import org.apache.spark.SparkFunSuite
/**
- * test cases for StringIteratorReader
- */
+ * Test cases for StringIteratorReader.
+ */
class CSVParserSuite extends SparkFunSuite {
private def readAll(iter: Iterator[String]) = {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index c1e151d08b..ac37e8e022 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -497,9 +497,10 @@ class StreamingContext private[streaming] (
new TransformedDStream[T](dstreams, sparkContext.clean(transformFunc))
}
- /** Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for
- * receiving system events related to streaming.
- */
+ /**
+ * Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for
+ * receiving system events related to streaming.
+ */
def addStreamingListener(streamingListener: StreamingListener) {
scheduler.listenerBus.addListener(streamingListener)
}
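As a usage sketch (the ssc handle and the listener body are illustrative assumptions, not part of the patch), a listener is registered before the context is started:
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}
// Log the processing delay of every completed batch.
ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val delayMs = batchCompleted.batchInfo.processingDelay.getOrElse(-1L)
    println(s"Batch ${batchCompleted.batchInfo.batchTime} finished, processing delay: $delayMs ms")
  }
})
ssc.start()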
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 05f4da6fac..922e4a5e4d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -517,9 +517,10 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
ssc.remember(duration)
}
- /** Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for
- * receiving system events related to streaming.
- */
+ /**
+ * Add a [[org.apache.spark.streaming.scheduler.StreamingListener]] object for
+ * receiving system events related to streaming.
+ */
def addStreamingListener(streamingListener: StreamingListener) {
ssc.addStreamingListener(streamingListener)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
index 0a861f22b1..fbac4880bd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/RateLimiter.scala
@@ -22,17 +22,18 @@ import com.google.common.util.concurrent.{RateLimiter => GuavaRateLimiter}
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
-/** Provides waitToPush() method to limit the rate at which receivers consume data.
- *
- * waitToPush method will block the thread if too many messages have been pushed too quickly,
- * and only return when a new message has been pushed. It assumes that only one message is
- * pushed at a time.
- *
- * The spark configuration spark.streaming.receiver.maxRate gives the maximum number of messages
- * per second that each receiver will accept.
- *
- * @param conf spark configuration
- */
+/**
+ * Provides a waitToPush() method to limit the rate at which receivers consume data.
+ *
+ * The waitToPush method will block the thread if too many messages have been pushed too quickly,
+ * and only return when a new message has been pushed. It assumes that only one message is
+ * pushed at a time.
+ *
+ * The spark configuration spark.streaming.receiver.maxRate gives the maximum number of messages
+ * per second that each receiver will accept.
+ *
+ * @param conf spark configuration
+ */
private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
// treated as an upper limit
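RateLimiter itself is private[receiver]; callers drive it purely through configuration. A minimal sketch, assuming a local two-core test setup:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
// Cap every receiver at 100 records per second; once the rate is exceeded,
// waitToPush() blocks the receiving thread until the limiter catches up.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("rate-limited-app")
  .set("spark.streaming.receiver.maxRate", "100")
val ssc = new StreamingContext(conf, Seconds(1))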
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
index 66d5ffb797..0baedaf275 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala
@@ -21,9 +21,10 @@ import scala.collection.mutable.HashSet
import org.apache.spark.streaming.Time
-/** Class representing a set of Jobs
- * belong to the same batch.
- */
+/**
+ * Class representing a set of Jobs
+ * that belong to the same batch.
+ */
private[streaming]
case class JobSet(
time: Time,
diff --git a/tools/src/main/scala/org/apache/spark/tools/GenerateMIMAIgnore.scala b/tools/src/main/scala/org/apache/spark/tools/GenerateMIMAIgnore.scala
index 0df3c501de..c9058ff409 100644
--- a/tools/src/main/scala/org/apache/spark/tools/GenerateMIMAIgnore.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/GenerateMIMAIgnore.scala
@@ -91,10 +91,11 @@ object GenerateMIMAIgnore {
(ignoredClasses.flatMap(c => Seq(c, c.replace("$", "#"))).toSet, ignoredMembers.toSet)
}
- /** Scala reflection does not let us see inner function even if they are upgraded
- * to public for some reason. So had to resort to java reflection to get all inner
- * functions with $$ in there name.
- */
+ /**
+ * Scala reflection does not let us see inner functions even if they are upgraded
+ * to public for some reason, so we had to resort to Java reflection to get all inner
+ * functions with $$ in their name.
+ */
def getInnerFunctions(classSymbol: unv.ClassSymbol): Seq[String] = {
try {
Class.forName(classSymbol.fullName, false, classLoader).getMethods.map(_.getName)
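The same trick in isolation (the class name below is a hypothetical placeholder): plain Java reflection exposes the compiler-generated methods whose JVM names contain "$$", which scala.reflect hides.
// List the synthetic inner functions of a compiled Scala class.
val innerFunctions =
  Class.forName("com.example.SomeScalaClass", false, getClass.getClassLoader)
    .getMethods.map(_.getName)
    .filter(_.contains("$$"))
    .toSeq
innerFunctions.foreach(println)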
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 5af2c29808..4b36da309d 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -135,8 +135,8 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
}
/**
- * Obtains token for the Hive metastore and adds them to the credentials.
- */
+ * Obtains tokens for the Hive metastore and adds them to the credentials.
+ */
def obtainTokenForHiveMetastore(
sparkConf: SparkConf,
conf: Configuration,
@@ -149,8 +149,8 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
}
/**
- * Obtain a security token for HBase.
- */
+ * Obtain a security token for HBase.
+ */
def obtainTokenForHBase(
sparkConf: SparkConf,
conf: Configuration,
@@ -164,10 +164,10 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
}
/**
- * Return whether delegation tokens should be retrieved for the given service when security is
- * enabled. By default, tokens are retrieved, but that behavior can be changed by setting
- * a service-specific configuration.
- */
+ * Return whether delegation tokens should be retrieved for the given service when security is
+ * enabled. By default, tokens are retrieved, but that behavior can be changed by setting
+ * a service-specific configuration.
+ */
private def shouldGetTokens(conf: SparkConf, service: String): Boolean = {
conf.getBoolean(s"spark.yarn.security.tokens.${service}.enabled", true)
}
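The flag read by shouldGetTokens is an ordinary boolean in SparkConf and defaults to true; for example, a job that should skip HBase delegation tokens (service name assumed to be "hbase" here) would set:
import org.apache.spark.SparkConf
// Equivalent to passing --conf spark.yarn.security.tokens.hbase.enabled=false to spark-submit.
val conf = new SparkConf()
  .set("spark.yarn.security.tokens.hbase.enabled", "false")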