Line data Source code
1 : /*
2 : * This Source Code Form is subject to the terms of the Mozilla Public
3 : * License, v. 2.0. If a copy of the MPL was not distributed with this
4 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 : *
6 : * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
7 : */
8 :
9 : /*
10 : * @a N.J. Nes P. Boncz, S. Mullender, M. Kersten
11 : * @v 1.1
12 : * @+ MAPI interface
13 : * The complete Mapi library is available to setup
14 : * communication with another Mserver.
15 : *
16 : * Clients may initialize a private listener to implement
17 : * specific services. For example, in an OLTP environment
18 : * it may make sense to have a listener for each transaction
19 : * type, which simply parses a sequence of transaction parameters.
20 : *
21 : * Authorization of access to the server is handled as part
22 : * of the client record initialization phase.
23 : *
24 : * This library internally uses pointer handles, which we replace with
25 : * an index in a locally maintained table. It provides a handle
26 : * to easily detect havoc clients.
27 : *
28 : * A cleaner and simpler interface for distributed processing is available in
29 : * the module remote.
30 : */
31 : #include "monetdb_config.h"
32 : #ifdef HAVE_MAPI
33 : #include "mal_mapi.h"
34 : #include <sys/types.h>
35 : #include "stream_socket.h"
36 : #include "mapi.h"
37 :
38 : #ifdef HAVE_OPENSSL
39 : # include <openssl/rand.h> /* RAND_bytes() */
40 : #else
41 : #ifdef HAVE_COMMONCRYPTO
42 : # include <CommonCrypto/CommonCrypto.h>
43 : # include <CommonCrypto/CommonRandom.h>
44 : #endif
45 : #endif
46 : #ifdef HAVE_WINSOCK_H /* Windows specific */
47 : # include <winsock.h>
48 : #else /* UNIX specific */
49 : # include <sys/select.h>
50 : # include <sys/socket.h>
51 : # include <unistd.h> /* gethostname() */
52 : # include <netinet/in.h> /* hton and ntoh */
53 : # include <arpa/inet.h> /* addr_in */
54 : #endif
55 : #ifdef HAVE_SYS_UN_H
56 : # include <sys/un.h>
57 : #endif
58 : #ifdef HAVE_NETDB_H
59 : # include <netdb.h>
60 : # include <netinet/in.h>
61 : #endif
62 : #ifdef HAVE_POLL_H
63 : #include <poll.h>
64 : #endif
65 : #ifdef HAVE_SYS_UIO_H
66 : # include <sys/uio.h>
67 : #endif
68 : #ifdef HAVE_FCNTL_H
69 : #include <fcntl.h>
70 : #endif
71 :
72 : #ifdef HAVE_SOCKLEN_T
73 : #define SOCKLEN socklen_t
74 : #else
75 : #define SOCKLEN int
76 : #endif
77 :
78 : #if !defined(HAVE_ACCEPT4) || !defined(SOCK_CLOEXEC)
79 : #define accept4(sockfd, addr, addrlen, flags) accept(sockfd, addr, addrlen)
80 : #endif
81 :
/* Alphabet from which the characters of a challenge are drawn. */
static const char seedChars[] = {'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j',
	'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x',
	'y', 'z', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L',
	'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z',
	'1', '2', '3', '4', '5', '6', '7', '8', '9', '0'};

/*
 * Fill buf with a NUL-terminated string of random characters taken from
 * seedChars.
 *
 * buf  destination; must have room for at least max bytes
 * min  smallest length (excluding the NUL) that may be produced
 * max  exclusive upper bound on the length; when max <= min the length
 *      is exactly min
 *
 * A cryptographic source (OpenSSL or CommonCrypto) is used when the
 * build provides one; rand() is the fallback, both when no such source
 * is compiled in and when the source reports a failure.
 */
