Line data Source code
1 : /*
2 : * This Source Code Form is subject to the terms of the Mozilla Public
3 : * License, v. 2.0. If a copy of the MPL was not distributed with this
4 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 : *
6 : * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
7 : */
8 :
9 : /*
10 : * @a N.J. Nes P. Boncz, S. Mullender, M. Kersten
11 : * @v 1.1
12 : * @+ MAPI interface
13 : * The complete Mapi library is available to setup
14 : * communication with another Mserver.
15 : *
16 : * Clients may initialize a private listener to implement
17 : * specific services. For example, in an OLTP environment
18 : * it may make sense to have a listener for each transaction
19 : * type, which simply parses a sequence of transaction parameters.
20 : *
21 : * Authorization of access to the server is handled as part
22 : * of the client record initialization phase.
23 : *
24 : * This library internally uses pointer handles, which we replace with
25 : * an index in a locally maintained table. It provides a handle
26 : * to easily detect havoc clients.
27 : *
28 : * A cleaner and simpler interface for distributed processing is available in
29 : * the module remote.
30 : */
31 : #include "monetdb_config.h"
32 : #ifdef HAVE_MAPI
33 : #include "mal_client.h"
34 : #include "mal_session.h"
35 : #include "mal_exception.h"
36 : #include "mal_interpreter.h"
37 : #include "mal_authorize.h"
38 : #include "msabaoth.h"
39 : #include "mcrypt.h"
40 : #include "stream.h"
41 : #include "streams.h" /* for Stream */
42 : #include <sys/types.h>
43 : #include "stream_socket.h"
44 : #include "mapi.h"
45 : #include "mutils.h"
46 :
47 : #if defined(HAVE_GETENTROPY) && defined(HAVE_SYS_RANDOM_H)
48 : #include <sys/random.h>
49 : #endif
50 :
51 : #ifdef HAVE_SYS_SOCKET_H
52 : # include <sys/select.h>
53 : # include <sys/socket.h>
54 : # include <unistd.h> /* gethostname() */
55 : # include <netinet/in.h> /* hton and ntoh */
56 : # include <arpa/inet.h> /* addr_in */
57 : #else /* UNIX specific */
58 : #ifdef HAVE_WINSOCK_H /* Windows specific */
59 : # include <winsock.h>
60 : #endif
61 : #endif
62 : #ifdef HAVE_SYS_UN_H
63 : # include <sys/un.h>
64 : #endif
65 : #ifdef HAVE_NETDB_H
66 : # include <netdb.h>
67 : # include <netinet/in.h>
68 : #endif
69 : #ifdef HAVE_POLL_H
70 : #include <poll.h>
71 : #endif
72 : #ifdef HAVE_SYS_UIO_H
73 : # include <sys/uio.h>
74 : #endif
75 : #ifdef HAVE_FCNTL_H
76 : #include <fcntl.h>
77 : #endif
78 :
79 : #ifdef HAVE_SOCKLEN_T
80 : #define SOCKLEN socklen_t
81 : #else
82 : #define SOCKLEN int
83 : #endif
84 :
85 : #if !defined(HAVE_ACCEPT4) || !defined(SOCK_CLOEXEC)
86 : #define accept4(sockfd, addr, addrlen, flags) accept(sockfd, addr, addrlen)
87 : #endif
88 :
89 : #define SERVERMAXUSERS 5
90 :
/* Alphabet from which challenge characters are drawn: the 26 lower-case
 * letters, the 26 upper-case letters and the 10 digits, 62 characters in
 * total.  The array is sized to exactly 62 elements, so the string
 * literal's terminating NUL is not stored. */
static char seedChars[62] =
	"abcdefghijklmnopqrstuvwxyz"
	"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
	"1234567890";
96 :
97 :
/* generateChallenge calls this helper whenever HAVE_RAND_S is defined
 * and HAVE_GETENTROPY is not, so the guard mirrors exactly that
 * condition. */
#if !defined(HAVE_GETENTROPY) && defined(HAVE_RAND_S)
/* Fill buf[0..size-1] with random characters taken from seedChars,
 * using rand_s() as the source of randomness.  No terminating NUL is
 * written; that is left to the caller.
 *
 * Returns false as soon as rand_s() reports a failure (buf may then be
 * partially filled), true when all size characters were produced. */
