diff options
author | Hari Shreedharan <hshreedharan@apache.org> | 2015-02-09 14:17:14 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2015-02-09 14:17:14 -0800 |
commit | 0765af9b21e9204c410c7a849c7201bc3eda8cc3 (patch) | |
tree | 795f56d1fc1a47ca3aaecb02f86641bef6edb8dd | |
parent | 6fe70d8432314f0b7290a66f114306f61e0a87cc (diff) | |
download | spark-0765af9b21e9204c410c7a849c7201bc3eda8cc3.tar.gz spark-0765af9b21e9204c410c7a849c7201bc3eda8cc3.tar.bz2 spark-0765af9b21e9204c410c7a849c7201bc3eda8cc3.zip |
[SPARK-4905][STREAMING] FlumeStreamSuite fix.
Using String constructor instead of CharsetDecoder to see if it fixes the issue of empty strings in Flume test output.
Author: Hari Shreedharan <hshreedharan@apache.org>
Closes #4371 from harishreedharan/Flume-stream-attempted-fix and squashes the following commits:
550d363 [Hari Shreedharan] Fix imports.
8695950 [Hari Shreedharan] Use Charsets.UTF_8 instead of "UTF-8" in String constructors.
af3ba14 [Hari Shreedharan] [SPARK-4905][STREAMING] FlumeStreamSuite fix.
-rw-r--r-- | external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala | 7 |
1 files changed, 3 insertions, 4 deletions
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala index f333e3891b..322de7bf2f 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala @@ -19,13 +19,13 @@ package org.apache.spark.streaming.flume import java.net.{InetSocketAddress, ServerSocket} import java.nio.ByteBuffer -import java.nio.charset.Charset import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} import scala.concurrent.duration._ import scala.language.postfixOps +import com.google.common.base.Charsets import org.apache.avro.ipc.NettyTransceiver import org.apache.avro.ipc.specific.SpecificRequestor import org.apache.flume.source.avro @@ -108,7 +108,7 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with L val inputEvents = input.map { item => val event = new AvroFlumeEvent - event.setBody(ByteBuffer.wrap(item.getBytes("UTF-8"))) + event.setBody(ByteBuffer.wrap(item.getBytes(Charsets.UTF_8))) event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header")) event } @@ -138,14 +138,13 @@ class FlumeStreamSuite extends FunSuite with BeforeAndAfter with Matchers with L status should be (avro.Status.OK) } - val decoder = Charset.forName("UTF-8").newDecoder() eventually(timeout(10 seconds), interval(100 milliseconds)) { val outputEvents = outputBuffer.flatten.map { _.event } outputEvents.foreach { event => event.getHeaders.get("test") should be("header") } - val output = outputEvents.map(event => decoder.decode(event.getBody()).toString) + val output = outputEvents.map(event => new String(event.getBody.array(), Charsets.UTF_8)) output should be (input) } } |