WThreadedJobs.h

00001 //---------------------------------------------------------------------------
00002 //
00003 // Project: OpenWalnut ( http://www.openwalnut.org )
00004 //
00005 // Copyright 2009 OpenWalnut Community, BSV@Uni-Leipzig and CNCF@MPI-CBS
00006 // For more information see http://www.openwalnut.org/copying
00007 //
00008 // This file is part of OpenWalnut.
00009 //
00010 // OpenWalnut is free software: you can redistribute it and/or modify
00011 // it under the terms of the GNU Lesser General Public License as published by
00012 // the Free Software Foundation, either version 3 of the License, or
00013 // (at your option) any later version.
00014 //
00015 // OpenWalnut is distributed in the hope that it will be useful,
00016 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00017 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00018 // GNU Lesser General Public License for more details.
00019 //
00020 // You should have received a copy of the GNU Lesser General Public License
00021 // along with OpenWalnut. If not, see <http://www.gnu.org/licenses/>.
00022 //
00023 //---------------------------------------------------------------------------
00024 
00025 #ifndef WTHREADEDJOBS_H
00026 #define WTHREADEDJOBS_H
00027 
00028 #include <iostream>
00029 #include <string>
00030 
00031 #include <boost/shared_ptr.hpp>
00032 
00033 #include "WException.h"
00034 #include "WFlag.h"
00035 #include "WLogger.h"
00036 
00037 /**
00038  * \class WThreadedJobs
00039  *
00040  * A threaded functor base class for producer-consumer-style multithreaded computation.
00041  *
00042  * A job generator function produces jobs that are then distributed to the threads in
00043  * a first come first serve manner. The first template parameter is the type of the input data,
00044  * for example a WDataSetScalar. The second template parameter is the type of object that
00045  * represents the jobs.
00046  *
00047  * Both the getJob() and the compute() functions need to be implemented.
00048  *
00049  * \ingroup common
00050  */
00051 template< class Input_T, class Job_T >
00052 class WThreadedJobs
00053 {
00054 public:
00055     //! the input type
00056     typedef Input_T InputType;
00057 
00058     //! the job type
00059     typedef Job_T JobType;
00060 
00061     /**
00062      * Constructor.
00063      *
00064      * \param input The input.
00065      */
00066     WThreadedJobs( boost::shared_ptr< InputType const > input ); // NOLINT
00067 
00068     /**
00069      * Destructor.
00070      */
00071     virtual ~WThreadedJobs();
00072 
00073     /**
00074      * The threaded function operation. Pulls jobs and executes the \see compute()
00075      * function.
00076      *
00077      * \param id The thread's ID.
00078      * \param numThreads How many threads are working on the jobs.
00079      * \param shutdown A shared flag indicating the thread should be stopped.
00080      */
00081     void operator() ( std::size_t id, std::size_t numThreads, WBoolFlag const& shutdown );
00082 
00083     /**
00084      * Abstract function for the job aquisition.
00085      *
00086      * \param job The job (output).
00087      * \return false, iff no more jobs need to be processed.
00088      */
00089     virtual bool getJob( JobType& job ) = 0; // NOLINT
00090 
00091     /**
00092      * Abstract function that performs the actual computation per job.
00093      *
00094      * \param input The input data.
00095      * \param job The current job.
00096      */
00097     virtual void compute( boost::shared_ptr< InputType const > input, JobType const& job ) = 0;
00098 
00099 protected:
00100 
00101     //! the input
00102     boost::shared_ptr< InputType const > m_input;
00103 private:
00104 };
00105 
00106 template< class Input_T, class Job_T >
00107 WThreadedJobs< Input_T, Job_T >::WThreadedJobs( boost::shared_ptr< InputType const > input )
00108     : m_input( input )
00109 {
00110     if( !m_input )
00111     {
00112         throw WException( std::string( "Invalid input." ) );
00113     }
00114 }
00115 
00116 template< class Input_T, class Job_T >
00117 WThreadedJobs< Input_T, Job_T >::~WThreadedJobs()
00118 {
00119 }
00120 
00121 template< class Input_T, class Job_T >
00122 void WThreadedJobs< Input_T, Job_T >::operator() ( std::size_t /* id */, std::size_t /* numThreads */, WBoolFlag const& shutdown )
00123 {
00124     JobType job;
00125     while( getJob( job ) && !shutdown() )
00126     {
00127         compute( m_input, job );
00128     }
00129 }
00130 
00131 /**
00132  * Nearly the same class as WThreadedJobs, but this class is intended to be used for multithreaded operations on voxels and therefore it
00133  * uses Striping to partition the data. This is necessarry since if the threads are not operating on blocks, they slow down!
00134  */
00135 template< class Input_T, class Job_T >
00136 class WThreadedStripingJobs
00137 {
00138 public:
00139     //! the input type
00140     typedef Input_T InputType;
00141 
00142     //! the job type
00143     typedef Job_T JobType;
00144 
00145     /**
00146      * Constructor.
00147      *
00148      * \param input The input.
00149      */
00150     WThreadedStripingJobs( boost::shared_ptr< InputType const > input ); // NOLINT
00151 
00152     /**
00153      * Destructor.
00154      */
00155     virtual ~WThreadedStripingJobs();
00156 
00157     /**
00158      * The threaded function operation. Pulls jobs and executes the \see compute()
00159      * function.
00160      *
00161      * \param id The thread's ID.
00162      * \param numThreads How many threads are working on the jobs.
00163      * \param shutdown A shared flag indicating the thread should be stopped.
00164      */
00165     void operator() ( std::size_t id, std::size_t numThreads, WBoolFlag const& shutdown );
00166 
00167     /**
00168      * Abstract function that performs the actual computation per voxel.
00169      *
00170      * \param input The input data.
00171      * \param voxelNum The voxel number to operate on.
00172      */
00173     virtual void compute( boost::shared_ptr< InputType const > input, std::size_t voxelNum ) = 0;
00174 
00175 protected:
00176 
00177     //! the input
00178     boost::shared_ptr< InputType const > m_input;
00179 private:
00180 };
00181 
00182 template< class Input_T, class Job_T >
00183 WThreadedStripingJobs< Input_T, Job_T >::WThreadedStripingJobs( boost::shared_ptr< InputType const > input )
00184     : m_input( input )
00185 {
00186     if( !m_input )
00187     {
00188         throw WException( std::string( "Invalid input." ) );
00189     }
00190 }
00191 
00192 template< class Input_T, class Job_T >
00193 WThreadedStripingJobs< Input_T, Job_T >::~WThreadedStripingJobs()
00194 {
00195 }
00196 
00197 template< class Input_T, class Job_T >
00198 void WThreadedStripingJobs< Input_T, Job_T >::operator() ( std::size_t id, std::size_t numThreads, WBoolFlag const& shutdown )
00199 {
00200     WAssert( m_input, "Bug: operations of an invalid input requested." );
00201     size_t numElements = m_input->size();
00202 
00203     // partition the voxels via simple striping
00204     size_t start = numElements / numThreads * id;
00205     size_t end = ( id + 1 ) * ( numElements / numThreads );
00206     if( id == numThreads - 1 ) // last thread may have less elements to take care.
00207     {
00208         end = numElements;
00209     }
00210 
00211     for( size_t voxelNum = start; ( voxelNum < end ) && !shutdown(); ++voxelNum )
00212     {
00213         compute( m_input, voxelNum );
00214     }
00215 }
00216 
00217 #endif  // WTHREADEDJOBS_H
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends