OpenWalnut 1.2.5

WModuleContainer.cpp

00001 //---------------------------------------------------------------------------
00002 //
00003 // Project: OpenWalnut ( http://www.openwalnut.org )
00004 //
00005 // Copyright 2009 OpenWalnut Community, BSV@Uni-Leipzig and CNCF@MPI-CBS
00006 // For more information see http://www.openwalnut.org/copying
00007 //
00008 // This file is part of OpenWalnut.
00009 //
00010 // OpenWalnut is free software: you can redistribute it and/or modify
00011 // it under the terms of the GNU Lesser General Public License as published by
00012 // the Free Software Foundation, either version 3 of the License, or
00013 // (at your option) any later version.
00014 //
00015 // OpenWalnut is distributed in the hope that it will be useful,
00016 // but WITHOUT ANY WARRANTY; without even the implied warranty of
00017 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00018 // GNU Lesser General Public License for more details.
00019 //
00020 // You should have received a copy of the GNU Lesser General Public License
00021 // along with OpenWalnut. If not, see <http://www.gnu.org/licenses/>.
00022 //
00023 //---------------------------------------------------------------------------
00024 
00025 #include <list>
00026 #include <set>
00027 #include <vector>
00028 #include <string>
00029 #include <sstream>
00030 #include <algorithm>
00031 #include <utility>
00032 
00033 #include "../common/WLogger.h"
00034 #include "../common/WThreadedRunner.h"
00035 #include "WBatchLoader.h"
00036 #include "WKernel.h"
00037 #include "WModule.h"
00038 #include "WModuleCombiner.h"
00039 #include "WModuleFactory.h"
00040 #include "WModuleInputConnector.h"
00041 #include "WModuleOutputConnector.h"
00042 #include "WModuleTypes.h"
00043 #include "combiner/WApplyCombiner.h"
00044 #include "exceptions/WModuleAlreadyAssociated.h"
00045 #include "exceptions/WModuleSignalSubscriptionFailed.h"
00046 #include "exceptions/WModuleUninitialized.h"
00047 #include "WDataModule.h"
00048 
00049 #include "WModuleContainer.h"
00050 
00051 WModuleContainer::WModuleContainer( std::string name, std::string description ):
00052     WModule(),
00053     m_name( name ),
00054     m_description( description ),
00055     m_crashIfModuleCrashes( true )
00056 {
00057     WLogger::getLogger()->addLogMessage( "Constructing module container." , "ModuleContainer (" + getName() + ")", LL_DEBUG );
00058     // initialize members
00059 }
00060 
00061 WModuleContainer::~WModuleContainer()
00062 {
00063     // cleanup
00064 }
00065 
00066 void WModuleContainer::moduleMain()
00067 {
00068     // do nothing here. The WModule class enforces us to overwrite this method here, but we do not need it.
00069     // Only set the ready flag.
00070     ready();
00071 }
00072 
00073 boost::shared_ptr< WModule > WModuleContainer::factory() const
00074 {
00075     // this factory is not used actually.
00076     return boost::shared_ptr< WModule >( new WModuleContainer( getName(), getDescription() ) );
00077 }
00078 
00079 void WModuleContainer::add( boost::shared_ptr< WModule > module, bool run )
00080 {
00081     if( !module )
00082     {
00083         // just ignore NULL Pointer
00084         return;
00085     }
00086 
00087     WLogger::getLogger()->addLogMessage( "Adding module \"" + module->getName() + "\" to container." ,
00088             "ModuleContainer (" + getName() + ")", LL_INFO );
00089 
00090     if( !module->isInitialized()() )
00091     {
00092         std::ostringstream s;
00093         s << "Could not add module \"" << module->getName() << "\" to container \"" + getName() + "\". Reason: module not initialized.";
00094 
00095         throw WModuleUninitialized( s.str() );
00096     }
00097 
00098     // already associated with this container?
00099     if( module->getAssociatedContainer() == shared_from_this() )
00100     {
00101         WLogger::getLogger()->addLogMessage( "Adding module \"" + module->getName() + "\" to container not needed. Its already inside." ,
00102             "ModuleContainer (" + getName() + ")", LL_INFO );
00103         return;
00104     }
00105 
00106     // is this module already associated?
00107     if( module->isAssociated()() )
00108     {
00109         module->getAssociatedContainer()->remove( module );
00110     }
00111 
00112     // get write lock
00113     ModuleSharedContainerType::WriteTicket wlock = m_modules.getWriteTicket();
00114     wlock->get().insert( module );
00115     wlock.reset();
00116 
00117     module->setAssociatedContainer( boost::shared_static_cast< WModuleContainer >( shared_from_this() ) );
00118     WLogger::getLogger()->addLogMessage( "Associated module \"" + module->getName() + "\" with container." , "ModuleContainer (" + getName() + ")",
00119             LL_INFO );
00120 
00121     // now module->isUsable() is true
00122 
00123     // Connect the error handler and all default handlers:
00124     ModuleSubscriptionsSharedType::WriteTicket subscriptionsLock = m_moduleSubscriptions.getWriteTicket();
00125 
00126     // connect the containers signal handler explicitly
00127     t_ModuleErrorSignalHandlerType func = boost::bind( &WModuleContainer::moduleError, this, _1, _2 );
00128     boost::signals2::connection signalCon = module->subscribeSignal( WM_ERROR, func );
00129     subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) );
00130 
00131     // connect default notifiers:
00132     boost::shared_lock<boost::shared_mutex> slock = boost::shared_lock<boost::shared_mutex>( m_errorNotifiersLock );
00133     for( std::list< t_ModuleErrorSignalHandlerType >::iterator iter = m_errorNotifiers.begin(); iter != m_errorNotifiers.end(); ++iter)
00134     {
00135         signalCon = module->subscribeSignal( WM_ERROR, ( *iter ) );
00136         subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) );
00137     }
00138     slock = boost::shared_lock<boost::shared_mutex>( m_associatedNotifiersLock );
00139     for( std::list< t_ModuleGenericSignalHandlerType >::iterator iter = m_associatedNotifiers.begin(); iter != m_associatedNotifiers.end(); ++iter)
00140     {
00141         // call associated notifier
00142         ( *iter )( module );
00143     }
00144     slock = boost::shared_lock<boost::shared_mutex>( m_connectorNotifiersLock );
00145     for( std::list< t_GenericSignalHandlerType >::iterator iter = m_connectorEstablishedNotifiers.begin();
00146                                                             iter != m_connectorEstablishedNotifiers.end(); ++iter )
00147     {
00148         // subscribe on each input
00149         for( InputConnectorList::const_iterator ins = module->getInputConnectors().begin(); ins != module->getInputConnectors().end(); ++ins )
00150         {
00151             signalCon = ( *ins )->subscribeSignal( CONNECTION_ESTABLISHED, ( *iter ) );
00152             subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) );
00153         }
00154     }
00155     for( std::list< t_GenericSignalHandlerType >::iterator iter = m_connectorClosedNotifiers.begin();
00156                                                             iter != m_connectorClosedNotifiers.end(); ++iter )
00157     {
00158         // subscribe on each input
00159         for( InputConnectorList::const_iterator ins = module->getInputConnectors().begin(); ins != module->getInputConnectors().end(); ++ins )
00160         {
00161             signalCon = ( *ins )->subscribeSignal( CONNECTION_CLOSED, ( *iter ) );
00162             subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) );
00163         }
00164     }
00165     slock = boost::shared_lock<boost::shared_mutex>( m_readyNotifiersLock );
00166     for( std::list< t_ModuleGenericSignalHandlerType >::iterator iter = m_readyNotifiers.begin(); iter != m_readyNotifiers.end(); ++iter)
00167     {
00168         signalCon = module->subscribeSignal( WM_READY, ( *iter ) );
00169         subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) );
00170     }
00171     slock.unlock();
00172 
00173     // free the subscriptions lock
00174     subscriptionsLock.reset();
00175 
00176     // add the modules progress to local progress combiner
00177     m_progress->addSubProgress( module->getRootProgressCombiner() );
00178 
00179     // run it
00180     if( run )
00181     {
00182         module->run();
00183     }
00184 }
00185 
00186 void WModuleContainer::remove( boost::shared_ptr< WModule > module )
00187 {
00188     // simple flat removal.
00189 
00190     WLogger::getLogger()->addLogMessage( "Removing module \"" + module->getName() + "\" from container." , "ModuleContainer (" + getName() + ")",
00191             LL_DEBUG );
00192 
00193     if( module->getAssociatedContainer() != shared_from_this() )
00194     {
00195         return;
00196     }
00197 
00198     // remove connections inside this container
00199     module->disconnect();
00200 
00201     // remove progress combiner
00202     m_progress->removeSubProgress( module->getRootProgressCombiner() );
00203 
00204     // remove signal subscriptions to this containers default notifiers
00205     ModuleSubscriptionsSharedType::WriteTicket subscriptionsLock = m_moduleSubscriptions.getWriteTicket();
00206 
00207     // find all subscriptions for this module
00208     std::pair< ModuleSubscriptionsIterator, ModuleSubscriptionsIterator > subscriptions = subscriptionsLock->get().equal_range( module );
00209     for( ModuleSubscriptionsIterator it = subscriptions.first; it != subscriptions.second; ++it )
00210     {
00211         // disconnect subscription.
00212         ( *it ).second.disconnect();
00213     }
00214     // erase them
00215     subscriptionsLock->get().erase( subscriptions.first, subscriptions.second );
00216     subscriptionsLock.reset();
00217 
00218     // get write lock
00219     ModuleSharedContainerType::WriteTicket wlock = m_modules.getWriteTicket();
00220     wlock->get().erase( module );
00221     wlock.reset();
00222 
00223     module->setAssociatedContainer( boost::shared_ptr< WModuleContainer >() );
00224 
00225     // tell all interested about removal
00226     boost::shared_lock<boost::shared_mutex> slock = boost::shared_lock<boost::shared_mutex>( m_removedNotifiersLock );
00227     for( std::list< t_ModuleGenericSignalHandlerType >::iterator iter = m_removedNotifiers.begin(); iter != m_removedNotifiers.end(); ++iter)
00228     {
00229         // call associated notifier
00230         ( *iter )( module );
00231     }
00232     slock.unlock();
00233 }
00234 
00235 void WModuleContainer::removeDeep( boost::shared_ptr< WModule > module )
00236 {
00237     WLogger::getLogger()->addLogMessage( "Deep removal of modules is not yet implemented.", "ModuleContainer (" + getName() + ")", LL_WARNING );
00238 
00239     // at least, remove the module itself
00240     remove( module );
00241 }
00242 
00243 WModuleContainer::DataModuleListType WModuleContainer::getDataModules()
00244 {
00245     DataModuleListType l;
00246 
00247     // lock, unlocked if l looses focus
00248     ModuleSharedContainerType::ReadTicket lock = m_modules.getReadTicket();
00249 
00250     // iterate module list
00251     for( ModuleConstIterator iter = lock->get().begin(); iter != lock->get().end(); ++iter )
00252     {
00253         // is this module a data module?
00254         if( ( *iter )->getType() == MODULE_DATA )
00255         {
00256             boost::shared_ptr< WDataModule > dm = boost::shared_static_cast< WDataModule >( *iter );
00257 
00258             // now check the contained dataset ( isTexture and whether it is ready )
00259             if( dm->isReady()() )
00260             {
00261                 l.insert( dm );
00262             }
00263         }
00264     }
00265 
00266     return l;
00267 }
00268 
00269 void WModuleContainer::stop()
00270 {
00271     WLogger::getLogger()->addLogMessage( "Stopping pending threads." , "ModuleContainer (" + getName() + ")", LL_INFO );
00272 
00273     // read lock
00274     boost::shared_lock<boost::shared_mutex> slock = boost::shared_lock<boost::shared_mutex>( m_pendingThreadsLock );
00275     for( std::set< boost::shared_ptr< WThreadedRunner > >::iterator listIter = m_pendingThreads.begin(); listIter != m_pendingThreads.end();
00276             ++listIter )
00277     {
00278         ( *listIter )->wait( true );
00279     }
00280     slock.unlock();
00281 
00282     WLogger::getLogger()->addLogMessage( "Stopping modules." , "ModuleContainer (" + getName() + ")", LL_INFO );
00283 
00284     // lock, unlocked if l looses focus
00285     ModuleSharedContainerType::ReadTicket lock = m_modules.getReadTicket();
00286 
00287     for( ModuleConstIterator listIter = lock->get().begin(); listIter != lock->get().end(); ++listIter )
00288     {
00289         WLogger::getLogger()->addLogMessage( "Waiting for module \"" + ( *listIter )->getName() + "\" to finish." ,
00290                 "ModuleContainer (" + getName() + ")", LL_INFO );
00291         ( *listIter )->wait( true );
00292         ( *listIter )->setAssociatedContainer( boost::shared_ptr< WModuleContainer >() );   // remove last refs to this container inside the module
00293     }
00294     lock.reset();
00295 
00296     // get write lock
00297     // lock, unlocked if l looses focus
00298     ModuleSharedContainerType::WriteTicket wlock = m_modules.getWriteTicket();
00299     wlock->get().clear();
00300 }
00301 
00302 const std::string WModuleContainer::getName() const
00303 {
00304     return m_name;
00305 }
00306 
00307 const std::string WModuleContainer::getDescription() const
00308 {
00309     return m_description;
00310 }
00311 
00312 void WModuleContainer::addDefaultNotifier( MODULE_SIGNAL signal, t_ModuleGenericSignalHandlerType notifier )
00313 {
00314     boost::unique_lock<boost::shared_mutex> lock;
00315     switch (signal)
00316     {
00317         case WM_ASSOCIATED:
00318             lock = boost::unique_lock<boost::shared_mutex>( m_associatedNotifiersLock );
00319             m_associatedNotifiers.push_back( notifier );
00320             lock.unlock();
00321             break;
00322         case WM_READY:
00323             lock = boost::unique_lock<boost::shared_mutex>( m_readyNotifiersLock );
00324             m_readyNotifiers.push_back( notifier );
00325             lock.unlock();
00326             break;
00327         case WM_REMOVED:
00328             lock = boost::unique_lock<boost::shared_mutex>( m_removedNotifiersLock );
00329             m_removedNotifiers.push_back( notifier );
00330             lock.unlock();
00331             break;
00332         default:
00333             std::ostringstream s;
00334             s << "Could not subscribe to unknown signal.";
00335             throw WModuleSignalSubscriptionFailed( s.str() );
00336             break;
00337     }
00338 }
00339 
00340 void WModuleContainer::addDefaultNotifier( MODULE_SIGNAL signal, t_ModuleErrorSignalHandlerType notifier )
00341 {
00342     boost::unique_lock<boost::shared_mutex> lock;
00343     switch (signal)
00344     {
00345         case WM_ERROR:
00346             lock = boost::unique_lock<boost::shared_mutex>( m_errorNotifiersLock );
00347             m_errorNotifiers.push_back( notifier );
00348             lock.unlock();
00349             break;
00350         default:
00351             std::ostringstream s;
00352             s << "Could not subscribe to unknown signal.";
00353             throw WModuleSignalSubscriptionFailed( s.str() );
00354             break;
00355     }
00356 }
00357 
00358 void WModuleContainer::addDefaultNotifier( MODULE_CONNECTOR_SIGNAL signal, t_GenericSignalHandlerType notifier )
00359 {
00360     boost::unique_lock<boost::shared_mutex> lock;
00361     switch (signal)
00362     {
00363         case CONNECTION_ESTABLISHED:
00364             lock = boost::unique_lock<boost::shared_mutex>( m_connectorNotifiersLock );
00365             m_connectorEstablishedNotifiers.push_back( notifier );
00366             lock.unlock();
00367             break;
00368         case CONNECTION_CLOSED:
00369             lock = boost::unique_lock<boost::shared_mutex>( m_connectorNotifiersLock );
00370             m_connectorClosedNotifiers.push_back( notifier );
00371             lock.unlock();
00372             break;
00373         default:
00374             std::ostringstream s;
00375             s << "Could not subscribe to unknown signal.";
00376             throw WModuleSignalSubscriptionFailed( s.str() );
00377             break;
00378     }
00379 }
00380 
00381 boost::shared_ptr< WModule > WModuleContainer::applyModule( boost::shared_ptr< WModule > applyOn, std::string what, bool tryOnly )
00382 {
00383     boost::shared_ptr< WModule >prototype = boost::shared_ptr< WModule >();
00384     if( tryOnly )
00385     {
00386         // isPrototypeAvailable returns the prototype or NULL if not found, but does not throw an exception
00387         prototype = WModuleFactory::getModuleFactory()->isPrototypeAvailable( what );
00388         if( !prototype )
00389         {
00390             return prototype;
00391         }
00392     }
00393     else
00394     {
00395         prototype = WModuleFactory::getModuleFactory()->getPrototypeByName( what );
00396     }
00397 
00398     return applyModule( applyOn, prototype );
00399 }
00400 
00401 boost::shared_ptr< WModule > WModuleContainer::applyModule( boost::shared_ptr< WModule > applyOn,
00402                                                                          boost::shared_ptr< WModule > prototype )
00403 {
00404     // is this module already associated with another container?
00405     if( applyOn->isAssociated()() && ( applyOn->getAssociatedContainer() != shared_from_this() ) )
00406     {
00407         throw WModuleAlreadyAssociated( std::string( "The specified module \"" ) + applyOn->getName() +
00408                                         std::string( "\" is associated with another container." ) );
00409     }
00410 
00411     // create a new initialized instance of the module
00412     boost::shared_ptr< WModule > m = WModuleFactory::getModuleFactory()->create( prototype );
00413 
00414     // add it
00415     add( m, true );
00416     applyOn->isReadyOrCrashed().wait();
00417     m->isReadyOrCrashed().wait();
00418 
00419     // should we ignore the crash case? In general, a crashed module can be connected. The sense or non-sense of it is questionable but assume a
00420     // crashed module has set some data on its output and some other module needs it. -> so we ignore the case of crashed modules here.
00421 
00422     // get offered outputs
00423     WModule::InputConnectorList ins = m->getInputConnectors();
00424     // get offered inputs
00425     WModule::OutputConnectorList outs = applyOn->getOutputConnectors();
00426 
00427     // connect the first connectors. For a more sophisticated way of connecting modules, use ModuleCombiners.
00428     if( !ins.empty() && !outs.empty() )
00429     {
00430         ( *ins.begin() )->connect( ( *outs.begin() ) );
00431     }
00432 
00433     return m;
00434 }
00435 
00436 boost::shared_ptr< WBatchLoader > WModuleContainer::loadDataSets( std::vector< std::string > fileNames )
00437 {
00438     // create thread which actually loads the data
00439     boost::shared_ptr< WBatchLoader > t = boost::shared_ptr< WBatchLoader >( new WBatchLoader( fileNames,
00440                 boost::shared_static_cast< WModuleContainer >( shared_from_this() ) )
00441     );
00442     t->run();
00443     return t;
00444 }
00445 
00446 void WModuleContainer::loadDataSetsSynchronously( std::vector< std::string > fileNames )
00447 {
00448     // create thread which actually loads the data
00449     boost::shared_ptr< WBatchLoader > t = boost::shared_ptr< WBatchLoader >( new WBatchLoader( fileNames,
00450                 boost::shared_static_cast< WModuleContainer >( shared_from_this() ) )
00451     );
00452     t->run();
00453     t->wait();
00454 }
00455 
00456 void WModuleContainer::addPendingThread( boost::shared_ptr< WThreadedRunner > thread )
00457 {
00458     boost::unique_lock<boost::shared_mutex> lock = boost::unique_lock<boost::shared_mutex>( m_pendingThreadsLock );
00459     m_pendingThreads.insert( thread );
00460     lock.unlock();
00461 }
00462 
00463 void WModuleContainer::finishedPendingThread( boost::shared_ptr< WThreadedRunner > thread )
00464 {
00465     boost::unique_lock<boost::shared_mutex> lock = boost::unique_lock<boost::shared_mutex>( m_pendingThreadsLock );
00466     m_pendingThreads.erase( thread );
00467     lock.unlock();
00468 }
00469 
00470 void WModuleContainer::moduleError( boost::shared_ptr< WModule > module, const WException& exception )
00471 {
00472     errorLog() << "Error in module \"" << module->getName() << "\". Forwarding to nesting container.";
00473 
00474     // simply forward it to the other signal handler
00475     signal_error( module, exception );
00476 
00477     if( m_crashIfModuleCrashes )
00478     {
00479         infoLog() << "Crash caused this container to shutdown.";
00480         requestStop();
00481         m_isCrashed( true );
00482     }
00483 }
00484 
00485 void WModuleContainer::setCrashIfModuleCrashes( bool crashIfCrashed )
00486 {
00487     m_crashIfModuleCrashes = crashIfCrashed;
00488 }
00489 
00490 WModuleContainer::ModuleSharedContainerType::ReadTicket WModuleContainer::getModules() const
00491 {
00492     return m_modules.getReadTicket();
00493 }
00494 
00495 WCombinerTypes::WCompatiblesList WModuleContainer::getPossibleConnections( boost::shared_ptr< WModule > module )
00496 {
00497     WCombinerTypes::WCompatiblesList complist;
00498 
00499     if( !module )
00500     {
00501         // be nice in case of a null pointer
00502         return complist;
00503     }
00504 
00505     // read lock the container
00506     ModuleSharedContainerType::ReadTicket lock = m_modules.getReadTicket();
00507 
00508     // handle each module
00509     for( ModuleConstIterator listIter = lock->get().begin(); listIter != lock->get().end(); ++listIter )
00510     {
00511         WCombinerTypes::WOneToOneCombiners lComp = WApplyCombiner::createCombinerList< WApplyCombiner>( module, ( *listIter ) );
00512 
00513         if( lComp.size() != 0 )
00514         {
00515             complist.push_back( WCombinerTypes::WCompatiblesGroup( ( *listIter ), lComp ) );
00516         }
00517     }
00518 
00519     // sort the compatibles
00520     std::sort( complist.begin(), complist.end(), WCombinerTypes::compatiblesSort );
00521 
00522     return complist;
00523 }
00524 
 All Classes Namespaces Functions Variables Typedefs Enumerations Enumerator Friends