diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2017-03-22 12:30:36 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2017-03-22 12:30:36 -0700 |
commit | 82b598b963a21ae9d6a2a9638e86b4165c2a78c9 (patch) | |
tree | 86a2f15a15d9642df2ebbc0037a4e91025a04164 /sql/core/src/test/scala/org | |
parent | 80fd070389a9c8ffa342d7b11f1ab2ea92e0f562 (diff) | |
download | spark-82b598b963a21ae9d6a2a9638e86b4165c2a78c9.tar.gz spark-82b598b963a21ae9d6a2a9638e86b4165c2a78c9.tar.bz2 spark-82b598b963a21ae9d6a2a9638e86b4165c2a78c9.zip |
[SPARK-20057][SS] Renamed KeyedState to GroupState in mapGroupsWithState
## What changes were proposed in this pull request?
Since the state is tied a "group" in the "mapGroupsWithState" operations, its better to call the state "GroupState" instead of a key. This would make it more general if you extends this operation to RelationGroupedDataset and python APIs.
## How was this patch tested?
Existing unit tests.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #17385 from tdas/SPARK-20057.
Diffstat (limited to 'sql/core/src/test/scala/org')
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala | 122 |
1 files changed, 61 insertions, 61 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala index fe72283bb6..3dabef6a9a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.FlatMapGroupsWithState import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._ import org.apache.spark.sql.execution.RDDScanExec -import org.apache.spark.sql.execution.streaming.{FlatMapGroupsWithStateExec, KeyedStateImpl, MemoryStream} +import org.apache.spark.sql.execution.streaming.{FlatMapGroupsWithStateExec, GroupStateImpl, MemoryStream} import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreId, StoreUpdate} import org.apache.spark.sql.streaming.FlatMapGroupsWithStateSuite.MemoryStateStore import org.apache.spark.sql.types.{DataType, IntegerType} @@ -43,16 +43,16 @@ case class Result(key: Long, count: Int) class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAfterAll { import testImplicits._ - import KeyedStateImpl._ - import KeyedStateTimeout._ + import GroupStateImpl._ + import GroupStateTimeout._ override def afterAll(): Unit = { super.afterAll() StateStore.stop() } - test("KeyedState - get, exists, update, remove") { - var state: KeyedStateImpl[String] = null + test("GroupState - get, exists, update, remove") { + var state: GroupStateImpl[String] = null def testState( expectedData: Option[String], @@ -73,13 +73,13 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf } // Updating empty state - state = new KeyedStateImpl[String](None) + state = new GroupStateImpl[String](None) testState(None) state.update("") testState(Some(""), shouldBeUpdated = true) // Updating exiting state - state = new KeyedStateImpl[String](Some("2")) + state = new GroupStateImpl[String](Some("2")) testState(Some("2")) state.update("3") testState(Some("3"), shouldBeUpdated = true) @@ -97,19 +97,19 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf } } - test("KeyedState - setTimeout**** with NoTimeout") { + test("GroupState - setTimeout**** with NoTimeout") { for (initState <- Seq(None, Some(5))) { // for different initial state - implicit val state = new KeyedStateImpl(initState, 1000, 1000, NoTimeout, hasTimedOut = false) + implicit val state = new GroupStateImpl(initState, 1000, 1000, NoTimeout, hasTimedOut = false) testTimeoutDurationNotAllowed[UnsupportedOperationException](state) testTimeoutTimestampNotAllowed[UnsupportedOperationException](state) } } - test("KeyedState - setTimeout**** with ProcessingTimeTimeout") { - implicit var state: KeyedStateImpl[Int] = null + test("GroupState - setTimeout**** with ProcessingTimeTimeout") { + implicit var state: GroupStateImpl[Int] = null - state = new KeyedStateImpl[Int](None, 1000, 1000, ProcessingTimeTimeout, hasTimedOut = false) + state = new GroupStateImpl[Int](None, 1000, 1000, ProcessingTimeTimeout, hasTimedOut = false) assert(state.getTimeoutTimestamp === NO_TIMESTAMP) testTimeoutDurationNotAllowed[IllegalStateException](state) testTimeoutTimestampNotAllowed[UnsupportedOperationException](state) @@ -128,8 +128,8 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf testTimeoutTimestampNotAllowed[UnsupportedOperationException](state) } - test("KeyedState - setTimeout**** with EventTimeTimeout") { - implicit val state = new KeyedStateImpl[Int]( + test("GroupState - setTimeout**** with EventTimeTimeout") { + implicit val state = new GroupStateImpl[Int]( None, 1000, 1000, EventTimeTimeout, hasTimedOut = false) assert(state.getTimeoutTimestamp === NO_TIMESTAMP) testTimeoutDurationNotAllowed[UnsupportedOperationException](state) @@ -148,8 +148,8 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf testTimeoutTimestampNotAllowed[IllegalStateException](state) } - test("KeyedState - illegal params to setTimeout****") { - var state: KeyedStateImpl[Int] = null + test("GroupState - illegal params to setTimeout****") { + var state: GroupStateImpl[Int] = null // Test setTimeout****() with illegal values def testIllegalTimeout(body: => Unit): Unit = { @@ -157,14 +157,14 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf assert(state.getTimeoutTimestamp === NO_TIMESTAMP) } - state = new KeyedStateImpl(Some(5), 1000, 1000, ProcessingTimeTimeout, hasTimedOut = false) + state = new GroupStateImpl(Some(5), 1000, 1000, ProcessingTimeTimeout, hasTimedOut = false) testIllegalTimeout { state.setTimeoutDuration(-1000) } testIllegalTimeout { state.setTimeoutDuration(0) } testIllegalTimeout { state.setTimeoutDuration("-2 second") } testIllegalTimeout { state.setTimeoutDuration("-1 month") } testIllegalTimeout { state.setTimeoutDuration("1 month -1 day") } - state = new KeyedStateImpl(Some(5), 1000, 1000, EventTimeTimeout, hasTimedOut = false) + state = new GroupStateImpl(Some(5), 1000, 1000, EventTimeTimeout, hasTimedOut = false) testIllegalTimeout { state.setTimeoutTimestamp(-10000) } testIllegalTimeout { state.setTimeoutTimestamp(10000, "-3 second") } testIllegalTimeout { state.setTimeoutTimestamp(10000, "-1 month") } @@ -175,25 +175,25 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf testIllegalTimeout { state.setTimeoutTimestamp(new Date(-10000), "1 month -1 day") } } - test("KeyedState - hasTimedOut") { + test("GroupState - hasTimedOut") { for (timeoutConf <- Seq(NoTimeout, ProcessingTimeTimeout, EventTimeTimeout)) { for (initState <- Seq(None, Some(5))) { - val state1 = new KeyedStateImpl(initState, 1000, 1000, timeoutConf, hasTimedOut = false) + val state1 = new GroupStateImpl(initState, 1000, 1000, timeoutConf, hasTimedOut = false) assert(state1.hasTimedOut === false) - val state2 = new KeyedStateImpl(initState, 1000, 1000, timeoutConf, hasTimedOut = true) + val state2 = new GroupStateImpl(initState, 1000, 1000, timeoutConf, hasTimedOut = true) assert(state2.hasTimedOut === true) } } } - test("KeyedState - primitive type") { - var intState = new KeyedStateImpl[Int](None) + test("GroupState - primitive type") { + var intState = new GroupStateImpl[Int](None) intercept[NoSuchElementException] { intState.get } assert(intState.getOption === None) - intState = new KeyedStateImpl[Int](Some(10)) + intState = new GroupStateImpl[Int](Some(10)) assert(intState.get == 10) intState.update(0) assert(intState.get == 0) @@ -218,21 +218,21 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf testStateUpdateWithData( testName + "no update", stateUpdates = state => { /* do nothing */ }, - timeoutConf = KeyedStateTimeout.NoTimeout, + timeoutConf = GroupStateTimeout.NoTimeout, priorState = priorState, expectedState = priorState) // should not change testStateUpdateWithData( testName + "state updated", stateUpdates = state => { state.update(5) }, - timeoutConf = KeyedStateTimeout.NoTimeout, + timeoutConf = GroupStateTimeout.NoTimeout, priorState = priorState, expectedState = Some(5)) // should change testStateUpdateWithData( testName + "state removed", stateUpdates = state => { state.remove() }, - timeoutConf = KeyedStateTimeout.NoTimeout, + timeoutConf = GroupStateTimeout.NoTimeout, priorState = priorState, expectedState = None) // should be removed } @@ -283,7 +283,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf testStateUpdateWithData( s"ProcessingTimeTimeout - $testName - state and timeout duration updated", stateUpdates = - (state: KeyedState[Int]) => { state.update(5); state.setTimeoutDuration(5000) }, + (state: GroupState[Int]) => { state.update(5); state.setTimeoutDuration(5000) }, timeoutConf = ProcessingTimeTimeout, priorState = priorState, priorTimeoutTimestamp = priorTimeoutTimestamp, @@ -293,7 +293,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf testStateUpdateWithData( s"EventTimeTimeout - $testName - state and timeout timestamp updated", stateUpdates = - (state: KeyedState[Int]) => { state.update(5); state.setTimeoutTimestamp(5000) }, + (state: GroupState[Int]) => { state.update(5); state.setTimeoutTimestamp(5000) }, timeoutConf = EventTimeTimeout, priorState = priorState, priorTimeoutTimestamp = priorTimeoutTimestamp, @@ -303,7 +303,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf testStateUpdateWithData( s"EventTimeTimeout - $testName - timeout timestamp updated to before watermark", stateUpdates = - (state: KeyedState[Int]) => { + (state: GroupState[Int]) => { state.update(5) intercept[IllegalArgumentException] { state.setTimeoutTimestamp(currentBatchWatermark - 1) // try to set to < watermark @@ -387,7 +387,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf test("StateStoreUpdater - rows are cloned before writing to StateStore") { // function for running count - val func = (key: Int, values: Iterator[Int], state: KeyedState[Int]) => { + val func = (key: Int, values: Iterator[Int], state: GroupState[Int]) => { state.update(state.getOption.getOrElse(0) + values.size) Iterator.empty } @@ -404,7 +404,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf test("flatMapGroupsWithState - streaming") { // Function to maintain running count up to 2, and then remove the count // Returns the data and the count if state is defined, otherwise does not return anything - val stateFunc = (key: String, values: Iterator[String], state: KeyedState[RunningCount]) => { + val stateFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => { val count = state.getOption.map(_.count).getOrElse(0L) + values.size if (count == 3) { @@ -420,7 +420,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf val result = inputData.toDS() .groupByKey(x => x) - .flatMapGroupsWithState(Update, KeyedStateTimeout.NoTimeout)(stateFunc) + .flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout)(stateFunc) testStream(result, Update)( AddData(inputData, "a"), @@ -446,7 +446,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf // Function to maintain running count up to 2, and then remove the count // Returns the data and the count if state is defined, otherwise does not return anything // Additionally, it updates state lazily as the returned iterator get consumed - val stateFunc = (key: String, values: Iterator[String], state: KeyedState[RunningCount]) => { + val stateFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => { values.flatMap { _ => val count = state.getOption.map(_.count).getOrElse(0L) + 1 if (count == 3) { @@ -463,7 +463,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf val result = inputData.toDS() .groupByKey(x => x) - .flatMapGroupsWithState(Update, KeyedStateTimeout.NoTimeout)(stateFunc) + .flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout)(stateFunc) testStream(result, Update)( AddData(inputData, "a", "a", "b"), CheckLastBatch(("a", "1"), ("a", "2"), ("b", "1")), @@ -481,7 +481,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf test("flatMapGroupsWithState - streaming + aggregation") { // Function to maintain running count up to 2, and then remove the count // Returns the data and the count (-1 if count reached beyond 2 and state was just removed) - val stateFunc = (key: String, values: Iterator[String], state: KeyedState[RunningCount]) => { + val stateFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => { val count = state.getOption.map(_.count).getOrElse(0L) + values.size if (count == 3) { @@ -497,7 +497,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf val result = inputData.toDS() .groupByKey(x => x) - .flatMapGroupsWithState(Append, KeyedStateTimeout.NoTimeout)(stateFunc) + .flatMapGroupsWithState(Append, GroupStateTimeout.NoTimeout)(stateFunc) .groupByKey(_._1) .count() @@ -524,20 +524,20 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf test("flatMapGroupsWithState - batch") { // Function that returns running count only if its even, otherwise does not return - val stateFunc = (key: String, values: Iterator[String], state: KeyedState[RunningCount]) => { + val stateFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => { if (state.exists) throw new IllegalArgumentException("state.exists should be false") Iterator((key, values.size)) } val df = Seq("a", "a", "b").toDS .groupByKey(x => x) - .flatMapGroupsWithState(Update, KeyedStateTimeout.NoTimeout)(stateFunc).toDF + .flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout)(stateFunc).toDF checkAnswer(df, Seq(("a", 2), ("b", 1)).toDF) } test("flatMapGroupsWithState - streaming with processing time timeout") { // Function to maintain running count up to 2, and then remove the count // Returns the data and the count (-1 if count reached beyond 2 and state was just removed) - val stateFunc = (key: String, values: Iterator[String], state: KeyedState[RunningCount]) => { + val stateFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => { if (state.hasTimedOut) { state.remove() Iterator((key, "-1")) @@ -594,7 +594,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf val stateFunc = ( key: String, values: Iterator[(String, Long)], - state: KeyedState[Long]) => { + state: GroupState[Long]) => { val timeoutDelay = 5 if (key != "a") { Iterator.empty @@ -637,7 +637,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf test("mapGroupsWithState - streaming") { // Function to maintain running count up to 2, and then remove the count // Returns the data and the count (-1 if count reached beyond 2 and state was just removed) - val stateFunc = (key: String, values: Iterator[String], state: KeyedState[RunningCount]) => { + val stateFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => { val count = state.getOption.map(_.count).getOrElse(0L) + values.size if (count == 3) { @@ -676,7 +676,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf } test("mapGroupsWithState - batch") { - val stateFunc = (key: String, values: Iterator[String], state: KeyedState[RunningCount]) => { + val stateFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => { if (state.exists) throw new IllegalArgumentException("state.exists should be false") (key, values.size) } @@ -690,7 +690,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf } testQuietly("StateStore.abort on task failure handling") { - val stateFunc = (key: String, values: Iterator[String], state: KeyedState[RunningCount]) => { + val stateFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => { if (FlatMapGroupsWithStateSuite.failInTask) throw new Exception("expected failure") val count = state.getOption.map(_.count).getOrElse(0L) + values.size state.update(RunningCount(count)) @@ -724,7 +724,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf } test("output partitioning is unknown") { - val stateFunc = (key: String, values: Iterator[String], state: KeyedState[RunningCount]) => key + val stateFunc = (key: String, values: Iterator[String], state: GroupState[RunningCount]) => key val inputData = MemoryStream[String] val result = inputData.toDS.groupByKey(x => x).mapGroupsWithState(stateFunc) testStream(result, Update)( @@ -735,13 +735,13 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf } test("disallow complete mode") { - val stateFunc = (key: String, values: Iterator[String], state: KeyedState[Int]) => { + val stateFunc = (key: String, values: Iterator[String], state: GroupState[Int]) => { Iterator[String]() } var e = intercept[IllegalArgumentException] { MemoryStream[String].toDS().groupByKey(x => x).flatMapGroupsWithState( - OutputMode.Complete, KeyedStateTimeout.NoTimeout)(stateFunc) + OutputMode.Complete, GroupStateTimeout.NoTimeout)(stateFunc) } assert(e.getMessage === "The output mode of function should be append or update") @@ -750,20 +750,20 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf override def call( key: String, values: JIterator[String], - state: KeyedState[Int]): JIterator[String] = { null } + state: GroupState[Int]): JIterator[String] = { null } } e = intercept[IllegalArgumentException] { MemoryStream[String].toDS().groupByKey(x => x).flatMapGroupsWithState( javaStateFunc, OutputMode.Complete, - implicitly[Encoder[Int]], implicitly[Encoder[String]], KeyedStateTimeout.NoTimeout) + implicitly[Encoder[Int]], implicitly[Encoder[String]], GroupStateTimeout.NoTimeout) } assert(e.getMessage === "The output mode of function should be append or update") } def testStateUpdateWithData( testName: String, - stateUpdates: KeyedState[Int] => Unit, - timeoutConf: KeyedStateTimeout, + stateUpdates: GroupState[Int] => Unit, + timeoutConf: GroupStateTimeout, priorState: Option[Int], priorTimeoutTimestamp: Long = NO_TIMESTAMP, expectedState: Option[Int] = None, @@ -773,7 +773,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf return // there can be no prior timestamp, when there is no prior state } test(s"StateStoreUpdater - updates with data - $testName") { - val mapGroupsFunc = (key: Int, values: Iterator[Int], state: KeyedState[Int]) => { + val mapGroupsFunc = (key: Int, values: Iterator[Int], state: GroupState[Int]) => { assert(state.hasTimedOut === false, "hasTimedOut not false") assert(values.nonEmpty, "Some value is expected") stateUpdates(state) @@ -787,14 +787,14 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf def testStateUpdateWithTimeout( testName: String, - stateUpdates: KeyedState[Int] => Unit, - timeoutConf: KeyedStateTimeout, + stateUpdates: GroupState[Int] => Unit, + timeoutConf: GroupStateTimeout, priorTimeoutTimestamp: Long, expectedState: Option[Int], expectedTimeoutTimestamp: Long = NO_TIMESTAMP): Unit = { test(s"StateStoreUpdater - updates for timeout - $testName") { - val mapGroupsFunc = (key: Int, values: Iterator[Int], state: KeyedState[Int]) => { + val mapGroupsFunc = (key: Int, values: Iterator[Int], state: GroupState[Int]) => { assert(state.hasTimedOut === true, "hasTimedOut not true") assert(values.isEmpty, "values not empty") stateUpdates(state) @@ -808,8 +808,8 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf def testStateUpdate( testTimeoutUpdates: Boolean, - mapGroupsFunc: (Int, Iterator[Int], KeyedState[Int]) => Iterator[Int], - timeoutConf: KeyedStateTimeout, + mapGroupsFunc: (Int, Iterator[Int], GroupState[Int]) => Iterator[Int], + timeoutConf: GroupStateTimeout, priorState: Option[Int], priorTimeoutTimestamp: Long, expectedState: Option[Int], @@ -848,8 +848,8 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf } def newFlatMapGroupsWithStateExec( - func: (Int, Iterator[Int], KeyedState[Int]) => Iterator[Int], - timeoutType: KeyedStateTimeout = KeyedStateTimeout.NoTimeout, + func: (Int, Iterator[Int], GroupState[Int]) => Iterator[Int], + timeoutType: GroupStateTimeout = GroupStateTimeout.NoTimeout, batchTimestampMs: Long = NO_TIMESTAMP): FlatMapGroupsWithStateExec = { MemoryStream[Int] .toDS @@ -863,7 +863,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf }.get } - def testTimeoutDurationNotAllowed[T <: Exception: Manifest](state: KeyedStateImpl[_]): Unit = { + def testTimeoutDurationNotAllowed[T <: Exception: Manifest](state: GroupStateImpl[_]): Unit = { val prevTimestamp = state.getTimeoutTimestamp intercept[T] { state.setTimeoutDuration(1000) } assert(state.getTimeoutTimestamp === prevTimestamp) @@ -871,7 +871,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf assert(state.getTimeoutTimestamp === prevTimestamp) } - def testTimeoutTimestampNotAllowed[T <: Exception: Manifest](state: KeyedStateImpl[_]): Unit = { + def testTimeoutTimestampNotAllowed[T <: Exception: Manifest](state: GroupStateImpl[_]): Unit = { val prevTimestamp = state.getTimeoutTimestamp intercept[T] { state.setTimeoutTimestamp(2000) } assert(state.getTimeoutTimestamp === prevTimestamp) |