summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/actors/scala/actors/remote/TcpService.scala29
-rw-r--r--src/compiler/scala/reflect/macros/runtime/Aliases.scala5
-rw-r--r--src/compiler/scala/reflect/macros/runtime/Enclosures.scala16
-rw-r--r--src/compiler/scala/reflect/macros/runtime/Typers.scala23
-rw-r--r--src/compiler/scala/reflect/macros/util/Traces.scala2
-rw-r--r--src/compiler/scala/tools/nsc/ast/parser/Parsers.scala62
-rw-r--r--src/compiler/scala/tools/nsc/ast/parser/Scanners.scala2
-rw-r--r--src/compiler/scala/tools/nsc/ast/parser/TreeBuilder.scala9
-rw-r--r--src/compiler/scala/tools/nsc/doc/html/resource/lib/index.js17
-rw-r--r--src/compiler/scala/tools/nsc/interactive/Global.scala9
-rw-r--r--src/compiler/scala/tools/nsc/javac/JavaParsers.scala6
-rw-r--r--src/compiler/scala/tools/nsc/settings/ScalaSettings.scala1
-rw-r--r--src/compiler/scala/tools/nsc/symtab/classfile/ClassfileParser.scala59
-rw-r--r--src/compiler/scala/tools/nsc/symtab/classfile/Pickler.scala7
-rw-r--r--src/compiler/scala/tools/nsc/typechecker/ContextErrors.scala34
-rw-r--r--src/compiler/scala/tools/nsc/typechecker/Contexts.scala2
-rw-r--r--src/compiler/scala/tools/nsc/typechecker/Implicits.scala143
-rw-r--r--src/compiler/scala/tools/nsc/typechecker/Macros.scala141
-rw-r--r--src/compiler/scala/tools/nsc/typechecker/RefChecks.scala10
-rw-r--r--src/compiler/scala/tools/nsc/typechecker/SuperAccessors.scala23
-rw-r--r--src/compiler/scala/tools/nsc/typechecker/Typers.scala33
-rw-r--r--src/compiler/scala/tools/reflect/MacroImplementations.scala78
-rw-r--r--src/compiler/scala/tools/reflect/ToolBoxFactory.scala10
-rw-r--r--src/forkjoin/scala/concurrent/forkjoin/ForkJoinPool.java2605
-rw-r--r--src/forkjoin/scala/concurrent/forkjoin/ForkJoinTask.java356
-rw-r--r--src/forkjoin/scala/concurrent/forkjoin/ForkJoinWorkerThread.java20
-rw-r--r--src/library/scala/PartialFunction.scala2
-rw-r--r--src/library/scala/concurrent/impl/Future.scala2
-rw-r--r--src/partest/scala/tools/partest/CompilerTest.scala2
-rw-r--r--src/partest/scala/tools/partest/DirectTest.scala28
-rw-r--r--src/reflect/scala/reflect/internal/Flags.scala12
-rw-r--r--src/reflect/scala/reflect/internal/util/Statistics.scala2
-rw-r--r--src/reflect/scala/reflect/macros/Enclosures.scala6
-rw-r--r--src/reflect/scala/reflect/macros/Typers.scala6
-rw-r--r--src/reflect/scala/reflect/runtime/ReflectionUtils.scala12
35 files changed, 2420 insertions, 1354 deletions
diff --git a/src/actors/scala/actors/remote/TcpService.scala b/src/actors/scala/actors/remote/TcpService.scala
index bde05fd816..028dd3a083 100644
--- a/src/actors/scala/actors/remote/TcpService.scala
+++ b/src/actors/scala/actors/remote/TcpService.scala
@@ -14,7 +14,7 @@ package remote
import java.io.{DataInputStream, DataOutputStream, IOException}
import java.lang.{Thread, SecurityException}
-import java.net.{InetAddress, ServerSocket, Socket, UnknownHostException}
+import java.net.{InetAddress, InetSocketAddress, ServerSocket, Socket, SocketTimeoutException, UnknownHostException}
import scala.collection.mutable
import scala.util.Random
@@ -59,6 +59,23 @@ object TcpService {
portnum
}
+ private val connectTimeoutMillis = {
+ val propName = "scala.actors.tcpSocket.connectTimeoutMillis"
+ val defaultTimeoutMillis = 0
+ sys.props get propName flatMap {
+ timeout =>
+ try {
+ val to = timeout.toInt
+ Debug.info("Using socket timeout $to")
+ Some(to)
+ } catch {
+ case e: NumberFormatException =>
+ Debug.warning(s"""Could not parse $propName = "$timeout" as an Int""")
+ None
+ }
+ } getOrElse defaultTimeoutMillis
+ }
+
var BufSize: Int = 65536
}
@@ -176,7 +193,15 @@ class TcpService(port: Int, cl: ClassLoader) extends Thread with Service {
}
def connect(n: Node): TcpServiceWorker = synchronized {
- val socket = new Socket(n.address, n.port)
+ val socket = new Socket()
+ val start = System.nanoTime
+ try {
+ socket.connect(new InetSocketAddress(n.address, n.port), TcpService.connectTimeoutMillis)
+ } catch {
+ case e: SocketTimeoutException =>
+ Debug.warning(f"Timed out connecting to $n after ${(System.nanoTime - start) / math.pow(10, 9)}%.3f seconds")
+ throw e
+ }
val worker = new TcpServiceWorker(this, socket)
worker.sendNode(n)
worker.start()
diff --git a/src/compiler/scala/reflect/macros/runtime/Aliases.scala b/src/compiler/scala/reflect/macros/runtime/Aliases.scala
index ff870e728e..96cf50e498 100644
--- a/src/compiler/scala/reflect/macros/runtime/Aliases.scala
+++ b/src/compiler/scala/reflect/macros/runtime/Aliases.scala
@@ -28,4 +28,9 @@ trait Aliases {
override def typeTag[T](implicit ttag: TypeTag[T]) = ttag
override def weakTypeOf[T](implicit attag: WeakTypeTag[T]): Type = attag.tpe
override def typeOf[T](implicit ttag: TypeTag[T]): Type = ttag.tpe
+
+ type ImplicitCandidate = (Type, Tree)
+ implicit class RichOpenImplicit(oi: universe.analyzer.OpenImplicit) {
+ def toImplicitCandidate = (oi.pt, oi.tree)
+ }
} \ No newline at end of file
diff --git a/src/compiler/scala/reflect/macros/runtime/Enclosures.scala b/src/compiler/scala/reflect/macros/runtime/Enclosures.scala
index be5f2dbe83..2a4a22f81c 100644
--- a/src/compiler/scala/reflect/macros/runtime/Enclosures.scala
+++ b/src/compiler/scala/reflect/macros/runtime/Enclosures.scala
@@ -13,12 +13,12 @@ trait Enclosures {
// vals are eager to simplify debugging
// after all we wouldn't save that much time by making them lazy
- val macroApplication: Tree = expandee
- val enclosingClass: Tree = enclTrees collectFirst { case x: ImplDef => x } getOrElse EmptyTree
- val enclosingImplicits: List[(Type, Tree)] = site.openImplicits
- val enclosingMacros: List[Context] = this :: universe.analyzer.openMacros // include self
- val enclosingMethod: Tree = site.enclMethod.tree
- val enclosingPosition: Position = if (enclPoses.isEmpty) NoPosition else enclPoses.head.pos
- val enclosingUnit: CompilationUnit = universe.currentRun.currentUnit
- val enclosingRun: Run = universe.currentRun
+ val macroApplication: Tree = expandee
+ val enclosingClass: Tree = enclTrees collectFirst { case x: ImplDef => x } getOrElse EmptyTree
+ val enclosingImplicits: List[ImplicitCandidate] = site.openImplicits.map(_.toImplicitCandidate)
+ val enclosingMacros: List[Context] = this :: universe.analyzer.openMacros // include self
+ val enclosingMethod: Tree = site.enclMethod.tree
+ val enclosingPosition: Position = if (enclPoses.isEmpty) NoPosition else enclPoses.head.pos
+ val enclosingUnit: CompilationUnit = universe.currentRun.currentUnit
+ val enclosingRun: Run = universe.currentRun
}
diff --git a/src/compiler/scala/reflect/macros/runtime/Typers.scala b/src/compiler/scala/reflect/macros/runtime/Typers.scala
index f9add91b9a..4d333f180b 100644
--- a/src/compiler/scala/reflect/macros/runtime/Typers.scala
+++ b/src/compiler/scala/reflect/macros/runtime/Typers.scala
@@ -6,7 +6,7 @@ trait Typers {
def openMacros: List[Context] = this :: universe.analyzer.openMacros
- def openImplicits: List[(Type, Tree)] = callsiteTyper.context.openImplicits
+ def openImplicits: List[ImplicitCandidate] = callsiteTyper.context.openImplicits.map(_.toImplicitCandidate)
/**
* @see [[scala.tools.reflect.Toolbox.typeCheck]]
@@ -35,31 +35,16 @@ trait Typers {
def inferImplicitValue(pt: Type, silent: Boolean = true, withMacrosDisabled: Boolean = false, pos: Position = enclosingPosition): Tree = {
macroLogVerbose("inferring implicit value of type %s, macros = %s".format(pt, !withMacrosDisabled))
- inferImplicit(universe.EmptyTree, pt, isView = false, silent = silent, withMacrosDisabled = withMacrosDisabled, pos = pos)
+ universe.analyzer.inferImplicit(universe.EmptyTree, pt, false, callsiteTyper.context, silent, withMacrosDisabled, pos, (pos, msg) => throw TypecheckException(pos, msg))
}
def inferImplicitView(tree: Tree, from: Type, to: Type, silent: Boolean = true, withMacrosDisabled: Boolean = false, pos: Position = enclosingPosition): Tree = {
macroLogVerbose("inferring implicit view from %s to %s for %s, macros = %s".format(from, to, tree, !withMacrosDisabled))
val viewTpe = universe.appliedType(universe.definitions.FunctionClass(1).toTypeConstructor, List(from, to))
- inferImplicit(tree, viewTpe, isView = true, silent = silent, withMacrosDisabled = withMacrosDisabled, pos = pos)
- }
-
- private def inferImplicit(tree: Tree, pt: Type, isView: Boolean, silent: Boolean, withMacrosDisabled: Boolean, pos: Position): Tree = {
- import universe.analyzer.SearchResult
- val context = callsiteTyper.context
- val wrapper1 = if (!withMacrosDisabled) (context.withMacrosEnabled[SearchResult] _) else (context.withMacrosDisabled[SearchResult] _)
- def wrapper (inference: => SearchResult) = wrapper1(inference)
- wrapper(universe.analyzer.inferImplicit(tree, pt, reportAmbiguous = true, isView = isView, context = context, saveAmbiguousDivergent = !silent, pos = pos)) match {
- case failure if failure.tree.isEmpty =>
- macroLogVerbose("implicit search has failed. to find out the reason, turn on -Xlog-implicits")
- if (context.hasErrors) throw new TypecheckException(context.errBuffer.head.errPos, context.errBuffer.head.errMsg)
- universe.EmptyTree
- case success =>
- success.tree
- }
+ universe.analyzer.inferImplicit(tree, viewTpe, true, callsiteTyper.context, silent, withMacrosDisabled, pos, (pos, msg) => throw TypecheckException(pos, msg))
}
def resetAllAttrs(tree: Tree): Tree = universe.resetAllAttrs(tree)
def resetLocalAttrs(tree: Tree): Tree = universe.resetLocalAttrs(tree)
-} \ No newline at end of file
+}
diff --git a/src/compiler/scala/reflect/macros/util/Traces.scala b/src/compiler/scala/reflect/macros/util/Traces.scala
index d16916b753..2dffc68745 100644
--- a/src/compiler/scala/reflect/macros/util/Traces.scala
+++ b/src/compiler/scala/reflect/macros/util/Traces.scala
@@ -6,8 +6,6 @@ trait Traces {
val macroDebugLite = globalSettings.YmacrodebugLite.value
val macroDebugVerbose = globalSettings.YmacrodebugVerbose.value
- val macroTraceLite = scala.tools.nsc.util.trace when (macroDebugLite || macroDebugVerbose)
- val macroTraceVerbose = scala.tools.nsc.util.trace when macroDebugVerbose
@inline final def macroLogLite(msg: => Any) { if (macroDebugLite || macroDebugVerbose) println(msg) }
@inline final def macroLogVerbose(msg: => Any) { if (macroDebugVerbose) println(msg) }
}
diff --git a/src/compiler/scala/tools/nsc/ast/parser/Parsers.scala b/src/compiler/scala/tools/nsc/ast/parser/Parsers.scala
index 6f79f639b9..b8791c15dc 100644
--- a/src/compiler/scala/tools/nsc/ast/parser/Parsers.scala
+++ b/src/compiler/scala/tools/nsc/ast/parser/Parsers.scala
@@ -1107,32 +1107,33 @@ self =>
* | symbol
* | null
* }}}
- * @note The returned tree does not yet have a position
*/
- def literal(isNegated: Boolean = false, inPattern: Boolean = false): Tree = {
- def finish(value: Any): Tree = {
- val t = Literal(Constant(value))
- in.nextToken()
- t
- }
- if (in.token == SYMBOLLIT)
- Apply(scalaDot(nme.Symbol), List(finish(in.strVal)))
- else if (in.token == INTERPOLATIONID)
- interpolatedString(inPattern = inPattern)
- else finish(in.token match {
- case CHARLIT => in.charVal
- case INTLIT => in.intVal(isNegated).toInt
- case LONGLIT => in.intVal(isNegated)
- case FLOATLIT => in.floatVal(isNegated).toFloat
- case DOUBLELIT => in.floatVal(isNegated)
- case STRINGLIT | STRINGPART => in.strVal.intern()
- case TRUE => true
- case FALSE => false
- case NULL => null
- case _ =>
- syntaxErrorOrIncomplete("illegal literal", true)
- null
- })
+ def literal(isNegated: Boolean = false, inPattern: Boolean = false, start: Int = in.offset): Tree = {
+ atPos(start) {
+ def finish(value: Any): Tree = {
+ val t = Literal(Constant(value))
+ in.nextToken()
+ t
+ }
+ if (in.token == SYMBOLLIT)
+ Apply(scalaDot(nme.Symbol), List(finish(in.strVal)))
+ else if (in.token == INTERPOLATIONID)
+ interpolatedString(inPattern = inPattern)
+ else finish(in.token match {
+ case CHARLIT => in.charVal
+ case INTLIT => in.intVal(isNegated).toInt
+ case LONGLIT => in.intVal(isNegated)
+ case FLOATLIT => in.floatVal(isNegated).toFloat
+ case DOUBLELIT => in.floatVal(isNegated)
+ case STRINGLIT | STRINGPART => in.strVal.intern()
+ case TRUE => true
+ case FALSE => false
+ case NULL => null
+ case _ =>
+ syntaxErrorOrIncomplete("illegal literal", true)
+ null
+ })
+ }
}
private def stringOp(t: Tree, op: TermName) = {
@@ -1332,11 +1333,10 @@ self =>
def parseWhile = {
val start = in.offset
atPos(in.skipToken()) {
- val lname: Name = freshTermName(nme.WHILE_PREFIX)
val cond = condExpr()
newLinesOpt()
val body = expr()
- makeWhile(lname, cond, body)
+ makeWhile(start, cond, body)
}
}
parseWhile
@@ -1510,7 +1510,7 @@ self =>
atPos(in.offset) {
val name = nme.toUnaryName(rawIdent())
if (name == nme.UNARY_- && isNumericLit)
- simpleExprRest(atPos(in.offset)(literal(isNegated = true)), canApply = true)
+ simpleExprRest(literal(isNegated = true), canApply = true)
else
Select(stripParens(simpleExpr()), name)
}
@@ -1535,7 +1535,7 @@ self =>
def simpleExpr(): Tree = {
var canApply = true
val t =
- if (isLiteral) atPos(in.offset)(literal())
+ if (isLiteral) literal()
else in.token match {
case XMLSTART =>
xmlLiteral()
@@ -1924,7 +1924,7 @@ self =>
case INTLIT | LONGLIT | FLOATLIT | DOUBLELIT =>
t match {
case Ident(nme.MINUS) =>
- return atPos(start) { literal(isNegated = true, inPattern = true) }
+ return literal(isNegated = true, inPattern = true, start = start)
case _ =>
}
case _ =>
@@ -1942,7 +1942,7 @@ self =>
atPos(start, start) { Ident(nme.WILDCARD) }
case CHARLIT | INTLIT | LONGLIT | FLOATLIT | DOUBLELIT |
STRINGLIT | INTERPOLATIONID | SYMBOLLIT | TRUE | FALSE | NULL =>
- atPos(start) { literal(inPattern = true) }
+ literal(inPattern = true)
case LPAREN =>
atPos(start)(makeParens(noSeq.patterns()))
case XMLSTART =>
diff --git a/src/compiler/scala/tools/nsc/ast/parser/Scanners.scala b/src/compiler/scala/tools/nsc/ast/parser/Scanners.scala
index c05906c740..1aa50be83a 100644
--- a/src/compiler/scala/tools/nsc/ast/parser/Scanners.scala
+++ b/src/compiler/scala/tools/nsc/ast/parser/Scanners.scala
@@ -425,6 +425,7 @@ trait Scanners extends ScannersCommon {
if (ch == '\"') {
nextRawChar()
if (ch == '\"') {
+ offset += 3
nextRawChar()
getStringPart(multiLine = true)
sepRegions = STRINGPART :: sepRegions // indicate string part
@@ -434,6 +435,7 @@ trait Scanners extends ScannersCommon {
strVal = ""
}
} else {
+ offset += 1
getStringPart(multiLine = false)
sepRegions = STRINGLIT :: sepRegions // indicate single line string part
}
diff --git a/src/compiler/scala/tools/nsc/ast/parser/TreeBuilder.scala b/src/compiler/scala/tools/nsc/ast/parser/TreeBuilder.scala
index cd93221c50..b5771454f8 100644
--- a/src/compiler/scala/tools/nsc/ast/parser/TreeBuilder.scala
+++ b/src/compiler/scala/tools/nsc/ast/parser/TreeBuilder.scala
@@ -251,8 +251,13 @@ abstract class TreeBuilder {
else CompoundTypeTree(Template(tps, emptyValDef, Nil))
/** Create tree representing a while loop */
- def makeWhile(lname: TermName, cond: Tree, body: Tree): Tree = {
- val continu = atPos(o2p(body.pos pointOrElse wrappingPos(List(cond, body)).pos.endOrPoint)) { Apply(Ident(lname), Nil) }
+ def makeWhile(startPos: Int, cond: Tree, body: Tree): Tree = {
+ val lname = freshTermName(nme.WHILE_PREFIX)
+ def default = wrappingPos(List(cond, body)) match {
+ case p if p.isDefined => p.endOrPoint
+ case _ => startPos
+ }
+ val continu = atPos(o2p(body.pos pointOrElse default)) { Apply(Ident(lname), Nil) }
val rhs = If(cond, Block(List(body), continu), Literal(Constant()))
LabelDef(lname, Nil, rhs)
}
diff --git a/src/compiler/scala/tools/nsc/doc/html/resource/lib/index.js b/src/compiler/scala/tools/nsc/doc/html/resource/lib/index.js
index 70073b272a..96689ae701 100644
--- a/src/compiler/scala/tools/nsc/doc/html/resource/lib/index.js
+++ b/src/compiler/scala/tools/nsc/doc/html/resource/lib/index.js
@@ -14,9 +14,9 @@ var title = $(document).attr('title');
var lastHash = "";
$(document).ready(function() {
- $('body').layout({
+ $('body').layout({
west__size: '20%',
- center__maskContents: true
+ center__maskContents: true
});
$('#browser').layout({
center__paneSelector: ".ui-west-center"
@@ -342,11 +342,6 @@ function configureTextFilter() {
if (event.keyCode == 27) { // escape
input.attr("value", "");
}
- if (event.keyCode == 9) { // tab
- $("#template").contents().find("#mbrsel-input").focus();
- input.attr("value", "");
- return false;
- }
if (event.keyCode == 40) { // down arrow
$(window).unbind("keydown");
keyboardScrolldownLeftPane();
@@ -354,6 +349,14 @@ function configureTextFilter() {
}
textFilter();
});
+ input.bind('keydown', function(event) {
+ if (event.keyCode == 9) { // tab
+ $("#template").contents().find("#mbrsel-input").focus();
+ input.attr("value", "");
+ return false;
+ }
+ textFilter();
+ });
input.focus(function(event) { input.select(); });
});
scheduler.add("init", function() {
diff --git a/src/compiler/scala/tools/nsc/interactive/Global.scala b/src/compiler/scala/tools/nsc/interactive/Global.scala
index c63cca9b88..84670750d7 100644
--- a/src/compiler/scala/tools/nsc/interactive/Global.scala
+++ b/src/compiler/scala/tools/nsc/interactive/Global.scala
@@ -1174,8 +1174,13 @@ class Global(settings: Settings, _reporter: Reporter, projectName: String = "")
debugLog("type error caught: "+ex)
alt
case ex: DivergentImplicit =>
- debugLog("divergent implicit caught: "+ex)
- alt
+ if (settings.Xdivergence211.value) {
+ debugLog("this shouldn't happen. DivergentImplicit exception has been thrown with -Xdivergence211 turned on: "+ex)
+ alt
+ } else {
+ debugLog("divergent implicit caught: "+ex)
+ alt
+ }
}
}
diff --git a/src/compiler/scala/tools/nsc/javac/JavaParsers.scala b/src/compiler/scala/tools/nsc/javac/JavaParsers.scala
index 8aa9b81a72..0779e648cd 100644
--- a/src/compiler/scala/tools/nsc/javac/JavaParsers.scala
+++ b/src/compiler/scala/tools/nsc/javac/JavaParsers.scala
@@ -420,6 +420,9 @@ trait JavaParsers extends ast.parser.ParsersCommon with JavaScanners {
case FINAL =>
flags |= Flags.FINAL
in.nextToken
+ case DEFAULT =>
+ flags |= Flags.DEFAULTMETHOD
+ in.nextToken()
case NATIVE =>
addAnnot(NativeAttr)
in.nextToken
@@ -544,8 +547,9 @@ trait JavaParsers extends ast.parser.ParsersCommon with JavaScanners {
val vparams = formalParams()
if (!isVoid) rtpt = optArrayBrackets(rtpt)
optThrows()
+ val bodyOk = !inInterface || (mods hasFlag Flags.DEFAULTMETHOD)
val body =
- if (!inInterface && in.token == LBRACE) {
+ if (bodyOk && in.token == LBRACE) {
methodBody()
} else {
if (parentToken == AT && in.token == DEFAULT) {
diff --git a/src/compiler/scala/tools/nsc/settings/ScalaSettings.scala b/src/compiler/scala/tools/nsc/settings/ScalaSettings.scala
index 3df6334ec1..dbfaa2c531 100644
--- a/src/compiler/scala/tools/nsc/settings/ScalaSettings.scala
+++ b/src/compiler/scala/tools/nsc/settings/ScalaSettings.scala
@@ -110,6 +110,7 @@ trait ScalaSettings extends AbsScalaSettings
val XoldPatmat = BooleanSetting ("-Xoldpatmat", "Use the pre-2.10 pattern matcher. Otherwise, the 'virtualizing' pattern matcher is used in 2.10.")
val XnoPatmatAnalysis = BooleanSetting ("-Xno-patmat-analysis", "Don't perform exhaustivity/unreachability analysis. Also, ignore @switch annotation.")
val XfullLubs = BooleanSetting ("-Xfull-lubs", "Retains pre 2.10 behavior of less aggressive truncation of least upper bounds.")
+ val Xdivergence211 = BooleanSetting ("-Xdivergence211", "Turn on the 2.11 behavior of implicit divergence not terminating recursive implicit searches (SI-7291).")
/** Compatibility stubs for options whose value name did
* not previously match the option name.
diff --git a/src/compiler/scala/tools/nsc/symtab/classfile/ClassfileParser.scala b/src/compiler/scala/tools/nsc/symtab/classfile/ClassfileParser.scala
index fb2301de65..da117540b4 100644
--- a/src/compiler/scala/tools/nsc/symtab/classfile/ClassfileParser.scala
+++ b/src/compiler/scala/tools/nsc/symtab/classfile/ClassfileParser.scala
@@ -52,18 +52,14 @@ abstract class ClassfileParser {
private def handleMissing(e: MissingRequirementError) = {
if (settings.debug.value) e.printStackTrace
- throw new IOException("Missing dependency '" + e.req + "', required by " + in.file)
+ throw new IOException(s"Missing dependency '${e.req}', required by ${in.file}")
}
private def handleError(e: Exception) = {
if (settings.debug.value) e.printStackTrace()
- throw new IOException("class file '%s' is broken\n(%s/%s)".format(
- in.file,
- e.getClass,
- if (e.getMessage eq null) "" else e.getMessage)
- )
+ throw new IOException(s"class file '${in.file}' is broken\n(${e.getClass}/${e.getMessage})")
}
private def mismatchError(c: Symbol) = {
- throw new IOException("class file '%s' has location not matching its contents: contains ".format(in.file) + c)
+ throw new IOException(s"class file '${in.file}' has location not matching its contents: contains $c")
}
private def parseErrorHandler[T]: PartialFunction[Throwable, T] = {
@@ -72,8 +68,8 @@ abstract class ClassfileParser {
}
@inline private def pushBusy[T](sym: Symbol)(body: => T): T = {
busy match {
- case Some(`sym`) => throw new IOException("unsatisfiable cyclic dependency in '%s'".format(sym))
- case Some(sym1) => throw new IOException("illegal class file dependency between '%s' and '%s'".format(sym, sym1))
+ case Some(`sym`) => throw new IOException(s"unsatisfiable cyclic dependency in '$sym'")
+ case Some(sym1) => throw new IOException(s"illegal class file dependency between '$sym' and '$sym1'")
case _ => ()
}
@@ -232,8 +228,6 @@ abstract class ClassfileParser {
forceMangledName(tpe0.typeSymbol.name, false)
val (name, tpe) = getNameAndType(in.getChar(start + 3), ownerTpe)
-// println("new tpe: " + tpe + " at phase: " + phase)
-
if (name == nme.MODULE_INSTANCE_FIELD) {
val index = in.getChar(start + 1)
val name = getExternalName(in.getChar(starts(index) + 1))
@@ -244,14 +238,12 @@ abstract class ClassfileParser {
} else {
val origName = nme.originalName(name)
val owner = if (static) ownerTpe.typeSymbol.linkedClassOfClass else ownerTpe.typeSymbol
-// println("\t" + owner.info.member(name).tpe.widen + " =:= " + tpe)
f = owner.info.findMember(origName, 0, 0, false).suchThat(_.tpe.widen =:= tpe)
if (f == NoSymbol)
f = owner.info.findMember(newTermName(origName + nme.LOCAL_SUFFIX_STRING), 0, 0, false).suchThat(_.tpe =:= tpe)
if (f == NoSymbol) {
// if it's an impl class, try to find it's static member inside the class
if (ownerTpe.typeSymbol.isImplClass) {
-// println("impl class, member: " + owner.tpe.member(origName) + ": " + owner.tpe.member(origName).tpe)
f = ownerTpe.findMember(origName, 0, 0, false).suchThat(_.tpe =:= tpe)
} else {
log("Couldn't find " + name + ": " + tpe + " inside: \n" + ownerTpe)
@@ -262,11 +254,13 @@ abstract class ClassfileParser {
f setInfo tpe
log("created fake member " + f.fullName)
}
-// println("\townerTpe.decls: " + ownerTpe.decls)
-// println("Looking for: " + name + ": " + tpe + " inside: " + ownerTpe.typeSymbol + "\n\tand found: " + ownerTpe.members)
}
}
- assert(f != NoSymbol, "could not find\n " + name + ": " + tpe + "\ninside:\n " + ownerTpe.members.mkString(", "))
+ assert(f != NoSymbol,
+ s"could not find $name: $tpe in $ownerTpe" + (
+ if (settings.debug.value) ownerTpe.members.mkString(", members are:\n ", "\n ", "") else ""
+ )
+ )
values(index) = f
}
f
@@ -586,11 +580,9 @@ abstract class ClassfileParser {
def addEnclosingTParams(clazz: Symbol) {
var sym = clazz.owner
while (sym.isClass && !sym.isModuleClass) {
-// println("adding tparams of " + sym)
- for (t <- sym.tpe.typeArgs) {
-// println("\tadding " + (t.typeSymbol.name + "->" + t.typeSymbol))
+ for (t <- sym.tpe.typeArgs)
classTParams = classTParams + (t.typeSymbol.name -> t.typeSymbol)
- }
+
sym = sym.owner
}
}
@@ -740,10 +732,7 @@ abstract class ClassfileParser {
// raw type - existentially quantify all type parameters
val eparams = typeParamsToExistentials(classSym, classSym.unsafeTypeParams)
val t = typeRef(pre, classSym, eparams.map(_.tpeHK))
- val res = newExistentialType(eparams, t)
- if (settings.debug.value && settings.verbose.value)
- println("raw type " + classSym + " -> " + res)
- res
+ newExistentialType(eparams, t)
}
case tp =>
assert(sig.charAt(index) != '<', s"sig=$sig, index=$index, tp=$tp")
@@ -865,8 +854,6 @@ abstract class ClassfileParser {
val sig = pool.getExternalName(in.nextChar)
val newType = sigToType(sym, sig)
sym.setInfo(newType)
- if (settings.debug.value && settings.verbose.value)
- println("" + sym + "; signature = " + sig + " type = " + newType)
}
else in.skip(attrLen)
case tpnme.SyntheticATTR =>
@@ -883,10 +870,10 @@ abstract class ClassfileParser {
val c = pool.getConstant(in.nextChar)
val c1 = convertTo(c, symtype)
if (c1 ne null) sym.setInfo(ConstantType(c1))
- else println("failure to convert " + c + " to " + symtype); //debug
+ else debugwarn(s"failure to convert $c to $symtype")
case tpnme.ScalaSignatureATTR =>
if (!isScalaAnnot) {
- debuglog("warning: symbol " + sym.fullName + " has pickled signature in attribute")
+ debugwarn(s"symbol ${sym.fullName} has pickled signature in attribute")
unpickler.unpickle(in.buf, in.bp, clazz, staticModule, in.file.name)
}
in.skip(attrLen)
@@ -931,6 +918,12 @@ abstract class ClassfileParser {
case pkg => pkg.fullName(File.separatorChar)+File.separator+srcfileLeaf
}
srcfile0 = settings.outputDirs.srcFilesFor(in.file, srcpath).find(_.exists)
+ case tpnme.CodeATTR =>
+ if (sym.owner.isInterface) {
+ sym setFlag DEFAULTMETHOD
+ log(s"$sym in ${sym.owner} is a java8+ default method.")
+ }
+ in.skip(attrLen)
case _ =>
in.skip(attrLen)
}
@@ -1018,16 +1011,18 @@ abstract class ClassfileParser {
}
if (hasError) None
else Some(AnnotationInfo(attrType, List(), nvpairs.toList))
- } catch {
- case f: FatalError => throw f // don't eat fatal errors, they mean a class was not found
- case ex: Throwable =>
+ }
+ catch {
+ case f: FatalError => throw f // don't eat fatal errors, they mean a class was not found
+ case ex: java.lang.Error => throw ex
+ case ex: Throwable =>
// We want to be robust when annotations are unavailable, so the very least
// we can do is warn the user about the exception
// There was a reference to ticket 1135, but that is outdated: a reference to a class not on
// the classpath would *not* end up here. A class not found is signaled
// with a `FatalError` exception, handled above. Here you'd end up after a NPE (for example),
// and that should never be swallowed silently.
- warning("Caught: " + ex + " while parsing annotations in " + in.file)
+ warning(s"Caught: $ex while parsing annotations in ${in.file}")
if (settings.debug.value) ex.printStackTrace()
None // ignore malformed annotations
diff --git a/src/compiler/scala/tools/nsc/symtab/classfile/Pickler.scala b/src/compiler/scala/tools/nsc/symtab/classfile/Pickler.scala
index e8b0cd2696..ed7eb6d307 100644
--- a/src/compiler/scala/tools/nsc/symtab/classfile/Pickler.scala
+++ b/src/compiler/scala/tools/nsc/symtab/classfile/Pickler.scala
@@ -69,11 +69,7 @@ abstract class Pickler extends SubComponent {
}
if (!t.isDef && t.hasSymbol && t.symbol.isTermMacro) {
- unit.error(t.pos, t.symbol.typeParams.length match {
- case 0 => "macro has not been expanded"
- case 1 => "this type parameter must be specified"
- case _ => "these type parameters must be specified"
- })
+ unit.error(t.pos, "macro has not been expanded")
return
}
}
@@ -293,7 +289,6 @@ abstract class Pickler extends SubComponent {
putTree(definition)
*/
case Template(parents, self, body) =>
- writeNat(parents.length)
putTrees(parents)
putTree(self)
putTrees(body)
diff --git a/src/compiler/scala/tools/nsc/typechecker/ContextErrors.scala b/src/compiler/scala/tools/nsc/typechecker/ContextErrors.scala
index 49049f110d..b9cff5b2d3 100644
--- a/src/compiler/scala/tools/nsc/typechecker/ContextErrors.scala
+++ b/src/compiler/scala/tools/nsc/typechecker/ContextErrors.scala
@@ -60,6 +60,24 @@ trait ContextErrors {
def errPos = tree.pos
}
+ // Unlike other type errors diverging implicit expansion
+ // will be re-issued explicitly on failed implicit argument search.
+ // This is because we want to:
+ // 1) provide better error message than just "implicit not found"
+ // 2) provide the type of the implicit parameter for which we got diverging expansion
+ // (pt at the point of divergence gives less information to the user)
+ // Note: it is safe to delay error message generation in this case
+ // becasue we don't modify implicits' infos.
+ // only issued when -Xdivergence211 is turned on
+ case class DivergentImplicitTypeError(tree: Tree, pt0: Type, sym: Symbol) extends AbsTypeError {
+ def errPos: Position = tree.pos
+ def errMsg: String = errMsgForPt(pt0)
+ def kind = ErrorKinds.Divergent
+ def withPt(pt: Type): AbsTypeError = NormalTypeError(tree, errMsgForPt(pt), kind)
+ private def errMsgForPt(pt: Type) =
+ s"diverging implicit expansion for type ${pt}\nstarting with ${sym.fullLocationString}"
+ }
+
case class AmbiguousTypeError(underlyingTree: Tree, errPos: Position, errMsg: String, kind: ErrorKind = ErrorKinds.Ambiguous) extends AbsTypeError
case class PosAndMsgTypeError(errPos: Position, errMsg: String, kind: ErrorKind = ErrorKinds.Normal) extends AbsTypeError
@@ -73,6 +91,7 @@ trait ContextErrors {
issueTypeError(SymbolTypeError(sym, msg))
}
+ // only called when -Xdivergence211 is turned off
def issueDivergentImplicitsError(tree: Tree, msg: String)(implicit context: Context) {
issueTypeError(NormalTypeError(tree, msg, ErrorKinds.Divergent))
}
@@ -1152,9 +1171,13 @@ trait ContextErrors {
}
def DivergingImplicitExpansionError(tree: Tree, pt: Type, sym: Symbol)(implicit context0: Context) =
- issueDivergentImplicitsError(tree,
- "diverging implicit expansion for type "+pt+"\nstarting with "+
- sym.fullLocationString)
+ if (settings.Xdivergence211.value) {
+ issueTypeError(DivergentImplicitTypeError(tree, pt, sym))
+ } else {
+ issueDivergentImplicitsError(tree,
+ "diverging implicit expansion for type "+pt+"\nstarting with "+
+ sym.fullLocationString)
+ }
}
object NamesDefaultsErrorsGen {
@@ -1284,6 +1307,11 @@ trait ContextErrors {
throw MacroBodyTypecheckException // don't call fail, because we don't need IS_ERROR
}
+ def MacroDefIsQmarkQmarkQmark() = {
+ macroLogVerbose("typecheck terminated unexpectedly: macro is ???")
+ throw MacroBodyTypecheckException
+ }
+
def MacroFeatureNotEnabled() = {
macroLogVerbose("typecheck terminated unexpectedly: language.experimental.macros feature is not enabled")
fail()
diff --git a/src/compiler/scala/tools/nsc/typechecker/Contexts.scala b/src/compiler/scala/tools/nsc/typechecker/Contexts.scala
index 6c2945cad3..3fe98ed127 100644
--- a/src/compiler/scala/tools/nsc/typechecker/Contexts.scala
+++ b/src/compiler/scala/tools/nsc/typechecker/Contexts.scala
@@ -120,7 +120,7 @@ trait Contexts { self: Analyzer =>
// not inherited to child contexts
var depth: Int = 0
var imports: List[ImportInfo] = List() // currently visible imports
- var openImplicits: List[(Type,Tree)] = List() // types for which implicit arguments
+ var openImplicits: List[OpenImplicit] = List() // types for which implicit arguments
// are currently searched
// for a named application block (Tree) the corresponding NamedApplyInfo
var namedApplyBlockInfo: Option[(Tree, NamedApplyInfo)] = None
diff --git a/src/compiler/scala/tools/nsc/typechecker/Implicits.scala b/src/compiler/scala/tools/nsc/typechecker/Implicits.scala
index 01ae0a7a94..770aef5d61 100644
--- a/src/compiler/scala/tools/nsc/typechecker/Implicits.scala
+++ b/src/compiler/scala/tools/nsc/typechecker/Implicits.scala
@@ -80,7 +80,7 @@ trait Implicits {
printTyping("typing implicit: %s %s".format(tree, context.undetparamsString))
val implicitSearchContext = context.makeImplicit(reportAmbiguous)
val result = new ImplicitSearch(tree, pt, isView, implicitSearchContext, pos).bestImplicit
- if (saveAmbiguousDivergent && implicitSearchContext.hasErrors) {
+ if ((result.isFailure || !settings.Xdivergence211.value) && saveAmbiguousDivergent && implicitSearchContext.hasErrors) {
context.updateBuffer(implicitSearchContext.errBuffer.filter(err => err.kind == ErrorKinds.Ambiguous || err.kind == ErrorKinds.Divergent))
debugwarn("update buffer: " + implicitSearchContext.errBuffer)
}
@@ -96,6 +96,31 @@ trait Implicits {
result
}
+ /** A friendly wrapper over inferImplicit to be used in macro contexts and toolboxes.
+ */
+ def inferImplicit(tree: Tree, pt: Type, isView: Boolean, context: Context, silent: Boolean, withMacrosDisabled: Boolean, pos: Position, onError: (Position, String) => Unit): Tree = {
+ val wrapper1 = if (!withMacrosDisabled) (context.withMacrosEnabled[SearchResult] _) else (context.withMacrosDisabled[SearchResult] _)
+ def wrapper(inference: => SearchResult) = wrapper1(inference)
+ def fail(reason: Option[String]) = {
+ if (!silent) {
+ if (context.hasErrors) onError(context.errBuffer.head.errPos, context.errBuffer.head.errMsg)
+ else onError(pos, reason getOrElse "implicit search has failed. to find out the reason, turn on -Xlog-implicits")
+ }
+ EmptyTree
+ }
+ try {
+ wrapper(inferImplicit(tree, pt, reportAmbiguous = true, isView = isView, context = context, saveAmbiguousDivergent = !silent, pos = pos)) match {
+ case failure if failure.tree.isEmpty => fail(None)
+ case success => success.tree
+ }
+ } catch {
+ case ex: DivergentImplicit =>
+ if (settings.Xdivergence211.value)
+ debugwarn("this shouldn't happen. DivergentImplicit exception has been thrown with -Xdivergence211 turned on: "+ex)
+ fail(Some("divergent implicit expansion"))
+ }
+ }
+
/** Find all views from type `tp` (in which `tpars` are free)
*
* Note that the trees in the search results in the returned list share the same type variables.
@@ -149,16 +174,24 @@ trait Implicits {
class SearchResult(val tree: Tree, val subst: TreeTypeSubstituter) {
override def toString = "SearchResult(%s, %s)".format(tree,
if (subst.isEmpty) "" else subst)
-
+
def isFailure = false
def isAmbiguousFailure = false
+ // only used when -Xdivergence211 is turned on
+ def isDivergent = false
final def isSuccess = !isFailure
}
lazy val SearchFailure = new SearchResult(EmptyTree, EmptyTreeTypeSubstituter) {
override def isFailure = true
}
-
+
+ // only used when -Xdivergence211 is turned on
+ lazy val DivergentSearchFailure = new SearchResult(EmptyTree, EmptyTreeTypeSubstituter) {
+ override def isFailure = true
+ override def isDivergent = true
+ }
+
lazy val AmbiguousSearchFailure = new SearchResult(EmptyTree, EmptyTreeTypeSubstituter) {
override def isFailure = true
override def isAmbiguousFailure = true
@@ -219,6 +252,10 @@ trait Implicits {
override def toString = name + ": " + tpe
}
+ /** A class which is used to track pending implicits to prevent infinite implicit searches.
+ */
+ case class OpenImplicit(info: ImplicitInfo, pt: Type, tree: Tree)
+
/** A sentinel indicating no implicit was found */
val NoImplicitInfo = new ImplicitInfo(null, NoType, NoSymbol) {
// equals used to be implemented in ImplicitInfo with an `if(this eq NoImplicitInfo)`
@@ -406,20 +443,34 @@ trait Implicits {
* @pre `info.tpe` does not contain an error
*/
private def typedImplicit(info: ImplicitInfo, ptChecked: Boolean, isLocal: Boolean): SearchResult = {
- (context.openImplicits find { case (tp, tree1) => tree1.symbol == tree.symbol && dominates(pt, tp)}) match {
+ // SI-7167 let implicit macros decide what amounts for a divergent implicit search
+ // imagine a macro writer which wants to synthesize a complex implicit Complex[T] by making recursive calls to Complex[U] for its parts
+ // e.g. we have `class Foo(val bar: Bar)` and `class Bar(val x: Int)`
+ // then it's quite reasonable for the macro writer to synthesize Complex[Foo] by calling `inferImplicitValue(typeOf[Complex[Bar])`
+ // however if we didn't insert the `info.sym.isMacro` check here, then under some circumstances
+ // (e.g. as described here http://groups.google.com/group/scala-internals/browse_thread/thread/545462b377b0ac0a)
+ // `dominates` might decide that `Bar` dominates `Foo` and therefore a recursive implicit search should be prohibited
+ // now when we yield control of divergent expansions to the macro writer, what happens next?
+ // in the worst case, if the macro writer is careless, we'll get a StackOverflowException from repeated macro calls
+ // otherwise, the macro writer could check `c.openMacros` and `c.openImplicits` and do `c.abort` when expansions are deemed to be divergent
+ // upon receiving `c.abort` the typechecker will decide that the corresponding implicit search has failed
+ // which will fail the entire stack of implicit searches, producing a nice error message provided by the programmer
+ (context.openImplicits find { case OpenImplicit(info, tp, tree1) => !info.sym.isMacro && tree1.symbol == tree.symbol && dominates(pt, tp)}) match {
case Some(pending) =>
//println("Pending implicit "+pending+" dominates "+pt+"/"+undetParams) //@MDEBUG
- throw DivergentImplicit
+ if (settings.Xdivergence211.value) DivergentSearchFailure
+ else throw DivergentImplicit
case None =>
+ def pre211DivergenceLogic() = {
try {
- context.openImplicits = (pt, tree) :: context.openImplicits
+ context.openImplicits = OpenImplicit(info, pt, tree) :: context.openImplicits
// println(" "*context.openImplicits.length+"typed implicit "+info+" for "+pt) //@MDEBUG
typedImplicit0(info, ptChecked, isLocal)
} catch {
case ex: DivergentImplicit =>
//println("DivergentImplicit for pt:"+ pt +", open implicits:"+context.openImplicits) //@MDEBUG
if (context.openImplicits.tail.isEmpty) {
- if (!(pt.isErroneous))
+ if (!pt.isErroneous && !info.sym.isMacro)
DivergingImplicitExpansionError(tree, pt, info.sym)(context)
SearchFailure
} else {
@@ -428,6 +479,24 @@ trait Implicits {
} finally {
context.openImplicits = context.openImplicits.tail
}
+ }
+ def post211DivergenceLogic() = {
+ try {
+ context.openImplicits = OpenImplicit(info, pt, tree) :: context.openImplicits
+ // println(" "*context.openImplicits.length+"typed implicit "+info+" for "+pt) //@MDEBUG
+ val result = typedImplicit0(info, ptChecked, isLocal)
+ if (result.isDivergent) {
+ //println("DivergentImplicit for pt:"+ pt +", open implicits:"+context.openImplicits) //@MDEBUG
+ if (context.openImplicits.tail.isEmpty && !pt.isErroneous)
+ DivergingImplicitExpansionError(tree, pt, info.sym)(context)
+ }
+ result
+ } finally {
+ context.openImplicits = context.openImplicits.tail
+ }
+ }
+ if (settings.Xdivergence211.value) post211DivergenceLogic()
+ else pre211DivergenceLogic()
}
}
@@ -796,6 +865,9 @@ trait Implicits {
/** Preventing a divergent implicit from terminating implicit search,
* so that if there is a best candidate it can still be selected.
+ *
+ * The old way of handling divergence.
+ * Only enabled when -Xdivergence211 is turned off.
*/
private var divergence = false
private val divergenceHandler: PartialFunction[Throwable, SearchResult] = {
@@ -808,6 +880,28 @@ trait Implicits {
}
}
+ /** Preventing a divergent implicit from terminating implicit search,
+ * so that if there is a best candidate it can still be selected.
+ *
+ * The new way of handling divergence.
+ * Only enabled when -Xdivergence211 is turned on.
+ */
+ object DivergentImplicitRecovery {
+ // symbol of the implicit that caused the divergence.
+ // Initially null, will be saved on first diverging expansion.
+ private var implicitSym: Symbol = _
+ private var countdown: Int = 1
+
+ def sym: Symbol = implicitSym
+ def apply(search: SearchResult, i: ImplicitInfo): SearchResult =
+ if (search.isDivergent && countdown > 0) {
+ countdown -= 1
+ implicitSym = i.sym
+ log("discarding divergent implicit ${implicitSym} during implicit search")
+ SearchFailure
+ } else search
+ }
+
/** Sorted list of eligible implicits.
*/
val eligible = {
@@ -834,11 +928,20 @@ trait Implicits {
@tailrec private def rankImplicits(pending: Infos, acc: Infos): Infos = pending match {
case Nil => acc
case i :: is =>
- def tryImplicitInfo(i: ImplicitInfo) =
+ def pre211tryImplicitInfo(i: ImplicitInfo) =
try typedImplicit(i, ptChecked = true, isLocal)
catch divergenceHandler
- tryImplicitInfo(i) match {
+ def post211tryImplicitInfo(i: ImplicitInfo) =
+ DivergentImplicitRecovery(typedImplicit(i, ptChecked = true, isLocal), i)
+
+ {
+ if (settings.Xdivergence211.value) post211tryImplicitInfo(i)
+ else pre211tryImplicitInfo(i)
+ } match {
+ // only used if -Xdivergence211 is turned on
+ case sr if sr.isDivergent =>
+ Nil
case sr if sr.isFailure =>
// We don't want errors that occur during checking implicit info
// to influence the check of further infos.
@@ -890,13 +993,22 @@ trait Implicits {
/** If there is no winner, and we witnessed and caught divergence,
* now we can throw it for the error message.
*/
- if (divergence)
- throw DivergentImplicit
-
- if (invalidImplicits.nonEmpty)
+ if (divergence || DivergentImplicitRecovery.sym != null) {
+ if (settings.Xdivergence211.value) DivergingImplicitExpansionError(tree, pt, DivergentImplicitRecovery.sym)(context)
+ else throw DivergentImplicit
+ } else invalidImplicits take 1 foreach { sym =>
+ def isSensibleAddendum = pt match {
+ case Function1(_, out) => out <:< sym.tpe.finalResultType
+ case _ => pt <:< sym.tpe.finalResultType
+ }
+ // Don't pitch in with this theory unless it looks plausible that the
+ // implicit would have helped
setAddendum(pos, () =>
- "\n Note: implicit "+invalidImplicits.head+" is not applicable here"+
- " because it comes after the application point and it lacks an explicit result type")
+ if (isSensibleAddendum)
+ s"\n Note: implicit $sym is not applicable here because it comes after the application point and it lacks an explicit result type"
+ else ""
+ )
+ }
}
best
@@ -1453,5 +1565,6 @@ object ImplicitsStats {
val implicitCacheHits = Statistics.newSubCounter("implicit cache hits", implicitCacheAccs)
}
+// only used when -Xdivergence211 is turned off
class DivergentImplicit extends Exception
object DivergentImplicit extends DivergentImplicit
diff --git a/src/compiler/scala/tools/nsc/typechecker/Macros.scala b/src/compiler/scala/tools/nsc/typechecker/Macros.scala
index 0ba30ffa73..816f977890 100644
--- a/src/compiler/scala/tools/nsc/typechecker/Macros.scala
+++ b/src/compiler/scala/tools/nsc/typechecker/Macros.scala
@@ -341,11 +341,14 @@ trait Macros extends scala.tools.reflect.FastTrack with Traces {
}
import SigGenerator._
- macroTraceVerbose("generating macroImplSigs for: ")(macroDef)
- macroTraceVerbose("tparams are: ")(tparams)
- macroTraceVerbose("vparamss are: ")(vparamss)
- macroTraceVerbose("retTpe is: ")(retTpe)
- macroTraceVerbose("macroImplSig is: ")((paramss, implRetTpe))
+ macroLogVerbose(sm"""
+ |generating macroImplSigs for: $macroDef
+ |tparams are: $tparams
+ |vparamss are: $vparamss
+ |retTpe is: $retTpe
+ |macroImplSig is: $paramss, $implRetTpe
+ """.trim)
+ (paramss, implRetTpe)
}
/** Verifies that the body of a macro def typechecks to a reference to a static public non-overloaded method,
@@ -388,6 +391,8 @@ trait Macros extends scala.tools.reflect.FastTrack with Traces {
} finally {
popMacroContext()
}
+ case Delay(delayed) =>
+ typer.instantiate(delayed, EXPRmode, WildcardType)
case Fallback(fallback) =>
typer.typed1(fallback, EXPRmode, WildcardType)
case Other(result) =>
@@ -407,6 +412,9 @@ trait Macros extends scala.tools.reflect.FastTrack with Traces {
// Phase II: typecheck the right-hand side of the macro def
val typed = typecheckRhs(macroDdef.rhs)
typed match {
+ case MacroImplReference(_, meth, _) if meth == Predef_??? =>
+ bindMacroImpl(macroDef, typed)
+ MacroDefIsQmarkQmarkQmark()
case MacroImplReference(owner, meth, targs) =>
if (!meth.isMethod) MacroDefInvalidBodyError()
if (!meth.isPublic) MacroImplNotPublicError()
@@ -505,7 +513,7 @@ trait Macros extends scala.tools.reflect.FastTrack with Traces {
type MacroRuntime = MacroArgs => Any
private val macroRuntimesCache = perRunCaches.newWeakMap[Symbol, MacroRuntime]
private def macroRuntime(macroDef: Symbol): MacroRuntime = {
- macroTraceVerbose("looking for macro implementation: ")(macroDef)
+ macroLogVerbose(s"looking for macro implementation: $macroDef")
if (fastTrack contains macroDef) {
macroLogVerbose("macro expansion is serviced by a fast track")
fastTrack(macroDef)
@@ -516,26 +524,30 @@ trait Macros extends scala.tools.reflect.FastTrack with Traces {
val methName = binding.methName
macroLogVerbose(s"resolved implementation as $className.$methName")
- // I don't use Scala reflection here, because it seems to interfere with JIT magic
- // whenever you instantiate a mirror (and not do anything with in, just instantiate), performance drops by 15-20%
- // I'm not sure what's the reason - for me it's pure voodoo
- // upd. my latest experiments show that everything's okay
- // it seems that in 2.10.1 we can easily switch to Scala reflection
- try {
- macroTraceVerbose("loading implementation class: ")(className)
- macroTraceVerbose("classloader is: ")(ReflectionUtils.show(macroClassloader))
- val implObj = ReflectionUtils.staticSingletonInstance(macroClassloader, className)
- // relies on the fact that macro impls cannot be overloaded
- // so every methName can resolve to at maximum one method
- val implMeths = implObj.getClass.getDeclaredMethods.find(_.getName == methName)
- val implMeth = implMeths getOrElse { throw new NoSuchMethodException(s"$className.$methName") }
- macroLogVerbose("successfully loaded macro impl as (%s, %s)".format(implObj, implMeth))
- args => implMeth.invoke(implObj, ((args.c +: args.others) map (_.asInstanceOf[AnyRef])): _*)
- } catch {
- case ex: Exception =>
- macroTraceVerbose(s"macro runtime failed to load: ")(ex.toString)
- macroDef setFlag IS_ERROR
- null
+ if (binding.className == Predef_???.owner.fullName.toString && binding.methName == Predef_???.name.encoded) {
+ args => throw new AbortMacroException(args.c.enclosingPosition, "macro implementation is missing")
+ } else {
+ // I don't use Scala reflection here, because it seems to interfere with JIT magic
+ // whenever you instantiate a mirror (and not do anything with in, just instantiate), performance drops by 15-20%
+ // I'm not sure what's the reason - for me it's pure voodoo
+ // upd. my latest experiments show that everything's okay
+ // it seems that in 2.10.1 we can easily switch to Scala reflection
+ try {
+ macroLogVerbose(s"loading implementation class: $className")
+ macroLogVerbose(s"classloader is: ${ReflectionUtils.show(macroClassloader)}")
+ val implObj = ReflectionUtils.staticSingletonInstance(macroClassloader, className)
+ // relies on the fact that macro impls cannot be overloaded
+ // so every methName can resolve to at maximum one method
+ val implMeths = implObj.getClass.getDeclaredMethods.find(_.getName == methName)
+ val implMeth = implMeths getOrElse { throw new NoSuchMethodException(s"$className.$methName") }
+ macroLogVerbose(s"successfully loaded macro impl as ($implObj, $implMeth)")
+ args => implMeth.invoke(implObj, ((args.c +: args.others) map (_.asInstanceOf[AnyRef])): _*)
+ } catch {
+ case ex: Exception =>
+ macroLogVerbose(s"macro runtime failed to load: ${ex.toString}")
+ macroDef setFlag IS_ERROR
+ null
+ }
}
})
}
@@ -577,8 +589,8 @@ trait Macros extends scala.tools.reflect.FastTrack with Traces {
if (argcDoesntMatch && !nullaryArgsEmptyParams) { typer.TyperErrorGen.MacroPartialApplicationError(expandee) }
val argss: List[List[Any]] = exprArgs.toList
- macroTraceVerbose("context: ")(context)
- macroTraceVerbose("argss: ")(argss)
+ macroLogVerbose(s"context: $context")
+ macroLogVerbose(s"argss: $argss")
val preparedArgss: List[List[Any]] =
if (fastTrack contains macroDef) {
@@ -602,7 +614,7 @@ trait Macros extends scala.tools.reflect.FastTrack with Traces {
// whereas V won't be resolved by asSeenFrom and need to be loaded directly from `expandee` which needs to contain a TypeApply node
// also, macro implementation reference may contain a regular type as a type argument, then we pass it verbatim
val binding = loadMacroImplBinding(macroDef)
- macroTraceVerbose("binding: ")(binding)
+ macroLogVerbose(s"binding: $binding")
val tags = binding.signature filter (_ != -1) map (paramPos => {
val targ = binding.targs(paramPos).tpe.typeSymbol
val tpe = if (targ.isTypeParameterOrSkolem) {
@@ -620,7 +632,7 @@ trait Macros extends scala.tools.reflect.FastTrack with Traces {
targ.tpe
context.WeakTypeTag(tpe)
})
- macroTraceVerbose("tags: ")(tags)
+ macroLogVerbose(s"tags: $tags")
// transforms argss taking into account varargness of paramss
// note that typetag context bounds are only declared on macroImpls
@@ -637,7 +649,7 @@ trait Macros extends scala.tools.reflect.FastTrack with Traces {
} else as
})
}
- macroTraceVerbose("preparedArgss: ")(preparedArgss)
+ macroLogVerbose(s"preparedArgss: $preparedArgss")
MacroArgs(context, preparedArgss.flatten)
}
@@ -652,9 +664,9 @@ trait Macros extends scala.tools.reflect.FastTrack with Traces {
private sealed abstract class MacroExpansionResult
private case class Success(expanded: Tree) extends MacroExpansionResult
+ private case class Delay(delayed: Tree) extends MacroExpansionResult
private case class Fallback(fallback: Tree) extends MacroExpansionResult { currentRun.seenMacroExpansionsFallingBack = true }
private case class Other(result: Tree) extends MacroExpansionResult
- private def Delay(expanded: Tree) = Other(expanded)
private def Skip(expanded: Tree) = Other(expanded)
private def Cancel(expandee: Tree) = Other(expandee)
private def Failure(expandee: Tree) = Other(expandee)
@@ -695,7 +707,8 @@ trait Macros extends scala.tools.reflect.FastTrack with Traces {
val numErrors = reporter.ERROR.count
def hasNewErrors = reporter.ERROR.count > numErrors
val result = typer.context.withImplicitsEnabled(typer.typed(tree, EXPRmode, pt))
- macroTraceVerbose(s"""${if (hasNewErrors) "failed to typecheck" else "successfully typechecked"} against $phase $pt:\n$result\n""")(result)
+ macroLogVerbose(s"""${if (hasNewErrors) "failed to typecheck" else "successfully typechecked"} against $phase $pt:\n$result""")
+ result
}
var expectedTpe = expandee.tpe
@@ -706,6 +719,54 @@ trait Macros extends scala.tools.reflect.FastTrack with Traces {
} finally {
popMacroContext()
}
+ case Delay(delayed) =>
+ // =========== THE SITUATION ===========
+ //
+ // If we've been delayed (i.e. bailed out of the expansion because of undetermined type params present in the expandee),
+ // then there are two possible situations we're in:
+ //
+ // 1) We're in POLYmode, when the typer tests the waters wrt type inference
+ // (e.g. as in typedArgToPoly in doTypedApply).
+ //
+ // 2) We're out of POLYmode, which means that the typer is out of tricks to infer our type
+ // (e.g. if we're an argument to a function call, then this means that no previous argument lists
+ // can determine our type variables for us).
+ //
+ // Situation #1 is okay for us, since there's no pressure. In POLYmode we're just verifying that
+ // there's nothing outrageously wrong with our undetermined type params (from what I understand!).
+ //
+ // Situation #2 requires measures to be taken. If we're in it, then noone's going to help us infer
+ // the undetermined type params. Therefore we need to do something ourselves or otherwise this
+ // expandee will forever remaing not expanded (see SI-5692).
+ //
+ // A traditional way out of this conundrum is to call `instantiate` and let the inferencer
+ // try to find the way out. It works for simple cases, but sometimes, if the inferencer lacks
+ // information, it will be forced to approximate.
+ //
+ // =========== THE PROBLEM ===========
+ //
+ // Consider the following example (thanks, Miles!):
+ //
+ // // Iso represents an isomorphism between two datatypes:
+ // // 1) An arbitrary one (e.g. a random case class)
+ // // 2) A uniform representation for all datatypes (e.g. an HList)
+ // trait Iso[T, U] {
+ // def to(t : T) : U
+ // def from(u : U) : T
+ // }
+ // implicit def materializeIso[T, U]: Iso[T, U] = macro ???
+ //
+ // case class Foo(i: Int, s: String, b: Boolean)
+ // def foo[C, L](c: C)(implicit iso: Iso[C, L]): L = iso.to(c)
+ // foo(Foo(23, "foo", true))
+ //
+ // In the snippet above, even though we know that there's a fundep going from T to U
+ // (in a sense that a datatype's uniform representation is unambiguously determined by the datatype,
+ // e.g. for Foo it will be Int :: String :: Boolean :: HNil), there's no way to convey this information
+ // to the typechecker. Therefore the typechecker will infer Nothing for L, which is hardly what we want.
+ val shouldInstantiate = typer.context.undetparams.nonEmpty && !inPolyMode(mode)
+ if (shouldInstantiate) typer.instantiatePossiblyExpectingUnit(delayed, mode, pt)
+ else delayed
case Fallback(fallback) =>
typer.context.withImplicitsEnabled(typer.typed(fallback, EXPRmode, pt))
case Other(result) =>
@@ -724,7 +785,7 @@ trait Macros extends scala.tools.reflect.FastTrack with Traces {
withInfoLevel(nodePrinters.InfoLevel.Quiet) {
if (expandee.symbol.isErroneous || (expandee exists (_.isErroneous))) {
val reason = if (expandee.symbol.isErroneous) "not found or incompatible macro implementation" else "erroneous arguments"
- macroTraceVerbose("cancelled macro expansion because of %s: ".format(reason))(expandee)
+ macroLogVerbose(s"cancelled macro expansion because of $reason: $expandee")
return Cancel(typer.infer.setError(expandee))
}
@@ -746,8 +807,12 @@ trait Macros extends scala.tools.reflect.FastTrack with Traces {
val nowDelayed = !typer.context.macrosEnabled || undetparams.nonEmpty
(wasDelayed, nowDelayed) match {
- case (true, true) => Delay(expandee)
- case (true, false) => Skip(macroExpandAll(typer, expandee))
+ case (true, true) =>
+ Delay(expandee)
+ case (true, false) =>
+ val expanded = macroExpandAll(typer, expandee)
+ if (expanded exists (_.isErroneous)) Failure(expandee)
+ else Skip(expanded)
case (false, true) =>
macroLogLite("macro expansion is delayed: %s".format(expandee))
delayed += expandee -> undetparams
@@ -794,7 +859,7 @@ trait Macros extends scala.tools.reflect.FastTrack with Traces {
private def macroExpandWithoutRuntime(typer: Typer, expandee: Tree): MacroExpansionResult = {
import typer.TyperErrorGen._
val fallbackSym = expandee.symbol.nextOverriddenSymbol orElse MacroImplementationNotFoundError(expandee)
- macroTraceLite("falling back to: ")(fallbackSym)
+ macroLogLite(s"falling back to: $fallbackSym")
def mkFallbackTree(tree: Tree): Tree = {
tree match {
@@ -846,7 +911,7 @@ trait Macros extends scala.tools.reflect.FastTrack with Traces {
undetparams --= undetNoMore map (_.id)
if (undetparams.isEmpty) {
hasPendingMacroExpansions = true
- macroTraceVerbose("macro expansion is pending: ")(expandee)
+ macroLogVerbose(s"macro expansion is pending: $expandee")
}
case _ =>
// do nothing
diff --git a/src/compiler/scala/tools/nsc/typechecker/RefChecks.scala b/src/compiler/scala/tools/nsc/typechecker/RefChecks.scala
index 4933b8e0cb..03ce710700 100644
--- a/src/compiler/scala/tools/nsc/typechecker/RefChecks.scala
+++ b/src/compiler/scala/tools/nsc/typechecker/RefChecks.scala
@@ -383,7 +383,7 @@ abstract class RefChecks extends InfoTransform with scala.reflect.internal.trans
overrideError("cannot be used here - classes can only override abstract types");
} else if (other.isEffectivelyFinal) { // (1.2)
overrideError("cannot override final member");
- } else if (!other.isDeferred && !member.isAnyOverride && !member.isSynthetic) { // (*)
+ } else if (!other.isDeferred && !other.hasFlag(DEFAULTMETHOD) && !member.isAnyOverride && !member.isSynthetic) { // (*)
// (*) Synthetic exclusion for (at least) default getters, fixes SI-5178. We cannot assign the OVERRIDE flag to
// the default getter: one default getter might sometimes override, sometimes not. Example in comment on ticket.
if (isNeitherInClass && !(other.owner isSubClass member.owner))
@@ -1063,9 +1063,13 @@ abstract class RefChecks extends InfoTransform with scala.reflect.internal.trans
case _ =>
}
+ private def isObjectOrAnyComparisonMethod(sym: Symbol) = sym match {
+ case Object_eq | Object_ne | Object_== | Object_!= | Any_== | Any_!= => true
+ case _ => false
+ }
def checkSensible(pos: Position, fn: Tree, args: List[Tree]) = fn match {
- case Select(qual, name @ (nme.EQ | nme.NE | nme.eq | nme.ne)) if args.length == 1 =>
- def isReferenceOp = name == nme.eq || name == nme.ne
+ case Select(qual, name @ (nme.EQ | nme.NE | nme.eq | nme.ne)) if args.length == 1 && isObjectOrAnyComparisonMethod(fn.symbol) =>
+ def isReferenceOp = fn.symbol == Object_eq || fn.symbol == Object_ne
def isNew(tree: Tree) = tree match {
case Function(_, _)
| Apply(Select(New(_), nme.CONSTRUCTOR), _) => true
diff --git a/src/compiler/scala/tools/nsc/typechecker/SuperAccessors.scala b/src/compiler/scala/tools/nsc/typechecker/SuperAccessors.scala
index fa72ad64bf..bad49385aa 100644
--- a/src/compiler/scala/tools/nsc/typechecker/SuperAccessors.scala
+++ b/src/compiler/scala/tools/nsc/typechecker/SuperAccessors.scala
@@ -80,32 +80,11 @@ abstract class SuperAccessors extends transform.Transform with transform.TypingT
private def transformArgs(params: List[Symbol], args: List[Tree]) = {
treeInfo.mapMethodParamsAndArgs(params, args) { (param, arg) =>
if (isByNameParamType(param.tpe))
- withInvalidOwner { checkPackedConforms(transform(arg), param.tpe.typeArgs.head) }
+ withInvalidOwner(transform(arg))
else transform(arg)
}
}
- private def checkPackedConforms(tree: Tree, pt: Type): Tree = {
- def typeError(typer: analyzer.Typer, pos: Position, found: Type, req: Type) {
- if (!found.isErroneous && !req.isErroneous) {
- val msg = analyzer.ErrorUtils.typeErrorMsg(found, req, typer.infer.isPossiblyMissingArgs(found, req))
- typer.context.error(pos, analyzer.withAddendum(pos)(msg))
- if (settings.explaintypes.value)
- explainTypes(found, req)
- }
- }
-
- if (tree.tpe exists (_.typeSymbol.isExistentialSkolem)) {
- val packed = localTyper.packedType(tree, NoSymbol)
- if (!(packed <:< pt)) {
- val errorContext = localTyper.context.make(localTyper.context.tree)
- errorContext.setReportErrors()
- typeError(analyzer.newTyper(errorContext), tree.pos, packed, pt)
- }
- }
- tree
- }
-
/** Check that a class and its companion object to not both define
* a class or module with same name
*/
diff --git a/src/compiler/scala/tools/nsc/typechecker/Typers.scala b/src/compiler/scala/tools/nsc/typechecker/Typers.scala
index bb3ebb4e0f..c59ef4ebda 100644
--- a/src/compiler/scala/tools/nsc/typechecker/Typers.scala
+++ b/src/compiler/scala/tools/nsc/typechecker/Typers.scala
@@ -138,13 +138,18 @@ trait Typers extends Modes with Adaptations with Tags {
mkArg = mkNamedArg // don't pass the default argument (if any) here, but start emitting named arguments for the following args
if (!param.hasDefault && !paramFailed) {
context.errBuffer.find(_.kind == ErrorKinds.Divergent) match {
- case Some(divergentImplicit) =>
+ case Some(divergentImplicit) if !settings.Xdivergence211.value =>
// DivergentImplicit error has higher priority than "no implicit found"
// no need to issue the problem again if we are still in silent mode
if (context.reportErrors) {
context.issue(divergentImplicit)
context.condBufferFlush(_.kind == ErrorKinds.Divergent)
}
+ case Some(divergentImplicit: DivergentImplicitTypeError) if settings.Xdivergence211.value =>
+ if (context.reportErrors) {
+ context.issue(divergentImplicit.withPt(paramTp))
+ context.condBufferFlush(_.kind == ErrorKinds.Divergent)
+ }
case None =>
NoImplicitFoundError(fun, param)
}
@@ -1149,12 +1154,9 @@ trait Typers extends Modes with Adaptations with Tags {
adaptConstrPattern()
else if (shouldInsertApply(tree))
insertApply()
- else if (!context.undetparams.isEmpty && !inPolyMode(mode)) { // (9)
+ else if (context.undetparams.nonEmpty && !inPolyMode(mode)) { // (9)
assert(!inHKMode(mode), modeString(mode)) //@M
- if (inExprModeButNot(mode, FUNmode) && pt.typeSymbol == UnitClass)
- instantiateExpectingUnit(tree, mode)
- else
- instantiate(tree, mode, pt)
+ instantiatePossiblyExpectingUnit(tree, mode, pt)
} else if (tree.tpe <:< pt) {
tree
} else {
@@ -1302,6 +1304,13 @@ trait Typers extends Modes with Adaptations with Tags {
}
}
+ def instantiatePossiblyExpectingUnit(tree: Tree, mode: Int, pt: Type): Tree = {
+ if (inExprModeButNot(mode, FUNmode) && pt.typeSymbol == UnitClass)
+ instantiateExpectingUnit(tree, mode)
+ else
+ instantiate(tree, mode, pt)
+ }
+
private def isAdaptableWithView(qual: Tree) = {
val qtpe = qual.tpe.widen
( !isPastTyper
@@ -3055,8 +3064,7 @@ trait Typers extends Modes with Adaptations with Tags {
else if (isByNameParamType(formals.head)) 0
else BYVALmode
)
- var tree = typedArg(args.head, mode, typedMode, adapted.head)
- if (hasPendingMacroExpansions) tree = macroExpandAll(this, tree)
+ val tree = typedArg(args.head, mode, typedMode, adapted.head)
// formals may be empty, so don't call tail
tree :: loop(args.tail, formals drop 1, adapted.tail)
}
@@ -5608,7 +5616,12 @@ trait Typers extends Modes with Adaptations with Tags {
}
tree1.tpe = pluginsTyped(tree1.tpe, this, tree1, mode, ptPlugins)
- val result = if (tree1.isEmpty) tree1 else adapt(tree1, mode, ptPlugins, tree)
+ val result =
+ if (tree1.isEmpty) tree1
+ else {
+ val result = adapt(tree1, mode, ptPlugins, tree)
+ if (hasPendingMacroExpansions) macroExpandAll(this, result) else result
+ }
if (!alreadyTyped) {
printTyping("adapted %s: %s to %s, %s".format(
@@ -5779,7 +5792,7 @@ trait Typers extends Modes with Adaptations with Tags {
tree1
}
- val isMacroBodyOkay = !tree.symbol.isErroneous && !(tree1 exists (_.isErroneous))
+ val isMacroBodyOkay = !tree.symbol.isErroneous && !(tree1 exists (_.isErroneous)) && tree1 != EmptyTree
val shouldInheritMacroImplReturnType = ddef.tpt.isEmpty
if (isMacroBodyOkay && shouldInheritMacroImplReturnType) computeMacroDefTypeFromMacroImpl(ddef, tree1.symbol) else AnyClass.tpe
}
diff --git a/src/compiler/scala/tools/reflect/MacroImplementations.scala b/src/compiler/scala/tools/reflect/MacroImplementations.scala
index ae13cc561f..f4f385f8b3 100644
--- a/src/compiler/scala/tools/reflect/MacroImplementations.scala
+++ b/src/compiler/scala/tools/reflect/MacroImplementations.scala
@@ -4,6 +4,7 @@ import scala.reflect.macros.{ReificationException, UnexpectedReificationExceptio
import scala.reflect.macros.runtime.Context
import scala.collection.mutable.ListBuffer
import scala.collection.mutable.Stack
+import scala.reflect.internal.util.OffsetPosition
abstract class MacroImplementations {
val c: Context
@@ -26,16 +27,12 @@ abstract class MacroImplementations {
c.abort(args(parts.length-1).pos,
"too many arguments for interpolated string")
}
- val stringParts = parts map {
- case Literal(Constant(s: String)) => s;
- case _ => throw new IllegalArgumentException("argument parts must be a list of string literals")
- }
- val pi = stringParts.iterator
+ val pi = parts.iterator
val bldr = new java.lang.StringBuilder
val evals = ListBuffer[ValDef]()
val ids = ListBuffer[Ident]()
- val argsStack = Stack(args : _*)
+ val argStack = Stack(args : _*)
def defval(value: Tree, tpe: Type): Unit = {
val freshName = newTermName(c.fresh("arg$"))
@@ -82,50 +79,73 @@ abstract class MacroImplementations {
}
def copyString(first: Boolean): Unit = {
- val str = StringContext.treatEscapes(pi.next())
+ val strTree = pi.next()
+ val rawStr = strTree match {
+ case Literal(Constant(str: String)) => str
+ case _ => throw new IllegalArgumentException("internal error: argument parts must be a list of string literals")
+ }
+ val str = StringContext.treatEscapes(rawStr)
val strLen = str.length
val strIsEmpty = strLen == 0
- var start = 0
+ def charAtIndexIs(idx: Int, ch: Char) = idx < strLen && str(idx) == ch
+ def isPercent(idx: Int) = charAtIndexIs(idx, '%')
+ def isConversion(idx: Int) = isPercent(idx) && !charAtIndexIs(idx + 1, 'n') && !charAtIndexIs(idx + 1, '%')
var idx = 0
+ def errorAtIndex(idx: Int, msg: String) = c.error(new OffsetPosition(strTree.pos.source, strTree.pos.point + idx), msg)
+ def wrongConversionString(idx: Int) = errorAtIndex(idx, "wrong conversion string")
+ def illegalConversionCharacter(idx: Int) = errorAtIndex(idx, "illegal conversion character")
+ def nonEscapedPercent(idx: Int) = errorAtIndex(idx, "percent signs not directly following splicees must be escaped")
+
+ // STEP 1: handle argument conversion
+ // 1) "...${smth}" => okay, equivalent to "...${smth}%s"
+ // 2) "...${smth}blahblah" => okay, equivalent to "...${smth}%sblahblah"
+ // 3) "...${smth}%" => error
+ // 4) "...${smth}%n" => okay, equivalent to "...${smth}%s%n"
+ // 5) "...${smth}%%" => okay, equivalent to "...${smth}%s%%"
+ // 6) "...${smth}[%legalJavaConversion]" => okay, according to http://docs.oracle.com/javase/1.5.0/docs/api/java/util/Formatter.html
+ // 7) "...${smth}[%illegalJavaConversion]" => error
if (!first) {
- val arg = argsStack.pop
- if (strIsEmpty || (str charAt 0) != '%') {
- bldr append "%s"
- defval(arg, AnyTpe)
- } else {
+ val arg = argStack.pop
+ if (isConversion(0)) {
// PRE str is not empty and str(0) == '%'
// argument index parameter is not allowed, thus parse
// [flags][width][.precision]conversion
var pos = 1
- while(pos < strLen && isFlag(str charAt pos)) pos += 1
- while(pos < strLen && Character.isDigit(str charAt pos)) pos += 1
- if(pos < strLen && str.charAt(pos) == '.') { pos += 1
- while(pos < strLen && Character.isDigit(str charAt pos)) pos += 1
+ while (pos < strLen && isFlag(str charAt pos)) pos += 1
+ while (pos < strLen && Character.isDigit(str charAt pos)) pos += 1
+ if (pos < strLen && str.charAt(pos) == '.') {
+ pos += 1
+ while (pos < strLen && Character.isDigit(str charAt pos)) pos += 1
}
- if(pos < strLen) {
+ if (pos < strLen) {
conversionType(str charAt pos, arg) match {
case Some(tpe) => defval(arg, tpe)
- case None => c.error(arg.pos, "illegal conversion character")
+ case None => illegalConversionCharacter(pos)
}
} else {
- // TODO: place error message on conversion string
- c.error(arg.pos, "wrong conversion string")
+ wrongConversionString(pos - 1)
}
+ idx = 1
+ } else {
+ bldr append "%s"
+ defval(arg, AnyTpe)
}
- idx = 1
}
+
+ // STEP 2: handle the rest of the text
+ // 1) %n tokens are left as is
+ // 2) %% tokens are left as is
+ // 3) other usages of percents are reported as errors
if (!strIsEmpty) {
- val len = str.length
- while (idx < len) {
- def notPercentN = str(idx) != '%' || (idx + 1 < len && str(idx + 1) != 'n')
- if (str(idx) == '%' && notPercentN) {
- bldr append (str substring (start, idx)) append "%%"
- start = idx + 1
+ while (idx < strLen) {
+ if (isPercent(idx)) {
+ if (isConversion(idx)) nonEscapedPercent(idx)
+ else idx += 1 // skip n and % in %n and %%
}
idx += 1
}
- bldr append (str substring (start, idx))
+ bldr append (str take idx)
}
}
diff --git a/src/compiler/scala/tools/reflect/ToolBoxFactory.scala b/src/compiler/scala/tools/reflect/ToolBoxFactory.scala
index c05c59d5ff..4b5dcadfa2 100644
--- a/src/compiler/scala/tools/reflect/ToolBoxFactory.scala
+++ b/src/compiler/scala/tools/reflect/ToolBoxFactory.scala
@@ -181,15 +181,7 @@ abstract class ToolBoxFactory[U <: JavaUniverse](val u: U) { factorySelf =>
transformDuringTyper(tree, withImplicitViewsDisabled = false, withMacrosDisabled = withMacrosDisabled)(
(currentTyper, tree) => {
trace("inferring implicit %s (macros = %s): ".format(if (isView) "view" else "value", !withMacrosDisabled))(showAttributed(pt, true, true, settings.Yshowsymkinds.value))
- val context = currentTyper.context
- analyzer.inferImplicit(tree, pt, reportAmbiguous = true, isView = isView, context = context, saveAmbiguousDivergent = !silent, pos = pos) match {
- case failure if failure.tree.isEmpty =>
- trace("implicit search has failed. to find out the reason, turn on -Xlog-implicits: ")(failure.tree)
- if (context.hasErrors) throw ToolBoxError("reflective implicit search has failed: %s".format(context.errBuffer.head.errMsg))
- EmptyTree
- case success =>
- success.tree
- }
+ analyzer.inferImplicit(tree, pt, isView, currentTyper.context, silent, withMacrosDisabled, pos, (pos, msg) => throw ToolBoxError(msg))
})
def compile(expr0: Tree): () => Any = {
diff --git a/src/forkjoin/scala/concurrent/forkjoin/ForkJoinPool.java b/src/forkjoin/scala/concurrent/forkjoin/ForkJoinPool.java
index 8dbca6da4b..6578504155 100644
--- a/src/forkjoin/scala/concurrent/forkjoin/ForkJoinPool.java
+++ b/src/forkjoin/scala/concurrent/forkjoin/ForkJoinPool.java
@@ -5,12 +5,12 @@
*/
package scala.concurrent.forkjoin;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import java.util.Random;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
@@ -18,10 +18,357 @@ import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.AbstractQueuedSynchronizer;
-import java.util.concurrent.locks.Condition;
+
+/**
+ * @since 1.8
+ * @author Doug Lea
+ */
+/*public*/ abstract class CountedCompleter<T> extends ForkJoinTask<T> {
+ private static final long serialVersionUID = 5232453752276485070L;
+
+ /** This task's completer, or null if none */
+ final CountedCompleter<?> completer;
+ /** The number of pending tasks until completion */
+ volatile int pending;
+
+ /**
+ * Creates a new CountedCompleter with the given completer
+ * and initial pending count.
+ *
+ * @param completer this task's completer, or {@code null} if none
+ * @param initialPendingCount the initial pending count
+ */
+ protected CountedCompleter(CountedCompleter<?> completer,
+ int initialPendingCount) {
+ this.completer = completer;
+ this.pending = initialPendingCount;
+ }
+
+ /**
+ * Creates a new CountedCompleter with the given completer
+ * and an initial pending count of zero.
+ *
+ * @param completer this task's completer, or {@code null} if none
+ */
+ protected CountedCompleter(CountedCompleter<?> completer) {
+ this.completer = completer;
+ }
+
+ /**
+ * Creates a new CountedCompleter with no completer
+ * and an initial pending count of zero.
+ */
+ protected CountedCompleter() {
+ this.completer = null;
+ }
+
+ /**
+ * The main computation performed by this task.
+ */
+ public abstract void compute();
+
+ /**
+ * Performs an action when method {@link #tryComplete} is invoked
+ * and the pending count is zero, or when the unconditional
+ * method {@link #complete} is invoked. By default, this method
+ * does nothing. You can distinguish cases by checking the
+ * identity of the given caller argument. If not equal to {@code
+ * this}, then it is typically a subtask that may contain results
+ * (and/or links to other results) to combine.
+ *
+ * @param caller the task invoking this method (which may
+ * be this task itself)
+ */
+ public void onCompletion(CountedCompleter<?> caller) {
+ }
+
+ /**
+ * Performs an action when method {@link #completeExceptionally}
+ * is invoked or method {@link #compute} throws an exception, and
+ * this task has not otherwise already completed normally. On
+ * entry to this method, this task {@link
+ * ForkJoinTask#isCompletedAbnormally}. The return value of this
+ * method controls further propagation: If {@code true} and this
+ * task has a completer, then this completer is also completed
+ * exceptionally. The default implementation of this method does
+ * nothing except return {@code true}.
+ *
+ * @param ex the exception
+ * @param caller the task invoking this method (which may
+ * be this task itself)
+ * @return true if this exception should be propagated to this
+ * task's completer, if one exists
+ */
+ public boolean onExceptionalCompletion(Throwable ex, CountedCompleter<?> caller) {
+ return true;
+ }
+
+ /**
+ * Returns the completer established in this task's constructor,
+ * or {@code null} if none.
+ *
+ * @return the completer
+ */
+ public final CountedCompleter<?> getCompleter() {
+ return completer;
+ }
+
+ /**
+ * Returns the current pending count.
+ *
+ * @return the current pending count
+ */
+ public final int getPendingCount() {
+ return pending;
+ }
+
+ /**
+ * Sets the pending count to the given value.
+ *
+ * @param count the count
+ */
+ public final void setPendingCount(int count) {
+ pending = count;
+ }
+
+ /**
+ * Adds (atomically) the given value to the pending count.
+ *
+ * @param delta the value to add
+ */
+ public final void addToPendingCount(int delta) {
+ int c; // note: can replace with intrinsic in jdk8
+ do {} while (!U.compareAndSwapInt(this, PENDING, c = pending, c+delta));
+ }
+
+ /**
+ * Sets (atomically) the pending count to the given count only if
+ * it currently holds the given expected value.
+ *
+ * @param expected the expected value
+ * @param count the new value
+ * @return true if successful
+ */
+ public final boolean compareAndSetPendingCount(int expected, int count) {
+ return U.compareAndSwapInt(this, PENDING, expected, count);
+ }
+
+ /**
+ * If the pending count is nonzero, (atomically) decrements it.
+ *
+ * @return the initial (undecremented) pending count holding on entry
+ * to this method
+ */
+ public final int decrementPendingCountUnlessZero() {
+ int c;
+ do {} while ((c = pending) != 0 &&
+ !U.compareAndSwapInt(this, PENDING, c, c - 1));
+ return c;
+ }
+
+ /**
+ * Returns the root of the current computation; i.e., this
+ * task if it has no completer, else its completer's root.
+ *
+ * @return the root of the current computation
+ */
+ public final CountedCompleter<?> getRoot() {
+ CountedCompleter<?> a = this, p;
+ while ((p = a.completer) != null)
+ a = p;
+ return a;
+ }
+
+ /**
+ * If the pending count is nonzero, decrements the count;
+ * otherwise invokes {@link #onCompletion} and then similarly
+ * tries to complete this task's completer, if one exists,
+ * else marks this task as complete.
+ */
+ public final void tryComplete() {
+ CountedCompleter<?> a = this, s = a;
+ for (int c;;) {
+ if ((c = a.pending) == 0) {
+ a.onCompletion(s);
+ if ((a = (s = a).completer) == null) {
+ s.quietlyComplete();
+ return;
+ }
+ }
+ else if (U.compareAndSwapInt(a, PENDING, c, c - 1))
+ return;
+ }
+ }
+
+ /**
+ * Equivalent to {@link #tryComplete} but does not invoke {@link
+ * #onCompletion} along the completion path: If the pending count
+ * is nonzero, decrements the count; otherwise, similarly tries to
+ * complete this task's completer, if one exists, else marks this
+ * task as complete. This method may be useful in cases where
+ * {@code onCompletion} should not, or need not, be invoked for
+ * each completer in a computation.
+ */
+ public final void propagateCompletion() {
+ CountedCompleter<?> a = this, s = a;
+ for (int c;;) {
+ if ((c = a.pending) == 0) {
+ if ((a = (s = a).completer) == null) {
+ s.quietlyComplete();
+ return;
+ }
+ }
+ else if (U.compareAndSwapInt(a, PENDING, c, c - 1))
+ return;
+ }
+ }
+
+ /**
+ * Regardless of pending count, invokes {@link #onCompletion},
+ * marks this task as complete and further triggers {@link
+ * #tryComplete} on this task's completer, if one exists. The
+ * given rawResult is used as an argument to {@link #setRawResult}
+ * before invoking {@link #onCompletion} or marking this task as
+ * complete; its value is meaningful only for classes overriding
+ * {@code setRawResult}.
+ *
+ * <p>This method may be useful when forcing completion as soon as
+ * any one (versus all) of several subtask results are obtained.
+ * However, in the common (and recommended) case in which {@code
+ * setRawResult} is not overridden, this effect can be obtained
+ * more simply using {@code quietlyCompleteRoot();}.
+ *
+ * @param rawResult the raw result
+ */
+ public void complete(T rawResult) {
+ CountedCompleter<?> p;
+ setRawResult(rawResult);
+ onCompletion(this);
+ quietlyComplete();
+ if ((p = completer) != null)
+ p.tryComplete();
+ }
+
+
+ /**
+ * If this task's pending count is zero, returns this task;
+ * otherwise decrements its pending count and returns {@code
+ * null}. This method is designed to be used with {@link
+ * #nextComplete} in completion traversal loops.
+ *
+ * @return this task, if pending count was zero, else {@code null}
+ */
+ public final CountedCompleter<?> firstComplete() {
+ for (int c;;) {
+ if ((c = pending) == 0)
+ return this;
+ else if (U.compareAndSwapInt(this, PENDING, c, c - 1))
+ return null;
+ }
+ }
+
+ /**
+ * If this task does not have a completer, invokes {@link
+ * ForkJoinTask#quietlyComplete} and returns {@code null}. Or, if
+ * this task's pending count is non-zero, decrements its pending
+ * count and returns {@code null}. Otherwise, returns the
+ * completer. This method can be used as part of a completion
+ * traversal loop for homogeneous task hierarchies:
+ *
+ * <pre> {@code
+ * for (CountedCompleter<?> c = firstComplete();
+ * c != null;
+ * c = c.nextComplete()) {
+ * // ... process c ...
+ * }}</pre>
+ *
+ * @return the completer, or {@code null} if none
+ */
+ public final CountedCompleter<?> nextComplete() {
+ CountedCompleter<?> p;
+ if ((p = completer) != null)
+ return p.firstComplete();
+ else {
+ quietlyComplete();
+ return null;
+ }
+ }
+
+ /**
+ * Equivalent to {@code getRoot().quietlyComplete()}.
+ */
+ public final void quietlyCompleteRoot() {
+ for (CountedCompleter<?> a = this, p;;) {
+ if ((p = a.completer) == null) {
+ a.quietlyComplete();
+ return;
+ }
+ a = p;
+ }
+ }
+
+ /**
+ * Supports ForkJoinTask exception propagation.
+ */
+ void internalPropagateException(Throwable ex) {
+ CountedCompleter<?> a = this, s = a;
+ while (a.onExceptionalCompletion(ex, s) &&
+ (a = (s = a).completer) != null && a.status >= 0)
+ a.recordExceptionalCompletion(ex);
+ }
+
+ /**
+ * Implements execution conventions for CountedCompleters.
+ */
+ protected final boolean exec() {
+ compute();
+ return false;
+ }
+
+ /**
+ * Returns the result of the computation. By default
+ * returns {@code null}, which is appropriate for {@code Void}
+ * actions, but in other cases should be overridden, almost
+ * always to return a field or function of a field that
+ * holds the result upon completion.
+ *
+ * @return the result of the computation
+ */
+ public T getRawResult() { return null; }
+
+ /**
+ * A method that result-bearing CountedCompleters may optionally
+ * use to help maintain result data. By default, does nothing.
+ * Overrides are not recommended. However, if this method is
+ * overridden to update existing objects or fields, then it must
+ * in general be defined to be thread-safe.
+ */
+ protected void setRawResult(T t) { }
+
+ // Unsafe mechanics
+ private static final sun.misc.Unsafe U;
+ private static final long PENDING;
+ static {
+ try {
+ U = getUnsafe();
+ PENDING = U.objectFieldOffset
+ (CountedCompleter.class.getDeclaredField("pending"));
+ } catch (Exception e) {
+ throw new Error(e);
+ }
+ }
+
+ /**
+ * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package.
+ * Replace with a simple call to Unsafe.getUnsafe when integrating
+ * into a jdk.
+ *
+ * @return a sun.misc.Unsafe
+ */
+ private static sun.misc.Unsafe getUnsafe() {
+ return scala.concurrent.util.Unsafe.instance;
+ }
+}
/**
* An {@link ExecutorService} for running {@link ForkJoinTask}s.
@@ -41,14 +388,22 @@ import java.util.concurrent.locks.Condition;
* ForkJoinPool}s may also be appropriate for use with event-style
* tasks that are never joined.
*
- * <p>A {@code ForkJoinPool} is constructed with a given target
- * parallelism level; by default, equal to the number of available
- * processors. The pool attempts to maintain enough active (or
- * available) threads by dynamically adding, suspending, or resuming
- * internal worker threads, even if some tasks are stalled waiting to
- * join others. However, no such adjustments are guaranteed in the
- * face of blocked IO or other unmanaged synchronization. The nested
- * {@link ManagedBlocker} interface enables extension of the kinds of
+ * <p>A static {@link #commonPool()} is available and appropriate for
+ * most applications. The common pool is used by any ForkJoinTask that
+ * is not explicitly submitted to a specified pool. Using the common
+ * pool normally reduces resource usage (its threads are slowly
+ * reclaimed during periods of non-use, and reinstated upon subsequent
+ * use).
+ *
+ * <p>For applications that require separate or custom pools, a {@code
+ * ForkJoinPool} may be constructed with a given target parallelism
+ * level; by default, equal to the number of available processors. The
+ * pool attempts to maintain enough active (or available) threads by
+ * dynamically adding, suspending, or resuming internal worker
+ * threads, even if some tasks are stalled waiting to join
+ * others. However, no such adjustments are guaranteed in the face of
+ * blocked I/O or other unmanaged synchronization. The nested {@link
+ * ManagedBlocker} interface enables extension of the kinds of
* synchronization accommodated.
*
* <p>In addition to execution and lifecycle control methods, this
@@ -58,7 +413,7 @@ import java.util.concurrent.locks.Condition;
* {@link #toString} returns indications of pool state in a
* convenient form for informal monitoring.
*
- * <p> As is the case with other ExecutorServices, there are three
+ * <p>As is the case with other ExecutorServices, there are three
* main task execution methods summarized in the following table.
* These are designed to be used primarily by clients not already
* engaged in fork/join computations in the current pool. The main
@@ -93,22 +448,16 @@ import java.util.concurrent.locks.Condition;
* </tr>
* </table>
*
- * <p><b>Sample Usage.</b> Normally a single {@code ForkJoinPool} is
- * used for all parallel task execution in a program or subsystem.
- * Otherwise, use would not usually outweigh the construction and
- * bookkeeping overhead of creating a large set of threads. For
- * example, a common pool could be used for the {@code SortTasks}
- * illustrated in {@link RecursiveAction}. Because {@code
- * ForkJoinPool} uses threads in {@linkplain java.lang.Thread#isDaemon
- * daemon} mode, there is typically no need to explicitly {@link
- * #shutdown} such a pool upon program exit.
- *
- * <pre> {@code
- * static final ForkJoinPool mainPool = new ForkJoinPool();
- * ...
- * public void sort(long[] array) {
- * mainPool.invoke(new SortTask(array, 0, array.length));
- * }}</pre>
+ * <p>The common pool is by default constructed with default
+ * parameters, but these may be controlled by setting three {@link
+ * System#getProperty system properties} with prefix {@code
+ * java.util.concurrent.ForkJoinPool.common}: {@code parallelism} --
+ * an integer greater than zero, {@code threadFactory} -- the class
+ * name of a {@link ForkJoinWorkerThreadFactory}, and {@code
+ * exceptionHandler} -- the class name of a {@link
+ * java.lang.Thread.UncaughtExceptionHandler
+ * Thread.UncaughtExceptionHandler}. Upon any error in establishing
+ * these settings, default parameters are used.
*
* <p><b>Implementation notes</b>: This implementation restricts the
* maximum number of running threads to 32767. Attempts to create
@@ -196,21 +545,24 @@ public class ForkJoinPool extends AbstractExecutorService {
* WorkQueues are also used in a similar way for tasks submitted
* to the pool. We cannot mix these tasks in the same queues used
* for work-stealing (this would contaminate lifo/fifo
- * processing). Instead, we loosely associate submission queues
+ * processing). Instead, we randomly associate submission queues
* with submitting threads, using a form of hashing. The
* ThreadLocal Submitter class contains a value initially used as
* a hash code for choosing existing queues, but may be randomly
* repositioned upon contention with other submitters. In
- * essence, submitters act like workers except that they never
- * take tasks, and they are multiplexed on to a finite number of
- * shared work queues. However, classes are set up so that future
- * extensions could allow submitters to optionally help perform
- * tasks as well. Insertion of tasks in shared mode requires a
- * lock (mainly to protect in the case of resizing) but we use
- * only a simple spinlock (using bits in field runState), because
- * submitters encountering a busy queue move on to try or create
- * other queues -- they block only when creating and registering
- * new queues.
+ * essence, submitters act like workers except that they are
+ * restricted to executing local tasks that they submitted (or in
+ * the case of CountedCompleters, others with the same root task).
+ * However, because most shared/external queue operations are more
+ * expensive than internal, and because, at steady state, external
+ * submitters will compete for CPU with workers, ForkJoinTask.join
+ * and related methods disable them from repeatedly helping to
+ * process tasks if all workers are active. Insertion of tasks in
+ * shared mode requires a lock (mainly to protect in the case of
+ * resizing) but we use only a simple spinlock (using bits in
+ * field qlock), because submitters encountering a busy queue move
+ * on to try or create other queues -- they block only when
+ * creating and registering new queues.
*
* Management
* ==========
@@ -232,23 +584,26 @@ public class ForkJoinPool extends AbstractExecutorService {
* and their negations (used for thresholding) to fit into 16bit
* fields.
*
- * Field "runState" contains 32 bits needed to register and
- * deregister WorkQueues, as well as to enable shutdown. It is
- * only modified under a lock (normally briefly held, but
- * occasionally protecting allocations and resizings) but even
- * when locked remains available to check consistency.
+ * Field "plock" is a form of sequence lock with a saturating
+ * shutdown bit (similarly for per-queue "qlocks"), mainly
+ * protecting updates to the workQueues array, as well as to
+ * enable shutdown. When used as a lock, it is normally only very
+ * briefly held, so is nearly always available after at most a
+ * brief spin, but we use a monitor-based backup strategy to
+ * block when needed.
*
* Recording WorkQueues. WorkQueues are recorded in the
- * "workQueues" array that is created upon pool construction and
- * expanded if necessary. Updates to the array while recording
- * new workers and unrecording terminated ones are protected from
- * each other by a lock but the array is otherwise concurrently
- * readable, and accessed directly. To simplify index-based
- * operations, the array size is always a power of two, and all
- * readers must tolerate null slots. Shared (submission) queues
- * are at even indices, worker queues at odd indices. Grouping
- * them together in this way simplifies and speeds up task
- * scanning.
+ * "workQueues" array that is created upon first use and expanded
+ * if necessary. Updates to the array while recording new workers
+ * and unrecording terminated ones are protected from each other
+ * by a lock but the array is otherwise concurrently readable, and
+ * accessed directly. To simplify index-based operations, the
+ * array size is always a power of two, and all readers must
+ * tolerate null slots. Worker queues are at odd indices. Shared
+ * (submission) queues are at even indices, up to a maximum of 64
+ * slots, to limit growth even if array needs to expand to add
+ * more workers. Grouping them together in this way simplifies and
+ * speeds up task scanning.
*
* All worker thread creation is on-demand, triggered by task
* submissions, replacement of terminated workers, and/or
@@ -309,24 +664,33 @@ public class ForkJoinPool extends AbstractExecutorService {
*
* Signalling. We create or wake up workers only when there
* appears to be at least one task they might be able to find and
- * execute. When a submission is added or another worker adds a
- * task to a queue that previously had fewer than two tasks, they
- * signal waiting workers (or trigger creation of new ones if
- * fewer than the given parallelism level -- see signalWork).
- * These primary signals are buttressed by signals during rescans;
- * together these cover the signals needed in cases when more
- * tasks are pushed but untaken, and improve performance compared
- * to having one thread wake up all workers.
+ * execute. However, many other threads may notice the same task
+ * and each signal to wake up a thread that might take it. So in
+ * general, pools will be over-signalled. When a submission is
+ * added or another worker adds a task to a queue that has fewer
+ * than two tasks, they signal waiting workers (or trigger
+ * creation of new ones if fewer than the given parallelism level
+ * -- signalWork), and may leave a hint to the unparked worker to
+ * help signal others upon wakeup). These primary signals are
+ * buttressed by others (see method helpSignal) whenever other
+ * threads scan for work or do not have a task to process. On
+ * most platforms, signalling (unpark) overhead time is noticeably
+ * long, and the time between signalling a thread and it actually
+ * making progress can be very noticeably long, so it is worth
+ * offloading these delays from critical paths as much as
+ * possible.
*
* Trimming workers. To release resources after periods of lack of
* use, a worker starting to wait when the pool is quiescent will
- * time out and terminate if the pool has remained quiescent for
- * SHRINK_RATE nanosecs. This will slowly propagate, eventually
- * terminating all workers after long periods of non-use.
+ * time out and terminate if the pool has remained quiescent for a
+ * given period -- a short period if there are more threads than
+ * parallelism, longer as the number of threads decreases. This
+ * will slowly propagate, eventually terminating all workers after
+ * periods of non-use.
*
* Shutdown and Termination. A call to shutdownNow atomically sets
- * a runState bit and then (non-atomically) sets each worker's
- * runState status, cancels all unprocessed tasks, and wakes up
+ * a plock bit and then (non-atomically) sets each worker's
+ * qlock status, cancels all unprocessed tasks, and wakes up
* all waiting workers. Detecting whether termination should
* commence after a non-abrupt shutdown() call requires more work
* and bookkeeping. We need consensus about quiescence (i.e., that
@@ -354,13 +718,13 @@ public class ForkJoinPool extends AbstractExecutorService {
* method tryCompensate() may create or re-activate a spare
* thread to compensate for blocked joiners until they unblock.
*
- * A third form (implemented in tryRemoveAndExec and
- * tryPollForAndExec) amounts to helping a hypothetical
- * compensator: If we can readily tell that a possible action of a
- * compensator is to steal and execute the task being joined, the
- * joining thread can do so directly, without the need for a
- * compensation thread (although at the expense of larger run-time
- * stacks, but the tradeoff is typically worthwhile).
+ * A third form (implemented in tryRemoveAndExec) amounts to
+ * helping a hypothetical compensator: If we can readily tell that
+ * a possible action of a compensator is to steal and execute the
+ * task being joined, the joining thread can do so directly,
+ * without the need for a compensation thread (although at the
+ * expense of larger run-time stacks, but the tradeoff is
+ * typically worthwhile).
*
* The ManagedBlocker extension API can't use helping so relies
* only on compensation in method awaitBlocker.
@@ -382,12 +746,12 @@ public class ForkJoinPool extends AbstractExecutorService {
* steals, rather than use per-task bookkeeping. This sometimes
* requires a linear scan of workQueues array to locate stealers,
* but often doesn't because stealers leave hints (that may become
- * stale/wrong) of where to locate them. A stealHint is only a
- * hint because a worker might have had multiple steals and the
- * hint records only one of them (usually the most current).
- * Hinting isolates cost to when it is needed, rather than adding
- * to per-task overhead. (2) It is "shallow", ignoring nesting
- * and potentially cyclic mutual steals. (3) It is intentionally
+ * stale/wrong) of where to locate them. It is only a hint
+ * because a worker might have had multiple steals and the hint
+ * records only one of them (usually the most current). Hinting
+ * isolates cost to when it is needed, rather than adding to
+ * per-task overhead. (2) It is "shallow", ignoring nesting and
+ * potentially cyclic mutual steals. (3) It is intentionally
* racy: field currentJoin is updated only while actively joining,
* which means that we miss links in the chain during long-lived
* tasks, GC stalls etc (which is OK since blocking in such cases
@@ -395,6 +759,12 @@ public class ForkJoinPool extends AbstractExecutorService {
* to find work (see MAX_HELP) and fall back to suspending the
* worker and if necessary replacing it with another.
*
+ * Helping actions for CountedCompleters are much simpler: Method
+ * helpComplete can take and execute any task with the same root
+ * as the task being waited on. However, this still entails some
+ * traversal of completer chains, so is less efficient than using
+ * CountedCompleters without explicit joins.
+ *
* It is impossible to keep exactly the target parallelism number
* of threads running at any given time. Determining the
* existence of conservatively safe helping targets, the
@@ -416,30 +786,41 @@ public class ForkJoinPool extends AbstractExecutorService {
* intractable) game with an opponent that may choose the worst
* (for us) active thread to stall at any time. We take several
* precautions to bound losses (and thus bound gains), mainly in
- * methods tryCompensate and awaitJoin: (1) We only try
- * compensation after attempting enough helping steps (measured
- * via counting and timing) that we have already consumed the
- * estimated cost of creating and activating a new thread. (2) We
- * allow up to 50% of threads to be blocked before initially
- * adding any others, and unless completely saturated, check that
- * some work is available for a new worker before adding. Also, we
- * create up to only 50% more threads until entering a mode that
- * only adds a thread if all others are possibly blocked. All
- * together, this means that we might be half as fast to react,
- * and create half as many threads as possible in the ideal case,
- * but present vastly fewer anomalies in all other cases compared
- * to both more aggressive and more conservative alternatives.
- *
- * Style notes: There is a lot of representation-level coupling
- * among classes ForkJoinPool, ForkJoinWorkerThread, and
- * ForkJoinTask. The fields of WorkQueue maintain data structures
- * managed by ForkJoinPool, so are directly accessed. There is
- * little point trying to reduce this, since any associated future
- * changes in representations will need to be accompanied by
- * algorithmic changes anyway. Several methods intrinsically
- * sprawl because they must accumulate sets of consistent reads of
- * volatiles held in local variables. Methods signalWork() and
- * scan() are the main bottlenecks, so are especially heavily
+ * methods tryCompensate and awaitJoin.
+ *
+ * Common Pool
+ * ===========
+ *
+ * The static common Pool always exists after static
+ * initialization. Since it (or any other created pool) need
+ * never be used, we minimize initial construction overhead and
+ * footprint to the setup of about a dozen fields, with no nested
+ * allocation. Most bootstrapping occurs within method
+ * fullExternalPush during the first submission to the pool.
+ *
+ * When external threads submit to the common pool, they can
+ * perform some subtask processing (see externalHelpJoin and
+ * related methods). We do not need to record whether these
+ * submissions are to the common pool -- if not, externalHelpJoin
+ * returns quickly (at the most helping to signal some common pool
+ * workers). These submitters would otherwise be blocked waiting
+ * for completion, so the extra effort (with liberally sprinkled
+ * task status checks) in inapplicable cases amounts to an odd
+ * form of limited spin-wait before blocking in ForkJoinTask.join.
+ *
+ * Style notes
+ * ===========
+ *
+ * There is a lot of representation-level coupling among classes
+ * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The
+ * fields of WorkQueue maintain data structures managed by
+ * ForkJoinPool, so are directly accessed. There is little point
+ * trying to reduce this, since any associated future changes in
+ * representations will need to be accompanied by algorithmic
+ * changes anyway. Several methods intrinsically sprawl because
+ * they must accumulate sets of consistent reads of volatiles held
+ * in local variables. Methods signalWork() and scan() are the
+ * main bottlenecks, so are especially heavily
* micro-optimized/mangled. There are lots of inline assignments
* (of form "while ((local = field) != 0)") which are usually the
* simplest way to ensure the required read orderings (which are
@@ -447,7 +828,8 @@ public class ForkJoinPool extends AbstractExecutorService {
* declarations of these locals at the heads of methods or blocks.
* There are several occurrences of the unusual "do {} while
* (!cas...)" which is the simplest way to force an update of a
- * CAS'ed variable. There are also other coding oddities that help
+ * CAS'ed variable. There are also other coding oddities (including
+ * several unnecessary-looking hoisted null checks) that help
* some methods perform reasonably even when interpreted (not
* compiled).
*
@@ -496,34 +878,31 @@ public class ForkJoinPool extends AbstractExecutorService {
* Default ForkJoinWorkerThreadFactory implementation; creates a
* new ForkJoinWorkerThread.
*/
- static class DefaultForkJoinWorkerThreadFactory
+ static final class DefaultForkJoinWorkerThreadFactory
implements ForkJoinWorkerThreadFactory {
- public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
+ public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
return new ForkJoinWorkerThread(pool);
}
}
/**
- * A simple non-reentrant lock used for exclusion when managing
- * queues and workers. We use a custom lock so that we can readily
- * probe lock state in constructions that check among alternative
- * actions. The lock is normally only very briefly held, and
- * sometimes treated as a spinlock, but other usages block to
- * reduce overall contention in those cases where locked code
- * bodies perform allocation/resizing.
+ * Per-thread records for threads that submit to pools. Currently
+ * holds only pseudo-random seed / index that is used to choose
+ * submission queues in method externalPush. In the future, this may
+ * also incorporate a means to implement different task rejection
+ * and resubmission policies.
+ *
+ * Seeds for submitters and workers/workQueues work in basically
+ * the same way but are initialized and updated using slightly
+ * different mechanics. Both are initialized using the same
+ * approach as in class ThreadLocal, where successive values are
+ * unlikely to collide with previous values. Seeds are then
+ * randomly modified upon collisions using xorshifts, which
+ * requires a non-zero seed.
*/
- static final class Mutex extends AbstractQueuedSynchronizer {
- public final boolean tryAcquire(int ignore) {
- return compareAndSetState(0, 1);
- }
- public final boolean tryRelease(int ignore) {
- setState(0);
- return true;
- }
- public final void lock() { acquire(0); }
- public final void unlock() { release(0); }
- public final boolean isHeldExclusively() { return getState() == 1; }
- public final Condition newCondition() { return new ConditionObject(); }
+ static final class Submitter {
+ int seed;
+ Submitter(int s) { seed = s; }
}
/**
@@ -533,6 +912,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* actually do anything beyond having a unique identity.
*/
static final class EmptyTask extends ForkJoinTask<Void> {
+ private static final long serialVersionUID = -7721805057305804111L;
EmptyTask() { status = ForkJoinTask.NORMAL; } // force done
public final Void getRawResult() { return null; }
public final void setRawResult(Void x) {}
@@ -553,27 +933,31 @@ public class ForkJoinPool extends AbstractExecutorService {
*
* Field "top" is the index (mod array.length) of the next queue
* slot to push to or pop from. It is written only by owner thread
- * for push, or under lock for trySharedPush, and accessed by
- * other threads only after reading (volatile) base. Both top and
- * base are allowed to wrap around on overflow, but (top - base)
- * (or more commonly -(base - top) to force volatile read of base
- * before top) still estimates size.
+ * for push, or under lock for external/shared push, and accessed
+ * by other threads only after reading (volatile) base. Both top
+ * and base are allowed to wrap around on overflow, but (top -
+ * base) (or more commonly -(base - top) to force volatile read of
+ * base before top) still estimates size. The lock ("qlock") is
+ * forced to -1 on termination, causing all further lock attempts
+ * to fail. (Note: we don't need CAS for termination state because
+ * upon pool shutdown, all shared-queues will stop being used
+ * anyway.) Nearly all lock bodies are set up so that exceptions
+ * within lock bodies are "impossible" (modulo JVM errors that
+ * would cause failure anyway.)
*
* The array slots are read and written using the emulation of
* volatiles/atomics provided by Unsafe. Insertions must in
* general use putOrderedObject as a form of releasing store to
* ensure that all writes to the task object are ordered before
- * its publication in the queue. (Although we can avoid one case
- * of this when locked in trySharedPush.) All removals entail a
- * CAS to null. The array is always a power of two. To ensure
- * safety of Unsafe array operations, all accesses perform
- * explicit null checks and implicit bounds checks via
- * power-of-two masking.
+ * its publication in the queue. All removals entail a CAS to
+ * null. The array is always a power of two. To ensure safety of
+ * Unsafe array operations, all accesses perform explicit null
+ * checks and implicit bounds checks via power-of-two masking.
*
* In addition to basic queuing support, this class contains
* fields described elsewhere to control execution. It turns out
- * to work better memory-layout-wise to include them in this
- * class rather than a separate class.
+ * to work better memory-layout-wise to include them in this class
+ * rather than a separate class.
*
* Performance on most platforms is very sensitive to placement of
* instances of both WorkQueues and their arrays -- we absolutely
@@ -587,10 +971,10 @@ public class ForkJoinPool extends AbstractExecutorService {
* trades off slightly slower average field access for the sake of
* avoiding really bad worst-case access. (Until better JVM
* support is in place, this padding is dependent on transient
- * properties of JVM field layout rules.) We also take care in
+ * properties of JVM field layout rules.) We also take care in
* allocating, sizing and resizing the array. Non-shared queue
- * arrays are initialized (via method growArray) by workers before
- * use. Others are allocated on first use.
+ * arrays are initialized by workers before use. Others are
+ * allocated on first use.
*/
static final class WorkQueue {
/**
@@ -613,16 +997,17 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
- volatile long totalSteals; // cumulative number of steals
+ // Heuristic padding to ameliorate unfortunate memory placements
+ volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06;
+
int seed; // for random scanning; initialize nonzero
volatile int eventCount; // encoded inactivation count; < 0 if inactive
int nextWait; // encoded record of next event waiter
- int rescans; // remaining scans until block
- int nsteals; // top-level task executions since last idle
- final int mode; // lifo, fifo, or shared
+ int hint; // steal or signal hint (index)
int poolIndex; // index of this queue in pool (or 0)
- int stealHint; // index of most recent known stealer
- volatile int runState; // 1: locked, -1: terminate; else 0
+ final int mode; // 0: lifo, > 0: fifo, < 0: shared
+ int nsteals; // number of steals
+ volatile int qlock; // 1: locked, -1: terminate; else 0
volatile int base; // index of next slot for poll
int top; // index of next slot for push
ForkJoinTask<?>[] array; // the elements (initially unallocated)
@@ -631,14 +1016,16 @@ public class ForkJoinPool extends AbstractExecutorService {
volatile Thread parker; // == owner during call to park; else null
volatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoin
ForkJoinTask<?> currentSteal; // current non-local task being executed
- // Heuristic padding to ameliorate unfortunate memory placements
- Object p00, p01, p02, p03, p04, p05, p06, p07;
- Object p08, p09, p0a, p0b, p0c, p0d, p0e;
- WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner, int mode) {
- this.mode = mode;
+ volatile Object pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
+ volatile Object pad18, pad19, pad1a, pad1b, pad1c, pad1d;
+
+ WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner, int mode,
+ int seed) {
this.pool = pool;
this.owner = owner;
+ this.mode = mode;
+ this.seed = seed;
// Place indices in the center of array (that is not yet allocated)
base = top = INITIAL_QUEUE_CAPACITY >>> 1;
}
@@ -651,7 +1038,7 @@ public class ForkJoinPool extends AbstractExecutorService {
return (n >= 0) ? 0 : -n; // ignore transient negative
}
- /**
+ /**
* Provides a more accurate estimate of whether this queue has
* any tasks than does queueSize, by checking whether a
* near-empty queue has at least one unclaimed task.
@@ -663,62 +1050,63 @@ public class ForkJoinPool extends AbstractExecutorService {
(n == -1 &&
((a = array) == null ||
(m = a.length - 1) < 0 ||
- U.getObjectVolatile
- (a, ((m & (s - 1)) << ASHIFT) + ABASE) == null)));
+ U.getObject
+ (a, (long)((m & (s - 1)) << ASHIFT) + ABASE) == null)));
}
/**
- * Pushes a task. Call only by owner in unshared queues.
+ * Pushes a task. Call only by owner in unshared queues. (The
+ * shared-queue version is embedded in method externalPush.)
*
* @param task the task. Caller must ensure non-null.
- * @throw RejectedExecutionException if array cannot be resized
+ * @throws RejectedExecutionException if array cannot be resized
*/
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
int s = top, m, n;
if ((a = array) != null) { // ignore if queue removed
- U.putOrderedObject
- (a, (((m = a.length - 1) & s) << ASHIFT) + ABASE, task);
+ int j = (((m = a.length - 1) & s) << ASHIFT) + ABASE;
+ U.putOrderedObject(a, j, task);
if ((n = (top = s + 1) - base) <= 2) {
if ((p = pool) != null)
- p.signalWork();
+ p.signalWork(this);
}
else if (n >= m)
- growArray(true);
+ growArray();
}
}
- /**
- * Pushes a task if lock is free and array is either big
- * enough or can be resized to be big enough.
- *
- * @param task the task. Caller must ensure non-null.
- * @return true if submitted
+ /**
+ * Initializes or doubles the capacity of array. Call either
+ * by owner or with lock held -- it is OK for base, but not
+ * top, to move while resizings are in progress.
*/
- final boolean trySharedPush(ForkJoinTask<?> task) {
- boolean submitted = false;
- if (runState == 0 && U.compareAndSwapInt(this, RUNSTATE, 0, 1)) {
- ForkJoinTask<?>[] a = array;
- int s = top;
- try {
- if ((a != null && a.length > s + 1 - base) ||
- (a = growArray(false)) != null) { // must presize
- int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
- U.putObject(a, (long)j, task); // don't need "ordered"
- top = s + 1;
- submitted = true;
- }
- } finally {
- runState = 0; // unlock
- }
+ final ForkJoinTask<?>[] growArray() {
+ ForkJoinTask<?>[] oldA = array;
+ int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY;
+ if (size > MAXIMUM_QUEUE_CAPACITY)
+ throw new RejectedExecutionException("Queue capacity exceeded");
+ int oldMask, t, b;
+ ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
+ if (oldA != null && (oldMask = oldA.length - 1) >= 0 &&
+ (t = top) - (b = base) > 0) {
+ int mask = size - 1;
+ do {
+ ForkJoinTask<?> x;
+ int oldj = ((b & oldMask) << ASHIFT) + ABASE;
+ int j = ((b & mask) << ASHIFT) + ABASE;
+ x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj);
+ if (x != null &&
+ U.compareAndSwapObject(oldA, oldj, x, null))
+ U.putObjectVolatile(a, j, x);
+ } while (++b != t);
}
- return submitted;
+ return a;
}
/**
* Takes next task, if one exists, in LIFO order. Call only
- * by owner in unshared queues. (We do not have a shared
- * version of this method because it is never needed.)
+ * by owner in unshared queues.
*/
final ForkJoinTask<?> pop() {
ForkJoinTask<?>[] a; ForkJoinTask<?> t; int m;
@@ -773,7 +1161,7 @@ public class ForkJoinPool extends AbstractExecutorService {
else if (base == b) {
if (b + 1 == top)
break;
- Thread.yield(); // wait for lagging update
+ Thread.yield(); // wait for lagging update (very rare)
}
}
return null;
@@ -800,6 +1188,7 @@ public class ForkJoinPool extends AbstractExecutorService {
/**
* Pops the given task only if it is at the current top.
+ * (A shared version is available only via FJP.tryExternalUnpush)
*/
final boolean tryUnpush(ForkJoinTask<?> t) {
ForkJoinTask<?>[] a; int s;
@@ -813,57 +1202,6 @@ public class ForkJoinPool extends AbstractExecutorService {
}
/**
- * Polls the given task only if it is at the current base.
- */
- final boolean pollFor(ForkJoinTask<?> task) {
- ForkJoinTask<?>[] a; int b;
- if ((b = base) - top < 0 && (a = array) != null) {
- int j = (((a.length - 1) & b) << ASHIFT) + ABASE;
- if (U.getObjectVolatile(a, j) == task && base == b &&
- U.compareAndSwapObject(a, j, task, null)) {
- base = b + 1;
- return true;
- }
- }
- return false;
- }
-
- /**
- * Initializes or doubles the capacity of array. Call either
- * by owner or with lock held -- it is OK for base, but not
- * top, to move while resizings are in progress.
- *
- * @param rejectOnFailure if true, throw exception if capacity
- * exceeded (relayed ultimately to user); else return null.
- */
- final ForkJoinTask<?>[] growArray(boolean rejectOnFailure) {
- ForkJoinTask<?>[] oldA = array;
- int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY;
- if (size <= MAXIMUM_QUEUE_CAPACITY) {
- int oldMask, t, b;
- ForkJoinTask<?>[] a = array = new ForkJoinTask<?>[size];
- if (oldA != null && (oldMask = oldA.length - 1) >= 0 &&
- (t = top) - (b = base) > 0) {
- int mask = size - 1;
- do {
- ForkJoinTask<?> x;
- int oldj = ((b & oldMask) << ASHIFT) + ABASE;
- int j = ((b & mask) << ASHIFT) + ABASE;
- x = (ForkJoinTask<?>)U.getObjectVolatile(oldA, oldj);
- if (x != null &&
- U.compareAndSwapObject(oldA, oldj, x, null))
- U.putObjectVolatile(a, j, x);
- } while (++b != t);
- }
- return a;
- }
- else if (!rejectOnFailure)
- return null;
- else
- throw new RejectedExecutionException("Queue capacity exceeded");
- }
-
- /**
* Removes and cancels all known tasks, ignoring any exceptions.
*/
final void cancelAll() {
@@ -887,7 +1225,7 @@ public class ForkJoinPool extends AbstractExecutorService {
return seed = r ^= r << 5;
}
- // Execution methods
+ // Specialized execution methods
/**
* Pops and runs tasks until empty.
@@ -916,16 +1254,14 @@ public class ForkJoinPool extends AbstractExecutorService {
}
/**
- * If present, removes from queue and executes the given task, or
- * any other cancelled task. Returns (true) immediately on any CAS
+ * If present, removes from queue and executes the given task,
+ * or any other cancelled task. Returns (true) on any CAS
* or consistency check failure so caller can retry.
*
- * @return 0 if no progress can be made, else positive
- * (this unusual convention simplifies use with tryHelpStealer.)
+ * @return false if no progress can be made, else true
*/
- final int tryRemoveAndExec(ForkJoinTask<?> task) {
- int stat = 1;
- boolean removed = false, empty = true;
+ final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
+ boolean stat = true, removed = false, empty = true;
ForkJoinTask<?>[] a; int m, s, b, n;
if ((a = array) != null && (m = a.length - 1) >= 0 &&
(n = (s = top) - (b = base)) > 0) {
@@ -955,7 +1291,7 @@ public class ForkJoinPool extends AbstractExecutorService {
}
if (--n == 0) {
if (!empty && base == b)
- stat = 0;
+ stat = false;
break;
}
}
@@ -966,21 +1302,49 @@ public class ForkJoinPool extends AbstractExecutorService {
}
/**
+ * Polls for and executes the given task or any other task in
+ * its CountedCompleter computation.
+ */
+ final boolean pollAndExecCC(ForkJoinTask<?> root) {
+ ForkJoinTask<?>[] a; int b; Object o;
+ outer: while ((b = base) - top < 0 && (a = array) != null) {
+ long j = (((a.length - 1) & b) << ASHIFT) + ABASE;
+ if ((o = U.getObject(a, j)) == null ||
+ !(o instanceof CountedCompleter))
+ break;
+ for (CountedCompleter<?> t = (CountedCompleter<?>)o, r = t;;) {
+ if (r == root) {
+ if (base == b &&
+ U.compareAndSwapObject(a, j, t, null)) {
+ base = b + 1;
+ t.doExec();
+ return true;
+ }
+ else
+ break; // restart
+ }
+ if ((r = r.completer) == null)
+ break outer; // not part of root computation
+ }
+ }
+ return false;
+ }
+
+ /**
* Executes a top-level task and any local tasks remaining
* after execution.
*/
final void runTask(ForkJoinTask<?> t) {
if (t != null) {
- currentSteal = t;
- t.doExec();
- if (top != base) { // process remaining local tasks
+ (currentSteal = t).doExec();
+ currentSteal = null;
+ ++nsteals;
+ if (base - top < 0) { // process remaining local tasks
if (mode == 0)
popAndExecAll();
else
pollAndExecAll();
}
- ++nsteals;
- currentSteal = null;
}
}
@@ -990,8 +1354,7 @@ public class ForkJoinPool extends AbstractExecutorService {
final void runSubtask(ForkJoinTask<?> t) {
if (t != null) {
ForkJoinTask<?> ps = currentSteal;
- currentSteal = t;
- t.doExec();
+ (currentSteal = t).doExec();
currentSteal = ps;
}
}
@@ -1008,74 +1371,29 @@ public class ForkJoinPool extends AbstractExecutorService {
s != Thread.State.TIMED_WAITING);
}
- /**
- * If this owned and is not already interrupted, try to
- * interrupt and/or unpark, ignoring exceptions.
- */
- final void interruptOwner() {
- Thread wt, p;
- if ((wt = owner) != null && !wt.isInterrupted()) {
- try {
- wt.interrupt();
- } catch (SecurityException ignore) {
- }
- }
- if ((p = parker) != null)
- U.unpark(p);
- }
-
// Unsafe mechanics
private static final sun.misc.Unsafe U;
- private static final long RUNSTATE;
+ private static final long QLOCK;
private static final int ABASE;
private static final int ASHIFT;
static {
- int s;
try {
U = getUnsafe();
Class<?> k = WorkQueue.class;
Class<?> ak = ForkJoinTask[].class;
- RUNSTATE = U.objectFieldOffset
- (k.getDeclaredField("runState"));
+ QLOCK = U.objectFieldOffset
+ (k.getDeclaredField("qlock"));
ABASE = U.arrayBaseOffset(ak);
- s = U.arrayIndexScale(ak);
+ int scale = U.arrayIndexScale(ak);
+ if ((scale & (scale - 1)) != 0)
+ throw new Error("data type scale not a power of two");
+ ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
} catch (Exception e) {
throw new Error(e);
}
- if ((s & (s-1)) != 0)
- throw new Error("data type scale not a power of two");
- ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
- }
- }
- /**
- * Per-thread records for threads that submit to pools. Currently
- * holds only pseudo-random seed / index that is used to choose
- * submission queues in method doSubmit. In the future, this may
- * also incorporate a means to implement different task rejection
- * and resubmission policies.
- *
- * Seeds for submitters and workers/workQueues work in basically
- * the same way but are initialized and updated using slightly
- * different mechanics. Both are initialized using the same
- * approach as in class ThreadLocal, where successive values are
- * unlikely to collide with previous values. This is done during
- * registration for workers, but requires a separate AtomicInteger
- * for submitters. Seeds are then randomly modified upon
- * collisions using xorshifts, which requires a non-zero seed.
- */
- static final class Submitter {
- int seed;
- Submitter() {
- int s = nextSubmitterSeed.getAndAdd(SEED_INCREMENT);
- seed = (s == 0) ? 1 : s; // ensure non-zero
}
}
- /** ThreadLocal class for Submitters */
- static final class ThreadSubmitter extends ThreadLocal<Submitter> {
- public Submitter initialValue() { return new Submitter(); }
- }
-
// static fields (initialized in static initializer below)
/**
@@ -1086,15 +1404,13 @@ public class ForkJoinPool extends AbstractExecutorService {
defaultForkJoinWorkerThreadFactory;
/**
- * Generator for assigning sequence numbers as pool names.
- */
- private static final AtomicInteger poolNumberGenerator;
-
- /**
- * Generator for initial hashes/seeds for submitters. Accessed by
- * Submitter class constructor.
+ * Per-thread submission bookkeeping. Shared across all pools
+ * to reduce ThreadLocal pollution and because random motion
+ * to avoid contention in one pool is likely to hold for others.
+ * Lazily initialized on first submission (but null-checked
+ * in other contexts to avoid unnecessary initialization).
*/
- static final AtomicInteger nextSubmitterSeed;
+ static final ThreadLocal<Submitter> submitters;
/**
* Permission required for callers of methods that may start or
@@ -1103,37 +1419,56 @@ public class ForkJoinPool extends AbstractExecutorService {
private static final RuntimePermission modifyThreadPermission;
/**
- * Per-thread submission bookeeping. Shared across all pools
- * to reduce ThreadLocal pollution and because random motion
- * to avoid contention in one pool is likely to hold for others.
+ * Common (static) pool. Non-null for public use unless a static
+ * construction exception, but internal usages null-check on use
+ * to paranoically avoid potential initialization circularities
+ * as well as to simplify generated code.
*/
- private static final ThreadSubmitter submitters;
+ static final ForkJoinPool common;
+
+ /**
+ * Common pool parallelism. Must equal common.parallelism.
+ */
+ static final int commonParallelism;
+
+ /**
+ * Sequence number for creating workerNamePrefix.
+ */
+ private static int poolNumberSequence;
+
+ /**
+ * Returns the next sequence number. We don't expect this to
+ * ever contend, so use simple builtin sync.
+ */
+ private static final synchronized int nextPoolId() {
+ return ++poolNumberSequence;
+ }
// static constants
/**
- * The wakeup interval (in nanoseconds) for a worker waiting for a
- * task when the pool is quiescent to instead try to shrink the
- * number of workers. The exact value does not matter too
- * much. It must be short enough to release resources during
- * sustained periods of idleness, but not so short that threads
- * are continually re-created.
+ * Initial timeout value (in nanoseconds) for the thread
+ * triggering quiescence to park waiting for new work. On timeout,
+ * the thread will instead try to shrink the number of
+ * workers. The value should be large enough to avoid overly
+ * aggressive shrinkage during most transient stalls (long GCs
+ * etc).
+ */
+ private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec
+
+ /**
+ * Timeout value when there are more threads than parallelism level
*/
- private static final long SHRINK_RATE =
- 4L * 1000L * 1000L * 1000L; // 4 seconds
+ private static final long FAST_IDLE_TIMEOUT = 200L * 1000L * 1000L;
/**
- * The timeout value for attempted shrinkage, includes
- * some slop to cope with system timer imprecision.
+ * Tolerance for idle timeouts, to cope with timer undershoots
*/
- private static final long SHRINK_TIMEOUT = SHRINK_RATE - (SHRINK_RATE / 10);
+ private static final long TIMEOUT_SLOP = 2000000L;
/**
* The maximum stolen->joining link depth allowed in method
- * tryHelpStealer. Must be a power of two. This value also
- * controls the maximum number of times to try to help join a task
- * without any apparent progress or change in pool state before
- * giving up and blocking (see awaitJoin). Depths for legitimate
+ * tryHelpStealer. Must be a power of two. Depths for legitimate
* chains are unbounded, but we use a fixed constant to avoid
* (otherwise unchecked) cycles and to bound staleness of
* traversal parameters at the expense of sometimes blocking when
@@ -1142,22 +1477,12 @@ public class ForkJoinPool extends AbstractExecutorService {
private static final int MAX_HELP = 64;
/**
- * Secondary time-based bound (in nanosecs) for helping attempts
- * before trying compensated blocking in awaitJoin. Used in
- * conjunction with MAX_HELP to reduce variance due to different
- * polling rates associated with different helping options. The
- * value should roughly approximate the time required to create
- * and/or activate a worker thread.
- */
- private static final long COMPENSATION_DELAY = 1L << 18; // ~0.25 millisec
-
- /**
* Increment for seed generators. See class ThreadLocal for
* explanation.
*/
private static final int SEED_INCREMENT = 0x61c88647;
- /**
+ /*
* Bits and masks for control variables
*
* Field ctl is a long packed with:
@@ -1185,14 +1510,14 @@ public class ForkJoinPool extends AbstractExecutorService {
* scan for them to avoid queuing races. Note however that
* eventCount updates lag releases so usage requires care.
*
- * Field runState is an int packed with:
+ * Field plock is an int packed with:
* SHUTDOWN: true if shutdown is enabled (1 bit)
- * SEQ: a sequence number updated upon (de)registering workers (30 bits)
- * INIT: set true after workQueues array construction (1 bit)
+ * SEQ: a sequence lock, with PL_LOCK bit set if locked (30 bits)
+ * SIGNAL: set when threads may be waiting on the lock (1 bit)
*
* The sequence number enables simple consistency checks:
* Staleness of read-only operations on the workQueues array can
- * be checked by comparing runState before vs after the reads.
+ * be checked by comparing plock before vs after the reads.
*/
// bit positions/shifts for fields
@@ -1204,7 +1529,8 @@ public class ForkJoinPool extends AbstractExecutorService {
// bounds
private static final int SMASK = 0xffff; // short bits
private static final int MAX_CAP = 0x7fff; // max #workers - 1
- private static final int SQMASK = 0xfffe; // even short bits
+ private static final int EVENMASK = 0xfffe; // even short bits
+ private static final int SQMASK = 0x007e; // max 64 (even) slots
private static final int SHORT_SIGN = 1 << 15;
private static final int INT_SIGN = 1 << 31;
@@ -1229,91 +1555,164 @@ public class ForkJoinPool extends AbstractExecutorService {
private static final int E_MASK = 0x7fffffff; // no STOP_BIT
private static final int E_SEQ = 1 << EC_SHIFT;
- // runState bits
+ // plock bits
private static final int SHUTDOWN = 1 << 31;
+ private static final int PL_LOCK = 2;
+ private static final int PL_SIGNAL = 1;
+ private static final int PL_SPINS = 1 << 8;
// access mode for WorkQueue
static final int LIFO_QUEUE = 0;
static final int FIFO_QUEUE = 1;
static final int SHARED_QUEUE = -1;
+ // bounds for #steps in scan loop -- must be power 2 minus 1
+ private static final int MIN_SCAN = 0x1ff; // cover estimation slop
+ private static final int MAX_SCAN = 0x1ffff; // 4 * max workers
+
// Instance fields
/*
- * Field layout order in this class tends to matter more than one
- * would like. Runtime layout order is only loosely related to
+ * Field layout of this class tends to matter more than one would
+ * like. Runtime layout order is only loosely related to
* declaration order and may differ across JVMs, but the following
* empirically works OK on current JVMs.
*/
+ // Heuristic padding to ameliorate unfortunate memory placements
+ volatile long pad00, pad01, pad02, pad03, pad04, pad05, pad06;
+
+ volatile long stealCount; // collects worker counts
volatile long ctl; // main pool control
- final int parallelism; // parallelism level
- final int localMode; // per-worker scheduling mode
- final int submitMask; // submit queue index bound
- int nextSeed; // for initializing worker seeds
- volatile int runState; // shutdown status and seq
+ volatile int plock; // shutdown status and seqLock
+ volatile int indexSeed; // worker/submitter index seed
+ final int config; // mode and parallelism level
WorkQueue[] workQueues; // main registry
- final Mutex lock; // for registration
- final Condition termination; // for awaitTermination
- final ForkJoinWorkerThreadFactory factory; // factory for new workers
+ final ForkJoinWorkerThreadFactory factory;
final Thread.UncaughtExceptionHandler ueh; // per-worker UEH
- final AtomicLong stealCount; // collect counts when terminated
- final AtomicInteger nextWorkerNumber; // to create worker name string
final String workerNamePrefix; // to create worker name string
- // Creating, registering, and deregistering workers
+ volatile Object pad10, pad11, pad12, pad13, pad14, pad15, pad16, pad17;
+ volatile Object pad18, pad19, pad1a, pad1b;
+
+ /**
+ * Acquires the plock lock to protect worker array and related
+ * updates. This method is called only if an initial CAS on plock
+ * fails. This acts as a spinlock for normal cases, but falls back
+ * to builtin monitor to block when (rarely) needed. This would be
+ * a terrible idea for a highly contended lock, but works fine as
+ * a more conservative alternative to a pure spinlock.
+ */
+ private int acquirePlock() {
+ int spins = PL_SPINS, r = 0, ps, nps;
+ for (;;) {
+ if (((ps = plock) & PL_LOCK) == 0 &&
+ U.compareAndSwapInt(this, PLOCK, ps, nps = ps + PL_LOCK))
+ return nps;
+ else if (r == 0) { // randomize spins if possible
+ Thread t = Thread.currentThread(); WorkQueue w; Submitter z;
+ if ((t instanceof ForkJoinWorkerThread) &&
+ (w = ((ForkJoinWorkerThread)t).workQueue) != null)
+ r = w.seed;
+ else if ((z = submitters.get()) != null)
+ r = z.seed;
+ else
+ r = 1;
+ }
+ else if (spins >= 0) {
+ r ^= r << 1; r ^= r >>> 3; r ^= r << 10; // xorshift
+ if (r >= 0)
+ --spins;
+ }
+ else if (U.compareAndSwapInt(this, PLOCK, ps, ps | PL_SIGNAL)) {
+ synchronized (this) {
+ if ((plock & PL_SIGNAL) != 0) {
+ try {
+ wait();
+ } catch (InterruptedException ie) {
+ try {
+ Thread.currentThread().interrupt();
+ } catch (SecurityException ignore) {
+ }
+ }
+ }
+ else
+ notifyAll();
+ }
+ }
+ }
+ }
/**
- * Tries to create and start a worker
+ * Unlocks and signals any thread waiting for plock. Called only
+ * when CAS of seq value for unlock fails.
*/
- private void addWorker() {
- Throwable ex = null;
- ForkJoinWorkerThread wt = null;
- try {
- if ((wt = factory.newThread(this)) != null) {
- wt.start();
- return;
- }
- } catch (Throwable e) {
- ex = e;
- }
- deregisterWorker(wt, ex); // adjust counts etc on failure
+ private void releasePlock(int ps) {
+ plock = ps;
+ synchronized (this) { notifyAll(); }
}
/**
- * Callback from ForkJoinWorkerThread constructor to assign a
- * public name. This must be separate from registerWorker because
- * it is called during the "super" constructor call in
- * ForkJoinWorkerThread.
+ * Tries to create and start one worker if fewer than target
+ * parallelism level exist. Adjusts counts etc on failure.
*/
- final String nextWorkerName() {
- return workerNamePrefix.concat
- (Integer.toString(nextWorkerNumber.addAndGet(1)));
+ private void tryAddWorker() {
+ long c; int u;
+ while ((u = (int)((c = ctl) >>> 32)) < 0 &&
+ (u & SHORT_SIGN) != 0 && (int)c == 0) {
+ long nc = (long)(((u + UTC_UNIT) & UTC_MASK) |
+ ((u + UAC_UNIT) & UAC_MASK)) << 32;
+ if (U.compareAndSwapLong(this, CTL, c, nc)) {
+ ForkJoinWorkerThreadFactory fac;
+ Throwable ex = null;
+ ForkJoinWorkerThread wt = null;
+ try {
+ if ((fac = factory) != null &&
+ (wt = fac.newThread(this)) != null) {
+ wt.start();
+ break;
+ }
+ } catch (Throwable e) {
+ ex = e;
+ }
+ deregisterWorker(wt, ex);
+ break;
+ }
+ }
}
+ // Registering and deregistering workers
+
/**
- * Callback from ForkJoinWorkerThread constructor to establish its
- * poolIndex and record its WorkQueue. To avoid scanning bias due
- * to packing entries in front of the workQueues array, we treat
- * the array as a simple power-of-two hash table using per-thread
- * seed as hash, expanding as needed.
+ * Callback from ForkJoinWorkerThread to establish and record its
+ * WorkQueue. To avoid scanning bias due to packing entries in
+ * front of the workQueues array, we treat the array as a simple
+ * power-of-two hash table using per-thread seed as hash,
+ * expanding as needed.
*
- * @param w the worker's queue
+ * @param wt the worker thread
+ * @return the worker's queue
*/
-
- final void registerWorker(WorkQueue w) {
- Mutex lock = this.lock;
- lock.lock();
+ final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
+ Thread.UncaughtExceptionHandler handler; WorkQueue[] ws; int s, ps;
+ wt.setDaemon(true);
+ if ((handler = ueh) != null)
+ wt.setUncaughtExceptionHandler(handler);
+ do {} while (!U.compareAndSwapInt(this, INDEXSEED, s = indexSeed,
+ s += SEED_INCREMENT) ||
+ s == 0); // skip 0
+ WorkQueue w = new WorkQueue(this, wt, config >>> 16, s);
+ if (((ps = plock) & PL_LOCK) != 0 ||
+ !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
+ ps = acquirePlock();
+ int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
try {
- WorkQueue[] ws = workQueues;
- if (w != null && ws != null) { // skip on shutdown/failure
- int rs, n = ws.length, m = n - 1;
- int s = nextSeed += SEED_INCREMENT; // rarely-colliding sequence
- w.seed = (s == 0) ? 1 : s; // ensure non-zero seed
- int r = (s << 1) | 1; // use odd-numbered indices
- if (ws[r &= m] != null) { // collision
- int probes = 0; // step by approx half size
- int step = (n <= 4) ? 2 : ((n >>> 1) & SQMASK) + 2;
+ if ((ws = workQueues) != null) { // skip if shutting down
+ int n = ws.length, m = n - 1;
+ int r = (s << 1) | 1; // use odd-numbered indices
+ if (ws[r &= m] != null) { // collision
+ int probes = 0; // step by approx half size
+ int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
while (ws[r = (r + step) & m] != null) {
if (++probes >= n) {
workQueues = ws = Arrays.copyOf(ws, n <<= 1);
@@ -1322,104 +1721,206 @@ public class ForkJoinPool extends AbstractExecutorService {
}
}
}
- w.eventCount = w.poolIndex = r; // establish before recording
- ws[r] = w; // also update seq
- runState = ((rs = runState) & SHUTDOWN) | ((rs + 2) & ~SHUTDOWN);
+ w.eventCount = w.poolIndex = r; // volatile write orders
+ ws[r] = w;
}
} finally {
- lock.unlock();
+ if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
+ releasePlock(nps);
}
+ wt.setName(workerNamePrefix.concat(Integer.toString(w.poolIndex)));
+ return w;
}
/**
* Final callback from terminating worker, as well as upon failure
- * to construct or start a worker in addWorker. Removes record of
- * worker from array, and adjusts counts. If pool is shutting
- * down, tries to complete termination.
+ * to construct or start a worker. Removes record of worker from
+ * array, and adjusts counts. If pool is shutting down, tries to
+ * complete termination.
*
- * @param wt the worker thread or null if addWorker failed
+ * @param wt the worker thread or null if construction failed
* @param ex the exception causing failure, or null if none
*/
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
- Mutex lock = this.lock;
WorkQueue w = null;
if (wt != null && (w = wt.workQueue) != null) {
- w.runState = -1; // ensure runState is set
- stealCount.getAndAdd(w.totalSteals + w.nsteals);
- int idx = w.poolIndex;
- lock.lock();
- try { // remove record from array
+ int ps;
+ w.qlock = -1; // ensure set
+ long ns = w.nsteals, sc; // collect steal count
+ do {} while (!U.compareAndSwapLong(this, STEALCOUNT,
+ sc = stealCount, sc + ns));
+ if (((ps = plock) & PL_LOCK) != 0 ||
+ !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
+ ps = acquirePlock();
+ int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
+ try {
+ int idx = w.poolIndex;
WorkQueue[] ws = workQueues;
if (ws != null && idx >= 0 && idx < ws.length && ws[idx] == w)
ws[idx] = null;
} finally {
- lock.unlock();
+ if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
+ releasePlock(nps);
}
}
- long c; // adjust ctl counts
+ long c; // adjust ctl counts
do {} while (!U.compareAndSwapLong
(this, CTL, c = ctl, (((c - AC_UNIT) & AC_MASK) |
((c - TC_UNIT) & TC_MASK) |
(c & ~(AC_MASK|TC_MASK)))));
- if (!tryTerminate(false, false) && w != null) {
- w.cancelAll(); // cancel remaining tasks
- if (w.array != null) // suppress signal if never ran
- signalWork(); // wake up or create replacement
- if (ex == null) // help clean refs on way out
- ForkJoinTask.helpExpungeStaleExceptions();
+ if (!tryTerminate(false, false) && w != null && w.array != null) {
+ w.cancelAll(); // cancel remaining tasks
+ WorkQueue[] ws; WorkQueue v; Thread p; int u, i, e;
+ while ((u = (int)((c = ctl) >>> 32)) < 0 && (e = (int)c) >= 0) {
+ if (e > 0) { // activate or create replacement
+ if ((ws = workQueues) == null ||
+ (i = e & SMASK) >= ws.length ||
+ (v = ws[i]) == null)
+ break;
+ long nc = (((long)(v.nextWait & E_MASK)) |
+ ((long)(u + UAC_UNIT) << 32));
+ if (v.eventCount != (e | INT_SIGN))
+ break;
+ if (U.compareAndSwapLong(this, CTL, c, nc)) {
+ v.eventCount = (e + E_SEQ) & E_MASK;
+ if ((p = v.parker) != null)
+ U.unpark(p);
+ break;
+ }
+ }
+ else {
+ if ((short)u < 0)
+ tryAddWorker();
+ break;
+ }
+ }
}
-
- if (ex != null) // rethrow
+ if (ex == null) // help clean refs on way out
+ ForkJoinTask.helpExpungeStaleExceptions();
+ else // rethrow
ForkJoinTask.rethrow(ex);
}
-
// Submissions
/**
* Unless shutting down, adds the given task to a submission queue
* at submitter's current queue index (modulo submission
- * range). If no queue exists at the index, one is created. If
- * the queue is busy, another index is randomly chosen. The
- * submitMask bounds the effective number of queues to the
- * (nearest power of two for) parallelism level.
+ * range). Only the most common path is directly handled in this
+ * method. All others are relayed to fullExternalPush.
*
* @param task the task. Caller must ensure non-null.
*/
- private void doSubmit(ForkJoinTask<?> task) {
- Submitter s = submitters.get();
- for (int r = s.seed, m = submitMask;;) {
- WorkQueue[] ws; WorkQueue q;
- int k = r & m & SQMASK; // use only even indices
- if (runState < 0 || (ws = workQueues) == null || ws.length <= k)
- throw new RejectedExecutionException(); // shutting down
- else if ((q = ws[k]) == null) { // create new queue
- WorkQueue nq = new WorkQueue(this, null, SHARED_QUEUE);
- Mutex lock = this.lock; // construct outside lock
- lock.lock();
- try { // recheck under lock
- int rs = runState; // to update seq
- if (ws == workQueues && ws[k] == null) {
- ws[k] = nq;
- runState = ((rs & SHUTDOWN) | ((rs + 2) & ~SHUTDOWN));
- }
- } finally {
- lock.unlock();
- }
- }
- else if (q.trySharedPush(task)) {
- signalWork();
+ final void externalPush(ForkJoinTask<?> task) {
+ WorkQueue[] ws; WorkQueue q; Submitter z; int m; ForkJoinTask<?>[] a;
+ if ((z = submitters.get()) != null && plock > 0 &&
+ (ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
+ (q = ws[m & z.seed & SQMASK]) != null &&
+ U.compareAndSwapInt(q, QLOCK, 0, 1)) { // lock
+ int b = q.base, s = q.top, n, an;
+ if ((a = q.array) != null && (an = a.length) > (n = s + 1 - b)) {
+ int j = (((an - 1) & s) << ASHIFT) + ABASE;
+ U.putOrderedObject(a, j, task);
+ q.top = s + 1; // push on to deque
+ q.qlock = 0;
+ if (n <= 2)
+ signalWork(q);
return;
}
- else if (m > 1) { // move to a different index
- r ^= r << 13; // same xorshift as WorkQueues
+ q.qlock = 0;
+ }
+ fullExternalPush(task);
+ }
+
+ /**
+ * Full version of externalPush. This method is called, among
+ * other times, upon the first submission of the first task to the
+ * pool, so must perform secondary initialization. It also
+ * detects first submission by an external thread by looking up
+ * its ThreadLocal, and creates a new shared queue if the one at
+ * index if empty or contended. The plock lock body must be
+ * exception-free (so no try/finally) so we optimistically
+ * allocate new queues outside the lock and throw them away if
+ * (very rarely) not needed.
+ *
+ * Secondary initialization occurs when plock is zero, to create
+ * workQueue array and set plock to a valid value. This lock body
+ * must also be exception-free. Because the plock seq value can
+ * eventually wrap around zero, this method harmlessly fails to
+ * reinitialize if workQueues exists, while still advancing plock.
+ */
+ private void fullExternalPush(ForkJoinTask<?> task) {
+ int r = 0; // random index seed
+ for (Submitter z = submitters.get();;) {
+ WorkQueue[] ws; WorkQueue q; int ps, m, k;
+ if (z == null) {
+ if (U.compareAndSwapInt(this, INDEXSEED, r = indexSeed,
+ r += SEED_INCREMENT) && r != 0)
+ submitters.set(z = new Submitter(r));
+ }
+ else if (r == 0) { // move to a different index
+ r = z.seed;
+ r ^= r << 13; // same xorshift as WorkQueues
r ^= r >>> 17;
- s.seed = r ^= r << 5;
+ z.seed = r ^ (r << 5);
+ }
+ else if ((ps = plock) < 0)
+ throw new RejectedExecutionException();
+ else if (ps == 0 || (ws = workQueues) == null ||
+ (m = ws.length - 1) < 0) { // initialize workQueues
+ int p = config & SMASK; // find power of two table size
+ int n = (p > 1) ? p - 1 : 1; // ensure at least 2 slots
+ n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
+ n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
+ WorkQueue[] nws = ((ws = workQueues) == null || ws.length == 0 ?
+ new WorkQueue[n] : null);
+ if (((ps = plock) & PL_LOCK) != 0 ||
+ !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
+ ps = acquirePlock();
+ if (((ws = workQueues) == null || ws.length == 0) && nws != null)
+ workQueues = nws;
+ int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
+ if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
+ releasePlock(nps);
+ }
+ else if ((q = ws[k = r & m & SQMASK]) != null) {
+ if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
+ ForkJoinTask<?>[] a = q.array;
+ int s = q.top;
+ boolean submitted = false;
+ try { // locked version of push
+ if ((a != null && a.length > s + 1 - q.base) ||
+ (a = q.growArray()) != null) { // must presize
+ int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
+ U.putOrderedObject(a, j, task);
+ q.top = s + 1;
+ submitted = true;
+ }
+ } finally {
+ q.qlock = 0; // unlock
+ }
+ if (submitted) {
+ signalWork(q);
+ return;
+ }
+ }
+ r = 0; // move on failure
+ }
+ else if (((ps = plock) & PL_LOCK) == 0) { // create new queue
+ q = new WorkQueue(this, null, SHARED_QUEUE, r);
+ if (((ps = plock) & PL_LOCK) != 0 ||
+ !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
+ ps = acquirePlock();
+ if ((ws = workQueues) != null && k < ws.length && ws[k] == null)
+ ws[k] = q;
+ int nps = (ps & SHUTDOWN) | ((ps + PL_LOCK) & ~SHUTDOWN);
+ if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
+ releasePlock(nps);
}
else
- Thread.yield(); // yield if no alternatives
+ r = 0; // try elsewhere while lock held
}
}
@@ -1434,37 +1935,37 @@ public class ForkJoinPool extends AbstractExecutorService {
}
/**
- * Tries to activate or create a worker if too few are active.
+ * Tries to create or activate a worker if too few are active.
+ *
+ * @param q the (non-null) queue holding tasks to be signalled
*/
- final void signalWork() {
- long c; int u;
- while ((u = (int)((c = ctl) >>> 32)) < 0) { // too few active
- WorkQueue[] ws = workQueues; int e, i; WorkQueue w; Thread p;
- if ((e = (int)c) > 0) { // at least one waiting
- if (ws != null && (i = e & SMASK) < ws.length &&
+ final void signalWork(WorkQueue q) {
+ int hint = q.poolIndex;
+ long c; int e, u, i, n; WorkQueue[] ws; WorkQueue w; Thread p;
+ while ((u = (int)((c = ctl) >>> 32)) < 0) {
+ if ((e = (int)c) > 0) {
+ if ((ws = workQueues) != null && ws.length > (i = e & SMASK) &&
(w = ws[i]) != null && w.eventCount == (e | INT_SIGN)) {
long nc = (((long)(w.nextWait & E_MASK)) |
((long)(u + UAC_UNIT) << 32));
if (U.compareAndSwapLong(this, CTL, c, nc)) {
+ w.hint = hint;
w.eventCount = (e + E_SEQ) & E_MASK;
if ((p = w.parker) != null)
- U.unpark(p); // activate and release
+ U.unpark(p);
break;
}
+ if (q.top - q.base <= 0)
+ break;
}
else
break;
}
- else if (e == 0 && (u & SHORT_SIGN) != 0) { // too few total
- long nc = (long)(((u + UTC_UNIT) & UTC_MASK) |
- ((u + UAC_UNIT) & UAC_MASK)) << 32;
- if (U.compareAndSwapLong(this, CTL, c, nc)) {
- addWorker();
- break;
- }
- }
- else
+ else {
+ if ((short)u < 0)
+ tryAddWorker();
break;
+ }
}
}
@@ -1474,8 +1975,8 @@ public class ForkJoinPool extends AbstractExecutorService {
* Top-level runloop for workers, called by ForkJoinWorkerThread.run.
*/
final void runWorker(WorkQueue w) {
- w.growArray(false); // initialize queue array in this thread
- do { w.runTask(scan(w)); } while (w.runState >= 0);
+ w.growArray(); // allocate queue
+ do { w.runTask(scan(w)); } while (w.qlock >= 0);
}
/**
@@ -1486,116 +1987,122 @@ public class ForkJoinPool extends AbstractExecutorService {
* contention, or state changes that indicate possible success on
* re-invocation.
*
- * The scan searches for tasks across a random permutation of
- * queues (starting at a random index and stepping by a random
- * relative prime, checking each at least once). The scan
- * terminates upon either finding a non-empty queue, or completing
- * the sweep. If the worker is not inactivated, it takes and
- * returns a task from this queue. On failure to find a task, we
- * take one of the following actions, after which the caller will
- * retry calling this method unless terminated.
+ * The scan searches for tasks across queues (starting at a random
+ * index, and relying on registerWorker to irregularly scatter
+ * them within array to avoid bias), checking each at least twice.
+ * The scan terminates upon either finding a non-empty queue, or
+ * completing the sweep. If the worker is not inactivated, it
+ * takes and returns a task from this queue. Otherwise, if not
+ * activated, it signals workers (that may include itself) and
+ * returns so caller can retry. Also returns for true if the
+ * worker array may have changed during an empty scan. On failure
+ * to find a task, we take one of the following actions, after
+ * which the caller will retry calling this method unless
+ * terminated.
*
* * If pool is terminating, terminate the worker.
*
- * * If not a complete sweep, try to release a waiting worker. If
- * the scan terminated because the worker is inactivated, then the
- * released worker will often be the calling worker, and it can
- * succeed obtaining a task on the next call. Or maybe it is
- * another worker, but with same net effect. Releasing in other
- * cases as well ensures that we have enough workers running.
- *
* * If not already enqueued, try to inactivate and enqueue the
* worker on wait queue. Or, if inactivating has caused the pool
- * to be quiescent, relay to idleAwaitWork to check for
- * termination and possibly shrink pool.
+ * to be quiescent, relay to idleAwaitWork to possibly shrink
+ * pool.
*
- * * If already inactive, and the caller has run a task since the
- * last empty scan, return (to allow rescan) unless others are
- * also inactivated. Field WorkQueue.rescans counts down on each
- * scan to ensure eventual inactivation and blocking.
+ * * If already enqueued and none of the above apply, possibly
+ * park awaiting signal, else lingering to help scan and signal.
*
- * * If already enqueued and none of the above apply, park
- * awaiting signal,
+ * * If a non-empty queue discovered or left as a hint,
+ * help wake up other workers before return.
*
* @param w the worker (via its WorkQueue)
- * @return a task or null of none found
+ * @return a task or null if none found
*/
private final ForkJoinTask<?> scan(WorkQueue w) {
- WorkQueue[] ws; // first update random seed
- int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5;
- int rs = runState, m; // volatile read order matters
- if ((ws = workQueues) != null && (m = ws.length - 1) > 0) {
- int ec = w.eventCount; // ec is negative if inactive
- int step = (r >>> 16) | 1; // relative prime
- for (int j = (m + 1) << 2; ; r += step) {
- WorkQueue q; ForkJoinTask<?> t; ForkJoinTask<?>[] a; int b;
- if ((q = ws[r & m]) != null && (b = q.base) - q.top < 0 &&
- (a = q.array) != null) { // probably nonempty
+ WorkQueue[] ws; int m;
+ int ps = plock; // read plock before ws
+ if (w != null && (ws = workQueues) != null && (m = ws.length - 1) >= 0) {
+ int ec = w.eventCount; // ec is negative if inactive
+ int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5;
+ w.hint = -1; // update seed and clear hint
+ int j = ((m + m + 1) | MIN_SCAN) & MAX_SCAN;
+ do {
+ WorkQueue q; ForkJoinTask<?>[] a; int b;
+ if ((q = ws[(r + j) & m]) != null && (b = q.base) - q.top < 0 &&
+ (a = q.array) != null) { // probably nonempty
int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
- t = (ForkJoinTask<?>)U.getObjectVolatile(a, i);
+ ForkJoinTask<?> t = (ForkJoinTask<?>)
+ U.getObjectVolatile(a, i);
if (q.base == b && ec >= 0 && t != null &&
U.compareAndSwapObject(a, i, t, null)) {
- q.base = b + 1; // specialization of pollAt
- return t;
+ if ((q.base = b + 1) - q.top < 0)
+ signalWork(q);
+ return t; // taken
}
- else if (ec < 0 || j <= m) {
- rs = 0; // mark scan as imcomplete
- break; // caller can retry after release
+ else if ((ec < 0 || j < m) && (int)(ctl >> AC_SHIFT) <= 0) {
+ w.hint = (r + j) & m; // help signal below
+ break; // cannot take
}
}
- if (--j < 0)
- break;
- }
+ } while (--j >= 0);
- long c = ctl; int e = (int)c, a = (int)(c >> AC_SHIFT), nr, ns;
- if (e < 0) // decode ctl on empty scan
- w.runState = -1; // pool is terminating
- else if (rs == 0 || rs != runState) { // incomplete scan
- WorkQueue v; Thread p; // try to release a waiter
- if (e > 0 && a < 0 && w.eventCount == ec &&
- (v = ws[e & m]) != null && v.eventCount == (e | INT_SIGN)) {
- long nc = ((long)(v.nextWait & E_MASK) |
- ((c + AC_UNIT) & (AC_MASK|TC_MASK)));
- if (ctl == c && U.compareAndSwapLong(this, CTL, c, nc)) {
+ int h, e, ns; long c, sc; WorkQueue q;
+ if ((ns = w.nsteals) != 0) {
+ if (U.compareAndSwapLong(this, STEALCOUNT,
+ sc = stealCount, sc + ns))
+ w.nsteals = 0; // collect steals and rescan
+ }
+ else if (plock != ps) // consistency check
+ ; // skip
+ else if ((e = (int)(c = ctl)) < 0)
+ w.qlock = -1; // pool is terminating
+ else {
+ if ((h = w.hint) < 0) {
+ if (ec >= 0) { // try to enqueue/inactivate
+ long nc = (((long)ec |
+ ((c - AC_UNIT) & (AC_MASK|TC_MASK))));
+ w.nextWait = e; // link and mark inactive
+ w.eventCount = ec | INT_SIGN;
+ if (ctl != c || !U.compareAndSwapLong(this, CTL, c, nc))
+ w.eventCount = ec; // unmark on CAS failure
+ else if ((int)(c >> AC_SHIFT) == 1 - (config & SMASK))
+ idleAwaitWork(w, nc, c);
+ }
+ else if (w.eventCount < 0 && ctl == c) {
+ Thread wt = Thread.currentThread();
+ Thread.interrupted(); // clear status
+ U.putObject(wt, PARKBLOCKER, this);
+ w.parker = wt; // emulate LockSupport.park
+ if (w.eventCount < 0) // recheck
+ U.park(false, 0L); // block
+ w.parker = null;
+ U.putObject(wt, PARKBLOCKER, null);
+ }
+ }
+ if ((h >= 0 || (h = w.hint) >= 0) &&
+ (ws = workQueues) != null && h < ws.length &&
+ (q = ws[h]) != null) { // signal others before retry
+ WorkQueue v; Thread p; int u, i, s;
+ for (int n = (config & SMASK) - 1;;) {
+ int idleCount = (w.eventCount < 0) ? 0 : -1;
+ if (((s = idleCount - q.base + q.top) <= n &&
+ (n = s) <= 0) ||
+ (u = (int)((c = ctl) >>> 32)) >= 0 ||
+ (e = (int)c) <= 0 || m < (i = e & SMASK) ||
+ (v = ws[i]) == null)
+ break;
+ long nc = (((long)(v.nextWait & E_MASK)) |
+ ((long)(u + UAC_UNIT) << 32));
+ if (v.eventCount != (e | INT_SIGN) ||
+ !U.compareAndSwapLong(this, CTL, c, nc))
+ break;
+ v.hint = h;
v.eventCount = (e + E_SEQ) & E_MASK;
if ((p = v.parker) != null)
U.unpark(p);
+ if (--n <= 0)
+ break;
}
}
}
- else if (ec >= 0) { // try to enqueue/inactivate
- long nc = (long)ec | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
- w.nextWait = e;
- w.eventCount = ec | INT_SIGN; // mark as inactive
- if (ctl != c || !U.compareAndSwapLong(this, CTL, c, nc))
- w.eventCount = ec; // unmark on CAS failure
- else {
- if ((ns = w.nsteals) != 0) {
- w.nsteals = 0; // set rescans if ran task
- w.rescans = (a > 0) ? 0 : a + parallelism;
- w.totalSteals += ns;
- }
- if (a == 1 - parallelism) // quiescent
- idleAwaitWork(w, nc, c);
- }
- }
- else if (w.eventCount < 0) { // already queued
- if ((nr = w.rescans) > 0) { // continue rescanning
- int ac = a + parallelism;
- if (((w.rescans = (ac < nr) ? ac : nr - 1) & 3) == 0)
- Thread.yield(); // yield before block
- }
- else {
- Thread.interrupted(); // clear status
- Thread wt = Thread.currentThread();
- U.putObject(wt, PARKBLOCKER, this);
- w.parker = wt; // emulate LockSupport.park
- if (w.eventCount < 0) // recheck
- U.park(false, 0L);
- w.parker = null;
- U.putObject(wt, PARKBLOCKER, null);
- }
- }
}
return null;
}
@@ -1603,8 +2110,8 @@ public class ForkJoinPool extends AbstractExecutorService {
/**
* If inactivating worker w has caused the pool to become
* quiescent, checks for pool termination, and, so long as this is
- * not the only worker, waits for event for up to SHRINK_RATE
- * nanosecs. On timeout, if ctl has not changed, terminates the
+ * not the only worker, waits for event for up to a given
+ * duration. On timeout, if ctl has not changed, terminates the
* worker, which will in turn wake up another worker to possibly
* repeat this process.
*
@@ -1613,25 +2120,28 @@ public class ForkJoinPool extends AbstractExecutorService {
* @param prevCtl the ctl value to restore if thread is terminated
*/
private void idleAwaitWork(WorkQueue w, long currentCtl, long prevCtl) {
- if (w.eventCount < 0 && !tryTerminate(false, false) &&
- (int)prevCtl != 0 && !hasQueuedSubmissions() && ctl == currentCtl) {
+ if (w != null && w.eventCount < 0 &&
+ !tryTerminate(false, false) && (int)prevCtl != 0 &&
+ ctl == currentCtl) {
+ int dc = -(short)(currentCtl >>> TC_SHIFT);
+ long parkTime = dc < 0 ? FAST_IDLE_TIMEOUT: (dc + 1) * IDLE_TIMEOUT;
+ long deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
Thread wt = Thread.currentThread();
- Thread.yield(); // yield before block
while (ctl == currentCtl) {
- long startTime = System.nanoTime();
Thread.interrupted(); // timed variant of version in scan()
U.putObject(wt, PARKBLOCKER, this);
w.parker = wt;
if (ctl == currentCtl)
- U.park(false, SHRINK_RATE);
+ U.park(false, parkTime);
w.parker = null;
U.putObject(wt, PARKBLOCKER, null);
if (ctl != currentCtl)
break;
- if (System.nanoTime() - startTime >= SHRINK_TIMEOUT &&
+ if (deadline - System.nanoTime() <= 0L &&
U.compareAndSwapLong(this, CTL, currentCtl, prevCtl)) {
w.eventCount = (w.eventCount + E_SEQ) | E_MASK;
- w.runState = -1; // shrink
+ w.hint = -1;
+ w.qlock = -1; // shrink
break;
}
}
@@ -1639,6 +2149,47 @@ public class ForkJoinPool extends AbstractExecutorService {
}
/**
+ * Scans through queues looking for work while joining a task; if
+ * any present, signals. May return early if more signalling is
+ * detectably unneeded.
+ *
+ * @param task return early if done
+ * @param origin an index to start scan
+ */
+ private void helpSignal(ForkJoinTask<?> task, int origin) {
+ WorkQueue[] ws; WorkQueue w; Thread p; long c; int m, u, e, i, s;
+ if (task != null && task.status >= 0 &&
+ (u = (int)(ctl >>> 32)) < 0 && (u >> UAC_SHIFT) < 0 &&
+ (ws = workQueues) != null && (m = ws.length - 1) >= 0) {
+ outer: for (int k = origin, j = m; j >= 0; --j) {
+ WorkQueue q = ws[k++ & m];
+ for (int n = m;;) { // limit to at most m signals
+ if (task.status < 0)
+ break outer;
+ if (q == null ||
+ ((s = -q.base + q.top) <= n && (n = s) <= 0))
+ break;
+ if ((u = (int)((c = ctl) >>> 32)) >= 0 ||
+ (e = (int)c) <= 0 || m < (i = e & SMASK) ||
+ (w = ws[i]) == null)
+ break outer;
+ long nc = (((long)(w.nextWait & E_MASK)) |
+ ((long)(u + UAC_UNIT) << 32));
+ if (w.eventCount != (e | INT_SIGN))
+ break outer;
+ if (U.compareAndSwapLong(this, CTL, c, nc)) {
+ w.eventCount = (e + E_SEQ) & E_MASK;
+ if ((p = w.parker) != null)
+ U.unpark(p);
+ if (--n <= 0)
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ /**
* Tries to locate and execute tasks for a stealer of the given
* task, or in turn one of its stealers, Traces currentSteal ->
* currentJoin links looking for a thread working on a descendant
@@ -1669,7 +2220,7 @@ public class ForkJoinPool extends AbstractExecutorService {
}
if ((ws = workQueues) == null || (m = ws.length - 1) <= 0)
break restart; // shutting down
- if ((v = ws[h = (j.stealHint | 1) & m]) == null ||
+ if ((v = ws[h = (j.hint | 1) & m]) == null ||
v.currentSteal != subtask) {
for (int origin = h;;) { // find stealer
if (((h = (h + 2) & m) & 15) == 1 &&
@@ -1677,7 +2228,7 @@ public class ForkJoinPool extends AbstractExecutorService {
continue restart; // occasional staleness check
if ((v = ws[h]) != null &&
v.currentSteal == subtask) {
- j.stealHint = h; // save hint
+ j.hint = h; // save hint
break;
}
if (h == origin)
@@ -1725,88 +2276,77 @@ public class ForkJoinPool extends AbstractExecutorService {
}
/**
- * If task is at base of some steal queue, steals and executes it.
+ * Analog of tryHelpStealer for CountedCompleters. Tries to steal
+ * and run tasks within the target's computation.
*
- * @param joiner the joining worker
- * @param task the task
- */
- private void tryPollForAndExec(WorkQueue joiner, ForkJoinTask<?> task) {
- WorkQueue[] ws;
- if ((ws = workQueues) != null) {
- for (int j = 1; j < ws.length && task.status >= 0; j += 2) {
- WorkQueue q = ws[j];
- if (q != null && q.pollFor(task)) {
- joiner.runSubtask(task);
- break;
+ * @param task the task to join
+ * @param mode if shared, exit upon completing any task
+ * if all workers are active
+ */
+ private int helpComplete(ForkJoinTask<?> task, int mode) {
+ WorkQueue[] ws; WorkQueue q; int m, n, s, u;
+ if (task != null && (ws = workQueues) != null &&
+ (m = ws.length - 1) >= 0) {
+ for (int j = 1, origin = j;;) {
+ if ((s = task.status) < 0)
+ return s;
+ if ((q = ws[j & m]) != null && q.pollAndExecCC(task)) {
+ origin = j;
+ if (mode == SHARED_QUEUE &&
+ ((u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0))
+ break;
}
+ else if ((j = (j + 2) & m) == origin)
+ break;
}
}
+ return 0;
}
/**
* Tries to decrement active count (sometimes implicitly) and
* possibly release or create a compensating worker in preparation
* for blocking. Fails on contention or termination. Otherwise,
- * adds a new thread if no idle workers are available and either
- * pool would become completely starved or: (at least half
- * starved, and fewer than 50% spares exist, and there is at least
- * one task apparently available). Even though the availability
- * check requires a full scan, it is worthwhile in reducing false
- * alarms.
- *
- * @param task if non-null, a task being waited for
- * @param blocker if non-null, a blocker being waited for
- * @return true if the caller can block, else should recheck and retry
- */
- final boolean tryCompensate(ForkJoinTask<?> task, ManagedBlocker blocker) {
- int pc = parallelism, e;
- long c = ctl;
- WorkQueue[] ws = workQueues;
- if ((e = (int)c) >= 0 && ws != null) {
- int u, a, ac, hc;
- int tc = (short)((u = (int)(c >>> 32)) >>> UTC_SHIFT) + pc;
- boolean replace = false;
- if ((a = u >> UAC_SHIFT) <= 0) {
- if ((ac = a + pc) <= 1)
- replace = true;
- else if ((e > 0 || (task != null &&
- ac <= (hc = pc >>> 1) && tc < pc + hc))) {
- WorkQueue w;
- for (int j = 0; j < ws.length; ++j) {
- if ((w = ws[j]) != null && !w.isEmpty()) {
- replace = true;
- break; // in compensation range and tasks available
- }
- }
+ * adds a new thread if no idle workers are available and pool
+ * may become starved.
+ */
+ final boolean tryCompensate() {
+ int pc = config & SMASK, e, i, tc; long c;
+ WorkQueue[] ws; WorkQueue w; Thread p;
+ if ((ws = workQueues) != null && (e = (int)(c = ctl)) >= 0) {
+ if (e != 0 && (i = e & SMASK) < ws.length &&
+ (w = ws[i]) != null && w.eventCount == (e | INT_SIGN)) {
+ long nc = ((long)(w.nextWait & E_MASK) |
+ (c & (AC_MASK|TC_MASK)));
+ if (U.compareAndSwapLong(this, CTL, c, nc)) {
+ w.eventCount = (e + E_SEQ) & E_MASK;
+ if ((p = w.parker) != null)
+ U.unpark(p);
+ return true; // replace with idle worker
}
}
- if ((task == null || task.status >= 0) && // recheck need to block
- (blocker == null || !blocker.isReleasable()) && ctl == c) {
- if (!replace) { // no compensation
- long nc = ((c - AC_UNIT) & AC_MASK) | (c & ~AC_MASK);
- if (U.compareAndSwapLong(this, CTL, c, nc))
- return true;
- }
- else if (e != 0) { // release an idle worker
- WorkQueue w; Thread p; int i;
- if ((i = e & SMASK) < ws.length && (w = ws[i]) != null) {
- long nc = ((long)(w.nextWait & E_MASK) |
- (c & (AC_MASK|TC_MASK)));
- if (w.eventCount == (e | INT_SIGN) &&
- U.compareAndSwapLong(this, CTL, c, nc)) {
- w.eventCount = (e + E_SEQ) & E_MASK;
- if ((p = w.parker) != null)
- U.unpark(p);
+ else if ((tc = (short)(c >>> TC_SHIFT)) >= 0 &&
+ (int)(c >> AC_SHIFT) + pc > 1) {
+ long nc = ((c - AC_UNIT) & AC_MASK) | (c & ~AC_MASK);
+ if (U.compareAndSwapLong(this, CTL, c, nc))
+ return true; // no compensation
+ }
+ else if (tc + pc < MAX_CAP) {
+ long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
+ if (U.compareAndSwapLong(this, CTL, c, nc)) {
+ ForkJoinWorkerThreadFactory fac;
+ Throwable ex = null;
+ ForkJoinWorkerThread wt = null;
+ try {
+ if ((fac = factory) != null &&
+ (wt = fac.newThread(this)) != null) {
+ wt.start();
return true;
}
+ } catch (Throwable rex) {
+ ex = rex;
}
- }
- else if (tc < MAX_CAP) { // create replacement
- long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
- if (U.compareAndSwapLong(this, CTL, c, nc)) {
- addWorker();
- return true;
- }
+ deregisterWorker(wt, ex); // clean up and return false
}
}
}
@@ -1821,25 +2361,25 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return task status on exit
*/
final int awaitJoin(WorkQueue joiner, ForkJoinTask<?> task) {
- int s;
- if ((s = task.status) >= 0) {
+ int s = 0;
+ if (joiner != null && task != null && (s = task.status) >= 0) {
ForkJoinTask<?> prevJoin = joiner.currentJoin;
joiner.currentJoin = task;
- long startTime = 0L;
- for (int k = 0;;) {
- if ((s = (joiner.isEmpty() ? // try to help
- tryHelpStealer(joiner, task) :
- joiner.tryRemoveAndExec(task))) == 0 &&
+ do {} while ((s = task.status) >= 0 && !joiner.isEmpty() &&
+ joiner.tryRemoveAndExec(task)); // process local tasks
+ if (s >= 0 && (s = task.status) >= 0) {
+ helpSignal(task, joiner.poolIndex);
+ if ((s = task.status) >= 0 &&
+ (task instanceof CountedCompleter))
+ s = helpComplete(task, LIFO_QUEUE);
+ }
+ while (s >= 0 && (s = task.status) >= 0) {
+ if ((!joiner.isEmpty() || // try helping
+ (s = tryHelpStealer(joiner, task)) == 0) &&
(s = task.status) >= 0) {
- if (k == 0) {
- startTime = System.nanoTime();
- tryPollForAndExec(joiner, task); // check uncommon case
- }
- else if ((k & (MAX_HELP - 1)) == 0 &&
- System.nanoTime() - startTime >=
- COMPENSATION_DELAY &&
- tryCompensate(task, null)) {
- if (task.trySetSignal()) {
+ helpSignal(task, joiner.poolIndex);
+ if ((s = task.status) >= 0 && tryCompensate()) {
+ if (task.trySetSignal() && (s = task.status) >= 0) {
synchronized (task) {
if (task.status >= 0) {
try { // see ForkJoinTask
@@ -1856,13 +2396,8 @@ public class ForkJoinPool extends AbstractExecutorService {
(this, CTL, c = ctl, c + AC_UNIT));
}
}
- if (s < 0 || (s = task.status) < 0) {
- joiner.currentJoin = prevJoin;
- break;
- }
- else if ((k++ & (MAX_HELP - 1)) == MAX_HELP >>> 1)
- Thread.yield(); // for politeness
}
+ joiner.currentJoin = prevJoin;
}
return s;
}
@@ -1874,46 +2409,49 @@ public class ForkJoinPool extends AbstractExecutorService {
*
* @param joiner the joining worker
* @param task the task
- * @return task status on exit
*/
- final int helpJoinOnce(WorkQueue joiner, ForkJoinTask<?> task) {
+ final void helpJoinOnce(WorkQueue joiner, ForkJoinTask<?> task) {
int s;
- while ((s = task.status) >= 0 &&
- (joiner.isEmpty() ?
- tryHelpStealer(joiner, task) :
- joiner.tryRemoveAndExec(task)) != 0)
- ;
- return s;
+ if (joiner != null && task != null && (s = task.status) >= 0) {
+ ForkJoinTask<?> prevJoin = joiner.currentJoin;
+ joiner.currentJoin = task;
+ do {} while ((s = task.status) >= 0 && !joiner.isEmpty() &&
+ joiner.tryRemoveAndExec(task));
+ if (s >= 0 && (s = task.status) >= 0) {
+ helpSignal(task, joiner.poolIndex);
+ if ((s = task.status) >= 0 &&
+ (task instanceof CountedCompleter))
+ s = helpComplete(task, LIFO_QUEUE);
+ }
+ if (s >= 0 && joiner.isEmpty()) {
+ do {} while (task.status >= 0 &&
+ tryHelpStealer(joiner, task) > 0);
+ }
+ joiner.currentJoin = prevJoin;
+ }
}
/**
* Returns a (probably) non-empty steal queue, if one is found
- * during a random, then cyclic scan, else null. This method must
- * be retried by caller if, by the time it tries to use the queue,
- * it is empty.
- */
- private WorkQueue findNonEmptyStealQueue(WorkQueue w) {
- // Similar to loop in scan(), but ignoring submissions
- int r = w.seed; r ^= r << 13; r ^= r >>> 17; w.seed = r ^= r << 5;
- int step = (r >>> 16) | 1;
- for (WorkQueue[] ws;;) {
- int rs = runState, m;
- if ((ws = workQueues) == null || (m = ws.length - 1) < 1)
- return null;
- for (int j = (m + 1) << 2; ; r += step) {
- WorkQueue q = ws[((r << 1) | 1) & m];
- if (q != null && !q.isEmpty())
- return q;
- else if (--j < 0) {
- if (runState == rs)
- return null;
- break;
+ * during a scan, else null. This method must be retried by
+ * caller if, by the time it tries to use the queue, it is empty.
+ * @param r a (random) seed for scanning
+ */
+ private WorkQueue findNonEmptyStealQueue(int r) {
+ for (;;) {
+ int ps = plock, m; WorkQueue[] ws; WorkQueue q;
+ if ((ws = workQueues) != null && (m = ws.length - 1) >= 0) {
+ for (int j = (m + 1) << 2; j >= 0; --j) {
+ if ((q = ws[(((r + j) << 1) | 1) & m]) != null &&
+ q.base - q.top < 0)
+ return q;
}
}
+ if (plock == ps)
+ return null;
}
}
-
/**
* Runs tasks until {@code isQuiescent()}. We piggyback on
* active count ctl maintenance, but rather than blocking
@@ -1922,36 +2460,34 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
final void helpQuiescePool(WorkQueue w) {
for (boolean active = true;;) {
- ForkJoinTask<?> localTask; // exhaust local queue
- while ((localTask = w.nextLocalTask()) != null)
- localTask.doExec();
- WorkQueue q = findNonEmptyStealQueue(w);
- if (q != null) {
- ForkJoinTask<?> t; int b;
+ long c; WorkQueue q; ForkJoinTask<?> t; int b;
+ while ((t = w.nextLocalTask()) != null) {
+ if (w.base - w.top < 0)
+ signalWork(w);
+ t.doExec();
+ }
+ if ((q = findNonEmptyStealQueue(w.nextSeed())) != null) {
if (!active) { // re-establish active count
- long c;
active = true;
do {} while (!U.compareAndSwapLong
(this, CTL, c = ctl, c + AC_UNIT));
}
- if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
+ if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) {
+ if (q.base - q.top < 0)
+ signalWork(q);
w.runSubtask(t);
+ }
}
- else {
- long c;
- if (active) { // decrement active count without queuing
+ else if (active) { // decrement active count without queuing
+ long nc = (c = ctl) - AC_UNIT;
+ if ((int)(nc >> AC_SHIFT) + (config & SMASK) == 0)
+ return; // bypass decrement-then-increment
+ if (U.compareAndSwapLong(this, CTL, c, nc))
active = false;
- do {} while (!U.compareAndSwapLong
- (this, CTL, c = ctl, c -= AC_UNIT));
- }
- else
- c = ctl; // re-increment on exit
- if ((int)(c >> AC_SHIFT) + parallelism == 0) {
- do {} while (!U.compareAndSwapLong
- (this, CTL, c = ctl, c + AC_UNIT));
- break;
- }
}
+ else if ((int)((c = ctl) >> AC_SHIFT) + (config & SMASK) == 0 &&
+ U.compareAndSwapLong(this, CTL, c, c + AC_UNIT))
+ return;
}
}
@@ -1965,27 +2501,75 @@ public class ForkJoinPool extends AbstractExecutorService {
WorkQueue q; int b;
if ((t = w.nextLocalTask()) != null)
return t;
- if ((q = findNonEmptyStealQueue(w)) == null)
+ if ((q = findNonEmptyStealQueue(w.nextSeed())) == null)
return null;
- if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null)
+ if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) {
+ if (q.base - q.top < 0)
+ signalWork(q);
return t;
+ }
}
}
/**
- * Returns the approximate (non-atomic) number of idle threads per
- * active thread to offset steal queue size for method
- * ForkJoinTask.getSurplusQueuedTaskCount().
- */
- final int idlePerActive() {
- // Approximate at powers of two for small values, saturate past 4
- int p = parallelism;
- int a = p + (int)(ctl >> AC_SHIFT);
- return (a > (p >>>= 1) ? 0 :
- a > (p >>>= 1) ? 1 :
- a > (p >>>= 1) ? 2 :
- a > (p >>>= 1) ? 4 :
- 8);
+ * Returns a cheap heuristic guide for task partitioning when
+ * programmers, frameworks, tools, or languages have little or no
+ * idea about task granularity. In essence by offering this
+ * method, we ask users only about tradeoffs in overhead vs
+ * expected throughput and its variance, rather than how finely to
+ * partition tasks.
+ *
+ * In a steady state strict (tree-structured) computation, each
+ * thread makes available for stealing enough tasks for other
+ * threads to remain active. Inductively, if all threads play by
+ * the same rules, each thread should make available only a
+ * constant number of tasks.
+ *
+ * The minimum useful constant is just 1. But using a value of 1
+ * would require immediate replenishment upon each steal to
+ * maintain enough tasks, which is infeasible. Further,
+ * partitionings/granularities of offered tasks should minimize
+ * steal rates, which in general means that threads nearer the top
+ * of computation tree should generate more than those nearer the
+ * bottom. In perfect steady state, each thread is at
+ * approximately the same level of computation tree. However,
+ * producing extra tasks amortizes the uncertainty of progress and
+ * diffusion assumptions.
+ *
+ * So, users will want to use values larger (but not much larger)
+ * than 1 to both smooth over transient shortages and hedge
+ * against uneven progress; as traded off against the cost of
+ * extra task overhead. We leave the user to pick a threshold
+ * value to compare with the results of this call to guide
+ * decisions, but recommend values such as 3.
+ *
+ * When all threads are active, it is on average OK to estimate
+ * surplus strictly locally. In steady-state, if one thread is
+ * maintaining say 2 surplus tasks, then so are others. So we can
+ * just use estimated queue length. However, this strategy alone
+ * leads to serious mis-estimates in some non-steady-state
+ * conditions (ramp-up, ramp-down, other stalls). We can detect
+ * many of these by further considering the number of "idle"
+ * threads, that are known to have zero queued tasks, so
+ * compensate by a factor of (#idle/#active) threads.
+ *
+ * Note: The approximation of #busy workers as #active workers is
+ * not very good under current signalling scheme, and should be
+ * improved.
+ */
+ static int getSurplusQueuedTaskCount() {
+ Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
+ if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)) {
+ int p = (pool = (wt = (ForkJoinWorkerThread)t).pool).config & SMASK;
+ int n = (q = wt.workQueue).top - q.base;
+ int a = (int)(pool.ctl >> AC_SHIFT) + p;
+ return n - (a > (p >>>= 1) ? 0 :
+ a > (p >>>= 1) ? 1 :
+ a > (p >>>= 1) ? 2 :
+ a > (p >>>= 1) ? 4 :
+ 8);
+ }
+ return 0;
}
// Termination
@@ -2005,56 +2589,71 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return true if now terminating or terminated
*/
private boolean tryTerminate(boolean now, boolean enable) {
- Mutex lock = this.lock;
+ int ps;
+ if (this == common) // cannot shut down
+ return false;
+ if ((ps = plock) >= 0) { // enable by setting plock
+ if (!enable)
+ return false;
+ if ((ps & PL_LOCK) != 0 ||
+ !U.compareAndSwapInt(this, PLOCK, ps, ps += PL_LOCK))
+ ps = acquirePlock();
+ int nps = ((ps + PL_LOCK) & ~SHUTDOWN) | SHUTDOWN;
+ if (!U.compareAndSwapInt(this, PLOCK, ps, nps))
+ releasePlock(nps);
+ }
for (long c;;) {
- if (((c = ctl) & STOP_BIT) != 0) { // already terminating
- if ((short)(c >>> TC_SHIFT) == -parallelism) {
- lock.lock(); // don't need try/finally
- termination.signalAll(); // signal when 0 workers
- lock.unlock();
+ if (((c = ctl) & STOP_BIT) != 0) { // already terminating
+ if ((short)(c >>> TC_SHIFT) == -(config & SMASK)) {
+ synchronized (this) {
+ notifyAll(); // signal when 0 workers
+ }
}
return true;
}
- if (runState >= 0) { // not yet enabled
- if (!enable)
+ if (!now) { // check if idle & no tasks
+ WorkQueue[] ws; WorkQueue w;
+ if ((int)(c >> AC_SHIFT) != -(config & SMASK))
return false;
- lock.lock();
- runState |= SHUTDOWN;
- lock.unlock();
- }
- if (!now) { // check if idle & no tasks
- if ((int)(c >> AC_SHIFT) != -parallelism ||
- hasQueuedSubmissions())
- return false;
- // Check for unqueued inactive workers. One pass suffices.
- WorkQueue[] ws = workQueues; WorkQueue w;
- if (ws != null) {
- for (int i = 1; i < ws.length; i += 2) {
- if ((w = ws[i]) != null && w.eventCount >= 0)
- return false;
+ if ((ws = workQueues) != null) {
+ for (int i = 0; i < ws.length; ++i) {
+ if ((w = ws[i]) != null) {
+ if (!w.isEmpty()) { // signal unprocessed tasks
+ signalWork(w);
+ return false;
+ }
+ if ((i & 1) != 0 && w.eventCount >= 0)
+ return false; // unqueued inactive worker
+ }
}
}
}
if (U.compareAndSwapLong(this, CTL, c, c | STOP_BIT)) {
for (int pass = 0; pass < 3; ++pass) {
- WorkQueue[] ws = workQueues;
- if (ws != null) {
- WorkQueue w;
+ WorkQueue[] ws; WorkQueue w; Thread wt;
+ if ((ws = workQueues) != null) {
int n = ws.length;
for (int i = 0; i < n; ++i) {
if ((w = ws[i]) != null) {
- w.runState = -1;
+ w.qlock = -1;
if (pass > 0) {
w.cancelAll();
- if (pass > 1)
- w.interruptOwner();
+ if (pass > 1 && (wt = w.owner) != null) {
+ if (!wt.isInterrupted()) {
+ try {
+ wt.interrupt();
+ } catch (Throwable ignore) {
+ }
+ }
+ U.unpark(wt);
+ }
}
}
}
// Wake up workers parked on event queue
int i, e; long cc; Thread p;
while ((e = (int)(cc = ctl) & E_MASK) != 0 &&
- (i = e & SMASK) < n &&
+ (i = e & SMASK) < n && i >= 0 &&
(w = ws[i]) != null) {
long nc = ((long)(w.nextWait & E_MASK) |
((cc + AC_UNIT) & AC_MASK) |
@@ -2062,7 +2661,7 @@ public class ForkJoinPool extends AbstractExecutorService {
if (w.eventCount == (e | INT_SIGN) &&
U.compareAndSwapLong(this, CTL, cc, nc)) {
w.eventCount = (e + E_SEQ) & E_MASK;
- w.runState = -1;
+ w.qlock = -1;
if ((p = w.parker) != null)
U.unpark(p);
}
@@ -2073,6 +2672,135 @@ public class ForkJoinPool extends AbstractExecutorService {
}
}
+ // external operations on common pool
+
+ /**
+ * Returns common pool queue for a thread that has submitted at
+ * least one task.
+ */
+ static WorkQueue commonSubmitterQueue() {
+ ForkJoinPool p; WorkQueue[] ws; int m; Submitter z;
+ return ((z = submitters.get()) != null &&
+ (p = common) != null &&
+ (ws = p.workQueues) != null &&
+ (m = ws.length - 1) >= 0) ?
+ ws[m & z.seed & SQMASK] : null;
+ }
+
+ /**
+ * Tries to pop the given task from submitter's queue in common pool.
+ */
+ static boolean tryExternalUnpush(ForkJoinTask<?> t) {
+ ForkJoinPool p; WorkQueue[] ws; WorkQueue q; Submitter z;
+ ForkJoinTask<?>[] a; int m, s;
+ if (t != null &&
+ (z = submitters.get()) != null &&
+ (p = common) != null &&
+ (ws = p.workQueues) != null &&
+ (m = ws.length - 1) >= 0 &&
+ (q = ws[m & z.seed & SQMASK]) != null &&
+ (s = q.top) != q.base &&
+ (a = q.array) != null) {
+ long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE;
+ if (U.getObject(a, j) == t &&
+ U.compareAndSwapInt(q, QLOCK, 0, 1)) {
+ if (q.array == a && q.top == s && // recheck
+ U.compareAndSwapObject(a, j, t, null)) {
+ q.top = s - 1;
+ q.qlock = 0;
+ return true;
+ }
+ q.qlock = 0;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Tries to pop and run local tasks within the same computation
+ * as the given root. On failure, tries to help complete from
+ * other queues via helpComplete.
+ */
+ private void externalHelpComplete(WorkQueue q, ForkJoinTask<?> root) {
+ ForkJoinTask<?>[] a; int m;
+ if (q != null && (a = q.array) != null && (m = (a.length - 1)) >= 0 &&
+ root != null && root.status >= 0) {
+ for (;;) {
+ int s, u; Object o; CountedCompleter<?> task = null;
+ if ((s = q.top) - q.base > 0) {
+ long j = ((m & (s - 1)) << ASHIFT) + ABASE;
+ if ((o = U.getObject(a, j)) != null &&
+ (o instanceof CountedCompleter)) {
+ CountedCompleter<?> t = (CountedCompleter<?>)o, r = t;
+ do {
+ if (r == root) {
+ if (U.compareAndSwapInt(q, QLOCK, 0, 1)) {
+ if (q.array == a && q.top == s &&
+ U.compareAndSwapObject(a, j, t, null)) {
+ q.top = s - 1;
+ task = t;
+ }
+ q.qlock = 0;
+ }
+ break;
+ }
+ } while ((r = r.completer) != null);
+ }
+ }
+ if (task != null)
+ task.doExec();
+ if (root.status < 0 ||
+ (u = (int)(ctl >>> 32)) >= 0 || (u >> UAC_SHIFT) >= 0)
+ break;
+ if (task == null) {
+ helpSignal(root, q.poolIndex);
+ if (root.status >= 0)
+ helpComplete(root, SHARED_QUEUE);
+ break;
+ }
+ }
+ }
+ }
+
+ /**
+ * Tries to help execute or signal availability of the given task
+ * from submitter's queue in common pool.
+ */
+ static void externalHelpJoin(ForkJoinTask<?> t) {
+ // Some hard-to-avoid overlap with tryExternalUnpush
+ ForkJoinPool p; WorkQueue[] ws; WorkQueue q, w; Submitter z;
+ ForkJoinTask<?>[] a; int m, s, n;
+ if (t != null &&
+ (z = submitters.get()) != null &&
+ (p = common) != null &&
+ (ws = p.workQueues) != null &&
+ (m = ws.length - 1) >= 0 &&
+ (q = ws[m & z.seed & SQMASK]) != null &&
+ (a = q.array) != null) {
+ int am = a.length - 1;
+ if ((s = q.top) != q.base) {
+ long j = ((am & (s - 1)) << ASHIFT) + ABASE;
+ if (U.getObject(a, j) == t &&
+ U.compareAndSwapInt(q, QLOCK, 0, 1)) {
+ if (q.array == a && q.top == s &&
+ U.compareAndSwapObject(a, j, t, null)) {
+ q.top = s - 1;
+ q.qlock = 0;
+ t.doExec();
+ }
+ else
+ q.qlock = 0;
+ }
+ }
+ if (t.status >= 0) {
+ if (t instanceof CountedCompleter)
+ p.externalHelpComplete(q, t);
+ else
+ p.helpSignal(t, q.poolIndex);
+ }
+ }
+ }
+
// Exported methods
// Constructors
@@ -2089,7 +2817,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* java.lang.RuntimePermission}{@code ("modifyThread")}
*/
public ForkJoinPool() {
- this(Runtime.getRuntime().availableProcessors(),
+ this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
defaultForkJoinWorkerThreadFactory, null, false);
}
@@ -2144,29 +2872,48 @@ public class ForkJoinPool extends AbstractExecutorService {
throw new NullPointerException();
if (parallelism <= 0 || parallelism > MAX_CAP)
throw new IllegalArgumentException();
- this.parallelism = parallelism;
this.factory = factory;
this.ueh = handler;
- this.localMode = asyncMode ? FIFO_QUEUE : LIFO_QUEUE;
+ this.config = parallelism | (asyncMode ? (FIFO_QUEUE << 16) : 0);
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
- // Use nearest power 2 for workQueues size. See Hackers Delight sec 3.2.
- int n = parallelism - 1;
- n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
- int size = (n + 1) << 1; // #slots = 2*#workers
- this.submitMask = size - 1; // room for max # of submit queues
- this.workQueues = new WorkQueue[size];
- this.termination = (this.lock = new Mutex()).newCondition();
- this.stealCount = new AtomicLong();
- this.nextWorkerNumber = new AtomicInteger();
- int pn = poolNumberGenerator.incrementAndGet();
+ int pn = nextPoolId();
StringBuilder sb = new StringBuilder("ForkJoinPool-");
sb.append(Integer.toString(pn));
sb.append("-worker-");
this.workerNamePrefix = sb.toString();
- lock.lock();
- this.runState = 1; // set init flag
- lock.unlock();
+ }
+
+ /**
+ * Constructor for common pool, suitable only for static initialization.
+ * Basically the same as above, but uses smallest possible initial footprint.
+ */
+ ForkJoinPool(int parallelism, long ctl,
+ ForkJoinWorkerThreadFactory factory,
+ Thread.UncaughtExceptionHandler handler) {
+ this.config = parallelism;
+ this.ctl = ctl;
+ this.factory = factory;
+ this.ueh = handler;
+ this.workerNamePrefix = "ForkJoinPool.commonPool-worker-";
+ }
+
+ /**
+ * Returns the common pool instance. This pool is statically
+ * constructed; its run state is unaffected by attempts to {@link
+ * #shutdown} or {@link #shutdownNow}. However this pool and any
+ * ongoing processing are automatically terminated upon program
+ * {@link System#exit}. Any program that relies on asynchronous
+ * task processing to complete before program termination should
+ * invoke {@code commonPool().}{@link #awaitQuiescence}, before
+ * exit.
+ *
+ * @return the common pool instance
+ * @since 1.8
+ */
+ public static ForkJoinPool commonPool() {
+ // assert common != null : "static init error";
+ return common;
}
// Execution methods
@@ -2190,7 +2937,7 @@ public class ForkJoinPool extends AbstractExecutorService {
public <T> T invoke(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
- doSubmit(task);
+ externalPush(task);
return task.join();
}
@@ -2205,7 +2952,7 @@ public class ForkJoinPool extends AbstractExecutorService {
public void execute(ForkJoinTask<?> task) {
if (task == null)
throw new NullPointerException();
- doSubmit(task);
+ externalPush(task);
}
// AbstractExecutorService methods
@@ -2223,7 +2970,7 @@ public class ForkJoinPool extends AbstractExecutorService {
job = (ForkJoinTask<?>) task;
else
job = new ForkJoinTask.AdaptedRunnableAction(task);
- doSubmit(job);
+ externalPush(job);
}
/**
@@ -2238,7 +2985,7 @@ public class ForkJoinPool extends AbstractExecutorService {
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
- doSubmit(task);
+ externalPush(task);
return task;
}
@@ -2249,7 +2996,7 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
public <T> ForkJoinTask<T> submit(Callable<T> task) {
ForkJoinTask<T> job = new ForkJoinTask.AdaptedCallable<T>(task);
- doSubmit(job);
+ externalPush(job);
return job;
}
@@ -2260,7 +3007,7 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
public <T> ForkJoinTask<T> submit(Runnable task, T result) {
ForkJoinTask<T> job = new ForkJoinTask.AdaptedRunnable<T>(task, result);
- doSubmit(job);
+ externalPush(job);
return job;
}
@@ -2277,7 +3024,7 @@ public class ForkJoinPool extends AbstractExecutorService {
job = (ForkJoinTask<?>) task;
else
job = new ForkJoinTask.AdaptedRunnableAction(task);
- doSubmit(job);
+ externalPush(job);
return job;
}
@@ -2289,27 +3036,23 @@ public class ForkJoinPool extends AbstractExecutorService {
// In previous versions of this class, this method constructed
// a task to run ForkJoinTask.invokeAll, but now external
// invocation of multiple tasks is at least as efficient.
- List<ForkJoinTask<T>> fs = new ArrayList<ForkJoinTask<T>>(tasks.size());
- // Workaround needed because method wasn't declared with
- // wildcards in return type but should have been.
- @SuppressWarnings({"unchecked", "rawtypes"})
- List<Future<T>> futures = (List<Future<T>>) (List) fs;
+ ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks) {
ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t);
- doSubmit(f);
- fs.add(f);
+ futures.add(f);
+ externalPush(f);
}
- for (ForkJoinTask<T> f : fs)
- f.quietlyJoin();
+ for (int i = 0, size = futures.size(); i < size; i++)
+ ((ForkJoinTask<?>)futures.get(i)).quietlyJoin();
done = true;
return futures;
} finally {
if (!done)
- for (ForkJoinTask<T> f : fs)
- f.cancel(false);
+ for (int i = 0, size = futures.size(); i < size; i++)
+ futures.get(i).cancel(false);
}
}
@@ -2338,7 +3081,17 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return the targeted parallelism level of this pool
*/
public int getParallelism() {
- return parallelism;
+ return config & SMASK;
+ }
+
+ /**
+ * Returns the targeted parallelism level of the common pool.
+ *
+ * @return the targeted parallelism level of the common pool
+ * @since 1.8
+ */
+ public static int getCommonPoolParallelism() {
+ return commonParallelism;
}
/**
@@ -2350,7 +3103,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return the number of worker threads
*/
public int getPoolSize() {
- return parallelism + (short)(ctl >>> TC_SHIFT);
+ return (config & SMASK) + (short)(ctl >>> TC_SHIFT);
}
/**
@@ -2360,7 +3113,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return {@code true} if this pool uses async mode
*/
public boolean getAsyncMode() {
- return localMode != 0;
+ return (config >>> 16) == FIFO_QUEUE;
}
/**
@@ -2391,7 +3144,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return the number of active threads
*/
public int getActiveThreadCount() {
- int r = parallelism + (int)(ctl >> AC_SHIFT);
+ int r = (config & SMASK) + (int)(ctl >> AC_SHIFT);
return (r <= 0) ? 0 : r; // suppress momentarily negative values
}
@@ -2407,7 +3160,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return {@code true} if all threads are currently idle
*/
public boolean isQuiescent() {
- return (int)(ctl >> AC_SHIFT) + parallelism == 0;
+ return (int)(ctl >> AC_SHIFT) + (config & SMASK) == 0;
}
/**
@@ -2422,12 +3175,12 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return the number of steals
*/
public long getStealCount() {
- long count = stealCount.get();
+ long count = stealCount;
WorkQueue[] ws; WorkQueue w;
if ((ws = workQueues) != null) {
for (int i = 1; i < ws.length; i += 2) {
if ((w = ws[i]) != null)
- count += w.totalSteals;
+ count += w.nsteals;
}
}
return count;
@@ -2552,7 +3305,7 @@ public class ForkJoinPool extends AbstractExecutorService {
public String toString() {
// Use a single pass through workQueues to collect counts
long qt = 0L, qs = 0L; int rc = 0;
- long st = stealCount.get();
+ long st = stealCount;
long c = ctl;
WorkQueue[] ws; WorkQueue w;
if ((ws = workQueues) != null) {
@@ -2563,14 +3316,14 @@ public class ForkJoinPool extends AbstractExecutorService {
qs += size;
else {
qt += size;
- st += w.totalSteals;
+ st += w.nsteals;
if (w.isApparentlyUnblocked())
++rc;
}
}
}
}
- int pc = parallelism;
+ int pc = (config & SMASK);
int tc = pc + (short)(c >>> TC_SHIFT);
int ac = pc + (int)(c >> AC_SHIFT);
if (ac < 0) // ignore transient negative
@@ -2579,7 +3332,7 @@ public class ForkJoinPool extends AbstractExecutorService {
if ((c & STOP_BIT) != 0)
level = (tc == 0) ? "Terminated" : "Terminating";
else
- level = runState < 0 ? "Shutting down" : "Running";
+ level = plock < 0 ? "Shutting down" : "Running";
return super.toString() +
"[" + level +
", parallelism = " + pc +
@@ -2593,11 +3346,13 @@ public class ForkJoinPool extends AbstractExecutorService {
}
/**
- * Initiates an orderly shutdown in which previously submitted
- * tasks are executed, but no new tasks will be accepted.
- * Invocation has no additional effect if already shut down.
- * Tasks that are in the process of being submitted concurrently
- * during the course of this method may or may not be rejected.
+ * Possibly initiates an orderly shutdown in which previously
+ * submitted tasks are executed, but no new tasks will be
+ * accepted. Invocation has no effect on execution state if this
+ * is the {@link #commonPool()}, and no additional effect if
+ * already shut down. Tasks that are in the process of being
+ * submitted concurrently during the course of this method may or
+ * may not be rejected.
*
* @throws SecurityException if a security manager exists and
* the caller is not permitted to modify threads
@@ -2610,14 +3365,16 @@ public class ForkJoinPool extends AbstractExecutorService {
}
/**
- * Attempts to cancel and/or stop all tasks, and reject all
- * subsequently submitted tasks. Tasks that are in the process of
- * being submitted or executed concurrently during the course of
- * this method may or may not be rejected. This method cancels
- * both existing and unexecuted tasks, in order to permit
- * termination in the presence of task dependencies. So the method
- * always returns an empty list (unlike the case for some other
- * Executors).
+ * Possibly attempts to cancel and/or stop all tasks, and reject
+ * all subsequently submitted tasks. Invocation has no effect on
+ * execution state if this is the {@link #commonPool()}, and no
+ * additional effect if already shut down. Otherwise, tasks that
+ * are in the process of being submitted or executed concurrently
+ * during the course of this method may or may not be
+ * rejected. This method cancels both existing and unexecuted
+ * tasks, in order to permit termination in the presence of task
+ * dependencies. So the method always returns an empty list
+ * (unlike the case for some other Executors).
*
* @return an empty list
* @throws SecurityException if a security manager exists and
@@ -2639,7 +3396,7 @@ public class ForkJoinPool extends AbstractExecutorService {
public boolean isTerminated() {
long c = ctl;
return ((c & STOP_BIT) != 0L &&
- (short)(c >>> TC_SHIFT) == -parallelism);
+ (short)(c >>> TC_SHIFT) == -(config & SMASK));
}
/**
@@ -2647,7 +3404,7 @@ public class ForkJoinPool extends AbstractExecutorService {
* commenced but not yet completed. This method may be useful for
* debugging. A return of {@code true} reported a sufficient
* period after shutdown may indicate that submitted tasks have
- * ignored or suppressed interruption, or are waiting for IO,
+ * ignored or suppressed interruption, or are waiting for I/O,
* causing this executor not to properly terminate. (See the
* advisory notes for class {@link ForkJoinTask} stating that
* tasks should not normally entail blocking operations. But if
@@ -2658,7 +3415,7 @@ public class ForkJoinPool extends AbstractExecutorService {
public boolean isTerminating() {
long c = ctl;
return ((c & STOP_BIT) != 0L &&
- (short)(c >>> TC_SHIFT) != -parallelism);
+ (short)(c >>> TC_SHIFT) != -(config & SMASK));
}
/**
@@ -2667,13 +3424,16 @@ public class ForkJoinPool extends AbstractExecutorService {
* @return {@code true} if this pool has been shut down
*/
public boolean isShutdown() {
- return runState < 0;
+ return plock < 0;
}
/**
- * Blocks until all tasks have completed execution after a shutdown
- * request, or the timeout occurs, or the current thread is
- * interrupted, whichever happens first.
+ * Blocks until all tasks have completed execution after a
+ * shutdown request, or the timeout occurs, or the current thread
+ * is interrupted, whichever happens first. Because the {@link
+ * #commonPool()} never terminates until program shutdown, when
+ * applied to the common pool, this method is equivalent to {@link
+ * #awaitQuiescence} but always returns {@code false}.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
@@ -2683,20 +3443,84 @@ public class ForkJoinPool extends AbstractExecutorService {
*/
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
+ if (Thread.interrupted())
+ throw new InterruptedException();
+ if (this == common) {
+ awaitQuiescence(timeout, unit);
+ return false;
+ }
long nanos = unit.toNanos(timeout);
- final Mutex lock = this.lock;
- lock.lock();
- try {
- for (;;) {
- if (isTerminated())
- return true;
- if (nanos <= 0)
+ if (isTerminated())
+ return true;
+ long startTime = System.nanoTime();
+ boolean terminated = false;
+ synchronized (this) {
+ for (long waitTime = nanos, millis = 0L;;) {
+ if (terminated = isTerminated() ||
+ waitTime <= 0L ||
+ (millis = unit.toMillis(waitTime)) <= 0L)
+ break;
+ wait(millis);
+ waitTime = nanos - (System.nanoTime() - startTime);
+ }
+ }
+ return terminated;
+ }
+
+ /**
+ * If called by a ForkJoinTask operating in this pool, equivalent
+ * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
+ * waits and/or attempts to assist performing tasks until this
+ * pool {@link #isQuiescent} or the indicated timeout elapses.
+ *
+ * @param timeout the maximum time to wait
+ * @param unit the time unit of the timeout argument
+ * @return {@code true} if quiescent; {@code false} if the
+ * timeout elapsed.
+ */
+ public boolean awaitQuiescence(long timeout, TimeUnit unit) {
+ long nanos = unit.toNanos(timeout);
+ ForkJoinWorkerThread wt;
+ Thread thread = Thread.currentThread();
+ if ((thread instanceof ForkJoinWorkerThread) &&
+ (wt = (ForkJoinWorkerThread)thread).pool == this) {
+ helpQuiescePool(wt.workQueue);
+ return true;
+ }
+ long startTime = System.nanoTime();
+ WorkQueue[] ws;
+ int r = 0, m;
+ boolean found = true;
+ while (!isQuiescent() && (ws = workQueues) != null &&
+ (m = ws.length - 1) >= 0) {
+ if (!found) {
+ if ((System.nanoTime() - startTime) > nanos)
return false;
- nanos = termination.awaitNanos(nanos);
+ Thread.yield(); // cannot block
+ }
+ found = false;
+ for (int j = (m + 1) << 2; j >= 0; --j) {
+ ForkJoinTask<?> t; WorkQueue q; int b;
+ if ((q = ws[r++ & m]) != null && (b = q.base) - q.top < 0) {
+ found = true;
+ if ((t = q.pollAt(b)) != null) {
+ if (q.base - q.top < 0)
+ signalWork(q);
+ t.doExec();
+ }
+ break;
+ }
}
- } finally {
- lock.unlock();
}
+ return true;
+ }
+
+ /**
+ * Waits and/or attempts to assist performing tasks indefinitely
+ * until the {@link #commonPool()} {@link #isQuiescent}.
+ */
+ static void quiesceCommonPool() {
+ common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
}
/**
@@ -2795,19 +3619,37 @@ public class ForkJoinPool extends AbstractExecutorService {
public static void managedBlock(ManagedBlocker blocker)
throws InterruptedException {
Thread t = Thread.currentThread();
- ForkJoinPool p = ((t instanceof ForkJoinWorkerThread) ?
- ((ForkJoinWorkerThread)t).pool : null);
- while (!blocker.isReleasable()) {
- if (p == null || p.tryCompensate(null, blocker)) {
- try {
- do {} while (!blocker.isReleasable() && !blocker.block());
- } finally {
- if (p != null)
+ if (t instanceof ForkJoinWorkerThread) {
+ ForkJoinPool p = ((ForkJoinWorkerThread)t).pool;
+ while (!blocker.isReleasable()) { // variant of helpSignal
+ WorkQueue[] ws; WorkQueue q; int m, u;
+ if ((ws = p.workQueues) != null && (m = ws.length - 1) >= 0) {
+ for (int i = 0; i <= m; ++i) {
+ if (blocker.isReleasable())
+ return;
+ if ((q = ws[i]) != null && q.base - q.top < 0) {
+ p.signalWork(q);
+ if ((u = (int)(p.ctl >>> 32)) >= 0 ||
+ (u >> UAC_SHIFT) >= 0)
+ break;
+ }
+ }
+ }
+ if (p.tryCompensate()) {
+ try {
+ do {} while (!blocker.isReleasable() &&
+ !blocker.block());
+ } finally {
p.incrementActiveCount();
+ }
+ break;
}
- break;
}
}
+ else {
+ do {} while (!blocker.isReleasable() &&
+ !blocker.block());
+ }
}
// AbstractExecutorService overrides. These rely on undocumented
@@ -2828,32 +3670,80 @@ public class ForkJoinPool extends AbstractExecutorService {
private static final long PARKBLOCKER;
private static final int ABASE;
private static final int ASHIFT;
+ private static final long STEALCOUNT;
+ private static final long PLOCK;
+ private static final long INDEXSEED;
+ private static final long QLOCK;
static {
- poolNumberGenerator = new AtomicInteger();
- nextSubmitterSeed = new AtomicInteger(0x55555555);
- modifyThreadPermission = new RuntimePermission("modifyThread");
- defaultForkJoinWorkerThreadFactory =
- new DefaultForkJoinWorkerThreadFactory();
- submitters = new ThreadSubmitter();
- int s;
+ // initialize field offsets for CAS etc
try {
U = getUnsafe();
Class<?> k = ForkJoinPool.class;
- Class<?> ak = ForkJoinTask[].class;
CTL = U.objectFieldOffset
(k.getDeclaredField("ctl"));
+ STEALCOUNT = U.objectFieldOffset
+ (k.getDeclaredField("stealCount"));
+ PLOCK = U.objectFieldOffset
+ (k.getDeclaredField("plock"));
+ INDEXSEED = U.objectFieldOffset
+ (k.getDeclaredField("indexSeed"));
Class<?> tk = Thread.class;
PARKBLOCKER = U.objectFieldOffset
(tk.getDeclaredField("parkBlocker"));
+ Class<?> wk = WorkQueue.class;
+ QLOCK = U.objectFieldOffset
+ (wk.getDeclaredField("qlock"));
+ Class<?> ak = ForkJoinTask[].class;
ABASE = U.arrayBaseOffset(ak);
- s = U.arrayIndexScale(ak);
+ int scale = U.arrayIndexScale(ak);
+ if ((scale & (scale - 1)) != 0)
+ throw new Error("data type scale not a power of two");
+ ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
} catch (Exception e) {
throw new Error(e);
}
- if ((s & (s-1)) != 0)
- throw new Error("data type scale not a power of two");
- ASHIFT = 31 - Integer.numberOfLeadingZeros(s);
+
+ submitters = new ThreadLocal<Submitter>();
+ ForkJoinWorkerThreadFactory fac = defaultForkJoinWorkerThreadFactory =
+ new DefaultForkJoinWorkerThreadFactory();
+ modifyThreadPermission = new RuntimePermission("modifyThread");
+
+ /*
+ * Establish common pool parameters. For extra caution,
+ * computations to set up common pool state are here; the
+ * constructor just assigns these values to fields.
+ */
+
+ int par = 0;
+ Thread.UncaughtExceptionHandler handler = null;
+ try { // TBD: limit or report ignored exceptions?
+ String pp = System.getProperty
+ ("java.util.concurrent.ForkJoinPool.common.parallelism");
+ String hp = System.getProperty
+ ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
+ String fp = System.getProperty
+ ("java.util.concurrent.ForkJoinPool.common.threadFactory");
+ if (fp != null)
+ fac = ((ForkJoinWorkerThreadFactory)ClassLoader.
+ getSystemClassLoader().loadClass(fp).newInstance());
+ if (hp != null)
+ handler = ((Thread.UncaughtExceptionHandler)ClassLoader.
+ getSystemClassLoader().loadClass(hp).newInstance());
+ if (pp != null)
+ par = Integer.parseInt(pp);
+ } catch (Exception ignore) {
+ }
+
+ if (par <= 0)
+ par = Runtime.getRuntime().availableProcessors();
+ if (par > MAX_CAP)
+ par = MAX_CAP;
+ commonParallelism = par;
+ long np = (long)(-par); // precompute initial ctl value
+ long ct = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
+
+ common = new ForkJoinPool(par, ct, fac, handler);
}
/**
@@ -2866,5 +3756,4 @@ public class ForkJoinPool extends AbstractExecutorService {
private static sun.misc.Unsafe getUnsafe() {
return scala.concurrent.util.Unsafe.instance;
}
-
}
diff --git a/src/forkjoin/scala/concurrent/forkjoin/ForkJoinTask.java b/src/forkjoin/scala/concurrent/forkjoin/ForkJoinTask.java
index 839fd26b39..fd1e132b07 100644
--- a/src/forkjoin/scala/concurrent/forkjoin/ForkJoinTask.java
+++ b/src/forkjoin/scala/concurrent/forkjoin/ForkJoinTask.java
@@ -5,6 +5,7 @@
*/
package scala.concurrent.forkjoin;
+
import java.io.Serializable;
import java.util.Collection;
import java.util.List;
@@ -29,15 +30,18 @@ import java.lang.reflect.Constructor;
* subtasks may be hosted by a small number of actual threads in a
* ForkJoinPool, at the price of some usage limitations.
*
- * <p>A "main" {@code ForkJoinTask} begins execution when submitted
- * to a {@link ForkJoinPool}. Once started, it will usually in turn
- * start other subtasks. As indicated by the name of this class,
- * many programs using {@code ForkJoinTask} employ only methods
- * {@link #fork} and {@link #join}, or derivatives such as {@link
+ * <p>A "main" {@code ForkJoinTask} begins execution when it is
+ * explicitly submitted to a {@link ForkJoinPool}, or, if not already
+ * engaged in a ForkJoin computation, commenced in the {@link
+ * ForkJoinPool#commonPool()} via {@link #fork}, {@link #invoke}, or
+ * related methods. Once started, it will usually in turn start other
+ * subtasks. As indicated by the name of this class, many programs
+ * using {@code ForkJoinTask} employ only methods {@link #fork} and
+ * {@link #join}, or derivatives such as {@link
* #invokeAll(ForkJoinTask...) invokeAll}. However, this class also
* provides a number of other methods that can come into play in
- * advanced usages, as well as extension mechanics that allow
- * support of new forms of fork/join processing.
+ * advanced usages, as well as extension mechanics that allow support
+ * of new forms of fork/join processing.
*
* <p>A {@code ForkJoinTask} is a lightweight form of {@link Future}.
* The efficiency of {@code ForkJoinTask}s stems from a set of
@@ -51,7 +55,7 @@ import java.lang.reflect.Constructor;
* minimize other blocking synchronization apart from joining other
* tasks or using synchronizers such as Phasers that are advertised to
* cooperate with fork/join scheduling. Subdividable tasks should also
- * not perform blocking IO, and should ideally access variables that
+ * not perform blocking I/O, and should ideally access variables that
* are completely independent of those accessed by other running
* tasks. These guidelines are loosely enforced by not permitting
* checked exceptions such as {@code IOExceptions} to be
@@ -69,10 +73,11 @@ import java.lang.reflect.Constructor;
* <p>It is possible to define and use ForkJoinTasks that may block,
* but doing do requires three further considerations: (1) Completion
* of few if any <em>other</em> tasks should be dependent on a task
- * that blocks on external synchronization or IO. Event-style async
- * tasks that are never joined often fall into this category. (2) To
- * minimize resource impact, tasks should be small; ideally performing
- * only the (possibly) blocking action. (3) Unless the {@link
+ * that blocks on external synchronization or I/O. Event-style async
+ * tasks that are never joined (for example, those subclassing {@link
+ * CountedCompleter}) often fall into this category. (2) To minimize
+ * resource impact, tasks should be small; ideally performing only the
+ * (possibly) blocking action. (3) Unless the {@link
* ForkJoinPool.ManagedBlocker} API is used, or the number of possibly
* blocked tasks is known to be less than the pool's {@link
* ForkJoinPool#getParallelism} level, the pool cannot guarantee that
@@ -121,13 +126,7 @@ import java.lang.reflect.Constructor;
* other actions. Normally, a concrete ForkJoinTask subclass declares
* fields comprising its parameters, established in a constructor, and
* then defines a {@code compute} method that somehow uses the control
- * methods supplied by this base class. While these methods have
- * {@code public} access (to allow instances of different task
- * subclasses to call each other's methods), some of them may only be
- * called from within other ForkJoinTasks (as may be determined using
- * method {@link #inForkJoinPool}). Attempts to invoke them in other
- * contexts result in exceptions or errors, possibly including {@code
- * ClassCastException}.
+ * methods supplied by this base class.
*
* <p>Method {@link #join} and its variants are appropriate for use
* only when completion dependencies are acyclic; that is, the
@@ -138,17 +137,16 @@ import java.lang.reflect.Constructor;
* {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that
* may be of use in constructing custom subclasses for problems that
* are not statically structured as DAGs. To support such usages a
- * ForkJoinTask may be atomically <em>tagged</em> with a {@code
- * short} value using {@link #setForkJoinTaskTag} or {@link
+ * ForkJoinTask may be atomically <em>tagged</em> with a {@code short}
+ * value using {@link #setForkJoinTaskTag} or {@link
* #compareAndSetForkJoinTaskTag} and checked using {@link
- * #getForkJoinTaskTag}. The ForkJoinTask implementation does not
- * use these {@code protected} methods or tags for any purpose, but
- * they may be of use in the construction of specialized subclasses.
- * For example, parallel graph traversals can use the supplied methods
- * to avoid revisiting nodes/tasks that have already been processed.
- * Also, completion based designs can use them to record that subtasks
- * have completed. (Method names for tagging are bulky in part to
- * encourage definition of methods that reflect their usage patterns.)
+ * #getForkJoinTaskTag}. The ForkJoinTask implementation does not use
+ * these {@code protected} methods or tags for any purpose, but they
+ * may be of use in the construction of specialized subclasses. For
+ * example, parallel graph traversals can use the supplied methods to
+ * avoid revisiting nodes/tasks that have already been processed.
+ * (Method names for tagging are bulky in part to encourage definition
+ * of methods that reflect their usage patterns.)
*
* <p>Most base support methods are {@code final}, to prevent
* overriding of implementations that are intrinsically tied to the
@@ -286,8 +284,9 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* @return status upon completion
*/
private int externalAwaitDone() {
- boolean interrupted = false;
int s;
+ ForkJoinPool.externalHelpJoin(this);
+ boolean interrupted = false;
while ((s = status) >= 0) {
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
synchronized (this) {
@@ -315,6 +314,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
int s;
if (Thread.interrupted())
throw new InterruptedException();
+ ForkJoinPool.externalHelpJoin(this);
while ((s = status) >= 0) {
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
synchronized (this) {
@@ -328,6 +328,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
return s;
}
+
/**
* Implementation for join, get, quietlyJoin. Directly handles
* only cases of already-completed, external wait, and
@@ -337,16 +338,12 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
*/
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
- if ((s = status) >= 0) {
- if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)) {
- if (!(w = (wt = (ForkJoinWorkerThread)t).workQueue).
- tryUnpush(this) || (s = doExec()) >= 0)
- s = wt.pool.awaitJoin(w, this);
- }
- else
- s = externalAwaitDone();
- }
- return s;
+ return (s = status) < 0 ? s :
+ ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
+ (w = (wt = (ForkJoinWorkerThread)t).workQueue).
+ tryUnpush(this) && (s = doExec()) < 0 ? s :
+ wt.pool.awaitJoin(w, this) :
+ externalAwaitDone();
}
/**
@@ -356,14 +353,10 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
*/
private int doInvoke() {
int s; Thread t; ForkJoinWorkerThread wt;
- if ((s = doExec()) >= 0) {
- if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
- s = (wt = (ForkJoinWorkerThread)t).pool.awaitJoin(wt.workQueue,
- this);
- else
- s = externalAwaitDone();
- }
- return s;
+ return (s = doExec()) < 0 ? s :
+ ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
+ (wt = (ForkJoinWorkerThread)t).pool.awaitJoin(wt.workQueue, this) :
+ externalAwaitDone();
}
// Exception table support
@@ -411,11 +404,11 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
}
/**
- * Records exception and sets exceptional completion.
+ * Records exception and sets status.
*
* @return status on exit
*/
- private int setExceptionalCompletion(Throwable ex) {
+ final int recordExceptionalCompletion(Throwable ex) {
int s;
if ((s = status) >= 0) {
int h = System.identityHashCode(this);
@@ -438,17 +431,25 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
}
s = setCompletion(EXCEPTIONAL);
}
- ForkJoinTask<?> p = internalGetCompleter(); // propagate
- if (p != null && p.status >= 0)
- p.setExceptionalCompletion(ex);
return s;
}
/**
- * Exception propagation support for tasks with completers.
+ * Records exception and possibly propagates.
+ *
+ * @return status on exit
*/
- ForkJoinTask<?> internalGetCompleter() {
- return null;
+ private int setExceptionalCompletion(Throwable ex) {
+ int s = recordExceptionalCompletion(ex);
+ if ((s & DONE_MASK) == EXCEPTIONAL)
+ internalPropagateException(ex);
+ return s;
+ }
+
+ /**
+ * Hook for exception propagation support for tasks with completers.
+ */
+ void internalPropagateException(Throwable ex) {
}
/**
@@ -467,7 +468,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
}
/**
- * Removes exception node and clears status
+ * Removes exception node and clears status.
*/
private void clearExceptionalCompletion() {
int h = System.identityHashCode(this);
@@ -595,7 +596,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
}
}
}
-
+
/**
* A version of "sneaky throw" to relay exceptions
*/
@@ -624,35 +625,35 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* Throws exception, if any, associated with the given status.
*/
private void reportException(int s) {
- Throwable ex = ((s == CANCELLED) ? new CancellationException() :
- (s == EXCEPTIONAL) ? getThrowableException() :
- null);
- if (ex != null)
- ForkJoinTask.rethrow(ex);
+ if (s == CANCELLED)
+ throw new CancellationException();
+ if (s == EXCEPTIONAL)
+ rethrow(getThrowableException());
}
// public methods
/**
- * Arranges to asynchronously execute this task. While it is not
- * necessarily enforced, it is a usage error to fork a task more
- * than once unless it has completed and been reinitialized.
- * Subsequent modifications to the state of this task or any data
- * it operates on are not necessarily consistently observable by
- * any thread other than the one executing it unless preceded by a
- * call to {@link #join} or related methods, or a call to {@link
- * #isDone} returning {@code true}.
- *
- * <p>This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
+ * Arranges to asynchronously execute this task in the pool the
+ * current task is running in, if applicable, or using the {@link
+ * ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}. While
+ * it is not necessarily enforced, it is a usage error to fork a
+ * task more than once unless it has completed and been
+ * reinitialized. Subsequent modifications to the state of this
+ * task or any data it operates on are not necessarily
+ * consistently observable by any thread other than the one
+ * executing it unless preceded by a call to {@link #join} or
+ * related methods, or a call to {@link #isDone} returning {@code
+ * true}.
*
* @return {@code this}, to simplify usage
*/
public final ForkJoinTask<V> fork() {
- ((ForkJoinWorkerThread)Thread.currentThread()).workQueue.push(this);
+ Thread t;
+ if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
+ ((ForkJoinWorkerThread)t).workQueue.push(this);
+ else
+ ForkJoinPool.common.externalPush(this);
return this;
}
@@ -702,12 +703,6 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* cancelled, completed normally or exceptionally, or left
* unprocessed.
*
- * <p>This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
- *
* @param t1 the first task
* @param t2 the second task
* @throws NullPointerException if any task is null
@@ -733,12 +728,6 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* related methods to check if they have been cancelled, completed
* normally or exceptionally, or left unprocessed.
*
- * <p>This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
- *
* @param tasks the tasks
* @throws NullPointerException if any task is null
*/
@@ -766,7 +755,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
}
}
if (ex != null)
- ForkJoinTask.rethrow(ex);
+ rethrow(ex);
}
/**
@@ -782,12 +771,6 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* cancelled, completed normally or exceptionally, or left
* unprocessed.
*
- * <p>This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
- *
* @param tasks the collection of tasks
* @return the tasks argument, to simplify usage
* @throws NullPointerException if tasks or any element are null
@@ -823,7 +806,7 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
}
}
if (ex != null)
- ForkJoinTask.rethrow(ex);
+ rethrow(ex);
return tasks;
}
@@ -996,8 +979,9 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
if (Thread.interrupted())
throw new InterruptedException();
// Messy in part because we measure in nanosecs, but wait in millisecs
- int s; long ns, ms;
- if ((s = status) >= 0 && (ns = unit.toNanos(timeout)) > 0L) {
+ int s; long ms;
+ long ns = unit.toNanos(timeout);
+ if ((s = status) >= 0 && ns > 0L) {
long deadline = System.nanoTime() + ns;
ForkJoinPool p = null;
ForkJoinPool.WorkQueue w = null;
@@ -1006,16 +990,18 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
p = wt.pool;
w = wt.workQueue;
- s = p.helpJoinOnce(w, this); // no retries on failure
+ p.helpJoinOnce(w, this); // no retries on failure
}
+ else
+ ForkJoinPool.externalHelpJoin(this);
boolean canBlock = false;
boolean interrupted = false;
try {
while ((s = status) >= 0) {
- if (w != null && w.runState < 0)
+ if (w != null && w.qlock < 0)
cancelIgnoringExceptions(this);
else if (!canBlock) {
- if (p == null || p.tryCompensate(this, null))
+ if (p == null || p.tryCompensate())
canBlock = true;
}
else {
@@ -1083,17 +1069,15 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* be of use in designs in which many tasks are forked, but none
* are explicitly joined, instead executing them until all are
* processed.
- *
- * <p>This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
*/
public static void helpQuiesce() {
- ForkJoinWorkerThread wt =
- (ForkJoinWorkerThread)Thread.currentThread();
- wt.pool.helpQuiescePool(wt.workQueue);
+ Thread t;
+ if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
+ ForkJoinWorkerThread wt = (ForkJoinWorkerThread)t;
+ wt.pool.helpQuiescePool(wt.workQueue);
+ }
+ else
+ ForkJoinPool.quiesceCommonPool();
}
/**
@@ -1146,23 +1130,19 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
/**
* Tries to unschedule this task for execution. This method will
- * typically succeed if this task is the most recently forked task
- * by the current thread, and has not commenced executing in
- * another thread. This method may be useful when arranging
- * alternative local processing of tasks that could have been, but
- * were not, stolen.
- *
- * <p>This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
+ * typically (but is not guaranteed to) succeed if this task is
+ * the most recently forked task by the current thread, and has
+ * not commenced executing in another thread. This method may be
+ * useful when arranging alternative local processing of tasks
+ * that could have been, but were not, stolen.
*
* @return {@code true} if unforked
*/
public boolean tryUnfork() {
- return ((ForkJoinWorkerThread)Thread.currentThread())
- .workQueue.tryUnpush(this);
+ Thread t;
+ return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
+ ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) :
+ ForkJoinPool.tryExternalUnpush(this));
}
/**
@@ -1171,84 +1151,32 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* value may be useful for heuristic decisions about whether to
* fork other tasks.
*
- * <p>This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
- *
* @return the number of tasks
*/
public static int getQueuedTaskCount() {
- return ((ForkJoinWorkerThread) Thread.currentThread())
- .workQueue.queueSize();
+ Thread t; ForkJoinPool.WorkQueue q;
+ if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
+ q = ((ForkJoinWorkerThread)t).workQueue;
+ else
+ q = ForkJoinPool.commonSubmitterQueue();
+ return (q == null) ? 0 : q.queueSize();
}
/**
* Returns an estimate of how many more locally queued tasks are
* held by the current worker thread than there are other worker
- * threads that might steal them. This value may be useful for
+ * threads that might steal them, or zero if this thread is not
+ * operating in a ForkJoinPool. This value may be useful for
* heuristic decisions about whether to fork other tasks. In many
* usages of ForkJoinTasks, at steady state, each worker should
* aim to maintain a small constant surplus (for example, 3) of
* tasks, and to process computations locally if this threshold is
* exceeded.
*
- * <p>This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
- *
* @return the surplus number of tasks, which may be negative
*/
public static int getSurplusQueuedTaskCount() {
- /*
- * The aim of this method is to return a cheap heuristic guide
- * for task partitioning when programmers, frameworks, tools,
- * or languages have little or no idea about task granularity.
- * In essence by offering this method, we ask users only about
- * tradeoffs in overhead vs expected throughput and its
- * variance, rather than how finely to partition tasks.
- *
- * In a steady state strict (tree-structured) computation,
- * each thread makes available for stealing enough tasks for
- * other threads to remain active. Inductively, if all threads
- * play by the same rules, each thread should make available
- * only a constant number of tasks.
- *
- * The minimum useful constant is just 1. But using a value of
- * 1 would require immediate replenishment upon each steal to
- * maintain enough tasks, which is infeasible. Further,
- * partitionings/granularities of offered tasks should
- * minimize steal rates, which in general means that threads
- * nearer the top of computation tree should generate more
- * than those nearer the bottom. In perfect steady state, each
- * thread is at approximately the same level of computation
- * tree. However, producing extra tasks amortizes the
- * uncertainty of progress and diffusion assumptions.
- *
- * So, users will want to use values larger, but not much
- * larger than 1 to both smooth over transient shortages and
- * hedge against uneven progress; as traded off against the
- * cost of extra task overhead. We leave the user to pick a
- * threshold value to compare with the results of this call to
- * guide decisions, but recommend values such as 3.
- *
- * When all threads are active, it is on average OK to
- * estimate surplus strictly locally. In steady-state, if one
- * thread is maintaining say 2 surplus tasks, then so are
- * others. So we can just use estimated queue length.
- * However, this strategy alone leads to serious mis-estimates
- * in some non-steady-state conditions (ramp-up, ramp-down,
- * other stalls). We can detect many of these by further
- * considering the number of "idle" threads, that are known to
- * have zero queued tasks, so compensate by a factor of
- * (#idle/#active) threads.
- */
- ForkJoinWorkerThread wt =
- (ForkJoinWorkerThread)Thread.currentThread();
- return wt.workQueue.queueSize() - wt.pool.idlePerActive();
+ return ForkJoinPool.getSurplusQueuedTaskCount();
}
// Extension methods
@@ -1299,59 +1227,51 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
* primarily to support extensions, and is unlikely to be useful
* otherwise.
*
- * <p>This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
- *
* @return the next task, or {@code null} if none are available
*/
protected static ForkJoinTask<?> peekNextLocalTask() {
- return ((ForkJoinWorkerThread) Thread.currentThread()).workQueue.peek();
+ Thread t; ForkJoinPool.WorkQueue q;
+ if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
+ q = ((ForkJoinWorkerThread)t).workQueue;
+ else
+ q = ForkJoinPool.commonSubmitterQueue();
+ return (q == null) ? null : q.peek();
}
/**
* Unschedules and returns, without executing, the next task
- * queued by the current thread but not yet executed. This method
- * is designed primarily to support extensions, and is unlikely to
- * be useful otherwise.
- *
- * <p>This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
+ * queued by the current thread but not yet executed, if the
+ * current thread is operating in a ForkJoinPool. This method is
+ * designed primarily to support extensions, and is unlikely to be
+ * useful otherwise.
*
* @return the next task, or {@code null} if none are available
*/
protected static ForkJoinTask<?> pollNextLocalTask() {
- return ((ForkJoinWorkerThread) Thread.currentThread())
- .workQueue.nextLocalTask();
+ Thread t;
+ return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
+ ((ForkJoinWorkerThread)t).workQueue.nextLocalTask() :
+ null;
}
/**
- * Unschedules and returns, without executing, the next task
+ * If the current thread is operating in a ForkJoinPool,
+ * unschedules and returns, without executing, the next task
* queued by the current thread but not yet executed, if one is
* available, or if not available, a task that was forked by some
* other thread, if available. Availability may be transient, so a
- * {@code null} result does not necessarily imply quiescence
- * of the pool this task is operating in. This method is designed
+ * {@code null} result does not necessarily imply quiescence of
+ * the pool this task is operating in. This method is designed
* primarily to support extensions, and is unlikely to be useful
* otherwise.
*
- * <p>This method may be invoked only from within {@code
- * ForkJoinPool} computations (as may be determined using method
- * {@link #inForkJoinPool}). Attempts to invoke in other contexts
- * result in exceptions or errors, possibly including {@code
- * ClassCastException}.
- *
* @return a task, or {@code null} if none are available
*/
protected static ForkJoinTask<?> pollTask() {
- ForkJoinWorkerThread wt =
- (ForkJoinWorkerThread)Thread.currentThread();
- return wt.pool.nextTaskFor(wt.workQueue);
+ Thread t; ForkJoinWorkerThread wt;
+ return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
+ (wt = (ForkJoinWorkerThread)t).pool.nextTaskFor(wt.workQueue) :
+ null;
}
// tag operations
@@ -1540,14 +1460,16 @@ public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
// Unsafe mechanics
private static final sun.misc.Unsafe U;
private static final long STATUS;
+
static {
exceptionTableLock = new ReentrantLock();
exceptionTableRefQueue = new ReferenceQueue<Object>();
exceptionTable = new ExceptionNode[EXCEPTION_MAP_CAPACITY];
try {
U = getUnsafe();
+ Class<?> k = ForkJoinTask.class;
STATUS = U.objectFieldOffset
- (ForkJoinTask.class.getDeclaredField("status"));
+ (k.getDeclaredField("status"));
} catch (Exception e) {
throw new Error(e);
}
diff --git a/src/forkjoin/scala/concurrent/forkjoin/ForkJoinWorkerThread.java b/src/forkjoin/scala/concurrent/forkjoin/ForkJoinWorkerThread.java
index 90a0af5723..e62fc6eb71 100644
--- a/src/forkjoin/scala/concurrent/forkjoin/ForkJoinWorkerThread.java
+++ b/src/forkjoin/scala/concurrent/forkjoin/ForkJoinWorkerThread.java
@@ -25,10 +25,17 @@ public class ForkJoinWorkerThread extends Thread {
* ForkJoinWorkerThreads are managed by ForkJoinPools and perform
* ForkJoinTasks. For explanation, see the internal documentation
* of class ForkJoinPool.
+ *
+ * This class just maintains links to its pool and WorkQueue. The
+ * pool field is set immediately upon construction, but the
+ * workQueue field is not set until a call to registerWorker
+ * completes. This leads to a visibility race, that is tolerated
+ * by requiring that the workQueue field is only accessed by the
+ * owning thread.
*/
- final ForkJoinPool.WorkQueue workQueue; // Work-stealing mechanics
final ForkJoinPool pool; // the pool this thread works in
+ final ForkJoinPool.WorkQueue workQueue; // work-stealing mechanics
/**
* Creates a ForkJoinWorkerThread operating in the given pool.
@@ -37,14 +44,10 @@ public class ForkJoinWorkerThread extends Thread {
* @throws NullPointerException if pool is null
*/
protected ForkJoinWorkerThread(ForkJoinPool pool) {
- super(pool.nextWorkerName());
- setDaemon(true);
- Thread.UncaughtExceptionHandler ueh = pool.ueh;
- if (ueh != null)
- setUncaughtExceptionHandler(ueh);
+ // Use a placeholder until a useful name can be set in registerWorker
+ super("aForkJoinWorkerThread");
this.pool = pool;
- pool.registerWorker(this.workQueue = new ForkJoinPool.WorkQueue
- (pool, this, pool.localMode));
+ this.workQueue = pool.registerWorker(this);
}
/**
@@ -116,4 +119,3 @@ public class ForkJoinWorkerThread extends Thread {
}
}
}
-
diff --git a/src/library/scala/PartialFunction.scala b/src/library/scala/PartialFunction.scala
index 7ff5a33586..9ff648a05a 100644
--- a/src/library/scala/PartialFunction.scala
+++ b/src/library/scala/PartialFunction.scala
@@ -81,7 +81,7 @@ trait PartialFunction[-A, +B] extends (A => B) { self =>
override def andThen[C](k: B => C): PartialFunction[A, C] =
new AndThen[A, B, C] (this, k)
- /** Turns this partial function into an plain function returning an `Option` result.
+ /** Turns this partial function into a plain function returning an `Option` result.
* @see Function.unlift
* @return a function that takes an argument `x` to `Some(this(x))` if `this`
* is defined for `x`, and to `None` otherwise.
diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala
index 8c2a77c75f..89d10e5c47 100644
--- a/src/library/scala/concurrent/impl/Future.scala
+++ b/src/library/scala/concurrent/impl/Future.scala
@@ -28,7 +28,7 @@ private[concurrent] object Future {
def apply[T](body: =>T)(implicit executor: ExecutionContext): scala.concurrent.Future[T] = {
val runnable = new PromiseCompletingRunnable(body)
- executor.execute(runnable)
+ executor.prepare.execute(runnable)
runnable.promise.future
}
}
diff --git a/src/partest/scala/tools/partest/CompilerTest.scala b/src/partest/scala/tools/partest/CompilerTest.scala
index d73d99bc89..848deef8c5 100644
--- a/src/partest/scala/tools/partest/CompilerTest.scala
+++ b/src/partest/scala/tools/partest/CompilerTest.scala
@@ -19,7 +19,7 @@ abstract class CompilerTest extends DirectTest {
def check(source: String, unit: global.CompilationUnit): Unit
lazy val global: Global = newCompiler()
- lazy val units = compilationUnits(global)(sources: _ *)
+ lazy val units: List[global.CompilationUnit] = compilationUnits(global)(sources: _ *)
import global._
import definitions._
diff --git a/src/partest/scala/tools/partest/DirectTest.scala b/src/partest/scala/tools/partest/DirectTest.scala
index 483cb491a1..e2dac2fd55 100644
--- a/src/partest/scala/tools/partest/DirectTest.scala
+++ b/src/partest/scala/tools/partest/DirectTest.scala
@@ -7,7 +7,7 @@ package scala.tools.partest
import scala.tools.nsc._
import io.Directory
-import util.{BatchSourceFile, CommandLineParser}
+import util.{ SourceFile, BatchSourceFile, CommandLineParser }
import reporters.{Reporter, ConsoleReporter}
/** A class for testing code which is embedded as a string.
@@ -49,18 +49,32 @@ abstract class DirectTest extends App {
def reporter(settings: Settings): Reporter = new ConsoleReporter(settings)
- def newSources(sourceCodes: String*) = sourceCodes.toList.zipWithIndex map {
- case (src, idx) => new BatchSourceFile("newSource" + (idx + 1), src)
- }
+ private def newSourcesWithExtension(ext: String)(codes: String*): List[BatchSourceFile] =
+ codes.toList.zipWithIndex map {
+ case (src, idx) => new BatchSourceFile(s"newSource${idx + 1}.$ext", src)
+ }
+
+ def newJavaSources(codes: String*) = newSourcesWithExtension("java")(codes: _*)
+ def newSources(codes: String*) = newSourcesWithExtension("scala")(codes: _*)
+
def compileString(global: Global)(sourceCode: String): Boolean = {
withRun(global)(_ compileSources newSources(sourceCode))
!global.reporter.hasErrors
}
- def compilationUnits(global: Global)(sourceCodes: String*): List[global.CompilationUnit] = {
- val units = withRun(global) { run =>
- run compileSources newSources(sourceCodes: _*)
+
+ def javaCompilationUnits(global: Global)(sourceCodes: String*) = {
+ sourceFilesToCompiledUnits(global)(newJavaSources(sourceCodes: _*))
+ }
+
+ def sourceFilesToCompiledUnits(global: Global)(files: List[SourceFile]) = {
+ withRun(global) { run =>
+ run compileSources files
run.units.toList
}
+ }
+
+ def compilationUnits(global: Global)(sourceCodes: String*): List[global.CompilationUnit] = {
+ val units = sourceFilesToCompiledUnits(global)(newSources(sourceCodes: _*))
if (global.reporter.hasErrors) {
global.reporter.flush()
sys.error("Compilation failure.")
diff --git a/src/reflect/scala/reflect/internal/Flags.scala b/src/reflect/scala/reflect/internal/Flags.scala
index 86cbba9c50..5ebe02d95d 100644
--- a/src/reflect/scala/reflect/internal/Flags.scala
+++ b/src/reflect/scala/reflect/internal/Flags.scala
@@ -59,9 +59,9 @@ import scala.collection.{ mutable, immutable }
// 42: VBRIDGE
// 43: VARARGS
// 44: TRIEDCOOKING
-// 45:
-// 46:
-// 47:
+// 45: SYNCHRONIZED/M
+// 46: ARTIFACT
+// 47: DEFAULTMETHOD/M
// 48:
// 49:
// 50:
@@ -116,6 +116,8 @@ class ModifierFlags {
final val LAZY = 1L << 31 // symbol is a lazy val. can't have MUTABLE unless transformed by typer
final val PRESUPER = 1L << 37 // value is evaluated before super call
final val DEFAULTINIT = 1L << 41 // symbol is initialized to the default value: used by -Xcheckinit
+ // ARTIFACT at #46 in 2.11+
+ final val DEFAULTMETHOD = 1L << 47 // symbol is a java default method
// Overridden.
def flagToString(flag: Long): String = ""
@@ -239,7 +241,7 @@ class Flags extends ModifierFlags {
*/
final val ExplicitFlags =
PRIVATE | PROTECTED | ABSTRACT | FINAL | SEALED |
- OVERRIDE | CASE | IMPLICIT | ABSOVERRIDE | LAZY
+ OVERRIDE | CASE | IMPLICIT | ABSOVERRIDE | LAZY | DEFAULTMETHOD
/** The two bridge flags */
final val BridgeFlags = BRIDGE | VBRIDGE
@@ -421,7 +423,7 @@ class Flags extends ModifierFlags {
case TRIEDCOOKING => "<triedcooking>" // (1L << 44)
case SYNCHRONIZED => "<synchronized>" // (1L << 45)
case 0x400000000000L => "" // (1L << 46)
- case 0x800000000000L => "" // (1L << 47)
+ case DEFAULTMETHOD => "<defaultmethod>" // (1L << 47)
case 0x1000000000000L => "" // (1L << 48)
case 0x2000000000000L => "" // (1L << 49)
case 0x4000000000000L => "" // (1L << 50)
diff --git a/src/reflect/scala/reflect/internal/util/Statistics.scala b/src/reflect/scala/reflect/internal/util/Statistics.scala
index af4a0263ec..cbd27b0d65 100644
--- a/src/reflect/scala/reflect/internal/util/Statistics.scala
+++ b/src/reflect/scala/reflect/internal/util/Statistics.scala
@@ -103,7 +103,7 @@ quant)
r <- q :: q.children.toList if r.prefix.nonEmpty) yield r
private def showPercent(x: Double, base: Double) =
- if (base == 0) "" else f" (${x / base * 100}%2.1f%)"
+ if (base == 0) "" else f" (${x / base * 100}%2.1f%%)"
/** The base trait for quantities.
* Quantities with non-empty prefix are printed in the statistics info.
diff --git a/src/reflect/scala/reflect/macros/Enclosures.scala b/src/reflect/scala/reflect/macros/Enclosures.scala
index c48656b366..a4ad71c348 100644
--- a/src/reflect/scala/reflect/macros/Enclosures.scala
+++ b/src/reflect/scala/reflect/macros/Enclosures.scala
@@ -29,8 +29,12 @@ trait Enclosures {
*/
val enclosingMacros: List[Context]
- /** Types along with corresponding trees for which implicit arguments are currently searched.
+ /** Information about one of the currently considered implicit candidates.
+ * Candidates are used in plural form, because implicit parameters may themselves have implicit parameters,
+ * hence implicit searches can recursively trigger other implicit searches.
+ *
* Can be useful to get information about an application with an implicit parameter that is materialized during current macro expansion.
+ * If we're in an implicit macro being expanded, it's included in this list.
*
* Unlike `openImplicits`, this is a val, which means that it gets initialized when the context is created
* and always stays the same regardless of whatever happens during macro expansion.
diff --git a/src/reflect/scala/reflect/macros/Typers.scala b/src/reflect/scala/reflect/macros/Typers.scala
index 427e7854b2..d36636a6d2 100644
--- a/src/reflect/scala/reflect/macros/Typers.scala
+++ b/src/reflect/scala/reflect/macros/Typers.scala
@@ -24,8 +24,12 @@ trait Typers {
*/
def openMacros: List[Context]
- /** Types along with corresponding trees for which implicit arguments are currently searched.
+ /** Information about one of the currently considered implicit candidates.
+ * Candidates are used in plural form, because implicit parameters may themselves have implicit parameters,
+ * hence implicit searches can recursively trigger other implicit searches.
+ *
* Can be useful to get information about an application with an implicit parameter that is materialized during current macro expansion.
+ * If we're in an implicit macro being expanded, it's included in this list.
*
* Unlike `enclosingImplicits`, this is a def, which means that it gets recalculated on every invocation,
* so it might change depending on what is going on during macro expansion.
diff --git a/src/reflect/scala/reflect/runtime/ReflectionUtils.scala b/src/reflect/scala/reflect/runtime/ReflectionUtils.scala
index 7b093e0e80..33ad6d2430 100644
--- a/src/reflect/scala/reflect/runtime/ReflectionUtils.scala
+++ b/src/reflect/scala/reflect/runtime/ReflectionUtils.scala
@@ -28,15 +28,6 @@ private[scala] object ReflectionUtils {
case ex if pf isDefinedAt unwrapThrowable(ex) => pf(unwrapThrowable(ex))
}
- private def systemProperties: Iterator[(String, String)] = {
- import scala.collection.JavaConverters._
- System.getProperties.asScala.iterator
- }
-
- private def inferBootClasspath: String = (
- systemProperties find (_._1 endsWith ".boot.class.path") map (_._2) getOrElse ""
- )
-
def show(cl: ClassLoader): String = {
import scala.language.reflectiveCalls
@@ -51,7 +42,8 @@ private[scala] object ReflectionUtils {
case cl if cl != null && isAbstractFileClassLoader(cl.getClass) =>
cl.asInstanceOf[{val root: scala.reflect.io.AbstractFile}].root.canonicalPath
case null =>
- inferBootClasspath
+ val loadBootCp = (flavor: String) => scala.util.Properties.propOrNone(flavor + ".boot.class.path")
+ loadBootCp("sun") orElse loadBootCp("java") getOrElse "<unknown>"
case _ =>
"<unknown>"
}