Work Queue is Copyright (C) 2009 The University of Notre Dame. This software is distributed under the GNU General Public License. See the file COPYING for details.
Work Queue is a framework for building master/worker applications. In Work Queue, a Master process is a custom, application-specific program that uses the Work Queue API to define and submit a large number of small tasks. The tasks are executed by many Worker processes, which can run on any available machine. A single Master may direct hundreds to thousands of Workers, allowing users to easily construct highly scalable programs.
Work Queue is a stable framework that has been used to create highly scalable scientific applications in biometrics, bioinformatics, economics, and other fields. It can also be used as an execution engine for the Makeflow workflow engine.
Work Queue is part of the Cooperating Computing Tools. You can download the CCTools from this web page, follow the installation instructions, and you are ready to go. From the same website, or from within the CCTools package, you can view documentation for the full set of features of the Work Queue API.
We assume that you have already downloaded and installed the cctools in the directory CCTOOLS. Next, download work_queue_example.c and compile it like this:
    gcc work_queue_example.c -o work_queue_example -I${CCTOOLS}/include/cctools -L${CCTOOLS}/lib -ldttools -lm

This example program simply compresses a bunch of files in parallel. List the files to be compressed on the command line. Each will be transmitted to a remote worker, compressed, and then sent back to the master. (This isn't necessarily faster than doing it locally, but it is easy to run.) For example, to compress files a, b, and c, run this:
    ./work_queue_example a b c

You will see this right away:
    listening on port 9123...
    submitted task: /usr/bin/gzip < a > a.gz
    submitted task: /usr/bin/gzip < b > b.gz
    submitted task: /usr/bin/gzip < c > c.gz
    waiting for tasks to complete...

The master is now waiting for workers to connect and begin requesting work. (Without any workers, it will wait forever.) You can start one worker on the same machine by opening a new shell and running:
    work_queue_worker MACHINENAME 9123

(Obviously, substitute the name of your machine for MACHINENAME.) If you have access to other machines, you can ssh there and run workers as well. In general, the more workers you start, the faster the work gets done. If a worker should fail, the work queue infrastructure will retry the work elsewhere, so it is safe to submit many workers to an unreliable system.
If you have access to a Condor pool, you can use this shortcut to submit ten workers at once via Condor:
    % condor_submit_workers MACHINENAME 9123 10
    Submitting job(s)..........
    Logging submit event(s)..........
    10 job(s) submitted to cluster 298.

Or, if you have access to an SGE cluster, do this:
    % sge_submit_workers MACHINENAME 9123 10
    Your job 153083 ("worker.sh") has been submitted
    Your job 153084 ("worker.sh") has been submitted
    Your job 153085 ("worker.sh") has been submitted
    ...
When the master completes, any workers that were not shut down by the master will still be available, so you can either run another master with the same workers, or remove the workers with kill, condor_rm, or qdel as appropriate. If you forget to remove them, they will exit automatically after fifteen minutes. (This idle timeout can be adjusted with the -t option to work_queue_worker.)
The basic outline of a Work Queue master is:

    q = work_queue_create(port);

    for(all tasks) {
        t = work_queue_task_create(command);
        /* add to the task description */
        work_queue_submit(q,t);
    }

    while(!work_queue_empty(q)) {
        t = work_queue_wait(q,5);
        work_queue_task_delete(t);
    }

    work_queue_delete(q);

First create a queue that is listening on a particular TCP port:
    q = work_queue_create(port);

The master then creates tasks to submit to the queue. Each task consists of a command line to run and a statement of what data is needed by, and what data will be produced by, the command. Input data can be provided in the form of a file or a local memory buffer. Output data can be captured in the form of a file or the standard output of the program. You must also specify whether each piece of data, input or output, should be cached at the worker site for later use. In the example, we specify a command that takes a single input file, produces a single output file, and requires both files to be cached:
    t = work_queue_task_create("/usr/bin/gzip < infile > outfile.gz");
    work_queue_task_specify_file(t,"infile","infile",WORK_QUEUE_INPUT,WORK_QUEUE_CACHE);
    work_queue_task_specify_file(t,"outfile.gz","outfile.gz",WORK_QUEUE_OUTPUT,WORK_QUEUE_CACHE);

If a file does not need to be cached at the execution site, it can be marked as such to avoid wasteful storage:
    work_queue_task_specify_file(t,"outfile.gz","outfile.gz",WORK_QUEUE_OUTPUT,WORK_QUEUE_NOCACHE);

You can also run a program that is not necessarily installed at the remote location, by specifying it as an input file. If the file is installed on the local machine, then specify the full local path and the plain remote path. For example:
    t = work_queue_task_create("./my_compress_program < infile > outfile.gz");
    work_queue_task_specify_file(t,"/usr/local/bin/my_compress_program","my_compress_program",WORK_QUEUE_INPUT,WORK_QUEUE_CACHE);
    work_queue_task_specify_file(t,"infile","infile",WORK_QUEUE_INPUT,WORK_QUEUE_CACHE);
    work_queue_task_specify_file(t,"outfile.gz","outfile.gz",WORK_QUEUE_OUTPUT,WORK_QUEUE_CACHE);

Once a task has been fully specified, it can be submitted to the queue:
    work_queue_submit(q,t);

Next, wait for a task to complete, stating how long you are willing to wait for a result, in seconds. (If no tasks have completed by the timeout, work_queue_wait will return null.)
    t = work_queue_wait(q,5);

A completed task will have its output files written to disk. You may examine the standard output of the task in t->output and the exit code in t->exit_status. When you are done with the task, delete it:
    work_queue_task_delete(t);

Continue submitting and waiting for tasks until all work is complete. You may check whether the queue is empty with work_queue_empty. When all is done, delete the queue:
    work_queue_delete(q);

Full details of all of the Work Queue functions can be found in the Work Queue API.
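Putting the pieces together, the fragments above assemble into a complete master. The following is a minimal sketch along the lines of work_queue_example.c, assuming the header work_queue.h and the compile command shown earlier; error handling is abbreviated and the port number is illustrative:

```c
/* Minimal master sketch, assembled from the fragments above.
   Assumptions: the Work Queue API is declared in work_queue.h, and the
   CCTools libraries are installed as described earlier. Compile with
   the same gcc command used for work_queue_example.c. */
#include <stdio.h>
#include "work_queue.h"

int main(int argc, char *argv[])
{
	struct work_queue *q = work_queue_create(9123); /* listen on TCP port 9123 */
	if(!q) {
		fprintf(stderr, "couldn't listen on port 9123\n");
		return 1;
	}

	/* Submit one gzip task per file named on the command line. */
	int i;
	for(i = 1; i < argc; i++) {
		char command[1024], outfile[1024];
		snprintf(outfile, sizeof(outfile), "%s.gz", argv[i]);
		snprintf(command, sizeof(command), "/usr/bin/gzip < %s > %s", argv[i], outfile);

		struct work_queue_task *t = work_queue_task_create(command);
		work_queue_task_specify_file(t, argv[i], argv[i], WORK_QUEUE_INPUT, WORK_QUEUE_CACHE);
		work_queue_task_specify_file(t, outfile, outfile, WORK_QUEUE_OUTPUT, WORK_QUEUE_NOCACHE);
		work_queue_submit(q, t);
	}

	/* Wait for results, five seconds at a time, until the queue is empty. */
	while(!work_queue_empty(q)) {
		struct work_queue_task *t = work_queue_wait(q, 5);
		if(t) {
			printf("task finished with exit status %d\n", t->exit_status);
			work_queue_task_delete(t);
		}
	}

	work_queue_delete(q);
	return 0;
}
```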
Work Queue can substitute the operating system and architecture of each worker into an input file name, so that a single master can serve workers on multiple platforms:

    work_queue_task_specify_file(t,"infile.$OS.$ARCH","infile",WORK_QUEUE_INPUT,WORK_QUEUE_CACHE);

This will transfer infile.Linux.x86_64 to workers running on a Linux system with an x86_64 architecture and infile.Cygwin.i686 to workers on Cygwin with an i686 architecture.