cctools

work_queue.h

Go to the documentation of this file.
00001 /*
00002 Copyright (C) 2008- The University of Notre Dame
00003 This software is distributed under the GNU General Public License.
00004 See the file COPYING for details.
00005 */
00006 
00007 #ifndef WORK_QUEUE_H
00008 #define WORK_QUEUE_H
00009 
00020 #include "timestamp.h"
00021 
/* Port-number and line-length constants. */
00022 #define WORK_QUEUE_DEFAULT_PORT 9123 /* Default port number (9123). */
00023 #define WORK_QUEUE_RANDOM_PORT -1    /* Sentinel port value; presumably asks work_queue_create() to pick any free port -- TODO confirm. */
00024 #define WORK_QUEUE_LINE_MAX 1024 /* NOTE(review): looks like a maximum line/buffer length; its users are not visible in this header. */
00025 
/* Sentinel values; both are -1. */
00026 #define WORK_QUEUE_WAITFORTASK -1   /* presumably a timeout value for work_queue_wait() meaning "block until a task is available" -- TODO confirm. */
00028 #define WORK_QUEUE_RETURN_STATUS_UNSET -1 /* presumably the initial value of work_queue_task.return_status -- TODO confirm. */
00029 
/*
 * Task result codes. Apart from UNSET (0), each value is a distinct single bit
 * (1, 2, 4, 8, 16, 32).
 * NOTE(review): presumably stored in work_queue_task.result and possibly
 * OR-ed together -- confirm against the implementation.
 */
00030 #define WORK_QUEUE_RESULT_UNSET 0 /* no result bit set */
00031 #define WORK_QUEUE_RESULT_INPUT_FAIL 1 /* bit 0 */
00032 #define WORK_QUEUE_RESULT_INPUT_MISSING 2 /* bit 1 */
00033 #define WORK_QUEUE_RESULT_FUNCTION_FAIL 4 /* bit 2 */
00034 #define WORK_QUEUE_RESULT_OUTPUT_FAIL 8 /* bit 3 */
00035 #define WORK_QUEUE_RESULT_OUTPUT_MISSING 16 /* bit 4 */
00036 #define WORK_QUEUE_RESULT_LINK_FAIL 32 /* bit 5 */
00037 
/*
 * Scheduling algorithm identifiers (sequential values 0..5).
 * NOTE(review): presumably the values accepted by
 * work_queue_specify_algorithm() and work_queue_task_specify_algorithm() --
 * confirm. The behaviour of each algorithm is not visible in this header.
 */
00038 #define WORK_QUEUE_SCHEDULE_UNSET 0     /* no algorithm chosen */
00039 #define WORK_QUEUE_SCHEDULE_FCFS 1      /* presumably first-come-first-served -- confirm */
00040 #define WORK_QUEUE_SCHEDULE_FILES 2     /* presumably file-based worker selection -- confirm */
00041 #define WORK_QUEUE_SCHEDULE_TIME 3      /* presumably time-based worker selection -- confirm */
00042 #define WORK_QUEUE_SCHEDULE_DEFAULT 3   /* same value as WORK_QUEUE_SCHEDULE_TIME */
00043 #define WORK_QUEUE_SCHEDULE_PREFERRED_HOSTS 4 /* presumably honours work_queue_task.preferred_host -- confirm */
00044 #define WORK_QUEUE_SCHEDULE_RAND 5      /* presumably random worker selection -- confirm */
00045 #define WORK_QUEUE_SCHEDULE_MAX 5 /* equals the highest identifier above; keep in sync when adding one */
00046 
/*
 * File direction (INPUT/OUTPUT) and file flag constants.
 * NOTE(review): presumably the "type" and "flags" arguments of
 * work_queue_task_specify_file() and related functions -- confirm.
 */
