Skip to content

Commit d6782b1

Browse files
committedMay 23, 2024
fix: webclient
1 parent 7ced395 commit d6782b1

File tree

5 files changed

+45
-48
lines changed

5 files changed

+45
-48
lines changed
 

‎backend/src/main/java/ch/xxx/trader/adapter/config/SchedulingConfig.java

+1-37
Original file line numberDiff line numberDiff line change
@@ -15,32 +15,20 @@
1515
*/
1616
package ch.xxx.trader.adapter.config;
1717

18-
import java.time.Duration;
1918
import java.util.concurrent.Executor;
2019

21-
import javax.net.ssl.SSLException;
22-
2320
import org.slf4j.Logger;
2421
import org.slf4j.LoggerFactory;
2522
import org.springframework.context.annotation.Bean;
2623
import org.springframework.context.annotation.Configuration;
2724
import org.springframework.context.annotation.EnableAspectJAutoProxy;
28-
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
2925
import org.springframework.scheduling.annotation.EnableAsync;
3026
import org.springframework.scheduling.annotation.EnableScheduling;
3127
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
32-
import org.springframework.web.reactive.function.client.WebClient;
3328

3429
import io.micrometer.core.aop.TimedAspect;
3530
import io.micrometer.core.instrument.MeterRegistry;
36-
import io.netty.channel.ChannelOption;
37-
import io.netty.handler.ssl.SslContextBuilder;
38-
import io.netty.handler.timeout.ReadTimeoutHandler;
39-
import io.netty.handler.timeout.WriteTimeoutHandler;
4031
import net.javacrumbs.shedlock.spring.annotation.EnableSchedulerLock;
41-
import reactor.netty.http.client.HttpClient;
42-
import reactor.netty.resources.ConnectionProvider;
43-
import reactor.netty.tcp.SslProvider.SslContextSpec;
4432

4533
@Configuration
4634
@EnableAspectJAutoProxy
@@ -55,30 +43,6 @@ TimedAspect timedAspect(MeterRegistry registry) {
5543
return new TimedAspect(registry);
5644
}
5745

58-
@Bean
59-
public WebClient createWebClient() {
60-
ConnectionProvider provider = ConnectionProvider.builder("Client").maxConnections(20)
61-
.maxIdleTime(Duration.ofSeconds(6)).maxLifeTime(Duration.ofSeconds(7))
62-
.pendingAcquireTimeout(Duration.ofSeconds(9L)).evictInBackground(Duration.ofSeconds(10)).build();
63-
64-
WebClient webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(HttpClient
65-
.create(provider).secure(spec -> sslTimeouts(spec)).option(ChannelOption.SO_KEEPALIVE, false)
66-
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
67-
.doOnConnected(
68-
c -> c.addHandlerLast(new ReadTimeoutHandler(6)).addHandlerLast(new WriteTimeoutHandler(7)))
69-
.responseTimeout(Duration.ofSeconds(7L)))).build();
70-
return webClient;
71-
}
72-
73-
private void sslTimeouts(SslContextSpec spec) {
74-
try {
75-
spec.sslContext(SslContextBuilder.forClient().build()).handshakeTimeout(Duration.ofSeconds(8))
76-
.closeNotifyFlushTimeout(Duration.ofSeconds(6)).closeNotifyReadTimeout(Duration.ofSeconds(6));
77-
} catch (SSLException e) {
78-
throw new RuntimeException(e);
79-
}
80-
}
81-
8246
@Bean(name = "clientTaskExecutor")
8347
public Executor threadPoolTaskExecutor() {
8448
return this.createThreadPoolTaskExecutor(20);
@@ -89,7 +53,7 @@ private Executor createThreadPoolTaskExecutor(int maxPoolSize) {
8953
executor.setMaxPoolSize(maxPoolSize);
9054
executor.setQueueCapacity(1);
9155
executor.setKeepAliveSeconds(1);
92-
executor.setAllowCoreThreadTimeOut(true);
56+
executor.setAllowCoreThreadTimeOut(true);
9357
return executor;
9458
}
9559
}

‎backend/src/main/java/ch/xxx/trader/adapter/cron/ScheduledTask.java

+6-9
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@
3030
import org.springframework.scheduling.annotation.Async;
3131
import org.springframework.scheduling.annotation.Scheduled;
3232
import org.springframework.stereotype.Component;
33-
import org.springframework.web.reactive.function.client.WebClient;
3433

