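/*
 * Work Queue public header: constants, the task and statistics structures,
 * and the task/queue function prototypes.
 * (Recovered from a Doxygen source listing; the original file header comment
 * was not preserved in that listing.)
 */

#ifndef WORK_QUEUE_H
#define WORK_QUEUE_H

#include "timestamp.h"
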
/*
 * Port selection values and the line-length limit.
 * The port values are presumably meant for the port argument of
 * work_queue_create() -- TODO confirm against the implementation.
 */
#define WORK_QUEUE_DEFAULT_PORT 9123   /* fixed default port number */
#define WORK_QUEUE_RANDOM_PORT -1      /* sentinel; by its name, asks for an arbitrary port */
#define WORK_QUEUE_LINE_MAX 1024       /* line-length limit; where it is applied is not visible here */

/* Negative sentinel values. */
#define WORK_QUEUE_WAITFORTASK -1           /* presumably a timeout value for work_queue_wait() -- TODO confirm */
#define WORK_QUEUE_RETURN_STATUS_UNSET -1   /* presumably the "not yet set" value of work_queue_task.return_status */

/*
 * Task result codes, presumably stored in work_queue_task.result -- TODO confirm.
 * Each non-zero code is a distinct single bit (1, 2, 4, 8, 16, 32), so several
 * of them can be OR-ed together into one int. 0 is the "unset" value.
 */
#define WORK_QUEUE_RESULT_UNSET 0
#define WORK_QUEUE_RESULT_INPUT_FAIL 1
#define WORK_QUEUE_RESULT_INPUT_MISSING 2
#define WORK_QUEUE_RESULT_FUNCTION_FAIL 4
#define WORK_QUEUE_RESULT_OUTPUT_FAIL 8
#define WORK_QUEUE_RESULT_OUTPUT_MISSING 16
#define WORK_QUEUE_RESULT_LINK_FAIL 32

/*
 * Scheduling algorithm identifiers, numbered consecutively from 0.
 * Presumably the values accepted as the alg argument of
 * work_queue_specify_algorithm() and work_queue_task_specify_algorithm()
 * -- TODO confirm.
 */
#define WORK_QUEUE_SCHEDULE_UNSET 0
#define WORK_QUEUE_SCHEDULE_FCFS 1
#define WORK_QUEUE_SCHEDULE_FILES 2
#define WORK_QUEUE_SCHEDULE_TIME 3
#define WORK_QUEUE_SCHEDULE_DEFAULT 3           /* alias: same value as WORK_QUEUE_SCHEDULE_TIME */
#define WORK_QUEUE_SCHEDULE_PREFERRED_HOSTS 4
#define WORK_QUEUE_SCHEDULE_RAND 5
#define WORK_QUEUE_SCHEDULE_MAX 5               /* highest identifier; equals WORK_QUEUE_SCHEDULE_RAND */

/*
 * File direction. Presumably the type argument of
 * work_queue_task_specify_file() and work_queue_task_specify_file_command()
 * -- TODO confirm.
 */
#define WORK_QUEUE_INPUT 0
#define WORK_QUEUE_OUTPUT 1

/*
 * File handling flags. Presumably OR-ed into the flags argument of the
 * work_queue_task_specify_*() functions -- TODO confirm.
 * WORK_QUEUE_NOCACHE is 0, i.e. no flag bits set.
 */
#define WORK_QUEUE_NOCACHE 0
#define WORK_QUEUE_CACHE 1
#define WORK_QUEUE_SYMLINK 2
#define WORK_QUEUE_PREEXIST 4
/*
 * NOTE(review): THIRDGET and THIRDPUT share the value 8. This looks deliberate
 * (one name per transfer direction), but confirm before adding a new flag bit.
 */
#define WORK_QUEUE_THIRDGET 8
#define WORK_QUEUE_THIRDPUT 8

/* Master modes; presumably the mode argument of work_queue_specify_master_mode() -- TODO confirm. */
#define WORK_QUEUE_MASTER_MODE_STANDALONE 0
#define WORK_QUEUE_MASTER_MODE_CATALOG 1

/* Name length limit; presumably applies to the name given to work_queue_specify_name() -- TODO confirm. */
#define WORK_QUEUE_NAME_MAX 256

/* Priority bounds; presumably for the priority argument of work_queue_specify_priority() -- TODO confirm. */
#define WORK_QUEUE_MASTER_PRIORITY_MAX 100
#define WORK_QUEUE_MASTER_PRIORITY_DEFAULT 10

/* Worker modes; presumably the mode argument of work_queue_specify_worker_mode() -- TODO confirm. */
#define WORK_QUEUE_WORKER_MODE_SHARED 0
#define WORK_QUEUE_WORKER_MODE_EXCLUSIVE 1

/*
 * Catalog-related limits. The units of the interval and lifetime are not
 * stated in this header (likely seconds -- TODO confirm).
 */
#define WORK_QUEUE_CATALOG_LINE_MAX 1024
#define WORK_QUEUE_CATALOG_UPDATE_INTERVAL 60
#define WORK_QUEUE_CATALOG_LIFETIME 180

/*
 * Identifiers with an FS_ prefix (command, path, symlink). No declaration in
 * this header takes them by name; presumably they tag how a file is supplied
 * on the worker side -- TODO confirm against the implementation.
 */
#define WORK_QUEUE_FS_CMD 1
#define WORK_QUEUE_FS_PATH 2
#define WORK_QUEUE_FS_SYMLINK 3


/*
 * Global option variables, defined in another translation unit.
 * By their names they hold a fast-abort multiplier (cf.
 * work_queue_activate_fast_abort()) and a scheduler id (cf. the
 * WORK_QUEUE_SCHEDULE_* values) -- TODO confirm how they are consumed.
 */
extern double wq_option_fast_abort_multiplier;
extern int wq_option_scheduler;

00078 struct work_queue_task {
00079 char *tag;
00080 char *command_line;
00081 int worker_selection_algorithm;
00082 char *output;
00083 struct list *input_files;
00084 struct list *output_files;
00085 char *preferred_host;
00086 int taskid;
00087 int status;
00088 int return_status;
00089 int result;
00090 char *host;
00091 timestamp_t submit_time;
00092 timestamp_t transfer_start_time;
00093 timestamp_t start_time;
00094 timestamp_t finish_time;
00095 timestamp_t computation_time;
00096 INT64_T total_bytes_transferred;
00097 timestamp_t total_transfer_time;
00098 };
00099
00102 struct work_queue_stats {
00103 int workers_init;
00104 int workers_ready;
00105 int workers_busy;
00106 int tasks_running;
00107 int tasks_waiting;
00108 int tasks_complete;
00109 int total_tasks_dispatched;
00110 int total_tasks_complete;
00111 int total_workers_joined;
00112 int total_workers_removed;
00113 INT64_T total_bytes_sent;
00114 INT64_T total_bytes_received;
00115 timestamp_t total_send_time;
00116 timestamp_t total_receive_time;
00118 };
00119
00123
/*
 * Task construction and configuration.
 *
 * NOTE(review): only the prototypes are visible here. The behavioural notes
 * are inferred from names and signatures -- confirm against the implementation.
 */

/* Create a task from a command line string. Release it with work_queue_task_delete(). */
struct work_queue_task *work_queue_task_create(const char *full_command);

/*
 * Attach a file to task t: local_name on the submitting side, remote_name on
 * the worker side. type is presumably WORK_QUEUE_INPUT or WORK_QUEUE_OUTPUT;
 * flags is presumably a combination of WORK_QUEUE_CACHE, WORK_QUEUE_NOCACHE, etc.
 */
void work_queue_task_specify_file(struct work_queue_task *t, const char *local_name, const char *remote_name, int type, int flags);

/* Attach length bytes at data to task t, to appear as remote_name. */
void work_queue_task_specify_buffer(struct work_queue_task *t, const char *data, int length, const char *remote_name, int flags);

/* Attach a file described by a command string cmd rather than a local path. */
void work_queue_task_specify_file_command(struct work_queue_task *t, const char *remote_name, const char *cmd, int type, int flags);

/* Set the label of task t (cf. work_queue_task.tag). */
void work_queue_task_specify_tag(struct work_queue_task *t, const char *tag);

/* Select the scheduling algorithm for task t; alg is presumably a WORK_QUEUE_SCHEDULE_* value. Returns an int status. */
int work_queue_task_specify_algorithm(struct work_queue_task *t, int alg);

/* Set the preferred host of task t (cf. work_queue_task.preferred_host). */
void work_queue_task_specify_preferred_host(struct work_queue_task *t, const char *hostname);

/* Dispose of task t. */
void work_queue_task_delete(struct work_queue_task *t);



/*
 * Queue creation, submission and control.
 *
 * NOTE(review): only the prototypes are visible here. The behavioural notes
 * are inferred from names and signatures -- confirm against the implementation.
 */

/* Opaque queue handle: this header never defines its members. */
struct work_queue;

/* Create a queue on the given port (cf. WORK_QUEUE_DEFAULT_PORT, WORK_QUEUE_RANDOM_PORT). Release with work_queue_delete(). */
struct work_queue *work_queue_create(int port);

/* Hand task t to queue q. */
void work_queue_submit(struct work_queue *q, struct work_queue_task *t);

/* Wait on queue q and return a task. timeout is presumably in seconds, or WORK_QUEUE_WAITFORTASK -- TODO confirm. */
struct work_queue_task *work_queue_wait(struct work_queue *q, int timeout);

/* Query whether q wants more tasks; the exact meaning of the int result is not visible here. */
int work_queue_hungry(struct work_queue *q);

/* Query whether q holds no tasks; the exact meaning of the int result is not visible here. */
int work_queue_empty(struct work_queue *q);

/* Return the port associated with q. */
int work_queue_port(struct work_queue *q);

/* Return the name associated with q (cf. work_queue_specify_name()); the string is not to be modified by the caller. */
const char *work_queue_name(struct work_queue *q);

/* Write the statistics of q into *s. */
void work_queue_get_stats(struct work_queue *q, struct work_queue_stats *s);

/* Enable fast abort on q with the given multiplier. Returns an int status. */
int work_queue_activate_fast_abort(struct work_queue *q, double multiplier);

/* Select the scheduling algorithm for q; alg is presumably a WORK_QUEUE_SCHEDULE_* value. */
int work_queue_specify_algorithm(struct work_queue *q, int alg);

/* Set the name of q (cf. WORK_QUEUE_NAME_MAX). */
int work_queue_specify_name(struct work_queue *q, const char *name);

/* Set the priority of q (cf. WORK_QUEUE_MASTER_PRIORITY_MAX / _DEFAULT). */
int work_queue_specify_priority(struct work_queue *q, int priority);

/* Set the master mode of q; mode is presumably a WORK_QUEUE_MASTER_MODE_* value. */
int work_queue_specify_master_mode(struct work_queue *q, int mode);

/* Set the worker mode of q; mode is presumably a WORK_QUEUE_WORKER_MODE_* value. */
int work_queue_specify_worker_mode(struct work_queue *q, int mode);

/* Shut down n workers of q; the meaning of the int result is not visible here. */
int work_queue_shut_down_workers(struct work_queue *q, int n);

/* Dispose of queue q. */
void work_queue_delete(struct work_queue *q);



/*
 * Convenience variants for attaching inputs and outputs to a task.
 *
 * NOTE(review): these look like thin wrappers around
 * work_queue_task_specify_buffer() and work_queue_task_specify_file() with
 * the type/flags fixed -- confirm against the implementation. Note that the
 * output variants take (rname, fname) while the input variants take
 * (fname, rname).
 */

/* Attach length bytes at buf as an input named rname. */
void work_queue_task_specify_input_buf(struct work_queue_task *t, const char *buf, int length, const char *rname);

/* Attach file fname as an input named rname. */
void work_queue_task_specify_input_file(struct work_queue_task *t, const char *fname, const char *rname);

/* As work_queue_task_specify_input_file(), but by its name without caching. */
void work_queue_task_specify_input_file_do_not_cache(struct work_queue_task *t, const char *fname, const char *rname);

/* Declare output rname, to be stored as fname. */
void work_queue_task_specify_output_file(struct work_queue_task *t, const char *rname, const char *fname);

/* As work_queue_task_specify_output_file(), but by its name without caching. */
void work_queue_task_specify_output_file_do_not_cache(struct work_queue_task *t, const char *rname, const char *fname);


#endif /* WORK_QUEUE_H */