aboutsummaryrefslogtreecommitdiff
path: root/external
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2015-12-04 17:02:04 -0800
committerMarcelo Vanzin <vanzin@cloudera.com>2015-12-04 17:02:04 -0800
commit3af53e61fd604fe8000e1fdf656d60b79c842d1c (patch)
treeb12b7a4cdb05361813fc81f93bbed924ba961822 /external
parentf30373f5ee60f9892c28771e34b208e4f1f675a6 (diff)
downloadspark-3af53e61fd604fe8000e1fdf656d60b79c842d1c.tar.gz
spark-3af53e61fd604fe8000e1fdf656d60b79c842d1c.tar.bz2
spark-3af53e61fd604fe8000e1fdf656d60b79c842d1c.zip
[SPARK-12084][CORE] Fix codes that uses ByteBuffer.array incorrectly
`ByteBuffer` doesn't guarantee all contents in `ByteBuffer.array` are valid. E.g, a ByteBuffer returned by `ByteBuffer.slice`. We should not use the whole content of `ByteBuffer` unless we know that's correct. This patch fixed all places that use `ByteBuffer.array` incorrectly. Author: Shixiong Zhu <shixiong@databricks.com> Closes #10083 from zsxwing/bytebuffer-array.
Diffstat (limited to 'external')
-rw-r--r--external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala6
-rw-r--r--external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala4
-rw-r--r--external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala4
3 files changed, 7 insertions, 7 deletions
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index c8780aa83b..2b9116eb3c 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -93,9 +93,9 @@ class SparkFlumeEvent() extends Externalizable {
/* Serialize to bytes. */
def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
- val body = event.getBody.array()
- out.writeInt(body.length)
- out.write(body)
+ val body = event.getBody
+ out.writeInt(body.remaining())
+ Utils.writeByteBuffer(body, out)
val numHeaders = event.getHeaders.size()
out.writeInt(numHeaders)
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index 5fd2711f5f..bb951a6ef1 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -24,11 +24,11 @@ import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import scala.concurrent.duration._
import scala.language.postfixOps
-import com.google.common.base.Charsets.UTF_8
import org.scalatest.BeforeAndAfter
import org.scalatest.concurrent.Eventually._
import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
+import org.apache.spark.network.util.JavaUtils
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, TestOutputStream, StreamingContext}
@@ -119,7 +119,7 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Log
val headers = flattenOutputBuffer.map(_.event.getHeaders.asScala.map {
case (key, value) => (key.toString, value.toString)
}).map(_.asJava)
- val bodies = flattenOutputBuffer.map(e => new String(e.event.getBody.array(), UTF_8))
+ val bodies = flattenOutputBuffer.map(e => JavaUtils.bytesToString(e.event.getBody))
utils.assertOutput(headers.asJava, bodies.asJava)
}
} finally {
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index f315e0a7ca..b29e591c07 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -22,7 +22,6 @@ import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import scala.concurrent.duration._
import scala.language.postfixOps
-import com.google.common.base.Charsets
import org.jboss.netty.channel.ChannelPipeline
import org.jboss.netty.channel.socket.SocketChannel
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
@@ -31,6 +30,7 @@ import org.scalatest.{BeforeAndAfter, Matchers}
import org.scalatest.concurrent.Eventually._
import org.apache.spark.{Logging, SparkConf, SparkFunSuite}
+import org.apache.spark.network.util.JavaUtils
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext, TestOutputStream}
@@ -63,7 +63,7 @@ class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers w
event =>
event.getHeaders.get("test") should be("header")
}
- val output = outputEvents.map(event => new String(event.getBody.array(), Charsets.UTF_8))
+ val output = outputEvents.map(event => JavaUtils.bytesToString(event.getBody))
output should be (input)
}
} finally {