SysV: Message Queue

 

Message Format

struct msgbuf {
    long mtype;    /* type of message */
    char mtext[N]; /* message text */
}

 

Can be thought of as a template for message data

  • mtype is mandatory and must be a positive number
  • mtext is not restricted to holding only arrays of characters, but any data, in any form

Message Format은 보내고싶은 메시지의 템플릿이라고 볼 수 있습니다. SysV의 Message Queue에 들어가는 Message는 구조체로 만들어줘야 하며, mtype은 반드시 양수인 숫자여야합니다. mtext는 보내고픈 데이터의 종류에 따라 data type을 바꿀 수 있습니다.

 

SysV: Creating a Message Queue

 

  • int msgget(key_t key, int flag);

To create a new message queue, or access an existing queue → returns message queue ID

새로운 message queue를 만들거나, 이미 존재하는 message queue에 접근합니다. return값은 해당 message queue의 ID입니다.

 

key

  • System-wide unique indentifier describing the queue
  • Every other process that wants to connect to this queue will have to use the same key
  • Can be hard-coded or use ftok() to avoid a conflict with existing one

system의 측면에서 queue를 나타내는 유일한 값입니다. 해당 message queue에 접근하고픈 process들은 같은 key를 가져야합니다. 이는 하드 코딩이나, ftok() 메소드를 이용해 이미 존재하는 queue와의 충돌을 피할 수 있습니다.

 

flag

  • IPC_CREAT: Create the queue if the key doesn't already exist in the kernel
  • IPC_EXCL: Fail if queue already exists
  • Least significant bits define the permission of the message queue

IPC_CREAT는 만약 그 key값에 해당하는 queue가 kernel에 없다면 queue를 만들고, 이미 존재한다면, 접근합니다. IPC_EXCL은 이미 kernel에 queue가 존재한다면, fail을 return합니다. 마지막 비트들은 message queue의 접근에 대한 허가 정도를 나타냅니다.

 

A message queue created doesn't go away until you destroy it

  • All the processes that have ever used it can quit, but the queue will still exist
  • Removing a message queue in program
    • msgctl() with IPC_RMID
  • Removing a message queue in command line
    • pics command: check if any of your unused message queues exist
    • ipcrm command: destroy a message queue

message queue는 pipe와 달리 (pipe는 두 descriptors들을 close하면 알아서 삭제된다) 어느 프로세스도 message queue를 사용하지 않더라도 사라지지 않습니다. 이는 program상에서는 msgctl() with IPC_RMID, command line상에서는 ipcrm명령어를 통해 삭제할 수 있습니다.

 

SysV: Sending a Message

 

  • int msgsnd(int msgid, const void *ptr, size_t nbytes, int flag);

Delivers a message to a queue

 

The message is inserted at the end of the queue

 

nbytes

  • Length of actual message (mtext in struct msgbuf)

flag

  • IPC_NOWAIT: If the message queue is full, then the message is not written to the queue, and control is returned to the calling process

Return Value

  • On failure function returns -1, otherwise returns 0

인자로 들어가는 ptr은 message buf 구조체의 포인터값을 넣어야합니다. 하지만 nbytes는 실제 전송하는 message의 크기로, mtype이 아닌, mtext의 길이만을 넣습니다. IPC_NOWAIT은 message queue가 꽉 찼을 때, queue가 빌 때 까지 잠자면서 기다리지 않고 그냥 control을 넘기는 flag입니다. pipe의 write과는 달리 쓴 message의 byte크기가 아닌 성공과 실패값을 return합니다. (message oriented)

 

SysV: Receiving a Message

 

  • int msgrcv(int msgid, void *ptr, size_t nbytes, long type, int flag)

Retrieves a message from the queue

 

type

  • If type == 0
    • Retrieve the next message on the queue, regardless of its mtype
  • If type > 0
    • Get the next message with an mtype equal to the specified type
  • If type < 0
    • Retrieve the first message on the queue whose mtype field is less than or equal to the absolute value of the type argument

Return value

  • On failure function returns -1, otherwise returns the number of bytes actually copied

flag (If the message is longer than nbytes)

  • IPC_NOWAIT
  • if MSG_NOERROR is specified
    • the message text will be truncated (and the truncated part will be lost)
  • if MSG_NOERROR is not specified
    • the message isn't removed from the queue and the system call fails returning -1

