On efficient reading and writing for large scale simulations

Last year, my colleagues and I presented a paper on giga model simulations in an SPE conference: Giga-Model Simulations In A Commercial Simulator – Challenges & Solutions. During this talk, we talked about the complexity of I/O for such simulations. We had ordered data as input that we needed to split in chunks to send them on the relevant MPI ranks, and then the same process was required for writing the results, gathering the chunks and then writing them down to the disk.

The central point is that some clusters have parallel file systems, and these works well when you try to access big blobs of aligned data. In fact, as they are the bottleneck of the whole system, you need to limit the number of accesses to what you actually require. For instance in HDF5, you can specify the alignment of datasets, so you can say that all HDF5 datasets will be aligned on the filesystem specifications (so for instance 1MB if your Lustre/GPFS has a chunk size of 1MB) and read or write chunks that are multiple of these values.

Gathering data (writing)

Writing result has the following constraints:

  • Limiting the number of write calls
  • Choosing the proper number of writers

To limit the number of write calls, we need to ensure that the call size is a multiple of the stripe size (for Lustre), and that these calls are aligned to the same stripe size. To ensure the latter, we can use HDF5 alignment specification if we are using HDF5.

The number of writers is dependent on the file system. Lustre is supposed to behave better with the number of writers equal to the number of file controllers, and the write size equal to the stripe size but GPFS doesn’t officially have these constraints.

Let’s say that we have a mapping with global values to local values (so on each rank we know that we have some values that have to be written in non contiguous locations in the output dataset).

A simple algorithm to write data is the following:

  • Split the output data in chunks
  • For each chunk in order, on each rank
    • Write your value where you have a value
    • Put +infinite where you don’t have a value
    • Reduce the chunk on the writer rank with the MIN operator
    • Write the chunk

Of course, this is not really efficient. If you have lots of ranks, then the chunks will be more or less full of infinite values, and reducing that on the writer rank will consume network bandwidth. One solution is to use some kind of the algorithm prefix. The simple implementation of the algorithm is the following one:

  • Split the output data in chunks
  • For each chunk in order, on each rank
    • Compute the number of values you have for the chunk
    • Send that value to the writer rank
    • Send to the writer rank the values and the global position for each value
    • On the writer rank, get the data and sort the chunk accordingly
    • Write the data on disk

This is the explanation of the simple algorithm. More complex algorithms will use threads to write data, will create several chunks at the same time for the same writer, or simply test which of the transfers are finished and advance each chunk in parallel.

The implementation I share here is an intermediate step. It is an asynchronous version, as it exchanges several chunks at the same time, one per writer, threaded as the writing is done on a thread. But it is not the best you can achieve, as it synchronizes all chunk exchanges at each step. All the chunks exchange relevant sizes on each rank, then it exchanges the data and the indices and finally sorts the data before writing it. A more advanced version would start on the non writing ranks all the exchanges, and not the sizes, and it would not explicitly synchronize each of the three steps.

The reason this example is still somewhat synchronized is that we can check at each step that everything is done. It’s just a matter of understanding.

This algorithm, or more precisely an improved version of it, was used for the paper and it shows significant improvement over the reduce implementation (a factor of three on runs with several thousands cores). Of course, for each chunk, the algorithm requires 3 MPI calls instead of just one, but the required bandwidth is scaled down by the number of cores. To reduce the number of calls, one solution is to increase the chunk size or to process several chunks destined to be written by the same writer at the same time.

Scattering data (reading)

Scattering data is the opposite problem. We only started to address it this year, but we tested several implementations. The first one we tested is the equivalent of the one I presented before. The other one is a simple broadcast based one.

  • Split the output data in chunks
  • Read data from the readers
  • Broadcast from the readers to all processes
  • Process the data from each rank

Of course, the same kind of optimization than for the gathering can be used here. Instead of reading before the broadcast, just wait for the read to end, start the next one and send the previous one, instead of explicitly process all data synchronously, you can process a chunk as soon as the broadcast is finished and launch the next one (with MPI_Testall).

Actually this broadcast algorithm performed better than the scatter version of the previous one. But why? I think the reason is really simple. With the scatter, you have 3 calls, and you send at least the same number of values as in the chunk. In the end, you send even more data and you have more communication latency. Indeed, the broadcast makes only one chunk fly over the network to all blades, so that makes it faster than the other solutions.

There is no equivalent to the broadcast for data output. You can only have different chunks coming from the different ranks, so you can benefit from he same facilities. But at least this makes it easy to implement different solutions!


I implemented this a long time ago, and now all the asynchronous collectives are starting being available natively on all MPI implementations. I find it quite neat to be able to read and write data for large simulations. I haven’t seen at the time any explanation as to how to accomplish this on the Internet, so it was time I share this solution.

I heard other people used other techniques to do this, they always involve some kind of mapping (I used a sorted vector of global and local indices, other use a distributed hash table, but I think it bowls down to similar things). My solution doesn’t store the amount of data each rank has for each chunk, not sure it is better to do so or not…

Buy Me a Coffee!
Other Amount:
Your Email Address:

Leave a Reply

This site uses Akismet to reduce spam. Learn how your comment data is processed.