1 2 3 4 5 6 7 8
// Per-request response timeout: obtain the native Reactor Netty request behind
// Spring's client request and set a 2-second response timeout on it.
WebClient.create()
        .get()
        .uri("https://example.org/path")
        .httpRequest(springRequest ->
                springRequest.<HttpClientRequest>getNativeRequest()
                        .responseTimeout(Duration.ofSeconds(2)))
        .retrieve()
        .bodyToMono(String.class);
|
WebClient & HttpClient
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| public class MyWebClient {
public static void main(String[] args) throws IOException { Mono<ClientResponse> clientResponseMono = createWebClient(createHttpClient("webClient-pool", 30)) .get() .uri("http://localhost:8089/") .httpRequest(httpRequest -> { HttpClientRequest reactorRequest = httpRequest.getNativeRequest(); reactorRequest.responseTimeout(Duration.ofSeconds(2)); }) .exchange() .doOnDiscard(PooledDataBuffer.class, DataBufferUtils::release);
clientResponseMono.block();
}
private static WebClient createWebClient(HttpClient httpClient) { return WebClient.builder() .filter(new LogFilter()) .clientConnector(new ReactorClientHttpConnector(httpClient)) .exchangeStrategies(ExchangeStrategies.builder().codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(-1)).build()) .uriBuilderFactory(new MyUriBuilderFactory()) .build(); }
private static HttpClient createHttpClient(String poolName, int timeoutSec) { return HttpClient .create(ConnectionProvider.builder(poolName) .maxConnections(20480) .maxIdleTime(Duration.of(600, ChronoUnit.SECONDS)) .build() ) .tcpConfiguration(tcpClient -> tcpClient .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int)TimeUnit.SECONDS.toMillis(10)) .option(ChannelOption.SO_KEEPALIVE, false) .doOnConnected(connection -> connection .addHandlerLast(new ReadTimeoutHandler(timeoutSec)) .markPersistent(false) ) ) .followRedirect(false) .keepAlive(false) .compress(true); }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| import java.net.URI; import java.util.Optional;
import org.springframework.web.reactive.function.client.ClientRequest; import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.ExchangeFilterFunction; import org.springframework.web.reactive.function.client.ExchangeFunction;
import lombok.extern.slf4j.Slf4j; import reactor.core.publisher.Mono;
/**
 * Exchange filter that writes one log line before a request is handed to the
 * next exchange function and one log line when its response is emitted.
 * Each line is prefixed with the request's "trace-id" header when present,
 * otherwise with the request's (or response's) own log prefix.
 */
@Slf4j
public class LogFilter implements ExchangeFilterFunction {

    @Override
    public Mono<ClientResponse> filter(ClientRequest request, ExchangeFunction next) {
        final URI target = request.url();
        final Optional<String> traceId = Optional.ofNullable(request.headers().getFirst("trace-id"));

        log.info("{} Request: {} {}", traceId.orElse(request.logPrefix()), request.method(), target);

        // Log as a side effect and pass the response through untouched.
        return next.exchange(request)
                .doOnNext(response -> log.info("{} Response:{} {} {}",
                        traceId.orElse(response.logPrefix()),
                        request.method(),
                        target,
                        response.statusCode()));
    }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| package com.example.demo.reactorclient;
import java.io.UnsupportedEncodingException; import java.net.URI; import java.net.URISyntaxException; import java.net.URLDecoder; import java.util.Map; import java.util.function.Supplier;
import org.apache.commons.codec.Charsets; import org.springframework.web.util.UriBuilderFactory; import org.springframework.web.util.UriComponentsBuilder;
import lombok.extern.slf4j.Slf4j;
@Slf4j public class MyUriBuilderFactory implements UriBuilderFactory { public UriComponentsBuilder uriString(String uriTemplate) { return UriComponentsBuilder.fromUriString(uriTemplate); }
@Override public UriComponentsBuilder builder() { return UriComponentsBuilder.newInstance(); }
@Override public URI expand(String uriTemplate, Map<String, ?> uriVariables) { return expand(uriTemplate, () -> uriVariables); }
@Override public URI expand(String uriTemplate, Object... uriVariables) { return expand(uriTemplate, () -> uriVariables); }
public <T> URI expand(String uriTemplate, Supplier<T> uriVariablesSupplier) { T uriVariables = uriVariablesSupplier.get(); try { return new URI(UriComponentsBuilder.fromHttpUrl(uriTemplate).build().expand(uriVariables).toUriString()); } catch (URISyntaxException e) { try { String decode = URLDecoder.decode(uriTemplate, Charsets.UTF_8.name()); String encodeUrl = UriComponentsBuilder.fromHttpUrl(decode).build().expand(uriVariables).encode().toUriString(); log.warn("Url encode:[{},{}]", uriTemplate, encodeUrl); return new URI(encodeUrl); } catch (URISyntaxException | IllegalArgumentException | UnsupportedEncodingException exception) { log.error(exception.getMessage() + ":" + uriTemplate, exception); throw new RuntimeException("bad request"); } } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| <!-- Build properties: Java level and the reactor-netty version referenced by the dependency below. -->
<properties>
    <java.version>1.8</java.version>
    <reactor-netty.version>0.9.12.RELEASE</reactor-netty.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <!-- WebFlux starter without its logging and reactor-netty starters;
         reactor-netty is declared explicitly further down with a pinned version. -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-reactor-netty</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.10</version>
    </dependency>
    <!-- Version comes from the reactor-netty.version property above. -->
    <dependency>
        <groupId>io.projectreactor.netty</groupId>
        <artifactId>reactor-netty</artifactId>
        <version>${reactor-netty.version}</version>
    </dependency>
    <!-- Netty pinned to an explicit version. -->
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.69.Final</version>
    </dependency>
</dependencies>
|
HttpClient keepalive config
ref: https://projectreactor.io/docs/netty/snapshot/api/index.html?reactor/netty/http/client/HttpClient.html
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| private static HttpClient createHttpClient(String poolName, int timeoutSec) { return HttpClient .create(ConnectionProvider.builder(poolName) .maxConnections(20480) .maxIdleTime(Duration.of(600, ChronoUnit.SECONDS)) .build() ) .tcpConfiguration(tcpClient -> tcpClient .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int)TimeUnit.SECONDS.toMillis(10)) .option(ChannelOption.SO_KEEPALIVE, false) .doOnConnected(connection -> connection .addHandlerLast(new ReadTimeoutHandler(timeoutSec)) .markPersistent(false) ) ) .followRedirect(false) .keepAlive(false) .compress(true); }
|
note
如果 httpClient.keepAlive = false，可以通过添加 header（"Connection", "keep-alive"）来启用 keepalive。
参考文章