OpenWalnut  1.4.0
WThreadedJobs.h
00001 //---------------------------------------------------------------------------
00002 //
00003 // Project: OpenWalnut ( http://www.openwalnut.org )
00004 //
00005 // Copyright 2009 OpenWalnut Community, BSV@Uni-Leipzig and CNCF@MPI-CBS
00006 // For more information see http://www.openwalnut.org/copying
00007 //
00008 // This file is part of OpenWalnut.
00009 //
00010 // OpenWalnut is free software: you can redistribute it and/or modify
00011 // it under the terms of the GNU Lesser General Public License as published by
00012 // the Free Software Foundation, either version 3 of the License, or
00013 // (at your option) any later version.
00014 //
00015 // OpenWalnut is distributed in the hope that it will be useful,
00016 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00017 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00018 // GNU Lesser General Public License for more details.
00019 //
00020 // You should have received a copy of the GNU Lesser General Public License
00021 // along with OpenWalnut. If not, see <http://www.gnu.org/licenses/>.
00022 //
00023 //---------------------------------------------------------------------------
00024 
00025 #ifndef WTHREADEDJOBS_H
00026 #define WTHREADEDJOBS_H
00027 
00028 #include <string>
00029 
00030 #include <boost/shared_ptr.hpp>
00031 
00032 #include "WException.h"
00033 #include "WFlag.h"
00034 
00035 /**
00036  * \class WThreadedJobs
00037  *
00038  * A threaded functor base class for producer-consumer-style multithreaded computation.
00039  *
00040  * A job generator function produces jobs that are then distributed to the threads in
00041  * a first come first serve manner. The first template parameter is the type of the input data,
00042  * for example a WDataSetScalar. The second template parameter is the type of object that
00043  * represents the jobs.
00044  *
00045  * Both the getJob() and the compute() functions need to be implemented.
00046  *
00047  * \ingroup common
00048  */
00049 template< class Input_T, class Job_T >
00050 class WThreadedJobs
00051 {
00052 public:
00053     //! the input type
00054     typedef Input_T InputType;
00055 
00056     //! the job type
00057     typedef Job_T JobType;
00058 
00059     /**
00060      * Constructor.
00061      *
00062      * \param input The input.
00063      */
00064     WThreadedJobs( boost::shared_ptr< InputType const > input ); // NOLINT
00065 
00066     /**
00067      * Destructor.
00068      */
00069     virtual ~WThreadedJobs();
00070 
00071     /**
00072      * The threaded function operation. Pulls jobs and executes the \see compute()
00073      * function.
00074      *
00075      * \param id The thread's ID.
00076      * \param numThreads How many threads are working on the jobs.
00077      * \param shutdown A shared flag indicating the thread should be stopped.
00078      */
00079     void operator() ( std::size_t id, std::size_t numThreads, WBoolFlag const& shutdown );
00080 
00081     /**
00082      * Abstract function for the job aquisition.
00083      *
00084      * \param job The job (output).
00085      * \return false, iff no more jobs need to be processed.
00086      */
00087     virtual bool getJob( JobType& job ) = 0; // NOLINT
00088 
00089     /**
00090      * Abstract function that performs the actual computation per job.
00091      *
00092      * \param input The input data.
00093      * \param job The current job.
00094      */
00095     virtual void compute( boost::shared_ptr< InputType const > input, JobType const& job ) = 0;
00096 
00097 protected:
00098     //! the input
00099     boost::shared_ptr< InputType const > m_input;
00100 private:
00101 };
00102 
00103 template< class Input_T, class Job_T >
00104 WThreadedJobs< Input_T, Job_T >::WThreadedJobs( boost::shared_ptr< InputType const > input )
00105     : m_input( input )
00106 {
00107     if( !m_input )
00108     {
00109         throw WException( std::string( "Invalid input." ) );
00110     }
00111 }
00112 
00113 template< class Input_T, class Job_T >
00114 WThreadedJobs< Input_T, Job_T >::~WThreadedJobs()
00115 {
00116 }
00117 
00118 template< class Input_T, class Job_T >
00119 void WThreadedJobs< Input_T, Job_T >::operator() ( std::size_t /* id */, std::size_t /* numThreads */, WBoolFlag const& shutdown )
00120 {
00121     JobType job;
00122     while( getJob( job ) && !shutdown() )
00123     {
00124         compute( m_input, job );
00125     }
00126 }
00127 
00128 /**
00129  * Nearly the same class as WThreadedJobs, but this class is intended to be used for multithreaded operations on voxels and therefore it
00130  * uses Striping to partition the data. This is necessarry since if the threads are not operating on blocks, they slow down!
00131  */
00132 template< class Input_T, class Job_T >
00133 class WThreadedStripingJobs
00134 {
00135 public:
00136     //! the input type
00137     typedef Input_T InputType;
00138 
00139     //! the job type
00140     typedef Job_T JobType;
00141 
00142     /**
00143      * Constructor.
00144      *
00145      * \param input The input.
00146      */
00147     WThreadedStripingJobs( boost::shared_ptr< InputType const > input ); // NOLINT
00148 
00149     /**
00150      * Destructor.
00151      */
00152     virtual ~WThreadedStripingJobs();
00153 
00154     /**
00155      * The threaded function operation. Pulls jobs and executes the \see compute()
00156      * function.
00157      *
00158      * \param id The thread's ID.
00159      * \param numThreads How many threads are working on the jobs.
00160      * \param shutdown A shared flag indicating the thread should be stopped.
00161      */
00162     void operator() ( std::size_t id, std::size_t numThreads, WBoolFlag const& shutdown );
00163 
00164     /**
00165      * Abstract function that performs the actual computation per voxel.
00166      *
00167      * \param input The input data.
00168      * \param voxelNum The voxel number to operate on.
00169      */
00170     virtual void compute( boost::shared_ptr< InputType const > input, std::size_t voxelNum ) = 0;
00171 
00172 protected:
00173     //! the input
00174     boost::shared_ptr< InputType const > m_input;
00175 private:
00176 };
00177 
00178 template< class Input_T, class Job_T >
00179 WThreadedStripingJobs< Input_T, Job_T >::WThreadedStripingJobs( boost::shared_ptr< InputType const > input )
00180     : m_input( input )
00181 {
00182     if( !m_input )
00183     {
00184         throw WException( std::string( "Invalid input." ) );
00185     }
00186 }
00187 
00188 template< class Input_T, class Job_T >
00189 WThreadedStripingJobs< Input_T, Job_T >::~WThreadedStripingJobs()
00190 {
00191 }
00192 
00193 template< class Input_T, class Job_T >
00194 void WThreadedStripingJobs< Input_T, Job_T >::operator() ( std::size_t id, std::size_t numThreads, WBoolFlag const& shutdown )
00195 {
00196     WAssert( m_input, "Bug: operations of an invalid input requested." );
00197     size_t numElements = m_input->size();
00198 
00199     // partition the voxels via simple striping
00200     size_t start = numElements / numThreads * id;
00201     size_t end = ( id + 1 ) * ( numElements / numThreads );
00202     if( id == numThreads - 1 ) // last thread may have less elements to take care.
00203     {
00204         end = numElements;
00205     }
00206 
00207     for( size_t voxelNum = start; ( voxelNum < end ) && !shutdown(); ++voxelNum )
00208     {
00209         compute( m_input, voxelNum );
00210     }
00211 }
00212 
00213 #endif  // WTHREADEDJOBS_H