diff --git a/README.md b/README.md index 3db4ee8..e7d1400 100644 --- a/README.md +++ b/README.md @@ -3,52 +3,58 @@ Concurrency Patterns and features found in Java, through multithreaded programming. ## Features: -* Threads and Runnables -* Locks - * Intrinsic +* [Threads and Runnables](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/threads/UsingThreads.java) +* [Locks](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/features/locks) + * [Intrinsic](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/locks/UsingIntrinsicLocks.java) * Explicit - * Reentrant - * ReadWrite -* Synchronizers - * Latches - * Semaphores - * Barriers -* Synchronized Collections -* Concurrent Collections - * CopyOnWriteArrayList - * ConcurrentHashMap - * Blocking Queue -* Executors - * Fixed Thread Pool - * Cached Thread Pool - * Single Thread Pool - * Scheduled Thread Pool -* Atomics -* Futures - * FutureTask - * CompletableFuture -* Java Memory Model + * [Reentrant](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/locks/UsingExplicitReentrantLocks.java) + * [ReadWrite](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/locks/UsingExplicitReadWriteLocks.java) +* [Synchronizers](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/features/synchronizers) + * [Latches](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/synchronizers/UsingLatches.java) + * [Semaphores](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/synchronizers/UsingSemaphores.java) + * [Barriers](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/synchronizers/UsingBarriers.java) +* [Synchronized Collections](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/collections/UsingSynchronizedCollections.java) +* [Concurrent Collections](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/collections/UsingConcurrentCollections.java) + * [CopyOnWriteArrayList](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/collections/UsingConcurrentCollections.java#L89) + * [ConcurrentHashMap](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/collections/UsingConcurrentCollections.java#L40) + * [Blocking Queue](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/collections/UsingConcurrentCollections.java#L141) +* [Executors](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java) + * [Fixed Thread Pool](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java#L94) + * [Cached Thread Pool](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java#L65) + * [Single Thread Pool](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java#L51) + * [Scheduled Thread Pool](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java#L122) + * [Single Thread Scheduled Pool](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java#L139) + * [Work-Stealing Pool](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java#L156) +* [Atomics](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/atomics/UsingAtomics.java) +* [Futures](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/features/futures) + * [FutureTask](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/futures/UsingFutureTasks.java) + * [CompletableFuture](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/futures/UsingCompletableFuture.java) +* [Fork/Join Framework](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/forkjoin/UsingForkJoinFramework.java) +* [Parallel Streams](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/parallel_stream/UsingParallelStreams.java) +* [Java Memory Model](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/features/java_memory_model/WhatIsJavaMemoryModel.java) ## Patterns -* Protect Shared State -* Lock Split -* Protecting Composed Actions -* Fixed Lock Ordering -* Thread Local Confinement -* Immutable Object -* Safe Instantiation -* Safe Publication -* Interruption -* Resource Pool -* Condition Queues (wait-notify / await-signal) -* Background Task Executor -* Task Cancel -* Producer-Consumer -* Task Convergence -* Non-Blocking with Atomics -* Controlled Concurrent Initialization +* [Protect Shared State](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/patterns/thread_safe/shared_state) +* [Atomic Compound Actions](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/patterns/thread_safe/compound_actions) +* [Lock Split](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/patterns/thread_safe/lock_split) +* [Fixed Lock Ordering](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/patterns/fixed_lock_ordering) +* [Thread Local Confinement](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/patterns/thread_safe/thread_confinement) +* [Immutable Object](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/patterns/thread_safe/immutable_object) +* [Safe Lazy Initialization](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/patterns/thread_safe/initialization/SafeInitializationHolder.java) +* [Safe Publishing](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/patterns/thread_safe/publishing/SafePublishing.java) +* [Resource Pool](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/patterns/resource_pool) +* [Condition Queues](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/patterns/condition_queues) + * [wait-notify](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/patterns/condition_queues/WaitNotifyQueue.java) + * [await-signal](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/patterns/condition_queues/ExplicitConditionQueue.java) +* [Background Task Executor](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/patterns/task_execution/BackgroundTaskExecutor.java) +* [Task Cancel](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/patterns/task_cancel) +* [Producer-Consumer](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/patterns/producer_consumer/ProducerConsumer.java) +* [Task Convergence](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/patterns/task_convergence/TaskConvergence.java) +* [Non-Blocking with Atomics](https://github.com/LeonardoZ/java-concurrency-patterns/tree/master/src/main/java/br/com/leonardoz/patterns/non_blocking) +* [Controlled Concurrent Initialization](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/patterns/controlled_initialization/ControlledInitialization.java) +* [Parallel Divide and Conquer](https://github.com/LeonardoZ/java-concurrency-patterns/blob/master/src/main/java/br/com/leonardoz/patterns/divideconquer) ## About Patterns and Algorithms inspired by the Java Concurrency in Practice book. - \ No newline at end of file + diff --git a/pom.xml b/pom.xml index 0052648..8f27a3d 100644 --- a/pom.xml +++ b/pom.xml @@ -14,8 +14,8 @@ 3.7.0 - 1.8 - 1.8 + 11 + 11 diff --git a/src/main/java/br/com/leonardoz/features/atomics/UsingAtomics.java b/src/main/java/br/com/leonardoz/features/atomics/UsingAtomics.java index e62cbae..97ff747 100644 --- a/src/main/java/br/com/leonardoz/features/atomics/UsingAtomics.java +++ b/src/main/java/br/com/leonardoz/features/atomics/UsingAtomics.java @@ -1,6 +1,5 @@ package br.com.leonardoz.features.atomics; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -47,13 +46,13 @@ public int get() { } public static void main(String[] args) throws InterruptedException { - AtomicCounter counter = new AtomicCounter(); - ExecutorService ctp = Executors.newCachedThreadPool(); + var counter = new AtomicCounter(); + var cachedThreadPool = Executors.newCachedThreadPool(); for (int i = 0; i < 10_000; i++) { - ctp.execute(() -> counter.increment()); + cachedThreadPool.execute(() -> counter.increment()); } - ctp.shutdown(); - ctp.awaitTermination(4000, TimeUnit.SECONDS); + cachedThreadPool.shutdown(); + cachedThreadPool.awaitTermination(4000, TimeUnit.SECONDS); System.out.println("Result shound be 10000: Actual result is: " + counter.get()); } } diff --git a/src/main/java/br/com/leonardoz/features/collections/UsingConcurrentCollections.java b/src/main/java/br/com/leonardoz/features/collections/UsingConcurrentCollections.java index cfab0f1..5073cac 100644 --- a/src/main/java/br/com/leonardoz/features/collections/UsingConcurrentCollections.java +++ b/src/main/java/br/com/leonardoz/features/collections/UsingConcurrentCollections.java @@ -2,10 +2,8 @@ import java.util.Random; import java.util.UUID; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -38,10 +36,10 @@ public class UsingConcurrentCollections { * - Just a few Writers can modify it. */ public static void usingConcurrentHashMap() { - ExecutorService executor = Executors.newCachedThreadPool(); System.out.println("=== ConcurrentHashMap ==="); - Random random = new Random(); - ConcurrentHashMap valuesPerUuid = new ConcurrentHashMap<>(); + var executor = Executors.newCachedThreadPool(); + var random = new Random(); + var valuesPerUuid = new ConcurrentHashMap(); // atomic operations valuesPerUuid.putIfAbsent(UUID.randomUUID(), random.nextInt(100)); @@ -87,28 +85,28 @@ public static void usingConcurrentHashMap() { * */ public static void usingCopyOnWriteArrayList() { - ExecutorService executor = Executors.newCachedThreadPool(); System.out.println("=== CopyOnWriteArrayList ==="); - Random random = new Random(); + var executor = Executors.newCachedThreadPool(); + var random = new Random(); // No ConcurrentModificationException - CopyOnWriteArrayList copyOnWriteArrayList = new CopyOnWriteArrayList(); + var copyOnWriteArrayList = new CopyOnWriteArrayList(); for (int i = 0; i < 100; i++) { if (i % 8 == 0) { // write executor.execute(() -> { - Integer value = random.nextInt(10); + var value = random.nextInt(10); System.err.println("Added " + value); copyOnWriteArrayList.add(value); }); } else { // read executor.execute(() -> { - StringBuilder sb = new StringBuilder(); - for (Integer vv : copyOnWriteArrayList) { - sb.append(vv + " "); + var builder = new StringBuilder(); + for (var value : copyOnWriteArrayList) { + builder.append(value + " "); } - System.out.println("Reading " + sb.toString()); + System.out.println("Reading " + builder.toString()); }); } } @@ -142,7 +140,7 @@ public static void usingBlockingQueue() { System.out.println("=== BlockingQueue ==="); // Bounded UUID queue - BlockingQueue uuidQueue = new LinkedBlockingQueue<>(10); + var uuidQueue = new LinkedBlockingQueue(10); System.out.println("Queue will execute for 10s"); @@ -150,18 +148,20 @@ public static void usingBlockingQueue() { Runnable runConsumer = () -> { while (!Thread.currentThread().isInterrupted()) { try { - UUID uuid = uuidQueue.take(); + var uuid = uuidQueue.take(); System.out.println("Consumed: " + uuid + " by " + Thread.currentThread().getName()); } catch (InterruptedException e) { // interrupted pattern + // InterruptedException makes isInterrupted returns false + Thread.currentThread().interrupt(); System.err.println("Consumer Finished"); } } }; - Thread consumer1 = new Thread(runConsumer); + var consumer1 = new Thread(runConsumer); consumer1.start(); - Thread consumer2 = new Thread(runConsumer); + var consumer2 = new Thread(runConsumer); consumer2.start(); // Producer Thread @@ -181,12 +181,12 @@ public static void usingBlockingQueue() { } }; - // Multiple producers - Examples using simple threads this time. - Thread producer1 = new Thread(runProducer); + // Multiple producers - Examples using simple threads this time. + var producer1 = new Thread(runProducer); producer1.start(); - Thread producer2 = new Thread(runProducer); + var producer2 = new Thread(runProducer); producer2.start(); - Thread producer3 = new Thread(runProducer); + var producer3 = new Thread(runProducer); producer3.start(); try { diff --git a/src/main/java/br/com/leonardoz/features/collections/UsingSynchronizedCollections.java b/src/main/java/br/com/leonardoz/features/collections/UsingSynchronizedCollections.java index 0b89058..fb24ce6 100644 --- a/src/main/java/br/com/leonardoz/features/collections/UsingSynchronizedCollections.java +++ b/src/main/java/br/com/leonardoz/features/collections/UsingSynchronizedCollections.java @@ -1,7 +1,6 @@ package br.com.leonardoz.features.collections; import java.util.Vector; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -30,7 +29,7 @@ public class UsingSynchronizedCollections { */ public static void insertIfAbsent(Vector list, Long value) { synchronized (list) { - boolean contains = list.contains(value); + var contains = list.contains(value); if (!contains) { list.add(value); System.out.println("Value added: " + value); @@ -43,7 +42,7 @@ public static void insertIfAbsent(Vector list, Long value) { * results */ public static void insertIfAbsentUnsafe(Vector list, Long value) { - boolean contains = list.contains(value); + var contains = list.contains(value); if (!contains) { list.add(value); System.out.println("Value added: " + value); @@ -51,13 +50,13 @@ public static void insertIfAbsentUnsafe(Vector list, Long value) { } public static void main(String[] args) throws InterruptedException { - ExecutorService executor = Executors.newCachedThreadPool(); + var executor = Executors.newCachedThreadPool(); // Synchronized - Vector - Vector vec = new Vector<>(); + var vector = new Vector(); Runnable insertIfAbsent = () -> { long millis = System.currentTimeMillis() / 1000; - insertIfAbsent(vec, millis); + insertIfAbsent(vector, millis); }; for (int i = 0; i < 10001; i++) { executor.execute(insertIfAbsent); diff --git a/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java b/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java index 4294de4..93c4bff 100644 --- a/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java +++ b/src/main/java/br/com/leonardoz/features/executors/UsingExecutors.java @@ -1,13 +1,11 @@ package br.com.leonardoz.features.executors; import java.util.LinkedList; -import java.util.List; import java.util.UUID; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** @@ -15,7 +13,7 @@ * * Executors help us to decouple task submission from execution. * - * We have 4 types of executors: + * We have 6 types of executors: * * - Single Thread Executor: Uses a single worker to process tasks. * @@ -27,6 +25,13 @@ * * - Scheduled Thread Pool: Bounded thread limit, used for delayed tasks. * + * - Single-Thread Scheduled Pool: Similar to the scheduled thread pool, but + * single-threaded, with only one active task at the time. + * + * - Work-Stealing Thread Pool: Based on Fork/Join Framework, applies the + * work-stealing algorithm for balancing tasks, with available processors as a + * paralellism level. + * * And 2 types of tasks: * * - execute: Executes without giving feedback. Fire-and-forget. @@ -45,7 +50,7 @@ public class UsingExecutors { public static void usingSingleThreadExecutor() { System.out.println("=== SingleThreadExecutor ==="); - ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); + var singleThreadExecutor = Executors.newSingleThreadExecutor(); singleThreadExecutor.execute(() -> System.out.println("Print this.")); singleThreadExecutor.execute(() -> System.out.println("and this one to.")); singleThreadExecutor.shutdown(); @@ -59,15 +64,15 @@ public static void usingSingleThreadExecutor() { public static void usingCachedThreadPool() { System.out.println("=== CachedThreadPool ==="); - ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); - List> uuids = new LinkedList<>(); + var cachedThreadPool = Executors.newCachedThreadPool(); + var uuids = new LinkedList>(); for (int i = 0; i < 10; i++) { - Future submitted = cachedThreadPool.submit(() -> { - UUID randomUUID = UUID.randomUUID(); + var submittedUUID = cachedThreadPool.submit(() -> { + var randomUUID = UUID.randomUUID(); System.out.println("UUID " + randomUUID + " from " + Thread.currentThread().getName()); return randomUUID; }); - uuids.add(submitted); + uuids.add(submittedUUID); } cachedThreadPool.execute(() -> uuids.forEach((f) -> { try { @@ -88,11 +93,11 @@ public static void usingCachedThreadPool() { public static void usingFixedThreadPool() { System.out.println("=== FixedThreadPool ==="); - ExecutorService fixedPool = Executors.newFixedThreadPool(4); - List> uuids = new LinkedList<>(); + var fixedPool = Executors.newFixedThreadPool(4); + var uuids = new LinkedList>(); for (int i = 0; i < 20; i++) { - Future submitted = fixedPool.submit(() -> { - UUID randomUUID = UUID.randomUUID(); + var submitted = fixedPool.submit(() -> { + var randomUUID = UUID.randomUUID(); System.out.println("UUID " + randomUUID + " from " + Thread.currentThread().getName()); return randomUUID; }); @@ -116,11 +121,11 @@ public static void usingFixedThreadPool() { public static void usingScheduledThreadPool() { System.out.println("=== ScheduledThreadPool ==="); - ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(4); - scheduledThreadPool.scheduleAtFixedRate( - () -> System.out.println("Print every 2s"), 0, 2, TimeUnit.SECONDS); - scheduledThreadPool.scheduleWithFixedDelay( - () -> System.out.println("Print every 2s delay"), 0, 2, TimeUnit.SECONDS); + var scheduledThreadPool = Executors.newScheduledThreadPool(4); + scheduledThreadPool.scheduleAtFixedRate(() -> System.out.println("1) Print every 2s"), 0, 2, TimeUnit.SECONDS); + scheduledThreadPool.scheduleAtFixedRate(() -> System.out.println("2) Print every 2s"), 0, 2, TimeUnit.SECONDS); + scheduledThreadPool.scheduleWithFixedDelay(() -> System.out.println("3) Print every 2s delay"), 0, 2, + TimeUnit.SECONDS); try { scheduledThreadPool.awaitTermination(6, TimeUnit.SECONDS); @@ -128,12 +133,66 @@ public static void usingScheduledThreadPool() { } catch (InterruptedException e) { e.printStackTrace(); } + System.out.println("\n\n"); + } + + public static void usingSingleTreadScheduledExecutor() { + System.out.println("=== SingleThreadScheduledThreadPool ==="); + var singleThreadScheduler = Executors.newSingleThreadScheduledExecutor(); + singleThreadScheduler.scheduleAtFixedRate(() -> System.out.println("1) Print every 2s"), 0, 2, TimeUnit.SECONDS); + singleThreadScheduler.scheduleWithFixedDelay(() -> System.out.println("2) Print every 2s delay"), 0, 2, + TimeUnit.SECONDS); + + try { + singleThreadScheduler.awaitTermination(6, TimeUnit.SECONDS); + singleThreadScheduler.shutdown(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + System.out.println("\n\n"); + + } + + public static void usingWorkStealingThreadPool() { + System.out.println("=== WorkStealingThreadPool ==="); + var workStealingPool = Executors.newWorkStealingPool(); + + workStealingPool.execute(() -> System.out.println("Prints normally")); + + Callable generatesUUID = UUID::randomUUID; + var severalUUIDsTasks = new LinkedList>(); + for (int i = 0; i < 20; i++) { + severalUUIDsTasks.add(generatesUUID); + } + + try { + var futureUUIDs = workStealingPool.invokeAll(severalUUIDsTasks); + for (var future : futureUUIDs) { + if (future.isDone()) { + var uuid = future.get(); + System.out.println("New UUID :" + uuid); + } + } + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + } + try { + workStealingPool.awaitTermination(6, TimeUnit.SECONDS); + workStealingPool.shutdown(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + System.out.println("\n\n"); } + + public static void main(String[] args) { usingSingleThreadExecutor(); usingCachedThreadPool(); usingFixedThreadPool(); usingScheduledThreadPool(); + usingSingleTreadScheduledExecutor(); + usingWorkStealingThreadPool(); } } diff --git a/src/main/java/br/com/leonardoz/features/forkjoin/UsingForkJoinFramework.java b/src/main/java/br/com/leonardoz/features/forkjoin/UsingForkJoinFramework.java new file mode 100644 index 0000000..f61eea7 --- /dev/null +++ b/src/main/java/br/com/leonardoz/features/forkjoin/UsingForkJoinFramework.java @@ -0,0 +1,253 @@ +package br.com.leonardoz.features.forkjoin; + +import java.lang.Thread.UncaughtExceptionHandler; +import java.math.BigInteger; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.RecursiveAction; +import java.util.concurrent.RecursiveTask; + +/** + * + * The Fork/Join Framework + * + * Parallelism is the execution of multiple tasks simultaneously. Fork Join + * Framework helps with the execution of those tasks in parallel. + * + * Why? + * + * The Fork/Join approach speeds up the execution of a task that can be split + * into subtasks/small tasks, be executing them in parallel and combining those + * results. + * + * Limitations + * + * One requirement for using the Fork/Join Framework is that all of the Subtasks + * must be "completable and independent" of each other to be truly parallel, so + * not every problem can be solved using this method. In general, the ForkJoin Framework + * is to be used by CPU-intensive computations, not IO bound computations, due to the + * long wait periods that could happen. + * + * How it works + * + * It uses a divide and conquer approach, dividing a major task into minor + * subtasks recursively (Fork), until the division limit is reached and the + * tasks can be solved, to be later combined (Join). + * + * For the execution of those tasks in parallel, the framework will use a thread + * pool, which has, be default, the same size of the number of processors + * available for the JVM. + * + * A thread from the pool has it's own double ended queue, for the matter of + * storing all the tasks that are being executed/to be executed. The double + * ended queue nature enables inserts or deletes to both the head and last + * position of the queue. + * + * The work-stealing algorithm is the greatest functionality for the speed up + * aspect of the ForkJoin Framework. The algorithm balances the workload between + * threads, allowing the threads that doesn't have any task at the moment to + * "steal" from last position of a thread's queue that can't process his own + * last task at the moment. In theory, there will be more task being processed. + * + * Framework architecture overview + * + * - ForkJoinPool: Base class for the pools, used to balance tasks that can be + * "work-stealed". + * + * - ForkJoinTask: Represents a task to be executed in a ForkJoinPool. + * + * - RecursiveTask: Specialization of ForkJoinTask, holds a result. + * + * - RecursiveAction: Specialization of ForkJoinTask, just process something + * without yielding a result. + * + * + * Workflow + * + * The idea is that you can split bigger tasks into smaller ones, until that the work + * is small enough to be completed. + * + * the following algorithm describes how to use the ForkJoinFramework correctly. + * + * if (my task is small enough) + * + * complete my task + * + * else + * split my task into two small tasks + * + * execute both tasks and wait for the results + * + * + * Then do your work based on the result + * + * + */ +public class UsingForkJoinFramework { + + /** + * Common Pool + * + * Default instance of a fork join pool in a Java app, used by + * CompletableFuture, and parallel streams. All threads used by the common pool + * can be reused, released and reinstated after some time. This approach reduces + * the resource consumption. It doesn't need to be closed/shutdown. + * + */ + public ForkJoinPool getCommonPool() { + return ForkJoinPool.commonPool(); + } + + /** + * Customize ForkJoinPool + * + * Parallelism: Parallelism level, default is Runtime#availableProcessors + * + * ForkJoinWorkerThreadFactory: Factory used for creating threads for the pool. + * + * UncaughtExceptionHandler: handles worker threads that terminates due some + * "unrecoverable" problem. + * + * True-value AsyncMode: FIFO scheduling mode, used by tasks that are never + * joined, like event-oriented asynchronous tasks. + * + */ + public ForkJoinPool customForkJoinPool(int parallelism, + ForkJoinPool.ForkJoinWorkerThreadFactory factory, + UncaughtExceptionHandler handler, + boolean asyncMode) { + return new ForkJoinPool(parallelism, factory, handler, asyncMode); + } + + /** + * + * Tasks + * + * ForkJoinTask is the base type of a task. It represents a "lightweight + * thread", with the ForkJoinPool being it's scheduler. + * + * RecursiveTask: Task that returns a value, result of a computation. + * + * RecursiveAction: Task that doesn't returns a value. + * + * Both can be used to implement the workflow algorithm described in the + * Workflow section, with he aid of Fork and Join. + * + */ + + /** + * RecursiveTask + * + * Represents a result of a computation. + * + * In the example bellow, it follows the algorithm, partitioning the numbers + * list in half, using fork and join to control the task flow. + * + */ + static class RecSumTask extends RecursiveTask { + + private static final long serialVersionUID = 1L; + public static final int DIVIDE_AT = 500; + + private List numbers; + + public RecSumTask(List numbers) { + this.numbers = numbers; + } + + @Override + protected BigInteger compute() { + var subTasks = new LinkedList(); + if (numbers.size() < DIVIDE_AT) { + // directly + var subSum = BigInteger.ZERO; + for (Integer number : numbers) { + subSum = subSum.add(BigInteger.valueOf(number)); + } + return subSum; + } else { + // Divide to conquer + var size = numbers.size(); + var numbersLeft = numbers.subList(0, size / 2); + var numbersRight = numbers.subList(size / 2, size); + + var recSumLeft = new RecSumTask(numbersLeft); + var recSumRight = new RecSumTask(numbersRight); + + subTasks.add(recSumRight); + subTasks.add(recSumLeft); + + // Fork Child Tasks + recSumLeft.fork(); + recSumRight.fork(); + } + + var sum = BigInteger.ZERO; + for (var recSum : subTasks) { + // Join Child Tasks + sum = sum.add(recSum.join()); + } + return sum; + } + } + + public static void main(String[] args) { + // prepares dataset for the example + var numbers = new LinkedList(); + for (int i = 0; i < 500_000; i++) { + numbers.add(i); + } + + // Usage + var commonPool = ForkJoinPool.commonPool(); + var task = new RecSumTask(numbers); + BigInteger result = commonPool.invoke(task); + System.out.println("Result is: " + result); + System.out.println("\n\n"); + } + + /** + * RecursiveTask + * + * Represents a result of a computation, resembles RecursiveTask, but without + * the return value. + * + */ + static class ARecursiveAction extends RecursiveAction { + + private static final long serialVersionUID = 1L; + + @Override + protected void compute() { + // same pattern goes here + } + + } + + /** + * It's possible to extract informations about the pool's current state. + * + * Active thread count: Number of threads that are stealing or executing tasks. + * + * Pool size: Number of worker threads that are started but not terminated yet. + * + * Parallelism level: Equivalent to the number of available processors. + * + * Queue submitted tasks: Number of submitted tasks, but not executing. Steal + * count: + * + * Number of stealed tasks from a thread to another, useful for monitoring. + * + */ + public static void debugPool(ForkJoinPool commonPool) { + System.out.println("Debuggin ForJoinPool"); + System.out.println("Active Thread Count: " + commonPool.getActiveThreadCount()); + System.out.println("Pool Size: " + commonPool.getPoolSize()); + System.out.println("Parallelism level: " + commonPool.getParallelism()); + System.out.println("Queue submitted tasks: " + commonPool.getQueuedSubmissionCount()); + System.out.println("Steal count: " + commonPool.getStealCount()); + System.out.println("\n"); + } + +} diff --git a/src/main/java/br/com/leonardoz/features/futures/UsingCompletableFuture.java b/src/main/java/br/com/leonardoz/features/futures/UsingCompletableFuture.java index 6841327..bbffd0f 100644 --- a/src/main/java/br/com/leonardoz/features/futures/UsingCompletableFuture.java +++ b/src/main/java/br/com/leonardoz/features/futures/UsingCompletableFuture.java @@ -3,7 +3,6 @@ import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -11,7 +10,7 @@ /** * * CompletableFuture is a Future that may be manually completed. It combines a - * Future interface with the CompletionState interface, supporting dependent + * Future interface with the CompletionStage interface, supporting dependent * actions that trigger upon its completion, similarly to a callback. * * Important: Specify an Executor for async methods when available. All async @@ -30,6 +29,7 @@ * xxxAsync(..., Executor executor); // Executed in the specified Executor, good * for Java EE. * + * * = supply x run = * * supplyAsync(Supplier supplier); // will complete asynchronously by calling @@ -37,6 +37,7 @@ * * runAsync(Runnable runnable); // will complete after the runnable executions; * + * * = thenApply x thenAccept x thenRun * * thenApply: transforms a value to another type; @@ -140,8 +141,8 @@ public class UsingCompletableFuture { public static void main(String[] args) throws InterruptedException, ExecutionException { - Random random = new Random(); - ExecutorService executor = Executors.newCachedThreadPool(); + var random = new Random(); + var executor = Executors.newCachedThreadPool(); // Creating CompletableFuture randomNum = CompletableFuture.supplyAsync(() -> random.nextInt(140), executor); @@ -165,8 +166,8 @@ public static void main(String[] args) throws InterruptedException, ExecutionExc Integer value = mappedAndCombined.get(); System.out.println("Sum " + value); - // Indefined time task - Supplier ind = () -> { + // Undefined time task + Supplier randomDouble = () -> { try { Thread.sleep(random.nextInt(1000)); } catch (InterruptedException e) { @@ -176,14 +177,14 @@ public static void main(String[] args) throws InterruptedException, ExecutionExc }; // Run after executed - CompletableFuture f1 = CompletableFuture.supplyAsync(ind); - CompletableFuture f2 = CompletableFuture.supplyAsync(ind); - CompletableFuture f3 = CompletableFuture.supplyAsync(ind); - CompletableFuture f4 = CompletableFuture.supplyAsync(ind); + CompletableFuture f1 = CompletableFuture.supplyAsync(randomDouble); + CompletableFuture f2 = CompletableFuture.supplyAsync(randomDouble); + CompletableFuture f3 = CompletableFuture.supplyAsync(randomDouble); + CompletableFuture f4 = CompletableFuture.supplyAsync(randomDouble); CompletableFuture.anyOf(f1, f2, f3, f4).thenRun(() -> System.out.println("Completed")); // Fastest result will be delivered - // Indefined time task - static value + // Undefined time task - static value Supplier getVal = () -> { try { Thread.sleep(random.nextInt(1000)); diff --git a/src/main/java/br/com/leonardoz/features/java_memory_model/UsingReactiveStreams.java b/src/main/java/br/com/leonardoz/features/java_memory_model/UsingReactiveStreams.java new file mode 100644 index 0000000..2f8bef2 --- /dev/null +++ b/src/main/java/br/com/leonardoz/features/java_memory_model/UsingReactiveStreams.java @@ -0,0 +1,5 @@ +package br.com.leonardoz.features.java_memory_model; + +public class UsingReactiveStreams { + +} diff --git a/src/main/java/br/com/leonardoz/features/java_memory_model/WhatIsJavaMemoryModel.java b/src/main/java/br/com/leonardoz/features/java_memory_model/WhatIsJavaMemoryModel.java index 08f09ed..d3e309d 100644 --- a/src/main/java/br/com/leonardoz/features/java_memory_model/WhatIsJavaMemoryModel.java +++ b/src/main/java/br/com/leonardoz/features/java_memory_model/WhatIsJavaMemoryModel.java @@ -13,8 +13,8 @@ * == Reordering == * * When data is shared across threads, there's not so much guarantees on the - * execution order by the threads, and because of the imprevisibility, it's - * import to identify shared data and to use a proper synchronization mechanism + * execution order by the threads, and because of the unpredictability, it's + * important to identify shared data and to use a proper synchronization mechanism * to ensure order and to keep the visibility guarantees provided by the JMM. * * == Happens-Before == diff --git a/src/main/java/br/com/leonardoz/features/locks/UsingExplicitReadWriteLocks.java b/src/main/java/br/com/leonardoz/features/locks/UsingExplicitReadWriteLocks.java index 7fa3fe8..d531c65 100644 --- a/src/main/java/br/com/leonardoz/features/locks/UsingExplicitReadWriteLocks.java +++ b/src/main/java/br/com/leonardoz/features/locks/UsingExplicitReadWriteLocks.java @@ -2,7 +2,6 @@ import java.util.Random; import java.util.UUID; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; @@ -65,8 +64,8 @@ public void writeContent(String newContentToAppend) { } public static void main(String[] args) { - ExecutorService executor = Executors.newCachedThreadPool(); - UsingExplicitReadWriteLocks uerwl = new UsingExplicitReadWriteLocks(); + var executor = Executors.newCachedThreadPool(); + var self = new UsingExplicitReadWriteLocks(); // Readers for (int i = 0; i < 100; i++) { executor.submit(() -> { @@ -76,13 +75,13 @@ public static void main(String[] args) { } catch (InterruptedException e) { e.printStackTrace(); } - System.out.println(uerwl.showContent()); + System.out.println(self.showContent()); }); } // Writers - only if no writer is available for (int i = 0; i < 5; i++) { - executor.execute(() -> uerwl.writeContent(UUID.randomUUID().toString())); + executor.execute(() -> self.writeContent(UUID.randomUUID().toString())); } executor.shutdown(); } diff --git a/src/main/java/br/com/leonardoz/features/locks/UsingExplicitReentrantLocks.java b/src/main/java/br/com/leonardoz/features/locks/UsingExplicitReentrantLocks.java index a0006a6..9f9974c 100644 --- a/src/main/java/br/com/leonardoz/features/locks/UsingExplicitReentrantLocks.java +++ b/src/main/java/br/com/leonardoz/features/locks/UsingExplicitReentrantLocks.java @@ -1,6 +1,5 @@ package br.com.leonardoz.features.locks; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; @@ -60,16 +59,16 @@ public void lockMyHearthWithTiming() throws InterruptedException { } public static void main(String[] args) { - ExecutorService executor = Executors.newCachedThreadPool(); - UsingExplicitReentrantLocks uel = new UsingExplicitReentrantLocks(); + var executor = Executors.newCachedThreadPool(); + var self = new UsingExplicitReentrantLocks(); for (int i = 0; i < 10; i++) { - executor.execute(() -> uel.lockMyHearth()); + executor.execute(() -> self.lockMyHearth()); } for (int i = 0; i < 40; i++) { executor.execute(() -> { try { - uel.lockMyHearthWithTiming(); + self.lockMyHearthWithTiming(); } catch (InterruptedException e) { e.printStackTrace(); } diff --git a/src/main/java/br/com/leonardoz/features/locks/UsingIntrinsicLocks.java b/src/main/java/br/com/leonardoz/features/locks/UsingIntrinsicLocks.java index 89e29ac..a9cc800 100644 --- a/src/main/java/br/com/leonardoz/features/locks/UsingIntrinsicLocks.java +++ b/src/main/java/br/com/leonardoz/features/locks/UsingIntrinsicLocks.java @@ -1,6 +1,5 @@ package br.com.leonardoz.features.locks; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** @@ -69,18 +68,18 @@ public synchronized void reentrancy() { } public static void main(String[] args) throws InterruptedException { - ExecutorService executor = Executors.newCachedThreadPool(); - UsingIntrinsicLocks uil = new UsingIntrinsicLocks(); + var executor = Executors.newCachedThreadPool(); + var self = new UsingIntrinsicLocks(); for (int i = 0; i < 100; i++) { - executor.execute(() -> uil.mySynchronizedMethod()); + executor.execute(() -> self.mySynchronizedMethod()); } Thread.sleep(1000); for (int i = 0; i < 10; i++) { - executor.execute(() -> uil.mySynchronizedBlock()); + executor.execute(() -> self.mySynchronizedBlock()); } Thread.sleep(1000); for (int i = 0; i < 10; i++) { - executor.execute(() -> uil.reentrancy()); + executor.execute(() -> self.reentrancy()); } executor.shutdown(); } diff --git a/src/main/java/br/com/leonardoz/features/parallel_stream/UsingParallelStreams.java b/src/main/java/br/com/leonardoz/features/parallel_stream/UsingParallelStreams.java new file mode 100644 index 0000000..eaf2b46 --- /dev/null +++ b/src/main/java/br/com/leonardoz/features/parallel_stream/UsingParallelStreams.java @@ -0,0 +1,99 @@ +package br.com.leonardoz.features.parallel_stream; + +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * + * Warning + * + * Parallel Streams uses the ForkJoin Pool, so be aware of using it in Java + * EE/Jakarta EE environments! + * + * The Stream API allows, with methods like parallel() or parallelStream(), the + * execution of streams's operations in parallel. + * + * It enables each element to be processed in parallel, having a thread for each + * one of them, depending on the number of cores available. Like the Fork/Join + * Framework, it has an overhead, so the speed of execution can get much better + * in some cases, getting worse in some others. + * + * Streams are composed of a Source, several intermediate operations and a + * terminal operation. Streams are executed only when a terminal operation is + * executed, so they're lazy too. The intermediate operations respect the order + * that you used, they're sequential. The work of each intermediate operation is + * parallel. + * + * CPU intensive tasks benefits from this feature. + * + * + */ +public class UsingParallelStreams { + + public static void main(String[] args) { + // Creating Parallel Streams from existing collection + new ArrayList<>().parallelStream(); + + // Making Stream Parallel + IntStream.rangeClosed(0, 30_000) // source + .parallel().mapToObj(BigInteger::valueOf).map(UsingParallelStreams::isPrime) // Intermediate operations + .collect(Collectors.toList()); // Terminal Operations + + // Each operation run in parallel, out of order + IntStream.rangeClosed(0, 20) // source + .parallel().mapToObj(Integer::toString) // Intermediate operation + .forEach(System.out::print); // Terminal operation + + System.out.println("\n"); + + // Runs sequentially, in order. + IntStream.rangeClosed(0, 20) + .mapToObj(Integer::toString) + .forEach(System.out::print); + + System.out.println("\n"); + + dummyPerformanceCheck(); + } + + private static void dummyPerformanceCheck() { + + // Sequential Stream + var start1 = System.currentTimeMillis(); + IntStream.rangeClosed(0, 50_000) + .mapToObj(BigInteger::valueOf) + .map(UsingParallelStreams::isPrime) + .collect(Collectors.toList()); + var end1 = System.currentTimeMillis(); + var time1 = (end1 - start1) / 1000; + System.out.println("Sequential: " + time1); + + // Parallel Stream + var start2 = System.currentTimeMillis(); + IntStream.rangeClosed(0, 50_000) + .parallel() + .mapToObj(BigInteger::valueOf) + .map(UsingParallelStreams::isPrime) + .collect(Collectors.toList()); + var end2 = System.currentTimeMillis(); + var time2 = (end2 - start2) / 1000; + System.out.println("Parallel: " + time2); + } + + // thanks to linski on + // https://stackoverflow.com/questions/15862271/java-compute-intensive-task + public static boolean isPrime(BigInteger n) { + var counter = BigInteger.ONE.add(BigInteger.ONE); + var isPrime = true; + while (counter.compareTo(n) == -1) { + if (n.remainder(counter).compareTo(BigInteger.ZERO) == 0) { + isPrime = false; + break; + } + counter = counter.add(BigInteger.ONE); + } + return isPrime; + } +} diff --git a/src/main/java/br/com/leonardoz/features/synchronizers/UsingBarriers.java b/src/main/java/br/com/leonardoz/features/synchronizers/UsingBarriers.java index c5776d9..0b4610d 100644 --- a/src/main/java/br/com/leonardoz/features/synchronizers/UsingBarriers.java +++ b/src/main/java/br/com/leonardoz/features/synchronizers/UsingBarriers.java @@ -3,7 +3,6 @@ import java.util.Random; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** @@ -20,10 +19,10 @@ public class UsingBarriers { public static void main(String[] args) { - ExecutorService executor = Executors.newCachedThreadPool(); Runnable barrierAction = () -> System.out.println("Well done, guys!"); - CyclicBarrier barrier = new CyclicBarrier(10, barrierAction); + var executor = Executors.newCachedThreadPool(); + var barrier = new CyclicBarrier(10, barrierAction); Runnable task = () -> { try { diff --git a/src/main/java/br/com/leonardoz/features/synchronizers/UsingLatches.java b/src/main/java/br/com/leonardoz/features/synchronizers/UsingLatches.java index 90904d4..1da6b2b 100644 --- a/src/main/java/br/com/leonardoz/features/synchronizers/UsingLatches.java +++ b/src/main/java/br/com/leonardoz/features/synchronizers/UsingLatches.java @@ -1,7 +1,6 @@ package br.com.leonardoz.features.synchronizers; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -19,8 +18,8 @@ public class UsingLatches { public static void main(String[] args) { - ExecutorService executor = Executors.newCachedThreadPool(); - CountDownLatch latch = new CountDownLatch(3); + var executor = Executors.newCachedThreadPool(); + var latch = new CountDownLatch(3); Runnable r = () -> { try { Thread.sleep(1000); diff --git a/src/main/java/br/com/leonardoz/features/synchronizers/UsingSemaphores.java b/src/main/java/br/com/leonardoz/features/synchronizers/UsingSemaphores.java index cfaf9b5..8fbffff 100644 --- a/src/main/java/br/com/leonardoz/features/synchronizers/UsingSemaphores.java +++ b/src/main/java/br/com/leonardoz/features/synchronizers/UsingSemaphores.java @@ -1,6 +1,5 @@ package br.com.leonardoz.features.synchronizers; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -20,8 +19,9 @@ */ public class UsingSemaphores { public static void main(String[] args) { - ExecutorService executor = Executors.newCachedThreadPool(); - Semaphore semaphore = new Semaphore(3); + + var executor = Executors.newCachedThreadPool(); + var semaphore = new Semaphore(3); Runnable r = () -> { try { diff --git a/src/main/java/br/com/leonardoz/features/threads/UsingThreads.java b/src/main/java/br/com/leonardoz/features/threads/UsingThreads.java index 8d62445..129c1e2 100644 --- a/src/main/java/br/com/leonardoz/features/threads/UsingThreads.java +++ b/src/main/java/br/com/leonardoz/features/threads/UsingThreads.java @@ -14,12 +14,12 @@ public class UsingThreads { public static void main(String[] args) throws InterruptedException { // Creating - Thread created = new Thread(); + var created = new Thread(); created.start(); // .run() runs on main thread // Assigning a task for running on a thread - we pass a Runnable instance - Thread threadWithTask = new Thread(() -> System.out.println("Inside thread" + Thread.currentThread().getName())); + var threadWithTask = new Thread(() -> System.out.println("Inside thread" + Thread.currentThread().getName())); threadWithTask.start(); // Interrupting a thread @@ -28,7 +28,7 @@ public static void main(String[] args) throws InterruptedException { System.out.println("Im not interrupted " + Thread.currentThread().getName()); } }; - Thread interruptable = new Thread(interrupatblyTask); + var interruptable = new Thread(interrupatblyTask); interruptable.start(); Thread.sleep(3000); interruptable.interrupt(); diff --git a/src/main/java/br/com/leonardoz/patterns/condition_queues/ExplicitConditionQueue.java b/src/main/java/br/com/leonardoz/patterns/condition_queues/ExplicitConditionQueue.java index 42220c6..3a9d205 100644 --- a/src/main/java/br/com/leonardoz/patterns/condition_queues/ExplicitConditionQueue.java +++ b/src/main/java/br/com/leonardoz/patterns/condition_queues/ExplicitConditionQueue.java @@ -22,7 +22,7 @@ */ public class ExplicitConditionQueue { - private static final int LIMIT = 8; + private static final int LIMIT = 5; private int messageCount = 0; private Lock lock = new ReentrantLock(); private Condition limitReachedCondition = lock.newCondition(); @@ -58,13 +58,13 @@ public void printMessages(String message) throws InterruptedException { } public static void main(String[] args) { - ExplicitConditionQueue eqc = new ExplicitConditionQueue(); + var queue = new ExplicitConditionQueue(); // Will run indefinitely new Thread(() -> { while (true) { - String uuidMessage = UUID.randomUUID().toString(); + var uuidMessage = UUID.randomUUID().toString(); try { - eqc.printMessages(uuidMessage); + queue.printMessages(uuidMessage); } catch (InterruptedException e) { e.printStackTrace(); } @@ -73,7 +73,7 @@ public static void main(String[] args) { new Thread(() -> { while (true) { try { - eqc.stopMessages(); + queue.stopMessages(); } catch (InterruptedException e) { e.printStackTrace(); } diff --git a/src/main/java/br/com/leonardoz/patterns/condition_queues/WaitNotifyQueue.java b/src/main/java/br/com/leonardoz/patterns/condition_queues/WaitNotifyQueue.java index fbf1c37..afa138f 100644 --- a/src/main/java/br/com/leonardoz/patterns/condition_queues/WaitNotifyQueue.java +++ b/src/main/java/br/com/leonardoz/patterns/condition_queues/WaitNotifyQueue.java @@ -39,16 +39,16 @@ public synchronized void stopsMessaging() { public synchronized void message() throws InterruptedException { while (!continueToNotify) wait(); - String message = messages.take(); + var message = messages.take(); System.out.println(message); } public static void main(String[] args) { - List messages = new LinkedList<>(); + var messages = new LinkedList(); for (int i = 0; i < 130; i++) { messages.add(UUID.randomUUID().toString()); } - WaitNotifyQueue waitNotifyQueue = new WaitNotifyQueue(messages); + var waitNotifyQueue = new WaitNotifyQueue(messages); new Thread(() -> { try { while (true) { @@ -56,13 +56,14 @@ public static void main(String[] args) { Thread.sleep(300); } } catch (InterruptedException e) { + Thread.currentThread().interrupt(); e.printStackTrace(); } }).start(); - Random r = new Random(); + var random = new Random(); new Thread(() -> { while (true) { - int val = r.nextInt(100); + int val = random.nextInt(100); System.out.println(val); if (val == 99) { break; @@ -70,6 +71,7 @@ public static void main(String[] args) { try { Thread.sleep(400); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); e.printStackTrace(); } } @@ -77,6 +79,7 @@ public static void main(String[] args) { try { Thread.sleep(500); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); e.printStackTrace(); } }).start(); diff --git a/src/main/java/br/com/leonardoz/patterns/controlled_initialization/ControlledInitialization.java b/src/main/java/br/com/leonardoz/patterns/controlled_initialization/ControlledInitialization.java index 10a8d89..5576ac1 100644 --- a/src/main/java/br/com/leonardoz/patterns/controlled_initialization/ControlledInitialization.java +++ b/src/main/java/br/com/leonardoz/patterns/controlled_initialization/ControlledInitialization.java @@ -1,7 +1,6 @@ package br.com.leonardoz.patterns.controlled_initialization; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** @@ -81,7 +80,7 @@ public ControlledInitialization() { } private void doTask() { - System.out.println("=== Resources initialized ==="); + System.out.println("=== Resources Initialized ==="); System.out.println("Resource 1 instance " + resource1); System.out.println("Resource 2 instance " + resource2); System.out.println("Resource 3 instance " + resource3); @@ -90,7 +89,7 @@ private void doTask() { private void initialize() { System.out.println("=== Initializing Resources ==="); - ExecutorService executor = Executors.newFixedThreadPool(3); + var executor = Executors.newFixedThreadPool(3); executor.execute(initResource1); executor.execute(initResource2); executor.execute(initResource3); diff --git a/src/main/java/br/com/leonardoz/patterns/divideconquer/ParallelDivideAndConquer.java b/src/main/java/br/com/leonardoz/patterns/divideconquer/ParallelDivideAndConquer.java new file mode 100644 index 0000000..4cef549 --- /dev/null +++ b/src/main/java/br/com/leonardoz/patterns/divideconquer/ParallelDivideAndConquer.java @@ -0,0 +1,53 @@ +package br.com.leonardoz.patterns.divideconquer; + +import java.util.List; +import java.util.concurrent.RecursiveTask; + +/* + * Pattern: Parallel Divide and Conquer + * + * Motivations: Some kind of tasks, algorithms or flows can be divided + * into smaller pieces, which are independently resolvable, making them fall + * into the category of divide and conquer computations. Those properties + * allows the execution to be parallel, possible increasing the speed if those + * problems were difficult enough to compensate the cost involved in parallelization. + * + * Intent: Make the execution of some computation that uses the divide and conquer + * approach parallel, in order to increase performance if the problem difficulty is + * enough to overcome the overhead of using threads and coordination. It uses the ForkJoin Framework + * to model those problems. + * + * Applicability: Divide and conquer computations. + * + */ +public class ParallelDivideAndConquer { + + private final static int THRESHOLD = 10; // Choosing a number to split the computation + + + public static class Task extends RecursiveTask { + + private static final long serialVersionUID = 1L; + private List somethingToDivideAndConquer; + + public Task(List somethingToDivideAndConquer) { + this.somethingToDivideAndConquer = somethingToDivideAndConquer; + + } + + @Override + protected Integer compute() { + var size = somethingToDivideAndConquer.size(); + if (size < THRESHOLD) { + // solves directly + return 1; + } else { + // creates tasks, fork and join + return 2; + } + } + + } + + +} diff --git a/src/main/java/br/com/leonardoz/patterns/divideconquer/ParallelSum.java b/src/main/java/br/com/leonardoz/patterns/divideconquer/ParallelSum.java new file mode 100644 index 0000000..8fff70e --- /dev/null +++ b/src/main/java/br/com/leonardoz/patterns/divideconquer/ParallelSum.java @@ -0,0 +1,106 @@ +package br.com.leonardoz.patterns.divideconquer; + +import java.math.BigInteger; +import java.util.List; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.RecursiveTask; +import java.util.stream.Collectors; +import java.util.stream.LongStream; + +/* + * Pattern: Parallel Divide and Conquer + * + * Example: Parallel Sum + * + * This is a simple example for educational purpose only, run on your + * machine and check which one is better! + */ +public class ParallelSum extends RecursiveTask { + + private static final long serialVersionUID = 1L; + private final static int THRESHOLD = 10_000; // Choosing a number to split the computation + + private List nums; + + public ParallelSum(List nums) { + this.nums = nums; + } + + @Override + protected BigInteger compute() { + var size = nums.size(); + if (size < THRESHOLD) { + return sequentialSum(nums); + } else { + var x = new ParallelSum(nums.subList(0, size / 2)); + var y = new ParallelSum(nums.subList(size / 2, size)); + x.fork(); + y.fork(); + var xResult = x.join(); + var yResult = y.join(); + return yResult.add(xResult); + } + } + + /* + * Just showing how to use the pattern and some really dummy benchmark. Don't + * take it seriously. + */ + public static void main(String[] args) throws InterruptedException { + var nums = LongStream.range(0, 10_000_000L) + .mapToObj(BigInteger::valueOf) + .collect(Collectors.toList()); + +// Run one then comment and run another + Runnable parallel = () -> { + var commonPool = ForkJoinPool.commonPool(); + var result = commonPool.invoke(new ParallelSum(nums)); + + System.out.println("Parallel Result is: " + result); + }; + + Runnable sequential = () -> { + var acc = sequentialSum(nums); + + System.out.println("Sequential Result is: " + acc); + }; + + sequential.run(); + parallel.run(); + + Thread.sleep(2000); + + System.out.println("#### After some JIT \n\n"); + + dummyBenchmark(sequential); + dummyBenchmark(parallel); + + Thread.sleep(2000); + + System.out.println("#### After more JIT \n\n"); + + dummyBenchmark(sequential); + dummyBenchmark(parallel); + } + + private static BigInteger sequentialSum(List nums) { + var acc = BigInteger.ZERO; + for (var value : nums) { + acc = acc.add(value); + } + return acc; + } + + static void getHot(Runnable runnable) { + runnable.run(); + } + + static void dummyBenchmark(Runnable runnable) { + var before = System.currentTimeMillis(); + runnable.run(); + var after = System.currentTimeMillis(); + System.out.println("Executed in: " + (after - before)); + System.out.println("######\n"); + } + +} diff --git a/src/main/java/br/com/leonardoz/patterns/fixed_lock_ordering/CoinTransfer.java b/src/main/java/br/com/leonardoz/patterns/fixed_lock_ordering/CoinTransfer.java index 68291c2..862dd89 100644 --- a/src/main/java/br/com/leonardoz/patterns/fixed_lock_ordering/CoinTransfer.java +++ b/src/main/java/br/com/leonardoz/patterns/fixed_lock_ordering/CoinTransfer.java @@ -48,8 +48,8 @@ public void withdrawCoins(BigInteger amount) { } public void transferBetweenPlayers(Player playerFrom, Player playerTo, BigInteger amount) { - int from = playerFrom.getId(); - int to = playerTo.getId(); + var from = playerFrom.getId(); + var to = playerTo.getId(); if (from < to) { synchronized (playerFrom) { synchronized (playerTo) { diff --git a/src/main/java/br/com/leonardoz/patterns/fixed_lock_ordering/FixedLockOrdering.java b/src/main/java/br/com/leonardoz/patterns/fixed_lock_ordering/FixedLockOrdering.java index c56a955..5ac14d6 100644 --- a/src/main/java/br/com/leonardoz/patterns/fixed_lock_ordering/FixedLockOrdering.java +++ b/src/main/java/br/com/leonardoz/patterns/fixed_lock_ordering/FixedLockOrdering.java @@ -38,8 +38,8 @@ public void setAnotherValue(String anotherValue) { } public void doSomeOperation(LockableObject obj1, LockableObject obj2) { - int obj1Id = obj1.getId(); - int obj2Id = obj2.getId(); + var obj1Id = obj1.getId(); + var obj2Id = obj2.getId(); if (obj1Id < obj2Id) { synchronized (obj1) { synchronized (obj2) { diff --git a/src/main/java/br/com/leonardoz/patterns/producer_consumer/ProducerConsumer.java b/src/main/java/br/com/leonardoz/patterns/producer_consumer/ProducerConsumer.java index 65f884c..972bc7b 100644 --- a/src/main/java/br/com/leonardoz/patterns/producer_consumer/ProducerConsumer.java +++ b/src/main/java/br/com/leonardoz/patterns/producer_consumer/ProducerConsumer.java @@ -3,7 +3,6 @@ import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -28,7 +27,7 @@ public class ProducerConsumer { private Callable consumer = () -> { while (true) { - String dataUnit = data.poll(5, TimeUnit.SECONDS); + var dataUnit = data.poll(5, TimeUnit.SECONDS); if (dataUnit == null) break; System.out.println("Consumed " + dataUnit + " from " + Thread.currentThread().getName()); @@ -38,14 +37,14 @@ public class ProducerConsumer { private Callable producer = () -> { for (int i = 0; i < 90_000; i++) { - String dataUnit = UUID.randomUUID().toString(); + var dataUnit = UUID.randomUUID().toString(); data.put(dataUnit); } return null; }; public void run(long forHowLong, TimeUnit unit) throws InterruptedException { - ExecutorService pool = Executors.newCachedThreadPool(); + var pool = Executors.newCachedThreadPool(); pool.submit(producer); pool.submit(consumer); pool.submit(consumer); @@ -54,7 +53,7 @@ public void run(long forHowLong, TimeUnit unit) throws InterruptedException { } public static void main(String[] args) { - ProducerConsumer producerConsumer = new ProducerConsumer(); + var producerConsumer = new ProducerConsumer(); try { producerConsumer.run(5, TimeUnit.SECONDS); } catch (InterruptedException e) { diff --git a/src/main/java/br/com/leonardoz/patterns/resource_pool/ResourcePoolUsage.java b/src/main/java/br/com/leonardoz/patterns/resource_pool/ResourcePoolUsage.java index cc12cee..bcd12a0 100644 --- a/src/main/java/br/com/leonardoz/patterns/resource_pool/ResourcePoolUsage.java +++ b/src/main/java/br/com/leonardoz/patterns/resource_pool/ResourcePoolUsage.java @@ -2,7 +2,6 @@ import java.util.Arrays; import java.util.Random; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** @@ -12,16 +11,16 @@ */ public class ResourcePoolUsage { public static void main(String[] args) { - ExecutorService executor = Executors.newCachedThreadPool(); - ResourcePool pool = new ResourcePool<>(15, + var executor = Executors.newCachedThreadPool(); + var pool = new ResourcePool(15, Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 10, 11, 12, 13, 14)); - Random r = new Random(); + var random = new Random(); for (int i = 0; i < 30; i++) { executor.execute(() -> { try { - Integer value = pool.get(60); + var value = pool.get(60); System.out.println("Value taken: " + value); - Thread.sleep(r.nextInt(5000)); + Thread.sleep(random.nextInt(5000)); pool.release(value); System.out.println("Value released " + value); } catch (InterruptedException e) { diff --git a/src/main/java/br/com/leonardoz/patterns/task_cancel/BackgroundTimePrintTask.java b/src/main/java/br/com/leonardoz/patterns/task_cancel/BackgroundTimePrintTask.java index 738703a..f9b7903 100644 --- a/src/main/java/br/com/leonardoz/patterns/task_cancel/BackgroundTimePrintTask.java +++ b/src/main/java/br/com/leonardoz/patterns/task_cancel/BackgroundTimePrintTask.java @@ -14,7 +14,7 @@ public class BackgroundTimePrintTask { private Thread thread; private Runnable task = () -> { while (!Thread.currentThread().isInterrupted()) { - Date date = new Date(System.currentTimeMillis()); + var date = new Date(System.currentTimeMillis()); System.out.println(new SimpleDateFormat().format(date)); try { Thread.sleep(1000); @@ -37,13 +37,13 @@ public void cancel() { } public static void main(String[] args) { - BackgroundTimePrintTask bttt = new BackgroundTimePrintTask(); - bttt.run(); + var self = new BackgroundTimePrintTask(); + self.run(); try { Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } - bttt.cancel(); + self.cancel(); } } diff --git a/src/main/java/br/com/leonardoz/patterns/task_cancel/ThreadTaskCancel.java b/src/main/java/br/com/leonardoz/patterns/task_cancel/ThreadTaskCancel.java index 60be10c..25e53e2 100644 --- a/src/main/java/br/com/leonardoz/patterns/task_cancel/ThreadTaskCancel.java +++ b/src/main/java/br/com/leonardoz/patterns/task_cancel/ThreadTaskCancel.java @@ -19,7 +19,7 @@ public class ThreadTaskCancel { private Thread thread; private Runnable task = () -> { while (!Thread.currentThread().isInterrupted()) { - // keep going + // keep going - be aware of using this Pattern with the Interrupted exception! It won't work. } }; diff --git a/src/main/java/br/com/leonardoz/patterns/task_convergence/TaskConvergence.java b/src/main/java/br/com/leonardoz/patterns/task_convergence/TaskConvergence.java index 534ab0c..0b56494 100644 --- a/src/main/java/br/com/leonardoz/patterns/task_convergence/TaskConvergence.java +++ b/src/main/java/br/com/leonardoz/patterns/task_convergence/TaskConvergence.java @@ -8,6 +8,7 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; /** * Pattern: Task Convergence @@ -32,10 +33,10 @@ public class TaskConvergence { private ExecutorService executor; private Runnable run = () -> { - Random random = new Random(); - List results = new LinkedList<>(); + var random = new Random(); + var results = new LinkedList(); for (int i = 0; i < ITERS; i++) { - Long next = (long) random.nextInt(BOUND); + var next = (long) random.nextInt(BOUND); results.add(next); } try { @@ -60,13 +61,19 @@ public class TaskConvergence { public TaskConvergence() { barrier = new CyclicBarrier(CORES, onComplete); synchronizedLinkedList = Collections.synchronizedList(new LinkedList<>()); - executor = Executors.newFixedThreadPool(4); + executor = Executors.newFixedThreadPool(CORES); } public void run() { for (int i = 0; i < CORES; i++) { executor.execute(run); } + try { + executor.awaitTermination(5, TimeUnit.SECONDS); + executor.shutdown(); + } catch (InterruptedException e) { + e.printStackTrace(); + } } private void persist(List randomNumbers) { diff --git a/src/main/java/br/com/leonardoz/patterns/task_execution/BackgroundTaskExecutor.java b/src/main/java/br/com/leonardoz/patterns/task_execution/BackgroundTaskExecutor.java index ea90d29..599df79 100644 --- a/src/main/java/br/com/leonardoz/patterns/task_execution/BackgroundTaskExecutor.java +++ b/src/main/java/br/com/leonardoz/patterns/task_execution/BackgroundTaskExecutor.java @@ -43,22 +43,26 @@ public BackgroundTaskExecutor(int threadsForTasks) { } public Future execute(Callable task) { - Future submited = executor.submit(task); + var submited = executor.submit(task); return submited; } public List> execute(List> tasks) { - List> futureTasks = tasks.stream().map(executor::submit).collect(Collectors.toList()); + var futureTasks = tasks.stream() + .map(executor::submit) + .collect(Collectors.toList()); return futureTasks; } public boolean cancel(Future task) { - boolean canceled = task.cancel(true); + var canceled = task.cancel(true); return canceled; } public boolean cancel(List> task) { - boolean hasAFalse = task.stream().map(f -> f.cancel(true)).anyMatch(b -> b.equals(false)); + var hasAFalse = task.stream() + .map(f -> f.cancel(true)) + .anyMatch(b -> b.equals(false)); return !hasAFalse; } @@ -71,7 +75,9 @@ public List> completeTask(List> tasks, OnInterruption< return Optional.empty(); } }; - List> results = tasks.stream().map(fn).collect(Collectors.toList()); + var results = tasks.stream() + .map(fn) + .collect(Collectors.toList()); return results; } @@ -94,7 +100,7 @@ public void shutdownTasks(long timeout, TimeUnit timeUnit, OnShutdownError onShu } public List shutdownNowTasks(long timeout, TimeUnit timeUnit, OnShutdownError onShutdownError) { - List remainingTasks = executor.shutdownNow(); + var remainingTasks = executor.shutdownNow(); try { executor.awaitTermination(timeout, timeUnit); } catch (InterruptedException e) { diff --git a/src/main/java/br/com/leonardoz/patterns/thread_safe/shared_state/VisitCounter.java b/src/main/java/br/com/leonardoz/patterns/thread_safe/shared_state/VisitCounter.java index 2fd4048..006744b 100644 --- a/src/main/java/br/com/leonardoz/patterns/thread_safe/shared_state/VisitCounter.java +++ b/src/main/java/br/com/leonardoz/patterns/thread_safe/shared_state/VisitCounter.java @@ -1,6 +1,5 @@ package br.com.leonardoz.patterns.thread_safe.shared_state; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import br.com.leonardoz.patterns.GuardedBy; @@ -30,8 +29,8 @@ public synchronized void decrease() { } public static void main(String[] args) { - VisitCounter counter = new VisitCounter(); - ExecutorService threadPool = Executors.newCachedThreadPool(); + var counter = new VisitCounter(); + var threadPool = Executors.newCachedThreadPool(); for (int i = 1; i <= 50; i++) { System.out.println("value " + counter.actualValue() + " i " + i); threadPool.execute(() -> counter.increase());