OpenWalnut  1.4.0
WModuleContainer.cpp
00001 //---------------------------------------------------------------------------
00002 //
00003 // Project: OpenWalnut ( http://www.openwalnut.org )
00004 //
00005 // Copyright 2009 OpenWalnut Community, BSV@Uni-Leipzig and CNCF@MPI-CBS
00006 // For more information see http://www.openwalnut.org/copying
00007 //
00008 // This file is part of OpenWalnut.
00009 //
00010 // OpenWalnut is free software: you can redistribute it and/or modify
00011 // it under the terms of the GNU Lesser General Public License as published by
00012 // the Free Software Foundation, either version 3 of the License, or
00013 // (at your option) any later version.
00014 //
00015 // OpenWalnut is distributed in the hope that it will be useful,
00016 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00017 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00018 // GNU Lesser General Public License for more details.
00019 //
00020 // You should have received a copy of the GNU Lesser General Public License
00021 // along with OpenWalnut. If not, see <http://www.gnu.org/licenses/>.
00022 //
00023 //---------------------------------------------------------------------------
00024 
00025 #include <list>
00026 #include <set>
00027 #include <vector>
00028 #include <string>
00029 #include <sstream>
00030 #include <algorithm>
00031 #include <utility>
00032 
00033 #include "../common/WLogger.h"
00034 #include "../common/WThreadedRunner.h"
00035 #include "../common/exceptions/WSignalSubscriptionFailed.h"
00036 #include "WBatchLoader.h"
00037 #include "WModuleCombiner.h"
00038 #include "WModuleFactory.h"
00039 #include "WModuleInputConnector.h"
00040 #include "WModuleOutputConnector.h"
00041 #include "WModuleTypes.h"
00042 #include "combiner/WApplyCombiner.h"
00043 #include "exceptions/WModuleAlreadyAssociated.h"
00044 #include "exceptions/WModuleUninitialized.h"
00045 #include "WDataModule.h"
00046 
00047 #include "WModuleContainer.h"
00048 
00049 WModuleContainer::WModuleContainer( std::string name, std::string description ):
00050     WModule(),
00051     m_name( name ),
00052     m_description( description ),
00053     m_crashIfModuleCrashes( true )
00054 {
00055     WLogger::getLogger()->addLogMessage( "Constructing module container." , "ModuleContainer (" + getName() + ")", LL_DEBUG );
00056     // initialize members
00057 }
00058 
00059 WModuleContainer::~WModuleContainer()
00060 {
00061     // cleanup
00062 }
00063 
00064 void WModuleContainer::moduleMain()
00065 {
00066     // do nothing here. The WModule class enforces us to overwrite this method here, but we do not need it.
00067     // Only set the ready flag.
00068     ready();
00069 }
00070 
00071 boost::shared_ptr< WModule > WModuleContainer::factory() const
00072 {
00073     // this factory is not used actually.
00074     return boost::shared_ptr< WModule >( new WModuleContainer( getName(), getDescription() ) );
00075 }
00076 
00077 void WModuleContainer::add( boost::shared_ptr< WModule > module, bool run )
00078 {
00079     if( !module )
00080     {
00081         // just ignore NULL Pointer
00082         return;
00083     }
00084 
00085     WLogger::getLogger()->addLogMessage( "Adding module \"" + module->getName() + "\" to container." ,
00086             "ModuleContainer (" + getName() + ")", LL_INFO );
00087 
00088     if( !module->isInitialized()() )
00089     {
00090         std::ostringstream s;
00091         s << "Could not add module \"" << module->getName() << "\" to container \"" + getName() + "\". Reason: module not initialized.";
00092 
00093         throw WModuleUninitialized( s.str() );
00094     }
00095 
00096     // already associated with this container?
00097     if( module->getAssociatedContainer() == shared_from_this() )
00098     {
00099         WLogger::getLogger()->addLogMessage( "Adding module \"" + module->getName() + "\" to container not needed. Its already inside." ,
00100             "ModuleContainer (" + getName() + ")", LL_INFO );
00101         return;
00102     }
00103 
00104     // is this module already associated?
00105     if( module->isAssociated()() )
00106     {
00107         module->getAssociatedContainer()->remove( module );
00108     }
00109 
00110     // get write lock
00111     ModuleSharedContainerType::WriteTicket wlock = m_modules.getWriteTicket();
00112     wlock->get().insert( module );
00113     wlock.reset();
00114 
00115     module->setAssociatedContainer( boost::static_pointer_cast< WModuleContainer >( shared_from_this() ) );
00116     WLogger::getLogger()->addLogMessage( "Associated module \"" + module->getName() + "\" with container." , "ModuleContainer (" + getName() + ")",
00117             LL_INFO );
00118 
00119     // now module->isUsable() is true
00120 
00121     // Connect the error handler and all default handlers:
00122     ModuleSubscriptionsSharedType::WriteTicket subscriptionsLock = m_moduleSubscriptions.getWriteTicket();
00123 
00124     // connect the containers signal handler explicitly
00125     t_ModuleErrorSignalHandlerType func = boost::bind( &WModuleContainer::moduleError, this, _1, _2 );
00126     boost::signals2::connection signalCon = module->subscribeSignal( WM_ERROR, func );
00127     subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) );
00128 
00129     // connect default notifiers:
00130     boost::shared_lock<boost::shared_mutex> slock = boost::shared_lock<boost::shared_mutex>( m_errorNotifiersLock );
00131     for( std::list< t_ModuleErrorSignalHandlerType >::iterator iter = m_errorNotifiers.begin(); iter != m_errorNotifiers.end(); ++iter)
00132     {
00133         signalCon = module->subscribeSignal( WM_ERROR, ( *iter ) );
00134         subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) );
00135     }
00136     slock = boost::shared_lock<boost::shared_mutex>( m_associatedNotifiersLock );
00137     for( std::list< t_ModuleGenericSignalHandlerType >::iterator iter = m_associatedNotifiers.begin(); iter != m_associatedNotifiers.end(); ++iter)
00138     {
00139         // call associated notifier
00140         ( *iter )( module );
00141     }
00142     slock = boost::shared_lock<boost::shared_mutex>( m_connectorNotifiersLock );
00143     for( std::list< t_GenericSignalHandlerType >::iterator iter = m_connectorEstablishedNotifiers.begin();
00144                                                             iter != m_connectorEstablishedNotifiers.end(); ++iter )
00145     {
00146         // subscribe on each input
00147         for( InputConnectorList::const_iterator ins = module->getInputConnectors().begin(); ins != module->getInputConnectors().end(); ++ins )
00148         {
00149             signalCon = ( *ins )->subscribeSignal( CONNECTION_ESTABLISHED, ( *iter ) );
00150             subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) );
00151         }
00152     }
00153     for( std::list< t_GenericSignalHandlerType >::iterator iter = m_connectorClosedNotifiers.begin();
00154                                                             iter != m_connectorClosedNotifiers.end(); ++iter )
00155     {
00156         // subscribe on each input
00157         for( InputConnectorList::const_iterator ins = module->getInputConnectors().begin(); ins != module->getInputConnectors().end(); ++ins )
00158         {
00159             signalCon = ( *ins )->subscribeSignal( CONNECTION_CLOSED, ( *iter ) );
00160             subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) );
00161         }
00162     }
00163     slock = boost::shared_lock<boost::shared_mutex>( m_readyNotifiersLock );
00164     for( std::list< t_ModuleGenericSignalHandlerType >::iterator iter = m_readyNotifiers.begin(); iter != m_readyNotifiers.end(); ++iter)
00165     {
00166         signalCon = module->subscribeSignal( WM_READY, ( *iter ) );
00167         subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) );
00168     }
00169     slock.unlock();
00170 
00171     // free the subscriptions lock
00172     subscriptionsLock.reset();
00173 
00174     // add the modules progress to local progress combiner
00175     m_progress->addSubProgress( module->getRootProgressCombiner() );
00176 
00177     // run it
00178     if( run )
00179     {
00180         module->run();
00181     }
00182 }
00183 
00184 WModule::SPtr WModuleContainer::createAndAdd( std::string name )
00185 {
00186     WModule::SPtr module = WModuleFactory::getModuleFactory()->create(
00187         WModuleFactory::getModuleFactory()->getPrototypeByName( name )
00188     );
00189 
00190     // add to the container
00191     add( module );
00192     module->isReady().wait();
00193 
00194     return module;
00195 }
00196 
00197 void WModuleContainer::remove( boost::shared_ptr< WModule > module )
00198 {
00199     // simple flat removal.
00200 
00201     WLogger::getLogger()->addLogMessage( "Removing module \"" + module->getName() + "\" from container." , "ModuleContainer (" + getName() + ")",
00202             LL_DEBUG );
00203 
00204     if( module->getAssociatedContainer() != shared_from_this() )
00205     {
00206         return;
00207     }
00208 
00209     // remove connections inside this container
00210     module->disconnect();
00211 
00212     // remove progress combiner
00213     m_progress->removeSubProgress( module->getRootProgressCombiner() );
00214 
00215     // remove signal subscriptions to this containers default notifiers
00216     ModuleSubscriptionsSharedType::WriteTicket subscriptionsLock = m_moduleSubscriptions.getWriteTicket();
00217 
00218     // find all subscriptions for this module
00219     std::pair< ModuleSubscriptionsIterator, ModuleSubscriptionsIterator > subscriptions = subscriptionsLock->get().equal_range( module );
00220     for( ModuleSubscriptionsIterator it = subscriptions.first; it != subscriptions.second; ++it )
00221     {
00222         // disconnect subscription.
00223         ( *it ).second.disconnect();
00224     }
00225     // erase them
00226     subscriptionsLock->get().erase( subscriptions.first, subscriptions.second );
00227     subscriptionsLock.reset();
00228 
00229     // get write lock
00230     ModuleSharedContainerType::WriteTicket wlock = m_modules.getWriteTicket();
00231     wlock->get().erase( module );
00232     wlock.reset();
00233 
00234     module->setAssociatedContainer( boost::shared_ptr< WModuleContainer >() );
00235 
00236     // tell all interested about removal
00237     boost::shared_lock<boost::shared_mutex> slock = boost::shared_lock<boost::shared_mutex>( m_removedNotifiersLock );
00238     for( std::list< t_ModuleGenericSignalHandlerType >::iterator iter = m_removedNotifiers.begin(); iter != m_removedNotifiers.end(); ++iter)
00239     {
00240         // call associated notifier
00241         ( *iter )( module );
00242     }
00243     slock.unlock();
00244 }
00245 
00246 WModuleContainer::DataModuleListType WModuleContainer::getDataModules()
00247 {
00248     DataModuleListType l;
00249 
00250     // lock, unlocked if l looses focus
00251     ModuleSharedContainerType::ReadTicket lock = m_modules.getReadTicket();
00252 
00253     // iterate module list
00254     for( ModuleConstIterator iter = lock->get().begin(); iter != lock->get().end(); ++iter )
00255     {
00256         // is this module a data module?
00257         if( ( *iter )->getType() == MODULE_DATA )
00258         {
00259             boost::shared_ptr< WDataModule > dm = boost::static_pointer_cast< WDataModule >( *iter );
00260 
00261             // now check the contained dataset ( isTexture and whether it is ready )
00262             if( dm->isReady()() )
00263             {
00264                 l.insert( dm );
00265             }
00266         }
00267     }
00268 
00269     return l;
00270 }
00271 
00272 void WModuleContainer::removeAll()
00273 {
00274     const size_t nonZero = 1;
00275     size_t numberOfModules = nonZero;
00276     std::vector< boost::shared_ptr< WModule > > modulesToRemove;
00277 
00278     while( numberOfModules != 0 )
00279     {
00280         modulesToRemove.clear();
00281         // get names of all remaining modules to try to remove them
00282         {
00283             WModuleContainer::ModuleSharedContainerType::ReadTicket lock = getModules();
00284             numberOfModules = lock->get().size();
00285             for( ModuleConstIterator listIter = lock->get().begin(); listIter != lock->get().end(); ++listIter )
00286             {
00287                 modulesToRemove.push_back( *listIter );
00288             }
00289         }
00290         for( std::vector<  boost::shared_ptr< WModule > >::iterator nameIter = modulesToRemove.begin();
00291              nameIter != modulesToRemove.end();
00292              ++nameIter )
00293         {
00294             remove( *nameIter );
00295         }
00296     }
00297 }
00298 
00299 void WModuleContainer::stop()
00300 {
00301     WLogger::getLogger()->addLogMessage( "Stopping pending threads." , "ModuleContainer (" + getName() + ")", LL_INFO );
00302 
00303     // read lock
00304     boost::shared_lock<boost::shared_mutex> slock = boost::shared_lock<boost::shared_mutex>( m_pendingThreadsLock );
00305     for( std::set< boost::shared_ptr< WThreadedRunner > >::iterator listIter = m_pendingThreads.begin(); listIter != m_pendingThreads.end();
00306             ++listIter )
00307     {
00308         ( *listIter )->wait( true );
00309     }
00310     slock.unlock();
00311 
00312     WLogger::getLogger()->addLogMessage( "Stopping modules." , "ModuleContainer (" + getName() + ")", LL_INFO );
00313 
00314     // lock, unlocked if l looses focus
00315     ModuleSharedContainerType::ReadTicket lock = m_modules.getReadTicket();
00316 
00317     for( ModuleConstIterator listIter = lock->get().begin(); listIter != lock->get().end(); ++listIter )
00318     {
00319         WLogger::getLogger()->addLogMessage( "Waiting for module \"" + ( *listIter )->getName() + "\" to finish." ,
00320                 "ModuleContainer (" + getName() + ")", LL_INFO );
00321         ( *listIter )->wait( true );
00322         ( *listIter )->setAssociatedContainer( boost::shared_ptr< WModuleContainer >() );   // remove last refs to this container inside the module
00323     }
00324     lock.reset();
00325 
00326     // get write lock
00327     // lock, unlocked if l looses focus
00328     ModuleSharedContainerType::WriteTicket wlock = m_modules.getWriteTicket();
00329     wlock->get().clear();
00330 }
00331 
00332 const std::string WModuleContainer::getName() const
00333 {
00334     return m_name;
00335 }
00336 
00337 const std::string WModuleContainer::getDescription() const
00338 {
00339     return m_description;
00340 }
00341 
00342 void WModuleContainer::addDefaultNotifier( MODULE_SIGNAL signal, t_ModuleGenericSignalHandlerType notifier )
00343 {
00344     boost::unique_lock<boost::shared_mutex> lock;
00345     switch( signal)
00346     {
00347         case WM_ASSOCIATED:
00348             lock = boost::unique_lock<boost::shared_mutex>( m_associatedNotifiersLock );
00349             m_associatedNotifiers.push_back( notifier );
00350             lock.unlock();
00351             break;
00352         case WM_READY:
00353             lock = boost::unique_lock<boost::shared_mutex>( m_readyNotifiersLock );
00354             m_readyNotifiers.push_back( notifier );
00355             lock.unlock();
00356             break;
00357         case WM_REMOVED:
00358             lock = boost::unique_lock<boost::shared_mutex>( m_removedNotifiersLock );
00359             m_removedNotifiers.push_back( notifier );
00360             lock.unlock();
00361             break;
00362         default:
00363             std::ostringstream s;
00364             s << "Could not subscribe to unknown signal.";
00365             throw WSignalSubscriptionFailed( s.str() );
00366             break;
00367     }
00368 }
00369 
00370 void WModuleContainer::addDefaultNotifier( MODULE_SIGNAL signal, t_ModuleErrorSignalHandlerType notifier )
00371 {
00372     boost::unique_lock<boost::shared_mutex> lock;
00373     switch( signal)
00374     {
00375         case WM_ERROR:
00376             lock = boost::unique_lock<boost::shared_mutex>( m_errorNotifiersLock );
00377             m_errorNotifiers.push_back( notifier );
00378             lock.unlock();
00379             break;
00380         default:
00381             std::ostringstream s;
00382             s << "Could not subscribe to unknown signal.";
00383             throw WSignalSubscriptionFailed( s.str() );
00384             break;
00385     }
00386 }
00387 
00388 void WModuleContainer::addDefaultNotifier( MODULE_CONNECTOR_SIGNAL signal, t_GenericSignalHandlerType notifier )
00389 {
00390     boost::unique_lock<boost::shared_mutex> lock;
00391     switch( signal)
00392     {
00393         case CONNECTION_ESTABLISHED:
00394             lock = boost::unique_lock<boost::shared_mutex>( m_connectorNotifiersLock );
00395             m_connectorEstablishedNotifiers.push_back( notifier );
00396             lock.unlock();
00397             break;
00398         case CONNECTION_CLOSED:
00399             lock = boost::unique_lock<boost::shared_mutex>( m_connectorNotifiersLock );
00400             m_connectorClosedNotifiers.push_back( notifier );
00401             lock.unlock();
00402             break;
00403         default:
00404             std::ostringstream s;
00405             s << "Could not subscribe to unknown signal.";
00406             throw WSignalSubscriptionFailed( s.str() );
00407             break;
00408     }
00409 }
00410 
00411 boost::shared_ptr< WModule > WModuleContainer::applyModule( boost::shared_ptr< WModule > applyOn, std::string what, bool tryOnly )
00412 {
00413     boost::shared_ptr< WModule >prototype = boost::shared_ptr< WModule >();
00414     if( tryOnly )
00415     {
00416         // isPrototypeAvailable returns the prototype or NULL if not found, but does not throw an exception
00417         prototype = WModuleFactory::getModuleFactory()->isPrototypeAvailable( what );
00418         if( !prototype )
00419         {
00420             return prototype;
00421         }
00422     }
00423     else
00424     {
00425         prototype = WModuleFactory::getModuleFactory()->getPrototypeByName( what );
00426     }
00427 
00428     return applyModule( applyOn, prototype );
00429 }
00430 
00431 boost::shared_ptr< WModule > WModuleContainer::applyModule( boost::shared_ptr< WModule > applyOn,
00432                                                                          boost::shared_ptr< WModule > prototype )
00433 {
00434     // is this module already associated with another container?
00435     if( applyOn->isAssociated()() && ( applyOn->getAssociatedContainer() != shared_from_this() ) )
00436     {
00437         throw WModuleAlreadyAssociated( std::string( "The specified module \"" ) + applyOn->getName() +
00438                                         std::string( "\" is associated with another container." ) );
00439     }
00440 
00441     // create a new initialized instance of the module
00442     boost::shared_ptr< WModule > m = WModuleFactory::getModuleFactory()->create( prototype );
00443 
00444     // add it
00445     add( m, true );
00446     applyOn->isReadyOrCrashed().wait();
00447     m->isReadyOrCrashed().wait();
00448 
00449     // should we ignore the crash case? In general, a crashed module can be connected. The sense or non-sense of it is questionable but assume a
00450     // crashed module has set some data on its output and some other module needs it. -> so we ignore the case of crashed modules here.
00451 
00452     // get offered outputs
00453     WModule::InputConnectorList ins = m->getInputConnectors();
00454     // get offered inputs
00455     WModule::OutputConnectorList outs = applyOn->getOutputConnectors();
00456 
00457     // connect the first connectors. For a more sophisticated way of connecting modules, use ModuleCombiners.
00458     if( !ins.empty() && !outs.empty() )
00459     {
00460         ( *ins.begin() )->connect( ( *outs.begin() ) );
00461     }
00462 
00463     return m;
00464 }
00465 
00466 WBatchLoader::SPtr WModuleContainer::loadDataSets( std::vector< std::string > filenames, bool suppressColormaps )
00467 {
00468     // create thread which actually loads the data
00469     boost::shared_ptr< WBatchLoader > t( new WBatchLoader( filenames, boost::static_pointer_cast< WModuleContainer >( shared_from_this() ) ) );
00470     t->setSuppressColormaps( suppressColormaps );
00471     t->run();
00472     return t;
00473 }
00474 
00475 WBatchLoader::SPtr WModuleContainer::loadDataSetsSynchronously( std::vector< std::string > filenames, bool suppressColormaps )
00476 {
00477     // create thread which actually loads the data
00478     boost::shared_ptr< WBatchLoader > t( new WBatchLoader( filenames, boost::static_pointer_cast< WModuleContainer >( shared_from_this() ) ) );
00479     t->setSuppressColormaps( suppressColormaps );
00480     t->run();
00481     t->wait();
00482     return t;
00483 }
00484 
00485 void WModuleContainer::addPendingThread( boost::shared_ptr< WThreadedRunner > thread )
00486 {
00487     boost::unique_lock<boost::shared_mutex> lock = boost::unique_lock<boost::shared_mutex>( m_pendingThreadsLock );
00488     m_pendingThreads.insert( thread );
00489     lock.unlock();
00490 }
00491 
00492 void WModuleContainer::finishedPendingThread( boost::shared_ptr< WThreadedRunner > thread )
00493 {
00494     boost::unique_lock<boost::shared_mutex> lock = boost::unique_lock<boost::shared_mutex>( m_pendingThreadsLock );
00495     m_pendingThreads.erase( thread );
00496     lock.unlock();
00497 }
00498 
00499 void WModuleContainer::moduleError( boost::shared_ptr< WModule > module, const WException& exception )
00500 {
00501     errorLog() << "Error in module \"" << module->getName() << "\". Forwarding to nesting container.";
00502 
00503     // simply forward it to the other signal handler
00504     signal_error( module, exception );
00505 
00506     if( m_crashIfModuleCrashes )
00507     {
00508         infoLog() << "Crash caused this container to shutdown.";
00509         requestStop();
00510         m_isCrashed( true );
00511     }
00512 }
00513 
00514 void WModuleContainer::setCrashIfModuleCrashes( bool crashIfCrashed )
00515 {
00516     m_crashIfModuleCrashes = crashIfCrashed;
00517 }
00518 
00519 WModuleContainer::ModuleSharedContainerType::ReadTicket WModuleContainer::getModules() const
00520 {
00521     return m_modules.getReadTicket();
00522 }
00523 
00524 WModuleContainer::ModuleVectorType WModuleContainer::getModules( std::string name ) const
00525 {
00526     // get the list of all first.
00527     WModuleContainer::ModuleSharedContainerType::ReadTicket lock = getModules();
00528 
00529     // put results in here
00530     WModuleContainer::ModuleVectorType result;
00531 
00532     // handle each module
00533     for( ModuleConstIterator listIter = lock->get().begin(); listIter != lock->get().end(); ++listIter )
00534     {
00535         // check name
00536         if( name == ( *listIter )->getName() )
00537         {
00538             result.push_back( ( *listIter ) );
00539         }
00540     }
00541 
00542     return result;
00543 }
00544 
00545 WCombinerTypes::WCompatiblesList WModuleContainer::getPossibleConnections( boost::shared_ptr< WModule > module )
00546 {
00547     WCombinerTypes::WCompatiblesList complist;
00548 
00549     if( !module )
00550     {
00551         // be nice in case of a null pointer
00552         return complist;
00553     }
00554 
00555     // read lock the container
00556     ModuleSharedContainerType::ReadTicket lock = m_modules.getReadTicket();
00557 
00558     // handle each module
00559     for( ModuleConstIterator listIter = lock->get().begin(); listIter != lock->get().end(); ++listIter )
00560     {
00561         WCombinerTypes::WOneToOneCombiners lComp = WApplyCombiner::createCombinerList< WApplyCombiner>( module, ( *listIter ) );
00562 
00563         if( lComp.size() != 0 )
00564         {
00565             complist.push_back( WCombinerTypes::WCompatiblesGroup( ( *listIter ), lComp ) );
00566         }
00567     }
00568 
00569     // sort the compatibles
00570     std::sort( complist.begin(), complist.end(), WCombinerTypes::compatiblesSort );
00571 
00572     return complist;
00573 }
00574