OpenWalnut  1.4.0
WModuleContainer.cpp
1 //---------------------------------------------------------------------------
2 //
3 // Project: OpenWalnut ( http://www.openwalnut.org )
4 //
5 // Copyright 2009 OpenWalnut Community, BSV@Uni-Leipzig and CNCF@MPI-CBS
6 // For more information see http://www.openwalnut.org/copying
7 //
8 // This file is part of OpenWalnut.
9 //
10 // OpenWalnut is free software: you can redistribute it and/or modify
11 // it under the terms of the GNU Lesser General Public License as published by
12 // the Free Software Foundation, either version 3 of the License, or
13 // (at your option) any later version.
14 //
15 // OpenWalnut is distributed in the hope that it will be useful,
16 // but WITHOUT ANY WARRANTY; without even the implied warranty of
17 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 // GNU Lesser General Public License for more details.
19 //
20 // You should have received a copy of the GNU Lesser General Public License
21 // along with OpenWalnut. If not, see <http://www.gnu.org/licenses/>.
22 //
23 //---------------------------------------------------------------------------
24 
25 #include <list>
26 #include <set>
27 #include <vector>
28 #include <string>
29 #include <sstream>
30 #include <algorithm>
31 #include <utility>
32 
33 #include "../common/WLogger.h"
34 #include "../common/WThreadedRunner.h"
35 #include "../common/exceptions/WSignalSubscriptionFailed.h"
36 #include "WBatchLoader.h"
37 #include "WModuleCombiner.h"
38 #include "WModuleFactory.h"
39 #include "WModuleInputConnector.h"
40 #include "WModuleOutputConnector.h"
41 #include "WModuleTypes.h"
42 #include "combiner/WApplyCombiner.h"
43 #include "exceptions/WModuleAlreadyAssociated.h"
44 #include "exceptions/WModuleUninitialized.h"
45 #include "WDataModule.h"
46 
47 #include "WModuleContainer.h"
48 
49 WModuleContainer::WModuleContainer( std::string name, std::string description ):
50  WModule(),
51  m_name( name ),
52  m_description( description ),
53  m_crashIfModuleCrashes( true )
54 {
55  WLogger::getLogger()->addLogMessage( "Constructing module container." , "ModuleContainer (" + getName() + ")", LL_DEBUG );
56  // initialize members
57 }
58 
60 {
61  // cleanup
62 }
63 
65 {
66  // do nothing here. The WModule class enforces us to overwrite this method here, but we do not need it.
67  // Only set the ready flag.
68  ready();
69 }
70 
71 boost::shared_ptr< WModule > WModuleContainer::factory() const
72 {
73  // this factory is not used actually.
74  return boost::shared_ptr< WModule >( new WModuleContainer( getName(), getDescription() ) );
75 }
76 
77 void WModuleContainer::add( boost::shared_ptr< WModule > module, bool run )
78 {
79  if( !module )
80  {
81  // just ignore NULL Pointer
82  return;
83  }
84 
85  WLogger::getLogger()->addLogMessage( "Adding module \"" + module->getName() + "\" to container." ,
86  "ModuleContainer (" + getName() + ")", LL_INFO );
87 
88  if( !module->isInitialized()() )
89  {
90  std::ostringstream s;
91  s << "Could not add module \"" << module->getName() << "\" to container \"" + getName() + "\". Reason: module not initialized.";
92 
93  throw WModuleUninitialized( s.str() );
94  }
95 
96  // already associated with this container?
97  if( module->getAssociatedContainer() == shared_from_this() )
98  {
99  WLogger::getLogger()->addLogMessage( "Adding module \"" + module->getName() + "\" to container not needed. Its already inside." ,
100  "ModuleContainer (" + getName() + ")", LL_INFO );
101  return;
102  }
103 
104  // is this module already associated?
105  if( module->isAssociated()() )
106  {
107  module->getAssociatedContainer()->remove( module );
108  }
109 
110  // get write lock
112  wlock->get().insert( module );
113  wlock.reset();
114 
115  module->setAssociatedContainer( boost::static_pointer_cast< WModuleContainer >( shared_from_this() ) );
116  WLogger::getLogger()->addLogMessage( "Associated module \"" + module->getName() + "\" with container." , "ModuleContainer (" + getName() + ")",
117  LL_INFO );
118 
119  // now module->isUsable() is true
120 
121  // Connect the error handler and all default handlers:
123 
124  // connect the containers signal handler explicitly
125  t_ModuleErrorSignalHandlerType func = boost::bind( &WModuleContainer::moduleError, this, _1, _2 );
126  boost::signals2::connection signalCon = module->subscribeSignal( WM_ERROR, func );
127  subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) );
128 
129  // connect default notifiers:
130  boost::shared_lock<boost::shared_mutex> slock = boost::shared_lock<boost::shared_mutex>( m_errorNotifiersLock );
131  for( std::list< t_ModuleErrorSignalHandlerType >::iterator iter = m_errorNotifiers.begin(); iter != m_errorNotifiers.end(); ++iter)
132  {
133  signalCon = module->subscribeSignal( WM_ERROR, ( *iter ) );
134  subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) );
135  }
136  slock = boost::shared_lock<boost::shared_mutex>( m_associatedNotifiersLock );
137  for( std::list< t_ModuleGenericSignalHandlerType >::iterator iter = m_associatedNotifiers.begin(); iter != m_associatedNotifiers.end(); ++iter)
138  {
139  // call associated notifier
140  ( *iter )( module );
141  }
142  slock = boost::shared_lock<boost::shared_mutex>( m_connectorNotifiersLock );
143  for( std::list< t_GenericSignalHandlerType >::iterator iter = m_connectorEstablishedNotifiers.begin();
144  iter != m_connectorEstablishedNotifiers.end(); ++iter )
145  {
146  // subscribe on each input
147  for( InputConnectorList::const_iterator ins = module->getInputConnectors().begin(); ins != module->getInputConnectors().end(); ++ins )
148  {
149  signalCon = ( *ins )->subscribeSignal( CONNECTION_ESTABLISHED, ( *iter ) );
150  subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) );
151  }
152  }
153  for( std::list< t_GenericSignalHandlerType >::iterator iter = m_connectorClosedNotifiers.begin();
154  iter != m_connectorClosedNotifiers.end(); ++iter )
155  {
156  // subscribe on each input
157  for( InputConnectorList::const_iterator ins = module->getInputConnectors().begin(); ins != module->getInputConnectors().end(); ++ins )
158  {
159  signalCon = ( *ins )->subscribeSignal( CONNECTION_CLOSED, ( *iter ) );
160  subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) );
161  }
162  }
163  slock = boost::shared_lock<boost::shared_mutex>( m_readyNotifiersLock );
164  for( std::list< t_ModuleGenericSignalHandlerType >::iterator iter = m_readyNotifiers.begin(); iter != m_readyNotifiers.end(); ++iter)
165  {
166  signalCon = module->subscribeSignal( WM_READY, ( *iter ) );
167  subscriptionsLock->get().insert( ModuleSubscription( module, signalCon ) );
168  }
169  slock.unlock();
170 
171  // free the subscriptions lock
172  subscriptionsLock.reset();
173 
174  // add the modules progress to local progress combiner
175  m_progress->addSubProgress( module->getRootProgressCombiner() );
176 
177  // run it
178  if( run )
179  {
180  module->run();
181  }
182 }
183 
185 {
187  WModuleFactory::getModuleFactory()->getPrototypeByName( name )
188  );
189 
190  // add to the container
191  add( module );
192  module->isReady().wait();
193 
194  return module;
195 }
196 
197 void WModuleContainer::remove( boost::shared_ptr< WModule > module )
198 {
199  // simple flat removal.
200 
201  WLogger::getLogger()->addLogMessage( "Removing module \"" + module->getName() + "\" from container." , "ModuleContainer (" + getName() + ")",
202  LL_DEBUG );
203 
204  if( module->getAssociatedContainer() != shared_from_this() )
205  {
206  return;
207  }
208 
209  // remove connections inside this container
210  module->disconnect();
211 
212  // remove progress combiner
213  m_progress->removeSubProgress( module->getRootProgressCombiner() );
214 
215  // remove signal subscriptions to this containers default notifiers
217 
218  // find all subscriptions for this module
219  std::pair< ModuleSubscriptionsIterator, ModuleSubscriptionsIterator > subscriptions = subscriptionsLock->get().equal_range( module );
220  for( ModuleSubscriptionsIterator it = subscriptions.first; it != subscriptions.second; ++it )
221  {
222  // disconnect subscription.
223  ( *it ).second.disconnect();
224  }
225  // erase them
226  subscriptionsLock->get().erase( subscriptions.first, subscriptions.second );
227  subscriptionsLock.reset();
228 
229  // get write lock
231  wlock->get().erase( module );
232  wlock.reset();
233 
234  module->setAssociatedContainer( boost::shared_ptr< WModuleContainer >() );
235 
236  // tell all interested about removal
237  boost::shared_lock<boost::shared_mutex> slock = boost::shared_lock<boost::shared_mutex>( m_removedNotifiersLock );
238  for( std::list< t_ModuleGenericSignalHandlerType >::iterator iter = m_removedNotifiers.begin(); iter != m_removedNotifiers.end(); ++iter)
239  {
240  // call associated notifier
241  ( *iter )( module );
242  }
243  slock.unlock();
244 }
245 
247 {
249 
250  // lock, unlocked if l looses focus
252 
253  // iterate module list
254  for( ModuleConstIterator iter = lock->get().begin(); iter != lock->get().end(); ++iter )
255  {
256  // is this module a data module?
257  if( ( *iter )->getType() == MODULE_DATA )
258  {
259  boost::shared_ptr< WDataModule > dm = boost::static_pointer_cast< WDataModule >( *iter );
260 
261  // now check the contained dataset ( isTexture and whether it is ready )
262  if( dm->isReady()() )
263  {
264  l.insert( dm );
265  }
266  }
267  }
268 
269  return l;
270 }
271 
273 {
274  const size_t nonZero = 1;
275  size_t numberOfModules = nonZero;
276  std::vector< boost::shared_ptr< WModule > > modulesToRemove;
277 
278  while( numberOfModules != 0 )
279  {
280  modulesToRemove.clear();
281  // get names of all remaining modules to try to remove them
282  {
284  numberOfModules = lock->get().size();
285  for( ModuleConstIterator listIter = lock->get().begin(); listIter != lock->get().end(); ++listIter )
286  {
287  modulesToRemove.push_back( *listIter );
288  }
289  }
290  for( std::vector< boost::shared_ptr< WModule > >::iterator nameIter = modulesToRemove.begin();
291  nameIter != modulesToRemove.end();
292  ++nameIter )
293  {
294  remove( *nameIter );
295  }
296  }
297 }
298 
300 {
301  WLogger::getLogger()->addLogMessage( "Stopping pending threads." , "ModuleContainer (" + getName() + ")", LL_INFO );
302 
303  // read lock
304  boost::shared_lock<boost::shared_mutex> slock = boost::shared_lock<boost::shared_mutex>( m_pendingThreadsLock );
305  for( std::set< boost::shared_ptr< WThreadedRunner > >::iterator listIter = m_pendingThreads.begin(); listIter != m_pendingThreads.end();
306  ++listIter )
307  {
308  ( *listIter )->wait( true );
309  }
310  slock.unlock();
311 
312  WLogger::getLogger()->addLogMessage( "Stopping modules." , "ModuleContainer (" + getName() + ")", LL_INFO );
313 
314  // lock, unlocked if l looses focus
316 
317  for( ModuleConstIterator listIter = lock->get().begin(); listIter != lock->get().end(); ++listIter )
318  {
319  WLogger::getLogger()->addLogMessage( "Waiting for module \"" + ( *listIter )->getName() + "\" to finish." ,
320  "ModuleContainer (" + getName() + ")", LL_INFO );
321  ( *listIter )->wait( true );
322  ( *listIter )->setAssociatedContainer( boost::shared_ptr< WModuleContainer >() ); // remove last refs to this container inside the module
323  }
324  lock.reset();
325 
326  // get write lock
327  // lock, unlocked if l looses focus
329  wlock->get().clear();
330 }
331 
332 const std::string WModuleContainer::getName() const
333 {
334  return m_name;
335 }
336 
337 const std::string WModuleContainer::getDescription() const
338 {
339  return m_description;
340 }
341 
342 void WModuleContainer::addDefaultNotifier( MODULE_SIGNAL signal, t_ModuleGenericSignalHandlerType notifier )
343 {
344  boost::unique_lock<boost::shared_mutex> lock;
345  switch( signal)
346  {
347  case WM_ASSOCIATED:
348  lock = boost::unique_lock<boost::shared_mutex>( m_associatedNotifiersLock );
349  m_associatedNotifiers.push_back( notifier );
350  lock.unlock();
351  break;
352  case WM_READY:
353  lock = boost::unique_lock<boost::shared_mutex>( m_readyNotifiersLock );
354  m_readyNotifiers.push_back( notifier );
355  lock.unlock();
356  break;
357  case WM_REMOVED:
358  lock = boost::unique_lock<boost::shared_mutex>( m_removedNotifiersLock );
359  m_removedNotifiers.push_back( notifier );
360  lock.unlock();
361  break;
362  default:
363  std::ostringstream s;
364  s << "Could not subscribe to unknown signal.";
365  throw WSignalSubscriptionFailed( s.str() );
366  break;
367  }
368 }
369 
370 void WModuleContainer::addDefaultNotifier( MODULE_SIGNAL signal, t_ModuleErrorSignalHandlerType notifier )
371 {
372  boost::unique_lock<boost::shared_mutex> lock;
373  switch( signal)
374  {
375  case WM_ERROR:
376  lock = boost::unique_lock<boost::shared_mutex>( m_errorNotifiersLock );
377  m_errorNotifiers.push_back( notifier );
378  lock.unlock();
379  break;
380  default:
381  std::ostringstream s;
382  s << "Could not subscribe to unknown signal.";
383  throw WSignalSubscriptionFailed( s.str() );
384  break;
385  }
386 }
387 
388 void WModuleContainer::addDefaultNotifier( MODULE_CONNECTOR_SIGNAL signal, t_GenericSignalHandlerType notifier )
389 {
390  boost::unique_lock<boost::shared_mutex> lock;
391  switch( signal)
392  {
393  case CONNECTION_ESTABLISHED:
394  lock = boost::unique_lock<boost::shared_mutex>( m_connectorNotifiersLock );
395  m_connectorEstablishedNotifiers.push_back( notifier );
396  lock.unlock();
397  break;
398  case CONNECTION_CLOSED:
399  lock = boost::unique_lock<boost::shared_mutex>( m_connectorNotifiersLock );
400  m_connectorClosedNotifiers.push_back( notifier );
401  lock.unlock();
402  break;
403  default:
404  std::ostringstream s;
405  s << "Could not subscribe to unknown signal.";
406  throw WSignalSubscriptionFailed( s.str() );
407  break;
408  }
409 }
410 
411 boost::shared_ptr< WModule > WModuleContainer::applyModule( boost::shared_ptr< WModule > applyOn, std::string what, bool tryOnly )
412 {
413  boost::shared_ptr< WModule >prototype = boost::shared_ptr< WModule >();
414  if( tryOnly )
415  {
416  // isPrototypeAvailable returns the prototype or NULL if not found, but does not throw an exception
417  prototype = WModuleFactory::getModuleFactory()->isPrototypeAvailable( what );
418  if( !prototype )
419  {
420  return prototype;
421  }
422  }
423  else
424  {
425  prototype = WModuleFactory::getModuleFactory()->getPrototypeByName( what );
426  }
427 
428  return applyModule( applyOn, prototype );
429 }
430 
431 boost::shared_ptr< WModule > WModuleContainer::applyModule( boost::shared_ptr< WModule > applyOn,
432  boost::shared_ptr< WModule > prototype )
433 {
434  // is this module already associated with another container?
435  if( applyOn->isAssociated()() && ( applyOn->getAssociatedContainer() != shared_from_this() ) )
436  {
437  throw WModuleAlreadyAssociated( std::string( "The specified module \"" ) + applyOn->getName() +
438  std::string( "\" is associated with another container." ) );
439  }
440 
441  // create a new initialized instance of the module
442  boost::shared_ptr< WModule > m = WModuleFactory::getModuleFactory()->create( prototype );
443 
444  // add it
445  add( m, true );
446  applyOn->isReadyOrCrashed().wait();
447  m->isReadyOrCrashed().wait();
448 
449  // should we ignore the crash case? In general, a crashed module can be connected. The sense or non-sense of it is questionable but assume a
450  // crashed module has set some data on its output and some other module needs it. -> so we ignore the case of crashed modules here.
451 
452  // get offered outputs
453  WModule::InputConnectorList ins = m->getInputConnectors();
454  // get offered inputs
455  WModule::OutputConnectorList outs = applyOn->getOutputConnectors();
456 
457  // connect the first connectors. For a more sophisticated way of connecting modules, use ModuleCombiners.
458  if( !ins.empty() && !outs.empty() )
459  {
460  ( *ins.begin() )->connect( ( *outs.begin() ) );
461  }
462 
463  return m;
464 }
465 
466 WBatchLoader::SPtr WModuleContainer::loadDataSets( std::vector< std::string > filenames, bool suppressColormaps )
467 {
468  // create thread which actually loads the data
469  boost::shared_ptr< WBatchLoader > t( new WBatchLoader( filenames, boost::static_pointer_cast< WModuleContainer >( shared_from_this() ) ) );
470  t->setSuppressColormaps( suppressColormaps );
471  t->run();
472  return t;
473 }
474 
475 WBatchLoader::SPtr WModuleContainer::loadDataSetsSynchronously( std::vector< std::string > filenames, bool suppressColormaps )
476 {
477  // create thread which actually loads the data
478  boost::shared_ptr< WBatchLoader > t( new WBatchLoader( filenames, boost::static_pointer_cast< WModuleContainer >( shared_from_this() ) ) );
479  t->setSuppressColormaps( suppressColormaps );
480  t->run();
481  t->wait();
482  return t;
483 }
484 
485 void WModuleContainer::addPendingThread( boost::shared_ptr< WThreadedRunner > thread )
486 {
487  boost::unique_lock<boost::shared_mutex> lock = boost::unique_lock<boost::shared_mutex>( m_pendingThreadsLock );
488  m_pendingThreads.insert( thread );
489  lock.unlock();
490 }
491 
492 void WModuleContainer::finishedPendingThread( boost::shared_ptr< WThreadedRunner > thread )
493 {
494  boost::unique_lock<boost::shared_mutex> lock = boost::unique_lock<boost::shared_mutex>( m_pendingThreadsLock );
495  m_pendingThreads.erase( thread );
496  lock.unlock();
497 }
498 
499 void WModuleContainer::moduleError( boost::shared_ptr< WModule > module, const WException& exception )
500 {
501  errorLog() << "Error in module \"" << module->getName() << "\". Forwarding to nesting container.";
502 
503  // simply forward it to the other signal handler
504  signal_error( module, exception );
505 
507  {
508  infoLog() << "Crash caused this container to shutdown.";
509  requestStop();
510  m_isCrashed( true );
511  }
512 }
513 
514 void WModuleContainer::setCrashIfModuleCrashes( bool crashIfCrashed )
515 {
516  m_crashIfModuleCrashes = crashIfCrashed;
517 }
518 
520 {
521  return m_modules.getReadTicket();
522 }
523 
525 {
526  // get the list of all first.
528 
529  // put results in here
531 
532  // handle each module
533  for( ModuleConstIterator listIter = lock->get().begin(); listIter != lock->get().end(); ++listIter )
534  {
535  // check name
536  if( name == ( *listIter )->getName() )
537  {
538  result.push_back( ( *listIter ) );
539  }
540  }
541 
542  return result;
543 }
544 
545 WCombinerTypes::WCompatiblesList WModuleContainer::getPossibleConnections( boost::shared_ptr< WModule > module )
546 {
547  WCombinerTypes::WCompatiblesList complist;
548 
549  if( !module )
550  {
551  // be nice in case of a null pointer
552  return complist;
553  }
554 
555  // read lock the container
557 
558  // handle each module
559  for( ModuleConstIterator listIter = lock->get().begin(); listIter != lock->get().end(); ++listIter )
560  {
561  WCombinerTypes::WOneToOneCombiners lComp = WApplyCombiner::createCombinerList< WApplyCombiner>( module, ( *listIter ) );
562 
563  if( lComp.size() != 0 )
564  {
565  complist.push_back( WCombinerTypes::WCompatiblesGroup( ( *listIter ), lComp ) );
566  }
567  }
568 
569  // sort the compatibles
570  std::sort( complist.begin(), complist.end(), WCombinerTypes::compatiblesSort );
571 
572  return complist;
573 }
574