author     Michael Armbrust <michael@databricks.com>    2016-09-23 12:17:59 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2016-09-23 12:17:59 -0700
commit     988c71457354b0a443471f501cef544a85b1a76a
tree       f2189ea63e2ee41bd26ff4e0cb3bdcc5b0c22f29
parent     f62ddc5983a08d4d54c0a9a8210dd6cbec555671
[SPARK-17643] Remove comparable requirement from Offset
For some sources, it is difficult to provide a global ordering based only on the data in the offset. Since we don't use comparison for correctness, let's remove it.

Author: Michael Armbrust <michael@databricks.com>

Closes #15207 from marmbrus/removeComparable.
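To illustrate the reasoning, here is a minimal, self-contained Scala sketch (not part of this patch) of how the streaming engine can decide whether a source has unprocessed data using offset equality alone. The names OffsetEqualitySketch, dataAvailable and ExampleSource are illustrative stand-ins, not the real Spark classes.

object OffsetEqualitySketch {
  // Minimal stand-ins for this sketch only; the real traits live in
  // org.apache.spark.sql.execution.streaming.
  trait Offset extends Serializable
  trait Source
  case class LongOffset(offset: Long) extends Offset

  // A source has unprocessed data iff its available offset differs from the
  // last committed one, or nothing has been committed for it yet.
  // Only equality is used; no ordering of offsets is required.
  def dataAvailable(
      committedOffsets: Map[Source, Offset],
      availableOffsets: Map[Source, Offset]): Boolean =
    availableOffsets.exists { case (source, available) =>
      committedOffsets.get(source).forall(_ != available)
    }

  def main(args: Array[String]): Unit = {
    object ExampleSource extends Source
    // Prints "true": LongOffset(7) differs from the committed LongOffset(5).
    println(dataAvailable(
      Map(ExampleSource -> LongOffset(5)),
      Map(ExampleSource -> LongOffset(7))))
  }
}

This mirrors the StreamExecution change in the diff below, where the check `committed < available` becomes `committed != available`.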
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala  | 30
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala       |  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala           | 19
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala  |  9
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala                | 39
5 files changed, 9 insertions(+), 94 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
index 729c8462fe..ebc6ee8184 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompositeOffset.scala
@@ -24,36 +24,6 @@ package org.apache.spark.sql.execution.streaming
*/
case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset {
/**
- * Returns a negative integer, zero, or a positive integer as this object is less than, equal to,
- * or greater than the specified object.
- */
- override def compareTo(other: Offset): Int = other match {
- case otherComposite: CompositeOffset if otherComposite.offsets.size == offsets.size =>
- val comparisons = offsets.zip(otherComposite.offsets).map {
- case (Some(a), Some(b)) => a compareTo b
- case (None, None) => 0
- case (None, _) => -1
- case (_, None) => 1
- }
- val nonZeroSigns = comparisons.map(sign).filter(_ != 0).toSet
- nonZeroSigns.size match {
- case 0 => 0 // if both empty or only 0s
- case 1 => nonZeroSigns.head // if there are only (0s and 1s) or (0s and -1s)
- case _ => // there are both 1s and -1s
- throw new IllegalArgumentException(
- s"Invalid comparison between non-linear histories: $this <=> $other")
- }
- case _ =>
- throw new IllegalArgumentException(s"Cannot compare $this <=> $other")
- }
-
- private def sign(num: Int): Int = num match {
- case i if i < 0 => -1
- case i if i == 0 => 0
- case i if i > 0 => 1
- }
-
- /**
* Unpacks an offset into [[StreamProgress]] by associating each offset with the order list of
* sources.
*
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
index bb176408d8..c5e8827777 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/LongOffset.scala
@@ -22,12 +22,6 @@ package org.apache.spark.sql.execution.streaming
*/
case class LongOffset(offset: Long) extends Offset {
- override def compareTo(other: Offset): Int = other match {
- case l: LongOffset => offset.compareTo(l.offset)
- case _ =>
- throw new IllegalArgumentException(s"Invalid comparison of $getClass with ${other.getClass}")
- }
-
def +(increment: Long): LongOffset = new LongOffset(offset + increment)
def -(decrement: Long): LongOffset = new LongOffset(offset - decrement)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala
index 2cc012840d..1f52abf277 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Offset.scala
@@ -19,19 +19,8 @@ package org.apache.spark.sql.execution.streaming
/**
* An offset is a monotonically increasing metric used to track progress in the computation of a
- * stream. An [[Offset]] must be comparable, and the result of `compareTo` must be consistent
- * with `equals` and `hashcode`.
+ * stream. Since offsets are retrieved from a [[Source]] by a single thread, we know the global
+ * ordering of two [[Offset]] instances. We do assume that if two offsets are `equal` then no
+ * new data has arrived.
*/
-trait Offset extends Serializable {
-
- /**
- * Returns a negative integer, zero, or a positive integer as this object is less than, equal to,
- * or greater than the specified object.
- */
- def compareTo(other: Offset): Int
-
- def >(other: Offset): Boolean = compareTo(other) > 0
- def <(other: Offset): Boolean = compareTo(other) < 0
- def <=(other: Offset): Boolean = compareTo(other) <= 0
- def >=(other: Offset): Boolean = compareTo(other) >= 0
-}
+trait Offset extends Serializable {}
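Under the relaxed contract above, a source-specific offset only needs equals and hashCode that are consistent with "no new data has arrived", which a Scala case class provides automatically; it no longer has to define compareTo. The example below is a hypothetical illustration (FileSetOffset is not a real Spark class) and uses a local stand-in for the simplified trait shown in this patch.

// Stand-in for the simplified trait defined in this patch.
trait Offset extends Serializable

// Hypothetical offset for a source that tracks the set of files it has seen.
// Two such sets have no natural global ordering, but equality is well
// defined, and equal offsets mean no new data has arrived; that is the only
// guarantee the engine now relies on.
case class FileSetOffset(filesSeen: Set[String]) extends Offset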
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 220f77dc24..9825f19b86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -259,7 +259,7 @@ class StreamExecution(
case (source, available) =>
committedOffsets
.get(source)
- .map(committed => committed < available)
+ .map(committed => committed != available)
.getOrElse(true)
}
}
@@ -318,7 +318,8 @@ class StreamExecution(
// Request unprocessed data from all sources.
val newData = availableOffsets.flatMap {
- case (source, available) if committedOffsets.get(source).map(_ < available).getOrElse(true) =>
+ case (source, available)
+ if committedOffsets.get(source).map(_ != available).getOrElse(true) =>
val current = committedOffsets.get(source)
val batch = source.getBatch(current, available)
logDebug(s"Retrieving data from $source: $current -> $available")
@@ -404,10 +405,10 @@ class StreamExecution(
* Blocks the current thread until processing for data from the given `source` has reached at
least the given `Offset`. This method is intended for use primarily when writing tests.
*/
- def awaitOffset(source: Source, newOffset: Offset): Unit = {
+ private[sql] def awaitOffset(source: Source, newOffset: Offset): Unit = {
def notDone = {
val localCommittedOffsets = committedOffsets
- !localCommittedOffsets.contains(source) || localCommittedOffsets(source) < newOffset
+ !localCommittedOffsets.contains(source) || localCommittedOffsets(source) != newOffset
}
while (notDone) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala
index 9590af4e77..b65a987770 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/OffsetSuite.scala
@@ -24,44 +24,12 @@ trait OffsetSuite extends SparkFunSuite {
/** Creates test to check all the comparisons of offsets given a `one` that is less than `two`. */
def compare(one: Offset, two: Offset): Unit = {
test(s"comparison $one <=> $two") {
- assert(one < two)
- assert(one <= two)
- assert(one <= one)
- assert(two > one)
- assert(two >= one)
- assert(one >= one)
assert(one == one)
assert(two == two)
assert(one != two)
assert(two != one)
}
}
-
- /** Creates test to check that non-equality comparisons throw exception. */
- def compareInvalid(one: Offset, two: Offset): Unit = {
- test(s"invalid comparison $one <=> $two") {
- intercept[IllegalArgumentException] {
- assert(one < two)
- }
-
- intercept[IllegalArgumentException] {
- assert(one <= two)
- }
-
- intercept[IllegalArgumentException] {
- assert(one > two)
- }
-
- intercept[IllegalArgumentException] {
- assert(one >= two)
- }
-
- assert(!(one == two))
- assert(!(two == one))
- assert(one != two)
- assert(two != one)
- }
- }
}
class LongOffsetSuite extends OffsetSuite {
@@ -79,10 +47,6 @@ class CompositeOffsetSuite extends OffsetSuite {
one = CompositeOffset(None :: Nil),
two = CompositeOffset(Some(LongOffset(2)) :: Nil))
- compareInvalid( // sizes must be same
- one = CompositeOffset(Nil),
- two = CompositeOffset(Some(LongOffset(2)) :: Nil))
-
compare(
one = CompositeOffset.fill(LongOffset(0), LongOffset(1)),
two = CompositeOffset.fill(LongOffset(1), LongOffset(2)))
@@ -91,8 +55,5 @@ class CompositeOffsetSuite extends OffsetSuite {
one = CompositeOffset.fill(LongOffset(1), LongOffset(1)),
two = CompositeOffset.fill(LongOffset(1), LongOffset(2)))
- compareInvalid(
- one = CompositeOffset.fill(LongOffset(2), LongOffset(1)), // vector time inconsistent
- two = CompositeOffset.fill(LongOffset(1), LongOffset(2)))
}