OpenWalnut
1.4.0
|
//---------------------------------------------------------------------------
//
// Project: OpenWalnut ( http://www.openwalnut.org )
//
// Copyright 2009 OpenWalnut Community, BSV@Uni-Leipzig and CNCF@MPI-CBS
// For more information see http://www.openwalnut.org/copying
//
// This file is part of OpenWalnut.
//
// OpenWalnut is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// OpenWalnut is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with OpenWalnut. If not, see <http://www.gnu.org/licenses/>.
00022 // 00023 //--------------------------------------------------------------------------- 00024 00025 #include <list> 00026 #include <set> 00027 #include <vector> 00028 #include <string> 00029 #include <sstream> 00030 #include <algorithm> 00031 #include <utility> 00032 00033 #include "../common/WLogger.h" 00034 #include "../common/WThreadedRunner.h" 00035 #include "../common/exceptions/WSignalSubscriptionFailed.h" 00036 #include "WBatchLoader.h" 00037 #include "WModuleCombiner.h" 00038 #include "WModuleFactory.h" 00039 #include "WModuleInputConnector.h" 00040 #include "WModuleOutputConnector.h" 00041 #include "WModuleTypes.h" 00042 #include "combiner/WApplyCombiner.h" 00043 #include "exceptions/WModuleAlreadyAssociated.h" 00044 #include "exceptions/WModuleUninitialized.h" 00045 #include "WDataModule.h" 00046 00047 #include "WModuleContainer.h" 00048 00049 WModuleContainer::WModuleContainer( std::string name, std::string description ): 00050 WModule(), 00051 m_name( name ), 00052 m_description( description ), 00053 m_crashIfModuleCrashes( true ) 00054 { 00055 WLogger::getLogger()->addLogMessage( "Constructing module container." , "ModuleContainer (" + getName() + ")", LL_DEBUG ); 00056 // initialize members 00057 } 00058 00059 WModuleContainer::~WModuleContainer() 00060 { 00061 // cleanup 00062 } 00063 00064 void WModuleContainer::moduleMain() 00065 { 00066 // do nothing here. The WModule class enforces us to overwrite this method here, but we do not need it. 00067 // Only set the ready flag. 00068 ready(); 00069 } 00070 00071 boost::shared_ptr< WModule > WModuleContainer::factory() const 00072 { 00073 // this factory is not used actually. 
00074 return boost::shared_ptr< WModule >( new WModuleContainer( getName(), getDescription() ) ); 00075 } 00076 00077 void WModuleContainer::add( boost::shared_ptr< WModule > module, bool run ) 00078 { 00079 if( !module ) 00080 { 00081 // just ignore NULL Pointer 00082 return; 00083 } 00084 00085 WLogger::getLogger()->addLogMessage( "Adding module \"" + module->getName() + "\" to container." , 00086 "ModuleContainer (" + getName() + ")", LL_INFO ); 00087 00088 if( !module->isInitialized()() ) 00089 { 00090 std::ostringstream s; 00091 s << "Could not add module \"" << module->getName() << "\" to container \"" + getName() + "\". Reason: module not initialized."; 00092 00093 throw WModuleUninitialized( s.str() ); 00094 } 00095 00096 // already associated with this container? 00097 if( module->getAssociatedContainer() == shared_from_this() ) 00098 { 00099 WLogger::getLogger()->addLogMessage( "Adding module \"" + module->getName() + "\" to container not needed. Its already inside." , 00100 "ModuleContainer (" + getName() + ")", LL_INFO ); 00101 return; 00102 } 00103 00104 // is this module already associated? 00105 if( module->isAssociated()() ) 00106 { 00107 module->getAssociatedContainer()->remove( module ); 00108 } 00109 00110 // get write lock 00111 ModuleSharedContainerType::WriteTicket wlock = m_modules.getWriteTicket(); 00112 wlock->get().insert( module ); 00113 wlock.reset(); 00114 00115 module->setAssociatedContainer( boost::static_pointer_cast< WModuleContainer >( shared_from_this() ) ); 00116 WLogger::getLogger()->addLogMessage( "Associated module \"" + module->getName() + "\" with container." 
, "ModuleContainer (" + getName() + ")", 00117 LL_INFO ); 00118 00119 // now module->isUsable() is true 00120 00121 // Connect the error handler and all default handlers: 00122 ModuleSubscriptionsSharedType::WriteTicket subscriptionsLock = m_moduleSubscriptions.getWriteTicket(); 00123 00124 // connect the containers signal handler explicitly 00125 t_ModuleErrorSignalHandlerType func = boost::bind( &WModuleContainer::moduleError, this, _1, _2 ); 00126 boost::signals2::connection signalCon = module->subscribeSignal( WM_ERROR, func ); 00127 subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) ); 00128 00129 // connect default notifiers: 00130 boost::shared_lock<boost::shared_mutex> slock = boost::shared_lock<boost::shared_mutex>( m_errorNotifiersLock ); 00131 for( std::list< t_ModuleErrorSignalHandlerType >::iterator iter = m_errorNotifiers.begin(); iter != m_errorNotifiers.end(); ++iter) 00132 { 00133 signalCon = module->subscribeSignal( WM_ERROR, ( *iter ) ); 00134 subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) ); 00135 } 00136 slock = boost::shared_lock<boost::shared_mutex>( m_associatedNotifiersLock ); 00137 for( std::list< t_ModuleGenericSignalHandlerType >::iterator iter = m_associatedNotifiers.begin(); iter != m_associatedNotifiers.end(); ++iter) 00138 { 00139 // call associated notifier 00140 ( *iter )( module ); 00141 } 00142 slock = boost::shared_lock<boost::shared_mutex>( m_connectorNotifiersLock ); 00143 for( std::list< t_GenericSignalHandlerType >::iterator iter = m_connectorEstablishedNotifiers.begin(); 00144 iter != m_connectorEstablishedNotifiers.end(); ++iter ) 00145 { 00146 // subscribe on each input 00147 for( InputConnectorList::const_iterator ins = module->getInputConnectors().begin(); ins != module->getInputConnectors().end(); ++ins ) 00148 { 00149 signalCon = ( *ins )->subscribeSignal( CONNECTION_ESTABLISHED, ( *iter ) ); 00150 subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) 
); 00151 } 00152 } 00153 for( std::list< t_GenericSignalHandlerType >::iterator iter = m_connectorClosedNotifiers.begin(); 00154 iter != m_connectorClosedNotifiers.end(); ++iter ) 00155 { 00156 // subscribe on each input 00157 for( InputConnectorList::const_iterator ins = module->getInputConnectors().begin(); ins != module->getInputConnectors().end(); ++ins ) 00158 { 00159 signalCon = ( *ins )->subscribeSignal( CONNECTION_CLOSED, ( *iter ) ); 00160 subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) ); 00161 } 00162 } 00163 slock = boost::shared_lock<boost::shared_mutex>( m_readyNotifiersLock ); 00164 for( std::list< t_ModuleGenericSignalHandlerType >::iterator iter = m_readyNotifiers.begin(); iter != m_readyNotifiers.end(); ++iter) 00165 { 00166 signalCon = module->subscribeSignal( WM_READY, ( *iter ) ); 00167 subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) ); 00168 } 00169 slock.unlock(); 00170 00171 // free the subscriptions lock 00172 subscriptionsLock.reset(); 00173 00174 // add the modules progress to local progress combiner 00175 m_progress->addSubProgress( module->getRootProgressCombiner() ); 00176 00177 // run it 00178 if( run ) 00179 { 00180 module->run(); 00181 } 00182 } 00183 00184 WModule::SPtr WModuleContainer::createAndAdd( std::string name ) 00185 { 00186 WModule::SPtr module = WModuleFactory::getModuleFactory()->create( 00187 WModuleFactory::getModuleFactory()->getPrototypeByName( name ) 00188 ); 00189 00190 // add to the container 00191 add( module ); 00192 module->isReady().wait(); 00193 00194 return module; 00195 } 00196 00197 void WModuleContainer::remove( boost::shared_ptr< WModule > module ) 00198 { 00199 // simple flat removal. 00200 00201 WLogger::getLogger()->addLogMessage( "Removing module \"" + module->getName() + "\" from container." 
, "ModuleContainer (" + getName() + ")", 00202 LL_DEBUG ); 00203 00204 if( module->getAssociatedContainer() != shared_from_this() ) 00205 { 00206 return; 00207 } 00208 00209 // remove connections inside this container 00210 module->disconnect(); 00211 00212 // remove progress combiner 00213 m_progress->removeSubProgress( module->getRootProgressCombiner() ); 00214 00215 // remove signal subscriptions to this containers default notifiers 00216 ModuleSubscriptionsSharedType::WriteTicket subscriptionsLock = m_moduleSubscriptions.getWriteTicket(); 00217 00218 // find all subscriptions for this module 00219 std::pair< ModuleSubscriptionsIterator, ModuleSubscriptionsIterator > subscriptions = subscriptionsLock->get().equal_range( module ); 00220 for( ModuleSubscriptionsIterator it = subscriptions.first; it != subscriptions.second; ++it ) 00221 { 00222 // disconnect subscription. 00223 ( *it ).second.disconnect(); 00224 } 00225 // erase them 00226 subscriptionsLock->get().erase( subscriptions.first, subscriptions.second ); 00227 subscriptionsLock.reset(); 00228 00229 // get write lock 00230 ModuleSharedContainerType::WriteTicket wlock = m_modules.getWriteTicket(); 00231 wlock->get().erase( module ); 00232 wlock.reset(); 00233 00234 module->setAssociatedContainer( boost::shared_ptr< WModuleContainer >() ); 00235 00236 // tell all interested about removal 00237 boost::shared_lock<boost::shared_mutex> slock = boost::shared_lock<boost::shared_mutex>( m_removedNotifiersLock ); 00238 for( std::list< t_ModuleGenericSignalHandlerType >::iterator iter = m_removedNotifiers.begin(); iter != m_removedNotifiers.end(); ++iter) 00239 { 00240 // call associated notifier 00241 ( *iter )( module ); 00242 } 00243 slock.unlock(); 00244 } 00245 00246 WModuleContainer::DataModuleListType WModuleContainer::getDataModules() 00247 { 00248 DataModuleListType l; 00249 00250 // lock, unlocked if l looses focus 00251 ModuleSharedContainerType::ReadTicket lock = m_modules.getReadTicket(); 00252 00253 
// iterate module list 00254 for( ModuleConstIterator iter = lock->get().begin(); iter != lock->get().end(); ++iter ) 00255 { 00256 // is this module a data module? 00257 if( ( *iter )->getType() == MODULE_DATA ) 00258 { 00259 boost::shared_ptr< WDataModule > dm = boost::static_pointer_cast< WDataModule >( *iter ); 00260 00261 // now check the contained dataset ( isTexture and whether it is ready ) 00262 if( dm->isReady()() ) 00263 { 00264 l.insert( dm ); 00265 } 00266 } 00267 } 00268 00269 return l; 00270 } 00271 00272 void WModuleContainer::removeAll() 00273 { 00274 const size_t nonZero = 1; 00275 size_t numberOfModules = nonZero; 00276 std::vector< boost::shared_ptr< WModule > > modulesToRemove; 00277 00278 while( numberOfModules != 0 ) 00279 { 00280 modulesToRemove.clear(); 00281 // get names of all remaining modules to try to remove them 00282 { 00283 WModuleContainer::ModuleSharedContainerType::ReadTicket lock = getModules(); 00284 numberOfModules = lock->get().size(); 00285 for( ModuleConstIterator listIter = lock->get().begin(); listIter != lock->get().end(); ++listIter ) 00286 { 00287 modulesToRemove.push_back( *listIter ); 00288 } 00289 } 00290 for( std::vector< boost::shared_ptr< WModule > >::iterator nameIter = modulesToRemove.begin(); 00291 nameIter != modulesToRemove.end(); 00292 ++nameIter ) 00293 { 00294 remove( *nameIter ); 00295 } 00296 } 00297 } 00298 00299 void WModuleContainer::stop() 00300 { 00301 WLogger::getLogger()->addLogMessage( "Stopping pending threads." 
, "ModuleContainer (" + getName() + ")", LL_INFO ); 00302 00303 // read lock 00304 boost::shared_lock<boost::shared_mutex> slock = boost::shared_lock<boost::shared_mutex>( m_pendingThreadsLock ); 00305 for( std::set< boost::shared_ptr< WThreadedRunner > >::iterator listIter = m_pendingThreads.begin(); listIter != m_pendingThreads.end(); 00306 ++listIter ) 00307 { 00308 ( *listIter )->wait( true ); 00309 } 00310 slock.unlock(); 00311 00312 WLogger::getLogger()->addLogMessage( "Stopping modules." , "ModuleContainer (" + getName() + ")", LL_INFO ); 00313 00314 // lock, unlocked if l looses focus 00315 ModuleSharedContainerType::ReadTicket lock = m_modules.getReadTicket(); 00316 00317 for( ModuleConstIterator listIter = lock->get().begin(); listIter != lock->get().end(); ++listIter ) 00318 { 00319 WLogger::getLogger()->addLogMessage( "Waiting for module \"" + ( *listIter )->getName() + "\" to finish." , 00320 "ModuleContainer (" + getName() + ")", LL_INFO ); 00321 ( *listIter )->wait( true ); 00322 ( *listIter )->setAssociatedContainer( boost::shared_ptr< WModuleContainer >() ); // remove last refs to this container inside the module 00323 } 00324 lock.reset(); 00325 00326 // get write lock 00327 // lock, unlocked if l looses focus 00328 ModuleSharedContainerType::WriteTicket wlock = m_modules.getWriteTicket(); 00329 wlock->get().clear(); 00330 } 00331 00332 const std::string WModuleContainer::getName() const 00333 { 00334 return m_name; 00335 } 00336 00337 const std::string WModuleContainer::getDescription() const 00338 { 00339 return m_description; 00340 } 00341 00342 void WModuleContainer::addDefaultNotifier( MODULE_SIGNAL signal, t_ModuleGenericSignalHandlerType notifier ) 00343 { 00344 boost::unique_lock<boost::shared_mutex> lock; 00345 switch( signal) 00346 { 00347 case WM_ASSOCIATED: 00348 lock = boost::unique_lock<boost::shared_mutex>( m_associatedNotifiersLock ); 00349 m_associatedNotifiers.push_back( notifier ); 00350 lock.unlock(); 00351 break; 00352 case 
WM_READY: 00353 lock = boost::unique_lock<boost::shared_mutex>( m_readyNotifiersLock ); 00354 m_readyNotifiers.push_back( notifier ); 00355 lock.unlock(); 00356 break; 00357 case WM_REMOVED: 00358 lock = boost::unique_lock<boost::shared_mutex>( m_removedNotifiersLock ); 00359 m_removedNotifiers.push_back( notifier ); 00360 lock.unlock(); 00361 break; 00362 default: 00363 std::ostringstream s; 00364 s << "Could not subscribe to unknown signal."; 00365 throw WSignalSubscriptionFailed( s.str() ); 00366 break; 00367 } 00368 } 00369 00370 void WModuleContainer::addDefaultNotifier( MODULE_SIGNAL signal, t_ModuleErrorSignalHandlerType notifier ) 00371 { 00372 boost::unique_lock<boost::shared_mutex> lock; 00373 switch( signal) 00374 { 00375 case WM_ERROR: 00376 lock = boost::unique_lock<boost::shared_mutex>( m_errorNotifiersLock ); 00377 m_errorNotifiers.push_back( notifier ); 00378 lock.unlock(); 00379 break; 00380 default: 00381 std::ostringstream s; 00382 s << "Could not subscribe to unknown signal."; 00383 throw WSignalSubscriptionFailed( s.str() ); 00384 break; 00385 } 00386 } 00387 00388 void WModuleContainer::addDefaultNotifier( MODULE_CONNECTOR_SIGNAL signal, t_GenericSignalHandlerType notifier ) 00389 { 00390 boost::unique_lock<boost::shared_mutex> lock; 00391 switch( signal) 00392 { 00393 case CONNECTION_ESTABLISHED: 00394 lock = boost::unique_lock<boost::shared_mutex>( m_connectorNotifiersLock ); 00395 m_connectorEstablishedNotifiers.push_back( notifier ); 00396 lock.unlock(); 00397 break; 00398 case CONNECTION_CLOSED: 00399 lock = boost::unique_lock<boost::shared_mutex>( m_connectorNotifiersLock ); 00400 m_connectorClosedNotifiers.push_back( notifier ); 00401 lock.unlock(); 00402 break; 00403 default: 00404 std::ostringstream s; 00405 s << "Could not subscribe to unknown signal."; 00406 throw WSignalSubscriptionFailed( s.str() ); 00407 break; 00408 } 00409 } 00410 00411 boost::shared_ptr< WModule > WModuleContainer::applyModule( boost::shared_ptr< WModule > 
applyOn, std::string what, bool tryOnly ) 00412 { 00413 boost::shared_ptr< WModule >prototype = boost::shared_ptr< WModule >(); 00414 if( tryOnly ) 00415 { 00416 // isPrototypeAvailable returns the prototype or NULL if not found, but does not throw an exception 00417 prototype = WModuleFactory::getModuleFactory()->isPrototypeAvailable( what ); 00418 if( !prototype ) 00419 { 00420 return prototype; 00421 } 00422 } 00423 else 00424 { 00425 prototype = WModuleFactory::getModuleFactory()->getPrototypeByName( what ); 00426 } 00427 00428 return applyModule( applyOn, prototype ); 00429 } 00430 00431 boost::shared_ptr< WModule > WModuleContainer::applyModule( boost::shared_ptr< WModule > applyOn, 00432 boost::shared_ptr< WModule > prototype ) 00433 { 00434 // is this module already associated with another container? 00435 if( applyOn->isAssociated()() && ( applyOn->getAssociatedContainer() != shared_from_this() ) ) 00436 { 00437 throw WModuleAlreadyAssociated( std::string( "The specified module \"" ) + applyOn->getName() + 00438 std::string( "\" is associated with another container." ) ); 00439 } 00440 00441 // create a new initialized instance of the module 00442 boost::shared_ptr< WModule > m = WModuleFactory::getModuleFactory()->create( prototype ); 00443 00444 // add it 00445 add( m, true ); 00446 applyOn->isReadyOrCrashed().wait(); 00447 m->isReadyOrCrashed().wait(); 00448 00449 // should we ignore the crash case? In general, a crashed module can be connected. The sense or non-sense of it is questionable but assume a 00450 // crashed module has set some data on its output and some other module needs it. -> so we ignore the case of crashed modules here. 00451 00452 // get offered outputs 00453 WModule::InputConnectorList ins = m->getInputConnectors(); 00454 // get offered inputs 00455 WModule::OutputConnectorList outs = applyOn->getOutputConnectors(); 00456 00457 // connect the first connectors. For a more sophisticated way of connecting modules, use ModuleCombiners. 
00458 if( !ins.empty() && !outs.empty() ) 00459 { 00460 ( *ins.begin() )->connect( ( *outs.begin() ) ); 00461 } 00462 00463 return m; 00464 } 00465 00466 WBatchLoader::SPtr WModuleContainer::loadDataSets( std::vector< std::string > filenames, bool suppressColormaps ) 00467 { 00468 // create thread which actually loads the data 00469 boost::shared_ptr< WBatchLoader > t( new WBatchLoader( filenames, boost::static_pointer_cast< WModuleContainer >( shared_from_this() ) ) ); 00470 t->setSuppressColormaps( suppressColormaps ); 00471 t->run(); 00472 return t; 00473 } 00474 00475 WBatchLoader::SPtr WModuleContainer::loadDataSetsSynchronously( std::vector< std::string > filenames, bool suppressColormaps ) 00476 { 00477 // create thread which actually loads the data 00478 boost::shared_ptr< WBatchLoader > t( new WBatchLoader( filenames, boost::static_pointer_cast< WModuleContainer >( shared_from_this() ) ) ); 00479 t->setSuppressColormaps( suppressColormaps ); 00480 t->run(); 00481 t->wait(); 00482 return t; 00483 } 00484 00485 void WModuleContainer::addPendingThread( boost::shared_ptr< WThreadedRunner > thread ) 00486 { 00487 boost::unique_lock<boost::shared_mutex> lock = boost::unique_lock<boost::shared_mutex>( m_pendingThreadsLock ); 00488 m_pendingThreads.insert( thread ); 00489 lock.unlock(); 00490 } 00491 00492 void WModuleContainer::finishedPendingThread( boost::shared_ptr< WThreadedRunner > thread ) 00493 { 00494 boost::unique_lock<boost::shared_mutex> lock = boost::unique_lock<boost::shared_mutex>( m_pendingThreadsLock ); 00495 m_pendingThreads.erase( thread ); 00496 lock.unlock(); 00497 } 00498 00499 void WModuleContainer::moduleError( boost::shared_ptr< WModule > module, const WException& exception ) 00500 { 00501 errorLog() << "Error in module \"" << module->getName() << "\". 
Forwarding to nesting container."; 00502 00503 // simply forward it to the other signal handler 00504 signal_error( module, exception ); 00505 00506 if( m_crashIfModuleCrashes ) 00507 { 00508 infoLog() << "Crash caused this container to shutdown."; 00509 requestStop(); 00510 m_isCrashed( true ); 00511 } 00512 } 00513 00514 void WModuleContainer::setCrashIfModuleCrashes( bool crashIfCrashed ) 00515 { 00516 m_crashIfModuleCrashes = crashIfCrashed; 00517 } 00518 00519 WModuleContainer::ModuleSharedContainerType::ReadTicket WModuleContainer::getModules() const 00520 { 00521 return m_modules.getReadTicket(); 00522 } 00523 00524 WModuleContainer::ModuleVectorType WModuleContainer::getModules( std::string name ) const 00525 { 00526 // get the list of all first. 00527 WModuleContainer::ModuleSharedContainerType::ReadTicket lock = getModules(); 00528 00529 // put results in here 00530 WModuleContainer::ModuleVectorType result; 00531 00532 // handle each module 00533 for( ModuleConstIterator listIter = lock->get().begin(); listIter != lock->get().end(); ++listIter ) 00534 { 00535 // check name 00536 if( name == ( *listIter )->getName() ) 00537 { 00538 result.push_back( ( *listIter ) ); 00539 } 00540 } 00541 00542 return result; 00543 } 00544 00545 WCombinerTypes::WCompatiblesList WModuleContainer::getPossibleConnections( boost::shared_ptr< WModule > module ) 00546 { 00547 WCombinerTypes::WCompatiblesList complist; 00548 00549 if( !module ) 00550 { 00551 // be nice in case of a null pointer 00552 return complist; 00553 } 00554 00555 // read lock the container 00556 ModuleSharedContainerType::ReadTicket lock = m_modules.getReadTicket(); 00557 00558 // handle each module 00559 for( ModuleConstIterator listIter = lock->get().begin(); listIter != lock->get().end(); ++listIter ) 00560 { 00561 WCombinerTypes::WOneToOneCombiners lComp = WApplyCombiner::createCombinerList< WApplyCombiner>( module, ( *listIter ) ); 00562 00563 if( lComp.size() != 0 ) 00564 { 00565 complist.push_back( 
WCombinerTypes::WCompatiblesGroup( ( *listIter ), lComp ) ); 00566 } 00567 } 00568 00569 // sort the compatibles 00570 std::sort( complist.begin(), complist.end(), WCombinerTypes::compatiblesSort ); 00571 00572 return complist; 00573 } 00574