Diffstat (limited to 'core')
16 files changed, 271 insertions, 25 deletions
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index 2ba871a600..df01b2e942 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -17,17 +17,17 @@ package org.apache.spark
-import java.io._
+import java.io.{ObjectInputStream, Serializable}

 import scala.collection.mutable.Map
 import scala.collection.generic.Growable

 import org.apache.spark.serializer.JavaSerializer

 /**
- * A datatype that can be accumulated, ie has an commutative and associative "add" operation,
+ * A data type that can be accumulated, ie has an commutative and associative "add" operation,
  * but where the result type, `R`, may be different from the element type being added, `T`.
  *
- * You must define how to add data, and how to merge two of these together. For some datatypes,
+ * You must define how to add data, and how to merge two of these together. For some data types,
  * such as a counter, these might be the same operation. In that case, you can use the simpler
  * [[org.apache.spark.Accumulator]]. They won't always be the same, though -- e.g., imagine you are
  * accumulating a set. You will add items to the set, and you will union two sets together.
@@ -45,7 +45,7 @@ class Accumulable[R, T] (
   val id = Accumulators.newId
   @transient private var value_ = initialValue // Current value on master
   val zero = param.zero(initialValue)  // Zero value to be passed to workers
-  var deserialized = false
+  private var deserialized = false

   Accumulators.register(this, true)
@@ -127,7 +127,7 @@ class Accumulable[R, T] (

 /**
  * Helper object defining how to accumulate values of a particular type. An implicit
- * AccumulableParam needs to be available when you create Accumulables of a specific type.
+ * AccumulableParam needs to be available when you create [[Accumulable]]s of a specific type.
  *
  * @tparam R the full accumulated data (result type)
  * @tparam T partial data that can be added in
@@ -186,7 +186,29 @@ class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Ser

 /**
  * A simpler value of [[Accumulable]] where the result type being accumulated is the same
- * as the types of elements being merged.
+ * as the types of elements being merged, i.e. variables that are only "added" to through an
+ * associative operation and can therefore be efficiently supported in parallel. They can be used
+ * to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of type
+ * `Int` and `Double`, and programmers can add support for new types.
+ *
+ * An accumulator is created from an initial value `v` by calling [[SparkContext#accumulator]].
+ * Tasks running on the cluster can then add to it using the [[Accumulable#+=]] operator.
+ * However, they cannot read its value. Only the driver program can read the accumulator's value,
+ * using its value method.
+ *
+ * The interpreter session below shows an accumulator being used to add up the elements of an array:
+ *
+ * {{{
+ * scala> val accum = sc.accumulator(0)
+ * accum: spark.Accumulator[Int] = 0
+ *
+ * scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
+ * ...
+ * 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
+ *
+ * scala> accum.value
+ * res2: Int = 10
+ * }}}
  *
  * @param initialValue initial value of accumulator
  * @param param helper object defining how to add elements of type `T`
@@ -196,9 +218,9 @@ class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T])
   extends Accumulable[T,T](initialValue, param)

 /**
- * A simpler version of [[org.apache.spark.AccumulableParam]] where the only datatype you can add in is the same type
- * as the accumulated value. An implicit AccumulatorParam object needs to be available when you create
- * Accumulators of a specific type.
+ * A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add
+ * in is the same type as the accumulated value. An implicit AccumulatorParam object needs to be
+ * available when you create Accumulators of a specific type.
  *
  * @tparam T type of value to accumulate
  */
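The expanded scaladoc above notes that programmers can add accumulator support for new types. As a hedged illustration of that extension point, here is a minimal sketch against the AccumulatorParam API documented in this file; VectorAccumulatorParam is a hypothetical name, not something this commit adds:

{{{
// Sketch only: accumulator support for vectors of doubles.
// AccumulatorParam[T] requires addInPlace (merge) and zero (identity).
object VectorAccumulatorParam extends AccumulatorParam[Vector[Double]] {
  def addInPlace(v1: Vector[Double], v2: Vector[Double]): Vector[Double] =
    (v1, v2).zipped.map(_ + _)            // element-wise sum
  def zero(initial: Vector[Double]): Vector[Double] =
    Vector.fill(initial.length)(0.0)      // identity for +
}

val vecAccum = sc.accumulator(Vector(0.0, 0.0))(VectorAccumulatorParam)
sc.parallelize(Seq(Vector(1.0, 2.0), Vector(3.0, 4.0))).foreach(v => vecAccum += v)
println(vecAccum.value)                   // Vector(4.0, 6.0); readable on the driver only
}}}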
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index bcf8ae5fb6..c4579cf6ad 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -17,6 +17,8 @@ package org.apache.spark
+import scala.{Option, deprecated}
+
 import org.apache.spark.util.collection.{AppendOnlyMap, ExternalAppendOnlyMap}

 /**
@@ -33,8 +35,12 @@ case class Aggregator[K, V, C] (
   private val externalSorting = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)

+  @deprecated("use combineValuesByKey with TaskContext argument", "0.9.0")
+  def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] =
+    combineValuesByKey(iter, null)
+
   def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]],
-                         context: TaskContext) : Iterator[(K, C)] = {
+                         context: TaskContext): Iterator[(K, C)] = {
     if (!externalSorting) {
       val combiners = new AppendOnlyMap[K,C]
       var kv: Product2[K, V] = null
@@ -52,12 +58,17 @@ case class Aggregator[K, V, C] (
         val (k, v) = iter.next()
         combiners.insert(k, v)
       }
-      context.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled
-      context.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled
+      // TODO: Make this non optional in a future release
+      Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
+      Option(context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled)
       combiners.iterator
     }
   }

+  @deprecated("use combineCombinersByKey with TaskContext argument", "0.9.0")
+  def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] =
+    combineCombinersByKey(iter, null)
+
   def combineCombinersByKey(iter: Iterator[(K, C)], context: TaskContext) : Iterator[(K, C)] = {
     if (!externalSorting) {
       val combiners = new AppendOnlyMap[K,C]
@@ -76,8 +87,9 @@ case class Aggregator[K, V, C] (
         val (k, c) = iter.next()
         combiners.insert(k, c)
       }
-      context.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled
-      context.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled
+      // TODO: Make this non optional in a future release
+      Option(context).foreach(c => c.taskMetrics.memoryBytesSpilled = combiners.memoryBytesSpilled)
+      Option(context).foreach(c => c.taskMetrics.diskBytesSpilled = combiners.diskBytesSpilled)
       combiners.iterator
     }
   }
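The Option(context) calls introduced above are what let the deprecated no-context overloads pass null safely: Option(null) is None, so the metrics update is simply skipped. A standalone sketch of the idiom in plain Scala (the names are illustrative, not Spark's):

{{{
object NullGuardDemo extends App {
  final class Metrics { var bytesSpilled: Long = 0L }

  // Option(m) is None when m is null, so foreach skips the side effect
  // instead of throwing a NullPointerException.
  def record(m: Metrics): Unit =
    Option(m).foreach(x => x.bytesSpilled = 42L)

  record(null)             // safe no-op, as with the deprecated overloads
  val m = new Metrics
  record(m)                // normal path: metrics are updated
  println(m.bytesSpilled)  // 42
}
}}}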
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index fc60cf6f71..369c6ce78f 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.spark

 import scala.collection.JavaConverters._
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 55ac76bf63..ba3e91effb 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -340,8 +340,8 @@ class SparkContext(
    * Hadoop-supported file system URI, and return it as an RDD of Strings.
    */
   def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
-    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minSplits)
-      .map(pair => pair._2.toString)
+    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
+      minSplits, cloneRecords = false).map(pair => pair._2.toString)
   }

   /**
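Background for the cloneRecords = false argument above: Hadoop RecordReaders reuse a single Writable instance for every record, so handing those objects downstream without copying can leave a cached or collected RDD full of references to one mutated record. textFile is safe with cloning disabled because it converts each Text to an immutable String immediately. A hedged sketch of the pitfall (the input path is hypothetical; cloneRecords is the parameter this commit threads through):

{{{
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

val raw = sc.hadoopFile("hdfs:///tmp/input",             // hypothetical path
  classOf[TextInputFormat], classOf[LongWritable], classOf[Text])

// Risky: Text objects may be reused by the reader, so downstream code can
// observe many references to the same mutated record.
val reused = raw.collect()

// Safe: copy to an immutable String before the record escapes the iterator,
// exactly what textFile's pair._2.toString does.
val copied = raw.map(pair => pair._2.toString).collect()
}}}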
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 7a6f044965..8041163e3d 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -34,11 +34,11 @@ import org.apache.spark.SparkContext.IntAccumulatorParam
 import org.apache.spark.SparkContext.DoubleAccumulatorParam
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
-import scala.Tuple2
+

 /**
- * A Java-friendly version of [[org.apache.spark.SparkContext]] that returns [[org.apache.spark.api.java.JavaRDD]]s and
- * works with Java collections instead of Scala ones.
+ * A Java-friendly version of [[org.apache.spark.SparkContext]] that returns
+ * [[org.apache.spark.api.java.JavaRDD]]s and works with Java collections instead of Scala ones.
  */
 class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWorkaround {
   /**
@@ -333,8 +333,9 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     sc.accumulable(initialValue)(param)

   /**
-   * Broadcast a read-only variable to the cluster, returning a [[org.apache.spark.Broadcast]] object for
-   * reading it in distributed functions. The variable will be sent to each cluster only once.
+   * Broadcast a read-only variable to the cluster, returning a
+   * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
+   * The variable will be sent to each cluster only once.
    */
   def broadcast[T](value: T): Broadcast[T] = sc.broadcast(value)
diff --git a/core/src/main/scala/org/apache/spark/api/java/package.scala b/core/src/main/scala/org/apache/spark/api/java/package.scala
new file mode 100644
index 0000000000..8ec770046a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/package.scala
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api
+
+/** Spark Java programming APIs. */
+package object java {
+  // For package docs only
+}
diff --git a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
index 6bfe2cb4a2..d113d40405 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
@@ -17,13 +17,40 @@ package org.apache.spark.broadcast
-import java.io._
+import java.io.Serializable

 import java.util.concurrent.atomic.AtomicLong

 import org.apache.spark._

-private[spark]
-abstract class Broadcast[T](private[spark] val id: Long) extends Serializable {
+/**
+ * A broadcast variable. Broadcast variables allow the programmer to keep a read-only variable
+ * cached on each machine rather than shipping a copy of it with tasks. They can be used, for
+ * example, to give every node a copy of a large input dataset in an efficient manner. Spark also
+ * attempts to distribute broadcast variables using efficient broadcast algorithms to reduce
+ * communication cost.
+ *
+ * Broadcast variables are created from a variable `v` by calling [[SparkContext#broadcast]].
+ * The broadcast variable is a wrapper around `v`, and its value can be accessed by calling the
+ * `value` method. The interpreter session below shows this:
+ *
+ * {{{
+ * scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
+ * broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)
+ *
+ * scala> broadcastVar.value
+ * res0: Array[Int] = Array(1, 2, 3)
+ * }}}
+ *
+ * After the broadcast variable is created, it should be used instead of the value `v` in any
+ * functions run on the cluster so that `v` is not shipped to the nodes more than once.
+ * In addition, the object `v` should not be modified after it is broadcast in order to ensure
+ * that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped
+ * to a new node later).
+ *
+ * @param id A unique identifier for the broadcast variable.
+ * @tparam T Type of the data contained in the broadcast variable.
+ */
+abstract class Broadcast[T](val id: Long) extends Serializable {
   def value: T

   // We cannot have an abstract readObject here due to some weird issues with
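To complement the driver-side session in the new Broadcast scaladoc, a short sketch of the intended cluster-side use, where tasks read broadcastVar.value instead of capturing the raw variable (ordinary Spark API; the data is illustrative):

{{{
val lookup = sc.broadcast(Map("a" -> 1, "b" -> 2, "c" -> 3))

// Tasks read the broadcast value; the map is shipped to each node once,
// not serialized into every task closure.
val encoded = sc.parallelize(Seq("a", "b", "c", "a"))
  .map(key => lookup.value.getOrElse(key, 0))

println(encoded.collect().mkString(","))  // 1,2,3,1
}}}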
diff --git a/core/src/main/scala/org/apache/spark/broadcast/package.scala b/core/src/main/scala/org/apache/spark/broadcast/package.scala
new file mode 100644
index 0000000000..01bf88629a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/broadcast/package.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+/**
+ * Package for broadcast variables. See [[broadcast.Broadcast]] for details.
+ */
+package object broadcast {
+  // For package docs only
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
index cf6a23339d..460883ec7a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.spark.deploy.worker

 import java.io.{File, FileOutputStream, IOException, InputStream}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
index 1640d5fee0..6f6c101547 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.spark.deploy.worker

 import akka.actor._
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
index 0e0d0cd626..1dc39c450e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.spark.deploy.worker

 import akka.actor.{Actor, Address, AddressFromURIString}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 6461deee32..ed53558566 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -81,7 +81,7 @@ private[spark] class BlockManager(
   // Whether to compress RDD partitions that are stored serialized
   val compressRdds = conf.getBoolean("spark.rdd.compress", false)
   // Whether to compress shuffle output temporarily spilled to disk
-  val compressShuffleSpill = conf.getBoolean("spark.shuffle.spill.compress", false)
+  val compressShuffleSpill = conf.getBoolean("spark.shuffle.spill.compress", true)

   val heartBeatFrequency = BlockManager.getHeartBeatFrequency(conf)
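The BlockManager change above flips the default of spark.shuffle.spill.compress from false to true, so spilled shuffle data is now compressed unless a job opts out. A hedged sketch of opting back out through SparkConf (the key name comes from the diff; the surrounding setup is ordinary SparkConf usage with a hypothetical app name):

{{{
import org.apache.spark.{SparkConf, SparkContext}

// Restore the pre-change behavior: leave spilled shuffle data uncompressed.
val conf = new SparkConf()
  .setAppName("spill-compression-demo")   // hypothetical app name
  .setMaster("local[2]")
  .set("spark.shuffle.spill.compress", "false")

val sc = new SparkContext(conf)
println(sc.getConf.getBoolean("spark.shuffle.spill.compress", true))  // false
}}}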
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index ef5936dd2f..fa49974db4 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.spark

 import org.scalatest.FunSuite
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
index 45dbcaffae..0c50261264 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.spark.deploy.worker

 import java.io.File
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala
index 94d88d307a..1f1d8d1380 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerWatcherSuite.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.spark.deploy.worker
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index c3391f3e53..bb4dc0fcd3 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.spark.util.collection

 import scala.collection.mutable.ArrayBuffer