This document defines (and tries to justify) the software setup of the Robot software. The chosen setup is based on two assumptions:
The easiest way to implement the inter-module communication is based on polling. At convenient intervals each module asks all other modules if they want to communicate. If so the two modules setup some communication channel and data will be exchanged. This approach has some major drawbacks. Modules spend a lot of time to poll each other and some will have to wait until its partner is ready for communication.
Therefore the more complicated approach based on message passing and queues was chosen. In a message passing system modules communicate by sending each other messages. Every module has a queue behind its input. A module can put a message in an input queue at any time, without having to wait for the receiver to be ready for it. The receiving module reads the message when it is ready with processing the previous message from the queue. (Of course the rate of message generation must not exceed the rate of message processing, but generation and processing don't have to be synchronized.)
Each module needs to implement a so called "message loop" which will get a message from the queue, process it, get the next message, process it and so on. Processing can take as long as it needs, although it might be advisable to start a new thread to handle real lengthy jobs. When the input queue is empty, the execution of the message loop thread will be suspended until a message becomes available. Each module should have only one message loop, so all messages are guaranteed to be processed in the same way.
The single message loop requirement makes it difficult to handle answers as normal messages (questions, orders and notifications). Normal operation is to ask a question and then to wait for the answer. During the wait normal messages should not be removed from the message queue, only the answer to the question should be processed. But the message loop shouldn't remove answers, just normal messages. Therefore we added a queue to each module for answers only.
We decided to implement 8 functions. All functions are (should be at least) thread safe, they don't use static or global variables. Every function returns -1 when an error occurred, something else when there were no problems. Buffers for the messages are dynamically created and destroyed, but are reused if possible. (See below for an explanation how and why.)
This sections contains some pseudo-code examples for typical use of the Message System. The first example shows how to send messages to another module and how to retrieve answers. The second and third examples show how to make a message loop. The second example shows a single threaded module, while the third uses a multi threaded approach.
#include <stdlib.h> #include <stdio.h> #include <Msg.h> #define EXM_CMD 1 #define EXM_QSTN 2 struct ExmParam { int getal; double breuk; double result; }; int main(int argc, char *argv[]) { long mid; struct ExmParam *ptr = NULL; /* a pointer to a buffer must be * initialized to NULL so MsgNewMessage * will allocate a new message */ /* Setup/initialize the message system for this example module */ MsgSetup(EXM_ID); /* Create a message, allocating memory because ptr is NULL */ ERRETVOID(MsgNewMessage(EXM_ID, EXM_CMD, 0, sizeof(struct ExmParam), (void **)&ptr)); /* Set the parameters */ ptr->getal = 4; ptr->breuk = 6.21; /* Send the message, this will invalidate the message */ ERRETVOID(MsgPostMessage((void **)&ptr)); /* Get a new message buffer, reuse the old message and remember the ID */ ERRETVOID(mid = MsgNewMessage(EXM_ID, EXM_QSTN, 0, sizeof(struct ExmParam), (void **)&ptr)); /* Set the parameters */ ptr->getal = 4; ptr->breuk = 6.21; /* Send the message */ ERRETVOID(MsgPostMessage((void **)&ptr)); /* Get the answer to the question indicated by the message ID */ ERRETVOID(MsgGetAnswer(mid, (void **)&ptr, -1)); /* Handle the result */ printf("The answer to my question was: %g\n", ptr->result); /* Free the message buffer */ ERRETVOID(MsgDestroy((void **)&ptr)); }
ERRETVOID is a macro which prints an error message and returns from the current routine if its parameter evaluates to -1. For simplicity this program uses one structure type for the parameters of all messages. Real programs will use different types for each message and therefore will need more typecasting. The same ptr variable should be used in all MsgXxxxx calls to optimize memory access.
The second example is a message loop which handles all messages in a single thread. This is convenient for operations which take little time, or for operations which have to be executed sequentially anyway. The program uses functions to process the messages, so the difference with the multi-threaded implementation will be more clear. All handler functions have the same parameter, the address of the pointer to the buffer. All typecasting, parameter extraction etc. is done in the handler function and the message loop stays clean and short. The pointer is also used to return the answer buffer for question handlers, so the message loop can reuse the buffer.
#include <stdlib.h> #include <stdio.h> #include <unistd.h> #include <Msg.h> #define EXM_CMD 1 #define EXM_QSTN 2 struct ExmParam { int getal; double breuk; double result; }; void ExmCommand(void **vptr); void ExmQuestion(void **vptr); int main(int argc, char *argv[]) { int command, sender; void *ptr = NULL; /* a pointer to a buffer must be initialized * to NULL so MsgGetMessage will allocate a * new buffer */ /* Setup/initialize the message system */ MsgSetup(EXM_ID); /* The message loop, which will run forever */ while (1) { switch (MsgGetMessage(&sender, &ptr, -1)) { case -1: /* Error in MsgGetMessage */ ERRPRINT; exit(1); break; case EXM_CMD: /* The command */ ExmCommand(&ptr); break; case EXM_QSTN: /* The question */ ExmQuestion(&ptr); break; default: /* What? */ fprintf(stderr, "main: Unknown command.\n"); exit(1); } } } void ExmCommand(void **vptr) { struct ExmParam *ptr = (struct ExmParam *)*vptr; /* Instead of typecasting * and dereferencing, * this makes programming * a lot easier */ printf("The command parameters are %d and %g\n", ptr->getal, ptr->breuk); printf("The command will do some time consuming calcultions\n"); sleep(5); printf("The command is ready\n"); } void ExmQuestion(void **vptr) { struct ExmParam *ptr; /* Make a new answer, this will destroy the question after copying */ ERRETVOID(MsgNewAnswer(vptr, (void **)&ptr)); /* Calculate and store the answer in the answer message */ printf("The question is %d * %g, ", ptr->getal, ptr->breuk); ptr->result = ptr->getal * ptr->breuk; printf("the answer is %g\n", ptr->result); /* Send the answer */ ERRETVOID(MsgPostMessage((void **)&ptr)); /* Make the answer message available for reuse by the message loop */ *vptr = ptr; }
The final example creates a new thread for each incoming message. For simplicity this example processes different command/question types in parallel and equal commands/questions sequentially. Before starting a new thread for a specific command/question the message loop waits until the previous thread has completed. For every command type MsgNewAnser is used to make a thread-save version of the buffer. The resulting pointer is used as the argument parameter in the call to pthread_create.
#include <stdlib.h> #include <stdio.h> #include <unistd.h> #include <pthread.h> #include <Msg.h> #define EXM_CMD 1 #define EXM_QSTN 2 struct ExmParam { int getal; double breuk; double result; }; void *ExmCommand(void *vptr); void *ExmQuestion(void *vptr);\ int main(int argc, char *argv[]) { int command, sender; pthread_t cmd = 0, qstn = 0; void *nptr; void *ptr = NULL; /* a pointer to a buffer must be initialized * to NULL so MsgGetMessage will allocate a * new buffer */ /* Setup/initialize the message system */ MsgSetup(EXM_ID); /* The message loop, which will run forever */ while (1) { switch (MsgGetMessage(&sender, &ptr, -1)) { case -1: /* Error in MsgGetMessage */ ERRPRINT; exit(1); break; case EXM_CMD: /* The command */ ERRETVOID(MsgNewAnswer(&ptr, &nptr)); if (cmd) pthread_join(cmd, NULL); pthread_create(&cmd, NULL, ExmCommand, nptr); break; case EXM_QSTN: /* The question */ ERRETVOID(MsgNewAnswer(&ptr, &nptr)); if (qstn) pthread_join(qstn, NULL); pthread_create(&qstn, NULL, ExmQuestion, nptr); break; default: /* What? */ fprintf(stderr, "main: Unknown command.\n"); exit(1); } } } void *ExmCommand(void *vptr) { struct ExmParam *ptr = (struct ExmParam *)vptr; /* Instead of typecasting * and dereferencing, * this makes programming * a lot easier */ printf("The command parameters are %d and %g\n", ptr->getal, ptr->breuk); printf("The command will do some time consuming calcultions\n"); sleep(5); printf("The command is ready\n"); } void *ExmQuestion(void *vptr) { struct ExmParam *ptr = (struct ExmParam *)vptr; /* Instead of typecasting * and dereferencing, * this makes programming * a lot easier */ /* Calculate and store the answer in the answer message */ printf("The question is %d * %g, ", ptr->getal, ptr->breuk); ptr->result = ptr->getal * ptr->breuk; printf("the answer is %g\n", ptr->result); /* Send the answer */ ERRETVOID(MsgPostMessage((void **)&ptr)); /* Destroy the buffer and release it resources */ ERRETVOID(MsgDestroy((void **)&ptr)); }
The difference in output from the second and third examples clearly show the sequential vs. parallel processing of the incoming messages.
The implementation is based on the message queue facility of Linux, although the functions are specified with the shared memory capabilities in mind. So maybe somewhere in the future the basic implementation will change. We tried to make the functions thread-safe, because we expect threads will be used extensively in the Robot software.
Because we don't know the size of the buffers beforehand we have to dynamically allocate and deallocate memory for them. The logical places to do that are:
Allocate | Deallocate |
---|---|
MsgNewMessage MsgNewQuestion |
MsgPostMessage |
MsgGetMessage | MsgGetMessage MsgNewAnswer |
MsgNewAnswer | MsgPostMessage |
MsgGetAnswer | MsgGetAnswer |
Allocating and freeing memory can be very time consuming. Therefore we decided to add an optimization. Buffers will not be freeing in MsgPostMessage, MsgGetMessage, MsgNewAnswer etc., but just marked as invalid. Then if an invalid buffer is passed to MsgNewMessage, MsgGetMessage, MsgNewAnswer etc., it will be used again if large enough. If the used buffer is too small, it will be deallocated. If the buffer is still valid or no buffer is allocated a new buffer will be allocated. This makes it necessary to remember the value of data_buffer c.q. data_out_buffer between calls to MsgPostMessage and MsgNewMessage (and other pairs). Therefore non-automatic variables have to be used to store the pointer. For functions which don't have to be thread-save, these variables can be static or global. For thread-save functions it's more complicated, see the Linux man page for pthread_key_create for a possible solution. As a slower alternative, automatic variables can be used, but these have to be initialized to NULL before the first Msg call, and destroyed by MsgDestroy when the buffer is no longer needed.
Normal messages are put in the queue with mtype 1, questions with mtype 2 (other mtypes are reserved for future use). Messages are removed by msgrcv in FIFO order. Answers are put into the answer queue with their question_id as their mtype. The msgrcv function can easily be used to remove a specific answer (by MsgGetAnswer with a specific message ID) or to remove any answer (by MsgGetAnswer with a 0 message ID).
A message consists of the concatenation of two data areas: the system part and the user part. The structure of the user part is not important for the functions, only the size matters. The system part is the following structure:
struct msgsystem { int maxsize; /* The maximum size of the user part */ int valid; /* The buffer valid flag */ long mtype; /* Message type, required by msgsnd/msgrcv */ long msgid; /* The message ID for this buffer */ int actualsize; /* The size of this buffer (after reusing) */ int source; /* Sending module ID */ int destination; /* Destination module ID */ int command; /* The action to perform by the receiver */ }
The current version is 1.2. It is available as a gzipped tar file: msg-1.2.tar.gz. This file will unpack into a subdirectory msg with the source files and a Makefile. To use the Msg* functions, include the header file Msg.h in your source and link your application with Msg.o. Remember to use the correct -I options for the compiler!