Disclaimer
This article represents my perspective on this solution, so any suggestions, fixes, or discussions are highly appreciated.
The short story
We use Spring Boot and Spring WebFlux to run our platform with the Netty HTTP server and client. As you may know, this stack doesn't decompress incoming HTTP request bodies out of the box, hence I needed to "reinvent the wheel" using Netty capabilities.
The implementation
There are a few ways to implement decompression of incoming requests in Netty:
- via WebFilter
- via a custom handler added to the Netty channel pipeline
- via the controller itself
The most convenient and flexible option is to use the WebFilter interface with a high precedence configuration, as this approach allows us to filter requests not only by their metadata but also by the request's data we receive from our clients.
According to the MDN documentation (Compression in HTTP), the server has to receive the HTTP header Content-Encoding: gzip from a client in order to start decompressing the payload. Thus, our first goal is to recognize the presence of such a header within a request. Let's create a utility class, CompressionUtils, that will hold a static method to verify the presence of this header and another static method that will convert a GZIP input stream into a byte array to pass this array further down the request flow:
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.commons.io.IOUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.server.reactive.ServerHttpRequest;

import java.io.IOException;
import java.io.InputStream;

import static java.util.Objects.nonNull;
import static org.springframework.http.HttpHeaders.ACCEPT_ENCODING;
import static org.springframework.http.HttpHeaders.CONTENT_ENCODING;
import static org.springframework.util.CollectionUtils.isEmpty;

// Static helpers only, so the class is not registered as a Spring bean.
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class CompressionUtils {

    public static final String GZIP = "gzip";
    public static final String UNKNOWN = "unknown";

    // Reads the already decompressed stream into a byte array.
    // The bytes are copied as-is, so binary and non-UTF-8 payloads stay intact.
    public static byte[] getDeflatedBytes(InputStream inputStream) throws IOException {
        return IOUtils.toByteArray(inputStream);
    }

    // True when the client sent a GZIP-compressed request body.
    public static boolean isGzipRequest(ServerHttpRequest serverHttpRequest) {
        return containsGzip(serverHttpRequest, CONTENT_ENCODING);
    }

    // True when the client accepts a GZIP-compressed response.
    public static boolean isGzipResponseRequired(ServerHttpRequest serverHttpRequest) {
        return containsGzip(serverHttpRequest, ACCEPT_ENCODING);
    }

    private static boolean containsGzip(ServerHttpRequest serverHttpRequest, String headerName) {
        HttpHeaders headers = serverHttpRequest.getHeaders();
        if (!isEmpty(headers)) {
            String header = headers.getFirst(headerName);
            return nonNull(header) && header.contains(GZIP);
        }
        return false;
    }
}
To convert the decompressed input stream into a byte array, I've used the Apache Commons IO project: the static IOUtils.toByteArray method copies the raw bytes, so the payload is not altered by a round trip through a String. The read itself is synchronous, but it only runs once the whole request body has been collected in memory, as the next stage shows.
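To see the conversion in isolation, here is a minimal round-trip sketch that uses only the JDK (Java 9+ for readAllBytes). The class and method names are my own assumptions, not part of the project; the sketch compresses a payload containing non-text bytes and checks that reading the GZIP stream as raw bytes returns it unchanged.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Illustrative helper; the names are assumptions, not part of the article's code.
final class GzipRoundTrip {

    private GzipRoundTrip() {
    }

    // Compresses a byte array with GZIP (stands in for what a client would send).
    static byte[] compress(byte[] plain) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(plain);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    // Reads a GZIP stream to the end and returns the decompressed bytes unchanged.
    static byte[] decompress(InputStream compressed) {
        try (GZIPInputStream gzip = new GZIPInputStream(compressed)) {
            return gzip.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // A mix of text and bytes that are not valid UTF-8 on their own.
        byte[] payload = {'{', '}', (byte) 0xFF, 0x00, (byte) 0x80};
        byte[] restored = decompress(new ByteArrayInputStream(compress(payload)));
        System.out.println("round trip intact: " + Arrays.equals(payload, restored));
    }
}
```

Working on bytes end to end keeps the helper independent of the payload's character set, which matters as soon as clients send anything other than plain UTF-8 text.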
The second stage of our implementation is to wrap an incoming ServerHttpRequest instance into our implementation. In this way, we can retain the source data of the incoming request and override the getBody method. WebFlux invokes this method to read the incoming data as a stream of Spring DataBuffer wrappers. Therefore, it's a convenient place to swap this part of the flow instead of integrating it into the Netty flow with a custom handler. Customizing it via WebFilter is a cleaner approach that does not affect all incoming requests and allows us to control the decompression process (with error handling and metrics).
So, our wrapper class GzipServerHttpRequest has the main method getBody with the following logic:
- Gets the body of the source server HTTP request.
- Transforms it into an InputStream.
- Reduces the reactive stream of input streams into a SequenceInputStream to get the whole POST request body.
- Decompresses the GZIP body with GZIPInputStream.
- Wraps the final byte array into a DataBuffer again.
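The steps above can be tried without WebFlux. In this sketch a plain list of byte chunks stands in for the Flux<DataBuffer> body (an assumption made only to keep the example self-contained), and the class and method names are illustrative. The chunks are turned into input streams, reduced into one SequenceInputStream, and decompressed with GZIPInputStream.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Illustrative stand-in for the getBody logic; not the article's class.
final class ChunkedGzipBody {

    private ChunkedGzipBody() {
    }

    // Chunks -> InputStreams -> one SequenceInputStream -> decompressed bytes.
    // An empty body yields Optional.empty(), like an empty Mono after reduce.
    static Optional<byte[]> decompress(List<byte[]> chunks) {
        return chunks.stream()
                .map(chunk -> (InputStream) new ByteArrayInputStream(chunk))
                .reduce(SequenceInputStream::new)
                .map(ChunkedGzipBody::readGzip);
    }

    private static byte[] readGzip(InputStream in) {
        try (GZIPInputStream gzip = new GZIPInputStream(in)) {
            return gzip.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Test helper: compresses the bytes the way a client would.
    static byte[] gzip(byte[] plain) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(plain);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    // Test helper: cuts the data into chunks, as the network layer might deliver it.
    static List<byte[]> split(byte[] data, int chunkSize) {
        List<byte[]> chunks = new ArrayList<>();
        for (int from = 0; from < data.length; from += chunkSize) {
            chunks.add(Arrays.copyOfRange(data, from, Math.min(data.length, from + chunkSize)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        byte[] body = "{\"message\":\"hello gzip\"}".getBytes(StandardCharsets.UTF_8);
        List<byte[]> chunks = split(gzip(body), 7);
        byte[] restored = decompress(chunks).orElseThrow();
        System.out.println(new String(restored, StandardCharsets.UTF_8));
    }
}
```

The reduce step is what makes the whole body available before decompression starts, so memory use grows with the request size; that trade-off is inherited from the approach itself, not from this sketch.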
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.server.RequestPath;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.SslInfo;
import org.springframework.util.MultiValueMap;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SynchronousSink;
import lombok.RequiredArgsConstructor;

import java.io.InputStream;
import java.io.SequenceInputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.zip.GZIPInputStream;

// Assumes CompressionUtils and IllegalGzipRequestException live in the same package.
@SuppressWarnings("NullableProblems")
@RequiredArgsConstructor
public class GzipServerHttpRequest implements ServerHttpRequest {

    private final ServerHttpRequest serverHttpRequest;

    @Override
    public String getId() {
        return serverHttpRequest.getId();
    }

    @Override
    public RequestPath getPath() {
        return serverHttpRequest.getPath();
    }

    @Override
    public MultiValueMap<String, String> getQueryParams() {
        return serverHttpRequest.getQueryParams();
    }

    @Override
    public MultiValueMap<String, HttpCookie> getCookies() {
        return serverHttpRequest.getCookies();
    }

    @Override
    public String getMethodValue() {
        return serverHttpRequest.getMethodValue();
    }

    @Override
    public URI getURI() {
        return serverHttpRequest.getURI();
    }

    @Override
    public HttpHeaders getHeaders() {
        return serverHttpRequest.getHeaders();
    }

    @Override
    public InetSocketAddress getRemoteAddress() {
        return serverHttpRequest.getRemoteAddress();
    }

    @Override
    public SslInfo getSslInfo() {
        return serverHttpRequest.getSslInfo();
    }

    @Override
    public HttpMethod getMethod() {
        return serverHttpRequest.getMethod();
    }

    // Collects the whole compressed body, then emits it as one decompressed buffer.
    @Override
    public Flux<DataBuffer> getBody() {
        return serverHttpRequest.getBody()
                .map(dataBuffer -> dataBuffer.asInputStream(true))
                .reduce(SequenceInputStream::new)
                .handle(this::decompress)
                .flux();
    }

    private void decompress(InputStream inputStream, SynchronousSink<DataBuffer> sink) {
        try (var gzipInputStream = new GZIPInputStream(inputStream)) {
            byte[] deflatedBytes = CompressionUtils.getDeflatedBytes(gzipInputStream);
            sink.next(new DefaultDataBufferFactory().wrap(deflatedBytes));
        } catch (Exception exception) {
            sink.error(getException(exception));
        }
    }

    // Keeps the original failure as the cause so it is not lost in the logs.
    private IllegalGzipRequestException getException(Exception cause) {
        String exceptionMessage = String.format("Decompression of a gzip content failed, URI: [%s]", serverHttpRequest.getURI());
        IllegalGzipRequestException exception = new IllegalGzipRequestException(exceptionMessage);
        exception.initCause(cause);
        return exception;
    }
}
Once we have the main implementation, we can now add the GzipDecompressionFilter to mutate our ServerWebExchange with the new instance of a wrapped ServerHttpRequest:
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;

import static org.apache.commons.lang3.exception.ExceptionUtils.getMessage;

// Assumes CompressionUtils and GzipServerHttpRequest live in the same package.
@Slf4j
@Component
@Order(Ordered.HIGHEST_PRECEDENCE)
public class GzipDecompressionFilter implements WebFilter {

    @SuppressWarnings("NullableProblems")
    @Override
    public Mono<Void> filter(ServerWebExchange serverWebExchange, WebFilterChain webFilterChain) {
        // Requests without Content-Encoding: gzip pass through untouched.
        if (!CompressionUtils.isGzipRequest(serverWebExchange.getRequest())) {
            return webFilterChain.filter(serverWebExchange);
        }
        ServerWebExchange mutatedWebExchange = getMutatedWebExchange(serverWebExchange);
        return webFilterChain
                .filter(mutatedWebExchange)
                .onErrorResume(this::logError);
    }

    private ServerWebExchange getMutatedWebExchange(ServerWebExchange serverWebExchange) {
        ServerHttpRequest mutatedHttpRequest = new GzipServerHttpRequest(serverWebExchange.getRequest());
        return serverWebExchange
                .mutate()
                .request(mutatedHttpRequest)
                .build();
    }

    // Logs the failure and completes the chain without propagating the error.
    private Mono<Void> logError(Throwable exception) {
        log.error("Gzip decompressed HTTP request failed, exception: [{}]", getMessage(exception));
        return Mono.empty();
    }
}
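The filter only wraps requests that pass the header check, and that check is a substring match on the header value. As a framework-free sketch of a stricter variant, the hypothetical helper below (the class and method names are mine, not part of Spring or this project) splits a Content-Encoding value into its comma-separated codings and compares them case-insensitively.

```java
import java.util.Arrays;
import java.util.Locale;

// Hypothetical helper for illustration; not part of Spring or the article's code.
final class ContentCodings {

    private ContentCodings() {
    }

    // Returns true when the header value lists "gzip" as one of its codings.
    // Content codings are case-insensitive and may be comma-separated.
    static boolean listsGzip(String headerValue) {
        if (headerValue == null || headerValue.isBlank()) {
            return false;
        }
        return Arrays.stream(headerValue.split(","))
                .map(coding -> coding.trim().toLowerCase(Locale.ROOT))
                .anyMatch("gzip"::equals);
    }

    public static void main(String[] args) {
        System.out.println(listsGzip("gzip"));
        System.out.println(listsGzip("GZIP"));
        System.out.println(listsGzip("deflate, gzip"));
        System.out.println(listsGzip("not-gzip"));
        System.out.println(listsGzip(null));
    }
}
```

A request that lists several codings would need each one undone in reverse order of application, which neither this sketch nor the filter attempts; rejecting such requests explicitly is one possible design choice.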
The last class we'll add is our custom RuntimeException, IllegalGzipRequestException, to recognize errors of this type in the third-party system we use for intercepting exceptions in our services:
public class IllegalGzipRequestException extends RuntimeException {

    public IllegalGzipRequestException(String message) {
        super(message);
    }
}
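To exercise the filter end to end, a client has to compress the body and declare it with the Content-Encoding header. Below is a hedged sketch using the JDK's java.net.http API (Java 11+); the URL, the JSON payload, and the class name are placeholders I made up, not endpoints from this project.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

// Illustrative client-side helper; names and URL are assumptions.
final class GzipRequestExample {

    private GzipRequestExample() {
    }

    // Compresses the text as UTF-8 bytes with GZIP.
    static byte[] gzip(String text) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(out)) {
            gzip.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    // Builds a POST whose body is compressed and labelled with Content-Encoding: gzip.
    static HttpRequest buildRequest(String url, String json) {
        return HttpRequest.newBuilder(URI.create(url))
                .header("Content-Type", "application/json")
                .header("Content-Encoding", "gzip")
                .POST(HttpRequest.BodyPublishers.ofByteArray(gzip(json)))
                .build();
    }

    public static void main(String[] args) {
        // Placeholder endpoint; replace with a real one before sending.
        HttpRequest request = buildRequest("http://localhost:8080/events", "{\"name\":\"test\"}");
        System.out.println(request.method() + " " + request.uri());
        System.out.println(request.headers().firstValue("Content-Encoding").orElse("<none>"));
        // To actually send it against a running service:
        // java.net.http.HttpClient.newHttpClient()
        //         .send(request, java.net.http.HttpResponse.BodyHandlers.ofString());
    }
}
```

The sketch only builds the request, so it runs without a server; sending it is left commented out.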
If a client wants to get a compressed response, we can easily enable response compression via the application.properties file:
server.compression.enabled=true
server.compression.min-response-size=1KB