00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include <list>
00026 #include <set>
00027 #include <vector>
00028 #include <string>
00029 #include <sstream>
00030 #include <algorithm>
00031 #include <utility>
00032
00033 #include "../common/WLogger.h"
00034 #include "../common/WThreadedRunner.h"
00035 #include "WBatchLoader.h"
00036 #include "WKernel.h"
00037 #include "WModule.h"
00038 #include "WModuleCombiner.h"
00039 #include "WModuleFactory.h"
00040 #include "WModuleInputConnector.h"
00041 #include "WModuleOutputConnector.h"
00042 #include "WModuleTypes.h"
00043 #include "combiner/WApplyCombiner.h"
00044 #include "exceptions/WModuleAlreadyAssociated.h"
00045 #include "exceptions/WModuleSignalSubscriptionFailed.h"
00046 #include "exceptions/WModuleUninitialized.h"
00047 #include "WDataModule.h"
00048
00049 #include "WModuleContainer.h"
00050
00051 WModuleContainer::WModuleContainer( std::string name, std::string description ):
00052 WModule(),
00053 m_name( name ),
00054 m_description( description ),
00055 m_crashIfModuleCrashes( true )
00056 {
00057 WLogger::getLogger()->addLogMessage( "Constructing module container." , "ModuleContainer (" + getName() + ")", LL_DEBUG );
00058
00059 }
00060
00061 WModuleContainer::~WModuleContainer()
00062 {
00063
00064 }
00065
00066 void WModuleContainer::moduleMain()
00067 {
00068
00069
00070 ready();
00071 }
00072
00073 boost::shared_ptr< WModule > WModuleContainer::factory() const
00074 {
00075
00076 return boost::shared_ptr< WModule >( new WModuleContainer( getName(), getDescription() ) );
00077 }
00078
00079 void WModuleContainer::add( boost::shared_ptr< WModule > module, bool run )
00080 {
00081 if( !module )
00082 {
00083
00084 return;
00085 }
00086
00087 WLogger::getLogger()->addLogMessage( "Adding module \"" + module->getName() + "\" to container." ,
00088 "ModuleContainer (" + getName() + ")", LL_INFO );
00089
00090 if( !module->isInitialized()() )
00091 {
00092 std::ostringstream s;
00093 s << "Could not add module \"" << module->getName() << "\" to container \"" + getName() + "\". Reason: module not initialized.";
00094
00095 throw WModuleUninitialized( s.str() );
00096 }
00097
00098
00099 if( module->getAssociatedContainer() == shared_from_this() )
00100 {
00101 WLogger::getLogger()->addLogMessage( "Adding module \"" + module->getName() + "\" to container not needed. Its already inside." ,
00102 "ModuleContainer (" + getName() + ")", LL_INFO );
00103 return;
00104 }
00105
00106
00107 if( module->isAssociated()() )
00108 {
00109 module->getAssociatedContainer()->remove( module );
00110 }
00111
00112
00113 ModuleSharedContainerType::WriteTicket wlock = m_modules.getWriteTicket();
00114 wlock->get().insert( module );
00115 wlock.reset();
00116
00117 module->setAssociatedContainer( boost::shared_static_cast< WModuleContainer >( shared_from_this() ) );
00118 WLogger::getLogger()->addLogMessage( "Associated module \"" + module->getName() + "\" with container." , "ModuleContainer (" + getName() + ")",
00119 LL_INFO );
00120
00121
00122
00123
00124 ModuleSubscriptionsSharedType::WriteTicket subscriptionsLock = m_moduleSubscriptions.getWriteTicket();
00125
00126
00127 t_ModuleErrorSignalHandlerType func = boost::bind( &WModuleContainer::moduleError, this, _1, _2 );
00128 boost::signals2::connection signalCon = module->subscribeSignal( WM_ERROR, func );
00129 subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) );
00130
00131
00132 boost::shared_lock<boost::shared_mutex> slock = boost::shared_lock<boost::shared_mutex>( m_errorNotifiersLock );
00133 for( std::list< t_ModuleErrorSignalHandlerType >::iterator iter = m_errorNotifiers.begin(); iter != m_errorNotifiers.end(); ++iter)
00134 {
00135 signalCon = module->subscribeSignal( WM_ERROR, ( *iter ) );
00136 subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) );
00137 }
00138 slock = boost::shared_lock<boost::shared_mutex>( m_associatedNotifiersLock );
00139 for( std::list< t_ModuleGenericSignalHandlerType >::iterator iter = m_associatedNotifiers.begin(); iter != m_associatedNotifiers.end(); ++iter)
00140 {
00141
00142 ( *iter )( module );
00143 }
00144 slock = boost::shared_lock<boost::shared_mutex>( m_connectorNotifiersLock );
00145 for( std::list< t_GenericSignalHandlerType >::iterator iter = m_connectorEstablishedNotifiers.begin();
00146 iter != m_connectorEstablishedNotifiers.end(); ++iter )
00147 {
00148
00149 for( InputConnectorList::const_iterator ins = module->getInputConnectors().begin(); ins != module->getInputConnectors().end(); ++ins )
00150 {
00151 signalCon = ( *ins )->subscribeSignal( CONNECTION_ESTABLISHED, ( *iter ) );
00152 subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) );
00153 }
00154 }
00155 for( std::list< t_GenericSignalHandlerType >::iterator iter = m_connectorClosedNotifiers.begin();
00156 iter != m_connectorClosedNotifiers.end(); ++iter )
00157 {
00158
00159 for( InputConnectorList::const_iterator ins = module->getInputConnectors().begin(); ins != module->getInputConnectors().end(); ++ins )
00160 {
00161 signalCon = ( *ins )->subscribeSignal( CONNECTION_CLOSED, ( *iter ) );
00162 subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) );
00163 }
00164 }
00165 slock = boost::shared_lock<boost::shared_mutex>( m_readyNotifiersLock );
00166 for( std::list< t_ModuleGenericSignalHandlerType >::iterator iter = m_readyNotifiers.begin(); iter != m_readyNotifiers.end(); ++iter)
00167 {
00168 signalCon = module->subscribeSignal( WM_READY, ( *iter ) );
00169 subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) );
00170 }
00171 slock.unlock();
00172
00173
00174 subscriptionsLock.reset();
00175
00176
00177 m_progress->addSubProgress( module->getRootProgressCombiner() );
00178
00179
00180 if( run )
00181 {
00182 module->run();
00183 }
00184 }
00185
00186 void WModuleContainer::remove( boost::shared_ptr< WModule > module )
00187 {
00188
00189
00190 WLogger::getLogger()->addLogMessage( "Removing module \"" + module->getName() + "\" from container." , "ModuleContainer (" + getName() + ")",
00191 LL_DEBUG );
00192
00193 if( module->getAssociatedContainer() != shared_from_this() )
00194 {
00195 return;
00196 }
00197
00198
00199 module->disconnect();
00200
00201
00202 m_progress->removeSubProgress( module->getRootProgressCombiner() );
00203
00204
00205 ModuleSubscriptionsSharedType::WriteTicket subscriptionsLock = m_moduleSubscriptions.getWriteTicket();
00206
00207
00208 std::pair< ModuleSubscriptionsIterator, ModuleSubscriptionsIterator > subscriptions = subscriptionsLock->get().equal_range( module );
00209 for( ModuleSubscriptionsIterator it = subscriptions.first; it != subscriptions.second; ++it )
00210 {
00211
00212 ( *it ).second.disconnect();
00213 }
00214
00215 subscriptionsLock->get().erase( subscriptions.first, subscriptions.second );
00216 subscriptionsLock.reset();
00217
00218
00219 ModuleSharedContainerType::WriteTicket wlock = m_modules.getWriteTicket();
00220 wlock->get().erase( module );
00221 wlock.reset();
00222
00223 module->setAssociatedContainer( boost::shared_ptr< WModuleContainer >() );
00224
00225
00226 boost::shared_lock<boost::shared_mutex> slock = boost::shared_lock<boost::shared_mutex>( m_removedNotifiersLock );
00227 for( std::list< t_ModuleGenericSignalHandlerType >::iterator iter = m_removedNotifiers.begin(); iter != m_removedNotifiers.end(); ++iter)
00228 {
00229
00230 ( *iter )( module );
00231 }
00232 slock.unlock();
00233 }
00234
00235 void WModuleContainer::removeDeep( boost::shared_ptr< WModule > module )
00236 {
00237 WLogger::getLogger()->addLogMessage( "Deep removal of modules is not yet implemented.", "ModuleContainer (" + getName() + ")", LL_WARNING );
00238
00239
00240 remove( module );
00241 }
00242
00243 WModuleContainer::DataModuleListType WModuleContainer::getDataModules()
00244 {
00245 DataModuleListType l;
00246
00247
00248 ModuleSharedContainerType::ReadTicket lock = m_modules.getReadTicket();
00249
00250
00251 for( ModuleConstIterator iter = lock->get().begin(); iter != lock->get().end(); ++iter )
00252 {
00253
00254 if( ( *iter )->getType() == MODULE_DATA )
00255 {
00256 boost::shared_ptr< WDataModule > dm = boost::shared_static_cast< WDataModule >( *iter );
00257
00258
00259 if( dm->isReady()() )
00260 {
00261 l.insert( dm );
00262 }
00263 }
00264 }
00265
00266 return l;
00267 }
00268
00269 void WModuleContainer::stop()
00270 {
00271 WLogger::getLogger()->addLogMessage( "Stopping pending threads." , "ModuleContainer (" + getName() + ")", LL_INFO );
00272
00273
00274 boost::shared_lock<boost::shared_mutex> slock = boost::shared_lock<boost::shared_mutex>( m_pendingThreadsLock );
00275 for( std::set< boost::shared_ptr< WThreadedRunner > >::iterator listIter = m_pendingThreads.begin(); listIter != m_pendingThreads.end();
00276 ++listIter )
00277 {
00278 ( *listIter )->wait( true );
00279 }
00280 slock.unlock();
00281
00282 WLogger::getLogger()->addLogMessage( "Stopping modules." , "ModuleContainer (" + getName() + ")", LL_INFO );
00283
00284
00285 ModuleSharedContainerType::ReadTicket lock = m_modules.getReadTicket();
00286
00287 for( ModuleConstIterator listIter = lock->get().begin(); listIter != lock->get().end(); ++listIter )
00288 {
00289 WLogger::getLogger()->addLogMessage( "Waiting for module \"" + ( *listIter )->getName() + "\" to finish." ,
00290 "ModuleContainer (" + getName() + ")", LL_INFO );
00291 ( *listIter )->wait( true );
00292 ( *listIter )->setAssociatedContainer( boost::shared_ptr< WModuleContainer >() );
00293 }
00294 lock.reset();
00295
00296
00297
00298 ModuleSharedContainerType::WriteTicket wlock = m_modules.getWriteTicket();
00299 wlock->get().clear();
00300 }
00301
00302 const std::string WModuleContainer::getName() const
00303 {
00304 return m_name;
00305 }
00306
00307 const std::string WModuleContainer::getDescription() const
00308 {
00309 return m_description;
00310 }
00311
00312 void WModuleContainer::addDefaultNotifier( MODULE_SIGNAL signal, t_ModuleGenericSignalHandlerType notifier )
00313 {
00314 boost::unique_lock<boost::shared_mutex> lock;
00315 switch (signal)
00316 {
00317 case WM_ASSOCIATED:
00318 lock = boost::unique_lock<boost::shared_mutex>( m_associatedNotifiersLock );
00319 m_associatedNotifiers.push_back( notifier );
00320 lock.unlock();
00321 break;
00322 case WM_READY:
00323 lock = boost::unique_lock<boost::shared_mutex>( m_readyNotifiersLock );
00324 m_readyNotifiers.push_back( notifier );
00325 lock.unlock();
00326 break;
00327 case WM_REMOVED:
00328 lock = boost::unique_lock<boost::shared_mutex>( m_removedNotifiersLock );
00329 m_removedNotifiers.push_back( notifier );
00330 lock.unlock();
00331 break;
00332 default:
00333 std::ostringstream s;
00334 s << "Could not subscribe to unknown signal.";
00335 throw WModuleSignalSubscriptionFailed( s.str() );
00336 break;
00337 }
00338 }
00339
00340 void WModuleContainer::addDefaultNotifier( MODULE_SIGNAL signal, t_ModuleErrorSignalHandlerType notifier )
00341 {
00342 boost::unique_lock<boost::shared_mutex> lock;
00343 switch (signal)
00344 {
00345 case WM_ERROR:
00346 lock = boost::unique_lock<boost::shared_mutex>( m_errorNotifiersLock );
00347 m_errorNotifiers.push_back( notifier );
00348 lock.unlock();
00349 break;
00350 default:
00351 std::ostringstream s;
00352 s << "Could not subscribe to unknown signal.";
00353 throw WModuleSignalSubscriptionFailed( s.str() );
00354 break;
00355 }
00356 }
00357
00358 void WModuleContainer::addDefaultNotifier( MODULE_CONNECTOR_SIGNAL signal, t_GenericSignalHandlerType notifier )
00359 {
00360 boost::unique_lock<boost::shared_mutex> lock;
00361 switch (signal)
00362 {
00363 case CONNECTION_ESTABLISHED:
00364 lock = boost::unique_lock<boost::shared_mutex>( m_connectorNotifiersLock );
00365 m_connectorEstablishedNotifiers.push_back( notifier );
00366 lock.unlock();
00367 break;
00368 case CONNECTION_CLOSED:
00369 lock = boost::unique_lock<boost::shared_mutex>( m_connectorNotifiersLock );
00370 m_connectorClosedNotifiers.push_back( notifier );
00371 lock.unlock();
00372 break;
00373 default:
00374 std::ostringstream s;
00375 s << "Could not subscribe to unknown signal.";
00376 throw WModuleSignalSubscriptionFailed( s.str() );
00377 break;
00378 }
00379 }
00380
00381 boost::shared_ptr< WModule > WModuleContainer::applyModule( boost::shared_ptr< WModule > applyOn, std::string what, bool tryOnly )
00382 {
00383 boost::shared_ptr< WModule >prototype = boost::shared_ptr< WModule >();
00384 if( tryOnly )
00385 {
00386
00387 prototype = WModuleFactory::getModuleFactory()->isPrototypeAvailable( what );
00388 if( !prototype )
00389 {
00390 return prototype;
00391 }
00392 }
00393 else
00394 {
00395 prototype = WModuleFactory::getModuleFactory()->getPrototypeByName( what );
00396 }
00397
00398 return applyModule( applyOn, prototype );
00399 }
00400
00401 boost::shared_ptr< WModule > WModuleContainer::applyModule( boost::shared_ptr< WModule > applyOn,
00402 boost::shared_ptr< WModule > prototype )
00403 {
00404
00405 if( applyOn->isAssociated()() && ( applyOn->getAssociatedContainer() != shared_from_this() ) )
00406 {
00407 throw WModuleAlreadyAssociated( std::string( "The specified module \"" ) + applyOn->getName() +
00408 std::string( "\" is associated with another container." ) );
00409 }
00410
00411
00412 boost::shared_ptr< WModule > m = WModuleFactory::getModuleFactory()->create( prototype );
00413
00414
00415 add( m, true );
00416 applyOn->isReadyOrCrashed().wait();
00417 m->isReadyOrCrashed().wait();
00418
00419
00420
00421
00422
00423 WModule::InputConnectorList ins = m->getInputConnectors();
00424
00425 WModule::OutputConnectorList outs = applyOn->getOutputConnectors();
00426
00427
00428 if( !ins.empty() && !outs.empty() )
00429 {
00430 ( *ins.begin() )->connect( ( *outs.begin() ) );
00431 }
00432
00433 return m;
00434 }
00435
00436 boost::shared_ptr< WBatchLoader > WModuleContainer::loadDataSets( std::vector< std::string > fileNames )
00437 {
00438
00439 boost::shared_ptr< WBatchLoader > t = boost::shared_ptr< WBatchLoader >( new WBatchLoader( fileNames,
00440 boost::shared_static_cast< WModuleContainer >( shared_from_this() ) )
00441 );
00442 t->run();
00443 return t;
00444 }
00445
00446 void WModuleContainer::loadDataSetsSynchronously( std::vector< std::string > fileNames )
00447 {
00448
00449 boost::shared_ptr< WBatchLoader > t = boost::shared_ptr< WBatchLoader >( new WBatchLoader( fileNames,
00450 boost::shared_static_cast< WModuleContainer >( shared_from_this() ) )
00451 );
00452 t->run();
00453 t->wait();
00454 }
00455
00456 void WModuleContainer::addPendingThread( boost::shared_ptr< WThreadedRunner > thread )
00457 {
00458 boost::unique_lock<boost::shared_mutex> lock = boost::unique_lock<boost::shared_mutex>( m_pendingThreadsLock );
00459 m_pendingThreads.insert( thread );
00460 lock.unlock();
00461 }
00462
00463 void WModuleContainer::finishedPendingThread( boost::shared_ptr< WThreadedRunner > thread )
00464 {
00465 boost::unique_lock<boost::shared_mutex> lock = boost::unique_lock<boost::shared_mutex>( m_pendingThreadsLock );
00466 m_pendingThreads.erase( thread );
00467 lock.unlock();
00468 }
00469
00470 void WModuleContainer::moduleError( boost::shared_ptr< WModule > module, const WException& exception )
00471 {
00472 errorLog() << "Error in module \"" << module->getName() << "\". Forwarding to nesting container.";
00473
00474
00475 signal_error( module, exception );
00476
00477 if( m_crashIfModuleCrashes )
00478 {
00479 infoLog() << "Crash caused this container to shutdown.";
00480 requestStop();
00481 m_isCrashed( true );
00482 }
00483 }
00484
00485 void WModuleContainer::setCrashIfModuleCrashes( bool crashIfCrashed )
00486 {
00487 m_crashIfModuleCrashes = crashIfCrashed;
00488 }
00489
00490 WModuleContainer::ModuleSharedContainerType::ReadTicket WModuleContainer::getModules() const
00491 {
00492 return m_modules.getReadTicket();
00493 }
00494
00495 WCombinerTypes::WCompatiblesList WModuleContainer::getPossibleConnections( boost::shared_ptr< WModule > module )
00496 {
00497 WCombinerTypes::WCompatiblesList complist;
00498
00499 if( !module )
00500 {
00501
00502 return complist;
00503 }
00504
00505
00506 ModuleSharedContainerType::ReadTicket lock = m_modules.getReadTicket();
00507
00508
00509 for( ModuleConstIterator listIter = lock->get().begin(); listIter != lock->get().end(); ++listIter )
00510 {
00511 WCombinerTypes::WOneToOneCombiners lComp = WApplyCombiner::createCombinerList< WApplyCombiner>( module, ( *listIter ) );
00512
00513 if( lComp.size() != 0 )
00514 {
00515 complist.push_back( WCombinerTypes::WCompatiblesGroup( ( *listIter ), lComp ) );
00516 }
00517 }
00518
00519
00520 std::sort( complist.begin(), complist.end(), WCombinerTypes::compatiblesSort );
00521
00522 return complist;
00523 }
00524