Line data Source code
1 : /*
2 : * This Source Code Form is subject to the terms of the Mozilla Public
3 : * License, v. 2.0. If a copy of the MPL was not distributed with this
4 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 : *
6 : * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
7 : */
8 :
9 : /*
10 : * (c) Fabian Groffen, Martin Kersten
11 : * Remote querying functionality
12 : * Communication with other mservers at the MAL level is a delicate task.
13 : * However, it is indispensable for any distributed functionality. This
14 : * module provides an abstract way to store and retrieve objects on a
15 : * remote site. Additionally, functions on a remote site can be executed
16 : * using objects available in the remote session context. This yields
17 : * four primitive functions that form the basis for distribution methods:
18 : * get, put, register and exec.
19 : *
20 : * The get method simply retrieves a copy of a remote object. Objects can
21 : * be simple values, strings or Column. The same holds for the put method,
22 : * but the other way around. A local object can be stored on a remote
23 : * site. Upon a successful store, the put method returns the remote
24 : * identifier for the stored object. With this identifier the object can
25 : * be addressed, e.g. using the get method to retrieve the object that was
26 : * stored using put.
27 : *
28 : * The get and put methods are symmetric. Performing a get on an
29 : * identifier that was returned by put, results in an object with the same
30 : * value and type as the one that was put. The result of such an operation is
31 : * equivalent to making an (expensive) copy of the original object.
32 : *
33 : * The register function takes a local MAL function and makes it known at a
34 : * remote site. It ensures that it does not overload an already known
35 : * operation remotely, which could create a semantic conflict.
36 : * Deregistering a function is forbidden, because it would allow for taking
37 : * over the remote site completely.
38 : * C-implemented functions, such as io.print() cannot be remotely stored.
39 : * It would require even more complicated (byte) code shipping and remote
40 : * compilation to make it work.
41 : *
42 : * The choice to let exec only execute functions avoids problems
43 : * to decide what should be returned to the caller. With a function it is
44 : * clear and simple to return that what the function signature prescribes.
45 : * Any side effect (e.g. io.print calls) may cause havoc in the system,
46 : * but are currently ignored.
47 : *
48 : * This leads to the final contract of this module. The methods should be
49 : * used correctly, by obeying their contract. Failing to do so will result
50 : * in errors and possibly undefined behaviour.
51 : *
52 : * The resolve() function can be used to query Merovingian. It returns one
53 : * or more databases discovered in its vicinity matching the given pattern.
54 : *
55 : */
56 : #include "monetdb_config.h"
57 : #include "remote.h"
58 :
59 : /*
60 : * Technically, these methods need to be serialised per connection,
61 : * hence a scheduler that interleaves e.g. multiple get calls, simply
62 : * violates this constraint. If parallelism to the same site is
63 : * desired, a user could create a second connection. This is not always
64 : * easy to generate at the proper place, e.g. overloading the dataflow
65 : * optimizer to patch connections structures is not acceptable.
66 : *
67 : * Instead, we maintain a simple lock with each connection, which can be
68 : * used to issue a safe, but blocking get/put/exec/register request.
69 : */
70 : #ifdef HAVE_MAPI
71 :
72 :
73 : #include "mal_exception.h"
74 : #include "mal_interpreter.h"
75 : #include "mal_function.h" /* for printFunction */
76 : #include "mal_listing.h"
77 : #include "mal_instruction.h" /* for getmodule/func macros */
78 : #include "mal_authorize.h"
79 : #include "mapi.h"
80 : #include "mutils.h"
81 :
/*
 * Bits of the one-byte "binary profile" of a server.  RMTprelude ORs the
 * applicable ones together into localtype; RMTget compares that value with
 * the profile stored for a connection to decide whether a BAT may be
 * transferred in binary form.
 */