static void
generateChallenge(char *buf, int min, int max)
{
	const size_t nchars = sizeof(seedChars);
	size_t size;
	size_t i;

#ifdef __COVERITY__
	/* hide rand() calls from analysis */
	size = (size_t) ((min + max) / 2);
	for (i = 0; i < size; i++)
		buf[i] = seedChars[i % nchars];
	buf[size] = '\0';
#else
	const size_t base = min > 0 ? (size_t) min : 0;
	/* never take a remainder modulo zero */
	const size_t span = max > min ? (size_t) (max - min) : 1;
	int filled = 0;		/* did the crypto source fill buf? */

	/* don't seed the randomiser here, or you get the same challenge
	 * during the same second */
#if defined(HAVE_OPENSSL)
	/* RAND_bytes returns 1 on success; 0 and -1 both mean failure */
	if (RAND_bytes((unsigned char *) &size, (int) sizeof(size)) != 1)
#elif defined(HAVE_COMMONCRYPTO)
	if (CCRandomGenerateBytes(&size, sizeof(size)) != kCCSuccess)
#endif
		size = (size_t) rand();
	size = base + size % span;

#if defined(HAVE_OPENSSL)
	filled = RAND_bytes((unsigned char *) buf, (int) size) == 1;
#elif defined(HAVE_COMMONCRYPTO)
	filled = CCRandomGenerateBytes(buf, size) == kCCSuccess;
#endif
	for (i = 0; i < size; i++) {
		/* map either the random byte already in buf or a fresh
		 * rand() value onto the alphabet */
		unsigned int r = filled ? (unsigned int) (unsigned char) buf[i]
					: (unsigned int) rand();
		buf[i] = seedChars[r % nchars];
	}
	buf[size] = '\0';
#endif
}
130 :
131 : struct challengedata {
132 : stream *in;
133 : stream *out;
134 : char challenge[13];
135 : };
136 :
137 : static void
138 4723 : doChallenge(void *data)
139 : {
140 4723 : char *buf = GDKmalloc(BLOCK + 1);
141 : char challenge[13];
142 :
143 4723 : stream *fdin = ((struct challengedata *) data)->in;
144 4723 : stream *fdout = ((struct challengedata *) data)->out;
145 : bstream *bs;
146 : ssize_t len = 0;
147 : protocol_version protocol = PROTOCOL_9;
148 : size_t buflen = BLOCK;
149 :
150 4723 : MT_thread_setworking("challenging client");
151 : #ifdef _MSC_VER
152 : srand((unsigned int) GDKusec());
153 : #endif
154 4723 : memcpy(challenge, ((struct challengedata *) data)->challenge, sizeof(challenge));
155 4723 : GDKfree(data);
156 4723 : if (buf == NULL) {
157 0 : TRC_ERROR(MAL_SERVER, MAL_MALLOC_FAIL "\n");
158 0 : close_stream(fdin);
159 0 : close_stream(fdout);
160 0 : return;
161 : }
162 :
163 : // send the challenge over the block stream
164 4723 : mnstr_printf(fdout, "%s:mserver:9:%s:%s:%s:",
165 : challenge,
166 : mcrypt_getHashAlgorithms(),
167 : #ifdef WORDS_BIGENDIAN
168 : "BIG",
169 : #else
170 : "LIT",
171 : #endif
172 : MONETDB5_PASSWDHASH
173 : );
174 4723 : mnstr_flush(fdout, MNSTR_FLUSH_DATA);
175 : /* get response */
176 4723 : if ((len = mnstr_read_block(fdin, buf, 1, BLOCK)) < 0) {
177 : /* the client must have gone away, so no reason to write anything */
178 0 : close_stream(fdin);
179 0 : close_stream(fdout);
180 0 : GDKfree(buf);
181 0 : return;
182 : }
183 4723 : buf[len] = 0;
184 :
185 4723 : if (strstr(buf, "PROT10")) {
186 : char *errmsg = NULL;
187 : char *buflenstrend, *buflenstr = strstr(buf, "PROT10");
188 : compression_method comp;
189 : protocol = PROTOCOL_10;
190 0 : if ((buflenstr = strchr(buflenstr, ':')) == NULL ||
191 0 : (buflenstr = strchr(buflenstr + 1, ':')) == NULL) {
192 0 : mnstr_printf(fdout, "!buffer size needs to be set and bigger than %d\n", BLOCK);
193 0 : close_stream(fdin);
194 0 : close_stream(fdout);
195 0 : GDKfree(buf);
196 0 : return;
197 : }
198 0 : buflenstr++; /* position after ':' */
199 0 : buflenstrend = strchr(buflenstr, ':');
200 :
201 0 : if (buflenstrend) buflenstrend[0] = '\0';
202 0 : buflen = atol(buflenstr);
203 0 : if (buflenstrend) buflenstrend[0] = ':';
204 :
205 0 : if (buflen < BLOCK) {
206 0 : mnstr_printf(fdout, "!buffer size needs to be set and bigger than %d\n", BLOCK);
207 0 : close_stream(fdin);
208 0 : close_stream(fdout);
209 0 : GDKfree(buf);
210 0 : return;
211 : }
212 :
213 : comp = COMPRESSION_NONE;
214 0 : if (strstr(buf, "COMPRESSION_SNAPPY")) {
215 : #ifdef HAVE_SNAPPY
216 : comp = COMPRESSION_SNAPPY;
217 : #else
218 : errmsg = "!server does not support Snappy compression.\n";
219 : #endif
220 0 : } else if (strstr(buf, "COMPRESSION_LZ4")) {
221 : #ifdef HAVE_LIBLZ4
222 : comp = COMPRESSION_LZ4;
223 : #else
224 : errmsg = "!server does not support LZ4 compression.\n";
225 : #endif
226 0 : } else if (strstr(buf, "COMPRESSION_NONE")) {
227 : comp = COMPRESSION_NONE;
228 : } else {
229 : errmsg = "!no compression type specified.\n";
230 : }
231 :
232 : if (errmsg) {
233 : // incorrect compression type specified
234 0 : mnstr_printf(fdout, "%s", errmsg);
235 0 : close_stream(fdin);
236 0 : close_stream(fdout);
237 0 : GDKfree(buf);
238 0 : return;
239 : }
240 :
241 : {
242 : // convert the block_stream into a block_stream2
243 : stream *from, *to;
244 0 : from = block_stream2(fdin, buflen, comp);
245 0 : to = block_stream2(fdout, buflen, comp);
246 0 : if (from == NULL || to == NULL) {
247 0 : GDKsyserror("SERVERlisten:"MAL_MALLOC_FAIL);
248 0 : close_stream(fdin);
249 0 : close_stream(fdout);
250 0 : GDKfree(buf);
251 : return;
252 : }
253 : fdin = from;
254 : fdout = to;
255 : }
256 : }
257 :
258 4723 : bs = bstream_create(fdin, 128 * BLOCK);
259 :
260 4723 : if (bs == NULL){
261 0 : mnstr_printf(fdout, "!allocation failure in the server\n");
262 0 : close_stream(fdin);
263 0 : close_stream(fdout);
264 0 : GDKfree(buf);
265 0 : GDKsyserror("SERVERlisten:"MAL_MALLOC_FAIL);
266 : return;
267 : }
268 4723 : bs->eof = true;
269 4723 : MSscheduleClient(buf, challenge, bs, fdout, protocol, buflen);
270 : }
271 :
272 : static ATOMIC_TYPE nlistener = ATOMIC_VAR_INIT(0); /* nr of listeners */
273 : static ATOMIC_TYPE serveractive = ATOMIC_VAR_INIT(0);
274 : static ATOMIC_TYPE serverexiting = ATOMIC_VAR_INIT(0); /* listeners should exit */
275 : static ATOMIC_TYPE threadno = ATOMIC_VAR_INIT(0); /* thread sequence no */
276 :
277 : static void
278 250 : SERVERlistenThread(SOCKET *Sock)
279 : {
280 : char *msg = NULL;
281 : int retval;
282 250 : SOCKET socks[3] = {Sock[0], Sock[1], Sock[2]};
283 : struct challengedata *data;
284 : MT_Id tid;
285 : stream *s;
286 : int i;
287 :
288 250 : GDKfree(Sock);
289 :
290 250 : (void) ATOMIC_INC(&nlistener);
291 :
292 : do {
293 : SOCKET msgsock = INVALID_SOCKET;
294 : #ifdef HAVE_POLL
295 : struct pollfd pfd[3];
296 : nfds_t npfd;
297 : npfd = 0;
298 699236 : for (i = 0; i < 3; i++) {
299 524427 : if (socks[i] != INVALID_SOCKET)
300 349618 : pfd[npfd++] = (struct pollfd) {.fd = socks[i],
301 : .events = POLLIN};
302 : }
303 : /* Wait up to 0.1 seconds (0.01 if testing) */
304 175290 : retval = poll(pfd, npfd, GDKdebug & FORCEMITOMASK ? 10 : 100);
305 174808 : if (retval == -1 && errno == EINTR)
306 169836 : continue;
307 : #else
308 : fd_set fds;
309 : FD_ZERO(&fds);
310 : /* temporarily use msgsock to record the highest socket fd */
311 : for (i = 0; i < 3; i++) {
312 : if (socks[i] != INVALID_SOCKET) {
313 : FD_SET(socks[i], &fds);
314 : if (msgsock == INVALID_SOCKET || socks[i] > msgsock)
315 : msgsock = socks[i];
316 : }
317 : }
318 : /* Wait up to 0.1 seconds (0.01 if testing) */
319 : struct timeval tv = (struct timeval) {
320 : .tv_usec = GDKdebug & FORCEMITOMASK ? 10000 : 100000,
321 : };
322 :
323 : retval = select((int) msgsock + 1, &fds, NULL, NULL, &tv);
324 : msgsock = INVALID_SOCKET;
325 : #endif
326 174808 : if (ATOMIC_GET(&serverexiting) || GDKexiting())
327 : break;
328 174559 : if (retval == 0) {
329 : /* nothing interesting has happened */
330 169836 : continue;
331 : }
332 4723 : if (retval == SOCKET_ERROR) {
333 0 : if (
334 : #ifdef _MSC_VER
335 : WSAGetLastError() != WSAEINTR
336 : #else
337 0 : errno != EINTR
338 : #endif
339 : ) {
340 : msg = "select failed";
341 0 : goto error;
342 : }
343 0 : continue;
344 : }
345 : bool isusock = false;
346 : #ifdef HAVE_POLL
347 7854 : for (i = 0; i < (int) npfd; i++) {
348 7854 : if (pfd[i].revents & POLLIN) {
349 4723 : msgsock = pfd[i].fd;
350 4723 : isusock = msgsock == socks[2];
351 4723 : break;
352 : }
353 : }
354 : #else
355 : for (i = 0; i < 3; i++) {
356 : if (socks[i] != INVALID_SOCKET && FD_ISSET(socks[i], &fds)) {
357 : msgsock = socks[i];
358 : isusock = i == 2;
359 : break;
360 : }
361 : }
362 : #endif
363 4723 : if (msgsock == INVALID_SOCKET)
364 0 : continue;
365 :
366 4723 : if ((msgsock = accept4(msgsock, NULL, NULL, SOCK_CLOEXEC)) == INVALID_SOCKET) {
367 0 : if (
368 : #ifdef _MSC_VER
369 : WSAGetLastError() != WSAEINTR
370 : #else
371 0 : errno != EINTR
372 : #endif
373 0 : || !ATOMIC_GET(&serveractive)) {
374 : msg = "accept failed";
375 0 : goto error;
376 : }
377 0 : continue;
378 : }
379 : #if defined(HAVE_FCNTL) && (!defined(SOCK_CLOEXEC) || !defined(HAVE_ACCEPT4))
380 : (void) fcntl(msgsock, F_SETFD, FD_CLOEXEC);
381 : #endif
382 : #ifdef HAVE_SYS_UN_H
383 4723 : if (isusock) {
384 : struct msghdr msgh;
385 : struct iovec iov;
386 : char buf[1];
387 : int rv;
388 : char ccmsg[CMSG_SPACE(sizeof(int))];
389 : struct cmsghdr *cmsg;
390 :
391 : /* BEWARE: unix domain sockets have a slightly different
392 : * behaviour initialy than normal sockets, because we can
393 : * send filedescriptors or credentials with them. To do so,
394 : * we need to use sendmsg/recvmsg, which operates on a bare
395 : * socket. Unfortunately we *have* to send something, so it
396 : * is one byte that can optionally carry the ancillary data.
397 : * This byte is at this moment defined to contain a character:
398 : * '0' - there is no ancillary data
399 : * '1' - ancillary data for passing a file descriptor
400 : * The future may introduce a state for passing credentials.
401 : * Any unknown character must be interpreted as some unknown
402 : * action, and hence not supported by the server. */
403 :
404 3131 : iov.iov_base = buf;
405 3131 : iov.iov_len = 1;
406 :
407 3131 : msgh.msg_name = 0;
408 3131 : msgh.msg_namelen = 0;
409 3131 : msgh.msg_iov = &iov;
410 3131 : msgh.msg_iovlen = 1;
411 3131 : msgh.msg_flags = 0;
412 3131 : msgh.msg_control = ccmsg;
413 3131 : msgh.msg_controllen = sizeof(ccmsg);
414 :
415 3131 : rv = recvmsg(msgsock, &msgh, 0);
416 3131 : if (rv == -1) {
417 0 : closesocket(msgsock);
418 0 : continue;
419 : }
420 :
421 3131 : switch (buf[0]) {
422 : case '0':
423 : /* nothing special, nothing to do */
424 : break;
425 0 : case '1':
426 : { int *c_d;
427 : /* filedescriptor, put it in place of msgsock */
428 0 : cmsg = CMSG_FIRSTHDR(&msgh);
429 0 : (void) shutdown(msgsock, SHUT_WR);
430 0 : closesocket(msgsock);
431 0 : if (!cmsg || cmsg->cmsg_type != SCM_RIGHTS) {
432 0 : TRC_CRITICAL(MAL_SERVER, "Expected file descriptor, but received something else\n");
433 0 : continue;
434 : }
435 : /* HACK to avoid
436 : * "dereferencing type-punned pointer will break strict-aliasing rules"
437 : * (with gcc 4.5.1 on Fedora 14)
438 : */
439 : c_d = (int*)CMSG_DATA(cmsg);
440 0 : msgsock = *c_d;
441 : }
442 0 : break;
443 0 : default:
444 : /* some unknown state */
445 0 : closesocket(msgsock);
446 0 : TRC_CRITICAL(MAL_SERVER, "Unknown command type in first byte\n");
447 0 : continue;
448 : }
449 : }
450 : #endif
451 :
452 4723 : data = GDKmalloc(sizeof(*data));
453 4723 : if( data == NULL){
454 0 : closesocket(msgsock);
455 0 : TRC_ERROR(MAL_SERVER, MAL_MALLOC_FAIL "\n");
456 0 : continue;
457 : }
458 4723 : data->in = socket_rstream(msgsock, "Server read");
459 4723 : if (data->in == NULL) {
460 0 : stream_alloc_fail:
461 0 : mnstr_destroy(data->in);
462 0 : mnstr_destroy(data->out);
463 0 : GDKfree(data);
464 0 : closesocket(msgsock);
465 0 : TRC_ERROR(MAL_SERVER, "Cannot allocate stream: %s\n", mnstr_peek_error(NULL));
466 0 : continue;
467 : }
468 4723 : data->out = socket_wstream(msgsock, "Server write");
469 4723 : if (data->out == NULL) {
470 0 : goto stream_alloc_fail;
471 : }
472 4723 : s = block_stream(data->in);
473 4723 : if (s == NULL) {
474 0 : goto stream_alloc_fail;
475 : }
476 4723 : data->in = s;
477 4723 : s = block_stream(data->out);
478 4723 : if (s == NULL) {
479 0 : goto stream_alloc_fail;
480 : }
481 4723 : data->out = s;
482 : char name[MT_NAME_LEN];
483 4723 : snprintf(name, sizeof(name), "client%d",
484 4723 : (int) ATOMIC_INC(&threadno));
485 :
486 : /* generate the challenge string */
487 4723 : generateChallenge(data->challenge, 8, 12);
488 :
489 4723 : if ((tid = THRcreate(doChallenge, data, MT_THR_DETACHED, name)) == 0) {
490 0 : mnstr_destroy(data->in);
491 0 : mnstr_destroy(data->out);
492 0 : GDKfree(data);
493 0 : closesocket(msgsock);
494 0 : TRC_ERROR(MAL_SERVER, "Cannot fork new client thread\n");
495 0 : continue;
496 : }
497 174559 : } while (!ATOMIC_GET(&serverexiting) && !GDKexiting());
498 0 : error:
499 249 : (void) ATOMIC_DEC(&nlistener);
500 996 : for (i = 0; i < 3; i++)
501 747 : if (socks[i] != INVALID_SOCKET)
502 498 : closesocket(socks[i]);
503 249 : if (msg)
504 0 : TRC_CRITICAL(MAL_SERVER, "Terminating listener: %s\n", msg);
505 249 : return;
506 : }
507 :
508 : #ifdef _MSC_VER
509 : #define HOSTLEN int
510 : #else
511 : #define HOSTLEN size_t
512 : #endif
513 :
514 : static char *
515 250 : start_listen(SOCKET *sockp, int *portp, const char *listenaddr,
516 : char *host, size_t hostlen, int maxusers)
517 : {
518 250 : struct addrinfo *result = NULL;
519 250 : struct addrinfo hints = {
520 : .ai_family = AF_INET6,
521 : .ai_flags = AI_PASSIVE | AI_NUMERICSERV,
522 : .ai_socktype = SOCK_STREAM,
523 : .ai_protocol = IPPROTO_TCP,
524 : };
525 : int e = 0;
526 250 : int ipv6_vs6only = -1;
527 : SOCKET sock = INVALID_SOCKET;
528 : const char *err;
529 : int nsock = 0;
530 250 : sockp[0] = sockp[1] = INVALID_SOCKET;
531 250 : host[0] = 0;
532 250 : if (listenaddr == NULL || strcmp(listenaddr, "localhost") == 0) {
533 : hints.ai_family = AF_INET6;
534 0 : hints.ai_flags |= AI_NUMERICHOST;
535 0 : ipv6_vs6only = 0;
536 : listenaddr = "::1";
537 0 : strcpy_len(host, "localhost", hostlen);
538 250 : } else if (strcmp(listenaddr, "all") == 0) {
539 : hints.ai_family = AF_INET6;
540 250 : ipv6_vs6only = 0;
541 : listenaddr = NULL;
542 0 : } else if (strcmp(listenaddr, "::") == 0) {
543 : hints.ai_family = AF_INET6;
544 0 : ipv6_vs6only = 1;
545 : listenaddr = NULL;
546 0 : } else if (strcmp(listenaddr, "0.0.0.0") == 0) {
547 0 : hints.ai_family = AF_INET;
548 0 : hints.ai_flags |= AI_NUMERICHOST;
549 : listenaddr = NULL;
550 0 : } else if (strcmp(listenaddr, "::1") == 0) {
551 : hints.ai_family = AF_INET6;
552 0 : hints.ai_flags |= AI_NUMERICHOST;
553 0 : ipv6_vs6only = 1;
554 0 : strcpy_len(host, "localhost", hostlen);
555 0 : } else if (strcmp(listenaddr, "127.0.0.1") == 0) {
556 0 : hints.ai_family = AF_INET;
557 0 : hints.ai_flags |= AI_NUMERICHOST;
558 0 : strcpy_len(host, "localhost", hostlen);
559 : } else {
560 : hints.ai_family = AF_INET6;
561 0 : ipv6_vs6only = 0;
562 : }
563 : char sport[8]; /* max "65535" */
564 250 : snprintf(sport, sizeof(sport), "%d", *portp);
565 : for (;;) { /* max twice */
566 500 : int check = getaddrinfo(listenaddr, sport, &hints, &result);
567 500 : if (check != 0) {
568 : #ifdef _MSC_VER
569 : err = wsaerror(WSAGetLastError());
570 : #else
571 0 : err = gai_strerror(check);
572 : #endif
573 0 : throw(IO, "mal_mapi.listen", OPERATION_FAILED ": cannot get address "
574 : "information for %s and port %s: %s",
575 0 : listenaddr ? listenaddr : hints.ai_family == AF_INET6 ? "::" : "0.0.0.0",
576 : sport, err);
577 : }
578 :
579 750 : for (struct addrinfo *rp = result; rp; rp = rp->ai_next) {
580 500 : sock = socket(rp->ai_family, rp->ai_socktype
581 : #ifdef SOCK_CLOEXEC
582 : | SOCK_CLOEXEC
583 : #endif
584 : , rp->ai_protocol);
585 500 : if (sock == INVALID_SOCKET) {
586 : #ifdef _MSC_VER
587 : e = WSAGetLastError();
588 : #else
589 0 : e = errno;
590 : #endif
591 250 : continue;
592 : }
593 : #if defined(HAVE_FCNTL) && !defined(SOCK_CLOEXEC)
594 : (void) fcntl(sock, F_SETFD, FD_CLOEXEC);
595 : #endif
596 500 : if (ipv6_vs6only >= 0)
597 250 : if (setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY,
598 : (const char *) &ipv6_vs6only, (SOCKLEN) sizeof(int)) == -1)
599 0 : perror("setsockopt IPV6_V6ONLY");
600 :
601 : /* do not reuse addresses for ephemeral (autosense) ports */
602 500 : if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
603 500 : (const char *) &(int){1},
604 : (SOCKLEN) sizeof(int)) == SOCKET_ERROR) {
605 : #ifdef _MSC_VER
606 : e = WSAGetLastError();
607 : #else
608 0 : e = errno;
609 : #endif
610 0 : closesocket(sock);
611 : sock = INVALID_SOCKET;
612 0 : continue;
613 : }
614 500 : if (bind(sock, rp->ai_addr, (SOCKLEN) rp->ai_addrlen) == SOCKET_ERROR) {
615 : #ifdef _MSC_VER
616 : e = WSAGetLastError();
617 : #else
618 250 : e = errno;
619 : #endif
620 250 : closesocket(sock);
621 : sock = INVALID_SOCKET;
622 250 : continue;
623 : }
624 250 : if (listen(sock, maxusers) == SOCKET_ERROR) {
625 : #ifdef _MSC_VER
626 : e = WSAGetLastError();
627 : #else
628 0 : e = errno;
629 : #endif
630 0 : closesocket(sock);
631 : sock = INVALID_SOCKET;
632 0 : continue;
633 : }
634 : struct sockaddr_storage addr;
635 250 : SOCKLEN addrlen = (SOCKLEN) sizeof(addr);
636 250 : if (getsockname(sock, (struct sockaddr *) &addr, &addrlen) == SOCKET_ERROR) {
637 : #ifdef _MSC_VER
638 : e = WSAGetLastError();
639 : #else
640 0 : e = errno;
641 : #endif
642 0 : closesocket(sock);
643 : sock = INVALID_SOCKET;
644 0 : continue;
645 : }
646 250 : if (getnameinfo((struct sockaddr *) &addr, addrlen,
647 : NULL, (SOCKLEN) 0,
648 : sport, (SOCKLEN) sizeof(sport),
649 : NI_NUMERICSERV) == 0)
650 250 : *portp = (int) strtol(sport, NULL, 10);
651 250 : sockp[nsock++] = sock;
652 250 : break;
653 : }
654 500 : freeaddrinfo(result);
655 500 : if (ipv6_vs6only == 0) {
656 250 : ipv6_vs6only = -1;
657 250 : hints.ai_family = AF_INET;
658 250 : if (listenaddr && strcmp(listenaddr, "::1") == 0)
659 : listenaddr = "127.0.0.1";
660 : } else
661 : break;
662 : }
663 :
664 250 : if (nsock == 0) {
665 : #ifdef _MSC_VER
666 : err = wsaerror(e);
667 : #else
668 0 : err = GDKstrerror(e, (char[128]){0}, 128);
669 : #endif
670 0 : throw(IO, "mal_mapi.listen", OPERATION_FAILED ": bind to "
671 : "stream socket on address %s and port %s failed: %s",
672 0 : listenaddr ? listenaddr : hints.ai_family == AF_INET6 ? "::" : "0.0.0.0",
673 : sport, err);
674 : }
675 250 : if (host[0] == 0)
676 250 : gethostname(host, (HOSTLEN) hostlen);
677 : return NULL;
678 : }
679 :
680 : static str
681 250 : SERVERlisten(int port, const char *usockfile, int maxusers)
682 : {
683 : SOCKET socks[3];
684 : SOCKET *psock;
685 : #ifdef HAVE_SYS_UN_H
686 : struct sockaddr_un userver;
687 : #endif
688 : SOCKLEN length = 0;
689 : MT_Id pid;
690 : str buf;
691 250 : char host[128] = "";
692 :
693 : /* early way out, we do not want to listen on any port when running in embedded mode */
694 250 : if (GDKgetenv_istrue("mapi_disable")) {
695 : return MAL_SUCCEED;
696 : }
697 :
698 250 : const char *listenaddr = port < 0 ? "none" : GDKgetenv("mapi_listenaddr");
699 :
700 250 : if (strNil(usockfile)) {
701 : usockfile = NULL;
702 : #ifndef HAVE_SYS_UN_H
703 : } else {
704 : throw(IO, "mal_mapi.listen", OPERATION_FAILED ": UNIX domain sockets are not supported");
705 : #endif
706 : }
707 250 : maxusers = (maxusers ? maxusers : SERVERMAXUSERS);
708 :
709 250 : if (listenaddr && strcmp(listenaddr, "none") == 0 && usockfile == NULL) {
710 0 : throw(ILLARG, "mal_mapi.listen", OPERATION_FAILED ": no port or socket file specified");
711 : }
712 :
713 250 : if (port > 65535) {
714 0 : throw(ILLARG, "mal_mapi.listen", OPERATION_FAILED ": port number should be between 0 and 65535");
715 : }
716 :
717 250 : socks[0] = socks[1] = socks[2] = INVALID_SOCKET;
718 :
719 250 : if (listenaddr == NULL || strcmp(listenaddr, "none") != 0) {
720 250 : char *msg = start_listen(socks, &port, listenaddr, host, sizeof(host), maxusers);
721 250 : if (msg != MAL_SUCCEED) {
722 : return msg;
723 : }
724 : }
725 :
726 : #ifdef HAVE_SYS_UN_H
727 250 : if (usockfile) {
728 : /* prevent silent truncation, sun_path is typically around 108
729 : * chars long :/ */
730 250 : size_t ulen = strlen(usockfile);
731 250 : if (ulen >= sizeof(userver.sun_path)) {
732 0 : if (socks[0] != INVALID_SOCKET)
733 0 : closesocket(socks[0]);
734 0 : if (socks[1] != INVALID_SOCKET)
735 0 : closesocket(socks[1]);
736 0 : throw(MAL, "mal_mapi.listen",
737 : OPERATION_FAILED ": UNIX socket path too long: %s",
738 : usockfile);
739 : }
740 :
741 250 : socks[2] = socket(AF_UNIX, SOCK_STREAM
742 : #ifdef SOCK_CLOEXEC
743 : | SOCK_CLOEXEC
744 : #endif
745 : , 0);
746 250 : if (socks[2] == INVALID_SOCKET) {
747 : #ifdef _MSC_VER
748 : const char *err = wsaerror(WSAGetLastError());
749 : #else
750 0 : const char *err = GDKstrerror(errno, (char[128]){0}, 128);
751 : #endif
752 0 : if (socks[0] != INVALID_SOCKET)
753 0 : closesocket(socks[0]);
754 0 : if (socks[1] != INVALID_SOCKET)
755 0 : closesocket(socks[1]);
756 0 : throw(IO, "mal_mapi.listen",
757 : OPERATION_FAILED ": creation of UNIX socket failed: %s", err);
758 : }
759 : #if !defined(SOCK_CLOEXEC) && defined(HAVE_FCNTL)
760 : (void) fcntl(socks[2], F_SETFD, FD_CLOEXEC);
761 : #endif
762 :
763 250 : userver.sun_family = AF_UNIX;
764 250 : memcpy(userver.sun_path, usockfile, ulen + 1);
765 : length = (SOCKLEN) sizeof(userver);
766 250 : if (remove(usockfile) == -1 && errno != ENOENT) {
767 0 : char *e = createException(IO, "mal_mapi.listen", OPERATION_FAILED ": remove UNIX socket file: %s",
768 0 : GDKstrerror(errno, (char[128]){0}, 128));
769 0 : if (socks[0] != INVALID_SOCKET)
770 0 : closesocket(socks[0]);
771 0 : if (socks[1] != INVALID_SOCKET)
772 0 : closesocket(socks[1]);
773 0 : closesocket(socks[2]);
774 : return e;
775 : }
776 250 : if (bind(socks[2], (struct sockaddr *) &userver, length) == SOCKET_ERROR) {
777 : #ifdef _MSC_VER
778 : const char *err = wsaerror(WSAGetLastError());
779 : #else
780 0 : const char *err = GDKstrerror(errno, (char[128]){0}, 128);
781 : #endif
782 0 : if (socks[0] != INVALID_SOCKET)
783 0 : closesocket(socks[0]);
784 0 : if (socks[1] != INVALID_SOCKET)
785 0 : closesocket(socks[1]);
786 0 : closesocket(socks[2]);
787 0 : (void) remove(usockfile);
788 0 : throw(IO, "mal_mapi.listen",
789 : OPERATION_FAILED
790 : ": binding to UNIX socket file %s failed: %s",
791 : usockfile, err);
792 : }
793 250 : if (listen(socks[2], maxusers) == SOCKET_ERROR) {
794 : #ifdef _MSC_VER
795 : const char *err = wsaerror(WSAGetLastError());
796 : #else
797 0 : const char *err = GDKstrerror(errno, (char[128]){0}, 128);
798 : #endif
799 0 : if (socks[0] != INVALID_SOCKET)
800 0 : closesocket(socks[0]);
801 0 : if (socks[1] != INVALID_SOCKET)
802 0 : closesocket(socks[1]);
803 0 : closesocket(socks[2]);
804 0 : (void) remove(usockfile);
805 0 : throw(IO, "mal_mapi.listen",
806 : OPERATION_FAILED
807 : ": setting UNIX socket file %s to listen failed: %s",
808 : usockfile, err);
809 : }
810 : }
811 : #endif
812 :
813 : /* seed the randomiser such that our challenges aren't
814 : * predictable... */
815 250 : srand((unsigned int) GDKusec());
816 :
817 250 : psock = GDKmalloc(sizeof(socks));
818 250 : if (psock == NULL) {
819 0 : for (int i = 0; i < 3; i++) {
820 0 : if (socks[i] != INVALID_SOCKET)
821 0 : closesocket(socks[i]);
822 : }
823 0 : throw(MAL,"mal_mapi.listen", SQLSTATE(HY013) MAL_MALLOC_FAIL);
824 : }
825 250 : memcpy(psock, socks, sizeof(socks));
826 250 : if (MT_create_thread(&pid, (void (*)(void *)) SERVERlistenThread, psock,
827 : MT_THR_DETACHED, "listenThread") != 0) {
828 0 : for (int i = 0; i < 3; i++) {
829 0 : if (socks[i] != INVALID_SOCKET)
830 0 : closesocket(socks[i]);
831 : }
832 0 : GDKfree(psock);
833 0 : throw(MAL, "mal_mapi.listen", OPERATION_FAILED ": starting thread failed");
834 : }
835 :
836 250 : TRC_DEBUG(MAL_SERVER, "Ready to accept connections on: %s:%d\n", host, port);
837 :
838 250 : if (socks[0] != INVALID_SOCKET || socks[1] != INVALID_SOCKET) {
839 250 : if (!GDKinmemory() && (buf = msab_marchConnection(host, port)) != NULL)
840 0 : free(buf);
841 : else
842 : /* announce that we're now reachable */
843 250 : printf("# Listening for connection requests on "
844 : "mapi:monetdb://%s:%i/\n", host, port);
845 : }
846 : #ifdef HAVE_SYS_UN_H
847 250 : if (socks[2] != INVALID_SOCKET) {
848 250 : if (!GDKinmemory() && (buf = msab_marchConnection(usockfile, 0)) != NULL)
849 0 : free(buf);
850 : else
851 : /* announce that we're now reachable */
852 250 : printf("# Listening for UNIX domain connection requests on "
853 : "mapi:monetdb://%s\n", usockfile);
854 : }
855 : #endif
856 :
857 : return MAL_SUCCEED;
858 : }
859 :
860 : /*
861 : * @- Wrappers
862 : * The MonetDB Version 5 wrappers are collected here
863 : * The latest port known to gain access is stored
864 : * in the database, so that others can more easily
865 : * be notified.
866 : */
867 : str
868 250 : SERVERlisten_default(int *ret)
869 : {
870 : int port = MAPI_PORT;
871 250 : const char* p = GDKgetenv("mapi_port");
872 :
873 : (void) ret;
874 250 : if (p)
875 250 : port = (int) strtol(p, NULL, 10);
876 250 : p = GDKgetenv("mapi_usock");
877 250 : return SERVERlisten(port, p, SERVERMAXUSERS);
878 : }
879 :
880 : str
881 0 : SERVERlisten_usock(int *ret, str *usock)
882 : {
883 : (void) ret;
884 0 : return SERVERlisten(-1, usock ? *usock : NULL, SERVERMAXUSERS);
885 : }
886 :
887 : str
888 0 : SERVERlisten_port(int *ret, int *pid)
889 : {
890 : (void) ret;
891 0 : return SERVERlisten(*pid, NULL, SERVERMAXUSERS);
892 : }
893 : /*
894 : * The internet connection listener may be terminated from the server console,
895 : * or temporarily suspended to enable system maintenance.
896 : * It is advisable to trace the interactions of clients on the server
897 : * side. At least as far as it concerns requests received.
898 : * The kernel supports this 'spying' behavior with a file descriptor
899 : * field in the client record.
900 : */
901 :
902 : str
903 0 : SERVERstop(void *ret)
904 : {
905 0 : TRC_INFO(MAL_SERVER, "Server stop\n");
906 0 : ATOMIC_SET(&serverexiting, 1);
907 : /* wait until they all exited, but skip the wait if the whole
908 : * system is going down */
909 0 : while (ATOMIC_GET(&nlistener) > 0 && !GDKexiting())
910 0 : MT_sleep_ms(100);
911 : (void) ret; /* fool compiler */
912 0 : return MAL_SUCCEED;
913 : }
914 :
915 :
916 : str
917 0 : SERVERsuspend(void *res)
918 : {
919 : (void) res;
920 0 : ATOMIC_SET(&serveractive, 0);
921 0 : return MAL_SUCCEED;
922 : }
923 :
924 : str
925 0 : SERVERresume(void *res)
926 : {
927 0 : ATOMIC_SET(&serveractive, 1);
928 : (void) res;
929 0 : return MAL_SUCCEED;
930 : }
931 :
932 : str
933 0 : SERVERclient(void *res, const Stream *In, const Stream *Out)
934 : {
935 : struct challengedata *data;
936 : MT_Id tid;
937 :
938 : (void) res;
939 : /* in embedded mode we allow just one client */
940 0 : data = GDKmalloc(sizeof(*data));
941 0 : if( data == NULL)
942 0 : throw(MAL, "mapi.SERVERclient", SQLSTATE(HY013) MAL_MALLOC_FAIL);
943 0 : data->in = block_stream(*In);
944 0 : data->out = block_stream(*Out);
945 0 : if (data->in == NULL || data->out == NULL) {
946 0 : mnstr_destroy(data->in);
947 0 : mnstr_destroy(data->out);
948 0 : GDKfree(data);
949 0 : throw(MAL, "mapi.SERVERclient", SQLSTATE(HY013) MAL_MALLOC_FAIL);
950 : }
951 : char name[MT_NAME_LEN];
952 0 : snprintf(name, sizeof(name), "client%d",
953 0 : (int) ATOMIC_INC(&threadno));
954 :
955 : /* generate the challenge string */
956 0 : generateChallenge(data->challenge, 8, 12);
957 :
958 0 : if ((tid = THRcreate(doChallenge, data, MT_THR_DETACHED, name)) == 0) {
959 0 : mnstr_destroy(data->in);
960 0 : mnstr_destroy(data->out);
961 0 : GDKfree(data);
962 0 : throw(MAL, "mapi.SERVERclient", "cannot fork new client thread");
963 : }
964 : return MAL_SUCCEED;
965 : }
966 :
967 : /*
968 : * @+ Remote Processing
969 : * The remainder of the file contains the wrappers around
970 : * the Mapi library used by application programmers.
971 : * Details on the functions can be found there.
972 : *
973 : * Sessions have a lifetime different from dynamic scopes.
974 : * This means the user should use a session identifier
975 : * to select the correct handle.
976 : * For the time being we use the index in the global
977 : * session table. The client pointer is retained to
978 : * perform access control.
979 : *
980 : * We use a single result set handle. All data should be
981 : * consumed before continuing.
982 : *
983 : * A few extra routines should be defined to
984 : * dump and inspect the sessions table.
985 : *
986 : * The remote site may return a single error
987 : * with a series of error lines. These contain
988 : * then a starting !. They are all stripped here.
989 : */
/*
 * catchErrors(fcn): if the connection `mid` reports an error, return a
 * MAL exception for function name `fcn` from the enclosing function.
 *
 * The expansion uses the names `mid`, `hdl` and `i` (an index into
 * SERVERsessions) of the enclosing scope.  The error text is taken from
 * `hdl` when it has one, otherwise from SERVERsessions[i].hdl.  Every
 * '!' that directly follows a newline is replaced by a
 * "MALException:<fcn>:remote error:" prefix.  If the buffer for the
 * rewritten text cannot be allocated, an allocation-failure exception
 * is returned instead.
 */
