@@ -206,14 +206,20 @@ private void processTimeFrame(MyTimeFrame timeFrame1, boolean isDay) {
206
206
// Coinbase
207
207
final var logFailed = String .format ("Coinbase prepare %s data failed" , isDay ? "day" : "hour" );
208
208
Mono <Collection <QuoteCb >> collectCb = this .myMongoRepository .find (query , QuoteCb .class )
209
- .timeout (this .slowIo ? Duration .ofSeconds (30L ) : Duration .ofSeconds (10L )).doOnError (ex -> LOG .warn (logFailed , ex ))
210
- .onErrorResume (ex -> Mono .empty ()).subscribeOn (this .mongoScheduler ).collectList ()
209
+ .timeout (this .slowIo ? Duration .ofSeconds (30L ) : Duration .ofSeconds (10L ))
210
+ .doOnError (ex -> LOG .warn (logFailed , ex )).onErrorResume (ex -> {
211
+ LOG .warn (logFailed , ex );
212
+ return Mono .empty ();
213
+ }).subscribeOn (this .mongoScheduler ).collectList ()
211
214
.map (quotes -> this .createCbQuoteTimeFrame (timeFrame1 , isDay , quotes ));
212
215
collectCb .filter (Predicate .not (Collection ::isEmpty ))
213
216
.map (myColl -> this .countRelevantProperties (nonZeroProperties , myColl ))
214
217
.flatMap (myColl -> this .myMongoRepository .insertAll (Mono .just (myColl ), isDay ? CB_DAY_COL : CB_HOUR_COL )
215
- .timeout (this .slowIo ? Duration .ofSeconds (30L ) : Duration .ofSeconds (10L )).doOnError (ex -> LOG .warn (logFailed , ex ))
216
- .onErrorResume (ex -> Mono .empty ()).subscribeOn (this .mongoScheduler ).collectList ())
218
+ .timeout (this .slowIo ? Duration .ofSeconds (30L ) : Duration .ofSeconds (10L ))
219
+ .doOnError (ex -> LOG .warn (logFailed , ex )).onErrorResume (ex -> {
220
+ LOG .warn (logFailed , ex );
221
+ return Mono .empty ();
222
+ }).subscribeOn (this .mongoScheduler ).collectList ())
217
223
.subscribeOn (this .mongoScheduler ).block ();
218
224
LOG .info (String .format ("Prepared Coinbase %s Data for: " , isDay ? "Day" : "Hour" )
219
225
+ sdf .format (timeFrame1 .begin ().getTime ()) + " Time: " + (new Date ().getTime () - start .getTime ()) + "ms"
@@ -227,7 +233,8 @@ private Collection<QuoteCb> createCbQuoteTimeFrame(final MyTimeFrame timeFrame1,
227
233
var result = isDay ? this .makeCbQuoteDay (quotes , timeFrame1 .begin (), timeFrame1 .end ())
228
234
: this .makeCbQuoteHour (quotes , timeFrame1 .begin (), timeFrame1 .end ());
229
235
LOG .info (String .format ("Calculate Coinbase %s Data for: " , isDay ? "Day" : "Hour" )
230
- + sdf .format (timeFrame1 .begin ().getTime ()) + " Time: " + (new Date ().getTime () - start .getTime ()) + "ms" );
236
+ + sdf .format (timeFrame1 .begin ().getTime ()) + " Time: " + (new Date ().getTime () - start .getTime ())
237
+ + "ms" );
231
238
return result ;
232
239
}
233
240
0 commit comments