aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLiwei Lin <lwlin7@gmail.com>2016-04-27 10:25:33 -0700
committerMichael Armbrust <michael@databricks.com>2016-04-27 10:25:33 -0700
commita234cc61465bbefafd9e69c1cabe9aaaf968a91f (patch)
tree1bf6d07dface72373b687191cc13914d5504e0bf
parent7dd01d9c019ee8d015a82fcda5c85f66bf8a3673 (diff)
downloadspark-a234cc61465bbefafd9e69c1cabe9aaaf968a91f.tar.gz
spark-a234cc61465bbefafd9e69c1cabe9aaaf968a91f.tar.bz2
spark-a234cc61465bbefafd9e69c1cabe9aaaf968a91f.zip
[SPARK-14874][SQL][STREAMING] Remove the obsolete Batch representation
## What changes were proposed in this pull request? The `Batch` class, which had been used to indicate progress in a stream, was abandoned by [[SPARK-13985][SQL] Deterministic batches with ids](https://github.com/apache/spark/commit/caea15214571d9b12dcf1553e5c1cc8b83a8ba5b) and then became useless. This patch: - removes the `Batch` class - ~~does some related renaming~~ (update: this has been reverted) - fixes some related comments ## How was this patch tested? N/A Author: Liwei Lin <lwlin7@gmail.com> Closes #12638 from lw-lin/remove-batch.
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Batch.scala26
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala2
5 files changed, 4 insertions, 30 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Batch.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Batch.scala
deleted file mode 100644
index 1f25eb8fc5..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Batch.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.streaming
-
-import org.apache.spark.sql.DataFrame
-
-/**
- * Used to pass a batch of data through a streaming query execution along with an indication
- * of progress in the stream.
- */
-class Batch(val end: Offset, val data: DataFrame)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 681addea02..8e66538575 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -88,7 +88,7 @@ class FileStreamSource(
}
/**
- * Returns the next batch of data that is available after `start`, if any is available.
+ * Returns the data that is between the offsets (`start`, `end`].
*/
override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
val startId = start.map(_.asInstanceOf[LongOffset].offset).getOrElse(-1L)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala
index 25015d58f7..e641e09b56 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.DataFrame
trait Sink {
/**
- * Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
+ * Adds a batch of data to this sink. The data for a given `batchId` is deterministic and if
* this method is called more than once with the same batchId (which will happen in the case of
* failures), then `data` should only be added once.
*/
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
index 1d2f7a8753..14450c2e2f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
@@ -34,7 +34,7 @@ trait Source {
def getOffset: Option[Offset]
/**
- * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None` then
+ * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None` then
* the batch should begin with the first available record. This method must always return the
* same data for a particular `start` and `end` pair.
*/
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 0d2a6dd929..a34927ff99 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -91,7 +91,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
}
/**
- * Returns the next batch of data that is available after `start`, if any is available.
+ * Returns the data that is between the offsets (`start`, `end`].
*/
override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
val startOrdinal =