OpenWalnut 1.2.5

WThreadedFunction.h

00001 //---------------------------------------------------------------------------
00002 //
00003 // Project: OpenWalnut ( http://www.openwalnut.org )
00004 //
00005 // Copyright 2009 OpenWalnut Community, BSV@Uni-Leipzig and CNCF@MPI-CBS
00006 // For more information see http://www.openwalnut.org/copying
00007 //
00008 // This file is part of OpenWalnut.
00009 //
00010 // OpenWalnut is free software: you can redistribute it and/or modify
00011 // it under the terms of the GNU Lesser General Public License as published by
00012 // the Free Software Foundation, either version 3 of the License, or
00013 // (at your option) any later version.
00014 //
00015 // OpenWalnut is distributed in the hope that it will be useful,
00016 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00017 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00018 // GNU Lesser General Public License for more details.
00019 //
00020 // You should have received a copy of the GNU Lesser General Public License
00021 // along with OpenWalnut. If not, see <http://www.gnu.org/licenses/>.
00022 //
00023 //---------------------------------------------------------------------------
00024 
00025 #ifndef WTHREADEDFUNCTION_H
00026 #define WTHREADEDFUNCTION_H
00027 
00028 #include <memory.h>
00029 #include <iostream>
00030 
00031 #include <string>
00032 #include <vector>
00033 #include <boost/thread.hpp>
00034 
00035 #include "WAssert.h"
00036 #include "WWorkerThread.h"
00037 #include "WSharedObject.h"
00038 #include "WExportCommon.h"
00039 
00040 /**
00041  * An enum indicating the status of a multithreaded computation
00042  */
00043 enum WThreadedFunctionStatus
00044 {
00045     W_THREADS_INITIALIZED,      //! the status after constructing the function
00046     W_THREADS_RUNNING,          //! the threads were started
00047     W_THREADS_STOP_REQUESTED,   //! a stop was requested and not all threads have stopped yet
00048     W_THREADS_ABORTED,          //! at least one thread was aborted due to a stop request or an exception
00049     W_THREADS_FINISHED          //! all threads completed their work successfully
00050 };
00051 
00052 /**
00053  * An enum indicating the number of threads used
00054  */
00055 enum WThreadedFunctionNbThreads
00056 {
00057     W_AUTOMATIC_NB_THREADS = 0      //!< Use half the available cores as number of threads
00058 };
00059 
00060 /**
00061  * \class WThreadedFunctionBase
00062  *
00063  * A virtual base class for threaded functions (see below).
00064  */
00065 class OWCOMMON_EXPORT WThreadedFunctionBase // NOLINT
00066 {
00067     //! a type for exception signals
00068     typedef boost::signal< void ( WException const& ) > ExceptionSignal;
00069 
00070 public:
00071 
00072     //! a type for exception callbacks
00073     typedef boost::function< void ( WException const& ) > ExceptionFunction;
00074 
00075     /**
00076      * Standard constructor.
00077      */
00078     WThreadedFunctionBase();
00079 
00080     /**
00081      * Destroys the thread pool and stops all threads, if any one of them is still running.
00082      *
00083      * \note Of course, the client has to make sure the threads do not work endlessly on a single job.
00084      */
00085     virtual ~WThreadedFunctionBase();
00086 
00087     /**
00088      * Starts the threads.
00089      */
00090     virtual void run() = 0;
00091 
00092     /**
00093      * Request all threads to stop. Returns immediately, so you might
00094      * have to wait() for the threads to actually finish.
00095      */
00096     virtual void stop() = 0;
00097 
00098     /**
00099      * Wait for all threads to stop.
00100      */
00101     virtual void wait() = 0;
00102 
00103     /**
00104      * Get the status of the threads.
00105      *
00106      * \return The current status.
00107      */
00108     WThreadedFunctionStatus status();
00109 
00110     /**
00111      * Returns a condition that gets fired when all threads have finished.
00112      *
00113      * \return The condition indicating all threads are done.
00114      */
00115     boost::shared_ptr< WCondition > getThreadsDoneCondition();
00116 
00117     /**
00118      * Subscribe a function to an exception signal.
00119      *
00120      * \param func The function to subscribe.
00121      */
00122     void subscribeExceptionSignal( ExceptionFunction func );
00123 
00124 protected:
00125     /**
00126      * WThreadedFunctionBase is non-copyable, so the copy constructor is not implemented.
00127      */
00128     WThreadedFunctionBase( WThreadedFunctionBase const& ); // NOLINT
00129 
00130     /**
00131      * WThreadedFunctionBase is non-copyable, so the copy operator is not implemented.
00132      *
00133      * \return this function
00134      */
00135     WThreadedFunctionBase& operator = ( WThreadedFunctionBase const& );
00136 
00137     //! a condition that gets notified when the work is complete
00138     boost::shared_ptr< WCondition > m_doneCondition;
00139 
00140     //! a signal for exceptions
00141     ExceptionSignal m_exceptionSignal;
00142 
00143     //! the current status
00144     WSharedObject< WThreadedFunctionStatus > m_status;
00145 };
00146 
00147 /**
00148  * \class WThreadedFunction
00149  *
00150  * Creates threads that computes a function in a multithreaded fashion. The template parameter
00151  * is an object that provides a function to execute. The following function needs to be implemented:
00152  *
00153  * void operator ( std::size_t id, std::size_t mx, WBoolFlag const& s );
00154  *
00155  * Here, 'id' is the number of the thread currently executing the function, ranging from
00156  * 0 to mx - 1, where 'mx' is the number of threads running. 's' is a flag that indicates
00157  * if the execution should be stopped. Make sure to check the flag often, so that the threads
00158  * can be stopped when needed.
00159  *
00160  * This class itself is NOT thread-safe, do not access it from different threads simultaneously.
00161  * Also, make sure any resources used by your function are accessed in a threadsafe manner,
00162  * as all threads share the same function object.
00163  *
00164  * Any exception thrown by your function will be caught and forwarded via the exception
00165  * signal. Beware that the signal function will be called in the executing threads, as opposed
00166  * to in your module thread. This means that the exception handler bound to the exception
00167  * signal must be threadsafe too.
00168  *
00169  * The status of the execution can be checked via the status() function. Also, when all threads
00170  * finish (due to throwing exceptions or actually successfully finishing computation ), a condition
00171  * will be notified.
00172  *
00173  * \ingroup common
00174  */
00175 template< class Function_T >
00176 class WThreadedFunction : public WThreadedFunctionBase
00177 {
00178     //! a type for exception signals
00179     typedef boost::signal< void ( WException const& ) > ExceptionSignal;
00180 
00181 public:
00182 
00183     //! a type for exception callbacks
00184     typedef boost::function< void ( WException const& ) > ExceptionFunction;
00185 
00186     /**
00187      * Creates the thread pool with a given number of threads.
00188      *
00189      * \param numThreads The number of threads to create.
00190      * \param function The function object.
00191      *
00192      * \note If the number of threads equals 0, a good number of threads will be determined by the threadpool.
00193      */
00194     WThreadedFunction( std::size_t numThreads, boost::shared_ptr< Function_T > function );
00195 
00196     /**
00197      * Destroys the thread pool and stops all threads, if any one of them is still running.
00198      *
00199      * \note Of course, the client has to make sure the threads do not work endlessly on a single job.
00200      */
00201     virtual ~WThreadedFunction();
00202 
00203     /**
00204      * Starts the threads.
00205      */
00206     virtual void run();
00207 
00208     /**
00209      * Request all threads to stop. Returns immediately, so you might
00210      * have to wait() for the threads to actually finish.
00211      */
00212     virtual void stop();
00213 
00214     /**
00215      * Wait for all threads to stop.
00216      */
00217     virtual void wait();
00218 
00219 private:
00220     /**
00221      * WThreadedFunction is non-copyable, so the copy constructor is not implemented.
00222      */
00223     WThreadedFunction( WThreadedFunction const& ); // NOLINT
00224 
00225     /**
00226      * WThreadedFunction is non-copyable, so the copy operator is not implemented.
00227      *
00228      * \return this function
00229      */
00230     WThreadedFunction& operator = ( WThreadedFunction const& );
00231 
00232     /**
00233      * This function gets subscribed to the threads' stop signals.
00234      */
00235     void handleThreadDone();
00236 
00237     /**
00238      * This function handles exceptions thrown in the worker threads.
00239      *
00240      * \param e The exception that was thrown.
00241      */
00242     void handleThreadException( WException const& e );
00243 
00244     //! the number of threads to manage
00245     std::size_t m_numThreads;
00246 
00247     //! the threads
00248     // use shared_ptr here, because WWorkerThread is non-copyable
00249     std::vector< boost::shared_ptr< WWorkerThread< Function_T > > > m_threads;
00250 
00251     //! the function object
00252     boost::shared_ptr< Function_T > m_func;
00253 
00254     //! a counter that keeps track of how many threads have finished
00255     WSharedObject< std::size_t > m_threadsDone;
00256 };
00257 
00258 template< class Function_T >
00259 WThreadedFunction< Function_T >::WThreadedFunction( std::size_t numThreads, boost::shared_ptr< Function_T > function )
00260     : WThreadedFunctionBase(),
00261       m_numThreads( numThreads ),
00262       m_threads(),
00263       m_func( function ),
00264       m_threadsDone()
00265 {
00266     if( !m_func )
00267     {
00268         throw WException( std::string( "No valid thread function pointer." ) );
00269     }
00270 
00271     // find a suitable number of threads
00272     if( m_numThreads == W_AUTOMATIC_NB_THREADS )
00273     {
00274         m_numThreads = 1;
00275         while( m_numThreads < boost::thread::hardware_concurrency() / 2 && m_numThreads < 1024 )
00276         {
00277             m_numThreads *= 2;
00278         }
00279     }
00280 
00281     // set number of finished threads to 0
00282     m_threadsDone.getWriteTicket()->get() = 0;
00283 
00284     // create threads
00285     for( std::size_t k = 0; k < m_numThreads; ++k )
00286     {
00287         boost::shared_ptr< WWorkerThread< Function_T > > t( new WWorkerThread< Function_T >( m_func, k, m_numThreads ) );
00288         t->subscribeStopSignal( boost::bind( &WThreadedFunction::handleThreadDone, this ) );
00289         t->subscribeExceptionSignal( boost::bind( &WThreadedFunction::handleThreadException, this, _1 ) );
00290         m_threads.push_back( t );
00291     }
00292 }
00293 
00294 template< class Function_T >
00295 WThreadedFunction< Function_T >::~WThreadedFunction()
00296 {
00297     stop();
00298 }
00299 
00300 template< class Function_T >
00301 void WThreadedFunction< Function_T >::run()
00302 {
00303     // set the number of finished threads to 0
00304     m_threadsDone.getWriteTicket()->get() = 0;
00305     // change status
00306     m_status.getWriteTicket()->get() = W_THREADS_RUNNING;
00307     // start threads
00308     for( std::size_t k = 0; k < m_numThreads; ++k )
00309     {
00310         m_threads[ k ]->run();
00311     }
00312 }
00313 
00314 template< class Function_T >
00315 void WThreadedFunction< Function_T >::stop()
00316 {
00317     // change status
00318     m_status.getWriteTicket()->get() = W_THREADS_STOP_REQUESTED;
00319 
00320     typename std::vector< boost::shared_ptr< WWorkerThread< Function_T > > >::iterator it;
00321     // tell the threads to stop
00322     for( it = m_threads.begin(); it != m_threads.end(); ++it )
00323     {
00324         ( *it )->requestStop();
00325     }
00326 }
00327 
00328 template< class Function_T >
00329 void WThreadedFunction< Function_T >::wait()
00330 {
00331     typename std::vector< boost::shared_ptr< WWorkerThread< Function_T > > >::iterator it;
00332     // wait for the threads to stop
00333     for( it = m_threads.begin(); it != m_threads.end(); ++it )
00334     {
00335         ( *it )->wait();
00336     }
00337 }
00338 
00339 template< class Function_T >
00340 void WThreadedFunction< Function_T >::handleThreadDone()
00341 {
00342     typedef typename WSharedObject< std::size_t >::WriteTicket WT;
00343 
00344     WT t = m_threadsDone.getWriteTicket();
00345     WAssert( t->get() < m_numThreads, "" );
00346     ++t->get();
00347     std::size_t k = t->get();
00348     t = WT();
00349 
00350     if( m_numThreads == k )
00351     {
00352         typedef typename WSharedObject< WThreadedFunctionStatus >::WriteTicket ST;
00353         ST s = m_status.getWriteTicket();
00354         if( s->get() == W_THREADS_RUNNING )
00355         {
00356             s->get() = W_THREADS_FINISHED;
00357         }
00358         else if( s->get() == W_THREADS_STOP_REQUESTED )
00359         {
00360             s->get() = W_THREADS_ABORTED;
00361         }
00362         else
00363         {
00364             throw WException( std::string( "Invalid status change." ) );
00365         }
00366         m_doneCondition->notify();
00367     }
00368 }
00369 
00370 template< class Function_T >
00371 void WThreadedFunction< Function_T >::handleThreadException( WException const& e )
00372 {
00373     // change status
00374     typedef typename WSharedObject< WThreadedFunctionStatus >::WriteTicket WT;
00375     WT w = m_status.getWriteTicket();
00376     WAssert( w->get() != W_THREADS_FINISHED &&
00377              w->get() != W_THREADS_ABORTED, "" );
00378     if( w->get() == W_THREADS_RUNNING )
00379     {
00380         w->get() = W_THREADS_STOP_REQUESTED;
00381     }
00382     // force destruction of the write ticket
00383     w = WT();
00384     // update the number of finished threads
00385     handleThreadDone();
00386 
00387     m_exceptionSignal( e );
00388 }
00389 
00390 #endif  // WTHREADEDFUNCTION_H
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends