cctools
|
A master-worker library. More...
#include "timestamp.h"
Go to the source code of this file.
Data Structures | |
struct | work_queue_task |
A task description. More... | |
struct | work_queue_stats |
Statistics describing a work queue. More... | |
Macros | |
#define | WORK_QUEUE_DEFAULT_PORT 9123 |
Default Work Queue port number. More... | |
#define | WORK_QUEUE_RANDOM_PORT -1 |
Indicate to Work Queue to choose a random open port. More... | |
#define | WORK_QUEUE_WAITFORTASK -1 |
Wait for a task to complete before returning. More... | |
#define | WORK_QUEUE_SCHEDULE_FCFS 1 |
Select worker on a first-come-first-serve basis. More... | |
#define | WORK_QUEUE_SCHEDULE_FILES 2 |
Select worker that has the most files required by task. More... | |
#define | WORK_QUEUE_SCHEDULE_TIME 3 |
Select worker that has the best execution time. More... | |
#define | WORK_QUEUE_SCHEDULE_DEFAULT 3 |
Default algorithm (WORK_QUEUE_SCHEDULE_TIME). More... | |
#define | WORK_QUEUE_SCHEDULE_PREFERRED_HOSTS 4 |
Select worker from set of preferred hosts. More... | |
#define | WORK_QUEUE_SCHEDULE_RAND 5 |
Select a random worker. More... | |
#define | WORK_QUEUE_INPUT 0 |
Specify an input object. More... | |
#define | WORK_QUEUE_OUTPUT 1 |
Specify an output object. More... | |
#define | WORK_QUEUE_NOCACHE 0 |
Do not cache file at execution site. More... | |
#define | WORK_QUEUE_CACHE 1 |
Cache file at execution site for later use. More... | |
#define | WORK_QUEUE_SYMLINK 2 |
Create a symlink to the file rather than copying it, if possible. More... | |
#define | WORK_QUEUE_THIRDGET 8 |
Access the file on the client from a shared filesystem. More... | |
#define | WORK_QUEUE_THIRDPUT 8 |
Access the file on the client from a shared filesystem (included for readability) More... | |
#define | WORK_QUEUE_MASTER_MODE_STANDALONE 0 |
Work Queue master does not report to the catalog server. More... | |
#define | WORK_QUEUE_MASTER_MODE_CATALOG 1 |
Work Queue master reports to catalog server. More... | |
#define | WORK_QUEUE_WORKER_MODE_SHARED 0 |
Work Queue master accepts workers in shared or non-exclusive mode. More... | |
#define | WORK_QUEUE_WORKER_MODE_EXCLUSIVE 1 |
Work Queue master only accepts workers that have a preference for it. More... | |
Functions | |
Functions - Tasks | |
struct work_queue_task * | work_queue_task_create (const char *full_command) |
Create a new task specification. More... | |
void | work_queue_task_specify_file (struct work_queue_task *t, const char *local_name, const char *remote_name, int type, int flags) |
Add a file to a task. More... | |
void | work_queue_task_specify_buffer (struct work_queue_task *t, const char *data, int length, const char *remote_name, int flags) |
Add an input buffer to a task. More... | |
void | work_queue_task_specify_file_command (struct work_queue_task *t, const char *remote_name, const char *cmd, int type, int flags) |
Add a file created or handled by an arbitrary command to a task (eg: wget, ftp, chirp_get|put). More... | |
void | work_queue_task_specify_tag (struct work_queue_task *t, const char *tag) |
Attach a user defined logical name to the task. More... | |
int | work_queue_task_specify_algorithm (struct work_queue_task *t, int alg) |
Further define a task specification. More... | |
void | work_queue_task_specify_preferred_host (struct work_queue_task *t, const char *hostname) |
Indicate that the task would be optimally run on a given host. More... | |
void | work_queue_task_delete (struct work_queue_task *t) |
Delete a task specification. More... | |
Functions - Queues | |
struct work_queue * | work_queue_create (int port) |
Create a new work queue. More... | |
void | work_queue_submit (struct work_queue *q, struct work_queue_task *t) |
Submit a job to a work queue. More... | |
struct work_queue_task * | work_queue_wait (struct work_queue *q, int timeout) |
Wait for tasks to complete. More... | |
int | work_queue_hungry (struct work_queue *q) |
Determine whether the queue can support more tasks. More... | |
int | work_queue_empty (struct work_queue *q) |
Determine whether there are any known tasks queued, running, or waiting to be collected. More... | |
int | work_queue_port (struct work_queue *q) |
Get the listening port of the queue. More... | |
const char * | work_queue_name (struct work_queue *q) |
Get the project name of the queue. More... | |
void | work_queue_get_stats (struct work_queue *q, struct work_queue_stats *s) |
Get queue statistics. More... | |
int | work_queue_activate_fast_abort (struct work_queue *q, double multiplier) |
Turn on or off fast abort functionality for a given queue. More... | |
int | work_queue_specify_algorithm (struct work_queue *q, int alg) |
Change the worker selection algorithm for a given queue. More... | |
int | work_queue_specify_name (struct work_queue *q, const char *name) |
Change the project name for a given queue. More... | |
int | work_queue_specify_priority (struct work_queue *q, int priority) |
Change the priority for a given queue. More... | |
int | work_queue_specify_master_mode (struct work_queue *q, int mode) |
Specify the master mode for a given queue. More... | |
int | work_queue_specify_worker_mode (struct work_queue *q, int mode) |
Specify the worker mode for a given queue. More... | |
int | work_queue_shut_down_workers (struct work_queue *q, int n) |
Shut down workers connected to the work_queue system. More... | |
void | work_queue_delete (struct work_queue *q) |
Delete a work queue. More... | |
Functions - Deprecated | |
void | work_queue_task_specify_input_buf (struct work_queue_task *t, const char *buf, int length, const char *rname) |
Add an input buffer to a task. More... | |
void | work_queue_task_specify_input_file (struct work_queue_task *t, const char *fname, const char *rname) |
Add an input file to a task. More... | |
void | work_queue_task_specify_input_file_do_not_cache (struct work_queue_task *t, const char *fname, const char *rname) |
Add an input file to a task, without caching. More... | |
void | work_queue_task_specify_output_file (struct work_queue_task *t, const char *rname, const char *fname) |
Add an output file to a task. More... | |
void | work_queue_task_specify_output_file_do_not_cache (struct work_queue_task *t, const char *rname, const char *fname) |
Add an output file to a task without caching. More... | |
Variables | |
double | wq_option_fast_abort_multiplier |
Initial setting for fast abort multiplier upon creating queue. More... | |
int | wq_option_scheduler |
Initial setting for algorithm to assign tasks to workers upon creating queue. More... | |
A master-worker library.
The work queue provides an implementation of the master-worker computing model using TCP sockets, Unix applications, and files as intermediate buffers. A master process uses work_queue_create to create a queue, then work_queue_submit to submit tasks. Once tasks are running, call work_queue_wait to wait for completion. The generic worker
program can be run on any machine, and simply needs to be told the host and port of the master.
#define WORK_QUEUE_DEFAULT_PORT 9123 |
Default Work Queue port number.
#define WORK_QUEUE_RANDOM_PORT -1 |
Indicate to Work Queue to choose a random open port.
#define WORK_QUEUE_WAITFORTASK -1 |
Wait for a task to complete before returning.
#define WORK_QUEUE_SCHEDULE_FCFS 1 |
Select worker on a first-come-first-serve basis.
#define WORK_QUEUE_SCHEDULE_FILES 2 |
Select worker that has the most files required by task.
#define WORK_QUEUE_SCHEDULE_TIME 3 |
Select worker that has the best execution time.
#define WORK_QUEUE_SCHEDULE_DEFAULT 3 |
Default algorithm (WORK_QUEUE_SCHEDULE_TIME).
#define WORK_QUEUE_SCHEDULE_PREFERRED_HOSTS 4 |
Select worker from set of preferred hosts.
#define WORK_QUEUE_SCHEDULE_RAND 5 |
Select a random worker.
#define WORK_QUEUE_INPUT 0 |
Specify an input object.
#define WORK_QUEUE_OUTPUT 1 |
Specify an output object.
#define WORK_QUEUE_NOCACHE 0 |
Do not cache file at execution site.
#define WORK_QUEUE_CACHE 1 |
Cache file at execution site for later use.
#define WORK_QUEUE_SYMLINK 2 |
Create a symlink to the file rather than copying it, if possible.
#define WORK_QUEUE_THIRDGET 8 |
Access the file on the client from a shared filesystem.
#define WORK_QUEUE_THIRDPUT 8 |
Access the file on the client from a shared filesystem (included for readability)
#define WORK_QUEUE_MASTER_MODE_STANDALONE 0 |
Work Queue master does not report to the catalog server.
#define WORK_QUEUE_MASTER_MODE_CATALOG 1 |
Work Queue master reports to catalog server.
#define WORK_QUEUE_WORKER_MODE_SHARED 0 |
Work Queue master accepts workers in shared or non-exclusive mode.
#define WORK_QUEUE_WORKER_MODE_EXCLUSIVE 1 |
Work Queue master only accepts workers that have a preference for it.
struct work_queue_task* work_queue_task_create | ( | const char * | full_command | ) |
Create a new task specification.
Once created, the task may be passed to work_queue_submit.
full_command | The shell command line to be executed by the task. |
Referenced by work_queue.Task::__init__().
void work_queue_task_specify_file | ( | struct work_queue_task * | t, |
const char * | local_name, | ||
const char * | remote_name, | ||
int | type, | ||
int | flags | ||
) |
Add a file to a task.
t | The task to which to add a file. |
local_name | The name of the file on local disk or shared filesystem. |
remote_name | The name of the file at the execution site. |
type | Must be one of the following values: WORK_QUEUE_INPUT or WORK_QUEUE_OUTPUT. |
flags | May be zero to indicate no special handling, or any of the following or'd together: WORK_QUEUE_NOCACHE, WORK_QUEUE_CACHE, WORK_QUEUE_SYMLINK, WORK_QUEUE_THIRDGET, WORK_QUEUE_THIRDPUT. |
Referenced by work_queue.Task::specify_file().
void work_queue_task_specify_buffer | ( | struct work_queue_task * | t, |
const char * | data, | ||
int | length, | ||
const char * | remote_name, | ||
int | flags | ||
) |
Add an input buffer to a task.
t | The task to which to add a file. |
data | The contents of the buffer to pass as input. |
length | The length of the buffer, in bytes |
remote_name | The name of the remote file to create. |
flags | May take the same values as in work_queue_task_specify_file. |
Referenced by work_queue.Task::specify_buffer().
void work_queue_task_specify_file_command | ( | struct work_queue_task * | t, |
const char * | remote_name, | ||
const char * | cmd, | ||
int | type, | ||
int | flags | ||
) |
Add a file created or handled by an arbitrary command to a task (eg: wget, ftp, chirp_get|put).
t | The task to which to add a file. |
remote_name | The name of the file at the execution site. |
cmd | The command to run on the remote node to retrieve or store the file. |
type | Must be one of the following values: WORK_QUEUE_INPUT or WORK_QUEUE_OUTPUT. |
flags | May be zero to indicate no special handling, or any of the following or'd together: WORK_QUEUE_NOCACHE, WORK_QUEUE_CACHE. |
Referenced by work_queue.Task::specify_file_command().
void work_queue_task_specify_tag | ( | struct work_queue_task * | t, |
const char * | tag | ||
) |
Attach a user defined logical name to the task.
This field is not interpreted by the work queue, but simply maintained to help the user track tasks.
t | The task to which to add parameters |
tag | The tag to attach to task t. |
Referenced by work_queue.Task::specify_tag().
int work_queue_task_specify_algorithm | ( | struct work_queue_task * | t, |
int | alg | ||
) |
Further define a task specification.
Once completed, the task may be passed to work_queue_submit.
t | The task to which to add parameters |
alg | The algorithm to use in assigning a task to a worker. Valid possibilities are defined in this file as "WORK_QUEUE_SCHEDULE_X" values. |
Referenced by work_queue.Task::specify_algorithm().
void work_queue_task_specify_preferred_host | ( | struct work_queue_task * | t, |
const char * | hostname | ||
) |
Indicate that the task would be optimally run on a given host.
t | The task to which to add parameters |
hostname | The hostname to which this task would optimally be sent. |
Referenced by work_queue.Task::specify_preferred_host().
void work_queue_task_delete | ( | struct work_queue_task * | t | ) |
Delete a task specification.
This may be called on tasks after they are returned from work_queue_wait.
t | The task specification to delete. |
Referenced by work_queue.Task::__init__().
struct work_queue* work_queue_create | ( | int | port | ) |
Create a new work queue.
Users may modify the behavior of work_queue_create by setting the following environmental variables before calling the function:
If the queue has a project name, then queue statistics and information will be reported to a catalog server. To specify the catalog server, the user may set the CATALOG_HOST and CATALOG_PORT environmental variables as described in catalog_query_create.
port | The port number to listen on. If zero is specified, then the default is chosen, and if -1 is specified, a random port is chosen. |
Referenced by work_queue.WorkQueue::__init__().
void work_queue_submit | ( | struct work_queue * | q, |
struct work_queue_task * | t | ||
) |
Submit a job to a work queue.
It is safe to re-submit a task returned by work_queue_wait.
q | A work queue returned from work_queue_create. |
t | A task description returned from work_queue_task_create. |
Referenced by work_queue.WorkQueue::submit().
struct work_queue_task* work_queue_wait | ( | struct work_queue * | q, |
int | timeout | ||
) |
Wait for tasks to complete.
This call will block until the timeout has elapsed.
q | The work queue to wait on. |
timeout | The number of seconds to wait for a completed task before returning. Use an integer time to set the timeout or the constant WORK_QUEUE_WAITFORTASK to block until a task has completed. |
Referenced by work_queue.WorkQueue::wait().
int work_queue_hungry | ( | struct work_queue * | q | ) |
Determine whether the queue can support more tasks.
Returns the number of additional tasks it can support if "hungry" and 0 if "sated".
q | A pointer to the queue to query. |
Referenced by work_queue.WorkQueue::hungry().
int work_queue_empty | ( | struct work_queue * | q | ) |
Determine whether there are any known tasks queued, running, or waiting to be collected.
Returns 0 if there are tasks remaining in the system, 1 if the system is "empty".
q | A pointer to the queue to query. |
Referenced by work_queue.WorkQueue::empty().
int work_queue_port | ( | struct work_queue * | q | ) |
Get the listening port of the queue.
q | The work queue of interest. |
Referenced by work_queue.WorkQueue::__init__().
const char* work_queue_name | ( | struct work_queue * | q | ) |
Get the project name of the queue.
q | The work queue of interest. |
Referenced by work_queue.WorkQueue::__init__().
void work_queue_get_stats | ( | struct work_queue * | q, |
struct work_queue_stats * | s | ||
) |
Get queue statistics.
q | The queue to query. |
s | A pointer to a buffer that will be filled with statistics. |
Referenced by work_queue.WorkQueue::__init__().
int work_queue_activate_fast_abort | ( | struct work_queue * | q, |
double | multiplier | ||
) |
Turn on or off fast abort functionality for a given queue.
q | A pointer to the queue to modify. |
multiplier | The multiplier of the average task time at which point to abort; if negative (and by default) fast_abort is deactivated. |
Referenced by work_queue.WorkQueue::activate_fast_abort().
int work_queue_specify_algorithm | ( | struct work_queue * | q, |
int | alg | ||
) |
Change the worker selection algorithm for a given queue.
q | A pointer to the queue to modify. |
alg | The algorithm to use in assigning a task to a worker. Valid possibilities are defined in this file as "WORK_QUEUE_SCHEDULE_X" values. |
Referenced by work_queue.WorkQueue::specify_algorithm().
int work_queue_specify_name | ( | struct work_queue * | q, |
const char * | name | ||
) |
Change the project name for a given queue.
q | A pointer to the queue to modify. |
name | The new project name. |
Referenced by work_queue.WorkQueue::__init__(), and work_queue.WorkQueue::specify_name().
int work_queue_specify_priority | ( | struct work_queue * | q, |
int | priority | ||
) |
Change the priority for a given queue.
q | A pointer to the queue to modify. |
priority | An integer that represents the priority of this work queue master. The higher the value, the higher the priority. |
Referenced by work_queue.WorkQueue::specify_priority().
int work_queue_specify_master_mode | ( | struct work_queue * | q, |
int | mode | ||
) |
Specify the master mode for a given queue.
q | A pointer to the queue to modify. |
mode | mode == WORK_QUEUE_MASTER_MODE_STANDALONE: standalone mode. In this mode the master would not report its information to a catalog server; mode == WORK_QUEUE_MASTER_MODE_CATALOG: catalog mode. In this mode the master reports itself to a catalog server where workers get masters' information and select a master to serve. |
Referenced by work_queue.WorkQueue::__init__(), and work_queue.WorkQueue::specify_master_mode().
int work_queue_specify_worker_mode | ( | struct work_queue * | q, |
int | mode | ||
) |
Specify the worker mode for a given queue.
q | A pointer to the queue to modify. |
mode | mode == WORK_QUEUE_WORKER_MODE_SHARED: shared mode. In this mode the master would accept connections from shared workers; mode == WORK_QUEUE_WORKER_MODE_EXCLUSIVE: exclusive mode. In this mode the master would only accept workers that have specified a preference on it, which are the workers started with "-N name" where name is the name of the queue. |
Referenced by work_queue.WorkQueue::__init__(), and work_queue.WorkQueue::specify_worker_mode().
int work_queue_shut_down_workers | ( | struct work_queue * | q, |
int | n | ||
) |
Shut down workers connected to the work_queue system.
Gives a best effort and then returns the number of workers given the shut down order.
q | A pointer to the queue to query. |
n | The number to shut down. All workers if given "0". |
Referenced by work_queue.WorkQueue::shutdown_workers().
void work_queue_delete | ( | struct work_queue * | q | ) |
Delete a work queue.
q | The work queue to delete. |
Referenced by work_queue.WorkQueue::__init__().
void work_queue_task_specify_input_buf | ( | struct work_queue_task * | t, |
const char * | buf, | ||
int | length, | ||
const char * | rname | ||
) |
Add an input buffer to a task.
t | The task to which to add parameters |
buf | A pointer to the data buffer to send to the worker to be available to the commands. |
length | The number of bytes of data in the buffer |
rname | The name of the file in which to store the buffer data on the worker |
void work_queue_task_specify_input_file | ( | struct work_queue_task * | t, |
const char * | fname, | ||
const char * | rname | ||
) |
Add an input file to a task.
t | The task to which to add parameters |
fname | The name of the data file to send to the worker to be available to the commands. |
rname | The name of the file in which to store the file data on the worker. |
void work_queue_task_specify_input_file_do_not_cache | ( | struct work_queue_task * | t, |
const char * | fname, | ||
const char * | rname | ||
) |
Add an input file to a task, without caching.
t | The task to which to add parameters |
fname | The name of the data file to send to the worker to be available to the commands. |
rname | The name of the file in which to store the file data on the worker. |
void work_queue_task_specify_output_file | ( | struct work_queue_task * | t, |
const char * | rname, | ||
const char * | fname | ||
) |
Add an output file to a task.
t | The task to which to add parameters |
rname | The name of a file created by the program when it runs. |
fname | The name of the local target file for copying rname back. |
void work_queue_task_specify_output_file_do_not_cache | ( | struct work_queue_task * | t, |
const char * | rname, | ||
const char * | fname | ||
) |
Add an output file to a task without caching.
t | The task to which to add parameters |
rname | The name of a file created by the program when it runs. |
fname | The name of the local target file for copying rname back. |
double wq_option_fast_abort_multiplier |
Initial setting for fast abort multiplier upon creating queue.
Turned off if less than 0. Change prior to calling work_queue_create; after the queue is created, this variable is not considered and changes must be made through the API calls.
int wq_option_scheduler |
Initial setting for algorithm to assign tasks to workers upon creating queue.
Change prior to calling work_queue_create; after the queue is created, this variable is not considered and changes must be made through the API calls.