Simple Network of Workstations for R

Luke Tierney
Department of Statistics and Actuarial Science
University of Iowa

The snow (Simple Network of Workstations) package implements a simple mechanism for using a collection of workstations or a Beowulf cluster for ``embarrassingly parallel'' computations in R. The interface, which is based in part on the Python CoW (Cluster of Workstations) package, is intended to be quite simple, and is designed so that it can be implemented on top of several different lower level communication mechanisms. Three low level interfaces have been implemented, one using PVM via the rpvm package by Li and Rossini, one using MPI via the Rmpi package by Hao Yu, and one using raw sockets that may be useful if PVM and MPI are not available. This note describes how to start up a cluster, the basic functions for computing with a cluster, and an example of using the cluster for parallel bootstrapping.

Two notes of caution:

Starting and Stopping a Cluster

Starting a workstation cluster is the only step in using a cluster that depends explicitly on the underlying communication mechanism. A cluster is started by calling the makeCluster function, but the details of the call currently vary slightly depending on the type of cluster. PVM and MPI clusters may also need some preliminary preparations to start the PVM or MPI systems.

Starting a PVM Cluster

To start a PVM (Parallel Virtual Machine) cluster you first need to start PVM. I prefer to do this using the PVM console pvm:

<starting PVM using the pvm console>=
luke@itasca cluster% pvm
pvm> add itasca
add itasca
0 successful
                    HOST     DTID
                  itasca Duplicate host
pvm> add owasso
add owasso
1 successful
                    HOST     DTID
                  owasso    80000
pvm> conf
conf
2 hosts, 2 data formats
                    HOST     DTID     ARCH   SPEED       DSIG
                  itasca    40000    LINUX    1000 0x00408841
                  owasso    80000   DARWIN    1000 0x0658eb59
pvm> 

The parallel virtual machine configuration shown by the conf command consists of two hosts, itasca running i386 Linux and owasso running Mac OS X.

As an alternative to the command line console pvm you can also use the graphical console xpvm, if it is installed, or you can start PVM from R using functions provided by the rpvm package.

Once PVM is running, to use a cluster of two worker nodes you would start R on the master, load the snow package, and call makeCluster with arguments 2, the number of worker nodes, and type = "PVM":

<starting a PVM cluster>=
cl <- makeCluster(2, type = "PVM")

The returned value is a list of references to these two processes.

Currently the default cluster type is "PVM" if the rpvm package is available and Rmpi is not already loaded, so the type = "PVM" argument can be omitted.

Once you are done with PVM remember to shut it down, for example with the halt command in the pvm console.

Starting an MPI Cluster

If you are using an MPI system such as LAM that supports the MPI 2 process spawning API then you can create MPI clusters along the same lines as socket or PVM clusters. A different approach is needed if your MPI system does not support process spawning. You may need to start MPI first, for example using lamboot; the details depend on your MPI system.

Once you are done with MPI remember to shut it down if yur MPI requires this; for LAM-MPI, for example, use lamhalt.

Spawned MPI Clusters

MPI clusters for MPI systems that support process spawning can be started with

<starting an MPI cluster with process spawning>=
cl <- makeCluster(2, type = "MPI")

Currently if rpvm is not available but Rmpi is available then the default cluster type is "MPI" and the type = "MPI" argument can be omitted.

MPI Clusters without Spawning

If your MPI system does not support process spawning, or if you prefer to run MPI jobs with mpirun, you will need to install Rmpi accordingly, and you need to install the RMPISNOW shell script from the installed snow package or inst directory of the package sources in an appropriate location, preferably on your path. If RMPISNOW is in your path you can then start R with

<starting R with mpirun>=
mpirun -np 3 RMPISNOW

This starts a master and two worker processes. A cluster is created in the master process. You can obtain a reference to the cluster with

<obtaining a cluster started with mpirun>= [D->]
cl <- getMPIcluster()

With recent versins of snow You can also obtain a reference to the running cluster with

<obtaining a cluster started with mpirun>+= [<-D->]
cl <- makeCluster()

or, for a cluster with two workers, with

<obtaining a cluster started with mpirun>+= [<-D]
cl <- makeCluster(2)

Starting a Socket Cluster

To start a socket cluster you use makeCluster and specify type="SOCK" and a list of machines to use. For example,

<starting a socket cluster>=
cl <- makeCluster(c("itasca", "owasso"), type = "SOCK")

creates a cluster consisting of two R processes, one running on a machine named itasca and the other on owasso. The returned value is a list of references to these two processes.

Additional Cluster Creation Arguments

