From 71d4ed271bcbddb154643bd44297ed77190e75cf Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Tue, 25 Mar 2014 10:21:25 -0700 Subject: SPARK-1316. Remove use of Commons IO (This follows from a side point on SPARK-1133, in discussion of the PR: https://github.com/apache/spark/pull/164 ) Commons IO is barely used in the project, and can easily be replaced with equivalent calls to Guava or the existing Spark `Utils.scala` class. Removing a dependency feels good, and this one in particular can get a little problematic since Hadoop uses it too. Author: Sean Owen Closes #226 from srowen/SPARK-1316 and squashes the following commits: 21efef3 [Sean Owen] Remove use of Commons IO --- streaming/pom.xml | 4 ---- .../spark/streaming/util/MasterFailureTest.scala | 6 ++--- .../apache/spark/streaming/CheckpointSuite.scala | 13 +++++----- .../org/apache/spark/streaming/FailureSuite.scala | 11 +++------ .../apache/spark/streaming/InputStreamsSuite.scala | 28 ++++++++++++---------- 5 files changed, 27 insertions(+), 35 deletions(-) (limited to 'streaming') diff --git a/streaming/pom.xml b/streaming/pom.xml index 2cb8bde664..1953cc6883 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -74,10 +74,6 @@ junit-interface test - - commons-io - commons-io - target/scala-${scala.binary.version}/classes diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala index 2bb616cfb8..c48a38590e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala @@ -28,12 +28,12 @@ import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer} import scala.reflect.ClassTag import java.io.{File, ObjectInputStream, IOException} +import java.nio.charset.Charset import java.util.UUID import com.google.common.io.Files -import org.apache.commons.io.FileUtils -import org.apache.hadoop.fs.{FileUtil, FileSystem, Path} +import org.apache.hadoop.fs.Path import org.apache.hadoop.conf.Configuration @@ -389,7 +389,7 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) val localFile = new File(localTestDir, (i + 1).toString) val hadoopFile = new Path(testDir, (i + 1).toString) val tempHadoopFile = new Path(testDir, ".tmp_" + (i + 1).toString) - FileUtils.writeStringToFile(localFile, input(i).toString + "\n") + Files.write(input(i) + "\n", localFile, Charset.forName("UTF-8")) var tries = 0 var done = false while (!done && tries < maxTries) { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 831e7c1471..0784e562ac 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -18,10 +18,10 @@ package org.apache.spark.streaming import java.io.File +import java.nio.charset.Charset import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag -import org.apache.commons.io.FileUtils import com.google.common.io.Files import org.apache.hadoop.fs.{Path, FileSystem} import org.apache.hadoop.conf.Configuration @@ -29,7 +29,6 @@ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.dstream.{DStream, FileInputDStream} import org.apache.spark.streaming.util.ManualClock import org.apache.spark.util.Utils -import org.apache.spark.SparkConf /** * This test suites tests the checkpointing functionality of DStreams - @@ -46,13 +45,13 @@ class CheckpointSuite extends TestSuiteBase { override def beforeFunction() { super.beforeFunction() - FileUtils.deleteDirectory(new File(checkpointDir)) + Utils.deleteRecursively(new File(checkpointDir)) } override def afterFunction() { super.afterFunction() if (ssc != null) ssc.stop() - FileUtils.deleteDirectory(new File(checkpointDir)) + Utils.deleteRecursively(new File(checkpointDir)) } test("basic rdd checkpoints + dstream graph checkpoint recovery") { @@ -256,7 +255,7 @@ class CheckpointSuite extends TestSuiteBase { //var clock = ssc.scheduler.clock.asInstanceOf[ManualClock] Thread.sleep(1000) for (i <- Seq(1, 2, 3)) { - FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n") + Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8")) // wait to make sure that the file is written such that it gets shown in the file listings Thread.sleep(1000) } @@ -273,7 +272,7 @@ class CheckpointSuite extends TestSuiteBase { // Create files while the master is down for (i <- Seq(4, 5, 6)) { - FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n") + Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8")) Thread.sleep(1000) } @@ -289,7 +288,7 @@ class CheckpointSuite extends TestSuiteBase { // Restart stream computation ssc.start() for (i <- Seq(7, 8, 9)) { - FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n") + Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8")) Thread.sleep(1000) } Thread.sleep(1000) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala index da9b04de1a..92e1b76d28 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala @@ -19,14 +19,9 @@ package org.apache.spark.streaming import org.apache.spark.Logging import org.apache.spark.streaming.util.MasterFailureTest -import StreamingContext._ +import org.apache.spark.util.Utils -import org.scalatest.{FunSuite, BeforeAndAfter} -import com.google.common.io.Files import java.io.File -import org.apache.commons.io.FileUtils -import collection.mutable.ArrayBuffer - /** * This testsuite tests master failures at random times while the stream is running using @@ -43,12 +38,12 @@ class FailureSuite extends TestSuiteBase with Logging { override def beforeFunction() { super.beforeFunction() - FileUtils.deleteDirectory(new File(directory)) + Utils.deleteRecursively(new File(directory)) } override def afterFunction() { super.afterFunction() - FileUtils.deleteDirectory(new File(directory)) + Utils.deleteRecursively(new File(directory)) } test("multiple failures with map") { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 95bf40ba75..74e73ebb34 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -23,21 +23,23 @@ import akka.actor.IOManager import akka.actor.Props import akka.util.ByteString -import org.apache.spark.streaming.dstream.{NetworkReceiver} -import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket} import java.io.{File, BufferedWriter, OutputStreamWriter} +import java.net.{InetSocketAddress, SocketException, ServerSocket} +import java.nio.charset.Charset import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue} -import collection.mutable.{SynchronizedBuffer, ArrayBuffer} -import util.ManualClock +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer} + +import com.google.common.io.Files +import org.scalatest.BeforeAndAfter + +import org.apache.spark.Logging import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.dstream.NetworkReceiver import org.apache.spark.streaming.receivers.Receiver -import org.apache.spark.Logging -import scala.util.Random -import org.apache.commons.io.FileUtils -import org.scalatest.BeforeAndAfter -import collection.JavaConversions._ -import com.google.common.io.Files -import java.util.concurrent.atomic.AtomicInteger +import org.apache.spark.streaming.util.ManualClock +import org.apache.spark.util.Utils class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { @@ -112,7 +114,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { Thread.sleep(1000) for (i <- 0 until input.size) { val file = new File(testDir, i.toString) - FileUtils.writeStringToFile(file, input(i).toString + "\n") + Files.write(input(i) + "\n", file, Charset.forName("UTF-8")) logInfo("Created file " + file) Thread.sleep(batchDuration.milliseconds) Thread.sleep(1000) @@ -136,7 +138,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { // (whether the elements were received one in each interval is not verified) assert(output.toList === expectedOutput.toList) - FileUtils.deleteDirectory(testDir) + Utils.deleteRecursively(testDir) // Enable manual clock back again for other tests conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock") -- cgit v1.2.3