static inline bool
gen_win_challenge(char *buf, size_t size)
{
	for (size_t i = 0; i < size; ) {
		unsigned int r;
		if (rand_s(&r) != 0)
			return false;
		/* use one byte of r per character; stop after sizeof(r)
		 * bytes, because after that r has been shifted down to
		 * zero and every further character would be seedChars[0] */
		for (size_t j = 0; j < sizeof(r) && i < size; j++) {
			buf[i++] = seedChars[(r & 0xFF) % 62];
			r >>= 8;
		}
	}
	return true;
}
#endif
114 :
115 5455 : static void generateChallenge(str buf, int min, int max) {
116 : size_t size;
117 : size_t i;
118 :
119 : #ifdef __COVERITY__
120 : /* hide rand() calls from analysis */
121 : size = (min + max) / 2;
122 : for (i = 0; i < size; i++)
123 : buf[i] = seedChars[i % 62];
124 : buf[size] = '\0';
125 : #else
126 : /* don't seed the randomiser here, or you get the same challenge
127 : * during the same second */
128 : #if defined(HAVE_GETENTROPY)
129 5455 : if (getentropy(&size, sizeof(size)) < 0)
130 : #elif defined(HAVE_RAND_S)
131 : unsigned int r;
132 : if (rand_s(&r) == 0)
133 : size = (size_t) r;
134 : else
135 : #endif
136 0 : size = rand();
137 5455 : size = (size % (max - min)) + min;
138 : #if defined(HAVE_GETENTROPY)
139 5455 : if (getentropy(buf, size) == 0)
140 57325 : for (i = 0; i < size; i++)
141 51870 : buf[i] = seedChars[((unsigned char *) buf)[i] % 62];
142 : else
143 : #elif defined(HAVE_RAND_S)
144 : if (!gen_win_challenge(buf, size))
145 : #endif
146 0 : for (i = 0; i < size; i++)
147 0 : buf[i] = seedChars[rand() % 62];
148 5455 : buf[size] = '\0';
149 : #endif
150 5455 : }
151 :
152 : struct challengedata {
153 : stream *in;
154 : stream *out;
155 : char challenge[13];
156 : };
157 :
158 : static str SERVERsetAlias(void *ret, int *key, str *dbalias);
159 :
160 : static void
161 5455 : doChallenge(void *data)
162 : {
163 5455 : char *buf = GDKmalloc(BLOCK + 1);
164 : char challenge[13];
165 :
166 5455 : stream *fdin = ((struct challengedata *) data)->in;
167 5455 : stream *fdout = ((struct challengedata *) data)->out;
168 : bstream *bs;
169 : ssize_t len = 0;
170 : protocol_version protocol = PROTOCOL_9;
171 : size_t buflen = BLOCK;
172 :
173 5455 : MT_thread_setworking("challenging client");
174 : #ifdef _MSC_VER
175 : srand((unsigned int) GDKusec());
176 : #endif
177 5455 : memcpy(challenge, ((struct challengedata *) data)->challenge, sizeof(challenge));
178 5455 : GDKfree(data);
179 5455 : if (buf == NULL) {
180 0 : TRC_ERROR(MAL_SERVER, MAL_MALLOC_FAIL "\n");
181 0 : close_stream(fdin);
182 0 : close_stream(fdout);
183 0 : return;
184 : }
185 :
186 : // Send the challenge over the block stream
187 5455 : mnstr_printf(fdout, "%s:mserver:9:%s:%s:%s:sql=%d:",
188 : challenge,
189 : mcrypt_getHashAlgorithms(),
190 : #ifdef WORDS_BIGENDIAN
191 : "BIG",
192 : #else
193 : "LIT",
194 : #endif
195 : MONETDB5_PASSWDHASH,
196 : MAPI_HANDSHAKE_OPTIONS_LEVEL
197 : );
198 5455 : mnstr_flush(fdout, MNSTR_FLUSH_DATA);
199 : /* get response */
200 5455 : if ((len = mnstr_read_block(fdin, buf, 1, BLOCK)) < 0) {
201 : /* the client must have gone away, so no reason to write anything */
202 0 : close_stream(fdin);
203 0 : close_stream(fdout);
204 0 : GDKfree(buf);
205 0 : return;
206 : }
207 5455 : buf[len] = 0;
208 :
209 5455 : bs = bstream_create(fdin, 128 * BLOCK);
210 :
211 5455 : if (bs == NULL){
212 0 : mnstr_printf(fdout, "!allocation failure in the server\n");
213 0 : close_stream(fdin);
214 0 : close_stream(fdout);
215 0 : GDKfree(buf);
216 0 : GDKsyserror("SERVERlisten:"MAL_MALLOC_FAIL);
217 : return;
218 : }
219 5455 : bs->eof = true;
220 5455 : MSscheduleClient(buf, challenge, bs, fdout, protocol, buflen);
221 : }
222 :
223 : static ATOMIC_TYPE nlistener = ATOMIC_VAR_INIT(0); /* nr of listeners */
224 : static ATOMIC_TYPE serveractive = ATOMIC_VAR_INIT(0);
225 : static ATOMIC_TYPE serverexiting = ATOMIC_VAR_INIT(0); /* listeners should exit */
226 : static ATOMIC_TYPE threadno = ATOMIC_VAR_INIT(0); /* thread sequence no */
227 :
228 : static void
229 252 : SERVERlistenThread(SOCKET *Sock)
230 : {
231 : char *msg = NULL;
232 : int retval;
233 252 : SOCKET socks[3] = {Sock[0], Sock[1], Sock[2]};
234 : struct challengedata *data;
235 : MT_Id tid;
236 : stream *s;
237 : int i;
238 :
239 252 : GDKfree(Sock);
240 :
241 252 : (void) ATOMIC_INC(&nlistener);
242 :
243 : do {
244 : SOCKET msgsock = INVALID_SOCKET;
245 : #ifdef HAVE_POLL
246 : struct pollfd pfd[3];
247 : nfds_t npfd;
248 : npfd = 0;
249 633376 : for (i = 0; i < 3; i++) {
250 475032 : if (socks[i] != INVALID_SOCKET)
251 316688 : pfd[npfd++] = (struct pollfd) {.fd = socks[i],
252 : .events = POLLIN};
253 : }
254 : /* Wait up to 0.1 seconds (0.01 if testing) */
255 160380 : retval = poll(pfd, npfd, GDKdebug & FORCEMITOMASK ? 10 : 100);
256 158343 : if (retval == -1 && errno == EINTR)
257 152637 : continue;
258 : #else
259 : fd_set fds;
260 : FD_ZERO(&fds);
261 : /* temporarily use msgsock to record the highest socket fd */
262 : for (i = 0; i < 3; i++) {
263 : if (socks[i] != INVALID_SOCKET) {
264 : FD_SET(socks[i], &fds);
265 : if (msgsock == INVALID_SOCKET || socks[i] > msgsock)
266 : msgsock = socks[i];
267 : }
268 : }
269 : /* Wait up to 0.1 seconds (0.01 if testing) */
270 : struct timeval tv = (struct timeval) {
271 : .tv_usec = GDKdebug & FORCEMITOMASK ? 10000 : 100000,
272 : };
273 :
274 : retval = select((int) msgsock + 1, &fds, NULL, NULL, &tv);
275 : msgsock = INVALID_SOCKET;
276 : #endif
277 158343 : if (ATOMIC_GET(&serverexiting) || GDKexiting())
278 : break;
279 158092 : if (retval == 0) {
280 : /* nothing interesting has happened */
281 152637 : continue;
282 : }
283 5455 : if (retval == SOCKET_ERROR) {
284 0 : if (
285 : #ifdef _MSC_VER
286 : WSAGetLastError() != WSAEINTR
287 : #else
288 0 : errno != EINTR
289 : #endif
290 : ) {
291 : msg = "select failed";
292 0 : goto error;
293 : }
294 0 : continue;
295 : }
296 : bool isusock = false;
297 : #ifdef HAVE_POLL
298 5629 : for (i = 0; i < (int) npfd; i++) {
299 5629 : if (pfd[i].revents & POLLIN) {
300 5455 : msgsock = pfd[i].fd;
301 5455 : isusock = msgsock == socks[2];
302 5455 : break;
303 : }
304 : }
305 : #else
306 : for (i = 0; i < 3; i++) {
307 : if (socks[i] != INVALID_SOCKET && FD_ISSET(socks[i], &fds)) {
308 : msgsock = socks[i];
309 : isusock = i == 2;
310 : break;
311 : }
312 : }
313 : #endif
314 5455 : if (msgsock == INVALID_SOCKET)
315 0 : continue;
316 :
317 5455 : if ((msgsock = accept4(msgsock, NULL, NULL, SOCK_CLOEXEC)) == INVALID_SOCKET) {
318 0 : if (
319 : #ifdef _MSC_VER
320 : WSAGetLastError() != WSAEINTR
321 : #else
322 0 : errno != EINTR
323 : #endif
324 0 : || !ATOMIC_GET(&serveractive)) {
325 : msg = "accept failed";
326 0 : goto error;
327 : }
328 0 : continue;
329 : }
330 : #if defined(HAVE_FCNTL) && (!defined(SOCK_CLOEXEC) || !defined(HAVE_ACCEPT4))
331 : (void) fcntl(msgsock, F_SETFD, FD_CLOEXEC);
332 : #endif
333 : #ifdef HAVE_SYS_UN_H
334 5455 : if (isusock) {
335 : struct msghdr msgh;
336 : struct iovec iov;
337 : char buf[1];
338 : int rv;
339 : char ccmsg[CMSG_SPACE(sizeof(int))];
340 : struct cmsghdr *cmsg;
341 :
342 : /* BEWARE: unix domain sockets have a slightly different
343 : * behaviour initialy than normal sockets, because we can
344 : * send filedescriptors or credentials with them. To do so,
345 : * we need to use sendmsg/recvmsg, which operates on a bare
346 : * socket. Unfortunately we *have* to send something, so it
347 : * is one byte that can optionally carry the ancillary data.
348 : * This byte is at this moment defined to contain a character:
349 : * '0' - there is no ancillary data
350 : * '1' - ancillary data for passing a file descriptor
351 : * The future may introduce a state for passing credentials.
352 : * Any unknown character must be interpreted as some unknown
353 : * action, and hence not supported by the server. */
354 :
355 174 : iov.iov_base = buf;
356 174 : iov.iov_len = 1;
357 :
358 174 : msgh.msg_name = 0;
359 174 : msgh.msg_namelen = 0;
360 174 : msgh.msg_iov = &iov;
361 174 : msgh.msg_iovlen = 1;
362 174 : msgh.msg_flags = 0;
363 174 : msgh.msg_control = ccmsg;
364 174 : msgh.msg_controllen = sizeof(ccmsg);
365 :
366 174 : rv = recvmsg(msgsock, &msgh, 0);
367 174 : if (rv == -1) {
368 0 : closesocket(msgsock);
369 0 : continue;
370 : }
371 :
372 174 : switch (buf[0]) {
373 : case '0':
374 : /* nothing special, nothing to do */
375 : break;
376 0 : case '1':
377 : { int *c_d;
378 : /* filedescriptor, put it in place of msgsock */
379 0 : cmsg = CMSG_FIRSTHDR(&msgh);
380 0 : (void) shutdown(msgsock, SHUT_WR);
381 0 : closesocket(msgsock);
382 0 : if (!cmsg || cmsg->cmsg_type != SCM_RIGHTS) {
383 0 : TRC_CRITICAL(MAL_SERVER, "Expected file descriptor, but received something else\n");
384 0 : continue;
385 : }
386 : /* HACK to avoid
387 : * "dereferencing type-punned pointer will break strict-aliasing rules"
388 : * (with gcc 4.5.1 on Fedora 14)
389 : */
390 : c_d = (int*)CMSG_DATA(cmsg);
391 0 : msgsock = *c_d;
392 : }
393 0 : break;
394 0 : default:
395 : /* some unknown state */
396 0 : closesocket(msgsock);
397 0 : TRC_CRITICAL(MAL_SERVER, "Unknown command type in first byte\n");
398 0 : continue;
399 : }
400 : }
401 : #endif
402 :
403 5455 : data = GDKzalloc(sizeof(*data));
404 5455 : if( data == NULL){
405 0 : closesocket(msgsock);
406 0 : TRC_ERROR(MAL_SERVER, MAL_MALLOC_FAIL "\n");
407 0 : continue;
408 : }
409 5455 : data->in = socket_rstream(msgsock, "Server read");
410 5455 : if (data->in == NULL) {
411 0 : stream_alloc_fail:
412 0 : mnstr_destroy(data->in);
413 0 : mnstr_destroy(data->out);
414 0 : GDKfree(data);
415 0 : closesocket(msgsock);
416 0 : TRC_ERROR(MAL_SERVER, "Cannot allocate stream: %s\n", mnstr_peek_error(NULL));
417 0 : continue;
418 : }
419 5455 : data->out = socket_wstream(msgsock, "Server write");
420 5455 : if (data->out == NULL) {
421 0 : goto stream_alloc_fail;
422 : }
423 5455 : s = block_stream(data->in);
424 5455 : if (s == NULL) {
425 0 : goto stream_alloc_fail;
426 : }
427 5455 : data->in = s;
428 5455 : s = block_stream(data->out);
429 5455 : if (s == NULL) {
430 0 : goto stream_alloc_fail;
431 : }
432 5455 : data->out = s;
433 : char name[MT_NAME_LEN];
434 5455 : snprintf(name, sizeof(name), "client%d",
435 5455 : (int) ATOMIC_INC(&threadno));
436 :
437 : /* generate the challenge string */
438 5455 : generateChallenge(data->challenge, 8, 12);
439 :
440 5455 : if ((tid = THRcreate(doChallenge, data, MT_THR_DETACHED, name)) == 0) {
441 0 : mnstr_destroy(data->in);
442 0 : mnstr_destroy(data->out);
443 0 : GDKfree(data);
444 0 : closesocket(msgsock);
445 0 : TRC_ERROR(MAL_SERVER, "Cannot fork new client thread\n");
446 0 : continue;
447 : }
448 158092 : } while (!ATOMIC_GET(&serverexiting) && !GDKexiting());
449 0 : error:
450 251 : (void) ATOMIC_DEC(&nlistener);
451 1004 : for (i = 0; i < 3; i++)
452 753 : if (socks[i] != INVALID_SOCKET)
453 502 : closesocket(socks[i]);
454 251 : if (msg)
455 0 : TRC_CRITICAL(MAL_SERVER, "Terminating listener: %s\n", msg);
456 251 : return;
457 : }
458 :
459 : #ifdef _MSC_VER
460 : #define HOSTLEN int
461 : #else
462 : #define HOSTLEN size_t
463 : #endif
464 :
465 : static char *
466 252 : start_listen(SOCKET *sockp, int *portp, const char *listenaddr,
467 : char *host, size_t hostlen, int maxusers)
468 : {
469 252 : struct addrinfo *result = NULL;
470 252 : struct addrinfo hints = {
471 : .ai_family = AF_INET6,
472 : .ai_flags = AI_PASSIVE | AI_NUMERICSERV,
473 : .ai_socktype = SOCK_STREAM,
474 : .ai_protocol = IPPROTO_TCP,
475 : };
476 : int e = 0;
477 252 : int ipv6_vs6only = -1;
478 : SOCKET sock = INVALID_SOCKET;
479 : const char *err;
480 : int nsock = 0;
481 252 : sockp[0] = sockp[1] = INVALID_SOCKET;
482 252 : host[0] = 0;
483 252 : if (listenaddr == NULL || strcmp(listenaddr, "localhost") == 0) {
484 : hints.ai_family = AF_INET6;
485 0 : hints.ai_flags |= AI_NUMERICHOST;
486 0 : ipv6_vs6only = 0;
487 : listenaddr = "::1";
488 0 : strcpy_len(host, "localhost", hostlen);
489 252 : } else if (strcmp(listenaddr, "all") == 0) {
490 : hints.ai_family = AF_INET6;
491 252 : ipv6_vs6only = 0;
492 : listenaddr = NULL;
493 0 : } else if (strcmp(listenaddr, "::") == 0) {
494 : hints.ai_family = AF_INET6;
495 0 : ipv6_vs6only = 1;
496 : listenaddr = NULL;
497 0 : } else if (strcmp(listenaddr, "0.0.0.0") == 0) {
498 0 : hints.ai_family = AF_INET;
499 0 : hints.ai_flags |= AI_NUMERICHOST;
500 : listenaddr = NULL;
501 0 : } else if (strcmp(listenaddr, "::1") == 0) {
502 : hints.ai_family = AF_INET6;
503 0 : hints.ai_flags |= AI_NUMERICHOST;
504 0 : ipv6_vs6only = 1;
505 0 : strcpy_len(host, "localhost", hostlen);
506 0 : } else if (strcmp(listenaddr, "127.0.0.1") == 0) {
507 0 : hints.ai_family = AF_INET;
508 0 : hints.ai_flags |= AI_NUMERICHOST;
509 0 : strcpy_len(host, "localhost", hostlen);
510 : } else {
511 : hints.ai_family = AF_INET6;
512 0 : ipv6_vs6only = 0;
513 : }
514 : char sport[8]; /* max "65535" */
515 252 : snprintf(sport, sizeof(sport), "%d", *portp);
516 : for (;;) { /* max twice */
517 504 : int check = getaddrinfo(listenaddr, sport, &hints, &result);
518 504 : if (check != 0) {
519 : #ifdef _MSC_VER
520 : err = wsaerror(WSAGetLastError());
521 : #else
522 0 : err = gai_strerror(check);
523 : #endif
524 0 : throw(IO, "mal_mapi.listen", OPERATION_FAILED ": cannot get address "
525 : "information for %s and port %s: %s",
526 0 : listenaddr ? listenaddr : hints.ai_family == AF_INET6 ? "::" : "0.0.0.0",
527 : sport, err);
528 : }
529 :
530 756 : for (struct addrinfo *rp = result; rp; rp = rp->ai_next) {
531 504 : sock = socket(rp->ai_family, rp->ai_socktype
532 : #ifdef SOCK_CLOEXEC
533 : | SOCK_CLOEXEC
534 : #endif
535 : , rp->ai_protocol);
536 504 : if (sock == INVALID_SOCKET) {
537 : #ifdef _MSC_VER
538 : e = WSAGetLastError();
539 : #else
540 0 : e = errno;
541 : #endif
542 252 : continue;
543 : }
544 : #if defined(HAVE_FCNTL) && !defined(SOCK_CLOEXEC)
545 : (void) fcntl(sock, F_SETFD, FD_CLOEXEC);
546 : #endif
547 504 : if (ipv6_vs6only >= 0)
548 252 : if (setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY,
549 : (const char *) &ipv6_vs6only, (SOCKLEN) sizeof(int)) == -1)
550 0 : perror("setsockopt IPV6_V6ONLY");
551 :
552 : /* do not reuse addresses for ephemeral (autosense) ports */
553 504 : if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR,
554 504 : (const char *) &(int){1},
555 : (SOCKLEN) sizeof(int)) == SOCKET_ERROR) {
556 : #ifdef _MSC_VER
557 : e = WSAGetLastError();
558 : #else
559 0 : e = errno;
560 : #endif
561 0 : closesocket(sock);
562 : sock = INVALID_SOCKET;
563 0 : continue;
564 : }
565 504 : if (bind(sock, rp->ai_addr, (SOCKLEN) rp->ai_addrlen) == SOCKET_ERROR) {
566 : #ifdef _MSC_VER
567 : e = WSAGetLastError();
568 : #else
569 252 : e = errno;
570 : #endif
571 252 : closesocket(sock);
572 : sock = INVALID_SOCKET;
573 252 : continue;
574 : }
575 252 : if (listen(sock, maxusers) == SOCKET_ERROR) {
576 : #ifdef _MSC_VER
577 : e = WSAGetLastError();
578 : #else
579 0 : e = errno;
580 : #endif
581 0 : closesocket(sock);
582 : sock = INVALID_SOCKET;
583 0 : continue;
584 : }
585 : struct sockaddr_storage addr;
586 252 : SOCKLEN addrlen = (SOCKLEN) sizeof(addr);
587 252 : if (getsockname(sock, (struct sockaddr *) &addr, &addrlen) == SOCKET_ERROR) {
588 : #ifdef _MSC_VER
589 : e = WSAGetLastError();
590 : #else
591 0 : e = errno;
592 : #endif
593 0 : closesocket(sock);
594 : sock = INVALID_SOCKET;
595 0 : continue;
596 : }
597 252 : if (getnameinfo((struct sockaddr *) &addr, addrlen,
598 : NULL, (SOCKLEN) 0,
599 : sport, (SOCKLEN) sizeof(sport),
600 : NI_NUMERICSERV) == 0)
601 252 : *portp = (int) strtol(sport, NULL, 10);
602 252 : sockp[nsock++] = sock;
603 252 : break;
604 : }
605 504 : freeaddrinfo(result);
606 504 : if (ipv6_vs6only == 0) {
607 252 : ipv6_vs6only = -1;
608 252 : hints.ai_family = AF_INET;
609 252 : if (listenaddr && strcmp(listenaddr, "::1") == 0)
610 : listenaddr = "127.0.0.1";
611 : } else
612 : break;
613 : }
614 :
615 252 : if (nsock == 0) {
616 : #ifdef _MSC_VER
617 : err = wsaerror(e);
618 : #else
619 0 : err = GDKstrerror(e, (char[128]){0}, 128);
620 : #endif
621 0 : throw(IO, "mal_mapi.listen", OPERATION_FAILED ": bind to "
622 : "stream socket on address %s and port %s failed: %s",
623 0 : listenaddr ? listenaddr : hints.ai_family == AF_INET6 ? "::" : "0.0.0.0",
624 : sport, err);
625 : }
626 252 : if (host[0] == 0)
627 252 : gethostname(host, (HOSTLEN) hostlen);
628 : return NULL;
629 : }
630 :
631 : static str
632 252 : SERVERlisten(int port, const char *usockfile, int maxusers)
633 : {
634 : SOCKET socks[3];
635 : SOCKET *psock;
636 : #ifdef HAVE_SYS_UN_H
637 : struct sockaddr_un userver;
638 : char *usockfilenew = NULL;
639 : #endif
640 : SOCKLEN length = 0;
641 : MT_Id pid;
642 : str buf;
643 252 : char host[128] = "";
644 :
645 : /* early way out, we do not want to listen on any port when running in embedded mode */
646 252 : if (GDKgetenv_istrue("mapi_disable")) {
647 : return MAL_SUCCEED;
648 : }
649 :
650 252 : const char *listenaddr = port < 0 ? "none" : GDKgetenv("mapi_listenaddr");
651 :
652 252 : if (strNil(usockfile) || *usockfile == '\0') {
653 : usockfile = NULL;
654 : #ifndef HAVE_SYS_UN_H
655 : } else {
656 : throw(IO, "mal_mapi.listen", OPERATION_FAILED ": UNIX domain sockets are not supported");
657 : #endif
658 : }
659 252 : maxusers = (maxusers ? maxusers : SERVERMAXUSERS);
660 :
661 252 : if (listenaddr && strcmp(listenaddr, "none") == 0 && usockfile == NULL) {
662 0 : throw(ILLARG, "mal_mapi.listen", OPERATION_FAILED ": no port or socket file specified");
663 : }
664 :
665 252 : if (port > 65535) {
666 0 : throw(ILLARG, "mal_mapi.listen", OPERATION_FAILED ": port number should be between 0 and 65535");
667 : }
668 :
669 252 : socks[0] = socks[1] = socks[2] = INVALID_SOCKET;
670 :
671 252 : if (listenaddr == NULL || strcmp(listenaddr, "none") != 0) {
672 252 : char *msg = start_listen(socks, &port, listenaddr, host, sizeof(host), maxusers);
673 252 : if (msg != MAL_SUCCEED) {
674 0 : return msg;
675 : }
676 : char sport[10];
677 252 : snprintf(sport, sizeof(sport), "%d", port);
678 252 : GDKsetenv("mapi_port", sport);
679 : }
680 :
681 : #ifdef HAVE_SYS_UN_H
682 252 : if (usockfile) {
683 : /* prevent silent truncation, sun_path is typically around 108
684 : * chars long :/ */
685 252 : size_t ulen = strlen(usockfile);
686 252 : if (ulen >= sizeof(userver.sun_path)) {
687 0 : if (socks[0] != INVALID_SOCKET)
688 0 : closesocket(socks[0]);
689 0 : if (socks[1] != INVALID_SOCKET)
690 0 : closesocket(socks[1]);
691 0 : throw(MAL, "mal_mapi.listen",
692 : OPERATION_FAILED ": UNIX socket path too long: %s",
693 : usockfile);
694 : }
695 :
696 252 : socks[2] = socket(AF_UNIX, SOCK_STREAM
697 : #ifdef SOCK_CLOEXEC
698 : | SOCK_CLOEXEC
699 : #endif
700 : , 0);
701 252 : if (socks[2] == INVALID_SOCKET) {
702 : #ifdef _MSC_VER
703 : const char *err = wsaerror(WSAGetLastError());
704 : #else
705 0 : const char *err = GDKstrerror(errno, (char[128]){0}, 128);
706 : #endif
707 0 : if (socks[0] != INVALID_SOCKET)
708 0 : closesocket(socks[0]);
709 0 : if (socks[1] != INVALID_SOCKET)
710 0 : closesocket(socks[1]);
711 0 : throw(IO, "mal_mapi.listen",
712 : OPERATION_FAILED ": creation of UNIX socket failed: %s", err);
713 : }
714 : #if !defined(SOCK_CLOEXEC) && defined(HAVE_FCNTL)
715 : (void) fcntl(socks[2], F_SETFD, FD_CLOEXEC);
716 : #endif
717 :
718 252 : userver.sun_family = AF_UNIX;
719 : const char *p;
720 252 : if ((p = strstr(usockfile, "${PORT}")) != NULL) {
721 165 : usockfilenew = GDKmalloc(ulen + 1);
722 : /* note, "${PORT}" is longer than the longest possible decimal
723 : * representation of a port number ("65535") */
724 165 : if (usockfilenew) {
725 165 : snprintf(usockfilenew, ulen + 1,
726 165 : "%.*s%d%s", (int) (p - usockfile), usockfile,
727 : port < 0 ? 0 : port, p + 7);
728 : usockfile = usockfilenew;
729 165 : ulen = strlen(usockfile);
730 : }
731 : }
732 252 : memcpy(userver.sun_path, usockfile, ulen + 1);
733 : length = (SOCKLEN) sizeof(userver);
734 252 : if (MT_remove(usockfile) == -1 && errno != ENOENT) {
735 0 : char *e = createException(IO, "mal_mapi.listen", OPERATION_FAILED ": remove UNIX socket file: %s",
736 0 : GDKstrerror(errno, (char[128]){0}, 128));
737 0 : if (socks[0] != INVALID_SOCKET)
738 0 : closesocket(socks[0]);
739 0 : if (socks[1] != INVALID_SOCKET)
740 0 : closesocket(socks[1]);
741 0 : closesocket(socks[2]);
742 0 : if (usockfilenew)
743 0 : GDKfree(usockfilenew);
744 : return e;
745 : }
746 252 : if (bind(socks[2], (struct sockaddr *) &userver, length) == SOCKET_ERROR) {
747 : #ifdef _MSC_VER
748 : const char *err = wsaerror(WSAGetLastError());
749 : #else
750 0 : const char *err = GDKstrerror(errno, (char[128]){0}, 128);
751 : #endif
752 0 : if (socks[0] != INVALID_SOCKET)
753 0 : closesocket(socks[0]);
754 0 : if (socks[1] != INVALID_SOCKET)
755 0 : closesocket(socks[1]);
756 0 : closesocket(socks[2]);
757 : (void) MT_remove(usockfile);
758 0 : buf = createException(IO, "mal_mapi.listen",
759 : OPERATION_FAILED
760 : ": binding to UNIX socket file %s failed: %s",
761 : usockfile, err);
762 0 : if (usockfilenew)
763 0 : GDKfree(usockfilenew);
764 : return buf;
765 : }
766 252 : if (listen(socks[2], maxusers) == SOCKET_ERROR) {
767 : #ifdef _MSC_VER
768 : const char *err = wsaerror(WSAGetLastError());
769 : #else
770 0 : const char *err = GDKstrerror(errno, (char[128]){0}, 128);
771 : #endif
772 0 : if (socks[0] != INVALID_SOCKET)
773 0 : closesocket(socks[0]);
774 0 : if (socks[1] != INVALID_SOCKET)
775 0 : closesocket(socks[1]);
776 0 : closesocket(socks[2]);
777 : (void) MT_remove(usockfile);
778 0 : buf = createException(IO, "mal_mapi.listen",
779 : OPERATION_FAILED
780 : ": setting UNIX socket file %s to listen failed: %s",
781 : usockfile, err);
782 0 : if (usockfilenew)
783 0 : GDKfree(usockfilenew);
784 : return buf;
785 : }
786 252 : GDKsetenv("mapi_usock", usockfile);
787 : }
788 : #endif
789 :
790 : /* seed the randomiser such that our challenges aren't
791 : * predictable... */
792 252 : srand((unsigned int) GDKusec());
793 :
794 252 : psock = GDKmalloc(sizeof(socks));
795 252 : if (psock == NULL) {
796 0 : for (int i = 0; i < 3; i++) {
797 0 : if (socks[i] != INVALID_SOCKET)
798 0 : closesocket(socks[i]);
799 : }
800 0 : throw(MAL,"mal_mapi.listen", SQLSTATE(HY013) MAL_MALLOC_FAIL);
801 : }
802 252 : memcpy(psock, socks, sizeof(socks));
803 252 : if (MT_create_thread(&pid, (void (*)(void *)) SERVERlistenThread, psock,
804 : MT_THR_DETACHED, "listenThread") != 0) {
805 0 : for (int i = 0; i < 3; i++) {
806 0 : if (socks[i] != INVALID_SOCKET)
807 0 : closesocket(socks[i]);
808 : }
809 0 : GDKfree(psock);
810 0 : throw(MAL, "mal_mapi.listen", OPERATION_FAILED ": starting thread failed");
811 : }
812 :
813 252 : TRC_DEBUG(MAL_SERVER, "Ready to accept connections on: %s:%d\n", host, port);
814 :
815 252 : if (socks[0] != INVALID_SOCKET || socks[1] != INVALID_SOCKET) {
816 252 : if (!GDKinmemory(0) && (buf = msab_marchConnection(host, port)) != NULL)
817 0 : free(buf);
818 : else
819 : /* announce that we're now reachable */
820 252 : printf("# Listening for connection requests on "
821 : "mapi:monetdb://%s:%i/\n", host, port);
822 : }
823 : #ifdef HAVE_SYS_UN_H
824 252 : if (socks[2] != INVALID_SOCKET) {
825 252 : if (!GDKinmemory(0) && (buf = msab_marchConnection(usockfile, 0)) != NULL)
826 0 : free(buf);
827 : else
828 : /* announce that we're now reachable */
829 252 : printf("# Listening for UNIX domain connection requests on "
830 : "mapi:monetdb://%s\n", usockfile);
831 : }
832 252 : if (usockfilenew)
833 165 : GDKfree(usockfilenew);
834 : #endif
835 :
836 : return MAL_SUCCEED;
837 : }
838 :
839 : /*
840 : * @- Wrappers
841 : * The MonetDB Version 5 wrappers are collected here
842 : * The latest port known to gain access is stored
843 : * in the database, so that others can more easily
844 : * be notified.
845 : */
846 : static str
847 252 : SERVERlisten_default(int *ret)
848 : {
849 : int port = MAPI_PORT;
850 252 : const char* p = GDKgetenv("mapi_port");
851 :
852 : (void) ret;
853 252 : if (p)
854 252 : port = (int) strtol(p, NULL, 10);
855 252 : p = GDKgetenv("mapi_usock");
856 252 : return SERVERlisten(port, p, SERVERMAXUSERS);
857 : }
858 :
859 : static str
860 0 : SERVERlisten_usock(int *ret, str *usock)
861 : {
862 : (void) ret;
863 0 : return SERVERlisten(-1, usock ? *usock : NULL, SERVERMAXUSERS);
864 : }
865 :
866 : static str
867 0 : SERVERlisten_port(int *ret, int *pid)
868 : {
869 : (void) ret;
870 0 : return SERVERlisten(*pid, NULL, SERVERMAXUSERS);
871 : }
872 : /*
873 : * The internet connection listener may be terminated from the server console,
874 : * or temporarily suspended to enable system maintenance.
875 : * It is advisable to trace the interactions of clients on the server
876 : * side. At least as far as it concerns requests received.
877 : * The kernel supports this 'spying' behavior with a file descriptor
878 : * field in the client record.
879 : */
880 :
881 : static str
882 0 : SERVERstop(void *ret)
883 : {
884 0 : TRC_INFO(MAL_SERVER, "Server stop\n");
885 0 : ATOMIC_SET(&serverexiting, 1);
886 : /* wait until they all exited, but skip the wait if the whole
887 : * system is going down */
888 0 : while (ATOMIC_GET(&nlistener) > 0 && !GDKexiting())
889 0 : MT_sleep_ms(100);
890 : (void) ret; /* fool compiler */
891 0 : return MAL_SUCCEED;
892 : }
893 :
894 :
895 : static str
896 0 : SERVERsuspend(void *res)
897 : {
898 : (void) res;
899 0 : ATOMIC_SET(&serveractive, 0);
900 0 : return MAL_SUCCEED;
901 : }
902 :
903 : static str
904 0 : SERVERresume(void *res)
905 : {
906 0 : ATOMIC_SET(&serveractive, 1);
907 : (void) res;
908 0 : return MAL_SUCCEED;
909 : }
910 :
911 : static str
912 0 : SERVERclient(void *res, const Stream *In, const Stream *Out)
913 : {
914 : struct challengedata *data;
915 : MT_Id tid;
916 :
917 : (void) res;
918 : /* in embedded mode we allow just one client */
919 0 : data = GDKmalloc(sizeof(*data));
920 0 : if( data == NULL)
921 0 : throw(MAL, "mapi.SERVERclient", SQLSTATE(HY013) MAL_MALLOC_FAIL);
922 0 : data->in = block_stream(*In);
923 0 : data->out = block_stream(*Out);
924 0 : if (data->in == NULL || data->out == NULL) {
925 0 : mnstr_destroy(data->in);
926 0 : mnstr_destroy(data->out);
927 0 : GDKfree(data);
928 0 : throw(MAL, "mapi.SERVERclient", SQLSTATE(HY013) MAL_MALLOC_FAIL);
929 : }
930 : char name[MT_NAME_LEN];
931 0 : snprintf(name, sizeof(name), "client%d",
932 0 : (int) ATOMIC_INC(&threadno));
933 :
934 : /* generate the challenge string */
935 0 : generateChallenge(data->challenge, 8, 12);
936 :
937 0 : if ((tid = THRcreate(doChallenge, data, MT_THR_DETACHED, name)) == 0) {
938 0 : mnstr_destroy(data->in);
939 0 : mnstr_destroy(data->out);
940 0 : GDKfree(data);
941 0 : throw(MAL, "mapi.SERVERclient", "cannot fork new client thread");
942 : }
943 : return MAL_SUCCEED;
944 : }
945 :
946 : /*
947 : * @+ Remote Processing
948 : * The remainder of the file contains the wrappers around
949 : * the Mapi library used by application programmers.
950 : * Details on the functions can be found there.
951 : *
952 : * Sessions have a lifetime different from dynamic scopes.
953 : * This means the user should use a session identifier
954 : * to select the correct handle.
955 : * For the time being we use the index in the global
956 : * session table. The client pointer is retained to
957 : * perform access control.
958 : *
959 : * We use a single result set handle. All data should be
960 : * consumed before continuing.
961 : *
962 : * A few extra routines should be defined to
963 : * dump and inspect the sessions table.
964 : *
965 : * The remote site may return a single error
966 : * with a series of error lines. These contain
967 : * then a starting !. They are all stripped here.
968 : */
/*
 * Check the Mapi connection for a pending error and, if there is one,
 * return from the invoking function with a MAL exception that carries
 * the remote error message.
 *
 * fcn must be a string literal naming the MAL function; it is used in
 * the exception text.  The expansion relies on the following names
 * being in scope where the macro is used:
 *   mid - the Mapi connection to check,
 *   hdl - the MapiHdl of the current request (may be NULL),
 *   i   - the index of the session in SERVERsessions.
 * The invoking function must return a str.
 *
 * If no memory can be allocated for the message, an allocation failure
 * exception is returned instead of the remote error.
 */
