author  Sean Owen <sowen@cloudera.com>  2014-03-25 10:21:25 -0700
committer  Reynold Xin <rxin@apache.org>  2014-03-25 10:21:25 -0700
commit  71d4ed271bcbddb154643bd44297ed77190e75cf (patch)
tree  a6618c610ce4d9001ca8e5b08d4811e3105ecfc3 /streaming
parent  134ace7fea7f772f5bafa9d11b8677cb7d311266 (diff)
SPARK-1316. Remove use of Commons IO
(This follows from a side point on SPARK-1133, in discussion of the PR: https://github.com/apache/spark/pull/164 )

Commons IO is barely used in the project, and can easily be replaced with equivalent calls to Guava or the existing Spark `Utils.scala` class. Removing a dependency feels good, and this one in particular can get a little problematic since Hadoop uses it too.

Author: Sean Owen <sowen@cloudera.com>

Closes #226 from srowen/SPARK-1316 and squashes the following commits:

21efef3 [Sean Owen] Remove use of Commons IO
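For reference, the changes below reduce to two replacement patterns: Guava's Files.write in place of Commons IO's FileUtils.writeStringToFile, and Spark's own Utils.deleteRecursively in place of FileUtils.deleteDirectory. A minimal standalone sketch of the before/after calls follows; the object name, path, and string content are illustrative only, not taken from the patch.

    import java.io.File
    import java.nio.charset.Charset

    import com.google.common.io.Files      // Guava
    import org.apache.spark.util.Utils     // existing Spark utility class

    object CommonsIoReplacementSketch {
      def main(args: Array[String]): Unit = {
        val dir = new File("/tmp/commons-io-replacement-example")  // illustrative path
        dir.mkdirs()
        val file = new File(dir, "1")

        // Before (Commons IO): FileUtils.writeStringToFile(file, "hello\n")
        // After (Guava): write the string with an explicit charset
        Files.write("hello\n", file, Charset.forName("UTF-8"))

        // Before (Commons IO): FileUtils.deleteDirectory(dir)
        // After (Spark): recursively delete the directory
        Utils.deleteRecursively(dir)
      }
    }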
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/pom.xml | 4
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala | 6
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala | 13
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala | 11
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala | 28
5 files changed, 27 insertions, 35 deletions
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 2cb8bde664..1953cc6883 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -74,10 +74,6 @@
<artifactId>junit-interface</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- </dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
index 2bb616cfb8..c48a38590e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/MasterFailureTest.scala
@@ -28,12 +28,12 @@ import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import scala.reflect.ClassTag
import java.io.{File, ObjectInputStream, IOException}
+import java.nio.charset.Charset
import java.util.UUID
import com.google.common.io.Files
-import org.apache.commons.io.FileUtils
-import org.apache.hadoop.fs.{FileUtil, FileSystem, Path}
+import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
@@ -389,7 +389,7 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
val localFile = new File(localTestDir, (i + 1).toString)
val hadoopFile = new Path(testDir, (i + 1).toString)
val tempHadoopFile = new Path(testDir, ".tmp_" + (i + 1).toString)
- FileUtils.writeStringToFile(localFile, input(i).toString + "\n")
+ Files.write(input(i) + "\n", localFile, Charset.forName("UTF-8"))
var tries = 0
var done = false
while (!done && tries < maxTries) {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 831e7c1471..0784e562ac 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -18,10 +18,10 @@
package org.apache.spark.streaming
import java.io.File
+import java.nio.charset.Charset
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
-import org.apache.commons.io.FileUtils
import com.google.common.io.Files
import org.apache.hadoop.fs.{Path, FileSystem}
import org.apache.hadoop.conf.Configuration
@@ -29,7 +29,6 @@ import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.util.Utils
-import org.apache.spark.SparkConf
/**
* This test suites tests the checkpointing functionality of DStreams -
@@ -46,13 +45,13 @@ class CheckpointSuite extends TestSuiteBase {
override def beforeFunction() {
super.beforeFunction()
- FileUtils.deleteDirectory(new File(checkpointDir))
+ Utils.deleteRecursively(new File(checkpointDir))
}
override def afterFunction() {
super.afterFunction()
if (ssc != null) ssc.stop()
- FileUtils.deleteDirectory(new File(checkpointDir))
+ Utils.deleteRecursively(new File(checkpointDir))
}
test("basic rdd checkpoints + dstream graph checkpoint recovery") {
@@ -256,7 +255,7 @@ class CheckpointSuite extends TestSuiteBase {
//var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
Thread.sleep(1000)
for (i <- Seq(1, 2, 3)) {
- FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
+ Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
// wait to make sure that the file is written such that it gets shown in the file listings
Thread.sleep(1000)
}
@@ -273,7 +272,7 @@ class CheckpointSuite extends TestSuiteBase {
// Create files while the master is down
for (i <- Seq(4, 5, 6)) {
- FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
+ Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
Thread.sleep(1000)
}
@@ -289,7 +288,7 @@ class CheckpointSuite extends TestSuiteBase {
// Restart stream computation
ssc.start()
for (i <- Seq(7, 8, 9)) {
- FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
+ Files.write(i + "\n", new File(testDir, i.toString), Charset.forName("UTF-8"))
Thread.sleep(1000)
}
Thread.sleep(1000)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
index da9b04de1a..92e1b76d28 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/FailureSuite.scala
@@ -19,14 +19,9 @@ package org.apache.spark.streaming
import org.apache.spark.Logging
import org.apache.spark.streaming.util.MasterFailureTest
-import StreamingContext._
+import org.apache.spark.util.Utils
-import org.scalatest.{FunSuite, BeforeAndAfter}
-import com.google.common.io.Files
import java.io.File
-import org.apache.commons.io.FileUtils
-import collection.mutable.ArrayBuffer
-
/**
* This testsuite tests master failures at random times while the stream is running using
@@ -43,12 +38,12 @@ class FailureSuite extends TestSuiteBase with Logging {
override def beforeFunction() {
super.beforeFunction()
- FileUtils.deleteDirectory(new File(directory))
+ Utils.deleteRecursively(new File(directory))
}
override def afterFunction() {
super.afterFunction()
- FileUtils.deleteDirectory(new File(directory))
+ Utils.deleteRecursively(new File(directory))
}
test("multiple failures with map") {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 95bf40ba75..74e73ebb34 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -23,21 +23,23 @@ import akka.actor.IOManager
import akka.actor.Props
import akka.util.ByteString
-import org.apache.spark.streaming.dstream.{NetworkReceiver}
-import java.net.{InetSocketAddress, SocketException, Socket, ServerSocket}
import java.io.{File, BufferedWriter, OutputStreamWriter}
+import java.net.{InetSocketAddress, SocketException, ServerSocket}
+import java.nio.charset.Charset
import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}
-import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
-import util.ManualClock
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+
+import com.google.common.io.Files
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.dstream.NetworkReceiver
import org.apache.spark.streaming.receivers.Receiver
-import org.apache.spark.Logging
-import scala.util.Random
-import org.apache.commons.io.FileUtils
-import org.scalatest.BeforeAndAfter
-import collection.JavaConversions._
-import com.google.common.io.Files
-import java.util.concurrent.atomic.AtomicInteger
+import org.apache.spark.streaming.util.ManualClock
+import org.apache.spark.util.Utils
class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
@@ -112,7 +114,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
Thread.sleep(1000)
for (i <- 0 until input.size) {
val file = new File(testDir, i.toString)
- FileUtils.writeStringToFile(file, input(i).toString + "\n")
+ Files.write(input(i) + "\n", file, Charset.forName("UTF-8"))
logInfo("Created file " + file)
Thread.sleep(batchDuration.milliseconds)
Thread.sleep(1000)
@@ -136,7 +138,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// (whether the elements were received one in each interval is not verified)
assert(output.toList === expectedOutput.toList)
- FileUtils.deleteDirectory(testDir)
+ Utils.deleteRecursively(testDir)
// Enable manual clock back again for other tests
conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")