################################################################################ # 기상관측계 : CWS # 지하수위계 : GWL # 다층 토양수분계 : MSM # 초음파관유량계(S) : PFS # 레이다유량계 : RFM # 초음파수위계 : UWL iwmp.quality.control.station.data <- function(EnvList){ library(data.table) library(dplyr) library(zoo) prjdir <- EnvList$prjdir msm_moist_stdv <- EnvList$msm_moist_stdv msm_temp_stdv <- EnvList$msm_temp_stdv msm_max_na_length <- EnvList$msm_max_na_length # cws_temp_stdv <- EnvList$cws_temp_stdv cws_max_na_length <- EnvList$cws_max_na_length # pfs_flow_stdv <- EnvList$pfs_flow_stdv pfs_max_na_length <- EnvList$pfs_max_na_length rfm_cor_coeff_thold <- EnvList$rfm_cor_coeff_thold rfm_max_na_length <- EnvList$rfm_max_na_length uwl_depth_stdv <- EnvList$uwl_depth_stdv uwl_max_na_length <- EnvList$uwl_max_na_length stn.dir <- file.path(prjdir, "02_extracted.station") qc.dir <- file.path(prjdir, "03_quality.control") if(!dir.exists(qc.dir)) dir.create(qc.dir, showWarnings = F, recursive = T) flist <- list.files(stn.dir, pattern = glob2rx("*.csv"), full.names = F) stypes <- unique(matrix(unlist(strsplit(flist, "-")), nrow=length(flist), byrow=T)[,1]) for(i in 1:length(stypes)){ stype <- stypes[i] flist <- list.files(stn.dir, pattern = glob2rx(sprintf("%s*.csv", stype)), full.names = T) for(j in 1:length(flist)){ fname <- flist[j] cat(sprintf("%s started for interpolation!\n", basename(fname))) stn.dt <- as.data.table(read.csv(fname, header = T)) stn.dt <- setDT(stn.dt) if(stype == "cws"){ # Replace with NA stn.dt[, temp := if (.N > 1 && length(unique(temp)) == 1) NA else temp, by = .(date = as.Date(time))] stn.dt[, dew := if (.N > 1 && length(unique(dew)) == 1) NA else dew, by = .(date = as.Date(time))] stn.dt[, rhum := if (.N > 1 && length(unique(rhum)) == 1) NA else rhum, by = .(date = as.Date(time))] stn.dt[, wspd := if (.N > 1 && length(unique(wspd)) == 1) NA else wspd, by = .(date = as.Date(time))] stn.dt[, wdir := if (.N > 1 && length(unique(wdir)) == 1) NA else wdir, by = .(date = as.Date(time))] stn.dt[, rsds := if (.N > 1 && length(unique(rsds)) == 1) NA else rsds, by = .(date = as.Date(time))] stn.dt[, sshine := if (.N > 1 && length(unique(sshine)) == 1) NA else sshine, by = .(date = as.Date(time))] stn.dt[, lux := if (.N > 1 && length(unique(lux)) == 1) NA else lux, by = .(date = as.Date(time))] stn.dt[, presure := if (.N > 1 && length(unique(presure)) == 1) NA else presure, by = .(date = as.Date(time))] stn.dt[, pet := if (.N > 1 && length(unique(pet)) == 1) NA else pet, by = .(date = as.Date(time))] # convert accumulated to hourly data stn.dt[, prcp := c(0, diff(prcp))] stn.dt[prcp < 0, prcp := 0] # convert accumulated to hourly data stn.dt[, sshine := c(0, diff(sshine))] stn.dt[sshine < 0, sshine := 0] # Unit converstion from W/m2 to MJ/m2: 1 Wh = 0.0036 MJ stn.dt[, rsds := rsds * 0.0036] stn.dt[dew<=0, dew := NA] stn.dt[rhum<=0, rhum := NA] stn.dt[wdir<=0, wdir := NA] stn.dt[presure<=0, presure := NA] # # Check outliers # stn.dt[, sd_temp := sd(temp, na.rm = TRUE)/mean(temp, na.rm=TRUE), by = .(as.Date(time))] # stn.dt[sd_temp > cws_temp_stdv, temp := NA] stn.dt$time <- format(stn.dt$time, format = "%Y-%m-%d %H:%M:%S") # Interpolating NA values stn.dt[, c("temp", "dew", "rhum", "wspd", "wdir", "rsds", "sshine", "lux", "presure") := lapply(.SD, interpolate_na, max_na_length = cws_max_na_length), .SDcols = c("temp", "dew", "rhum", "wspd", "wdir", "rsds", "sshine", "lux", "presure")] } if(stype == "msm"){ ## Soil moistures if(any(grep("sm10", names(stn.dt)))){ stn.dt[sm10<=0, sm10 := NA] stn.dt[, sd_sm10 := sd(sm10, na.rm = TRUE)/mean(sm10, na.rm=TRUE), by = .(as.Date(time))] stn.dt[sd_sm10 > msm_moist_stdv, sm10 := NA] stn.dt <- stn.dt[, sd_sm10 := NULL] stn.dt[, sm10 := interpolate_na(sm10, msm_max_na_length)] } if(any(grep("sm20", names(stn.dt)))){ stn.dt[sm20<=0, sm20 := NA] stn.dt[, sd_sm20 := sd(sm20, na.rm = TRUE)/mean(sm20, na.rm=TRUE), by = .(as.Date(time))] stn.dt[sd_sm20 > msm_moist_stdv, sm20 := NA] stn.dt <- stn.dt[, sd_sm20 := NULL] stn.dt[, sm20 := interpolate_na(sm20, msm_max_na_length)] } if(any(grep("sm30", names(stn.dt)))){ stn.dt[sm30<=0, sm30 := NA] stn.dt[, sd_sm30 := sd(sm30, na.rm = TRUE)/mean(sm30, na.rm=TRUE), by = .(as.Date(time))] stn.dt[sd_sm30 > msm_moist_stdv, sm30 := NA] stn.dt <- stn.dt[, sd_sm30 := NULL] stn.dt[, sm30 := interpolate_na(sm30, msm_max_na_length)] } if(any(grep("sm40", names(stn.dt)))){ stn.dt[sm40<=0, sm40 := NA] stn.dt[, sd_sm40 := sd(sm40, na.rm = TRUE)/mean(sm40, na.rm=TRUE), by = .(as.Date(time))] stn.dt[sd_sm40 > msm_moist_stdv, sm40 := NA] stn.dt <- stn.dt[, sd_sm40 := NULL] stn.dt[, sm40 := interpolate_na(sm40, msm_max_na_length)] } if(any(grep("sm50", names(stn.dt)))){ stn.dt[sm50<=0, sm50 := NA] stn.dt[, sd_sm50 := sd(sm50, na.rm = TRUE)/mean(sm50, na.rm=TRUE), by = .(as.Date(time))] stn.dt[sd_sm50 > msm_moist_stdv, sm50 := NA] stn.dt <- stn.dt[, sd_sm50 := NULL] stn.dt[, sm50 := interpolate_na(sm50, msm_max_na_length)] } ## Soil temperature if(any(grep("st10", names(stn.dt)))){ stn.dt[st10<=0, st10 := NA] stn.dt[, sd_st10 := sd(st10, na.rm = TRUE)/mean(st10, na.rm=TRUE), by = .(as.Date(time))] stn.dt[sd_st10 > msm_temp_stdv, st10 := NA] stn.dt <- stn.dt[, sd_st10 := NULL] stn.dt[, st10 := interpolate_na(st10, msm_max_na_length)] } if(any(grep("st20", names(stn.dt)))){ stn.dt[st20<=0, st20 := NA] stn.dt[, sd_st20 := sd(st20, na.rm = TRUE)/mean(st20, na.rm=TRUE), by = .(as.Date(time))] stn.dt[sd_st20 > msm_temp_stdv, st20 := NA] stn.dt <- stn.dt[, sd_st20 := NULL] stn.dt[, st20 := interpolate_na(st20, msm_max_na_length)] } if(any(grep("st30", names(stn.dt)))){ stn.dt[st30<=0, st30 := NA] stn.dt[, sd_st30 := sd(st30, na.rm = TRUE)/mean(st30, na.rm=TRUE), by = .(as.Date(time))] stn.dt[sd_st30 > msm_temp_stdv, st30 := NA] stn.dt <- stn.dt[, sd_st30 := NULL] stn.dt[, st30 := interpolate_na(st30, msm_max_na_length)] } if(any(grep("st40", names(stn.dt)))){ stn.dt[st40<=0, st40 := NA] stn.dt[, sd_st40 := sd(st40, na.rm = TRUE)/mean(st40, na.rm=TRUE), by = .(as.Date(time))] stn.dt[sd_st40 > msm_temp_stdv, st40 := NA] stn.dt <- stn.dt[, sd_st40 := NULL] stn.dt[, st40 := interpolate_na(st40, msm_max_na_length)] } if(any(grep("st50", names(stn.dt)))){ stn.dt[st50<=0, st50 := NA] stn.dt[, sd_st50 := sd(st50, na.rm = TRUE)/mean(st50, na.rm=TRUE), by = .(as.Date(time))] stn.dt[sd_st50 > msm_temp_stdv, st50 := NA] stn.dt <- stn.dt[, sd_st50 := NULL] stn.dt[, st50 := interpolate_na(st50, msm_max_na_length)] } stn.dt$time <- format(stn.dt$time, format = "%Y-%m-%d %H:%M:%S") } if(stype == "pfs"){ # colnames(stn.dt) <- c("date", "flow", "accu", "para") # # Check outliers # stn.dt[, sd_flow := sd(flow, na.rm = TRUE)/mean(flow, na.rm=TRUE), by = .(substr(date, 1, 7))] # stn.dt[sd_flow > pfs_flow_stdv, flow := NA] } if(stype == "rfm"){ stn.dt$time <- format(stn.dt$time, format = "%Y-%m-%d %H:%M:%S") stn.dt <- setDT(stn.dt) stn.dt[depth==h.ref, depth := NA] stn.dt[depth<=-0.2, depth := NA] stn.dt[velocity<=0, velocity := NA] diff_cor <- 1 while(diff_cor >= rfm_cor_coeff_thold){ data <- stn.dt[,c("velocity", "depth")] ini_cor <- cor(data$depth, data$velocity, use = "complete.obs") result <- find_max_correlation(data, ini_cor) new_cor <- result$new_cor col_name <- result$col_name row_no <- result$row_no diff_cor <- new_cor - ini_cor if(diff_cor > rfm_cor_coeff_thold) { if(col_name == "depth") stn.dt[row_no, depth := NA] if(col_name == "velocity") stn.dt[row_no, velocity := NA] } cat(sprintf(" value has been treated as NA (row.no: %d, col.name: %s, diff.cor: %5.4f)\n", row_no, col_name, diff_cor)) } stn.dt$time <- format(stn.dt$time, format = "%Y-%m-%d %H:%M:%S") stn.dt[, c("velocity", "depth", "h.ref") := lapply(.SD, interpolate_na, max_na_length = uwl_max_na_length), .SDcols = c("velocity", "depth", "h.ref")] min.depth <- min(stn.dt$depth, na.rm=T) if(min.depth < 0) stn.dt$depth <- stn.dt$depth + abs(min.depth) } if(stype == "uwl"){ stn.dt[depth==h.ref, depth := NA] stn.dt[depth<=-0.2, depth := NA] stn.dt[, sd_depth := sd(depth, na.rm = TRUE)/mean(depth, na.rm=TRUE), by = .(as.Date(time))] stn.dt[sd_depth > uwl_depth_stdv, depth := NA] stn.dt <- stn.dt[, sd_depth := NULL] stn.dt$time <- format(stn.dt$time, format = "%Y-%m-%d %H:%M:%S") stn.dt[, c("depth", "h.ref") := lapply(.SD, interpolate_na, max_na_length = uwl_max_na_length), .SDcols = c("depth", "h.ref")] min.depth <- min(stn.dt$depth, na.rm=T) if(min.depth < 0) stn.dt$depth <- stn.dt$depth + abs(min.depth) } cat(sprintf(" => %s has been finished!\n\n", basename(fname))) OrgDFile <- file.path(qc.dir, basename(fname)) write.csv(stn.dt, OrgDFile, na="", row.names = F) } # Station IDs } # If sensor type is all }