#define catchErrors(fcn) \
	do { \
		int rn = mapi_error(mid); \
		if (rn != 0) { \
			const char *err, *e; \
			str newerr; \
			str ret; \
			size_t l; \
			char *f; \
 \
			if (hdl && mapi_result_error(hdl)) \
				err = mapi_result_error(hdl); \
			else \
				err = mapi_result_error(SERVERsessions[i].hdl); \
 \
			if (err == NULL) \
				err = "(no additional error message)"; \
 \
			l = 2 * strlen(err) + 8192; \
			newerr = (str) GDKmalloc(l); \
			if (newerr == NULL) \
				return createException(MAL, fcn, SQLSTATE(HY013) MAL_MALLOC_FAIL); \
 \
			f = newerr; \
			/* I think this code tries to deal with multiple errors, this \
			 * will fail this way if it does, since no ! is in the error \
			 * string, only newlines to separate them */ \
			for (e = err; *e && l > 1; e++) { \
				/* e > err: never look before the start of the message */ \
				if (*e == '!' && e > err && e[-1] == '\n') { \
					snprintf(f, l, "MALException:" fcn ":remote error:"); \
					l -= strlen(f); \
					while (*f) \
						f++; \
				} else { \
					*f++ = *e; \
					l--; \
				} \
			} \
 \
			*f = 0; \
			ret = createException(MAL, fcn, \
					      OPERATION_FAILED ": remote error: %s", \
					      newerr); \
			GDKfree(newerr); \
			return ret; \
		} \
	} while (0)
