LCOV - code coverage report
Current view: top level - monetdb5/modules/mal - remote.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 365 649 56.2 %
Date: 2021-01-13 20:07:21 Functions: 16 22 72.7 %

          Line data    Source code
       1             : /*
       2             :  * This Source Code Form is subject to the terms of the Mozilla Public
       3             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       4             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       5             :  *
       6             :  * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
       7             :  */
       8             : 
       9             : /*
      10             :  * (c) Fabian Groffen, Martin Kersten
      11             :  * Remote querying functionality
      12             :  * Communication with other mservers at the MAL level is a delicate task.
      13             :  * However, it is indispensable for any distributed functionality.  This
      14             :  * module provides an abstract way to store and retrieve objects on a
      15             :  * remote site.  Additionally, functions on a remote site can be executed
      16             :  * using objects available in the remote session context.  This yields
      17             :  * four primitive functions that form the basis for distribution methods:
      18             :  * get, put, register and exec.
      19             :  *
      20             :  * The get method simply retrieves a copy of a remote object.  Objects can
      21             :  * be simple values, strings or Column.  The same holds for the put method,
      22             :  * but the other way around.  A local object can be stored on a remote
      23             :  * site.  Upon a successful store, the put method returns the remote
      24             :  * identifier for the stored object.  With this identifier the object can
      25             :  * be addressed, e.g. using the get method to retrieve the object that was
      26             :  * stored using put.
      27             :  *
      28             :  * The get and put methods are symmetric.  Performing a get on an
      29             :  * identifier that was returned by put, results in an object with the same
      30             :  * value and type as the one that was put.  The result of such an operation is
      31             :  * equivalent to making an (expensive) copy of the original object.
      32             :  *
      33             :  * The register function takes a local MAL function and makes it known at a
      34             :  * remote site. It ensures that it does not overload an already known
      35             :  * operation remotely, which could create a semantic conflict.
      36             :  * Deregistering a function is forbidden, because it would allow for taking
      37             :  * over the remote site completely.
      38             :  * C-implemented functions, such as io.print() cannot be remotely stored.
      39             :  * It would require even more complicated (byte) code shipping and remote
      40             :  * compilation to make it work.
      41             :  *
      42             :  * The choice to let exec only execute functions avoids the problem
      43             :  * of deciding what should be returned to the caller.  With a function it
      44             :  * is clear and simple to return what the function signature prescribes.
      45             :  * Any side effect (e.g. io.print calls) may cause havoc in the system,
      46             :  * but is currently ignored.
      47             :  *
      48             :  * This leads to the final contract of this module.  The methods should be
      49             :  * used correctly, by obeying their contract.  Failing to do so will result
      50             :  * in errors and possibly undefined behaviour.
      51             :  *
      52             :  * The resolve() function can be used to query Merovingian.  It returns one
      53             :  * or more databases discovered in its vicinity matching the given pattern.
      54             :  *
      55             :  */
      56             : #include "monetdb_config.h"
      57             : 
      58             : /*
      59             :  * Technically, these methods need to be serialised per connection,
      60             :  * hence a scheduler that interleaves e.g. multiple get calls, simply
      61             :  * violates this constraint.  If parallelism to the same site is
      62             :  * desired, a user could create a second connection.  This is not always
      63             :  * easy to generate at the proper place, e.g. overloading the dataflow
      64             :  * optimizer to patch connections structures is not acceptable.
      65             :  *
      66             :  * Instead, we maintain a simple lock with each connection, which can be
      67             :  * used to issue a safe, but blocking get/put/exec/register request.
      68             :  */
      69             : #ifdef HAVE_MAPI
      70             : 
      71             : #include "mal.h"
      72             : #include "mal_exception.h"
      73             : #include "mal_interpreter.h"
      74             : #include "mal_function.h" /* for printFunction */
      75             : #include "mal_listing.h"
      76             : #include "mal_instruction.h" /* for getmodule/func macros */
      77             : #include "mal_authorize.h"
      78             : #include "mapi.h"
      79             : #include "mutils.h"
      80             : 
/*
 * Bit flags describing the binary profile of a server.  RMTprelude()
 * ORs one flag of each pair together to form the profile of the local
 * server; the profile of a remote server is stored in the `type' field
 * of its connection.
 */
#define RMTT_L_ENDIAN   (0<<1)
#define RMTT_B_ENDIAN   (1<<1)
#define RMTT_32_BITS    (0<<2)
#define RMTT_64_BITS    (1<<2)
#define RMTT_32_OIDS    (0<<3)
#define RMTT_64_OIDS    (1<<3)

/*
 * One entry in the singly linked list of open remote connections (the
 * list head is `conns').  Note that `connection' is a pointer typedef.
 */
typedef struct _connection {
        MT_Lock            lock;      /* lock to avoid interference */
        str                name;      /* the handle for this connection */
        Mapi               mconn;     /* the Mapi handle for the connection */
        unsigned char      type;      /* binary profile of the connection target */
        size_t             nextid;    /* id counter */
        struct _connection *next;     /* the next connection in the list */
} *connection;
      96             : 
      97             : #ifndef WIN32
      98             : #include <sys/socket.h> /* socket */
      99             : #include <sys/un.h> /* sockaddr_un */
     100             : #endif
     101             : #include <unistd.h> /* gethostname */
     102             : 
/* head of the list of open connections; the functions in this file
 * access it while holding mal_remoteLock */
static connection conns = NULL;
/* binary profile of this server; holds the placeholder 0177 until
 * RMTprelude() stores the real RMTT_* combination */
static unsigned char localtype = 0177;

