Function for Uploading HYCOM currents to BigQuery

library(tidyverse)
library(lubridate)
library(ncdf4)
library(purrr)

This is an initial function to pull currents data from the HYCOM GOFS 3.1: 41-layer HYCOM + NCODA Global 1/12° Analysis. Data availability varies by time period, but this particular function pulls data going back to at least 2018-01-01. Slight modifications to the curl URL may be necessary to pull earlier currents data.

The data resolution varies by latitude in the following manner: the grid is 0.08 deg lon x 0.08 deg lat between 40S and 40N, and 0.08 deg lon x 0.04 deg lat poleward of 40S/40N. It spans 80S to 90N. The temporal resolution is every 3 hours, starting at 0:00 and running through 24:00, so midnight can appear twice. To eliminate this, the function starts each day at 0:00 and ends at 21:00, leaving 24:00 to be represented by 0:00 of the following day.
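A quick back-of-the-envelope check of the per-day dimensions implied by this grid (these counts match the lat_grid/lon_grid definitions used inside process_currents below):

# illustrative only: per-day row count implied by the grid and time step above
n_lon <- 4500                      # 0 to 359.92 at 0.08 deg
n_lat <- 1000 + 1001 + 1250        # 0.04 deg lat poleward of 40S/40N, 0.08 deg between
hours_kept <- seq(0, 21, by = 3)   # the 8 time points retained per day (24:00 dropped)
n_lon * n_lat * length(hours_kept) # 117,036,000 rows per day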

The URL provides the daily data in NetCDF format (.nc), which then needs to be processed. The data is stored as two 3251 x 4500 x 9 arrays (the third dimension being the time points): one array for the northward currents and one array for the eastward currents.
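Before any processing, a downloaded file can be sanity-checked with ncdf4; a minimal sketch, assuming the daily file has already been saved as testing_netcdf.nc (the same filename used by process_currents below):

# minimal inspection sketch of a downloaded daily file
nc <- ncdf4::nc_open('testing_netcdf.nc')
nc$dim$lon$len                         # number of longitudes
nc$dim$lat$len                         # number of latitudes
nc$dim$time$len                        # number of 3-hourly time points (includes 24:00)
ncdf4::ncatt_get(nc, 'time', 'units')  # reference epoch used below to build timestamps
ncdf4::nc_close(nc)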

I have combined the two current components into the magnitude of the resultant vector, sqrt(x_current^2 + y_current^2); only this speed is kept, not the direction.
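For example, a 0.3 m/s eastward component and a 0.4 m/s northward component give a resultant speed of 0.5 m/s; the same computation applied element-wise to toy vectors:

# toy element-wise example of the magnitude calculation (illustrative values)
x_current <- c(0.3, -0.1, NA)
y_current <- c(0.4,  0.2, 0.5)
sqrt(x_current^2 + y_current^2)   # ~0.500, ~0.224, NA (NAs are later written out as empty strings)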

The function below is designed to loop through a series of dates and bulk upload all data for each day using bq load.

ISSUES:

Currently the daily table appears to be too large to upload to BigQuery in a timely manner. The full table is 117,036,000 rows per day and takes ~45-60 minutes to upload, typically timing out.

I am exploring other options for upload (possibly staging the file in GCS, as sketched below) or ways to make the uploads smaller.
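One option, sketched here under the assumption that a scratch bucket is available (gs://scratch-currents below is a placeholder, not an existing resource), would be to stage the daily CSV in GCS with gsutil and point bq load at the gs:// URI instead of the local file:

# hypothetical sketch: stage the CSV in GCS and load from the gs:// URI
system('gsutil cp testing_currents.csv gs://scratch-currents/testing_currents.csv')
system(paste('bq load --skip_leading_rows=1 --noreplace',
             'world-fishing-827:scratch_nate.testing_currents',
             'gs://scratch-currents/testing_currents.csv',
             'lon:float,lat:float,date:date,current:float,hour:integer'))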

UPDATE:

I reformulated the code to use data.table::rbindlist for binding the 3-hourly data frames together. Previously I was using rbind, which gets slower as each data frame is added; the data.table implementation also appears to be ~2x faster than dplyr::bind_rows. I also added data.table::fwrite for saving the .CSV prior to uploading to BQ, which took a save (for a 4.3GB file) that required nearly an hour with readr::write_csv down to ~20 seconds. Finally, I removed the for loops in favor of purrr, which makes the code a bit tidier but doesn't really affect performance. The slowest portion of the entire process remains bq load, and I don't have a means of making that step any faster.
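The binding/writing pattern in miniature (illustrative only; in the function below the list holds the eight 3-hourly data frames produced by process_timepoint):

# toy illustration of the rbindlist + fwrite pattern
df_list <- list(
  data.frame(lon = 0.00, lat = -80, date = '2019-01-01', current = 0.12, hour = 0),
  data.frame(lon = 0.08, lat = -80, date = '2019-01-01', current = 0.10, hour = 3)
)
day_df <- data.table::rbindlist(df_list)            # single pass over the list, no repeated copying
data.table::fwrite(day_df, 'example_currents.csv')  # multithreaded CSV writer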

# function to process individual time points within each day
process_timepoint <- function(index, x_current.array, y_current.array,
                              lonlat, nlat, nlon, date, hour_seq) {
    print(glue::glue('Processing Hour: {hour_seq[index]}'))
    # select specified time point
    x_current.array_day <- x_current.array[,, index]
    y_current.array_day <- y_current.array[,, index]
    
    # convert to long vector
    x_current.vec.long <- as.numeric(as.vector(x_current.array_day))
    y_current.vec.long <- as.numeric(as.vector(y_current.array_day))
    
    # convert variable to matrix the length of lat/lon
    x_current.mat <- matrix(x_current.vec.long, nrow = nlon * nlat, ncol = 1)
    y_current.mat <- matrix(y_current.vec.long, nrow = nlon * nlat, ncol = 1)
    
    #bind the expanded lat/lon grid (built once in process_currents and passed in) to each current variable
    x_current_df <- data.frame(cbind(lonlat, x_current.mat))
    y_current_df <- data.frame(cbind(lonlat, y_current.mat))
    
    #rename fields
    names(x_current_df) <- c('lon','lat','x_current')
    names(y_current_df) <- c('lon','lat','y_current')
    
    #combine x_current and y_current into the resultant current speed
    total_current_vector <- x_current_df %>%
        dplyr::mutate(date = date,
                      lat = round(lat, 2),
                      lon = round(lon, 2),
                      current = sqrt(x_current * x_current +
                                     y_current_df$y_current * y_current_df$y_current),
                      current = ifelse(is.na(current), '', current),
                      hour = hour_seq[index]) %>%
        dplyr::select(-x_current)
    
    # return the data frame for this time point (lon, lat, date, current, hour)
    total_current_vector
}
# function to pull data from HYCOM by day, process all time points within the
# day, and append the data to a BigQuery table
process_currents <- function(date) {
  
  temp_list <- list()
  
  next_date <- as.Date(date, tz = 'UTC') + lubridate::days(1)
  
  print(glue::glue('Downloading Date: {date}' ))
  
  #download the daily NetCDF to the working directory
  curl::curl_download(glue::glue(
    'http://ncss.hycom.org/thredds/ncss/GLBv0.08/expt_93.0/uv3z?var=water_u&var=water_v',
    '&north=90.0000&west=0.0000&east=359.9200&south=-80.0000',
    '&disableLLSubset=on&disableProjSubset=on&horizStride=1',
    '&time_start={date}T00%3A00%3A00Z&time_end={next_date}T00%3A00%3A00Z',
    '&timeStride=1&vertCoord=0.0&addLatLon=true&accept=netcdf'),
    'testing_netcdf.nc')

  currents_file <- ncdf4::nc_open('./testing_netcdf.nc')
  
  lon <- ncdf4::ncvar_get(currents_file, "lon")
  nlon <- dim(lon)

  lat <- ncdf4::ncvar_get(currents_file, "lat")
  nlat <- dim(lat)
  
  #get times
  time <- ncdf4::ncvar_get(currents_file, "time")
  
  tunits <- ncdf4::ncatt_get(currents_file,"time","units")
  
  # convert time -- split the time units string into fields
  tustr <- strsplit(tunits$value, " ")
  tdstr <- strsplit(unlist(tustr)[3], "-")
  tmonth <- as.integer(unlist(tdstr)[2])
  tday <- as.integer(unlist(tdstr)[3])
  tyear <- as.integer(unlist(tdstr)[1])
  new_time <- as.POSIXct(paste0(tyear,'-',tmonth,'-',tday), tz = 'UTC') + 
                              lubridate::hours(time)
  
  hour_seq <- data.frame(time_c = new_time) %>%
    mutate(date_c = as.Date(time_c, tz = 'UTC')) %>%
    filter(date_c == as.Date(date, tz = 'UTC')) %>%
    mutate(time_hour = lubridate::hour(time_c)) %>%
    pull(time_hour)
  
  #current velocity variables (u = eastward, v = northward)
  x_current.array <- ncdf4::ncvar_get(currents_file, 'water_u')
  y_current.array <- ncdf4::ncvar_get(currents_file, 'water_v')
  
  # NA value
  x.current.fillvalue <- ncdf4::ncatt_get(currents_file, 'water_u', "_FillValue")
  y.current.fillvalue <- ncdf4::ncatt_get(currents_file, 'water_v', "_FillValue")
  
  #replace fill value with NA
  x_current.array[x_current.array == x.current.fillvalue$value] <- NA_real_
  y_current.array[y_current.array == y.current.fillvalue$value] <- NA_real_
  
  #specify lon/lat grid
  lat_grid <- c(seq(-80,-40.04,length.out = 1000), 
                seq(-40,40,length.out = 1001), 
                seq(40.04,90,length.out = 1250))
  lon_grid <- seq(0,359.92, length.out = 4500)
  lonlat <- expand.grid(lon_grid, lat_grid)
  
  #process each time point in the day (the final 24:00 point is excluded by hour_seq)
 
  temp_list <- setNames(purrr::map(.x = seq_along(hour_seq), 
             .f = process_timepoint,
             x_current.array, 
             y_current.array,
             lonlat,
             nlat,
             nlon,
             date,
             hour_seq), nm = hour_seq)
  
  
  #bind the 3-hourly data frames into the daily table and save to .CSV
  print('Saving File')
  day_total_df <- data.table::rbindlist(temp_list)
  data.table::fwrite(day_total_df, './testing_currents.csv')
  print('Uploading to BigQuery')
  #upload to BigQuery (in RStudio the full path to the bq utility may be needed)
  command <- paste('bq load --skip_leading_rows=1 --noreplace',
                   'world-fishing-827:scratch_nate.testing_currents',
                   'testing_currents.csv',
                   'lon:float,lat:float,date:date,current:float,hour:integer')
  system(command)
  #delete the .CSV after upload
  system('rm testing_currents.csv')
      
}

Example of the function for 1 day

process_currents('2019-01-01')

Example of the function for 3 days

date_sequence <- seq.Date(as.Date('2018-01-01', tz = 'UTC'),
                          as.Date('2018-01-03', tz = 'UTC'), 
                          by = 'day')

purrr::map(.x = date_sequence,
           .f = process_currents)
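
Since process_currents is called purely for its side effects (download, save, upload), purrr::walk is an equivalent alternative that avoids collecting a list of return values:

purrr::walk(.x = date_sequence,
            .f = process_currents)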