00047 #define WORK_QUEUE_INPUT  0     /* file direction: input */
00048 #define WORK_QUEUE_OUTPUT 1     /* file direction: output */
/* Flag values: the non-zero ones are single bits (1, 2, 4, 8). */
00050 #define WORK_QUEUE_NOCACHE 0    /* no flag bits set */
00051 #define WORK_QUEUE_CACHE 1      /* bit 0 */
00052 #define WORK_QUEUE_SYMLINK 2    /* bit 1 */
00053 #define WORK_QUEUE_PREEXIST 4 /* bit 2 */
00054 #define WORK_QUEUE_THIRDGET 8   /* bit 3 */
00055 #define WORK_QUEUE_THIRDPUT 8   /* bit 3; NOTE(review): same value as WORK_QUEUE_THIRDGET -- presumably told apart by file direction, confirm */
/*
 * Master mode, worker mode, naming, priority and catalog constants.
 * NOTE(review): the mode values are presumably the "mode" arguments of
 * work_queue_specify_master_mode() and work_queue_specify_worker_mode() --
 * confirm.
 */
00057 #define WORK_QUEUE_MASTER_MODE_STANDALONE 0 /* master mode 0 */
00058 #define WORK_QUEUE_MASTER_MODE_CATALOG 1    /* master mode 1 */
00059 #define WORK_QUEUE_NAME_MAX 256 /* presumably the maximum length of a queue name -- confirm whether it counts the terminator */
00060 #define WORK_QUEUE_MASTER_PRIORITY_MAX 100 /* presumably the upper bound for work_queue_specify_priority() -- confirm */
00061 #define WORK_QUEUE_MASTER_PRIORITY_DEFAULT 10 /* presumably the priority used when none is specified -- confirm */
00062 #define WORK_QUEUE_WORKER_MODE_SHARED 0     /* worker mode 0 */
00063 #define WORK_QUEUE_WORKER_MODE_EXCLUSIVE 1  /* worker mode 1 */
00064 #define WORK_QUEUE_CATALOG_LINE_MAX 1024 /* same value as WORK_QUEUE_LINE_MAX */
00065 #define WORK_QUEUE_CATALOG_UPDATE_INTERVAL 60 /* units not stated here; presumably seconds -- confirm */
00066 #define WORK_QUEUE_CATALOG_LIFETIME     180 /* units not stated here; presumably seconds -- confirm */
00067 
/*
 * WORK_QUEUE_FS_* identifiers (sequential values 1..3).
 * NOTE(review): their users are not visible in this header; the names suggest
 * kinds of file source (command, path, symlink) -- confirm.
 */
00068 #define WORK_QUEUE_FS_CMD 1
00069 #define WORK_QUEUE_FS_PATH 2
00070 #define WORK_QUEUE_FS_SYMLINK 3
00071 
00072 
/* Global option variables; declared here and defined in another translation unit. */
00073 extern double wq_option_fast_abort_multiplier; /* presumably related to work_queue_activate_fast_abort() -- confirm */
00074 extern int wq_option_scheduler; /* presumably holds a WORK_QUEUE_SCHEDULE_ value -- confirm */
/*
 * Description of a single task. Instances are returned by
 * work_queue_task_create() and work_queue_wait() and released with
 * work_queue_task_delete().
 *
 * Only field names and types are visible in this header, so the per-field
 * notes below are reviewer assumptions to be confirmed against the
 * implementation. "struct list" and INT64_T are not declared in this header;
 * presumably they come in through timestamp.h or an earlier include.
 */
00078 struct work_queue_task {
00079         char *tag;                      /* presumably the label set by work_queue_task_specify_tag() */
00080         char *command_line;             /* presumably the command passed to work_queue_task_create() */
00081         int worker_selection_algorithm;           /* presumably a WORK_QUEUE_SCHEDULE_ value */
00082         char *output;                   /* presumably the captured output of the task -- confirm */
00083         struct list *input_files;       /* presumably the input files/buffers registered for the task */
00084         struct list *output_files;      /* presumably the output files registered for the task */
00085         char *preferred_host;           /* presumably set by work_queue_task_specify_preferred_host() */
00086         int taskid;                     /* presumably an identifier assigned to the task -- confirm who assigns it */
00087         int status;     /* meaning not visible in this header */
00088         int return_status;              /* presumably WORK_QUEUE_RETURN_STATUS_UNSET until the task has run -- confirm */
00089         int result;                     /* presumably WORK_QUEUE_RESULT_ value(s) */
00090         char *host;                     /* presumably the host that ran the task -- confirm */
00091         timestamp_t submit_time;        /* timestamp_t values; units are defined by timestamp.h */
00092         timestamp_t transfer_start_time;        
00093         timestamp_t start_time;         
00094         timestamp_t finish_time;        
00095         timestamp_t computation_time;   /* NOTE(review): looks like a duration rather than a point in time -- confirm */
00096         INT64_T total_bytes_transferred; /* 64-bit byte counter */
00097         timestamp_t total_transfer_time;    /* NOTE(review): looks like a duration -- confirm */
00098 };
00099 
/*
 * Queue statistics record; the caller passes a pointer to one to
 * work_queue_get_stats().
 *
 * Only field names and types are visible in this header. The grouping notes
 * below are read from the names and should be confirmed against the
 * implementation.
 */
