Using Remote Clusters with R

After reading my earlier blog post about running asynchronous R calls on a remote server, you probably got pumped at the idea of “nested futures”, remote clusters, or my use of the marquee HTML tag. Regardless of your excitement, it’s time to find out how you can take your parallel processing game to the next level.

This post is meant for two purposes: the first is to document an example of using remote clusters with R, and the second is to serve as instructions/reference for my lab members at Rochester.

Our issue last time was that we wanted to harness the power of the cluster, but could only connect to it through a gateway/login node. But what if we set up a “remote” plan into the login/gateway node of the cluster, and inside that future we had it establish a future plan to the other nodes on the cluster? What if we basically just “nested” these futures?

Turns out, yup, we can totally do that. But before we do, let’s make sure that we can actually use the other nodes in the cluster. This next section is primarily for my @Labmates, but could be useful for others using similar clusters.

Cluster set up

@Labmates: log in to the cluster through “cycle1.cs.rochester.edu” or however you’ve been doing it. There are a bunch of “nodes” in this cluster that you can reach once you’re in, by running ssh node<N>, where <N> is the number of the node you want to connect to. I think theoretically it’s any number between 33 and 64, but in practice only a seemingly random subset of those is reachable[1]. If you try to connect and it just hangs for a while, you probably won’t be able to connect and should just try a different node. When you find one that works, it should say something like:

The authenticity of host 'node61 (XXX.XXX.XX.XXX)' can't be established.
ECDSA key fingerprint is SHA256:<random_string>.
ECDSA key fingerprint is MD5:<lots_of_hexadecimals_separated_by_colons>.
Are you sure you want to continue connecting (yes/no)? 

Type in “yes” and it should say: “Warning: Permanently added 'node61,XXX.XXX.XX.XX' (ECDSA) to the list of known hosts.” You’ll probably need to add all the nodes you’ll be using to that list of known hosts. After a few times of doing this, you’ll start getting annoyed about having to enter your password each time[2]. So let’s change that.

SSH key authentication

@Labmates: I have to admit, you also might be able to use ssh-askpass in such a way that you don’t need to type your password each time you log in to the remote server. In my original troubleshooting, I removed my ability to log in without a password prompt, and I never went back and tried to see if I could get both working at the same time. I had followed the directions here. We’ll be doing something similar, but for connections between the nodes of the cluster.

The way the cluster works means that your home directory is the same across all nodes–change something on one node and it changes it on all of them. So loosely following the directions (but for Linux), we’ll do the following to establish a directory for our SSH keys (after logging in to any node on the cluster):

mkdir -p ~/.ssh
cd ~/.ssh/
chmod 0700 ~/.ssh

Generate new keys as you did before (ssh-keygen -t rsa -b 2048) and give them the default name. Now, since all the nodes share the same ~/.ssh files, just copy the public key and name it “authorized_keys” (i.e., cp ~/.ssh/id_rsa.pub ~/.ssh/authorized_keys). Note that cp overwrites any authorized_keys file you already have; to keep existing entries, append instead with cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys. You should now be able to ssh between nodes without having to retype your password.

Now that we got that out of the way, we need to (sadly) make sure each node has the R packages we need. (Fortunately) that’s not quite as tedious as it sounds.

Getting the R environments set up

In order to use R packages on the cluster, you need to make sure they’re installed on all the computers and nodes you’ll be using. Go ahead and install at least future and listenv (parallel ships with R itself, so it should already be there).

@Labmates: Until sometime in August, the nodes in the cluster won’t have all the R packages we want, so until then you’ll have to install the packages you need yourself. install.packages() should put them in your home directory, which is great because it means you won’t have to do it individually for each node. However, I think not all nodes run the same version of R (anywhere from 3.3.0 to 3.4.4), and many packages (maybe all?) installed under 3.3.x don’t work under 3.4.x or vice versa, so you’ll have to install them at least twice. When you ssh into a node, run R and see what version it is (it should tell you on start-up), and make sure you install all the packages on one node running 3.3.x and on one running 3.4.x.
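A quick way to see what a given node is missing, as a minimal sketch: run this in R after you ssh into a node. The package list is just the three named above, and `needed`, `r_minor`, and `not_installed` are names made up for this example.

```r
# Packages named in this post; extend the vector with whatever else you use
needed <- c("future", "parallel", "listenv")

# Major.minor version of the R session on this node, e.g. "3.3" or "3.4"
r_minor <- paste(R.version$major,
                 sub("\\..*$", "", R.version$minor),
                 sep = ".")

# Packages from `needed` that this R session cannot find in its libraries
installed <- rownames(installed.packages())
not_installed <- setdiff(needed, installed)

message("R ", r_minor, " on ", Sys.info()[["nodename"]],
        " is missing: ",
        if (length(not_installed)) paste(not_installed, collapse = ", ") else "nothing")

# Uncomment to install whatever is missing into your home-directory library
# install.packages(not_installed)
```

Running it once on a 3.3.x node and once on a 3.4.x node should be enough to tell you whether both sets of packages are in place.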

Now, let’s get back to the good stuff.

Nested futures (or “future topologies”)

Yes, you can embed futures in other futures. Check out this vignette for an intro and simple demonstration. Specifically, check out the section entitled “Example: A remote compute cluster,” which fits our situation almost perfectly[3].
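If you want to see the nesting idea without touching the cluster, here is a minimal local sketch. It assumes the future package is installed and uses the `sequential` strategy for both layers purely as a stand-in, so everything runs inside your current R session; `outer_val` and `inner_val` are made-up names.

```r
library(future)

# Two layers of strategies; both are local stand-ins for illustration only.
# On a real cluster the first layer would be the login node and the second
# the compute nodes.
plan(list(sequential, sequential))

outer_val %<-% {
  # This inner future is resolved by the second strategy in the list
  inner_val %<-% Sys.getpid()
  c(outer_pid = Sys.getpid(), inner_pid = inner_val)
}

outer_val

# Reset to a single sequential strategy when done
plan(sequential)
```

Because both layers are sequential here, the two process ids should be identical. With real remote and cluster layers they would come from different machines.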

Proof-of-concept demonstration

Let’s go over a simple demonstration of my own making first:

library(future)
library(parallel)

# Remember to use these!
Sys.setenv(PATH=paste0(Sys.getenv('PATH'), ':/Applications/RStudio.app/Contents/MacOS/postback/'))
Sys.setenv(SSH_ASKPASS='/usr/local/bin/ssh-askpass')

# Establish a remote persistent connection with a login node
plan(remote, workers = c("zburchil@cycle2.cs.rochester.edu"))

# Make `x` a future with the plan being the login node
x %<-% {
  # Since this is being run "in" the cluster, you can now use the
  #   "cluster" plan on the other nodes
  plan(cluster, 
       # Since your username is the same within the cluster, you 
       #    don't need to specify it
       workers=c("node33", "node34")
       )
  # `future::future_lapply` is actually deprecated in favor of 
  #   `future.apply::future_lapply`, but I'm not going to have 
  #   you install another package for a quick demo.
  # This basically gets the hostname of the computer being used 
  #   4 times using whatever `plan` you set
  xx <- future::future_lapply(1:4, function(x) {  Sys.info()[["nodename"]] })
  xx 
}

# Check to see if x has its values yet
resolved(futureOf(x))
## [1] TRUE
x
## [[1]]
## [1] "node33.cs.rochester.edu"
## 
## [[2]]
## [1] "node33.cs.rochester.edu"
## 
## [[3]]
## [1] "node34.cs.rochester.edu"
## 
## [[4]]
## [1] "node34.cs.rochester.edu"

If the execution hangs, it’s probably due to some difficulty connecting to the nodes/gateway. Make sure you can connect to the nodes you’re using via ssh if something goes wrong.
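To check connectivity without waiting on a hanging node, something like the following sketch might help. It assumes ssh is on your PATH and that key authentication between nodes is already set up; `ssh_probe_args` and `can_reach` are hypothetical helper names, while BatchMode and ConnectTimeout are standard OpenSSH options.

```r
# Build the arguments for a non-interactive ssh probe of one host.
# BatchMode=yes makes ssh fail instead of prompting for a password;
# ConnectTimeout caps how long a hanging node can stall the check.
ssh_probe_args <- function(host, timeout = 5) {
  c("-o", "BatchMode=yes",
    "-o", paste0("ConnectTimeout=", timeout),
    host, "hostname")
}

# TRUE if `ssh <host> hostname` exits with status 0, FALSE otherwise
can_reach <- function(host, timeout = 5) {
  status <- suppressWarnings(
    system2("ssh", ssh_probe_args(host, timeout),
            stdout = FALSE, stderr = FALSE)
  )
  identical(as.integer(status), 0L)
}

# Example (run from the login node, not executed here):
# nodes <- paste0("node", 33:64)
# usable <- Filter(can_reach, nodes)
```

Since BatchMode disables all prompts, a node whose host key you haven’t accepted yet will most likely show up as unreachable too, so ssh into it by hand once first.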

Important note

I don’t claim to 100% understand most of how future works yet, but note that inside the nested futures you don’t need to explicitly re-load the libraries or re-define the functions that you’re using from your global environment.

For example, I tried loading purrr in the global environment and put map(xx, ~paste0(., "!")) at the end of the future, and when it ran on the remote node it knew to use purrr::map. I’m guessing this won’t work if the package isn’t installed on the remote nodes, but you can try that out yourself.

future’s built-in method

But the creator of future (Henrik Bengtsson) has already anticipated situations similar to ours. future lets you embed “plans” within plans from the master computer. This example is adapted from one of his[4].

This “future topology” makes use of three layers: the remote connection to the login node, the cluster connection to the cluster nodes, and a “multiprocess” plan that lets you use the multiple cores on each cluster node in parallel. From this example, it seems that a list of strategies given to plan() is interpreted as a nested series of future strategies.

library("future")
library("listenv")

Sys.setenv(PATH=paste0(Sys.getenv('PATH'), ':/Applications/RStudio.app/Contents/MacOS/postback/'))
Sys.setenv(SSH_ASKPASS='/usr/local/bin/ssh-askpass')


# Set up access to remote login node
#   "tweak" is basically a way of saving a set of specific arguments to `plan`
login <- tweak(remote, workers = "zburchil@cycle1.cs.rochester.edu")

# Specify future topology
# login node -> { cluster nodes } -> { multiple cores }
plan(list(
  login,
  tweak(cluster, workers = c("node33", "node34")),
  # the 'multiprocess' plan uses a machine's cores for parallel processing 
  multiprocess
))

# This is Henrik's example. It's not the way that I would 
#   have thought to do it (I've never used `listenv`), but 
#   it makes sense if you think about how futures
#   would need to be called in parallel.

# (a) This will be evaluated on the cluster login computer
x %<-% {
  # Gets the login node's hostname
  thost <- Sys.info()[["nodename"]]
  # Gets the process id of what's running on the login node
  tpid <- Sys.getpid()
  # `listenv` basically makes a mini-environment that's a 
  #   little like a list.  I'm interpreting its use here to 
  #   be something that you can assign multiple futures to 
  #   without needing to check their values until you do 
  #   the `Reduce()` at the end.
  y <- listenv()
  for (task in 1:4) {
    # (b) This will be evaluated on a compute node on the cluster
    y[[task]] %<-% {
      # Gets the cluster node's hostname
      mhost <- Sys.info()[["nodename"]]
      # Gets the process id running on the cluster node
      mpid <- Sys.getpid()
      z <- listenv()
      for (jj in 1:2) {
        # (c) These will be evaluated in separate processes 
        #   on the same compute node
        z[[jj]] %<-% data.frame(task = task,
                                top.host = thost, top.pid = tpid,
                                mid.host = mhost, mid.pid = mpid,
                                host = Sys.info()[["nodename"]],
                                pid = Sys.getpid())
      }
      Reduce(rbind, z)
    }
  }
  Reduce(rbind, y)
}

print(x)
##   task                top.host top.pid                mid.host
## 1    1 cycle1.cs.rochester.edu    9156 node33.cs.rochester.edu
## 2    1 cycle1.cs.rochester.edu    9156 node33.cs.rochester.edu
## 3    2 cycle1.cs.rochester.edu    9156 node34.cs.rochester.edu
## 4    2 cycle1.cs.rochester.edu    9156 node34.cs.rochester.edu
## 5    3 cycle1.cs.rochester.edu    9156 node33.cs.rochester.edu
## 6    3 cycle1.cs.rochester.edu    9156 node33.cs.rochester.edu
## 7    4 cycle1.cs.rochester.edu    9156 node33.cs.rochester.edu
## 8    4 cycle1.cs.rochester.edu    9156 node33.cs.rochester.edu
##   mid.pid                    host   pid
## 1    3422 node33.cs.rochester.edu  3468
## 2    3422 node33.cs.rochester.edu  3469
## 3   16543 node34.cs.rochester.edu 16586
## 4   16543 node34.cs.rochester.edu 16588
## 5    3422 node33.cs.rochester.edu  3470
## 6    3422 node33.cs.rochester.edu  3471
## 7    3422 node33.cs.rochester.edu  3472
## 8    3422 node33.cs.rochester.edu  3474

So that’s it! You need to be careful about how you code these futures, though, given their asynchronicity and how future() works. You can use listenv()s like Henrik does here, or you can use packages and functions that take care of that stuff for you, such as future.apply. future also plays nice with a bunch of other packages, and Henrik has a super helpful blog post about how to connect them.
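As a rough sketch of the future.apply route, here is the hostname demo from earlier with no listenv bookkeeping. It assumes both future and future.apply are installed, and it uses `sequential` as a local stand-in strategy so it runs anywhere; on the cluster you would set the nested plan first instead.

```r
library(future)
library(future.apply)

# Local stand-in strategy so the sketch runs without a cluster
plan(sequential)

# Collect the hostname four times using whatever plan is currently set
hosts <- future_lapply(1:4, function(i) Sys.info()[["nodename"]])
unlist(hosts)
```

future_lapply() hands back a plain list once every element is resolved, so there are no promises to keep track of yourself.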

To note

Here’s what you should know setting out on your own:



Footnotes

  1. I think it might be that when nodes are particularly busy, it just takes too long to establish a connection. But I don’t really know. I’m talking with people about why this is. 

  2. Especially if it’s 24+ characters long, like I foolishly made mine. 

  3. I find more often than not that I only find a good answer online after I’ve basically solved the problem. I put at least an hour or two of effort into getting a proof-of-concept remote cluster working with the login nodes only to come to the conclusion that it was pretty infeasible due to pretty esoteric/undocumented socket reasons. The remote computers being used as clusters (via plan(cluster, ...)) need to be able to open connections back to the master computer, which is super simple when that “master” is already on the cluster, but not when it’s my local computer. This differs from the remote connection of plan(remote, ...) in that the non-cluster remote connection is “persistent”. 

  4. Running his code OOTB didn’t actually work for me. Notice that his example creates the node cluster explicitly with parallel::makeCluster() before starting the future topology. When I try this, I get: Error in summary.connection(con) : invalid connection, which I believe comes from parallel. Instead, I just use future’s default clustering method, and just pass the worker names in the tweak.

  5. I literally got 400+ likes and spawned a number of super-interesting threads just for recommending a single function. These people are thirsty (for knowledge). 