The cluster creation function makeCluster accepts a number of additional named arguments for specifying options for the node creation processes. The most useful arguments are type for specifying the cluster type, homogeneous for specifying whether the startup process should assume that worker hosts have the same architecture and file system layout as the master (see Section [->] for more on heterogeneous clusters), and outfile for specifying where output from worker processes is to be sent. By default outfile is /dev/null. For debugging purposes, especially during initial configuration, it can be useful to redirect output to a proper file.

An alternative to specifying additional cluster options with each cluster creation call is to use setDefaultClusterOptions to change the defaults. For example, to specify that the default should be an inhomogeneous MPI cluster use

setDefaultClusterOptions(type="MPI", homogeneous = FALSE)

Stopping a Cluster

To stop a cluster you should use

<stopping a cluster>=
stopCluster(cl)

Socket clusters should stop automatically when the process that created them terminates; however, it is still a good idea to call stopCluster. If you do not call stopCluster to terminate a PVM cluster before the creating process exits you can use the halt command in the PVM console. Your MPI system may provide a similar facility; for LAM you can use lamhalt or lamwipe.

Inhomogeneous Systems

[*] The default option settings for cluster creation are intended for homogeneous systems where the R shell script and the snow library live at the same absolute paths on all nodes. snow need not be installed in the system library, but, for the defaults to work, it has to be in one of the libraries specified in the R_LIBS environment variable for the master.

For inhomogeneous systems, such as systems with differing file system layouts or systems where different architectures share a common file system, some additional configuration is needed:

  1. On each machine used to run worker processes copy the script RunSnowNode from the installed snow to a location on the search path for that machine and make sure it is executable.
  2. On the master machine and on each machine used to run worker processes set the environment variable R_SNOW_LIB to the name of the directory that contains the snow package and supporting packages such as rpvm, Rmpi, or rlecuyer for the appropriate architecture.
  3. Make sure the appropriate version of R is available on the search path on each machine.
Currently setting the R_SNOW_LIB variable to a nonempty value on the master causes the default setting of the homogeneous cluster option to be FALSE; otherwise, it is TRUE.

Using the Cluster

Some Basic Functions

The basic functions for using a cluster are clusterCall and clusterApply. clusterCall calls a specified function with identical arguments on each member of the cluster and returns a list of the results. The calls are executed in parallel. For example, we can ask the nodes for their names and machine types:

<R session>= [D->]
> clusterCall(cl, function() Sys.info()[c("nodename","machine")])
[[1]]
               nodename                 machine 
"owasso.stat.uiowa.edu"       "Power Macintosh" 

[[2]]
nodename  machine 
"itasca"   "i686" 

A useful variation of clusterCall is clusterEvalQ, defined as

<definition of clusterEvalQ>=
clusterEvalQ<-function(cl, expr)
    clusterCall(cl, eval, substitute(expr), env=.GlobalEnv)

This can be used, for example, to load a package on each cluster node with

<loading a library on all nodes>=
clusterEvalQ(cl, library(boot))

clusterApply is a version of lapply that evaluates each call on a different member of the cluster. It requires that the number of nodes in the cluster be at least as large as the number of elements in the list argument. A simple example:

<R session>+= [<-D->]
> clusterApply(cl, 1:2, get("+"), 3)
[[1]]
[1] 4

[[2]]
[1] 5

Random Number Generation

The default random number generators are likely to be quite correlated:

<R session>+= [<-D->]
> clusterCall(cl, runif, 3)
[[1]]
[1] 0.08496597 0.35232298 0.60300751

[[2]]
[1] 0.08496597 0.35232298 0.60300751

A quick and (very) dirty way of addressing this is random seeding, using something like

<random seeding of cluster generators>=
clusterApply(cl, runif(length(cl),max=10000000), set.seed)

A better approach is to use a parallel random number genarator package. Several are available. By default snow uses therlecuyer package, which provides an interface to a parallel random number generator package of L'Ecuyer, Simard, Chen, and Kelton. The function clusterSetupRNG handles the initialization. The default call with no additional arguments uses a random seed; named arguments described in the help page can be used for more control:

<R session>+= [<-D->]
> clusterSetupRNG(cl)
> clusterCall(cl, runif, 3)
[[1]]
[1] 0.749391854 0.007316102 0.152742874

[[2]]
[1] 0.8424790 0.8896625 0.2256776

A Bootstrap Example

The boot package includes an example using the data nuclear. The setup code, given in the boot help page, is

