author    Imran Rashid <irashid@cloudera.com>  2017-03-06 14:06:11 -0600
committer Imran Rashid <irashid@cloudera.com>  2017-03-06 14:06:11 -0600
commit    12bf832407eaaed90d7c599522457cb36b303b6c (patch)
tree      be8f9bcb9ed8355a36e08391ea57c5b8d457d15a /core/src/main
parent    096df6d933c5326e5782aa8c5de842a0800eb369 (diff)
[SPARK-19796][CORE] Fix serialization of long property values in TaskDescription
## What changes were proposed in this pull request?

The properties that are serialized with a TaskDescription can have very long values (e.g. "spark.job.description", which is set to the full SQL statement when running via the thrift-server). DataOutputStream.writeUTF() does not work for long strings, so this changes the way those values are serialized to handle longer strings.

## How was this patch tested?

Updated an existing unit test to reproduce the issue. All unit tests run via Jenkins.

Author: Imran Rashid <irashid@cloudera.com>

Closes #17140 from squito/SPARK-19796.
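For background: DataOutputStream.writeUTF() prefixes the string with an unsigned 16-bit length, so any value whose encoding exceeds 65535 bytes cannot be written at all. A minimal sketch of the failure mode (the 70000-character value is illustrative, standing in for a long SQL statement):

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream, UTFDataFormatException}

val out = new DataOutputStream(new ByteArrayOutputStream())
// Stand-in for a long property value, e.g. spark.job.description holding a full SQL statement.
val longValue = "x" * 70000

try {
  out.writeUTF(longValue)
} catch {
  // writeUTF stores the encoded length in two bytes, so anything over 65535 bytes throws.
  case e: UTFDataFormatException => println(s"writeUTF failed: ${e.getMessage}")
}
```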
Diffstat (limited to 'core/src/main')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala | 12 ++++++++++--
1 file changed, 10 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
index 78aa5c4001..c98b87148e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskDescription.scala
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler
 
 import java.io.{DataInputStream, DataOutputStream}
 import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
 import java.util.Properties
 
 import scala.collection.JavaConverters._
@@ -86,7 +87,10 @@ private[spark] object TaskDescription {
     dataOut.writeInt(taskDescription.properties.size())
     taskDescription.properties.asScala.foreach { case (key, value) =>
       dataOut.writeUTF(key)
-      dataOut.writeUTF(value)
+      // SPARK-19796 -- writeUTF doesn't work for long strings, which can happen for property values
+      val bytes = value.getBytes(StandardCharsets.UTF_8)
+      dataOut.writeInt(bytes.length)
+      dataOut.write(bytes)
     }
 
     // Write the task. The task is already serialized, so write it directly to the byte buffer.
@@ -124,7 +128,11 @@ private[spark] object TaskDescription {
     val properties = new Properties()
     val numProperties = dataIn.readInt()
     for (i <- 0 until numProperties) {
-      properties.setProperty(dataIn.readUTF(), dataIn.readUTF())
+      val key = dataIn.readUTF()
+      val valueLength = dataIn.readInt()
+      val valueBytes = new Array[Byte](valueLength)
+      dataIn.readFully(valueBytes)
+      properties.setProperty(key, new String(valueBytes, StandardCharsets.UTF_8))
     }
 
     // Create a sub-buffer for the serialized task into its own buffer (to be deserialized later).
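Read together, the two hunks replace writeUTF for values with a 4-byte length prefix plus raw UTF-8 bytes, while keys (which stay short) keep using writeUTF. A self-contained round-trip sketch of that scheme; the helper names `writeLongString`/`readLongString` are hypothetical, not part of the patch:

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.nio.charset.StandardCharsets
import java.util.Properties

import scala.collection.JavaConverters._

// Hypothetical helpers mirroring the patched encode/decode paths.
def writeLongString(dataOut: DataOutputStream, value: String): Unit = {
  val bytes = value.getBytes(StandardCharsets.UTF_8)
  dataOut.writeInt(bytes.length) // 4-byte length, not writeUTF's 2-byte limit
  dataOut.write(bytes)
}

def readLongString(dataIn: DataInputStream): String = {
  val valueBytes = new Array[Byte](dataIn.readInt())
  dataIn.readFully(valueBytes) // readFully loops until the whole value is read
  new String(valueBytes, StandardCharsets.UTF_8)
}

// Round-trip a property whose value is far beyond writeUTF's 65535-byte cap.
val props = new Properties()
props.setProperty("spark.job.description", "SELECT " + ("col," * 40000) + "1")

val buf = new ByteArrayOutputStream()
val out = new DataOutputStream(buf)
out.writeInt(props.size())
props.asScala.foreach { case (key, value) =>
  out.writeUTF(key) // keys are short, so writeUTF is still safe here
  writeLongString(out, value)
}
out.flush()

val in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray))
val decoded = new Properties()
for (_ <- 0 until in.readInt()) {
  decoded.setProperty(in.readUTF(), readLongString(in))
}
assert(decoded == props) // values survive intact, regardless of length
```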