Skip to content

Commit fdb1cb1

Browse files
committed
Adds ForkJoin feature
1 parent 32e7b53 commit fdb1cb1

File tree

2 files changed

+257
-0
lines changed

2 files changed

+257
-0
lines changed

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ Concurrency Patterns and features found in Java, through multithreaded programmi
2626
* [Atomics](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/atomics/UsingAtomics.java)
2727
* [Futures](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/features/futures)
2828
* [FutureTask](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/futures/UsingFutureTasks.java)
29+
* [Fork/Join Framework](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/forkjoin/UsingForkJoinFramework.java)
2930
* [CompletableFuture](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/futures/UsingCompletableFuture.java)
3031
* [Java Memory Model](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/java_memory_model/WhatIsJavaMemoryModel.java)
3132

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
package br.com.leonardoz.features.forkjoin;
2+
3+
import java.lang.Thread.UncaughtExceptionHandler;
4+
import java.math.BigInteger;
5+
import java.util.LinkedList;
6+
import java.util.List;
7+
import java.util.concurrent.ForkJoinPool;
8+
import java.util.concurrent.RecursiveAction;
9+
import java.util.concurrent.RecursiveTask;
10+
import java.util.concurrent.TimeUnit;
11+
12+
/**
13+
*
14+
* The Fork/Join Framework
15+
*
16+
* Parallelism is the execution of multiple tasks simultaneously. Fork Join
17+
* Framework helps with the execution of those tasks in parallel.
18+
*
19+
* Why?
20+
*
21+
* The Fork/Join approach speeds up the execution of a task that can be split
22+
* into subtasks/small tasks, be executing them in parallel and combining those
23+
* results.
24+
*
25+
* Limitations
26+
*
27+
* One requirement for using the Fork/Join Framework is that all of the Subtasks
28+
* must be "completable and independent" of each other to be truly parallel, so
29+
* not every problem can be solved using this method.
30+
*
31+
* How it works
32+
*
33+
* It uses a divide and conquer approach, dividing a major task into minor
34+
* subtasks recursively (Fork), until the division limit is reached and the
35+
* tasks can be solved, to be later combined (Join).
36+
*
37+
* For the execution of those tasks in parallel, the framework will use a thread
38+
* pool, which has, be default, the same size of the number of processors
39+
* available for the JVM.
40+
*
41+
* A thread from the pool has it's own double ended queue, for the matter of
42+
* storing all the tasks that are being executed/to be executed. The double
43+
* ended queue nature enables inserts or deletes to both the head and last
44+
* position of the queue.
45+
*
46+
* The work-stealing algorithm is the greatest functionality for the speed up
47+
* aspect of the ForkJoin Framework. The algorithm balances the workload between
48+
* threads, allowing the threads that doesn't have any task at the moment to
49+
* "steal" from last position of a thread's queue that can't process his own
50+
* last task at the moment. In theory, there will be more task being processed.
51+
*
52+
* Framework architecture overview
53+
*
54+
* - ForkJoinPool: Base class for the pools, used to balance tasks that can be
55+
* "work-stealed".
56+
*
57+
* - ForkJoinTask: Represents a task to be executed in a ForkJoinPool.
58+
*
59+
* - RecursiveTask: Specialization of ForkJoinTask, holds a result.
60+
*
61+
* - RecursiveAction: Specialization of ForkJoinTask, just process something
62+
* without yielding a result.
63+
*
64+
*
65+
* Workflow
66+
*
67+
* The idea is that you can split bigger tasks into smaller ones, until that the work
68+
* is small enough to be completed.
69+
*
70+
* the following algorithm describes how to use the ForkJoinFramework correctly.
71+
*
72+
* if (my task is small enough)
73+
*
74+
* complete my task
75+
*
76+
* else
77+
* split my task into two small tasks
78+
*
79+
* execute both tasks and wait for the results
80+
*
81+
*
82+
* Then do your work based on the result
83+
*
84+
*
85+
*/
86+
public class UsingForkJoinFramework {
87+
88+
/**
89+
* Common Pool
90+
*
91+
* Default instance of a fork join pool in a Java app, used by
92+
* CompletableFuture, and parallel streams. All threads used by the common pool
93+
* can be reused, released and reinstated after some time. This approach reduces
94+
* the resource consumption.
95+
*
96+
*/
97+
public ForkJoinPool getCommonPool() {
98+
ForkJoinPool commonPool = ForkJoinPool.commonPool();
99+
return commonPool;
100+
}
101+
102+
/**
103+
* Customize ForkJoinPool
104+
*
105+
* Parallelism: Parallelism level, default is Runtime#availableProcessors
106+
*
107+
* ForkJoinWorkerThreadFactory: Factory used for creating threads for the pool.
108+
*
109+
* UncaughtExceptionHandler: handles worker threads that terminates due some
110+
* "unrecoverable" problem.
111+
*
112+
* True-value AsyncMode: FIFO scheduling mode, used by tasks that are never
113+
* joined, like event-oriented asynchronous tasks.
114+
*
115+
*/
116+
public ForkJoinPool customForkJoinPool(int parallelism, ForkJoinPool.ForkJoinWorkerThreadFactory factory,
117+
UncaughtExceptionHandler handler, boolean asyncMode) {
118+
return new ForkJoinPool(parallelism, factory, handler, asyncMode);
119+
}
120+
121+
/**
122+
*
123+
* Tasks
124+
*
125+
* ForkJoinTask is the base type of a task. It represents a "lightweight
126+
* thread", with the ForkJoinPool being it's scheduler.
127+
*
128+
* RecursiveTask: Task that returns a value, result of a computation.
129+
*
130+
* RecursiveAction: Task that doesn't returns a value.
131+
*
132+
* Both can be used to implement the workflow algorithm described in the
133+
* Workflow section, with he aid of Fork and Join.
134+
*
135+
*/
136+
137+
/**
138+
* RecursiveTask
139+
*
140+
* Represents a result of a computation.
141+
*
142+
* In the example bellow, it follows the algorithm, partitioning the numbers
143+
* list in half, using fork and join to control the task flow.
144+
*
145+
*/
146+
static class RecSumTask extends RecursiveTask<BigInteger> {
147+
148+
private static final long serialVersionUID = 1L;
149+
public static final int DIVIDE_AT = 500;
150+
151+
private List<Integer> numbers;
152+
153+
public RecSumTask(List<Integer> numbers) {
154+
this.numbers = numbers;
155+
}
156+
157+
@Override
158+
protected BigInteger compute() {
159+
List<RecSumTask> subTasks = new LinkedList<>();
160+
if (numbers.size() < DIVIDE_AT) {
161+
// directly
162+
BigInteger subSum = BigInteger.ZERO;
163+
for (Integer number : numbers) {
164+
subSum = subSum.add(BigInteger.valueOf(number));
165+
}
166+
return subSum;
167+
} else {
168+
// Divide to conquer
169+
int size = numbers.size();
170+
List<Integer> numbersLeft = numbers.subList(0, size / 2);
171+
List<Integer> numbersRight = numbers.subList(size / 2, size);
172+
173+
RecSumTask recSumLeft = new RecSumTask(numbersLeft);
174+
RecSumTask recSumRight = new RecSumTask(numbersRight);
175+
176+
subTasks.add(recSumRight);
177+
subTasks.add(recSumLeft);
178+
179+
// Fork Child Tasks
180+
recSumLeft.fork();
181+
recSumRight.fork();
182+
}
183+
184+
BigInteger sum = BigInteger.ZERO;
185+
for (RecSumTask recSum : subTasks) {
186+
// Join Child Tasks
187+
sum = sum.add(recSum.join());
188+
}
189+
return sum;
190+
}
191+
}
192+
193+
public static void main(String[] args) {
194+
// prepares dataset for the example
195+
LinkedList<Integer> numbers = new LinkedList<>();
196+
for (int i = 0; i < 500_000; i++) {
197+
numbers.add(i);
198+
}
199+
200+
// Usage
201+
ForkJoinPool commonPool = ForkJoinPool.commonPool();
202+
RecSumTask task = new RecSumTask(numbers);
203+
BigInteger result = commonPool.invoke(task);
204+
System.out.println("Result is: " + result);
205+
try {
206+
commonPool.awaitTermination(4, TimeUnit.SECONDS);
207+
} catch (InterruptedException e) {
208+
e.printStackTrace();
209+
}
210+
System.out.println("\n\n");
211+
}
212+
213+
/**
214+
* RecursiveTask
215+
*
216+
* Represents a result of a computation, resembles RecursiveTask, but without
217+
* the return value.
218+
*
219+
*/
220+
static class ARecursiveAction extends RecursiveAction {
221+
222+
private static final long serialVersionUID = 1L;
223+
224+
@Override
225+
protected void compute() {
226+
// same pattern goes here
227+
}
228+
229+
}
230+
231+
/**
232+
* It's possible to extract informations about the pool's current state.
233+
*
234+
* Active thread count: Number of threads that are stealing or executing tasks.
235+
*
236+
* Pool size: Number of worker threads that are started but not terminated yet.
237+
*
238+
* Parallelism level: Equivalent to the number of available processors.
239+
*
240+
* Queue submitted tasks: Number of submitted tasks, but not executing. Steal
241+
* count:
242+
*
243+
* Number of stealed tasks from a thread to another, useful for monitoring.
244+
*
245+
*/
246+
public static void debugPool(ForkJoinPool commonPool) {
247+
System.out.println("Debuggin ForJoinPool");
248+
System.out.println("Active Thread Count: " + commonPool.getActiveThreadCount());
249+
System.out.println("Pool Size: " + commonPool.getPoolSize());
250+
System.out.println("Parallelism level: " + commonPool.getParallelism());
251+
System.out.println("Queue submitted tasks: " + commonPool.getQueuedSubmissionCount());
252+
System.out.println("Steal count: " + commonPool.getStealCount());
253+
System.out.println("\n");
254+
}
255+
256+
}

0 commit comments

Comments
 (0)