<bootstrap setup>=
library(boot)
#  In this example we show the use of boot in a prediction from 
#  regression based on the nuclear data.  This example is taken 
#  from Example 6.8 of Davison and Hinkley (1997).  Notice also 
#  that two extra arguments to statistic are passed through boot.
data(nuclear)
nuke <- nuclear[,c(1,2,5,7,8,10,11)]
nuke.lm <- glm(log(cost)~date+log(cap)+ne+ ct+log(cum.n)+pt, data=nuke)
nuke.diag <- glm.diag(nuke.lm)
nuke.res <- nuke.diag$res*nuke.diag$sd
nuke.res <- nuke.res-mean(nuke.res)

#  We set up a new dataframe with the data, the standardized 
#  residuals and the fitted values for use in the bootstrap.
nuke.data <- data.frame(nuke,resid=nuke.res,fit=fitted(nuke.lm))

#  Now we want a prediction of plant number 32 but at date 73.00
new.data <- data.frame(cost=1, date=73.00, cap=886, ne=0,
                       ct=0, cum.n=11, pt=1)
new.fit <- predict(nuke.lm, new.data)

nuke.fun <- function(dat, inds, i.pred, fit.pred, x.pred) {
     assign(".inds", inds, envir=.GlobalEnv)
     lm.b <- glm(fit+resid[.inds] ~date+log(cap)+ne+ct+
                 log(cum.n)+pt, data=dat)
     pred.b <- predict(lm.b,x.pred)
     remove(".inds", envir=.GlobalEnv)
     c(coef(lm.b), pred.b-(fit.pred+dat$resid[i.pred]))
}

On a single workstation the actual bootstrap example takes approximately half a minute:

<R session>+= [<-D->]
> system.time(nuke.boot <-
+             boot(nuke.data, nuke.fun, R=999, m=1,
+                  fit.pred=new.fit, x.pred=new.data))
[1] 26.32  0.71 27.02  0.00  0.00

Using a two-machine cluster (identical i686 nodes this time) is approximately twice as fast:

<R session>+= [<-D->]
> clusterEvalQ(cl, library(boot))
> system.time(cl.nuke.boot <-
+             clusterCall(cl,boot,nuke.data, nuke.fun, R=500, m=1,
+                         fit.pred=new.fit, x.pred=new.data))
[1]  0.01  0.00 14.27  0.00  0.00

Using a five node cluster takes about 6 seconds:

<R session>+= [<-D]
> cl<-makeCluster(5)
> clusterEvalQ(cl,library(boot))
> system.time(cl.nuke.boot <-
+             clusterCall(cl,boot,nuke.data, nuke.fun, R=200, m=1,
+                         fit.pred=new.fit, x.pred=new.data))
[1] 0.03 0.00 5.58 0.00 0.00

The value to use comparing speeds is the third one, the elapsed time.

Some Higher Level Functions

Higher level functions defined in terms of clusterApply include parLapply, parSapply, and parApply. These are parallel versions of lapply, sapply, and apply. Some examples:

<higher level examples>= [D->]
sum(parApply(cl, matrix(1:100,10), 1, sum))
sum(parApply(cl, matrix(1:10000,100), 1, sum))

<higher level examples>+= [<-D->]
x<-1:100/101
system.time(qtukey(x, 2, df=2))
system.time(unlist(parLapply(cl, x, qtukey, 2, df=2)))

A very simple version of a parallel matrix multiply is also provides as an illustration.

<higher level examples>+= [<-D]
A<-matrix(rnorm(10000),100)
system.time(A %*% A)
system.time(parMM(cl,A , A))

Comments

Some points to keep in mind:

Open Issues

Error Handling

Error handling is currently very primitive. All evaluations on worker nodes are carried out using try. Results returned can therefore either be legitimate results or try errors. In principle user code should check for this.

The right way to handle situations where some nodes signal errors and some do not is not obvious and can depend on the application. Once R obtains a more sophisticated error handling mechanism it will be possible to build on that mechanism to provide flexible user control over the handling of errors in snow.

Interrupt Handling

Ideally, an interrupt on the master should be propagated to the worker nodes and interrupt any calculations currently in progress. This does not happen. In fact, an interrupt on the master during a snow computation will most likely leave the communication infrastructure in a confused state that makes further use of the cluster within the current R session impossible. The temporary solution: don't interrupt a snow computation. This is clearly not a satisfactory solution. Once R obtains a mechanism for controlling the handling of interrupts it sill be possible to do somewhat better. Whether it sill be possible to actually propagate interrupts to worker nodes is not completely clear and may depend on operating system considerations.

*