author    Reynold Xin <reynoldx@gmail.com>  2013-08-18 20:39:29 -0700
committer Reynold Xin <reynoldx@gmail.com>  2013-08-18 20:39:29 -0700
commit    2a7b99c08b29d3002183a8d7ed3acd14fbf5dc41 (patch)
tree      81719e59174193fb00ffbf0411842574bf25394b /core
parent    82bf4c0339808f51c9cdffa6a0a829cb5981d92d (diff)
Added the missing RDD files and cleaned up SparkContext.
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala            | 17
-rw-r--r--  core/src/main/scala/spark/rdd/FlatMappedValuesRDD.scala | 36
-rw-r--r--  core/src/main/scala/spark/rdd/MappedValuesRDD.scala     | 34
-rw-r--r--  core/src/main/scala/spark/rdd/OrderedRDDFunctions.scala | 51
4 files changed, 126 insertions(+), 12 deletions(-)
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index c049bd3fa9..5db1767146 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -20,19 +20,14 @@ package spark
import java.io._
import java.net.URI
import java.util.Properties
-import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
-import scala.collection.JavaConversions._
import scala.collection.Map
import scala.collection.generic.Growable
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
import scala.util.DynamicVariable
-import scala.collection.mutable.{ConcurrentMap, HashMap}
-
-import akka.actor.Actor._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -54,7 +49,6 @@ import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
-import org.apache.hadoop.security.UserGroupInformation
import org.apache.mesos.MesosNativeLibrary
@@ -63,15 +57,14 @@ import spark.partial.{ApproximateEvaluator, PartialResult}
import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD,
OrderedRDDFunctions}
import spark.scheduler.{DAGScheduler, DAGSchedulerSource, ResultTask, ShuffleMapTask, SparkListener,
- SplitInfo, Stage, StageInfo, TaskScheduler, ActiveJob}
+ SplitInfo, Stage, StageInfo, TaskScheduler}
import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend,
ClusterScheduler, Schedulable, SchedulingMode}
import spark.scheduler.local.LocalScheduler
import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import spark.storage.{StorageStatus, StorageUtils, RDDInfo, BlockManagerSource}
+import spark.ui.SparkUI
import spark.util.{MetadataCleaner, TimeStampedHashMap}
-import ui.{SparkUI}
-import spark.metrics._
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -887,7 +880,7 @@ object SparkContext {
implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
rdd: RDD[(K, V)]) =
- new OrderedRDDFunctions(rdd.asInstanceOf[RDD[Product2[K, V]]])
+ new OrderedRDDFunctions(rdd)
implicit def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = new DoubleRDDFunctions(rdd)
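
With OrderedRDDFunctions now accepting RDD[_ <: Product2[K, V]] directly (new file below), the implicit can pass the pair RDD through unchanged. Callers are unaffected: rdd.sortByKey() still resolves through this conversion once spark.SparkContext._ is imported. A small sketch of the two equivalent forms, written against the APIs of this era (the helper is illustrative, not part of the patch):

import spark.RDD
import spark.SparkContext._          // brings rddToOrderedRDDFunctions into scope
import spark.rdd.OrderedRDDFunctions

// Illustrative helper: both expressions do the same thing for any RDD[(K, V)]
// whose key type has an Ordered view (String, Int, ...).
def sortedTwoWays(rdd: RDD[(String, Int)]): (RDD[(String, Int)], RDD[(String, Int)]) =
  (rdd.sortByKey(),                                        // via the implicit above
   new OrderedRDDFunctions[String, Int](rdd).sortByKey())  // the conversion, written out
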
diff --git a/core/src/main/scala/spark/rdd/FlatMappedValuesRDD.scala b/core/src/main/scala/spark/rdd/FlatMappedValuesRDD.scala
new file mode 100644
index 0000000000..05fdfd82c1
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/FlatMappedValuesRDD.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.rdd
+
+import spark.{TaskContext, Partition, RDD}
+
+
+private[spark]
+class FlatMappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => TraversableOnce[U])
+ extends RDD[(K, U)](prev) {
+
+ override def getPartitions = firstParent[Product2[K, V]].partitions
+
+ override val partitioner = firstParent[Product2[K, V]].partitioner
+
+ override def compute(split: Partition, context: TaskContext) = {
+ firstParent[Product2[K, V]].iterator(split, context).flatMap { case (k, v) =>
+ f(v).map(x => (k, x))
+ }
+ }
+}
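
FlatMappedValuesRDD expands each value into zero or more outputs while leaving keys, and therefore the parent's partitioner, untouched. A minimal sketch of the usual entry point, flatMapValues on a pair RDD, assuming an existing SparkContext named sc and import spark.SparkContext._ in scope (values and names are illustrative):

// Each value is tokenised; the key is repeated for every element produced.
// Keys never change, so the partitioner defined above is reused and no shuffle is needed.
val pairs    = sc.parallelize(Seq(("a", "1 2"), ("b", "3")))
val expanded = pairs.flatMapValues(_.split(" "))   // RDD[(String, String)]
// expanded.collect() => Array((a,1), (a,2), (b,3))
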
diff --git a/core/src/main/scala/spark/rdd/MappedValuesRDD.scala b/core/src/main/scala/spark/rdd/MappedValuesRDD.scala
new file mode 100644
index 0000000000..21ae97daa9
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/MappedValuesRDD.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.rdd
+
+
+import spark.{TaskContext, Partition, RDD}
+
+private[spark]
+class MappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => U)
+ extends RDD[(K, U)](prev) {
+
+ override def getPartitions = firstParent[Product2[K, U]].partitions
+
+ override val partitioner = firstParent[Product2[K, U]].partitioner
+
+ override def compute(split: Partition, context: TaskContext): Iterator[(K, U)] = {
+ firstParent[Product2[K, V]].iterator(split, context).map { case (k, v) => (k, f(v)) }
+ }
+}
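
MappedValuesRDD is the one-to-one counterpart: each value is transformed, keys and partitioning stay put. Continuing the sketch above (same assumed sc), the usual entry point is mapValues:

// Values are rewritten in place; keys are untouched, so the parent's partitioner carries over.
val lengths = sc.parallelize(Seq(("a", "spark"), ("bb", "rdd"))).mapValues(_.length)
// lengths.collect() => Array((a,5), (bb,3))
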
diff --git a/core/src/main/scala/spark/rdd/OrderedRDDFunctions.scala b/core/src/main/scala/spark/rdd/OrderedRDDFunctions.scala
new file mode 100644
index 0000000000..6328c6a4ac
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/OrderedRDDFunctions.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.rdd
+
+import spark.{RangePartitioner, Logging, RDD}
+
+/**
+ * Extra functions available on RDDs of (key, value) pairs where the key is sortable through
+ * an implicit conversion. Import `spark.SparkContext._` at the top of your program to use these
+ * functions. They will work with any key type that has a `scala.math.Ordered` implementation.
+ */
+class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
+ self: RDD[_ <: Product2[K, V]])
+ extends Logging with Serializable {
+
+ /**
+ * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
+ * `collect` or `save` on the resulting RDD will return or output an ordered list of records
+ * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
+ * order of the keys).
+ */
+ def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size)
+ : RDD[(K, V)] =
+ {
+ val part = new RangePartitioner(numPartitions, self.asInstanceOf[RDD[Product2[K,V]]], ascending)
+ val shuffled = new ShuffledRDD[K, V](self, part)
+ shuffled.mapPartitions(iter => {
+ val buf = iter.toArray
+ if (ascending) {
+ buf.sortWith((x, y) => x._1 < y._1).iterator
+ } else {
+ buf.sortWith((x, y) => x._1 > y._1).iterator
+ }
+ }, preservesPartitioning = true)
+ }
+}
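
A minimal end-to-end sketch of the behaviour described in the scaladoc above, assuming a build containing this patch and a local master (object and app names are illustrative):

import spark.SparkContext
import spark.SparkContext._   // rddToOrderedRDDFunctions makes sortByKey available

object OrderedRDDExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "sortByKey example")
    val pairs = sc.parallelize(Seq(("delta", 4), ("alpha", 1), ("charlie", 3), ("bravo", 2)))

    // sortByKey range-partitions by key and then sorts each partition locally,
    // so collect() returns the records in global key order.
    val sortedAsc = pairs.sortByKey()
    println(sortedAsc.collect().toSeq)    // (alpha,1), (bravo,2), (charlie,3), (delta,4)

    // Descending order and an explicit partition count are also supported.
    val sortedDesc = pairs.sortByKey(ascending = false, numPartitions = 2)
    println(sortedDesc.collect().toSeq)   // (delta,4), (charlie,3), (bravo,2), (alpha,1)

    sc.stop()
  }
}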