commit    b4067cbad4a46cda0799a891ded152531ca83b62
tree      e1a66e613f0a2558ec4f15d45e5e3b84eef4ab14 /core
parent    8d7b77bcb545e7e1167cf6e4a010809d5bd76c5a
author    Matei Zaharia <matei@eecs.berkeley.edu>  2012-10-12 18:19:21 -0700
committer Matei Zaharia <matei@eecs.berkeley.edu>  2012-10-12 18:19:21 -0700
More doc updates, and moved Serializer to a subpackage.
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/JavaSerializer.scala          |  4
-rw-r--r--  core/src/main/scala/spark/KryoSerializer.scala          | 11
-rw-r--r--  core/src/main/scala/spark/ParallelCollection.scala      |  2
-rw-r--r--  core/src/main/scala/spark/Partitioner.scala             | 11
-rw-r--r--  core/src/main/scala/spark/RDD.scala                     |  4
-rw-r--r--  core/src/main/scala/spark/SparkEnv.scala                |  1
-rw-r--r--  core/src/main/scala/spark/api/java/JavaDoubleRDD.scala  |  2
-rw-r--r--  core/src/main/scala/spark/api/java/JavaPairRDD.scala    | 10
-rw-r--r--  core/src/main/scala/spark/api/java/JavaRDD.scala        |  2
-rw-r--r--  core/src/main/scala/spark/scheduler/Task.scala          |  8
-rw-r--r--  core/src/main/scala/spark/serializer/Serializer.scala (renamed from core/src/main/scala/spark/Serializer.scala) | 18
-rw-r--r--  core/src/main/scala/spark/storage/BlockManager.scala    |  3
12 files changed, 51 insertions, 25 deletions
diff --git a/core/src/main/scala/spark/JavaSerializer.scala b/core/src/main/scala/spark/JavaSerializer.scala
index 863d00eeb5..b04a27d073 100644
--- a/core/src/main/scala/spark/JavaSerializer.scala
+++ b/core/src/main/scala/spark/JavaSerializer.scala
@@ -3,6 +3,7 @@ package spark
 import java.io._
 import java.nio.ByteBuffer
 
+import serializer.{Serializer, SerializerInstance, DeserializationStream, SerializationStream}
 import spark.util.ByteBufferInputStream
 
 private[spark] class JavaSerializationStream(out: OutputStream) extends SerializationStream {
@@ -57,6 +58,9 @@ private[spark] class JavaSerializerInstance extends SerializerInstance {
   }
 }
 
+/**
+ * A Spark serializer that uses Java's built-in serialization.
+ */
 class JavaSerializer extends Serializer {
   def newInstance(): SerializerInstance = new JavaSerializerInstance
 }
diff --git a/core/src/main/scala/spark/KryoSerializer.scala b/core/src/main/scala/spark/KryoSerializer.scala
index b8aa3a86c5..44b630e478 100644
--- a/core/src/main/scala/spark/KryoSerializer.scala
+++ b/core/src/main/scala/spark/KryoSerializer.scala
@@ -13,6 +13,7 @@ import com.esotericsoftware.kryo.serialize.ClassSerializer
 import com.esotericsoftware.kryo.serialize.SerializableSerializer
 import de.javakaffee.kryoserializers.KryoReflectionFactorySupport
 
+import serializer.{SerializerInstance, DeserializationStream, SerializationStream}
 import spark.broadcast._
 import spark.storage._
 
@@ -158,12 +159,18 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ
   }
 }
 
-// Used by clients to register their own classes
+/**
+ * Interface implemented by clients to register their classes with Kryo when using Kryo
+ * serialization.
+ */
 trait KryoRegistrator {
   def registerClasses(kryo: Kryo): Unit
 }
 
-class KryoSerializer extends Serializer with Logging {
+/**
+ * A Spark serializer that uses the [[http://code.google.com/p/kryo/wiki/V1Documentation Kryo 1.x library]].
+ */
+class KryoSerializer extends spark.serializer.Serializer with Logging {
   // Make this lazy so that it only gets called once we receive our first task on each executor,
   // so we can pull out any custom Kryo registrator from the user's JARs.
   lazy val kryo = createKryo()
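The `KryoRegistrator` trait documented above is the hook that user code implements to pre-register classes with Kryo. A minimal sketch of how it might be used (the `Point` class and `MyRegistrator` are hypothetical, and the two system-property names follow the Spark 0.6-era configuration style):

```scala
import com.esotericsoftware.kryo.Kryo
import spark.KryoRegistrator

// A hypothetical user class that we want Kryo to serialize compactly.
case class Point(x: Double, y: Double)

class MyRegistrator extends KryoRegistrator {
  def registerClasses(kryo: Kryo) {
    // Pre-registering lets Kryo write small integer class IDs instead of
    // full class names into the serialized stream.
    kryo.register(classOf[Point])
    kryo.register(classOf[Array[Point]])
  }
}

// Select Kryo and the registrator before the SparkContext is created:
System.setProperty("spark.serializer", "spark.KryoSerializer")
System.setProperty("spark.kryo.registrator", "MyRegistrator")
```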
diff --git a/core/src/main/scala/spark/ParallelCollection.scala b/core/src/main/scala/spark/ParallelCollection.scala
index 321f5264b8..9b57ae3b4f 100644
--- a/core/src/main/scala/spark/ParallelCollection.scala
+++ b/core/src/main/scala/spark/ParallelCollection.scala
@@ -21,7 +21,7 @@ private[spark] class ParallelCollectionSplit[T: ClassManifest](
   override val index: Int = slice
 }
 
-class ParallelCollection[T: ClassManifest](
+private[spark] class ParallelCollection[T: ClassManifest](
     sc: SparkContext,
     @transient data: Seq[T],
     numSlices: Int)
diff --git a/core/src/main/scala/spark/Partitioner.scala b/core/src/main/scala/spark/Partitioner.scala
index 20c31714ae..b71021a082 100644
--- a/core/src/main/scala/spark/Partitioner.scala
+++ b/core/src/main/scala/spark/Partitioner.scala
@@ -1,10 +1,17 @@
 package spark
 
+/**
+ * An object that defines how the elements in a key-value pair RDD are partitioned by key.
+ * Maps each key to a partition ID, from 0 to `numPartitions - 1`.
+ */
 abstract class Partitioner extends Serializable {
   def numPartitions: Int
   def getPartition(key: Any): Int
 }
 
+/**
+ * A [[spark.Partitioner]] that implements hash-based partitioning using Java's `Object.hashCode`.
+ */
 class HashPartitioner(partitions: Int) extends Partitioner {
   def numPartitions = partitions
 
@@ -29,6 +36,10 @@ class HashPartitioner(partitions: Int) extends Partitioner {
   }
 }
 
+/**
+ * A [[spark.Partitioner]] that partitions sortable records by range into roughly equal ranges.
+ * Determines the ranges by sampling the RDD passed in.
+ */
 class RangePartitioner[K <% Ordered[K]: ClassManifest, V](
     partitions: Int,
     @transient rdd: RDD[(K,V)],
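The new `Partitioner` scaladoc pins down the contract: every key must map to a partition ID in `0` to `numPartitions - 1`. A sketch of a custom partitioner honoring that contract (`ModPartitioner` is a made-up name; the body mirrors what `HashPartitioner` does with `Object.hashCode`):

```scala
import spark.Partitioner

class ModPartitioner(partitions: Int) extends Partitioner {
  def numPartitions = partitions

  def getPartition(key: Any): Int = {
    // hashCode can be negative, so shift the remainder into [0, partitions - 1].
    val mod = key.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod
  }

  // Partitioners that compare equal let Spark treat two RDDs as
  // co-partitioned and avoid an unnecessary shuffle.
  override def equals(other: Any): Boolean = other match {
    case p: ModPartitioner => p.numPartitions == numPartitions
    case _ => false
  }
}
```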
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index fdda8f29a6..ddb420efff 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -121,10 +121,10 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
     this
   }
 
-  /** Persist this RDD with the default storage level (MEMORY_ONLY). */
+  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
   def persist(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)
 
-  /** Persist this RDD with the default storage level (MEMORY_ONLY). */
+  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
   def cache(): RDD[T] = persist()
 
   /** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
index 6a006e0697..4c6ec6cc6e 100644
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ b/core/src/main/scala/spark/SparkEnv.scala
@@ -4,6 +4,7 @@ import akka.actor.ActorSystem
 import akka.actor.ActorSystemImpl
 import akka.remote.RemoteActorRefProvider
 
+import serializer.Serializer
 import spark.broadcast.BroadcastManager
 import spark.storage.BlockManager
 import spark.storage.BlockManagerMaster
diff --git a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
index 4ad006c4a7..843e1bd18b 100644
--- a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala
@@ -22,7 +22,7 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
 
   import JavaDoubleRDD.fromRDD
 
-  /** Persist this RDD with the default storage level (MEMORY_ONLY). */
+  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
   def cache(): JavaDoubleRDD = fromRDD(srdd.cache())
 
   /**
diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
index cfaa26ff52..5c2be534ff 100644
--- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
@@ -34,7 +34,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
 
   // Common RDD functions
 
-  /** Persist this RDD with the default storage level (MEMORY_ONLY). */
+  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
   def cache(): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.cache())
 
   /**
@@ -431,8 +431,12 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
     rdd.saveAsHadoopDataset(conf)
   }
 
-
-  // Ordered RDD Functions
+  /**
+   * Sort the RDD by key, so that each partition contains a sorted range of the elements in
+   * ascending order. Calling `collect` or `save` on the resulting RDD will return or output an
+   * ordered list of records (in the `save` case, they will be written to multiple `part-X` files
+   * in the filesystem, in order of the keys).
+   */
   def sortByKey(): JavaPairRDD[K, V] = sortByKey(true)
 
   /**
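The expanded `sortByKey` doc describes behavior worth illustrating: the result is range-partitioned, so both `collect` and saving to files preserve global key order. A sketch using the Scala API (the master string and output path are placeholders):

```scala
import spark.SparkContext
import spark.SparkContext._  // implicits that add sortByKey to RDDs of pairs

val sc = new SparkContext("local", "SortByKeyExample")
val pairs = sc.parallelize(Seq(("b", 2), ("c", 3), ("a", 1)))

// Each partition of `sorted` holds a contiguous, ascending range of keys.
val sorted = pairs.sortByKey()
sorted.collect()  // Array((a,1), (b,2), (c,3))

// One part-X file is written per partition; because the partitions are
// key ranges, the files themselves come out in key order.
sorted.saveAsTextFile("/tmp/sorted-pairs")
```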
diff --git a/core/src/main/scala/spark/api/java/JavaRDD.scala b/core/src/main/scala/spark/api/java/JavaRDD.scala
index 50be5a6cbe..ac31350ec3 100644
--- a/core/src/main/scala/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDD.scala
@@ -11,7 +11,7 @@ JavaRDDLike[T, JavaRDD[T]] {
 
   // Common RDD functions
 
-  /** Persist this RDD with the default storage level (MEMORY_ONLY). */
+  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
   def cache(): JavaRDD[T] = wrapRDD(rdd.cache())
 
   /**
diff --git a/core/src/main/scala/spark/scheduler/Task.scala b/core/src/main/scala/spark/scheduler/Task.scala
index d449ac67d6..ef987fdeb6 100644
--- a/core/src/main/scala/spark/scheduler/Task.scala
+++ b/core/src/main/scala/spark/scheduler/Task.scala
@@ -1,8 +1,8 @@
 package spark.scheduler
 
-import scala.collection.mutable.{HashMap}
-import spark.{SerializerInstance, Serializer, Utils}
-import java.io.{DataInputStream, DataOutputStream, File}
+import scala.collection.mutable.HashMap
+import spark.serializer.{SerializerInstance, Serializer}
+import java.io.{DataInputStream, DataOutputStream}
 import java.nio.ByteBuffer
 import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
 import spark.util.ByteBufferInputStream
@@ -92,4 +92,4 @@ private[spark] object Task {
     val subBuffer = serializedTask.slice()  // ByteBufferInputStream will have read just up to task
     (taskFiles, taskJars, subBuffer)
   }
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/spark/Serializer.scala b/core/src/main/scala/spark/serializer/Serializer.scala
index d8bcf6326a..50b086125a 100644
--- a/core/src/main/scala/spark/Serializer.scala
+++ b/core/src/main/scala/spark/serializer/Serializer.scala
@@ -1,25 +1,23 @@
-package spark
+package spark.serializer
 
-import java.io.{EOFException, InputStream, OutputStream}
 import java.nio.ByteBuffer
-import java.nio.channels.Channels
-
+import java.io.{EOFException, InputStream, OutputStream}
 import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
-
 import spark.util.ByteBufferInputStream
 
 /**
  * A serializer. Because some serialization libraries are not thread safe, this class is used to
- * create SerializerInstances that do the actual serialization.
+ * create [[spark.serializer.SerializerInstance]] objects that do the actual serialization and are
+ * guaranteed to only be called from one thread at a time.
  */
 trait Serializer {
   def newInstance(): SerializerInstance
 }
 
 /**
- * An instance of the serializer, for use by one thread at a time.
+ * An instance of a serializer, for use by one thread at a time.
  */
-private[spark] trait SerializerInstance {
+trait SerializerInstance {
   def serialize[T](t: T): ByteBuffer
 
   def deserialize[T](bytes: ByteBuffer): T
@@ -50,7 +48,7 @@ private[spark] trait SerializerInstance {
 /**
  * A stream for writing serialized objects.
  */
-private[spark] trait SerializationStream {
+trait SerializationStream {
   def writeObject[T](t: T): SerializationStream
   def flush(): Unit
   def close(): Unit
@@ -66,7 +64,7 @@ private[spark] trait SerializationStream {
 /**
  * A stream for reading serialized objects.
  */
-private[spark] trait DeserializationStream {
+trait DeserializationStream {
   def readObject[T](): T
   def close(): Unit
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 8a111f44c9..bd9155ef29 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -12,8 +12,9 @@ import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
 import scala.collection.JavaConversions._
 
-import spark.{CacheTracker, Logging, Serializer, SizeEstimator, SparkException, Utils}
+import spark.{CacheTracker, Logging, SizeEstimator, SparkException, Utils}
 import spark.network._
+import spark.serializer.Serializer
 import spark.util.ByteBufferInputStream
 import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
 import sun.nio.ch.DirectBuffer
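Finally, the relocated `Serializer` trait now spells out the threading contract: the trait is a thread-safe factory, while each `SerializerInstance` must only be used by one thread at a time. A round-trip sketch against the API shown in this diff:

```scala
import spark.JavaSerializer

val serializer = new JavaSerializer      // shareable across threads
val instance = serializer.newInstance()  // confine to a single thread

val bytes = instance.serialize(List(1, 2, 3))  // returns a java.nio.ByteBuffer
val restored = instance.deserialize[List[Int]](bytes)
assert(restored == List(1, 2, 3))
```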