R parallel outer product

The goal

Very often in R, you can use the outer product of two vectors to speed up the computation of the generic term of a matrix.

> outer( 1:3, 5:10, function(a,b){a+b} )
     [,1] [,2] [,3] [,4] [,5] [,6]
[1,]    6    7    8    9   10   11
[2,]    7    8    9   10   11   12
[3,]    8    9   10   11   12   13

Remark: functions passed to outer must operate elementwise on arrays (the function is called only once, on the whole expanded arrays). This can be unnatural for some computations. In that case you can do something like:

> outer(1:5,1:3, function(a,b){mapply(min,a,b)} )
     [,1] [,2] [,3]
[1,]    1    1    1
[2,]    1    2    2
[3,]    1    2    3
[4,]    1    2    3
[5,]    1    2    3

Note that you can also use the Vectorize function instead of mapply.
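
For instance, wrapping a scalar function with Vectorize should give the same matrix as above (a small, untested sketch):

outer( 1:5, 1:3, Vectorize(function(a,b) min(a,b)) )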

But this does not work for more sophisticated arguments such as lists:

> outer( as.list(1:3), as.list(5:10), function(a,b){a+b} )
Error in a + b : non-numeric argument to binary operator
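
A possible workaround (a minimal sketch of the index trick that the non-MPI branch below also uses; la and lb are just illustrative names) is to pass indices to outer and look the list elements up inside mapply:

la <- as.list(1:3)
lb <- as.list(5:10)
outer( seq_along(la), seq_along(lb),
       function(a,b){ mapply(function(i,j) la[[i]] + lb[[j]], a, b) } )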

Moreover (and more importantly), this kind of operation should be automatically parallelized when running in an MPI environment.

The solution

Write your own outer function with basic environment detection. Remark: it is pointless to use this function if you have no parallelization and your list elements boil down to plain numbers (in that case use the builtin outer function; it will be faster, especially if you write a truly vectorized pairwise function).

pouter <-
function( listA, listB, myfun,verbose=F,force=F  ) {
	
	if ((Sys.getenv(c("OAR_NODEFILE") ) == "") && !force ){
	###################################################################
	#            Non-MPI part (no OAR node file detected)             #
	###################################################################
	
	
		answer <- outer( 
			1:length(listA),
			1:length(listB), 
			function(a,b) mapply(myfun,listA[a],listB[b]) 
		)
	
	} else {
	###################################################################
	#                           MPI part                              #
	###################################################################
	# This one should also work on a single computer but              #
	# 1/ requires an installation of Rmpi and associated tools        #
	# 2/ sometimes consumes a lot of useless CPU (at least on OSX)    #
	# 3/ is a waste of time if you do not provide more than one slave #
	###################################################################
	
	library("Rmpi")
	
	.Last <- function(){
	    if (is.loaded("mpi_initialize")){
	        if (mpi.comm.size(1) > 0){
	            mpi.close.Rslaves()
	        }
	        .Call("mpi_finalize")
	    }
	}


	mpi.spawn.Rslaves()
	if (mpi.comm.size() < 2) {
		print("No slaves found!")
		mpi.quit()
		return(NULL)
	}
	
	slave <- function() {
	# Note the use of the tag for sent messages: 
    #     1=ready_for_task, 2=done_task, 3=exiting 
    # Note the use of the tag for received messages: 
    #     1=task, 2=done_tasks 
    junk <- 0 
    done <- 0 
    while (done != 1) {
        # Signal being ready to receive a new task 
        mpi.send.Robj(junk,0,1) 
        # Receive a task 
        task <- mpi.recv.Robj(mpi.any.source(),mpi.any.tag()) 
        #print(task)
        
        task_info <- mpi.get.sourcetag() 
        tag <- task_info[2] 	
        #print(tag)
        
        if (tag == 1) {
            # Send a results message back to the master
            results <- list(i=task$i,j=task$j,result=match.fun(getAnywhere(myfun)[[1]])(task$dataI,task$dataJ) )
            mpi.send.Robj(results,0,2)
            }
        else if (tag == 2) {
            done <- 1
            }
        # We'll just ignore any unknown messages
        }
    mpi.send.Robj(junk,0,3)
	}


	# Now, send the data to the slaves
	# mpi.bcast.Robj2slave(traindatas)
	# mpi.bcast.Robj2slave(testdatas)

	# Send the functions to the slaves
	mpi.bcast.Robj2slave(slave)
	mpi.bcast.Robj2slave(myfun)

	# Call the function in all the slaves to get them ready to
	# undertake tasks
	mpi.bcast.cmd(slave())

	#Initialize matrix
	answer <- matrix(0, length(listA), length(listB))
	
	tasks <- unlist(lapply(
				1:length(listA), 
				function(a){lapply(1:length(listB), function(b){list(i=a,j=b)})} 
			  ),recursive=F)

	#main execution loop
	junk <- 0 
	closed_slaves <- 0 
	n_slaves <- mpi.comm.size()-1 
	while (closed_slaves < n_slaves) { 
    # Receive a message from a slave 
    message <- mpi.recv.Robj(mpi.any.source(),mpi.any.tag()) 
    message_info <- mpi.get.sourcetag() 
    slave_id <- message_info[1] 
    tag <- message_info[2] 

    if (tag == 1) { 
        # slave is ready for a task. Give it the next task, or tell it tasks 
        # are done if there are none. 
        if (length(tasks) > 0) { 
            # Send a task, and then remove it from the task list
            task = tasks[[1]]
            task$dataI = listA[[task$i]]
            task$dataJ = listB[[task$j]]
            if (verbose) {
            	cat("Sending ",task$i,task$j,"to slave", slave_id, "\n",file=stderr()) 
             }
             mpi.send.Robj( c( task ) , slave_id, 1);
            
            tasks[[1]]<-NULL
            } 
        else { 
            mpi.send.Robj(junk, slave_id, 2) 
            } 
        } 
    else if (tag == 2) { 
        # The message contains results. Do something with the results. 
        # Store them in the data structure
        answer[message$i,message$j] <-  message$result
		  } 
    else if (tag == 3) { 
        # A slave has closed down. 
        closed_slaves <- closed_slaves + 1 
        } 
    } 

	mpi.close.Rslaves()
	}

	return(answer)

}

###################################################################
#                  Start your code here                           #
###################################################################

print ( pouter( as.list(1:3), as.list(1:5), function(a,b){a+b} ) )

###################################################################

Running it on a cluster (at Inria or Grid5000)

master.sh 
---------
#!/bin/bash
lamboot -ssi boot_rsh_agent "oarsh" -v $OAR_FILE_NODES
mpirun.openmpi -np 1 --hostfile $OAR_NODEFILE R --slave -f pouter.R
lamhalt

Do not modify the number of processes in master.sh (it must be 1). You can replace pouter.R with the name of your own script.

Launch it with the following command (here you can change the number of cores):

oarsub -l /core=5,walltime=72:00:00 -p"cluster='sonic'"  ./master.sh

The code runs normally both on your computer and on the cluster (provided Rmpi is installed):

mary@frontale:~/outerR$ cat OAR.45158.std*
n-1<5889> ssi:boot:base:linear: booting n0 (sonic-3)
n-1<5889> ssi:boot:base:linear: booting n1 (sonic-4)
n-1<5889> ssi:boot:base:linear: booting n2 (sonic-5)
n-1<5889> ssi:boot:base:linear: finished
Sending  1 1 to slave 1 
Sending  1 2 to slave 3 
Sending  1 3 to slave 5 
Sending  1 4 to slave 4 
Sending  1 5 to slave 2 
Sending  2 1 to slave 1 
Sending  2 2 to slave 3 
Sending  2 3 to slave 5 
Sending  2 4 to slave 4 
Sending  2 5 to slave 1 
Sending  3 1 to slave 2 
Sending  3 2 to slave 3 
Sending  3 3 to slave 1 
Sending  3 4 to slave 2 
Sending  3 5 to slave 5 

LAM 7.1.2/MPI 2 C++/ROMIO - Indiana University

	5 slaves are spawned successfully. 0 failed.
master (rank 0, comm 1) of size 6 is running on: sonic-3 
slave1 (rank 1, comm 1) of size 6 is running on: sonic-3 
slave2 (rank 2, comm 1) of size 6 is running on: sonic-3 
slave3 (rank 3, comm 1) of size 6 is running on: sonic-4 
slave4 (rank 4, comm 1) of size 6 is running on: sonic-5 
slave5 (rank 5, comm 1) of size 6 is running on: sonic-5 
     [,1] [,2] [,3] [,4] [,5]
[1,]    2    3    4    5    6
[2,]    3    4    5    6    7
[3,]    4    5    6    7    8

LAM 7.1.2/MPI 2 C++/ROMIO - Indiana University

Final remark: you can write your own parallel mapply function using the same ideas. But in that case you should first read about the mpi.iapply-related functions in Rmpi (there is even one dedicated to Monte Carlo replication on a cluster).
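
As an illustration only, here is a minimal, untested sketch of such a parallel mapply built on Rmpi's load-balanced mpi.applyLB (the name pmapply and the pair-building step are hypothetical; it assumes slaves have already been spawned with mpi.spawn.Rslaves()):

library("Rmpi")

# Hypothetical parallel mapply: applies myfun to corresponding elements of
# listA and listB, one (a,b) pair per task, load-balanced across the slaves.
pmapply <- function(myfun, listA, listB) {
	# one task per pair of corresponding elements
	pairs <- mapply(function(a, b) list(a = a, b = b),
	                listA, listB, SIMPLIFY = FALSE)
	# mpi.applyLB ships the function and the extra argument f to the slaves
	mpi.applyLB(pairs, function(p, f) f(p$a, p$b), f = myfun)
}

The result comes back as a list; use unlist or simplify2array if you need a plain vector.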
