Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007 #ifndef WORK_QUEUE_H
00008 #define WORK_QUEUE_H
00009
00020 #include "timestamp.h"
00021
00022 #define WORK_QUEUE_DEFAULT_PORT 9123
00023 #define WORK_QUEUE_LINE_MAX 1024
00024
00025 #define WORK_QUEUE_WAITFORTASK -1
00026
00027 #define WORK_QUEUE_RETURN_STATUS_UNSET -1
00028
00029 #define WORK_QUEUE_RESULT_UNSET 0
00030 #define WORK_QUEUE_RESULT_INPUT_FAIL 1
00031 #define WORK_QUEUE_RESULT_INPUT_MISSING 2
00032 #define WORK_QUEUE_RESULT_FUNCTION_FAIL 4
00033 #define WORK_QUEUE_RESULT_OUTPUT_FAIL 8
00034 #define WORK_QUEUE_RESULT_OUTPUT_MISSING 16
00035 #define WORK_QUEUE_RESULT_LINK_FAIL 32
00036
00037 #define WORK_QUEUE_SCHEDULE_UNSET 0 // default setting for task.
00038 #define WORK_QUEUE_SCHEDULE_FCFS 1
00039 #define WORK_QUEUE_SCHEDULE_FILES 2
00040 #define WORK_QUEUE_SCHEDULE_TIME 3
00041 #define WORK_QUEUE_SCHEDULE_DEFAULT 3 // default setting for queue.
00042 #define WORK_QUEUE_SCHEDULE_PREFERRED_HOSTS 4
00043 #define WORK_QUEUE_SCHEDULE_RAND 5
00044 #define WORK_QUEUE_SCHEDULE_MAX 5
00045
00046 #define WORK_QUEUE_INPUT 0
00047 #define WORK_QUEUE_OUTPUT 1
00048
00049 #define WORK_QUEUE_NOCACHE 0
00050 #define WORK_QUEUE_CACHE 1
00051 #define WORK_QUEUE_SYMLINK 2
00052 #define WORK_QUEUE_PREEXIST 4
00053 #define WORK_QUEUE_THIRDGET 8
00054 #define WORK_QUEUE_THIRDPUT 8 // THIRDPUT/THIRDGET identical flags, including both for readability
00055
00056 #define WORK_QUEUE_MASTER_MODE_STANDALONE 0
00057 #define WORK_QUEUE_MASTER_MODE_CATALOG 1
00058 #define WORK_QUEUE_NAME_MAX 256
00059 #define WORK_QUEUE_MASTER_PRIORITY_MAX 100
00060 #define WORK_QUEUE_MASTER_PRIORITY_DEFAULT 10
00061 #define WORK_QUEUE_WORKER_MODE_SHARED 0
00062 #define WORK_QUEUE_WORKER_MODE_EXCLUSIVE 1
00063 #define WORK_QUEUE_CATALOG_LINE_MAX 1024
00064 #define WORK_QUEUE_CATALOG_UPDATE_INTERVAL 60
00065 #define WORK_QUEUE_CATALOG_LIFETIME 180
00066
00067 #define WORK_QUEUE_FS_CMD 1
00068 #define WORK_QUEUE_FS_PATH 2
00069 #define WORK_QUEUE_FS_SYMLINK 3
00070
00071
00072 extern double wq_option_fast_abort_multiplier;
00073 extern int wq_option_scheduler;
00077 struct work_queue_task {
00078 char *tag;
00079 char *command_line;
00080 int worker_selection_algorithm;
00081 char *output;
00082 struct list *input_files;
00083 struct list *output_files;
00084 char *preferred_host;
00085 int taskid;
00086 int status;
00087 int return_status;
00088 int result;
00089 char *host;
00090 timestamp_t submit_time;
00091 timestamp_t start_time;
00092 timestamp_t finish_time;
00093 timestamp_t transfer_start_time;
00094 timestamp_t computation_time;
00095 INT64_T total_bytes_transferred;
00096 timestamp_t total_transfer_time;
00097 };
00098
00101 struct work_queue_stats {
00102 int workers_init;
00103 int workers_ready;
00104 int workers_busy;
00105 int tasks_running;
00106 int tasks_waiting;
00107 int tasks_complete;
00108 int total_tasks_dispatched;
00109 int total_tasks_complete;
00110 int total_workers_joined;
00111 int total_workers_removed;
00112 INT64_T total_bytes_sent;
00113 INT64_T total_bytes_received;
00114 timestamp_t total_send_time;
00115 timestamp_t total_receive_time;
00117 };
00118
00122
00126 struct work_queue_task *work_queue_task_create(const char *full_command);
00127
00141 void work_queue_task_specify_file(struct work_queue_task *t, const char *local_name, const char *remote_name, int type, int flags);
00142
00150 void work_queue_task_specify_buffer(struct work_queue_task *t, const char *data, int length, const char *remote_name, int flags);
00151
00162 void work_queue_task_specify_file_command(struct work_queue_task *t, const char *remote_name, const char *cmd, int type, int flags);
00163
00169 void work_queue_task_specify_tag(struct work_queue_task *t, const char *tag);
00170
00175 int work_queue_task_specify_algorithm(struct work_queue_task *t, int alg);
00176
00181 void work_queue_task_specify_preferred_host(struct work_queue_task *t, const char *hostname);
00182
00186 void work_queue_task_delete(struct work_queue_task *t);
00187
00189
00193
00210 struct work_queue *work_queue_create(int port);
00211
00217 void work_queue_submit(struct work_queue *q, struct work_queue_task *t);
00218
00224 struct work_queue_task *work_queue_wait(struct work_queue *q, int timeout);
00225
00229 int work_queue_hungry(struct work_queue *q);
00230
00234 int work_queue_empty(struct work_queue *q);
00235
00240 int work_queue_port(struct work_queue *q);
00241
00246 void work_queue_get_stats(struct work_queue *q, struct work_queue_stats *s);
00247
00253 int work_queue_activate_fast_abort(struct work_queue *q, double multiplier);
00254
00259 int work_queue_specify_algorithm(struct work_queue *q, int alg);
00260
00265 int work_queue_specify_name(struct work_queue *q, const char *name);
00266
00272 int work_queue_specify_priority(struct work_queue *q, int priority);
00273
00281 int work_queue_specify_master_mode(struct work_queue *q, int mode);
00282
00290 int work_queue_specify_worker_mode(struct work_queue *q, int mode);
00291
00296 int work_queue_shut_down_workers(struct work_queue *q, int n);
00297
00301 void work_queue_delete(struct work_queue *q);
00302
00304
00308
00316 void work_queue_task_specify_input_buf(struct work_queue_task *t, const char *buf, int length, const char *rname);
00317
00324 void work_queue_task_specify_input_file(struct work_queue_task *t, const char *fname, const char *rname);
00325
00332 void work_queue_task_specify_input_file_do_not_cache(struct work_queue_task *t, const char *fname, const char *rname);
00333
00340 void work_queue_task_specify_output_file(struct work_queue_task *t, const char *rname, const char *fname);
00341
00348 void work_queue_task_specify_output_file_do_not_cache(struct work_queue_task *t, const char *rname, const char *fname);
00349
00351
00352 #endif