type인자의 값에 따라 받아오는 message를 어느 정도 정할 수 있습니다. 또한 msgsnd()와 달리 성공할 경우 0이 아닌 읽어들인 message의 bytes수를 return합니다. 실패시 -1을 return하는 것은 동일합니다. 받아오는 message가 message buffer의 크기보다 큰 경우 flag에 따라 결과가 달라질 수 있습니다. 만약 MSG_NOERROR를 킨다면, message buffer크기의 데이터만 받아오고 잘린 데이터는 버립니다. MSG_NOERROR가 꺼져있다면, -1을 return합니다.

 

Example (SysV MQ Sender)

# define MAX_ID 5

int main(void)
{
    key_t ipckey;
    int mqdes, i;
    size_t buf_len;
    struct{
        long id;
        int value;
    } mymsg;
    
    buf_len = sizeof(mymsg.value);
    
    ipckey = ftok("./tmp/foo", 1946);
    mqdes = msgget(ipckey, IPC_CREAT|0600); /* Read/Write */
    if(mqdes < 0){
        perror("msgget()");
        exit(0);
    }
    
    for(i = 0;, i <= MAX_ID; i++){
        mymsg.id = i+1;
        mymsg.value = i*3;
        printf("Sending a message (val: %d, id: %ld)\n", mymsg.value, mymsg.id);
        
        if(msgsnd(mqdes, &mymsg, buf_len, 0) == -1){
            perror("msgsnd()");
            exit(0);
        }
    }
    
    return 0;
}

 

Example (SysV MQ Receiver)

# define MAX_ID 5

int main()
{
    key_t ipckey;
    int mqdes, i;
    size_t buf_len;
    struct{
        long id;
        int value;
    } mymsg;
    
    buf_len = sizeof(mymsg.value);
    
    ipckey = ftok("./tmp/foo", 1946);
    mqdes = msgget(ipckey, IPC_CREAT|0600);
    if(mqdes < 0){
        perror("msgget()");
        exit(0);
    }
    
    for(i = 0; i <= MAX_ID; i++){
        if(msgrcv(mqdes, &mymsg, buf_len, i+1, 0) == -1){
            perror("msgcrv()");
            exit(0);
        }
        else
            printf("Receive a message (val: %d, id: %ld)\n", mymsg.value, mymsg.id);
    }
    
    return 0;
}

 

result

result

 

POSIX: Message Queue

 

POSIX: Creating a Message Queue

 

  • mqd_t mq_open(const char *name, int oflag)

Creates a new message queue or opens an existing queue

 

Return -1 on fail

 

oflag

  • O_RDONLY, O_WRONLY, or O_RDWR
  • O_NONBLOCK: open the queue in nonblocking mode

message queue를 만들거나 존재하는 queue에 접근합니다. 실패할 경우 -1을 return합니다. oflag로는 message queue의 접근에 대한 허가를 제어하는 flag들과 (O_RDONLY, ...) 기존 SysV의 IPC_NOWAIT flag와 달리 (IPC_NOWAIT은 send나 recieve할 때 smgsnd(), smgrcv()의 인자로 flag을 넣어줍니다) O_NONBLOCK은 message queue를 생성할 때 설정해줍니다. 

 

  • mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr)

If O_CREAT is specified in oflag, then mode and attr arguments must be supplied

 

mode specifies the permissions

 

attr

  • long mq_maxmsg: max # of message on queue
  • long mq_msgsize: max message size (bytes)
  • these cannot larger than their default values
  • Default attributes
    • - /proc/sys/fs/mqueue/msg_max (10)
    • - /proc/sys/fs/mqueue/msgsize_max (8192)
  • Can also be get and set via mq_getattr() and mq_setattr(), respectively

oflag값에 O_CREAT을 넣는다면, 이후에 들어가는 mode와 attr인자를 반드시 넣어주어야합니다. mode는 message queue의 접근과 관련한 permission을 의미하고, attr은 생성되는 message queue특징을 의미하는데, msg queue의 크기와 msg queue 한 칸의 최대 크기를 정할 수 있습니다.

 

POSIX: Sending a Message

 

  • mqd_t mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned prio)

Writes a message to a queue

 

msg_ptr points the message buffer

 

msg_len must be less than or equal to the mq_msgsize attribute

 

Messages are ordered within the queue in descending order of priority (i.e., 0 is the lowest priority)

 

