author     Reynold Xin <rxin@apache.org>    2014-01-09 00:56:16 -0800
committer  Reynold Xin <rxin@apache.org>    2014-01-09 00:56:16 -0800
commit     365cac94652cd012bf3783f74eed98c95b884bbb (patch)
tree       60b340cd5b9c75dd54dd4677f4ac48f99fe82e31
parent     73c724e1237db383737a9d37c1c8d697064ec28e (diff)
parent     295d82583a8e162a41a7cc59a6612490580ffe09 (diff)
Merge pull request #361 from rxin/clean
Minor style cleanup, mostly indentation and line-width changes. Focused on a few important files, since those are the files new contributors usually read first.
-rw-r--r--  core/src/main/scala/org/apache/spark/Partitioner.scala           |  7
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala          | 32
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala  | 52
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala               | 32
4 files changed, 68 insertions, 55 deletions
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 31b0773bfe..9b043f06dd 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -61,7 +61,8 @@ object Partitioner {
}
/**
- * A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using Java's `Object.hashCode`.
+ * A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using
+ * Java's `Object.hashCode`.
*
* Java arrays have hashCodes that are based on the arrays' identities rather than their contents,
* so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will
@@ -84,8 +85,8 @@ class HashPartitioner(partitions: Int) extends Partitioner {
}
/**
- * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly equal ranges.
- * Determines the ranges by sampling the RDD passed in.
+ * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly
+ * equal ranges. The ranges are determined by sampling the content of the RDD passed in.
*/
class RangePartitioner[K <% Ordered[K]: ClassTag, V](
partitions: Int,
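
The two partitioners documented in this file behave quite differently: HashPartitioner buckets keys by `Object.hashCode`, while RangePartitioner samples the RDD to compute roughly equal, ordered key ranges. The following is a minimal sketch of plugging both into a pair RDD; it assumes a local Spark 0.9-era SparkContext and illustrative data not taken from the patch.

    import org.apache.spark.{HashPartitioner, RangePartitioner, SparkContext}
    import org.apache.spark.SparkContext._  // enables partitionBy on pair RDDs

    val sc = new SparkContext("local", "partitioner-sketch")
    val pairs = sc.parallelize(Seq(("apple", 1), ("banana", 2), ("cherry", 3)), 3)

    // Hash partitioning: keys are assigned by a non-negative mod of their hashCode.
    val hashed = pairs.partitionBy(new HashPartitioner(2))

    // Range partitioning: split points come from sampling the RDD, so sortable
    // keys land in roughly equal ordered ranges (this is what sortByKey uses).
    val ranged = pairs.partitionBy(new RangePartitioner(2, pairs))

    sc.stop()
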
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 0e47f4e442..fce8f2d48c 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -31,9 +31,9 @@ import scala.reflect.{ClassTag, classTag}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
-FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
+ FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat,
-TextInputFormat}
+ TextInputFormat}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.mesos.MesosNativeLibrary
@@ -49,7 +49,7 @@ import org.apache.spark.scheduler.local.LocalBackend
import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{Utils, TimeStampedHashMap, MetadataCleaner, MetadataCleanerType,
-ClosureCleaner}
+ ClosureCleaner}
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -517,15 +517,15 @@ class SparkContext(
// Methods for creating shared variables
/**
- * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add" values
- * to using the `+=` method. Only the driver can access the accumulator's `value`.
+ * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
+ * values to using the `+=` method. Only the driver can access the accumulator's `value`.
*/
def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]) =
new Accumulator(initialValue, param)
/**
- * Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values with `+=`.
- * Only the driver can access the accumuable's `value`.
+ * Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values
+ * with `+=`. Only the driver can access the accumuable's `value`.
* @tparam T accumulator type
* @tparam R type that can be added to the accumulator
*/
@@ -538,14 +538,16 @@ class SparkContext(
* Growable and TraversableOnce are the standard APIs that guarantee += and ++=, implemented by
* standard mutable collections. So you can use this with mutable Map, Set, etc.
*/
- def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable, T](initialValue: R) = {
+ def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable, T]
+ (initialValue: R) = {
val param = new GrowableAccumulableParam[R,T]
new Accumulable(initialValue, param)
}
/**
- * Broadcast a read-only variable to the cluster, returning a [[org.apache.spark.broadcast.Broadcast]] object for
- * reading it in distributed functions. The variable will be sent to each cluster only once.
+ * Broadcast a read-only variable to the cluster, returning a
+ * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
+ * The variable will be sent to each cluster only once.
*/
def broadcast[T](value: T) = env.broadcastManager.newBroadcast[T](value, isLocal)
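
The Scaladoc reflowed in the hunks above covers Spark's shared variables: accumulators, which tasks may only add to with `+=` and only the driver may read, and broadcast variables, which ship a read-only value to each node once. A small sketch of both, assuming an existing SparkContext named `sc`; names and numbers are illustrative.

    import org.apache.spark.SparkContext._  // implicit AccumulatorParam instances

    val data = sc.parallelize(1 to 100, 4)

    // Accumulator: tasks add with `+=`; only the driver reads `.value`.
    val evens = sc.accumulator(0)
    data.foreach(x => if (x % 2 == 0) evens += 1)
    println(evens.value)  // 50, read back on the driver

    // Broadcast: the value is sent to each node once and reused by all tasks there.
    val factor = sc.broadcast(10)
    val scaled = data.map(_ * factor.value)
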
@@ -1010,7 +1012,8 @@ object SparkContext {
implicit def stringToText(s: String) = new Text(s)
- private implicit def arrayToArrayWritable[T <% Writable: ClassTag](arr: Traversable[T]): ArrayWritable = {
+ private implicit def arrayToArrayWritable[T <% Writable: ClassTag](arr: Traversable[T])
+ : ArrayWritable = {
def anyToWritable[U <% Writable](u: U): Writable = u
new ArrayWritable(classTag[T].runtimeClass.asInstanceOf[Class[Writable]],
@@ -1033,7 +1036,9 @@ object SparkContext {
implicit def booleanWritableConverter() = simpleWritableConverter[Boolean, BooleanWritable](_.get)
- implicit def bytesWritableConverter() = simpleWritableConverter[Array[Byte], BytesWritable](_.getBytes)
+ implicit def bytesWritableConverter() = {
+ simpleWritableConverter[Array[Byte], BytesWritable](_.getBytes)
+ }
implicit def stringWritableConverter() = simpleWritableConverter[String, Text](_.toString)
@@ -1049,7 +1054,8 @@ object SparkContext {
if (uri != null) {
val uriStr = uri.toString
if (uriStr.startsWith("jar:file:")) {
- // URI will be of the form "jar:file:/path/foo.jar!/package/cls.class", so pull out the /path/foo.jar
+ // URI will be of the form "jar:file:/path/foo.jar!/package/cls.class",
+ // so pull out the /path/foo.jar
List(uriStr.substring("jar:file:".length, uriStr.indexOf('!')))
} else {
Nil
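
The implicit Writable conversions and WritableConverters reformatted above are what let SequenceFile I/O work directly with plain Scala types such as String and Int. A hypothetical round trip under that assumption; the path is a placeholder and `sc` is an existing SparkContext.

    import org.apache.spark.SparkContext._  // Writable conversions and converters

    // Writing: String and Int are converted to Text and IntWritable implicitly.
    sc.parallelize(Seq(("a", 1), ("b", 2))).saveAsSequenceFile("/tmp/seqfile-sketch")

    // Reading: WritableConverters turn Text/IntWritable back into String/Int.
    val restored = sc.sequenceFile[String, Int]("/tmp/seqfile-sketch")
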
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index a0809cd43a..c118ddfc01 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -18,35 +18,34 @@
package org.apache.spark.rdd
import java.nio.ByteBuffer
-import java.util.Date
import java.text.SimpleDateFormat
+import java.util.Date
import java.util.{HashMap => JHashMap}
-import scala.collection.{mutable, Map}
+import scala.collection.Map
+import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
import scala.reflect.{ClassTag, classTag}
-import org.apache.hadoop.mapred._
-import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.SequenceFile.CompressionType
-import org.apache.hadoop.mapred.FileOutputFormat
-import org.apache.hadoop.mapred.OutputFormat
+import org.apache.hadoop.io.compress.CompressionCodec
+import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
-import org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil
import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob}
import org.apache.hadoop.mapreduce.{RecordWriter => NewRecordWriter}
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
import com.clearspring.analytics.stream.cardinality.HyperLogLog
+// SparkHadoopWriter and SparkHadoopMapReduceUtil are actually source files defined in Spark.
+import org.apache.hadoop.mapred.SparkHadoopWriter
+import org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark._
import org.apache.spark.SparkContext._
import org.apache.spark.partial.{BoundedDouble, PartialResult}
-import org.apache.spark.Aggregator
-import org.apache.spark.Partitioner
import org.apache.spark.Partitioner.defaultPartitioner
import org.apache.spark.util.SerializableHyperLogLog
@@ -120,9 +119,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
}
/**
- * Merge the values for each key using an associative function and a neutral "zero value" which may
- * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
- * list concatenation, 0 for addition, or 1 for multiplication.).
+ * Merge the values for each key using an associative function and a neutral "zero value" which
+ * may be added to the result an arbitrary number of times, and must not change the result
+ * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
*/
def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = {
// Serialize the zero value to a byte array so that we can get a new clone of it on each key
@@ -138,18 +137,18 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
}
/**
- * Merge the values for each key using an associative function and a neutral "zero value" which may
- * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
- * list concatenation, 0 for addition, or 1 for multiplication.).
+ * Merge the values for each key using an associative function and a neutral "zero value" which
+ * may be added to the result an arbitrary number of times, and must not change the result
+ * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
*/
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = {
foldByKey(zeroValue, new HashPartitioner(numPartitions))(func)
}
/**
- * Merge the values for each key using an associative function and a neutral "zero value" which may
- * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
- * list concatenation, 0 for addition, or 1 for multiplication.).
+ * Merge the values for each key using an associative function and a neutral "zero value" which
+ * may be added to the result an arbitrary number of times, and must not change the result
+ * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.).
*/
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = {
foldByKey(zeroValue, defaultPartitioner(self))(func)
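
All three foldByKey overloads whose Scaladoc is rewrapped above merge the values of each key with an associative function and a neutral zero value that may be folded in any number of times. A brief illustrative sketch, assuming an existing SparkContext `sc` and made-up data:

    import org.apache.spark.SparkContext._  // pair RDD operations

    val sales = sc.parallelize(Seq(("a", 3), ("b", 5), ("a", 4)))

    // 0 is a valid zero value for addition: folding it in never changes a sum.
    val totals = sales.foldByKey(0)(_ + _)       // ("a", 7), ("b", 5)

    // The other overloads take an explicit partition count or Partitioner.
    val totals4 = sales.foldByKey(0, 4)(_ + _)
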
@@ -226,7 +225,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
}
/**
- * Return approximate number of distinct values for each key in this RDD.
+ * Return approximate number of distinct values for each key in this RDD.
* The accuracy of approximation can be controlled through the relative standard deviation
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
* more accurate counts but increase the memory footprint and vise versa. HashPartitions the
@@ -579,7 +578,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
*/
def saveAsHadoopFile[F <: OutputFormat[K, V]](
path: String, codec: Class[_ <: CompressionCodec]) (implicit fm: ClassTag[F]) {
- saveAsHadoopFile(path, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]], codec)
+ val runtimeClass = fm.runtimeClass
+ saveAsHadoopFile(path, getKeyClass, getValueClass, runtimeClass.asInstanceOf[Class[F]], codec)
}
/**
@@ -599,7 +599,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
- conf: Configuration = self.context.hadoopConfiguration) {
+ conf: Configuration = self.context.hadoopConfiguration)
+ {
val job = new NewAPIHadoopJob(conf)
job.setOutputKeyClass(keyClass)
job.setOutputValueClass(valueClass)
@@ -668,7 +669,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
codec: Option[Class[_ <: CompressionCodec]] = None) {
conf.setOutputKeyClass(keyClass)
conf.setOutputValueClass(valueClass)
- // conf.setOutputFormat(outputFormatClass) // Doesn't work in Scala 2.9 due to what may be a generics bug
+ // Doesn't work in Scala 2.9 due to what may be a generics bug
+ // TODO: Should we uncomment this for Scala 2.10?
+ // conf.setOutputFormat(outputFormatClass)
conf.set("mapred.output.format.class", outputFormatClass.getName)
for (c <- codec) {
conf.setCompressMapOutput(true)
@@ -702,7 +705,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
throw new SparkException("Output value class not set")
}
- logInfo("Saving as hadoop file of type (" + keyClass.getSimpleName+ ", " + valueClass.getSimpleName+ ")")
+ logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
+ valueClass.getSimpleName+ ")")
val writer = new SparkHadoopWriter(conf)
writer.preSetup()
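
The remaining PairRDDFunctions hunks touch the HyperLogLog-backed approximate distinct counting and the Hadoop save paths. A hedged sketch of the former, where the relative standard deviation argument trades accuracy against memory as the Scaladoc above describes; 0.05 is only an example value and `sc` is assumed to exist.

    import org.apache.spark.SparkContext._

    val events = sc.parallelize(Seq(("user1", "click"), ("user1", "view"), ("user2", "click")))

    // A lower relative standard deviation gives more accurate counts but a
    // larger HyperLogLog sketch per key.
    val distinctPerKey = events.countApproxDistinctByKey(0.05)
    distinctPerKey.collect()  // approximately ("user1", 2), ("user2", 1)
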
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 3f41b66279..f9dc12eee3 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -23,7 +23,6 @@ import scala.collection.Map
import scala.collection.JavaConversions.mapAsScalaMap
import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
import scala.reflect.{classTag, ClassTag}
import org.apache.hadoop.io.BytesWritable
@@ -52,11 +51,13 @@ import org.apache.spark._
* partitioned collection of elements that can be operated on in parallel. This class contains the
* basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,
* [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value
- * pairs, such as `groupByKey` and `join`; [[org.apache.spark.rdd.DoubleRDDFunctions]] contains
- * operations available only on RDDs of Doubles; and [[org.apache.spark.rdd.SequenceFileRDDFunctions]]
- * contains operations available on RDDs that can be saved as SequenceFiles. These operations are
- * automatically available on any RDD of the right type (e.g. RDD[(Int, Int)] through implicit
- * conversions when you `import org.apache.spark.SparkContext._`.
+ * pairs, such as `groupByKey` and `join`;
+ * [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of
+ * Doubles; and
+ * [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that
+ * can be saved as SequenceFiles.
+ * These operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]
+ * through implicit conversions when you `import org.apache.spark.SparkContext._`.
*
* Internally, each RDD is characterized by five main properties:
*
@@ -235,12 +236,9 @@ abstract class RDD[T: ClassTag](
/**
* Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
*/
- private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
- if (isCheckpointed) {
- firstParent[T].iterator(split, context)
- } else {
- compute(split, context)
- }
+ private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
+ {
+ if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context)
}
// Transformations (return a new RDD)
@@ -268,6 +266,9 @@ abstract class RDD[T: ClassTag](
def distinct(numPartitions: Int): RDD[T] =
map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+ /**
+ * Return a new RDD containing the distinct elements in this RDD.
+ */
def distinct(): RDD[T] = distinct(partitions.size)
/**
@@ -280,7 +281,7 @@ abstract class RDD[T: ClassTag](
* which can avoid performing a shuffle.
*/
def repartition(numPartitions: Int): RDD[T] = {
- coalesce(numPartitions, true)
+ coalesce(numPartitions, shuffle = true)
}
/**
@@ -646,7 +647,8 @@ abstract class RDD[T: ClassTag](
}
/**
- * Reduces the elements of this RDD using the specified commutative and associative binary operator.
+ * Reduces the elements of this RDD using the specified commutative and
+ * associative binary operator.
*/
def reduce(f: (T, T) => T): T = {
val cleanF = sc.clean(f)
@@ -953,7 +955,7 @@ abstract class RDD[T: ClassTag](
private var storageLevel: StorageLevel = StorageLevel.NONE
/** Record user function generating this RDD. */
- @transient private[spark] val origin = sc.getCallSite
+ @transient private[spark] val origin = sc.getCallSite()
private[spark] def elementClassTag: ClassTag[T] = classTag[T]
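
Finally, the RDD.scala hunks rewrap the class documentation on implicit conversions and touch distinct, repartition, and reduce. A short sketch exercising those basics, again assuming an existing SparkContext `sc`:

    val nums = sc.parallelize(Seq(1, 2, 2, 3, 3, 3), 2)

    val unique = nums.distinct()      // distinct() delegates to distinct(partitions.size)
    val wider  = nums.repartition(4)  // repartition is coalesce(numPartitions, shuffle = true)
    val total  = nums.reduce(_ + _)   // requires a commutative, associative operator
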