00102 struct work_queue_stats {
00103         int workers_init;               /* workers_ fields: presumably current worker counts by state */
00104         int workers_ready;              
00105         int workers_busy;               
00106         int tasks_running;              /* tasks_ fields: presumably current task counts by state */
00107         int tasks_waiting;              
00108         int tasks_complete;             
00109         int total_tasks_dispatched;     /* total_ fields: presumably cumulative counters -- confirm */
00110         int total_tasks_complete;       
00111         int total_workers_joined;       
00112         int total_workers_removed;      
00113         INT64_T total_bytes_sent;   /* 64-bit byte counters */
00114         INT64_T total_bytes_received;   
00115         timestamp_t total_send_time; /* timestamp_t values; units are defined by timestamp.h */
00116         timestamp_t total_receive_time;
00118 };
00119 
00123 
/*
 * Task construction and configuration functions. Only the prototypes are
 * visible here; behavioural notes are assumptions to confirm against the
 * implementation.
 */
/* Returns a pointer to a task for full_command; presumably released with work_queue_task_delete(). */
00127 struct work_queue_task *work_queue_task_create(const char *full_command);
00128 
/* Attaches a file to task t. "type" is presumably WORK_QUEUE_INPUT or WORK_QUEUE_OUTPUT and "flags" a combination of the file flag constants -- confirm. */
00143 void work_queue_task_specify_file(struct work_queue_task *t, const char *local_name, const char *remote_name, int type, int flags);
00144 
/* Attaches an in-memory buffer ("data", "length" bytes) to task t under remote_name. Whether the data is copied is not visible here. */
00152 void work_queue_task_specify_buffer(struct work_queue_task *t, const char *data, int length, const char *remote_name, int flags);
00153 
/* Attaches a file described by a command string ("cmd") to task t. NOTE(review): the format of "cmd" is not visible in this header. */
00165 void work_queue_task_specify_file_command(struct work_queue_task *t, const char *remote_name, const char *cmd, int type, int flags);
00166 
/* Sets the tag of task t; presumably stored in t->tag. */
00172 void work_queue_task_specify_tag(struct work_queue_task *t, const char *tag);
00173 
/* Selects a scheduling algorithm for task t; "alg" is presumably a WORK_QUEUE_SCHEDULE_ value. The meaning of the int return value is not visible here. */
00178 int work_queue_task_specify_algorithm(struct work_queue_task *t, int alg);
00179 
/* Sets the preferred host of task t; presumably stored in t->preferred_host. */
00184 void work_queue_task_specify_preferred_host(struct work_queue_task *t, const char *hostname);
00185 
/* Destroys task t. */
00189 void work_queue_task_delete(struct work_queue_task *t);
00190 
00192 
00196 
/*
 * Queue lifecycle, submission and configuration functions. "struct work_queue"
 * is used only through pointers and is not defined in this header. Only the
 * prototypes are visible; behavioural notes are assumptions to confirm
 * against the implementation.
 */
