aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-01-14 10:03:55 -0800
committerPatrick Wendell <pwendell@gmail.com>2013-01-14 10:03:55 -0800
commitd182a57cae6455804773db23d9498d2dcdd02172 (patch)
tree752672f24f4fa831043767d97512d9f729a3b22d /streaming
parenta292ed8d8af069ee1318cdf7c00d3db8d3ba8db9 (diff)
downloadspark-d182a57cae6455804773db23d9498d2dcdd02172.tar.gz
spark-d182a57cae6455804773db23d9498d2dcdd02172.tar.bz2
spark-d182a57cae6455804773db23d9498d2dcdd02172.zip
Two changes:
- Updating countByX() types based on bug fix - Porting new documentation to Java
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala10
-rw-r--r--streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala264
2 files changed, 269 insertions, 5 deletions
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
index 80d8865725..4257ecd583 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
@@ -1,7 +1,7 @@
package spark.streaming.api.java
import java.util.{List => JList}
-import java.lang.{Integer => JInt}
+import java.lang.{Long => JLong}
import scala.collection.JavaConversions._
@@ -17,8 +17,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
def dstream: DStream[T]
- implicit def scalaIntToJavaInteger(in: DStream[Int]): JavaDStream[JInt] = {
- in.map(new JInt(_))
+ implicit def scalaIntToJavaLong(in: DStream[Long]): JavaDStream[JLong] = {
+ in.map(new JLong(_))
}
/**
@@ -31,14 +31,14 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
* Returns a new DStream in which each RDD has a single element generated by counting each RDD
* of this DStream.
*/
- def count(): JavaDStream[JInt] = dstream.count()
+ def count(): JavaDStream[JLong] = dstream.count()
/**
* Returns a new DStream in which each RDD has a single element generated by counting the number
* of elements in a window over this DStream. windowDuration and slideDuration are as defined in the
* window() operation. This is equivalent to window(windowDuration, slideDuration).count()
*/
- def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[JInt] = {
+ def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[JLong] = {
dstream.countByWindow(windowDuration, slideDuration)
}
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
index eeb1f07939..c761fdd3bd 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
@@ -85,21 +85,66 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
// Methods only for PairDStream's
// =======================================================================
+ /**
+ * Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream.
+ * Therefore, the values for each key in `this` DStream's RDDs are grouped into a
+ * single sequence to generate the RDDs of the new DStream. Hash partitioning is
+ * used to generate the RDDs with Spark's default number of partitions.
+ */
def groupByKey(): JavaPairDStream[K, JList[V]] =
dstream.groupByKey().mapValues(seqAsJavaList _)
+ /**
+ * Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream.
+ * Therefore, the values for each key in `this` DStream's RDDs are grouped into a
+ * single sequence to generate the RDDs of the new DStream. Hash partitioning is
+ * used to generate the RDDs with `numPartitions` partitions.
+ */
def groupByKey(numPartitions: Int): JavaPairDStream[K, JList[V]] =
dstream.groupByKey(numPartitions).mapValues(seqAsJavaList _)
+ /**
+ * Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream.
+ * Therefore, the values for each key in `this` DStream's RDDs are grouped into a
+ * single sequence to generate the RDDs of the new DStream. [[spark.Partitioner]]
+ * is used to control the partitioning of each RDD.
+ */
def groupByKey(partitioner: Partitioner): JavaPairDStream[K, JList[V]] =
dstream.groupByKey(partitioner).mapValues(seqAsJavaList _)
+ /**
+ * Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream.
+ * Therefore, the values for each key in `this` DStream's RDDs is merged using the
+ * associative reduce function to generate the RDDs of the new DStream.
+ * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ */
def reduceByKey(func: JFunction2[V, V, V]): JavaPairDStream[K, V] =
dstream.reduceByKey(func)
+ /**
+ * Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream.
+ * Therefore, the values for each key in `this` DStream's RDDs is merged using the
+ * associative reduce function to generate the RDDs of the new DStream.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ */
def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairDStream[K, V] =
dstream.reduceByKey(func, numPartitions)
+ /**
+ * Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream.
+ * Therefore, the values for each key in `this` DStream's RDDs is merged using the
+ * associative reduce function to generate the RDDs of the new DStream.
+ * [[spark.Partitioner]] is used to control the partitioning of each RDD.
+ */
+ def reduceByKey(func: JFunction2[V, V, V], partitioner: Partitioner): JavaPairDStream[K, V] = {
+ dstream.reduceByKey(func, partitioner)
+ }
+
+ /**
+ * Generic function to combine elements of each key in DStream's RDDs using custom function.
+ * This is similar to the combineByKey for RDDs. Please refer to combineByKey in
+ * [[spark.PairRDDFunctions]] for more information.
+ */
def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
mergeCombiners: JFunction2[C, C, C],
@@ -110,25 +155,78 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)
}
+ /**
+ * Creates a new DStream by counting the number of values of each key in each RDD
+ * of `this` DStream. Hash partitioning is used to generate the RDDs.
+ */
def countByKey(numPartitions: Int): JavaPairDStream[K, JLong] = {
JavaPairDStream.scalaToJavaLong(dstream.countByKey(numPartitions));
}
+
+ /**
+ * Creates a new DStream by counting the number of values of each key in each RDD
+ * of `this` DStream. Hash partitioning is used to generate the RDDs with Spark's
+ * `numPartitions` partitions.
+ */
def countByKey(): JavaPairDStream[K, JLong] = {
JavaPairDStream.scalaToJavaLong(dstream.countByKey());
}
+ /**
+ * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+ * This is similar to `DStream.groupByKey()` but applies it over a sliding window.
+ * The new DStream generates RDDs with the same interval as this DStream.
+ * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ */
+ def groupByKeyAndWindow(windowDuration: Duration): JavaPairDStream[K, JList[V]] = {
+ dstream.groupByKeyAndWindow(windowDuration).mapValues(seqAsJavaList _)
+ }
+
+ /**
+ * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+ * This is similar to `DStream.groupByKey()` but applies it over a sliding window.
+ * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration)
: JavaPairDStream[K, JList[V]] = {
dstream.groupByKeyAndWindow(windowDuration, slideDuration).mapValues(seqAsJavaList _)
}
+ /**
+ * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+ * This is similar to `DStream.groupByKey()` but applies it over a sliding window.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param numPartitions Number of partitions of each RDD in the new DStream.
+ */
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
:JavaPairDStream[K, JList[V]] = {
dstream.groupByKeyAndWindow(windowDuration, slideDuration, numPartitions)
.mapValues(seqAsJavaList _)
}
+ /**
+ * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+ * This is similar to `DStream.groupByKey()` but applies it over a sliding window.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ */
def groupByKeyAndWindow(
windowDuration: Duration,
slideDuration: Duration,
@@ -138,11 +236,31 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
.mapValues(seqAsJavaList _)
}
+ /**
+ * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
+ * This is similar to `DStream.reduceByKey()` but applies it over a sliding window.
+ * The new DStream generates RDDs with the same interval as this DStream.
+ * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ */
def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowDuration: Duration)
:JavaPairDStream[K, V] = {
dstream.reduceByKeyAndWindow(reduceFunc, windowDuration)
}
+ /**
+ * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
+ * This is similar to `DStream.reduceByKey()` but applies it over a sliding window.
+ * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
def reduceByKeyAndWindow(
reduceFunc: Function2[V, V, V],
windowDuration: Duration,
@@ -151,6 +269,18 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration)
}
+ /**
+ * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
+ * This is similar to `DStream.reduceByKey()` but applies it over a sliding window.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param numPartitions Number of partitions of each RDD in the new DStream.
+ */
def reduceByKeyAndWindow(
reduceFunc: Function2[V, V, V],
windowDuration: Duration,
@@ -160,6 +290,17 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, numPartitions)
}
+ /**
+ * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
+ * This is similar to `DStream.reduceByKey()` but applies it over a sliding window.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ */
def reduceByKeyAndWindow(
reduceFunc: Function2[V, V, V],
windowDuration: Duration,
@@ -169,6 +310,24 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, partitioner)
}
+
+ /**
+ * Creates a new DStream by reducing over a window in a smarter way.
+ * The reduced value of over a new window is calculated incrementally by using the
+ * old window's reduce value :
+ * 1. reduce the new values that entered the window (e.g., adding new counts)
+ * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+ * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function.
+ * However, it is applicable to only "invertible reduce functions".
+ * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * @param reduceFunc associative reduce function
+ * @param invReduceFunc inverse function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
def reduceByKeyAndWindow(
reduceFunc: Function2[V, V, V],
invReduceFunc: Function2[V, V, V],
@@ -178,6 +337,24 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)
}
+ /**
+ * Creates a new DStream by reducing over a window in a smarter way.
+ * The reduced value of over a new window is calculated incrementally by using the
+ * old window's reduce value :
+ * 1. reduce the new values that entered the window (e.g., adding new counts)
+ * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+ * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function.
+ * However, it is applicable to only "invertible reduce functions".
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ * @param reduceFunc associative reduce function
+ * @param invReduceFunc inverse function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param numPartitions Number of partitions of each RDD in the new DStream.
+ */
def reduceByKeyAndWindow(
reduceFunc: Function2[V, V, V],
invReduceFunc: Function2[V, V, V],
@@ -193,6 +370,23 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
numPartitions)
}
+ /**
+ * Creates a new DStream by reducing over a window in a smarter way.
+ * The reduced value of over a new window is calculated incrementally by using the
+ * old window's reduce value :
+ * 1. reduce the new values that entered the window (e.g., adding new counts)
+ * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+ * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function.
+ * However, it is applicable to only "invertible reduce functions".
+ * @param reduceFunc associative reduce function
+ * @param invReduceFunc inverse function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ */
def reduceByKeyAndWindow(
reduceFunc: Function2[V, V, V],
invReduceFunc: Function2[V, V, V],
@@ -208,16 +402,38 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
partitioner)
}
+ /**
+ * Creates a new DStream by counting the number of values for each key over a window.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration)
: JavaPairDStream[K, JLong] = {
JavaPairDStream.scalaToJavaLong(dstream.countByKeyAndWindow(windowDuration, slideDuration))
}
+ /**
+ * Creates a new DStream by counting the number of values for each key over a window.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param numPartitions Number of partitions of each RDD in the new DStream.
+ */
def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
: JavaPairDStream[K, Long] = {
dstream.countByKeyAndWindow(windowDuration, slideDuration, numPartitions)
}
+
+ // TODO: Update State
+
def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = {
implicit val cm: ClassManifest[U] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
@@ -232,12 +448,26 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.flatMapValues(fn)
}
+ /**
+ * Cogroups `this` DStream with `other` DStream. Each RDD of the new DStream will
+ * be generated by cogrouping RDDs from`this`and `other` DStreams. Therefore, for
+ * each key k in corresponding RDDs of `this` or `other` DStreams, the generated RDD
+ * will contains a tuple with the list of values for that key in both RDDs.
+ * HashPartitioner is used to partition each generated RDD into default number of partitions.
+ */
def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = {
implicit val cm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
dstream.cogroup(other.dstream).mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
}
+ /**
+ * Cogroups `this` DStream with `other` DStream. Each RDD of the new DStream will
+ * be generated by cogrouping RDDs from`this`and `other` DStreams. Therefore, for
+ * each key k in corresponding RDDs of `this` or `other` DStreams, the generated RDD
+ * will contains a tuple with the list of values for that key in both RDDs.
+ * Partitioner is used to partition each generated RDD.
+ */
def cogroup[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
: JavaPairDStream[K, (JList[V], JList[W])] = {
implicit val cm: ClassManifest[W] =
@@ -246,12 +476,22 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
.mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
}
+ /**
+ * Joins `this` DStream with `other` DStream. Each RDD of the new DStream will
+ * be generated by joining RDDs from `this` and `other` DStreams. HashPartitioner is used
+ * to partition each generated RDD into default number of partitions.
+ */
def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = {
implicit val cm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
dstream.join(other.dstream)
}
+ /**
+ * Joins `this` DStream with `other` DStream, that is, each RDD of the new DStream will
+ * be generated by joining RDDs from `this` and other DStream. Uses the given
+ * Partitioner to partition each generated RDD.
+ */
def join[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
: JavaPairDStream[K, (V, W)] = {
implicit val cm: ClassManifest[W] =
@@ -259,10 +499,18 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.join(other.dstream, partitioner)
}
+ /**
+ * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
+ * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
+ */
def saveAsHadoopFiles[F <: OutputFormat[K, V]](prefix: String, suffix: String) {
dstream.saveAsHadoopFiles(prefix, suffix)
}
+ /**
+ * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
+ * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
+ */
def saveAsHadoopFiles(
prefix: String,
suffix: String,
@@ -272,6 +520,10 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass)
}
+ /**
+ * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
+ * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
+ */
def saveAsHadoopFiles(
prefix: String,
suffix: String,
@@ -282,10 +534,18 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
}
+ /**
+ * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
+ * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+ */
def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](prefix: String, suffix: String) {
dstream.saveAsNewAPIHadoopFiles(prefix, suffix)
}
+ /**
+ * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
+ * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+ */
def saveAsNewAPIHadoopFiles(
prefix: String,
suffix: String,
@@ -295,6 +555,10 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass)
}
+ /**
+ * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
+ * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+ */
def saveAsNewAPIHadoopFiles(
prefix: String,
suffix: String,