#define catchErrors(fcn) \
	do { \
		int rn = mapi_error(mid); \
		if ((rn == -4 && hdl && mapi_result_error(hdl)) || rn) { \
			const char *err, *e; \
			str newerr; \
			str ret; \
			size_t l; \
			char *f; \
 \
			if (hdl && mapi_result_error(hdl)) \
				err = mapi_result_error(hdl); \
			else \
				err = mapi_result_error(SERVERsessions[i].hdl); \
 \
			if (err == NULL) \
				err = "(no additional error message)"; \
 \
			l = 2 * strlen(err) + 8192; \
			newerr = (str) GDKmalloc(l); \
			if (newerr == NULL) { \
				/* still report a failure: falling out of the macro \
				 * would silently ignore the remote error */ \
				throw(MAL, fcn, SQLSTATE(HY013) MAL_MALLOC_FAIL); \
			} \
 \
			f = newerr; \
			/* I think this code tries to deal with multiple errors, this \
			 * will fail this way if it does, since no ! is in the error \
			 * string, only newlines to separate them */ \
			for (e = err; *e && l > 1; e++) { \
				/* e > err: never look in front of the string */ \
				if (*e == '!' && e > err && *(e - 1) == '\n') { \
					snprintf(f, l, "MALException:" fcn ":remote error:"); \
					l -= strlen(f); \
					while (*f) \
						f++; \
				} else { \
					*f++ = *e; \
					l--; \
				} \
			} \
 \
			*f = 0; \
			ret = createException(MAL, fcn, \
								  OPERATION_FAILED ": remote error: %s", \
								  newerr); \
			GDKfree(newerr); \
			return ret; \
		} \
	} while (0)
