R: Understanding the impact of “makePSOCKcluster”

Solution for R: Understanding the impact of “makePSOCKcluster”
is Given Below:

I am using the R programming language. Recently, I came across the following function in R makePSOCKcluster, which can be used to “accelerate” the speed at which R can perform certain tasks (https://www.rdocumentation.org/packages/future/versions/1.19.1/topics/makeClusterPSOCK).

Based on some posts that I have seen, it seems like the makePSOCKcluster acts like a “wrapper” in which you place your code that would like to “accelerate” , for example:

library(doParallel)
library(future)

cl <- makePSOCKcluster(6) # 6 cpu cores out of 8

registerDoParallel(cl)

### enter your code here

stopCluster(cl) # when finished`

I tried to adapt this setup to help accelerate an R procedure I am using:

cl <- makePSOCKcluster(6) # 6 cpu cores out of 8

registerDoParallel(cl)

### START

library(dplyr)
library(data.table)

results_table <- data.frame()

grid_function <- function(train_data, random_1, random_2, random_3, random_4, split_1, split_2, split_3) {
    
    
    
    #bin data according to random criteria
    train_data <- train_data %>% mutate(cat = ifelse(a1 <= random_1 & b1 <= random_3, "a", ifelse(a1 <= random_2 & b1 <= random_4, "b", "c")))
    
    train_data$cat = as.factor(train_data$cat)
    
    #new splits
    a_table = train_data %>%
        filter(cat == "a") %>%
        select(a1, b1, c1, cat)
    
    b_table = train_data %>%
        filter(cat == "b") %>%
        select(a1, b1, c1, cat)
    
    c_table = train_data %>%
        filter(cat == "c") %>%
        select(a1, b1, c1, cat)
    
    
    #calculate random quantile ("quant") for each bin
    
    table_a = data.frame(a_table%>% group_by(cat) %>%
                             mutate(quant = quantile(c1, prob = split_1)))
    
    table_b = data.frame(b_table%>% group_by(cat) %>%
                             mutate(quant = quantile(c1, prob = split_2)))
    
    table_c = data.frame(c_table%>% group_by(cat) %>%
                             mutate(quant = quantile(c1, prob = split_3)))
    
    
    
    
    #create a new variable ("diff") that measures if the quantile is bigger tha the value of "c1"
    table_a$diff = ifelse(table_a$quant > table_a$c1,1,0)
    table_b$diff = ifelse(table_b$quant > table_b$c1,1,0)
    table_c$diff = ifelse(table_c$quant > table_c$c1,1,0)
    
    #group all tables
    
    final_table = rbind(table_a, table_b, table_c)
    
    #create a table: for each bin, calculate the average of "diff"
    final_table_2 = data.frame(final_table %>%
                                   group_by(cat) %>%
                                   summarize(
                                       mean = mean(diff)
                                   ))
    
    #add "total mean" to this table
    final_table_2 = data.frame(final_table_2 %>% add_row(cat = "total", mean = mean(final_table$diff)))
    
    #format this table: add the random criteria to this table for reference
    final_table_2$random_1 = random_1
    
    final_table_2$random_2 = random_2
    
    final_table_2$random_3 = random_3
    
    final_table_2$random_4 = random_4
    
    final_table_2$split_1 = split_1
    
    final_table_2$split_2 = split_2
    
    final_table_2$split_3 = split_3
    
    
    
    
    results_table <- rbind(results_table, final_table_2)
    
    final_results = dcast(setDT(results_table), random_1 + random_2 + random_3 + random_4 + split_1 + split_2 + split_3 ~ cat, value.var="mean")
    
}

# create some data for this example
a1 = rnorm(1000,100,10)
b1 = rnorm(1000,100,5)
c1 = sample.int(1000, 1000, replace = TRUE)
train_data = data.frame(a1,b1,c1)




#grid
random_1 <- seq(80,100,5)
random_2 <- seq(85,120,5)
random_3 <- seq(85,120,5)
random_4 <- seq(90,120,5)
split_1 =  seq(0,1,0.1)
split_2 =  seq(0,1,0.1)
split_3 =  seq(0,1,0.1)
DF_1 <- expand.grid(random_1 , random_2, random_3, random_4, split_1, split_2, split_3)

#reduce the size of the grid for this example
DF_1 = DF_1[1:100000,]

colnames(DF_1) <- c("random_1" , "random_2", "random_3",                     "random_4", "split_1", "split_2", "split_3")

train_data_new <- copy(train_data)


resultdf1 <- apply(DF_1,1, # 1 means rows
                   FUN=function(x){
                       do.call(
                           # Call Function grid_function2 with the arguments in
                           # a list
                           grid_function,
                           # force list type for the arguments
                           c(list(train_data_new), as.list(
                               # make the row to a named vector
                               unlist(x)
                           )
                           ))
                   }
)

l = resultdf1
final_output = rbindlist(l, fill = TRUE)

### END

# when finished`
stopCluster(cl) 

If I were to run the above code “locally”, it would take a very long time to run. I ran the above code using the “makePSOCKcluster” wrapper – the code is still running.

Question: I am not sure if the “makePSOCKcluster” will actually make a difference – I am also not sure if I have used the “makePSOCKcluster” wrapper in the correct way. Can someone please tell me if what I am doing is correct? Are there any other ways to accelerate this code?

Thanks

Here is an attempt at it.

1. Rewrite the function

The function below is faster. I believe it could be made faster if data.frames were replaced by matrices. The results are not identical to the results output by the original function in the question because there are some NA‘s that are now zeros.

library(foreach)
library(parallel)
library(dplyr)
library(data.table)

### START
grid_function_2 <- function(train_data, X) {
  random_1234 <- X[1:4]
  split_123 <- X[ seq_along(X)[-(1:4)] ]
  #bin data according to random criteria
  ia <- which(train_data$a1 <= random_1234[1] & train_data$b1 <= random_1234[3])
  ib <- which(train_data$a1 <= random_1234[2] & train_data$b1 <= random_1234[4])
  train_data$cat <- "c"
  train_data$cat[ia] <- "a"
  train_data$cat[ib] <- "b"
  train_data$cat <- factor(train_data$cat)
  #new splits
  table_abc <- split(train_data, train_data$cat)

  #calculate random quantile ("quant") for each bin

  table_abc <- lapply(seq_along(table_abc), function(i){
    quant <- quantile(table_abc[[i]]$c1, prob = split_123[i])
    diff <- as.integer(quant > table_abc[[i]]$c1)
    cbind(table_abc[[i]], diff = diff)
  })

  #group all tables
  final_table <- do.call(rbind, table_abc)
  final_table_2 <- final_table %>%
    group_by(cat) %>%
    summarise(mean = mean(diff))

  #add "total mean" to this table
  final_table_2 <- final_table_2 %>% add_row(cat = "total", mean = mean(final_table$diff))

  #format this table: add the random criteria to this table for reference
  final_table_2$random_1 <- random_1234[1]
  final_table_2$random_2 <- random_1234[2]
  final_table_2$random_3 <- random_1234[3]
  final_table_2$random_4 <- random_1234[4]
  final_table_2$split_1 <- split_123[1]
  final_table_2$split_2 <- split_123[2]
  final_table_2$split_3 <- split_123[3]

  dcast(as.data.table(final_table_2), random_1 + random_2 + random_3 + random_4 + split_1 + split_2 + split_3 ~ cat, value.var="mean")
}

2. The test data

The test data sets are smaller and made reproducible by setting the RNG seed.

# make the results reproducible
set.seed(2021)

# create some data for this example
a1 <- rnorm(1000,100,10)
b1 <- rnorm(1000,100,5)
c1 <- sample.int(1000, 1000, replace = TRUE)
train_data <- data.frame(a1,b1,c1)

#grid
n <- 3
random_1 <- seq(80,100,5)[1:n]
random_2 <- seq(85,120,5)[1:n]
random_3 <- seq(85,120,5)[1:n]
random_4 <- seq(90,120,5)[1:n]
split_1 <- seq(0,1,0.1)[1:n]
split_2 <- seq(0,1,0.1)[1:n]
split_3 <- seq(0,1,0.1)[1:n]
DF_1 <- expand.grid(random_1 , random_2, random_3, random_4, split_1, split_2, split_3)

# smaller data set with 3^7 rows
dim(DF_1)
#[1] 2187    7

## NOT RUN
#reduce the size of the grid for this example
#DF_1 <- DF_1[1:100000,]

colnames(DF_1) <- c("random_1" , "random_2", "random_3", "random_4",
                    "split_1", "split_2", "split_3")

train_data_new <- copy(train_data)

3. Timings

And now the comparative tests.
The reference value is t0 <- system.time({original apply loop and function}).

Then there are three tests, with apply/new function, with mclapply (not on Windows) and with parLapply.

t1 <- system.time({
  resultdf1 <- apply(DF_1, 1, # 1 means rows
                     FUN = function(x){
                       grid_function_2(train_data_new, x)
                     }
  )
})
final_output2 <- rbindlist(resultdf1, fill = TRUE)

# This cannot be run on Windows    
t2 <- system.time({
  ncores <- detectCores()
  resultdf1 <- mclapply(1:nrow(DF_1), function(i){
    grid_function_2(train_data_new, DF_1[i, ])
  },
  mc.cores = ncores - 1L)
})
final_output3 <- rbindlist(resultdf1, fill = TRUE)

t3 <- system.time({
  ncores <- detectCores()
  cl <- makePSOCKcluster(ncores - 1L) # reserve 1 cpu core
  clusterExport(cl, "train_data_new")
  clusterExport(cl, "DF_1")
  clusterExport(cl, "grid_function_2")
  clusterEvalQ(cl, "train_data_new")
  clusterEvalQ(cl, "DF_1")
  clusterEvalQ(cl, "grid_function_2")
  clusterEvalQ(cl, library(dplyr))
  clusterEvalQ(cl, library(data.table))
  resultdf1 <- parLapply(
    cl = cl,
    X = 1:nrow(DF_1),
    function(i){
      grid_function_2(train_data_new, DF_1[i, ])
    }
  )
  # when finished`
  stopCluster(cl)
})
final_output4 <- rbindlist(resultdf1, fill = TRUE)

4. Check the results

Like I said above, there are NA values mismatches when comparing the original final_output with this final_output2.

all.equal(final_output, final_output2)
#[1] "Column 'b': 'is.NA' value mismatch: 0 in current 243 in target"

identical(final_output2, final_output3)
#[1] TRUE
identical(final_output2, final_output4)
#[1] TRUE

As for the timings, remove t2 if the code is to be run on Windows. The fastest are t2 and t3 by around an order of magnitude.

rbind(t0, t1, t2, t3)