Random Notes by agilob

Dangers of OpenCSV beans streaming


That one time I had to extract, transform and load a massive CSV file into a bunch of database entities, and it was kinda slow…

The class had position-based CSV bindings; rows were loaded into beans and streamed from a pretty big CSV file (10Gb+):

public class CSVUserEntry {

    @CsvBindByPosition(position = 0)
    private String userId;

    @CsvBindByPosition(position = 1)
    private String username;

    @CsvBindByPosition(position = 2)
    private String deviceId;

    @CsvBindByPosition(position = 3)
    private String keyAlias;

    @CsvBindByPosition(position = 4)
    private String passcodeKeyAlias;

    @CsvBindByPosition(position = 5)
    private String confirmationId;

}

Then I opened the stream in a generic way, with the simple and fluent interface of the Java Stream API:

import com.opencsv.bean.CsvToBean;
import com.opencsv.bean.CsvToBeanBuilder;

import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.nio.file.Path;
import java.util.stream.Stream;

public <T> Stream<T> openStreamReader(final String filename, final Class<T> clazz) throws IOException {
    final Reader reader = new FileReader(Path.of(filename).toString());

    final CsvToBean<T> csvReader = new CsvToBeanBuilder<T>(reader)
            .withType(clazz)
            .withSeparator(',')
            .withIgnoreEmptyLine(true)
            .withSkipLines(1) // skip header
            .build();

    return csvReader.stream();
}

The returned stream will be consumed by .forEach().
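Here's a minimal sketch of that consuming side; the file path and the persistence step are placeholders, not the actual ETL code:

public void importUsers() throws IOException {
    final Stream<CSVUserEntry> entries = openStreamReader("/data/users.csv", CSVUserEntry.class);
    entries.forEach(entry -> {
        // map the bean to a database entity and persist it (omitted)
    });
}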

With that simple code things started going south. The application was suddenly consuming all the CPUs it could, memory usage went through the roof, overall performance degraded, and after several minutes we got OOM crashes with 5.5Gb heap dumps… welp. A quick investigation with Eclipse MAT showed me the problem.
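(Aside: if you want the JVM to write those dumps on its own, the standard HotSpot flags do it; the jar name and dump path below are just examples.)

java -Xmx15500m \
     -XX:+HeapDumpOnOutOfMemoryError \
     -XX:HeapDumpPath=/tmp/etl-heapdump.hprof \
     -jar etl-app.jar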

Heap usage:

Eclipse MAT memory allocation

Thread allocations:

Eclipse MAT memory allocation grouped

Now, that’s enough information, but let’s dig a bit deeper to see if I can easily fix it. I checked the references and the path from the top of the thread down to my object:

JMC OpenCSV memory usage

That’s an awful lot of ConcurrentSkipListMap references to get to a single bean.

I checked why .stream() is producing so many objects, as parsing the CSV and creating one bean object at a time literally can’t be what consumes most of the allocated memory and causes OOM crashes.

The following source is from OpenCSV 5.5.2; the implementation details might differ in other versions.

public Stream<T> stream() throws IllegalStateException {
    prepareToReadInput();
    CompleteFileReader<T> completeFileReader = new CompleteFileReader<>(
            csvReader, filter, ignoreEmptyLines,
            mappingStrategy, exceptionHandler, verifiers);
    executor = new LineExecutor<T>(orderedResults, errorLocale, completeFileReader);
    executor.prepare();
    return StreamSupport.stream(executor, false);
}

To add some context here:

  • CompleteFileReader is a Runnable
  • IntolerantThreadPoolExecutor is both a ThreadPoolExecutor and a Spliterator
  • the Stream returned from StreamSupport.stream is not parallel
  • AccumulateCsvResults is a Thread

The executor contains resultantBeansMap, which is of type ConcurrentSkipListMap. This is my problematic object consuming most of the memory.

It uses a BlockingQueue to read the file line by line, and while three conditions are met, the AccumulateCsvResults thread keeps running. The running loop creates OrderedObject instances, whose purpose is documented:

// Move the output objects from the unsorted queue to the
// navigable map. Only the next expected objects are moved;
// if a gap in numbering occurs, the thread waits until those
// results have been filled in before continuing.

The loop puts the next ordinal element into the map. Then we go back to the loop in the LineExecutor class, which checks for the next available object in resultantBeansMap; the LineExecutor hands off to StreamSupport, which gives us the object in .forEach(), but takes it from a Spliterator again. That spliterator comes from ConcurrentSkipListMap.values().spliterator().
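To make that hand-off easier to picture, here is a heavily simplified sketch of the ordering pattern; this is not OpenCSV’s actual code, just the general idea of publishing numbered results into a ConcurrentSkipListMap and draining them in line order:

import java.util.concurrent.ConcurrentSkipListMap;

// Simplified illustration of the ordering pattern, not OpenCSV's code:
// parsed beans are published under their line number and consumed in order.
class OrderedHandoff<T> {
    private final ConcurrentSkipListMap<Long, T> resultsByLine = new ConcurrentSkipListMap<>();
    private long nextExpected = 0;

    // producer side: a worker publishes the bean parsed from line `lineNumber`
    void publish(long lineNumber, T bean) {
        resultsByLine.put(lineNumber, bean);
    }

    // consumer side: only the next expected line is handed out; a gap in
    // numbering means the consumer gets null and has to wait
    T takeNextIfReady() {
        T bean = resultsByLine.remove(nextExpected);
        if (bean != null) {
            nextExpected++;
        }
        return bean;
    }
}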

This journey explains why there are so many spliterator objects that live for so long, and why I had to dig so deep to find my final CSV bean. It still feels very wasteful to go through this process. The implementation uses multiple maps, queues and BlockingQueues to read one object in an unordered for loop, but the implementation and documentation are solid. The only thing I noticed: IntolerantThreadPoolExecutor.resultantBeansMap might not always remove processed beans in time, keeping the ordinal reference and the bean in memory. At least I can’t find any place where the map removes the key-value pair.

I quickly looked at alternative ways to consume the CSV file without using the Stream/Spliterator APIs. There is another method, .iterator(), which simply exposes an Iterator<T>. Now, I’ll just compare the implementation before trying it:

@Override
public Iterator<T> iterator() {
    prepareToReadInput();
    return new CsvToBeanIterator();
}

Yup, LGTM. I’m going to use this one to fix my memory leak. A few minutes later, memory usage dropped from ~13000Mb (yes, the JVM was started with -Xmx15500m) to a stable 150Mb.
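For completeness, this is roughly what the iterator-based consumption looks like after the switch; the file path and the persistence step are placeholders:

import com.opencsv.bean.CsvToBean;
import com.opencsv.bean.CsvToBeanBuilder;

import java.io.FileReader;
import java.io.Reader;

public void importUsersWithIterator(final String filename) throws Exception {
    try (Reader reader = new FileReader(filename)) {
        final CsvToBean<CSVUserEntry> csvReader = new CsvToBeanBuilder<CSVUserEntry>(reader)
                .withType(CSVUserEntry.class)
                .withSeparator(',')
                .withIgnoreEmptyLine(true)
                .withSkipLines(1) // skip header
                .build();

        // CsvToBean is Iterable, so beans come out one at a time through
        // CsvToBeanIterator, without the executor/Spliterator machinery
        for (final CSVUserEntry entry : csvReader) {
            // map the bean to a database entity and persist it (omitted)
        }
    }
}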