LCOV - code coverage report
Current view: top level - monetdb5/modules/mal - remote.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 428 778 55.0 %
Date: 2021-10-27 03:06:47 Functions: 19 25 76.0 %

          Line data    Source code
       1             : /*
       2             :  * This Source Code Form is subject to the terms of the Mozilla Public
       3             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       4             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       5             :  *
       6             :  * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
       7             :  */
       8             : 
       9             : /*
      10             :  * (c) Fabian Groffen, Martin Kersten
      11             :  * Remote querying functionality
      12             :  * Communication with other mservers at the MAL level is a delicate task.
      13             :  * However, it is indispensable for any distributed functionality.  This
      14             :  * module provides an abstract way to store and retrieve objects on a
      15             :  * remote site.  Additionally, functions on a remote site can be executed
      16             :  * using objects available in the remote session context.  This yields
      17             :  * four primitive functions that form the basis for distribution methods:
      18             :  * get, put, register and exec.
      19             :  *
      20             :  * The get method simply retrieves a copy of a remote object.  Objects can
      21             :  * be simple values, strings or Column.  The same holds for the put method,
      22             :  * but the other way around.  A local object can be stored on a remote
      23             :  * site.  Upon a successful store, the put method returns the remote
      24             :  * identifier for the stored object.  With this identifier the object can
      25             :  * be addressed, e.g. using the get method to retrieve the object that was
      26             :  * stored using put.
      27             :  *
      28             :  * The get and put methods are symmetric.  Performing a get on an
      29             :  * identifier that was returned by put, results in an object with the same
      30             :  * value and type as the one that was put.  The result of such an operation is
      31             :  * equivalent to making an (expensive) copy of the original object.
      32             :  *
      33             :  * The register function takes a local MAL function and makes it known at a
      34             :  * remote site. It ensures that it does not overload an already known
      35             :  * operation remotely, which could create a semantic conflict.
      36             :  * Deregistering a function is forbidden, because it would allow for taking
      37             :  * over the remote site completely.
      38             :  * C-implemented functions, such as io.print() cannot be remotely stored.
      39             :  * It would require even more complicated (byte) code shipping and remote
      40             :  * compilation to make it work.
      41             :  *
      42             :  * The choice to let exec only execute functions avoids problems
      43             :  * to decide what should be returned to the caller.  With a function it is
      44             :  * clear and simple to return that what the function signature prescribes.
      45             :  * Any side effects (e.g. io.print calls) may cause havoc in the system,
      46             :  * but are currently ignored.
      47             :  *
      48             :  * This leads to the final contract of this module.  The methods should be
      49             :  * used correctly, by obeying their contract.  Failing to do so will result
      50             :  * in errors and possibly undefined behaviour.
      51             :  *
      52             :  * The resolve() function can be used to query Merovingian.  It returns one
      53             :  * or more databases discovered in its vicinity matching the given pattern.
      54             :  *
      55             :  */
      56             : #include "monetdb_config.h"
      57             : #include "remote.h"
      58             : 
      59             : /*
      60             :  * Technically, these methods need to be serialised per connection,
      61             :  * hence a scheduler that interleaves e.g. multiple get calls, simply
      62             :  * violates this constraint.  If parallelism to the same site is
      63             :  * desired, a user could create a second connection.  This is not always
      64             :  * easy to generate at the proper place, e.g. overloading the dataflow
      65             :  * optimizer to patch connections structures is not acceptable.
      66             :  *
      67             :  * Instead, we maintain a simple lock with each connection, which can be
      68             :  * used to issue a safe, but blocking get/put/exec/register request.
      69             :  */
      70             : #ifdef HAVE_MAPI
      71             : 
      72             : 
      73             : #include "mal_exception.h"
      74             : #include "mal_interpreter.h"
      75             : #include "mal_function.h" /* for printFunction */
      76             : #include "mal_listing.h"
      77             : #include "mal_instruction.h" /* for getmodule/func macros */
      78             : #include "mal_authorize.h"
      79             : #include "mapi.h"
      80             : #include "mutils.h"
      81             : 
      82             : #define RMTT_L_ENDIAN   (0<<1) /* binary profile: little-endian (chosen when WORDS_BIGENDIAN is not defined) */
      83             : #define RMTT_B_ENDIAN   (1<<1) /* binary profile: big-endian (chosen when WORDS_BIGENDIAN is defined) */
      84             : #define RMTT_32_BITS    (0<<2) /* binary profile: SIZEOF_SIZE_T differs from SIZEOF_LNG */
      85             : #define RMTT_64_BITS    (1<<2) /* binary profile: SIZEOF_SIZE_T equals SIZEOF_LNG */
      86             : #define RMTT_32_OIDS    (0<<3) /* binary profile: SIZEOF_OID differs from SIZEOF_LNG */
      87             : #define RMTT_64_OIDS    (1<<3) /* binary profile: SIZEOF_OID equals SIZEOF_LNG */
      88             : 
      89             : typedef struct _connection { /* one open remote connection; nodes form a singly linked list headed by conns */
      90             :         MT_Lock            lock;      /* per-connection lock to avoid interference; initialised with the connection name */
      91             :         str                name;      /* the handle for this connection (GDKstrdup'ed, looked up with strcmp) */
      92             :         Mapi               mconn;     /* the Mapi handle for the connection */
      93             :         unsigned char      type;      /* binary profile of the connection target, parsed from the remote.bintype() reply (0 if no row) */
      94             :         size_t             nextid;    /* id counter, starts at 0 for a new connection */
      95             :         struct _connection *next;     /* the next connection in the list, NULL at the tail */
      96             : } *connection; /* note: 'connection' is a pointer type */
      97             : 
      98             : #ifndef WIN32
      99             : #include <sys/socket.h> /* socket */
     100             : #include <sys/un.h> /* sockaddr_un */
     101             : #endif
     102             : #include <unistd.h> /* gethostname */
     103             : 
     104             : static connection conns = NULL; /* head of the list of open connections; accessed under mal_remoteLock */
     105             : static unsigned char localtype = 0177; /* this server's RMTT_* profile; overwritten by RMTprelude */
     106             : 
     107             : static inline str RMTquery(MapiHdl *ret, const char *func, Mapi conn, const char *query);
     108             : 
     109             : /**
     110             :  * Returns a BAT with valid redirects for the given pattern.  If
     111             :  * merovingian is not running, this function throws an error.
     112             :  */
     113           0 : static str RMTresolve(bat *ret, str *pat) {
     114             : #ifdef NATIVE_WIN32
     115             :         (void) ret;
     116             :         (void) pat;
     117             :         throw(MAL, "remote.resolve", "merovingian is not available on "
     118             :                         "your platform, sorry"); /* please upgrade to Linux, etc. */
     119             : #else
     120             :         BAT *list;
     121             :         const char *mero_uri;
     122             :         char *p;
     123             :         unsigned int port;
     124             :         char **redirs;
     125             :         char **or;
     126             : 
     127           0 :         if (pat == NULL || *pat == NULL || strcmp(*pat, (str)str_nil) == 0)
     128           0 :                 throw(ILLARG, "remote.resolve",
     129             :                                 ILLEGAL_ARGUMENT ": pattern is NULL or nil");
     130             : 
     131           0 :         mero_uri = GDKgetenv("merovingian_uri");
     132           0 :         if (mero_uri == NULL)
     133           0 :                 throw(MAL, "remote.resolve", "this function needs the mserver "
     134             :                                 "have been started by merovingian");
     135             : 
     136           0 :         list = COLnew(0, TYPE_str, 0, TRANSIENT);
     137           0 :         if (list == NULL)
     138           0 :                 throw(MAL, "remote.resolve", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     139             : 
     140             :         /* extract port from mero_uri, let mapi figure out the rest */
     141           0 :         mero_uri+=strlen("mapi:monetdb://");
     142           0 :         if (*mero_uri == '[') {
     143           0 :                 if ((mero_uri = strchr(mero_uri, ']')) == NULL) {
     144           0 :                         BBPreclaim(list);
     145           0 :                         throw(MAL, "remote.resolve", "illegal IPv6 address on merovingian_uri: %s",
     146             :                                   GDKgetenv("merovingian_uri"));
     147             :                 }
     148             :         }
     149           0 :         if ((p = strchr(mero_uri, ':')) == NULL) {
     150           0 :                 BBPreclaim(list);
     151           0 :                 throw(MAL, "remote.resolve", "illegal merovingian_uri setting: %s",
     152             :                                 GDKgetenv("merovingian_uri"));
     153             :         }
     154           0 :         port = (unsigned int)atoi(p + 1);
     155             : 
     156           0 :         or = redirs = mapi_resolve(NULL, port, *pat);
     157             : 
     158           0 :         if (redirs == NULL) {
     159           0 :                 BBPreclaim(list);
     160           0 :                 throw(MAL, "remote.resolve", "unknown failure when resolving pattern");
     161             :         }
     162             : 
     163           0 :         while (*redirs != NULL) {
     164           0 :                 if (BUNappend(list, (ptr)*redirs, false) != GDK_SUCCEED) {
     165           0 :                         BBPreclaim(list);
     166             :                         do
     167           0 :                                 free(*redirs);
     168           0 :                         while (*++redirs);
     169           0 :                         free(or);
     170           0 :                         throw(MAL, "remote.resolve", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     171             :                 }
     172           0 :                 free(*redirs);
     173           0 :                 redirs++;
     174             :         }
     175           0 :         free(or);
     176             : 
     177           0 :         BBPkeepref(*ret = list->batCacheid);
     178           0 :         return(MAL_SUCCEED);
     179             : #endif
     180             : }
     181             : 
     182             : 
     183             : /* for unique connection identifiers */
     184             : static size_t connection_id = 0;
     185             : 
     186             : /**
     187             :  * Returns a connection to the given uri.  It always returns a newly
     188             :  * created connection.
     189             :  */
     190          79 : static str RMTconnectScen(
     191             :                 str *ret,
     192             :                 str *ouri,
     193             :                 str *user,
     194             :                 str *passwd,
     195             :                 str *scen,
     196             :                 bit *columnar)
     197             : {
     198             :         connection c;
     199             :         char conn[BUFSIZ];
     200             :         char *s;
     201             :         Mapi m;
     202             :         MapiHdl hdl;
     203             :         str msg;
     204             : 
     205             :         /* just make sure the return isn't garbage */
     206          79 :         *ret = 0;
     207             : 
     208          79 :         if (ouri == NULL || *ouri == NULL || strcmp(*ouri, (str)str_nil) == 0)
     209           0 :                 throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": database uri "
     210             :                                 "is NULL or nil");
     211          79 :         if (user == NULL || *user == NULL || strcmp(*user, (str)str_nil) == 0)
     212           0 :                 throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": username is "
     213             :                                 "NULL or nil");
     214          79 :         if (passwd == NULL || *passwd == NULL || strcmp(*passwd, (str)str_nil) == 0)
     215           0 :                 throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": password is "
     216             :                                 "NULL or nil");
     217          79 :         if (scen == NULL || *scen == NULL || strcmp(*scen, (str)str_nil) == 0)
     218           0 :                 throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": scenario is "
     219             :                                 "NULL or nil");
     220          79 :         if (strcmp(*scen, "mal") != 0 && strcmp(*scen, "msql") != 0)
     221           0 :                 throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": scenario '%s' "
     222             :                                 "is not supported", *scen);
     223             : 
     224          79 :         m = mapi_mapiuri(*ouri, *user, *passwd, *scen);
     225          79 :         if (mapi_error(m)) {
     226           0 :                 msg = createException(MAL, "remote.connect",
     227             :                                                           "unable to connect to '%s': %s",
     228             :                                                           *ouri, mapi_error_str(m));
     229           0 :                 mapi_destroy(m);
     230           0 :                 return msg;
     231             :         }
     232             : 
     233          79 :         MT_lock_set(&mal_remoteLock);
     234             : 
     235             :         /* generate an unique connection name, they are only known
     236             :          * within one mserver, id is primary key, the rest is super key */
     237          79 :         snprintf(conn, BUFSIZ, "%s_%s_%zu", mapi_get_dbname(m), *user, connection_id++);
     238             :         /* make sure we can construct MAL identifiers using conn */
     239        1590 :         for (s = conn; *s != '\0'; s++) {
     240        1511 :                 if (!isalnum((unsigned char)*s)) {
     241         219 :                         *s = '_';
     242             :                 }
     243             :         }
     244             : 
     245          79 :         if (mapi_reconnect(m) != MOK) {
     246           3 :                 MT_lock_unset(&mal_remoteLock);
     247           3 :                 msg = createException(IO, "remote.connect",
     248             :                                                           "unable to connect to '%s': %s",
     249             :                                                           *ouri, mapi_error_str(m));
     250           3 :                 mapi_destroy(m);
     251           3 :                 return msg;
     252             :         }
     253             : 
     254          76 :         if (columnar && *columnar) {
     255             :                 char set_protocol_query_buf[50];
     256           1 :                 snprintf(set_protocol_query_buf, 50, "sql.set_protocol(%d:int);", PROTOCOL_COLUMNAR);
     257           1 :                 if ((msg = RMTquery(&hdl, "remote.connect", m, set_protocol_query_buf))) {
     258           0 :                         mapi_destroy(m);
     259           0 :                         MT_lock_unset(&mal_remoteLock);
     260           0 :                         return msg;
     261             :                 }
     262             :         }
     263             : 
     264             :         /* connection established, add to list */
     265          76 :         c = GDKzalloc(sizeof(struct _connection));
     266          76 :         if ( c == NULL || (c->name = GDKstrdup(conn)) == NULL) {
     267           0 :                 GDKfree(c);
     268           0 :                 mapi_destroy(m);
     269           0 :                 MT_lock_unset(&mal_remoteLock);
     270           0 :                 throw(MAL,"remote.connect", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     271             :         }
     272          76 :         c->mconn = m;
     273          76 :         c->nextid = 0;
     274          76 :         MT_lock_init(&c->lock, c->name);
     275          76 :         c->next = conns;
     276          76 :         conns = c;
     277             : 
     278          76 :         msg = RMTquery(&hdl, "remote.connect", m, "remote.bintype();");
     279          76 :         if (msg) {
     280           0 :                 MT_lock_unset(&mal_remoteLock);
     281           0 :                 return msg;
     282             :         }
     283         152 :         if (hdl != NULL && mapi_fetch_row(hdl)) {
     284          76 :                 char *val = mapi_fetch_field(hdl, 0);
     285          76 :                 c->type = (unsigned char)atoi(val);
     286          76 :                 mapi_close_handle(hdl);
     287             :         } else {
     288           0 :                 c->type = 0;
     289             :         }
     290             : 
     291             : #ifdef _DEBUG_MAPI_
     292             :         mapi_trace(c->mconn, true);
     293             : #endif
     294             : 
     295          76 :         MT_lock_unset(&mal_remoteLock);
     296             : 
     297          76 :         *ret = GDKstrdup(conn);
     298          76 :         if(*ret == NULL)
     299           0 :                 throw(MAL,"remote.connect", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     300             :         return(MAL_SUCCEED);
     301             : }
     302             : 
     303             : static str
     304           0 : RMTconnect(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
     305             :         (void) cntxt;
     306             :         (void) mb;
     307           0 :                 str* ret        = getArgReference_str(stk, pci, 0);
     308           0 :                 str* uri        = getArgReference_str(stk, pci, 1);
     309           0 :                 str* user       = getArgReference_str(stk, pci, 2);
     310           0 :                 str* passwd     = getArgReference_str(stk, pci, 3);
     311             : 
     312           0 :                 str scen = "msql";
     313             : 
     314           0 :                 if (pci->argc >= 5)
     315           0 :                         scen = *getArgReference_str(stk, pci, 4);
     316             : 
     317           0 :                 return RMTconnectScen(ret, uri, user, passwd, &scen, NULL);
     318             : }
     319             : 
     320             : static str
     321          78 : RMTconnectTable(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     322             : {
     323             :         char *local_table;
     324          78 :         char *remoteuser = NULL;
     325          78 :         char *passwd = NULL;
     326          78 :         char *uri = NULL;
     327             :         char *tmp;
     328             :         char *ret;
     329             :         str scen;
     330             :         str msg;
     331             :         ValPtr v;
     332             : 
     333             :         (void)mb;
     334             :         (void)cntxt;
     335             : 
     336          78 :         local_table = *getArgReference_str(stk, pci, 1);
     337          78 :         scen = *getArgReference_str(stk, pci, 2);
     338          78 :         if (local_table == NULL || strcmp(local_table, (str)str_nil) == 0) {
     339           0 :                 throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": local table is NULL or nil");
     340             :         }
     341             : 
     342          78 :         rethrow("remote.connect", tmp, AUTHgetRemoteTableCredentials(local_table, &uri, &remoteuser, &passwd));
     343          78 :         if (!remoteuser)
     344           0 :                 remoteuser = GDKstrdup("");
     345          78 :         if (!passwd)
     346           0 :                 passwd = GDKstrdup("");
     347             :         /* The password we just got is hashed. Add the byte \1 in front to
     348             :          * signal this fact to the mapi. */
     349          78 :         size_t pwlen = strlen(passwd);
     350          78 :         char *pwhash = (char*)GDKmalloc(pwlen + 2);
     351          78 :         if (pwhash == NULL) {
     352           0 :                 GDKfree(remoteuser);
     353           0 :                 GDKfree(passwd);
     354           0 :                 throw(MAL, "remote.connect", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     355             :         }
     356          78 :         snprintf(pwhash, pwlen + 2, "\1%s", passwd);
     357             : 
     358          78 :         msg = RMTconnectScen(&ret, &uri, &remoteuser, &pwhash, &scen, NULL);
     359             : 
     360          78 :         GDKfree(passwd);
     361          78 :         GDKfree(pwhash);
     362             : 
     363          78 :         if (msg == MAL_SUCCEED) {
     364          75 :                 v = &stk->stk[pci->argv[0]];
     365          75 :                 if (VALinit(v, TYPE_str, ret) == NULL) {
     366           0 :                         GDKfree(ret);
     367           0 :                         throw(MAL, "remote.connect", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     368             :                 }
     369             :         }
     370             : 
     371          78 :         GDKfree(ret);
     372          78 :         return msg;
     373             : }
     374             : 
     375             : 
     376             : /**
     377             :  * Disconnects a connection.  The connection needs not to exist in the
     378             :  * system, it only needs to exist for the client (i.e. it was once
     379             :  * created).
     380             :  */
     381             : str
     382          76 : RMTdisconnect(void *ret, str *conn) {
     383             :         connection c, t;
     384             : 
     385          76 :         if (conn == NULL || *conn == NULL || strcmp(*conn, (str)str_nil) == 0)
     386           0 :                 throw(ILLARG, "remote.disconnect", ILLEGAL_ARGUMENT ": connection "
     387             :                                 "is NULL or nil");
     388             : 
     389             : 
     390             :         (void) ret;
     391             : 
     392             :         /* we need a lock because the same user can be handled by multiple
     393             :          * threads */
     394          76 :         MT_lock_set(&mal_remoteLock);
     395          76 :         c = conns;
     396             :         t = NULL; /* parent */
     397             :         /* walk through the list */
     398          97 :         while (c != NULL) {
     399          97 :                 if (strcmp(c->name, *conn) == 0) {
     400             :                         /* ok, delete it... */
     401          76 :                         if (t == NULL) {
     402          65 :                                 conns = c->next;
     403             :                         } else {
     404          11 :                                 t->next = c->next;
     405             :                         }
     406             : 
     407          76 :                         MT_lock_set(&c->lock); /* shared connection */
     408          76 :                         mapi_disconnect(c->mconn);
     409          76 :                         mapi_destroy(c->mconn);
     410          76 :                         MT_lock_unset(&c->lock);
     411          76 :                         MT_lock_destroy(&c->lock);
     412          76 :                         GDKfree(c->name);
     413          76 :                         GDKfree(c);
     414          76 :                         MT_lock_unset(&mal_remoteLock);
     415          76 :                         return MAL_SUCCEED;
     416             :                 }
     417             :                 t = c;
     418          21 :                 c = c->next;
     419             :         }
     420             : 
     421           0 :         MT_lock_unset(&mal_remoteLock);
     422           0 :         throw(MAL, "remote.disconnect", "no such connection: %s", *conn);
     423             : }
     424             : 
     425             : /**
     426             :  * Helper function to return a connection matching a given string, or an
     427             :  * error if it does not exist.  Since this function is internal, it
     428             :  * doesn't check the argument conn, as it should have been checked
     429             :  * already.
     430             :  * NOTE: this function acquires the mal_remoteLock before accessing conns
     431             :  */
     432             : static inline str
     433        3402 : RMTfindconn(connection *ret, const char *conn) {
     434             :         connection c;
     435             : 
     436             :         /* just make sure the return isn't garbage */
     437        3402 :         *ret = NULL;
     438        3402 :         MT_lock_set(&mal_remoteLock); /* protect c */
     439        3402 :         c = conns;
     440        3965 :         while (c != NULL) {
     441        3965 :                 if (strcmp(c->name, conn) == 0) {
     442        3402 :                         *ret = c;
     443        3402 :                         MT_lock_unset(&mal_remoteLock);
     444        3402 :                         return(MAL_SUCCEED);
     445             :                 }
     446         563 :                 c = c->next;
     447             :         }
     448           0 :         MT_lock_unset(&mal_remoteLock);
     449           0 :         throw(MAL, "remote.<findconn>", "no such connection: %s", conn);
     450             : }
     451             : 
     452             : /**
     453             :  * Little helper function that returns a GDKmalloced string containing a
     454             :  * valid identifier that is supposed to be unique in the connection's
     455             :  * remote context.  The generated string depends on the module and
     456             :  * function the caller is in. But also the runtime context is important.
     457             :  * The format is rmt<id>_<retvar>_<type>.  Every RMTgetId uses a fresh id,
     458             :  * to distinguish amongst different (parallel) execution context.
     459             :  * Re-use of this remote identifier should be done with care.
     460             :  * The encoding of the type allows for ease of type checking later on.
     461             :  */
     462             : static inline str
     463        1922 : RMTgetId(char *buf, MalBlkPtr mb, InstrPtr p, int arg) {
     464             :         InstrPtr f;
     465             :         const char *mod;
     466             :         char *var;
     467             :         str rt;
     468             :         static ATOMIC_TYPE idtag = ATOMIC_VAR_INIT(0);
     469             : 
     470        1922 :         if( p->retc == 0)
     471           0 :                 throw(MAL, "remote.getId", ILLEGAL_ARGUMENT "MAL instruction misses retc");
     472             : 
     473        1922 :         var = getArgName(mb, p, arg);
     474             :         f = getInstrPtr(mb, 0); /* top level function */
     475             :         mod = getModuleId(f);
     476             :         if (mod == NULL)
     477             :                 mod = "user";
     478        1922 :         rt = getTypeIdentifier(getArgType(mb,p,arg));
     479        1922 :         if (rt == NULL)
     480           0 :                 throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     481             : 
     482        1922 :         snprintf(buf, BUFSIZ, "rmt%u_%s_%s", (unsigned) ATOMIC_ADD(&idtag, 1), var, rt);
     483             : 
     484        1922 :         GDKfree(rt);
     485        1922 :         return(MAL_SUCCEED);
     486             : }
     487             : 
     488             : /**
     489             :  * Helper function to execute a query over the given connection,
     490             :  * returning the result handle.  If communication fails in one way or
     491             :  * another, an error is returned.  Since this function is internal, it
     492             :  * doesn't check the input arguments func, conn and query, as they
     493             :  * should have been checked already.
     494             :  * NOTE: this function assumes a lock for conn is set
     495             :  */
     496             : static inline str
     497        2280 : RMTquery(MapiHdl *ret, const char *func, Mapi conn, const char *query) {
     498             :         MapiHdl mhdl;
     499             : 
     500        2280 :         *ret = NULL;
     501        2280 :         mhdl = mapi_query(conn, query);
     502        2280 :         if (mhdl) {
     503        2280 :                 if (mapi_result_error(mhdl) != NULL) {
     504          11 :                         str err = createException(
     505             :                                         getExceptionType(mapi_result_error(mhdl)),
     506             :                                         func,
     507             :                                         "(mapi:monetdb://%s@%s/%s) %s",
     508             :                                         mapi_get_user(conn),
     509             :                                         mapi_get_host(conn),
     510             :                                         mapi_get_dbname(conn),
     511             :                                         getExceptionMessage(mapi_result_error(mhdl)));
     512          11 :                         mapi_close_handle(mhdl);
     513          11 :                         return(err);
     514             :                 }
     515             :         } else {
     516           0 :                 if (mapi_error(conn) != MOK) {
     517           0 :                         throw(IO, func, "an error occurred on connection: %s",
     518             :                                         mapi_error_str(conn));
     519             :                 } else {
     520           0 :                         throw(MAL, func, "remote function invocation didn't return a result");
     521             :                 }
     522             :         }
     523             : 
     524        2269 :         *ret = mhdl;
     525        2269 :         return(MAL_SUCCEED);
     526             : }
     527             : 
     528         264 : static str RMTprelude(void *ret) {
     529             :         unsigned int type = 0;
     530             : 
     531             :         (void)ret;
     532             : #ifdef WORDS_BIGENDIAN
     533             :         type |= RMTT_B_ENDIAN;
     534             : #else
     535             :         type |= RMTT_L_ENDIAN;
     536             : #endif
     537             : #if SIZEOF_SIZE_T == SIZEOF_LNG
     538             :         type |= RMTT_64_BITS;
     539             : #else
     540             :         type |= RMTT_32_BITS;
     541             : #endif
     542             : #if SIZEOF_OID == SIZEOF_LNG
     543             :         type |= RMTT_64_OIDS;
     544             : #else
     545             :         type |= RMTT_32_OIDS;
     546             : #endif
     547         264 :         localtype = (unsigned char)type;
     548             : 
     549         264 :         return(MAL_SUCCEED);
     550             : }
     551             : 
     552         262 : static str RMTepilogue(void *ret) {
     553             :         connection c, t;
     554             : 
     555             :         (void)ret;
     556             : 
     557         262 :         MT_lock_set(&mal_remoteLock); /* nobody allowed here */
     558             :         /* free connections list */
     559         262 :         c = conns;
     560         262 :         while (c != NULL) {
     561             :                 t = c;
     562           0 :                 c = c->next;
     563           0 :                 MT_lock_set(&t->lock);
     564           0 :                 mapi_destroy(t->mconn);
     565           0 :                 MT_lock_unset(&t->lock);
     566           0 :                 MT_lock_destroy(&t->lock);
     567           0 :                 GDKfree(t->name);
     568           0 :                 GDKfree(t);
     569             :         }
     570             :         /* not sure, but better be safe than sorry */
     571         262 :         conns = NULL;
     572         262 :         MT_lock_unset(&mal_remoteLock);
     573             : 
     574         262 :         return(MAL_SUCCEED);
     575             : }
     576             : static str
     577        1201 : RMTreadbatheader(stream* sin, char* buf) {
     578             :                 ssize_t sz = 0, rd;
     579             : 
     580             :                 /* read the JSON header */
     581      195220 :                 while ((rd = mnstr_read(sin, &buf[sz], 1, 1)) == 1 && buf[sz] != '\n') {
     582      194019 :                         sz += rd;
     583             :                 }
     584        1201 :                 if (rd < 0) {
     585           0 :                         throw(MAL, "remote.get", "could not read BAT JSON header");
     586             :                 }
     587        1201 :                 if (buf[0] == '!') {
     588             :                         char *result;
     589           0 :                         if((result = GDKstrdup(buf)) == NULL)
     590           0 :                                 throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     591             :                         return result;
     592             :                 }
     593             : 
     594        1201 :                 buf[sz] = '\0';
     595             : 
     596        1201 :                 return MAL_SUCCEED;
     597             : }
     598             : 
     599             : typedef struct _binbat_v1 {
     600             :         int Ttype;
     601             :         oid Hseqbase;
     602             :         oid Tseqbase;
     603             :         bool
     604             :                 Tsorted:1,
     605             :                 Trevsorted:1,
     606             :                 Tkey:1,
     607             :                 Tnonil:1,
     608             :                 Tdense:1;
     609             :         BUN size;
     610             :         size_t tailsize;
     611             :         size_t theapsize;
     612             : } binbat;
     613             : 
     614             : static str
     615        1201 : RMTinternalcopyfrom(BAT **ret, char *hdr, stream *in, bool must_flush)
     616             : {
     617        1201 :         binbat bb = { 0, 0, 0, false, false, false, false, false, 0, 0, 0 };
     618             :         char *nme = NULL;
     619             :         char *val = NULL;
     620             :         char tmp;
     621             :         size_t len;
     622             :         lng lv, *lvp;
     623             : 
     624             :         BAT *b;
     625             : 
     626             :         /* hdr is a JSON structure that looks like
     627             :          * {"version":1,"ttype":6,"tseqbase":0,"tailsize":4,"theapsize":0}
     628             :          * we take the binary data directly from the stream */
     629             : 
     630             :         /* could skip whitespace, but we just don't allow that */
     631        1201 :         if (*hdr++ != '{')
     632           0 :                 throw(MAL, "remote.bincopyfrom", "illegal input, not a JSON header (got '%s')", hdr - 1);
     633      194019 :         while (*hdr != '\0') {
     634      192818 :                 switch (*hdr) {
     635       28824 :                         case '"':
     636             :                                 /* we assume only numeric values, so all strings are
     637             :                                  * elems */
     638       28824 :                                 if (nme != NULL) {
     639       14412 :                                         *hdr = '\0';
     640             :                                 } else {
     641       14412 :                                         nme = hdr + 1;
     642             :                                 }
     643             :                                 break;
     644       14412 :                         case ':':
     645       14412 :                                 val = hdr + 1;
     646       14412 :                                 break;
     647       14412 :                         case ',':
     648             :                         case '}':
     649       14412 :                                 if (val == NULL)
     650           0 :                                         throw(MAL, "remote.bincopyfrom",
     651             :                                                         "illegal input, JSON value missing");
     652       14412 :                                 *hdr = '\0';
     653             : 
     654       14412 :                                 lvp = &lv;
     655       14412 :                                 len = sizeof(lv);
     656             :                                 /* tseqbase can be 1<<31/1<<63 which causes overflow
     657             :                                  * in lngFromStr, so we check separately */
     658       14412 :                                 if (strcmp(val,
     659             : #if SIZEOF_OID == 8
     660             :                                                    "9223372036854775808"
     661             : #else
     662             :                                                    "2147483648"
     663             : #endif
     664        1197 :                                                 ) == 0 &&
     665        1197 :                                         strcmp(nme, "tseqbase") == 0) {
     666        1197 :                                         bb.Tseqbase = oid_nil;
     667             :                                 } else {
     668             :                                         /* all values should be non-negative, so we check that
     669             :                                          * here as well */
     670       13215 :                                         if (lngFromStr(val, &len, &lvp, true) < 0 ||
     671       13215 :                                                 lv < 0 /* includes lng_nil */)
     672           0 :                                                 throw(MAL, "remote.bincopyfrom",
     673             :                                                           "bad %s value: %s", nme, val);
     674             : 
     675             :                                         /* deal with nme and val */
     676       13215 :                                         if (strcmp(nme, "version") == 0) {
     677        1201 :                                                 if (lv != 1)
     678           0 :                                                         throw(MAL, "remote.bincopyfrom",
     679             :                                                                   "unsupported version: %s", val);
     680       12014 :                                         } else if (strcmp(nme, "hseqbase") == 0) {
     681             : #if SIZEOF_OID < SIZEOF_LNG
     682             :                                                 if (lv > GDK_oid_max)
     683             :                                                         throw(MAL, "remote.bincopyfrom",
     684             :                                                                           "bad %s value: %s", nme, val);
     685             : #endif
     686        1201 :                                                 bb.Hseqbase = (oid)lv;
     687       10813 :                                         } else if (strcmp(nme, "ttype") == 0) {
     688        1201 :                                                 if (lv >= GDKatomcnt)
     689           0 :                                                         throw(MAL, "remote.bincopyfrom",
     690             :                                                                   "bad %s value: GDK atom number %s doesn't exist", nme, val);
     691        1201 :                                                 bb.Ttype = (int) lv;
     692        9612 :                                         } else if (strcmp(nme, "tseqbase") == 0) {
     693             : #if SIZEOF_OID < SIZEOF_LNG
     694             :                                                 if (lv > GDK_oid_max)
     695             :                                                         throw(MAL, "remote.bincopyfrom",
     696             :                                                                   "bad %s value: %s", nme, val);
     697             : #endif
     698           4 :                                                 bb.Tseqbase = (oid) lv;
     699        9608 :                                         } else if (strcmp(nme, "tsorted") == 0) {
     700        1201 :                                                 bb.Tsorted = lv != 0;
     701        8407 :                                         } else if (strcmp(nme, "trevsorted") == 0) {
     702        1201 :                                                 bb.Trevsorted = lv != 0;
     703        7206 :                                         } else if (strcmp(nme, "tkey") == 0) {
     704        1201 :                                                 bb.Tkey = lv != 0;
     705        6005 :                                         } else if (strcmp(nme, "tnonil") == 0) {
     706        1201 :                                                 bb.Tnonil = lv != 0;
     707        4804 :                                         } else if (strcmp(nme, "tdense") == 0) {
     708        1201 :                                                 bb.Tdense = lv != 0;
     709        3603 :                                         } else if (strcmp(nme, "size") == 0) {
     710        1201 :                                                 if (lv > (lng) BUN_MAX)
     711           0 :                                                         throw(MAL, "remote.bincopyfrom",
     712             :                                                                   "bad %s value: %s", nme, val);
     713        1201 :                                                 bb.size = (BUN) lv;
     714        2402 :                                         } else if (strcmp(nme, "tailsize") == 0) {
     715        1201 :                                                 bb.tailsize = (size_t) lv;
     716        1201 :                                         } else if (strcmp(nme, "theapsize") == 0) {
     717        1201 :                                                 bb.theapsize = (size_t) lv;
     718             :                                         } else {
     719           0 :                                                 throw(MAL, "remote.bincopyfrom",
     720             :                                                           "unknown element: %s", nme);
     721             :                                         }
     722             :                                 }
     723             :                                 nme = val = NULL;
     724             :                                 break;
     725             :                 }
     726      192818 :                 hdr++;
     727             :         }
     728             : 
     729        1201 :         b = COLnew2(bb.Hseqbase, bb.Ttype, bb.size, TRANSIENT, bb.size > 0 ? (uint16_t) (bb.tailsize / bb.size) : 0);
     730        1201 :         if (b == NULL)
     731           0 :                 throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     732             : 
     733        1201 :         if (bb.tailsize > 0) {
     734        2258 :                 if (HEAPextend(b->theap, bb.tailsize, true) != GDK_SUCCEED ||
     735        1129 :                         mnstr_read(in, b->theap->base, bb.tailsize, 1) < 0)
     736           0 :                         goto bailout;
     737        1129 :                 b->theap->dirty = true;
     738             :         }
     739        1201 :         if (bb.theapsize > 0) {
     740          70 :                 if ((b->tvheap->base == NULL &&
     741          70 :                          (*BATatoms[b->ttype].atomHeap)(b->tvheap, b->batCapacity) != GDK_SUCCEED) ||
     742          70 :                         HEAPextend(b->tvheap, bb.theapsize, true) != GDK_SUCCEED ||
     743          35 :                         mnstr_read(in, b->tvheap->base, bb.theapsize, 1) < 0)
     744           0 :                         goto bailout;
     745          35 :                 b->tvheap->free = bb.theapsize;
     746          35 :                 b->tvheap->dirty = true;
     747             :         }
     748             : 
     749             :         /* set properties */
     750        1201 :         b->tseqbase = bb.Tdense ? bb.Tseqbase : oid_nil;
     751        1201 :         b->tsorted = bb.Tsorted;
     752        1201 :         b->trevsorted = bb.Trevsorted;
     753        1201 :         b->tkey = bb.Tkey;
     754        1201 :         b->tnonil = bb.Tnonil;
     755        1201 :         if (bb.Ttype == TYPE_str && bb.size)
     756          35 :                 BATsetcapacity(b, (BUN) (bb.tailsize >> b->tshift));
     757        1201 :         BATsetcount(b, bb.size);
     758             : 
     759             :         // read blockmode flush
     760        1201 :         while (must_flush && mnstr_read(in, &tmp, 1, 1) > 0) {
     761           0 :                 TRC_ERROR(MAL_REMOTE, "Expected flush, got: %c\n", tmp);
     762             :         }
     763             : 
     764        1201 :         BATsettrivprop(b);
     765             : 
     766        1201 :         *ret = b;
     767        1201 :         return(MAL_SUCCEED);
     768             : 
     769           0 :   bailout:
     770           0 :         BBPreclaim(b);
     771           0 :         throw(MAL, "remote.bincopyfrom", "reading failed");
     772             : }
     773             : 
     774             : /**
     775             :  * get fetches the object referenced by ident over connection conn.
     776             :  * We are only interested in retrieving void-headed BATs, i.e. single columns.
     777             :  */
     778        1199 : static str RMTget(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
     779             :         str conn, ident, tmp, rt;
     780             :         connection c;
     781             :         char qbuf[BUFSIZ + 1];
     782        1199 :         MapiHdl mhdl = NULL;
     783             :         int rtype;
     784             :         ValPtr v;
     785             : 
     786             :         (void)mb;
     787             :         (void) cntxt;
     788             : 
     789        1199 :         conn = *getArgReference_str(stk, pci, 1);
     790        1199 :         if (conn == NULL || strcmp(conn, (str)str_nil) == 0)
     791           0 :                 throw(ILLARG, "remote.get", ILLEGAL_ARGUMENT ": connection name is NULL or nil");
     792        1199 :         ident = *getArgReference_str(stk, pci, 2);
     793        1199 :         if (ident == 0 || isIdentifier(ident) < 0)
     794           0 :                 throw(ILLARG, "remote.get", ILLEGAL_ARGUMENT ": identifier expected, got '%s'", ident);
     795             : 
     796             :         /* lookup conn, set c if valid */
     797        1199 :         rethrow("remote.get", tmp, RMTfindconn(&c, conn));
     798             : 
     799        1199 :         rtype = getArgType(mb, pci, 0);
     800        1199 :         v = &stk->stk[pci->argv[0]];
     801             : 
     802        1199 :         if (rtype == TYPE_any || isAnyExpression(rtype)) {
     803             :                 char *tpe, *msg;
     804           0 :                 tpe = getTypeName(rtype);
     805           0 :                 msg = createException(MAL, "remote.get", ILLEGAL_ARGUMENT ": unsupported any type: %s",
     806             :                                                           tpe);
     807           0 :                 GDKfree(tpe);
     808           0 :                 return msg;
     809             :         }
     810             :         /* check if the remote type complies with what we expect.
     811             :            Since the put() encodes the type as known to the remote site
     812             :            we can simple compare it here */
     813        1199 :         rt = getTypeIdentifier(rtype);
     814        1199 :         if (rt == NULL)
     815           0 :                 throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     816        1199 :         if (strcmp(ident + strlen(ident) - strlen(rt), rt)) {
     817           0 :                 tmp = createException(MAL, "remote.get", ILLEGAL_ARGUMENT
     818             :                         ": remote object type %s does not match expected type %s",
     819             :                         rt, ident);
     820           0 :                 GDKfree(rt);
     821           0 :                 return tmp;
     822             :         }
     823        1199 :         GDKfree(rt);
     824             : 
     825        1199 :         if (isaBatType(rtype) && (localtype == 0177 || localtype != c->type ))
     826           0 :         {
     827             :                 int t;
     828             :                 size_t s;
     829             :                 ptr r;
     830             :                 str var;
     831             :                 BAT *b;
     832             : 
     833           0 :                 snprintf(qbuf, BUFSIZ, "io.print(%s);", ident);
     834             : 
     835           0 :                 TRC_DEBUG(MAL_REMOTE, "Remote get: %s\n", qbuf);
     836             : 
     837             :                 /* this call should be a single transaction over the channel*/
     838           0 :                 MT_lock_set(&c->lock);
     839             : 
     840           0 :                 if ((tmp = RMTquery(&mhdl, "remote.get", c->mconn, qbuf))
     841             :                                 != MAL_SUCCEED)
     842             :                 {
     843           0 :                         TRC_ERROR(MAL_REMOTE, "Remote get: %s\n%s\n", qbuf, tmp);
     844           0 :                         MT_lock_unset(&c->lock);
     845           0 :                         var = createException(MAL, "remote.get", "%s", tmp);
     846           0 :                         freeException(tmp);
     847           0 :                         return var;
     848             :                 }
     849           0 :                 t = getBatType(rtype);
     850           0 :                 b = COLnew(0, t, 0, TRANSIENT);
     851           0 :                 if (b == NULL) {
     852           0 :                         mapi_close_handle(mhdl);
     853           0 :                         MT_lock_unset(&c->lock);
     854           0 :                         throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     855             :                 }
     856             : 
     857           0 :                 if (ATOMbasetype(t) == TYPE_str) {
     858           0 :                         while (mapi_fetch_row(mhdl)) {
     859           0 :                                 var = mapi_fetch_field(mhdl, 1);
     860           0 :                                 if (BUNappend(b, var == NULL ? str_nil : var, false) != GDK_SUCCEED) {
     861           0 :                                         BBPreclaim(b);
     862           0 :                                         mapi_close_handle(mhdl);
     863           0 :                                         MT_lock_unset(&c->lock);
     864           0 :                                         throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     865             :                                 }
     866             :                         }
     867             :                 } else
     868           0 :                         while (mapi_fetch_row(mhdl)) {
     869           0 :                                 var = mapi_fetch_field(mhdl, 1);
     870           0 :                                 if (var == NULL)
     871             :                                         var = "nil";
     872           0 :                                 s = 0;
     873           0 :                                 r = NULL;
     874           0 :                                 if (ATOMfromstr(t, &r, &s, var, true) < 0 ||
     875           0 :                                         BUNappend(b, r, false) != GDK_SUCCEED) {
     876           0 :                                         BBPreclaim(b);
     877           0 :                                         GDKfree(r);
     878           0 :                                         mapi_close_handle(mhdl);
     879           0 :                                         MT_lock_unset(&c->lock);
     880           0 :                                         throw(MAL, "remote.get", GDK_EXCEPTION);
     881             :                                 }
     882           0 :                                 GDKfree(r);
     883             :                         }
     884             : 
     885           0 :                 v->val.bval = b->batCacheid;
     886           0 :                 v->vtype = TYPE_bat;
     887           0 :                 BBPkeepref(b->batCacheid);
     888             : 
     889           0 :                 mapi_close_handle(mhdl);
     890           0 :                 MT_lock_unset(&c->lock);
     891        2398 :         } else if (isaBatType(rtype)) {
     892             :                 /* binary compatible remote host, transfer BAT in binary form */
     893             :                 stream *sout;
     894             :                 stream *sin;
     895             :                 char buf[256];
     896        1199 :                 BAT *b = NULL;
     897             : 
     898             :                 /* this call should be a single transaction over the channel*/
     899        1199 :                 MT_lock_set(&c->lock);
     900             : 
     901             :                 /* bypass Mapi from this point to efficiently write all data to
     902             :                  * the server */
     903        1199 :                 sout = mapi_get_to(c->mconn);
     904        1199 :                 sin = mapi_get_from(c->mconn);
     905        1199 :                 if (sin == NULL || sout == NULL) {
     906           0 :                         MT_lock_unset(&c->lock);
     907           0 :                         throw(MAL, "remote.get", "Connection lost");
     908             :                 }
     909             : 
     910             :                 /* call our remote helper to do this more efficiently */
     911        1199 :                 mnstr_printf(sout, "remote.batbincopy(%s);\n", ident);
     912        1199 :                 mnstr_flush(sout, MNSTR_FLUSH_DATA);
     913             : 
     914        1199 :                 if ((tmp = RMTreadbatheader(sin, buf)) != MAL_SUCCEED) {
     915           0 :                         MT_lock_unset(&c->lock);
     916           0 :                         return tmp;
     917             :                 }
     918             : 
     919        1199 :                 if ((tmp = RMTinternalcopyfrom(&b, buf, sin, true)) != MAL_SUCCEED) {
     920           0 :                         MT_lock_unset(&c->lock);
     921           0 :                         return(tmp);
     922             :                 }
     923             : 
     924        1199 :                 v->val.bval = b->batCacheid;
     925        1199 :                 v->vtype = TYPE_bat;
     926        1199 :                 BBPkeepref(b->batCacheid);
     927             : 
     928        1199 :                 MT_lock_unset(&c->lock);
     929             :         } else {
     930           0 :                 ptr p = NULL;
     931             :                 str val;
     932           0 :                 size_t len = 0;
     933             : 
     934           0 :                 snprintf(qbuf, BUFSIZ, "io.print(%s);", ident);
     935           0 :                 TRC_DEBUG(MAL_REMOTE, "Remote get: %s - %s\n", c->name, qbuf);
     936           0 :                 if ((tmp=RMTquery(&mhdl, "remote.get", c->mconn, qbuf)) != MAL_SUCCEED)
     937             :                 {
     938           0 :                         return tmp;
     939             :                 }
     940           0 :                 (void) mapi_fetch_row(mhdl); /* should succeed */
     941           0 :                 val = mapi_fetch_field(mhdl, 0);
     942             : 
     943           0 :                 if (ATOMbasetype(rtype) == TYPE_str) {
     944           0 :                         if (!VALinit(v, rtype, val == NULL ? str_nil : val)) {
     945           0 :                                 mapi_close_handle(mhdl);
     946           0 :                                 throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     947             :                         }
     948           0 :                 } else if (ATOMfromstr(rtype, &p, &len, val == NULL ? "nil" : val, true) < 0) {
     949             :                         char *msg;
     950           0 :                         msg = createException(MAL, "remote.get",
     951             :                                                                   "unable to parse value: %s",
     952             :                                                                   val == NULL ? "nil" : val);
     953           0 :                         mapi_close_handle(mhdl);
     954           0 :                         GDKfree(p);
     955           0 :                         return msg;
     956             :                 } else {
     957           0 :                         VALset(v, rtype, p);
     958           0 :                         if (ATOMextern(rtype) == 0)
     959           0 :                                 GDKfree(p);
     960             :                 }
     961             : 
     962           0 :                 mapi_close_handle(mhdl);
     963             :         }
     964             : 
     965             :         return(MAL_SUCCEED);
     966             : }
     967             : 
     968             : /**
     969             :  * stores the given object on the remote host.  The identifier of the
     970             :  * object on the remote host is returned for later use.
     971             :  */
     972        1922 : static str RMTput(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
     973             :         str conn, tmp;
     974             :         char ident[BUFSIZ];
     975             :         connection c;
     976             :         ValPtr v;
     977             :         int type;
     978             :         ptr value;
     979        1922 :         MapiHdl mhdl = NULL;
     980             : 
     981             :         (void)cntxt;
     982             : 
     983        1922 :         conn = *getArgReference_str(stk, pci, 1);
     984        1922 :         if (conn == NULL || strcmp(conn, (str)str_nil) == 0)
     985           0 :                 throw(ILLARG, "remote.put", ILLEGAL_ARGUMENT ": connection name is NULL or nil");
     986             : 
     987             :         /* lookup conn */
     988        1922 :         rethrow("remote.put", tmp, RMTfindconn(&c, conn));
     989             : 
     990             :         /* put the thing */
     991        1922 :         type = getArgType(mb, pci, 2);
     992        1922 :         value = getArgReference(stk, pci, 2);
     993             : 
     994             :         /* this call should be a single transaction over the channel*/
     995        1922 :         MT_lock_set(&c->lock);
     996             : 
     997             :         /* get a free, typed identifier for the remote host */
     998        1922 :         tmp = RMTgetId(ident, mb, pci, 2);
     999        1921 :         if (tmp != MAL_SUCCEED) {
    1000           0 :                 MT_lock_unset(&c->lock);
    1001           0 :                 return tmp;
    1002             :         }
    1003             : 
    1004             :         /* depending on the input object generate actions to store the
    1005             :          * object remotely*/
    1006        1921 :         if (type == TYPE_any || type == TYPE_bat || isAnyExpression(type)) {
    1007             :                 char *tpe, *msg;
    1008           0 :                 MT_lock_unset(&c->lock);
    1009           0 :                 tpe = getTypeName(type);
    1010           0 :                 msg = createException(MAL, "remote.put", "unsupported type: %s", tpe);
    1011           0 :                 GDKfree(tpe);
    1012           0 :                 return msg;
    1013        3121 :         } else if (isaBatType(type) && !is_bat_nil(*(bat*) value)) {
    1014             :                 BATiter bi;
    1015             :                 /* naive approach using bat.new() and bat.insert() calls */
    1016             :                 char *tail;
    1017             :                 bat bid;
    1018             :                 BAT *b = NULL;
    1019             :                 BUN p, q;
    1020             :                 str tailv;
    1021             :                 stream *sout;
    1022             : 
    1023        1200 :                 tail = getTypeIdentifier(getBatType(type));
    1024        1200 :                 if (tail == NULL) {
    1025           0 :                         MT_lock_unset(&c->lock);
    1026           0 :                         throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
    1027             :                 }
    1028             : 
    1029        1200 :                 bid = *(bat *)value;
    1030        1200 :                 if (bid != 0) {
    1031        1200 :                         if ((b = BATdescriptor(bid)) == NULL){
    1032           0 :                                 MT_lock_unset(&c->lock);
    1033           0 :                                 GDKfree(tail);
    1034           0 :                                 throw(MAL, "remote.put", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
    1035             :                         }
    1036             :                 }
    1037             : 
    1038             :                 /* bypass Mapi from this point to efficiently write all data to
    1039             :                  * the server */
    1040        1200 :                 sout = mapi_get_to(c->mconn);
    1041             : 
    1042             :                 /* call our remote helper to do this more efficiently */
    1043        1200 :                 mnstr_printf(sout,
    1044             :                                 "%s := remote.batload(nil:%s, " BUNFMT ");\n",
    1045             :                                 ident, tail, (bid == 0 ? 0 : BATcount(b)));
    1046        1200 :                 mnstr_flush(sout, MNSTR_FLUSH_DATA);
    1047        1200 :                 GDKfree(tail);
    1048             : 
    1049             :                 /* b can be NULL if bid == 0 (only type given, ugh) */
    1050        1200 :                 if (b) {
    1051        1200 :                         int tpe = getBatType(type), trivial = tpe < TYPE_date || ATOMbasetype(tpe) == TYPE_str;
    1052        1200 :                         const void *nil = ATOMnilptr(tpe);
    1053        1200 :                         int (*atomcmp)(const void *, const void *) = ATOMcompare(tpe);
    1054             : 
    1055        1200 :                         bi = bat_iterator(b);
    1056        1200 :                         BATloop(b, p, q) {
    1057           0 :                                 const void *v = BUNtail(bi, p);
    1058           0 :                                 tailv = ATOMformat(tpe, v);
    1059           0 :                                 if (tailv == NULL) {
    1060           0 :                                         bat_iterator_end(&bi);
    1061           0 :                                         BBPunfix(b->batCacheid);
    1062           0 :                                         MT_lock_unset(&c->lock);
    1063           0 :                                         throw(MAL, "remote.put", GDK_EXCEPTION);
    1064             :                                 }
    1065           0 :                                 if (trivial || atomcmp(v, nil) == 0)
    1066           0 :                                         mnstr_printf(sout, "%s\n", tailv);
    1067             :                                 else
    1068           0 :                                         mnstr_printf(sout, "\"%s\"\n", tailv);
    1069           0 :                                 GDKfree(tailv);
    1070             :                         }
    1071        1200 :                         bat_iterator_end(&bi);
    1072        1200 :                         BBPunfix(b->batCacheid);
    1073             :                 }
    1074             : 
    1075             :                 /* write the empty line the server is waiting for, handles
    1076             :                  * all errors at the same time, if any */
    1077        1200 :                 if ((tmp = RMTquery(&mhdl, "remote.put", c->mconn, ""))
    1078             :                                 != MAL_SUCCEED)
    1079             :                 {
    1080           0 :                         MT_lock_unset(&c->lock);
    1081           0 :                         return tmp;
    1082             :                 }
    1083        1200 :                 mapi_close_handle(mhdl);
    1084         721 :         } else if (isaBatType(type) && is_bat_nil(*(bat*) value)) {
    1085             :                 stream *sout;
    1086           0 :                 str typename = getTypeName(type);
    1087           0 :                 sout = mapi_get_to(c->mconn);
    1088           0 :                 mnstr_printf(sout,
    1089             :                                 "%s := nil:%s;\n", ident, typename);
    1090           0 :                 mnstr_flush(sout, MNSTR_FLUSH_DATA);
    1091           0 :                 GDKfree(typename);
    1092             :         } else {
    1093             :                 size_t l;
    1094             :                 str val;
    1095             :                 char *tpe;
    1096             :                 char qbuf[512], *nbuf = qbuf;
    1097         721 :                 const void *nil = ATOMnilptr(type), *p = value;
    1098         721 :                 int (*atomcmp)(const void *, const void *) = ATOMcompare(type);
    1099             : 
    1100         721 :                 if (ATOMextern(type))
    1101         506 :                         p = *(str *)value;
    1102             : 
    1103         721 :                 val = ATOMformat(type, p);
    1104         722 :                 if (val == NULL) {
    1105           0 :                         MT_lock_unset(&c->lock);
    1106           0 :                         throw(MAL, "remote.put", GDK_EXCEPTION);
    1107             :                 }
    1108         722 :                 tpe = getTypeIdentifier(type);
    1109         722 :                 if (tpe == NULL) {
    1110           0 :                         MT_lock_unset(&c->lock);
    1111           0 :                         GDKfree(val);
    1112           0 :                         throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
    1113             :                 }
    1114         722 :                 l = strlen(val) + strlen(tpe) + strlen(ident) + 10;
    1115         722 :                 if (l > (ssize_t) sizeof(qbuf) && (nbuf = GDKmalloc(l)) == NULL) {
    1116           0 :                         MT_lock_unset(&c->lock);
    1117           0 :                         GDKfree(val);
    1118           0 :                         GDKfree(tpe);
    1119           0 :                         throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
    1120             :                 }
    1121         722 :                 if (type < TYPE_date || ATOMbasetype(type) == TYPE_str || atomcmp(p, nil) == 0)
    1122         722 :                         snprintf(nbuf, l, "%s := %s:%s;\n", ident, val, tpe);
    1123             :                 else
    1124           0 :                         snprintf(nbuf, l, "%s := \"%s\":%s;\n", ident, val, tpe);
    1125         722 :                 GDKfree(tpe);
    1126         722 :                 GDKfree(val);
    1127         722 :                 TRC_DEBUG(MAL_REMOTE, "Remote put: %s - %s\n", c->name, nbuf);
    1128         722 :                 tmp = RMTquery(&mhdl, "remote.put", c->mconn, nbuf);
    1129         722 :                 if (nbuf != qbuf)
    1130          16 :                         GDKfree(nbuf);
    1131         722 :                 if (tmp != MAL_SUCCEED) {
    1132           0 :                         MT_lock_unset(&c->lock);
    1133           0 :                         return tmp;
    1134             :                 }
    1135         722 :                 mapi_close_handle(mhdl);
    1136             :         }
    1137        1922 :         MT_lock_unset(&c->lock);
    1138             : 
    1139             :         /* return the identifier */
    1140        1922 :         v = &stk->stk[pci->argv[0]];
    1141        1922 :         if (VALinit(v, TYPE_str, ident) == NULL)
    1142           0 :                 throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
    1143             :         return(MAL_SUCCEED);
    1144             : }
    1145             : 
    1146             : /**
    1147             :  * stores the given <mod>.<fcn> on the remote host.
    1148             :  * An error is returned if the function is already known at the remote site.
    1149             :  * The implementation is based on serialisation of the block into a string
    1150             :  * followed by remote parsing.
    1151             :  */
    1152           0 : static str RMTregisterInternal(Client cntxt, char** fcn_id, const char *conn, const char *mod, const char *fcn)
    1153             : {
    1154             :         str tmp, qry, msg;
    1155             :         connection c;
    1156             :         char buf[BUFSIZ];
    1157           0 :         MapiHdl mhdl = NULL;
    1158             :         Symbol sym;
    1159             : 
    1160           0 :         if (strNil(conn))
    1161           0 :                 throw(ILLARG, "remote.register", ILLEGAL_ARGUMENT ": connection name is NULL or nil");
    1162             : 
    1163             :         /* find local definition */
    1164           0 :         sym = findSymbol(cntxt->usermodule, putName(mod), putName(fcn));
    1165           0 :         if (sym == NULL)
    1166           0 :                 throw(MAL, "remote.register", ILLEGAL_ARGUMENT ": no such function: %s.%s", mod, fcn);
    1167             : 
    1168             :         /* lookup conn */
    1169           0 :         rethrow("remote.register", tmp, RMTfindconn(&c, conn));
    1170             : 
    1171             :         /* this call should be a single transaction over the channel*/
    1172           0 :         MT_lock_set(&c->lock);
    1173             : 
    1174             :         /* get a free, typed identifier for the remote host */
    1175             :         char ident[BUFSIZ];
    1176           0 :         tmp = RMTgetId(ident, sym->def, getInstrPtr(sym->def, 0), 0);
    1177           0 :         if (tmp != MAL_SUCCEED) {
    1178           0 :                 MT_lock_unset(&c->lock);
    1179           0 :                 return tmp;
    1180             :         }
    1181             : 
    1182             :         /* check remote definition */
    1183           0 :         snprintf(buf, BUFSIZ, "b:bit:=inspect.getExistence(\"%s\",\"%s\");\nio.print(b);", mod, ident);
    1184           0 :         TRC_DEBUG(MAL_REMOTE, "Remote register: %s - %s\n", c->name, buf);
    1185           0 :         if ((msg = RMTquery(&mhdl, "remote.register", c->mconn, buf)) != MAL_SUCCEED){
    1186           0 :                 MT_lock_unset(&c->lock);
    1187           0 :                 return msg;
    1188             :         }
    1189             : 
    1190             :         char* result;
    1191           0 :         if ( mapi_get_field_count(mhdl) && mapi_fetch_row(mhdl) && (result = mapi_fetch_field(mhdl, 0))) {
    1192           0 :                 if (strcmp(result, "false") != 0)
    1193           0 :                         msg = createException(MAL, "remote.register",
    1194             :                                         "function already exists at the remote site: %s.%s",
    1195             :                                         mod, fcn);
    1196             :         }
    1197             :         else
    1198           0 :                 msg = createException(MAL, "remote.register", OPERATION_FAILED);
    1199             : 
    1200           0 :         mapi_close_handle(mhdl);
    1201             : 
    1202           0 :         if (msg) {
    1203           0 :                 MT_lock_unset(&c->lock);
    1204           0 :                 return msg;
    1205             :         }
    1206             : 
    1207           0 :         *fcn_id = GDKstrdup(ident);
    1208           0 :         if (*fcn_id == NULL) {
    1209           0 :                 MT_lock_unset(&c->lock);
    1210           0 :                 throw(MAL, "Remote register", MAL_MALLOC_FAIL);
    1211             :         }
    1212             : 
    1213             :         Symbol prg;
    1214           0 :         if ((prg = newFunction(putName(mod), putName(*fcn_id), FUNCTIONsymbol)) == NULL) {
    1215           0 :                 MT_lock_unset(&c->lock);
    1216           0 :                 throw(MAL, "Remote register", MAL_MALLOC_FAIL);
    1217             :         }
    1218             : 
    1219             :         // We only need the Symbol not the inner program stub. So we clear it.
    1220           0 :         freeMalBlk(prg->def);
    1221           0 :         prg->def = NULL;
    1222             : 
    1223           0 :         if ((prg->def = copyMalBlk(sym->def)) == NULL) {
    1224           0 :                 MT_lock_unset(&c->lock);
    1225           0 :                 freeSymbol(prg);
    1226           0 :                 throw(MAL, "Remote register", MAL_MALLOC_FAIL);
    1227             :         }
    1228           0 :         setFunctionId(getInstrPtr(prg->def, 0), putName(*fcn_id));
    1229             : 
    1230             :         /* make sure the program is error free */
    1231           0 :         msg = chkProgram(cntxt->usermodule, prg->def);
    1232           0 :         if ( msg != MAL_SUCCEED || prg->def->errors) {
    1233           0 :                 MT_lock_unset(&c->lock);
    1234           0 :                 if (msg)
    1235             :                         return msg;
    1236           0 :                 throw(MAL, "remote.register",
    1237             :                                 "function '%s.%s' contains syntax or type errors",
    1238             :                                 mod, *fcn_id);
    1239             :         }
    1240             : 
    1241           0 :         qry = mal2str(prg->def, 0, prg->def->stop);
    1242           0 :         TRC_DEBUG(MAL_REMOTE, "Remote register: %s - %s\n", c->name, qry);
    1243           0 :         msg = RMTquery(&mhdl, "remote.register", c->mconn, qry);
    1244           0 :         GDKfree(qry);
    1245           0 :         if (mhdl)
    1246           0 :                 mapi_close_handle(mhdl);
    1247             : 
    1248           0 :         freeSymbol(prg);
    1249             : 
    1250           0 :         MT_lock_unset(&c->lock);
    1251           0 :         return msg;
    1252             : }
    1253             : 
    1254           0 : static str RMTregister(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
    1255           0 :         char **fcn_id = getArgReference_str(stk, pci, 0);
    1256           0 :         const char *conn = *getArgReference_str(stk, pci, 1);
    1257           0 :         const char *mod = *getArgReference_str(stk, pci, 2);
    1258           0 :         const char *fcn = *getArgReference_str(stk, pci, 3);
    1259             :         (void)mb;
    1260           0 :         return RMTregisterInternal(cntxt, fcn_id, conn, mod, fcn);
    1261             : }
    1262             : 
    1263             : /**
    1264             :  * exec executes the function with its given arguments on the remote
    1265             :  * host, returning the function's return value.  exec is purposely kept
    1266             :  * very spartan.  All arguments need to be handles to previously put()
    1267             :  * values.  It calls the function with the given arguments at the remote
    1268             :  * site, and returns the handle which stores the return value of the
    1269             :  * remotely executed function.  This return value can be retrieved using
    1270             :  * a get call. It handles multiple return arguments.
    1271             :  */
    1272         281 : static str RMTexec(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
    1273             :         str conn, mod, func, tmp;
    1274             :         int i;
    1275             :         size_t len, buflen;
    1276         281 :         connection c= NULL;
    1277             :         char *qbuf;
    1278             :         MapiHdl mhdl;
    1279             : 
    1280             :         (void)cntxt;
    1281             :         (void)mb;
    1282             :         bool no_return_arguments = 0;
    1283             : 
    1284             :         columnar_result_callback* rcb = NULL;
    1285         281 :         ValRecord *v = &(stk)->stk[(pci)->argv[4]];
    1286         281 :         if (pci->retc == 1 && (pci->argc >= 4) && (v->vtype == TYPE_ptr) ) {
    1287           1 :                 rcb = (columnar_result_callback*) v->val.pval;
    1288             :         }
    1289             : 
    1290        1697 :         for (i = 0; i < pci->retc; i++) {
    1291        1416 :                 if (stk->stk[pci->argv[i]].vtype == TYPE_str) {
    1292        1415 :                         tmp = *getArgReference_str(stk, pci, i);
    1293        1415 :                         if (tmp == NULL || strcmp(tmp, (str)str_nil) == 0)
    1294           0 :                                 throw(ILLARG, "remote.exec", ILLEGAL_ARGUMENT
    1295             :                                                 ": return value %d is NULL or nil", i);
    1296             :                 }
    1297             :                 else
    1298             :                         no_return_arguments = 1;
    1299             :         }
    1300             : 
    1301         281 :         conn = *getArgReference_str(stk, pci, i++);
    1302         281 :         if (conn == NULL || strcmp(conn, (str)str_nil) == 0)
    1303           0 :                 throw(ILLARG, "remote.exec", ILLEGAL_ARGUMENT ": connection name is NULL or nil");
    1304         281 :         mod = *getArgReference_str(stk, pci, i++);
    1305         281 :         if (mod == NULL || strcmp(mod, (str)str_nil) == 0)
    1306           0 :                 throw(ILLARG, "remote.exec", ILLEGAL_ARGUMENT ": module name is NULL or nil");
    1307         281 :         func = *getArgReference_str(stk, pci, i++);
    1308         281 :         if (func == NULL || strcmp(func, (str)str_nil) == 0)
    1309           0 :                 throw(ILLARG, "remote.exec", ILLEGAL_ARGUMENT ": function name is NULL or nil");
    1310             : 
    1311             :         /* lookup conn */
    1312         281 :         rethrow("remote.exec", tmp, RMTfindconn(&c, conn));
    1313             : 
    1314             :         /* this call should be a single transaction over the channel*/
    1315         281 :         MT_lock_set(&c->lock);
    1316             : 
    1317         281 :         if(!no_return_arguments && pci->argc - pci->retc < 3) { /* conn, mod, func, ... */
    1318           0 :                 MT_lock_unset(&c->lock);
    1319           0 :                 throw(MAL, "remote.exec", ILLEGAL_ARGUMENT  " MAL instruction misses arguments");
    1320             :         }
    1321             : 
    1322             :         len = 0;
    1323             :         /* count how big a buffer we need */
    1324         281 :         len += 2 * (pci->retc > 1);
    1325         281 :         if (!no_return_arguments)
    1326        1695 :                 for (i = 0; i < pci->retc; i++) {
    1327        1415 :                         len += 2 * (i > 0);
    1328        1415 :                         len += strlen(*getArgReference_str(stk, pci, i));
    1329             :                 }
    1330             : 
    1331         281 :         const int arg_index = rcb ? 4 : 3;
    1332             : 
    1333         281 :         len += strlen(mod) + strlen(func) + 6;
    1334         788 :         for (i = arg_index; i < pci->argc - pci->retc; i++) {
    1335         507 :                 len += 2 * (i > arg_index);
    1336         507 :                 len += strlen(*getArgReference_str(stk, pci, pci->retc + i));
    1337             :         }
    1338             :         len += 2;
    1339         281 :         buflen = len + 1;
    1340         281 :         if ((qbuf = GDKmalloc(buflen)) == NULL) {
    1341           0 :                 MT_lock_unset(&c->lock);
    1342           0 :                 throw(MAL, "remote.exec", SQLSTATE(HY013) MAL_MALLOC_FAIL);
    1343             :         }
    1344             : 
    1345             :         len = 0;
    1346             : 
    1347         281 :         if (pci->retc > 1)
    1348          38 :                 qbuf[len++] = '(';
    1349         281 :         if (!no_return_arguments)
    1350        1695 :                 for (i = 0; i < pci->retc; i++)
    1351        1695 :                         len += snprintf(&qbuf[len], buflen - len, "%s%s",
    1352        1415 :                                         (i > 0 ? ", " : ""), *getArgReference_str(stk, pci, i));
    1353             : 
    1354         281 :         if (pci->retc > 1)
    1355          38 :                 qbuf[len++] = ')';
    1356             : 
    1357             :         /* build the function invocation string in qbuf */
    1358         281 :         if (!no_return_arguments && pci->retc > 0) {
    1359         280 :                 len += snprintf(&qbuf[len], buflen - len, " := %s.%s(", mod, func);
    1360             :         }
    1361             :         else {
    1362           1 :                 len += snprintf(&qbuf[len], buflen - len, " %s.%s(", mod, func);
    1363             :         }
    1364             : 
    1365             :         /* handle the arguments to the function */
    1366             : 
    1367             :         /* put the arguments one by one, and dynamically build the
    1368             :          * invocation string */
    1369         788 :         for (i = arg_index; i < pci->argc - pci->retc; i++) {
    1370         648 :                 len += snprintf(&qbuf[len], buflen - len, "%s%s",
    1371             :                                 (i > arg_index ? ", " : ""),
    1372         507 :                                 *(getArgReference_str(stk, pci, pci->retc + i)));
    1373             :         }
    1374             : 
    1375             :         /* finish end execute the invocation string */
    1376         281 :         len += snprintf(&qbuf[len], buflen - len, ");");
    1377         281 :         TRC_DEBUG(MAL_REMOTE, "Remote exec: %s - %s\n", c->name, qbuf);
    1378         281 :         tmp = RMTquery(&mhdl, "remote.exec", c->mconn, qbuf);
    1379         281 :         GDKfree(qbuf);
    1380             : 
    1381             :         /* Temporary hack:
    1382             :          * use a callback to immediately handle columnar results before hdl is destroyed. */
    1383         281 :         if (tmp == MAL_SUCCEED && rcb && mhdl && (mapi_get_querytype(mhdl) == Q_TABLE || mapi_get_querytype(mhdl) == Q_PREPARE)) {
    1384           1 :                 int fields = mapi_get_field_count(mhdl);
    1385           1 :                 columnar_result* results = GDKzalloc(sizeof(columnar_result) * fields);
    1386             : 
    1387           1 :                 if (!results) {
    1388           0 :                         tmp = createException(MAL, "remote.exec", SQLSTATE(HY013) MAL_MALLOC_FAIL);
    1389             :                 } else {
    1390             :                         int i = 0;
    1391           1 :                         char buf[256] = {0};
    1392           1 :                         stream *sin = mapi_get_from(c->mconn);
    1393             : 
    1394           3 :                         for (; i < fields; i++) {
    1395           2 :                                 BAT *b = NULL;
    1396             : 
    1397           4 :                                 if ((tmp = RMTreadbatheader(sin, buf)) != MAL_SUCCEED ||
    1398           2 :                                         (tmp = RMTinternalcopyfrom(&b, buf, sin, i == fields - 1)) != MAL_SUCCEED) {
    1399             :                                         break;
    1400             :                                 }
    1401             : 
    1402           2 :                                 results[i].id = b->batCacheid;
    1403           2 :                                 results[i].colname = mapi_get_name(mhdl, i);
    1404           2 :                                 results[i].tpename = mapi_get_type(mhdl, i);
    1405           2 :                                 results[i].digits = mapi_get_digits(mhdl, i);
    1406           2 :                                 results[i].scale = mapi_get_scale(mhdl, i);
    1407             :                         }
    1408             : 
    1409           1 :                         if (tmp != MAL_SUCCEED) {
    1410           0 :                                 for (int j = 0; j < i; j++)
    1411           0 :                                         BBPunfix(results[j].id);
    1412             :                         } else {
    1413           3 :                                 for (int j = 0; j < i; j++)
    1414           2 :                                         BBPkeepref(results[j].id);
    1415           1 :                                 assert(rcb->context);
    1416           1 :                                 tmp = rcb->call(rcb->context, mapi_get_table(mhdl, 0), results, fields);
    1417           3 :                                 for (int j = 0; j < i; j++)
    1418           2 :                                         BBPrelease(results[j].id);
    1419             :                         }
    1420           1 :                         GDKfree(results);
    1421             :                 }
    1422             :         }
    1423             : 
    1424         281 :         if (rcb) {
    1425           1 :                 GDKfree(rcb->context);
    1426           1 :                 GDKfree(rcb);
    1427             :         }
    1428             : 
    1429         281 :         if (mhdl)
    1430         270 :                 mapi_close_handle(mhdl);
    1431         281 :         MT_lock_unset(&c->lock);
    1432         281 :         return tmp;
    1433             : }
    1434             : 
    1435             : /**
    1436             :  * batload is a helper function to make transferring a BAT with RMTput
    1437             :  * more efficient.  It works by creating a BAT, and loading it with the
    1438             :  * data as comma separated values from the input stream, until an empty
    1439             :  * line is read.  The given size argument is taken as a hint only, and
    1440             :  * is not enforced to match the number of rows read.
    1441             :  */
    1442        1200 : static str RMTbatload(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
    1443             :         ValPtr v;
    1444             :         int t;
    1445             :         int size;
    1446             :         ptr  r;
    1447             :         size_t s;
    1448             :         BAT *b;
    1449             :         size_t len;
    1450             :         char *var;
    1451             :         str msg = MAL_SUCCEED;
    1452        1200 :         bstream *fdin = cntxt->fdin;
    1453             : 
    1454        1200 :         v = &stk->stk[pci->argv[0]]; /* return */
    1455        1200 :         t = getArgType(mb, pci, 1); /* tail type */
    1456        1200 :         size = *getArgReference_int(stk, pci, 2); /* size */
    1457             : 
    1458        1200 :         b = COLnew(0, t, size, TRANSIENT);
    1459        1200 :         if (b == NULL)
    1460           0 :                 throw(MAL, "remote.load", SQLSTATE(HY013) MAL_MALLOC_FAIL);
    1461             : 
    1462             :         /* grab the input stream and start reading */
    1463        1200 :         fdin->eof = false;
    1464        1200 :         len = fdin->pos;
    1465        1200 :         while (len < fdin->len || bstream_next(fdin) > 0) {
    1466             :                 /* newline hunting (how spartan) */
    1467        1200 :                 for (len = fdin->pos; len < fdin->len && fdin->buf[len] != '\n'; len++)
    1468             :                         ;
    1469             :                 /* unterminated line, request more */
    1470        1200 :                 if (fdin->buf[len] != '\n')
    1471           0 :                         continue;
    1472             :                 /* empty line, end of input */
    1473        1200 :                 if (fdin->pos == len) {
    1474        1200 :                         if (isa_block_stream(fdin->s)) {
    1475        1200 :                                 ssize_t n = bstream_next(fdin);
    1476        1200 :                                 if( n )
    1477           0 :                                         msg = createException(MAL, "remote.load", SQLSTATE(HY013) "Unexpected return from remote");
    1478             :                         }
    1479             :                         break;
    1480             :                 }
    1481           0 :                 fdin->buf[len] = '\0'; /* kill \n */
    1482           0 :                 var = &fdin->buf[fdin->pos];
    1483             :                 /* skip over this line */
    1484           0 :                 fdin->pos = ++len;
    1485             : 
    1486           0 :                 s = 0;
    1487           0 :                 r = NULL;
    1488           0 :                 if (ATOMfromstr(t, &r, &s, var, true) < 0 ||
    1489           0 :                         BUNappend(b, r, false) != GDK_SUCCEED) {
    1490           0 :                         BBPreclaim(b);
    1491           0 :                         GDKfree(r);
    1492           0 :                         throw(MAL, "remote.get", GDK_EXCEPTION);
    1493             :                 }
    1494           0 :                 GDKfree(r);
    1495             :         }
    1496             : 
    1497        1200 :         v->val.bval = b->batCacheid;
    1498        1200 :         v->vtype = TYPE_bat;
    1499        1200 :         BBPkeepref(b->batCacheid);
    1500             : 
    1501        1200 :         return msg;
    1502             : }
    1503             : 
    1504             : /**
    1505             :  * dump given BAT to stream
                     :  *
                     :  * MAL pattern: the BAT id is read from argument 1 of the instruction.
                     :  * The output written to cntxt->fdout is one JSON header line (version,
                     :  * ttype, hseqbase, tseqbase, tsorted, trevsorted, tkey, tnonil, tdense,
                     :  * size, tailsize, theapsize) followed, when the tail type is not
                     :  * TYPE_void and the BAT is not empty, by the raw tail bytes
                     :  * (count * width) and, for variable-sized tails, vhfree bytes starting
                     :  * at the base of the var heap.
                     :  *
                     :  * When the BAT is a view with a variable-sized tail, VIEWvtparent(b) is
                     :  * set and the BAT holds fewer rows than that parent, a TRANSIENT copy
                     :  * is made with COLcopy and that copy is sent instead (presumably so
                     :  * that only the var heap data the view refers to is shipped -- TODO
                     :  * confirm).  The copy is released with BBPreclaim before returning.
                     :  *
                     :  * Throws RUNTIME_OBJECT_UNDEFINED when BBPquickdesc yields NULL,
                     :  * MAL_MALLOC_FAIL when BBPfix does not return a positive value,
                     :  * RUNTIME_OBJECT_MISSING when BATdescriptor yields NULL and
                     :  * GDK_EXCEPTION when COLcopy yields NULL; otherwise MAL_SUCCEED is
                     :  * returned.  The fix taken with BBPfix is dropped with BBPunfix on the
                     :  * later error paths and at the end.
                     :  *
                     :  * NOTE(review): the results of mnstr_printf and mnstr_write are not
                     :  * checked, so a failing stream is not reported from here.
                     :  * NOTE(review): stk and pci are cast to void below although both are
                     :  * used to fetch the BAT id; only mb is really unused.
    1506             :  */
    1507        1199 : static str RMTbincopyto(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
    1508             : {
    1509        1199 :         bat bid = *getArgReference_bat(stk, pci, 1);
    1510        1199 :         BAT *b = BBPquickdesc(bid), *v = b;
    1511             :         char sendtheap = 0, sendtvheap = 0;
    1512             : 
    1513             :         (void)mb;
    1514             :         (void)stk;
    1515             :         (void)pci;
    1516             : 
    1517        1199 :         if (b == NULL)
    1518           0 :                 throw(MAL, "remote.bincopyto", RUNTIME_OBJECT_UNDEFINED);
    1519             : 
    1520        1199 :         if (BBPfix(bid) <= 0)
    1521           0 :                 throw(MAL, "remote.bincopyto", MAL_MALLOC_FAIL);
    1522             : 
    1523        1199 :         sendtheap = b->ttype != TYPE_void;
    1524        1199 :         sendtvheap = sendtheap && b->tvarsized;
    1525        1199 :         if (isVIEW(b) && sendtvheap && VIEWvtparent(b) && BATcount(b) < BATcount(BBP_cache(VIEWvtparent(b)))) {
    1526          18 :                 if ((b = BATdescriptor(bid)) == NULL) {
    1527           0 :                         BBPunfix(bid);
    1528           0 :                         throw(MAL, "remote.bincopyto", RUNTIME_OBJECT_MISSING);
    1529             :                 }
    1530          18 :                 v = COLcopy(b, b->ttype, true, TRANSIENT);
    1531          18 :                 BBPunfix(b->batCacheid);
    1532          18 :                 if (v == NULL) {
    1533           0 :                         BBPunfix(bid);
    1534           0 :                         throw(MAL, "remote.bincopyto", GDK_EXCEPTION);
    1535             :                 }
    1536             :         }
    1537             : 
    1538        1245 :         mnstr_printf(cntxt->fdout, /*JSON*/"{"
    1539             :                         "\"version\":1,"
    1540             :                         "\"ttype\":%d,"
    1541             :                         "\"hseqbase\":" OIDFMT ","
    1542             :                         "\"tseqbase\":" OIDFMT ","
    1543             :                         "\"tsorted\":%d,"
    1544             :                         "\"trevsorted\":%d,"
    1545             :                         "\"tkey\":%d,"
    1546             :                         "\"tnonil\":%d,"
    1547             :                         "\"tdense\":%d,"
    1548             :                         "\"size\":" BUNFMT ","
    1549             :                         "\"tailsize\":%zu,"
    1550             :                         "\"theapsize\":%zu"
    1551             :                         "}\n",
    1552        1199 :                         v->ttype,
    1553             :                         v->hseqbase, v->tseqbase,
    1554        1199 :                         v->tsorted, v->trevsorted,
    1555        1199 :                         v->tkey,
    1556        1199 :                         v->tnonil,
    1557        1199 :                         BATtdense(v),
    1558             :                         v->batCount,
    1559        1195 :                         sendtheap ? (size_t)v->batCount << v->tshift : 0,
    1560          46 :                         sendtvheap && v->batCount > 0 ? v->tvheap->free : 0
    1561             :                         );
    1562             : 
    1563        1199 :         if (sendtheap && v->batCount > 0) {
    1564        1127 :                 BATiter vi = bat_iterator(v);
    1565        1127 :                 mnstr_write(cntxt->fdout, /* tail */
    1566        1127 :                                         vi.base, vi.count * vi.width, 1);
    1567        1127 :                 if (sendtvheap)
    1568          34 :                         mnstr_write(cntxt->fdout, /* theap */
    1569          34 :                                                 vi.vh->base, vi.vhfree, 1);
    1570        1127 :                 bat_iterator_end(&vi);
    1571             :         }
    1572             :         /* flush is done by the calling environment (MAL) */
    1573             : 
    1574        1199 :         if (b != v)
    1575          18 :                 BBPreclaim(v);
    1576             : 
    1577        1199 :         BBPunfix(bid);
    1578             : 
    1579        1199 :         return(MAL_SUCCEED);
    1580             : }
    1581             : 
    1582             : /**
    1583             :  * read from the input stream and give the BAT handle back to the caller
                     :  *
                     :  * MAL pattern: one chunk is fetched from cntxt->fdin with bstream_next
                     :  * after clearing its eof flag; a result <= 0 throws "expected JSON
                     :  * header".  The buffer is NUL-terminated at fdin->len and the text
                     :  * starting at fdin->pos is handed, together with the underlying stream
                     :  * fdin->s, to RMTinternalcopyfrom, which fills in the BAT pointer.
                     :  *
                     :  * fdin->len is then incremented and fdin->pos set to the new value,
                     :  * both on success and on failure of RMTinternalcopyfrom.  On failure
                     :  * its error is returned unchanged.  On success the id of the BAT is
                     :  * stored in the stack slot of argument 0 with type TYPE_bat, the
                     :  * reference is handed over with BBPkeepref, and MAL_SUCCEED is
                     :  * returned.
    1584             :  */
    1585           0 : static str RMTbincopyfrom(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
    1586           0 :         BAT *b = NULL;
    1587             :         ValPtr v;
    1588             :         str err;
    1589             : 
    1590             :         (void)mb;
    1591             : 
    1592             :         /* We receive a normal line, which contains the JSON header, the
    1593             :          * rest is binary data directly on the stream.  We get the first
    1594             :          * line from the buffered stream we have here, and pass it on
    1595             :          * together with the raw stream we have. */
    1596           0 :         cntxt->fdin->eof = false; /* in case it was before */
    1597           0 :         if (bstream_next(cntxt->fdin) <= 0)
    1598           0 :                 throw(MAL, "remote.bincopyfrom", "expected JSON header");
    1599             : 
    1600           0 :         cntxt->fdin->buf[cntxt->fdin->len] = '\0';
    1601           0 :         err = RMTinternalcopyfrom(&b,
    1602           0 :                         &cntxt->fdin->buf[cntxt->fdin->pos], cntxt->fdin->s, true);
    1603             :         /* skip the JSON line */
    1604           0 :         cntxt->fdin->pos = ++cntxt->fdin->len;
    1605           0 :         if (err != MAL_SUCCEED)
    1606             :                 return(err);
    1607             : 
    1608           0 :         v = &stk->stk[pci->argv[0]];
    1609           0 :         v->val.bval = b->batCacheid;
    1610           0 :         v->vtype = TYPE_bat;
    1611           0 :         BBPkeepref(b->batCacheid);
    1612             : 
    1613           0 :         return(MAL_SUCCEED);
    1614             : }
    1615             : 
    1616             : /**
    1617             :  * bintype identifies the system on its binary profile.  This is mainly
    1618             :  * used to determine if BATs can be sent binary across.
                     :  *
                     :  * The profile is a bitmask fixed at compile time: one endianness flag
                     :  * (RMTT_B_ENDIAN when WORDS_BIGENDIAN is defined, else RMTT_L_ENDIAN),
                     :  * one word-size flag (RMTT_64_BITS when SIZEOF_SIZE_T equals
                     :  * SIZEOF_LNG, else RMTT_32_BITS) and one OID-size flag (RMTT_64_OIDS
                     :  * when SIZEOF_OID equals SIZEOF_LNG, else RMTT_32_OIDS).
                     :  *
                     :  * The mask is printed to cntxt->fdout as "[ <number> ]" followed by a
                     :  * newline.  mb, stk and pci are unused.  Always returns MAL_SUCCEED;
                     :  * the result of mnstr_printf is not checked.
    1619             :  */
    1620          76 : static str RMTbintype(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
    1621             :         int type = 0;
    1622             :         (void)mb;
    1623             :         (void)stk;
    1624             :         (void)pci;
    1625             : 
    1626             : #ifdef WORDS_BIGENDIAN
    1627             :         type |= RMTT_B_ENDIAN;
    1628             : #else
    1629             :         type |= RMTT_L_ENDIAN;
    1630             : #endif
    1631             : #if SIZEOF_SIZE_T == SIZEOF_LNG
    1632             :         type |= RMTT_64_BITS;
    1633             : #else
    1634             :         type |= RMTT_32_BITS;
    1635             : #endif
    1636             : #if SIZEOF_OID == SIZEOF_LNG
    1637             :         type |= RMTT_64_OIDS;
    1638             : #else
    1639             :         type |= RMTT_32_OIDS;
    1640             : #endif
    1641             : 
    1642          76 :         mnstr_printf(cntxt->fdout, "[ %d ]\n", type);
    1643             : 
    1644          76 :         return(MAL_SUCCEED);
    1645             : }
    1646             : 
    1647             : /**
    1648             :  * Returns whether the underlying connection is still connected or not.
    1649             :  * Best effort implementation on top of mapi using a ping.
                     :  *
                     :  * ret:  set to 1 when mapi_is_connected reports the connection as
                     :  *       connected and mapi_ping returns MOK, set to 0 otherwise; left
                     :  *       untouched on the error paths.
                     :  * conn: name of the connection to look up with RMTfindconn.
                     :  *
                     :  * Throws ILLARG when the connection name is NULL or nil; an error from
                     :  * RMTfindconn is passed on through rethrow.  Both are reported under
                     :  * the origin "remote.isalive" (this used to be "remote.get", a
                     :  * copy/paste leftover that made failures of this function look like
                     :  * failures of remote.get).
    1650             :  */
    1651             : static str
    1652           0 : RMTisalive(int *ret, str *conn)
    1653             : {
    1654             :         str tmp;
    1655             :         connection c;
    1656             : 
    1657           0 :         if (*conn == NULL || strcmp(*conn, (str)str_nil) == 0)
    1658           0 :                 throw(ILLARG, "remote.isalive", ILLEGAL_ARGUMENT ": connection name is NULL or nil");
    1659             : 
    1660             :         /* lookup conn, set c if valid */
    1661           0 :         rethrow("remote.isalive", tmp, RMTfindconn(&c, *conn));
    1662             : 
    1663           0 :         *ret = 0;
    1664           0 :         if (mapi_is_connected(c->mconn) && mapi_ping(c->mconn) == MOK)
    1665           0 :                 *ret = 1;
    1666             : 
    1667             :         return MAL_SUCCEED;
    1668             : }
    1669             : 
    1670             : /* This is basically a no op: both uuid arguments are ignored, *ret is
                     :  * set to 0 and MAL_SUCCEED is returned unconditionally.
                     :  */
    1671             : static str
    1672         130 : RMTregisterSupervisor(int *ret, str *sup_uuid, str *query_uuid) {
    1673             :         (void)sup_uuid;
    1674             :         (void)query_uuid;
    1675             : 
    1676         130 :         *ret = 0;
    1677         130 :         return MAL_SUCCEED;
    1678             : }
    1679             : 
    1680             : /* this is needed in remote plans
                     :  *
                     :  * MAL pattern: argument 1 is the flag, argument 2 the message.  When
                     :  * the flag is false nothing happens and MAL_SUCCEED is returned.  When
                     :  * it is true a REMOTE exception with origin "assert" is thrown:
                     :  *  - if the message is longer than 6 characters, has '!' at index 5
                     :  *    and its first five characters are each a digit or an upper case
                     :  *    letter, the message is thrown as is;
                     :  *  - otherwise the message is prefixed with SQLSTATE(M0M29).
                     :  *
                     :  * NOTE(review): msg is passed to strlen without a NULL check; this
                     :  * assumes the MAL layer never hands in a NULL string -- confirm.
                     :  */
    1681             : static str
    1682          11 : RMTassert(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
    1683             : {
    1684          11 :         bool flg = (bool) *getArgReference_bit(stk, pci, 1);
    1685          11 :         str msg = *getArgReference_str(stk, pci, 2);
    1686             : 
    1687             :         (void) cntxt;
    1688             :         (void) mb;
    1689          11 :         if (flg) {
    1690          11 :                 if (strlen(msg) > 6 &&
    1691          11 :                     msg[5] == '!' &&
    1692           0 :                     (isdigit((unsigned char) msg[0]) ||
    1693           0 :                      isupper((unsigned char) msg[0])) &&
    1694           0 :                     (isdigit((unsigned char) msg[1]) ||
    1695           0 :                      isupper((unsigned char) msg[1])) &&
    1696           0 :                     (isdigit((unsigned char) msg[2]) ||
    1697           0 :                      isupper((unsigned char) msg[2])) &&
    1698           0 :                     (isdigit((unsigned char) msg[3]) ||
    1699           0 :                      isupper((unsigned char) msg[3])) &&
    1700           0 :                     (isdigit((unsigned char) msg[4]) ||
    1701             :                      isupper((unsigned char) msg[4])))
    1702           0 :                         throw(REMOTE, "assert", "%s", msg); /* includes state */
    1703          11 :                 throw(REMOTE, "assert", SQLSTATE(M0M29) "%s", msg);
    1704             :         }
    1705             :         return MAL_SUCCEED;
    1706             : }
    1707             : 
    1708             : #include "mel.h"
    1709             : mel_func remote_init_funcs[] = { /* Function table of the "remote" MAL module.
                     :  * Each entry names the module, the MAL function, the C implementation,
                     :  * a help text and the argument list.  Among the implementations visible
                     :  * in this part of the file, the ones registered with command()
                     :  * (RMTisalive, RMTregisterSupervisor) take typed pointer arguments,
                     :  * while the ones registered with pattern() (RMTassert, RMTbincopyto,
                     :  * RMTbincopyfrom, RMTbintype) take (Client, MalBlkPtr, MalStkPtr,
                     :  * InstrPtr).  The first number in args() presumably is the number of
                     :  * results and the second the total number of arguments -- TODO confirm
                     :  * against mel.h.  The last entry, with a NULL .imp, presumably marks
                     :  * the end of the table -- TODO confirm.
                     :  *
                     :  * NOTE(review): the third argument of the connect-for-table signature
                     :  * is spelled "schen" where the sibling signatures use "scen", and the
                     :  * help text of disconnect has an unbalanced parenthesis.  Both are
                     :  * runtime strings and are left untouched here.
                     :  */
    1710             :  command("remote", "prelude", RMTprelude, false, "initialise the remote module", args(1,1, arg("",void))),
    1711             :  command("remote", "epilogue", RMTepilogue, false, "release the resources held by the remote module", args(1,1, arg("",void))),
    1712             :  pattern("remote", "assert", RMTassert, false, "Generate an exception when b==true", args(1,3, arg("",void),arg("b",bit),arg("msg",str))),
    1713             :  command("remote", "resolve", RMTresolve, false, "resolve a pattern against Merovingian and return the URIs", args(1,2, batarg("",str),arg("pattern",str))),
    1714             :  pattern("remote", "connect", RMTconnect, false, "returns a newly created connection for uri, using user name and password", args(1,5, arg("",str),arg("uri",str),arg("user",str),arg("passwd",str),arg("scen",str))),
    1715             :  command("remote", "connect", RMTconnectScen, false, "returns a newly created connection for uri, using user name, password and scenario", args(1,6, arg("",str),arg("uri",str),arg("user",str),arg("passwd",str),arg("scen",str),arg("columnar",bit))),
    1716             :  pattern("remote", "connect", RMTconnectTable, false, "return a newly created connection for a table. username and password should be in the vault", args(1,3, arg("",str),arg("table",str),arg("schen",str))),
    1717             :  command("remote", "disconnect", RMTdisconnect, false, "disconnects the connection pointed to by handle (received from a call to connect()", args(1,2, arg("",void),arg("conn",str))),
    1718             :  pattern("remote", "get", RMTget, false, "retrieves a copy of remote object ident", args(1,3, argany("",0),arg("conn",str),arg("ident",str))),
    1719             :  pattern("remote", "put", RMTput, false, "copies object to the remote site and returns its identifier", args(1,3, arg("",str),arg("conn",str),argany("object",0))),
    1720             :  pattern("remote", "register", RMTregister, false, "register <mod>.<fcn> at the remote site", args(1,4, arg("",str),arg("conn",str),arg("mod",str),arg("fcn",str))),
    1721             :  pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> and returns the handle to its result", args(1,4, arg("",str),arg("conn",str),arg("mod",str),arg("func",str))),
    1722             :  pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> and returns the handle to its result", args(1,4, vararg("",str),arg("conn",str),arg("mod",str),arg("func",str))),
    1723             :  pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> using the argument list of remote objects and returns the handle to its result", args(1,5, arg("",str),arg("conn",str),arg("mod",str),arg("func",str),vararg("",str))),
    1724             :  pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> using the argument list of remote objects and returns the handle to its result", args(1,5, vararg("",str),arg("conn",str),arg("mod",str),arg("func",str),vararg("",str))),
    1725             :  pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> using the argument list of remote objects and applying function pointer rcb as callback to handle any results.", args(0,5, arg("conn",str),arg("mod",str),arg("func",str),arg("rcb",ptr), vararg("",str))),
    1726             :  pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> using the argument list of remote objects and ignoring results.", args(0,4, arg("conn",str),arg("mod",str),arg("func",str), vararg("",str))),
    1727             :  command("remote", "isalive", RMTisalive, false, "check if conn is still valid and connected", args(1,2, arg("",int),arg("conn",str))),
    1728             :  pattern("remote", "batload", RMTbatload, false, "create a BAT of the given type and size, and load values from the input stream", args(1,3, batargany("",1),argany("tt",1),arg("size",int))),
    1729             :  pattern("remote", "batbincopy", RMTbincopyto, false, "dump BAT b in binary form to the stream", args(1,2, arg("",void),batargany("b",0))),
    1730             :  pattern("remote", "batbincopy", RMTbincopyfrom, false, "store the binary BAT data in the BBP and return as BAT", args(1,1, batargany("",0))),
    1731             :  pattern("remote", "bintype", RMTbintype, false, "print the binary type of this mserver5", args(1,1, arg("",void))),
    1732             :  command("remote", "register_supervisor", RMTregisterSupervisor, false, "Register the supervisor uuid at a remote site", args(1,3, arg("",int),arg("sup_uuid",str),arg("query_uuid",str))),
    1733             :  { .imp=NULL }
    1734             : };
    1735             : #include "mal_import.h"
    1736             : #ifdef _MSC_VER
    1737             : #undef read
    1738             : #pragma section(".CRT$XCU",read)
    1739             : #endif
    1740         257 : LIB_STARTUP_FUNC(init_remote_mal)
    1741         257 : { mal_module("remote", NULL, remote_init_funcs); } /* hands the function table above to mal_module under the module name "remote"; LIB_STARTUP_FUNC presumably arranges for this to run when the library is loaded -- TODO confirm against mal_import.h */
    1742             : 
    1743             : #endif

Generated by: LCOV version 1.14