/* forward declarations for helpers that are used before they are defined */
static inline str RMTquery(MapiHdl *ret, const char *func, Mapi conn, const char *query);
static inline str RMTinternalcopyfrom(BAT **ret, char *hdr, stream *in);
     108             : 
     109             : /**
     110             :  * Returns a BAT with valid redirects for the given pattern.  If
     111             :  * merovingian is not running, this function throws an error.
     112             :  */
     113           0 : static str RMTresolve(bat *ret, str *pat) {
     114             : #ifdef NATIVE_WIN32
     115             :         (void) ret;
     116             :         (void) pat;
     117             :         throw(MAL, "remote.resolve", "merovingian is not available on "
     118             :                         "your platform, sorry"); /* please upgrade to Linux, etc. */
     119             : #else
     120             :         BAT *list;
     121             :         const char *mero_uri;
     122             :         char *p;
     123             :         unsigned int port;
     124             :         char **redirs;
     125             :         char **or;
     126             : 
     127           0 :         if (pat == NULL || *pat == NULL || strcmp(*pat, (str)str_nil) == 0)
     128           0 :                 throw(ILLARG, "remote.resolve",
     129             :                                 ILLEGAL_ARGUMENT ": pattern is NULL or nil");
     130             : 
     131           0 :         mero_uri = GDKgetenv("merovingian_uri");
     132           0 :         if (mero_uri == NULL)
     133           0 :                 throw(MAL, "remote.resolve", "this function needs the mserver "
     134             :                                 "have been started by merovingian");
     135             : 
     136           0 :         list = COLnew(0, TYPE_str, 0, TRANSIENT);
     137           0 :         if (list == NULL)
     138           0 :                 throw(MAL, "remote.resolve", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     139             : 
     140             :         /* extract port from mero_uri, let mapi figure out the rest */
     141           0 :         mero_uri+=strlen("mapi:monetdb://");
     142           0 :         if (*mero_uri == '[') {
     143           0 :                 if ((mero_uri = strchr(mero_uri, ']')) == NULL)
     144           0 :                         throw(MAL, "remote.resolve", "illegal IPv6 address on merovingian_uri: %s",
     145             :                                   GDKgetenv("merovingian_uri"));
     146             :         }
     147           0 :         if ((p = strchr(mero_uri, ':')) == NULL)
     148           0 :                 throw(MAL, "remote.resolve", "illegal merovingian_uri setting: %s",
     149             :                                 GDKgetenv("merovingian_uri"));
     150           0 :         port = (unsigned int)atoi(p + 1);
     151             : 
     152           0 :         or = redirs = mapi_resolve(NULL, port, *pat);
     153             : 
     154           0 :         if (redirs == NULL)
     155           0 :                 throw(MAL, "remote.resolve", "unknown failure when resolving pattern");
     156             : 
     157           0 :         while (*redirs != NULL) {
     158           0 :                 if (BUNappend(list, (ptr)*redirs, false) != GDK_SUCCEED) {
     159           0 :                         BBPreclaim(list);
     160             :                         do
     161           0 :                                 free(*redirs);
     162           0 :                         while (*++redirs);
     163           0 :                         free(or);
     164           0 :                         throw(MAL, "remote.resolve", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     165             :                 }
     166           0 :                 free(*redirs);
     167           0 :                 redirs++;
     168             :         }
     169           0 :         free(or);
     170             : 
     171           0 :         BBPkeepref(*ret = list->batCacheid);
     172           0 :         return(MAL_SUCCEED);
     173             : #endif
     174             : }
     175             : 
     176             : 
     177             : /* for unique connection identifiers */
     178             : static size_t connection_id = 0;
     179             : 
     180             : /**
     181             :  * Returns a connection to the given uri.  It always returns a newly
     182             :  * created connection.
     183             :  */
     184         142 : static str RMTconnectScen(
     185             :                 str *ret,
     186             :                 str *ouri,
     187             :                 str *user,
     188             :                 str *passwd,
     189             :                 str *scen)
     190             : {
     191             :         connection c;
     192             :         char conn[BUFSIZ];
     193             :         char *s;
     194             :         Mapi m;
     195             :         MapiHdl hdl;
     196             :         str msg;
     197             : 
     198             :         /* just make sure the return isn't garbage */
     199         142 :         *ret = 0;
     200             : 
     201         142 :         if (ouri == NULL || *ouri == NULL || strcmp(*ouri, (str)str_nil) == 0)
     202           0 :                 throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": database uri "
     203             :                                 "is NULL or nil");
     204         142 :         if (user == NULL || *user == NULL || strcmp(*user, (str)str_nil) == 0)
     205           0 :                 throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": username is "
     206             :                                 "NULL or nil");
     207         142 :         if (passwd == NULL || *passwd == NULL || strcmp(*passwd, (str)str_nil) == 0)
     208           0 :                 throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": password is "
     209             :                                 "NULL or nil");
     210         142 :         if (scen == NULL || *scen == NULL || strcmp(*scen, (str)str_nil) == 0)
     211           0 :                 throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": scenario is "
     212             :                                 "NULL or nil");
     213         142 :         if (strcmp(*scen, "mal") != 0 && strcmp(*scen, "msql") != 0)
     214           0 :                 throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": scenario '%s' "
     215             :                                 "is not supported", *scen);
     216             : 
     217         142 :         m = mapi_mapiuri(*ouri, *user, *passwd, *scen);
     218         142 :         if (mapi_error(m)) {
     219           0 :                 msg = createException(MAL, "remote.connect",
     220             :                                                           "unable to connect to '%s': %s",
     221             :                                                           *ouri, mapi_error_str(m));
     222           0 :                 mapi_destroy(m);
     223           0 :                 return msg;
     224             :         }
     225             : 
     226         142 :         MT_lock_set(&mal_remoteLock);
     227             : 
     228             :         /* generate an unique connection name, they are only known
     229             :          * within one mserver, id is primary key, the rest is super key */
     230         142 :         snprintf(conn, BUFSIZ, "%s_%s_%zu", mapi_get_dbname(m), *user, connection_id++);
     231             :         /* make sure we can construct MAL identifiers using conn */
     232        2739 :         for (s = conn; *s != '\0'; s++) {
     233        2597 :                 if (!isalnum((unsigned char)*s)) {
     234         401 :                         *s = '_';
     235             :                 }
     236             :         }
     237             : 
     238         142 :         if (mapi_reconnect(m) != MOK) {
     239           3 :                 MT_lock_unset(&mal_remoteLock);
     240           3 :                 msg = createException(IO, "remote.connect",
     241             :                                                           "unable to connect to '%s': %s",
     242             :                                                           *ouri, mapi_error_str(m));
     243           3 :                 mapi_destroy(m);
     244           3 :                 return msg;
     245             :         }
     246             : 
     247             :         /* connection established, add to list */
     248         139 :         c = GDKzalloc(sizeof(struct _connection));
     249         139 :         if ( c == NULL || (c->name = GDKstrdup(conn)) == NULL) {
     250           0 :                 GDKfree(c);
     251           0 :                 mapi_destroy(m);
     252           0 :                 MT_lock_unset(&mal_remoteLock);
     253           0 :                 throw(MAL,"remote.connect", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     254             :         }
     255         139 :         c->mconn = m;
     256         139 :         c->nextid = 0;
     257         139 :         MT_lock_init(&c->lock, c->name);
     258         139 :         c->next = conns;
     259         139 :         conns = c;
     260             : 
     261         139 :         msg = RMTquery(&hdl, "remote.connect", m, "remote.bintype();");
     262         139 :         if (msg) {
     263           0 :                 MT_lock_unset(&mal_remoteLock);
     264           0 :                 return msg;
     265             :         }
     266         278 :         if (hdl != NULL && mapi_fetch_row(hdl)) {
     267         139 :                 char *val = mapi_fetch_field(hdl, 0);
     268         139 :                 c->type = (unsigned char)atoi(val);
     269         139 :                 mapi_close_handle(hdl);
     270             :         } else {
     271           0 :                 c->type = 0;
     272             :         }
     273             : 
     274             : #ifdef _DEBUG_MAPI_
     275             :         mapi_trace(c->mconn, true);
     276             : #endif
     277             : 
     278         139 :         MT_lock_unset(&mal_remoteLock);
     279             : 
     280         139 :         *ret = GDKstrdup(conn);
     281         139 :         if(*ret == NULL)
     282           0 :                 throw(MAL,"remote.connect", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     283             :         return(MAL_SUCCEED);
     284             : }
     285             : 
     286           0 : static str RMTconnect(
     287             :                 str *ret,
     288             :                 str *uri,
     289             :                 str *user,
     290             :                 str *passwd)
     291             : {
     292           0 :         str scen = "mal";
     293           0 :         return RMTconnectScen(ret, uri, user, passwd, &scen);
     294             : }
     295             : 
     296             : static str
     297         142 : RMTconnectTable(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     298             : {
     299             :         char *local_table;
     300             :         char *remoteuser;
     301             :         char *passwd;
     302             :         char *uri;
     303             :         char *tmp;
     304             :         char *ret;
     305             :         str scen;
     306             :         str msg;
     307             :         ValPtr v;
     308             : 
     309             :         (void)mb;
     310             :         (void)cntxt;
     311             : 
     312         142 :         local_table = *getArgReference_str(stk, pci, 1);
     313         142 :         scen = *getArgReference_str(stk, pci, 2);
     314         142 :         if (local_table == NULL || strcmp(local_table, (str)str_nil) == 0) {
     315           0 :                 throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": local table is NULL or nil");
     316             :         }
     317             : 
     318         142 :         rethrow("remote.connect", tmp, AUTHgetRemoteTableCredentials(local_table, &uri, &remoteuser, &passwd));
     319             : 
     320             :         /* The password we just got is hashed. Add the byte \1 in front to
     321             :          * signal this fact to the mapi. */
     322         142 :         size_t pwlen = strlen(passwd);
     323         142 :         char *pwhash = (char*)GDKmalloc(pwlen + 2);
     324         142 :         if (pwhash == NULL) {
     325           0 :                 GDKfree(remoteuser);
     326           0 :                 GDKfree(passwd);
     327           0 :                 throw(MAL, "remote.connect", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     328             :         }
     329         142 :         snprintf(pwhash, pwlen + 2, "\1%s", passwd);
     330             : 
     331         142 :         msg = RMTconnectScen(&ret, &uri, &remoteuser, &pwhash, &scen);
     332             : 
     333         142 :         GDKfree(passwd);
     334         142 :         GDKfree(pwhash);
     335             : 
     336         142 :         if (msg == MAL_SUCCEED) {
     337         139 :                 v = &stk->stk[pci->argv[0]];
     338         139 :                 v->vtype = TYPE_str;
     339         139 :                 if((v->val.sval = GDKstrdup(ret)) == NULL) {
     340           0 :                         GDKfree(ret);
     341           0 :                         throw(MAL, "remote.connect", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     342             :                 }
     343             :         }
     344             : 
     345         142 :         GDKfree(ret);
     346         142 :         return msg;
     347             : }
     348             : 
     349             : 
     350             : /**
     351             :  * Disconnects a connection.  The connection needs not to exist in the
     352             :  * system, it only needs to exist for the client (i.e. it was once
     353             :  * created).
     354             :  */
     355         133 : static str RMTdisconnect(void *ret, str *conn) {
     356             :         connection c, t;
     357             : 
     358         133 :         if (conn == NULL || *conn == NULL || strcmp(*conn, (str)str_nil) == 0)
     359           0 :                 throw(ILLARG, "remote.disconnect", ILLEGAL_ARGUMENT ": connection "
     360             :                                 "is NULL or nil");
     361             : 
     362             : 
     363             :         (void) ret;
     364             : 
     365             :         /* we need a lock because the same user can be handled by multiple
     366             :          * threads */
     367         133 :         MT_lock_set(&mal_remoteLock);
     368         133 :         c = conns;
     369             :         t = NULL; /* parent */
     370             :         /* walk through the list */
     371         277 :         while (c != NULL) {
     372         277 :                 if (strcmp(c->name, *conn) == 0) {
     373             :                         /* ok, delete it... */
     374         133 :                         if (t == NULL) {
     375          71 :                                 conns = c->next;
     376             :                         } else {
     377          62 :                                 t->next = c->next;
     378             :                         }
     379             : 
     380         133 :                         MT_lock_set(&c->lock); /* shared connection */
     381         133 :                         mapi_disconnect(c->mconn);
     382         133 :                         mapi_destroy(c->mconn);
     383         133 :                         MT_lock_unset(&c->lock);
     384         133 :                         MT_lock_destroy(&c->lock);
     385         133 :                         GDKfree(c->name);
     386         133 :                         GDKfree(c);
     387         133 :                         MT_lock_unset(&mal_remoteLock);
     388         133 :                         return MAL_SUCCEED;
     389             :                 }
     390             :                 t = c;
     391         144 :                 c = c->next;
     392             :         }
     393             : 
     394           0 :         MT_lock_unset(&mal_remoteLock);
     395           0 :         throw(MAL, "remote.disconnect", "no such connection: %s", *conn);
     396             : }
     397             : 
     398             : /**
     399             :  * Helper function to return a connection matching a given string, or an
     400             :  * error if it does not exist.  Since this function is internal, it
     401             :  * doesn't check the argument conn, as it should have been checked
     402             :  * already.
     403             :  * NOTE: this function acquires the mal_remoteLock before accessing conns
     404             :  */
     405             : static inline str
     406        3478 : RMTfindconn(connection *ret, const char *conn) {
     407             :         connection c;
     408             : 
     409             :         /* just make sure the return isn't garbage */
     410        3478 :         *ret = NULL;
     411        3478 :         MT_lock_set(&mal_remoteLock); /* protect c */
     412        3478 :         c = conns;
     413        8307 :         while (c != NULL) {
     414        8307 :                 if (strcmp(c->name, conn) == 0) {
     415        3478 :                         *ret = c;
     416        3478 :                         MT_lock_unset(&mal_remoteLock);
     417        3478 :                         return(MAL_SUCCEED);
     418             :                 }
     419        4829 :                 c = c->next;
     420             :         }
     421           0 :         MT_lock_unset(&mal_remoteLock);
     422           0 :         throw(MAL, "remote.<findconn>", "no such connection: %s", conn);
     423             : }
     424             : 
     425             : /**
     426             :  * Little helper function that returns a GDKmalloced string containing a
     427             :  * valid identifier that is supposed to be unique in the connection's
     428             :  * remote context.  The generated string depends on the module and
     429             :  * function the caller is in. But also the runtime context is important.
     430             :  * The format is rmt<id>_<retvar>_<type>.  Every RMTgetId uses a fresh id,
     431             :  * to distinguish amongst different (parallel) execution context.
     432             :  * Re-use of this remote identifier should be done with care.
     433             :  * The encoding of the type allows for ease of type checking later on.
     434             :  */
     435             : static inline str
     436             : RMTgetId(char *buf, MalBlkPtr mb, InstrPtr p, int arg) {
     437             :         InstrPtr f;
     438             :         char *mod;
     439             :         char *var;
     440             :         str rt;
     441             :         static ATOMIC_TYPE idtag = ATOMIC_VAR_INIT(0);
     442             : 
     443             :         if( p->retc == 0)
     444             :                 throw(MAL, "remote.getId", ILLEGAL_ARGUMENT "MAL instruction misses retc");
     445             : 
     446             :         var = getArgName(mb, p, arg);
     447             :         f = getInstrPtr(mb, 0); /* top level function */
     448             :         mod = getModuleId(f);
     449             :         if (mod == NULL)
     450             :                 mod = "user";
     451             :         rt = getTypeIdentifier(getArgType(mb,p,arg));
     452             :         if (rt == NULL)
     453             :                 throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     454             : 
     455             :         snprintf(buf, BUFSIZ, "rmt%u_%s_%s", (unsigned) ATOMIC_ADD(&idtag, 1), var, rt);
     456             : 
     457             :         GDKfree(rt);
     458             :         return(MAL_SUCCEED);
     459             : }
     460             : 
     461             : /**
     462             :  * Helper function to execute a query over the given connection,
     463             :  * returning the result handle.  If communication fails in one way or
     464             :  * another, an error is returned.  Since this function is internal, it
     465             :  * doesn't check the input arguments func, conn and query, as they
     466             :  * should have been checked already.
     467             :  * NOTE: this function assumes a lock for conn is set
     468             :  */
     469             : static inline str
     470        2699 : RMTquery(MapiHdl *ret, const char *func, Mapi conn, const char *query) {
     471             :         MapiHdl mhdl;
     472             : 
     473        2699 :         *ret = NULL;
     474        2699 :         mhdl = mapi_query(conn, query);
     475        2700 :         if (mhdl) {
     476        2700 :                 if (mapi_result_error(mhdl) != NULL) {
     477           6 :                         str err = createException(
     478             :                                         getExceptionType(mapi_result_error(mhdl)),
     479             :                                         func,
     480             :                                         "(mapi:monetdb://%s@%s/%s) %s",
     481             :                                         mapi_get_user(conn),
     482             :                                         mapi_get_host(conn),
     483             :                                         mapi_get_dbname(conn),
     484             :                                         getExceptionMessage(mapi_result_error(mhdl)));
     485           6 :                         mapi_close_handle(mhdl);
     486           6 :                         return(err);
     487             :                 }
     488             :         } else {
     489           0 :                 if (mapi_error(conn) != MOK) {
     490           0 :                         throw(IO, func, "an error occurred on connection: %s",
     491             :                                         mapi_error_str(conn));
     492             :                 } else {
     493           0 :                         throw(MAL, func, "remote function invocation didn't return a result");
     494             :                 }
     495             :         }
     496             : 
     497        2694 :         *ret = mhdl;
     498        2694 :         return(MAL_SUCCEED);
     499             : }
     500             : 
     501         261 : static str RMTprelude(void *ret) {
     502             :         unsigned int type = 0;
     503             : 
     504             :         (void)ret;
     505             : #ifdef WORDS_BIGENDIAN
     506             :         type |= RMTT_B_ENDIAN;
     507             : #else
     508             :         type |= RMTT_L_ENDIAN;
     509             : #endif
     510             : #if SIZEOF_SIZE_T == SIZEOF_LNG
     511             :         type |= RMTT_64_BITS;
     512             : #else
     513             :         type |= RMTT_32_BITS;
     514             : #endif
     515             : #if SIZEOF_OID == SIZEOF_LNG
     516             :         type |= RMTT_64_OIDS;
     517             : #else
     518             :         type |= RMTT_32_OIDS;
     519             : #endif
     520         261 :         localtype = (unsigned char)type;
     521             : 
     522         261 :         return(MAL_SUCCEED);
     523             : }
     524             : 
     525         259 : static str RMTepilogue(void *ret) {
     526             :         connection c, t;
     527             : 
     528             :         (void)ret;
     529             : 
     530         259 :         MT_lock_set(&mal_remoteLock); /* nobody allowed here */
     531             :         /* free connections list */
     532         259 :         c = conns;
     533         265 :         while (c != NULL) {
     534             :                 t = c;
     535           6 :                 c = c->next;
     536           6 :                 MT_lock_set(&t->lock);
     537           6 :                 mapi_destroy(t->mconn);
     538           6 :                 MT_lock_unset(&t->lock);
     539           6 :                 MT_lock_destroy(&t->lock);
     540           6 :                 GDKfree(t->name);
     541           6 :                 GDKfree(t);
     542             :         }
     543             :         /* not sure, but better be safe than sorry */
     544         259 :         conns = NULL;
     545         259 :         MT_lock_unset(&mal_remoteLock);
     546             : 
     547         259 :         return(MAL_SUCCEED);
     548             : }
     549             : 
     550             : /**
     551             :  * get fetches the object referenced by ident over connection conn.
     552             :  * We are only interested in retrieving void-headed BATs, i.e. single columns.
     553             :  */
     554         917 : static str RMTget(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
     555             :         str conn, ident, tmp, rt;
     556             :         connection c;
     557             :         char qbuf[BUFSIZ + 1];
     558         917 :         MapiHdl mhdl = NULL;
     559             :         int rtype;
     560             :         ValPtr v;
     561             : 
     562             :         (void)mb;
     563             :         (void) cntxt;
     564             : 
     565         917 :         conn = *getArgReference_str(stk, pci, 1);
     566         917 :         if (conn == NULL || strcmp(conn, (str)str_nil) == 0)
     567           0 :                 throw(ILLARG, "remote.get", ILLEGAL_ARGUMENT ": connection name is NULL or nil");
     568         917 :         ident = *getArgReference_str(stk, pci, 2);
     569         917 :         if (ident == 0 || isIdentifier(ident) < 0)
     570           0 :                 throw(ILLARG, "remote.get", ILLEGAL_ARGUMENT ": identifier expected, got '%s'", ident);
     571             : 
     572             :         /* lookup conn, set c if valid */
     573         917 :         rethrow("remote.get", tmp, RMTfindconn(&c, conn));
     574             : 
     575         917 :         rtype = getArgType(mb, pci, 0);
     576         917 :         v = &stk->stk[pci->argv[0]];
     577             : 
     578         917 :         if (rtype == TYPE_any || isAnyExpression(rtype)) {
     579             :                 char *tpe, *msg;
     580           0 :                 tpe = getTypeName(rtype);
     581           0 :                 msg = createException(MAL, "remote.get", ILLEGAL_ARGUMENT ": unsupported any type: %s",
     582             :                                                           tpe);
     583           0 :                 GDKfree(tpe);
     584           0 :                 return msg;
     585             :         }
     586             :         /* check if the remote type complies with what we expect.
     587             :            Since the put() encodes the type as known to the remote site
     588             :            we can simple compare it here */
     589         917 :         rt = getTypeIdentifier(rtype);
     590         917 :         if (rt == NULL)
     591           0 :                 throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     592         917 :         if (strcmp(ident + strlen(ident) - strlen(rt), rt)) {
     593           0 :                 tmp = createException(MAL, "remote.get", ILLEGAL_ARGUMENT
     594             :                         ": remote object type %s does not match expected type %s",
     595             :                         rt, ident);
     596           0 :                 GDKfree(rt);
     597           0 :                 return tmp;
     598             :         }
     599         917 :         GDKfree(rt);
     600             : 
     601         917 :         if (isaBatType(rtype) && (localtype == 0177 || localtype != c->type ))
     602           0 :         {
     603             :                 int t;
     604             :                 size_t s;
     605             :                 ptr r;
     606             :                 str var;
     607             :                 BAT *b;
     608             : 
     609           0 :                 snprintf(qbuf, BUFSIZ, "io.print(%s);", ident);
     610             : 
     611           0 :                 TRC_DEBUG(MAL_REMOTE, "Remote get: %s\n", qbuf);
     612             : 
     613             :                 /* this call should be a single transaction over the channel*/
     614           0 :                 MT_lock_set(&c->lock);
     615             : 
     616           0 :                 if ((tmp = RMTquery(&mhdl, "remote.get", c->mconn, qbuf))
     617             :                                 != MAL_SUCCEED)
     618             :                 {
     619           0 :                         TRC_ERROR(MAL_REMOTE, "Remote get: %s\n%s\n", qbuf, tmp);
     620           0 :                         MT_lock_unset(&c->lock);
     621           0 :                         var = createException(MAL, "remote.get", "%s", tmp);
     622           0 :                         freeException(tmp);
     623           0 :                         return var;
     624             :                 }
     625           0 :                 t = getBatType(rtype);
     626           0 :                 b = COLnew(0, t, 0, TRANSIENT);
     627           0 :                 if (b == NULL) {
     628           0 :                         mapi_close_handle(mhdl);
     629           0 :                         MT_lock_unset(&c->lock);
     630           0 :                         throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     631             :                 }
     632             : 
     633           0 :                 if (ATOMvarsized(t)) {
     634           0 :                         while (mapi_fetch_row(mhdl)) {
     635           0 :                                 var = mapi_fetch_field(mhdl, 1);
     636           0 :                                 if (BUNappend(b, var == NULL ? str_nil : var, false) != GDK_SUCCEED) {
     637           0 :                                         BBPreclaim(b);
     638           0 :                                         mapi_close_handle(mhdl);
     639           0 :                                         MT_lock_unset(&c->lock);
     640           0 :                                         throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     641             :                                 }
     642             :                         }
     643             :                 } else
     644           0 :                         while (mapi_fetch_row(mhdl)) {
     645           0 :                                 var = mapi_fetch_field(mhdl, 1);
     646           0 :                                 if (var == NULL)
     647             :                                         var = "nil";
     648           0 :                                 s = 0;
     649           0 :                                 r = NULL;
     650           0 :                                 if (ATOMfromstr(t, &r, &s, var, true) < 0 ||
     651           0 :                                         BUNappend(b, r, false) != GDK_SUCCEED) {
     652           0 :                                         BBPreclaim(b);
     653           0 :                                         GDKfree(r);
     654           0 :                                         mapi_close_handle(mhdl);
     655           0 :                                         MT_lock_unset(&c->lock);
     656           0 :                                         throw(MAL, "remote.get", GDK_EXCEPTION);
     657             :                                 }
     658           0 :                                 GDKfree(r);
     659             :                         }
     660             : 
     661           0 :                 v->val.bval = b->batCacheid;
     662           0 :                 v->vtype = TYPE_bat;
     663           0 :                 BBPkeepref(b->batCacheid);
     664             : 
     665           0 :                 mapi_close_handle(mhdl);
     666           0 :                 MT_lock_unset(&c->lock);
     667        1834 :         } else if (isaBatType(rtype)) {
     668             :                 /* binary compatible remote host, transfer BAT in binary form */
     669             :                 stream *sout;
     670             :                 stream *sin;
     671             :                 char buf[256];
     672             :                 ssize_t sz = 0, rd;
     673         917 :                 BAT *b = NULL;
     674             : 
     675             :                 /* this call should be a single transaction over the channel*/
     676         917 :                 MT_lock_set(&c->lock);
     677             : 
     678             :                 /* bypass Mapi from this point to efficiently write all data to
     679             :                  * the server */
     680         917 :                 sout = mapi_get_to(c->mconn);
     681         917 :                 sin = mapi_get_from(c->mconn);
     682         917 :                 if (sin == NULL || sout == NULL) {
     683           0 :                         MT_lock_unset(&c->lock);
     684           0 :                         throw(MAL, "remote.get", "Connection lost");
     685             :                 }
     686             : 
     687             :                 /* call our remote helper to do this more efficiently */
     688         917 :                 mnstr_printf(sout, "remote.batbincopy(%s);\n", ident);
     689         917 :                 mnstr_flush(sout, MNSTR_FLUSH_DATA);
     690             : 
     691             :                 /* read the JSON header */
     692      154001 :                 while ((rd = mnstr_read(sin, &buf[sz], 1, 1)) == 1 && buf[sz] != '\n') {
     693      153084 :                         sz += rd;
     694             :                 }
     695         917 :                 if (rd < 0) {
     696           0 :                         MT_lock_unset(&c->lock);
     697           0 :                         throw(MAL, "remote.get", "could not read BAT JSON header");
     698             :                 }
     699         917 :                 if (buf[0] == '!') {
     700             :                         char *result;
     701           0 :                         MT_lock_unset(&c->lock);
     702           0 :                         if((result = GDKstrdup(buf)) == NULL)
     703           0 :                                 throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     704             :                         return result;
     705             :                 }
     706             : 
     707         917 :                 buf[sz] = '\0';
     708         917 :                 if ((tmp = RMTinternalcopyfrom(&b, buf, sin)) != NULL) {
     709           0 :                         MT_lock_unset(&c->lock);
     710           0 :                         return(tmp);
     711             :                 }
     712             : 
     713         917 :                 v->val.bval = b->batCacheid;
     714         917 :                 v->vtype = TYPE_bat;
     715         917 :                 BBPkeepref(b->batCacheid);
     716             : 
     717         917 :                 MT_lock_unset(&c->lock);
     718             :         } else {
     719           0 :                 ptr p = NULL;
     720             :                 str val;
     721           0 :                 size_t len = 0;
     722             : 
     723           0 :                 snprintf(qbuf, BUFSIZ, "io.print(%s);", ident);
     724           0 :                 TRC_DEBUG(MAL_REMOTE, "Remote get: %s - %s\n", c->name, qbuf);
     725           0 :                 if ((tmp=RMTquery(&mhdl, "remote.get", c->mconn, qbuf)) != MAL_SUCCEED)
     726             :                 {
     727           0 :                         return tmp;
     728             :                 }
     729           0 :                 (void) mapi_fetch_row(mhdl); /* should succeed */
     730           0 :                 val = mapi_fetch_field(mhdl, 0);
     731             : 
     732           0 :                 if (ATOMvarsized(rtype)) {
     733           0 :                         p = GDKstrdup(val == NULL ? str_nil : val);
     734           0 :                         if (p == NULL) {
     735           0 :                                 mapi_close_handle(mhdl);
     736           0 :                                 throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     737             :                         }
     738           0 :                         VALset(v, rtype, p);
     739           0 :                 } else if (ATOMfromstr(rtype, &p, &len, val == NULL ? "nil" : val, true) < 0) {
     740             :                         char *msg;
     741           0 :                         msg = createException(MAL, "remote.get",
     742             :                                                                   "unable to parse value: %s",
     743             :                                                                   val == NULL ? "nil" : val);
     744           0 :                         mapi_close_handle(mhdl);
     745           0 :                         GDKfree(p);
     746           0 :                         return msg;
     747             :                 } else {
     748           0 :                         VALset(v, rtype, p);
     749           0 :                         if (ATOMextern(rtype) == 0)
     750           0 :                                 GDKfree(p);
     751             :                 }
     752             : 
     753           0 :                 mapi_close_handle(mhdl);
     754             :         }
     755             : 
     756             :         return(MAL_SUCCEED);
     757             : }
     758             : 
     759             : /**
     760             :  * stores the given object on the remote host.  The identifier of the
     761             :  * object on the remote host is returned for later use.
     762             :  */
     763        2154 : static str RMTput(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
     764             :         str conn, tmp;
     765             :         char ident[BUFSIZ];
     766             :         connection c;
     767             :         ValPtr v;
     768             :         int type;
     769             :         ptr value;
     770        2154 :         MapiHdl mhdl = NULL;
     771             : 
     772             :         (void)cntxt;
     773             : 
     774        2154 :         conn = *getArgReference_str(stk, pci, 1);
     775        2154 :         if (conn == NULL || strcmp(conn, (str)str_nil) == 0)
     776           0 :                 throw(ILLARG, "remote.put", ILLEGAL_ARGUMENT ": connection name is NULL or nil");
     777             : 
     778             :         /* lookup conn */
     779        2154 :         rethrow("remote.put", tmp, RMTfindconn(&c, conn));
     780             : 
     781             :         /* put the thing */
     782        2154 :         type = getArgType(mb, pci, 2);
     783        2154 :         value = getArgReference(stk, pci, 2);
     784             : 
     785             :         /* this call should be a single transaction over the channel*/
     786        2154 :         MT_lock_set(&c->lock);
     787             : 
     788             :         /* get a free, typed identifier for the remote host */
     789        2154 :         tmp = RMTgetId(ident, mb, pci, 2);
     790        2154 :         if (tmp != MAL_SUCCEED) {
     791           0 :                 MT_lock_unset(&c->lock);
     792           0 :                 return tmp;
     793             :         }
     794             : 
     795             :         /* depending on the input object generate actions to store the
     796             :          * object remotely*/
     797        2154 :         if (type == TYPE_any || type == TYPE_bat || isAnyExpression(type)) {
     798             :                 char *tpe, *msg;
     799           0 :                 MT_lock_unset(&c->lock);
     800           0 :                 tpe = getTypeName(type);
     801           0 :                 msg = createException(MAL, "remote.put", "unsupported type: %s", tpe);
     802           0 :                 GDKfree(tpe);
     803           0 :                 return msg;
     804        3072 :         } else if (isaBatType(type) && !is_bat_nil(*(bat*) value)) {
     805             :                 BATiter bi;
     806             :                 /* naive approach using bat.new() and bat.insert() calls */
     807             :                 char *tail;
     808             :                 bat bid;
     809             :                 BAT *b = NULL;
     810             :                 BUN p, q;
     811             :                 str tailv;
     812             :                 stream *sout;
     813             : 
     814         918 :                 tail = getTypeIdentifier(getBatType(type));
     815         918 :                 if (tail == NULL) {
     816           0 :                         MT_lock_unset(&c->lock);
     817           0 :                         throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     818             :                 }
     819             : 
     820         918 :                 bid = *(bat *)value;
     821         918 :                 if (bid != 0) {
     822         918 :                         if ((b = BATdescriptor(bid)) == NULL){
     823           0 :                                 MT_lock_unset(&c->lock);
     824           0 :                                 GDKfree(tail);
     825           0 :                                 throw(MAL, "remote.put", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
     826             :                         }
     827             :                 }
     828             : 
     829             :                 /* bypass Mapi from this point to efficiently write all data to
     830             :                  * the server */
     831         918 :                 sout = mapi_get_to(c->mconn);
     832             : 
     833             :                 /* call our remote helper to do this more efficiently */
     834         918 :                 mnstr_printf(sout,
     835             :                                 "%s := remote.batload(:%s, " BUNFMT ");\n",
     836             :                                 ident, tail, (bid == 0 ? 0 : BATcount(b)));
     837         918 :                 mnstr_flush(sout, MNSTR_FLUSH_DATA);
     838         918 :                 GDKfree(tail);
     839             : 
     840             :                 /* b can be NULL if bid == 0 (only type given, ugh) */
     841         918 :                 if (b) {
     842         918 :                         bi = bat_iterator(b);
     843         918 :                         BATloop(b, p, q) {
     844           0 :                                 tailv = ATOMformat(getBatType(type), BUNtail(bi, p));
     845           0 :                                 if (tailv == NULL) {
     846           0 :                                         BBPunfix(b->batCacheid);
     847           0 :                                         MT_lock_unset(&c->lock);
     848           0 :                                         throw(MAL, "remote.put", GDK_EXCEPTION);
     849             :                                 }
     850           0 :                                 if (getBatType(type) >= TYPE_date && getBatType(type) != TYPE_str)
     851           0 :                                         mnstr_printf(sout, "\"%s\"\n", tailv);
     852             :                                 else
     853           0 :                                         mnstr_printf(sout, "%s\n", tailv);
     854           0 :                                 GDKfree(tailv);
     855             :                         }
     856         918 :                         BBPunfix(b->batCacheid);
     857             :                 }
     858             : 
     859             :                 /* write the empty line the server is waiting for, handles
     860             :                  * all errors at the same time, if any */
     861         918 :                 if ((tmp = RMTquery(&mhdl, "remote.put", c->mconn, ""))
     862             :                                 != MAL_SUCCEED)
     863             :                 {
     864           0 :                         MT_lock_unset(&c->lock);
     865           0 :                         return tmp;
     866             :                 }
     867         918 :                 mapi_close_handle(mhdl);
     868        1236 :         } else if (isaBatType(type) && is_bat_nil(*(bat*) value)) {
     869             :                 stream *sout;
     870           0 :                 str typename = getTypeName(type);
     871           0 :                 sout = mapi_get_to(c->mconn);
     872           0 :                 mnstr_printf(sout,
     873             :                                 "%s := nil:%s;\n", ident, typename);
     874           0 :                 mnstr_flush(sout, MNSTR_FLUSH_DATA);
     875           0 :                 GDKfree(typename);
     876             :         } else {
     877             :                 size_t l;
     878             :                 str val;
     879             :                 char *tpe;
     880             :                 char qbuf[512], *nbuf = qbuf;
     881        1236 :                 if (ATOMvarsized(type)) {
     882         963 :                         val = ATOMformat(type, *(str *)value);
     883             :                 } else {
     884         273 :                         val = ATOMformat(type, value);
     885             :                 }
     886        1236 :                 if (val == NULL) {
     887           0 :                         MT_lock_unset(&c->lock);
     888           0 :                         throw(MAL, "remote.put", GDK_EXCEPTION);
     889             :                 }
     890        1236 :                 tpe = getTypeIdentifier(type);
     891        1236 :                 if (tpe == NULL) {
     892           0 :                         MT_lock_unset(&c->lock);
     893           0 :                         GDKfree(val);
     894           0 :                         throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     895             :                 }
     896        1236 :                 l = strlen(val) + strlen(tpe) + strlen(ident) + 10;
     897        1236 :                 if (l > (ssize_t) sizeof(qbuf) && (nbuf = GDKmalloc(l)) == NULL) {
     898           0 :                         MT_lock_unset(&c->lock);
     899           0 :                         GDKfree(val);
     900           0 :                         GDKfree(tpe);
     901           0 :                         throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     902             :                 }
     903        1236 :                 if (type < TYPE_date || type == TYPE_str)
     904        1236 :                         snprintf(nbuf, l, "%s := %s:%s;\n", ident, val, tpe);
     905             :                 else
     906           0 :                         snprintf(nbuf, l, "%s := \"%s\":%s;\n", ident, val, tpe);
     907        1236 :                 GDKfree(tpe);
     908        1236 :                 GDKfree(val);
     909        1235 :                 TRC_DEBUG(MAL_REMOTE, "Remote put: %s - %s\n", c->name, nbuf);
     910        1235 :                 tmp = RMTquery(&mhdl, "remote.put", c->mconn, nbuf);
     911        1236 :                 if (nbuf != qbuf)
     912          53 :                         GDKfree(nbuf);
     913        1236 :                 if (tmp != MAL_SUCCEED) {
     914           0 :                         MT_lock_unset(&c->lock);
     915           0 :                         return tmp;
     916             :                 }
     917        1236 :                 mapi_close_handle(mhdl);
     918             :         }
     919        2154 :         MT_lock_unset(&c->lock);
     920             : 
     921             :         /* return the identifier */
     922        2154 :         v = &stk->stk[pci->argv[0]];
     923        2154 :         v->vtype = TYPE_str;
     924        2154 :         if((v->val.sval = GDKstrdup(ident)) == NULL)
     925           0 :                 throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     926             :         return(MAL_SUCCEED);
     927             : }
     928             : 
     929             : /**
     930             :  * stores the given <mod>.<fcn> on the remote host.
     931             :  * An error is returned if the function is already known at the remote site.
     932             :  * The implementation is based on serialisation of the block into a string
     933             :  * followed by remote parsing.
     934             :  */
     935           0 : static str RMTregisterInternal(Client cntxt, const char *conn, const char *mod, const char *fcn)
     936             : {
     937             :         str tmp, qry, msg;
     938             :         connection c;
     939             :         char buf[BUFSIZ];
     940           0 :         MapiHdl mhdl = NULL;
     941             :         Symbol sym;
     942             : 
     943           0 :         if (strNil(conn))
     944           0 :                 throw(ILLARG, "remote.register", ILLEGAL_ARGUMENT ": connection name is NULL or nil");
     945             : 
     946             :         /* find local definition */
     947           0 :         sym = findSymbol(cntxt->usermodule, putName(mod), putName(fcn));
     948           0 :         if (sym == NULL)
     949           0 :                 throw(MAL, "remote.register", ILLEGAL_ARGUMENT ": no such function: %s.%s", mod, fcn);
     950             : 
     951             :         /* lookup conn */
     952           0 :         rethrow("remote.register", tmp, RMTfindconn(&c, conn));
     953             : 
     954             :         /* this call should be a single transaction over the channel*/
     955           0 :         MT_lock_set(&c->lock);
     956             : 
     957             :         /* check remote definition */
     958           0 :         snprintf(buf, BUFSIZ, "inspect.getSignature(\"%s\",\"%s\");", mod, fcn);
     959           0 :         TRC_DEBUG(MAL_REMOTE, "Remote register: %s - %s\n", c->name, buf);
     960           0 :         msg = RMTquery(&mhdl, "remote.register", c->mconn, buf);
     961           0 :         if (msg == MAL_SUCCEED) {
     962           0 :                 MT_lock_unset(&c->lock);
     963           0 :                 throw(MAL, "remote.register",
     964             :                                 "function already exists at the remote site: %s.%s",
     965             :                                 mod, fcn);
     966             :         } else {
     967             :                 /* we basically hope/assume this is a "doesn't exist" error */
     968           0 :                 freeException(msg);
     969             :         }
     970           0 :         if (mhdl)
     971           0 :                 mapi_close_handle(mhdl);
     972             : 
     973             :         /* make sure the program is error free */
     974           0 :         msg = chkProgram(cntxt->usermodule, sym->def);
     975           0 :         if ( msg == MAL_SUCCEED || sym->def->errors) {
     976           0 :                 MT_lock_unset(&c->lock);
     977           0 :                 if (msg)
     978             :                         return msg;
     979           0 :                 throw(MAL, "remote.register",
     980             :                                 "function '%s.%s' contains syntax or type errors",
     981             :                                 mod, fcn);
     982             :         }
     983             : 
     984           0 :         qry = mal2str(sym->def, 0, sym->def->stop);
     985           0 :         TRC_DEBUG(MAL_REMOTE, "Remote register: %s - %s\n", c->name, qry);
     986           0 :         msg = RMTquery(&mhdl, "remote.register", c->mconn, qry);
     987           0 :         GDKfree(qry);
     988           0 :         if (mhdl)
     989           0 :                 mapi_close_handle(mhdl);
     990             : 
     991           0 :         MT_lock_unset(&c->lock);
     992           0 :         return msg;
     993             : }
     994             : 
     995           0 : static str RMTregister(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
     996           0 :         const char *conn = *getArgReference_str(stk, pci, 1);
     997           0 :         const char *mod = *getArgReference_str(stk, pci, 2);
     998           0 :         const char *fcn = *getArgReference_str(stk, pci, 3);
     999             :         (void)mb;
    1000           0 :         return RMTregisterInternal(cntxt, conn, mod, fcn);
    1001             : }
    1002             : 
    1003             : /**
    1004             :  * exec executes the function with its given arguments on the remote
    1005             :  * host, returning the function's return value.  exec is purposely kept
    1006             :  * very spartan.  All arguments need to be handles to previously put()
    1007             :  * values.  It calls the function with the given arguments at the remote
    1008             :  * site, and returns the handle which stores the return value of the
    1009             :  * remotely executed function.  This return value can be retrieved using
    1010             :  * a get call. It handles multiple return arguments.
    1011             :  */
    1012         407 : static str RMTexec(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
    1013             :         str conn, mod, func, tmp;
    1014             :         int i;
    1015             :         size_t len, buflen;
    1016         407 :         connection c= NULL;
    1017             :         char *qbuf;
    1018             :         MapiHdl mhdl;
    1019             : 
    1020             :         (void)cntxt;
    1021             :         (void)mb;
    1022             : 
    1023        1598 :         for (i = 0; i < pci->retc; i++) {
    1024        1191 :                 tmp = *getArgReference_str(stk, pci, i);
    1025        1191 :                 if (tmp == NULL || strcmp(tmp, (str)str_nil) == 0)
    1026           0 :                         throw(ILLARG, "remote.exec", ILLEGAL_ARGUMENT
    1027             :                                         ": return value %d is NULL or nil", i);
    1028             :         }
    1029         407 :         conn = *getArgReference_str(stk, pci, i++);
    1030         407 :         if (conn == NULL || strcmp(conn, (str)str_nil) == 0)
    1031           0 :                 throw(ILLARG, "remote.exec", ILLEGAL_ARGUMENT ": connection name is NULL or nil");
    1032         407 :         mod = *getArgReference_str(stk, pci, i++);
    1033         407 :         if (mod == NULL || strcmp(mod, (str)str_nil) == 0)
    1034           0 :                 throw(ILLARG, "remote.exec", ILLEGAL_ARGUMENT ": module name is NULL or nil");
    1035         407 :         func = *getArgReference_str(stk, pci, i++);
    1036         407 :         if (func == NULL || strcmp(func, (str)str_nil) == 0)
    1037           0 :                 throw(ILLARG, "remote.exec", ILLEGAL_ARGUMENT ": function name is NULL or nil");
    1038             : 
    1039             :         /* lookup conn */
    1040         407 :         rethrow("remote.exec", tmp, RMTfindconn(&c, conn));
    1041             : 
    1042             :         /* this call should be a single transaction over the channel*/
    1043         407 :         MT_lock_set(&c->lock);
    1044             : 
    1045         407 :         if(pci->argc - pci->retc < 3) /* conn, mod, func, ... */
    1046           0 :                 throw(MAL, "remote.exec", ILLEGAL_ARGUMENT  " MAL instruction misses arguments");
    1047             : 
    1048             :         len = 0;
    1049             :         /* count how big a buffer we need */
    1050         407 :         len += 2 * (pci->retc > 1);
    1051        1598 :         for (i = 0; i < pci->retc; i++) {
    1052        1191 :                 len += 2 * (i > 0);
    1053        1191 :                 len += strlen(*getArgReference_str(stk, pci, i));
    1054             :         }
    1055         407 :         len += strlen(mod) + strlen(func) + 6;
    1056        1370 :         for (i = 3; i < pci->argc - pci->retc; i++) {
    1057         963 :                 len += 2 * (i > 3);
    1058         963 :                 len += strlen(*getArgReference_str(stk, pci, pci->retc + i));
    1059             :         }
    1060             :         len += 2;
    1061         407 :         buflen = len + 1;
    1062         407 :         if ((qbuf = GDKmalloc(buflen)) == NULL)
    1063           0 :                 throw(MAL, "remote.exec", SQLSTATE(HY013) MAL_MALLOC_FAIL);
    1064             : 
    1065             :         len = 0;
    1066             : 
    1067         407 :         if (pci->retc > 1)
    1068         113 :                 qbuf[len++] = '(';
    1069        1598 :         for (i = 0; i < pci->retc; i++)
    1070        1191 :                 len += snprintf(&qbuf[len], buflen - len, "%s%s",
    1071        1191 :                                 (i > 0 ? ", " : ""), *getArgReference_str(stk, pci, i));
    1072             : 
    1073         407 :         if (pci->retc > 1)
    1074         113 :                 qbuf[len++] = ')';
    1075             : 
    1076             :         /* build the function invocation string in qbuf */
    1077         407 :         len += snprintf(&qbuf[len], buflen - len, " := %s.%s(", mod, func);
    1078             : 
    1079             :         /* handle the arguments to the function */
    1080             : 
    1081             :         /* put the arguments one by one, and dynamically build the
    1082             :          * invocation string */
    1083        1370 :         for (i = 3; i < pci->argc - pci->retc; i++) {
    1084         963 :                 len += snprintf(&qbuf[len], buflen - len, "%s%s",
    1085             :                                 (i > 3 ? ", " : ""),
    1086         963 :                                 *(getArgReference_str(stk, pci, pci->retc + i)));
    1087             :         }
    1088             : 
    1089             :         /* finish end execute the invocation string */
    1090         407 :         len += snprintf(&qbuf[len], buflen - len, ");");
    1091         407 :         TRC_DEBUG(MAL_REMOTE, "Remote exec: %s - %s\n", c->name, qbuf);
    1092         407 :         tmp = RMTquery(&mhdl, "remote.exec", c->mconn, qbuf);
    1093         407 :         GDKfree(qbuf);
    1094         407 :         if (mhdl)
    1095         401 :                 mapi_close_handle(mhdl);
    1096         407 :         MT_lock_unset(&c->lock);
    1097         407 :         return tmp;
    1098             : }
    1099             : 
    1100             : /**
    1101             :  * batload is a helper function to make transferring a BAT with RMTput
    1102             :  * more efficient.  It works by creating a BAT, and loading it with the
    1103             :  * data as comma separated values from the input stream, until an empty
    1104             :  * line is read.  The given size argument is taken as a hint only, and
    1105             :  * is not enforced to match the number of rows read.
    1106             :  */
    1107         918 : static str RMTbatload(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
    1108             :         ValPtr v;
    1109             :         int t;
    1110             :         int size;
    1111             :         ptr  r;
    1112             :         size_t s;
    1113             :         BAT *b;
    1114             :         size_t len;
    1115             :         char *var;
    1116             :         str msg = MAL_SUCCEED;
    1117         918 :         bstream *fdin = cntxt->fdin;
    1118             : 
    1119         918 :         v = &stk->stk[pci->argv[0]]; /* return */
    1120         918 :         t = getArgType(mb, pci, 1); /* tail type */
    1121         918 :         size = *getArgReference_int(stk, pci, 2); /* size */
    1122             : 
    1123         918 :         b = COLnew(0, t, size, TRANSIENT);
    1124         918 :         if (b == NULL)
    1125           0 :                 throw(MAL, "remote.load", SQLSTATE(HY013) MAL_MALLOC_FAIL);
    1126             : 
    1127             :         /* grab the input stream and start reading */
    1128         918 :         fdin->eof = false;
    1129         918 :         len = fdin->pos;
    1130         918 :         while (len < fdin->len || bstream_next(fdin) > 0) {
    1131             :                 /* newline hunting (how spartan) */
    1132         918 :                 for (len = fdin->pos; len < fdin->len && fdin->buf[len] != '\n'; len++)
    1133             :                         ;
    1134             :                 /* unterminated line, request more */
    1135         918 :                 if (fdin->buf[len] != '\n')
    1136           0 :                         continue;
    1137             :                 /* empty line, end of input */
    1138         918 :                 if (fdin->pos == len) {
    1139         918 :                         if (isa_block_stream(fdin->s)) {
    1140         918 :                                 ssize_t n = bstream_next(fdin);
    1141         918 :                                 if( n )
    1142           0 :                                         msg = createException(MAL, "remote.load", SQLSTATE(HY013) "Unexpected return from remote");
    1143             :                         }
    1144             :                         break;
    1145             :                 }
    1146           0 :                 fdin->buf[len] = '\0'; /* kill \n */
    1147           0 :                 var = &fdin->buf[fdin->pos];
    1148             :                 /* skip over this line */
    1149           0 :                 fdin->pos = ++len;
    1150             : 
    1151           0 :                 s = 0;
    1152           0 :                 r = NULL;
    1153           0 :                 if (ATOMfromstr(t, &r, &s, var, true) < 0 ||
    1154           0 :                         BUNappend(b, r, false) != GDK_SUCCEED) {
    1155           0 :                         BBPreclaim(b);
    1156           0 :                         GDKfree(r);
    1157           0 :                         throw(MAL, "remote.get", GDK_EXCEPTION);
    1158             :                 }
    1159           0 :                 GDKfree(r);
    1160             :         }
    1161             : 
    1162         918 :         v->val.bval = b->batCacheid;
    1163         918 :         v->vtype = TYPE_bat;
    1164         918 :         BBPkeepref(b->batCacheid);
    1165             : 
    1166         918 :         return msg;
    1167             : }
    1168             : 
    1169             : /**
    1170             :  * dump given BAT to stream
    1171             :  */
    1172         917 : static str RMTbincopyto(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
    1173             : {
    1174         917 :         bat bid = *getArgReference_bat(stk, pci, 1);
    1175         917 :         BAT *b = BBPquickdesc(bid, false), *v = b;
    1176             :         char sendtheap = 0;
    1177             : 
    1178             :         (void)mb;
    1179             :         (void)stk;
    1180             :         (void)pci;
    1181             : 
    1182         917 :         if (b == NULL)
    1183           0 :                 throw(MAL, "remote.bincopyto", RUNTIME_OBJECT_UNDEFINED);
    1184             : 
    1185         917 :         if (BBPfix(bid) <= 0)
    1186           0 :                 throw(MAL, "remote.bincopyto", MAL_MALLOC_FAIL);
    1187             : 
    1188         917 :         sendtheap = b->ttype != TYPE_void && b->tvarsized;
    1189         917 :         if (isVIEW(b) && sendtheap && VIEWvtparent(b) && BATcount(b) < BATcount(BBPquickdesc(VIEWvtparent(b), false)))
    1190          11 :                 v = COLcopy(b, b->ttype, true, TRANSIENT);
    1191             : 
    1192         917 :         mnstr_printf(cntxt->fdout, /*JSON*/"{"
    1193             :                         "\"version\":1,"
    1194             :                         "\"ttype\":%d,"
    1195             :                         "\"hseqbase\":" OIDFMT ","
    1196             :                         "\"tseqbase\":" OIDFMT ","
    1197             :                         "\"tsorted\":%d,"
    1198             :                         "\"trevsorted\":%d,"
    1199             :                         "\"tkey\":%d,"
    1200             :                         "\"tnonil\":%d,"
    1201             :                         "\"tdense\":%d,"
    1202             :                         "\"size\":" BUNFMT ","
    1203             :                         "\"tailsize\":%zu,"
    1204             :                         "\"theapsize\":%zu"
    1205             :                         "}\n",
    1206         917 :                         v->ttype,
    1207             :                         v->hseqbase, v->tseqbase,
    1208         917 :                         v->tsorted, v->trevsorted,
    1209         917 :                         v->tkey,
    1210         917 :                         v->tnonil,
    1211          26 :                         BATtdense(v),
    1212             :                         v->batCount,
    1213         917 :                         (size_t)v->batCount * Tsize(v),
    1214         162 :                         sendtheap && v->batCount > 0 ? v->tvheap->free : 0
    1215             :                         );
    1216             : 
    1217         917 :         if (v->batCount > 0) {
    1218         849 :                 mnstr_write(cntxt->fdout, /* tail */
    1219         849 :                 Tloc(v, 0), v->batCount * Tsize(v), 1);
    1220         849 :                 if (sendtheap)
    1221         150 :                         mnstr_write(cntxt->fdout, /* theap */
    1222         150 :                                         Tbase(v), v->tvheap->free, 1);
    1223             :         }
    1224             :         /* flush is done by the calling environment (MAL) */
    1225             : 
    1226         917 :         if (b != v)
    1227          11 :                 BBPreclaim(v);
    1228             : 
    1229         917 :         BBPunfix(bid);
    1230             : 
    1231         917 :         return(MAL_SUCCEED);
    1232             : }
    1233             : 
    1234             : typedef struct _binbat_v1 {
    1235             :         int Ttype;
    1236             :         oid Hseqbase;
    1237             :         oid Tseqbase;
    1238             :         bool
    1239             :                 Tsorted:1,
    1240             :                 Trevsorted:1,
    1241             :                 Tkey:1,
    1242             :                 Tnonil:1,
    1243             :                 Tdense:1;
    1244             :         BUN size;
    1245             :         size_t headsize;
    1246             :         size_t tailsize;
    1247             :         size_t theapsize;
    1248             : } binbat;
    1249             : 
    1250             : static inline str
    1251         917 : RMTinternalcopyfrom(BAT **ret, char *hdr, stream *in)
    1252             : {
    1253         917 :         binbat bb = { 0, 0, 0, false, false, false, false, false, 0, 0, 0, 0 };
    1254             :         char *nme = NULL;
    1255             :         char *val = NULL;
    1256             :         char tmp;
    1257             :         size_t len;
    1258             :         lng lv, *lvp;
    1259             : 
    1260             :         BAT *b;
    1261             : 
    1262             :         /* hdr is a JSON structure that looks like
    1263             :          * {"version":1,"ttype":6,"tseqbase":0,"tailsize":4,"theapsize":0}
    1264             :          * we take the binary data directly from the stream */
    1265             : 
    1266             :         /* could skip whitespace, but we just don't allow that */
    1267         917 :         if (*hdr++ != '{')
    1268           0 :                 throw(MAL, "remote.bincopyfrom", "illegal input, not a JSON header (got '%s')", hdr - 1);
    1269      152999 :         while (*hdr != '\0') {
    1270      152082 :                 switch (*hdr) {
    1271       22004 :                         case '"':
    1272             :                                 /* we assume only numeric values, so all strings are
    1273             :                                  * elems */
    1274       22004 :                                 if (nme != NULL) {
    1275       11000 :                                         *hdr = '\0';
    1276             :                                 } else {
    1277       11004 :                                         nme = hdr + 1;
    1278             :                                 }
    1279             :                                 break;
    1280       10998 :                         case ':':
    1281       10998 :                                 val = hdr + 1;
    1282       10998 :                                 break;
    1283       11004 :                         case ',':
    1284             :                         case '}':
    1285       11004 :                                 if (val == NULL)
    1286           0 :                                         throw(MAL, "remote.bincopyfrom",
    1287             :                                                         "illegal input, JSON value missing");
    1288       11004 :                                 *hdr = '\0';
    1289             : 
    1290       11004 :                                 lvp = &lv;
    1291       11004 :                                 len = sizeof(lv);
    1292             :                                 /* tseqbase can be 1<<31/1<<63 which causes overflow
    1293             :                                  * in lngFromStr, so we check separately */
    1294       11004 :                                 if (strcmp(val,
    1295             : #if SIZEOF_OID == 8
    1296             :                                                    "9223372036854775808"
    1297             : #else
    1298             :                                                    "2147483648"
    1299             : #endif
    1300         891 :                                                 ) == 0 &&
    1301         891 :                                         strcmp(nme, "tseqbase") == 0) {
    1302         891 :                                         bb.Tseqbase = oid_nil;
    1303             :                                 } else {
    1304             :                                         /* all values should be non-negative, so we check that
    1305             :                                          * here as well */
    1306       10113 :                                         if (lngFromStr(val, &len, &lvp, true) < 0 ||
    1307       10113 :                                                 lv < 0 /* includes lng_nil */)
    1308           0 :                                                 throw(MAL, "remote.bincopyfrom",
    1309             :                                                           "bad %s value: %s", nme, val);
    1310             : 
    1311             :                                         /* deal with nme and val */
    1312       10113 :                                         if (strcmp(nme, "version") == 0) {
    1313         917 :                                                 if (lv != 1)
    1314           0 :                                                         throw(MAL, "remote.bincopyfrom",
    1315             :                                                                   "unsupported version: %s", val);
    1316        9196 :                                         } else if (strcmp(nme, "hseqbase") == 0) {
    1317             : #if SIZEOF_OID < SIZEOF_LNG
    1318             :                                                 if (lv > GDK_oid_max)
    1319             :                                                         throw(MAL, "remote.bincopyfrom",
    1320             :                                                                           "bad %s value: %s", nme, val);
    1321             : #endif
    1322             :                                                 bb.Hseqbase = (oid)lv;
    1323        8279 :                                         } else if (strcmp(nme, "ttype") == 0) {
    1324         917 :                                                 if (lv >= GDKatomcnt)
    1325           0 :                                                         throw(MAL, "remote.bincopyfrom",
    1326             :                                                                   "bad %s value: %s", nme, val);
    1327         917 :                                                 bb.Ttype = (int) lv;
    1328        7362 :                                         } else if (strcmp(nme, "tseqbase") == 0) {
    1329             : #if SIZEOF_OID < SIZEOF_LNG
    1330             :                                                 if (lv > GDK_oid_max)
    1331             :                                                         throw(MAL, "remote.bincopyfrom",
    1332             :                                                                   "bad %s value: %s", nme, val);
    1333             : #endif
    1334          26 :                                                 bb.Tseqbase = (oid) lv;
    1335        7336 :                                         } else if (strcmp(nme, "tsorted") == 0) {
    1336         917 :                                                 bb.Tsorted = lv != 0;
    1337        6419 :                                         } else if (strcmp(nme, "trevsorted") == 0) {
    1338         917 :                                                 bb.Trevsorted = lv != 0;
    1339        5502 :                                         } else if (strcmp(nme, "tkey") == 0) {
    1340         917 :                                                 bb.Tkey = lv != 0;
    1341        4585 :                                         } else if (strcmp(nme, "tnonil") == 0) {
    1342         917 :                                                 bb.Tnonil = lv != 0;
    1343        3668 :                                         } else if (strcmp(nme, "tdense") == 0) {
    1344         917 :                                                 bb.Tdense = lv != 0;
    1345        2751 :                                         } else if (strcmp(nme, "size") == 0) {
    1346         917 :                                                 if (lv > (lng) BUN_MAX)
    1347           0 :                                                         throw(MAL, "remote.bincopyfrom",
    1348             :                                                                   "bad %s value: %s", nme, val);
    1349         917 :                                                 bb.size = (BUN) lv;
    1350        1834 :                                         } else if (strcmp(nme, "tailsize") == 0) {
    1351         917 :                                                 bb.tailsize = (size_t) lv;
    1352         917 :                                         } else if (strcmp(nme, "theapsize") == 0) {
    1353         917 :                                                 bb.theapsize = (size_t) lv;
    1354             :                                         } else {
    1355           0 :                                                 throw(MAL, "remote.bincopyfrom",
    1356             :                                                           "unknown element: %s", nme);
    1357             :                                         }
    1358             :                                 }
    1359             :                                 nme = val = NULL;
    1360             :                                 break;
    1361             :                 }
    1362      152082 :                 hdr++;
    1363             :         }
    1364             : 
    1365         917 :         b = COLnew(0, bb.Ttype, bb.size, TRANSIENT);
    1366         917 :         if (b == NULL)
    1367           0 :                 throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
    1368             : 
    1369             :         /* for strings, the width may not match, fix it to match what we
    1370             :          * retrieved */
    1371         917 :         if (bb.Ttype == TYPE_str && bb.size) {
    1372         150 :                 b->twidth = (unsigned short) (bb.tailsize / bb.size);
    1373         150 :                 b->tshift = ATOMelmshift(Tsize(b));
    1374             :         }
    1375             : 
    1376         917 :         if (bb.tailsize > 0) {
    1377        1646 :                 if (HEAPextend(&b->theap, bb.tailsize, true) != GDK_SUCCEED ||
    1378         823 :                         mnstr_read(in, b->theap.base, bb.tailsize, 1) < 0)
    1379           0 :                         goto bailout;
    1380         823 :                 b->theap.dirty = true;
    1381             :         }
    1382         917 :         if (bb.theapsize > 0) {
    1383         300 :                 if (HEAPextend(b->tvheap, bb.theapsize, true) != GDK_SUCCEED ||
    1384         150 :                         mnstr_read(in, b->tvheap->base, bb.theapsize, 1) < 0)
    1385           0 :                         goto bailout;
    1386         150 :                 b->tvheap->free = bb.theapsize;
    1387         150 :                 b->tvheap->dirty = true;
    1388             :         }
    1389             : 
    1390             :         /* set properties */
    1391         917 :         b->tseqbase = bb.Tdense ? bb.Tseqbase : oid_nil;
    1392         917 :         b->tsorted = bb.Tsorted;
    1393         917 :         b->trevsorted = bb.Trevsorted;
    1394         917 :         b->tkey = bb.Tkey;
    1395         917 :         b->tnonil = bb.Tnonil;
    1396         917 :         if (bb.Ttype == TYPE_str && bb.size)
    1397         150 :                 BATsetcapacity(b, (BUN) (bb.tailsize >> b->tshift));
    1398         917 :         BATsetcount(b, bb.size);
    1399         917 :         b->batDirtydesc = true;
    1400             : 
    1401             :         /* read blockmode flush */
    1402         917 :         while (mnstr_read(in, &tmp, 1, 1) > 0) {
    1403           0 :                 TRC_ERROR(MAL_REMOTE, "Expected flush, got: %c\n", tmp);
    1404             :         }
    1405             : 
    1406         917 :         BATsettrivprop(b);
    1407             : 
    1408         917 :         *ret = b;
    1409         917 :         return(MAL_SUCCEED);
    1410             : 
    1411           0 :   bailout:
    1412           0 :         BBPreclaim(b);
    1413           0 :         throw(MAL, "remote.bincopyfrom", "reading failed");
    1414             : }
    1415             : 
    1416             : /**
    1417             :  * read from the input stream and give the BAT handle back to the caller
    1418             :  */
    1419           0 : static str RMTbincopyfrom(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
    1420           0 :         BAT *b = NULL;
    1421             :         ValPtr v;
    1422             :         str err;
    1423             : 
    1424             :         (void)mb;
    1425             : 
    1426             :         /* We receive a normal line, which contains the JSON header, the
    1427             :          * rest is binary data directly on the stream.  We get the first
    1428             :          * line from the buffered stream we have here, and pass it on
    1429             :          * together with the raw stream we have. */
    1430           0 :         cntxt->fdin->eof = false; /* in case it was before */
    1431           0 :         if (bstream_next(cntxt->fdin) <= 0)
    1432           0 :                 throw(MAL, "remote.bincopyfrom", "expected JSON header");
    1433             : 
    1434           0 :         cntxt->fdin->buf[cntxt->fdin->len] = '\0';
    1435           0 :         err = RMTinternalcopyfrom(&b,
    1436           0 :                         &cntxt->fdin->buf[cntxt->fdin->pos], cntxt->fdin->s);
    1437             :         /* skip the JSON line */
    1438           0 :         cntxt->fdin->pos = ++cntxt->fdin->len;
    1439           0 :         if (err != MAL_SUCCEED)
    1440             :                 return(err);
    1441             : 
    1442           0 :         v = &stk->stk[pci->argv[0]];
    1443           0 :         v->val.bval = b->batCacheid;
    1444           0 :         v->vtype = TYPE_bat;
    1445           0 :         BBPkeepref(b->batCacheid);
    1446             : 
    1447           0 :         return(MAL_SUCCEED);
    1448             : }
    1449             : 
    1450             : /**
    1451             :  * bintype identifies the system on its binary profile.  This is mainly
    1452             :  * used to determine if BATs can be sent binary across.
    1453             :  */
    1454         139 : static str RMTbintype(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
    1455             :         int type = 0;
    1456             :         (void)mb;
    1457             :         (void)stk;
    1458             :         (void)pci;
    1459             : 
    1460             : #ifdef WORDS_BIGENDIAN
    1461             :         type |= RMTT_B_ENDIAN;
    1462             : #else
    1463             :         type |= RMTT_L_ENDIAN;
    1464             : #endif
    1465             : #if SIZEOF_SIZE_T == SIZEOF_LNG
    1466             :         type |= RMTT_64_BITS;
    1467             : #else
    1468             :         type |= RMTT_32_BITS;
    1469             : #endif
    1470             : #if SIZEOF_OID == SIZEOF_LNG
    1471             :         type |= RMTT_64_OIDS;
    1472             : #else
    1473             :         type |= RMTT_32_OIDS;
    1474             : #endif
    1475             : 
    1476         139 :         mnstr_printf(cntxt->fdout, "[ %d ]\n", type);
    1477             : 
    1478         139 :         return(MAL_SUCCEED);
    1479             : }
    1480             : 
    1481             : /**
    1482             :  * Returns whether the underlying connection is still connected or not.
    1483             :  * Best effort implementation on top of mapi using a ping.
    1484             :  */
    1485             : static str
    1486           0 : RMTisalive(int *ret, str *conn)
    1487             : {
    1488             :         str tmp;
    1489             :         connection c;
    1490             : 
    1491           0 :         if (*conn == NULL || strcmp(*conn, (str)str_nil) == 0)
    1492           0 :                 throw(ILLARG, "remote.get", ILLEGAL_ARGUMENT ": connection name is NULL or nil");
    1493             : 
    1494             :         /* lookup conn, set c if valid */
    1495           0 :         rethrow("remote.get", tmp, RMTfindconn(&c, *conn));
    1496             : 
    1497           0 :         *ret = 0;
    1498           0 :         if (mapi_is_connected(c->mconn) && mapi_ping(c->mconn) == MOK)
    1499           0 :                 *ret = 1;
    1500             : 
    1501             :         return MAL_SUCCEED;
    1502             : }
    1503             : 
    1504             : // This is basically a no op
    1505             : static str
    1506         268 : RMTregisterSupervisor(int *ret, str *sup_uuid, str *query_uuid) {
    1507             :         (void)sup_uuid;
    1508             :         (void)query_uuid;
    1509             : 
    1510         268 :         *ret = 0;
    1511         268 :         return MAL_SUCCEED;
    1512             : }
    1513             : 
    1514             : #include "mel.h"
    1515             : mel_func remote_init_funcs[] = {
    1516             :  command("remote", "prelude", RMTprelude, false, "initialise the remote module", args(1,1, arg("",void))),
    1517             :  command("remote", "epilogue", RMTepilogue, false, "release the resources held by the remote module", args(1,1, arg("",void))),
    1518             :  command("remote", "resolve", RMTresolve, false, "resolve a pattern against Merovingian and return the URIs", args(1,2, batarg("",str),arg("pattern",str))),
    1519             :  command("remote", "connect", RMTconnect, false, "returns a newly created connection for uri, using user name and password", args(1,4, arg("",str),arg("uri",str),arg("user",str),arg("passwd",str))),
    1520             :  command("remote", "connect", RMTconnectScen, false, "returns a newly created connection for uri, using user name, password and scenario", args(1,5, arg("",str),arg("uri",str),arg("user",str),arg("passwd",str),arg("scen",str))),
    1521             :  pattern("remote", "connect", RMTconnectTable, false, "return a newly created connection for a table. username and password should be in the vault", args(1,3, arg("",str),arg("table",str),arg("schen",str))),
    1522             :  command("remote", "disconnect", RMTdisconnect, false, "disconnects the connection pointed to by handle (received from a call to connect()", args(1,2, arg("",void),arg("conn",str))),
    1523             :  pattern("remote", "get", RMTget, false, "retrieves a copy of remote object ident", args(1,3, argany("",0),arg("conn",str),arg("ident",str))),
    1524             :  pattern("remote", "put", RMTput, false, "copies object to the remote site and returns its identifier", args(1,3, arg("",str),arg("conn",str),argany("object",0))),
    1525             :  pattern("remote", "register", RMTregister, false, "register <mod>.<fcn> at the remote site", args(1,4, arg("",void),arg("conn",str),arg("mod",str),arg("fcn",str))),
    1526             :  pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> and returns the handle to its result", args(1,4, arg("",str),arg("conn",str),arg("mod",str),arg("func",str))),
    1527             :  pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> and returns the handle to its result", args(1,4, vararg("",str),arg("conn",str),arg("mod",str),arg("func",str))),
    1528             :  pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> using the argument list of remote objects and returns the handle to its result", args(1,5, arg("",str),arg("conn",str),arg("mod",str),arg("func",str),vararg("",str))),
    1529             :  pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> using the argument list of remote objects and returns the handle to its result", args(1,5, vararg("",str),arg("conn",str),arg("mod",str),arg("func",str),vararg("",str))),
    1530             :  command("remote", "isalive", RMTisalive, false, "check if conn is still valid and connected", args(1,2, arg("",int),arg("conn",str))),
    1531             :  pattern("remote", "batload", RMTbatload, false, "create a BAT of the given type and size, and load values from the input stream", args(1,3, batargany("",1),argany("tt",1),arg("size",int))),
    1532             :  pattern("remote", "batbincopy", RMTbincopyto, false, "dump BAT b in binary form to the stream", args(1,2, arg("",void),batargany("b",0))),
    1533             :  pattern("remote", "batbincopy", RMTbincopyfrom, false, "store the binary BAT data in the BBP and return as BAT", args(1,1, batargany("",0))),
    1534             :  pattern("remote", "bintype", RMTbintype, false, "print the binary type of this mserver5", args(1,1, arg("",void))),
    1535             :  command("remote", "register_supervisor", RMTregisterSupervisor, false, "Register the supervisor uuid at a remote site", args(1,3, arg("",int),arg("sup_uuid",str),arg("query_uuid",str))),
    1536             :  { .imp=NULL }
    1537             : };
    1538             : #include "mal_import.h"
    1539             : #ifdef _MSC_VER
    1540             : #undef read
    1541             : #pragma section(".CRT$XCU",read)
    1542             : #endif
    1543         255 : LIB_STARTUP_FUNC(init_remote_mal)
    1544         255 : { mal_module("remote", NULL, remote_init_funcs); }
    1545             : 
    1546             : #endif

Generated by: LCOV version 1.14