Diffstat (limited to 'core')
-rw-r--r--  core/pom.xml                                                      |  2
-rw-r--r--  core/src/main/scala/spark/RDD.scala                               |  2
-rw-r--r--  core/src/main/scala/spark/SequenceFileRDDFunctions.scala          | 22
-rw-r--r--  core/src/main/scala/spark/rdd/CheckpointRDD.scala                 |  2
-rw-r--r--  core/src/main/scala/spark/rdd/CoGroupedRDD.scala                  |  4
-rw-r--r--  core/src/main/scala/spark/rdd/ShuffledRDD.scala                   |  2
-rw-r--r--  core/src/main/scala/spark/rdd/SubtractedRDD.scala                 |  2
-rw-r--r--  core/src/main/scala/spark/rdd/ZippedRDD.scala                     | 20
-rw-r--r--  core/src/main/scala/spark/storage/BlockManagerMasterActor.scala  |  3
9 files changed, 30 insertions(+), 29 deletions(-)
diff --git a/core/pom.xml b/core/pom.xml
index 7f65ce5c00..da26d674ec 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -4,7 +4,7 @@
<parent>
<groupId>org.spark-project</groupId>
<artifactId>spark-parent</artifactId>
- <version>0.7.1-SNAPSHOT</version>
+ <version>0.8.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 33dc7627a3..ccd9d0364a 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -366,7 +366,7 @@ abstract class RDD[T: ClassManifest](
* Return a new RDD by applying a function to each partition of this RDD, while tracking the index
* of the original partition.
*/
- @deprecated("use mapPartitionsWithIndex")
+ @deprecated("use mapPartitionsWithIndex", "0.7.0")
def mapPartitionsWithSplit[U: ClassManifest](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] =
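
Note on the hunk above: it switches to the two-argument form of Scala's @deprecated annotation, which records the version in which the deprecation happened alongside the message. A minimal sketch, not part of this patch, with made-up object and method bodies:

    object DeprecationSketch {
      def mapPartitionsWithIndex(n: Int): Int = n + 1

      @deprecated("use mapPartitionsWithIndex", "0.7.0")
      def mapPartitionsWithSplit(n: Int): Int = mapPartitionsWithIndex(n)
    }

    // Calling DeprecationSketch.mapPartitionsWithSplit(1) makes scalac emit a
    // deprecation warning that includes both the message and the version string.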
diff --git a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
index 6b4a11d6d3..518034e07b 100644
--- a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
+++ b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
@@ -36,17 +36,17 @@ class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : Cla
self: RDD[(K, V)])
extends Logging
with Serializable {
-
+
private def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = {
val c = {
- if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) {
+ if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) {
classManifest[T].erasure
} else {
// We get the type of the Writable class by looking at the apply method which converts
// from T to Writable. Since we have two apply methods we filter out the one which
- // is of the form "java.lang.Object apply(java.lang.Object)"
+ // is not of the form "java.lang.Object apply(java.lang.Object)"
implicitly[T => Writable].getClass.getDeclaredMethods().filter(
- m => m.getReturnType().toString != "java.lang.Object" &&
+ m => m.getReturnType().toString != "class java.lang.Object" &&
m.getName() == "apply")(0).getReturnType
}
@@ -69,17 +69,17 @@ class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : Cla
val valueClass = getWritableClass[V]
val convertKey = !classOf[Writable].isAssignableFrom(self.getKeyClass)
val convertValue = !classOf[Writable].isAssignableFrom(self.getValueClass)
-
- logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")" )
+
+ logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")" )
val format = classOf[SequenceFileOutputFormat[Writable, Writable]]
if (!convertKey && !convertValue) {
- self.saveAsHadoopFile(path, keyClass, valueClass, format)
+ self.saveAsHadoopFile(path, keyClass, valueClass, format)
} else if (!convertKey && convertValue) {
- self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format)
+ self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format)
} else if (convertKey && !convertValue) {
- self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(path, keyClass, valueClass, format)
+ self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(path, keyClass, valueClass, format)
} else if (convertKey && convertValue) {
- self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format)
- }
+ self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format)
+ }
}
}
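
The comment and string fixes above hinge on how java.lang.Class renders itself: Class.toString prepends "class " (or "interface ") to the fully qualified name, so Method.getReturnType().toString is never the bare "java.lang.Object". A small standalone sketch, not part of the patch, that shows this:

    object ReturnTypeToStringSketch {
      def main(args: Array[String]): Unit = {
        // toString() is declared on java.lang.Object and returns a String.
        val m = classOf[Object].getDeclaredMethods.filter(_.getName == "toString")(0)
        println(m.getReturnType.toString)  // prints "class java.lang.String"
        println(classOf[Object].toString)  // prints "class java.lang.Object"
      }
    }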
diff --git a/core/src/main/scala/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
index 79d00edee7..43ee39c993 100644
--- a/core/src/main/scala/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
@@ -131,6 +131,6 @@ private[spark] object CheckpointRDD extends Logging {
val cpRDD = new CheckpointRDD[Int](sc, path.toString)
assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")
assert(cpRDD.collect.toList == rdd.collect.toList, "Data of partitions not the same")
- fs.delete(path)
+ fs.delete(path, true)
}
}
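
For context on the fs.delete change: Hadoop's single-argument FileSystem.delete(Path) is deprecated in favour of delete(Path, recursive), where true removes a directory together with its contents. A hedged sketch under that assumption, using an invented path:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object RecursiveDeleteSketch {
      def main(args: Array[String]): Unit = {
        val path = new Path("/tmp/spark-checkpoint-test")        // hypothetical path
        val fs: FileSystem = path.getFileSystem(new Configuration())
        // recursive = true removes the directory and everything under it.
        fs.delete(path, true)
      }
    }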
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index 9213513e80..a6235491ca 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -29,7 +29,7 @@ private[spark] case class NarrowCoGroupSplitDep(
private[spark] case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep
private[spark]
-class CoGroupPartition(idx: Int, val deps: Seq[CoGroupSplitDep])
+class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep])
extends Partition with Serializable {
override val index: Int = idx
override def hashCode(): Int = idx
@@ -88,7 +88,7 @@ class CoGroupedRDD[K](
case _ =>
new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i))
}
- }.toList)
+ }.toArray)
}
array
}
diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
index 51f02409b6..4e33b7dd5c 100644
--- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
@@ -16,7 +16,7 @@ private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
* @tparam V the value class.
*/
class ShuffledRDD[K, V](
- prev: RDD[(K, V)],
+ @transient prev: RDD[(K, V)],
part: Partitioner)
extends RDD[(K, V)](prev.context, List(new ShuffleDependency(prev, part))) {
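
Marking the constructor parameter @transient keeps that field out of the serialized form of ShuffledRDD; one reading of the intent is to avoid dragging the parent RDD along whenever the shuffled RDD itself is shipped. A generic sketch of the @transient behaviour, not tied to Spark:

    import java.io._

    // The @transient field is skipped by Java serialization and comes back as
    // null after a round trip; here it even holds a non-serializable Object
    // without causing a NotSerializableException.
    class Wrapper(@transient val parent: AnyRef, val name: String) extends Serializable

    object TransientSketch {
      def main(args: Array[String]): Unit = {
        val bos = new ByteArrayOutputStream()
        val oos = new ObjectOutputStream(bos)
        oos.writeObject(new Wrapper(new Object, "w"))
        oos.close()
        val back = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
          .readObject().asInstanceOf[Wrapper]
        println(back.parent)  // null
        println(back.name)    // w
      }
    }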
diff --git a/core/src/main/scala/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/spark/rdd/SubtractedRDD.scala
index 0a02561062..481e03b349 100644
--- a/core/src/main/scala/spark/rdd/SubtractedRDD.scala
+++ b/core/src/main/scala/spark/rdd/SubtractedRDD.scala
@@ -56,7 +56,7 @@ private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest, W: ClassM
case _ =>
new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i))
}
- }.toList)
+ }.toArray)
}
array
}
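
Both CoGroupedRDD and SubtractedRDD now build their per-partition dependency list with .toArray instead of .toList. The patch does not state why; one hedged reading is that a plain JVM array is a flat, compact structure for the serializer, whereas a Scala List is a chain of cons cells. A small sketch comparing serialized sizes under that assumption (exact numbers depend on the Scala and JVM version):

    import java.io._

    object ListVsArraySketch {
      def serializedSize(o: AnyRef): Int = {
        val bos = new ByteArrayOutputStream()
        val oos = new ObjectOutputStream(bos)
        oos.writeObject(o)
        oos.close()
        bos.size()
      }

      def main(args: Array[String]): Unit = {
        // The List form is typically noticeably larger, since every element
        // sits in its own :: cell with a boxed value.
        println(serializedSize((1 to 1000).toList))
        println(serializedSize((1 to 1000).toArray))
      }
    }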
diff --git a/core/src/main/scala/spark/rdd/ZippedRDD.scala b/core/src/main/scala/spark/rdd/ZippedRDD.scala
index e80ec17aa5..35b0e06785 100644
--- a/core/src/main/scala/spark/rdd/ZippedRDD.scala
+++ b/core/src/main/scala/spark/rdd/ZippedRDD.scala
@@ -10,17 +10,17 @@ private[spark] class ZippedPartition[T: ClassManifest, U: ClassManifest](
@transient rdd2: RDD[U]
) extends Partition {
- var split1 = rdd1.partitions(idx)
- var split2 = rdd1.partitions(idx)
+ var partition1 = rdd1.partitions(idx)
+ var partition2 = rdd2.partitions(idx)
override val index: Int = idx
- def splits = (split1, split2)
+ def partitions = (partition1, partition2)
@throws(classOf[IOException])
private def writeObject(oos: ObjectOutputStream) {
- // Update the reference to parent split at the time of task serialization
- split1 = rdd1.partitions(idx)
- split2 = rdd2.partitions(idx)
+ // Update the reference to parent partition at the time of task serialization
+ partition1 = rdd1.partitions(idx)
+ partition2 = rdd2.partitions(idx)
oos.defaultWriteObject()
}
}
@@ -43,13 +43,13 @@ class ZippedRDD[T: ClassManifest, U: ClassManifest](
}
override def compute(s: Partition, context: TaskContext): Iterator[(T, U)] = {
- val (split1, split2) = s.asInstanceOf[ZippedPartition[T, U]].splits
- rdd1.iterator(split1, context).zip(rdd2.iterator(split2, context))
+ val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions
+ rdd1.iterator(partition1, context).zip(rdd2.iterator(partition2, context))
}
override def getPreferredLocations(s: Partition): Seq[String] = {
- val (split1, split2) = s.asInstanceOf[ZippedPartition[T, U]].splits
- rdd1.preferredLocations(split1).intersect(rdd2.preferredLocations(split2))
+ val (partition1, partition2) = s.asInstanceOf[ZippedPartition[T, U]].partitions
+ rdd1.preferredLocations(partition1).intersect(rdd2.preferredLocations(partition2))
}
override def clearDependencies() {
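
Besides the split-to-partition rename, the hunk above fixes an indexing bug: the old code read both halves of the pair from rdd1, while the new code takes partition2 from rdd2. A hedged usage sketch of the zip operation this class backs, assuming a live SparkContext named sc and two RDDs with the same number of partitions and elements:

    val left  = sc.parallelize(1 to 4, 2)
    val right = sc.parallelize(Seq("a", "b", "c", "d"), 2)
    // Pairs element i of `left` with element i of `right`, partition by partition.
    val zipped = left.zip(right)   // RDD[(Int, String)]
    zipped.collect()               // Array((1,a), (2,b), (3,c), (4,d))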
diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
index 3ce1e6e257..9b64f95df8 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala
@@ -121,7 +121,8 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
val toRemove = new HashSet[BlockManagerId]
for (info <- blockManagerInfo.values) {
if (info.lastSeenMs < minSeenTime) {
- logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats")
+ logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: " +
+ (now - info.lastSeenMs) + "ms exceeds " + slaveTimeout + "ms")
toRemove += info.blockManagerId
}
}
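
A quick hedged illustration of how the expanded warning reads once the elapsed time and timeout are included; the identifier and the clock values below are invented for the example, and the block manager id is stood in for by a plain string:

    object HeartbeatWarningSketch {
      def main(args: Array[String]): Unit = {
        val blockManagerId = "BlockManagerId(host-1, 41234)"  // placeholder text
        val now = 1000000L                                    // invented timestamps
        val lastSeenMs = 940000L
        val slaveTimeout = 45000L
        println("Removing BlockManager " + blockManagerId + " with no recent heart beats: " +
          (now - lastSeenMs) + "ms exceeds " + slaveTimeout + "ms")
        // prints: Removing BlockManager BlockManagerId(host-1, 41234) with no
        //         recent heart beats: 60000ms exceeds 45000ms
      }
    }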