
work_queue.h File Reference

A master-worker library.

#include "timestamp.h"

Data Structures

struct  work_queue_task
 A task description.
struct  work_queue_stats
 Statistics describing a work queue.

Defines

#define WORK_QUEUE_DEFAULT_PORT   9123
 Default Work Queue port number.
#define WORK_QUEUE_RANDOM_PORT   -1
 Tell Work Queue to choose a random open port.
#define WORK_QUEUE_WAITFORTASK   -1
 Wait for a task to complete before returning.
#define WORK_QUEUE_SCHEDULE_FCFS   1
 Select worker on a first-come-first-serve basis.
#define WORK_QUEUE_SCHEDULE_FILES   2
 Select worker that has the most files required by task.
#define WORK_QUEUE_SCHEDULE_TIME   3
 Select the worker that has the best execution time.
#define WORK_QUEUE_SCHEDULE_DEFAULT   3
 Default algorithm (WORK_QUEUE_SCHEDULE_TIME).
#define WORK_QUEUE_SCHEDULE_PREFERRED_HOSTS   4
 Select worker from set of preferred hosts.
#define WORK_QUEUE_SCHEDULE_RAND   5
 Select a random worker.
#define WORK_QUEUE_INPUT   0
 Specify an input object.
#define WORK_QUEUE_OUTPUT   1
 Specify an output object.
#define WORK_QUEUE_NOCACHE   0
 Do not cache file at execution site.
#define WORK_QUEUE_CACHE   1
 Cache file at execution site for later use.
#define WORK_QUEUE_SYMLINK   2
 Create a symlink to the file rather than copying it, if possible.
#define WORK_QUEUE_THIRDGET   8
 Access the file on the client from a shared filesystem.
#define WORK_QUEUE_THIRDPUT   8
 Access the file on the client from a shared filesystem (included for readability).
#define WORK_QUEUE_MASTER_MODE_STANDALONE   0
 Work Queue master does not report to the catalog server.
#define WORK_QUEUE_MASTER_MODE_CATALOG   1
 Work Queue master reports to catalog server.
#define WORK_QUEUE_WORKER_MODE_SHARED   0
 Work Queue master accepts workers in shared or non-exclusive mode.
#define WORK_QUEUE_WORKER_MODE_EXCLUSIVE   1
 Work Queue master only accepts workers that have a preference for it.

Functions

Functions - Tasks

struct work_queue_task * work_queue_task_create (const char *full_command)
 Create a new task specification.
void work_queue_task_specify_file (struct work_queue_task *t, const char *local_name, const char *remote_name, int type, int flags)
 Add a file to a task.
void work_queue_task_specify_buffer (struct work_queue_task *t, const char *data, int length, const char *remote_name, int flags)
 Add an input buffer to a task.
void work_queue_task_specify_file_command (struct work_queue_task *t, const char *remote_name, const char *cmd, int type, int flags)
 Add a file created or handled by an arbitrary command to a task (e.g., wget, ftp, chirp_get|put).
void work_queue_task_specify_tag (struct work_queue_task *t, const char *tag)
 Attach a user defined logical name to the task.
int work_queue_task_specify_algorithm (struct work_queue_task *t, int alg)
 Specify the algorithm used to assign a single task to a worker.
void work_queue_task_specify_preferred_host (struct work_queue_task *t, const char *hostname)
 Indicate that the task would be optimally run on a given host.
void work_queue_task_delete (struct work_queue_task *t)
 Delete a task specification.
Functions - Queues

struct work_queue * work_queue_create (int port)
 Create a new work queue.
void work_queue_submit (struct work_queue *q, struct work_queue_task *t)
 Submit a job to a work queue.
struct work_queue_task * work_queue_wait (struct work_queue *q, int timeout)
 Wait for tasks to complete.
int work_queue_hungry (struct work_queue *q)
 Determine whether the queue can support more tasks.
int work_queue_empty (struct work_queue *q)
 Determine whether there are any known tasks queued, running, or waiting to be collected.
int work_queue_port (struct work_queue *q)
 Get the listening port of the queue.
const char * work_queue_name (struct work_queue *q)
 Get the project name of the queue.
void work_queue_get_stats (struct work_queue *q, struct work_queue_stats *s)
 Get queue statistics.
int work_queue_activate_fast_abort (struct work_queue *q, double multiplier)
 Turn on or off fast abort functionality for a given queue.
int work_queue_specify_algorithm (struct work_queue *q, int alg)
 Change the worker selection algorithm for a given queue.
int work_queue_specify_name (struct work_queue *q, const char *name)
 Change the project name for a given queue.
int work_queue_specify_priority (struct work_queue *q, int priority)
 Change the priority for a given queue.
int work_queue_specify_master_mode (struct work_queue *q, int mode)
 Specify the master mode for a given queue.
int work_queue_specify_worker_mode (struct work_queue *q, int mode)
 Specify the worker mode for a given queue.
int work_queue_shut_down_workers (struct work_queue *q, int n)
 Shut down workers connected to the work_queue system.
void work_queue_delete (struct work_queue *q)
 Delete a work queue.
Functions - Deprecated

void work_queue_task_specify_input_buf (struct work_queue_task *t, const char *buf, int length, const char *rname)
 Add an input buffer to a task.
void work_queue_task_specify_input_file (struct work_queue_task *t, const char *fname, const char *rname)
 Add an input file to a task.
void work_queue_task_specify_input_file_do_not_cache (struct work_queue_task *t, const char *fname, const char *rname)
 Add an input file to a task, without caching.
void work_queue_task_specify_output_file (struct work_queue_task *t, const char *rname, const char *fname)
 Add an output file to a task.
void work_queue_task_specify_output_file_do_not_cache (struct work_queue_task *t, const char *rname, const char *fname)
 Add an output file to a task without caching.

Variables

double wq_option_fast_abort_multiplier
 Initial setting for the fast abort multiplier when a queue is created.
int wq_option_scheduler
 Initial setting for the algorithm used to assign tasks to workers when a queue is created.

Detailed Description

A master-worker library.

The work queue provides an implementation of the master-worker computing model using TCP sockets, Unix applications, and files as intermediate buffers. A master process uses work_queue_create to create a queue, then work_queue_submit to submit tasks. Once tasks are running, call work_queue_wait to wait for completion. The generic worker program can be run on any machine, and simply needs to be told the host and port of the master.


Define Documentation

#define WORK_QUEUE_DEFAULT_PORT   9123

Default Work Queue port number.

#define WORK_QUEUE_RANDOM_PORT   -1

Tell Work Queue to choose a random open port.

#define WORK_QUEUE_WAITFORTASK   -1

Wait for a task to complete before returning.

#define WORK_QUEUE_SCHEDULE_FCFS   1

Select worker on a first-come-first-serve basis.

#define WORK_QUEUE_SCHEDULE_FILES   2

Select worker that has the most files required by task.

#define WORK_QUEUE_SCHEDULE_TIME   3

Select the worker that has the best execution time.

#define WORK_QUEUE_SCHEDULE_DEFAULT   3

Default algorithm (WORK_QUEUE_SCHEDULE_TIME).

#define WORK_QUEUE_SCHEDULE_PREFERRED_HOSTS   4

Select worker from set of preferred hosts.

#define WORK_QUEUE_SCHEDULE_RAND   5

Select a random worker.

#define WORK_QUEUE_INPUT   0

Specify an input object.

#define WORK_QUEUE_OUTPUT   1

Specify an output object.

#define WORK_QUEUE_NOCACHE   0

Do not cache file at execution site.

#define WORK_QUEUE_CACHE   1

Cache file at execution site for later use.

#define WORK_QUEUE_SYMLINK   2

Create a symlink to the file rather than copying it, if possible.

#define WORK_QUEUE_THIRDGET   8

Access the file on the client from a shared filesystem.

#define WORK_QUEUE_THIRDPUT   8

Access the file on the client from a shared filesystem (included for readability).

#define WORK_QUEUE_MASTER_MODE_STANDALONE   0

Work Queue master does not report to the catalog server.

#define WORK_QUEUE_MASTER_MODE_CATALOG   1

Work Queue master reports to catalog server.

#define WORK_QUEUE_WORKER_MODE_SHARED   0

Work Queue master accepts workers in shared or non-exclusive mode.

#define WORK_QUEUE_WORKER_MODE_EXCLUSIVE   1

Work Queue master only accepts workers that have a preference for it.


Function Documentation

struct work_queue_task* work_queue_task_create ( const char *  full_command  )

Create a new task specification.

Once created, the task may be passed to work_queue_submit.

Parameters:
full_command The shell command line to be executed by the task.
void work_queue_task_specify_file ( struct work_queue_task *  t,
const char *  local_name,
const char *  remote_name,
int  type,
int  flags 
)

Add a file to a task.

Parameters:
t The task to which to add a file.
local_name The name of the file on local disk or shared filesystem.
remote_name The name of the file at the execution site.
type Must be one of the following values:

  • WORK_QUEUE_INPUT to indicate an input file to be consumed by the task
  • WORK_QUEUE_OUTPUT to indicate an output file to be produced by the task
flags May be zero to indicate no special handling, or any of the following or'd together: WORK_QUEUE_NOCACHE, WORK_QUEUE_CACHE, WORK_QUEUE_SYMLINK, WORK_QUEUE_THIRDGET, WORK_QUEUE_THIRDPUT.
void work_queue_task_specify_buffer ( struct work_queue_task *  t,
const char *  data,
int  length,
const char *  remote_name,
int  flags 
)

Add an input buffer to a task.

Parameters:
t The task to which to add the buffer.
data The contents of the buffer to pass as input.
length The length of the buffer, in bytes.
remote_name The name of the remote file to create.
flags May take the same values as in work_queue_task_specify_file.
void work_queue_task_specify_file_command ( struct work_queue_task *  t,
const char *  remote_name,
const char *  cmd,
int  type,
int  flags 
)

Add a file created or handled by an arbitrary command to a task (e.g., wget, ftp, chirp_get|put).

Parameters:
t The task to which to add a file.
remote_name The name of the file at the execution site.
cmd The command to run on the remote node to retrieve or store the file.
type Must be one of the following values: WORK_QUEUE_INPUT or WORK_QUEUE_OUTPUT.
flags May be zero to indicate no special handling, or any of the following or'd together: WORK_QUEUE_NOCACHE, WORK_QUEUE_CACHE.
void work_queue_task_specify_tag ( struct work_queue_task *  t,
const char *  tag 
)

Attach a user defined logical name to the task.

This field is not interpreted by the work queue, but simply maintained to help the user track tasks.

Parameters:
t The task to which to add parameters.
tag The tag to attach to task t.
int work_queue_task_specify_algorithm ( struct work_queue_task *  t,
int  alg 
)

Specify the algorithm used to assign a single task to a worker.

The task may then be passed to work_queue_submit.

Parameters:
t The task to which to add parameters.
alg The algorithm to use in assigning a task to a worker. Valid possibilities are defined in this file as "WORK_QUEUE_SCHEDULE_X" values.
void work_queue_task_specify_preferred_host ( struct work_queue_task *  t,
const char *  hostname 
)

Indicate that the task would be optimally run on a given host.

Parameters:
t The task to which to add parameters.
hostname The hostname to which this task would optimally be sent.
void work_queue_task_delete ( struct work_queue_task *  t  )

Delete a task specification.

This may be called on tasks after they are returned from work_queue_wait.

Parameters:
t The task specification to delete.
struct work_queue* work_queue_create ( int  port  )

Create a new work queue.

Users may modify the behavior of work_queue_create by setting the following environment variables before calling the function:

  • WORK_QUEUE_PORT: This sets the default port of the queue (if unset, the default is 9123).
  • WORK_QUEUE_LOW_PORT: If the user requests a random port, then this sets the first port number in the scan range (if unset, the default is 1024).
  • WORK_QUEUE_HIGH_PORT: If the user requests a random port, then this sets the last port number in the scan range (if unset, the default is 32767).
  • WORK_QUEUE_NAME: This sets the project name of the queue, which is reported to a catalog server (by default this is unset).
  • WORK_QUEUE_PRIORITY: This sets the priority of the queue, which is used by workers to sort masters such that higher priority masters will be served first (if unset, the default is 10).

If the queue has a project name, then queue statistics and information will be reported to a catalog server. To specify the catalog server, the user may set the CATALOG_HOST and CATALOG_PORT environment variables as described in catalog_query_create.

Parameters:
port The port number to listen on. If zero is specified, then the default is chosen, and if -1 is specified, a random port is chosen.
Returns:
A new work queue, or null if it could not be created.
void work_queue_submit ( struct work_queue *  q,
struct work_queue_task *  t 
)

Submit a job to a work queue.

It is safe to re-submit a task returned by work_queue_wait.

Parameters:
q A work queue returned from work_queue_create.
t A task description returned from work_queue_task_create.
struct work_queue_task* work_queue_wait ( struct work_queue *  q,
int  timeout 
)

Wait for tasks to complete.

This call blocks until a completed task is available or the timeout has elapsed.

Parameters:
q The work queue to wait on.
timeout The maximum number of seconds to wait for a completed task before returning, or the constant WORK_QUEUE_WAITFORTASK to block until a task has completed.
Returns:
A completed task description, or null if the queue is empty or the timeout was reached without a completed task. The returned task must be deleted with work_queue_task_delete or resubmitted with work_queue_submit.
int work_queue_hungry ( struct work_queue *  q  ) 

Determine whether the queue can support more tasks.

Returns the number of additional tasks it can support if "hungry" and 0 if "sated".

Parameters:
q A pointer to the queue to query.
int work_queue_empty ( struct work_queue *  q  ) 

Determine whether there are any known tasks queued, running, or waiting to be collected.

Returns 0 if there are tasks remaining in the system, 1 if the system is "empty".

Parameters:
q A pointer to the queue to query.
int work_queue_port ( struct work_queue *  q  ) 

Get the listening port of the queue.

Parameters:
q The work queue of interest.
Returns:
The port the queue is listening on.
const char* work_queue_name ( struct work_queue *  q  ) 

Get the project name of the queue.

Parameters:
q The work queue of interest.
Returns:
The project name of the queue.
void work_queue_get_stats ( struct work_queue *  q,
struct work_queue_stats *  s 
)

Get queue statistics.

Parameters:
q The queue to query.
s A pointer to a buffer that will be filled with statistics.
int work_queue_activate_fast_abort ( struct work_queue *  q,
double  multiplier 
)

Turn on or off fast abort functionality for a given queue.

Parameters:
q A pointer to the queue to modify.
multiplier The multiple of the average task time after which a running task is aborted; if negative (as it is by default), fast abort is deactivated.
Returns:
0 if activated or deactivated with an appropriate multiplier, 1 if deactivated due to inappropriate multiplier.
int work_queue_specify_algorithm ( struct work_queue *  q,
int  alg 
)

Change the worker selection algorithm for a given queue.

Parameters:
q A pointer to the queue to modify.
alg The algorithm to use in assigning a task to a worker. Valid possibilities are defined in this file as "WORK_QUEUE_SCHEDULE_X" values.
int work_queue_specify_name ( struct work_queue *  q,
const char *  name 
)

Change the project name for a given queue.

Parameters:
q A pointer to the queue to modify.
name The new project name.
int work_queue_specify_priority ( struct work_queue *  q,
int  priority 
)

Change the priority for a given queue.

Parameters:
q A pointer to the queue to modify.
priority An integer that represents the priority of this work queue master. The higher the value, the higher the priority.
Returns:
The priority that has been set.
int work_queue_specify_master_mode ( struct work_queue *  q,
int  mode 
)

Specify the master mode for a given queue.

Parameters:
q A pointer to the queue to modify.
mode One of the following values:

  • WORK_QUEUE_MASTER_MODE_STANDALONE: standalone mode. The master does not report its information to a catalog server.
  • WORK_QUEUE_MASTER_MODE_CATALOG: catalog mode. The master reports itself to a catalog server, from which workers obtain information about masters and select a master to serve.
Returns:
The mode that has been set.
int work_queue_specify_worker_mode ( struct work_queue *  q,
int  mode 
)

Specify the worker mode for a given queue.

Parameters:
q A pointer to the queue to modify.
mode One of the following values:

  • WORK_QUEUE_WORKER_MODE_SHARED: shared mode. The master accepts connections from shared workers.
  • WORK_QUEUE_WORKER_MODE_EXCLUSIVE: exclusive mode. The master only accepts workers that have specified a preference for it, that is, workers started with "-N name", where name is the name of the queue.
Returns:
The mode that has been set.
int work_queue_shut_down_workers ( struct work_queue *  q,
int  n 
)

Shut down workers connected to the work_queue system.

Makes a best-effort attempt to shut down workers, then returns the number of workers that were sent the shutdown order.

Parameters:
q A pointer to the queue.
n The number of workers to shut down, or 0 to shut down all workers.
void work_queue_delete ( struct work_queue *  q  ) 

Delete a work queue.

Parameters:
q The work queue to delete.
void work_queue_task_specify_input_buf ( struct work_queue_task *  t,
const char *  buf,
int  length,
const char *  rname 
)

Add an input buffer to a task.

Parameters:
t The task to which to add parameters.
buf A pointer to the data buffer to send to the worker, where it is made available to the commands.
length The number of bytes of data in the buffer.
rname The name of the file in which to store the buffer data on the worker.
Deprecated:
Use work_queue_task_specify_buffer instead.
void work_queue_task_specify_input_file ( struct work_queue_task *  t,
const char *  fname,
const char *  rname 
)

Add an input file to a task.

Parameters:
t The task to which to add parameters.
fname The name of the data file to send to the worker, where it is made available to the commands.
rname The name under which the file is stored on the worker.
Deprecated:
Use work_queue_task_specify_file instead.
void work_queue_task_specify_input_file_do_not_cache ( struct work_queue_task *  t,
const char *  fname,
const char *  rname 
)

Add an input file to a task, without caching.

Parameters:
t The task to which to add parameters.
fname The name of the data file to send to the worker, where it is made available to the commands.
rname The name under which the file is stored on the worker.
Deprecated:
Use work_queue_task_specify_file instead.
void work_queue_task_specify_output_file ( struct work_queue_task *  t,
const char *  rname,
const char *  fname 
)

Add an output file to a task.

Parameters:
t The task to which to add parameters.
rname The name of a file created by the program when it runs.
fname The name of the local file to which rname is copied back.
Deprecated:
Use work_queue_task_specify_file instead.
void work_queue_task_specify_output_file_do_not_cache ( struct work_queue_task *  t,
const char *  rname,
const char *  fname 
)

Add an output file to a task without caching.

Parameters:
t The task to which to add parameters.
rname The name of a file created by the program when it runs.
fname The name of the local file to which rname is copied back.
Deprecated:
Use work_queue_task_specify_file instead.

Variable Documentation

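double wq_option_fast_abort_multiplier

Initial setting for the fast abort multiplier when a queue is created.

Fast abort is turned off if this value is less than 0. Set this variable before calling work_queue_create; once the queue has been created, the variable is no longer consulted, and changes must be made through the API (work_queue_activate_fast_abort).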

int wq_option_scheduler

Initial setting for the algorithm used to assign tasks to workers when a queue is created.

Set this variable before calling work_queue_create; once the queue has been created, the variable is no longer consulted, and changes must be made through the API (work_queue_specify_algorithm).