aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
blob: 4585e3f6bd5b254d48b891203842ad749d57c5ac (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming.dstream

import java.io.{ObjectInputStream, IOException}
import scala.collection.mutable.{HashSet, HashMap}
import scala.reflect.ClassTag
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.UnionRDD
import org.apache.spark.streaming.{DStreamCheckpointData, StreamingContext, Time}
import org.apache.spark.util.TimeStampedHashMap


private[streaming]
class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
    @transient ssc_ : StreamingContext,
    directory: String,
    filter: Path => Boolean = FileInputDStream.defaultFilter,
    newFilesOnly: Boolean = true)
  extends InputDStream[(K, V)](ssc_) {

  protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData

  // Latest file mod time seen till any point of time
  private val prevModTimeFiles = new HashSet[String]()
  private var prevModTime = 0L

  @transient private var path_ : Path = null
  @transient private var fs_ : FileSystem = null
  @transient private[streaming] var files = new HashMap[Time, Array[String]]
  @transient private var fileModTimes = new TimeStampedHashMap[String, Long](true)
  @transient private var lastNewFileFindingTime = 0L

  override def start() {
    if (newFilesOnly) {
      prevModTime = graph.zeroTime.milliseconds
    } else {
      prevModTime = 0
    }
    logDebug("LastModTime initialized to " + prevModTime + ", new files only = " + newFilesOnly)
  }

  override def stop() { }

  /**
   * Finds the files that were modified since the last time this method was called and makes
   * a union RDD out of them. Note that this maintains the list of files that were processed
   * in the latest modification time in the previous call to this method. This is because the
   * modification time returned by the FileStatus API seems to return times only at the
   * granularity of seconds. And new files may have the same modification time as the
   * latest modification time in the previous call to this method yet was not reported in
   * the previous call.
   */
  override def compute(validTime: Time): Option[RDD[(K, V)]] = {
    assert(validTime.milliseconds >= prevModTime,
      "Trying to get new files for really old time [" + validTime + " < " + prevModTime + "]")

    // Find new files
    val (newFiles, latestModTime, latestModTimeFiles) = findNewFiles(validTime.milliseconds)
    logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
    if (newFiles.length > 0) {
      // Update the modification time and the files processed for that modification time
      if (prevModTime < latestModTime) {
        prevModTime = latestModTime
        prevModTimeFiles.clear()
      }
      prevModTimeFiles ++= latestModTimeFiles
      logDebug("Last mod time updated to " + prevModTime)
    }
    files += ((validTime, newFiles.toArray))
    Some(filesToRDD(newFiles))
  }

  /** Clear the old time-to-files mappings along with old RDDs */
  protected[streaming] override def clearMetadata(time: Time) {
    super.clearMetadata(time)
    val oldFiles = files.filter(_._1 <= (time - rememberDuration))
    files --= oldFiles.keys
    logInfo("Cleared " + oldFiles.size + " old files that were older than " +
      (time - rememberDuration) + ": " + oldFiles.keys.mkString(", "))
    logDebug("Cleared files are:\n" +
      oldFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n"))
    // Delete file times that weren't accessed in the last round of getting new files
    fileModTimes.clearOldValues(lastNewFileFindingTime - 1)
  }

  /**
   * Find files which have modification timestamp <= current time and return a 3-tuple of
   * (new files found, latest modification time among them, files with latest modification time)
   */
  private def findNewFiles(currentTime: Long): (Seq[String], Long, Seq[String]) = {
    logDebug("Trying to get new files for time " + currentTime)
    lastNewFileFindingTime = System.currentTimeMillis
    val filter = new CustomPathFilter(currentTime)
    val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString)
    val timeTaken = System.currentTimeMillis - lastNewFileFindingTime
    logInfo("Finding new files took " + timeTaken + " ms")
    if (timeTaken > slideDuration.milliseconds) {
      logWarning(
        "Time taken to find new files exceeds the batch size. " +
          "Consider increasing the batch size or reduceing the number of " +
          "files in the monitored directory."
      )
    }
    (newFiles, filter.latestModTime, filter.latestModTimeFiles.toSeq)
  }

  /** Generate one RDD from an array of files */
  private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
    val fileRDDs = files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
    files.zip(fileRDDs).foreach { case (file, rdd) => {
      if (rdd.partitions.size == 0) {
        logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
          "files that have been \"moved\" to the directory assigned to the file stream. " +
          "Refer to the streaming programming guide for more details.")
      }
    }}
    new UnionRDD(context.sparkContext, fileRDDs)
  }

  private def directoryPath: Path = {
    if (path_ == null) path_ = new Path(directory)
    path_
  }

  private def fs: FileSystem = {
    if (fs_ == null) fs_ = directoryPath.getFileSystem(new Configuration())
    fs_
  }

  private def getFileModTime(path: Path) = {
    fileModTimes.getOrElseUpdate(path.toString, fs.getFileStatus(path).getModificationTime())
  }

  private def reset()  {
    fs_ = null
  }

  @throws(classOf[IOException])
  private def readObject(ois: ObjectInputStream) {
    logDebug(this.getClass().getSimpleName + ".readObject used")
    ois.defaultReadObject()
    generatedRDDs = new HashMap[Time, RDD[(K,V)]] ()
    files = new HashMap[Time, Array[String]]
    fileModTimes = new TimeStampedHashMap[String, Long](true)
  }

  /**
   * A custom version of the DStreamCheckpointData that stores names of
   * Hadoop files as checkpoint data.
   */
  private[streaming]
  class FileInputDStreamCheckpointData extends DStreamCheckpointData(this) {

    def hadoopFiles = data.asInstanceOf[HashMap[Time, Array[String]]]

    override def update(time: Time) {
      hadoopFiles.clear()
      hadoopFiles ++= files
    }

    override def cleanup(time: Time) { }

    override def restore() {
      hadoopFiles.toSeq.sortBy(_._1)(Time.ordering).foreach {
        case (t, f) => {
          // Restore the metadata in both files and generatedRDDs
          logInfo("Restoring files for time " + t + " - " +
            f.mkString("[", ", ", "]") )
          files += ((t, f))
          generatedRDDs += ((t, filesToRDD(f)))
        }
      }
    }

    override def toString() = {
      "[\n" + hadoopFiles.size + " file sets\n" +
        hadoopFiles.map(p => (p._1, p._2.mkString(", "))).mkString("\n") + "\n]"
    }
  }

  /**
   * Custom PathFilter class to find new files that have modification timestamps <= current time,
   * but have not been seen before (i.e. the file should not be in lastModTimeFiles)
   */
  private[streaming]
  class CustomPathFilter(maxModTime: Long) extends PathFilter {
    // Latest file mod time seen in this round of fetching files and its corresponding files
    var latestModTime = 0L
    val latestModTimeFiles = new HashSet[String]()
    def accept(path: Path): Boolean = {
      try {
        if (!filter(path)) {  // Reject file if it does not satisfy filter
          logDebug("Rejected by filter " + path)
          return false
        }
        val modTime = getFileModTime(path)
        logDebug("Mod time for " + path + " is " + modTime)
        if (modTime < prevModTime) {
          logDebug("Mod time less than last mod time")
          return false  // If the file was created before the last time it was called
        } else if (modTime == prevModTime && prevModTimeFiles.contains(path.toString)) {
          logDebug("Mod time equal to last mod time, but file considered already")
          return false  // If the file was created exactly as lastModTime but not reported yet
        } else if (modTime > maxModTime) {
          logDebug("Mod time more than ")
          return false  // If the file is too new that considering it may give errors
        }
        if (modTime > latestModTime) {
          latestModTime = modTime
          latestModTimeFiles.clear()
          logDebug("Latest mod time updated to " + latestModTime)
        }
        latestModTimeFiles += path.toString
        logDebug("Accepted " + path)
      } catch {
        case fnfe: java.io.FileNotFoundException => 
          logWarning("Error finding new files", fnfe)
          reset()
          return false
      }
      return true
    }
  }
}

private[streaming]
object FileInputDStream {
  def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")
}