OpenWalnut 1.3.1
WThreadedJobs.h
00001 //---------------------------------------------------------------------------
00002 //
00003 // Project: OpenWalnut ( http://www.openwalnut.org )
00004 //
00005 // Copyright 2009 OpenWalnut Community, BSV@Uni-Leipzig and CNCF@MPI-CBS
00006 // For more information see http://www.openwalnut.org/copying
00007 //
00008 // This file is part of OpenWalnut.
00009 //
00010 // OpenWalnut is free software: you can redistribute it and/or modify
00011 // it under the terms of the GNU Lesser General Public License as published by
00012 // the Free Software Foundation, either version 3 of the License, or
00013 // (at your option) any later version.
00014 //
00015 // OpenWalnut is distributed in the hope that it will be useful,
00016 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00017 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00018 // GNU Lesser General Public License for more details.
00019 //
00020 // You should have received a copy of the GNU Lesser General Public License
00021 // along with OpenWalnut. If not, see <http://www.gnu.org/licenses/>.
00022 //
00023 //---------------------------------------------------------------------------
00024 
00025 #ifndef WTHREADEDJOBS_H
00026 #define WTHREADEDJOBS_H
00027 
00028 #include <iostream>
00029 #include <string>
00030 
00031 #include <boost/shared_ptr.hpp>
00032 
00033 #include "WException.h"
00034 #include "WFlag.h"
00035 #include "WLogger.h"
00036 
00037 /**
00038  * \class WThreadedJobs
00039  *
00040  * A threaded functor base class for producer-consumer-style multithreaded computation.
00041  *
00042  * A job generator function produces jobs that are then distributed to the threads in
00043  * a first come first serve manner. The first template parameter is the type of the input data,
00044  * for example a WDataSetScalar. The second template parameter is the type of object that
00045  * represents the jobs.
00046  *
00047  * Both the getJob() and the compute() functions need to be implemented.
00048  *
00049  * \ingroup common
00050  */
00051 template< class Input_T, class Job_T >
00052 class WThreadedJobs
00053 {
00054 public:
00055     //! the input type
00056     typedef Input_T InputType;
00057 
00058     //! the job type
00059     typedef Job_T JobType;
00060 
00061     /**
00062      * Constructor.
00063      *
00064      * \param input The input.
00065      */
00066     WThreadedJobs( boost::shared_ptr< InputType const > input ); // NOLINT
00067 
00068     /**
00069      * Destructor.
00070      */
00071     virtual ~WThreadedJobs();
00072 
00073     /**
00074      * The threaded function operation. Pulls jobs and executes the \see compute()
00075      * function.
00076      *
00077      * \param id The thread's ID.
00078      * \param numThreads How many threads are working on the jobs.
00079      * \param shutdown A shared flag indicating the thread should be stopped.
00080      */
00081     void operator() ( std::size_t id, std::size_t numThreads, WBoolFlag const& shutdown );
00082 
00083     /**
00084      * Abstract function for the job aquisition.
00085      *
00086      * \param job The job (output).
00087      * \return false, iff no more jobs need to be processed.
00088      */
00089     virtual bool getJob( JobType& job ) = 0; // NOLINT
00090 
00091     /**
00092      * Abstract function that performs the actual computation per job.
00093      *
00094      * \param input The input data.
00095      * \param job The current job.
00096      */
00097     virtual void compute( boost::shared_ptr< InputType const > input, JobType const& job ) = 0;
00098 
00099 protected:
00100     //! the input
00101     boost::shared_ptr< InputType const > m_input;
00102 private:
00103 };
00104 
00105 template< class Input_T, class Job_T >
00106 WThreadedJobs< Input_T, Job_T >::WThreadedJobs( boost::shared_ptr< InputType const > input )
00107     : m_input( input )
00108 {
00109     if( !m_input )
00110     {
00111         throw WException( std::string( "Invalid input." ) );
00112     }
00113 }
00114 
00115 template< class Input_T, class Job_T >
00116 WThreadedJobs< Input_T, Job_T >::~WThreadedJobs()
00117 {
00118 }
00119 
00120 template< class Input_T, class Job_T >
00121 void WThreadedJobs< Input_T, Job_T >::operator() ( std::size_t /* id */, std::size_t /* numThreads */, WBoolFlag const& shutdown )
00122 {
00123     JobType job;
00124     while( getJob( job ) && !shutdown() )
00125     {
00126         compute( m_input, job );
00127     }
00128 }
00129 
00130 /**
00131  * Nearly the same class as WThreadedJobs, but this class is intended to be used for multithreaded operations on voxels and therefore it
00132  * uses Striping to partition the data. This is necessarry since if the threads are not operating on blocks, they slow down!
00133  */
00134 template< class Input_T, class Job_T >
00135 class WThreadedStripingJobs
00136 {
00137 public:
00138     //! the input type
00139     typedef Input_T InputType;
00140 
00141     //! the job type
00142     typedef Job_T JobType;
00143 
00144     /**
00145      * Constructor.
00146      *
00147      * \param input The input.
00148      */
00149     WThreadedStripingJobs( boost::shared_ptr< InputType const > input ); // NOLINT
00150 
00151     /**
00152      * Destructor.
00153      */
00154     virtual ~WThreadedStripingJobs();
00155 
00156     /**
00157      * The threaded function operation. Pulls jobs and executes the \see compute()
00158      * function.
00159      *
00160      * \param id The thread's ID.
00161      * \param numThreads How many threads are working on the jobs.
00162      * \param shutdown A shared flag indicating the thread should be stopped.
00163      */
00164     void operator() ( std::size_t id, std::size_t numThreads, WBoolFlag const& shutdown );
00165 
00166     /**
00167      * Abstract function that performs the actual computation per voxel.
00168      *
00169      * \param input The input data.
00170      * \param voxelNum The voxel number to operate on.
00171      */
00172     virtual void compute( boost::shared_ptr< InputType const > input, std::size_t voxelNum ) = 0;
00173 
00174 protected:
00175     //! the input
00176     boost::shared_ptr< InputType const > m_input;
00177 private:
00178 };
00179 
00180 template< class Input_T, class Job_T >
00181 WThreadedStripingJobs< Input_T, Job_T >::WThreadedStripingJobs( boost::shared_ptr< InputType const > input )
00182     : m_input( input )
00183 {
00184     if( !m_input )
00185     {
00186         throw WException( std::string( "Invalid input." ) );
00187     }
00188 }
00189 
00190 template< class Input_T, class Job_T >
00191 WThreadedStripingJobs< Input_T, Job_T >::~WThreadedStripingJobs()
00192 {
00193 }
00194 
00195 template< class Input_T, class Job_T >
00196 void WThreadedStripingJobs< Input_T, Job_T >::operator() ( std::size_t id, std::size_t numThreads, WBoolFlag const& shutdown )
00197 {
00198     WAssert( m_input, "Bug: operations of an invalid input requested." );
00199     size_t numElements = m_input->size();
00200 
00201     // partition the voxels via simple striping
00202     size_t start = numElements / numThreads * id;
00203     size_t end = ( id + 1 ) * ( numElements / numThreads );
00204     if( id == numThreads - 1 ) // last thread may have less elements to take care.
00205     {
00206         end = numElements;
00207     }
00208 
00209     for( size_t voxelNum = start; ( voxelNum < end ) && !shutdown(); ++voxelNum )
00210     {
00211         compute( m_input, voxelNum );
00212     }
00213 }
00214 
00215 #endif  // WTHREADEDJOBS_H