This is the mail archive of the ecos-patches@sources.redhat.com mailing list for the eCos project.



Re: mq_timedsend & mq_timedreceive


Dmitriy Korovkin wrote:
> Hi,
> In my work on porting the ISaGRAF PRO target to eCos I found that message queue send and receive with timeout are required. So I've added the functions mq_timedsend and mq_timedreceive defined by the POSIX 1003.1d draft. Patches to the isoinfra, posix and kernel packages are attached. Your comments are highly appreciated.

It seems a bit odd to me for POSIX 1003.1d to do this, because there are plenty of ways to wake up a blocked function using signals in POSIX. I'm also generally loath to include functions that exist only in a draft, although there's probably no harm as long as people are prepared to accept that the draft may change!

But no matter. Despite the apparent size of the patch, I don't believe a copyright assignment is required (fortunately!) since almost all of it is based on the existing mq_send/receive code. However, could I ask you in future to a) include ChangeLog entries for each changed package (I have written them for you this time); and b) generate patches with "cvs -q diff -u5 -p"? For this it may be easiest to add:
diff -u5 -p
to your ~/.cvsrc file.

I made a few tweaks, so I'm including the patch in full again here for the record.
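
For anyone calling the new functions: abs_timeout is an absolute time rather than an interval, and one of the tweaks is that a tv_nsec outside 0..999999999 is treated as a bogus timeout, giving EINVAL if the call would otherwise have had to block. The following is only a minimal sketch of building such a deadline. The helper names timespec_nsec_valid() and deadline_after_ms() are made up for illustration and are not part of the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <time.h>

#define NSEC_PER_SEC 1000000000L

/* Hypothetical helper: the same range check the patch applies to
 * abs_timeout->tv_nsec before deciding whether it may block. */
static bool timespec_nsec_valid(const struct timespec *ts)
{
    return ts->tv_nsec >= 0 && ts->tv_nsec < NSEC_PER_SEC;
}

/* Hypothetical helper: turn "rel_ms milliseconds after now" into an
 * absolute deadline, carrying into tv_sec so tv_nsec stays in range. */
static struct timespec deadline_after_ms(struct timespec now, long rel_ms)
{
    assert(rel_ms >= 0);
    assert(timespec_nsec_valid(&now));

    struct timespec abs = now;
    abs.tv_sec  += rel_ms / 1000;
    abs.tv_nsec += (rel_ms % 1000) * 1000000L;
    if (abs.tv_nsec >= NSEC_PER_SEC) {
        abs.tv_sec  += 1;
        abs.tv_nsec -= NSEC_PER_SEC;
    }
    return abs;
}
```

A caller would typically fill in the starting time with clock_gettime(CLOCK_REALTIME, ...) and pass the resulting struct as abs_timeout to mq_timedsend() or mq_timedreceive().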

Thanks for contributing back!

Jifl
--
eCosCentric http://www.eCosCentric.com/ <info@eCosCentric.com>
--[ "You can complain because roses have thorns, or you ]--
--[ can rejoice because thorns have roses." -Lincoln ]-- Opinions==mine
Index: compat/posix/current/doc/posix.sgml
===================================================================
RCS file: /cvs/ecos/ecos/packages/compat/posix/current/doc/posix.sgml,v
retrieving revision 1.2
diff -u -5 -p -r1.2 posix.sgml
--- compat/posix/current/doc/posix.sgml	15 Sep 2002 22:09:21 -0000	1.2
+++ compat/posix/current/doc/posix.sgml	13 Jan 2003 05:45:47 -0000
@@ -1114,10 +1114,19 @@ ssize&lowbar;t mq&lowbar;receive( mqd&lo
 int mq&lowbar;setattr( mqd&lowbar;t mqdes, const struct mq&lowbar;attr &ast;mqstat,
 	        struct mq&lowbar;attr &ast;omqstat ); 
 int mq&lowbar;getattr( mqd&lowbar;t mqdes, struct mq&lowbar;attr &ast;mqstat ); 
 int mq&lowbar;notify( mqd&lowbar;t mqdes, const struct sigevent &ast;notification );
 </screen>
+<para>From POSIX 1003.1d draft: </para>
+<screen>
+int mq&lowbar;timedsend( mqd&lowbar;t mqdes, const char &ast;msg&lowbar;ptr,
+                  size&lowbar;t msg&lowbar;len, unsigned int msg&lowbar;prio,
+                  const struct timespec &ast;abs&lowbar;timeout ); 
+ssize&lowbar;t mq&lowbar;timedreceive( mqd&lowbar;t mqdes, char &ast;msg&lowbar;ptr,
+                         size&lowbar;t msg&lowbar;len, unsigned int &ast;msg&lowbar;prio,
+                         const struct timespec &ast;abs&lowbar;timeout );
+</screen>
 </sect2>
 
 <!-- =================================================================== -->
 
 <sect2>
Index: compat/posix/current/ChangeLog
===================================================================
RCS file: /cvs/ecos/ecos/packages/compat/posix/current/ChangeLog,v
retrieving revision 1.31
diff -u -5 -p -r1.31 ChangeLog
--- compat/posix/current/ChangeLog	10 Dec 2002 02:28:21 -0000	1.31
+++ compat/posix/current/ChangeLog	13 Jan 2003 05:45:47 -0000
@@ -1,5 +1,19 @@
+2003-01-13  Jonathan Larmour  <jifl@eCosCentric.com>
+
+	* doc/posix.sgml: Document them.
+
+	* src/mqueue.cxx (mq_timedreceive): Make fully compliant by dealing
+	with bogus timeouts.
+	(mq_timedsend): Ditto.
+
+2003-01-13  Dmitriy Korovkin  <dkorovkin@rambler.ru>
+
+	* src/mqueue.cxx (mq_timedsend): New function. Implementing POSIX
+	1003.1d draft definition.
+	(mq_timedreceive): Ditto.
+
 2002-12-10  Wade Jensen  <waj4news@cox.net>
 2002-12-10  Jonathan Larmour  <jifl@eCosCentric.com>
 
 	* src/mutex.cxx (pthread_cond_timedwait): Initialize clock converters
 	only once ever.
Index: compat/posix/current/src/mqueue.cxx
===================================================================
RCS file: /cvs/ecos/ecos/packages/compat/posix/current/src/mqueue.cxx,v
retrieving revision 1.5
diff -u -5 -p -r1.5 mqueue.cxx
--- compat/posix/current/src/mqueue.cxx	23 May 2002 22:59:59 -0000	1.5
+++ compat/posix/current/src/mqueue.cxx	13 Jan 2003 05:45:48 -0000
@@ -82,11 +82,14 @@
 #include <new>                       // C++ new
 #ifdef CYGFUN_POSIX_MQUEUE_NOTIFY
 # include <signal.h>
 # include "pprivate.h"               // cyg_sigqueue()
 #endif
-
+#ifdef CYGFUN_KERNEL_THREADS_TIMER
+# include <time.h>
+# include "pprivate.h"               // cyg_timespec_to_ticks()
+#endif
 
 /* CONSTANTS */
 
 #define MQ_VALID_MAGIC  0x6db256c1
 
@@ -669,11 +672,190 @@ mq_receive( mqd_t mqdes, char *msg_ptr, 
         return (ssize_t)-1; // keep compiler happy
     } // switch
     
 } // mq_receive()
 
+
+//------------------------------------------------------------------------
+#ifdef CYGFUN_KERNEL_THREADS_TIMER
+externC int
+mq_timedsend( mqd_t mqdes, const char *msg_ptr, size_t msg_len,
+              unsigned int msg_prio, const struct timespec *abs_timeout)
+{
+    CYG_REPORT_FUNCTYPE( "returning %d" );
+    CYG_REPORT_FUNCARG6( "mqdes=%08x, msg_ptr=%08x, msg_len=%u, msg_prio=%u, "
+                         "abs_timeout = %lu, %ld",
+                         mqdes, msg_ptr, msg_len, msg_prio, 
+                         abs_timeout->tv_sec, abs_timeout->tv_nsec);
+    CYG_CHECK_DATA_PTRC( msg_ptr );
+    
+    struct mquser *user = (struct mquser *)mqdes;
+    struct mqtabent *tabent = user->tabent;
+
+#ifdef CYGIMP_POSIX_MQUEUE_VALIDATE_DESCRIPTOR
+    if ( user->magic != MQ_VALID_MAGIC ) {
+        errno  = EBADF;
+        CYG_REPORT_RETVAL( -1 );
+        return -1;
+    }
+#endif
+    
+    if ( msg_len > (size_t)tabent->msgsize ) {
+        errno = EMSGSIZE;
+        CYG_REPORT_RETVAL( -1 );
+        return -1;
+    }
+
+    if ( msg_prio > MQ_PRIO_MAX ) {
+        errno = EINVAL;
+        CYG_REPORT_RETVAL( -1 );
+        return -1;
+    }
+
+    if ( (O_WRONLY != (user->flags & O_WRONLY)) && 
+         (O_RDWR != (user->flags & O_RDWR)) ) {
+        errno = EBADF;
+        CYG_REPORT_RETVAL( -1 );
+        return -1;
+    }
+
+    // go for it
+    Cyg_Mqueue::qerr_t err;
+    bool nonblocking = ((user->flags & O_NONBLOCK) == O_NONBLOCK);
+    bool badtimespec = (abs_timeout->tv_nsec < 0) ||
+        (abs_timeout->tv_nsec > 999999999l);
+    err = tabent->mq->put( msg_ptr, msg_len, msg_prio,
+                           !nonblocking && !badtimespec,
+                           cyg_timespec_to_ticks(abs_timeout));
+    switch (err) {
+
+    case Cyg_Mqueue::INTR:
+        errno = EINTR;
+        CYG_REPORT_RETVAL( -1 );
+        return -1;
+
+    case Cyg_Mqueue::WOULDBLOCK:
+        if (badtimespec) {
+            errno = EINVAL;
+            CYG_REPORT_RETVAL( -1 );
+            return -1;
+        }
+        CYG_ASSERT( (user->flags & O_NONBLOCK) == O_NONBLOCK,
+                    "Message queue assumed non-blocking when blocking requested"
+            );
+        errno = EAGAIN;
+        CYG_REPORT_RETVAL( -1 );
+        return -1;
+
+    case Cyg_Mqueue::TIMEOUT:
+        errno = ETIMEDOUT;
+        CYG_REPORT_RETVAL( -1 );
+        return -1;
+        
+    case Cyg_Mqueue::OK:
+        CYG_REPORT_RETVAL( 0 );
+        return 0;
+
+    default:
+        CYG_FAIL( "unhandled message queue return code" );
+        return -1; // keep compiler happy
+    } // switch
+} // mq_timedsend()
+
+//------------------------------------------------------------------------
+
+
+externC ssize_t
+mq_timedreceive( mqd_t mqdes, char *msg_ptr, size_t msg_len,
+            unsigned int *msg_prio, const struct timespec *abs_timeout)
+{
+    CYG_REPORT_FUNCTYPE( "returning %ld" );
+    CYG_REPORT_FUNCARG6( "mqdes=%08x, msg_ptr=%08x, msg_len=%u, msg_prio=%08x, "
+                         "abs_timeout = %lu, %ld",
+                         mqdes, msg_ptr, msg_len, msg_prio,
+                         abs_timeout->tv_sec, abs_timeout->tv_nsec );
+    CYG_CHECK_DATA_PTRC( msg_ptr );
+    CYG_CHECK_DATA_PTRC( msg_ptr+msg_len-1 );
+    if ( NULL != msg_prio )
+        CYG_CHECK_DATA_PTRC( msg_prio );
+    
+    
+    struct mquser *user = (struct mquser *)mqdes;
+    struct mqtabent *tabent = user->tabent;
+
+#ifdef CYGIMP_POSIX_MQUEUE_VALIDATE_DESCRIPTOR
+    if ( user->magic != MQ_VALID_MAGIC ) {
+        errno  = EBADF;
+        CYG_REPORT_RETVAL( -1 );
+        return (ssize_t)-1;
+    }
+#endif
+    
+    if ( (O_RDONLY != (user->flags & O_RDONLY)) && 
+         (O_RDWR != (user->flags & O_RDWR)) ) {
+        errno = EBADF;
+        CYG_REPORT_RETVAL( -1 );
+        return (ssize_t)-1;
+    }
+
+    if ( msg_len < (size_t)tabent->msgsize ) {
+        errno = EMSGSIZE;
+        CYG_REPORT_RETVAL( -1 );
+        return (ssize_t)-1;
+    }
+
+    // go for it
+    Cyg_Mqueue::qerr_t err;
+    bool nonblocking = ((user->flags & O_NONBLOCK) == O_NONBLOCK);
+    bool badtimespec = (abs_timeout->tv_nsec < 0) ||
+        (abs_timeout->tv_nsec > 999999999l);
+    err = tabent->mq->get( msg_ptr, &msg_len, msg_prio,
+                           !nonblocking && !badtimespec,
+                           cyg_timespec_to_ticks(abs_timeout) );
+    switch (err) {
+
+    case Cyg_Mqueue::INTR:
+        errno = EINTR;
+        CYG_REPORT_RETVAL( -1 );
+        return (ssize_t)-1;
+
+    case Cyg_Mqueue::WOULDBLOCK:
+        if (badtimespec) {
+            errno = EINVAL;
+            CYG_REPORT_RETVAL( -1 );
+            return -1;
+        }
+        CYG_ASSERT( (user->flags & O_NONBLOCK) == O_NONBLOCK,
+                    "Message queue assumed non-blocking when blocking requested"
+            );
+        errno = EAGAIN;
+        CYG_REPORT_RETVAL( -1 );
+        return (ssize_t)-1;
+
+    case Cyg_Mqueue::TIMEOUT:
+        errno = ETIMEDOUT;
+        CYG_REPORT_RETVAL( -1 );
+        return -1;
+        
+    case Cyg_Mqueue::OK:
+        CYG_ASSERT( msg_len <= (size_t)tabent->msgsize,
+                    "returned message too long" );
+        if ( NULL != msg_prio )
+            CYG_ASSERT( *msg_prio <= MQ_PRIO_MAX,
+                        "returned message has invalid priority" );
+        CYG_REPORT_RETVAL( msg_len );
+        return (ssize_t)msg_len;
+
+    default:
+        CYG_FAIL( "unhandled message queue return code" );
+        return (ssize_t)-1; // keep compiler happy
+    } // switch
+    
+} // mq_timedreceive()
+
 //------------------------------------------------------------------------
+#endif
 
 #ifdef CYGFUN_POSIX_MQUEUE_NOTIFY
 
 externC int
 mq_notify( mqd_t mqdes, const struct sigevent *notification )
Index: isoinfra/current/include/mqueue.h
===================================================================
RCS file: /cvs/ecos/ecos/packages/isoinfra/current/include/mqueue.h,v
retrieving revision 1.3
diff -u -5 -p -r1.3 mqueue.h
--- isoinfra/current/include/mqueue.h	23 May 2002 23:06:41 -0000	1.3
+++ isoinfra/current/include/mqueue.h	13 Jan 2003 05:45:48 -0000
@@ -57,10 +57,11 @@
 */
 
 /* CONFIGURATION */
 
 #include <pkgconf/isoinfra.h>          /* Configuration header */
+#include <pkgconf/kernel.h>            /* CYGFUN_KERNEL_THREADS_TIMER */
 
 /* INCLUDES */
 
 #ifdef _POSIX_MESSAGE_PASSING
 # ifdef CYGBLD_ISO_MQUEUE_HEADER
@@ -115,10 +116,26 @@ extern int
 mq_setattr( mqd_t /* mqdes */, const struct mq_attr * /* mqstat */,
             struct mq_attr * /* omqstat */ );
 
 extern int
 mq_getattr( mqd_t /* mqdes */, struct mq_attr * /* mqstat */ );
+
+# ifdef CYGFUN_KERNEL_THREADS_TIMER
+/* POSIX 1003.1d Draft functions - FIXME: should be conditionalized */
+
+struct timespec; /* forward declaration */
+
+extern int 
+mq_timedsend( mqd_t /* mqdes */, const char * /* msg_ptr */, 
+              size_t /* msg_len */, unsigned int /* msg_prio */,
+              const struct timespec * /* abs_timeout */ );
+
+extern ssize_t 
+mq_timedreceive( mqd_t /* mqdes */, char * /* msg_ptr */, 
+                 size_t /* msg_len */, unsigned int * /* msg_prio */,
+                 const struct timespec * /* abs_timeout */ );
+# endif
 
 #ifdef __cplusplus
 }   /* extern "C" */
 #endif
 
Index: isoinfra/current/ChangeLog
===================================================================
RCS file: /cvs/ecos/ecos/packages/isoinfra/current/ChangeLog,v
retrieving revision 1.19
diff -u -5 -p -r1.19 ChangeLog
--- isoinfra/current/ChangeLog	15 Dec 2002 21:17:17 -0000	1.19
+++ isoinfra/current/ChangeLog	13 Jan 2003 05:45:48 -0000
@@ -1,5 +1,10 @@
+2003-01-13  Jonathan Larmour  <jifl@eCosCentric.com>
+
+	* include/mqueue.h: Define POSIX 1003.1d draft mq_timedsend() and
+	mq_timedreceive() functions.
+
 2002-12-12  Bart Veer  <bartv@ecoscentric.com>
 
 	* include/stdlib.h: #include cyg_type.h, now that this header
 	uses NORET attributes
 
Index: kernel/current/include/mqueue.hxx
===================================================================
RCS file: /cvs/ecos/ecos/packages/kernel/current/include/mqueue.hxx,v
retrieving revision 1.4
diff -u -5 -p -r1.4 mqueue.hxx
--- kernel/current/include/mqueue.hxx	23 May 2002 23:06:48 -0000	1.4
+++ kernel/current/include/mqueue.hxx	13 Jan 2003 05:45:48 -0000
@@ -67,10 +67,11 @@
 
 #include <stddef.h>                  /* size_t */
 #include <cyg/infra/cyg_type.h>      /* Types */
 #include <cyg/infra/cyg_ass.h>       /* CYGDBG_DEFINE_CHECK_THIS,
                                         CYGDBG_USE_ASSERTS */
+#include <cyg/kernel/ktypes.h>       /* Kernel package types */
 #include <cyg/kernel/sema.hxx>       /* Cyg_Counting_Semaphore */
 
 /* CLASSES */
 
 class Cyg_Mqueue {
@@ -81,10 +82,13 @@ public:
 
     typedef enum {
         OK=0,
         NOMEM,
         WOULDBLOCK,
+#ifdef CYGFUN_KERNEL_THREADS_TIMER
+        TIMEOUT,
+#endif
         INTR
     } qerr_t;
 
 protected:
     struct qentry {
@@ -120,19 +124,25 @@ protected:
 public:
 
     Cyg_Mqueue( long maxmsgs, long maxmsgsize,
                 qalloc_fn_t qalloc, qfree_fn_t qfree, qerr_t *err );
     ~Cyg_Mqueue();
-
     // put() copies len bytes of *buf into the queue at priority prio
-    qerr_t put( const char *buf, size_t len, unsigned int prio,
-                bool block=true);
+    qerr_t put( const char *buf, size_t len, unsigned int prio, bool block=true
+#ifdef CYGFUN_KERNEL_THREADS_TIMER
+                ,cyg_tick_count timeout = 0
+#endif
+              );
 
     // get() returns the oldest highest priority message in the queue in *buf
     // and sets *prio to the priority (if prio is non-NULL) and *len to the
     // actual message size
-    qerr_t get( char *buf, size_t *len, unsigned int *prio, bool block=true ); 
+    qerr_t get( char *buf, size_t *len, unsigned int *prio, bool block=true
+#ifdef CYGFUN_KERNEL_THREADS_TIMER
+                ,cyg_tick_count timeout = 0
+#endif
+              ); 
 
     // count() returns the number of messages in the queue
     long count();
 
     // Supply a callback function to call (with the supplied data argument)
Index: kernel/current/include/mqueue.inl
===================================================================
RCS file: /cvs/ecos/ecos/packages/kernel/current/include/mqueue.inl,v
retrieving revision 1.5
diff -u -5 -p -r1.5 mqueue.inl
--- kernel/current/include/mqueue.inl	23 May 2002 23:06:48 -0000	1.5
+++ kernel/current/include/mqueue.inl	13 Jan 2003 05:45:49 -0000
@@ -270,11 +270,15 @@ Cyg_Mqueue::~Cyg_Mqueue()
 
 //------------------------------------------------------------------------
 
 // put() copies len bytes of *buf into the queue at priority prio
 CYGPRI_KERNEL_SYNCH_MQUEUE_INLINE Cyg_Mqueue::qerr_t
-Cyg_Mqueue::put( const char *buf, size_t len, unsigned int prio, bool block )
+Cyg_Mqueue::put( const char *buf, size_t len, unsigned int prio, bool block
+#ifdef CYGFUN_KERNEL_THREADS_TIMER
+                 , cyg_tick_count timeout
+#endif
+               )
 {
     CYG_REPORT_FUNCTYPE( "err=%d");
     CYG_REPORT_FUNCARG4( "buf=%08x, len=%ld, prio=%ud, block=%d",
                          buf, len, prio, block==true );
     CYG_CHECK_DATA_PTRC( buf );
@@ -284,10 +288,19 @@ Cyg_Mqueue::put( const char *buf, size_t
     qerr_t err;
     struct qentry *qtmp, *qent;
 
     // wait till a freelist entry is available
     if ( true == block ) {
+#ifdef CYGFUN_KERNEL_THREADS_TIMER
+        if ( timeout != 0) {
+            if ( false == putsem.wait(timeout) ) {
+                err = TIMEOUT;
+                goto exit;
+            }
+        }
+        else
+#endif
         if ( false == putsem.wait() ) {
             err = INTR;
             goto exit;
         }
     } else { 
@@ -377,11 +390,15 @@ Cyg_Mqueue::put( const char *buf, size_t
 // get() returns the oldest highest priority message in the queue in *buf
 // and sets *prio to the priority (if prio is non-NULL) and *len to the
 // actual message size
 
 CYGPRI_KERNEL_SYNCH_MQUEUE_INLINE Cyg_Mqueue::qerr_t
-Cyg_Mqueue::get( char *buf, size_t *len, unsigned int *prio, bool block )
+Cyg_Mqueue::get( char *buf, size_t *len, unsigned int *prio, bool block
+#ifdef CYGFUN_KERNEL_THREADS_TIMER
+                 , cyg_tick_count timeout
+#endif
+               )
 {
     CYG_REPORT_FUNCTYPE( "err=%d");
     CYG_REPORT_FUNCARG4( "buf=%08x, len=%08x, prio=%08x, block=%d",
                          buf, len, prio, block==true );
     CYG_CHECK_DATA_PTRC( buf );
@@ -393,10 +410,19 @@ Cyg_Mqueue::get( char *buf, size_t *len,
     qerr_t err;
     struct qentry *qent;
 
     // wait till a q entry is available
     if ( true == block ) {
+#ifdef CYGFUN_KERNEL_THREADS_TIMER
+        if ( timeout != 0) {
+            if ( false == getsem.wait(timeout) ) {
+                err = TIMEOUT;
+                goto exit;
+            }
+        }
+        else
+#endif
         if ( false == getsem.wait() ) {
             err = INTR;
             goto exit;
         }
     } else { 
Index: kernel/current/ChangeLog
===================================================================
RCS file: /cvs/ecos/ecos/packages/kernel/current/ChangeLog,v
retrieving revision 1.86
diff -u -5 -p -r1.86 ChangeLog
--- kernel/current/ChangeLog	4 Jan 2003 03:23:15 -0000	1.86
+++ kernel/current/ChangeLog	13 Jan 2003 05:45:51 -0000
@@ -1,5 +1,11 @@
+2003-01-13  Dmitriy Korovkin  <dkorovkin@rambler.ru>
+2003-01-13  Jonathan Larmour  <jifl@eCosCentric.com>
+
+	* include/mqueue.hxx: Allow get/put to return time out.
+	* include/mqueue.inl: Ditto.
+
 2003-01-02  Gary Thomas  <gary@mlbassoc.com>
 
 	* tests/kcache2.c: New subtest for raw data cache operations.
 
 2002-12-12  Nick Garnett  <nickg@ecoscentric.com>
