iovec to messagelet ring

I’ve been working on fos, the Factored Operating System.  fos is a project at MIT CSAIL. It uses messages to communicate between applications and services, in the same way a standard operating system uses system calls and function calls.

One thing I’ve been doing is adding iovec support to the message API.  This is analogous to the difference between write(2), which writes a single buffer of data to a file, and writev(2), which uses an iovec data structure to gather pieces of the data into a single write to the file.  An iovec is an array of structures, each of which contains a pointer and a length.
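For readers who have not used writev(2), here is a minimal sketch of a gathered write on a POSIX system. The helper name write_two_pieces is made up for illustration and has nothing to do with the fos message API.

```c
#include <assert.h>
#include <string.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

/* Hypothetical helper (not part of fos): writes two separate strings
 * to fd with a single writev(2) call, without first copying them into
 * one contiguous buffer.
 * Returns the number of bytes written, or -1 on error. */
ssize_t write_two_pieces(int fd, const char *header, const char *body)
{
  struct iovec iov[2];

  assert(header != NULL && body != NULL);
  iov[0].iov_base = (void *) header;  /* iov_base is not const-qualified */
  iov[0].iov_len = strlen(header);
  iov[1].iov_base = (void *) body;
  iov[1].iov_len = strlen(body);
  return writev(fd, iov, 2);
}
```

The kernel sees the two pieces as one write, which is the behavior the message API needs to reproduce for messages.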

One of the high-performance message transports in fos is a ring of cache-line-sized messagelets.  Each messagelet has an 8-byte header and 56 bytes of data.  To send a message, you wait until the next messagelet in the ring is free (as shown by flags in the header), write an 8-byte message length field into its data area, and then copy as much of the message as fits after it.  If the message doesn’t all fit, you mark the first messagelet as filled, wait for the next one to be free, and continue writing the message there.
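To make the sizes concrete, here is a rough sketch of the layout described above, along with the arithmetic for how many messagelets a message occupies. The struct, its field names, and messagelets_needed are my own placeholders, and I am assuming a 64-byte cache line (8 + 56); the real fos header fields are not shown here.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define ML_HEADER_SIZE 8                /* per-messagelet header */
#define ML_DATA_SIZE   56               /* per-messagelet data area */
#define ML_LENGTH_SIZE sizeof(uint64_t) /* length field, first messagelet only */

/* Hypothetical layout: an opaque 8-byte header followed by the data. */
struct messagelet {
  uint64_t header;             /* flags such as free/ready would live here */
  uint8_t data[ML_DATA_SIZE];
};

static_assert(sizeof(struct messagelet) == 64,
              "a messagelet should fill one 64-byte cache line");
static_assert(offsetof(struct messagelet, data) == ML_HEADER_SIZE,
              "the data area follows the 8-byte header");

/* Number of messagelets occupied by a message with payload_len bytes of
 * payload. The 8-byte length field shares the first data area with the
 * payload, so the result is ceil((8 + payload_len) / 56). */
size_t messagelets_needed(size_t payload_len)
{
  size_t bytes = ML_LENGTH_SIZE + payload_len;
  return (bytes + ML_DATA_SIZE - 1) / ML_DATA_SIZE;
}
```

So a payload of up to 48 bytes fits in a single messagelet, and every additional 56 bytes costs one more.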

This design has a cost, because the sender must copy a longer message in 56-byte chunks, but it can also be quite fast, because the receiver can be draining the head of the message while the sender is still writing the tail.  The idea comes from a communications method in the Barrelfish research operating system.

With iovec, the sender has a bigger problem. To know the total length of the message, you have to add up the lengths of all the iovec entries. Then you have to step through the iovec and copy each entry into a sequential series of messagelets.  An iovec entry may end in the middle of a messagelet, so the next entry has to continue from that point.
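One way to see where the entry boundaries fall is to map a byte offset within the message to a messagelet number and an offset inside its data area. This is only a sketch under the layout described above (an 8-byte length field at the start of the first messagelet); the names ml_position and locate are hypothetical.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define ML_DATA_SIZE 56  /* data bytes per messagelet */

struct ml_position {
  size_t index;   /* which messagelet of the message, counting from 0 */
  size_t offset;  /* byte offset within that messagelet's data area */
};

/* Hypothetical helper: where does byte msg_offset of the message payload
 * land? The 8-byte length field shifts everything in the stream by 8. */
struct ml_position locate(size_t msg_offset)
{
  size_t pos = sizeof(uint64_t) + msg_offset;
  struct ml_position p = { pos / ML_DATA_SIZE, pos % ML_DATA_SIZE };
  return p;
}
```

For example, with two iovec entries of 100 and 20 bytes, the second entry starts at message offset 100, which is byte 52 of messagelet 1, and it ends at message offset 120, which is byte 16 of messagelet 2. Its 20 bytes are split 4 and 16 across two messagelets.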

How would you write this?  I’ve just started thinking about it, and will post my code here when I figure it out.

UPDATE

Here’s my version.

 

/* iovec_to_messagelet_ring.c
 * L. Stewart
 * 2011-12-29
 */

#include <stddef.h>
#include <stdint.h>
#include <sys/uio.h>
#include <string.h>

/* Messagelet functions */

typedef void CHANNEL; /* placeholder */

#define ML_SIZE 56


/* Waits for the next messagelet in the ring to be free and returns
 * a pointer to its data area.
 * The header sits 8 bytes before the returned pointer.
 */

void *getfreemessagelet(CHANNEL *ch);

/* sets ready flag in messagelet header, turning it over to the receiver */
void postmessagelet(CHANNEL *ch, void *m);

void send(CHANNEL *ch, struct iovec *in_iov, int in_iovcnt)
{
  size_t total_size = 0; /* bytes of message still to be copied */
  int iov_index;  /* current iovec entry */
  void *m = NULL; /* current messagelet */
  void *mp; /* current pointer into messagelet */
  size_t ml_len; /* space left in current messagelet */
  size_t copy_length; /* amount to copy this time around the loop */
  struct iovec iov; /* working iovec entry */
  /* calculate total size of message by adding the lengths of the iovec entries */
  for (iov_index = 0; iov_index < in_iovcnt; iov_index += 1)
    total_size += in_iov[iov_index].iov_len;
  if (total_size == 0) return; /* nothing to do */
  m = getfreemessagelet(ch);
  *((uint64_t *) m) = total_size; /* set length of message */
  mp = (void *) ((uintptr_t) m + sizeof(uint64_t));
  ml_len = ML_SIZE - sizeof(uint64_t);
  iov_index = 0;
  iov.iov_len = 0;
  while (total_size > 0) {
    if (ml_len == 0) {
      m = mp = getfreemessagelet(ch);
      ml_len = ML_SIZE;
    }
    if (iov.iov_len == 0) {
      iov = in_iov[iov_index];
      iov_index += 1;
    }
    copy_length = (iov.iov_len < ml_len) ? iov.iov_len : ml_len;
    memcpy(mp, iov.iov_base, copy_length);
    ml_len -= copy_length;
    iov.iov_len -= copy_length;
    mp = (void *) ((uintptr_t) mp + copy_length);
    iov.iov_base = (void *) ((uintptr_t) iov.iov_base + copy_length);
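    total_size -= copy_length;
    /* post when the messagelet is full, or when the message ends partway
     * through it; otherwise the last partial messagelet is never sent */
    if (ml_len == 0 || total_size == 0) postmessagelet(ch, m);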
  }
}