# I/O helpers for process objects: connection accessors, readers, writers
# and polling.
# Tell whether the process has a standard input pipe.
#
# `self` is not used here; `private` must provide `stdin_pipe`.
# Returns TRUE when `private$stdin_pipe` is non-NULL, FALSE otherwise.
process_has_input_connection <- function(self, private) {
  "!DEBUG process_has_input_connection `private$get_short_name()`"
  stdin_pipe <- private$stdin_pipe
  !is.null(stdin_pipe)
}
# Tell whether the process has a standard output pipe.
#
# `self` is not used here; `private` must provide `stdout_pipe`.
# Returns TRUE when `private$stdout_pipe` is non-NULL, FALSE otherwise.
process_has_output_connection <- function(self, private) {
  "!DEBUG process_has_output_connection `private$get_short_name()`"
  stdout_pipe <- private$stdout_pipe
  !is.null(stdout_pipe)
}
# Tell whether the process has a standard error pipe.
#
# `self` is not used here; `private` must provide `stderr_pipe`.
# Returns TRUE when `private$stderr_pipe` is non-NULL, FALSE otherwise.
process_has_error_connection <- function(self, private) {
  "!DEBUG process_has_error_connection `private$get_short_name()`"
  stderr_pipe <- private$stderr_pipe
  !is.null(stderr_pipe)
}
# Tell whether the process has a poll connection.
#
# `self` is not used here; `private` must provide `poll_pipe`.
# Returns TRUE when `private$poll_pipe` is non-NULL, FALSE otherwise.
process_has_poll_connection <- function(self, private) {
  # The debug marker names this function; it previously carried the name of
  # the stderr variant, which mislabelled debug traces.
  "!DEBUG process_has_poll_connection `private$get_short_name()`"
  !is.null(private$poll_pipe)
}
# Return the standard input pipe of the process.
#
# Asks `self$has_input_connection()` first and raises the
# "stdin is not a pipe." error when it reports that there is no pipe.
# Otherwise returns `private$stdin_pipe`.
process_get_input_connection <- function(self, private) {
  "!DEBUG process_get_input_connection `private$get_short_name()`"
  has_pipe <- self$has_input_connection()
  if (!has_pipe) {
    throw(new_error("stdin is not a pipe."))
  }
  private$stdin_pipe
}
# Return the standard output pipe of the process.
#
# Asks `self$has_output_connection()` first and raises the
# "stdout is not a pipe." error when it reports that there is no pipe.
# Otherwise returns `private$stdout_pipe`.
process_get_output_connection <- function(self, private) {
  "!DEBUG process_get_output_connection `private$get_short_name()`"
  has_pipe <- self$has_output_connection()
  if (!has_pipe) {
    throw(new_error("stdout is not a pipe."))
  }
  private$stdout_pipe
}
# Return the standard error pipe of the process.
#
# Asks `self$has_error_connection()` first and raises the
# "stderr is not a pipe." error when it reports that there is no pipe.
# Otherwise returns `private$stderr_pipe`.
process_get_error_connection <- function(self, private) {
  "!DEBUG process_get_error_connection `private$get_short_name()`"
  has_pipe <- self$has_error_connection()
  if (!has_pipe) {
    throw(new_error("stderr is not a pipe."))
  }
  private$stderr_pipe
}
# Return the poll connection of the process.
#
# Asks `self$has_poll_connection()` first and raises the
# "No poll connection" error when it reports that there is none.
# Otherwise returns `private$poll_pipe`.
process_get_poll_connection <- function(self, private) {
  "!DEBUG process_get_poll_connection `private$get_short_name()`"
  has_pipe <- self$has_poll_connection()
  if (!has_pipe) {
    throw(new_error("No poll connection"))
  }
  private$poll_pipe
}
# Read up to `n` characters from the standard output of the process.
#
# The output connection is looked up first, so the "stdout is not a pipe."
# error is raised before anything else happens. When `private$pty` is set,
# the connection is polled with a zero timeout and "" is returned if the
# poll reports "timeout" (presumably so a pty read never blocks; confirm).
# Otherwise the read is delegated to the native read-chars routine.
process_read_output <- function(self, private, n) {
  "!DEBUG process_read_output `private$get_short_name()`"
  con <- process_get_output_connection(self, private)
  if (private$pty) {
    if (poll(list(con), 0)[[1]] == "timeout") {
      return("")
    }
  }
  chain_call(c_processx_connection_read_chars, con, n)
}
# Read up to `n` characters from the standard error of the process.
#
# Looks up the error connection (raising "stderr is not a pipe." if there
# is none) and delegates the read to the native read-chars routine.
process_read_error <- function(self, private, n) {
  "!DEBUG process_read_error `private$get_short_name()`"
  err_con <- process_get_error_connection(self, private)
  chain_call(c_processx_connection_read_chars, err_con, n)
}
# Read up to `n` lines from the standard output of the process.
#
# The output connection is looked up first (raising "stdout is not a pipe."
# if there is none). Line-based reading is refused with an error when
# `private$pty` is set; otherwise the read is delegated to the native
# read-lines routine.
process_read_output_lines <- function(self, private, n) {
  "!DEBUG process_read_output_lines `private$get_short_name()`"
  out_con <- process_get_output_connection(self, private)
  uses_pty <- private$pty
  if (uses_pty) {
    throw(new_error("Cannot read lines from a pty (see manual)"))
  }
  chain_call(c_processx_connection_read_lines, out_con, n)
}
# Read up to `n` lines from the standard error of the process.
#
# Looks up the error connection (raising "stderr is not a pipe." if there
# is none) and delegates the read to the native read-lines routine.
process_read_error_lines <- function(self, private, n) {
  "!DEBUG process_read_error_lines `private$get_short_name()`"
  err_con <- process_get_error_connection(self, private)
  chain_call(c_processx_connection_read_lines, err_con, n)
}
# Tell whether the standard output connection has not reached EOF yet.
#
# Looks up the output connection (raising "stdout is not a pipe." if there
# is none) and returns the negation of the native EOF check.
process_is_incompelete_output <- function(self, private) {
  out_con <- process_get_output_connection(self, private)
  at_eof <- chain_call(c_processx_connection_is_eof, out_con)
  !at_eof
}
# Tell whether the standard error connection has not reached EOF yet.
#
# Looks up the error connection (raising "stderr is not a pipe." if there
# is none) and returns the negation of the native EOF check.
process_is_incompelete_error <- function(self, private) {
  err_con <- process_get_error_connection(self, private)
  at_eof <- chain_call(c_processx_connection_is_eof, err_con)
  !at_eof
}
# Read the standard output until the connection is complete.
#
# While `self$is_incomplete_output()` holds, waits with `self$poll_io(-1)`
# and appends whatever `self$read_output()` returns. `private` is not used.
# Returns everything read as a single string ("" if nothing was read).
process_read_all_output <- function(self, private) {
  collected <- ""
  repeat {
    if (!self$is_incomplete_output()) {
      break
    }
    self$poll_io(-1)
    chunk <- self$read_output()
    collected <- paste0(collected, chunk)
  }
  collected
}
# Read the standard error until the connection is complete.
#
# While `self$is_incomplete_error()` holds, waits with `self$poll_io(-1)`
# and appends whatever `self$read_error()` returns. `private` is not used.
# Returns everything read as a single string ("" if nothing was read).
process_read_all_error <- function(self, private) {
  collected <- ""
  repeat {
    if (!self$is_incomplete_error()) {
      break
    }
    self$poll_io(-1)
    chunk <- self$read_error()
    collected <- paste0(collected, chunk)
  }
  collected
}
# Read all lines of the standard output until the connection is complete.
#
# While `self$is_incomplete_output()` holds, waits with `self$poll_io(-1)`
# and appends the lines returned by `self$read_output_lines()`.
# `private` is not used. Returns a character vector (empty if nothing
# was read).
process_read_all_output_lines <- function(self, private) {
  all_lines <- character()
  repeat {
    if (!self$is_incomplete_output()) {
      break
    }
    self$poll_io(-1)
    new_lines <- self$read_output_lines()
    all_lines <- c(all_lines, new_lines)
  }
  all_lines
}
# Read all lines of the standard error until the connection is complete.
#
# While `self$is_incomplete_error()` holds, waits with `self$poll_io(-1)`
# and appends the lines returned by `self$read_error_lines()`.
# `private` is not used. Returns a character vector (empty if nothing
# was read).
process_read_all_error_lines <- function(self, private) {
  all_lines <- character()
  repeat {
    if (!self$is_incomplete_error()) {
      break
    }
    self$poll_io(-1)
    new_lines <- self$read_error_lines()
    all_lines <- c(all_lines, new_lines)
  }
  all_lines
}
# Write data to the standard input of the process.
#
# The input connection is looked up first (raising "stdin is not a pipe."
# if there is none). A character `str` is collapsed into one string with
# `sep` between its elements and converted from the native encoding to
# `private$encoding` as raw bytes; anything else is passed to the native
# write routine unchanged (presumably a raw vector; confirm with callers).
# Returns the result of the native write call, invisibly.
process_write_input <- function(self, private, str, sep) {
  "!DEBUG process_write_input `private$get_short_name()`"
  con <- process_get_input_connection(self, private)
  payload <- str
  if (is.character(payload)) {
    joined <- paste(payload, collapse = sep)
    encoded <- iconv(joined, from = "", to = private$encoding, toRaw = TRUE)
    payload <- encoded[[1]]
  }
  written <- chain_call(c_processx_connection_write_bytes, con, payload)
  invisible(written)
}
# Return the stored standard input setting, `private$stdin`.
# `self` is not used.
process_get_input_file <- function(self, private) {
  input_file <- private$stdin
  input_file
}
# Return the stored standard output setting, `private$stdout`.
# `self` is not used.
process_get_output_file <- function(self, private) {
  output_file <- private$stdout
  output_file
}
# Return the stored standard error setting, `private$stderr`.
# `self` is not used.
process_get_error_file <- function(self, private) {
  error_file <- private$stderr
  error_file
}
# Names of the poll result codes. This list corresponds to the PX*
# constants in processx.h, so any change must be made there as well.
# NOTE(review): the order presumably has to match the numeric values of
# those constants -- confirm before reordering.
poll_codes <- c(
  "nopipe",   # PXNOPIPE
  "ready",    # PXREADY
  "timeout",  # PXTIMEOUT
  "closed",   # PXCLOSED
  "silent",   # PXSILENT
  "event",    # PXEVENT
  "connect"   # PXCONNECT
)
# Poll the connections of this process, waiting at most `ms` milliseconds.
#
# Calls `poll()` on a one-element list holding `self` and returns the
# first (only) element of its result. `private` is not used.
process_poll_io <- function(self, private, ms) {
  poll_results <- poll(list(self), ms)
  poll_results[[1]]
}
|