Skip to content

Commit ea40d20

Browse files
committed
feat: improve multi threading
1 parent a696ba2 commit ea40d20

File tree

1 file changed

+92
-56
lines changed

1 file changed

+92
-56
lines changed

backend/src/main/java/ch/xxx/trader/usecase/services/CoinbaseService.java

+92-56
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,18 @@
2727
import java.time.LocalDate;
2828
import java.time.LocalDateTime;
2929
import java.time.ZoneId;
30+
import java.util.ArrayList;
3031
import java.util.Calendar;
3132
import java.util.Collection;
3233
import java.util.Date;
34+
import java.util.GregorianCalendar;
3335
import java.util.LinkedList;
3436
import java.util.List;
3537
import java.util.Map;
3638
import java.util.Optional;
3739
import java.util.concurrent.CompletableFuture;
3840
import java.util.concurrent.ConcurrentHashMap;
41+
import java.util.concurrent.ForkJoinPool;
3942
import java.util.concurrent.TimeUnit;
4043
import java.util.concurrent.atomic.AtomicInteger;
4144
import java.util.function.BiConsumer;
@@ -81,7 +84,7 @@ private record GetSetMethodFunctions(Function<QuoteCb, BigDecimal> getter, BiCon
8184
private boolean cpuConstraint;
8285
private final List<String> nonValueFieldNames = List.of("_id", "createdAt", "class");
8386
private final List<PropertyDescriptor> propertyDescriptors;
84-
private final Scheduler mongoScheduler = Schedulers.newBoundedElastic(5, 10, "mongoImport", 10);
87+
private final Scheduler mongoScheduler = Schedulers.newBoundedElastic(10, 10, "mongoImport", 10);
8588
@Value("${single.instance.deployment:false}")
8689
private boolean singleInstanceDeployment;
8790

@@ -195,73 +198,106 @@ private String createHourDayAvg() {
195198
private void createCbHourlyAvg() {
196199
LOG.info("createCbHourlyAvg()");
197200
LocalDateTime startAll = LocalDateTime.now();
198-
MyTimeFrame timeFrame = this.serviceUtils.createTimeFrame(CB_HOUR_COL, QuoteCb.class, true);
199-
SimpleDateFormat sdf = new SimpleDateFormat("dd.MM.yyyy");
201+
MyTimeFrame timeFrame = this.serviceUtils.createTimeFrame(CB_HOUR_COL, QuoteCb.class, true);
200202
Calendar now = Calendar.getInstance();
201203
now.setTime(Date.from(LocalDate.now().atStartOfDay().atZone(ZoneId.systemDefault()).toInstant()));
202-
while (timeFrame.end().before(now)) {
203-
Date start = new Date();
204-
final var nonZeroProperties = new AtomicInteger(0);
205-
Query query = new Query();
206-
query.addCriteria(
207-
Criteria.where(DtoUtils.CREATEDAT).gt(timeFrame.begin().getTime()).lt(timeFrame.end().getTime()));
208-
// Coinbase
209-
Mono<Collection<QuoteCb>> collectCb = this.myMongoRepository.find(query, QuoteCb.class)
210-
.timeout(Duration.ofSeconds(5L)).doOnError(ex -> LOG.warn("Coinbase prepare hour data failed", ex))
211-
.onErrorResume(ex -> Mono.empty()).subscribeOn(this.mongoScheduler).collectList()
212-
.map(quotes -> makeCbQuoteHour(quotes, timeFrame.begin(), timeFrame.end()));
213-
collectCb.filter(Predicate.not(Collection::isEmpty))
214-
.map(myColl -> countRelevantProperties(nonZeroProperties, myColl))
215-
.flatMap(myColl -> this.myMongoRepository.insertAll(Mono.just(myColl), CB_HOUR_COL)
216-
.timeout(Duration.ofSeconds(5L))
217-
.doOnError(ex -> LOG.warn("Coinbase prepare hour data failed", ex))
218-
.onErrorResume(ex -> Mono.empty()).subscribeOn(this.mongoScheduler).collectList())
219-
.subscribeOn(this.mongoScheduler).block();
220-
221-
timeFrame.begin().add(Calendar.DAY_OF_YEAR, 1);
222-
timeFrame.end().add(Calendar.DAY_OF_YEAR, 1);
223-
LOG.info("Prepared Coinbase Hour Data for: " + sdf.format(timeFrame.begin().getTime()) + " Time: "
224-
+ (new Date().getTime() - start.getTime()) + "ms" + " 0 < properties: "
225-
+ nonZeroProperties.get());
226-
}
204+
final var timeFrames = this.createTimeFrames(timeFrame, now);
205+
if (this.cpuConstraint) {
206+
timeFrames.stream().forEachOrdered(timeFrame1 -> processHourTimeFrame(timeFrame1));
207+
} else {
208+
try (ForkJoinPool customThreadPool = new ForkJoinPool(2)) {
209+
customThreadPool.submit(() -> timeFrames.parallelStream().forEachOrdered(timeFrame1 -> processHourTimeFrame(timeFrame1)));
210+
customThreadPool.shutdown();
211+
}
212+
}
227213
LOG.info(this.serviceUtils.createAvgLogStatement(startAll, "Prepared Coinbase Hourly Data Time:"));
228214
}
229215

216+
private void processHourTimeFrame(MyTimeFrame timeFrame1) {
217+
Date start = new Date();
218+
SimpleDateFormat sdf = new SimpleDateFormat("dd.MM.yyyy");
219+
final var nonZeroProperties = new AtomicInteger(0);
220+
Query query = new Query();
221+
query.addCriteria(
222+
Criteria.where(DtoUtils.CREATEDAT).gt(timeFrame1.begin().getTime()).lt(timeFrame1.end().getTime()));
223+
// Coinbase
224+
Mono<Collection<QuoteCb>> collectCb = this.myMongoRepository.find(query, QuoteCb.class)
225+
.timeout(Duration.ofSeconds(5L)).doOnError(ex -> LOG.warn("Coinbase prepare hour data failed", ex))
226+
.onErrorResume(ex -> Mono.empty()).subscribeOn(this.mongoScheduler).collectList()
227+
.map(quotes -> makeCbQuoteHour(quotes, timeFrame1.begin(), timeFrame1.end()));
228+
collectCb.filter(Predicate.not(Collection::isEmpty))
229+
.map(myColl -> countRelevantProperties(nonZeroProperties, myColl))
230+
.flatMap(myColl -> this.myMongoRepository.insertAll(Mono.just(myColl), CB_HOUR_COL)
231+
.timeout(Duration.ofSeconds(5L))
232+
.doOnError(ex -> LOG.warn("Coinbase prepare hour data failed", ex))
233+
.onErrorResume(ex -> Mono.empty()).subscribeOn(this.mongoScheduler).collectList())
234+
.subscribeOn(this.mongoScheduler).block();
235+
LOG.info("Prepared Coinbase Hour Data for: " + sdf.format(timeFrame1.begin().getTime()) + " Time: "
236+
+ (new Date().getTime() - start.getTime()) + "ms" + " 0 < properties: " + nonZeroProperties.get());
237+
}
238+
230239
private void createCbDailyAvg() {
231240
LOG.info("createCbDailyAvg()");
232241
LocalDateTime startAll = LocalDateTime.now();
233-
MyTimeFrame timeFrame = this.serviceUtils.createTimeFrame(CB_DAY_COL, QuoteCb.class, false);
234-
SimpleDateFormat sdf = new SimpleDateFormat("dd.MM.yyyy");
235-
Calendar now = Calendar.getInstance();
242+
final MyTimeFrame timeFrame = this.serviceUtils.createTimeFrame(CB_DAY_COL, QuoteCb.class, false);
243+
final Calendar now = Calendar.getInstance();
236244
now.setTime(Date.from(LocalDate.now().atStartOfDay().atZone(ZoneId.systemDefault()).toInstant()));
237-
while (timeFrame.end().before(now)) {
238-
Date start = new Date();
239-
final var nonZeroProperties = new AtomicInteger(0);
240-
Query query = new Query();
241-
query.addCriteria(
242-
Criteria.where(DtoUtils.CREATEDAT).gt(timeFrame.begin().getTime()).lt(timeFrame.end().getTime()));
243-
// Coinbase
244-
Mono<Collection<QuoteCb>> collectCb = this.myMongoRepository.find(query, QuoteCb.class)
245-
.timeout(Duration.ofSeconds(5L)).doOnError(ex -> LOG.warn("Coinbase prepare day data failed", ex))
246-
.onErrorResume(ex -> Mono.empty()).subscribeOn(this.mongoScheduler).collectList()
247-
.map(quotes -> makeCbQuoteDay(quotes, timeFrame.begin(), timeFrame.end()));
248-
collectCb.filter(Predicate.not(Collection::isEmpty))
249-
.map(myColl -> countRelevantProperties(nonZeroProperties, myColl))
250-
.flatMap(myColl -> this.myMongoRepository.insertAll(Mono.just(myColl), CB_DAY_COL)
251-
.timeout(Duration.ofSeconds(5L))
252-
.doOnError(ex -> LOG.warn("Coinbase prepare day data failed", ex))
253-
.onErrorResume(ex -> Mono.empty()).subscribeOn(this.mongoScheduler).collectList())
254-
.subscribeOn(this.mongoScheduler).block();
255-
256-
timeFrame.begin().add(Calendar.DAY_OF_YEAR, 1);
257-
timeFrame.end().add(Calendar.DAY_OF_YEAR, 1);
258-
LOG.info("Prepared Coinbase Day Data for: " + sdf.format(timeFrame.begin().getTime()) + " Time: "
259-
+ (new Date().getTime() - start.getTime()) + "ms" + " 0 < properties: "
260-
+ nonZeroProperties.get());
261-
}
245+
final var timeFrames = this.createTimeFrames(timeFrame, now);
246+
if (this.cpuConstraint) {
247+
timeFrames.stream().forEachOrdered(timeFrame1 -> processDayTimeFrame(timeFrame1));
248+
} else {
249+
try (ForkJoinPool customThreadPool = new ForkJoinPool(2)) {
250+
customThreadPool.submit(() -> timeFrames.parallelStream().forEachOrdered(timeFrame1 -> processDayTimeFrame(timeFrame1)));
251+
customThreadPool.shutdown();
252+
}
253+
}
262254
LOG.info(this.serviceUtils.createAvgLogStatement(startAll, "Prepared Coinbase Daily Data Time:"));
263255
}
264256

257+
private List<MyTimeFrame> createTimeFrames(final MyTimeFrame timeFrame, final Calendar now) {
258+
final var timeFrames = new ArrayList<MyTimeFrame>();
259+
var begin = timeFrame.begin();
260+
var end = timeFrame.end();
261+
while (end.before(now)) {
262+
var myTimeFrame = new MyTimeFrame(begin, end);
263+
timeFrames.add(myTimeFrame);
264+
begin = nextDay(begin);
265+
end = nextDay(end);
266+
}
267+
return timeFrames;
268+
}
269+
270+
private Calendar nextDay(Calendar begin) {
271+
var begin1 = GregorianCalendar.getInstance();
272+
begin1.setTime(begin.getTime());
273+
begin1.add(Calendar.DAY_OF_YEAR, 1);
274+
begin = begin1;
275+
return begin;
276+
}
277+
278+
private void processDayTimeFrame(MyTimeFrame timeFrame1) {
279+
Date start = new Date();
280+
final SimpleDateFormat sdf = new SimpleDateFormat("dd.MM.yyyy");
281+
final var nonZeroProperties = new AtomicInteger(0);
282+
Query query = new Query();
283+
query.addCriteria(
284+
Criteria.where(DtoUtils.CREATEDAT).gt(timeFrame1.begin().getTime()).lt(timeFrame1.end().getTime()));
285+
// Coinbase
286+
Mono<Collection<QuoteCb>> collectCb = this.myMongoRepository.find(query, QuoteCb.class)
287+
.timeout(Duration.ofSeconds(5L)).doOnError(ex -> LOG.warn("Coinbase prepare day data failed", ex))
288+
.onErrorResume(ex -> Mono.empty()).subscribeOn(this.mongoScheduler).collectList()
289+
.map(quotes -> makeCbQuoteDay(quotes, timeFrame1.begin(), timeFrame1.end()));
290+
collectCb.filter(Predicate.not(Collection::isEmpty))
291+
.map(myColl -> countRelevantProperties(nonZeroProperties, myColl))
292+
.flatMap(myColl -> this.myMongoRepository.insertAll(Mono.just(myColl), CB_DAY_COL)
293+
.timeout(Duration.ofSeconds(5L))
294+
.doOnError(ex -> LOG.warn("Coinbase prepare day data failed", ex))
295+
.onErrorResume(ex -> Mono.empty()).subscribeOn(this.mongoScheduler).collectList())
296+
.subscribeOn(this.mongoScheduler).block();
297+
LOG.info("Prepared Coinbase Day Data for: " + sdf.format(timeFrame1.begin().getTime()) + " Time: "
298+
+ (new Date().getTime() - start.getTime()) + "ms" + " 0 < properties: " + nonZeroProperties.get());
299+
}
300+
265301
private Collection<QuoteCb> countRelevantProperties(final AtomicInteger nonZeroProperties,
266302
Collection<QuoteCb> myColl) {
267303
var relevantProperties = myColl.stream().flatMap(myQuote -> Stream.of(this.propertiesNonZero(myQuote)))

0 commit comments

Comments
 (0)