LCOV - code coverage report
Current view: top level - monetdb5/extras/rapi - rapi.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 293 374 78.3 %
Date: 2021-10-13 02:24:04 Functions: 14 15 93.3 %

          Line data    Source code
       1             : /*
       2             :  * This Source Code Form is subject to the terms of the Mozilla Public
       3             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       4             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       5             :  *
       6             :  * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
       7             :  */
       8             : 
       9             : /*
      10             :  * H. Muehleisen, M. Kersten
      11             :  * The R interface
      12             :  */
      13             : #include "monetdb_config.h"
      14             : #include "mal.h"
      15             : #include "mal_stack.h"
      16             : #include "mal_linker.h"
      17             : #include "gdk_utils.h"
      18             : #include "gdk.h"
      19             : #include "sql_catalog.h"
      20             : #include "sql_execute.h"
      21             : #include "mutils.h"
      22             : 
      23             : #define RAPI_MAX_TUPLES 2147483647L
      24             : 
      25             : // R headers
      26             : #define R_INTERFACE_PTRS 1
      27             : #define CSTACK_DEFNS 1
      28             : 
      29             : /* R redefines these */
      30             : #undef SIZEOF_SIZE_T
      31             : #undef ERROR
      32             : 
      33             : #define USE_RINTERNALS 1
      34             : 
      35             : #include <Rversion.h>
      36             : #include <Rembedded.h>
      37             : #include <Rdefines.h>
      38             : #include <Rinternals.h>
      39             : #include <R_ext/Parse.h>
      40             : 
      41             : // other headers
      42             : #include <string.h>
      43             : 
      44             : //#define _RAPI_DEBUG_
      45             : 
      46             : // this macro blows up mmath.h pragmas
      47             : #ifdef warning
      48             : # undef warning
      49             : #endif
      50             : 
      51             : #define RSTR(somestr) mkCharCE(somestr, CE_UTF8)
      52             : 
      53             : //Element-wise conversion functions, use no-op as passthrough when no conversion required
      54             : #define M_TO_R_NOOP(v)               (v)
      55             : #define R_TO_M_NOOP(v)               (v)
      56             : #define M_TO_R_DATE(v)               mDate_to_rDate(v)
      57             : #define R_TO_M_DATE(v)               rDate_to_mDate(v)
      58             : 
      59             : #define BAT_TO_SXP(bat,bati,tpe,retsxp,newfun,ptrfun,ctype,naval,memcopy,mapfun) \
      60             :         do {                                                                                                                            \
      61             :                 tpe v; size_t j;                                                                                                \
      62             :                 ctype *valptr = NULL;                                                                                   \
      63             :                 tpe* p = (tpe*) bati.base;                                                                              \
      64             :                 retsxp = PROTECT(newfun(bati.count));                                                   \
      65             :                 if (!retsxp) break;                                                                                             \
      66             :                 valptr = ptrfun(retsxp);                                                                                \
      67             :                 if (bat->tnonil && !bat->tnil) {                                                          \
      68             :                         if (memcopy) {                                                                                          \
      69             :                                 memcpy(valptr, p,                                                                               \
      70             :                                            bati.count * sizeof(tpe));                                           \
      71             :                         } else {                                                                                                        \
      72             :                                 for (j = 0; j < bati.count; j++) {                                           \
      73             :                                         valptr[j] = mapfun((ctype) p[j]);                                       \
      74             :                                 }                                                                                                               \
      75             :                         }                                                                                                                       \
      76             :                 } else {                                                                                                                \
      77             :                         for (j = 0; j < bati.count; j++) {                                                   \
      78             :                                 v = p[j];                                                                                               \
      79             :                                 if ( is_##tpe##_nil(v))                                                                 \
      80             :                                         valptr[j] = naval;                                                                      \
      81             :                                 else                                                                                                    \
      82             :                                         valptr[j] = mapfun((ctype) v);                                          \
      83             :                         }                                                                                                                       \
      84             :                 }                                                                                                                               \
      85             :         } while (0)
      86             : 
      87             : #define BAT_TO_INTSXP(bat,bati,tpe,retsxp,memcopy)                                              \
      88             :         BAT_TO_SXP(bat,bati,tpe,retsxp,NEW_INTEGER,INTEGER_POINTER,int,NA_INTEGER,memcopy,M_TO_R_NOOP)\
      89             : 
      90             : #define BAT_TO_REALSXP(bat,bati,tpe,retsxp,memcopy)                                             \
      91             :         BAT_TO_SXP(bat,bati,tpe,retsxp,NEW_NUMERIC,NUMERIC_POINTER,double,NA_REAL,memcopy,M_TO_R_NOOP)\
      92             : 
      93             : //DATE stored as integer in MonetDB with epoch 0, R uses double and epoch 1970
      94             : #define BAT_TO_DATESXP(bat,bati,tpe,retsxp,memcopy)                                                     \
      95             :         BAT_TO_SXP(bat,bati,tpe,retsxp,NEW_NUMERIC,NUMERIC_POINTER,double,NA_REAL,memcopy, M_TO_R_DATE); \
      96             :         SEXP klass = mkString("Date");                                                                                \
      97             :         classgets(retsxp, klass);
      98             : 
      99             : #define SXP_TO_BAT(tpe, access_fun, na_check, mapfun)                                   \
     100             :         do {                                                                                                                            \
     101             :                 tpe *p, prev = tpe##_nil; size_t j;                                                             \
     102             :                 b = COLnew(0, TYPE_##tpe, cnt, TRANSIENT);                                              \
     103             :                 if (!b) break;                                                  \
     104             :                 b->tnil = false; b->tnonil = true; b->tkey = false;                            \
     105             :                 b->tsorted = true; b->trevsorted = true;                                          \
     106             :                 b->tseqbase = oid_nil;                                                                                       \
     107             :                 p = (tpe*) Tloc(b, 0);                                                                                  \
     108             :                 for( j = 0; j < cnt; j++, p++){                                                                  \
     109             :                         *p = mapfun((tpe) access_fun(s)[j]);                                            \
     110             :                         if (na_check){ b->tnil = true;       b->tnonil = false;   *p= tpe##_nil;} \
     111             :                         if (j > 0){                                                                                                  \
     112             :                                 if (b->trevsorted && !is_##tpe##_nil(*p) && (is_##tpe##_nil(prev) || *p > prev)){ \
     113             :                                         b->trevsorted = false;                                                               \
     114             :                                 } else                                                                                                  \
     115             :                                         if (b->tsorted && !is_##tpe##_nil(prev) && (is_##tpe##_nil(*p) || *p < prev)){ \
     116             :                                                 b->tsorted = false;                                                          \
     117             :                                         }                                                                                                       \
     118             :                         }                                                                                                                       \
     119             :                         prev = *p;                                                                                                      \
     120             :                 }                                                                                                                               \
     121             :                 BATsetcount(b, cnt);                                                                                    \
     122             :                 BATsettrivprop(b);                                                                                              \
     123             :         } while (0)
     124             : 
     125             : // DATE epoch differs between MonetDB (00-01-01) and R (1970-01-01)
     126             : // no c API for R date handling so use fixed offset
     127             : // >>`-as.double(as.Date(0, origin="0-1-1"))`
     128             : static const int days0To1970 = 719528;
     129             : 
     130             : static int
     131             : mDate_to_rDate(int v)
     132             : {
     133           4 :         return v-days0To1970;
     134             : }
     135             : 
     136             : static int
     137             : rDate_to_mDate(int v)
     138             : {
     139           3 :         return v+days0To1970;
     140             : }
     141             : 
     142             : static SEXP
     143          92 : bat_to_sexp(BAT* b, int type)
     144             : {
     145             :         SEXP varvalue = NULL;
     146          92 :         BATiter bi = bat_iterator(b);
     147             :         // TODO: deal with SQL types (DECIMAL/TIME/TIMESTAMP)
     148          92 :         switch (ATOMstorage(b->ttype)) {
     149           0 :         case TYPE_void: {
     150             :                 size_t i = 0;
     151           0 :                 varvalue = PROTECT(NEW_LOGICAL(BATcount(b)));
     152           0 :                 if (!varvalue) {
     153           0 :                         bat_iterator_end(&bi);
     154           0 :                         return NULL;
     155             :                 }
     156           0 :                 for (i = 0; i < BATcount(b); i++) {
     157           0 :                         LOGICAL_POINTER(varvalue)[i] = NA_LOGICAL;
     158             :                 }
     159             :                 break;
     160             :         }
     161           6 :         case TYPE_bte:
     162          26 :                 BAT_TO_INTSXP(b, bi, bte, varvalue, 0);
     163             :                 break;
     164           1 :         case TYPE_sht:
     165           7 :                 BAT_TO_INTSXP(b, bi, sht, varvalue, 0);
     166             :                 break;
     167          46 :         case TYPE_int:
     168             :                 //Storage is int but the actual defined type may be different
     169             :                 switch (type) {
     170             :                 case TYPE_int:
     171             :                         //Storage is int but the actual defined type may be different
     172             :                         switch (type) {
     173          42 :                                 case TYPE_int: {
     174             :                                         // special case: memcpy for int-to-int conversion without NULLs
     175        1220 :                                         BAT_TO_INTSXP(b, bi, int, varvalue, 1);
     176             :                                 } break;
     177             :                                 default: {
     178             :                                         if (type == ATOMindex("date")) {
     179             :                                                 BAT_TO_DATESXP(b, bi, int, varvalue, 0);
     180             :                                         } else {
     181             :                                                 //Type stored as int but no implementation to decode into native R type
     182             :                                                 BAT_TO_INTSXP(b, bi, int, varvalue, 1);
     183             :                                         }
     184             :                                 }
     185             :                         }
     186             :                         break;
     187           4 :                 default:
     188           4 :                         if (type == TYPE_date) {
     189           8 :                                 BAT_TO_DATESXP(b, bi, int, varvalue, 0);
     190             :                         } else {
     191             :                                 //Type stored as int but no implementation to decode into native R type
     192           7 :                                 BAT_TO_INTSXP(b, bi, int, varvalue, 1);
     193             :                         }
     194             :                         break;
     195             :                 }
     196             :                 break;
     197             : #ifdef HAVE_HGE
     198           1 :         case TYPE_hge: /* R's integers are stored as int, so we cannot be sure hge will fit */
     199           6 :                 BAT_TO_REALSXP(b, bi, hge, varvalue, 0);
     200             :                 break;
     201             : #endif
     202           2 :         case TYPE_flt:
     203           9 :                 BAT_TO_REALSXP(b, bi, flt, varvalue, 0);
     204             :                 break;
     205          18 :         case TYPE_dbl:
     206             :                 // special case: memcpy for double-to-double conversion without NULLs
     207        2610 :                 BAT_TO_REALSXP(b, bi, dbl, varvalue, 1);
     208             :                 break;
     209          11 :         case TYPE_lng: /* R's integers are stored as int, so we cannot be sure long will fit */
     210      201004 :                 BAT_TO_REALSXP(b, bi, lng, varvalue, 0);
     211             :                 break;
     212           7 :         case TYPE_str: { // there is only one string type, thus no macro here
     213             :                 BUN p, q, j = 0;
     214           7 :                 varvalue = PROTECT(NEW_STRING(BATcount(b)));
     215           7 :                 if (varvalue == NULL) {
     216           0 :                         bat_iterator_end(&bi);
     217           0 :                         return NULL;
     218             :                 }
     219             :                 /* special case where we exploit the duplicate-eliminated string heap */
     220           7 :                 if (GDK_ELIMDOUBLES(b->tvheap)) {
     221           7 :                         SEXP* sexp_ptrs = GDKzalloc(b->tvheap->free * sizeof(SEXP));
     222           7 :                         if (!sexp_ptrs) {
     223           0 :                                 bat_iterator_end(&bi);
     224           0 :                                 return NULL;
     225             :                         }
     226         490 :                         BATloop(b, p, q) {
     227         483 :                                 const char *t = (const char *) BUNtvar(bi, p);
     228         483 :                                 ptrdiff_t offset = t - b->tvheap->base;
     229         483 :                                 if (!sexp_ptrs[offset]) {
     230          21 :                                         if (strNil(t)) {
     231           1 :                                                 sexp_ptrs[offset] = NA_STRING;
     232             :                                         } else {
     233          20 :                                                 sexp_ptrs[offset] = RSTR(t);
     234             :                                         }
     235             :                                 }
     236         483 :                                 SET_STRING_ELT(varvalue, j++, sexp_ptrs[offset]);
     237             :                         }
     238           7 :                         GDKfree(sexp_ptrs);
     239             :                 }
     240             :                 else {
     241           0 :                         if (b->tnonil) {
     242           0 :                                 BATloop(b, p, q) {
     243           0 :                                         SET_STRING_ELT(varvalue, j++, RSTR(
     244             :                                                                            (const char *) BUNtvar(bi, p)));
     245             :                                 }
     246             :                         }
     247             :                         else {
     248           0 :                                 BATloop(b, p, q) {
     249           0 :                                         const char *t = (const char *) BUNtvar(bi, p);
     250           0 :                                         if (strNil(t)) {
     251           0 :                                                 SET_STRING_ELT(varvalue, j++, NA_STRING);
     252             :                                         } else {
     253           0 :                                                 SET_STRING_ELT(varvalue, j++, RSTR(t));
     254             :                                         }
     255             :                                 }
     256             :                         }
     257             :                 }
     258             :         }       break;
     259             :         }
     260          92 :         bat_iterator_end(&bi);
     261          92 :         return varvalue;
     262             : }
     263             : 
     264          69 : static BAT* sexp_to_bat(SEXP s, int type) {
     265             :         BAT* b = NULL;
     266          69 :         BUN cnt = LENGTH(s);
     267          69 :         switch (type) {
     268          32 :         case TYPE_int:
     269          32 :                 if (!IS_INTEGER(s)) {
     270             :                         return NULL;
     271             :                 }
     272      402306 :                 SXP_TO_BAT(int, INTEGER_POINTER, *p==NA_INTEGER, R_TO_M_NOOP);
     273             :                 break;
     274           1 :         case TYPE_lng:
     275           1 :                 if (!IS_INTEGER(s)) {
     276             :                         return NULL;
     277             :                 }
     278           6 :                 SXP_TO_BAT(lng, INTEGER_POINTER, *p==NA_INTEGER, R_TO_M_NOOP);
     279             :                 break;
     280             : #ifdef HAVE_HGE
     281           1 :         case TYPE_hge:
     282           1 :                 if (!IS_INTEGER(s)) {
     283             :                         return NULL;
     284             :                 }
     285           6 :                 SXP_TO_BAT(hge, INTEGER_POINTER, *p==NA_INTEGER, R_TO_M_NOOP);
     286             :                 break;
     287             : #endif
     288           3 :         case TYPE_bte:
     289             :         case TYPE_bit:                   // only R logical types fit into bit BATs
     290           3 :                 if (!IS_LOGICAL(s)) {
     291             :                         return NULL;
     292             :                 }
     293        1008 :                 SXP_TO_BAT(bit, LOGICAL_POINTER, *p==NA_LOGICAL, R_TO_M_NOOP);
     294             :                 break;
     295          25 :         case TYPE_dbl:
     296          25 :                 if (!IS_NUMERIC(s)) {
     297             :                         return NULL;
     298             :                 }
     299        1057 :                 SXP_TO_BAT(dbl, NUMERIC_POINTER, (ISNA(*p) || isnan(*p) || isinf(*p)), R_TO_M_NOOP);
     300             :                 break;
     301           6 :         case TYPE_str: {
     302             :                 SEXP levels;
     303             :                 size_t j;
     304           6 :                 if (!IS_CHARACTER(s) && !isFactor(s)) {
     305             :                         return NULL;
     306             :                 }
     307           6 :                 b = COLnew(0, TYPE_str, cnt, TRANSIENT);
     308           6 :                 if (!b) return NULL;
     309           6 :                 b->tnil = false;
     310           6 :                 b->tnonil = true;
     311           6 :                 b->tkey = false;
     312           6 :                 b->tsorted = false;
     313           6 :                 b->trevsorted = false;
     314             :                 /* get levels once, since this is a function call */
     315           6 :                 levels = GET_LEVELS(s);
     316             : 
     317          32 :                 for (j = 0; j < cnt; j++) {
     318             :                         SEXP rse;
     319          26 :                         if (isFactor(s)) {
     320           5 :                                 int ii = INTEGER(s)[j];
     321           5 :                                 if (ii == NA_INTEGER) {
     322           1 :                                         rse = NA_STRING;
     323             :                                 } else {
     324           4 :                                         rse = STRING_ELT(levels, ii - 1);
     325             :                                 }
     326             :                         } else {
     327          21 :                                 rse = STRING_ELT(s, j);
     328             :                         }
     329          26 :                         if (rse == NA_STRING) {
     330           2 :                                 b->tnil = true;
     331           2 :                                 b->tnonil = false;
     332           2 :                                 if (BUNappend(b, str_nil, false) != GDK_SUCCEED) {
     333           0 :                                         BBPreclaim(b);
     334           0 :                                         return NULL;
     335             :                                 }
     336             :                         } else {
     337          24 :                                 if (BUNappend(b, CHAR(rse), false) != GDK_SUCCEED) {
     338           0 :                                         BBPreclaim(b);
     339           0 :                                         return NULL;
     340             :                                 }
     341             :                         }
     342             :                 }
     343             :                 break;
     344             :         }
     345           1 :         default:
     346           1 :                 if (type == TYPE_date) {
     347           1 :                         if (!IS_NUMERIC(s)) {
     348             :                                 return NULL;
     349             :                         }
     350           4 :                         SXP_TO_BAT(date, NUMERIC_POINTER, *p==NA_REAL, R_TO_M_DATE);
     351             :                 }
     352             :         }
     353             : 
     354          68 :         if (b) {
     355          68 :                 BATsetcount(b, cnt);
     356          68 :                 BBPkeepref(b->batCacheid);
     357             :         }
     358             :         return b;
     359             : }
     360             : 
     361             : const char* rapi_enableflag = "embedded_r";
     362             : 
     363          66 : static bool RAPIEnabled(void) {
     364          66 :         return (GDKgetenv_istrue(rapi_enableflag)
     365          66 :                         || GDKgetenv_isyes(rapi_enableflag));
     366             : }
     367             : 
     368             : // The R-environment should be single threaded, calling for some protective measures.
     369             : static MT_Lock rapiLock = MT_LOCK_INITIALIZER(rapiLock);
     370             : static bool rapiInitialized = false;
     371             : #if 0
     372             : static char* rtypenames[] = { "NIL", "SYM", "LIST", "CLO", "ENV", "PROM",
     373             :                 "LANG", "SPECIAL", "BUILTIN", "CHAR", "LGL", "unknown", "unknown",
     374             :                 "INT", "REAL", "CPLX", "STR", "DOT", "ANY", "VEC", "EXPR", "BCODE",
     375             :                 "EXTPTR", "WEAKREF", "RAW", "S4" };
     376             : #endif
     377             : 
     378             : static Client rapiClient = NULL;
     379             : 
     380             : 
     381             : #if 0
     382             : // helper function to translate R TYPEOF() return values to something readable
     383             : char* rtypename(int rtypeid) {
     384             :         if (rtypeid < 0 || rtypeid > 25) {
     385             :                 return "unknown";
     386             :         } else
     387             :                 return rtypenames[rtypeid];
     388             : }
     389             : #endif
     390             : 
     391           0 : static void writeConsoleEx(const char * buf, int buflen, int foo) {
     392             :         (void) buflen;
     393             :         (void) foo;
     394             :         (void) buf; // silence compiler
     395             : #ifdef _RAPI_DEBUG_
     396             :         printf("# %s", buf);
     397             : #endif
     398           0 : }
     399             : 
     400          66 : static void writeConsole(const char * buf, int buflen) {
     401             :         writeConsoleEx(buf, buflen, -42);
     402          66 : }
     403             : 
     404           4 : static void clearRErrConsole(void) {
     405             :         // Do nothing?
     406           4 : }
     407             : 
     408             : static char *RAPIinstalladdons(void);
     409             : 
     410             : /* UNIX-like initialization */
     411             : #ifndef WIN32
     412             : 
     413             : #define R_INTERFACE_PTRS 1
     414             : #define CSTACK_DEFNS 1
     415             : #include <Rinterface.h>
     416             : 
     417           6 : static char *RAPIinitialize(void) {
     418             : // TODO: check for header/library version mismatch?
     419             :         char *e;
     420             : 
     421             :         // set R_HOME for packages etc. We know this from our configure script
     422           6 :         putenv("R_HOME=" RHOME);
     423             : 
     424             :         // set some command line arguments
     425             :         {
     426             :                 structRstart rp;
     427           6 :                 char *rargv[] = { "R",
     428             : #if R_VERSION >= R_Version(4,0,0)
     429             :                                                   "--no-echo",
     430             : #else
     431             :                                                   "--slave",
     432             : #endif
     433             :                                                   "--vanilla" };
     434             :                 int stat = 0;
     435             : 
     436           6 :                 R_DefParams(&rp);
     437             : #if R_VERSION >= R_Version(4,0,0)
     438           6 :                 rp.R_NoEcho = (Rboolean) TRUE;
     439             : #else
     440             :                 rp.R_Slave = (Rboolean) TRUE;
     441             : #endif
     442           6 :                 rp.R_Quiet = (Rboolean) TRUE;
     443           6 :                 rp.R_Interactive = (Rboolean) FALSE;
     444           6 :                 rp.R_Verbose = (Rboolean) FALSE;
     445           6 :                 rp.LoadSiteFile = (Rboolean) FALSE;
     446           6 :                 rp.LoadInitFile = (Rboolean) FALSE;
     447           6 :                 rp.RestoreAction = SA_NORESTORE;
     448           6 :                 rp.SaveAction = SA_NOSAVE;
     449           6 :                 rp.NoRenviron = TRUE;
     450           6 :                 stat = Rf_initialize_R(2, rargv);
     451           6 :                 if (stat < 0) {
     452           0 :                         return "Rf_initialize failed";
     453             :                 }
     454           6 :                 R_SetParams(&rp);
     455             :         }
     456             : 
     457             :         /* disable stack checking, because threads will throw it off */
     458           6 :         R_CStackLimit = (uintptr_t) -1;
     459             :         /* redirect input/output and set error handler */
     460           6 :         R_Outputfile = NULL;
     461           6 :         R_Consolefile = NULL;
     462             :         /* we do not want R to handle any signal, will interfere with monetdbd */
     463           6 :         R_SignalHandlers = 0;
     464             :         /* we want to control R's output and input */
     465           6 :         ptr_R_WriteConsoleEx = writeConsoleEx;
     466           6 :         ptr_R_WriteConsole = writeConsole;
     467           6 :         ptr_R_ReadConsole = NULL;
     468           6 :         ptr_R_ClearerrConsole = clearRErrConsole;
     469             : 
     470             :         // big boy here
     471           6 :         setup_Rmainloop();
     472             : 
     473           6 :         if ((e = RAPIinstalladdons()) != 0) {
     474             :                 return e;
     475             :         }
     476             :         // patch R internals to disallow quit and system. Setting them to NULL produces an error.
     477           6 :         SET_INTERNAL(install("quit"), R_NilValue);
     478             :         // install.packages() uses system2 to call gcc etc., so we cannot disable it (perhaps store the pointer somewhere just for that?)
     479             :         //SET_INTERNAL(install("system"), R_NilValue);
     480             : 
     481           6 :         rapiInitialized = true;
     482           6 :         return NULL;
     483             : }
     484             : #else
     485             : 
     486             : #define S_IRWXU         0000700
     487             : 
     /*
      * Stub selected by the #else branch of the surrounding conditional.
      * It initializes nothing and always reports failure.
      *
      * Returns a pointer to a constant, human-readable error message (never NULL).
      */
     static char *
     RAPIinitialize(void)
     {
         char *reason = "Sorry, no R API on Windows";

         return reason;
     }
     491             : 
     492             : #endif
     493             : 
     494             : 
     495           6 : static char *RAPIinstalladdons(void) {
     496             :         int evalErr;
     497             :         ParseStatus status;
     498             :         char rlibs[FILENAME_MAX];
     499             :         char rapiinclude[BUFSIZ];
     500             :         SEXP librisexp;
     501             :         int len;
     502             : 
     503             :         // r library folder, create if not exists
     504           6 :         len = snprintf(rlibs, sizeof(rlibs), "%s%c%s", GDKgetenv("gdk_dbpath"), DIR_SEP, "rapi_packages");
     505           6 :         if (len == -1 || len >= FILENAME_MAX)
     506             :                 return "cannot create rapi_packages directory because the path is too large";
     507             : 
     508           6 :         if (MT_mkdir(rlibs) != 0 && errno != EEXIST) {
     509             :                 return "cannot create rapi_packages directory";
     510             :         }
     511             : #ifdef _RAPI_DEBUG_
     512             :         printf("# R libraries installed in %s\n",rlibs);
     513             : #endif
     514             : 
     515           6 :         PROTECT(librisexp = allocVector(STRSXP, 1));
     516           6 :         SET_STRING_ELT(librisexp, 0, mkChar(rlibs));
     517           6 :         Rf_defineVar(Rf_install(".rapi.libdir"), librisexp, R_GlobalEnv);
     518           6 :         UNPROTECT(1);
     519             : 
     520             :         // run rapi.R environment setup script
     521             :         {
     522           6 :                 char *f = locate_file("rapi", ".R", 0);
     523           6 :                 snprintf(rapiinclude, sizeof(rapiinclude), "source(\"%s\")", f);
     524           6 :                 GDKfree(f);
     525             :         }
     526             : #if DIR_SEP != '/'
     527             :         {
     528             :                 char *p;
     529             :                 for (p = rapiinclude; *p; p++)
     530             :                         if (*p == DIR_SEP)
     531             :                                 *p = '/';
     532             :         }
     533             : #endif
     534           6 :         R_tryEvalSilent(
     535           6 :                 VECTOR_ELT(
     536             :                         R_ParseVector(mkString(rapiinclude), 1, &status,
     537             :                                                   R_NilValue), 0), R_GlobalEnv, &evalErr);
     538             : 
     539             :         // of course the script may contain errors as well
     540           6 :         if (evalErr != FALSE) {
     541           0 :                 return "failure running R setup script";
     542             :         }
     543             :         return NULL;
     544             : }
     545             : 
     546             : static str
     547           2 : empty_return(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, size_t retcols, oid seqbase)
     548             : {
     549             :         str msg = MAL_SUCCEED;
     550           2 :         void **res = GDKzalloc(retcols * sizeof(void*));
     551             : 
     552           2 :         if (!res) {
     553           0 :                 msg = createException(MAL, "rapi.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     554           0 :                 goto bailout;
     555             :         }
     556             : 
     557           4 :         for (size_t i = 0; i < retcols; i++) {
     558           2 :                 if (isaBatType(getArgType(mb, pci, i))) {
     559           1 :                         BAT *b = COLnew(seqbase, getBatType(getArgType(mb, pci, i)), 0, TRANSIENT);
     560           1 :                         if (!b) {
     561           0 :                                 msg = createException(MAL, "rapi.eval", GDK_EXCEPTION);
     562           0 :                                 goto bailout;
     563             :                         }
     564           1 :                         ((BAT**)res)[i] = b;
     565             :                 } else { // single value return, only for non-grouped aggregations
     566             :                         // return NULL to conform to SQL aggregates
     567             :                         int tpe = getArgType(mb, pci, i);
     568           1 :                         if (!VALinit(&stk->stk[pci->argv[i]], tpe, ATOMnilptr(tpe))) {
     569           0 :                                 msg = createException(MAL, "rapi.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     570           0 :                                 goto bailout;
     571             :                         }
     572           1 :                         ((ValPtr*)res)[i] = &stk->stk[pci->argv[i]];
     573             :                 }
     574             :         }
     575             : 
     576           2 : bailout:
     577           2 :         if (res) {
     578           4 :                 for (size_t i = 0; i < retcols; i++) {
     579           2 :                         if (isaBatType(getArgType(mb, pci, i))) {
     580           1 :                                 BAT *b = ((BAT**)res)[i];
     581             : 
     582           1 :                                 if (b && msg) {
     583           0 :                                         BBPreclaim(b);
     584           1 :                                 } else if (b) {
     585           1 :                                         BBPkeepref(*getArgReference_bat(stk, pci, i) = b->batCacheid);
     586             :                                 }
     587           1 :                         } else if (msg) {
     588           0 :                                 ValPtr pt = ((ValPtr*)res)[i];
     589             : 
     590           0 :                                 if (pt)
     591           0 :                                         VALclear(pt);
     592             :                         }
     593             :                 }
     594           2 :                 GDKfree(res);
     595             :         }
     596           2 :         return msg;
     597             : }
     598             : 
     599          60 : static str RAPIeval(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, bit grouped) {
     600             :         sql_func * sqlfun = NULL;
     601          60 :         str exprStr = *getArgReference_str(stk, pci, pci->retc + 1);
     602             : 
     603             :         SEXP x, env, retval;
     604             :         SEXP varname = R_NilValue;
     605             :         SEXP varvalue = R_NilValue;
     606             :         ParseStatus status;
     607             :         int i = 0;
     608             :         char argbuf[64];
     609             :         char *argnames = NULL;
     610             :         size_t argnameslen;
     611             :         size_t pos;
     612             :         char* rcall = NULL;
     613             :         size_t rcalllen;
     614             :         int ret_cols = 0; /* int because pci->retc is int, too*/
     615             :         str *args;
     616             :         int evalErr;
     617             :         char *msg = MAL_SUCCEED;
     618             :         BAT *b;
     619             :         node * argnode;
     620             :         int seengrp = FALSE;
     621             : 
     622          60 :         rapiClient = cntxt;
     623             : 
     624          60 :         if (!RAPIEnabled()) {
     625           0 :                 throw(MAL, "rapi.eval",
     626             :                           "Embedded R has not been enabled. Start server with --set %s=true",
     627             :                           rapi_enableflag);
     628             :         }
     629          60 :         if (!rapiInitialized) {
     630           0 :                 throw(MAL, "rapi.eval",
     631             :                           "Embedded R initialization has failed");
     632             :         }
     633             : 
     634          60 :         if (!grouped) {
     635          50 :                 sql_subfunc *sqlmorefun = (*(sql_subfunc**) getArgReference(stk, pci, pci->retc));
     636          50 :                 if (sqlmorefun) sqlfun = (*(sql_subfunc**) getArgReference(stk, pci, pci->retc))->func;
     637             :         } else {
     638          10 :                 sqlfun = *(sql_func**) getArgReference(stk, pci, pci->retc);
     639             :         }
     640             : 
     641          60 :         args = (str*) GDKzalloc(sizeof(str) * pci->argc);
     642          60 :         if (args == NULL) {
     643           0 :                 throw(MAL, "rapi.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     644             :         }
     645             : 
     646             :         // get the lock even before initialization of the R interpreter, as this can take a second and must be done only once.
     647          60 :         MT_lock_set(&rapiLock);
     648             : 
     649          60 :         env = PROTECT(eval(lang1(install("new.env")), R_GlobalEnv));
     650          60 :         assert(env != NULL);
     651             : 
     652             :         // first argument after the return contains the pointer to the sql_func structure
     653             :         // NEW macro temporarily renamed to MNEW to allow including sql_catalog.h
     654             : 
     655          60 :         if (sqlfun != NULL && sqlfun->ops->cnt > 0) {
     656          23 :                 int carg = pci->retc + 2;
     657          23 :                 argnode = sqlfun->ops->h;
     658          70 :                 while (argnode) {
     659          47 :                         char* argname = ((sql_arg*) argnode->data)->name;
     660          47 :                         args[carg] = GDKstrdup(argname);
     661          47 :                         carg++;
     662          47 :                         argnode = argnode->next;
     663             :                 }
     664             :         }
     665             :         // the first unknown argument is the group, we don't really care for the rest.
     666             :         argnameslen = 2;
     667         154 :         for (i = pci->retc + 2; i < pci->argc; i++) {
     668          94 :                 if (args[i] == NULL) {
     669          47 :                         if (!seengrp && grouped) {
     670           6 :                                 args[i] = GDKstrdup("aggr_group");
     671             :                                 seengrp = TRUE;
     672             :                         } else {
     673          41 :                                 snprintf(argbuf, sizeof(argbuf), "arg%i", i - pci->retc - 1);
     674          41 :                                 args[i] = GDKstrdup(argbuf);
     675             :                         }
     676             :                 }
     677          94 :                 argnameslen += strlen(args[i]) + 2; /* extra for ", " */
     678             :         }
     679             : 
     680             :         // install the MAL variables into the R environment
     681             :         // we can basically map values to int ("INTEGER") or double ("REAL")
     682         149 :         for (i = pci->retc + 2; i < pci->argc; i++) {
     683          91 :                 int bat_type = getBatType(getArgType(mb,pci,i));
     684             :                 // check for BAT or scalar first, keep code left
     685          91 :                 if (!isaBatType(getArgType(mb,pci,i))) {
     686          22 :                         b = COLnew(0, getArgType(mb, pci, i), 0, TRANSIENT);
     687          22 :                         if (b == NULL) {
     688           0 :                                 msg = createException(MAL, "rapi.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     689           0 :                                 goto wrapup;
     690             :                         }
     691          22 :                         if ( getArgType(mb,pci,i) == TYPE_str) {
     692           1 :                                 if (BUNappend(b, *getArgReference_str(stk, pci, i), false) != GDK_SUCCEED) {
     693           0 :                                         BBPreclaim(b);
     694             :                                         b = NULL;
     695           0 :                                         msg = createException(MAL, "rapi.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     696           0 :                                         goto wrapup;
     697             :                                 }
     698             :                         } else {
     699          21 :                                 if (BUNappend(b, getArgReference(stk, pci, i), false) != GDK_SUCCEED) {
     700           0 :                                         BBPreclaim(b);
     701             :                                         b = NULL;
     702           0 :                                         msg = createException(MAL, "rapi.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     703           0 :                                         goto wrapup;
     704             :                                 }
     705             :                         }
     706             :                 } else {
     707          69 :                         b = BATdescriptor(*getArgReference_bat(stk, pci, i));
     708          69 :                         if (b == NULL) {
     709           0 :                                 msg = createException(MAL, "rapi.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     710           0 :                                 goto wrapup;
     711             :                         }
     712          69 :                         if (BATcount(b) == 0) { /* empty input, generate trivial return */
     713             :                                 /* I expect all inputs to have the same size, so this should be safe */
     714           2 :                                 msg = empty_return(mb, stk, pci, pci->retc, b->hseqbase);
     715           2 :                                 BBPunfix(b->batCacheid);
     716           2 :                                 goto wrapup;
     717             :                         }
     718             :                 }
     719             : 
     720             :                 // check the BAT count, if it is bigger than RAPI_MAX_TUPLES, fail
     721          89 :                 if (BATcount(b) > RAPI_MAX_TUPLES) {
     722           0 :                         msg = createException(MAL, "rapi.eval",
     723             :                                                                   "Got "BUNFMT" rows, but can only handle "LLFMT". Sorry.",
     724             :                                                                   BATcount(b), (lng) RAPI_MAX_TUPLES);
     725           0 :                         BBPunfix(b->batCacheid);
     726           0 :                         goto wrapup;
     727             :                 }
     728          89 :                 varname = PROTECT(Rf_install(args[i]));
     729          89 :                 varvalue = bat_to_sexp(b, bat_type);
     730          89 :                 if (varvalue == NULL) {
     731           0 :                         msg = createException(MAL, "rapi.eval", "unknown argument type ");
     732           0 :                         goto wrapup;
     733             :                 }
     734          89 :                 BBPunfix(b->batCacheid);
     735             : 
     736             :                 // install vector into R environment
     737          89 :                 Rf_defineVar(varname, varvalue, env);
     738          89 :                 UNPROTECT(2);
     739             :         }
     740             : 
     741             :         /* we are going to evaluate the user function within an anonymous function call:
     742             :          * ret <- (function(arg1){return(arg1*2)})(42)
     743             :          * the user code is put inside the {}, this keeps our environment clean (TM) and gives
     744             :          * a clear path for return values, namely using the builtin return() function
     745             :          * this is also compatible with PL/R
     746             :          */
     747             :         pos = 0;
     748          58 :         argnames = malloc(argnameslen);
     749          58 :         if (argnames == NULL) {
     750           0 :                 msg = createException(MAL, "rapi.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     751           0 :                 goto wrapup;
     752             :         }
     753          58 :         argnames[0] = '\0';
     754         147 :         for (i = pci->retc + 2; i < pci->argc; i++) {
     755         178 :                 pos += snprintf(argnames + pos, argnameslen - pos, "%s%s",
     756         132 :                                                 args[i], i < pci->argc - 1 ? ", " : "");
     757             :         }
     758          58 :         rcalllen = 2 * pos + strlen(exprStr) + 100;
     759          58 :         rcall = malloc(rcalllen);
     760          58 :         if (rcall == NULL) {
     761           0 :                 msg = createException(MAL, "rapi.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     762           0 :                 goto wrapup;
     763             :         }
     764          58 :         snprintf(rcall, rcalllen,
     765             :                          "ret <- as.data.frame((function(%s){%s})(%s), nm=NA, stringsAsFactors=F)\n",
     766             :                          argnames, exprStr, argnames);
     767          58 :         free(argnames);
     768             :         argnames = NULL;
     769             : #ifdef _RAPI_DEBUG_
     770             :         printf("# R call %s\n",rcall);
     771             : #endif
     772             : 
     773          58 :         x = R_ParseVector(mkString(rcall), 1, &status, R_NilValue);
     774             : 
     775          58 :         if (LENGTH(x) != 1 || status != PARSE_OK) {
     776           0 :                 msg = createException(MAL, "rapi.eval",
     777             :                                                           "Error parsing R expression '%s'. ", exprStr);
     778           0 :                 goto wrapup;
     779             :         }
     780             : 
     781          58 :         retval = R_tryEval(VECTOR_ELT(x, 0), env, &evalErr);
     782          58 :         if (evalErr != FALSE) {
     783           4 :                 char* errormsg = strdup(R_curErrorBuf());
     784             :                 size_t c;
     785           4 :                 if (errormsg == NULL) {
     786           0 :                         msg = createException(MAL, "rapi.eval", "Error running R expression.");
     787           0 :                         goto wrapup;
     788             :                 }
     789             :                 // remove newlines from error message so it fits into a MAPI error (lol)
     790         630 :                 for (c = 0; c < strlen(errormsg); c++) {
     791         626 :                         if (errormsg[c] == '\r' || errormsg[c] == '\n') {
     792          11 :                                 errormsg[c] = ' ';
     793             :                         }
     794             :                 }
     795           4 :                 msg = createException(MAL, "rapi.eval",
     796             :                                                           "Error running R expression: %s", errormsg);
     797           4 :                 free(errormsg);
     798           4 :                 goto wrapup;
     799             :         }
     800             : 
     801             :         // ret should be a data frame with exactly as many columns as we need from retc
     802          54 :         ret_cols = LENGTH(retval);
     803          54 :         if (ret_cols != pci->retc) {
     804           0 :                 msg = createException(MAL, "rapi.eval",
     805             :                                                           "Expected result of %d columns, got %d", pci->retc, ret_cols);
     806           0 :                 goto wrapup;
     807             :         }
     808             : 
     809             :         // collect the return values
     810         122 :         for (i = 0; i < pci->retc; i++) {
     811          69 :                 SEXP ret_col = VECTOR_ELT(retval, i);
     812          69 :                 int bat_type = getBatType(getArgType(mb,pci,i));
     813          69 :                 if (bat_type == TYPE_any || bat_type == TYPE_void) {
     814           0 :                         getArgType(mb,pci,i) = bat_type;
     815           0 :                         msg = createException(MAL, "rapi.eval",
     816             :                                                                   "Unknown return value, possibly projecting with no parameters.");
     817           0 :                         goto wrapup;
     818             :                 }
     819             : 
     820             :                 // hand over the vector into a BAT
     821          69 :                 b = sexp_to_bat(ret_col, bat_type);
     822          69 :                 if (b == NULL) {
     823           1 :                         msg = createException(MAL, "rapi.eval",
     824             :                                                                   "Failed to convert column %i", i);
     825           1 :                         goto wrapup;
     826             :                 }
     827             :                 // bat return
     828          68 :                 if (isaBatType(getArgType(mb,pci,i))) {
     829          58 :                         *getArgReference_bat(stk, pci, i) = b->batCacheid;
     830             :                 } else { // single value return, only for non-grouped aggregations
     831          10 :                         BATiter li = bat_iterator(b);
     832          10 :                         if (VALinit(&stk->stk[pci->argv[i]], bat_type,
     833          10 :                                                 BUNtail(li, 0)) == NULL) { // TODO BUNtail here
     834           0 :                                 msg = createException(MAL, "rapi.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     835           0 :                                 bat_iterator_end(&li);
     836           0 :                                 goto wrapup;
     837             :                         }
     838          10 :                         bat_iterator_end(&li);
     839             :                 }
     840             :                 msg = MAL_SUCCEED;
     841             :         }
     842          53 :   wrapup:
     843             :         /* unprotect environment, so it will be eaten by the GC. */
     844          60 :         UNPROTECT(1);
     845          60 :         MT_lock_unset(&rapiLock);
     846          60 :         if (argnames)
     847           0 :                 free(argnames);
     848          60 :         if (rcall)
     849          58 :                 free(rcall);
     850         349 :         for (i = 0; i < pci->argc; i++)
     851         289 :                 GDKfree(args[i]);
     852          60 :         GDKfree(args);
     853             : 
     854          60 :         return msg;
     855             : }
     856             : 
     857          50 : static str RAPIevalStd(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
     858             :                                                         InstrPtr pci) {
     859          50 :         return RAPIeval(cntxt, mb, stk, pci, 0);
     860             : }
     861          10 : static str RAPIevalAggr(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
     862             :                                                          InstrPtr pci) {
     863          10 :         return RAPIeval(cntxt, mb, stk, pci, 1);
     864             : }
     865             : 
     866             : /* used for loopback queries from R
     867             :  * see test rapi10 in monetdb5/extras/rapi */
     868             : extern
     869             : #ifdef WIN32
     870             : __declspec(dllexport)
     871             : #endif
     872             : void *RAPIloopback(void *query);
     873             : 
     874             : void *
     875           3 : RAPIloopback(void *query) {
     876           3 :         res_table* output = NULL;
     877           3 :         char* querystr = (char*)CHAR(STRING_ELT(query, 0));
     878           3 :         char* err = SQLstatementIntern(rapiClient, querystr, "name", 1, 0, &output);
     879             : 
     880           3 :         if (err) { // there was an error
     881           0 :                 return ScalarString(RSTR(err));
     882             :         }
     883           3 :         if (output) {
     884           3 :                 int ncols = output->nr_cols;
     885           3 :                 if (ncols > 0) {
     886             :                         int i;
     887             :                         SEXP retlist, names, varvalue = R_NilValue;
     888           3 :                         retlist = PROTECT(allocVector(VECSXP, ncols));
     889           3 :                         names = PROTECT(NEW_STRING(ncols));
     890           6 :                         for (i = 0; i < ncols; i++) {
     891           3 :                                 BAT *b = BATdescriptor(output->cols[i].b);
     892           3 :                                 if (b == NULL || !(varvalue = bat_to_sexp(b, TYPE_any))) {
     893           0 :                                         UNPROTECT(i + 3);
     894           0 :                                         if (b)
     895           0 :                                                 BBPunfix(b->batCacheid);
     896           0 :                                         return ScalarString(RSTR("Conversion error"));
     897             :                                 }
     898           3 :                                 BBPunfix(b->batCacheid);
     899           3 :                                 SET_STRING_ELT(names, i, RSTR(output->cols[i].name));
     900           3 :                                 SET_VECTOR_ELT(retlist, i, varvalue);
     901             :                         }
     902           3 :                         res_table_destroy(output);
     903           3 :                         SET_NAMES(retlist, names);
     904           3 :                         UNPROTECT(ncols + 2);
     905           3 :                         return retlist;
     906             :                 }
     907           0 :                 res_table_destroy(output);
     908             :         }
     909           0 :         return ScalarLogical(1);
     910             : }
     911             : 
     912           6 : static str RAPIprelude(void *ret) {
     913             :         (void) ret;
     914             : 
     915           6 :         if (RAPIEnabled()) {
     916           6 :                 MT_lock_set(&rapiLock);
     917             :                 /* startup internal R environment  */
     918           6 :                 if (!rapiInitialized) {
     919             :                         char *initstatus;
     920           6 :                         initstatus = RAPIinitialize();
     921           6 :                         if (initstatus != 0) {
     922           0 :                                 MT_lock_unset(&rapiLock);
     923           0 :                                 throw(MAL, "rapi.eval",
     924             :                                           "failed to initialize R environment (%s)", initstatus);
     925             :                         }
     926           6 :                         Rf_defineVar(Rf_install("MONETDB_LIBDIR"), ScalarString(RSTR(LIBDIR)), R_GlobalEnv);
     927             : 
     928             :                 }
     929           6 :                 MT_lock_unset(&rapiLock);
     930           6 :                 printf("# MonetDB/R   module loaded\n");
     931             :         }
     932             :         return MAL_SUCCEED;
     933             : }
     934             : 
     935             : #include "mel.h"
     936             : static mel_func rapi_init_funcs[] = {
     937             :  pattern("rapi", "eval", RAPIevalStd, false, "Execute a simple R script returning a single value", args(1,3, argany("",0),arg("fptr",ptr),arg("expr",str))),
     938             :  pattern("rapi", "eval", RAPIevalStd, false, "Execute a simple R script value", args(1,4, varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))),
     939             :  pattern("rapi", "subeval_aggr", RAPIevalAggr, false, "grouped aggregates through R", args(1,4, varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))),
     940             :  pattern("rapi", "eval_aggr", RAPIevalAggr, false, "grouped aggregates through R", args(1,4, varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))),
     941             :  command("rapi", "prelude", RAPIprelude, false, "", args(1,1, arg("",void))),
     942             :  pattern("batrapi", "eval", RAPIevalStd, false, "Execute a simple R script value", args(1,4, varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))),
     943             :  pattern("batrapi", "subeval_aggr", RAPIevalAggr, false, "grouped aggregates through R", args(1,4, varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))),
     944             :  pattern("batrapi", "eval_aggr", RAPIevalAggr, false, "grouped aggregates through R", args(1,4, varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))),
     945             :  { .imp=NULL }
     946             : };
     947             : #include "mal_import.h"
     948             : #ifdef _MSC_VER
     949             : #undef read
     950             : #pragma section(".CRT$XCU",read)
     951             : #endif
     952           6 : LIB_STARTUP_FUNC(init_rapi_mal)
     953           6 : { mal_module("rapi", NULL, rapi_init_funcs); }

Generated by: LCOV version 1.14