aboutsummaryrefslogtreecommitdiff
path: root/R
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2016-09-06 19:34:11 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2016-09-06 19:34:11 -0700
commiteb1ab88a86ce35f3d6ba03b3a798099fbcf6b3fc (patch)
treec8a3a83ddc538b2a1e83c551441b98e9ad2c2099 /R
parentd6eede9a36766e2d2294951b054d7557008a5662 (diff)
downloadspark-eb1ab88a86ce35f3d6ba03b3a798099fbcf6b3fc.tar.gz
spark-eb1ab88a86ce35f3d6ba03b3a798099fbcf6b3fc.tar.bz2
spark-eb1ab88a86ce35f3d6ba03b3a798099fbcf6b3fc.zip
[SPARK-17372][SQL][STREAMING] Avoid serialization issues by using Arrays to save file names in FileStreamSource
## What changes were proposed in this pull request? When we create a filestream on a directory that has partitioned subdirs (i.e. dir/x=y/), then ListingFileCatalog.allFiles returns the files in the dir as Seq[String] which internally is a Stream[String]. This is because of this [line](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala#L93), where a LinkedHashSet.values.toSeq returns Stream. Then when the [FileStreamSource](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala#L79) filters this Stream[String] to remove the seen files, it creates a new Stream[String], which has a filter function that has a $outer reference to the FileStreamSource (in Scala 2.10). Trying to serialize this Stream[String] causes NotSerializableException. This will happened even if there is just one file in the dir. Its important to note that this behavior is different in Scala 2.11. There is no $outer reference to FileStreamSource, so it does not throw NotSerializableException. However, with a large sequence of files (tested with 10000 files), it throws StackOverflowError. This is because how Stream class is implemented. Its basically like a linked list, and attempting to serialize a long Stream requires *recursively* going through linked list, thus resulting in StackOverflowError. In short, across both Scala 2.10 and 2.11, serialization fails when both the following conditions are true. - file stream defined on a partitioned directory - directory has 10k+ files The right solution is to convert the seq to an array before writing to the log. This PR implements this fix in two ways. - Changing all uses for HDFSMetadataLog to ensure Array is used instead of Seq - Added a `require` in HDFSMetadataLog such that it is never used with type Seq ## How was this patch tested? Added unit test that test that ensures the file stream source can handle with 10000 files. This tests fails in both Scala 2.10 and 2.11 with different failures as indicated above. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #14987 from tdas/SPARK-17372.
Diffstat (limited to 'R')
0 files changed, 0 insertions, 0 deletions