diff --git a/README.md b/README.md index e48de1d..24a65f6 100644 --- a/README.md +++ b/README.md @@ -510,7 +510,52 @@ and there are also gains to this different mode of operation: However, with batch execution control comes responsibility! If you forget to make the call to `dispatch()` then the futures in the load request queue will never be batched, and thus _will never complete_! So be careful when crafting your loader designs. -## Scheduled Dispatching +## The BatchLoader Scheduler + +By default, when `dataLoader.dispatch()` is called, the `BatchLoader` / `MappedBatchLoader` function will be invoked +immediately. + +However, you can provide your own `BatchLoaderScheduler` that allows this call to be done some time into +the future. + +You will be passed a callback (`ScheduledBatchLoaderCall` / `ScheduledMapBatchLoaderCall`) and you are expected +to eventually call this callback method to make the batch loading happen. + +The following is a `BatchLoaderScheduler` that waits 10 milliseconds before invoking the batch loading functions. + +```java + new BatchLoaderScheduler() { + + @Override + public CompletionStage> scheduleBatchLoader(ScheduledBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + return CompletableFuture.supplyAsync(() -> { + snooze(10); + return scheduledCall.invoke(); + }).thenCompose(Function.identity()); + } + + @Override + public CompletionStage> scheduleMappedBatchLoader(ScheduledMappedBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + return CompletableFuture.supplyAsync(() -> { + snooze(10); + return scheduledCall.invoke(); + }).thenCompose(Function.identity()); + } + }; +``` + +You are given the keys to be loaded and an optional `BatchLoaderEnvironment` for informative purposes. You can't change the list of +keys that will be loaded via this mechanism say. + +Also note, because there is a max batch size, it is possible for this scheduling to happen N times for a given `dispatch()` +call. The total set of keys will be sliced into batches themselves and then the `BatchLoaderScheduler` will be called for +each batch of keys. + +Do not assume that a single call to `dispatch()` results in a single call to `BatchLoaderScheduler`. + +This code is inspired from the scheduling code in the [reference JS implementation](https://github.com/graphql/dataloader#batch-scheduling) + +## Scheduled Registry Dispatching `ScheduledDataLoaderRegistry` is a registry that allows for dispatching to be done on a schedule. It contains a predicate that is evaluated (per data loader contained within) when `dispatchAll` is invoked. @@ -538,6 +583,95 @@ since it was last dispatched". The above acts as a kind of minimum batch depth, with a time overload. It won't dispatch if the loader depth is less than or equal to 10 but if 200ms pass it will dispatch. +## Chaining DataLoader calls + +It's natural to want to have chained `DataLoader` calls. + +```java + CompletableFuture chainedCalls = dataLoaderA.load("user1") + .thenCompose(userAsKey -> dataLoaderB.load(userAsKey)); +``` + +However, the challenge here is how to be efficient in batching terms. + +This is discussed in detail in the https://github.com/graphql-java/java-dataloader/issues/54 issue. + +Since CompletableFuture's are async and can complete at some time in the future, when is the best time to call +`dispatch` again when a load call has completed to maximize batching? + +The most naive approach is to immediately dispatch the second chained call as follows : + +```java + CompletableFuture chainedWithImmediateDispatch = dataLoaderA.load("user1") + .thenCompose(userAsKey -> { + CompletableFuture loadB = dataLoaderB.load(userAsKey); + dataLoaderB.dispatch(); + return loadB; + }); +``` + +The above will work however the window of batching together multiple calls to `dataLoaderB` will be very small and since +it will likely result in batch sizes of 1. + +This is a very difficult problem to solve because you have to balance two competing design ideals which is to maximize the +batching window of secondary calls in a small window of time so you customer requests don't take longer than necessary. + +* If the batching window is wide you will maximize the number of keys presented to a `BatchLoader` but your request latency will increase. + +* If the batching window is narrow you will reduce your request latency, but also you will reduce the number of keys presented to a `BatchLoader`. + + +### ScheduledDataLoaderRegistry ticker mode + +The `ScheduledDataLoaderRegistry` offers one solution to this called "ticker mode" where it will continually reschedule secondary +`DataLoader` calls after the initial `dispatch()` call is made. + +The batch window of time is controlled by the schedule duration setup at when the `ScheduledDataLoaderRegistry` is created. + +```java + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dataLoaderA) + .register("b", dataLoaderB) + .scheduledExecutorService(executorService) + .schedule(Duration.ofMillis(10)) + .tickerMode(true) // ticker mode is on + .build(); + + CompletableFuture chainedCalls = dataLoaderA.load("user1") + .thenCompose(userAsKey -> dataLoaderB.load(userAsKey)); + +``` +When ticker mode is on the chained dataloader calls will complete but the batching window size will depend on how quickly +the first level of `DataLoader` calls returned compared to the `schedule` of the `ScheduledDataLoaderRegistry`. + +If you use ticker mode, then you MUST `registry.close()` on the `ScheduledDataLoaderRegistry` at the end of the request (say) otherwise +it will continue to reschedule tasks to the `ScheduledExecutorService` associated with the registry. + +You will want to look at sharing the `ScheduledExecutorService` in some way between requests when creating the `ScheduledDataLoaderRegistry` +otherwise you will be creating a thread per `ScheduledDataLoaderRegistry` instance created and with enough concurrent requests +you may create too many threads. + +### ScheduledDataLoaderRegistry dispatching algorithm + +When ticker mode is **false** the `ScheduledDataLoaderRegistry` algorithm is as follows : + +* Nothing starts scheduled - some code must call `registry.dispatchAll()` a first time +* Then for every `DataLoader` in the registry + * The `DispatchPredicate` is called to test if the data loader should be dispatched + * if it returns **false** then a task is scheduled to re-evaluate this specific dataloader in the near future + * If it returns **true**, then `dataLoader.dispatch()` is called and the dataloader is not rescheduled again +* The re-evaluation tasks are run periodically according to the `registry.getScheduleDuration()` + +When ticker mode is **true** the `ScheduledDataLoaderRegistry` algorithm is as follows: + +* Nothing starts scheduled - some code must call `registry.dispatchAll()` a first time +* Then for every `DataLoader` in the registry + * The `DispatchPredicate` is called to test if the data loader should be dispatched + * if it returns **false** then a task is scheduled to re-evaluate this specific dataloader in the near future + * If it returns **true**, then `dataLoader.dispatch()` is called **and** a task is scheduled to re-evaluate this specific dataloader in the near future +* The re-evaluation tasks are run periodically according to the `registry.getScheduleDuration()` ## Other information sources diff --git a/build.gradle b/build.gradle index 8d8c392..f5064ed 100644 --- a/build.gradle +++ b/build.gradle @@ -20,7 +20,7 @@ def getDevelopmentVersion() { println "git hash is empty: error: ${error.toString()}" throw new IllegalStateException("git hash could not be determined") } - def version = new SimpleDateFormat('yyyy-MM-dd\'T\'HH-mm-ss').format(new Date()) + "-" + gitHash + def version = "0.0.0-" + new SimpleDateFormat('yyyy-MM-dd\'T\'HH-mm-ss').format(new Date()) + "-" + gitHash println "created development version: $version" version } diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index 883189c..67db4e2 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -3,6 +3,7 @@ import org.dataloader.annotations.GuardedBy; import org.dataloader.annotations.Internal; import org.dataloader.impl.CompletableFutureKit; +import org.dataloader.scheduler.BatchLoaderScheduler; import org.dataloader.stats.StatisticsCollector; import org.dataloader.stats.context.IncrementBatchLoadCountByStatisticsContext; import org.dataloader.stats.context.IncrementBatchLoadExceptionCountStatisticsContext; @@ -417,10 +418,23 @@ CompletableFuture> invokeLoader(List keys, List keyContexts) @SuppressWarnings("unchecked") private CompletableFuture> invokeListBatchLoader(List keys, BatchLoaderEnvironment environment) { CompletionStage> loadResult; + BatchLoaderScheduler batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler(); if (batchLoadFunction instanceof BatchLoaderWithContext) { - loadResult = ((BatchLoaderWithContext) batchLoadFunction).load(keys, environment); + BatchLoaderWithContext loadFunction = (BatchLoaderWithContext) batchLoadFunction; + if (batchLoaderScheduler != null) { + BatchLoaderScheduler.ScheduledBatchLoaderCall loadCall = () -> loadFunction.load(keys, environment); + loadResult = batchLoaderScheduler.scheduleBatchLoader(loadCall, keys, environment); + } else { + loadResult = loadFunction.load(keys, environment); + } } else { - loadResult = ((BatchLoader) batchLoadFunction).load(keys); + BatchLoader loadFunction = (BatchLoader) batchLoadFunction; + if (batchLoaderScheduler != null) { + BatchLoaderScheduler.ScheduledBatchLoaderCall loadCall = () -> loadFunction.load(keys); + loadResult = batchLoaderScheduler.scheduleBatchLoader(loadCall, keys, null); + } else { + loadResult = loadFunction.load(keys); + } } return nonNull(loadResult, () -> "Your batch loader function MUST return a non null CompletionStage").toCompletableFuture(); } @@ -434,10 +448,23 @@ private CompletableFuture> invokeListBatchLoader(List keys, BatchLoad private CompletableFuture> invokeMapBatchLoader(List keys, BatchLoaderEnvironment environment) { CompletionStage> loadResult; Set setOfKeys = new LinkedHashSet<>(keys); + BatchLoaderScheduler batchLoaderScheduler = loaderOptions.getBatchLoaderScheduler(); if (batchLoadFunction instanceof MappedBatchLoaderWithContext) { - loadResult = ((MappedBatchLoaderWithContext) batchLoadFunction).load(setOfKeys, environment); + MappedBatchLoaderWithContext loadFunction = (MappedBatchLoaderWithContext) batchLoadFunction; + if (batchLoaderScheduler != null) { + BatchLoaderScheduler.ScheduledMappedBatchLoaderCall loadCall = () -> loadFunction.load(setOfKeys, environment); + loadResult = batchLoaderScheduler.scheduleMappedBatchLoader(loadCall, keys, environment); + } else { + loadResult = loadFunction.load(setOfKeys, environment); + } } else { - loadResult = ((MappedBatchLoader) batchLoadFunction).load(setOfKeys); + MappedBatchLoader loadFunction = (MappedBatchLoader) batchLoadFunction; + if (batchLoaderScheduler != null) { + BatchLoaderScheduler.ScheduledMappedBatchLoaderCall loadCall = () -> loadFunction.load(setOfKeys); + loadResult = batchLoaderScheduler.scheduleMappedBatchLoader(loadCall, keys, null); + } else { + loadResult = loadFunction.load(setOfKeys); + } } CompletableFuture> mapBatchLoad = nonNull(loadResult, () -> "Your batch loader function MUST return a non null CompletionStage").toCompletableFuture(); return mapBatchLoad.thenApply(map -> { diff --git a/src/main/java/org/dataloader/DataLoaderOptions.java b/src/main/java/org/dataloader/DataLoaderOptions.java index 4c79296..bac9476 100644 --- a/src/main/java/org/dataloader/DataLoaderOptions.java +++ b/src/main/java/org/dataloader/DataLoaderOptions.java @@ -18,6 +18,7 @@ import org.dataloader.annotations.PublicApi; import org.dataloader.impl.Assertions; +import org.dataloader.scheduler.BatchLoaderScheduler; import org.dataloader.stats.NoOpStatisticsCollector; import org.dataloader.stats.StatisticsCollector; @@ -46,6 +47,7 @@ public class DataLoaderOptions { private Supplier statisticsCollector; private BatchLoaderContextProvider environmentProvider; private ValueCacheOptions valueCacheOptions; + private BatchLoaderScheduler batchLoaderScheduler; /** * Creates a new data loader options with default settings. @@ -58,6 +60,7 @@ public DataLoaderOptions() { statisticsCollector = NoOpStatisticsCollector::new; environmentProvider = NULL_PROVIDER; valueCacheOptions = ValueCacheOptions.newOptions(); + batchLoaderScheduler = null; } /** @@ -77,6 +80,7 @@ public DataLoaderOptions(DataLoaderOptions other) { this.statisticsCollector = other.statisticsCollector; this.environmentProvider = other.environmentProvider; this.valueCacheOptions = other.valueCacheOptions; + batchLoaderScheduler = other.batchLoaderScheduler; } /** @@ -304,4 +308,24 @@ public DataLoaderOptions setValueCacheOptions(ValueCacheOptions valueCacheOption this.valueCacheOptions = Assertions.nonNull(valueCacheOptions); return this; } + + /** + * @return the {@link BatchLoaderScheduler} to use, which can be null + */ + public BatchLoaderScheduler getBatchLoaderScheduler() { + return batchLoaderScheduler; + } + + /** + * Sets in a new {@link BatchLoaderScheduler} that allows the call to a {@link BatchLoader} function to be scheduled + * to some future time. + * + * @param batchLoaderScheduler the scheduler + * + * @return the data loader options for fluent coding + */ + public DataLoaderOptions setBatchLoaderScheduler(BatchLoaderScheduler batchLoaderScheduler) { + this.batchLoaderScheduler = batchLoaderScheduler; + return this; + } } diff --git a/src/main/java/org/dataloader/DataLoaderRegistry.java b/src/main/java/org/dataloader/DataLoaderRegistry.java index 9b19c29..0bc54cb 100644 --- a/src/main/java/org/dataloader/DataLoaderRegistry.java +++ b/src/main/java/org/dataloader/DataLoaderRegistry.java @@ -14,7 +14,7 @@ import java.util.function.Function; /** - * This allows data loaders to be registered together into a single place so + * This allows data loaders to be registered together into a single place, so * they can be dispatched as one. It also allows you to retrieve data loaders by * name from a central place */ diff --git a/src/main/java/org/dataloader/Try.java b/src/main/java/org/dataloader/Try.java index 3f9a129..c7eac67 100644 --- a/src/main/java/org/dataloader/Try.java +++ b/src/main/java/org/dataloader/Try.java @@ -167,7 +167,7 @@ public V get() { */ public Throwable getThrowable() { if (isSuccess()) { - throw new UnsupportedOperationException("You have called Try.getThrowable() with a failed Try", throwable); + throw new UnsupportedOperationException("You have called Try.getThrowable() with a successful Try"); } return throwable; } diff --git a/src/main/java/org/dataloader/annotations/GuardedBy.java b/src/main/java/org/dataloader/annotations/GuardedBy.java index c26b2ef..85c5765 100644 --- a/src/main/java/org/dataloader/annotations/GuardedBy.java +++ b/src/main/java/org/dataloader/annotations/GuardedBy.java @@ -15,7 +15,7 @@ public @interface GuardedBy { /** - * The lock that should be held. + * @return The lock that should be held. */ String value(); } diff --git a/src/main/java/org/dataloader/registries/DispatchPredicate.java b/src/main/java/org/dataloader/registries/DispatchPredicate.java index d5bd31b..247a51a 100644 --- a/src/main/java/org/dataloader/registries/DispatchPredicate.java +++ b/src/main/java/org/dataloader/registries/DispatchPredicate.java @@ -10,6 +10,16 @@ */ @FunctionalInterface public interface DispatchPredicate { + + /** + * A predicate that always returns true + */ + DispatchPredicate DISPATCH_ALWAYS = (dataLoaderKey, dataLoader) -> true; + /** + * A predicate that always returns false + */ + DispatchPredicate DISPATCH_NEVER = (dataLoaderKey, dataLoader) -> false; + /** * This predicate tests whether the data loader should be dispatched or not. * diff --git a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java index 4be317e..fada66d 100644 --- a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java +++ b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java @@ -5,8 +5,9 @@ import org.dataloader.annotations.ExperimentalApi; import java.time.Duration; -import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -14,32 +15,62 @@ import static org.dataloader.impl.Assertions.nonNull; /** - * This {@link DataLoaderRegistry} will use a {@link DispatchPredicate} when {@link #dispatchAll()} is called + * This {@link DataLoaderRegistry} will use {@link DispatchPredicate}s when {@link #dispatchAll()} is called * to test (for each {@link DataLoader} in the registry) if a dispatch should proceed. If the predicate returns false, then a task is scheduled * to perform that predicate dispatch again via the {@link ScheduledExecutorService}. *