1036 :
1037 : #define MAXSESSIONS 32
1038 : struct{
1039 : int key;
1040 : str dbalias; /* logical name of the session */
1041 : Client c;
1042 : Mapi mid; /* communication channel */
1043 : MapiHdl hdl; /* result set handle */
1044 : } SERVERsessions[MAXSESSIONS];
1045 :
1046 : static int sessionkey=0;
1047 :
1048 : /* #define MAPI_TEST*/
1049 :
1050 : static str
1051 5 : SERVERconnectAll(Client cntxt, int *key, str *host, int *port, str *username, str *password, str *lang)
1052 : {
1053 : Mapi mid;
1054 : int i;
1055 :
1056 5 : MT_lock_set(&mal_contextLock);
1057 5 : for(i=1; i< MAXSESSIONS; i++)
1058 5 : if( SERVERsessions[i].c ==0 ) break;
1059 :
1060 5 : if( i==MAXSESSIONS){
1061 0 : MT_lock_unset(&mal_contextLock);
1062 0 : throw(IO, "mapi.connect", OPERATION_FAILED ": too many sessions");
1063 : }
1064 5 : SERVERsessions[i].c= cntxt;
1065 5 : SERVERsessions[i].key= ++sessionkey;
1066 5 : MT_lock_unset(&mal_contextLock);
1067 :
1068 5 : mid = mapi_connect(*host, *port, *username, *password, *lang, NULL);
1069 :
1070 5 : if (mapi_error(mid)) {
1071 0 : const char *err = mapi_error_str(mid);
1072 : str ex;
1073 0 : if (err == NULL)
1074 : err = "(no reason given)";
1075 0 : if (err[0] == '!')
1076 0 : err = err + 1;
1077 0 : SERVERsessions[i].c = NULL;
1078 0 : ex = createException(IO, "mapi.connect", "Could not connect: %s", err);
1079 0 : mapi_destroy(mid);
1080 0 : return(ex);
1081 : }
1082 :
1083 : #ifdef MAPI_TEST
1084 : mnstr_printf(SERVERsessions[i].c->fdout,"Succeeded to establish session\n");
1085 : #endif
1086 5 : SERVERsessions[i].mid= mid;
1087 5 : *key = SERVERsessions[i].key;
1088 5 : return MAL_SUCCEED;
1089 : }
1090 :
1091 : str
1092 0 : SERVERdisconnectALL(int *key){
1093 : int i;
1094 :
1095 0 : MT_lock_set(&mal_contextLock);
1096 :
1097 0 : for(i=1; i< MAXSESSIONS; i++)
1098 0 : if( SERVERsessions[i].c != 0 ) {
1099 : #ifdef MAPI_TEST
1100 : mnstr_printf(SERVERsessions[i].c->fdout,"Close session %d\n",i);
1101 : #endif
1102 0 : SERVERsessions[i].c = 0;
1103 0 : if( SERVERsessions[i].dbalias)
1104 0 : GDKfree(SERVERsessions[i].dbalias);
1105 0 : SERVERsessions[i].dbalias = NULL;
1106 0 : *key = SERVERsessions[i].key;
1107 0 : mapi_disconnect(SERVERsessions[i].mid);
1108 : }
1109 :
1110 0 : MT_lock_unset(&mal_contextLock);
1111 :
1112 0 : return MAL_SUCCEED;
1113 : }
1114 :
1115 : str
1116 0 : SERVERdisconnectWithAlias(int *key, str *dbalias){
1117 : int i;
1118 :
1119 0 : MT_lock_set(&mal_contextLock);
1120 :
1121 0 : for(i=0; i<MAXSESSIONS; i++)
1122 0 : if( SERVERsessions[i].dbalias &&
1123 0 : strcmp(SERVERsessions[i].dbalias, *dbalias)==0){
1124 0 : SERVERsessions[i].c = 0;
1125 : if( SERVERsessions[i].dbalias)
1126 0 : GDKfree(SERVERsessions[i].dbalias);
1127 0 : SERVERsessions[i].dbalias = NULL;
1128 0 : *key = SERVERsessions[i].key;
1129 0 : mapi_disconnect(SERVERsessions[i].mid);
1130 0 : break;
1131 : }
1132 :
1133 0 : if( i==MAXSESSIONS){
1134 0 : MT_lock_unset(&mal_contextLock);
1135 0 : throw(IO, "mapi.disconnect", "Impossible to close session for db_alias: '%s'", *dbalias);
1136 : }
1137 :
1138 0 : MT_lock_unset(&mal_contextLock);
1139 0 : return MAL_SUCCEED;
1140 : }
1141 :
1142 : str
1143 1 : SERVERconnect(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){
1144 1 : int *key =getArgReference_int(stk,pci,0);
1145 1 : str *host = getArgReference_str(stk,pci,1);
1146 1 : int *port = getArgReference_int(stk,pci,2);
1147 1 : str *username = getArgReference_str(stk,pci,3);
1148 1 : str *password= getArgReference_str(stk,pci,4);
1149 1 : str *lang = getArgReference_str(stk,pci,5);
1150 :
1151 : (void) mb;
1152 1 : return SERVERconnectAll(cntxt, key,host,port,username,password,lang);
1153 : }
1154 :
1155 :
1156 : str
1157 5 : SERVERreconnectAlias(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1158 : {
1159 5 : int *key =getArgReference_int(stk,pci,0);
1160 5 : str *host = getArgReference_str(stk,pci,1);
1161 5 : int *port = getArgReference_int(stk,pci,2);
1162 5 : str *dbalias = getArgReference_str(stk,pci,3);
1163 5 : str *username = getArgReference_str(stk,pci,4);
1164 5 : str *password= getArgReference_str(stk,pci,5);
1165 5 : str *lang = getArgReference_str(stk,pci,6);
1166 : int i;
1167 : str msg=MAL_SUCCEED;
1168 :
1169 : (void) mb;
1170 :
1171 134 : for(i=0; i<MAXSESSIONS; i++)
1172 130 : if( SERVERsessions[i].key &&
1173 5 : SERVERsessions[i].dbalias &&
1174 1 : strcmp(SERVERsessions[i].dbalias, *dbalias)==0){
1175 1 : *key = SERVERsessions[i].key;
1176 1 : return msg;
1177 : }
1178 :
1179 4 : msg= SERVERconnectAll(cntxt, key, host, port, username, password, lang);
1180 4 : if( msg == MAL_SUCCEED)
1181 4 : msg = SERVERsetAlias(NULL, key, dbalias);
1182 : return msg;
1183 : }
1184 :
1185 : str
1186 0 : SERVERreconnectWithoutAlias(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
1187 0 : int *key =getArgReference_int(stk,pci,0);
1188 0 : str *host = getArgReference_str(stk,pci,1);
1189 0 : int *port = getArgReference_int(stk,pci,2);
1190 0 : str *username = getArgReference_str(stk,pci,3);
1191 0 : str *password= getArgReference_str(stk,pci,4);
1192 0 : str *lang = getArgReference_str(stk,pci,5);
1193 : int i;
1194 0 : str msg=MAL_SUCCEED, nme= "anonymous";
1195 :
1196 : (void) mb;
1197 :
1198 0 : for(i=0; i<MAXSESSIONS; i++)
1199 0 : if( SERVERsessions[i].key ){
1200 0 : *key = SERVERsessions[i].key;
1201 0 : return msg;
1202 : }
1203 :
1204 0 : msg= SERVERconnectAll(cntxt, key, host, port, username, password, lang);
1205 0 : if( msg == MAL_SUCCEED)
1206 0 : msg = SERVERsetAlias(NULL, key, &nme);
1207 : return msg;
1208 : }
1209 :
/*
 * accessTest(val, fcn) -- find the open session whose key equals val.
 *
 * Expects int i and Mapi mid to be declared at the expansion site.  On
 * success i is the index of the session in SERVERsessions[] and mid its
 * connection; otherwise the enclosing function returns an exception
 * labelled "mapi.<fcn>".  fcn must be a string literal.
 */
#define accessTest(val, fcn) \
	do { \
		i = 0; \
		while (i < MAXSESSIONS && \
			   !(SERVERsessions[i].c && SERVERsessions[i].key == (val))) \
			i++; \
		if (i == MAXSESSIONS) \
			throw(MAL, "mapi." fcn, "Access violation," \
				  " could not find matching session descriptor"); \
		mid = SERVERsessions[i].mid; \
		(void) mid; /* silence compilers */ \
	} while (0)
1221 :
1222 : str
1223 4 : SERVERsetAlias(void *ret, int *key, str *dbalias){
1224 : int i;
1225 : Mapi mid;
1226 8 : accessTest(*key, "setAlias");
1227 4 : SERVERsessions[i].dbalias= GDKstrdup(*dbalias);
1228 4 : if(SERVERsessions[i].dbalias == NULL)
1229 0 : throw(MAL, "mapi.set_alias", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1230 : (void) ret;
1231 : return MAL_SUCCEED;
1232 : }
1233 :
1234 : str
1235 0 : SERVERlookup(int *ret, str *dbalias)
1236 : {
1237 : int i;
1238 0 : for(i=0; i< MAXSESSIONS; i++)
1239 0 : if( SERVERsessions[i].dbalias &&
1240 0 : strcmp(SERVERsessions[i].dbalias, *dbalias)==0){
1241 0 : *ret= SERVERsessions[i].key;
1242 0 : return MAL_SUCCEED;
1243 : }
1244 0 : throw(MAL, "mapi.lookup", "Could not find database connection");
1245 : }
1246 :
1247 : str
1248 0 : SERVERtrace(void *ret, int *key, int *flag){
1249 : (void )ret;
1250 0 : mapi_trace(SERVERsessions[*key].mid,(bool)*flag);
1251 0 : return MAL_SUCCEED;
1252 : }
1253 :
1254 : str
1255 5 : SERVERdisconnect(void *ret, int *key){
1256 : int i;
1257 : Mapi mid;
1258 : (void) ret;
1259 10 : accessTest(*key, "disconnect");
1260 5 : mapi_disconnect(mid);
1261 5 : if( SERVERsessions[i].dbalias)
1262 4 : GDKfree(SERVERsessions[i].dbalias);
1263 5 : SERVERsessions[i].c= 0;
1264 5 : SERVERsessions[i].dbalias= 0;
1265 5 : return MAL_SUCCEED;
1266 : }
1267 :
1268 : str
1269 0 : SERVERdestroy(void *ret, int *key){
1270 : int i;
1271 : Mapi mid;
1272 : (void) ret;
1273 0 : accessTest(*key, "destroy");
1274 0 : mapi_destroy(mid);
1275 0 : SERVERsessions[i].c= 0;
1276 0 : if( SERVERsessions[i].dbalias)
1277 0 : GDKfree(SERVERsessions[i].dbalias);
1278 0 : SERVERsessions[i].dbalias= 0;
1279 0 : return MAL_SUCCEED;
1280 : }
1281 :
1282 : str
1283 0 : SERVERreconnect(void *ret, int *key){
1284 : int i;
1285 : Mapi mid;
1286 : (void) ret;
1287 0 : accessTest(*key, "destroy");
1288 0 : mapi_reconnect(mid);
1289 0 : return MAL_SUCCEED;
1290 : }
1291 :
1292 : str
1293 0 : SERVERping(int *ret, int *key){
1294 : int i;
1295 : Mapi mid;
1296 0 : accessTest(*key, "destroy");
1297 0 : *ret= mapi_ping(mid);
1298 0 : return MAL_SUCCEED;
1299 : }
1300 :
1301 : str
1302 24 : SERVERquery(int *ret, int *key, str *qry){
1303 : Mapi mid;
1304 : MapiHdl hdl=0;
1305 : int i;
1306 48 : accessTest(*key, "query");
1307 24 : if( SERVERsessions[i].hdl)
1308 23 : mapi_close_handle(SERVERsessions[i].hdl);
1309 24 : SERVERsessions[i].hdl = mapi_query(mid, *qry);
1310 92 : catchErrors("mapi.query");
1311 23 : *ret = *key;
1312 23 : return MAL_SUCCEED;
1313 : }
1314 :
1315 : str
1316 0 : SERVERquery_handle(int *ret, int *key, str *qry){
1317 : Mapi mid;
1318 : MapiHdl hdl=0;
1319 : int i;
1320 0 : accessTest(*key, "query_handle");
1321 0 : mapi_query_handle(SERVERsessions[i].hdl, *qry);
1322 0 : catchErrors("mapi.query_handle");
1323 0 : *ret = *key;
1324 0 : return MAL_SUCCEED;
1325 : }
1326 :
1327 : str
1328 0 : SERVERquery_array(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pc){
1329 : (void)cntxt, (void) mb; (void) stk; (void) pc;
1330 0 : throw(MAL, "mapi.query_array", SQLSTATE(0A000) PROGRAM_NYI);
1331 : }
1332 :
1333 : str
1334 0 : SERVERprepare(int *ret, int *key, str *qry){
1335 : Mapi mid;
1336 : int i;
1337 0 : accessTest(*key, "prepare");
1338 0 : if( SERVERsessions[i].hdl)
1339 0 : mapi_close_handle(SERVERsessions[i].hdl);
1340 0 : SERVERsessions[i].hdl= mapi_prepare(mid, *qry);
1341 0 : if( mapi_error(mid) )
1342 0 : throw(MAL, "mapi.prepare", "%s",
1343 : mapi_result_error(SERVERsessions[i].hdl));
1344 0 : *ret = *key;
1345 0 : return MAL_SUCCEED;
1346 : }
1347 :
1348 : str
1349 0 : SERVERfinish(int *ret, int *key){
1350 : Mapi mid;
1351 : int i;
1352 0 : accessTest(*key, "finish");
1353 0 : mapi_finish(SERVERsessions[i].hdl);
1354 0 : if( mapi_error(mid) )
1355 0 : throw(MAL, "mapi.finish", "%s",
1356 : mapi_result_error(SERVERsessions[i].hdl));
1357 0 : *ret = *key;
1358 0 : return MAL_SUCCEED;
1359 : }
1360 :
1361 : str
1362 1 : SERVERget_row_count(lng *ret, int *key){
1363 : Mapi mid;
1364 : int i;
1365 2 : accessTest(*key, "get_row_count");
1366 1 : *ret= (lng) mapi_get_row_count(SERVERsessions[i].hdl);
1367 1 : if( mapi_error(mid) )
1368 0 : throw(MAL, "mapi.get_row_count", "%s",
1369 : mapi_result_error(SERVERsessions[i].hdl));
1370 : return MAL_SUCCEED;
1371 : }
1372 :
1373 : str
1374 1 : SERVERget_field_count(int *ret, int *key){
1375 : Mapi mid;
1376 : int i;
1377 2 : accessTest(*key, "get_field_count");
1378 1 : *ret= mapi_get_field_count(SERVERsessions[i].hdl);
1379 1 : if( mapi_error(mid) )
1380 0 : throw(MAL, "mapi.get_field_count", "%s",
1381 : mapi_result_error(SERVERsessions[i].hdl));
1382 : return MAL_SUCCEED;
1383 : }
1384 :
1385 : str
1386 0 : SERVERrows_affected(lng *ret, int *key){
1387 : Mapi mid;
1388 : int i;
1389 0 : accessTest(*key, "rows_affected");
1390 0 : *ret= (lng) mapi_rows_affected(SERVERsessions[i].hdl);
1391 0 : return MAL_SUCCEED;
1392 : }
1393 :
1394 : str
1395 1 : SERVERfetch_row(int *ret, int *key){
1396 : Mapi mid;
1397 : int i;
1398 2 : accessTest(*key, "fetch_row");
1399 1 : *ret= mapi_fetch_row(SERVERsessions[i].hdl);
1400 1 : return MAL_SUCCEED;
1401 : }
1402 :
1403 : str
1404 0 : SERVERfetch_all_rows(lng *ret, int *key){
1405 : Mapi mid;
1406 : int i;
1407 0 : accessTest(*key, "fetch_all_rows");
1408 0 : *ret= (lng) mapi_fetch_all_rows(SERVERsessions[i].hdl);
1409 0 : return MAL_SUCCEED;
1410 : }
1411 :
1412 : str
1413 1 : SERVERfetch_field_str(str *ret, int *key, int *fnr){
1414 : Mapi mid;
1415 : int i;
1416 : str fld;
1417 2 : accessTest(*key, "fetch_field");
1418 1 : fld= mapi_fetch_field(SERVERsessions[i].hdl,*fnr);
1419 1 : *ret= GDKstrdup(fld? fld: str_nil);
1420 1 : if(*ret == NULL)
1421 0 : throw(MAL, "mapi.fetch_field_str", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1422 1 : if( mapi_error(mid) )
1423 0 : throw(MAL, "mapi.fetch_field_str", "%s",
1424 : mapi_result_error(SERVERsessions[i].hdl));
1425 : return MAL_SUCCEED;
1426 : }
1427 :
1428 : str
1429 1 : SERVERfetch_field_int(int *ret, int *key, int *fnr){
1430 : Mapi mid;
1431 : int i;
1432 : str fld;
1433 2 : accessTest(*key, "fetch_field");
1434 1 : fld= mapi_fetch_field(SERVERsessions[i].hdl,*fnr);
1435 2 : *ret= fld? (int) atol(fld): int_nil;
1436 1 : if( mapi_error(mid) )
1437 0 : throw(MAL, "mapi.fetch_field_int", "%s",
1438 : mapi_result_error(SERVERsessions[i].hdl));
1439 : return MAL_SUCCEED;
1440 : }
1441 :
1442 : str
1443 0 : SERVERfetch_field_lng(lng *ret, int *key, int *fnr){
1444 : Mapi mid;
1445 : int i;
1446 : str fld;
1447 0 : accessTest(*key, "fetch_field");
1448 0 : fld= mapi_fetch_field(SERVERsessions[i].hdl,*fnr);
1449 0 : *ret= fld? atol(fld): lng_nil;
1450 0 : if( mapi_error(mid) )
1451 0 : throw(MAL, "mapi.fetch_field_lng", "%s",
1452 : mapi_result_error(SERVERsessions[i].hdl));
1453 : return MAL_SUCCEED;
1454 : }
1455 :
#ifdef HAVE_HGE
/*
 * Fetch field *fnr of the current row as a hge; a NULL field yields
 * hge_nil.  The text is converted with strtoll(), so values outside the
 * range of long long are not represented.  *ret is written even when an
 * error is reported afterwards.
 */
str
SERVERfetch_field_hge(hge *ret, int *key, int *fnr)
{
	int i;
	Mapi mid;
	str fld;

	accessTest(*key, "fetch_field");
	fld = mapi_fetch_field(SERVERsessions[i].hdl, *fnr);
	/* strtoll rather than atol: long may be only 32 bits wide */
	*ret = fld ? (hge) strtoll(fld, NULL, 10) : hge_nil;
	if (mapi_error(mid))
		throw(MAL, "mapi.fetch_field_hge", "%s",
			  mapi_result_error(SERVERsessions[i].hdl));
	return MAL_SUCCEED;
}
#endif
1471 :
1472 : str
1473 0 : SERVERfetch_field_sht(sht *ret, int *key, int *fnr){
1474 : Mapi mid;
1475 : int i;
1476 : str fld;
1477 0 : accessTest(*key, "fetch_field");
1478 0 : fld= mapi_fetch_field(SERVERsessions[i].hdl,*fnr);
1479 0 : *ret= fld? (sht) atol(fld): sht_nil;
1480 0 : if( mapi_error(mid) )
1481 0 : throw(MAL, "mapi.fetch_field", "%s",
1482 : mapi_result_error(SERVERsessions[i].hdl));
1483 : return MAL_SUCCEED;
1484 : }
1485 :
1486 : str
1487 0 : SERVERfetch_field_void(void *ret, int *key, int *fnr){
1488 : Mapi mid;
1489 : int i;
1490 : (void) ret;
1491 : (void) fnr;
1492 0 : accessTest(*key, "fetch_field");
1493 0 : throw(MAL, "mapi.fetch_field_void","defaults to nil");
1494 : }
1495 :
1496 : str
1497 0 : SERVERfetch_field_oid(oid *ret, int *key, int *fnr){
1498 : Mapi mid;
1499 : int i;
1500 : str fld;
1501 0 : accessTest(*key, "fetch_field");
1502 0 : fld= mapi_fetch_field(SERVERsessions[i].hdl,*fnr);
1503 0 : if( mapi_error(mid) )
1504 0 : throw(MAL, "mapi.fetch_field_oid", "%s",
1505 : mapi_result_error(SERVERsessions[i].hdl));
1506 0 : if(fld==0 || strcmp(fld,"nil")==0)
1507 0 : *(oid*) ret= void_nil;
1508 0 : else *(oid*) ret = (oid) atol(fld);
1509 : return MAL_SUCCEED;
1510 : }
1511 :
1512 : str
1513 0 : SERVERfetch_field_bte(bte *ret, int *key, int *fnr){
1514 : Mapi mid;
1515 : int i;
1516 : str fld;
1517 0 : accessTest(*key, "fetch_field");
1518 0 : fld= mapi_fetch_field(SERVERsessions[i].hdl,*fnr);
1519 0 : if( mapi_error(mid) )
1520 0 : throw(MAL, "mapi.fetch_field_bte", "%s",
1521 : mapi_result_error(SERVERsessions[i].hdl));
1522 0 : if(fld==0 || strcmp(fld,"nil")==0)
1523 0 : *(bte*) ret= bte_nil;
1524 0 : else *(bte*) ret = *fld;
1525 : return MAL_SUCCEED;
1526 : }
1527 :
1528 : str
1529 0 : SERVERfetch_line(str *ret, int *key){
1530 : Mapi mid;
1531 : int i;
1532 : str fld;
1533 0 : accessTest(*key, "fetch_line");
1534 0 : fld= mapi_fetch_line(SERVERsessions[i].hdl);
1535 0 : if( mapi_error(mid) )
1536 0 : throw(MAL, "mapi.fetch_line", "%s",
1537 : mapi_result_error(SERVERsessions[i].hdl));
1538 0 : *ret= GDKstrdup(fld? fld:str_nil);
1539 0 : if(*ret == NULL)
1540 0 : throw(MAL, "mapi.fetch_line", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1541 : return MAL_SUCCEED;
1542 : }
1543 :
1544 : str
1545 0 : SERVERnext_result(int *ret, int *key){
1546 : Mapi mid;
1547 : int i;
1548 0 : accessTest(*key, "next_result");
1549 0 : mapi_next_result(SERVERsessions[i].hdl);
1550 0 : if( mapi_error(mid) )
1551 0 : throw(MAL, "mapi.next_result", "%s",
1552 : mapi_result_error(SERVERsessions[i].hdl));
1553 0 : *ret= *key;
1554 0 : return MAL_SUCCEED;
1555 : }
1556 :
1557 : str
1558 0 : SERVERfetch_reset(int *ret, int *key){
1559 : Mapi mid;
1560 : int i;
1561 0 : accessTest(*key, "fetch_reset");
1562 0 : mapi_fetch_reset(SERVERsessions[i].hdl);
1563 0 : if( mapi_error(mid) )
1564 0 : throw(MAL, "mapi.fetch_reset", "%s",
1565 : mapi_result_error(SERVERsessions[i].hdl));
1566 0 : *ret= *key;
1567 0 : return MAL_SUCCEED;
1568 : }
1569 :
1570 : str
1571 0 : SERVERfetch_field_bat(bat *bid, int *key){
1572 : int i,j,cnt;
1573 : Mapi mid;
1574 : char *fld;
1575 : BAT *b;
1576 :
1577 0 : accessTest(*key, "rpc");
1578 0 : b= COLnew(0,TYPE_str,256, TRANSIENT);
1579 0 : if( b == NULL)
1580 0 : throw(MAL,"mapi.fetch", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1581 0 : cnt= mapi_get_field_count(SERVERsessions[i].hdl);
1582 0 : for(j=0; j< cnt; j++){
1583 0 : fld= mapi_fetch_field(SERVERsessions[i].hdl,j);
1584 0 : if( mapi_error(mid) ) {
1585 0 : BBPreclaim(b);
1586 0 : throw(MAL, "mapi.fetch_field_bat", "%s",
1587 : mapi_result_error(SERVERsessions[i].hdl));
1588 : }
1589 0 : if (BUNappend(b,fld, false) != GDK_SUCCEED) {
1590 0 : BBPreclaim(b);
1591 0 : throw(MAL, "mapi.fetch_field_bat", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1592 : }
1593 : }
1594 0 : *bid = b->batCacheid;
1595 0 : BBPkeepref(*bid);
1596 0 : return MAL_SUCCEED;
1597 : }
1598 :
1599 : str
1600 0 : SERVERerror(int *ret, int *key){
1601 : Mapi mid;
1602 : int i;
1603 0 : accessTest(*key, "error");
1604 0 : *ret= mapi_error(mid);
1605 0 : return MAL_SUCCEED;
1606 : }
1607 :
1608 : str
1609 0 : SERVERgetError(str *ret, int *key){
1610 : Mapi mid;
1611 : int i;
1612 0 : accessTest(*key, "getError");
1613 0 : *ret= GDKstrdup(mapi_error_str(mid));
1614 0 : if(*ret == NULL)
1615 0 : throw(MAL, "mapi.get_error", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1616 : return MAL_SUCCEED;
1617 : }
1618 :
1619 : str
1620 0 : SERVERexplain(str *ret, int *key){
1621 : Mapi mid;
1622 : int i;
1623 :
1624 0 : accessTest(*key, "explain");
1625 0 : *ret= GDKstrdup(mapi_error_str(mid));
1626 0 : if(*ret == NULL)
1627 0 : throw(MAL, "mapi.explain", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1628 : return MAL_SUCCEED;
1629 : }
1630 : /*
1631 : * The remainder should contain the wrapping of
1632 : * relevant SERVER functions. Furthermore, we
1633 : * should analyse the return value and update
1634 : * the stack trace.
1635 : *
1636 : * Two routines should be
1637 : * mapi.rpc(key,"query")
1638 : *
1639 : * The generic scheme for handling a remote MAL
1640 : * procedure call with a single row answer.
1641 : */
1642 18 : static int SERVERfieldAnalysis(str fld, int tpe, ValPtr v){
1643 18 : v->vtype= tpe;
1644 18 : switch(tpe){
1645 0 : case TYPE_void:
1646 0 : v->val.oval = void_nil;
1647 0 : break;
1648 2 : case TYPE_oid:
1649 2 : if(fld==0 || strcmp(fld,"nil")==0)
1650 1 : v->val.oval= void_nil;
1651 1 : else v->val.oval = (oid) atol(fld);
1652 : break;
1653 0 : case TYPE_bit:
1654 0 : if(fld== 0 || strcmp(fld,"nil")==0)
1655 0 : v->val.btval= bit_nil;
1656 : else
1657 0 : if(strcmp(fld,"true")==0)
1658 0 : v->val.btval= TRUE;
1659 : else
1660 0 : if(strcmp(fld,"false")==0)
1661 0 : v->val.btval= FALSE;
1662 : break;
1663 0 : case TYPE_bte:
1664 0 : if(fld==0 || strcmp(fld,"nil")==0)
1665 0 : v->val.btval= bte_nil;
1666 : else
1667 0 : v->val.btval= *fld;
1668 : break;
1669 0 : case TYPE_sht:
1670 0 : if(fld==0 || strcmp(fld,"nil")==0)
1671 0 : v->val.shval = sht_nil;
1672 0 : else v->val.shval= (sht) atol(fld);
1673 : break;
1674 12 : case TYPE_int:
1675 12 : if(fld==0 || strcmp(fld,"nil")==0)
1676 1 : v->val.ival = int_nil;
1677 11 : else v->val.ival= (int) atol(fld);
1678 : break;
1679 2 : case TYPE_lng:
1680 2 : if(fld==0 || strcmp(fld,"nil")==0)
1681 0 : v->val.lval= lng_nil;
1682 2 : else v->val.lval= (lng) atol(fld);
1683 : break;
1684 : #ifdef HAVE_HGE
1685 0 : case TYPE_hge:
1686 0 : if(fld==0 || strcmp(fld,"nil")==0)
1687 0 : v->val.hval= hge_nil;
1688 0 : else v->val.hval= (hge) atol(fld);
1689 : break;
1690 : #endif
1691 0 : case TYPE_flt:
1692 0 : if(fld==0 || strcmp(fld,"nil")==0)
1693 0 : v->val.fval= flt_nil;
1694 0 : else v->val.fval= (flt) atof(fld);
1695 : break;
1696 0 : case TYPE_dbl:
1697 0 : if(fld==0 || strcmp(fld,"nil")==0)
1698 0 : v->val.dval= dbl_nil;
1699 0 : else v->val.dval= (dbl) atof(fld);
1700 : break;
1701 2 : case TYPE_str:
1702 2 : if(fld==0 || strcmp(fld,"nil")==0){
1703 0 : if((v->val.sval= GDKstrdup(str_nil)) == NULL)
1704 : return -1;
1705 0 : v->len = strlen(v->val.sval);
1706 : } else {
1707 2 : if((v->val.sval= GDKstrdup(fld)) == NULL)
1708 : return -1;
1709 2 : v->len = strlen(fld);
1710 : }
1711 : break;
1712 : }
1713 : return 0;
1714 : }
1715 :
1716 : str
1717 6 : SERVERmapi_rpc_single_row(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1718 : {
1719 : int key,i,j;
1720 : Mapi mid;
1721 : MapiHdl hdl;
1722 : char *s,*fld, *qry=0;
1723 :
1724 : (void) cntxt;
1725 6 : key= * getArgReference_int(stk,pci,pci->retc);
1726 12 : accessTest(key, "rpc");
1727 : #ifdef MAPI_TEST
1728 : mnstr_printf(cntxt->fdout,"about to send: %s\n",qry);
1729 : #endif
1730 : /* glue all strings together */
1731 12 : for(i= pci->retc+1; i<pci->argc; i++){
1732 6 : fld= * getArgReference_str(stk,pci,i);
1733 6 : if( qry == 0) {
1734 6 : qry= GDKstrdup(fld);
1735 6 : if ( qry == NULL)
1736 0 : throw(MAL, "mapi.rpc",SQLSTATE(HY013) MAL_MALLOC_FAIL);
1737 : } else {
1738 0 : s= (char*) GDKmalloc(strlen(qry)+strlen(fld)+1);
1739 0 : if ( s == NULL) {
1740 0 : GDKfree(qry);
1741 0 : throw(MAL, "mapi.rpc", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1742 : }
1743 0 : strcpy(s,qry);
1744 0 : strcat(s,fld);
1745 0 : GDKfree(qry);
1746 : qry= s;
1747 : }
1748 : }
1749 6 : hdl= mapi_query(mid, qry);
1750 6 : GDKfree(qry);
1751 6 : catchErrors("mapi.rpc");
1752 :
1753 : i= 0;
1754 12 : while( mapi_fetch_row(hdl)){
1755 12 : for(j=0; j<pci->retc; j++){
1756 6 : fld= mapi_fetch_field(hdl,j);
1757 : #ifdef MAPI_TEST
1758 : mnstr_printf(cntxt->fdout,"Got: %s\n",fld);
1759 : #endif
1760 6 : switch(getVarType(mb,getArg(pci,j)) ){
1761 6 : case TYPE_void:
1762 : case TYPE_oid:
1763 : case TYPE_bit:
1764 : case TYPE_bte:
1765 : case TYPE_sht:
1766 : case TYPE_int:
1767 : case TYPE_lng:
1768 : #ifdef HAVE_HGE
1769 : case TYPE_hge:
1770 : #endif
1771 : case TYPE_flt:
1772 : case TYPE_dbl:
1773 : case TYPE_str:
1774 6 : if(SERVERfieldAnalysis(fld,getVarType(mb,getArg(pci,j)),&stk->stk[pci->argv[j]]) < 0)
1775 0 : throw(MAL, "mapi.rpc", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1776 : break;
1777 0 : default:
1778 0 : throw(MAL, "mapi.rpc",
1779 : "Missing type implementation ");
1780 : /* all the other basic types come here */
1781 : }
1782 : }
1783 6 : i++;
1784 : }
1785 6 : if( i>1)
1786 0 : throw(MAL, "mapi.rpc","Too many answers");
1787 : return MAL_SUCCEED;
1788 : }
1789 : /*
1790 : * Transport of the BATs is only slightly more complicated.
1791 : * The generic implementation based on a pattern is the next
1792 : * step.
1793 : */
1794 : str
1795 5 : SERVERmapi_rpc_bat(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){
1796 : bat *ret;
1797 : int *key;
1798 : str *qry,err= MAL_SUCCEED;
1799 : Mapi mid;
1800 : MapiHdl hdl;
1801 : char *fld2;
1802 : BAT *b;
1803 : ValRecord tval;
1804 : int i=0, tt;
1805 :
1806 : (void) cntxt;
1807 5 : ret= getArgReference_bat(stk,pci,0);
1808 5 : key= getArgReference_int(stk,pci,pci->retc);
1809 5 : qry= getArgReference_str(stk,pci,pci->retc+1);
1810 10 : accessTest(*key, "rpc");
1811 5 : tt= getBatType(getVarType(mb,getArg(pci,0)));
1812 :
1813 5 : hdl= mapi_query(mid, *qry);
1814 5 : catchErrors("mapi.rpc");
1815 :
1816 5 : b= COLnew(0,tt,256, TRANSIENT);
1817 5 : if ( b == NULL)
1818 0 : throw(MAL,"mapi.rpc", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1819 17 : while( mapi_fetch_row(hdl)){
1820 12 : fld2= mapi_fetch_field(hdl,1);
1821 12 : if(SERVERfieldAnalysis(fld2, tt, &tval) < 0) {
1822 0 : BBPreclaim(b);
1823 0 : throw(MAL, "mapi.rpc", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1824 : }
1825 12 : if (BUNappend(b,VALptr(&tval), false) != GDK_SUCCEED) {
1826 0 : BBPreclaim(b);
1827 0 : throw(MAL, "mapi.rpc", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1828 : }
1829 : }
1830 5 : *ret = b->batCacheid;
1831 5 : BBPkeepref(*ret);
1832 :
1833 5 : return err;
1834 : }
1835 :
1836 : str
1837 2 : SERVERput(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){
1838 : int *key;
1839 : str *nme;
1840 : ptr val;
1841 : int i,tpe;
1842 : Mapi mid;
1843 : MapiHdl hdl=0;
1844 : char *w=0, buf[BUFSIZ];
1845 :
1846 : (void) cntxt;
1847 2 : key= getArgReference_int(stk,pci,pci->retc);
1848 2 : nme= getArgReference_str(stk,pci,pci->retc+1);
1849 2 : val= getArgReference(stk,pci,pci->retc+2);
1850 4 : accessTest(*key, "put");
1851 2 : switch( (tpe=getArgType(mb,pci, pci->retc+2)) ){
1852 0 : case TYPE_bat:{
1853 : /* generate a tuple batch */
1854 : /* and reload it into the proper format */
1855 : str ht,tt;
1856 0 : BAT *b= BATdescriptor(BBPindex(*nme));
1857 : size_t len;
1858 :
1859 0 : if( b== NULL){
1860 0 : throw(MAL,"mapi.put","Can not access BAT");
1861 : }
1862 :
1863 : /* reconstruct the object */
1864 0 : ht = getTypeName(TYPE_oid);
1865 0 : tt = getTypeName(getBatType(tpe));
1866 0 : snprintf(buf,BUFSIZ,"%s:= bat.new(:%s,%s);", *nme, ht,tt );
1867 0 : len = strlen(buf);
1868 0 : snprintf(buf+len,BUFSIZ-len,"%s:= io.import(%s,tuples);", *nme, *nme);
1869 :
1870 : /* and execute the request */
1871 0 : if( SERVERsessions[i].hdl)
1872 0 : mapi_close_handle(SERVERsessions[i].hdl);
1873 0 : SERVERsessions[i].hdl= mapi_query(mid, buf);
1874 :
1875 0 : GDKfree(ht); GDKfree(tt);
1876 0 : BBPrelease(b->batCacheid);
1877 0 : break;
1878 : }
1879 0 : case TYPE_str:
1880 0 : snprintf(buf,BUFSIZ,"%s:=%s;",*nme,*(char**)val);
1881 0 : if( SERVERsessions[i].hdl)
1882 0 : mapi_close_handle(SERVERsessions[i].hdl);
1883 0 : SERVERsessions[i].hdl= mapi_query(mid, buf);
1884 0 : break;
1885 2 : default:
1886 2 : if ((w = ATOMformat(tpe,val)) == NULL)
1887 0 : throw(MAL, "mapi.put", GDK_EXCEPTION);
1888 2 : snprintf(buf,BUFSIZ,"%s:=%s;",*nme,w);
1889 2 : GDKfree(w);
1890 2 : if( SERVERsessions[i].hdl)
1891 2 : mapi_close_handle(SERVERsessions[i].hdl);
1892 2 : SERVERsessions[i].hdl= mapi_query(mid, buf);
1893 2 : break;
1894 : }
1895 2 : catchErrors("mapi.put");
1896 : return MAL_SUCCEED;
1897 : }
1898 :
1899 : str
1900 0 : SERVERputLocal(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){
1901 : str *ret, *nme;
1902 : ptr val;
1903 : int tpe;
1904 : char *w=0, buf[BUFSIZ];
1905 :
1906 : (void) cntxt;
1907 0 : ret= getArgReference_str(stk,pci,0);
1908 0 : nme= getArgReference_str(stk,pci,pci->retc);
1909 0 : val= getArgReference(stk,pci,pci->retc+1);
1910 0 : switch( (tpe=getArgType(mb,pci, pci->retc+1)) ){
1911 0 : case TYPE_bat:
1912 : case TYPE_ptr:
1913 0 : throw(MAL, "mapi.glue","Unsupported type");
1914 0 : case TYPE_str:
1915 0 : snprintf(buf,BUFSIZ,"%s:=%s;",*nme,*(char**)val);
1916 0 : break;
1917 0 : default:
1918 0 : if ((w = ATOMformat(tpe,val)) == NULL)
1919 0 : throw(MAL, "mapi.glue", GDK_EXCEPTION);
1920 0 : snprintf(buf,BUFSIZ,"%s:=%s;",*nme,w);
1921 0 : GDKfree(w);
1922 0 : break;
1923 : }
1924 0 : *ret= GDKstrdup(buf);
1925 0 : if(*ret == NULL)
1926 0 : throw(MAL, "mapi.glue", GDK_EXCEPTION);
1927 : return MAL_SUCCEED;
1928 : }
1929 :
1930 : str
1931 1 : SERVERbindBAT(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){
1932 : int *key;
1933 : str *nme,*tab,*col;
1934 : int i;
1935 : Mapi mid;
1936 : MapiHdl hdl=0;
1937 : char buf[BUFSIZ];
1938 :
1939 : (void) cntxt;
1940 1 : key= getArgReference_int(stk,pci,pci->retc);
1941 1 : nme= getArgReference_str(stk,pci,pci->retc+1);
1942 2 : accessTest(*key, "bind");
1943 1 : if( pci->argc == 6) {
1944 : char *tn;
1945 0 : tab= getArgReference_str(stk,pci,pci->retc+2);
1946 0 : col= getArgReference_str(stk,pci,pci->retc+3);
1947 0 : i= *getArgReference_int(stk,pci,pci->retc+4);
1948 0 : tn = getTypeName(getBatType(getVarType(mb,getDestVar(pci))));
1949 0 : snprintf(buf,BUFSIZ,"%s:bat[:%s]:=sql.bind(\"%s\",\"%s\",\"%s\",%d);",
1950 0 : getVarName(mb,getDestVar(pci)),
1951 : tn,
1952 : *nme, *tab,*col,i);
1953 0 : GDKfree(tn);
1954 1 : } else if( pci->argc == 5) {
1955 0 : tab= getArgReference_str(stk,pci,pci->retc+2);
1956 0 : i= *getArgReference_int(stk,pci,pci->retc+3);
1957 0 : snprintf(buf,BUFSIZ,"%s:bat[:oid]:=sql.bind(\"%s\",\"%s\",0,%d);",
1958 0 : getVarName(mb,getDestVar(pci)),*nme, *tab,i);
1959 : } else {
1960 : str hn,tn;
1961 1 : int target= getArgType(mb,pci,0);
1962 1 : hn= getTypeName(TYPE_oid);
1963 1 : tn= getTypeName(getBatType(target));
1964 1 : snprintf(buf,BUFSIZ,"%s:bat[:%s]:=bbp.bind(\"%s\");",
1965 1 : getVarName(mb,getDestVar(pci)), tn, *nme);
1966 1 : GDKfree(hn);
1967 1 : GDKfree(tn);
1968 : }
1969 1 : if( SERVERsessions[i].hdl)
1970 1 : mapi_close_handle(SERVERsessions[i].hdl);
1971 1 : SERVERsessions[i].hdl= mapi_query(mid, buf);
1972 1 : catchErrors("mapi.bind");
1973 : return MAL_SUCCEED;
1974 : }
1975 :
1976 : #include "mel.h"
1977 : mel_func mal_mapi_init_funcs[] = {
1978 : command("mapi", "prelude", SERVERlisten_default, false, "", args(1,1, arg("",int))),
1979 : command("mapi", "listen", SERVERlisten_default, false, "Start a Mapi server with the default settings.", args(1,1, arg("",int))),
1980 : command("mapi", "listen", SERVERlisten_port, false, "Start a Mapi listener on the port given.", args(1,2, arg("",int),arg("port",int))),
1981 : command("mapi", "listen", SERVERlisten_usock, false, "Start a Mapi listener on the unix socket file given.", args(1,2, arg("",int),arg("unixsocket",str))),
1982 : command("mapi", "stop", SERVERstop, false, "Terminate connection listeners.", args(1,1, arg("",void))),
1983 : command("mapi", "suspend", SERVERsuspend, false, "Suspend accepting connections.", args(1,1, arg("",void))),
1984 : command("mapi", "resume", SERVERresume, false, "Resume connection listeners.", args(1,1, arg("",void))),
1985 : command("mapi", "malclient", SERVERclient, false, "Start a Mapi client for a particular stream pair.", args(1,3, arg("",void),arg("in",streams),arg("out",streams))),
1986 : command("mapi", "trace", SERVERtrace, false, "Toggle the Mapi library debug tracer.", args(1,3, arg("",void),arg("mid",int),arg("flag",int))),
1987 : pattern("mapi", "reconnect", SERVERreconnectWithoutAlias, false, "Re-establish connection with a remote mserver.", args(1,6, arg("",int),arg("host",str),arg("port",int),arg("usr",str),arg("passwd",str),arg("lang",str))),
1988 : pattern("mapi", "reconnect", SERVERreconnectAlias, false, "Re-establish connection with a remote mserver.", args(1,7, arg("",int),arg("host",str),arg("port",int),arg("db_alias",str),arg("usr",str),arg("passwd",str),arg("lang",str))),
1989 : command("mapi", "reconnect", SERVERreconnect, false, "Re-establish a connection.", args(1,2, arg("",void),arg("mid",int))),
1990 : pattern("mapi", "connect", SERVERconnect, false, "Establish connection with a remote mserver.", args(1,6, arg("",int),arg("host",str),arg("port",int),arg("usr",str),arg("passwd",str),arg("lang",str))),
1991 : command("mapi", "disconnect", SERVERdisconnectWithAlias, false, "Close connection with a remote Mserver.", args(1,2, arg("",int),arg("dbalias",str))),
1992 : command("mapi", "disconnect", SERVERdisconnectALL, false, "Close connections with all remote Mserver.", args(1,1, arg("",int))),
1993 : command("mapi", "setAlias", SERVERsetAlias, false, "Give the channel a logical name.", args(0,2, arg("key",int),arg("dbalias",str))),
1994 : command("mapi", "lookup", SERVERlookup, false, "Retrieve the connection identifier.", args(1,2, arg("",int),arg("dbalias",str))),
1995 : command("mapi", "disconnect", SERVERdisconnect, false, "Terminate the session.", args(1,2, arg("",void),arg("mid",int))),
1996 : command("mapi", "destroy", SERVERdestroy, false, "Destroy the handle for an Mserver.", args(1,2, arg("",void),arg("mid",int))),
1997 : command("mapi", "ping", SERVERping, false, "Test availability of an Mserver.", args(1,2, arg("",int),arg("mid",int))),
1998 : command("mapi", "query", SERVERquery, false, "Send the query for execution", args(1,3, arg("",int),arg("mid",int),arg("qry",str))),
1999 : command("mapi", "query_handle", SERVERquery_handle, false, "Send the query for execution.", args(1,3, arg("",int),arg("mid",int),arg("qry",str))),
2000 : pattern("mapi", "query_array", SERVERquery_array, false, "Send the query for execution replacing '?' by arguments.", args(1,4, arg("",int),arg("mid",int),arg("qry",str),vararg("arg",str))),
2001 : command("mapi", "prepare", SERVERprepare, false, "Prepare a query for execution.", args(1,3, arg("",int),arg("mid",int),arg("qry",str))),
2002 : command("mapi", "finish", SERVERfinish, false, "Remove all remaining answers.", args(1,2, arg("",int),arg("hdl",int))),
2003 : command("mapi", "get_field_count", SERVERget_field_count, false, "Return number of fields.", args(1,2, arg("",int),arg("hdl",int))),
2004 : command("mapi", "get_row_count", SERVERget_row_count, false, "Return number of rows.", args(1,2, arg("",lng),arg("hdl",int))),
2005 : command("mapi", "rows_affected", SERVERrows_affected, false, "Return number of affected rows.", args(1,2, arg("",lng),arg("hdl",int))),
2006 : command("mapi", "fetch_row", SERVERfetch_row, false, "Retrieve the next row for analysis.", args(1,2, arg("",int),arg("hdl",int))),
2007 : command("mapi", "fetch_all_rows", SERVERfetch_all_rows, false, "Retrieve all rows into the cache.", args(1,2, arg("",lng),arg("hdl",int))),
2008 : command("mapi", "fetch_field", SERVERfetch_field_str, false, "Retrieve a single field.", args(1,3, arg("",str),arg("hdl",int),arg("fnr",int))),
2009 : command("mapi", "fetch_field", SERVERfetch_field_int, false, "Retrieve a single int field.", args(1,3, arg("",int),arg("hdl",int),arg("fnr",int))),
2010 : command("mapi", "fetch_field", SERVERfetch_field_lng, false, "Retrieve a single lng field.", args(1,3, arg("",lng),arg("hdl",int),arg("fnr",int))),
2011 : command("mapi", "fetch_field", SERVERfetch_field_sht, false, "Retrieve a single sht field.", args(1,3, arg("",sht),arg("hdl",int),arg("fnr",int))),
2012 : command("mapi", "fetch_field", SERVERfetch_field_void, false, "Retrieve a single void field.", args(1,3, arg("",void),arg("hdl",int),arg("fnr",int))),
2013 : command("mapi", "fetch_field", SERVERfetch_field_oid, false, "Retrieve a single void field.", args(1,3, arg("",oid),arg("hdl",int),arg("fnr",int))),
2014 : command("mapi", "fetch_field", SERVERfetch_field_bte, false, "Retrieve a single bte field.", args(1,3, arg("",bte),arg("hdl",int),arg("fnr",int))),
2015 : command("mapi", "fetch_field_array", SERVERfetch_field_bat, false, "Retrieve all fields for a row.", args(1,2, batarg("",str),arg("hdl",int))),
2016 : command("mapi", "fetch_line", SERVERfetch_line, false, "Retrieve a complete line.", args(1,2, arg("",str),arg("hdl",int))),
2017 : command("mapi", "fetch_reset", SERVERfetch_reset, false, "Reset the cache read line.", args(1,2, arg("",int),arg("hdl",int))),
2018 : command("mapi", "next_result", SERVERnext_result, false, "Go to next result set.", args(1,2, arg("",int),arg("hdl",int))),
2019 : command("mapi", "error", SERVERerror, false, "Check for an error in the communication.", args(1,2, arg("",int),arg("mid",int))),
2020 : command("mapi", "getError", SERVERgetError, false, "Get error message.", args(1,2, arg("",str),arg("mid",int))),
2021 : command("mapi", "explain", SERVERexplain, false, "Turn the error seen into a string.", args(1,2, arg("",str),arg("mid",int))),
2022 : pattern("mapi", "put", SERVERput, false, "Send a value to a remote site.", args(1,4, arg("",void),arg("mid",int),arg("nme",str),argany("val",1))),
2023 : pattern("mapi", "put", SERVERputLocal, false, "Prepare sending a value to a remote site.", args(1,3, arg("",str),arg("nme",str),argany("val",1))),
2024 : pattern("mapi", "rpc", SERVERmapi_rpc_single_row, false, "Send a simple query for execution and fetch result.", args(1,3, argany("",0),arg("key",int),vararg("qry",str))),
2025 : pattern("mapi", "rpc", SERVERmapi_rpc_bat, false, "", args(1,3, batargany("",2),arg("key",int),arg("qry",str))),
2026 : command("mapi", "rpc", SERVERquery, false, "Send a simple query for execution.", args(1,3, arg("",int),arg("key",int),arg("qry",str))),
2027 : pattern("mapi", "bind", SERVERbindBAT, false, "Bind a remote variable to a local one.", args(1,6, batargany("",2),arg("key",int),arg("rschema",str),arg("rtable",str),arg("rcolumn",str),arg("i",int))),
2028 : pattern("mapi", "bind", SERVERbindBAT, false, "Bind a remote variable to a local one.", args(1,5, batargany("",2),arg("key",int),arg("rschema",str),arg("rtable",str),arg("i",int))),
2029 : pattern("mapi", "bind", SERVERbindBAT, false, "Bind a remote variable to a local one.", args(1,3, batargany("",2),arg("key",int),arg("remoteName",str))),
2030 : #ifdef HAVE_HGE
2031 : command("mapi", "fetch_field", SERVERfetch_field_hge, false, "Retrieve a single hge field.", args(1,3, arg("",hge),arg("hdl",int),arg("fnr",int))),
2032 : #endif
2033 : { .imp=NULL }
2034 : };
2035 : #include "mal_import.h"
2036 : #ifdef _MSC_VER
2037 : #undef read
2038 : #pragma section(".CRT$XCU",read)
2039 : #endif
2040 255 : LIB_STARTUP_FUNC(init_mal_mapi_mal)
2041 255 : { mal_module("mapi", NULL, mal_mapi_init_funcs); }
2042 :
2043 : #else
/*
 * Built only when HAVE_MAPI is not defined: gives this compilation unit at
 * least one definition, which avoids a compiler warning about empty
 * compilation units.
 */
int SERVERdummy = 42;
2046 : #endif // HAVE_MAPI
|