/* Creates a queue on "port"; presumably accepts WORK_QUEUE_DEFAULT_PORT or WORK_QUEUE_RANDOM_PORT -- confirm. Failure behaviour is not visible here. */
00213 struct work_queue *work_queue_create(int port);
00214 
/* Submits task t to queue q. */
00220 void work_queue_submit(struct work_queue *q, struct work_queue_task *t);
00221 
/* Waits on queue q and returns a task pointer. "timeout" units are not stated; presumably seconds, or WORK_QUEUE_WAITFORTASK -- confirm. */
00227 struct work_queue_task *work_queue_wait(struct work_queue *q, int timeout);
00228 
/* Presumably reports whether queue q can accept more tasks -- confirm the meaning of the return value. */
00232 int work_queue_hungry(struct work_queue *q);
00233 
/* Presumably reports whether queue q has no tasks left -- confirm the meaning of the return value. */
00237 int work_queue_empty(struct work_queue *q);
00238 
/* Returns a port number for queue q; presumably the one it listens on. */
00243 int work_queue_port(struct work_queue *q);
00244 
/* Returns the name of queue q as a const string; ownership presumably stays with the queue -- confirm. */
00249 const char *work_queue_name(struct work_queue *q);
00250 
/* Fills the caller-provided structure s with statistics for queue q. */
00255 void work_queue_get_stats(struct work_queue *q, struct work_queue_stats *s);
00256 
/* Configures fast abort on queue q with the given multiplier; the meaning of the multiplier and of the return value is not visible here. */
00262 int work_queue_activate_fast_abort(struct work_queue *q, double multiplier);
00263 
/* Selects a scheduling algorithm for queue q; "alg" is presumably a WORK_QUEUE_SCHEDULE_ value. */
00268 int work_queue_specify_algorithm(struct work_queue *q, int alg);
00269 
/* Sets the name of queue q; presumably limited by WORK_QUEUE_NAME_MAX -- confirm. */
00274 int work_queue_specify_name(struct work_queue *q, const char *name);
00275 
/* Sets the priority of queue q; presumably bounded by WORK_QUEUE_MASTER_PRIORITY_MAX -- confirm. */
00281 int work_queue_specify_priority(struct work_queue *q, int priority);
00282 
/* Sets the master mode of queue q; "mode" is presumably WORK_QUEUE_MASTER_MODE_STANDALONE or WORK_QUEUE_MASTER_MODE_CATALOG. */
00290 int work_queue_specify_master_mode(struct work_queue *q, int mode);
00291 
/* Sets the worker mode of queue q; "mode" is presumably WORK_QUEUE_WORKER_MODE_SHARED or WORK_QUEUE_WORKER_MODE_EXCLUSIVE. */
00299 int work_queue_specify_worker_mode(struct work_queue *q, int mode);
00300 
/* Presumably asks up to n workers of queue q to shut down; the meaning of n and of the return value is not visible here. */
00305 int work_queue_shut_down_workers(struct work_queue *q, int n);
00306 
/* Destroys queue q. */
00310 void work_queue_delete(struct work_queue *q);
00311 
00313 
00317 
/*
 * Direction-specific variants for attaching inputs and outputs to a task.
 * NOTE(review): these look like convenience or legacy forms of
 * work_queue_task_specify_file() and work_queue_task_specify_buffer() --
 * confirm.
 * Argument order differs: the input functions take (fname, rname) while the
 * output functions take (rname, fname).
 */
/* Attaches the buffer "buf" ("length" bytes) to task t as an input named rname. */
00325 void work_queue_task_specify_input_buf(struct work_queue_task *t, const char *buf, int length, const char *rname);
00326 
/* Attaches fname to task t as an input named rname. */
00333 void work_queue_task_specify_input_file(struct work_queue_task *t, const char *fname, const char *rname);
00334 
/* Same arguments as work_queue_task_specify_input_file(); presumably without caching, as the name says -- confirm. */
00341 void work_queue_task_specify_input_file_do_not_cache(struct work_queue_task *t, const char *fname, const char *rname);
00342 
/* Registers rname as an output of task t associated with fname. Note the (rname, fname) order. */
00349 void work_queue_task_specify_output_file(struct work_queue_task *t, const char *rname, const char *fname);
00350 
/* Same arguments as work_queue_task_specify_output_file(); presumably without caching, as the name says -- confirm. */
00357 void work_queue_task_specify_output_file_do_not_cache(struct work_queue_task *t, const char *rname, const char *fname);
00358 
00360 
00361 #endif