+ * It;s possible to have a {@link DispatchPredicate} per dataloader as well as a default {@link DispatchPredicate} for the + * whole {@link ScheduledDataLoaderRegistry}. + *

* This will continue to loop (test false and reschedule) until such time as the predicate returns true, in which case - * no rescheduling will occur and you will need to call dispatch again to restart the process. + * no rescheduling will occur, and you will need to call dispatch again to restart the process. + *

+ * In the default mode, when {@link #tickerMode} is false, the registry will continue to loop (test false and reschedule) until such time as the predicate returns true, in which case + * no rescheduling will occur, and you will need to call dispatch again to restart the process. + *

+ * However, when {@link #tickerMode} is true, the registry will always reschedule continuously after the first ever call to {@link #dispatchAll()}. + *

+ * This will allow you to chain together {@link DataLoader} load calls like this : + *

{@code
+ *   CompletableFuture future = dataLoaderA.load("A")
+ *                                          .thenCompose(value -> dataLoaderB.load(value));
+ * }
+ *

+ * However, it may mean your batching will not be as efficient as it might be. In environments + * like graphql this might mean you are too eager in fetching. The {@link DispatchPredicate} still runs to decide if + * dispatch should happen however in ticker mode it will be continuously rescheduled. + *

+ * When {@link #tickerMode} is true, you really SHOULD close the registry say at the end of a request otherwise you will leave a job + * on the {@link ScheduledExecutorService} that is continuously dispatching. *

* If you wanted to create a ScheduledDataLoaderRegistry that started a rescheduling immediately, just create one and * call {@link #rescheduleNow()}. *

+ * By default, it uses a {@link Executors#newSingleThreadScheduledExecutor()}} to schedule the tasks. However, if you + * are creating a {@link ScheduledDataLoaderRegistry} per request you will want to look at sharing this {@link ScheduledExecutorService} + * to avoid creating a new thread per registry created. + *

* This code is currently marked as {@link ExperimentalApi} */ @ExperimentalApi public class ScheduledDataLoaderRegistry extends DataLoaderRegistry implements AutoCloseable { - private final ScheduledExecutorService scheduledExecutorService; + private final Map, DispatchPredicate> dataLoaderPredicates = new ConcurrentHashMap<>(); private final DispatchPredicate dispatchPredicate; + private final ScheduledExecutorService scheduledExecutorService; private final Duration schedule; + private final boolean tickerMode; private volatile boolean closed; private ScheduledDataLoaderRegistry(Builder builder) { + super(); this.dataLoaders.putAll(builder.dataLoaders); this.scheduledExecutorService = builder.scheduledExecutorService; - this.dispatchPredicate = builder.dispatchPredicate; this.schedule = builder.schedule; + this.tickerMode = builder.tickerMode; this.closed = false; + this.dispatchPredicate = builder.dispatchPredicate; + this.dataLoaderPredicates.putAll(builder.dataLoaderPredicates); } /** @@ -57,6 +88,76 @@ public Duration getScheduleDuration() { return schedule; } + /** + * @return true of the registry is in ticker mode or false otherwise + */ + public boolean isTickerMode() { + return tickerMode; + } + + /** + * This will combine all the current data loaders in this registry and all the data loaders from the specified registry + * and return a new combined registry + * + * @param registry the registry to combine into this registry + * + * @return a new combined registry + */ + public ScheduledDataLoaderRegistry combine(DataLoaderRegistry registry) { + Builder combinedBuilder = ScheduledDataLoaderRegistry.newScheduledRegistry() + .dispatchPredicate(this.dispatchPredicate); + combinedBuilder.registerAll(this); + combinedBuilder.registerAll(registry); + return combinedBuilder.build(); + } + + + /** + * This will unregister a new dataloader + * + * @param key the key of the data loader to unregister + * + * @return this registry + */ + public ScheduledDataLoaderRegistry unregister(String key) { + DataLoader dataLoader = dataLoaders.remove(key); + if (dataLoader != null) { + dataLoaderPredicates.remove(dataLoader); + } + return this; + } + + /** + * @return a map of data loaders to specific dispatch predicates + */ + public Map, DispatchPredicate> getDataLoaderPredicates() { + return new LinkedHashMap<>(dataLoaderPredicates); + } + + /** + * There is a default predicate that applies to the whole {@link ScheduledDataLoaderRegistry} + * + * @return the default dispatch predicate + */ + public DispatchPredicate getDispatchPredicate() { + return dispatchPredicate; + } + + /** + * This will register a new dataloader and dispatch predicate associated with that data loader + * + * @param key the key to put the data loader under + * @param dataLoader the data loader to register + * @param dispatchPredicate the dispatch predicate to associate with this data loader + * + * @return this registry + */ + public ScheduledDataLoaderRegistry register(String key, DataLoader dataLoader, DispatchPredicate dispatchPredicate) { + dataLoaders.put(key, dataLoader); + dataLoaderPredicates.put(dataLoader, dispatchPredicate); + return this; + } + @Override public void dispatchAll() { dispatchAllWithCount(); @@ -68,33 +169,33 @@ public int dispatchAllWithCount() { for (Map.Entry> entry : dataLoaders.entrySet()) { DataLoader dataLoader = entry.getValue(); String key = entry.getKey(); - if (dispatchPredicate.test(key, dataLoader)) { - sum += dataLoader.dispatchWithCounts().getKeysCount(); - } else { - reschedule(key, dataLoader); - } + sum += dispatchOrReschedule(key, dataLoader); } return sum; } + /** * This will immediately dispatch the {@link DataLoader}s in the registry - * without testing the predicate + * without testing the predicates */ public void dispatchAllImmediately() { - super.dispatchAll(); + dispatchAllWithCountImmediately(); } /** * This will immediately dispatch the {@link DataLoader}s in the registry - * without testing the predicate + * without testing the predicates * * @return total number of entries that were dispatched from registered {@link org.dataloader.DataLoader}s. */ public int dispatchAllWithCountImmediately() { - return super.dispatchAllWithCount(); + return dataLoaders.values().stream() + .mapToInt(dataLoader -> dataLoader.dispatchWithCounts().getKeysCount()) + .sum(); } + /** * This will schedule a task to check the predicate and dispatch if true right now. It will not do * a pre check of the preodicate like {@link #dispatchAll()} would @@ -103,6 +204,25 @@ public void rescheduleNow() { dataLoaders.forEach(this::reschedule); } + /** + * Returns true if the dataloader has a predicate which returned true, OR the overall + * registry predicate returned true. + * + * @param dataLoaderKey the key in the dataloader map + * @param dataLoader the dataloader + * + * @return true if it should dispatch + */ + private boolean shouldDispatch(String dataLoaderKey, DataLoader dataLoader) { + DispatchPredicate dispatchPredicate = dataLoaderPredicates.get(dataLoader); + if (dispatchPredicate != null) { + if (dispatchPredicate.test(dataLoaderKey, dataLoader)) { + return true; + } + } + return this.dispatchPredicate.test(dataLoaderKey, dataLoader); + } + private void reschedule(String key, DataLoader dataLoader) { if (!closed) { Runnable runThis = () -> dispatchOrReschedule(key, dataLoader); @@ -110,17 +230,21 @@ private void reschedule(String key, DataLoader dataLoader) { } } - private void dispatchOrReschedule(String key, DataLoader dataLoader) { - if (dispatchPredicate.test(key, dataLoader)) { - dataLoader.dispatch(); - } else { + private int dispatchOrReschedule(String key, DataLoader dataLoader) { + int sum = 0; + boolean shouldDispatch = shouldDispatch(key, dataLoader); + if (shouldDispatch) { + sum = dataLoader.dispatchWithCounts().getKeysCount(); + } + if (tickerMode || !shouldDispatch) { reschedule(key, dataLoader); } + return sum; } /** - * By default this will create use a {@link Executors#newSingleThreadScheduledExecutor()} - * and a schedule duration of 10 milli seconds. + * By default, this will create use a {@link Executors#newSingleThreadScheduledExecutor()} + * and a schedule duration of 10 milliseconds. * * @return A builder of {@link ScheduledDataLoaderRegistry}s */ @@ -130,10 +254,12 @@ public static Builder newScheduledRegistry() { public static class Builder { + private final Map> dataLoaders = new LinkedHashMap<>(); + private final Map, DispatchPredicate> dataLoaderPredicates = new LinkedHashMap<>(); + private DispatchPredicate dispatchPredicate = DispatchPredicate.DISPATCH_ALWAYS; private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); - private DispatchPredicate dispatchPredicate = (key, dl) -> true; private Duration schedule = Duration.ofMillis(10); - private final Map> dataLoaders = new HashMap<>(); + private boolean tickerMode = false; public Builder scheduledExecutorService(ScheduledExecutorService executorService) { this.scheduledExecutorService = nonNull(executorService); @@ -145,11 +271,6 @@ public Builder schedule(Duration schedule) { return this; } - public Builder dispatchPredicate(DispatchPredicate dispatchPredicate) { - this.dispatchPredicate = nonNull(dispatchPredicate); - return this; - } - /** * This will register a new dataloader * @@ -163,8 +284,24 @@ public Builder register(String key, DataLoader dataLoader) { return this; } + + /** + * This will register a new dataloader with a specific {@link DispatchPredicate} + * + * @param key the key to put the data loader under + * @param dataLoader the data loader to register + * @param dispatchPredicate the dispatch predicate + * + * @return this builder for a fluent pattern + */ + public Builder register(String key, DataLoader dataLoader, DispatchPredicate dispatchPredicate) { + register(key, dataLoader); + dataLoaderPredicates.put(dataLoader, dispatchPredicate); + return this; + } + /** - * This will combine together the data loaders in this builder with the ones + * This will combine the data loaders in this builder with the ones * from a previous {@link DataLoaderRegistry} * * @param otherRegistry the previous {@link DataLoaderRegistry} @@ -173,6 +310,37 @@ public Builder register(String key, DataLoader dataLoader) { */ public Builder registerAll(DataLoaderRegistry otherRegistry) { dataLoaders.putAll(otherRegistry.getDataLoadersMap()); + if (otherRegistry instanceof ScheduledDataLoaderRegistry) { + ScheduledDataLoaderRegistry other = (ScheduledDataLoaderRegistry) otherRegistry; + dataLoaderPredicates.putAll(other.dataLoaderPredicates); + } + return this; + } + + /** + * This sets a default predicate on the {@link DataLoaderRegistry} that will control + * whether all {@link DataLoader}s in the {@link DataLoaderRegistry }should be dispatched. + * + * @param dispatchPredicate the predicate + * + * @return this builder for a fluent pattern + */ + public Builder dispatchPredicate(DispatchPredicate dispatchPredicate) { + this.dispatchPredicate = dispatchPredicate; + return this; + } + + /** + * This sets ticker mode on the registry. When ticker mode is true the registry will + * continuously reschedule the data loaders for possible dispatching after the first call + * to dispatchAll. + * + * @param tickerMode true or false + * + * @return this builder for a fluent pattern + */ + public Builder tickerMode(boolean tickerMode) { + this.tickerMode = tickerMode; return this; } diff --git a/src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java b/src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java new file mode 100644 index 0000000..bcebfa0 --- /dev/null +++ b/src/main/java/org/dataloader/scheduler/BatchLoaderScheduler.java @@ -0,0 +1,74 @@ +package org.dataloader.scheduler; + +import org.dataloader.BatchLoader; +import org.dataloader.BatchLoaderEnvironment; +import org.dataloader.DataLoader; +import org.dataloader.DataLoaderOptions; +import org.dataloader.MappedBatchLoader; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletionStage; + +/** + * By default, when {@link DataLoader#dispatch()} is called, the {@link BatchLoader} / {@link MappedBatchLoader} function will be invoked + * immediately. However, you can provide your own {@link BatchLoaderScheduler} that allows this call to be done some time into + * the future. You will be passed a callback ({@link ScheduledBatchLoaderCall} / {@link ScheduledMappedBatchLoaderCall} and you are expected + * to eventually call this callback method to make the batch loading happen. + *

+ * Note: Because there is a {@link DataLoaderOptions#maxBatchSize()} it is possible for this scheduling to happen N times for a given {@link DataLoader#dispatch()} + * call. The total set of keys will be sliced into batches themselves and then the {@link BatchLoaderScheduler} will be called for + * each batch of keys. Do not assume that a single call to {@link DataLoader#dispatch()} results in a single call to {@link BatchLoaderScheduler}. + */ +public interface BatchLoaderScheduler { + + + /** + * This represents a callback that will invoke a {@link BatchLoader} function under the covers + * + * @param the value type + */ + interface ScheduledBatchLoaderCall { + CompletionStage> invoke(); + } + + /** + * This represents a callback that will invoke a {@link MappedBatchLoader} function under the covers + * + * @param the key type + * @param the value type + */ + interface ScheduledMappedBatchLoaderCall { + CompletionStage> invoke(); + } + + /** + * This is called to schedule a {@link BatchLoader} call. + * + * @param scheduledCall the callback that needs to be invoked to allow the {@link BatchLoader} to proceed. + * @param keys this is the list of keys that will be passed to the {@link BatchLoader}. + * This is provided only for informative reasons and you cant change the keys that are used + * @param environment this is the {@link BatchLoaderEnvironment} in place, + * which can be null if it's a simple {@link BatchLoader} call + * @param the key type + * @param the value type + * + * @return a promise to the values that come from the {@link BatchLoader} + */ + CompletionStage> scheduleBatchLoader(ScheduledBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment); + + /** + * This is called to schedule a {@link MappedBatchLoader} call. + * + * @param scheduledCall the callback that needs to be invoked to allow the {@link MappedBatchLoader} to proceed. + * @param keys this is the list of keys that will be passed to the {@link MappedBatchLoader}. + * This is provided only for informative reasons and you cant change the keys that are used + * @param environment this is the {@link BatchLoaderEnvironment} in place, + * which can be null if it's a simple {@link MappedBatchLoader} call + * @param the key type + * @param the value type + * + * @return a promise to the values that come from the {@link BatchLoader} + */ + CompletionStage> scheduleMappedBatchLoader(ScheduledMappedBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment); +} diff --git a/src/test/java/ReadmeExamples.java b/src/test/java/ReadmeExamples.java index e37550e..d25dfa7 100644 --- a/src/test/java/ReadmeExamples.java +++ b/src/test/java/ReadmeExamples.java @@ -12,17 +12,22 @@ import org.dataloader.fixtures.UserManager; import org.dataloader.registries.DispatchPredicate; import org.dataloader.registries.ScheduledDataLoaderRegistry; +import org.dataloader.scheduler.BatchLoaderScheduler; import org.dataloader.stats.Statistics; import org.dataloader.stats.ThreadLocalStatisticsCollector; import java.time.Duration; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Function; import java.util.stream.Collectors; import static java.lang.String.format; @@ -278,7 +283,31 @@ private void statsConfigExample() { DataLoader userDataLoader = DataLoaderFactory.newDataLoader(userBatchLoader, options); } - private void ScheduledDispatche() { + private void snooze(int i) { + } + + private void BatchLoaderSchedulerExample() { + new BatchLoaderScheduler() { + + @Override + public CompletionStage> scheduleBatchLoader(ScheduledBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + return CompletableFuture.supplyAsync(() -> { + snooze(10); + return scheduledCall.invoke(); + }).thenCompose(Function.identity()); + } + + @Override + public CompletionStage> scheduleMappedBatchLoader(ScheduledMappedBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + return CompletableFuture.supplyAsync(() -> { + snooze(10); + return scheduledCall.invoke(); + }).thenCompose(Function.identity()); + } + }; + } + + private void ScheduledDispatcher() { DispatchPredicate depthOrTimePredicate = DispatchPredicate.dispatchIfDepthGreaterThan(10) .or(DispatchPredicate.dispatchIfLongerThan(Duration.ofMillis(200))); @@ -288,4 +317,35 @@ private void ScheduledDispatche() { .register("users", userDataLoader) .build(); } + + + DataLoader dataLoaderA = DataLoaderFactory.newDataLoader(userBatchLoader); + DataLoader dataLoaderB = DataLoaderFactory.newDataLoader(keys -> { + return CompletableFuture.completedFuture(Collections.singletonList(1L)); + }); + + private void ScheduledDispatcherChained() { + CompletableFuture chainedCalls = dataLoaderA.load("user1") + .thenCompose(userAsKey -> dataLoaderB.load(userAsKey)); + + + CompletableFuture chainedWithImmediateDispatch = dataLoaderA.load("user1") + .thenCompose(userAsKey -> { + CompletableFuture loadB = dataLoaderB.load(userAsKey); + dataLoaderB.dispatch(); + return loadB; + }); + + + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dataLoaderA) + .register("b", dataLoaderB) + .scheduledExecutorService(executorService) + .schedule(Duration.ofMillis(10)) + .tickerMode(true) // ticker mode is on + .build(); + + } } diff --git a/src/test/java/org/dataloader/fixtures/TestKit.java b/src/test/java/org/dataloader/fixtures/TestKit.java index 5c87148..ac29a0c 100644 --- a/src/test/java/org/dataloader/fixtures/TestKit.java +++ b/src/test/java/org/dataloader/fixtures/TestKit.java @@ -1,13 +1,23 @@ package org.dataloader.fixtures; import org.dataloader.BatchLoader; +import org.dataloader.BatchLoaderWithContext; import org.dataloader.DataLoader; import org.dataloader.DataLoaderFactory; import org.dataloader.DataLoaderOptions; +import org.dataloader.MappedBatchLoader; +import org.dataloader.MappedBatchLoaderWithContext; +import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.LinkedHashSet; +import java.util.HashMap; import java.util.List; +import java.util.Set; +import java.util.Map; +import java.util.Set; import java.util.concurrent.CompletableFuture; import static java.util.stream.Collectors.toList; @@ -19,6 +29,27 @@ public static BatchLoader keysAsValues() { return CompletableFuture::completedFuture; } + public static BatchLoaderWithContext keysAsValuesWithContext() { + return (keys, env) -> CompletableFuture.completedFuture(keys); + } + + public static MappedBatchLoader keysAsMapOfValues() { + return keys -> mapOfKeys(keys); + } + + public static MappedBatchLoaderWithContext keysAsMapOfValuesWithContext() { + return (keys, env) -> mapOfKeys(keys); + } + + private static CompletableFuture> mapOfKeys(Set keys) { + Map map = new HashMap<>(); + for (K key : keys) { + //noinspection unchecked + map.put(key, (V) key); + } + return CompletableFuture.completedFuture(map); + } + public static BatchLoader keysAsValues(List> loadCalls) { return keys -> { List ks = new ArrayList<>(keys); @@ -31,6 +62,23 @@ public static BatchLoader keysAsValues(List> loadCalls) { }; } + public static BatchLoader keysAsValuesAsync(Duration delay) { + return keysAsValuesAsync(new ArrayList<>(), delay); + } + + public static BatchLoader keysAsValuesAsync(List> loadCalls, Duration delay) { + return keys -> CompletableFuture.supplyAsync(() -> { + snooze(delay.toMillis()); + List ks = new ArrayList<>(keys); + loadCalls.add(ks); + @SuppressWarnings("unchecked") + List values = keys.stream() + .map(k -> (V) k) + .collect(toList()); + return values; + }); + } + public static DataLoader idLoader() { return idLoader(null, new ArrayList<>()); } @@ -43,6 +91,14 @@ public static DataLoader idLoader(DataLoaderOptions options, List DataLoader idLoaderAsync(Duration delay) { + return idLoaderAsync(null, new ArrayList<>(), delay); + } + + public static DataLoader idLoaderAsync(DataLoaderOptions options, List> loadCalls, Duration delay) { + return DataLoaderFactory.newDataLoader(keysAsValuesAsync(loadCalls, delay), options); + } + public static Collection listFrom(int i, int max) { List ints = new ArrayList<>(); for (int j = i; j < max; j++) { @@ -55,7 +111,7 @@ public static CompletableFuture futureError() { return failedFuture(new IllegalStateException("Error")); } - public static void snooze(int millis) { + public static void snooze(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { @@ -67,4 +123,12 @@ public static void snooze(int millis) { public static List sort(Collection collection) { return collection.stream().sorted().collect(toList()); } + + public static Set asSet(T... elements) { + return new LinkedHashSet<>(Arrays.asList(elements)); + } + + public static Set asSet(Collection elements) { + return new LinkedHashSet<>(elements); + } } diff --git a/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryPredicateTest.java b/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryPredicateTest.java new file mode 100644 index 0000000..43da82f --- /dev/null +++ b/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryPredicateTest.java @@ -0,0 +1,242 @@ +package org.dataloader.registries; + +import org.dataloader.BatchLoader; +import org.dataloader.DataLoader; +import org.dataloader.DataLoaderRegistry; +import org.junit.Test; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; + +import static java.util.Arrays.asList; +import static org.awaitility.Awaitility.await; +import static org.dataloader.DataLoaderFactory.newDataLoader; +import static org.dataloader.fixtures.TestKit.asSet; +import static org.dataloader.registries.DispatchPredicate.DISPATCH_NEVER; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +public class ScheduledDataLoaderRegistryPredicateTest { + final BatchLoader identityBatchLoader = CompletableFuture::completedFuture; + + static class CountingDispatchPredicate implements DispatchPredicate { + int count = 0; + int max = 0; + + public CountingDispatchPredicate(int max) { + this.max = max; + } + + @Override + public boolean test(String dataLoaderKey, DataLoader dataLoader) { + boolean shouldFire = count >= max; + count++; + return shouldFire; + } + } + + @Test + public void predicate_registration_works() { + DataLoader dlA = newDataLoader(identityBatchLoader); + DataLoader dlB = newDataLoader(identityBatchLoader); + DataLoader dlC = newDataLoader(identityBatchLoader); + + DispatchPredicate predicateA = new CountingDispatchPredicate(1); + DispatchPredicate predicateB = new CountingDispatchPredicate(2); + DispatchPredicate predicateC = new CountingDispatchPredicate(3); + + DispatchPredicate predicateOverAll = new CountingDispatchPredicate(10); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dlA, predicateA) + .register("b", dlB, predicateB) + .register("c", dlC, predicateC) + .dispatchPredicate(predicateOverAll) + .build(); + + assertThat(registry.getDataLoaders(), equalTo(asList(dlA, dlB, dlC))); + assertThat(registry.getDataLoadersMap().keySet(), equalTo(asSet("a", "b", "c"))); + assertThat(asSet(registry.getDataLoadersMap().values()), equalTo(asSet(dlA, dlB, dlC))); + assertThat(registry.getDispatchPredicate(), equalTo(predicateOverAll)); + assertThat(asSet(registry.getDataLoaderPredicates().values()), equalTo(asSet(predicateA, predicateB, predicateC))); + + // and unregister (fluently) + DataLoaderRegistry dlR = registry.unregister("c"); + assertThat(dlR, equalTo(registry)); + + assertThat(registry.getDataLoaders(), equalTo(asList(dlA, dlB))); + assertThat(registry.getDispatchPredicate(), equalTo(predicateOverAll)); + assertThat(asSet(registry.getDataLoaderPredicates().values()), equalTo(asSet(predicateA, predicateB))); + + // direct on the registry works + registry.register("c", dlC, predicateC); + assertThat(registry.getDataLoaders(), equalTo(asList(dlA, dlB, dlC))); + assertThat(registry.getDispatchPredicate(), equalTo(predicateOverAll)); + assertThat(asSet(registry.getDataLoaderPredicates().values()), equalTo(asSet(predicateA, predicateB, predicateC))); + + } + + @Test + public void predicate_firing_works() { + DataLoader dlA = newDataLoader(identityBatchLoader); + DataLoader dlB = newDataLoader(identityBatchLoader); + DataLoader dlC = newDataLoader(identityBatchLoader); + + DispatchPredicate predicateA = new CountingDispatchPredicate(1); + DispatchPredicate predicateB = new CountingDispatchPredicate(2); + DispatchPredicate predicateC = new CountingDispatchPredicate(3); + + DispatchPredicate predicateOnTen = new CountingDispatchPredicate(10); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dlA, predicateA) + .register("b", dlB, predicateB) + .register("c", dlC, predicateC) + .dispatchPredicate(predicateOnTen) + .schedule(Duration.ofHours(1000)) // make this so long its never rescheduled + .build(); + + + CompletableFuture cfA = dlA.load("A"); + CompletableFuture cfB = dlB.load("B"); + CompletableFuture cfC = dlC.load("C"); + + int count = registry.dispatchAllWithCount(); // first firing + // none should fire + assertThat(count, equalTo(0)); + assertThat(cfA.isDone(), equalTo(false)); + assertThat(cfB.isDone(), equalTo(false)); + assertThat(cfC.isDone(), equalTo(false)); + + count = registry.dispatchAllWithCount(); // second firing + // one should fire + assertThat(count, equalTo(1)); + assertThat(cfA.isDone(), equalTo(true)); + assertThat(cfA.join(), equalTo("A")); + + assertThat(cfB.isDone(), equalTo(false)); + assertThat(cfC.isDone(), equalTo(false)); + + count = registry.dispatchAllWithCount(); // third firing + assertThat(count, equalTo(1)); + assertThat(cfA.isDone(), equalTo(true)); + assertThat(cfB.isDone(), equalTo(true)); + assertThat(cfB.join(), equalTo("B")); + assertThat(cfC.isDone(), equalTo(false)); + + count = registry.dispatchAllWithCount(); // fourth firing + assertThat(count, equalTo(1)); + assertThat(cfA.isDone(), equalTo(true)); + assertThat(cfB.isDone(), equalTo(true)); + assertThat(cfC.isDone(), equalTo(true)); + assertThat(cfC.join(), equalTo("C")); + } + + @Test + public void test_the_registry_overall_predicate_firing_works() { + DataLoader dlA = newDataLoader(identityBatchLoader); + DataLoader dlB = newDataLoader(identityBatchLoader); + DataLoader dlC = newDataLoader(identityBatchLoader); + + DispatchPredicate predicateOnSix = new CountingDispatchPredicate(6); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dlA, DISPATCH_NEVER) + .register("b", dlB, DISPATCH_NEVER) + .register("c", dlC, DISPATCH_NEVER) + .dispatchPredicate(predicateOnSix) + .schedule(Duration.ofHours(1000)) + .build(); + + + CompletableFuture cfA = dlA.load("A"); + CompletableFuture cfB = dlB.load("B"); + CompletableFuture cfC = dlC.load("C"); + + int count = registry.dispatchAllWithCount(); // first firing + assertThat(count, equalTo(0)); + assertThat(cfA.isDone(), equalTo(false)); + assertThat(cfB.isDone(), equalTo(false)); + assertThat(cfC.isDone(), equalTo(false)); + + count = registry.dispatchAllWithCount(); // second firing but the overall been asked 6 times already + assertThat(count, equalTo(0)); + assertThat(cfA.isDone(), equalTo(false)); + assertThat(cfB.isDone(), equalTo(false)); + assertThat(cfC.isDone(), equalTo(false)); + + count = registry.dispatchAllWithCount(); // third firing but the overall been asked 9 times already + assertThat(count, equalTo(3)); + assertThat(cfA.isDone(), equalTo(true)); + assertThat(cfB.isDone(), equalTo(true)); + assertThat(cfC.isDone(), equalTo(true)); + } + + @Test + public void dispatch_immediate_firing_works() { + DataLoader dlA = newDataLoader(identityBatchLoader); + DataLoader dlB = newDataLoader(identityBatchLoader); + DataLoader dlC = newDataLoader(identityBatchLoader); + + DispatchPredicate predicateA = new CountingDispatchPredicate(1); + DispatchPredicate predicateB = new CountingDispatchPredicate(2); + DispatchPredicate predicateC = new CountingDispatchPredicate(3); + + DispatchPredicate predicateOverAll = new CountingDispatchPredicate(10); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dlA, predicateA) + .register("b", dlB, predicateB) + .register("c", dlC, predicateC) + .dispatchPredicate(predicateOverAll) + .schedule(Duration.ofHours(1000)) + .build(); + + + CompletableFuture cfA = dlA.load("A"); + CompletableFuture cfB = dlB.load("B"); + CompletableFuture cfC = dlC.load("C"); + + int count = registry.dispatchAllWithCountImmediately(); // all should fire + assertThat(count, equalTo(3)); + assertThat(cfA.isDone(), equalTo(true)); + assertThat(cfA.join(), equalTo("A")); + assertThat(cfB.isDone(), equalTo(true)); + assertThat(cfB.join(), equalTo("B")); + assertThat(cfC.isDone(), equalTo(true)); + assertThat(cfC.join(), equalTo("C")); + } + + @Test + public void test_the_registry_overall_predicate_firing_works_when_on_schedule() { + DataLoader dlA = newDataLoader(identityBatchLoader); + DataLoader dlB = newDataLoader(identityBatchLoader); + DataLoader dlC = newDataLoader(identityBatchLoader); + + DispatchPredicate predicateOnTwenty = new CountingDispatchPredicate(20); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dlA, DISPATCH_NEVER) + .register("b", dlB, DISPATCH_NEVER) + .register("c", dlC, DISPATCH_NEVER) + .dispatchPredicate(predicateOnTwenty) + .schedule(Duration.ofMillis(5)) + .build(); + + + CompletableFuture cfA = dlA.load("A"); + CompletableFuture cfB = dlB.load("B"); + CompletableFuture cfC = dlC.load("C"); + + int count = registry.dispatchAllWithCount(); // first firing + assertThat(count, equalTo(0)); + + // the calls will be rescheduled until eventually the counting predicate returns true + await().until(cfA::isDone, is(true)); + + assertThat(cfA.isDone(), equalTo(true)); + assertThat(cfB.isDone(), equalTo(true)); + assertThat(cfC.isDone(), equalTo(true)); + } +} diff --git a/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java b/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java index 527f419..5e0cd9a 100644 --- a/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java +++ b/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java @@ -1,6 +1,7 @@ package org.dataloader.registries; import junit.framework.TestCase; +import org.awaitility.core.ConditionTimeoutException; import org.dataloader.DataLoader; import org.dataloader.DataLoaderFactory; import org.dataloader.DataLoaderRegistry; @@ -11,13 +12,17 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import static java.util.Arrays.asList; import static java.util.Collections.singletonList; +import static org.awaitility.Awaitility.await; +import static org.awaitility.Duration.TWO_SECONDS; import static org.dataloader.fixtures.TestKit.keysAsValues; import static org.dataloader.fixtures.TestKit.snooze; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; public class ScheduledDataLoaderRegistryTest extends TestCase { @@ -257,4 +262,65 @@ public void test_close_is_a_one_way_door() { snooze(200); assertEquals(counter.get(), countThen + 1); } + + public void test_can_tick_after_first_dispatch_for_chain_data_loaders() { + + // delays much bigger than the tick rate will mean multiple calls to dispatch + DataLoader dlA = TestKit.idLoaderAsync(Duration.ofMillis(100)); + DataLoader dlB = TestKit.idLoaderAsync(Duration.ofMillis(200)); + + CompletableFuture chainedCF = dlA.load("AK1").thenCompose(dlB::load); + + AtomicBoolean done = new AtomicBoolean(); + chainedCF.whenComplete((v, t) -> done.set(true)); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dlA) + .register("b", dlB) + .dispatchPredicate(alwaysDispatch) + .schedule(Duration.ofMillis(10)) + .tickerMode(true) + .build(); + + assertThat(registry.isTickerMode(), equalTo(true)); + + int count = registry.dispatchAllWithCount(); + assertThat(count,equalTo(1)); + + await().atMost(TWO_SECONDS).untilAtomic(done, is(true)); + + registry.close(); + } + + public void test_chain_data_loaders_will_hang_if_not_in_ticker_mode() { + + // delays much bigger than the tick rate will mean multiple calls to dispatch + DataLoader dlA = TestKit.idLoaderAsync(Duration.ofMillis(100)); + DataLoader dlB = TestKit.idLoaderAsync(Duration.ofMillis(200)); + + CompletableFuture chainedCF = dlA.load("AK1").thenCompose(dlB::load); + + AtomicBoolean done = new AtomicBoolean(); + chainedCF.whenComplete((v, t) -> done.set(true)); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dlA) + .register("b", dlB) + .dispatchPredicate(alwaysDispatch) + .schedule(Duration.ofMillis(10)) + .tickerMode(false) + .build(); + + assertThat(registry.isTickerMode(), equalTo(false)); + + int count = registry.dispatchAllWithCount(); + assertThat(count,equalTo(1)); + + try { + await().atMost(TWO_SECONDS).untilAtomic(done, is(true)); + fail("This should not have completed but rather timed out"); + } catch (ConditionTimeoutException expected) { + } + registry.close(); + } } \ No newline at end of file diff --git a/src/test/java/org/dataloader/scheduler/BatchLoaderSchedulerTest.java b/src/test/java/org/dataloader/scheduler/BatchLoaderSchedulerTest.java new file mode 100644 index 0000000..beb7c18 --- /dev/null +++ b/src/test/java/org/dataloader/scheduler/BatchLoaderSchedulerTest.java @@ -0,0 +1,167 @@ +package org.dataloader.scheduler; + +import org.dataloader.BatchLoaderEnvironment; +import org.dataloader.DataLoader; +import org.dataloader.DataLoaderOptions; +import org.junit.Test; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; + +import static org.awaitility.Awaitility.await; +import static org.dataloader.DataLoaderFactory.newDataLoader; +import static org.dataloader.DataLoaderFactory.newMappedDataLoader; +import static org.dataloader.fixtures.TestKit.keysAsMapOfValues; +import static org.dataloader.fixtures.TestKit.keysAsMapOfValuesWithContext; +import static org.dataloader.fixtures.TestKit.keysAsValues; +import static org.dataloader.fixtures.TestKit.keysAsValuesWithContext; +import static org.dataloader.fixtures.TestKit.snooze; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +public class BatchLoaderSchedulerTest { + + BatchLoaderScheduler immediateScheduling = new BatchLoaderScheduler() { + + @Override + public CompletionStage> scheduleBatchLoader(ScheduledBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + return scheduledCall.invoke(); + } + + @Override + public CompletionStage> scheduleMappedBatchLoader(ScheduledMappedBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + return scheduledCall.invoke(); + } + }; + + private BatchLoaderScheduler delayedScheduling(int ms) { + return new BatchLoaderScheduler() { + + @Override + public CompletionStage> scheduleBatchLoader(ScheduledBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + return CompletableFuture.supplyAsync(() -> { + snooze(ms); + return scheduledCall.invoke(); + }).thenCompose(Function.identity()); + } + + @Override + public CompletionStage> scheduleMappedBatchLoader(ScheduledMappedBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + return CompletableFuture.supplyAsync(() -> { + snooze(ms); + return scheduledCall.invoke(); + }).thenCompose(Function.identity()); + } + }; + } + + private static void commonSetupAndSimpleAsserts(DataLoader identityLoader) { + CompletableFuture future1 = identityLoader.load(1); + CompletableFuture future2 = identityLoader.load(2); + + identityLoader.dispatch(); + + await().until(() -> future1.isDone() && future2.isDone()); + assertThat(future1.join(), equalTo(1)); + assertThat(future2.join(), equalTo(2)); + } + + @Test + public void can_allow_a_simple_scheduler() { + DataLoaderOptions options = DataLoaderOptions.newOptions().setBatchLoaderScheduler(immediateScheduling); + + DataLoader identityLoader = newDataLoader(keysAsValues(), options); + + commonSetupAndSimpleAsserts(identityLoader); + } + + @Test + public void can_allow_a_simple_scheduler_with_context() { + DataLoaderOptions options = DataLoaderOptions.newOptions().setBatchLoaderScheduler(immediateScheduling); + + DataLoader identityLoader = newDataLoader(keysAsValuesWithContext(), options); + + commonSetupAndSimpleAsserts(identityLoader); + } + + @Test + public void can_allow_a_simple_scheduler_with_mapped_batch_load() { + DataLoaderOptions options = DataLoaderOptions.newOptions().setBatchLoaderScheduler(immediateScheduling); + + DataLoader identityLoader = newMappedDataLoader(keysAsMapOfValues(), options); + + commonSetupAndSimpleAsserts(identityLoader); + } + + @Test + public void can_allow_a_simple_scheduler_with_mapped_batch_load_with_context() { + DataLoaderOptions options = DataLoaderOptions.newOptions().setBatchLoaderScheduler(immediateScheduling); + + DataLoader identityLoader = newMappedDataLoader(keysAsMapOfValuesWithContext(), options); + + commonSetupAndSimpleAsserts(identityLoader); + } + + @Test + public void can_allow_an_async_scheduler() { + DataLoaderOptions options = DataLoaderOptions.newOptions().setBatchLoaderScheduler(delayedScheduling(50)); + + DataLoader identityLoader = newDataLoader(keysAsValues(), options); + + commonSetupAndSimpleAsserts(identityLoader); + } + + + @Test + public void can_allow_a_funky_scheduler() { + AtomicBoolean releaseTheHounds = new AtomicBoolean(); + BatchLoaderScheduler funkyScheduler = new BatchLoaderScheduler() { + @Override + public CompletionStage> scheduleBatchLoader(ScheduledBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + return CompletableFuture.supplyAsync(() -> { + while (!releaseTheHounds.get()) { + snooze(10); + } + return scheduledCall.invoke(); + }).thenCompose(Function.identity()); + } + + @Override + public CompletionStage> scheduleMappedBatchLoader(ScheduledMappedBatchLoaderCall scheduledCall, List keys, BatchLoaderEnvironment environment) { + return CompletableFuture.supplyAsync(() -> { + while (!releaseTheHounds.get()) { + snooze(10); + } + return scheduledCall.invoke(); + }).thenCompose(Function.identity()); + } + }; + DataLoaderOptions options = DataLoaderOptions.newOptions().setBatchLoaderScheduler(funkyScheduler); + + DataLoader identityLoader = newDataLoader(keysAsValues(), options); + + CompletableFuture future1 = identityLoader.load(1); + CompletableFuture future2 = identityLoader.load(2); + + identityLoader.dispatch(); + + // we can spin around for a while - nothing will happen until we release the hounds + for (int i = 0; i < 5; i++) { + assertThat(future1.isDone(), equalTo(false)); + assertThat(future2.isDone(), equalTo(false)); + snooze(50); + } + + releaseTheHounds.set(true); + + await().until(() -> future1.isDone() && future2.isDone()); + assertThat(future1.join(), equalTo(1)); + assertThat(future2.join(), equalTo(2)); + } + + +}