reactive webclient

1
2
3
4
5
6
7
8
// Issue a GET and set a 2-second response timeout on this request by
// configuring the underlying Reactor Netty request inside the httpRequest callback.
WebClient.create()
        .get()
        .uri("https://example.org/path")
        .httpRequest(clientRequest ->
                clientRequest.<HttpClientRequest>getNativeRequest()
                        .responseTimeout(Duration.ofSeconds(2)))
        .retrieve()
        .bodyToMono(String.class);

WebClient & HttpClient

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/**
 * Demo of a {@link WebClient} backed by a custom Reactor Netty {@link HttpClient}
 * with its own connection pool, timeouts and keep-alive disabled.
 */
public class MyWebClient {

    /**
     * Sends one GET request to {@code http://localhost:8089/} with a 2-second
     * per-request response timeout and blocks until the response arrives.
     *
     * @param args unused
     * @throws IOException kept in the signature for backward compatibility
     */
    public static void main(String[] args) throws IOException {
        Mono<ClientResponse> clientResponseMono = createWebClient(createHttpClient("webClient-pool", 30))
                .get()
                .uri("http://localhost:8089/")
                // .headers(httpHeaders -> httpHeaders.add("Connection", "keep-alive"))
                .httpRequest(httpRequest -> {
                    HttpClientRequest reactorRequest = httpRequest.getNativeRequest();
                    reactorRequest.responseTimeout(Duration.ofSeconds(2));
                })
                .exchange()
                // exchange() hands back a response whose body has not been read. The body
                // must always be consumed or released, otherwise the connection and its
                // buffers can leak. This demo ignores the body, so release it explicitly.
                .flatMap(clientResponse -> clientResponse.releaseBody().thenReturn(clientResponse))
                .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);

        clientResponseMono.block();

        /*
        ServerRequest request;
        createWebClient(createHttpClient("webClient-pool", 30))
                .method(HttpMethod.GET)
                .uri("Url")
                .body(BodyInserters.fromDataBuffers(request.body(BodyExtractors.toDataBuffers())))
                .headers(httpHeaders -> httpHeaders.addAll(request.headers().asHttpHeaders()))
                .exchange()
                .doOnError(throwable -> log.error(""))
                .doOnSuccess(clientResponse -> log.info("success"))
                .doFinally(signalType -> log.info("do on finally"))
                .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
        */
    }

    /**
     * Builds a {@link WebClient} on top of the given Reactor Netty client.
     *
     * <p>The client logs every exchange through {@code LogFilter} and expands URI
     * templates through {@code MyUriBuilderFactory}.
     *
     * @param httpClient the Reactor Netty client used as the HTTP connector
     * @return the configured {@link WebClient}
     */
    private static WebClient createWebClient(HttpClient httpClient) {
        return WebClient.builder()
                .filter(new LogFilter())
                .clientConnector(new ReactorClientHttpConnector(httpClient))
                // NOTE(review): -1 presumably removes the in-memory buffering limit of the
                // default codecs; confirm against the Spring codec documentation.
                .exchangeStrategies(ExchangeStrategies.builder()
                        .codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(-1))
                        .build())
                .uriBuilderFactory(new MyUriBuilderFactory())
                .build();
    }

    /**
     * Creates a Reactor Netty {@link HttpClient} with a dedicated connection pool,
     * a 10-second connect timeout, keep-alive disabled and compression enabled.
     *
     * @param poolName   name given to the connection pool
     * @param timeoutSec value passed to the {@link ReadTimeoutHandler} installed on each
     *                   connection (presumably seconds, going by the parameter name)
     * @return the configured {@link HttpClient}
     */
    private static HttpClient createHttpClient(String poolName, int timeoutSec) {
        ConnectionProvider pool = ConnectionProvider.builder(poolName)
                .maxConnections(20480)
                .maxIdleTime(Duration.ofSeconds(600))
                .build();
        int connectTimeoutMillis = (int) TimeUnit.SECONDS.toMillis(10);

        return HttpClient.create(pool)
                .tcpConfiguration(tcp -> tcp
                        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMillis)
                        .option(ChannelOption.SO_KEEPALIVE, false)
                        .doOnConnected(conn -> conn
                                .addHandlerLast(new ReadTimeoutHandler(timeoutSec))
                                .markPersistent(false)))
                .followRedirect(false)
                .keepAlive(false)
                .compress(true);
    }

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import java.net.URI;
import java.util.Optional;

import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.ExchangeFunction;

import lombok.extern.slf4j.Slf4j;
import reactor.core.publisher.Mono;

/**
 * Exchange filter that logs one line for the outgoing request and one line for
 * the received response.
 *
 * <p>Each line is prefixed with the request's {@code trace-id} header when it is
 * present; otherwise the log prefix of the request (or of the response, for the
 * response line) is used.
 */
@Slf4j
public class LogFilter implements ExchangeFilterFunction {