#define RMTT_L_ENDIAN   (0<<1)	/* little-endian byte order */
#define RMTT_B_ENDIAN   (1<<1)	/* big-endian byte order */
#define RMTT_32_BITS    (0<<2)	/* size_t is not as wide as lng */
#define RMTT_64_BITS    (1<<2)	/* size_t is as wide as lng */
#define RMTT_32_OIDS    (0<<3)	/* oid is not as wide as lng */
#define RMTT_64_OIDS    (1<<3)	/* oid is as wide as lng */
88 :
89 : typedef struct _connection {
90 : MT_Lock lock; /* lock to avoid interference */
91 : str name; /* the handle for this connection */
92 : Mapi mconn; /* the Mapi handle for the connection */
93 : unsigned char type; /* binary profile of the connection target */
94 : size_t nextid; /* id counter */
95 : struct _connection *next; /* the next connection in the list */
96 : } *connection;
97 :
98 : #ifndef WIN32
99 : #include <sys/socket.h> /* socket */
100 : #include <sys/un.h> /* sockaddr_un */
101 : #endif
102 : #include <unistd.h> /* gethostname */
103 :
104 : static connection conns = NULL;
105 : static unsigned char localtype = 0177;
106 :
107 : static inline str RMTquery(MapiHdl *ret, const char *func, Mapi conn, const char *query);
108 :
109 : /**
110 : * Returns a BAT with valid redirects for the given pattern. If
111 : * merovingian is not running, this function throws an error.
112 : */
113 0 : static str RMTresolve(bat *ret, str *pat) {
114 : #ifdef NATIVE_WIN32
115 : (void) ret;
116 : (void) pat;
117 : throw(MAL, "remote.resolve", "merovingian is not available on "
118 : "your platform, sorry"); /* please upgrade to Linux, etc. */
119 : #else
120 : BAT *list;
121 : const char *mero_uri;
122 : char *p;
123 : unsigned int port;
124 : char **redirs;
125 : char **or;
126 :
127 0 : if (pat == NULL || *pat == NULL || strcmp(*pat, (str)str_nil) == 0)
128 0 : throw(ILLARG, "remote.resolve",
129 : ILLEGAL_ARGUMENT ": pattern is NULL or nil");
130 :
131 0 : mero_uri = GDKgetenv("merovingian_uri");
132 0 : if (mero_uri == NULL)
133 0 : throw(MAL, "remote.resolve", "this function needs the mserver "
134 : "have been started by merovingian");
135 :
136 0 : list = COLnew(0, TYPE_str, 0, TRANSIENT);
137 0 : if (list == NULL)
138 0 : throw(MAL, "remote.resolve", SQLSTATE(HY013) MAL_MALLOC_FAIL);
139 :
140 : /* extract port from mero_uri, let mapi figure out the rest */
141 0 : mero_uri+=strlen("mapi:monetdb://");
142 0 : if (*mero_uri == '[') {
143 0 : if ((mero_uri = strchr(mero_uri, ']')) == NULL) {
144 0 : BBPreclaim(list);
145 0 : throw(MAL, "remote.resolve", "illegal IPv6 address on merovingian_uri: %s",
146 : GDKgetenv("merovingian_uri"));
147 : }
148 : }
149 0 : if ((p = strchr(mero_uri, ':')) == NULL) {
150 0 : BBPreclaim(list);
151 0 : throw(MAL, "remote.resolve", "illegal merovingian_uri setting: %s",
152 : GDKgetenv("merovingian_uri"));
153 : }
154 0 : port = (unsigned int)atoi(p + 1);
155 :
156 0 : or = redirs = mapi_resolve(NULL, port, *pat);
157 :
158 0 : if (redirs == NULL) {
159 0 : BBPreclaim(list);
160 0 : throw(MAL, "remote.resolve", "unknown failure when resolving pattern");
161 : }
162 :
163 0 : while (*redirs != NULL) {
164 0 : if (BUNappend(list, (ptr)*redirs, false) != GDK_SUCCEED) {
165 0 : BBPreclaim(list);
166 : do
167 0 : free(*redirs);
168 0 : while (*++redirs);
169 0 : free(or);
170 0 : throw(MAL, "remote.resolve", SQLSTATE(HY013) MAL_MALLOC_FAIL);
171 : }
172 0 : free(*redirs);
173 0 : redirs++;
174 : }
175 0 : free(or);
176 :
177 0 : BBPkeepref(*ret = list->batCacheid);
178 0 : return(MAL_SUCCEED);
179 : #endif
180 : }
181 :
182 :
183 : /* for unique connection identifiers */
184 : static size_t connection_id = 0;
185 :
186 : /**
187 : * Returns a connection to the given uri. It always returns a newly
188 : * created connection.
189 : */
190 79 : static str RMTconnectScen(
191 : str *ret,
192 : str *ouri,
193 : str *user,
194 : str *passwd,
195 : str *scen,
196 : bit *columnar)
197 : {
198 : connection c;
199 : char conn[BUFSIZ];
200 : char *s;
201 : Mapi m;
202 : MapiHdl hdl;
203 : str msg;
204 :
205 : /* just make sure the return isn't garbage */
206 79 : *ret = 0;
207 :
208 79 : if (ouri == NULL || *ouri == NULL || strcmp(*ouri, (str)str_nil) == 0)
209 0 : throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": database uri "
210 : "is NULL or nil");
211 79 : if (user == NULL || *user == NULL || strcmp(*user, (str)str_nil) == 0)
212 0 : throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": username is "
213 : "NULL or nil");
214 79 : if (passwd == NULL || *passwd == NULL || strcmp(*passwd, (str)str_nil) == 0)
215 0 : throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": password is "
216 : "NULL or nil");
217 79 : if (scen == NULL || *scen == NULL || strcmp(*scen, (str)str_nil) == 0)
218 0 : throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": scenario is "
219 : "NULL or nil");
220 79 : if (strcmp(*scen, "mal") != 0 && strcmp(*scen, "msql") != 0)
221 0 : throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": scenario '%s' "
222 : "is not supported", *scen);
223 :
224 79 : m = mapi_mapiuri(*ouri, *user, *passwd, *scen);
225 79 : if (mapi_error(m)) {
226 0 : msg = createException(MAL, "remote.connect",
227 : "unable to connect to '%s': %s",
228 : *ouri, mapi_error_str(m));
229 0 : mapi_destroy(m);
230 0 : return msg;
231 : }
232 :
233 79 : MT_lock_set(&mal_remoteLock);
234 :
235 : /* generate an unique connection name, they are only known
236 : * within one mserver, id is primary key, the rest is super key */
237 79 : snprintf(conn, BUFSIZ, "%s_%s_%zu", mapi_get_dbname(m), *user, connection_id++);
238 : /* make sure we can construct MAL identifiers using conn */
239 1590 : for (s = conn; *s != '\0'; s++) {
240 1511 : if (!isalnum((unsigned char)*s)) {
241 219 : *s = '_';
242 : }
243 : }
244 :
245 79 : if (mapi_reconnect(m) != MOK) {
246 3 : MT_lock_unset(&mal_remoteLock);
247 3 : msg = createException(IO, "remote.connect",
248 : "unable to connect to '%s': %s",
249 : *ouri, mapi_error_str(m));
250 3 : mapi_destroy(m);
251 3 : return msg;
252 : }
253 :
254 76 : if (columnar && *columnar) {
255 : char set_protocol_query_buf[50];
256 1 : snprintf(set_protocol_query_buf, 50, "sql.set_protocol(%d:int);", PROTOCOL_COLUMNAR);
257 1 : if ((msg = RMTquery(&hdl, "remote.connect", m, set_protocol_query_buf))) {
258 0 : mapi_destroy(m);
259 0 : MT_lock_unset(&mal_remoteLock);
260 0 : return msg;
261 : }
262 : }
263 :
264 : /* connection established, add to list */
265 76 : c = GDKzalloc(sizeof(struct _connection));
266 76 : if ( c == NULL || (c->name = GDKstrdup(conn)) == NULL) {
267 0 : GDKfree(c);
268 0 : mapi_destroy(m);
269 0 : MT_lock_unset(&mal_remoteLock);
270 0 : throw(MAL,"remote.connect", SQLSTATE(HY013) MAL_MALLOC_FAIL);
271 : }
272 76 : c->mconn = m;
273 76 : c->nextid = 0;
274 76 : MT_lock_init(&c->lock, c->name);
275 76 : c->next = conns;
276 76 : conns = c;
277 :
278 76 : msg = RMTquery(&hdl, "remote.connect", m, "remote.bintype();");
279 76 : if (msg) {
280 0 : MT_lock_unset(&mal_remoteLock);
281 0 : return msg;
282 : }
283 152 : if (hdl != NULL && mapi_fetch_row(hdl)) {
284 76 : char *val = mapi_fetch_field(hdl, 0);
285 76 : c->type = (unsigned char)atoi(val);
286 76 : mapi_close_handle(hdl);
287 : } else {
288 0 : c->type = 0;
289 : }
290 :
291 : #ifdef _DEBUG_MAPI_
292 : mapi_trace(c->mconn, true);
293 : #endif
294 :
295 76 : MT_lock_unset(&mal_remoteLock);
296 :
297 76 : *ret = GDKstrdup(conn);
298 76 : if(*ret == NULL)
299 0 : throw(MAL,"remote.connect", SQLSTATE(HY013) MAL_MALLOC_FAIL);
300 : return(MAL_SUCCEED);
301 : }
302 :
303 : static str
304 0 : RMTconnect(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
305 : (void) cntxt;
306 : (void) mb;
307 0 : str* ret = getArgReference_str(stk, pci, 0);
308 0 : str* uri = getArgReference_str(stk, pci, 1);
309 0 : str* user = getArgReference_str(stk, pci, 2);
310 0 : str* passwd = getArgReference_str(stk, pci, 3);
311 :
312 0 : str scen = "msql";
313 :
314 0 : if (pci->argc >= 5)
315 0 : scen = *getArgReference_str(stk, pci, 4);
316 :
317 0 : return RMTconnectScen(ret, uri, user, passwd, &scen, NULL);
318 : }
319 :
320 : static str
321 78 : RMTconnectTable(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
322 : {
323 : char *local_table;
324 78 : char *remoteuser = NULL;
325 78 : char *passwd = NULL;
326 78 : char *uri = NULL;
327 : char *tmp;
328 : char *ret;
329 : str scen;
330 : str msg;
331 : ValPtr v;
332 :
333 : (void)mb;
334 : (void)cntxt;
335 :
336 78 : local_table = *getArgReference_str(stk, pci, 1);
337 78 : scen = *getArgReference_str(stk, pci, 2);
338 78 : if (local_table == NULL || strcmp(local_table, (str)str_nil) == 0) {
339 0 : throw(ILLARG, "remote.connect", ILLEGAL_ARGUMENT ": local table is NULL or nil");
340 : }
341 :
342 78 : rethrow("remote.connect", tmp, AUTHgetRemoteTableCredentials(local_table, &uri, &remoteuser, &passwd));
343 78 : if (!remoteuser)
344 0 : remoteuser = GDKstrdup("");
345 78 : if (!passwd)
346 0 : passwd = GDKstrdup("");
347 : /* The password we just got is hashed. Add the byte \1 in front to
348 : * signal this fact to the mapi. */
349 78 : size_t pwlen = strlen(passwd);
350 78 : char *pwhash = (char*)GDKmalloc(pwlen + 2);
351 78 : if (pwhash == NULL) {
352 0 : GDKfree(remoteuser);
353 0 : GDKfree(passwd);
354 0 : throw(MAL, "remote.connect", SQLSTATE(HY013) MAL_MALLOC_FAIL);
355 : }
356 78 : snprintf(pwhash, pwlen + 2, "\1%s", passwd);
357 :
358 78 : msg = RMTconnectScen(&ret, &uri, &remoteuser, &pwhash, &scen, NULL);
359 :
360 78 : GDKfree(passwd);
361 78 : GDKfree(pwhash);
362 :
363 78 : if (msg == MAL_SUCCEED) {
364 75 : v = &stk->stk[pci->argv[0]];
365 75 : if (VALinit(v, TYPE_str, ret) == NULL) {
366 0 : GDKfree(ret);
367 0 : throw(MAL, "remote.connect", SQLSTATE(HY013) MAL_MALLOC_FAIL);
368 : }
369 : }
370 :
371 78 : GDKfree(ret);
372 78 : return msg;
373 : }
374 :
375 :
376 : /**
377 : * Disconnects a connection. The connection needs not to exist in the
378 : * system, it only needs to exist for the client (i.e. it was once
379 : * created).
380 : */
381 : str
382 76 : RMTdisconnect(void *ret, str *conn) {
383 : connection c, t;
384 :
385 76 : if (conn == NULL || *conn == NULL || strcmp(*conn, (str)str_nil) == 0)
386 0 : throw(ILLARG, "remote.disconnect", ILLEGAL_ARGUMENT ": connection "
387 : "is NULL or nil");
388 :
389 :
390 : (void) ret;
391 :
392 : /* we need a lock because the same user can be handled by multiple
393 : * threads */
394 76 : MT_lock_set(&mal_remoteLock);
395 76 : c = conns;
396 : t = NULL; /* parent */
397 : /* walk through the list */
398 97 : while (c != NULL) {
399 97 : if (strcmp(c->name, *conn) == 0) {
400 : /* ok, delete it... */
401 76 : if (t == NULL) {
402 65 : conns = c->next;
403 : } else {
404 11 : t->next = c->next;
405 : }
406 :
407 76 : MT_lock_set(&c->lock); /* shared connection */
408 76 : mapi_disconnect(c->mconn);
409 76 : mapi_destroy(c->mconn);
410 76 : MT_lock_unset(&c->lock);
411 76 : MT_lock_destroy(&c->lock);
412 76 : GDKfree(c->name);
413 76 : GDKfree(c);
414 76 : MT_lock_unset(&mal_remoteLock);
415 76 : return MAL_SUCCEED;
416 : }
417 : t = c;
418 21 : c = c->next;
419 : }
420 :
421 0 : MT_lock_unset(&mal_remoteLock);
422 0 : throw(MAL, "remote.disconnect", "no such connection: %s", *conn);
423 : }
424 :
425 : /**
426 : * Helper function to return a connection matching a given string, or an
427 : * error if it does not exist. Since this function is internal, it
428 : * doesn't check the argument conn, as it should have been checked
429 : * already.
430 : * NOTE: this function acquires the mal_remoteLock before accessing conns
431 : */
432 : static inline str
433 3402 : RMTfindconn(connection *ret, const char *conn) {
434 : connection c;
435 :
436 : /* just make sure the return isn't garbage */
437 3402 : *ret = NULL;
438 3402 : MT_lock_set(&mal_remoteLock); /* protect c */
439 3402 : c = conns;
440 3965 : while (c != NULL) {
441 3965 : if (strcmp(c->name, conn) == 0) {
442 3402 : *ret = c;
443 3402 : MT_lock_unset(&mal_remoteLock);
444 3402 : return(MAL_SUCCEED);
445 : }
446 563 : c = c->next;
447 : }
448 0 : MT_lock_unset(&mal_remoteLock);
449 0 : throw(MAL, "remote.<findconn>", "no such connection: %s", conn);
450 : }
451 :
452 : /**
453 : * Little helper function that returns a GDKmalloced string containing a
454 : * valid identifier that is supposed to be unique in the connection's
455 : * remote context. The generated string depends on the module and
456 : * function the caller is in. But also the runtime context is important.
457 : * The format is rmt<id>_<retvar>_<type>. Every RMTgetId uses a fresh id,
458 : * to distinguish amongst different (parallel) execution context.
459 : * Re-use of this remote identifier should be done with care.
460 : * The encoding of the type allows for ease of type checking later on.
461 : */
462 : static inline str
463 1922 : RMTgetId(char *buf, MalBlkPtr mb, InstrPtr p, int arg) {
464 : InstrPtr f;
465 : const char *mod;
466 : char *var;
467 : str rt;
468 : static ATOMIC_TYPE idtag = ATOMIC_VAR_INIT(0);
469 :
470 1922 : if( p->retc == 0)
471 0 : throw(MAL, "remote.getId", ILLEGAL_ARGUMENT "MAL instruction misses retc");
472 :
473 1922 : var = getArgName(mb, p, arg);
474 : f = getInstrPtr(mb, 0); /* top level function */
475 : mod = getModuleId(f);
476 : if (mod == NULL)
477 : mod = "user";
478 1922 : rt = getTypeIdentifier(getArgType(mb,p,arg));
479 1922 : if (rt == NULL)
480 0 : throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
481 :
482 1922 : snprintf(buf, BUFSIZ, "rmt%u_%s_%s", (unsigned) ATOMIC_ADD(&idtag, 1), var, rt);
483 :
484 1922 : GDKfree(rt);
485 1922 : return(MAL_SUCCEED);
486 : }
487 :
488 : /**
489 : * Helper function to execute a query over the given connection,
490 : * returning the result handle. If communication fails in one way or
491 : * another, an error is returned. Since this function is internal, it
492 : * doesn't check the input arguments func, conn and query, as they
493 : * should have been checked already.
494 : * NOTE: this function assumes a lock for conn is set
495 : */
496 : static inline str
497 2280 : RMTquery(MapiHdl *ret, const char *func, Mapi conn, const char *query) {
498 : MapiHdl mhdl;
499 :
500 2280 : *ret = NULL;
501 2280 : mhdl = mapi_query(conn, query);
502 2280 : if (mhdl) {
503 2280 : if (mapi_result_error(mhdl) != NULL) {
504 11 : str err = createException(
505 : getExceptionType(mapi_result_error(mhdl)),
506 : func,
507 : "(mapi:monetdb://%s@%s/%s) %s",
508 : mapi_get_user(conn),
509 : mapi_get_host(conn),
510 : mapi_get_dbname(conn),
511 : getExceptionMessage(mapi_result_error(mhdl)));
512 11 : mapi_close_handle(mhdl);
513 11 : return(err);
514 : }
515 : } else {
516 0 : if (mapi_error(conn) != MOK) {
517 0 : throw(IO, func, "an error occurred on connection: %s",
518 : mapi_error_str(conn));
519 : } else {
520 0 : throw(MAL, func, "remote function invocation didn't return a result");
521 : }
522 : }
523 :
524 2269 : *ret = mhdl;
525 2269 : return(MAL_SUCCEED);
526 : }
527 :
528 264 : static str RMTprelude(void *ret) {
529 : unsigned int type = 0;
530 :
531 : (void)ret;
532 : #ifdef WORDS_BIGENDIAN
533 : type |= RMTT_B_ENDIAN;
534 : #else
535 : type |= RMTT_L_ENDIAN;
536 : #endif
537 : #if SIZEOF_SIZE_T == SIZEOF_LNG
538 : type |= RMTT_64_BITS;
539 : #else
540 : type |= RMTT_32_BITS;
541 : #endif
542 : #if SIZEOF_OID == SIZEOF_LNG
543 : type |= RMTT_64_OIDS;
544 : #else
545 : type |= RMTT_32_OIDS;
546 : #endif
547 264 : localtype = (unsigned char)type;
548 :
549 264 : return(MAL_SUCCEED);
550 : }
551 :
552 262 : static str RMTepilogue(void *ret) {
553 : connection c, t;
554 :
555 : (void)ret;
556 :
557 262 : MT_lock_set(&mal_remoteLock); /* nobody allowed here */
558 : /* free connections list */
559 262 : c = conns;
560 262 : while (c != NULL) {
561 : t = c;
562 0 : c = c->next;
563 0 : MT_lock_set(&t->lock);
564 0 : mapi_destroy(t->mconn);
565 0 : MT_lock_unset(&t->lock);
566 0 : MT_lock_destroy(&t->lock);
567 0 : GDKfree(t->name);
568 0 : GDKfree(t);
569 : }
570 : /* not sure, but better be safe than sorry */
571 262 : conns = NULL;
572 262 : MT_lock_unset(&mal_remoteLock);
573 :
574 262 : return(MAL_SUCCEED);
575 : }
576 : static str
577 1201 : RMTreadbatheader(stream* sin, char* buf) {
578 : ssize_t sz = 0, rd;
579 :
580 : /* read the JSON header */
581 195220 : while ((rd = mnstr_read(sin, &buf[sz], 1, 1)) == 1 && buf[sz] != '\n') {
582 194019 : sz += rd;
583 : }
584 1201 : if (rd < 0) {
585 0 : throw(MAL, "remote.get", "could not read BAT JSON header");
586 : }
587 1201 : if (buf[0] == '!') {
588 : char *result;
589 0 : if((result = GDKstrdup(buf)) == NULL)
590 0 : throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
591 : return result;
592 : }
593 :
594 1201 : buf[sz] = '\0';
595 :
596 1201 : return MAL_SUCCEED;
597 : }
598 :
599 : typedef struct _binbat_v1 {
600 : int Ttype;
601 : oid Hseqbase;
602 : oid Tseqbase;
603 : bool
604 : Tsorted:1,
605 : Trevsorted:1,
606 : Tkey:1,
607 : Tnonil:1,
608 : Tdense:1;
609 : BUN size;
610 : size_t tailsize;
611 : size_t theapsize;
612 : } binbat;
613 :
614 : static str
615 1201 : RMTinternalcopyfrom(BAT **ret, char *hdr, stream *in, bool must_flush)
616 : {
617 1201 : binbat bb = { 0, 0, 0, false, false, false, false, false, 0, 0, 0 };
618 : char *nme = NULL;
619 : char *val = NULL;
620 : char tmp;
621 : size_t len;
622 : lng lv, *lvp;
623 :
624 : BAT *b;
625 :
626 : /* hdr is a JSON structure that looks like
627 : * {"version":1,"ttype":6,"tseqbase":0,"tailsize":4,"theapsize":0}
628 : * we take the binary data directly from the stream */
629 :
630 : /* could skip whitespace, but we just don't allow that */
631 1201 : if (*hdr++ != '{')
632 0 : throw(MAL, "remote.bincopyfrom", "illegal input, not a JSON header (got '%s')", hdr - 1);
633 194019 : while (*hdr != '\0') {
634 192818 : switch (*hdr) {
635 28824 : case '"':
636 : /* we assume only numeric values, so all strings are
637 : * elems */
638 28824 : if (nme != NULL) {
639 14412 : *hdr = '\0';
640 : } else {
641 14412 : nme = hdr + 1;
642 : }
643 : break;
644 14412 : case ':':
645 14412 : val = hdr + 1;
646 14412 : break;
647 14412 : case ',':
648 : case '}':
649 14412 : if (val == NULL)
650 0 : throw(MAL, "remote.bincopyfrom",
651 : "illegal input, JSON value missing");
652 14412 : *hdr = '\0';
653 :
654 14412 : lvp = &lv;
655 14412 : len = sizeof(lv);
656 : /* tseqbase can be 1<<31/1<<63 which causes overflow
657 : * in lngFromStr, so we check separately */
658 14412 : if (strcmp(val,
659 : #if SIZEOF_OID == 8
660 : "9223372036854775808"
661 : #else
662 : "2147483648"
663 : #endif
664 1197 : ) == 0 &&
665 1197 : strcmp(nme, "tseqbase") == 0) {
666 1197 : bb.Tseqbase = oid_nil;
667 : } else {
668 : /* all values should be non-negative, so we check that
669 : * here as well */
670 13215 : if (lngFromStr(val, &len, &lvp, true) < 0 ||
671 13215 : lv < 0 /* includes lng_nil */)
672 0 : throw(MAL, "remote.bincopyfrom",
673 : "bad %s value: %s", nme, val);
674 :
675 : /* deal with nme and val */
676 13215 : if (strcmp(nme, "version") == 0) {
677 1201 : if (lv != 1)
678 0 : throw(MAL, "remote.bincopyfrom",
679 : "unsupported version: %s", val);
680 12014 : } else if (strcmp(nme, "hseqbase") == 0) {
681 : #if SIZEOF_OID < SIZEOF_LNG
682 : if (lv > GDK_oid_max)
683 : throw(MAL, "remote.bincopyfrom",
684 : "bad %s value: %s", nme, val);
685 : #endif
686 1201 : bb.Hseqbase = (oid)lv;
687 10813 : } else if (strcmp(nme, "ttype") == 0) {
688 1201 : if (lv >= GDKatomcnt)
689 0 : throw(MAL, "remote.bincopyfrom",
690 : "bad %s value: GDK atom number %s doesn't exist", nme, val);
691 1201 : bb.Ttype = (int) lv;
692 9612 : } else if (strcmp(nme, "tseqbase") == 0) {
693 : #if SIZEOF_OID < SIZEOF_LNG
694 : if (lv > GDK_oid_max)
695 : throw(MAL, "remote.bincopyfrom",
696 : "bad %s value: %s", nme, val);
697 : #endif
698 4 : bb.Tseqbase = (oid) lv;
699 9608 : } else if (strcmp(nme, "tsorted") == 0) {
700 1201 : bb.Tsorted = lv != 0;
701 8407 : } else if (strcmp(nme, "trevsorted") == 0) {
702 1201 : bb.Trevsorted = lv != 0;
703 7206 : } else if (strcmp(nme, "tkey") == 0) {
704 1201 : bb.Tkey = lv != 0;
705 6005 : } else if (strcmp(nme, "tnonil") == 0) {
706 1201 : bb.Tnonil = lv != 0;
707 4804 : } else if (strcmp(nme, "tdense") == 0) {
708 1201 : bb.Tdense = lv != 0;
709 3603 : } else if (strcmp(nme, "size") == 0) {
710 1201 : if (lv > (lng) BUN_MAX)
711 0 : throw(MAL, "remote.bincopyfrom",
712 : "bad %s value: %s", nme, val);
713 1201 : bb.size = (BUN) lv;
714 2402 : } else if (strcmp(nme, "tailsize") == 0) {
715 1201 : bb.tailsize = (size_t) lv;
716 1201 : } else if (strcmp(nme, "theapsize") == 0) {
717 1201 : bb.theapsize = (size_t) lv;
718 : } else {
719 0 : throw(MAL, "remote.bincopyfrom",
720 : "unknown element: %s", nme);
721 : }
722 : }
723 : nme = val = NULL;
724 : break;
725 : }
726 192818 : hdr++;
727 : }
728 :
729 1201 : b = COLnew2(bb.Hseqbase, bb.Ttype, bb.size, TRANSIENT, bb.size > 0 ? (uint16_t) (bb.tailsize / bb.size) : 0);
730 1201 : if (b == NULL)
731 0 : throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
732 :
733 1201 : if (bb.tailsize > 0) {
734 2258 : if (HEAPextend(b->theap, bb.tailsize, true) != GDK_SUCCEED ||
735 1129 : mnstr_read(in, b->theap->base, bb.tailsize, 1) < 0)
736 0 : goto bailout;
737 1129 : b->theap->dirty = true;
738 : }
739 1201 : if (bb.theapsize > 0) {
740 70 : if ((b->tvheap->base == NULL &&
741 70 : (*BATatoms[b->ttype].atomHeap)(b->tvheap, b->batCapacity) != GDK_SUCCEED) ||
742 70 : HEAPextend(b->tvheap, bb.theapsize, true) != GDK_SUCCEED ||
743 35 : mnstr_read(in, b->tvheap->base, bb.theapsize, 1) < 0)
744 0 : goto bailout;
745 35 : b->tvheap->free = bb.theapsize;
746 35 : b->tvheap->dirty = true;
747 : }
748 :
749 : /* set properties */
750 1201 : b->tseqbase = bb.Tdense ? bb.Tseqbase : oid_nil;
751 1201 : b->tsorted = bb.Tsorted;
752 1201 : b->trevsorted = bb.Trevsorted;
753 1201 : b->tkey = bb.Tkey;
754 1201 : b->tnonil = bb.Tnonil;
755 1201 : if (bb.Ttype == TYPE_str && bb.size)
756 35 : BATsetcapacity(b, (BUN) (bb.tailsize >> b->tshift));
757 1201 : BATsetcount(b, bb.size);
758 :
759 : // read blockmode flush
760 1201 : while (must_flush && mnstr_read(in, &tmp, 1, 1) > 0) {
761 0 : TRC_ERROR(MAL_REMOTE, "Expected flush, got: %c\n", tmp);
762 : }
763 :
764 1201 : BATsettrivprop(b);
765 :
766 1201 : *ret = b;
767 1201 : return(MAL_SUCCEED);
768 :
769 0 : bailout:
770 0 : BBPreclaim(b);
771 0 : throw(MAL, "remote.bincopyfrom", "reading failed");
772 : }
773 :
774 : /**
775 : * get fetches the object referenced by ident over connection conn.
776 : * We are only interested in retrieving void-headed BATs, i.e. single columns.
777 : */
778 1199 : static str RMTget(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
779 : str conn, ident, tmp, rt;
780 : connection c;
781 : char qbuf[BUFSIZ + 1];
782 1199 : MapiHdl mhdl = NULL;
783 : int rtype;
784 : ValPtr v;
785 :
786 : (void)mb;
787 : (void) cntxt;
788 :
789 1199 : conn = *getArgReference_str(stk, pci, 1);
790 1199 : if (conn == NULL || strcmp(conn, (str)str_nil) == 0)
791 0 : throw(ILLARG, "remote.get", ILLEGAL_ARGUMENT ": connection name is NULL or nil");
792 1199 : ident = *getArgReference_str(stk, pci, 2);
793 1199 : if (ident == 0 || isIdentifier(ident) < 0)
794 0 : throw(ILLARG, "remote.get", ILLEGAL_ARGUMENT ": identifier expected, got '%s'", ident);
795 :
796 : /* lookup conn, set c if valid */
797 1199 : rethrow("remote.get", tmp, RMTfindconn(&c, conn));
798 :
799 1199 : rtype = getArgType(mb, pci, 0);
800 1199 : v = &stk->stk[pci->argv[0]];
801 :
802 1199 : if (rtype == TYPE_any || isAnyExpression(rtype)) {
803 : char *tpe, *msg;
804 0 : tpe = getTypeName(rtype);
805 0 : msg = createException(MAL, "remote.get", ILLEGAL_ARGUMENT ": unsupported any type: %s",
806 : tpe);
807 0 : GDKfree(tpe);
808 0 : return msg;
809 : }
810 : /* check if the remote type complies with what we expect.
811 : Since the put() encodes the type as known to the remote site
812 : we can simple compare it here */
813 1199 : rt = getTypeIdentifier(rtype);
814 1199 : if (rt == NULL)
815 0 : throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
816 1199 : if (strcmp(ident + strlen(ident) - strlen(rt), rt)) {
817 0 : tmp = createException(MAL, "remote.get", ILLEGAL_ARGUMENT
818 : ": remote object type %s does not match expected type %s",
819 : rt, ident);
820 0 : GDKfree(rt);
821 0 : return tmp;
822 : }
823 1199 : GDKfree(rt);
824 :
825 1199 : if (isaBatType(rtype) && (localtype == 0177 || localtype != c->type ))
826 0 : {
827 : int t;
828 : size_t s;
829 : ptr r;
830 : str var;
831 : BAT *b;
832 :
833 0 : snprintf(qbuf, BUFSIZ, "io.print(%s);", ident);
834 :
835 0 : TRC_DEBUG(MAL_REMOTE, "Remote get: %s\n", qbuf);
836 :
837 : /* this call should be a single transaction over the channel*/
838 0 : MT_lock_set(&c->lock);
839 :
840 0 : if ((tmp = RMTquery(&mhdl, "remote.get", c->mconn, qbuf))
841 : != MAL_SUCCEED)
842 : {
843 0 : TRC_ERROR(MAL_REMOTE, "Remote get: %s\n%s\n", qbuf, tmp);
844 0 : MT_lock_unset(&c->lock);
845 0 : var = createException(MAL, "remote.get", "%s", tmp);
846 0 : freeException(tmp);
847 0 : return var;
848 : }
849 0 : t = getBatType(rtype);
850 0 : b = COLnew(0, t, 0, TRANSIENT);
851 0 : if (b == NULL) {
852 0 : mapi_close_handle(mhdl);
853 0 : MT_lock_unset(&c->lock);
854 0 : throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
855 : }
856 :
857 0 : if (ATOMbasetype(t) == TYPE_str) {
858 0 : while (mapi_fetch_row(mhdl)) {
859 0 : var = mapi_fetch_field(mhdl, 1);
860 0 : if (BUNappend(b, var == NULL ? str_nil : var, false) != GDK_SUCCEED) {
861 0 : BBPreclaim(b);
862 0 : mapi_close_handle(mhdl);
863 0 : MT_lock_unset(&c->lock);
864 0 : throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
865 : }
866 : }
867 : } else
868 0 : while (mapi_fetch_row(mhdl)) {
869 0 : var = mapi_fetch_field(mhdl, 1);
870 0 : if (var == NULL)
871 : var = "nil";
872 0 : s = 0;
873 0 : r = NULL;
874 0 : if (ATOMfromstr(t, &r, &s, var, true) < 0 ||
875 0 : BUNappend(b, r, false) != GDK_SUCCEED) {
876 0 : BBPreclaim(b);
877 0 : GDKfree(r);
878 0 : mapi_close_handle(mhdl);
879 0 : MT_lock_unset(&c->lock);
880 0 : throw(MAL, "remote.get", GDK_EXCEPTION);
881 : }
882 0 : GDKfree(r);
883 : }
884 :
885 0 : v->val.bval = b->batCacheid;
886 0 : v->vtype = TYPE_bat;
887 0 : BBPkeepref(b->batCacheid);
888 :
889 0 : mapi_close_handle(mhdl);
890 0 : MT_lock_unset(&c->lock);
891 2398 : } else if (isaBatType(rtype)) {
892 : /* binary compatible remote host, transfer BAT in binary form */
893 : stream *sout;
894 : stream *sin;
895 : char buf[256];
896 1199 : BAT *b = NULL;
897 :
898 : /* this call should be a single transaction over the channel*/
899 1199 : MT_lock_set(&c->lock);
900 :
901 : /* bypass Mapi from this point to efficiently write all data to
902 : * the server */
903 1199 : sout = mapi_get_to(c->mconn);
904 1199 : sin = mapi_get_from(c->mconn);
905 1199 : if (sin == NULL || sout == NULL) {
906 0 : MT_lock_unset(&c->lock);
907 0 : throw(MAL, "remote.get", "Connection lost");
908 : }
909 :
910 : /* call our remote helper to do this more efficiently */
911 1199 : mnstr_printf(sout, "remote.batbincopy(%s);\n", ident);
912 1199 : mnstr_flush(sout, MNSTR_FLUSH_DATA);
913 :
914 1199 : if ((tmp = RMTreadbatheader(sin, buf)) != MAL_SUCCEED) {
915 0 : MT_lock_unset(&c->lock);
916 0 : return tmp;
917 : }
918 :
919 1199 : if ((tmp = RMTinternalcopyfrom(&b, buf, sin, true)) != MAL_SUCCEED) {
920 0 : MT_lock_unset(&c->lock);
921 0 : return(tmp);
922 : }
923 :
924 1199 : v->val.bval = b->batCacheid;
925 1199 : v->vtype = TYPE_bat;
926 1199 : BBPkeepref(b->batCacheid);
927 :
928 1199 : MT_lock_unset(&c->lock);
929 : } else {
930 0 : ptr p = NULL;
931 : str val;
932 0 : size_t len = 0;
933 :
934 0 : snprintf(qbuf, BUFSIZ, "io.print(%s);", ident);
935 0 : TRC_DEBUG(MAL_REMOTE, "Remote get: %s - %s\n", c->name, qbuf);
936 0 : if ((tmp=RMTquery(&mhdl, "remote.get", c->mconn, qbuf)) != MAL_SUCCEED)
937 : {
938 0 : return tmp;
939 : }
940 0 : (void) mapi_fetch_row(mhdl); /* should succeed */
941 0 : val = mapi_fetch_field(mhdl, 0);
942 :
943 0 : if (ATOMbasetype(rtype) == TYPE_str) {
944 0 : if (!VALinit(v, rtype, val == NULL ? str_nil : val)) {
945 0 : mapi_close_handle(mhdl);
946 0 : throw(MAL, "remote.get", SQLSTATE(HY013) MAL_MALLOC_FAIL);
947 : }
948 0 : } else if (ATOMfromstr(rtype, &p, &len, val == NULL ? "nil" : val, true) < 0) {
949 : char *msg;
950 0 : msg = createException(MAL, "remote.get",
951 : "unable to parse value: %s",
952 : val == NULL ? "nil" : val);
953 0 : mapi_close_handle(mhdl);
954 0 : GDKfree(p);
955 0 : return msg;
956 : } else {
957 0 : VALset(v, rtype, p);
958 0 : if (ATOMextern(rtype) == 0)
959 0 : GDKfree(p);
960 : }
961 :
962 0 : mapi_close_handle(mhdl);
963 : }
964 :
965 : return(MAL_SUCCEED);
966 : }
967 :
968 : /**
969 : * stores the given object on the remote host. The identifier of the
970 : * object on the remote host is returned for later use.
971 : */
972 1922 : static str RMTput(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
973 : str conn, tmp;
974 : char ident[BUFSIZ];
975 : connection c;
976 : ValPtr v;
977 : int type;
978 : ptr value;
979 1922 : MapiHdl mhdl = NULL;
980 :
981 : (void)cntxt;
982 :
983 1922 : conn = *getArgReference_str(stk, pci, 1);
984 1922 : if (conn == NULL || strcmp(conn, (str)str_nil) == 0)
985 0 : throw(ILLARG, "remote.put", ILLEGAL_ARGUMENT ": connection name is NULL or nil");
986 :
987 : /* lookup conn */
988 1922 : rethrow("remote.put", tmp, RMTfindconn(&c, conn));
989 :
990 : /* put the thing */
991 1922 : type = getArgType(mb, pci, 2);
992 1922 : value = getArgReference(stk, pci, 2);
993 :
994 : /* this call should be a single transaction over the channel*/
995 1922 : MT_lock_set(&c->lock);
996 :
997 : /* get a free, typed identifier for the remote host */
998 1922 : tmp = RMTgetId(ident, mb, pci, 2);
999 1921 : if (tmp != MAL_SUCCEED) {
1000 0 : MT_lock_unset(&c->lock);
1001 0 : return tmp;
1002 : }
1003 :
1004 : /* depending on the input object generate actions to store the
1005 : * object remotely*/
1006 1921 : if (type == TYPE_any || type == TYPE_bat || isAnyExpression(type)) {
1007 : char *tpe, *msg;
1008 0 : MT_lock_unset(&c->lock);
1009 0 : tpe = getTypeName(type);
1010 0 : msg = createException(MAL, "remote.put", "unsupported type: %s", tpe);
1011 0 : GDKfree(tpe);
1012 0 : return msg;
1013 3121 : } else if (isaBatType(type) && !is_bat_nil(*(bat*) value)) {
1014 : BATiter bi;
1015 : /* naive approach using bat.new() and bat.insert() calls */
1016 : char *tail;
1017 : bat bid;
1018 : BAT *b = NULL;
1019 : BUN p, q;
1020 : str tailv;
1021 : stream *sout;
1022 :
1023 1200 : tail = getTypeIdentifier(getBatType(type));
1024 1200 : if (tail == NULL) {
1025 0 : MT_lock_unset(&c->lock);
1026 0 : throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1027 : }
1028 :
1029 1200 : bid = *(bat *)value;
1030 1200 : if (bid != 0) {
1031 1200 : if ((b = BATdescriptor(bid)) == NULL){
1032 0 : MT_lock_unset(&c->lock);
1033 0 : GDKfree(tail);
1034 0 : throw(MAL, "remote.put", SQLSTATE(HY002) RUNTIME_OBJECT_MISSING);
1035 : }
1036 : }
1037 :
1038 : /* bypass Mapi from this point to efficiently write all data to
1039 : * the server */
1040 1200 : sout = mapi_get_to(c->mconn);
1041 :
1042 : /* call our remote helper to do this more efficiently */
1043 1200 : mnstr_printf(sout,
1044 : "%s := remote.batload(nil:%s, " BUNFMT ");\n",
1045 : ident, tail, (bid == 0 ? 0 : BATcount(b)));
1046 1200 : mnstr_flush(sout, MNSTR_FLUSH_DATA);
1047 1200 : GDKfree(tail);
1048 :
1049 : /* b can be NULL if bid == 0 (only type given, ugh) */
1050 1200 : if (b) {
1051 1200 : int tpe = getBatType(type), trivial = tpe < TYPE_date || ATOMbasetype(tpe) == TYPE_str;
1052 1200 : const void *nil = ATOMnilptr(tpe);
1053 1200 : int (*atomcmp)(const void *, const void *) = ATOMcompare(tpe);
1054 :
1055 1200 : bi = bat_iterator(b);
1056 1200 : BATloop(b, p, q) {
1057 0 : const void *v = BUNtail(bi, p);
1058 0 : tailv = ATOMformat(tpe, v);
1059 0 : if (tailv == NULL) {
1060 0 : bat_iterator_end(&bi);
1061 0 : BBPunfix(b->batCacheid);
1062 0 : MT_lock_unset(&c->lock);
1063 0 : throw(MAL, "remote.put", GDK_EXCEPTION);
1064 : }
1065 0 : if (trivial || atomcmp(v, nil) == 0)
1066 0 : mnstr_printf(sout, "%s\n", tailv);
1067 : else
1068 0 : mnstr_printf(sout, "\"%s\"\n", tailv);
1069 0 : GDKfree(tailv);
1070 : }
1071 1200 : bat_iterator_end(&bi);
1072 1200 : BBPunfix(b->batCacheid);
1073 : }
1074 :
1075 : /* write the empty line the server is waiting for, handles
1076 : * all errors at the same time, if any */
1077 1200 : if ((tmp = RMTquery(&mhdl, "remote.put", c->mconn, ""))
1078 : != MAL_SUCCEED)
1079 : {
1080 0 : MT_lock_unset(&c->lock);
1081 0 : return tmp;
1082 : }
1083 1200 : mapi_close_handle(mhdl);
1084 721 : } else if (isaBatType(type) && is_bat_nil(*(bat*) value)) {
1085 : stream *sout;
1086 0 : str typename = getTypeName(type);
1087 0 : sout = mapi_get_to(c->mconn);
1088 0 : mnstr_printf(sout,
1089 : "%s := nil:%s;\n", ident, typename);
1090 0 : mnstr_flush(sout, MNSTR_FLUSH_DATA);
1091 0 : GDKfree(typename);
1092 : } else {
1093 : size_t l;
1094 : str val;
1095 : char *tpe;
1096 : char qbuf[512], *nbuf = qbuf;
1097 721 : const void *nil = ATOMnilptr(type), *p = value;
1098 721 : int (*atomcmp)(const void *, const void *) = ATOMcompare(type);
1099 :
1100 721 : if (ATOMextern(type))
1101 506 : p = *(str *)value;
1102 :
1103 721 : val = ATOMformat(type, p);
1104 722 : if (val == NULL) {
1105 0 : MT_lock_unset(&c->lock);
1106 0 : throw(MAL, "remote.put", GDK_EXCEPTION);
1107 : }
1108 722 : tpe = getTypeIdentifier(type);
1109 722 : if (tpe == NULL) {
1110 0 : MT_lock_unset(&c->lock);
1111 0 : GDKfree(val);
1112 0 : throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1113 : }
1114 722 : l = strlen(val) + strlen(tpe) + strlen(ident) + 10;
1115 722 : if (l > (ssize_t) sizeof(qbuf) && (nbuf = GDKmalloc(l)) == NULL) {
1116 0 : MT_lock_unset(&c->lock);
1117 0 : GDKfree(val);
1118 0 : GDKfree(tpe);
1119 0 : throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1120 : }
1121 722 : if (type < TYPE_date || ATOMbasetype(type) == TYPE_str || atomcmp(p, nil) == 0)
1122 722 : snprintf(nbuf, l, "%s := %s:%s;\n", ident, val, tpe);
1123 : else
1124 0 : snprintf(nbuf, l, "%s := \"%s\":%s;\n", ident, val, tpe);
1125 722 : GDKfree(tpe);
1126 722 : GDKfree(val);
1127 722 : TRC_DEBUG(MAL_REMOTE, "Remote put: %s - %s\n", c->name, nbuf);
1128 722 : tmp = RMTquery(&mhdl, "remote.put", c->mconn, nbuf);
1129 722 : if (nbuf != qbuf)
1130 16 : GDKfree(nbuf);
1131 722 : if (tmp != MAL_SUCCEED) {
1132 0 : MT_lock_unset(&c->lock);
1133 0 : return tmp;
1134 : }
1135 722 : mapi_close_handle(mhdl);
1136 : }
1137 1922 : MT_lock_unset(&c->lock);
1138 :
1139 : /* return the identifier */
1140 1922 : v = &stk->stk[pci->argv[0]];
1141 1922 : if (VALinit(v, TYPE_str, ident) == NULL)
1142 0 : throw(MAL, "remote.put", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1143 : return(MAL_SUCCEED);
1144 : }
1145 :
1146 : /**
1147 : * stores the given <mod>.<fcn> on the remote host.
1148 : * An error is returned if the function is already known at the remote site.
1149 : * The implementation is based on serialisation of the block into a string
1150 : * followed by remote parsing.
1151 : */
1152 0 : static str RMTregisterInternal(Client cntxt, char** fcn_id, const char *conn, const char *mod, const char *fcn)
1153 : {
1154 : str tmp, qry, msg;
1155 : connection c;
1156 : char buf[BUFSIZ];
1157 0 : MapiHdl mhdl = NULL;
1158 : Symbol sym;
1159 :
1160 0 : if (strNil(conn))
1161 0 : throw(ILLARG, "remote.register", ILLEGAL_ARGUMENT ": connection name is NULL or nil");
1162 :
1163 : /* find local definition */
1164 0 : sym = findSymbol(cntxt->usermodule, putName(mod), putName(fcn));
1165 0 : if (sym == NULL)
1166 0 : throw(MAL, "remote.register", ILLEGAL_ARGUMENT ": no such function: %s.%s", mod, fcn);
1167 :
1168 : /* lookup conn */
1169 0 : rethrow("remote.register", tmp, RMTfindconn(&c, conn));
1170 :
1171 : /* this call should be a single transaction over the channel*/
1172 0 : MT_lock_set(&c->lock);
1173 :
1174 : /* get a free, typed identifier for the remote host */
1175 : char ident[BUFSIZ];
1176 0 : tmp = RMTgetId(ident, sym->def, getInstrPtr(sym->def, 0), 0);
1177 0 : if (tmp != MAL_SUCCEED) {
1178 0 : MT_lock_unset(&c->lock);
1179 0 : return tmp;
1180 : }
1181 :
1182 : /* check remote definition */
1183 0 : snprintf(buf, BUFSIZ, "b:bit:=inspect.getExistence(\"%s\",\"%s\");\nio.print(b);", mod, ident);
1184 0 : TRC_DEBUG(MAL_REMOTE, "Remote register: %s - %s\n", c->name, buf);
1185 0 : if ((msg = RMTquery(&mhdl, "remote.register", c->mconn, buf)) != MAL_SUCCEED){
1186 0 : MT_lock_unset(&c->lock);
1187 0 : return msg;
1188 : }
1189 :
1190 : char* result;
1191 0 : if ( mapi_get_field_count(mhdl) && mapi_fetch_row(mhdl) && (result = mapi_fetch_field(mhdl, 0))) {
1192 0 : if (strcmp(result, "false") != 0)
1193 0 : msg = createException(MAL, "remote.register",
1194 : "function already exists at the remote site: %s.%s",
1195 : mod, fcn);
1196 : }
1197 : else
1198 0 : msg = createException(MAL, "remote.register", OPERATION_FAILED);
1199 :
1200 0 : mapi_close_handle(mhdl);
1201 :
1202 0 : if (msg) {
1203 0 : MT_lock_unset(&c->lock);
1204 0 : return msg;
1205 : }
1206 :
1207 0 : *fcn_id = GDKstrdup(ident);
1208 0 : if (*fcn_id == NULL) {
1209 0 : MT_lock_unset(&c->lock);
1210 0 : throw(MAL, "Remote register", MAL_MALLOC_FAIL);
1211 : }
1212 :
1213 : Symbol prg;
1214 0 : if ((prg = newFunction(putName(mod), putName(*fcn_id), FUNCTIONsymbol)) == NULL) {
1215 0 : MT_lock_unset(&c->lock);
1216 0 : throw(MAL, "Remote register", MAL_MALLOC_FAIL);
1217 : }
1218 :
1219 : // We only need the Symbol not the inner program stub. So we clear it.
1220 0 : freeMalBlk(prg->def);
1221 0 : prg->def = NULL;
1222 :
1223 0 : if ((prg->def = copyMalBlk(sym->def)) == NULL) {
1224 0 : MT_lock_unset(&c->lock);
1225 0 : freeSymbol(prg);
1226 0 : throw(MAL, "Remote register", MAL_MALLOC_FAIL);
1227 : }
1228 0 : setFunctionId(getInstrPtr(prg->def, 0), putName(*fcn_id));
1229 :
1230 : /* make sure the program is error free */
1231 0 : msg = chkProgram(cntxt->usermodule, prg->def);
1232 0 : if ( msg != MAL_SUCCEED || prg->def->errors) {
1233 0 : MT_lock_unset(&c->lock);
1234 0 : if (msg)
1235 : return msg;
1236 0 : throw(MAL, "remote.register",
1237 : "function '%s.%s' contains syntax or type errors",
1238 : mod, *fcn_id);
1239 : }
1240 :
1241 0 : qry = mal2str(prg->def, 0, prg->def->stop);
1242 0 : TRC_DEBUG(MAL_REMOTE, "Remote register: %s - %s\n", c->name, qry);
1243 0 : msg = RMTquery(&mhdl, "remote.register", c->mconn, qry);
1244 0 : GDKfree(qry);
1245 0 : if (mhdl)
1246 0 : mapi_close_handle(mhdl);
1247 :
1248 0 : freeSymbol(prg);
1249 :
1250 0 : MT_lock_unset(&c->lock);
1251 0 : return msg;
1252 : }
1253 :
1254 0 : static str RMTregister(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
1255 0 : char **fcn_id = getArgReference_str(stk, pci, 0);
1256 0 : const char *conn = *getArgReference_str(stk, pci, 1);
1257 0 : const char *mod = *getArgReference_str(stk, pci, 2);
1258 0 : const char *fcn = *getArgReference_str(stk, pci, 3);
1259 : (void)mb;
1260 0 : return RMTregisterInternal(cntxt, fcn_id, conn, mod, fcn);
1261 : }
1262 :
1263 : /**
1264 : * exec executes the function with its given arguments on the remote
1265 : * host, returning the function's return value. exec is purposely kept
1266 : * very spartan. All arguments need to be handles to previously put()
1267 : * values. It calls the function with the given arguments at the remote
1268 : * site, and returns the handle which stores the return value of the
1269 : * remotely executed function. This return value can be retrieved using
1270 : * a get call. It handles multiple return arguments.
1271 : */
1272 281 : static str RMTexec(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
1273 : str conn, mod, func, tmp;
1274 : int i;
1275 : size_t len, buflen;
1276 281 : connection c= NULL;
1277 : char *qbuf;
1278 : MapiHdl mhdl;
1279 :
1280 : (void)cntxt;
1281 : (void)mb;
1282 : bool no_return_arguments = 0;
1283 :
1284 : columnar_result_callback* rcb = NULL;
1285 281 : ValRecord *v = &(stk)->stk[(pci)->argv[4]];
1286 281 : if (pci->retc == 1 && (pci->argc >= 4) && (v->vtype == TYPE_ptr) ) {
1287 1 : rcb = (columnar_result_callback*) v->val.pval;
1288 : }
1289 :
1290 1697 : for (i = 0; i < pci->retc; i++) {
1291 1416 : if (stk->stk[pci->argv[i]].vtype == TYPE_str) {
1292 1415 : tmp = *getArgReference_str(stk, pci, i);
1293 1415 : if (tmp == NULL || strcmp(tmp, (str)str_nil) == 0)
1294 0 : throw(ILLARG, "remote.exec", ILLEGAL_ARGUMENT
1295 : ": return value %d is NULL or nil", i);
1296 : }
1297 : else
1298 : no_return_arguments = 1;
1299 : }
1300 :
1301 281 : conn = *getArgReference_str(stk, pci, i++);
1302 281 : if (conn == NULL || strcmp(conn, (str)str_nil) == 0)
1303 0 : throw(ILLARG, "remote.exec", ILLEGAL_ARGUMENT ": connection name is NULL or nil");
1304 281 : mod = *getArgReference_str(stk, pci, i++);
1305 281 : if (mod == NULL || strcmp(mod, (str)str_nil) == 0)
1306 0 : throw(ILLARG, "remote.exec", ILLEGAL_ARGUMENT ": module name is NULL or nil");
1307 281 : func = *getArgReference_str(stk, pci, i++);
1308 281 : if (func == NULL || strcmp(func, (str)str_nil) == 0)
1309 0 : throw(ILLARG, "remote.exec", ILLEGAL_ARGUMENT ": function name is NULL or nil");
1310 :
1311 : /* lookup conn */
1312 281 : rethrow("remote.exec", tmp, RMTfindconn(&c, conn));
1313 :
1314 : /* this call should be a single transaction over the channel*/
1315 281 : MT_lock_set(&c->lock);
1316 :
1317 281 : if(!no_return_arguments && pci->argc - pci->retc < 3) { /* conn, mod, func, ... */
1318 0 : MT_lock_unset(&c->lock);
1319 0 : throw(MAL, "remote.exec", ILLEGAL_ARGUMENT " MAL instruction misses arguments");
1320 : }
1321 :
1322 : len = 0;
1323 : /* count how big a buffer we need */
1324 281 : len += 2 * (pci->retc > 1);
1325 281 : if (!no_return_arguments)
1326 1695 : for (i = 0; i < pci->retc; i++) {
1327 1415 : len += 2 * (i > 0);
1328 1415 : len += strlen(*getArgReference_str(stk, pci, i));
1329 : }
1330 :
1331 281 : const int arg_index = rcb ? 4 : 3;
1332 :
1333 281 : len += strlen(mod) + strlen(func) + 6;
1334 788 : for (i = arg_index; i < pci->argc - pci->retc; i++) {
1335 507 : len += 2 * (i > arg_index);
1336 507 : len += strlen(*getArgReference_str(stk, pci, pci->retc + i));
1337 : }
1338 : len += 2;
1339 281 : buflen = len + 1;
1340 281 : if ((qbuf = GDKmalloc(buflen)) == NULL) {
1341 0 : MT_lock_unset(&c->lock);
1342 0 : throw(MAL, "remote.exec", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1343 : }
1344 :
1345 : len = 0;
1346 :
1347 281 : if (pci->retc > 1)
1348 38 : qbuf[len++] = '(';
1349 281 : if (!no_return_arguments)
1350 1695 : for (i = 0; i < pci->retc; i++)
1351 1695 : len += snprintf(&qbuf[len], buflen - len, "%s%s",
1352 1415 : (i > 0 ? ", " : ""), *getArgReference_str(stk, pci, i));
1353 :
1354 281 : if (pci->retc > 1)
1355 38 : qbuf[len++] = ')';
1356 :
1357 : /* build the function invocation string in qbuf */
1358 281 : if (!no_return_arguments && pci->retc > 0) {
1359 280 : len += snprintf(&qbuf[len], buflen - len, " := %s.%s(", mod, func);
1360 : }
1361 : else {
1362 1 : len += snprintf(&qbuf[len], buflen - len, " %s.%s(", mod, func);
1363 : }
1364 :
1365 : /* handle the arguments to the function */
1366 :
1367 : /* put the arguments one by one, and dynamically build the
1368 : * invocation string */
1369 788 : for (i = arg_index; i < pci->argc - pci->retc; i++) {
1370 648 : len += snprintf(&qbuf[len], buflen - len, "%s%s",
1371 : (i > arg_index ? ", " : ""),
1372 507 : *(getArgReference_str(stk, pci, pci->retc + i)));
1373 : }
1374 :
1375 : /* finish end execute the invocation string */
1376 281 : len += snprintf(&qbuf[len], buflen - len, ");");
1377 281 : TRC_DEBUG(MAL_REMOTE, "Remote exec: %s - %s\n", c->name, qbuf);
1378 281 : tmp = RMTquery(&mhdl, "remote.exec", c->mconn, qbuf);
1379 281 : GDKfree(qbuf);
1380 :
1381 : /* Temporary hack:
1382 : * use a callback to immediately handle columnar results before hdl is destroyed. */
1383 281 : if (tmp == MAL_SUCCEED && rcb && mhdl && (mapi_get_querytype(mhdl) == Q_TABLE || mapi_get_querytype(mhdl) == Q_PREPARE)) {
1384 1 : int fields = mapi_get_field_count(mhdl);
1385 1 : columnar_result* results = GDKzalloc(sizeof(columnar_result) * fields);
1386 :
1387 1 : if (!results) {
1388 0 : tmp = createException(MAL, "remote.exec", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1389 : } else {
1390 : int i = 0;
1391 1 : char buf[256] = {0};
1392 1 : stream *sin = mapi_get_from(c->mconn);
1393 :
1394 3 : for (; i < fields; i++) {
1395 2 : BAT *b = NULL;
1396 :
1397 4 : if ((tmp = RMTreadbatheader(sin, buf)) != MAL_SUCCEED ||
1398 2 : (tmp = RMTinternalcopyfrom(&b, buf, sin, i == fields - 1)) != MAL_SUCCEED) {
1399 : break;
1400 : }
1401 :
1402 2 : results[i].id = b->batCacheid;
1403 2 : results[i].colname = mapi_get_name(mhdl, i);
1404 2 : results[i].tpename = mapi_get_type(mhdl, i);
1405 2 : results[i].digits = mapi_get_digits(mhdl, i);
1406 2 : results[i].scale = mapi_get_scale(mhdl, i);
1407 : }
1408 :
1409 1 : if (tmp != MAL_SUCCEED) {
1410 0 : for (int j = 0; j < i; j++)
1411 0 : BBPunfix(results[j].id);
1412 : } else {
1413 3 : for (int j = 0; j < i; j++)
1414 2 : BBPkeepref(results[j].id);
1415 1 : assert(rcb->context);
1416 1 : tmp = rcb->call(rcb->context, mapi_get_table(mhdl, 0), results, fields);
1417 3 : for (int j = 0; j < i; j++)
1418 2 : BBPrelease(results[j].id);
1419 : }
1420 1 : GDKfree(results);
1421 : }
1422 : }
1423 :
1424 281 : if (rcb) {
1425 1 : GDKfree(rcb->context);
1426 1 : GDKfree(rcb);
1427 : }
1428 :
1429 281 : if (mhdl)
1430 270 : mapi_close_handle(mhdl);
1431 281 : MT_lock_unset(&c->lock);
1432 281 : return tmp;
1433 : }
1434 :
1435 : /**
1436 : * batload is a helper function to make transferring a BAT with RMTput
1437 : * more efficient. It works by creating a BAT, and loading it with the
1438 : * data as comma separated values from the input stream, until an empty
1439 : * line is read. The given size argument is taken as a hint only, and
1440 : * is not enforced to match the number of rows read.
1441 : */
1442 1200 : static str RMTbatload(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
1443 : ValPtr v;
1444 : int t;
1445 : int size;
1446 : ptr r;
1447 : size_t s;
1448 : BAT *b;
1449 : size_t len;
1450 : char *var;
1451 : str msg = MAL_SUCCEED;
1452 1200 : bstream *fdin = cntxt->fdin;
1453 :
1454 1200 : v = &stk->stk[pci->argv[0]]; /* return */
1455 1200 : t = getArgType(mb, pci, 1); /* tail type */
1456 1200 : size = *getArgReference_int(stk, pci, 2); /* size */
1457 :
1458 1200 : b = COLnew(0, t, size, TRANSIENT);
1459 1200 : if (b == NULL)
1460 0 : throw(MAL, "remote.load", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1461 :
1462 : /* grab the input stream and start reading */
1463 1200 : fdin->eof = false;
1464 1200 : len = fdin->pos;
1465 1200 : while (len < fdin->len || bstream_next(fdin) > 0) {
1466 : /* newline hunting (how spartan) */
1467 1200 : for (len = fdin->pos; len < fdin->len && fdin->buf[len] != '\n'; len++)
1468 : ;
1469 : /* unterminated line, request more */
1470 1200 : if (fdin->buf[len] != '\n')
1471 0 : continue;
1472 : /* empty line, end of input */
1473 1200 : if (fdin->pos == len) {
1474 1200 : if (isa_block_stream(fdin->s)) {
1475 1200 : ssize_t n = bstream_next(fdin);
1476 1200 : if( n )
1477 0 : msg = createException(MAL, "remote.load", SQLSTATE(HY013) "Unexpected return from remote");
1478 : }
1479 : break;
1480 : }
1481 0 : fdin->buf[len] = '\0'; /* kill \n */
1482 0 : var = &fdin->buf[fdin->pos];
1483 : /* skip over this line */
1484 0 : fdin->pos = ++len;
1485 :
1486 0 : s = 0;
1487 0 : r = NULL;
1488 0 : if (ATOMfromstr(t, &r, &s, var, true) < 0 ||
1489 0 : BUNappend(b, r, false) != GDK_SUCCEED) {
1490 0 : BBPreclaim(b);
1491 0 : GDKfree(r);
1492 0 : throw(MAL, "remote.get", GDK_EXCEPTION);
1493 : }
1494 0 : GDKfree(r);
1495 : }
1496 :
1497 1200 : v->val.bval = b->batCacheid;
1498 1200 : v->vtype = TYPE_bat;
1499 1200 : BBPkeepref(b->batCacheid);
1500 :
1501 1200 : return msg;
1502 : }
1503 :
1504 : /**
1505 : * dump given BAT to stream
1506 : */
1507 1199 : static str RMTbincopyto(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1508 : {
1509 1199 : bat bid = *getArgReference_bat(stk, pci, 1);
1510 1199 : BAT *b = BBPquickdesc(bid), *v = b;
1511 : char sendtheap = 0, sendtvheap = 0;
1512 :
1513 : (void)mb;
1514 : (void)stk;
1515 : (void)pci;
1516 :
1517 1199 : if (b == NULL)
1518 0 : throw(MAL, "remote.bincopyto", RUNTIME_OBJECT_UNDEFINED);
1519 :
1520 1199 : if (BBPfix(bid) <= 0)
1521 0 : throw(MAL, "remote.bincopyto", MAL_MALLOC_FAIL);
1522 :
1523 1199 : sendtheap = b->ttype != TYPE_void;
1524 1199 : sendtvheap = sendtheap && b->tvarsized;
1525 1199 : if (isVIEW(b) && sendtvheap && VIEWvtparent(b) && BATcount(b) < BATcount(BBP_cache(VIEWvtparent(b)))) {
1526 18 : if ((b = BATdescriptor(bid)) == NULL) {
1527 0 : BBPunfix(bid);
1528 0 : throw(MAL, "remote.bincopyto", RUNTIME_OBJECT_MISSING);
1529 : }
1530 18 : v = COLcopy(b, b->ttype, true, TRANSIENT);
1531 18 : BBPunfix(b->batCacheid);
1532 18 : if (v == NULL) {
1533 0 : BBPunfix(bid);
1534 0 : throw(MAL, "remote.bincopyto", GDK_EXCEPTION);
1535 : }
1536 : }
1537 :
1538 1245 : mnstr_printf(cntxt->fdout, /*JSON*/"{"
1539 : "\"version\":1,"
1540 : "\"ttype\":%d,"
1541 : "\"hseqbase\":" OIDFMT ","
1542 : "\"tseqbase\":" OIDFMT ","
1543 : "\"tsorted\":%d,"
1544 : "\"trevsorted\":%d,"
1545 : "\"tkey\":%d,"
1546 : "\"tnonil\":%d,"
1547 : "\"tdense\":%d,"
1548 : "\"size\":" BUNFMT ","
1549 : "\"tailsize\":%zu,"
1550 : "\"theapsize\":%zu"
1551 : "}\n",
1552 1199 : v->ttype,
1553 : v->hseqbase, v->tseqbase,
1554 1199 : v->tsorted, v->trevsorted,
1555 1199 : v->tkey,
1556 1199 : v->tnonil,
1557 1199 : BATtdense(v),
1558 : v->batCount,
1559 1195 : sendtheap ? (size_t)v->batCount << v->tshift : 0,
1560 46 : sendtvheap && v->batCount > 0 ? v->tvheap->free : 0
1561 : );
1562 :
1563 1199 : if (sendtheap && v->batCount > 0) {
1564 1127 : BATiter vi = bat_iterator(v);
1565 1127 : mnstr_write(cntxt->fdout, /* tail */
1566 1127 : vi.base, vi.count * vi.width, 1);
1567 1127 : if (sendtvheap)
1568 34 : mnstr_write(cntxt->fdout, /* theap */
1569 34 : vi.vh->base, vi.vhfree, 1);
1570 1127 : bat_iterator_end(&vi);
1571 : }
1572 : /* flush is done by the calling environment (MAL) */
1573 :
1574 1199 : if (b != v)
1575 18 : BBPreclaim(v);
1576 :
1577 1199 : BBPunfix(bid);
1578 :
1579 1199 : return(MAL_SUCCEED);
1580 : }
1581 :
1582 : /**
1583 : * read from the input stream and give the BAT handle back to the caller
1584 : */
1585 0 : static str RMTbincopyfrom(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
1586 0 : BAT *b = NULL;
1587 : ValPtr v;
1588 : str err;
1589 :
1590 : (void)mb;
1591 :
1592 : /* We receive a normal line, which contains the JSON header, the
1593 : * rest is binary data directly on the stream. We get the first
1594 : * line from the buffered stream we have here, and pass it on
1595 : * together with the raw stream we have. */
1596 0 : cntxt->fdin->eof = false; /* in case it was before */
1597 0 : if (bstream_next(cntxt->fdin) <= 0)
1598 0 : throw(MAL, "remote.bincopyfrom", "expected JSON header");
1599 :
1600 0 : cntxt->fdin->buf[cntxt->fdin->len] = '\0';
1601 0 : err = RMTinternalcopyfrom(&b,
1602 0 : &cntxt->fdin->buf[cntxt->fdin->pos], cntxt->fdin->s, true);
1603 : /* skip the JSON line */
1604 0 : cntxt->fdin->pos = ++cntxt->fdin->len;
1605 0 : if (err != MAL_SUCCEED)
1606 : return(err);
1607 :
1608 0 : v = &stk->stk[pci->argv[0]];
1609 0 : v->val.bval = b->batCacheid;
1610 0 : v->vtype = TYPE_bat;
1611 0 : BBPkeepref(b->batCacheid);
1612 :
1613 0 : return(MAL_SUCCEED);
1614 : }
1615 :
1616 : /**
1617 : * bintype identifies the system on its binary profile. This is mainly
1618 : * used to determine if BATs can be sent binary across.
1619 : */
1620 76 : static str RMTbintype(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
1621 : int type = 0;
1622 : (void)mb;
1623 : (void)stk;
1624 : (void)pci;
1625 :
1626 : #ifdef WORDS_BIGENDIAN
1627 : type |= RMTT_B_ENDIAN;
1628 : #else
1629 : type |= RMTT_L_ENDIAN;
1630 : #endif
1631 : #if SIZEOF_SIZE_T == SIZEOF_LNG
1632 : type |= RMTT_64_BITS;
1633 : #else
1634 : type |= RMTT_32_BITS;
1635 : #endif
1636 : #if SIZEOF_OID == SIZEOF_LNG
1637 : type |= RMTT_64_OIDS;
1638 : #else
1639 : type |= RMTT_32_OIDS;
1640 : #endif
1641 :
1642 76 : mnstr_printf(cntxt->fdout, "[ %d ]\n", type);
1643 :
1644 76 : return(MAL_SUCCEED);
1645 : }
1646 :
1647 : /**
1648 : * Returns whether the underlying connection is still connected or not.
1649 : * Best effort implementation on top of mapi using a ping.
1650 : */
1651 : static str
1652 0 : RMTisalive(int *ret, str *conn)
1653 : {
1654 : str tmp;
1655 : connection c;
1656 :
1657 0 : if (*conn == NULL || strcmp(*conn, (str)str_nil) == 0)
1658 0 : throw(ILLARG, "remote.get", ILLEGAL_ARGUMENT ": connection name is NULL or nil");
1659 :
1660 : /* lookup conn, set c if valid */
1661 0 : rethrow("remote.get", tmp, RMTfindconn(&c, *conn));
1662 :
1663 0 : *ret = 0;
1664 0 : if (mapi_is_connected(c->mconn) && mapi_ping(c->mconn) == MOK)
1665 0 : *ret = 1;
1666 :
1667 : return MAL_SUCCEED;
1668 : }
1669 :
1670 : // This is basically a no op
1671 : static str
1672 130 : RMTregisterSupervisor(int *ret, str *sup_uuid, str *query_uuid) {
1673 : (void)sup_uuid;
1674 : (void)query_uuid;
1675 :
1676 130 : *ret = 0;
1677 130 : return MAL_SUCCEED;
1678 : }
1679 :
1680 : /* this is needed in remote plans */
1681 : static str
1682 11 : RMTassert(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1683 : {
1684 11 : bool flg = (bool) *getArgReference_bit(stk, pci, 1);
1685 11 : str msg = *getArgReference_str(stk, pci, 2);
1686 :
1687 : (void) cntxt;
1688 : (void) mb;
1689 11 : if (flg) {
1690 11 : if (strlen(msg) > 6 &&
1691 11 : msg[5] == '!' &&
1692 0 : (isdigit((unsigned char) msg[0]) ||
1693 0 : isupper((unsigned char) msg[0])) &&
1694 0 : (isdigit((unsigned char) msg[1]) ||
1695 0 : isupper((unsigned char) msg[1])) &&
1696 0 : (isdigit((unsigned char) msg[2]) ||
1697 0 : isupper((unsigned char) msg[2])) &&
1698 0 : (isdigit((unsigned char) msg[3]) ||
1699 0 : isupper((unsigned char) msg[3])) &&
1700 0 : (isdigit((unsigned char) msg[4]) ||
1701 : isupper((unsigned char) msg[4])))
1702 0 : throw(REMOTE, "assert", "%s", msg); /* includes state */
1703 11 : throw(REMOTE, "assert", SQLSTATE(M0M29) "%s", msg);
1704 : }
1705 : return MAL_SUCCEED;
1706 : }
1707 :
1708 : #include "mel.h"
1709 : mel_func remote_init_funcs[] = {
1710 : command("remote", "prelude", RMTprelude, false, "initialise the remote module", args(1,1, arg("",void))),
1711 : command("remote", "epilogue", RMTepilogue, false, "release the resources held by the remote module", args(1,1, arg("",void))),
1712 : pattern("remote", "assert", RMTassert, false, "Generate an exception when b==true", args(1,3, arg("",void),arg("b",bit),arg("msg",str))),
1713 : command("remote", "resolve", RMTresolve, false, "resolve a pattern against Merovingian and return the URIs", args(1,2, batarg("",str),arg("pattern",str))),
1714 : pattern("remote", "connect", RMTconnect, false, "returns a newly created connection for uri, using user name and password", args(1,5, arg("",str),arg("uri",str),arg("user",str),arg("passwd",str),arg("scen",str))),
1715 : command("remote", "connect", RMTconnectScen, false, "returns a newly created connection for uri, using user name, password and scenario", args(1,6, arg("",str),arg("uri",str),arg("user",str),arg("passwd",str),arg("scen",str),arg("columnar",bit))),
1716 : pattern("remote", "connect", RMTconnectTable, false, "return a newly created connection for a table. username and password should be in the vault", args(1,3, arg("",str),arg("table",str),arg("schen",str))),
1717 : command("remote", "disconnect", RMTdisconnect, false, "disconnects the connection pointed to by handle (received from a call to connect()", args(1,2, arg("",void),arg("conn",str))),
1718 : pattern("remote", "get", RMTget, false, "retrieves a copy of remote object ident", args(1,3, argany("",0),arg("conn",str),arg("ident",str))),
1719 : pattern("remote", "put", RMTput, false, "copies object to the remote site and returns its identifier", args(1,3, arg("",str),arg("conn",str),argany("object",0))),
1720 : pattern("remote", "register", RMTregister, false, "register <mod>.<fcn> at the remote site", args(1,4, arg("",str),arg("conn",str),arg("mod",str),arg("fcn",str))),
1721 : pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> and returns the handle to its result", args(1,4, arg("",str),arg("conn",str),arg("mod",str),arg("func",str))),
1722 : pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> and returns the handle to its result", args(1,4, vararg("",str),arg("conn",str),arg("mod",str),arg("func",str))),
1723 : pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> using the argument list of remote objects and returns the handle to its result", args(1,5, arg("",str),arg("conn",str),arg("mod",str),arg("func",str),vararg("",str))),
1724 : pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> using the argument list of remote objects and returns the handle to its result", args(1,5, vararg("",str),arg("conn",str),arg("mod",str),arg("func",str),vararg("",str))),
1725 : pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> using the argument list of remote objects and applying function pointer rcb as callback to handle any results.", args(0,5, arg("conn",str),arg("mod",str),arg("func",str),arg("rcb",ptr), vararg("",str))),
1726 : pattern("remote", "exec", RMTexec, false, "remotely executes <mod>.<func> using the argument list of remote objects and ignoring results.", args(0,4, arg("conn",str),arg("mod",str),arg("func",str), vararg("",str))),
1727 : command("remote", "isalive", RMTisalive, false, "check if conn is still valid and connected", args(1,2, arg("",int),arg("conn",str))),
1728 : pattern("remote", "batload", RMTbatload, false, "create a BAT of the given type and size, and load values from the input stream", args(1,3, batargany("",1),argany("tt",1),arg("size",int))),
1729 : pattern("remote", "batbincopy", RMTbincopyto, false, "dump BAT b in binary form to the stream", args(1,2, arg("",void),batargany("b",0))),
1730 : pattern("remote", "batbincopy", RMTbincopyfrom, false, "store the binary BAT data in the BBP and return as BAT", args(1,1, batargany("",0))),
1731 : pattern("remote", "bintype", RMTbintype, false, "print the binary type of this mserver5", args(1,1, arg("",void))),
1732 : command("remote", "register_supervisor", RMTregisterSupervisor, false, "Register the supervisor uuid at a remote site", args(1,3, arg("",int),arg("sup_uuid",str),arg("query_uuid",str))),
1733 : { .imp=NULL }
1734 : };
1735 : #include "mal_import.h"
1736 : #ifdef _MSC_VER
1737 : #undef read
1738 : #pragma section(".CRT$XCU",read)
1739 : #endif
1740 257 : LIB_STARTUP_FUNC(init_remote_mal)
1741 257 : { mal_module("remote", NULL, remote_init_funcs); }
1742 :
1743 : #endif
|