1015 :
1016 : #define MAXSESSIONS 32
1017 : struct{
1018 : int key;
1019 : str dbalias; /* logical name of the session */
1020 : Client c;
1021 : Mapi mid; /* communication channel */
1022 : MapiHdl hdl; /* result set handle */
1023 : } SERVERsessions[MAXSESSIONS];
1024 :
1025 : static int sessionkey=0;
1026 :
1027 : /* #define MAPI_TEST*/
1028 :
1029 : static str
1030 6 : SERVERconnectAll(Client cntxt, int *key, str *host, int *port, str *username, str *password, str *lang)
1031 : {
1032 : Mapi mid;
1033 : int i;
1034 :
1035 6 : MT_lock_set(&mal_contextLock);
1036 6 : for(i=1; i< MAXSESSIONS; i++)
1037 6 : if( SERVERsessions[i].c ==0 ) break;
1038 :
1039 6 : if( i==MAXSESSIONS){
1040 0 : MT_lock_unset(&mal_contextLock);
1041 0 : throw(IO, "mapi.connect", OPERATION_FAILED ": too many sessions");
1042 : }
1043 6 : SERVERsessions[i].c= cntxt;
1044 6 : SERVERsessions[i].key= ++sessionkey;
1045 6 : MT_lock_unset(&mal_contextLock);
1046 :
1047 6 : mid = mapi_connect(*host, *port, *username, *password, *lang, NULL);
1048 :
1049 6 : if (mapi_error(mid)) {
1050 0 : const char *err = mapi_error_str(mid);
1051 : str ex;
1052 0 : if (err == NULL)
1053 : err = "(no reason given)";
1054 0 : if (err[0] == '!')
1055 0 : err = err + 1;
1056 0 : SERVERsessions[i].c = NULL;
1057 0 : ex = createException(IO, "mapi.connect", "Could not connect: %s", err);
1058 0 : mapi_destroy(mid);
1059 0 : return(ex);
1060 : }
1061 :
1062 : #ifdef MAPI_TEST
1063 : mnstr_printf(SERVERsessions[i].c->fdout,"Succeeded to establish session\n");
1064 : #endif
1065 6 : SERVERsessions[i].mid= mid;
1066 6 : *key = SERVERsessions[i].key;
1067 6 : return MAL_SUCCEED;
1068 : }
1069 :
1070 : static str
1071 0 : SERVERdisconnectALL(int *key){
1072 : int i;
1073 :
1074 0 : MT_lock_set(&mal_contextLock);
1075 :
1076 0 : for(i=1; i< MAXSESSIONS; i++)
1077 0 : if( SERVERsessions[i].c != 0 ) {
1078 : #ifdef MAPI_TEST
1079 : mnstr_printf(SERVERsessions[i].c->fdout,"Close session %d\n",i);
1080 : #endif
1081 0 : SERVERsessions[i].c = 0;
1082 0 : if( SERVERsessions[i].dbalias)
1083 0 : GDKfree(SERVERsessions[i].dbalias);
1084 0 : SERVERsessions[i].dbalias = NULL;
1085 0 : *key = SERVERsessions[i].key;
1086 0 : if( SERVERsessions[i].hdl)
1087 0 : mapi_close_handle(SERVERsessions[i].hdl);
1088 0 : SERVERsessions[i].hdl= NULL;
1089 0 : mapi_disconnect(SERVERsessions[i].mid);
1090 : }
1091 :
1092 0 : MT_lock_unset(&mal_contextLock);
1093 :
1094 0 : return MAL_SUCCEED;
1095 : }
1096 :
1097 : static str
1098 0 : SERVERdisconnectWithAlias(int *key, str *dbalias){
1099 : int i;
1100 :
1101 0 : MT_lock_set(&mal_contextLock);
1102 :
1103 0 : for(i=0; i<MAXSESSIONS; i++)
1104 0 : if( SERVERsessions[i].dbalias &&
1105 0 : strcmp(SERVERsessions[i].dbalias, *dbalias)==0){
1106 0 : SERVERsessions[i].c = 0;
1107 : if( SERVERsessions[i].dbalias)
1108 0 : GDKfree(SERVERsessions[i].dbalias);
1109 0 : SERVERsessions[i].dbalias = NULL;
1110 0 : *key = SERVERsessions[i].key;
1111 0 : if( SERVERsessions[i].hdl)
1112 0 : mapi_close_handle(SERVERsessions[i].hdl);
1113 0 : SERVERsessions[i].hdl= NULL;
1114 0 : mapi_disconnect(SERVERsessions[i].mid);
1115 0 : break;
1116 : }
1117 :
1118 0 : if( i==MAXSESSIONS){
1119 0 : MT_lock_unset(&mal_contextLock);
1120 0 : throw(IO, "mapi.disconnect", "Impossible to close session for db_alias: '%s'", *dbalias);
1121 : }
1122 :
1123 0 : MT_lock_unset(&mal_contextLock);
1124 0 : return MAL_SUCCEED;
1125 : }
1126 :
1127 : static str
1128 1 : SERVERconnect(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){
1129 1 : int *key =getArgReference_int(stk,pci,0);
1130 1 : str *host = getArgReference_str(stk,pci,1);
1131 1 : int *port = getArgReference_int(stk,pci,2);
1132 1 : str *username = getArgReference_str(stk,pci,3);
1133 1 : str *password= getArgReference_str(stk,pci,4);
1134 1 : str *lang = getArgReference_str(stk,pci,5);
1135 :
1136 : (void) mb;
1137 1 : return SERVERconnectAll(cntxt, key,host,port,username,password,lang);
1138 : }
1139 :
1140 :
1141 : static str
1142 5 : SERVERreconnectAlias(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1143 : {
1144 5 : int *key =getArgReference_int(stk,pci,0);
1145 5 : str *host = getArgReference_str(stk,pci,1);
1146 5 : int *port = getArgReference_int(stk,pci,2);
1147 5 : str *dbalias = getArgReference_str(stk,pci,3);
1148 5 : str *username = getArgReference_str(stk,pci,4);
1149 5 : str *password= getArgReference_str(stk,pci,5);
1150 5 : str *lang = getArgReference_str(stk,pci,6);
1151 : int i;
1152 : str msg=MAL_SUCCEED;
1153 :
1154 : (void) mb;
1155 :
1156 165 : for(i=0; i<MAXSESSIONS; i++)
1157 160 : if( SERVERsessions[i].key &&
1158 5 : SERVERsessions[i].dbalias &&
1159 0 : strcmp(SERVERsessions[i].dbalias, *dbalias)==0){
1160 0 : *key = SERVERsessions[i].key;
1161 0 : return msg;
1162 : }
1163 :
1164 5 : msg= SERVERconnectAll(cntxt, key, host, port, username, password, lang);
1165 5 : if( msg == MAL_SUCCEED)
1166 5 : msg = SERVERsetAlias(NULL, key, dbalias);
1167 : return msg;
1168 : }
1169 :
1170 : static str
1171 0 : SERVERreconnectWithoutAlias(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
1172 0 : int *key =getArgReference_int(stk,pci,0);
1173 0 : str *host = getArgReference_str(stk,pci,1);
1174 0 : int *port = getArgReference_int(stk,pci,2);
1175 0 : str *username = getArgReference_str(stk,pci,3);
1176 0 : str *password= getArgReference_str(stk,pci,4);
1177 0 : str *lang = getArgReference_str(stk,pci,5);
1178 : int i;
1179 0 : str msg=MAL_SUCCEED, nme= "anonymous";
1180 :
1181 : (void) mb;
1182 :
1183 0 : for(i=0; i<MAXSESSIONS; i++)
1184 0 : if( SERVERsessions[i].key ){
1185 0 : *key = SERVERsessions[i].key;
1186 0 : return msg;
1187 : }
1188 :
1189 0 : msg= SERVERconnectAll(cntxt, key, host, port, username, password, lang);
1190 0 : if( msg == MAL_SUCCEED)
1191 0 : msg = SERVERsetAlias(NULL, key, &nme);
1192 : return msg;
1193 : }
1194 :
/*
 * accessTest(val, fcn): locate the open session whose key equals val.
 *
 * Needs `int i' and `Mapi mid' in the expanding scope.  On success i is
 * the slot in SERVERsessions and mid its connection; otherwise the
 * enclosing function returns a "mapi.<fcn>" access violation exception.
 */
#define accessTest(val, fcn) \
	do { \
		i = 0; \
		while (i < MAXSESSIONS && \
			   !(SERVERsessions[i].c && \
				 SERVERsessions[i].key == (val))) \
			i++; \
		if (i == MAXSESSIONS) \
			throw(MAL, "mapi." fcn, "Access violation," \
				  " could not find matching session descriptor"); \
		mid = SERVERsessions[i].mid; \
		(void) mid; /* silence compilers */ \
	} while (0)
1206 :
1207 : static str
1208 5 : SERVERsetAlias(void *ret, int *key, str *dbalias){
1209 : int i;
1210 : Mapi mid;
1211 10 : accessTest(*key, "setAlias");
1212 5 : SERVERsessions[i].dbalias= GDKstrdup(*dbalias);
1213 5 : if(SERVERsessions[i].dbalias == NULL)
1214 0 : throw(MAL, "mapi.set_alias", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1215 : (void) ret;
1216 : return MAL_SUCCEED;
1217 : }
1218 :
1219 : static str
1220 0 : SERVERlookup(int *ret, str *dbalias)
1221 : {
1222 : int i;
1223 0 : for(i=0; i< MAXSESSIONS; i++)
1224 0 : if( SERVERsessions[i].dbalias &&
1225 0 : strcmp(SERVERsessions[i].dbalias, *dbalias)==0){
1226 0 : *ret= SERVERsessions[i].key;
1227 0 : return MAL_SUCCEED;
1228 : }
1229 0 : throw(MAL, "mapi.lookup", "Could not find database connection");
1230 : }
1231 :
1232 : static str
1233 0 : SERVERtrace(void *ret, int *key, int *flag){
1234 : (void )ret;
1235 0 : mapi_trace(SERVERsessions[*key].mid,(bool)*flag);
1236 0 : return MAL_SUCCEED;
1237 : }
1238 :
1239 : static str
1240 0 : SERVERdisconnect(void *ret, int *key){
1241 : int i;
1242 : Mapi mid;
1243 : (void) ret;
1244 0 : accessTest(*key, "disconnect");
1245 0 : if( SERVERsessions[i].hdl)
1246 0 : mapi_close_handle(SERVERsessions[i].hdl);
1247 0 : SERVERsessions[i].hdl= NULL;
1248 0 : mapi_disconnect(mid);
1249 0 : if( SERVERsessions[i].dbalias)
1250 0 : GDKfree(SERVERsessions[i].dbalias);
1251 0 : SERVERsessions[i].c= 0;
1252 0 : SERVERsessions[i].dbalias= 0;
1253 0 : return MAL_SUCCEED;
1254 : }
1255 :
1256 : static str
1257 6 : SERVERdestroy(void *ret, int *key){
1258 : int i;
1259 : Mapi mid;
1260 : (void) ret;
1261 12 : accessTest(*key, "destroy");
1262 6 : if( SERVERsessions[i].hdl)
1263 4 : mapi_close_handle(SERVERsessions[i].hdl);
1264 6 : SERVERsessions[i].hdl= NULL;
1265 6 : mapi_disconnect(mid);
1266 6 : mapi_destroy(mid);
1267 6 : SERVERsessions[i].c= 0;
1268 6 : if( SERVERsessions[i].dbalias)
1269 5 : GDKfree(SERVERsessions[i].dbalias);
1270 6 : SERVERsessions[i].dbalias= 0;
1271 6 : return MAL_SUCCEED;
1272 : }
1273 :
1274 : static str
1275 0 : SERVERreconnect(void *ret, int *key){
1276 : int i;
1277 : Mapi mid;
1278 : (void) ret;
1279 0 : accessTest(*key, "destroy");
1280 0 : if( SERVERsessions[i].hdl)
1281 0 : mapi_close_handle(SERVERsessions[i].hdl);
1282 0 : SERVERsessions[i].hdl= NULL;
1283 0 : mapi_reconnect(mid);
1284 0 : return MAL_SUCCEED;
1285 : }
1286 :
1287 : static str
1288 0 : SERVERping(int *ret, int *key){
1289 : int i;
1290 : Mapi mid;
1291 0 : accessTest(*key, "destroy");
1292 0 : *ret= mapi_ping(mid);
1293 0 : return MAL_SUCCEED;
1294 : }
1295 :
1296 : static str
1297 23 : SERVERquery(int *ret, int *key, str *qry){
1298 : Mapi mid;
1299 : MapiHdl hdl=0;
1300 : int i;
1301 46 : accessTest(*key, "query");
1302 23 : if( SERVERsessions[i].hdl)
1303 19 : mapi_close_handle(SERVERsessions[i].hdl);
1304 23 : SERVERsessions[i].hdl = mapi_query(mid, *qry);
1305 23 : catchErrors("mapi.query");
1306 23 : *ret = *key;
1307 23 : return MAL_SUCCEED;
1308 : }
1309 :
1310 : static str
1311 0 : SERVERquery_handle(int *ret, int *key, str *qry){
1312 : Mapi mid;
1313 : MapiHdl hdl=0;
1314 : int i;
1315 0 : accessTest(*key, "query_handle");
1316 0 : mapi_query_handle(SERVERsessions[i].hdl, *qry);
1317 0 : catchErrors("mapi.query_handle");
1318 0 : *ret = *key;
1319 0 : return MAL_SUCCEED;
1320 : }
1321 :
1322 : static str
1323 0 : SERVERquery_array(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pc){
1324 : (void)cntxt, (void) mb; (void) stk; (void) pc;
1325 0 : throw(MAL, "mapi.query_array", SQLSTATE(0A000) PROGRAM_NYI);
1326 : }
1327 :
1328 : static str
1329 0 : SERVERprepare(int *ret, int *key, str *qry){
1330 : Mapi mid;
1331 : int i;
1332 0 : accessTest(*key, "prepare");
1333 0 : if( SERVERsessions[i].hdl)
1334 0 : mapi_close_handle(SERVERsessions[i].hdl);
1335 0 : SERVERsessions[i].hdl= mapi_prepare(mid, *qry);
1336 0 : if( mapi_error(mid) )
1337 0 : throw(MAL, "mapi.prepare", "%s",
1338 : mapi_result_error(SERVERsessions[i].hdl));
1339 0 : *ret = *key;
1340 0 : return MAL_SUCCEED;
1341 : }
1342 :
1343 : static str
1344 0 : SERVERfinish(int *ret, int *key){
1345 : Mapi mid;
1346 : int i;
1347 0 : accessTest(*key, "finish");
1348 0 : mapi_finish(SERVERsessions[i].hdl);
1349 0 : if( mapi_error(mid) )
1350 0 : throw(MAL, "mapi.finish", "%s",
1351 : mapi_result_error(SERVERsessions[i].hdl));
1352 0 : *ret = *key;
1353 0 : return MAL_SUCCEED;
1354 : }
1355 :
1356 : static str
1357 1 : SERVERget_row_count(lng *ret, int *key){
1358 : Mapi mid;
1359 : int i;
1360 2 : accessTest(*key, "get_row_count");
1361 1 : *ret= (lng) mapi_get_row_count(SERVERsessions[i].hdl);
1362 1 : if( mapi_error(mid) )
1363 0 : throw(MAL, "mapi.get_row_count", "%s",
1364 : mapi_result_error(SERVERsessions[i].hdl));
1365 : return MAL_SUCCEED;
1366 : }
1367 :
1368 : static str
1369 1 : SERVERget_field_count(int *ret, int *key){
1370 : Mapi mid;
1371 : int i;
1372 2 : accessTest(*key, "get_field_count");
1373 1 : *ret= mapi_get_field_count(SERVERsessions[i].hdl);
1374 1 : if( mapi_error(mid) )
1375 0 : throw(MAL, "mapi.get_field_count", "%s",
1376 : mapi_result_error(SERVERsessions[i].hdl));
1377 : return MAL_SUCCEED;
1378 : }
1379 :
1380 : static str
1381 0 : SERVERrows_affected(lng *ret, int *key){
1382 : Mapi mid;
1383 : int i;
1384 0 : accessTest(*key, "rows_affected");
1385 0 : *ret= (lng) mapi_rows_affected(SERVERsessions[i].hdl);
1386 0 : return MAL_SUCCEED;
1387 : }
1388 :
1389 : static str
1390 1 : SERVERfetch_row(int *ret, int *key){
1391 : Mapi mid;
1392 : int i;
1393 2 : accessTest(*key, "fetch_row");
1394 1 : *ret= mapi_fetch_row(SERVERsessions[i].hdl);
1395 1 : return MAL_SUCCEED;
1396 : }
1397 :
1398 : static str
1399 0 : SERVERfetch_all_rows(lng *ret, int *key){
1400 : Mapi mid;
1401 : int i;
1402 0 : accessTest(*key, "fetch_all_rows");
1403 0 : *ret= (lng) mapi_fetch_all_rows(SERVERsessions[i].hdl);
1404 0 : return MAL_SUCCEED;
1405 : }
1406 :
1407 : static str
1408 1 : SERVERfetch_field_str(str *ret, int *key, int *fnr){
1409 : Mapi mid;
1410 : int i;
1411 : str fld;
1412 2 : accessTest(*key, "fetch_field");
1413 1 : fld= mapi_fetch_field(SERVERsessions[i].hdl,*fnr);
1414 1 : *ret= GDKstrdup(fld? fld: str_nil);
1415 1 : if(*ret == NULL)
1416 0 : throw(MAL, "mapi.fetch_field_str", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1417 1 : if( mapi_error(mid) )
1418 0 : throw(MAL, "mapi.fetch_field_str", "%s",
1419 : mapi_result_error(SERVERsessions[i].hdl));
1420 : return MAL_SUCCEED;
1421 : }
1422 :
1423 : static str
1424 1 : SERVERfetch_field_int(int *ret, int *key, int *fnr){
1425 : Mapi mid;
1426 : int i;
1427 : str fld;
1428 2 : accessTest(*key, "fetch_field");
1429 1 : fld= mapi_fetch_field(SERVERsessions[i].hdl,*fnr);
1430 2 : *ret= fld? (int) atol(fld): int_nil;
1431 1 : if( mapi_error(mid) )
1432 0 : throw(MAL, "mapi.fetch_field_int", "%s",
1433 : mapi_result_error(SERVERsessions[i].hdl));
1434 : return MAL_SUCCEED;
1435 : }
1436 :
1437 : static str
1438 0 : SERVERfetch_field_lng(lng *ret, int *key, int *fnr){
1439 : Mapi mid;
1440 : int i;
1441 : str fld;
1442 0 : accessTest(*key, "fetch_field");
1443 0 : fld= mapi_fetch_field(SERVERsessions[i].hdl,*fnr);
1444 0 : *ret= fld? atol(fld): lng_nil;
1445 0 : if( mapi_error(mid) )
1446 0 : throw(MAL, "mapi.fetch_field_lng", "%s",
1447 : mapi_result_error(SERVERsessions[i].hdl));
1448 : return MAL_SUCCEED;
1449 : }
1450 :
1451 : #ifdef HAVE_HGE
1452 : static str
1453 0 : SERVERfetch_field_hge(hge *ret, int *key, int *fnr){
1454 : Mapi mid;
1455 : int i;
1456 : str fld;
1457 0 : accessTest(*key, "fetch_field");
1458 0 : fld= mapi_fetch_field(SERVERsessions[i].hdl,*fnr);
1459 0 : *ret= fld? atol(fld): hge_nil;
1460 0 : if( mapi_error(mid) )
1461 0 : throw(MAL, "mapi.fetch_field_hge", "%s",
1462 : mapi_result_error(SERVERsessions[i].hdl));
1463 : return MAL_SUCCEED;
1464 : }
1465 : #endif
1466 :
1467 : static str
1468 0 : SERVERfetch_field_sht(sht *ret, int *key, int *fnr){
1469 : Mapi mid;
1470 : int i;
1471 : str fld;
1472 0 : accessTest(*key, "fetch_field");
1473 0 : fld= mapi_fetch_field(SERVERsessions[i].hdl,*fnr);
1474 0 : *ret= fld? (sht) atol(fld): sht_nil;
1475 0 : if( mapi_error(mid) )
1476 0 : throw(MAL, "mapi.fetch_field", "%s",
1477 : mapi_result_error(SERVERsessions[i].hdl));
1478 : return MAL_SUCCEED;
1479 : }
1480 :
1481 : static str
1482 0 : SERVERfetch_field_void(void *ret, int *key, int *fnr){
1483 : Mapi mid;
1484 : int i;
1485 : (void) ret;
1486 : (void) fnr;
1487 0 : accessTest(*key, "fetch_field");
1488 0 : throw(MAL, "mapi.fetch_field_void","defaults to nil");
1489 : }
1490 :
1491 : static str
1492 0 : SERVERfetch_field_oid(oid *ret, int *key, int *fnr){
1493 : Mapi mid;
1494 : int i;
1495 : str fld;
1496 0 : accessTest(*key, "fetch_field");
1497 0 : fld= mapi_fetch_field(SERVERsessions[i].hdl,*fnr);
1498 0 : if( mapi_error(mid) )
1499 0 : throw(MAL, "mapi.fetch_field_oid", "%s",
1500 : mapi_result_error(SERVERsessions[i].hdl));
1501 0 : if(fld==0 || strcmp(fld,"nil")==0)
1502 0 : *(oid*) ret= void_nil;
1503 0 : else *(oid*) ret = (oid) atol(fld);
1504 : return MAL_SUCCEED;
1505 : }
1506 :
1507 : static str
1508 0 : SERVERfetch_field_bte(bte *ret, int *key, int *fnr){
1509 : Mapi mid;
1510 : int i;
1511 : str fld;
1512 0 : accessTest(*key, "fetch_field");
1513 0 : fld= mapi_fetch_field(SERVERsessions[i].hdl,*fnr);
1514 0 : if( mapi_error(mid) )
1515 0 : throw(MAL, "mapi.fetch_field_bte", "%s",
1516 : mapi_result_error(SERVERsessions[i].hdl));
1517 0 : if(fld==0 || strcmp(fld,"nil")==0)
1518 0 : *(bte*) ret= bte_nil;
1519 0 : else *(bte*) ret = *fld;
1520 : return MAL_SUCCEED;
1521 : }
1522 :
1523 : static str
1524 0 : SERVERfetch_line(str *ret, int *key){
1525 : Mapi mid;
1526 : int i;
1527 : str fld;
1528 0 : accessTest(*key, "fetch_line");
1529 0 : fld= mapi_fetch_line(SERVERsessions[i].hdl);
1530 0 : if( mapi_error(mid) )
1531 0 : throw(MAL, "mapi.fetch_line", "%s",
1532 : mapi_result_error(SERVERsessions[i].hdl));
1533 0 : *ret= GDKstrdup(fld? fld:str_nil);
1534 0 : if(*ret == NULL)
1535 0 : throw(MAL, "mapi.fetch_line", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1536 : return MAL_SUCCEED;
1537 : }
1538 :
1539 : static str
1540 0 : SERVERnext_result(int *ret, int *key){
1541 : Mapi mid;
1542 : int i;
1543 0 : accessTest(*key, "next_result");
1544 0 : mapi_next_result(SERVERsessions[i].hdl);
1545 0 : if( mapi_error(mid) )
1546 0 : throw(MAL, "mapi.next_result", "%s",
1547 : mapi_result_error(SERVERsessions[i].hdl));
1548 0 : *ret= *key;
1549 0 : return MAL_SUCCEED;
1550 : }
1551 :
1552 : static str
1553 0 : SERVERfetch_reset(int *ret, int *key){
1554 : Mapi mid;
1555 : int i;
1556 0 : accessTest(*key, "fetch_reset");
1557 0 : mapi_fetch_reset(SERVERsessions[i].hdl);
1558 0 : if( mapi_error(mid) )
1559 0 : throw(MAL, "mapi.fetch_reset", "%s",
1560 : mapi_result_error(SERVERsessions[i].hdl));
1561 0 : *ret= *key;
1562 0 : return MAL_SUCCEED;
1563 : }
1564 :
1565 : static str
1566 0 : SERVERfetch_field_bat(bat *bid, int *key){
1567 : int i,j,cnt;
1568 : Mapi mid;
1569 : char *fld;
1570 : BAT *b;
1571 :
1572 0 : accessTest(*key, "rpc");
1573 0 : b= COLnew(0,TYPE_str,256, TRANSIENT);
1574 0 : if( b == NULL)
1575 0 : throw(MAL,"mapi.fetch", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1576 0 : cnt= mapi_get_field_count(SERVERsessions[i].hdl);
1577 0 : for(j=0; j< cnt; j++){
1578 0 : fld= mapi_fetch_field(SERVERsessions[i].hdl,j);
1579 0 : if( mapi_error(mid) ) {
1580 0 : BBPreclaim(b);
1581 0 : throw(MAL, "mapi.fetch_field_bat", "%s",
1582 : mapi_result_error(SERVERsessions[i].hdl));
1583 : }
1584 0 : if (BUNappend(b,fld, false) != GDK_SUCCEED) {
1585 0 : BBPreclaim(b);
1586 0 : throw(MAL, "mapi.fetch_field_bat", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1587 : }
1588 : }
1589 0 : *bid = b->batCacheid;
1590 0 : BBPkeepref(*bid);
1591 0 : return MAL_SUCCEED;
1592 : }
1593 :
1594 : static str
1595 0 : SERVERerror(int *ret, int *key){
1596 : Mapi mid;
1597 : int i;
1598 0 : accessTest(*key, "error");
1599 0 : *ret= mapi_error(mid);
1600 0 : return MAL_SUCCEED;
1601 : }
1602 :
1603 : static str
1604 0 : SERVERgetError(str *ret, int *key){
1605 : Mapi mid;
1606 : int i;
1607 0 : accessTest(*key, "getError");
1608 0 : *ret= GDKstrdup(mapi_error_str(mid));
1609 0 : if(*ret == NULL)
1610 0 : throw(MAL, "mapi.get_error", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1611 : return MAL_SUCCEED;
1612 : }
1613 :
1614 : static str
1615 0 : SERVERexplain(str *ret, int *key){
1616 : Mapi mid;
1617 : int i;
1618 :
1619 0 : accessTest(*key, "explain");
1620 0 : *ret= GDKstrdup(mapi_error_str(mid));
1621 0 : if(*ret == NULL)
1622 0 : throw(MAL, "mapi.explain", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1623 : return MAL_SUCCEED;
1624 : }
1625 : /*
1626 : * The remainder should contain the wrapping of
1627 : * relevant SERVER functions. Furthermore, we
1628 : * should analyse the return value and update
1629 : * the stack trace.
1630 : *
1631 : * Two routines should be
1632 : * mapi.rpc(key,"query")
1633 : *
1634 : * The generic scheme for handling a remote MAL
1635 : * procedure call with a single row answer.
1636 : */
1637 18 : static int SERVERfieldAnalysis(str fld, int tpe, ValPtr v){
1638 18 : v->vtype= tpe;
1639 18 : switch(tpe){
1640 0 : case TYPE_void:
1641 0 : v->val.oval = void_nil;
1642 0 : break;
1643 2 : case TYPE_oid:
1644 2 : if(fld==0 || strcmp(fld,"nil")==0)
1645 1 : v->val.oval= void_nil;
1646 1 : else v->val.oval = (oid) atol(fld);
1647 : break;
1648 0 : case TYPE_bit:
1649 0 : if(fld== 0 || strcmp(fld,"nil")==0)
1650 0 : v->val.btval= bit_nil;
1651 : else
1652 0 : if(strcmp(fld,"true")==0)
1653 0 : v->val.btval= TRUE;
1654 : else
1655 0 : if(strcmp(fld,"false")==0)
1656 0 : v->val.btval= FALSE;
1657 : break;
1658 0 : case TYPE_bte:
1659 0 : if(fld==0 || strcmp(fld,"nil")==0)
1660 0 : v->val.btval= bte_nil;
1661 : else
1662 0 : v->val.btval= *fld;
1663 : break;
1664 0 : case TYPE_sht:
1665 0 : if(fld==0 || strcmp(fld,"nil")==0)
1666 0 : v->val.shval = sht_nil;
1667 0 : else v->val.shval= (sht) atol(fld);
1668 : break;
1669 12 : case TYPE_int:
1670 12 : if(fld==0 || strcmp(fld,"nil")==0)
1671 1 : v->val.ival = int_nil;
1672 11 : else v->val.ival= (int) atol(fld);
1673 : break;
1674 2 : case TYPE_lng:
1675 2 : if(fld==0 || strcmp(fld,"nil")==0)
1676 0 : v->val.lval= lng_nil;
1677 2 : else v->val.lval= (lng) atol(fld);
1678 : break;
1679 : #ifdef HAVE_HGE
1680 0 : case TYPE_hge:
1681 0 : if(fld==0 || strcmp(fld,"nil")==0)
1682 0 : v->val.hval= hge_nil;
1683 0 : else v->val.hval= (hge) atol(fld);
1684 : break;
1685 : #endif
1686 0 : case TYPE_flt:
1687 0 : if(fld==0 || strcmp(fld,"nil")==0)
1688 0 : v->val.fval= flt_nil;
1689 0 : else v->val.fval= (flt) atof(fld);
1690 : break;
1691 0 : case TYPE_dbl:
1692 0 : if(fld==0 || strcmp(fld,"nil")==0)
1693 0 : v->val.dval= dbl_nil;
1694 0 : else v->val.dval= (dbl) atof(fld);
1695 : break;
1696 2 : case TYPE_str:
1697 2 : if(fld==0 || strcmp(fld,"nil")==0){
1698 0 : if (VALinit(v, TYPE_str, str_nil) == NULL)
1699 0 : return -1;
1700 : } else {
1701 2 : if (VALinit(v, TYPE_str, fld) == NULL)
1702 0 : return -1;
1703 : }
1704 : break;
1705 : }
1706 : return 0;
1707 : }
1708 :
1709 : static str
1710 6 : SERVERmapi_rpc_single_row(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
1711 : {
1712 : int key,i,j;
1713 : Mapi mid;
1714 : MapiHdl hdl;
1715 : char *s,*fld, *qry=0;
1716 :
1717 : (void) cntxt;
1718 6 : key= * getArgReference_int(stk,pci,pci->retc);
1719 12 : accessTest(key, "rpc");
1720 : #ifdef MAPI_TEST
1721 : mnstr_printf(cntxt->fdout,"about to send: %s\n",qry);
1722 : #endif
1723 : /* glue all strings together */
1724 12 : for(i= pci->retc+1; i<pci->argc; i++){
1725 6 : fld= * getArgReference_str(stk,pci,i);
1726 6 : if( qry == 0) {
1727 6 : qry= GDKstrdup(fld);
1728 6 : if ( qry == NULL)
1729 0 : throw(MAL, "mapi.rpc",SQLSTATE(HY013) MAL_MALLOC_FAIL);
1730 : } else {
1731 0 : s= (char*) GDKmalloc(strlen(qry)+strlen(fld)+1);
1732 0 : if ( s == NULL) {
1733 0 : GDKfree(qry);
1734 0 : throw(MAL, "mapi.rpc", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1735 : }
1736 0 : strcpy(s,qry);
1737 0 : strcat(s,fld);
1738 0 : GDKfree(qry);
1739 : qry= s;
1740 : }
1741 : }
1742 6 : hdl= mapi_query(mid, qry);
1743 6 : GDKfree(qry);
1744 6 : catchErrors("mapi.rpc");
1745 :
1746 : i= 0;
1747 12 : while( mapi_fetch_row(hdl)){
1748 12 : for(j=0; j<pci->retc; j++){
1749 6 : fld= mapi_fetch_field(hdl,j);
1750 : #ifdef MAPI_TEST
1751 : mnstr_printf(cntxt->fdout,"Got: %s\n",fld);
1752 : #endif
1753 6 : switch(getVarType(mb,getArg(pci,j)) ){
1754 6 : case TYPE_void:
1755 : case TYPE_oid:
1756 : case TYPE_bit:
1757 : case TYPE_bte:
1758 : case TYPE_sht:
1759 : case TYPE_int:
1760 : case TYPE_lng:
1761 : #ifdef HAVE_HGE
1762 : case TYPE_hge:
1763 : #endif
1764 : case TYPE_flt:
1765 : case TYPE_dbl:
1766 : case TYPE_str:
1767 6 : if(SERVERfieldAnalysis(fld,getVarType(mb,getArg(pci,j)),&stk->stk[pci->argv[j]]) < 0) {
1768 0 : mapi_close_handle(hdl);
1769 0 : throw(MAL, "mapi.rpc", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1770 : }
1771 : break;
1772 0 : default:
1773 0 : mapi_close_handle(hdl);
1774 0 : throw(MAL, "mapi.rpc",
1775 : "Missing type implementation ");
1776 : /* all the other basic types come here */
1777 : }
1778 : }
1779 6 : i++;
1780 : }
1781 6 : mapi_close_handle(hdl);
1782 6 : if( i>1)
1783 0 : throw(MAL, "mapi.rpc","Too many answers");
1784 : return MAL_SUCCEED;
1785 : }
1786 : /*
1787 : * Transport of the BATs is only slightly more complicated.
1788 : * The generic implementation based on a pattern is the next
1789 : * step.
1790 : */
1791 : static str
1792 5 : SERVERmapi_rpc_bat(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){
1793 : bat *ret;
1794 : int *key;
1795 : str *qry,err= MAL_SUCCEED;
1796 : Mapi mid;
1797 : MapiHdl hdl;
1798 : char *fld2;
1799 : BAT *b;
1800 : ValRecord tval;
1801 : int i=0, tt;
1802 :
1803 : (void) cntxt;
1804 5 : ret= getArgReference_bat(stk,pci,0);
1805 5 : key= getArgReference_int(stk,pci,pci->retc);
1806 5 : qry= getArgReference_str(stk,pci,pci->retc+1);
1807 10 : accessTest(*key, "rpc");
1808 5 : tt= getBatType(getVarType(mb,getArg(pci,0)));
1809 :
1810 5 : hdl= mapi_query(mid, *qry);
1811 5 : catchErrors("mapi.rpc");
1812 :
1813 5 : b= COLnew(0,tt,256, TRANSIENT);
1814 5 : if ( b == NULL) {
1815 0 : mapi_close_handle(hdl);
1816 0 : throw(MAL,"mapi.rpc", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1817 : }
1818 17 : while( mapi_fetch_row(hdl)){
1819 12 : fld2= mapi_fetch_field(hdl,1);
1820 12 : if(SERVERfieldAnalysis(fld2, tt, &tval) < 0) {
1821 0 : BBPreclaim(b);
1822 0 : mapi_close_handle(hdl);
1823 0 : throw(MAL, "mapi.rpc", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1824 : }
1825 12 : if (BUNappend(b,VALptr(&tval), false) != GDK_SUCCEED) {
1826 0 : BBPreclaim(b);
1827 0 : mapi_close_handle(hdl);
1828 0 : throw(MAL, "mapi.rpc", SQLSTATE(HY013) MAL_MALLOC_FAIL);
1829 : }
1830 : }
1831 5 : mapi_close_handle(hdl);
1832 5 : *ret = b->batCacheid;
1833 5 : BBPkeepref(*ret);
1834 :
1835 5 : return err;
1836 : }
1837 :
1838 : static str
1839 2 : SERVERput(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){
1840 : int *key;
1841 : str *nme;
1842 : ptr val;
1843 : int i,tpe;
1844 : Mapi mid;
1845 : MapiHdl hdl=0;
1846 : char *w=0, buf[BUFSIZ];
1847 :
1848 : (void) cntxt;
1849 2 : key= getArgReference_int(stk,pci,pci->retc);
1850 2 : nme= getArgReference_str(stk,pci,pci->retc+1);
1851 2 : val= getArgReference(stk,pci,pci->retc+2);
1852 4 : accessTest(*key, "put");
1853 2 : switch( (tpe=getArgType(mb,pci, pci->retc+2)) ){
1854 0 : case TYPE_bat:{
1855 : /* generate a tuple batch */
1856 : /* and reload it into the proper format */
1857 : str ht,tt;
1858 0 : BAT *b = BBPquickdesc(BBPindex(*nme));
1859 : size_t len;
1860 :
1861 0 : if (!b)
1862 0 : throw(MAL, "mapi.put", RUNTIME_OBJECT_MISSING);
1863 :
1864 : /* reconstruct the object */
1865 0 : ht = getTypeName(TYPE_oid);
1866 0 : tt = getTypeName(getBatType(tpe));
1867 0 : snprintf(buf,BUFSIZ,"%s:= bat.new(:%s,%s);", *nme, ht,tt );
1868 0 : len = strlen(buf);
1869 0 : snprintf(buf+len,BUFSIZ-len,"%s:= io.import(%s,tuples);", *nme, *nme);
1870 :
1871 : /* and execute the request */
1872 0 : if( SERVERsessions[i].hdl)
1873 0 : mapi_close_handle(SERVERsessions[i].hdl);
1874 0 : SERVERsessions[i].hdl= mapi_query(mid, buf);
1875 :
1876 0 : GDKfree(ht);
1877 0 : GDKfree(tt);
1878 0 : break;
1879 : }
1880 0 : case TYPE_str:
1881 0 : snprintf(buf,BUFSIZ,"%s:=%s;",*nme,*(char**)val);
1882 0 : if( SERVERsessions[i].hdl)
1883 0 : mapi_close_handle(SERVERsessions[i].hdl);
1884 0 : SERVERsessions[i].hdl= mapi_query(mid, buf);
1885 0 : break;
1886 2 : default:
1887 2 : if ((w = ATOMformat(tpe,val)) == NULL)
1888 0 : throw(MAL, "mapi.put", GDK_EXCEPTION);
1889 2 : snprintf(buf,BUFSIZ,"%s:=%s;",*nme,w);
1890 2 : GDKfree(w);
1891 2 : if( SERVERsessions[i].hdl)
1892 2 : mapi_close_handle(SERVERsessions[i].hdl);
1893 2 : SERVERsessions[i].hdl= mapi_query(mid, buf);
1894 2 : break;
1895 : }
1896 2 : catchErrors("mapi.put");
1897 : return MAL_SUCCEED;
1898 : }
1899 :
1900 : static str
1901 0 : SERVERputLocal(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){
1902 : str *ret, *nme;
1903 : ptr val;
1904 : int tpe;
1905 : char *w=0, buf[BUFSIZ];
1906 :
1907 : (void) cntxt;
1908 0 : ret= getArgReference_str(stk,pci,0);
1909 0 : nme= getArgReference_str(stk,pci,pci->retc);
1910 0 : val= getArgReference(stk,pci,pci->retc+1);
1911 0 : switch( (tpe=getArgType(mb,pci, pci->retc+1)) ){
1912 0 : case TYPE_bat:
1913 : case TYPE_ptr:
1914 0 : throw(MAL, "mapi.glue","Unsupported type");
1915 0 : case TYPE_str:
1916 0 : snprintf(buf,BUFSIZ,"%s:=%s;",*nme,*(char**)val);
1917 0 : break;
1918 0 : default:
1919 0 : if ((w = ATOMformat(tpe,val)) == NULL)
1920 0 : throw(MAL, "mapi.glue", GDK_EXCEPTION);
1921 0 : snprintf(buf,BUFSIZ,"%s:=%s;",*nme,w);
1922 0 : GDKfree(w);
1923 0 : break;
1924 : }
1925 0 : *ret= GDKstrdup(buf);
1926 0 : if(*ret == NULL)
1927 0 : throw(MAL, "mapi.glue", GDK_EXCEPTION);
1928 : return MAL_SUCCEED;
1929 : }
1930 :
1931 : static str
1932 1 : SERVERbindBAT(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci){
1933 : int *key;
1934 : str *nme,*tab,*col;
1935 : int i;
1936 : Mapi mid;
1937 : MapiHdl hdl=0;
1938 : char buf[BUFSIZ];
1939 :
1940 : (void) cntxt;
1941 1 : key= getArgReference_int(stk,pci,pci->retc);
1942 1 : nme= getArgReference_str(stk,pci,pci->retc+1);
1943 2 : accessTest(*key, "bind");
1944 1 : if( pci->argc == 6) {
1945 : char *tn;
1946 0 : tab= getArgReference_str(stk,pci,pci->retc+2);
1947 0 : col= getArgReference_str(stk,pci,pci->retc+3);
1948 0 : i= *getArgReference_int(stk,pci,pci->retc+4);
1949 0 : tn = getTypeName(getBatType(getVarType(mb,getDestVar(pci))));
1950 0 : snprintf(buf,BUFSIZ,"%s:bat[:%s]:=sql.bind(\"%s\",\"%s\",\"%s\",%d);",
1951 : getVarName(mb,getDestVar(pci)),
1952 : tn,
1953 : *nme, *tab,*col,i);
1954 0 : GDKfree(tn);
1955 1 : } else if( pci->argc == 5) {
1956 0 : tab= getArgReference_str(stk,pci,pci->retc+2);
1957 0 : i= *getArgReference_int(stk,pci,pci->retc+3);
1958 0 : snprintf(buf,BUFSIZ,"%s:bat[:oid]:=sql.bind(\"%s\",\"%s\",0,%d);",
1959 : getVarName(mb,getDestVar(pci)),*nme, *tab,i);
1960 : } else {
1961 : str hn,tn;
1962 1 : int target= getArgType(mb,pci,0);
1963 1 : hn= getTypeName(TYPE_oid);
1964 1 : tn= getTypeName(getBatType(target));
1965 1 : snprintf(buf,BUFSIZ,"%s:bat[:%s]:=bbp.bind(\"%s\");",
1966 : getVarName(mb,getDestVar(pci)), tn, *nme);
1967 1 : GDKfree(hn);
1968 1 : GDKfree(tn);
1969 : }
1970 1 : if( SERVERsessions[i].hdl)
1971 1 : mapi_close_handle(SERVERsessions[i].hdl);
1972 1 : SERVERsessions[i].hdl= mapi_query(mid, buf);
1973 1 : catchErrors("mapi.bind");
1974 : return MAL_SUCCEED;
1975 : }
1976 :
1977 : #include "mel.h"
1978 : mel_func mal_mapi_init_funcs[] = {
1979 : command("mapi", "prelude", SERVERlisten_default, false, "", args(1,1, arg("",int))),
1980 : command("mapi", "listen", SERVERlisten_default, false, "Start a Mapi server with the default settings.", args(1,1, arg("",int))),
1981 : command("mapi", "listen", SERVERlisten_port, false, "Start a Mapi listener on the port given.", args(1,2, arg("",int),arg("port",int))),
1982 : command("mapi", "listen", SERVERlisten_usock, false, "Start a Mapi listener on the unix socket file given.", args(1,2, arg("",int),arg("unixsocket",str))),
1983 : command("mapi", "stop", SERVERstop, false, "Terminate connection listeners.", args(1,1, arg("",void))),
1984 : command("mapi", "suspend", SERVERsuspend, false, "Suspend accepting connections.", args(1,1, arg("",void))),
1985 : command("mapi", "resume", SERVERresume, false, "Resume connection listeners.", args(1,1, arg("",void))),
1986 : command("mapi", "malclient", SERVERclient, false, "Start a Mapi client for a particular stream pair.", args(1,3, arg("",void),arg("in",streams),arg("out",streams))),
1987 : command("mapi", "trace", SERVERtrace, false, "Toggle the Mapi library debug tracer.", args(1,3, arg("",void),arg("mid",int),arg("flag",int))),
1988 : pattern("mapi", "reconnect", SERVERreconnectWithoutAlias, false, "Re-establish connection with a remote mserver.", args(1,6, arg("",int),arg("host",str),arg("port",int),arg("usr",str),arg("passwd",str),arg("lang",str))),
1989 : pattern("mapi", "reconnect", SERVERreconnectAlias, false, "Re-establish connection with a remote mserver.", args(1,7, arg("",int),arg("host",str),arg("port",int),arg("db_alias",str),arg("usr",str),arg("passwd",str),arg("lang",str))),
1990 : command("mapi", "reconnect", SERVERreconnect, false, "Re-establish a connection.", args(1,2, arg("",void),arg("mid",int))),
1991 : pattern("mapi", "connect", SERVERconnect, false, "Establish connection with a remote mserver.", args(1,6, arg("",int),arg("host",str),arg("port",int),arg("usr",str),arg("passwd",str),arg("lang",str))),
1992 : command("mapi", "disconnect", SERVERdisconnectWithAlias, false, "Close connection with a remote Mserver.", args(1,2, arg("",int),arg("dbalias",str))),
1993 : command("mapi", "disconnect", SERVERdisconnectALL, false, "Close connections with all remote Mserver.", args(1,1, arg("",int))),
1994 : command("mapi", "setAlias", SERVERsetAlias, false, "Give the channel a logical name.", args(0,2, arg("key",int),arg("dbalias",str))),
1995 : command("mapi", "lookup", SERVERlookup, false, "Retrieve the connection identifier.", args(1,2, arg("",int),arg("dbalias",str))),
1996 : command("mapi", "disconnect", SERVERdisconnect, false, "Terminate the session.", args(1,2, arg("",void),arg("mid",int))),
1997 : command("mapi", "destroy", SERVERdestroy, false, "Destroy the handle for an Mserver.", args(1,2, arg("",void),arg("mid",int))),
1998 : command("mapi", "ping", SERVERping, false, "Test availability of an Mserver.", args(1,2, arg("",int),arg("mid",int))),
1999 : command("mapi", "query", SERVERquery, false, "Send the query for execution", args(1,3, arg("",int),arg("mid",int),arg("qry",str))),
2000 : command("mapi", "query_handle", SERVERquery_handle, false, "Send the query for execution.", args(1,3, arg("",int),arg("mid",int),arg("qry",str))),
2001 : pattern("mapi", "query_array", SERVERquery_array, false, "Send the query for execution replacing '?' by arguments.", args(1,4, arg("",int),arg("mid",int),arg("qry",str),vararg("arg",str))),
2002 : command("mapi", "prepare", SERVERprepare, false, "Prepare a query for execution.", args(1,3, arg("",int),arg("mid",int),arg("qry",str))),
2003 : command("mapi", "finish", SERVERfinish, false, "Remove all remaining answers.", args(1,2, arg("",int),arg("hdl",int))),
2004 : command("mapi", "get_field_count", SERVERget_field_count, false, "Return number of fields.", args(1,2, arg("",int),arg("hdl",int))),
2005 : command("mapi", "get_row_count", SERVERget_row_count, false, "Return number of rows.", args(1,2, arg("",lng),arg("hdl",int))),
2006 : command("mapi", "rows_affected", SERVERrows_affected, false, "Return number of affected rows.", args(1,2, arg("",lng),arg("hdl",int))),
2007 : command("mapi", "fetch_row", SERVERfetch_row, false, "Retrieve the next row for analysis.", args(1,2, arg("",int),arg("hdl",int))),
2008 : command("mapi", "fetch_all_rows", SERVERfetch_all_rows, false, "Retrieve all rows into the cache.", args(1,2, arg("",lng),arg("hdl",int))),
2009 : command("mapi", "fetch_field", SERVERfetch_field_str, false, "Retrieve a single field.", args(1,3, arg("",str),arg("hdl",int),arg("fnr",int))),
2010 : command("mapi", "fetch_field", SERVERfetch_field_int, false, "Retrieve a single int field.", args(1,3, arg("",int),arg("hdl",int),arg("fnr",int))),
2011 : command("mapi", "fetch_field", SERVERfetch_field_lng, false, "Retrieve a single lng field.", args(1,3, arg("",lng),arg("hdl",int),arg("fnr",int))),
2012 : command("mapi", "fetch_field", SERVERfetch_field_sht, false, "Retrieve a single sht field.", args(1,3, arg("",sht),arg("hdl",int),arg("fnr",int))),
2013 : command("mapi", "fetch_field", SERVERfetch_field_void, false, "Retrieve a single void field.", args(1,3, arg("",void),arg("hdl",int),arg("fnr",int))),
2014 : command("mapi", "fetch_field", SERVERfetch_field_oid, false, "Retrieve a single void field.", args(1,3, arg("",oid),arg("hdl",int),arg("fnr",int))),
2015 : command("mapi", "fetch_field", SERVERfetch_field_bte, false, "Retrieve a single bte field.", args(1,3, arg("",bte),arg("hdl",int),arg("fnr",int))),
2016 : command("mapi", "fetch_field_array", SERVERfetch_field_bat, false, "Retrieve all fields for a row.", args(1,2, batarg("",str),arg("hdl",int))),
2017 : command("mapi", "fetch_line", SERVERfetch_line, false, "Retrieve a complete line.", args(1,2, arg("",str),arg("hdl",int))),
2018 : command("mapi", "fetch_reset", SERVERfetch_reset, false, "Reset the cache read line.", args(1,2, arg("",int),arg("hdl",int))),
2019 : command("mapi", "next_result", SERVERnext_result, false, "Go to next result set.", args(1,2, arg("",int),arg("hdl",int))),
2020 : command("mapi", "error", SERVERerror, false, "Check for an error in the communication.", args(1,2, arg("",int),arg("mid",int))),
2021 : command("mapi", "getError", SERVERgetError, false, "Get error message.", args(1,2, arg("",str),arg("mid",int))),
2022 : command("mapi", "explain", SERVERexplain, false, "Turn the error seen into a string.", args(1,2, arg("",str),arg("mid",int))),
2023 : pattern("mapi", "put", SERVERput, false, "Send a value to a remote site.", args(1,4, arg("",void),arg("mid",int),arg("nme",str),argany("val",1))),
2024 : pattern("mapi", "put", SERVERputLocal, false, "Prepare sending a value to a remote site.", args(1,3, arg("",str),arg("nme",str),argany("val",1))),
2025 : pattern("mapi", "rpc", SERVERmapi_rpc_single_row, false, "Send a simple query for execution and fetch result.", args(1,3, argany("",0),arg("key",int),vararg("qry",str))),
2026 : pattern("mapi", "rpc", SERVERmapi_rpc_bat, false, "", args(1,3, batargany("",2),arg("key",int),arg("qry",str))),
2027 : command("mapi", "rpc", SERVERquery, false, "Send a simple query for execution.", args(1,3, arg("",int),arg("key",int),arg("qry",str))),
2028 : pattern("mapi", "bind", SERVERbindBAT, false, "Bind a remote variable to a local one.", args(1,6, batargany("",2),arg("key",int),arg("rschema",str),arg("rtable",str),arg("rcolumn",str),arg("i",int))),
2029 : pattern("mapi", "bind", SERVERbindBAT, false, "Bind a remote variable to a local one.", args(1,5, batargany("",2),arg("key",int),arg("rschema",str),arg("rtable",str),arg("i",int))),
2030 : pattern("mapi", "bind", SERVERbindBAT, false, "Bind a remote variable to a local one.", args(1,3, batargany("",2),arg("key",int),arg("remoteName",str))),
2031 : #ifdef HAVE_HGE
2032 : command("mapi", "fetch_field", SERVERfetch_field_hge, false, "Retrieve a single hge field.", args(1,3, arg("",hge),arg("hdl",int),arg("fnr",int))),
2033 : #endif
2034 : { .imp=NULL }
2035 : };
2036 : #include "mal_import.h"
2037 : #ifdef _MSC_VER
2038 : #undef read
2039 : #pragma section(".CRT$XCU",read)
2040 : #endif
2041 257 : LIB_STARTUP_FUNC(init_mal_mapi_mal)
2042 257 : { mal_module("mapi", NULL, mal_mapi_init_funcs); }
2043 :
2044 : #else
/*
 * Placeholder for builds without HAVE_MAPI: everything else in this file
 * is compiled out in that case, so this single definition keeps the
 * translation unit non-empty and avoids the compiler warning about
 * empty compilation units.
 */
int SERVERdummy = 42;
2047 : #endif // HAVE_MAPI
|