summaryrefslogblamecommitdiff
path: root/test/files/pos/t5862.scala
blob: e3006ddc3f7a7807cd3f09b0a2c90a62ec103609 (plain) (tree)





































                                                                                     
package test

import java.io.DataOutput
import java.io.DataInput

/** Interface for writing outputs from a DoFn. */
trait Emitter[A] {
  def emit(value: A): Unit
}

/** A wrapper for a 'map' function tagged for a specific output channel. */
abstract class TaggedMapper[A, K, V]
    (val tags: Set[Int])
    (implicit val mA: Manifest[A], val wtA: WireFormat[A],
              val mK: Manifest[K], val wtK: WireFormat[K], val ordK: Ordering[K],
              val mV: Manifest[V], val wtV: WireFormat[V])
  extends Serializable {
}

/** Type-class for sending types across the Hadoop wire. */
trait WireFormat[A]

class MapReduceJob {
  trait DataSource

  import scala.collection.mutable.{ Set => MSet, Map => MMap }
  private val mappers: MMap[DataSource, MSet[TaggedMapper[_, _, _]]] = MMap.empty

  def addTaggedMapper[A, K, V](input: DataSource, m: TaggedMapper[A, K, V]): Unit = {
    if (!mappers.contains(input))
      mappers += (input -> MSet(m))
    else
      mappers(input) += m // : Unit

    m.tags.foreach { tag =>
    }
  }
}