author     Sean Owen <sowen@cloudera.com>        2016-02-10 13:34:53 -0800
committer  Reynold Xin <rxin@databricks.com>     2016-02-10 13:34:53 -0800
commit     29c547303f886b96b74b411ac70f0fa81113f086 (patch)
tree       a0696ca78c2fb21e9d480c0704674f1d517e42fd
parent     5947fa8fa1f95d8fc1537c1e37bc16bae8fe7988 (diff)
[SPARK-12414][CORE] Remove closure serializer
Remove the spark.closure.serializer option and use JavaSerializer always.

CC andrewor14 rxin: I see there's a discussion in the JIRA, but I just thought I'd offer this for a look at what the change would be.

Author: Sean Owen <sowen@cloudera.com>

Closes #11150 from srowen/SPARK-12414.
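The user-visible effect is that `spark.closure.serializer` is no longer read anywhere: closures are always serialized with the built-in `JavaSerializer`. A minimal sketch of what that means for application code (the app name and master below are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("closure-serializer-sketch")   // illustrative name
  // After this change the property below is simply ignored; before it, only
  // the Java serializer was supported anyway.
  .set("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer")

val sc = new SparkContext(conf)
// Any closure shipped to executors, such as the map function below, now goes
// through JavaSerializer regardless of configuration.
val doubled = sc.parallelize(1 to 10).map(_ * 2).collect()
sc.stop()
```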
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala                        | 5
-rw-r--r--  core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala  | 3
-rw-r--r--  docs/configuration.md                                                       | 7
-rw-r--r--  docs/streaming-programming-guide.md                                         | 2
4 files changed, 3 insertions(+), 14 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 9461afdc54..204f7356f7 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -35,7 +35,7 @@ import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.scheduler.{LiveListenerBus, OutputCommitCoordinator}
import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinatorEndpoint
-import org.apache.spark.serializer.Serializer
+import org.apache.spark.serializer.{JavaSerializer, Serializer}
import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.storage._
import org.apache.spark.util.{RpcUtils, Utils}
@@ -277,8 +277,7 @@ object SparkEnv extends Logging {
"spark.serializer", "org.apache.spark.serializer.JavaSerializer")
logDebug(s"Using serializer: ${serializer.getClass}")
- val closureSerializer = instantiateClassFromConf[Serializer](
- "spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer")
+ val closureSerializer = new JavaSerializer(conf)
def registerOrLookupEndpoint(
name: String, endpointCreator: => RpcEndpoint):
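For context, SparkEnv now constructs the closure serializer directly rather than reading a class name from the configuration. A rough, simplified sketch of how scheduler code round-trips a closure through it (the surrounding names are illustrative, not the actual scheduler code):

```scala
import java.nio.ByteBuffer

import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer

// Built unconditionally, as in the change above.
val closureSerializer = new JavaSerializer(new SparkConf())
val instance = closureSerializer.newInstance()

// A closure capturing a local value, roughly as a task body would.
val factor = 3
val closure: Int => Int = x => x * factor

// Serialize before shipping, deserialize on the receiving side.
val bytes: ByteBuffer = instance.serialize(closure)
val restored = instance.deserialize[Int => Int](bytes)
assert(restored(2) == 6)
```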
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index f869bcd708..27d063630b 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -282,8 +282,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
test("kryo with fold") {
val control = 1 :: 2 :: Nil
// zeroValue must not be a ClassWithoutNoArgConstructor instance because it will be
- // serialized by spark.closure.serializer but spark.closure.serializer only supports
- // the default Java serializer.
+ // serialized by the Java serializer.
val result = sc.parallelize(control, 2).map(new ClassWithoutNoArgConstructor(_))
.fold(null)((t1, t2) => {
val t1x = if (t1 == null) 0 else t1.x
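The reworded comment reflects that fold's zero value travels with the task and is therefore handled by the closure serializer (plain Java serialization), even when `spark.serializer` is Kryo, as in this suite. A small sketch of the constraint, using a trivially Java-serializable zero value (configuration values below are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("fold-zero-value-sketch")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)

// RDD elements are handled by the configured data serializer (Kryo here), but
// the zero value 0 is captured with the task closure and Java-serialized.
val sum = sc.parallelize(1 to 4, 2).fold(0)(_ + _)
assert(sum == 10)
sc.stop()
```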
diff --git a/docs/configuration.md b/docs/configuration.md
index b07c69cd4c..dd2cde8194 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -587,13 +587,6 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
- <td><code>spark.closure.serializer</code></td>
- <td>org.apache.spark.serializer.<br />JavaSerializer</td>
- <td>
- Serializer class to use for closures. Currently only the Java serializer is supported.
- </td>
-</tr>
-<tr>
<td><code>spark.io.compression.codec</code></td>
<td>lz4</td>
<td>
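With the table row removed, only the data serializer remains configurable; closure serialization is fixed. A brief sketch of the remaining knob (the codec value is the documented default from the neighbouring row, not a new option):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Still configurable: serializer for shuffle and cached data.
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Unrelated neighbouring row shown above; lz4 is its documented default.
  .set("spark.io.compression.codec", "lz4")
// There is no spark.closure.serializer entry to set any more.
```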
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 7e681b67cf..677f5ff7be 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -2163,8 +2163,6 @@ If the number of tasks launched per second is high (say, 50 or more per second),
of sending out tasks to the slaves may be significant and will make it hard to achieve sub-second
latencies. The overhead can be reduced by the following changes:
-* **Task Serialization**: Using Kryo serialization for serializing tasks can reduce the task sizes, and therefore reduce the time taken to send them to the slaves. This is controlled by the ```spark.closure.serializer``` property. However, at this time, Kryo serialization cannot be enabled for closure serialization. This may be resolved in a future release.
-
* **Execution mode**: Running Spark in Standalone mode or coarse-grained Mesos mode leads to
better task launch times than the fine-grained Mesos mode. Please refer to the
[Running on Mesos guide](running-on-mesos.html) for more details.
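Since the closure serializer can no longer be switched, the practical way to keep task-serialization overhead low is to keep closures small. One standard technique (a general Spark pattern, not something introduced by this change; names below are illustrative) is to broadcast large read-only data instead of capturing it in the closure:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(
  new SparkConf().setMaster("local[2]").setAppName("broadcast-sketch"))

// Shipped once per executor via the broadcast mechanism, instead of being
// Java-serialized into every task closure.
val lookup = (1 to 100000).map(i => i -> i.toString).toMap
val bcLookup = sc.broadcast(lookup)

val resolved = sc.parallelize(1 to 100)
  .map(i => bcLookup.value.getOrElse(i, "?"))   // closure stays tiny
  .collect()
sc.stop()
```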