34+
import ch.xxx.trader.domain.common.WebUtils;
3535
import ch.xxx.trader.domain.model.dto.WrapperCb;
3636
import ch.xxx.trader.domain.model.entity.QuoteBf;
3737
import ch.xxx.trader.domain.model.entity.QuoteBs;
@@ -59,7 +59,6 @@ public class ScheduledTask {
5959
private static final String URLPA = "https://api.paxos.com/v2";
6060
private static final String URLBF = "https://api.bitfinex.com";
6161

62-
private final WebClient webClient;
6362
private final BitstampService bitstampService;
6463
private final BitfinexService bitfinexService;
6564
private final ItbitService itbitService;
@@ -69,9 +68,7 @@ public class ScheduledTask {
6968
private final Scheduler mongoImportScheduler = Schedulers.newBoundedElastic(20, 40, "mongoImport", 10);
7069

7170
public ScheduledTask(BitstampService bitstampService, MyUserService myUserService, EventMapper messageMapper,
72-
BitfinexService bitfinexService, ItbitService itbitService, CoinbaseService coinbaseService,
73-
WebClient webClient) {
74-
this.webClient = webClient;
71+
BitfinexService bitfinexService, ItbitService itbitService, CoinbaseService coinbaseService) {
7572
this.bitstampService = bitstampService;
7673
this.bitfinexService = bitfinexService;
7774
this.itbitService = itbitService;
@@ -106,7 +103,7 @@ private void insertBsQuote(String currPair) {
106103
final AtomicBoolean exceptionLogged = new AtomicBoolean(false);
107104
Disposable subscribe = null;
108105
try {
109-
Mono<QuoteBs> request = this.webClient.get()
106+
Mono<QuoteBs> request = WebUtils.createWebClient().get()
110107
.uri(String.format("%s/v2/ticker/%s/", ScheduledTask.URLBS, currPair))
111108
.accept(MediaType.APPLICATION_JSON).exchangeToMono(response -> response.bodyToMono(QuoteBs.class))
112109
.map(res -> {
@@ -178,7 +175,7 @@ public void insertCoinbaseQuote() {
178175
final AtomicBoolean exceptionLogged = new AtomicBoolean(false);
179176
Disposable subscribe = null;
180177
try {
181-
Mono<QuoteCb> request = this.webClient.get().uri(ScheduledTask.URLCB + "/exchange-rates?currency=BTC")
178+
Mono<QuoteCb> request = WebUtils.createWebClient().get().uri(ScheduledTask.URLCB + "/exchange-rates?currency=BTC")
182179
.accept(MediaType.APPLICATION_JSON).exchangeToMono(response -> {
183180
return response.bodyToMono(WrapperCb.class);
184181
// return response.bodyToMono(String.class);
@@ -217,7 +214,7 @@ public void insertItbitUsdQuote() {
217214
final AtomicBoolean exceptionLogged = new AtomicBoolean(false);
218215
Disposable subscribe = null;
219216
try {
220-
Mono<QuoteIb> request = this.webClient.get()
217+
Mono<QuoteIb> request = WebUtils.createWebClient().get()
221218
.uri(String.format("%s/markets/%s/ticker", ScheduledTask.URLPA, currPair))
222219
.accept(MediaType.APPLICATION_JSON)
223220
.exchangeToMono(response -> response.bodyToMono(PaxosQuote.class)).map(res -> {
@@ -309,7 +306,7 @@ private void insertBfQuote(String currPair) {
309306
final AtomicBoolean exceptionLogged = new AtomicBoolean(false);
310307
Disposable subscribe = null;
311308
try {
312-
Mono<QuoteBf> request = this.webClient.get()
309+
Mono<QuoteBf> request = WebUtils.createWebClient().get()
313310
.uri(String.format("%s/v1/pubticker/%s", ScheduledTask.URLBF, currPair))
314311
.accept(MediaType.APPLICATION_JSON).exchangeToMono(response -> response.bodyToMono(QuoteBf.class))
315312
.map(res -> {

‎backend/src/main/java/ch/xxx/trader/domain/common/WebUtils.java

+33-1
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,20 @@
55
import java.util.Map;
66
import java.util.Optional;
77

8-
import jakarta.servlet.http.HttpServletRequest;
8+
import javax.net.ssl.SSLException;
99

1010
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
1111
import org.springframework.web.reactive.function.client.WebClient;
1212

13+
import io.netty.channel.ChannelOption;
14+
import io.netty.handler.ssl.SslContextBuilder;
15+
import io.netty.handler.timeout.ReadTimeoutHandler;
16+
import io.netty.handler.timeout.WriteTimeoutHandler;
17+
import jakarta.servlet.http.HttpServletRequest;
18+
import reactor.netty.http.client.HttpClient;
19+
import reactor.netty.resources.ConnectionProvider;
20+
import reactor.netty.tcp.SslProvider.SslContextSpec;
21+
1322
public class WebUtils {
1423

1524
public static final String LASTOBCALLBF = "LAST_ORDERBOOK_CALL_BITFINEX";
@@ -34,7 +43,30 @@ public static WebClient buildWebClient(String url) {
3443
ReactorClientHttpConnector connector = new ReactorClientHttpConnector();
3544
return WebClient.builder().clientConnector(connector).baseUrl(url).build();
3645
}
46+
47+
public static WebClient createWebClient() {
48+
ConnectionProvider provider = ConnectionProvider.builder("Client").maxConnections(20)
49+
.maxIdleTime(Duration.ofSeconds(6)).maxLifeTime(Duration.ofSeconds(7))
50+
.pendingAcquireTimeout(Duration.ofSeconds(9L)).evictInBackground(Duration.ofSeconds(10)).build();
3751

52+
WebClient webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(HttpClient
53+
.create(provider).secure(spec -> sslTimeouts(spec)).option(ChannelOption.SO_KEEPALIVE, false)
54+
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000)
55+
.doOnConnected(
56+
c -> c.addHandlerLast(new ReadTimeoutHandler(6)).addHandlerLast(new WriteTimeoutHandler(7)))
57+
.responseTimeout(Duration.ofSeconds(7L)))).build();
58+
return webClient;
59+
}
60+
61+
private static void sslTimeouts(SslContextSpec spec) {
62+
try {
63+
spec.sslContext(SslContextBuilder.forClient().build()).handshakeTimeout(Duration.ofSeconds(8))
64+
.closeNotifyFlushTimeout(Duration.ofSeconds(6)).closeNotifyReadTimeout(Duration.ofSeconds(6));
65+
} catch (SSLException e) {
66+
throw new RuntimeException(e);
67+
}
68+
}
69+
3870
public static Optional<String> extractToken(Map<String,String> headers) {
3971
String authStr = headers.get(AUTHORIZATION);
4072
return extractToken(Optional.ofNullable(authStr));

‎backend/src/main/java/ch/xxx/trader/usecase/services/CoinbaseService.java

+4
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,10 @@ private GetSetMethodFunctions createGetMethodFunction(PropertyDescriptor propert
320320
// log.info(propertyDescriptor.getName());
321321
if (gsmf == null) {
322322
synchronized (this) {
323+
if (cbFunctionCache.size() > 10000) {
324+
LOG.info("CbFunctionCache size: {}", cbFunctionCache.size());
325+
cbFunctionCache.clear();
326+
}
323327
gsmf = cbFunctionCache.get(propertyDescriptor.getName());
324328
if (gsmf == null) {
325329
final MethodHandles.Lookup lookupGetter = MethodHandles.lookup();

‎backend/src/test/java/ch/xxx/trader/adapter/cron/ScheduledTaskTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public class ScheduledTaskTest {
3838
@Test
3939
public void convertTest() throws JsonMappingException, JsonProcessingException {
4040
PaxosQuote paxosQuote = this.objectMapper.readValue(VALUE, PaxosQuote.class);
41-
ScheduledTask scheduledTask = new ScheduledTask(null, null, null, null, null, null, null);
41+
ScheduledTask scheduledTask = new ScheduledTask(null, null, null, null, null, null);
4242
QuoteIb quoteIb = scheduledTask.convert(paxosQuote);
4343
Assertions.assertEquals("XBTUSD", quoteIb.getPair());
4444
Assertions.assertEquals(paxosQuote.getBestBid().getPrice().toString(), quoteIb.getBid().toString());

0 commit comments

Comments
 (0)
Please sign in to comment.