author     Sean Owen <sowen@cloudera.com>    2014-03-25 10:21:25 -0700
committer  Reynold Xin <rxin@apache.org>     2014-03-25 10:21:25 -0700
commit     71d4ed271bcbddb154643bd44297ed77190e75cf
tree       a6618c610ce4d9001ca8e5b08d4811e3105ecfc3 /streaming/src/test
parent     134ace7fea7f772f5bafa9d11b8677cb7d311266
SPARK-1316. Remove use of Commons IO
(This follows from a side point on SPARK-1133, in discussion of the PR: https://github.com/apache/spark/pull/164)
Commons IO is barely used in the project, and can easily be replaced with equivalent calls to Guava or the existing Spark `Utils.scala` class.
Removing a dependency is a win in itself, and this one in particular can become problematic: Hadoop also depends on Commons IO, so keeping it on Spark's classpath invites version conflicts.
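As a minimal sketch of what the substitution looks like (not part of the patch: the package, object name, and scratch directory below are hypothetical; the sketch assumes Spark and Guava are on the classpath, and lives under an `org.apache.spark` subpackage only because `Utils` is `private[spark]`):

```scala
package org.apache.spark.sketch

import java.io.File
import java.nio.charset.Charset

import com.google.common.io.Files
import org.apache.spark.util.Utils

// Hypothetical example showing the two Commons IO calls this patch
// removes and their Guava / Spark Utils replacements.
object CommonsIoReplacementSketch {
  def main(args: Array[String]) {
    // Made-up scratch directory, for illustration only.
    val dir = new File(System.getProperty("java.io.tmpdir"), "commons-io-sketch")
    dir.mkdirs()

    // Was: FileUtils.writeStringToFile(file, "1\n")
    // Guava's Files.write takes the content first and an explicit Charset.
    Files.write("1\n", new File(dir, "1"), Charset.forName("UTF-8"))

    // Was: FileUtils.deleteDirectory(dir)
    // Spark's existing Utils already provides recursive deletion.
    Utils.deleteRecursively(dir)
  }
}
```

Note that the Guava replacement requires an explicit charset, whereas the removed `FileUtils.writeStringToFile(File, String)` overload silently used the platform default encoding, so the swap is arguably a small improvement rather than purely like-for-like.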
Author: Sean Owen <sowen@cloudera.com>
Closes #226 from srowen/SPARK-1316 and squashes the following commits:
21efef3 [Sean Owen] Remove use of Commons IO
Diffstat (limited to 'streaming/src/test')
streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala   | 13 ++++++-------
streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala      | 11 +++--------
streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala | 28 +++++++++++++++-------------
3 files changed, 24 insertions(+), 28 deletions(-)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 831e7c1471..0784e562ac 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -18,10 +18,10 @@ package org.apache.spark.streaming
 
 import java.io.File
+import java.nio.charset.Charset
 
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 
-import org.apache.commons.io.FileUtils
 import com.google.common.io.Files
 import org.apache.hadoop.fs.{Path, FileSystem}
 import org.apache.hadoop.conf.Configuration
@@ -29,7 +29,6 @@ import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
 import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.util.Utils
-import org.apache.spark.SparkConf
 
 /**
  * This test suites tests the checkpointing functionality of DStreams -
@@ -46,13 +45,13 @@ class CheckpointSuite extends TestSuiteBase {
 
   override def beforeFunction() {
     super.beforeFunction()
-    FileUtils.deleteDirectory(new File(checkpointDir))
+    Utils.deleteRecursively(new File(checkpointDir))
   }
 
   override def afterFunction() {
     super.afterFunction()
     if (ssc != null) ssc.stop()
-    FileUtils.deleteDirectory(new File(checkpointDir))
+    Utils.deleteRecursively(new File(checkpointDir))
   }
 
   test("basic rdd checkpoints + dstream graph checkpoint recovery") {
@@ -256,7 +255,7 @@ class CheckpointSuite extends TestSuiteBase {
     //var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
     Thread.sleep(1000)
     for (i <- Seq(1, 2, 3)) {
-      FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
+      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
       // wait to make sure that the file is written such that it gets shown in the file listings
       Thread.sleep(1000)
     }
@@ -273,7 +272,7 @@ class CheckpointSuite extends TestSuiteBase {
 
     // Create files while the master is down
     for (i <- Seq(4, 5, 6)) {
-      FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
+      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
       Thread.sleep(1000)
     }
 
@@ -289,7 +288,7 @@
     // Restart stream computation
     ssc.start()
     for (i <- Seq(7, 8, 9)) {
-      FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
+      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
       Thread.sleep(1000)
     }
     Thread.sleep(1000)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
index da9b04de1a..92e1b76d28 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
@@ -19,14 +19,9 @@ package org.apache.spark.streaming
 
 import org.apache.spark.Logging
 import org.apache.spark.streaming.util.MasterFailureTest
-import StreamingContext._
+import org.apache.spark.util.Utils
 
-import org.scalatest.{FunSuite, BeforeAndAfter}
-import com.google.common.io.Files
 import java.io.File
-import org.apache.commons.io.FileUtils
-import collection.mutable.ArrayBuffer
-
 
 /**
  * This testsuite tests master failures at random times while the stream is running using
@@ -43,12 +38,12 @@ class FailureSuite extends TestSuiteBase with Logging {
 
   override def beforeFunction() {
     super.beforeFunction()
-    FileUtils.deleteDirectory(new File(directory))
+    Utils.deleteRecursively(new File(directory))
   }
 
   override def afterFunction() {
     super.afterFunction()
-    FileUtils.deleteDirectory(new File(directory))
+    Utils.deleteRecursively(new File(directory))
   }
 
   test("multiple failures with map") {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 95bf40ba75..74e73ebb34 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -23,21 +23,23 @@ import akka.actor.IOManager
 import akka.actor.Props
 import akka.util.ByteString
 
-import org.apache.spark.streaming.dstream.{NetworkReceiver}
-import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket}
 import java.io.{File, BufferedWriter, OutputStreamWriter}
+import java.net.{InetSocketAddress, SocketException, ServerSocket}
+import java.nio.charset.Charset
 import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}
-import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
-import util.ManualClock
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+
+import com.google.common.io.Files
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.Logging
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.dstream.NetworkReceiver
 import org.apache.spark.streaming.receivers.Receiver
-import org.apache.spark.Logging
-import scala.util.Random
-import org.apache.commons.io.FileUtils
-import org.scalatest.BeforeAndAfter
-import collection.JavaConversions._
-import com.google.common.io.Files
-import java.util.concurrent.atomic.AtomicInteger
+import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.util.Utils
 
 class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
@@ -112,7 +114,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     Thread.sleep(1000)
     for (i <- 0 until input.size) {
       val file = new File(testDir, i.toString)
-      FileUtils.writeStringToFile(file, input(i).toString + "\n")
+      Files.write(input(i) + "\n", file, Charset.forName("UTF-8"))
       logInfo("Created file " + file)
       Thread.sleep(batchDuration.milliseconds)
       Thread.sleep(1000)
@@ -136,7 +138,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     // (whether the elements were received one in each interval is not verified)
     assert(output.toList === expectedOutput.toList)
 
-    FileUtils.deleteDirectory(testDir)
+    Utils.deleteRecursively(testDir)
 
     // Enable manual clock back again for other tests
     conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")