POSIX의 Message queue는 message를 보낼 때, templete이 없이 그냥 buffer로 받을 수 있습니다. 이때 쓰는 데이터의 길이는 mq_msgsize값을 넘을 수 없습니다. priority가 큰 것이 우선순위가 높은 것으로 우선순위가 높은 것이 queue의 앞부분으로 이동합니다. 정상적으로 마무리하면 0을, 실패한다면 -1을 return합니다.

 

POSIX: Receiving a Message

 

  • ssize_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio)

Reads a message from a queue

 

msg_len must be greater than or equal to the mq_msgsize attribute

 

If msg_prio is not NULL, then the buffer to which it points is used to return the priority associated with the received message

 

msg_len값은 mq_msgsize보다 크거나 같아야합니다. 또한 msg_prio의 주소값을 인자로 받는데, 이는 받아오는 message의 priority값을 저장합니다.

 

POSIX: Closing/Removing an MQ

 

  • mqd_t mq_close(mqd_t mqdes)

Closes the message queue descripter mqdes

 

  • mqd_t mq_unlink(const char *name)

Removes the message queue indentified by name

 

Causes the queue to be removed when the last process closes it

 

SysV와 달리 (SysV는 program상에서는 msgctl() with IPC_RMID로, command line상에서는 ipcrm MQID를 통해서 삭제할 수 있습니다) POSIX에서는 mq_close()와 mq_unlink()를 이용해서 MQ를 삭제합니다. 우선 mq_close()하면 해당 프로세스에서는 MQ로의 접근을 종료합니다. 하지만 모든 프로세스가 close했다고 MQ가 삭제되지는 않습니다. 어느 한 프로세스에서 mq_unlink()를 통해 kernel에게 '모든 프로세스가 close하면 MQ를 삭제해달라'는 요청을 해야합니다. 이는 한 번 요청을 해놓으면 kernel이 알아서 모든 프로세스가 close했음을 확인하고 삭제합니다.

 

Example (POSIX MQ Sender)

#define MSG_SIZE 4
#define NAME "/m_queue"
#define MAX_PRIO 5

int main()
{
    struct mq_attr attr;
    int value = 0;
    unsigned int prio;
    mqd_t mqdes;
    
    attr.mq_maxmsg = 10;
    attr.mq_msgsize = MSG_SIZE;
    
    mqdes = mq_open(NAME, O_CREAT|O_WRONLY, 0600, &attr);
    if(mqdes < 0){
        perror("mq_open()");
        exit(0);
    }
    
    for(prio = 0; prio <= MAX_PRIO; prio++){
        printf("Sending a message (val: %d, prio: %ld)\n", value, prio);
        
        if(mq_send(mqdes, (char*)&value, MSG_SIZE, prio) == -1){
            perror("mq_send()");
            break;
        }
        else
            value += 3;
    }
    
    mq_close(mqdes);
    
    return 0;
}

 

Example (POSIX MQ Receiver)

#define MSG_SIZE 4
#define NAME "/m_queue"
#define MAX_PRIO 5

int main()
{
    struct mq_attr attr;
    int value = 0, num_msg = 0;
    unsigned int prio;
    mqd_t mqdes;
    
    attr.mq_maxmsg = 10;
    attr.mq_msgsize = MSG_SIZE;
    
    mqdes = mq_open(NAME, O_CREAT|O_RDWR, 0600, &attr);
    if(mqdes < 0){
        perror("mq_open()");
        exit(0);
    }
    
    while(num_msg <= MAX_PRIO){
        if(mq_receive(mqdes, (char*)&value, MSG_SIZE, &prio) == -1){
            perror("mq_receive()");
            break;
        }
        else{
            printf("Receive a message (val: %d, prio: %ld)\n", value, prio);
            num_msg++;
        }
    }
    
    mq_close(mqdes);
    mq_unlink(NAME);

	return 0;
}

 

result

result

 

IPC + SIGNAL

 

When the pipe is empty or full, read() or write() blocks

  • If the call is interrupted by a signal, the call returns -1 and errno is EINTR

pipe에서 pipe가 비어있을 때, read()를 하거나 pipe가 꽉 차있을 때, write()를 하면 process가 block상태가 되는데, 그 상태에서 SIGNAL이 들어온다면 SIGNAL을 튕겨냅니다. kill()의 return값은 -1이 되고, errno는 EINTR (Error INTeRrupt)가 됩니다.

 

이는 Message Queue의 msgsnd(), msgrcv(), mq_send(), and mq_receive()도 마찬가지로 행동합니다.