aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPrashant Sharma <prashsh1@in.ibm.com>2016-10-07 11:16:24 -0700
committerMichael Armbrust <michael@databricks.com>2016-10-07 11:16:24 -0700
commitbb1aaf28eca6d9ae9af664ac3ad35cafdfc01a3b (patch)
treef1eee37793e524129fabcb614c2c6cb06b8f8287
parentaa3a6841ebaf45efb5d3930a93869948bdd0d2b6 (diff)
downloadspark-bb1aaf28eca6d9ae9af664ac3ad35cafdfc01a3b.tar.gz
spark-bb1aaf28eca6d9ae9af664ac3ad35cafdfc01a3b.tar.bz2
spark-bb1aaf28eca6d9ae9af664ac3ad35cafdfc01a3b.zip
[SPARK-16411][SQL][STREAMING] Add textFile to Structured Streaming.
## What changes were proposed in this pull request? Adds the textFile API which exists in DataFrameReader and serves same purpose. ## How was this patch tested? Added corresponding testcase. Author: Prashant Sharma <prashsh1@in.ibm.com> Closes #14087 from ScrapCodes/textFile.
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala33
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala18
2 files changed, 50 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 864a9cd3eb..87b7306218 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.annotation.Experimental
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.streaming.StreamingRelation
import org.apache.spark.sql.types.StructType
@@ -283,6 +283,37 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
*/
def text(path: String): DataFrame = format("text").load(path)
+ /**
+ * Loads text file(s) and returns a [[Dataset]] of String. The underlying schema of the Dataset
+ * contains a single string column named "value".
+ *
+ * If the directory structure of the text files contains partitioning information, those are
+ * ignored in the resulting Dataset. To include partitioning information as columns, use `text`.
+ *
+ * Each line in the text file is a new element in the resulting Dataset. For example:
+ * {{{
+ * // Scala:
+ * spark.readStream.textFile("/path/to/spark/README.md")
+ *
+ * // Java:
+ * spark.readStream().textFile("/path/to/spark/README.md")
+ * }}}
+ *
+ * You can set the following text-specific options to deal with text files:
+ * <ul>
+ * <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
+ * considered in every trigger.</li>
+ * </ul>
+ *
+ * @param path input path
+ * @since 2.1.0
+ */
+ def textFile(path: String): Dataset[String] = {
+ if (userSpecifiedSchema.nonEmpty) {
+ throw new AnalysisException("User specified schema not supported with `textFile`")
+ }
+ text(path).select("value").as[String](sparkSession.implicits.newStringEncoder)
+ }
///////////////////////////////////////////////////////////////////////////////////////
// Builder pattern config options
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 3157afe5a5..7f9c981a4e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -342,6 +342,24 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}
+ test("read from textfile") {
+ withTempDirs { case (src, tmp) =>
+ val textStream = spark.readStream.textFile(src.getCanonicalPath)
+ val filtered = textStream.filter(_.contains("keep"))
+
+ testStream(filtered)(
+ AddTextFileData("drop1\nkeep2\nkeep3", src, tmp),
+ CheckAnswer("keep2", "keep3"),
+ StopStream,
+ AddTextFileData("drop4\nkeep5\nkeep6", src, tmp),
+ StartStream(),
+ CheckAnswer("keep2", "keep3", "keep5", "keep6"),
+ AddTextFileData("drop7\nkeep8\nkeep9", src, tmp),
+ CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9")
+ )
+ }
+ }
+
test("SPARK-17165 should not track the list of seen files indefinitely") {
// This test works by:
// 1. Create a file