From bb1aaf28eca6d9ae9af664ac3ad35cafdfc01a3b Mon Sep 17 00:00:00 2001
From: Prashant Sharma
Date: Fri, 7 Oct 2016 11:16:24 -0700
Subject: [SPARK-16411][SQL][STREAMING] Add textFile to Structured Streaming.

## What changes were proposed in this pull request?

Adds the textFile API, which exists in DataFrameReader and serves the same
purpose.

## How was this patch tested?

Added a corresponding test case.

Author: Prashant Sharma

Closes #14087 from ScrapCodes/textFile.
---
 .../spark/sql/streaming/DataStreamReader.scala     | 33 ++++++++++++++++++++-
 .../sql/streaming/FileStreamSourceSuite.scala      | 18 ++++++++++++
 2 files changed, 50 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 864a9cd3eb..87b7306218 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming.StreamingRelation
 import org.apache.spark.sql.types.StructType
@@ -283,6 +283,37 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    */
   def text(path: String): DataFrame = format("text").load(path)
 
+  /**
+   * Loads text file(s) and returns a [[Dataset]] of String. The underlying schema of the Dataset
+   * contains a single string column named "value".
+   *
+   * If the directory structure of the text files contains partitioning information, those are
+   * ignored in the resulting Dataset. To include partitioning information as columns, use `text`.
+   *
+   * Each line in the text file is a new element in the resulting Dataset. For example:
+   * {{{
+   *   // Scala:
+   *   spark.readStream.textFile("/path/to/spark/README.md")
+   *
+   *   // Java:
+   *   spark.readStream().textFile("/path/to/spark/README.md")
+   * }}}
+   *
+   * You can set the following text-specific options to deal with text files:
+   * <ul>
+   * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
+   * considered in every trigger.</li>
+   * </ul>
+   *
+   * @param path input path
+   * @since 2.1.0
+   */
+  def textFile(path: String): Dataset[String] = {
+    if (userSpecifiedSchema.nonEmpty) {
+      throw new AnalysisException("User specified schema not supported with `textFile`")
+    }
+    text(path).select("value").as[String](sparkSession.implicits.newStringEncoder)
+  }
 
   ///////////////////////////////////////////////////////////////////////////////////////
   // Builder pattern config options

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 3157afe5a5..7f9c981a4e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -342,6 +342,24 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
     }
   }
 
+  test("read from textfile") {
+    withTempDirs { case (src, tmp) =>
+      val textStream = spark.readStream.textFile(src.getCanonicalPath)
+      val filtered = textStream.filter(_.contains("keep"))
+
+      testStream(filtered)(
+        AddTextFileData("drop1\nkeep2\nkeep3", src, tmp),
+        CheckAnswer("keep2", "keep3"),
+        StopStream,
+        AddTextFileData("drop4\nkeep5\nkeep6", src, tmp),
+        StartStream(),
+        CheckAnswer("keep2", "keep3", "keep5", "keep6"),
+        AddTextFileData("drop7\nkeep8\nkeep9", src, tmp),
+        CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
+      )
+    }
+  }
+
   test("SPARK-17165 should not track the list of seen files indefinitely") {
     // This test works by:
     // 1. Create a file
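For readers who want to see the new API in context rather than inside the test harness, the sketch below wires `textFile` into a complete streaming query. It is illustrative only, not part of the patch: the object name, the input directory `/tmp/streaming-input`, and the console-sink settings are assumptions; only `readStream.textFile` itself comes from this change.

```scala
// Minimal end-to-end sketch of the new API (assumes Spark 2.1+, where this
// patch landed). The input directory and app name are illustrative.
import org.apache.spark.sql.SparkSession

object TextFileStreamExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("TextFileStreamExample")
      .master("local[2]")
      .getOrCreate()

    // textFile returns a Dataset[String] (one element per line), so typed
    // operators such as filter apply directly, with no Row unwrapping.
    val lines = spark.readStream.textFile("/tmp/streaming-input")
    val kept = lines.filter(_.contains("keep"))

    // Print each micro-batch of matching lines via the console sink.
    val query = kept.writeStream
      .outputMode("append")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```

Note the design choice behind the `userSpecifiedSchema` guard in the patch: it mirrors the batch `DataFrameReader.textFile`. Because the result is always a single string column named "value", a user-supplied schema cannot apply, so the method fails fast with an `AnalysisException` rather than silently ignoring the schema.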