OpenWalnut 1.3.1
WModuleContainer.cpp
00001 //---------------------------------------------------------------------------
00002 //
00003 // Project: OpenWalnut ( http://www.openwalnut.org )
00004 //
00005 // Copyright 2009 OpenWalnut Community, BSV@Uni-Leipzig and CNCF@MPI-CBS
00006 // For more information see http://www.openwalnut.org/copying
00007 //
00008 // This file is part of OpenWalnut.
00009 //
00010 // OpenWalnut is free software: you can redistribute it and/or modify
00011 // it under the terms of the GNU Lesser General Public License as published by
00012 // the Free Software Foundation, either version 3 of the License, or
00013 // (at your option) any later version.
00014 //
00015 // OpenWalnut is distributed in the hope that it will be useful,
00016 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00017 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00018 // GNU Lesser General Public License for more details.
00019 //
00020 // You should have received a copy of the GNU Lesser General Public License
00021 // along with OpenWalnut. If not, see <http://www.gnu.org/licenses/>.
00022 //
00023 //---------------------------------------------------------------------------
00024 
00025 #include <list>
00026 #include <set>
00027 #include <vector>
00028 #include <string>
00029 #include <sstream>
00030 #include <algorithm>
00031 #include <utility>
00032 
00033 #include "../common/WLogger.h"
00034 #include "../common/WThreadedRunner.h"
00035 #include "../common/exceptions/WSignalSubscriptionFailed.h"
00036 #include "WBatchLoader.h"
00037 #include "WModuleCombiner.h"
00038 #include "WModuleFactory.h"
00039 #include "WModuleInputConnector.h"
00040 #include "WModuleOutputConnector.h"
00041 #include "WModuleTypes.h"
00042 #include "combiner/WApplyCombiner.h"
00043 #include "exceptions/WModuleAlreadyAssociated.h"
00044 #include "exceptions/WModuleUninitialized.h"
00045 #include "WDataModule.h"
00046 
00047 #include "WModuleContainer.h"
00048 
00049 WModuleContainer::WModuleContainer( std::string name, std::string description ):
00050     WModule(),
00051     m_name( name ),
00052     m_description( description ),
00053     m_crashIfModuleCrashes( true )
00054 {
00055     WLogger::getLogger()->addLogMessage( "Constructing module container." , "ModuleContainer (" + getName() + ")", LL_DEBUG );
00056     // initialize members
00057 }
00058 
00059 WModuleContainer::~WModuleContainer()
00060 {
00061     // cleanup
00062 }
00063 
00064 void WModuleContainer::moduleMain()
00065 {
00066     // do nothing here. The WModule class enforces us to overwrite this method here, but we do not need it.
00067     // Only set the ready flag.
00068     ready();
00069 }
00070 
00071 boost::shared_ptr< WModule > WModuleContainer::factory() const
00072 {
00073     // this factory is not used actually.
00074     return boost::shared_ptr< WModule >( new WModuleContainer( getName(), getDescription() ) );
00075 }
00076 
00077 void WModuleContainer::add( boost::shared_ptr< WModule > module, bool run )
00078 {
00079     if( !module )
00080     {
00081         // just ignore NULL Pointer
00082         return;
00083     }
00084 
00085     WLogger::getLogger()->addLogMessage( "Adding module \"" + module->getName() + "\" to container." ,
00086             "ModuleContainer (" + getName() + ")", LL_INFO );
00087 
00088     if( !module->isInitialized()() )
00089     {
00090         std::ostringstream s;
00091         s << "Could not add module \"" << module->getName() << "\" to container \"" + getName() + "\". Reason: module not initialized.";
00092 
00093         throw WModuleUninitialized( s.str() );
00094     }
00095 
00096     // already associated with this container?
00097     if( module->getAssociatedContainer() == shared_from_this() )
00098     {
00099         WLogger::getLogger()->addLogMessage( "Adding module \"" + module->getName() + "\" to container not needed. Its already inside." ,
00100             "ModuleContainer (" + getName() + ")", LL_INFO );
00101         return;
00102     }
00103 
00104     // is this module already associated?
00105     if( module->isAssociated()() )
00106     {
00107         module->getAssociatedContainer()->remove( module );
00108     }
00109 
00110     // get write lock
00111     ModuleSharedContainerType::WriteTicket wlock = m_modules.getWriteTicket();
00112     wlock->get().insert( module );
00113     wlock.reset();
00114 
00115     module->setAssociatedContainer( boost::shared_static_cast< WModuleContainer >( shared_from_this() ) );
00116     WLogger::getLogger()->addLogMessage( "Associated module \"" + module->getName() + "\" with container." , "ModuleContainer (" + getName() + ")",
00117             LL_INFO );
00118 
00119     // now module->isUsable() is true
00120 
00121     // Connect the error handler and all default handlers:
00122     ModuleSubscriptionsSharedType::WriteTicket subscriptionsLock = m_moduleSubscriptions.getWriteTicket();
00123 
00124     // connect the containers signal handler explicitly
00125     t_ModuleErrorSignalHandlerType func = boost::bind( &WModuleContainer::moduleError, this, _1, _2 );
00126     boost::signals2::connection signalCon = module->subscribeSignal( WM_ERROR, func );
00127     subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) );
00128 
00129     // connect default notifiers:
00130     boost::shared_lock<boost::shared_mutex> slock = boost::shared_lock<boost::shared_mutex>( m_errorNotifiersLock );
00131     for( std::list< t_ModuleErrorSignalHandlerType >::iterator iter = m_errorNotifiers.begin(); iter != m_errorNotifiers.end(); ++iter)
00132     {
00133         signalCon = module->subscribeSignal( WM_ERROR, ( *iter ) );
00134         subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) );
00135     }
00136     slock = boost::shared_lock<boost::shared_mutex>( m_associatedNotifiersLock );
00137     for( std::list< t_ModuleGenericSignalHandlerType >::iterator iter = m_associatedNotifiers.begin(); iter != m_associatedNotifiers.end(); ++iter)
00138     {
00139         // call associated notifier
00140         ( *iter )( module );
00141     }
00142     slock = boost::shared_lock<boost::shared_mutex>( m_connectorNotifiersLock );
00143     for( std::list< t_GenericSignalHandlerType >::iterator iter = m_connectorEstablishedNotifiers.begin();
00144                                                             iter != m_connectorEstablishedNotifiers.end(); ++iter )
00145     {
00146         // subscribe on each input
00147         for( InputConnectorList::const_iterator ins = module->getInputConnectors().begin(); ins != module->getInputConnectors().end(); ++ins )
00148         {
00149             signalCon = ( *ins )->subscribeSignal( CONNECTION_ESTABLISHED, ( *iter ) );
00150             subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) );
00151         }
00152     }
00153     for( std::list< t_GenericSignalHandlerType >::iterator iter = m_connectorClosedNotifiers.begin();
00154                                                             iter != m_connectorClosedNotifiers.end(); ++iter )
00155     {
00156         // subscribe on each input
00157         for( InputConnectorList::const_iterator ins = module->getInputConnectors().begin(); ins != module->getInputConnectors().end(); ++ins )
00158         {
00159             signalCon = ( *ins )->subscribeSignal( CONNECTION_CLOSED, ( *iter ) );
00160             subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) );
00161         }
00162     }
00163     slock = boost::shared_lock<boost::shared_mutex>( m_readyNotifiersLock );
00164     for( std::list< t_ModuleGenericSignalHandlerType >::iterator iter = m_readyNotifiers.begin(); iter != m_readyNotifiers.end(); ++iter)
00165     {
00166         signalCon = module->subscribeSignal( WM_READY, ( *iter ) );
00167         subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) );
00168     }
00169     slock.unlock();
00170 
00171     // free the subscriptions lock
00172     subscriptionsLock.reset();
00173 
00174     // add the modules progress to local progress combiner
00175     m_progress->addSubProgress( module->getRootProgressCombiner() );
00176 
00177     // run it
00178     if( run )
00179     {
00180         module->run();
00181     }
00182 }
00183 
00184 WModule::SPtr WModuleContainer::createAndAdd( std::string name )
00185 {
00186     WModule::SPtr module = WModuleFactory::getModuleFactory()->create(
00187         WModuleFactory::getModuleFactory()->getPrototypeByName( name )
00188     );
00189 
00190     // add to the container
00191     add( module );
00192     module->isReady().wait();
00193 
00194     return module;
00195 }
00196 
00197 void WModuleContainer::remove( boost::shared_ptr< WModule > module )
00198 {
00199     // simple flat removal.
00200 
00201     WLogger::getLogger()->addLogMessage( "Removing module \"" + module->getName() + "\" from container." , "ModuleContainer (" + getName() + ")",
00202             LL_DEBUG );
00203 
00204     if( module->getAssociatedContainer() != shared_from_this() )
00205     {
00206         return;
00207     }
00208 
00209     // remove connections inside this container
00210     module->disconnect();
00211 
00212     // remove progress combiner
00213     m_progress->removeSubProgress( module->getRootProgressCombiner() );
00214 
00215     // remove signal subscriptions to this containers default notifiers
00216     ModuleSubscriptionsSharedType::WriteTicket subscriptionsLock = m_moduleSubscriptions.getWriteTicket();
00217 
00218     // find all subscriptions for this module
00219     std::pair< ModuleSubscriptionsIterator, ModuleSubscriptionsIterator > subscriptions = subscriptionsLock->get().equal_range( module );
00220     for( ModuleSubscriptionsIterator it = subscriptions.first; it != subscriptions.second; ++it )
00221     {
00222         // disconnect subscription.
00223         ( *it ).second.disconnect();
00224     }
00225     // erase them
00226     subscriptionsLock->get().erase( subscriptions.first, subscriptions.second );
00227     subscriptionsLock.reset();
00228 
00229     // get write lock
00230     ModuleSharedContainerType::WriteTicket wlock = m_modules.getWriteTicket();
00231     wlock->get().erase( module );
00232     wlock.reset();
00233 
00234     module->setAssociatedContainer( boost::shared_ptr< WModuleContainer >() );
00235 
00236     // tell all interested about removal
00237     boost::shared_lock<boost::shared_mutex> slock = boost::shared_lock<boost::shared_mutex>( m_removedNotifiersLock );
00238     for( std::list< t_ModuleGenericSignalHandlerType >::iterator iter = m_removedNotifiers.begin(); iter != m_removedNotifiers.end(); ++iter)
00239     {
00240         // call associated notifier
00241         ( *iter )( module );
00242     }
00243     slock.unlock();
00244 }
00245 
00246 WModuleContainer::DataModuleListType WModuleContainer::getDataModules()
00247 {
00248     DataModuleListType l;
00249 
00250     // lock, unlocked if l looses focus
00251     ModuleSharedContainerType::ReadTicket lock = m_modules.getReadTicket();
00252 
00253     // iterate module list
00254     for( ModuleConstIterator iter = lock->get().begin(); iter != lock->get().end(); ++iter )
00255     {
00256         // is this module a data module?
00257         if( ( *iter )->getType() == MODULE_DATA )
00258         {
00259             boost::shared_ptr< WDataModule > dm = boost::shared_static_cast< WDataModule >( *iter );
00260 
00261             // now check the contained dataset ( isTexture and whether it is ready )
00262             if( dm->isReady()() )
00263             {
00264                 l.insert( dm );
00265             }
00266         }
00267     }
00268 
00269     return l;
00270 }
00271 
00272 void WModuleContainer::stop()
00273 {
00274     WLogger::getLogger()->addLogMessage( "Stopping pending threads." , "ModuleContainer (" + getName() + ")", LL_INFO );
00275 
00276     // read lock
00277     boost::shared_lock<boost::shared_mutex> slock = boost::shared_lock<boost::shared_mutex>( m_pendingThreadsLock );
00278     for( std::set< boost::shared_ptr< WThreadedRunner > >::iterator listIter = m_pendingThreads.begin(); listIter != m_pendingThreads.end();
00279             ++listIter )
00280     {
00281         ( *listIter )->wait( true );
00282     }
00283     slock.unlock();
00284 
00285     WLogger::getLogger()->addLogMessage( "Stopping modules." , "ModuleContainer (" + getName() + ")", LL_INFO );
00286 
00287     // lock, unlocked if l looses focus
00288     ModuleSharedContainerType::ReadTicket lock = m_modules.getReadTicket();
00289 
00290     for( ModuleConstIterator listIter = lock->get().begin(); listIter != lock->get().end(); ++listIter )
00291     {
00292         WLogger::getLogger()->addLogMessage( "Waiting for module \"" + ( *listIter )->getName() + "\" to finish." ,
00293                 "ModuleContainer (" + getName() + ")", LL_INFO );
00294         ( *listIter )->wait( true );
00295         ( *listIter )->setAssociatedContainer( boost::shared_ptr< WModuleContainer >() );   // remove last refs to this container inside the module
00296     }
00297     lock.reset();
00298 
00299     // get write lock
00300     // lock, unlocked if l looses focus
00301     ModuleSharedContainerType::WriteTicket wlock = m_modules.getWriteTicket();
00302     wlock->get().clear();
00303 }
00304 
00305 const std::string WModuleContainer::getName() const
00306 {
00307     return m_name;
00308 }
00309 
00310 const std::string WModuleContainer::getDescription() const
00311 {
00312     return m_description;
00313 }
00314 
00315 void WModuleContainer::addDefaultNotifier( MODULE_SIGNAL signal, t_ModuleGenericSignalHandlerType notifier )
00316 {
00317     boost::unique_lock<boost::shared_mutex> lock;
00318     switch( signal)
00319     {
00320         case WM_ASSOCIATED:
00321             lock = boost::unique_lock<boost::shared_mutex>( m_associatedNotifiersLock );
00322             m_associatedNotifiers.push_back( notifier );
00323             lock.unlock();
00324             break;
00325         case WM_READY:
00326             lock = boost::unique_lock<boost::shared_mutex>( m_readyNotifiersLock );
00327             m_readyNotifiers.push_back( notifier );
00328             lock.unlock();
00329             break;
00330         case WM_REMOVED:
00331             lock = boost::unique_lock<boost::shared_mutex>( m_removedNotifiersLock );
00332             m_removedNotifiers.push_back( notifier );
00333             lock.unlock();
00334             break;
00335         default:
00336             std::ostringstream s;
00337             s << "Could not subscribe to unknown signal.";
00338             throw WSignalSubscriptionFailed( s.str() );
00339             break;
00340     }
00341 }
00342 
00343 void WModuleContainer::addDefaultNotifier( MODULE_SIGNAL signal, t_ModuleErrorSignalHandlerType notifier )
00344 {
00345     boost::unique_lock<boost::shared_mutex> lock;
00346     switch( signal)
00347     {
00348         case WM_ERROR:
00349             lock = boost::unique_lock<boost::shared_mutex>( m_errorNotifiersLock );
00350             m_errorNotifiers.push_back( notifier );
00351             lock.unlock();
00352             break;
00353         default:
00354             std::ostringstream s;
00355             s << "Could not subscribe to unknown signal.";
00356             throw WSignalSubscriptionFailed( s.str() );
00357             break;
00358     }
00359 }
00360 
00361 void WModuleContainer::addDefaultNotifier( MODULE_CONNECTOR_SIGNAL signal, t_GenericSignalHandlerType notifier )
00362 {
00363     boost::unique_lock<boost::shared_mutex> lock;
00364     switch( signal)
00365     {
00366         case CONNECTION_ESTABLISHED:
00367             lock = boost::unique_lock<boost::shared_mutex>( m_connectorNotifiersLock );
00368             m_connectorEstablishedNotifiers.push_back( notifier );
00369             lock.unlock();
00370             break;
00371         case CONNECTION_CLOSED:
00372             lock = boost::unique_lock<boost::shared_mutex>( m_connectorNotifiersLock );
00373             m_connectorClosedNotifiers.push_back( notifier );
00374             lock.unlock();
00375             break;
00376         default:
00377             std::ostringstream s;
00378             s << "Could not subscribe to unknown signal.";
00379             throw WSignalSubscriptionFailed( s.str() );
00380             break;
00381     }
00382 }
00383 
00384 boost::shared_ptr< WModule > WModuleContainer::applyModule( boost::shared_ptr< WModule > applyOn, std::string what, bool tryOnly )
00385 {
00386     boost::shared_ptr< WModule >prototype = boost::shared_ptr< WModule >();
00387     if( tryOnly )
00388     {
00389         // isPrototypeAvailable returns the prototype or NULL if not found, but does not throw an exception
00390         prototype = WModuleFactory::getModuleFactory()->isPrototypeAvailable( what );
00391         if( !prototype )
00392         {
00393             return prototype;
00394         }
00395     }
00396     else
00397     {
00398         prototype = WModuleFactory::getModuleFactory()->getPrototypeByName( what );
00399     }
00400 
00401     return applyModule( applyOn, prototype );
00402 }
00403 
00404 boost::shared_ptr< WModule > WModuleContainer::applyModule( boost::shared_ptr< WModule > applyOn,
00405                                                                          boost::shared_ptr< WModule > prototype )
00406 {
00407     // is this module already associated with another container?
00408     if( applyOn->isAssociated()() && ( applyOn->getAssociatedContainer() != shared_from_this() ) )
00409     {
00410         throw WModuleAlreadyAssociated( std::string( "The specified module \"" ) + applyOn->getName() +
00411                                         std::string( "\" is associated with another container." ) );
00412     }
00413 
00414     // create a new initialized instance of the module
00415     boost::shared_ptr< WModule > m = WModuleFactory::getModuleFactory()->create( prototype );
00416 
00417     // add it
00418     add( m, true );
00419     applyOn->isReadyOrCrashed().wait();
00420     m->isReadyOrCrashed().wait();
00421 
00422     // should we ignore the crash case? In general, a crashed module can be connected. The sense or non-sense of it is questionable but assume a
00423     // crashed module has set some data on its output and some other module needs it. -> so we ignore the case of crashed modules here.
00424 
00425     // get offered outputs
00426     WModule::InputConnectorList ins = m->getInputConnectors();
00427     // get offered inputs
00428     WModule::OutputConnectorList outs = applyOn->getOutputConnectors();
00429 
00430     // connect the first connectors. For a more sophisticated way of connecting modules, use ModuleCombiners.
00431     if( !ins.empty() && !outs.empty() )
00432     {
00433         ( *ins.begin() )->connect( ( *outs.begin() ) );
00434     }
00435 
00436     return m;
00437 }
00438 
00439 WBatchLoader::SPtr WModuleContainer::loadDataSets( std::vector< std::string > filenames, bool suppressColormaps )
00440 {
00441     // create thread which actually loads the data
00442     boost::shared_ptr< WBatchLoader > t = boost::shared_ptr< WBatchLoader >( new WBatchLoader( filenames,
00443                 boost::shared_static_cast< WModuleContainer >( shared_from_this() ) )
00444     );
00445     t->setSuppressColormaps( suppressColormaps );
00446     t->run();
00447     return t;
00448 }
00449 
00450 WBatchLoader::SPtr WModuleContainer::loadDataSetsSynchronously( std::vector< std::string > filenames, bool suppressColormaps )
00451 {
00452     // create thread which actually loads the data
00453     boost::shared_ptr< WBatchLoader > t = boost::shared_ptr< WBatchLoader >( new WBatchLoader( filenames,
00454                 boost::shared_static_cast< WModuleContainer >( shared_from_this() ) )
00455     );
00456     t->setSuppressColormaps( suppressColormaps );
00457     t->run();
00458     t->wait();
00459     return t;
00460 }
00461 
00462 void WModuleContainer::addPendingThread( boost::shared_ptr< WThreadedRunner > thread )
00463 {
00464     boost::unique_lock<boost::shared_mutex> lock = boost::unique_lock<boost::shared_mutex>( m_pendingThreadsLock );
00465     m_pendingThreads.insert( thread );
00466     lock.unlock();
00467 }
00468 
00469 void WModuleContainer::finishedPendingThread( boost::shared_ptr< WThreadedRunner > thread )
00470 {
00471     boost::unique_lock<boost::shared_mutex> lock = boost::unique_lock<boost::shared_mutex>( m_pendingThreadsLock );
00472     m_pendingThreads.erase( thread );
00473     lock.unlock();
00474 }
00475 
00476 void WModuleContainer::moduleError( boost::shared_ptr< WModule > module, const WException& exception )
00477 {
00478     errorLog() << "Error in module \"" << module->getName() << "\". Forwarding to nesting container.";
00479 
00480     // simply forward it to the other signal handler
00481     signal_error( module, exception );
00482 
00483     if( m_crashIfModuleCrashes )
00484     {
00485         infoLog() << "Crash caused this container to shutdown.";
00486         requestStop();
00487         m_isCrashed( true );
00488     }
00489 }
00490 
00491 void WModuleContainer::setCrashIfModuleCrashes( bool crashIfCrashed )
00492 {
00493     m_crashIfModuleCrashes = crashIfCrashed;
00494 }
00495 
00496 WModuleContainer::ModuleSharedContainerType::ReadTicket WModuleContainer::getModules() const
00497 {
00498     return m_modules.getReadTicket();
00499 }
00500 
00501 WModuleContainer::ModuleVectorType WModuleContainer::getModules( std::string name ) const
00502 {
00503     // get the list of all first.
00504     WModuleContainer::ModuleSharedContainerType::ReadTicket lock = getModules();
00505 
00506     // put results in here
00507     WModuleContainer::ModuleVectorType result;
00508 
00509     // handle each module
00510     for( ModuleConstIterator listIter = lock->get().begin(); listIter != lock->get().end(); ++listIter )
00511     {
00512         // check name
00513         if( name == ( *listIter )->getName() )
00514         {
00515             result.push_back( ( *listIter ) );
00516         }
00517     }
00518 
00519     return result;
00520 }
00521 
00522 WCombinerTypes::WCompatiblesList WModuleContainer::getPossibleConnections( boost::shared_ptr< WModule > module )
00523 {
00524     WCombinerTypes::WCompatiblesList complist;
00525 
00526     if( !module )
00527     {
00528         // be nice in case of a null pointer
00529         return complist;
00530     }
00531 
00532     // read lock the container
00533     ModuleSharedContainerType::ReadTicket lock = m_modules.getReadTicket();
00534 
00535     // handle each module
00536     for( ModuleConstIterator listIter = lock->get().begin(); listIter != lock->get().end(); ++listIter )
00537     {
00538         WCombinerTypes::WOneToOneCombiners lComp = WApplyCombiner::createCombinerList< WApplyCombiner>( module, ( *listIter ) );
00539 
00540         if( lComp.size() != 0 )
00541         {
00542             complist.push_back( WCombinerTypes::WCompatiblesGroup( ( *listIter ), lComp ) );
00543         }
00544     }
00545 
00546     // sort the compatibles
00547     std::sort( complist.begin(), complist.end(), WCombinerTypes::compatiblesSort );
00548 
00549     return complist;
00550 }
00551