From 71d4ed271bcbddb154643bd44297ed77190e75cf Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Tue, 25 Mar 2014 10:21:25 -0700
Subject: SPARK-1316. Remove use of Commons IO

(This follows from a side point on SPARK-1133, in discussion of the PR:
https://github.com/apache/spark/pull/164 )

Commons IO is barely used in the project, and can easily be replaced with
equivalent calls to Guava or the existing Spark `Utils.scala` class.

Removing a dependency feels good, and this one in particular can get a
little problematic since Hadoop uses it too.

Author: Sean Owen

Closes #226 from srowen/SPARK-1316 and squashes the following commits:

21efef3 [Sean Owen] Remove use of Commons IO
---
 core/pom.xml                                        |  5 ----
 .../main/scala/org/apache/spark/util/Utils.scala    |  5 +++-
 .../scala/org/apache/spark/util/UtilsSuite.scala    |  3 +--
 pom.xml                                             |  5 ----
 project/SparkBuild.scala                            |  8 ++-----
 streaming/pom.xml                                   |  4 ----
 .../spark/streaming/util/MasterFailureTest.scala    |  6 ++---
 .../apache/spark/streaming/CheckpointSuite.scala    | 13 +++++-----
 .../org/apache/spark/streaming/FailureSuite.scala   | 11 +++------
 .../apache/spark/streaming/InputStreamsSuite.scala  | 28 ++++++++++++----------
 10 files changed, 34 insertions(+), 54 deletions(-)

diff --git a/core/pom.xml b/core/pom.xml
index a6f478b09b..eb6cc4d310 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -200,11 +200,6 @@
       <artifactId>derby</artifactId>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>commons-io</groupId>
-      <artifactId>commons-io</artifactId>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.scalatest</groupId>
       <artifactId>scalatest_${scala.binary.version}</artifactId>
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 13d9dbdd9a..ad87fda140 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -529,7 +529,10 @@ private[spark] object Utils extends Logging {
       }
     }
     if (!file.delete()) {
-      throw new IOException("Failed to delete: " + file)
+      // Delete can also fail if the file simply did not exist
+      if (file.exists()) {
+        throw new IOException("Failed to delete: " + file.getAbsolutePath)
+      }
     }
   }

diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 8f55b2372c..eb8f591560 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -24,7 +24,6 @@ import java.nio.{ByteBuffer, ByteOrder}

 import com.google.common.base.Charsets
 import com.google.common.io.Files
-import org.apache.commons.io.FileUtils
 import org.scalatest.FunSuite

 class UtilsSuite extends FunSuite {
@@ -136,7 +135,7 @@ class UtilsSuite extends FunSuite {
     // Read some nonexistent bytes on both ends
     assert(Utils.offsetBytes(f1Path, -3, 25) === "1\n2\n3\n4\n5\n6\n7\n8\n9\n")

-    FileUtils.deleteDirectory(tmpDir2)
+    Utils.deleteRecursively(tmpDir2)
   }

   test("deserialize long value") {
diff --git a/pom.xml b/pom.xml
index fa72d5f263..deb89b18ad 100644
--- a/pom.xml
+++ b/pom.xml
@@ -435,11 +435,6 @@
         <version>1.9.1</version>
         <scope>test</scope>
       </dependency>
-      <dependency>
-        <groupId>commons-io</groupId>
-        <artifactId>commons-io</artifactId>
-        <version>2.4</version>
-      </dependency>
      <dependency>
        <groupId>org.easymock</groupId>
        <artifactId>easymock</artifactId>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 21d2779d85..60f14ba37e 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -259,8 +259,7 @@ object SparkBuild extends Build {
      "org.scalacheck" %% "scalacheck" % "1.10.0" % "test",
      "com.novocode" % "junit-interface" % "0.10" % "test",
      "org.easymock" % "easymock" % "3.1" % "test",
-      "org.mockito" % "mockito-all" % "1.8.5" % "test",
-      "commons-io" % "commons-io" % "2.4" % "test"
+      "org.mockito" % "mockito-all" % "1.8.5" % "test"
"test" ), testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"), @@ -439,10 +438,7 @@ object SparkBuild extends Build { def streamingSettings = sharedSettings ++ Seq( name := "spark-streaming", - previousArtifact := sparkPreviousArtifact("spark-streaming"), - libraryDependencies ++= Seq( - "commons-io" % "commons-io" % "2.4" - ) + previousArtifact := sparkPreviousArtifact("spark-streaming") ) def yarnCommonSettings = sharedSettings ++ Seq( diff --git a/streaming/pom.xml b/streaming/pom.xml index 2cb8bde664..1953cc6883 100644 --- a/streaming/pom.xml +++ b/streaming/pom.xml @@ -74,10 +74,6 @@ junit-interface test - - commons-io - commons-io - target/scala-${scala.binary.version}/classes diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala index 2bb616cfb8..c48a38590e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala @@ -28,12 +28,12 @@ import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer} import scala.reflect.ClassTag import java.io.{File, ObjectInputStream, IOException} +import java.nio.charset.Charset import java.util.UUID import com.google.common.io.Files -import org.apache.commons.io.FileUtils -import org.apache.hadoop.fs.{FileUtil, FileSystem, Path} +import org.apache.hadoop.fs.Path import org.apache.hadoop.conf.Configuration @@ -389,7 +389,7 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long) val localFile = new File(localTestDir, (i + 1).toString) val hadoopFile = new Path(testDir, (i + 1).toString) val tempHadoopFile = new Path(testDir, ".tmp_" + (i + 1).toString) - FileUtils.writeStringToFile(localFile, input(i).toString + "\n") + Files.write(input(i) + "\n", localFile, Charset.forName("UTF-8")) var tries = 0 var done = false while (!done && tries < maxTries) { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 831e7c1471..0784e562ac 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -18,10 +18,10 @@ package org.apache.spark.streaming import java.io.File +import java.nio.charset.Charset import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag -import org.apache.commons.io.FileUtils import com.google.common.io.Files import org.apache.hadoop.fs.{Path, FileSystem} import org.apache.hadoop.conf.Configuration @@ -29,7 +29,6 @@ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.dstream.{DStream, FileInputDStream} import org.apache.spark.streaming.util.ManualClock import org.apache.spark.util.Utils -import org.apache.spark.SparkConf /** * This test suites tests the checkpointing functionality of DStreams - @@ -46,13 +45,13 @@ class CheckpointSuite extends TestSuiteBase { override def beforeFunction() { super.beforeFunction() - FileUtils.deleteDirectory(new File(checkpointDir)) + Utils.deleteRecursively(new File(checkpointDir)) } override def afterFunction() { super.afterFunction() if (ssc != null) ssc.stop() - FileUtils.deleteDirectory(new File(checkpointDir)) + Utils.deleteRecursively(new File(checkpointDir)) } test("basic rdd checkpoints + dstream graph checkpoint recovery") { @@ -256,7 +255,7 @@ class 
     //var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
     Thread.sleep(1000)
     for (i <- Seq(1, 2, 3)) {
-      FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
+      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
       // wait to make sure that the file is written such that it gets shown in the file listings
       Thread.sleep(1000)
     }
@@ -273,7 +272,7 @@ class CheckpointSuite extends TestSuiteBase {

     // Create files while the master is down
     for (i <- Seq(4, 5, 6)) {
-      FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
+      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
       Thread.sleep(1000)
     }

@@ -289,7 +288,7 @@ class CheckpointSuite extends TestSuiteBase {
     // Restart stream computation
     ssc.start()
     for (i <- Seq(7, 8, 9)) {
-      FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
+      Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
       Thread.sleep(1000)
     }
     Thread.sleep(1000)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
index da9b04de1a..92e1b76d28 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
@@ -19,14 +19,9 @@ package org.apache.spark.streaming

 import org.apache.spark.Logging
 import org.apache.spark.streaming.util.MasterFailureTest
-import StreamingContext._
+import org.apache.spark.util.Utils

-import org.scalatest.{FunSuite, BeforeAndAfter}
-import com.google.common.io.Files
 import java.io.File
-import org.apache.commons.io.FileUtils
-import collection.mutable.ArrayBuffer
-

 /**
  * This testsuite tests master failures at random times while the stream is running using
@@ -43,12 +38,12 @@ class FailureSuite extends TestSuiteBase with Logging {

   override def beforeFunction() {
     super.beforeFunction()
-    FileUtils.deleteDirectory(new File(directory))
+    Utils.deleteRecursively(new File(directory))
   }

   override def afterFunction() {
     super.afterFunction()
-    FileUtils.deleteDirectory(new File(directory))
+    Utils.deleteRecursively(new File(directory))
   }

   test("multiple failures with map") {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 95bf40ba75..74e73ebb34 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -23,21 +23,23 @@ import akka.actor.IOManager
 import akka.actor.Props
 import akka.util.ByteString

-import org.apache.spark.streaming.dstream.{NetworkReceiver}
-import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket}
 import java.io.{File, BufferedWriter, OutputStreamWriter}
+import java.net.{InetSocketAddress, SocketException, ServerSocket}
+import java.nio.charset.Charset
 import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}
-import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
-import util.ManualClock
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+
+import com.google.common.io.Files
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.Logging
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.dstream.NetworkReceiver
 import org.apache.spark.streaming.receivers.Receiver
-import org.apache.spark.Logging
-import scala.util.Random
-import org.apache.commons.io.FileUtils
-import org.scalatest.BeforeAndAfter
-import collection.JavaConversions._
-import com.google.common.io.Files
-import java.util.concurrent.atomic.AtomicInteger
+import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.util.Utils

 class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
@@ -112,7 +114,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     Thread.sleep(1000)
     for (i <- 0 until input.size) {
       val file = new File(testDir, i.toString)
-      FileUtils.writeStringToFile(file, input(i).toString + "\n")
+      Files.write(input(i) + "\n", file, Charset.forName("UTF-8"))
       logInfo("Created file " + file)
       Thread.sleep(batchDuration.milliseconds)
       Thread.sleep(1000)
@@ -136,7 +138,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
     // (whether the elements were received one in each interval is not verified)
     assert(output.toList === expectedOutput.toList)

-    FileUtils.deleteDirectory(testDir)
+    Utils.deleteRecursively(testDir)

     // Enable manual clock back again for other tests
     conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
-- 
cgit v1.2.3
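
Editor's note, not part of the patch: the one-for-one call swap the patch applies
throughout the streaming tests, sketched standalone under the assumption that only
Guava (already a Spark dependency) is on the classpath. The object name, directory,
and file name below are illustrative.

import java.io.File
import java.nio.charset.Charset

import com.google.common.io.Files

object WriteStringSketch {
  def main(args: Array[String]) {
    val dir = Files.createTempDir()  // Guava helper, gives the sketch somewhere safe to write
    val file = new File(dir, "1")

    // Before (Commons IO): FileUtils.writeStringToFile(file, "1\n")
    // After (Guava), with the charset made explicit:
    Files.write("1\n", file, Charset.forName("UTF-8"))

    // Before (Commons IO): FileUtils.deleteDirectory(dir)
    // Inside Spark the patch calls Utils.deleteRecursively(dir); that helper is
    // private[spark], so code outside Spark needs its own (see the next sketch).
    println("wrote " + file.getAbsolutePath)
  }
}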
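
The Utils.scala hunk above also changes the failure condition: File.delete()
returns false both on a genuine failure and when the file is already gone, and
only the former should raise (FileUtils.deleteDirectory tolerated the latter).
A standalone sketch of the patched semantics; the object and helper are
hypothetical, mirroring rather than reusing Spark's private[spark] Utils.

import java.io.{File, IOException}

object DeleteRecursivelySketch {
  def deleteRecursively(file: File) {
    if (file.isDirectory) {
      // listFiles() returns null on I/O error; treat that as an empty listing
      Option(file.listFiles()).getOrElse(Array.empty[File]).foreach(deleteRecursively)
    }
    if (!file.delete()) {
      // delete() also returns false when the file simply did not exist
      // (e.g. removed concurrently), so only fail if it is still present
      if (file.exists()) {
        throw new IOException("Failed to delete: " + file.getAbsolutePath)
      }
    }
  }
}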