@@ -198,59 +198,62 @@ private String createHourDayAvg() {
198
198
private void createCbHourlyAvg () {
199
199
LOG .info ("createCbHourlyAvg()" );
200
200
LocalDateTime startAll = LocalDateTime .now ();
201
- MyTimeFrame timeFrame = this .serviceUtils .createTimeFrame (CB_HOUR_COL , QuoteCb .class , true );
201
+ MyTimeFrame timeFrame = this .serviceUtils .createTimeFrame (CB_HOUR_COL , QuoteCb .class , true );
202
202
Calendar now = Calendar .getInstance ();
203
203
now .setTime (Date .from (LocalDate .now ().atStartOfDay ().atZone (ZoneId .systemDefault ()).toInstant ()));
204
204
final var timeFrames = this .createTimeFrames (timeFrame , now );
205
205
if (this .cpuConstraint ) {
206
- timeFrames .stream ().forEachOrdered (timeFrame1 -> processHourTimeFrame (timeFrame1 ));
207
- } else {
206
+ timeFrames .stream ().forEachOrdered (timeFrame1 -> this . processTimeFrame (timeFrame1 , false ));
207
+ } else {
208
208
try (ForkJoinPool customThreadPool = new ForkJoinPool (2 )) {
209
- customThreadPool .submit (() -> timeFrames .parallelStream ().forEachOrdered (timeFrame1 -> processHourTimeFrame (timeFrame1 )));
209
+ customThreadPool .submit (() -> timeFrames .parallelStream ()
210
+ .forEachOrdered (timeFrame1 -> this .processTimeFrame (timeFrame1 , false )));
210
211
customThreadPool .shutdown ();
211
212
}
212
- }
213
+ }
213
214
LOG .info (this .serviceUtils .createAvgLogStatement (startAll , "Prepared Coinbase Hourly Data Time:" ));
214
215
}
215
216
216
- private void processHourTimeFrame (MyTimeFrame timeFrame1 ) {
217
+ private void processTimeFrame (MyTimeFrame timeFrame1 , boolean isDay ) {
217
218
Date start = new Date ();
218
219
SimpleDateFormat sdf = new SimpleDateFormat ("dd.MM.yyyy" );
219
220
final var nonZeroProperties = new AtomicInteger (0 );
220
221
Query query = new Query ();
221
222
query .addCriteria (
222
223
Criteria .where (DtoUtils .CREATEDAT ).gt (timeFrame1 .begin ().getTime ()).lt (timeFrame1 .end ().getTime ()));
223
224
// Coinbase
225
+ final var logFailed = String .format ("Coinbase prepare %s data failed" , isDay ? "day" : "hour" );
224
226
Mono <Collection <QuoteCb >> collectCb = this .myMongoRepository .find (query , QuoteCb .class )
225
- .timeout (Duration .ofSeconds (5L )).doOnError (ex -> LOG .warn ("Coinbase prepare hour data failed" , ex ))
227
+ .timeout (Duration .ofSeconds (10L )).doOnError (ex -> LOG .warn (logFailed , ex ))
226
228
.onErrorResume (ex -> Mono .empty ()).subscribeOn (this .mongoScheduler ).collectList ()
227
- .map (quotes -> makeCbQuoteHour (quotes , timeFrame1 .begin (), timeFrame1 .end ()));
229
+ .map (quotes -> isDay ? this .makeCbQuoteDay (quotes , timeFrame1 .begin (), timeFrame1 .end ())
230
+ : this .makeCbQuoteHour (quotes , timeFrame1 .begin (), timeFrame1 .end ()));
228
231
collectCb .filter (Predicate .not (Collection ::isEmpty ))
229
- .map (myColl -> countRelevantProperties (nonZeroProperties , myColl ))
230
- .flatMap (myColl -> this .myMongoRepository .insertAll (Mono .just (myColl ), CB_HOUR_COL )
231
- .timeout (Duration .ofSeconds (5L ))
232
- .doOnError (ex -> LOG .warn ("Coinbase prepare hour data failed" , ex ))
232
+ .map (myColl -> this .countRelevantProperties (nonZeroProperties , myColl ))
233
+ .flatMap (myColl -> this .myMongoRepository .insertAll (Mono .just (myColl ), isDay ? CB_DAY_COL : CB_HOUR_COL )
234
+ .timeout (Duration .ofSeconds (10L )).doOnError (ex -> LOG .warn (logFailed , ex ))
233
235
.onErrorResume (ex -> Mono .empty ()).subscribeOn (this .mongoScheduler ).collectList ())
234
236
.subscribeOn (this .mongoScheduler ).block ();
235
- LOG .info ("Prepared Coinbase Hour Data for: " + sdf .format (timeFrame1 .begin ().getTime ()) + " Time: "
237
+ LOG .info (String . format ( "Prepared Coinbase %s Data for: " , isDay ? "Day" : "Hour" ) + sdf .format (timeFrame1 .begin ().getTime ()) + " Time: "
236
238
+ (new Date ().getTime () - start .getTime ()) + "ms" + " 0 < properties: " + nonZeroProperties .get ());
237
239
}
238
240
239
241
private void createCbDailyAvg () {
240
242
LOG .info ("createCbDailyAvg()" );
241
243
LocalDateTime startAll = LocalDateTime .now ();
242
- final MyTimeFrame timeFrame = this .serviceUtils .createTimeFrame (CB_DAY_COL , QuoteCb .class , false );
244
+ final MyTimeFrame timeFrame = this .serviceUtils .createTimeFrame (CB_DAY_COL , QuoteCb .class , false );
243
245
final Calendar now = Calendar .getInstance ();
244
246
now .setTime (Date .from (LocalDate .now ().atStartOfDay ().atZone (ZoneId .systemDefault ()).toInstant ()));
245
247
final var timeFrames = this .createTimeFrames (timeFrame , now );
246
248
if (this .cpuConstraint ) {
247
- timeFrames .stream ().forEachOrdered (timeFrame1 -> processDayTimeFrame (timeFrame1 ));
249
+ timeFrames .stream ().forEachOrdered (timeFrame1 -> this . processTimeFrame (timeFrame1 , true ));
248
250
} else {
249
251
try (ForkJoinPool customThreadPool = new ForkJoinPool (2 )) {
250
- customThreadPool .submit (() -> timeFrames .parallelStream ().forEachOrdered (timeFrame1 -> processDayTimeFrame (timeFrame1 )));
252
+ customThreadPool .submit (() -> timeFrames .parallelStream ()
253
+ .forEachOrdered (timeFrame1 -> this .processTimeFrame (timeFrame1 , true )));
251
254
customThreadPool .shutdown ();
252
- }
253
- }
255
+ }
256
+ }
254
257
LOG .info (this .serviceUtils .createAvgLogStatement (startAll , "Prepared Coinbase Daily Data Time:" ));
255
258
}
256
259
@@ -275,29 +278,6 @@ private Calendar nextDay(Calendar begin) {
275
278
return begin ;
276
279
}
277
280
278
- private void processDayTimeFrame (MyTimeFrame timeFrame1 ) {
279
- Date start = new Date ();
280
- final SimpleDateFormat sdf = new SimpleDateFormat ("dd.MM.yyyy" );
281
- final var nonZeroProperties = new AtomicInteger (0 );
282
- Query query = new Query ();
283
- query .addCriteria (
284
- Criteria .where (DtoUtils .CREATEDAT ).gt (timeFrame1 .begin ().getTime ()).lt (timeFrame1 .end ().getTime ()));
285
- // Coinbase
286
- Mono <Collection <QuoteCb >> collectCb = this .myMongoRepository .find (query , QuoteCb .class )
287
- .timeout (Duration .ofSeconds (5L )).doOnError (ex -> LOG .warn ("Coinbase prepare day data failed" , ex ))
288
- .onErrorResume (ex -> Mono .empty ()).subscribeOn (this .mongoScheduler ).collectList ()
289
- .map (quotes -> makeCbQuoteDay (quotes , timeFrame1 .begin (), timeFrame1 .end ()));
290
- collectCb .filter (Predicate .not (Collection ::isEmpty ))
291
- .map (myColl -> countRelevantProperties (nonZeroProperties , myColl ))
292
- .flatMap (myColl -> this .myMongoRepository .insertAll (Mono .just (myColl ), CB_DAY_COL )
293
- .timeout (Duration .ofSeconds (5L ))
294
- .doOnError (ex -> LOG .warn ("Coinbase prepare day data failed" , ex ))
295
- .onErrorResume (ex -> Mono .empty ()).subscribeOn (this .mongoScheduler ).collectList ())
296
- .subscribeOn (this .mongoScheduler ).block ();
297
- LOG .info ("Prepared Coinbase Day Data for: " + sdf .format (timeFrame1 .begin ().getTime ()) + " Time: "
298
- + (new Date ().getTime () - start .getTime ()) + "ms" + " 0 < properties: " + nonZeroProperties .get ());
299
- }
300
-
301
281
private Collection <QuoteCb > countRelevantProperties (final AtomicInteger nonZeroProperties ,
302
282
Collection <QuoteCb > myColl ) {
303
283
var relevantProperties = myColl .stream ().flatMap (myQuote -> Stream .of (this .propertiesNonZero (myQuote )))
0 commit comments