package scala.collection.parallel.benchmarks.parallel_array import scala.collection.parallel.benchmarks._ import scala.collection.parallel.mutable.ParArray import extra166y.{ParallelArray => JSR166Array} class Cont(val in: Int) { var num = in override def toString = in.toString } object Cont { val pred = (a: Cont) => a.in > 100 val predjsr = new extra166y.Ops.Predicate[Cont] { def op(a: Cont) = a.in > 100 } val op = (a: Cont, b: Cont) => { b.num = a.in + b.in b } val opnew = (a: Cont, b: Cont) => new Cont(a.in + b.in) val opheavy = (a: Cont, b: Cont) => { heavyComputation(a, b) } val reducer = new extra166y.Ops.Reducer[Cont] { def op(a: Cont, b: Cont) = { b.num = a.in + b.in b } } val reducernew = new extra166y.Ops.Reducer[Cont] { def op(a: Cont, b: Cont) = new Cont(a.in + b.in) } val reducerheavy = new extra166y.Ops.Reducer[Cont] { def op(a: Cont, b: Cont) = heavyComputation(a, b) } def heavyComputation(a: Cont, b: Cont) = { val f = a.in val s = b.in var i = 0 var res = f * s while (i < 50000) { if ((i + f) % 3 == 0) res += s else res -= f i += 1 } b.num = res b } } abstract class Resettable[T](val size: Int, val parallelism: Int, val runWhat: String, elemcreator: Int => T, arrcreator: Int => Array[Any], cls: Class[T]) extends Bench with SequentialOps[T] { val forkjoinpool = new scala.concurrent.forkjoin.ForkJoinPool(parallelism) forkjoinpool.setMaximumPoolSize(parallelism) val papool = new jsr166y.ForkJoinPool(parallelism) papool.setMaximumPoolSize(parallelism) var pa: ParArray[T] = null var jsrarr: JSR166Array[T] = null reset def reset = runWhat match { case "seq" => arr = arrcreator(size) for (i <- 0 until size) arr(i) = elemcreator(i) case "par" => pa = new ParArray[T](size) collection.parallel.tasksupport.environment = forkjoinpool for (i <- 0 until size) pa(i) = elemcreator(i) case "jsr" => jsrarr = JSR166Array.create(size, cls, papool) for (i <- 0 until size) jsrarr.set(i, elemcreator(i)) case _ => throw new IllegalArgumentException("Unknown type: " + runWhat) } var updateCounter = 0 def incUpdateCounter { updateCounter += 1 if (updateCounter > size) updateCounter = 0 } def updateSeq { val tmp = arr(updateCounter) arr(updateCounter) = arr(size - updateCounter - 1) arr(size - updateCounter - 1) = tmp incUpdateCounter } def updatePar { val tmp = pa(updateCounter) pa(updateCounter) = pa(size - updateCounter - 1) pa(size - updateCounter - 1) = tmp incUpdateCounter } def updateJsr { val tmp = jsrarr.get(updateCounter) jsrarr.set(updateCounter, jsrarr.get(size - updateCounter - 1)) jsrarr.set(size - updateCounter - 1, tmp) incUpdateCounter } override def printResults { println(" --- Fork join pool state --- ") println("Parallelism: " + forkjoinpool.getParallelism) println("Active threads: " + forkjoinpool.getActiveThreadCount) println("Work stealings: " + forkjoinpool.getStealCount) } }