maartenjan.dev

Hi! I'm Maarten-Jan, a Software Engineer focussed on Java, Kotlin, Event Sourcing and Kubernetes. I sometimes write down what I do.

Streaming query results to the client

This is a blog post adapted from a presentation about a problem we solved, using this excellent blog post as inspiration. Read that one first!

The problem

We needed to aggregate a large amount of data from a PostgreSQL database and make it downloadable as a CSV file through an API. The existing solution was very slow and could only deliver a limited dataset.

What we wanted was to stream the required data from the database directly to the client, without having to keep the entire result set in memory.

Step 1: Jdbc template stream

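We're using the queryForStream method on the JdbcTemplate to query our data. It returns a Java Stream, which lets us handle the data row by row (or in small batches) instead of loading everything at once. An important point to think about is the fetch size: it sets the number of rows the JDBC driver retrieves per round trip to the database, and thus both the number of round trips and how many rows you keep in memory at a time. Note that the PostgreSQL JDBC driver only fetches in batches when autocommit is off (for example inside a transaction); otherwise it loads the full result set regardless of the fetch size.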
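
To make the fetch size trade-off concrete, here is a minimal simulation in plain Java. It does not talk to a real database: BatchingCursor is a hypothetical stand-in for what a JDBC driver does behind a streamed result set, and the names are made up for this sketch. It only shows that rows are pulled in batches, lazily, as the stream is consumed.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

final class FetchSizeSketch {

    /** Simulated cursor over a table of totalRows rows, read in batches of fetchSize. */
    static final class BatchingCursor implements Iterator<Integer> {
        private final int totalRows;
        private final int fetchSize;
        private final Deque<Integer> buffer = new ArrayDeque<>();
        private int nextRow = 0;
        int roundTrips = 0;  // how often we "went to the database"
        int maxBuffered = 0; // most rows held in memory at once

        BatchingCursor(int totalRows, int fetchSize) {
            if (fetchSize <= 0) {
                throw new IllegalArgumentException("fetchSize must be positive");
            }
            this.totalRows = totalRows;
            this.fetchSize = fetchSize;
        }

        @Override
        public boolean hasNext() {
            // Only fetch a new batch once the previous one is fully consumed.
            if (buffer.isEmpty() && nextRow < totalRows) {
                fetchBatch();
            }
            return !buffer.isEmpty();
        }

        @Override
        public Integer next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return buffer.poll();
        }

        private void fetchBatch() {
            roundTrips++;
            int end = Math.min(nextRow + fetchSize, totalRows);
            while (nextRow < end) {
                buffer.add(nextRow++);
            }
            maxBuffered = Math.max(maxBuffered, buffer.size());
        }
    }

    /** Wraps the cursor in a sequential, lazily evaluated Stream. */
    static Stream<Integer> stream(BatchingCursor cursor) {
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(cursor, Spliterator.ORDERED), false);
    }

    public static void main(String[] args) {
        BatchingCursor cursor = new BatchingCursor(2500, 1000);
        int rows = stream(cursor).mapToInt(row -> 1).sum();
        System.out.println("rows=" + rows
                + " roundTrips=" + cursor.roundTrips
                + " maxBuffered=" + cursor.maxBuffered);
    }
}
```

With 2500 rows and a fetch size of 1000, the simulation makes three round trips and never buffers more than 1000 rows. A larger fetch size means fewer round trips but more memory per request; a real driver may behave slightly differently at the edges.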

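Step 2: StreamingResponseBody

We're returning a StreamingResponseBody, which lets us write to the HTTP response output stream while the rows are still coming in from the database, so the client receives data as it becomes available. We're using try-with-resources to make sure the resources are cleaned up at the end.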

Example code:

JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
// Number of rows the driver fetches per round trip to the database.
jdbcTemplate.setFetchSize(1000);

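public StreamingResponseBody getStukInformatieOverzicht(parameters…) {
    return httpResponseOutputStream -> {
        // Open the stream inside the callback so the query runs on the streaming thread.
        // try-with-resources closes both the writer and the result stream; closing the
        // stream also releases the underlying JDBC resources.
        // Argument order is queryForStream(sql, rowMapper, args...).
        try (Writer writer = new BufferedWriter(
                     new OutputStreamWriter(httpResponseOutputStream, StandardCharsets.UTF_8));
             var resultStream = jdbcTemplate.queryForStream("query", new RowMapper(), parameters)) {
            writer.write(HEADER + LINE_SEPARATOR);
            resultStream.forEach(csvRegel -> {
                try {
                    writer.write(csvRegel.toCsvLine() + LINE_SEPARATOR);
                    writer.flush();
                } catch (IOException e) {
                    // Stop reading rows when the client is gone instead of logging every row.
                    throw new UncheckedIOException(e);
                }
            });
        } catch (Exception e) {
            LOGGER.error("Error!", e);
        }
    };
}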
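
The example above calls csvRegel.toCsvLine() without showing it. Below is a minimal sketch of what such a row type and the write loop could look like in plain Java, without Spring. The CsvRegel fields, the escape helper and writeCsv are assumptions made up for illustration, not the code from the original project; the quoting follows the usual RFC 4180 convention.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.stream.Stream;

final class CsvStreamSketch {
    static final String LINE_SEPARATOR = "\r\n";

    /** Hypothetical row type; the real one from the project is not shown in this post. */
    static final class CsvRegel {
        private final String id;
        private final String description;

        CsvRegel(String id, String description) {
            this.id = id;
            this.description = description;
        }

        String toCsvLine() {
            return escape(id) + "," + escape(description);
        }
    }

    /** Quotes a field when it contains a comma, a double quote or a line break. */
    static String escape(String field) {
        if (field == null) {
            return "";
        }
        boolean needsQuotes = field.contains(",") || field.contains("\"")
                || field.contains("\n") || field.contains("\r");
        return needsQuotes ? "\"" + field.replace("\"", "\"\"") + "\"" : field;
    }

    /** Writes the header and then one line per row; rows are pulled lazily from the stream. */
    static void writeCsv(String header, Stream<CsvRegel> rows, OutputStream out) throws IOException {
        Writer writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
        try (rows) { // closes the stream (and whatever backs it) when done
            writer.write(header + LINE_SEPARATOR);
            rows.forEach(row -> {
                try {
                    writer.write(row.toCsvLine() + LINE_SEPARATOR);
                } catch (IOException e) {
                    throw new UncheckedIOException(e); // abort the forEach
                }
            });
            writer.flush(); // flush, but leave closing the output stream to the caller
        } catch (UncheckedIOException e) {
            throw e.getCause();
        }
    }
}
```

Rethrowing the IOException stops the loop as soon as the client disconnects, so the database is not drained for a response nobody reads.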

Step 3: the Controller

In the Spring controller, we're telling the client we're returning a file (through the octet-stream media type and the Content-Disposition header). The return type, ResponseEntity<StreamingResponseBody>, again indicates streaming data.


@GetMapping(value = "/endpoint")
public ResponseEntity<StreamingResponseBody> response(
        @RequestParam(name = "from") @DateTimeFormat(pattern = "yyyy-MM-dd") LocalDate vanaf,
        @RequestParam(name = "till") @DateTimeFormat(pattern = "yyyy-MM-dd") LocalDate totEnMet) {
    return ResponseEntity
            .status(HttpStatus.OK)
            .contentType(MediaType.APPLICATION_OCTET_STREAM)
            .header(HttpHeaders.CONTENT_DISPOSITION,
                    "attachment; filename=stukinformatie-overzicht.csv")
            .body(service.getStukInformatieOverzicht(vanaf, totEnMet));
}
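
On the receiving side, a client can also process the download line by line instead of buffering the whole file. A minimal sketch using the JDK's built-in java.net.http client (Java 11+); the class name and the idea of counting rows are made up for illustration, and the URI is whatever the endpoint above is deployed at.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.stream.Stream;

final class CsvDownloadClient {

    /** Streams the CSV response line by line and counts the data rows (header excluded). */
    static long countDataRows(URI uri) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
        // ofLines() exposes the body as a lazy Stream<String>, so the client
        // never has to hold the complete file in memory.
        HttpResponse<Stream<String>> response =
                client.send(request, HttpResponse.BodyHandlers.ofLines());
        try (Stream<String> lines = response.body()) {
            return lines.skip(1).count();
        }
    }
}
```

To save the download to disk instead, HttpResponse.BodyHandlers.ofFile(path) can be used in the same place.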

Step 4: Config

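As far as I understand it, this config limits the maximum number of threads Spring MVC uses to perform the streaming: each StreamingResponseBody is written on a thread from this executor, so at most five downloads are written at the same time and further ones wait in the queue. Setting the default timeout to -1 disables the async request timeout, so long-running downloads are not cut off.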


@Bean
public ConcurrentTaskExecutor concurrentTaskExecutor() {
    return new ConcurrentTaskExecutor(Executors.newFixedThreadPool(5, new ThreadFactory() {
        private final AtomicInteger threadCounter = new AtomicInteger(0);

        @Override
        public Thread newThread(@NonNull Runnable runnable) {
            return new Thread(runnable, "asyncThread-" + threadCounter.incrementAndGet());
        }
    }));
}

@Bean
public WebMvcConfigurer webMvcConfigurer(ConcurrentTaskExecutor concurrentTaskExecutor) {
    return new WebMvcConfigurer() {
        @Override
        public void configureAsyncSupport(@NonNull AsyncSupportConfigurer configurer) {
            configurer.setDefaultTimeout(-1);
            configurer.setTaskExecutor(concurrentTaskExecutor);
        }
    };
}
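
To see what the fixed pool in this config buys us, here is a small plain-Java sketch (no Spring involved) that submits more jobs than there are threads and records how many ever run at once. The class and method names are made up for this example; only Executors.newFixedThreadPool and the thread naming mirror the bean above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class BoundedPoolSketch {

    /** Fixed-size pool whose threads get a recognisable name, like the bean above. */
    static ExecutorService newNamedFixedPool(int size, String prefix) {
        AtomicInteger counter = new AtomicInteger(0);
        return Executors.newFixedThreadPool(size,
                runnable -> new Thread(runnable, prefix + counter.incrementAndGet()));
    }

    /** Runs the given number of jobs and returns the highest number that ran simultaneously. */
    static int maxConcurrency(int poolSize, int jobs) throws InterruptedException {
        if (jobs < poolSize) {
            throw new IllegalArgumentException("need at least poolSize jobs");
        }
        ExecutorService pool = newNamedFixedPool(poolSize, "asyncThread-");
        AtomicInteger running = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();
        CountDownLatch poolSaturated = new CountDownLatch(poolSize);
        try {
            for (int i = 0; i < jobs; i++) {
                pool.execute(() -> {
                    max.accumulateAndGet(running.incrementAndGet(), Math::max);
                    poolSaturated.countDown();
                    try {
                        poolSaturated.await(); // hold the thread until every pool thread is busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                    }
                });
            }
        } finally {
            pool.shutdown(); // already queued jobs still run
        }
        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("pool did not finish in time");
        }
        return max.get();
    }
}
```

Calling maxConcurrency(5, 20) returns 5: the other fifteen jobs wait in the executor's queue until a thread is free. For the streaming endpoint that means a sixth simultaneous download starts only once one of the first five finishes.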