/* * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License version 2 only, as * published by the Free Software Foundation. Oracle designates this * particular file as subject to the "Classpath" exception as provided * by Oracle in the LICENSE file that accompanied this code. * * This code is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License * version 2 for more details (a copy is included in the LICENSE file that * accompanied this code). * * You should have received a copy of the GNU General Public License version * 2 along with this work; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. * * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA * or visit www.oracle.com if you need additional information or have any * questions. */ /* * This file is available under and governed by the GNU General Public * License version 2 only, as published by the Free Software Foundation. * However, the following notice accompanied the original version of this * file: * * Written by Doug Lea with assistance from members of JCP JSR-166 * Expert Group and released to the public domain, as explained at * http://creativecommons.org/publicdomain/zero/1.0/ */ package java.util.concurrent; import java.lang.Thread.UncaughtExceptionHandler; import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.function.Consumer; import java.util.function.Predicate; import java.util.concurrent.CountDownLatch; import java.util.concurrent.locks.LockSupport; import jdk.internal.access.JavaLangAccess; import jdk.internal.access.JavaUtilConcurrentFJPAccess; import jdk.internal.access.SharedSecrets; import jdk.internal.misc.Unsafe; import jdk.internal.vm.SharedThreadContainer; import static java.util.concurrent.DelayScheduler.ScheduledForkJoinTask; /** * An {@link ExecutorService} for running {@link ForkJoinTask}s. * A {@code ForkJoinPool} provides the entry point for submissions * from non-{@code ForkJoinTask} clients, as well as management and * monitoring operations. * *
A {@code ForkJoinPool} differs from other kinds of {@link * ExecutorService} mainly by virtue of employing * work-stealing: all threads in the pool attempt to find and * execute tasks submitted to the pool and/or created by other active * tasks (eventually blocking waiting for work if none exist). This * enables efficient processing when most tasks spawn other subtasks * (as do most {@code ForkJoinTask}s), as well as when many small * tasks are submitted to the pool from external clients. Especially * when setting asyncMode to true in constructors, {@code * ForkJoinPool}s may also be appropriate for use with event-style * tasks that are never joined. All worker threads are initialized * with {@link Thread#isDaemon} set {@code true}. * *
A static {@link #commonPool()} is available and appropriate for * most applications. The common pool is used by any ForkJoinTask that * is not explicitly submitted to a specified pool. Using the common * pool normally reduces resource usage (its threads are slowly * reclaimed during periods of non-use, and reinstated upon subsequent * use). * *
For applications that require separate or custom pools, a {@code * ForkJoinPool} may be constructed with a given target parallelism * level; by default, equal to the number of available processors. * The pool attempts to maintain enough active (or available) threads * by dynamically adding, suspending, or resuming internal worker * threads, even if some tasks are stalled waiting to join others. * However, no such adjustments are guaranteed in the face of blocked * I/O or other unmanaged synchronization. The nested {@link * ManagedBlocker} interface enables extension of the kinds of * synchronization accommodated. The default policies may be * overridden using a constructor with parameters corresponding to * those documented in class {@link ThreadPoolExecutor}. * *
In addition to execution and lifecycle control methods, this * class provides status check methods (for example * {@link #getStealCount}) that are intended to aid in developing, * tuning, and monitoring fork/join applications. Also, method * {@link #toString} returns indications of pool state in a * convenient form for informal monitoring. * *
As is the case with other ExecutorServices, there are three * main task execution methods summarized in the following table. * These are designed to be used primarily by clients not already * engaged in fork/join computations in the current pool. The main * forms of these methods accept instances of {@code ForkJoinTask}, * but overloaded forms also allow mixed execution of plain {@code * Runnable}- or {@code Callable}- based activities as well. However, * tasks that are already executing in a pool should normally instead * use the within-computation forms listed in the table unless using * async event-style tasks that are not usually joined, in which case * there is little difference among choice of methods. * *
* | Call from non-fork/join clients | *Call from within fork/join computations | *
---|---|---|
Arrange async execution | *{@link #execute(ForkJoinTask)} | *{@link ForkJoinTask#fork} | *
Await and obtain result | *{@link #invoke(ForkJoinTask)} | *{@link ForkJoinTask#invoke} | *
Arrange exec and obtain Future | *{@link #submit(ForkJoinTask)} | *{@link ForkJoinTask#fork} (ForkJoinTasks are Futures) | *
Additionally, this class supports {@link * ScheduledExecutorService} methods to delay or periodically execute * tasks, as well as method {@link #submitWithTimeout} to cancel tasks * that take too long. The scheduled functions or actions may create * and invoke other {@linkplain ForkJoinTask ForkJoinTasks}. Delayed * actions become enabled and behave as ordinary submitted * tasks when their delays elapse. Scheduling methods return * {@linkplain ForkJoinTask ForkJoinTasks} that implement the {@link * ScheduledFuture} interface. Resource exhaustion encountered after * initial submission results in task cancellation. When time-based * methods are used, shutdown policies match the default policies of * class {@link ScheduledThreadPoolExecutor}: upon {@link #shutdown}, * existing periodic tasks will not re-execute, and the pool * terminates when quiescent and existing delayed tasks * complete. Method {@link #cancelDelayedTasksOnShutdown} may be used * to disable all delayed tasks upon shutdown, and method {@link * #shutdownNow} may be used to instead unconditionally initiate pool * termination. Monitoring methods such as {@link #getQueuedTaskCount} * do not include scheduled tasks that are not yet enabled to execute, * which are reported separately by method {@link * #getDelayedTaskCount}. * *
The parameters used to construct the common pool may be controlled by * setting the following {@linkplain System#getProperty system properties}: *
The sequence of task executions continues indefinitely until * one of the following exceptional completions occur: *
If any execution of this task takes longer than its period, then
* subsequent executions may start late, but will not concurrently
* execute.
* @param command the task to execute
* @param initialDelay the time to delay first execution
* @param period the period between successive executions
* @param unit the time unit of the initialDelay and period parameters
* @return a ForkJoinTask implementing the ScheduledFuture
* interface. The future's {@link Future#get() get()}
* method will never return normally, and will throw an
* exception upon task cancellation or abnormal
* termination of a task execution.
* @throws RejectedExecutionException if the pool is shutdown or
* submission encounters resource exhaustion.
* @throws NullPointerException if command or unit is null
* @throws IllegalArgumentException if period less than or equal to zero
* @since 25
*/
public ScheduledFuture> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period, TimeUnit unit) {
if (period <= 0L)
throw new IllegalArgumentException();
return scheduleDelayedTask(
new ScheduledForkJoinTask The sequence of task executions continues indefinitely until
* one of the following exceptional completions occur:
* If already terminated, or this is the {@link
* #commonPool()}, this method has no effect on execution, and
* does not wait. Otherwise, if interrupted while waiting, this
* method stops all executing tasks as if by invoking {@link
* #shutdownNow()}. It then continues to wait until all actively
* executing tasks have completed. Tasks that were awaiting
* execution are not executed. The interrupt status will be
* re-asserted before this method returns.
*
* @since 19
*/
@Override
public void close() {
if (workerNamePrefix != null) {
CountDownLatch done = null;
boolean interrupted = false;
while ((tryTerminate(interrupted, true) & TERMINATED) == 0) {
if (done == null)
done = terminationSignal();
else {
try {
done.await();
break;
} catch (InterruptedException ex) {
interrupted = true;
}
}
}
if (interrupted)
Thread.currentThread().interrupt();
}
}
/**
* Interface for extending managed parallelism for tasks running
* in {@link ForkJoinPool}s.
*
* A {@code ManagedBlocker} provides two methods. Method
* {@link #isReleasable} must return {@code true} if blocking is
* not necessary. Method {@link #block} blocks the current thread
* if necessary (perhaps internally invoking {@code isReleasable}
* before actually blocking). These actions are performed by any
* thread invoking {@link
* ForkJoinPool#managedBlock(ManagedBlocker)}. The unusual
* methods in this API accommodate synchronizers that may, but
* don't usually, block for long periods. Similarly, they allow
* more efficient internal handling of cases in which additional
* workers may be, but usually are not, needed to ensure
* sufficient parallelism. Toward this end, implementations of
* method {@code isReleasable} must be amenable to repeated
* invocation. Neither method is invoked after a prior invocation
* of {@code isReleasable} or {@code block} returns {@code true}.
*
* For example, here is a ManagedBlocker based on a
* ReentrantLock:
* Here is a class that possibly blocks waiting for an
* item on a given queue:
* This method repeatedly calls {@code blocker.isReleasable()} and
* {@code blocker.block()} until either method returns {@code true}.
* Every call to {@code blocker.block()} is preceded by a call to
* {@code blocker.isReleasable()} that returned {@code false}.
*
* If not running in a ForkJoinPool, this method is
* behaviorally equivalent to
*
*
* Subsequent executions are suppressed. Subsequent calls to
* {@link Future#isDone isDone()} on the returned future will
* return {@code true}.
* @param command the task to execute
* @param initialDelay the time to delay first execution
* @param delay the delay between the termination of one
* execution and the commencement of the next
* @param unit the time unit of the initialDelay and delay parameters
* @return a ForkJoinTask implementing the ScheduledFuture
* interface. The future's {@link Future#get() get()}
* method will never return normally, and will throw an
* exception upon task cancellation or abnormal
* termination of a task execution.
* @throws RejectedExecutionException if the pool is shutdown or
* submission encounters resource exhaustion.
* @throws NullPointerException if command or unit is null
* @throws IllegalArgumentException if delay less than or equal to zero
* @since 25
*/
public ScheduledFuture> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay, TimeUnit unit) {
if (delay <= 0L)
throw new IllegalArgumentException();
return scheduleDelayedTask(
new ScheduledForkJoinTask {@code
* class ManagedLocker implements ManagedBlocker {
* final ReentrantLock lock;
* boolean hasLock = false;
* ManagedLocker(ReentrantLock lock) { this.lock = lock; }
* public boolean block() {
* if (!hasLock)
* lock.lock();
* return true;
* }
* public boolean isReleasable() {
* return hasLock || (hasLock = lock.tryLock());
* }
* }}
*
* {@code
* class QueueTaker
*/
public static interface ManagedBlocker {
/**
* Possibly blocks the current thread, for example waiting for
* a lock or condition.
*
* @return {@code true} if no additional blocking is necessary
* (i.e., if isReleasable would return true)
* @throws InterruptedException if interrupted while waiting
* (the method is not required to do so, but is allowed to)
*/
boolean block() throws InterruptedException;
/**
* Returns {@code true} if blocking is unnecessary.
* @return {@code true} if blocking is unnecessary
*/
boolean isReleasable();
}
/**
* Runs the given possibly blocking task. When {@linkplain
* ForkJoinTask#inForkJoinPool() running in a ForkJoinPool}, this
* method possibly arranges for a spare thread to be activated if
* necessary to ensure sufficient parallelism while the current
* thread is blocked in {@link ManagedBlocker#block blocker.block()}.
*
* {@code
* while (!blocker.isReleasable())
* if (blocker.block())
* break;}
*
* If running in a ForkJoinPool, the pool may first be expanded to
* ensure sufficient parallelism available during the call to
* {@code blocker.block()}.
*
* @param blocker the blocker task
* @throws InterruptedException if {@code blocker.block()} did so
*/
public static void managedBlock(ManagedBlocker blocker)
throws InterruptedException {
Thread t; ForkJoinPool p;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread &&
(p = ((ForkJoinWorkerThread)t).pool) != null)
p.compensatedBlock(blocker);
else
unmanagedBlock(blocker);
}
/** ManagedBlock for ForkJoinWorkerThreads */
private void compensatedBlock(ManagedBlocker blocker)
throws InterruptedException {
Objects.requireNonNull(blocker);
for (;;) {
int comp; boolean done;
long c = ctl;
if (blocker.isReleasable())
break;
if ((runState & STOP) != 0L)
throw new InterruptedException();
if ((comp = tryCompensate(c)) >= 0) {
try {
done = blocker.block();
} finally {
if (comp > 0)
getAndAddCtl(RC_UNIT);
}
if (done)
break;
}
}
}
/**
* Invokes tryCompensate to create or re-activate a spare thread to
* compensate for a thread that performs a blocking operation. When the
* blocking operation is done then endCompensatedBlock must be invoked
* with the value returned by this method to re-adjust the parallelism.
* @return value to use in endCompensatedBlock
*/
final long beginCompensatedBlock() {
int c;
do {} while ((c = tryCompensate(ctl)) < 0);
return (c == 0) ? 0L : RC_UNIT;
}
/**
* Re-adjusts parallelism after a blocking operation completes.
* @param post value from beginCompensatedBlock
*/
void endCompensatedBlock(long post) {
if (post > 0L) {
getAndAddCtl(post);
}
}
/** ManagedBlock for external threads */
private static void unmanagedBlock(ManagedBlocker blocker)
throws InterruptedException {
Objects.requireNonNull(blocker);
do {} while (!blocker.isReleasable() && !blocker.block());
}
@Override
protected