An implementation for the Message System

This document defines (and tries to justify) the software setup of the Robot software. The chosen setup is based on two assumptions:

  1. The robot software, as described in the paper by Pieter et al., is split into a number of modules. These modules communicate with each other, but are otherwise independent. Each module will most likely be implemented as a separate task, by different programmers. (To avoid confusion with the Unix term process, we use the term task, which can be either a Unix process or a Unix thread.)
  2. The described structure implies 3 types of communication: questions (Where is the ball?), orders (Kick the ball!) and triggered notifications (The battery is almost empty). Of course the question type implicitly includes an answer type.

The easiest way to implement the inter-module communication is based on polling. At convenient intervals each module asks all other modules if they want to communicate. If so, the two modules set up a communication channel and exchange data. This approach has some major drawbacks: modules spend a lot of time polling each other, and some will have to wait until their partner is ready for communication.

Therefore the more complicated approach based on message passing and queues was chosen. In a message passing system modules communicate by sending each other messages. Every module has a queue behind its input. A module can put a message in another module's input queue at any time, without having to wait for the receiver to be ready for it. The receiving module reads the message from the queue when it has finished processing the previous one. (Of course the rate of message generation must not exceed the rate of message processing, but generation and processing don't have to be synchronized.)

Each module needs to implement a so-called "message loop", which gets a message from the queue, processes it, gets the next message, processes it, and so on. Processing can take as long as it needs, although it might be advisable to start a new thread to handle really lengthy jobs. When the input queue is empty, execution of the message loop thread is suspended until a message becomes available. Each module should have only one message loop, so all messages are guaranteed to be processed in the same way.
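The queue and message loop behaviour described above can be sketched inside a single process with a mutex and a condition variable. This is only an illustration: the names DemoQueue, demo_init, demo_put and demo_get are hypothetical, and the Message System itself is built on the Linux message queue facility rather than on this structure.

```c
#include <pthread.h>
#include <stdlib.h>

/* Hypothetical in-process queue; not the Message System's own data structure. */
struct DemoNode { int command; struct DemoNode *next; };

struct DemoQueue {
    pthread_mutex_t lock;
    pthread_cond_t  nonempty;
    struct DemoNode *head, *tail;
};

void demo_init(struct DemoQueue *q) {
    pthread_mutex_init(&q->lock, NULL);
    pthread_cond_init(&q->nonempty, NULL);
    q->head = q->tail = NULL;
}

/* Sender side: append and return at once, never waiting for the receiver. */
int demo_put(struct DemoQueue *q, int command) {
    struct DemoNode *n = malloc(sizeof *n);
    if (n == NULL) return -1;
    n->command = command;
    n->next = NULL;
    pthread_mutex_lock(&q->lock);
    if (q->tail) q->tail->next = n; else q->head = n;
    q->tail = n;
    pthread_cond_signal(&q->nonempty);
    pthread_mutex_unlock(&q->lock);
    return 0;
}

/* Receiver side: sleep while the queue is empty, then take the oldest entry. */
int demo_get(struct DemoQueue *q) {
    struct DemoNode *n;
    int command;
    pthread_mutex_lock(&q->lock);
    while (q->head == NULL)
        pthread_cond_wait(&q->nonempty, &q->lock);
    n = q->head;
    q->head = n->next;
    if (q->head == NULL) q->tail = NULL;
    pthread_mutex_unlock(&q->lock);
    command = n->command;
    free(n);
    return command;
}
```

A message loop built on this sketch would simply call demo_get in a while (1) loop and switch on the returned command; senders call demo_put from any thread.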

The single message loop requirement makes it difficult to handle answers in the same way as normal messages (questions, orders and notifications). Normal operation is to ask a question and then wait for the answer. During the wait, normal messages should not be removed from the message queue; only the answer to the question should be processed. Conversely, the message loop should remove only normal messages, never answers. Therefore we added a second queue to each module, for answers only.

We decided to implement 8 functions. All functions are (or at least should be) thread-safe: they don't use static or global variables. Every function returns -1 when an error occurred and something else when there were no problems. Buffers for the messages are dynamically created and destroyed, but are reused if possible. (See below for an explanation of how and why.)

int MsgSetup(int module_id);
This function initializes the message system. The module_id parameter is used as the sender ID of messages sent by this thread. Each thread calling MsgNewMessage must call MsgSetup first! The first call to MsgSetup with a given module ID in a process will empty the queues for that module ID.
 
long MsgNewMessage(int destination, int command, int priority, int data_size, void **data_buffer);
This function supplies a buffer of data_size bytes for a new message. It builds the message envelope with source, destination and command and returns a pointer to the data part of the message in data_buffer. The priority parameter is not used yet. Every message created by MsgNewMessage can be posted just once. The result of the function is a message ID (unique within each process). 

If on entering MsgNewMessage the pointer in data_buffer is NULL, memory is allocated for the message. If the pointer is not NULL, it must point to a previously created message. If this message is large enough to hold data_size bytes, it is reused. If not, the memory for it is freed and new memory is allocated. It is advisable (but not required) to call MsgNewMessage every time with the same data_size.
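The reuse rule for data_buffer can be sketched as follows. This is not the actual MsgNewMessage code: DemoHeader, demo_acquire, demo_capacity and demo_release are hypothetical names, plain malloc/free stand in for the real allocation, and the sketch ignores the message envelope entirely.

```c
#include <stdlib.h>

/* Hypothetical bookkeeping stored in front of the user data. The extra
 * union members only pad the header so the user data stays aligned. */
union DemoHeader {
    int maxsize;             /* capacity of the user data in bytes */
    long double pad_ld;
    long long pad_ll;
    void *pad_ptr;
};

/* Capacity of a buffer previously returned through demo_acquire. */
int demo_capacity(const void *data) {
    return ((const union DemoHeader *)data - 1)->maxsize;
}

/* *data_buffer must be NULL or a pointer obtained from an earlier call.
 * Returns 0 on success, -1 on failure. */
int demo_acquire(int data_size, void **data_buffer) {
    union DemoHeader *h;

    if (data_size < 0 || data_buffer == NULL) return -1;

    if (*data_buffer != NULL) {
        h = (union DemoHeader *)*data_buffer - 1;
        if (h->maxsize >= data_size)
            return 0;            /* large enough: reuse it as is */
        free(h);                 /* too small: free it, allocate below */
        *data_buffer = NULL;
    }

    h = malloc(sizeof *h + (size_t)data_size);
    if (h == NULL) return -1;
    h->maxsize = data_size;
    *data_buffer = h + 1;        /* user data starts right after the header */
    return 0;
}

/* Counterpart of MsgDestroy in this sketch. */
void demo_release(void **data_buffer) {
    if (data_buffer != NULL && *data_buffer != NULL) {
        free((union DemoHeader *)*data_buffer - 1);
        *data_buffer = NULL;
    }
}
```

Calling demo_acquire repeatedly with the same size never reallocates, which is the reason behind the advice to keep data_size constant.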
 
long MsgNewQuestion(int destination, int command, int priority, int data_size, void **data_buffer);
This function also supplies a buffer for a new message. The parameters and the result are the same as for MsgNewMessage. The new message contains an indication that the sender is expecting an answer. The Communication module might use this to send a fake answer if a destination is unreachable.
 
int MsgPostMessage(void **data_buffer);
This function sends the message indicated by data_buffer to the queue of the receiver. The data buffer is no longer valid after calling this function. Every message should be created and initialized by MsgNewMessage. Currently the pointer in data_buffer is not modified, but this may change in the future if the underlying mechanism changes.
 
int MsgGetMessage(int *sender, void **data_buffer, int timeout);
This function gets the message at the head of the queue. If the queue is empty, it waits for a message for timeout milliseconds (if timeout is -1, it waits indefinitely). It returns the sender and a pointer to the data in sender and data_buffer respectively. The data in the buffer is valid until the next call to MsgGetMessage or until a call to MsgNewAnswer. The result is the command as inserted into the message by MsgNewMessage when a message was available, 0 when a timeout occurred or -1 when something went wrong.

If on entering MsgGetMessage the pointer in data_buffer is NULL, memory is allocated for the message. If the pointer is not NULL, it must point to a previously created message. If this message is large enough to hold the newly received message, it is reused. If not, the memory for it is freed and new memory is allocated. 
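The timeout convention (a number of milliseconds, or -1 to wait indefinitely) and the three kinds of result (command, 0 for timeout, -1 for error) can be illustrated with a condition variable. The one-slot mailbox below is purely hypothetical (DemoSlot, demo_slot_init, demo_slot_put, demo_slot_get) and says nothing about how MsgGetMessage implements its wait.

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <pthread.h>
#include <time.h>

/* Hypothetical one-slot mailbox, used only to illustrate the timeout rule. */
struct DemoSlot {
    pthread_mutex_t lock;
    pthread_cond_t  filled;
    int has_command;
    int command;       /* assumed > 0, so 0 and -1 stay free as status codes */
};

void demo_slot_init(struct DemoSlot *s) {
    pthread_mutex_init(&s->lock, NULL);
    pthread_cond_init(&s->filled, NULL);
    s->has_command = 0;
    s->command = 0;
}

/* Returns 0 on success, -1 if the command is invalid or the slot is full. */
int demo_slot_put(struct DemoSlot *s, int command) {
    int rc = -1;
    if (command <= 0) return -1;
    pthread_mutex_lock(&s->lock);
    if (!s->has_command) {
        s->command = command;
        s->has_command = 1;
        pthread_cond_signal(&s->filled);
        rc = 0;
    }
    pthread_mutex_unlock(&s->lock);
    return rc;
}

/* Returns the command, 0 on timeout, -1 on error. timeout is given in
 * milliseconds; -1 means wait indefinitely. */
int demo_slot_get(struct DemoSlot *s, int timeout) {
    struct timespec deadline;
    int rc = 0, result;

    if (timeout >= 0) {
        /* pthread_cond_timedwait expects an absolute deadline */
        if (clock_gettime(CLOCK_REALTIME, &deadline) == -1) return -1;
        deadline.tv_sec  += timeout / 1000;
        deadline.tv_nsec += (long)(timeout % 1000) * 1000000L;
        if (deadline.tv_nsec >= 1000000000L) {
            deadline.tv_sec  += 1;
            deadline.tv_nsec -= 1000000000L;
        }
    }

    pthread_mutex_lock(&s->lock);
    while (!s->has_command && rc == 0) {
        if (timeout < 0)
            rc = pthread_cond_wait(&s->filled, &s->lock);
        else
            rc = pthread_cond_timedwait(&s->filled, &s->lock, &deadline);
    }
    if (s->has_command) {
        result = s->command;
        s->has_command = 0;
    } else if (rc == ETIMEDOUT) {
        result = 0;
    } else {
        result = -1;
    }
    pthread_mutex_unlock(&s->lock);
    return result;
}
```

Keeping command values strictly positive is what allows 0 and -1 to act as status codes in the same return value.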
 
int MsgNewAnswer(void **data_buffer, void **data_out_buffer);
This function creates a new message as an answer to the question in data_buffer, which must have been retrieved by MsgGetMessage. The original buffer (and the data in it) is no longer valid after this call and the data_buffer parameter is set to NULL. The parameter data_out_buffer will get a pointer to the new data buffer, which contains the data from the incoming message. The new buffer can safely be used in other threads.
 
long MsgGetAnswer(long message_id, void **data_out_buffer, int timeout);
This function waits up to timeout milliseconds for the answer to the question message_id, where message_id was returned by MsgNewMessage or MsgNewQuestion, or is 0 to get the first answer in the answer queue. A pointer to the data buffer is returned in data_out_buffer. The data in the buffer is valid until the next call to MsgGetAnswer. The result is the message ID when an answer was available, 0 when a timeout occurred or -1 when something went wrong.

If on entering MsgGetAnswer the pointer in data_out_buffer is NULL, memory is allocated for the message. If the pointer is not NULL, it must point to a previously created message. If this message is large enough to hold the newly received message, it is reused. If not, the memory for it is freed and new memory is allocated.
 
int MsgDestroy(void **data_buffer);
This function removes/destroys a message buffer returned by MsgNewMessage, MsgNewQuestion, MsgGetMessage, MsgNewAnswer or MsgGetAnswer. It should be used before leaving functions to free resources allocated for a message buffer.

Some examples

This section contains some pseudo-code examples for typical use of the Message System. The first example shows how to send messages to another module and how to retrieve answers. The second and third examples show how to make a message loop. The second example shows a single-threaded module, while the third uses a multi-threaded approach.

#include <stdlib.h>
#include <stdio.h>
#include <Msg.h>

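#define EXM_CMD  1
#define EXM_QSTN 2
/* EXM_ID, the module ID of this example module, is assumed to be defined
 * elsewhere (it is not defined in this listing) */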

struct ExmParam { int getal; double breuk; double result; };

int main(int argc, char *argv[]) {
    long mid;
    struct ExmParam *ptr = NULL; /* a pointer to a buffer must be
                                  * initialized to NULL so MsgNewMessage
                                  * will allocate a new message */

    /* Setup/initialize the message system for this example module */
    MsgSetup(EXM_ID);

    /* Create a message, allocating memory because ptr is NULL */
    ERRETVOID(MsgNewMessage(EXM_ID, EXM_CMD, 0,
                            sizeof(struct ExmParam), (void **)&ptr));

    /* Set the parameters */
    ptr->getal = 4; ptr->breuk = 6.21;

    /* Send the message, this will invalidate the message */
    ERRETVOID(MsgPostMessage((void **)&ptr));

    /* Get a new message buffer, reuse the old message and remember the ID */
    ERRETVOID(mid = MsgNewMessage(EXM_ID, EXM_QSTN, 0,
                                  sizeof(struct ExmParam), (void **)&ptr));

    /* Set the parameters */
    ptr->getal = 4; ptr->breuk = 6.21;

    /* Send the message */
    ERRETVOID(MsgPostMessage((void **)&ptr));

    /* Get the answer to the question indicated by the message ID */
    ERRETVOID(MsgGetAnswer(mid, (void **)&ptr, -1));

    /* Handle the result */
    printf("The answer to my question was: %g\n", ptr->result);

    /* Free the message buffer */
    ERRETVOID(MsgDestroy((void **)&ptr));

    return 0;
}

ERRETVOID is a macro which prints an error message and returns from the current routine if its parameter evaluates to -1. For simplicity this program uses one structure type for the parameters of all messages. Real programs will use different types for each message and will therefore need more typecasting. The same ptr variable should be used in all MsgXxxxx calls, so that the buffer can be reused instead of being reallocated.
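A macro with the behaviour described for ERRETVOID could look roughly like the sketch below. The definition shipped with the Message System may well differ; DEMO_ERRETVOID, demo_call and demo_run are hypothetical names used only for this illustration.

```c
#include <stdio.h>

/* Assumed shape of the macro: print a message and leave the current
 * routine when the expression evaluates to -1. */
#define DEMO_ERRETVOID(expr)                                          \
    do {                                                              \
        if ((expr) == -1) {                                           \
            fprintf(stderr, "%s:%d: %s failed\n",                     \
                    __FILE__, __LINE__, #expr);                       \
            return;                                                   \
        }                                                             \
    } while (0)

/* Hypothetical stand-in for a Msg call: fails for negative input. */
static int demo_call(int x) { return x < 0 ? -1 : x; }

/* Counts how many calls succeeded before the macro returned early. */
void demo_run(const int *inputs, int n, int *completed) {
    int i;
    *completed = 0;
    for (i = 0; i < n; i++) {
        DEMO_ERRETVOID(demo_call(inputs[i]));
        (*completed)++;
    }
}
```

Because this sketch expands to a bare return, it only fits routines returning void; a routine with a return value would need a variant that returns one.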

The second example is a message loop which handles all messages in a single thread. This is convenient for operations which take little time, or for operations which have to be executed sequentially anyway. The program uses functions to process the messages, so the difference with the multi-threaded implementation will be clearer. All handler functions have the same parameter: the address of the pointer to the buffer. All typecasting, parameter extraction etc. is done in the handler function, so the message loop stays clean and short. The pointer is also used to return the answer buffer for question handlers, so the message loop can reuse the buffer.

#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <Msg.h>

#define EXM_CMD  1
#define EXM_QSTN 2

struct ExmParam { int getal; double breuk; double result; };

void ExmCommand(void **vptr);
void ExmQuestion(void **vptr);
 
int main(int argc, char *argv[]) {
    int sender;
    void *ptr = NULL;         /* a pointer to a buffer must be initialized 
                               * to NULL so MsgGetMessage will allocate a
                               * new buffer */

    /* Setup/initialize the message system */
    MsgSetup(EXM_ID);

    /* The message loop, which will run forever */
    while (1) {
        switch (MsgGetMessage(&sender, &ptr, -1)) {
        case -1:              /* Error in MsgGetMessage */
            ERRPRINT;
            exit(1);
            break;
        case EXM_CMD:         /* The command */
            ExmCommand(&ptr);
            break;
        case EXM_QSTN:        /* The question */
            ExmQuestion(&ptr);
            break;
        default:              /* What? */ 
            fprintf(stderr, "main: Unknown command.\n"); 
            exit(1);
        }
    }
}

void ExmCommand(void **vptr) { 
    struct ExmParam *ptr = (struct ExmParam *)*vptr; /* Instead of typecasting
                                                      * and dereferencing,
                                                      * this makes programming
                                                      * a lot easier */

    printf("The command parameters are %d and %g\n", ptr->getal, ptr->breuk);
    printf("The command will do some time consuming calculations\n");
    sleep(5);
    printf("The command is ready\n");
}

void ExmQuestion(void **vptr) {
    struct ExmParam *ptr;

    /* Make a new answer, this will destroy the question after copying */
    ERRETVOID(MsgNewAnswer(vptr, (void **)&ptr)); 

    /* Calculate and store the answer in the answer message */
    printf("The question is %d * %g, ", ptr->getal, ptr->breuk);
    ptr->result = ptr->getal * ptr->breuk;
    printf("the answer is %g\n", ptr->result);

    /* Send the answer */
    ERRETVOID(MsgPostMessage((void **)&ptr));

    /* Make the answer message available for reuse by the message loop */
    *vptr = ptr;
}

The final example creates a new thread for each incoming message. For simplicity this example processes different command/question types in parallel and equal commands/questions sequentially. Before starting a new thread for a specific command/question, the message loop waits until the previous thread of that type has completed. For every command type, MsgNewAnswer is used to make a thread-safe copy of the buffer. The resulting pointer is used as the argument parameter in the call to pthread_create.

#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include <Msg.h>
 
#define EXM_CMD  1 
#define EXM_QSTN 2
 
struct ExmParam { int getal; double breuk; double result; };

void *ExmCommand(void *vptr);
void *ExmQuestion(void *vptr);

int main(int argc, char *argv[]) {
    int sender;
    pthread_t cmd = 0, qstn = 0;
    void *nptr;
    void *ptr = NULL;       /* a pointer to a buffer must be initialized 
                             * to NULL so MsgGetMessage will allocate a
                             * new buffer */

    /* Setup/initialize the message system */
    MsgSetup(EXM_ID);
    
    /* The message loop, which will run forever */
    while (1) {
        switch (MsgGetMessage(&sender, &ptr, -1)) {
        case -1:               /* Error in MsgGetMessage */
            ERRPRINT;
            exit(1);
            break;
        case EXM_CMD:          /* The command */
            ERRETVOID(MsgNewAnswer(&ptr, &nptr));
            if (cmd) pthread_join(cmd, NULL);
            pthread_create(&cmd, NULL, ExmCommand, nptr);
            break;
        case EXM_QSTN:         /* The question */
            ERRETVOID(MsgNewAnswer(&ptr, &nptr));
            if (qstn) pthread_join(qstn, NULL);
            pthread_create(&qstn, NULL, ExmQuestion, nptr);
            break;
        default:               /* What? */
            fprintf(stderr, "main: Unknown command.\n");
            exit(1);
        }
    }
} 

void *ExmCommand(void *vptr) {
    struct ExmParam *ptr = (struct ExmParam *)vptr; /* Instead of typecasting
                                                     * and dereferencing,
                                                     * this makes programming
                                                     * a lot easier */

    printf("The command parameters are %d and %g\n", ptr->getal, ptr->breuk);
    printf("The command will do some time consuming calculations\n");
    sleep(5);
    printf("The command is ready\n");

    /* Destroy the buffer and release its resources */
    MsgDestroy((void **)&ptr);

    return NULL;
}

void *ExmQuestion(void *vptr) {
    struct ExmParam *ptr = (struct ExmParam *)vptr; /* Instead of typecasting
                                                     * and dereferencing,
                                                     * this makes programming
                                                     * a lot easier */

    /* Calculate and store the answer in the answer message */
    printf("The question is %d * %g, ", ptr->getal, ptr->breuk);
    ptr->result = ptr->getal * ptr->breuk;
    printf("the answer is %g\n", ptr->result);
    
    /* Send the answer */
    ERRETVOID(MsgPostMessage((void **)&ptr));

    /* Destroy the buffer and release its resources */
    ERRETVOID(MsgDestroy((void **)&ptr));

    return NULL;
}

The difference in output between the second and third examples clearly shows the sequential vs. parallel processing of the incoming messages.

The implementation background

The implementation is based on the message queue facility of Linux, although the functions are specified with the shared memory capabilities in mind. So maybe somewhere in the future the basic implementation will change. We tried to make the functions thread-safe, because we expect threads will be used extensively in the Robot software.

Because we don't know the size of the buffers beforehand we have to dynamically allocate and deallocate memory for them. The logical places to do that are:

Allocate                         Deallocate
MsgNewMessage, MsgNewQuestion    MsgPostMessage
MsgGetMessage                    MsgGetMessage, MsgNewAnswer
MsgNewAnswer                     MsgPostMessage
MsgGetAnswer                     MsgGetAnswer

Allocating and freeing memory can be very time consuming. Therefore we decided to add an optimization. Buffers are not freed in MsgPostMessage, MsgGetMessage, MsgNewAnswer etc., but just marked as invalid. If an invalid buffer is then passed to MsgNewMessage, MsgGetMessage, MsgNewAnswer etc., it will be used again if it is large enough. If the invalid buffer is too small, it will be deallocated and a new one allocated. If the buffer is still valid or no buffer is allocated, a new buffer will be allocated. This makes it necessary to remember the value of data_buffer or data_out_buffer between calls to MsgPostMessage and MsgNewMessage (and other pairs). Therefore non-automatic variables have to be used to store the pointer. For functions which don't have to be thread-safe, these variables can be static or global. For thread-safe functions it's more complicated; see the Linux man page for pthread_key_create for a possible solution. As a slower alternative, automatic variables can be used, but these have to be initialized to NULL before the first Msg call, and destroyed by MsgDestroy when the buffer is no longer needed.
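One way to keep a buffer pointer per thread with pthread_key_create is sketched below. It is an assumption about how a caller might do this, not part of the Message System: demo_thread_buffer and the other demo_ names are hypothetical, and free stands in for MsgDestroy so the sketch compiles on its own.

```c
#include <pthread.h>
#include <stdlib.h>

static pthread_key_t  demo_key;
static pthread_once_t demo_once = PTHREAD_ONCE_INIT;
static int demo_key_rc = -1;      /* result of pthread_key_create */

/* Runs at thread exit for every thread whose slot is non-NULL. Real code
 * would call MsgDestroy on the slot here; free stands in for it. */
static void demo_slot_destructor(void *slot) {
    void **pp = slot;
    free(*pp);
    free(pp);
}

static void demo_make_key(void) {
    demo_key_rc = pthread_key_create(&demo_key, demo_slot_destructor);
}

/* Returns the address of the calling thread's private buffer pointer
 * (initially NULL), suitable for passing as a data_buffer argument.
 * Returns NULL on failure. */
void **demo_thread_buffer(void) {
    void **pp;

    if (pthread_once(&demo_once, demo_make_key) != 0 || demo_key_rc != 0)
        return NULL;

    pp = pthread_getspecific(demo_key);
    if (pp == NULL) {
        pp = malloc(sizeof *pp);
        if (pp == NULL) return NULL;
        *pp = NULL;               /* so the first Msg call allocates */
        if (pthread_setspecific(demo_key, pp) != 0) {
            free(pp);
            return NULL;
        }
    }
    return pp;
}
```

A thread-safe routine would then call demo_thread_buffer() on entry and pass the result to the Msg functions, so every thread keeps reusing its own buffer between calls.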

Normal messages are put in the queue with mtype 1, questions with mtype 2 (other mtypes are reserved for future use). Messages are removed by msgrcv in FIFO order. Answers are put into the answer queue with their question_id as their mtype. The msgrcv function can then easily be used to remove a specific answer (MsgGetAnswer with a specific message ID) or the first available answer (MsgGetAnswer with message ID 0).
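The selection by mtype can be tried out directly with the System V calls. The sketch below uses a private queue and a hypothetical DemoAnswer record instead of the real message layout; the demo_ functions are illustrations only, and they use IPC_NOWAIT instead of a timeout.

```c
#include <errno.h>
#include <string.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

/* Hypothetical answer record; the real system uses its own envelope. */
struct DemoAnswer {
    long mtype;       /* the question's message ID, must be > 0 */
    int  result;
};

/* Creates a private queue; returns its ID or -1. */
int demo_queue_create(void) {
    return msgget(IPC_PRIVATE, IPC_CREAT | 0600);
}

int demo_queue_remove(int qid) {
    return msgctl(qid, IPC_RMID, NULL);
}

/* Files an answer under the ID of the question it belongs to. */
int demo_put_answer(int qid, long question_id, int result) {
    struct DemoAnswer a;
    memset(&a, 0, sizeof a);
    a.mtype = question_id;
    a.result = result;
    /* msgsz counts only the bytes that follow mtype */
    return msgsnd(qid, &a, sizeof a - sizeof a.mtype, 0);
}

/* question_id > 0: fetch that answer only; question_id == 0: fetch the
 * oldest answer of any type. Returns the answered question's ID, 0 if
 * nothing matches, or -1 on error. Never blocks (IPC_NOWAIT). */
long demo_get_answer(int qid, long question_id, int *result) {
    struct DemoAnswer a;
    if (msgrcv(qid, &a, sizeof a - sizeof a.mtype,
               question_id, IPC_NOWAIT) == -1)
        return errno == ENOMSG ? 0 : -1;
    *result = a.result;
    return a.mtype;
}
```

With answers for questions 11 and 22 queued in that order, asking for 22 skips over 11, and a later request with ID 0 returns the answer for 11.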

A message consists of the concatenation of two data areas: the system part and the user part. The structure of the user part is not important for the functions, only the size matters. The system part is the following structure:

struct msgsystem {
    int maxsize;        /* The maximum size of the user part */
    int valid;          /* The buffer valid flag */
    long mtype;         /* Message type, required by msgsnd/msgrcv */
    long msgid;         /* The message ID for this buffer */
    int actualsize;     /* The size of this buffer (after reusing) */
    int source;         /* Sending module ID */
    int destination;    /* Destination module ID */
    int command;        /* The action to perform by the receiver */
};
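How the two parts sit next to each other in memory can be sketched as follows. The structure is repeated so the fragment compiles on its own; the functions demo_alloc, demo_system_part and demo_payload_size are hypothetical, and the idea that msgsnd/msgrcv are handed a pointer to the mtype member is an assumption about the implementation, not something stated in the sources.

```c
#include <stddef.h>
#include <stdlib.h>

struct msgsystem {      /* same fields as the system part in the text */
    int maxsize;
    int valid;
    long mtype;
    long msgid;
    int actualsize;
    int source;
    int destination;
    int command;
};

/* Allocates system part and user part as one block and returns a pointer
 * to the user part, or NULL on failure. */
void *demo_alloc(int user_size) {
    struct msgsystem *sys;
    if (user_size < 0) return NULL;
    sys = calloc(1, sizeof *sys + (size_t)user_size);
    if (sys == NULL) return NULL;
    sys->maxsize = user_size;
    sys->actualsize = user_size;
    sys->valid = 1;
    return sys + 1;     /* the user part starts right after the system part */
}

/* Steps back from the user part to the system part in front of it. */
struct msgsystem *demo_system_part(void *user) {
    return (struct msgsystem *)user - 1;
}

/* Number of bytes that follow mtype, i.e. the msgsz value msgsnd/msgrcv
 * would need if they were given the address of the mtype member. */
size_t demo_payload_size(const struct msgsystem *sys) {
    return sizeof *sys - offsetof(struct msgsystem, mtype)
           - sizeof sys->mtype + (size_t)sys->actualsize;
}
```

The caller only ever sees the pointer returned by demo_alloc, which matches the way the Msg functions hand out a pointer to the data part while keeping the envelope out of sight.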

The software sources

The current version is 1.2. It is available as a gzipped tar file: msg-1.2.tar.gz. This file will unpack into a subdirectory msg with the source files and a Makefile. To use the Msg* functions, include the header file Msg.h in your source and link your application with Msg.o. Remember to use the correct -I options for the compiler!