37
37
import java .util .concurrent .CompletableFuture ;
38
38
import java .util .concurrent .ConcurrentHashMap ;
39
39
import java .util .concurrent .TimeUnit ;
40
+ import java .util .concurrent .atomic .AtomicInteger ;
40
41
import java .util .function .BiConsumer ;
41
42
import java .util .function .Function ;
42
43
import java .util .function .Predicate ;
@@ -200,6 +201,7 @@ private void createCbHourlyAvg() {
200
201
now .setTime (Date .from (LocalDate .now ().atStartOfDay ().atZone (ZoneId .systemDefault ()).toInstant ()));
201
202
while (timeFrame .end ().before (now )) {
202
203
Date start = new Date ();
204
+ final var nonZeroProperties = new AtomicInteger (0 );
203
205
Query query = new Query ();
204
206
query .addCriteria (
205
207
Criteria .where (DtoUtils .CREATEDAT ).gt (timeFrame .begin ().getTime ()).lt (timeFrame .end ().getTime ()));
@@ -209,6 +211,7 @@ private void createCbHourlyAvg() {
209
211
.onErrorResume (ex -> Mono .empty ()).subscribeOn (this .mongoScheduler ).collectList ()
210
212
.map (quotes -> makeCbQuoteHour (quotes , timeFrame .begin (), timeFrame .end ()));
211
213
collectCb .filter (Predicate .not (Collection ::isEmpty ))
214
+ .map (myColl -> countRelevantProperties (nonZeroProperties , myColl ))
212
215
.flatMap (myColl -> this .myMongoRepository .insertAll (Mono .just (myColl ), CB_HOUR_COL )
213
216
.timeout (Duration .ofSeconds (5L ))
214
217
.doOnError (ex -> LOG .warn ("Coinbase prepare hour data failed" , ex ))
@@ -218,8 +221,8 @@ private void createCbHourlyAvg() {
218
221
timeFrame .begin ().add (Calendar .DAY_OF_YEAR , 1 );
219
222
timeFrame .end ().add (Calendar .DAY_OF_YEAR , 1 );
220
223
LOG .info ("Prepared Coinbase Hour Data for: " + sdf .format (timeFrame .begin ().getTime ()) + " Time: "
221
- + (new Date ().getTime () - start .getTime ()) + "ms" + " properties: "
222
- + ( cbFunctionCache . size () / 2 ));
224
+ + (new Date ().getTime () - start .getTime ()) + "ms" + " 0 < properties: "
225
+ + nonZeroProperties . get ( ));
223
226
}
224
227
LOG .info (this .serviceUtils .createAvgLogStatement (startAll , "Prepared Coinbase Hourly Data Time:" ));
225
228
}
@@ -233,6 +236,7 @@ private void createCbDailyAvg() {
233
236
now .setTime (Date .from (LocalDate .now ().atStartOfDay ().atZone (ZoneId .systemDefault ()).toInstant ()));
234
237
while (timeFrame .end ().before (now )) {
235
238
Date start = new Date ();
239
+ final var nonZeroProperties = new AtomicInteger (0 );
236
240
Query query = new Query ();
237
241
query .addCriteria (
238
242
Criteria .where (DtoUtils .CREATEDAT ).gt (timeFrame .begin ().getTime ()).lt (timeFrame .end ().getTime ()));
@@ -242,6 +246,7 @@ private void createCbDailyAvg() {
242
246
.onErrorResume (ex -> Mono .empty ()).subscribeOn (this .mongoScheduler ).collectList ()
243
247
.map (quotes -> makeCbQuoteDay (quotes , timeFrame .begin (), timeFrame .end ()));
244
248
collectCb .filter (Predicate .not (Collection ::isEmpty ))
249
+ .map (myColl -> countRelevantProperties (nonZeroProperties , myColl ))
245
250
.flatMap (myColl -> this .myMongoRepository .insertAll (Mono .just (myColl ), CB_DAY_COL )
246
251
.timeout (Duration .ofSeconds (5L ))
247
252
.doOnError (ex -> LOG .warn ("Coinbase prepare day data failed" , ex ))
@@ -251,12 +256,21 @@ private void createCbDailyAvg() {
251
256
timeFrame .begin ().add (Calendar .DAY_OF_YEAR , 1 );
252
257
timeFrame .end ().add (Calendar .DAY_OF_YEAR , 1 );
253
258
LOG .info ("Prepared Coinbase Day Data for: " + sdf .format (timeFrame .begin ().getTime ()) + " Time: "
254
- + (new Date ().getTime () - start .getTime ()) + "ms" + " properties: "
255
- + ( cbFunctionCache . size () / 2 ));
259
+ + (new Date ().getTime () - start .getTime ()) + "ms" + " 0 < properties: "
260
+ + nonZeroProperties . get ( ));
256
261
}
257
262
LOG .info (this .serviceUtils .createAvgLogStatement (startAll , "Prepared Coinbase Daily Data Time:" ));
258
263
}
259
264
265
+ private Collection <QuoteCb > countRelevantProperties (final AtomicInteger nonZeroProperties ,
266
+ Collection <QuoteCb > myColl ) {
267
+ var relevantProperties = myColl .stream ().flatMap (myQuote -> Stream .of (this .propertiesNonZero (myQuote )))
268
+ .mapToInt (v -> v ).max ().orElse (0 );
269
+ nonZeroProperties
270
+ .set (nonZeroProperties .get () < relevantProperties ? relevantProperties : nonZeroProperties .get ());
271
+ return myColl ;
272
+ }
273
+
260
274
private Collection <QuoteCb > makeCbQuoteDay (List <QuoteCb > quotes , Calendar begin , Calendar end ) {
261
275
List <QuoteCb > hourQuotes = new LinkedList <QuoteCb >();
262
276
QuoteCb quoteCb = new QuoteCb ();
@@ -300,12 +314,25 @@ private QuoteCb avgCbQuotePeriod(QuoteCb q1, QuoteCb q2, long count) {
300
314
return result ;
301
315
}
302
316
317
+ private Integer propertiesNonZero (QuoteCb quote ) {
318
+ var result = new AtomicInteger (0 );
319
+ this .propertyDescriptors .forEach (myPropertyDescriptor -> {
320
+ try {
321
+ var gsmf = this .createGetMethodFunction (myPropertyDescriptor );
322
+ BigDecimal num1 = gsmf .getter .apply (quote );
323
+ result .set (num1 .compareTo (BigDecimal .ZERO ) > 0 ? result .addAndGet (1 ) : result .get ());
324
+ } catch (Exception e ) {
325
+ throw new RuntimeException (e );
326
+ }
327
+ });
328
+ return result .get ();
329
+ }
330
+
303
331
private QuoteCb avgCbQuotePeriodMF (QuoteCb q1 , QuoteCb q2 , long count ) {
304
332
QuoteCb result = new QuoteCb ();
305
333
this .propertyDescriptors .forEach (myPropertyDescriptor -> {
306
- GetSetMethodFunctions gsmf ;
307
334
try {
308
- gsmf = this .createGetMethodFunction (myPropertyDescriptor );
335
+ var gsmf = this .createGetMethodFunction (myPropertyDescriptor );
309
336
BigDecimal num1 = gsmf .getter .apply (q1 );
310
337
BigDecimal num2 = gsmf .getter .apply (q2 );
311
338
BigDecimal resultValue = this .serviceUtils .avgHourValue (num1 , num2 , count );
0 commit comments