LCOV - code coverage report
Current view: top level - common/stream - socket_stream.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 101 128 78.9 %
Date: 2021-10-13 02:24:04 Functions: 8 8 100.0 %

          Line data    Source code
       1             : /*
       2             :  * This Source Code Form is subject to the terms of the Mozilla Public
       3             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       4             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       5             :  *
       6             :  * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
       7             :  */
       8             : 
       9             : /* Generic stream handling code such as init and close */
      10             : 
      11             : #include "monetdb_config.h"
      12             : #include "stream.h"
      13             : #include "stream_internal.h"
      14             : #ifdef HAVE_SYS_TIME_H
      15             : #include <sys/time.h>
      16             : #endif
      17             : 
      18             : 
      19             : /* ------------------------------------------------------------------ */
      20             : /* streams working on a socket */
      21             : 
      22             : static ssize_t
      23     1051572 : socket_write(stream *restrict s, const void *restrict buf, size_t elmsize, size_t cnt)
      24             : {
      25     1051572 :         size_t size = elmsize * cnt, res = 0;
      26             : #ifdef NATIVE_WIN32
      27             :         int nr = 0;
      28             : #else
      29             :         ssize_t nr = 0;
      30             : #endif
      31             : 
      32     1051572 :         if (s->errkind != MNSTR_NO__ERROR)
      33             :                 return -1;
      34             : 
      35     1051572 :         if (size == 0 || elmsize == 0)
      36           0 :                 return (ssize_t) cnt;
      37             : 
      38     1051572 :         errno = 0;
      39     3154577 :         while (res < size &&
      40             :                (
      41             : #ifdef NATIVE_WIN32
      42             :                        /* send works on int, make sure the argument fits */
      43             :                        ((nr = send(s->stream_data.s, (const char *) buf + res, (int) min(size - res, 1 << 16), 0)) > 0)
      44             : #else
      45     1051572 :                        ((nr = write(s->stream_data.s, (const char *) buf + res, size - res)) > 0)
      46             : #endif
      47         139 :                        || (nr < 0 && /* syscall failed */
      48         139 :                            s->timeout > 0 &&      /* potentially timeout */
      49             : #ifdef _MSC_VER
      50             :                            WSAGetLastError() == WSAEWOULDBLOCK &&
      51             : #else
      52           0 :                            (errno == EAGAIN
      53             : #if EAGAIN != EWOULDBLOCK
      54             :                             || errno == EWOULDBLOCK
      55             : #endif
      56           0 :                                    ) && /* it was! */
      57             : #endif
      58           0 :                            s->timeout_func != NULL &&        /* callback function exists */
      59           0 :                            !s->timeout_func(s->timeout_data))     /* callback says don't stop */
      60         139 :                        ||(nr < 0 &&
      61             : #ifdef _MSC_VER
      62             :                           WSAGetLastError() == WSAEINTR
      63             : #else
      64         139 :                           errno == EINTR
      65             : #endif
      66             :                                ))       /* interrupted */
      67             :                 ) {
      68     1051433 :                 errno = 0;
      69             : #ifdef _MSC_VER
      70             :                 WSASetLastError(0);
      71             : #endif
      72     1051433 :                 if (nr > 0)
      73     1051433 :                         res += (size_t) nr;
      74             :         }
      75     1051572 :         if (res >= elmsize)
      76     1051433 :                 return (ssize_t) (res / elmsize);
      77         139 :         if (nr < 0) {
      78         139 :                 if (s->timeout > 0 &&
      79             : #ifdef _MSC_VER
      80             :                     WSAGetLastError() == WSAEWOULDBLOCK
      81             : #else
      82           0 :                     (errno == EAGAIN
      83             : #if EAGAIN != EWOULDBLOCK
      84             :                      || errno == EWOULDBLOCK
      85             : #endif
      86             :                             )
      87             : #endif
      88             :                         )
      89           0 :                         mnstr_set_error(s, MNSTR_TIMEOUT, NULL);
      90             :                 else
      91         139 :                         mnstr_set_error_errno(s, MNSTR_WRITE_ERROR, "socket write");
      92         139 :                 return -1;
      93             :         }
      94             :         return 0;
      95             : }
      96             : 
      97             : static ssize_t
      98     1301826 : socket_read(stream *restrict s, void *restrict buf, size_t elmsize, size_t cnt)
      99             : {
     100             : #ifdef _MSC_VER
     101             :         int nr = 0;
     102             : #else
     103             :         ssize_t nr = 0;
     104             : #endif
     105     1301826 :         size_t size = elmsize * cnt;
     106             : 
     107     1301826 :         if (s->errkind != MNSTR_NO__ERROR)
     108             :                 return -1;
     109     1301826 :         if (size == 0)
     110             :                 return 0;
     111             : 
     112             : #ifdef _MSC_VER
     113             :         /* recv only takes an int parameter, and read does not accept
     114             :          * sockets */
     115             :         if (size > INT_MAX)
     116             :                 size = elmsize * (INT_MAX / elmsize);
     117             : #endif
     118             :         for (;;) {
     119     1304159 :                 if (s->timeout) {
     120             :                         int ret;
     121             : #ifdef HAVE_POLL
     122             :                         struct pollfd pfd;
     123             : 
     124      671157 :                         pfd = (struct pollfd) {.fd = s->stream_data.s,
     125             :                                                .events = POLLIN};
     126             : 
     127      671157 :                         ret = poll(&pfd, 1, (int) s->timeout);
     128      671157 :                         if (ret == -1 && errno == EINTR)
     129        2333 :                                 continue;
     130      671157 :                         if (ret == -1 || (pfd.revents & POLLERR)) {
     131           0 :                                 mnstr_set_error_errno(s, MNSTR_READ_ERROR, "poll error");
     132          17 :                                 return -1;
     133             :                         }
     134             : #else
     135             :                         struct timeval tv;
     136             :                         fd_set fds;
     137             : 
     138             :                         errno = 0;
     139             : #ifdef _MSC_VER
     140             :                         WSASetLastError(0);
     141             : #endif
     142             :                         FD_ZERO(&fds);
     143             :                         FD_SET(s->stream_data.s, &fds);
     144             :                         tv.tv_sec = s->timeout / 1000;
     145             :                         tv.tv_usec = (s->timeout % 1000) * 1000;
     146             :                         ret = select(
     147             : #ifdef _MSC_VER
     148             :                                 0,      /* ignored on Windows */
     149             : #else
     150             :                                 s->stream_data.s + 1,
     151             : #endif
     152             :                                 &fds, NULL, NULL, &tv);
     153             :                         if (ret == SOCKET_ERROR) {
     154             :                                 mnstr_set_error_errno(s, MNSTR_READ_ERROR, "select");
     155             :                                 return -1;
     156             :                         }
     157             : #endif
     158      671157 :                         if (ret == 0) {
     159        2350 :                                 if (s->timeout_func == NULL || s->timeout_func(s->timeout_data)) {
     160          17 :                                         mnstr_set_error(s, MNSTR_TIMEOUT, NULL);
     161          17 :                                         return -1;
     162             :                                 }
     163        2333 :                                 continue;
     164             :                         }
     165      668807 :                         assert(ret == 1);
     166             : #ifdef HAVE_POLL
     167      668807 :                         assert(pfd.revents & (POLLIN|POLLHUP));
     168             : #else
     169             :                         assert(FD_ISSET(s->stream_data.s, &fds));
     170             : #endif
     171             :                 }
     172             : #ifdef _MSC_VER
     173             :                 nr = recv(s->stream_data.s, buf, (int) size, 0);
     174             :                 if (nr == SOCKET_ERROR) {
     175             :                         mnstr_set_error_errno(s, MNSTR_READ_ERROR, "recv");
     176             :                         return -1;
     177             :                 }
     178             : #else
     179     1301809 :                 nr = read(s->stream_data.s, buf, size);
     180     1301844 :                 if (nr == -1 && errno == EINTR)
     181           0 :                         continue;
     182     1301844 :                 if (nr == -1) {
     183           0 :                         mnstr_set_error_errno(s, MNSTR_READ_ERROR, NULL);
     184           0 :                         return -1;
     185             :                 }
     186             : #endif
     187             :                 break;
     188             :         }
     189     1301844 :         if (nr == 0) {
     190        5621 :                 s->eof = true;
     191        5621 :                 return 0;       /* end of file */
     192             :         }
     193     1296223 :         if (elmsize > 1) {
     194      548716 :                 while ((size_t) nr % elmsize != 0) {
     195             :                         /* if elmsize > 1, we really expect that "the
     196             :                          * other side" wrote complete items in a
     197             :                          * single system call, so we expect to at
     198             :                          * least receive complete items, and hence we
     199             :                          * continue reading until we did in fact
     200             :                          * receive an integral number of complete
     201             :                          * items, ignoring any timeouts (but not real
     202             :                          * errors) (note that recursion is limited
     203             :                          * since we don't propagate the element size
     204             :                          * to the recursive call) */
     205             :                         ssize_t n;
     206           9 :                         n = socket_read(s, (char *) buf + nr, 1, size - (size_t) nr);
     207           0 :                         if (n < 0) {
     208           0 :                                 if (s->errkind == MNSTR_NO__ERROR)
     209           0 :                                         mnstr_set_error(s, MNSTR_READ_ERROR, "socket_read failed");
     210           0 :                                 return -1;
     211             :                         }
     212           0 :                         if (n == 0)     /* unexpected end of file */
     213             :                                 break;
     214           0 :                         nr +=
     215             : #ifdef _MSC_VER
     216             :                                 (int)
     217             : #endif
     218             :                                 n;
     219             :                 }
     220             :         }
     221     1296214 :         return nr / (ssize_t) elmsize;
     222             : }
     223             : 
     224             : static void
     225       13728 : socket_close(stream *s)
     226             : {
     227       13728 :         SOCKET fd = s->stream_data.s;
     228             : 
     229       13728 :         if (fd != INVALID_SOCKET) {
     230             :                 /* Related read/write (in/out, from/to) streams
     231             :                  * share a single socket which is not dup'ed (anymore)
     232             :                  * as Windows' dup doesn't work on sockets;
     233             :                  * hence, only one of the streams must/may close that
     234             :                  * socket; we choose to let the read socket do the
     235             :                  * job, since in mapi.c it may happen that the read
     236             :                  * stream is closed before the write stream was even
     237             :                  * created.
     238             :                  */
     239       13728 :                 if (s->readonly) {
     240             : #ifdef HAVE_SHUTDOWN
     241        6864 :                         shutdown(fd, SHUT_RDWR);
     242             : #endif
     243        6864 :                         closesocket(fd);
     244             :                 }
     245             :         }
     246       13728 :         s->stream_data.s = INVALID_SOCKET;
     247       13728 : }
     248             : 
     249             : static void
     250        5371 : socket_update_timeout(stream *s)
     251             : {
     252        5371 :         SOCKET fd = s->stream_data.s;
     253             :         struct timeval tv;
     254             : 
     255        5371 :         if (fd == INVALID_SOCKET)
     256           0 :                 return;
     257        5371 :         tv.tv_sec = s->timeout / 1000;
     258        5371 :         tv.tv_usec = (s->timeout % 1000) * 1000;
     259             :         /* cast to char * for Windows, no harm on "normal" systems */
     260        5371 :         if (!s->readonly)
     261           2 :                 (void) setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, (char *) &tv, (socklen_t) sizeof(tv));
     262             : }
     263             : 
     264             : #ifndef MSG_DONTWAIT
     265             : #define MSG_DONTWAIT 0
     266             : #endif
     267             : 
     268             : static int
     269    11279427 : socket_isalive(const stream *s)
     270             : {
     271    11279427 :         SOCKET fd = s->stream_data.s;
     272             : #ifdef HAVE_POLL
     273             :         struct pollfd pfd;
     274             :         int ret;
     275    11279427 :         pfd = (struct pollfd){.fd = fd};
     276    11279427 :         if ((ret = poll(&pfd, 1, 0)) == 0)
     277             :                 return 1;
     278           0 :         if (ret == -1 && errno == EINTR)
     279           0 :                 return socket_isalive(s);
     280           0 :         if (ret < 0 || pfd.revents & (POLLERR | POLLHUP))
     281             :                 return 0;
     282           0 :         assert(0);              /* unexpected revents value */
     283             :         return 0;
     284             : #else
     285             :         fd_set fds;
     286             :         struct timeval t;
     287             :         char buffer[32];
     288             : 
     289             :         t.tv_sec = 0;
     290             :         t.tv_usec = 0;
     291             :         FD_ZERO(&fds);
     292             :         FD_SET(fd, &fds);
     293             :         return select(
     294             : #ifdef _MSC_VER
     295             :                 0,      /* ignored on Windows */
     296             : #else
     297             :                 fd + 1,
     298             : #endif
     299             :                 &fds, NULL, NULL, &t) <= 0 ||
     300             :                 recv(fd, buffer, sizeof(buffer), MSG_PEEK | MSG_DONTWAIT) != 0;
     301             : #endif
     302             : }
     303             : 
     304             : static stream *
     305       13740 : socket_open(SOCKET sock, const char *name)
     306             : {
     307             :         stream *s;
     308       13740 :         int domain = 0;
     309             : 
     310       13740 :         if (sock == INVALID_SOCKET) {
     311           0 :                 mnstr_set_open_error(name, 0, "invalid socket");
     312           0 :                 return NULL;
     313             :         }
     314       13740 :         if ((s = create_stream(name)) == NULL)
     315             :                 return NULL;
     316       13740 :         s->read = socket_read;
     317       13740 :         s->write = socket_write;
     318       13740 :         s->close = socket_close;
     319       13740 :         s->stream_data.s = sock;
     320       13740 :         s->update_timeout = socket_update_timeout;
     321       13740 :         s->isalive = socket_isalive;
     322             : 
     323       13740 :         errno = 0;
     324             : #ifdef _MSC_VER
     325             :         WSASetLastError(0);
     326             : #endif
     327             : #if defined(SO_DOMAIN)
     328             :         {
     329       13740 :                 socklen_t len = (socklen_t) sizeof(domain);
     330       13740 :                 if (getsockopt(sock, SOL_SOCKET, SO_DOMAIN, (void *) &domain, &len) == SOCKET_ERROR)
     331           0 :                         domain = AF_INET;       /* give it a value if call fails */
     332             :         }
     333             : #endif
     334             : #if defined(SO_KEEPALIVE) && !defined(WIN32)
     335       13740 :         if (domain != PF_UNIX) {        /* not on UNIX sockets */
     336       13172 :                 int opt = 1;
     337       13172 :                 (void) setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, (void *) &opt, sizeof(opt));
     338             :         }
     339             : #endif
     340             : #if defined(IPTOS_THROUGHPUT) && !defined(WIN32)
     341       13740 :         if (domain != PF_UNIX) {        /* not on UNIX sockets */
     342       13172 :                 int tos = IPTOS_THROUGHPUT;
     343             : 
     344       13172 :                 (void) setsockopt(sock, IPPROTO_IP, IP_TOS, (void *) &tos, sizeof(tos));
     345             :         }
     346             : #endif
     347             : #ifdef TCP_NODELAY
     348       13740 :         if (domain != PF_UNIX) {        /* not on UNIX sockets */
     349       13172 :                 int nodelay = 1;
     350             : 
     351       13172 :                 (void) setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, (void *) &nodelay, sizeof(nodelay));
     352             :         }
     353             : #endif
     354             : #ifdef HAVE_FCNTL
     355             :         {
     356       13740 :                 int fl = fcntl(sock, F_GETFL);
     357             : 
     358       13740 :                 fl &= ~O_NONBLOCK;
     359       13740 :                 if (fcntl(sock, F_SETFL, fl) < 0) {
     360           0 :                         mnstr_set_error_errno(s, MNSTR_OPEN_ERROR, "fcntl unset O_NONBLOCK failed");
     361           0 :                         return s;
     362             :                 }
     363             :         }
     364             : #endif
     365             : 
     366             :         return s;
     367             : }
     368             : 
     369             : stream *
     370        6870 : socket_rstream(SOCKET sock, const char *name)
     371             : {
     372             :         stream *s = NULL;
     373             : 
     374             : #ifdef STREAM_DEBUG
     375             :         fprintf(stderr, "socket_rstream %zd %s\n", (ssize_t) sock, name);
     376             : #endif
     377        6870 :         if ((s = socket_open(sock, name)) != NULL)
     378        6870 :                 s->binary = true;
     379        6870 :         return s;
     380             : }
     381             : 
     382             : stream *
     383        6870 : socket_wstream(SOCKET sock, const char *name)
     384             : {
     385             :         stream *s;
     386             : 
     387             : #ifdef STREAM_DEBUG
     388             :         fprintf(stderr, "socket_wstream %zd %s\n", (ssize_t) sock, name);
     389             : #endif
     390        6870 :         if ((s = socket_open(sock, name)) == NULL)
     391             :                 return NULL;
     392        6870 :         s->readonly = false;
     393        6870 :         s->binary = true;
     394        6870 :         return s;
     395             : }

Generated by: LCOV version 1.14