    /**
     * Logs the request, delegates to the next exchange function and logs the
     * response status when the response is emitted.
     *
     * @param request the outgoing request
     * @param next    the next exchange function in the chain
     * @return the response from {@code next}, passed through unchanged
     */
    @Override
    public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
        final URI target = request.url();
        final String traceId = request.headers().getFirst("trace-id");
        final String requestPrefix = traceId != null ? traceId : request.logPrefix();
        log.info("{} Request: {} {}", requestPrefix, request.method(), target);

        return next.exchange(request).doOnNext(response -> {
            String responsePrefix = traceId != null ? traceId : response.logPrefix();
            log.info("{} Response:{} {} {}", responsePrefix, request.method(), target, response.statusCode());
        });
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package com.example.demo.reactorclient;

import java.io.UnsupportedEncodingException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLDecoder;
import java.util.Map;
import java.util.function.Supplier;

import org.apache.commons.codec.Charsets;
import org.springframework.web.util.UriBuilderFactory;
import org.springframework.web.util.UriComponents;
import org.springframework.web.util.UriComponentsBuilder;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MyUriBuilderFactory implements UriBuilderFactory {
public UriComponentsBuilder uriString(String uriTemplate) {
return UriComponentsBuilder.fromUriString(uriTemplate);
}

@Override
public UriComponentsBuilder builder() {
return UriComponentsBuilder.newInstance();
}

@Override
public URI expand(String uriTemplate, Map<String, ?> uriVariables) {
return expand(uriTemplate, () -> uriVariables);
}

@Override
public URI expand(String uriTemplate, Object... uriVariables) {
return expand(uriTemplate, () -> uriVariables);
}

public <T> URI expand(String uriTemplate, Supplier<T> uriVariablesSupplier) {
T uriVariables = uriVariablesSupplier.get();
try {
return new URI(UriComponentsBuilder.fromHttpUrl(uriTemplate).build().expand(uriVariables).toUriString());
} catch (URISyntaxException e) {
try {
String decode = URLDecoder.decode(uriTemplate, Charsets.UTF_8.name());
String encodeUrl = UriComponentsBuilder.fromHttpUrl(decode).build().expand(uriVariables).encode().toUriString();
log.warn("Url encode:[{},{}]", uriTemplate, encodeUrl);
return new URI(encodeUrl);
} catch (URISyntaxException | IllegalArgumentException | UnsupportedEncodingException exception) {
log.error(exception.getMessage() + ":" + uriTemplate, exception); // log error for monitor
throw new RuntimeException("bad request");
}
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
<!-- Build properties: Java level and the Reactor Netty version used by the reactor-netty dependency below. -->
<properties>
<java.version>1.8</java.version>
<reactor-netty.version>0.9.12.RELEASE</reactor-netty.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- WebFlux starter with its logging and Reactor Netty starters excluded; Reactor Netty is declared explicitly further down. -->
<!-- NOTE(review): no replacement logging starter is declared in this fragment; confirm one is provided elsewhere. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-reactor-netty</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.10</version>
</dependency>
<!-- Reactor Netty pinned explicitly through the reactor-netty.version property. -->
<dependency>
<groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
<version>${reactor-netty.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<!-- NOTE(review): netty-all is pinned independently of reactor-netty; confirm the two versions are compatible. -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.69.Final</version>
</dependency>

</dependencies>

HttpClient keepalive config

ref: https://projectreactor.io/docs/netty/snapshot/api/index.html?reactor/netty/http/client/HttpClient.html
img.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
 * Creates a Reactor Netty {@link HttpClient} with a dedicated connection pool,
 * a 10-second connect timeout, keep-alive disabled and compression enabled.
 *
 * @param poolName   name given to the connection pool
 * @param timeoutSec value passed to the {@link ReadTimeoutHandler} installed on each
 *                   connection (presumably seconds, going by the parameter name)
 * @return the configured {@link HttpClient}
 */
private static HttpClient createHttpClient(String poolName, int timeoutSec) {
    // Pool of up to 20480 connections; idle connections are kept for at most 600 seconds.
    ConnectionProvider pool = ConnectionProvider.builder(poolName)
            .maxConnections(20480)
            .maxIdleTime(Duration.ofSeconds(600))
            .build();
    int connectTimeoutMillis = (int) TimeUnit.SECONDS.toMillis(10);

    return HttpClient.create(pool)
            .tcpConfiguration(tcp -> tcp
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutMillis)
                    .option(ChannelOption.SO_KEEPALIVE, false)
                    .doOnConnected(conn -> conn
                            .addHandlerLast(new ReadTimeoutHandler(timeoutSec))
                            .markPersistent(false)))
            .followRedirect(false)
            .keepAlive(false)
            .compress(true);
}

note

如果 HttpClient 配置了 keepAlive(false),
仍然可以通过在请求中添加 header "Connection: keep-alive" 来启用 keep-alive(参见上文 main 方法中被注释掉的 .headers(...) 一行)。

参考文章

评论