File: conn.R

package info (click to toggle)
r-cran-rsclient 0.7-10-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 236 kB
  • sloc: ansic: 2,137; makefile: 2
file content (146 lines) | stat: -rw-r--r-- 5,449 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
RSconnect <- function(host="localhost", port=6311) {
  c <- socketConnection(host,port,open="a+b",blocking=TRUE)
  a <- readBin(c,"raw",32)
  if (!length(a)) { close(c); stop("Attempt to connect to Rserve timed out, connection closed") }
  if (length(a) != 32 || !length(grep("^Rsrv01..QAP1",rawToChar(a))))
    stop("Invalid response from Rserve")
  return( c )
}

RSeval <- function(c, expr) {
  r <- if (is.character(expr)) serialize(parse(text=paste("{",paste(expr,collapse="\n"),"}"))[[1]],NULL) else serialize(expr, NULL)
  writeBin(c(0xf5L, length(r), 0L, 0L), c, endian="little")
  writeBin(r, c)
  b <- readBin(c,"int",4,endian="little")
  if (length(b)<4 || b[1] != 65537L) stop("remote evaluation failed")
  unserialize(readBin(c,"raw",b[2]))
}

RSassign <- function (c, obj, name = deparse(substitute(obj))) {
  r <- serialize(list(name, obj), NULL)
  writeBin(c(0xf6L,length(r),0L,0L), c, endian="little")
  writeBin(r, c)
  b <- readBin(c,"int",4,endian="little")
  if (length(b)<4 || b[1] != 65537L)
    stop("remote assign failed")
  invisible(obj)
}

RSclose <- function(c) close(c)

# convert an array of unsigned integers into raw verctor safely
# by converting 16-bits at a time
.safe.int <- function(data) {
  r <- raw(length(data) * 4)
  j <- 1
  for (i in data) {
    hi <- as.integer(i / 0x10000 + 0.5)
    lo <- as.integer( (i - hi*0x10000) + 0.5)
    rs <- writeBin(c(lo, hi), raw(), endian="little")
    r[j] <- rs[1]
    r[j+1] <- rs[2]
    r[j+2] <- rs[5]
    r[j+3] <- rs[6]
    j <- j + 4
  }
  r
}

RSdetach <- function( c ) RSevalDetach( c, "" )

RSevalDetach <- function( c, cmd="" ) {
  # retrieve the host name from the connection (possibly unsafe!)
  host <- substr(strsplit(summary(c)$description,":")[[1]][1],3,999)
  if ( cmd != "" ) {
    r <- paste("serialize({", cmd[1], "},NULL)")
    l <- nchar(r[1])+1
    writeBin(as.integer(c(0x031,l+4,0,0,4+l*256)), c, endian="little")
    writeBin(as.character(r[1]), c)
    b <- readBin(c,"int",4,endian="little")
    if (b[1]%%256 == 2 || b[2] < 12) stop("Eval/detach failed with error: ",b[1]%/%0x1000000)
    ## We don't need "isLarge" because we never get large data back
  } else {
    l <- 0
    writeBin(as.integer(c(0x030,l+4,0,0,4+l*256)), c, endian="little")
    b <- readBin(c,"int",4,endian="little")
    if (b[1]%%256 != 1) stop("Detach failed with error: ",b[1]%/%0x1000000)
  }
  msgLen <- b[1]%/%256
  a <- readBin(c,"int",2,signed=FALSE,endian="little")
  if (!length(a)) { close(c); stop("Rserve connection timed out and closed") }
  ## a[1] is DT_INT, a[2] is the payload (port#)
  port <- a[ 2 ]
  readBin(c,"raw",4) ## this should be DT_BYTESTREAM
  key <- readBin(c,"raw",msgLen-12)
  RSclose(c)
  list( port=port, key=key, host=host )
}

RSattach <- function(session) {
  c <- socketConnection(session$host,session$port,open="a+b",blocking=TRUE)
  writeBin( session$key, c )
  b <- readBin(c,"int",4,endian="little")
  if (!length(b)) { close(c); stop("Rserve connection timed out and closed") }
  if (b[1]%%256 != 1) stop("Attach failed with error: ",b[1]%/%0x1000000)
  c
}

RSlogin <- function(c, user, pwd, silent=FALSE) {
  r <- paste(user,pwd,sep="\n")
  l <- nchar(r[1])+1
  writeBin(as.integer(c(1,l+4,0,0,4+l*256)), c, endian="little")
  writeBin(as.character(r[1]), c)
  b <- readBin(c,"int",4,endian="little")
  if (!length(b)) { close(c); stop("Rserve connection timed out and closed") }
  ##cat("header: ",b[1],", ",b[2],"\n")    
  msgLen <- b[2]
  if (msgLen > 0) a <- readBin(c,"raw",msgLen)
  if (b[1]%%256 != 1 && !silent) stop("Login failed with error: ",b[1]%/%0x1000000)
  invisible(b[1]%%256 == 1)
}

RSserverEval <- function(c, expr) {
  if (is.language(expr)) expr <- deparse(expr)
  if (!is.character(expr)) stop("expr must me a character vector, name, call or an expression")
  r <- charToRaw(paste(expr,collapse='\n'))
  l <- length(r) + 1L
  writeBin(as.integer(c(0x42L, l + 4L,0L ,0L ,4L + l * 256L)), c, endian="little")
  writeBin(r, c)
  writeBin(raw(1), c)
  b <- readBin(c, "int", 4, endian="little")
  if (!length(b)) { close(c); stop("Rserve connection timed out and closed") }
  msgLen <- b[2]
  if (msgLen > 0) a <- readBin(c,"raw",msgLen)
  if (b[1]%%256 != 1) stop("RSserverEval failed with error: ",b[1]%/%0x1000000)
  invisible(b[1]%%256 == 1)  
}

RSserverSource <- function(c, file) {
  if (!is.character(file) || length(file) != 1) stop("`file' must be a string")
  r <- charToRaw(file)
  l <- length(r) + 1L
  writeBin(as.integer(c(0x45L, l + 4L,0L ,0L ,4L + l * 256L)), c, endian="little")
  writeBin(r, c)
  writeBin(raw(1), c)
  b <- readBin(c, "int", 4, endian="little")
  if (!length(b)) { close(c); stop("Rserve connection timed out and closed") }
  msgLen <- b[2]
  if (msgLen > 0) a <- readBin(c,"raw",msgLen)
  if (b[1]%%256 != 1) stop("RSserverSource failed with error: ",b[1]%/%0x1000000)
  invisible(b[1]%%256 == 1)  
}

RSshutdown <- function(c, pwd=NULL, ctrl=FALSE) {
  if (ctrl) {
    writeBin(c(0x44L, 0L, 0L, 0L), c, endian="little")
    b <- readBin(c, "int", 4, endian="little")
    if (!length(b)) { close(c); stop("Rserve connection timed out and closed") }
    msgLen <- b[2]
    if (msgLen > 0) a <- readBin(c,"raw",msgLen)
    if (b[1]%%256 != 1) stop("ctrlShutdown failed with error: ",b[1]%/%0x1000000)
    invisible(b[1]%%256 == 1)  
  } else {
    # FIXME: we ignore pwd and don't check error status
    writeBin(as.integer(c(4, 0, 0, 0)), c, endian="little")
  }
}