Queue Facilities in QNX 4 =========================================================================== In some designs it is advantageous to have a non-blocking form of interprocess communication. Normal QNX messaging (Send(), Receive(), Reply() is the fastest and best approach for synchronous design: Process A will block on Process B until Process B is ready to Receive() the message. At times though, you may wish to have Process A NOT block on a Send() to Process B. In this case you could look at using a queueing mechanism. The queue facilities consist of the following files: README : what you're reading now. queue : the executable for the queue administrator. queue.c : the source for the queue administrator. queuelib.o : an object file that must be linked with any program using the queue routines. queuelib.c : the source to the queue functions. queue.h : a header file that must be included with any program using the queue routines. Makefile : the makefile that will generate the programs. qinfo.c : a sample program that shows how to use queue_info() openq.c : a program that opens some queues (and creates them) writeq.c : a program that writes some sample data to a queue. readq.c : a program that reads some sample data from a queue. The 'queue' administrator ============================================================================ In order to use the queue functions you must first start a queue administrator on at least one node in your network. This program will run as a background process and will buffer the queue data between queue readers and queue writers. To use the queue routines you must run the program QUEUE in background. $ queue [-global] & This process attaches itself to the system as a new administrator process providing queue services to your QNX processes. If you specify the -global flag then the queue will be a network wide service. Any process on any machine in the local area network may open a queue. If you do not specify the global parameter then the queue is local to the machine on which it is run. You may run QUEUE on each node in the network. It is more efficient to access a local queue then one on a remote node. The queue system is implemented using the QNX SEND, RECEIVE and REPLY messaging primitives. If you are starting a new design you should attempt to directly use these three primitives for speed and efficiency. The queue routines should be considered when your design already exists and is based upon queues or you need a buffered non-blocking send capability. Since the queue system is implemented as a QNX administrator process written in C it can be modified and extended to a particular application and need. Once the 'queue' administrator is running, then a program may use the queue services described below: Queue services: ================ Function Description QUEUE_OPEN Open a queue QUEUE_CLOSE Close a queue QUEUE_WRITE Post a message to a queue QUEUE_READ Read a message from a queue QUEUE_INFO Return information on queue QUEUE_PURGE Purge all messages from a queue QUEUE_WAIT Wait for a message to arrive at a queue QUEUE_WAIT_SIG Signal when a message arrives at a queue QUEUE_LAST_STATUS Obtain the status for last signal QUEUE_KILL Remove a queue even if it is in use For some working examples take a look at the sample programs: openq.c, writeq.c, readq.c, qinfo.c ============================================================================== QUEUE_OPEN "Open a queue" ============================================================================== int queue_open(queue_name, open_mode, window_size) char *queue_name; char *open_mode; int window_size; queue_name -Pointer to a string which is the name of the queue. open_mode -A pointer to a string. window_size -Number of messages to buffer before blocking a write Returns: >=0 Identifier id for the opened queue. < 0 Status code as described below. These manifests are described in queue.h #define QSTAT_WAITM -1 Queue has a msg #define QSTAT_WAITW -2 Queue has no writers #define QSTAT_WAITE -3 Queue is empty #define QSTAT_WAITR -4 Queue has no readers #define QSTAT_WAITS -5 Queue has room for msg #define QSTAT_WAITT -6 Queue timeout #define QSTAT_BADMSG -9 Invalid message #define QSTAT_OPEN_MODE -10 Invalid open mode #define QSTAT_OPEN_PERMS -11 Open for r or w denied #define QSTAT_CREATE -12 Unable to create #define QSTAT_MEM -13 No memory for request #define QSTAT_EXIST -14 Queue does not exist #define QSTAT_WINDOW -15 Queue window is full #define QSTAT_REPLY -16 Network error #define QSTAT_NAME -17 Invalid queue name #define QSTAT_FIND -18 Can't locate Queue administrator #define QSTAT_SLOT -19 Too many open queues #define QSTAT_SEND -20 Send failed #define QSTAT_MSGSIZE -21 Message to big for read. #define QSTAT_INTERNAL -22 Internal error. Examples: queue_id = queue_open("ottawa", "cprW", 100); /* Global queue */ queue_id = queue_open("ottawa", "w", 100); /* Global queue */ queue_id = queue_open("//0/ottawa", "w", 10); /* Local queue */ queue_id = queue_open("//3/ottawa", "w", 2); /* Node 3 queue */ QUEUE_OPEN opens a queue for writing messages to or reading messages from. It provides a means of sending and receiving messages in a manner similar to mailboxes. Each queue has a name associated with it ... other processes may then read or write messages to this name. The queue name can be up to 16 characters in length and may contain any character other than a null ('\0'). If the name starts with a node number //node_num/ then the queue will be opened on the indicated node. There must be a QUEUE administrator running on that node. A node number of zero will always refer to the local node. If a node specifier is not present then the global queue will be used. For example. "mailbox" -Use global QUEUE administrator. "//0/mailbox" -Use local QUEUE administrator (on current node) "//3/mailbox" -Use QUEUE administrator on node 3. If the queue does not exist already the open will fail unless the 'create' open mode ("c") is set. The open mode may contain the following characters. Letter Meaning r Open the queue for read. You may call QUEUE_READ not QUEUE_WRITE. w Open the queue for write. You may call QUEUE_WRITE not QUEUE_READ. c If the queue does not exist create it. The following options are only looked at when the queue is created ("c"). t The queue is temporary. Remove it and all messages when the process which created it dies. This is the default if none of t,p or h are specified. p The queue is permanent. It will survive any processes' death. It can only be removed by the QUEUE_REMOVE function. h The queue will be held around as long as it contains a message or is opened by at least one process. b Treat the queue as a byte stream rather than a queue of complete messages. This affects the operation of QUEUE_READ only. R Allow other processes to open the queue for read. W Allow other processes to open the queue for write. Once open, the returned queue id may be used to write messages to the queue and read messages placed in the queue based upon the open mode. If the queue is opened for neither read nor write then you may still use the routines QUEUE_WAIT and QUEUE_INFO. The window_size determines the number of messages to buffer before a QUEUE_WRITE operation will block or fail. A value of zero will cause each write to block until a process reads the message using QUEUE_READ. This implements very tight coupling. Larger values allow for a more elastic coupling. This value can be used to flow control processes adding data to the queue. The following examples illustrate some typical applications. Data Flow Buffering: One process is collecting data from a serial link and another is writing the data to a hard disk. The data arrives as a series of messages from the serial line. The average speed of the disk is faster than the serial line but there may be short periods where the messages arrive faster than the disk can save them. The queue routines may be used as a buffering agent to even out the flow to the disk. Serial ÚÄÄÄÄÄÄ¿ write ÚÄÄÄÄÄÄ¿ read ÚÄÄÄÄÄÄ¿ Line ==>³Serial³ÄÄÄÄÄÄ>³ Mbox ³<ÄÄÄÄÄÄij Disk ³===> Disk ³Process ³ ³ ³Process ÀÄÄÄÄÄÄÙ ÀÄÄÄÄÄÄÙ ÀÄÄÄÄÄÄÙ Transaction Processing: One or more processes are generating transactions (producers) and one or more processes are processing them (consumers). Note that in this example you may simplify either side to a single producer/consumer. Also note that the processes do not have to be on the same node (machine). ÚÄÄÄÄÄÄÄÄ¿ write ÚÄÄÄÄÄÄ¿ read ÚÄÄÄÄÄÄÄÄ¿ ³Producer³ÄÄÄÄÄÄÂÄÄÄ>³ Mbox ³<ÄÄÄÂÄÄÄÄÄijConsumer³ ÀÄÄÄÄÄÄÄÄÙ ³ ÀÄÄÄÄÄÄÙ ³ ÀÄÄÄÄÄÄÄÄÙ ÚÄÄÄÄÄÄÄÄ¿ write³ ³ read ÚÄÄÄÄÄÄÄÄ¿ ³Producer³ÄÄÄÄÄÄÙ ÀÄÄÄÄÄijConsumer³ ÀÄÄÄÄÄÄÄÄÙ ÀÄÄÄÄÄÄÄÄÙ ============================================================================== QUEUE_CLOSE "Close a queue" ============================================================================== int queue_close(queue_id) int queue_id; Arguments: queue_id -The queue id returned from QUEUE_OPEN. Returns: = 0 If the queue was closed. < 0 Status code as described in QUEUE_OPEN. Example: status = queue_close(queue_id); QUEUE_CLOSE will close a previously open queue. One of the following actions will occur based on the original open mode. "t - Temporary" If this was the process which created the queue then all waiting messages are removed. If any processes are waiting for messages on that queue they will be unblocked. The queue is then removed. "h - Hold" If no other process has the queue open and the queue contains no messages then any processes waiting for messages on that queue will be unblocked and the queue is removed. If there waiting messages, the queue will be removed when the last message is read. "p - Permanent" The queue is not effected. In all cases any waits in effect which are sensitive to number of readers/writers will be triggered if the count of readers/writers goes to zero. ============================================================================== QUEUE_WRITE "Post a message to a queue" ============================================================================== int queue_write(queue_id, &msg, msg_size, priority); int queue_id; void *msg; int msg_size, priority; Arguments: queue_id -The queue id returned from QUEUE_OPEN. msg -Pointer to your message. Typically a structure of data. msg_size -Number of bytes in the message. Maximum of 2000. priority -Priority insertion (0=highest). Return Values: = 0 If the message was posted. < 0 Status code as described in QUEUE_OPEN. Example: struct template { char queue_header[QHDR_SIZE]; char data[100]; } msg; status = queue_write(queue_id, &msg, 100, 0); QUEUE_WRITE posts a message to the indicated queue. This routine assumes that the buffer you pass it contains a header that is at least QHDR_SIZE (defined in queue.h) in size. The header is used by the library routines for establishing the message pass. At the time of this writing the size of the header is 15 bytes ( sizeof(struct queuehdr) as defined in queue.h). Your message starts at byte QHDR_SIZE+1 of the message. The 'msg_size' argument does not include the header. ÚÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÂÄÄÄÄÄÄÄÄÄÄ ³ QHDR_SIZE bytes .......... ÀÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÁÄÄÄÄÄÄÄÄÄÄ ÃÄÄ header ÄÅÄ msg data The message portion starting at byte QHDR_SIZE+1 is treated as a sequence of bytes with no data structure assumed. e.g. #include #include struct template { char qheader[QHDR_SIZE]; char data[100]; } msg; main() { int qid; int priority = 10; /* 0 is highest, 32767 is lowest */ qid = queue_open("test1","w",100); if ( qid >= 0 ) { sprintf(msg.data,"This data will be added to the queue\n"); ret = queue_write(qid, &msg, 100, priority); } ============================================================================== QUEUE_READ "Read a message from a queue" ============================================================================== int queue_read(queue_id, wait_type, &msg, msg_size); int queue_id; int wait_type; void *msg; int msg_size; Arguments: queue_id -The queue id returned from QUEUE_OPEN. wait_type -Wait/Query for a message. msg -Pointer to where you want the message. msg_size -Maximum size message to read. Maximum of 2000. Return Values: >=0 Number of bytes read. < 0 Status code as described in QUEUE_OPEN. Example: struct template { char queue_header[QHDR_SIZE]; .... } msg; nbytes = queue_read(queue_id, 0x0201, &msg, sizeof(msg)); QUEUE_READ reads a message which was posted to a queue. If more than one message has been posted to the queue they are read in the order in which they were posted. The parameter 'wait_type' can be used to select one of several events to wait on. The special value of zero can be used to try and read the queue for a message without waiting. Note that this wait_type is a reduced version of that used by QUEUE_WAIT. wait_type ÚÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÂÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄ¿ ³t t t t t t t t³0 0 0 0 0 0 w m³ ÀÄÁÄÁÄÁÄÁÄÁÄÁÄÁÄÁÄÁÄÁÄÁÄÁÄÁÄÁÄÁÄÙ 15 8 7 0 m- Wait until the queue contains a message. w- Wait until there are no writers. No process has the queue open for write. t- A timeout value in seconds. Zero means no timeout. You may use the 'wait_type' inplace of a separate call to QUEUE_WAIT. if(queue_wait(queue_id, 0x0001) == QSTAT_WAITM) nbytes = queue_read(queue_id, 0x0000, &msg, sizeof(msg)); or nbytes = queue_read(queue_id, 0x0001, &msg, sizeof(msg)); If more than one process performs a read with wait, they are queued in time order. When a message is posted the first process in the queue will receive the message. The others will not unblock. This routine assumes that the buffer you pass it contains a header that is at least QHDR_SIZE (defined in queue.h) in size. The header is used by the library routines for establishing the message pass. At the time of this writing the size of the header is 15 bytes ( sizeof(struct queuehdr) as defined in queue.h). Your message starts at byte QHDR_SIZE+1 of the message. The 'msg_size' argument does not include the header. ÚÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÂÄÄÄÄÄÄÄÄÄÄ ³ QHDR_SIZE bytes .......... ÀÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÁÄÄÄÄÄÄÄÄÄÄ ÃÄÄ header ÄÅÄ msg data The message portion starting at byte QHDR_SIZE+1 is treated as a sequence of bytes with no data structure assumed. e.g. #include #include struct template { char qheader[QHDR_SIZE]; char data[100]; } msg; main() { int qid; qid = queue_open("test1","r",100); if ( qid >= 0 ) ret = queue_read(qid, &msg, 100); /* read the message up to 100 bytes */ } If the queue was created as a byte queue (created with "b") then you may read part of a message placed in the queue by QUEUE_WRITE. This will happen when the message in the queue is larger than 'msg_size' given by the read. Your next read will continue where the last left off. In no case will a read cross a message boundary. If there are two messages in the queue of 10 bytes each and you ask for 20 bytes you will only read 10 bytes on the first read and 10 bytes on the second read. If the queue was not created with the byte option then the message will not be read unless you provide a buffer large enough to read the entire message with one read. In this case a queue status indicating that the message was too big will be returned (QSTAT_MSGSIZE). ============================================================================== QUEUE_INFO "Return information on queue" ============================================================================== #include queue_info(nid, queue_id, data) nid_t nid; int queue_id; struct { char queue_header[QHDR_SIZE]; struct queue_info info_data; } *data; Arguments: nid -Node number where the queue administrator is running. queue_id -Starting queue id. data -Pointer to where to store information returned. Returns: >=0 First queue id in use. < 0 Status code as described in QUEUE_OPEN. Example: #include #include struct { char queue_header[QHDR_SIZE]; struct queue_info info_data; } data; int id; for ( id=0; id >= 0; ++id ) { if((id = queue_info(nid, id, &data)) >= 0) { printf("Queue id number %d is called %s and has %d messages unread\n", id, data.info_data.queue_info_name, data.info_data.queue_info_nmsgs); } else break; } QUEUE_INFO can be used to determine the global status of the queue system. You provide the starting queue id and it will return the first queue id greater than or equal to the one you provided that is in use. The information returned includes the queue name, the number of posted messages waiting and a list of up to 20 waiting processes. A waiting process is defined as a process that is trying to read from the queue (and there is no data there for it to read so it is blocked). This routine may be used in a utility program to display the current state of the queue. This may aid in the debugging of a complex queue application. See the structure 'queue_info' in queue.h for more details. Also see the sample program 'qinfo.c' for a working example. ============================================================================== QUEUE_PURGE "Purge all messages from a queue" ============================================================================== int queue_purge(queue_id) int queue_id; Arguments: queue_id -The queue id returned from QUEUE_OPEN. Returns: = 0 If the queue was purged. < 0 Status code as described in QUEUE_OPEN. QUEUE_PURGE purges all messages in the queue. You must have the queue open for read in order to purge the queue. ============================================================================== QUEUE_WAIT "Wait for a message to arrive at a queue" ============================================================================== int queue_wait(queue_id, wait_type) int queue_id; int wait_type; Arguments: queue_id -The queue id returned from QUEUE_OPEN. wait_type -What to wait on. Return Values: Status code as described in QUEUE_OPEN. Example: status = queue_wait(queue_id, 0x0001); QUEUE_WAIT will cause your process to block until an event occurs at the queue. The parameter 'wait_type' can be used to select one of several events to wait on. The special value of zero can be used to query the queue for a message without waiting. wait_type ÚÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÂÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄ¿ ³t t t t t t t t³0 0 0 s r e w m³ ÀÄÁÄÁÄÁÄÁÄÁÄÁÄÁÄÁÄÁÄÁÄÁÄÁÄÁÄÁÄÁÄÙ 15 8 7 0 m- Wait until the queue contains a message. w- Wait until there are no writers. No process has the queue open for write. e- Wait until the queue is empty. r- Wait until there are no readers. No process has the queue open for read. s- Wait until there is room in the queue based upon the window size. t- A timeout value in seconds. Zero means no timeout. If the event is already present QUEUE_WAIT will return right away. If several processes all wait on the same queue for a message using QUEUE_WAIT they will all unblock and the first process that will be scheduled to run will be the process with the highest priority that issued the first wait. ============================================================================== QUEUE_WAIT_SIG "Signal when a message arrives at a queue" ============================================================================== int queue_wait_sig(queue_id, wait_type, proxy) int queue_id, wait_type; pid_t proxy; Arguments: queue_id -The queue id returned from QUEUE_OPEN. wait_type -What to wait on. proxy -Proxy to trigger when wait expires. Returns: Status code as described in QUEUE_OPEN. Example: status = queue_wait_sig(queue_id, 0x0001, proxy); QUEUE_WAIT_SIG will cause a proxy to be triggered when an event occurs at the queue. It is similar in operation to QUEUE_WAIT except it does not block. This allows a program to wait for a message or a trigger of a proxy. The parameter 'wait_type' can be used to select one of several events to wait on. wait_type ÚÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄÂÄÄÄÄÄÄÄÄÄÄÄÄÄÄÄ¿ ³0 0 0 0 0 0 0 0³0 0 0 0 0 0 0 m³ ÀÄÁÄÁÄÁÄÁÄÁÄÁÄÁÄÁÄÁÄÁÄÁÄÁÄÁÄÁÄÁÄÙ 15 8 7 0 m- Trigger the proxy when the queue contains a message. e.g. proxy = qnx_proxy_attach( 0,0,0,0 ); int wait_bits = 0x0001; queue_wait_sig(queue_id, wait_bits, proxy); pid = Receive(0, &msg, MAX_SIZE); if(pid == proxy) { /* something happened on the queue */ status = queue_last_status(queue_id); /* check to see what it was */ } else { /* Received a normal message */ } ============================================================================== QUEUE_LAST_STATUS "Obtain the status for last signal" ============================================================================== int queue_last_status(queue_id) int queue_id; Arguments: queue_id -The queue id returned from QUEUE_OPEN. Return Values: Status code as described in QUEUE_OPEN. Example: status = queue_last_status(queue_id); QUEUE_LAST_STATUS is used to obtain the status for the last trigger of a proxy caused by QUEUE_WAIT_SIG. The trigger itself cannot return a status so it is necessary to explicitly request it. ============================================================================== QUEUE_KILL "Remove a queue even if it is in use" ============================================================================== int queue_kill(queue_id) int queue_id; Arguments: queue_id -The queue id returned from QUEUE_OPEN. Return Values: = 0 If the queue is removed. < 0 Status code as described in QUEUE_OPEN. Example: queue_kill(queue_id); QUEUE_KILL will remove a previously open queue. The queue and all waiting messages are removed. If any processes are waiting for messages on that queue they will be unblocked. This is a rather drastic action. QUEUE_CLOSE is the normal way to unlink from a queue. This routine is the only way to remove a permanent queue. ============================================================================== Other Technical Notes of Interest: ================================== 1. 'Queue' uses malloc() to store the queued data. 'Queue' is a 32-bit small model program. It will grow its data/code segment in increments of 8K at a time. So if you add a queue entry that is 200 bytes long, and 'queue' needs to grow its data segment to fit it, then the data segment will grow by 8K. If 'queue' cannot malloc() room for the data then an error of QSTAT_MEM will be returned to the queue_write() function. In system designs where memory usage is critical and you cannot have the 'queue' process growing its data segment to fill available memory, then you should either look at: A. restricting the number of queue entries allowed for the queue (with the argument on queue_open()) B. add code to the queue_post routine in the 'queue' source to restrict how much memory you can malloc() in total. 2. The routines dealing with triggering a proxy have not been extensively tested. Most testing concerned reading/writing to the queue. 3. A special priority value exists on queue_write(). Normal priority is 0 (highest) to 32767 (lowest). If you use a priority of -1 then the queue_entry will be IMMEDIATELY added to the bottom of the queue without having to walk the queue_list in order to put it in priority order. This time saving in not having to walk the queue_list could be important in some situations.