LCOV - code coverage report
Current view: top level - gdk - gdk_interprocess.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 0 95 0.0 %
Date: 2021-09-14 19:48:19 Functions: 0 14 0.0 %

          Line data    Source code
       1             : /*
       2             :  * This Source Code Form is subject to the terms of the Mozilla Public
       3             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       4             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       5             :  *
       6             :  * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
       7             :  */
       8             : 
       9             : #include "monetdb_config.h"
      10             : 
      11             : #ifdef HAVE_FORK
      12             : 
      13             : #include "gdk_interprocess.h"
      14             : #include "gdk.h"
      15             : #include "gdk_private.h"
      16             : #include "mutils.h"
      17             : 
      18             : #include <string.h>
      19             : 
      20             : #include <sys/types.h>
      21             : #include <sys/ipc.h>
      22             : #include <sys/shm.h>
      23             : #include <sys/wait.h>
      24             : #include <unistd.h>
      25             : #include <sys/mman.h>
      26             : #include <sys/stat.h>
      27             : #include <fcntl.h>
      28             : #include <sched.h>
      29             : #include <sys/sem.h>
      30             : #include <time.h>
      31             : 
      32             : static ATOMIC_TYPE interprocess_unique_id = ATOMIC_VAR_INIT(1); /* shared counter advanced by GDKuniqueid(); starts at 1 */
      33             : static key_t base_key = 800000000; /* added to an id by ftok_enhanced() to form the key handed to semget() */
      34             : 
      35             : // Regular ftok produces too many collisions; the key is computed as base_key + id instead
      36             : static inline void
      37             : ftok_enhanced(int id, key_t * return_key)
      38             : {
      39           0 :         *return_key = base_key + id; /* result is handed back through the out-parameter */
      40             : }
      41             : 
      42             : //! Reserve a range of unique identifiers that can be used to create memory mapped files or semaphores
      43             : /* offset: The number of unique identifiers to reserve (the shared counter is atomically advanced by this amount)
      44             :  * return: The first unique identifier reserved; it and the following identifiers, [offset] in total, are reserved.
      45             :  *  (ex. if offset = 5 and the return value is 10, then the identifiers 10-14 are reserved)
      46             : */
      47             : size_t
      48           0 : GDKuniqueid(size_t offset)
      49             : {
      50           0 :         return (size_t) ATOMIC_ADD(&interprocess_unique_id, (ATOMIC_BASE_TYPE) offset); /* assumes ATOMIC_ADD yields the value from before the addition -- TODO confirm */
      51             : }
      52             : 
      53             : //! Create a memory mapped file if it does not exist and open it
      54             : /* id: The unique identifier of the memory mapped file (use GDKuniqueid to get a unique identifier)
      55             :  * size: Minimum required size of the file (passed unchanged to GDKextendf and GDKmmap)
      56             :  * return_size: If not NULL, receives size when the mapping succeeded
      57             :  * return: The pointer returned by GDKmmap for the file, NULL if not successful */
      58             : void *
      59           0 : GDKinitmmap(size_t id, size_t size, size_t *return_size)
      60             : {
      61             :         char address[100];
      62             :         void *ptr;
      63             :         int fd;
      64             :         int mod = MMAP_READ | MMAP_WRITE | MMAP_SEQUENTIAL | MMAP_SYNC | MAP_SHARED; /* NOTE(review): MAP_SHARED is or-ed with MMAP_* flags -- confirm they share one flag namespace */
      65             :         char *path;
      66             : 
      67           0 :         GDKmmapfile(address, sizeof(address), id); /* file name derived from id */
      68             : 
      69             :         /* DISABLED: round up to multiple of GDK_mmap_pagesize with a
      70             :          * minimum of one (kept for reference only; size is currently used as given)
      71             :          size = (maxsize + GDK_mmap_pagesize - 1) & ~(GDK_mmap_pagesize - 1);
      72             :          if (size == 0)
      73             :          size = GDK_mmap_pagesize; */
      74           0 :         path = GDKfilepath(0, BATDIR, address, "tmp"); /* released with GDKfree on every path below */
      75           0 :         if (path == NULL) {
      76             :                 return NULL;
      77             :         }
      78           0 :         fd = GDKfdlocate(NOFARM, path, "wb", NULL); /* descriptor is only used to size the file */
      79           0 :         if (fd < 0) {
      80           0 :                 GDKfree(path);
      81           0 :                 return NULL;
      82             :         }
      83           0 :         if (GDKextendf(fd, size, path) != GDK_SUCCEED) {
      84           0 :                 close(fd);
      85           0 :                 GDKfree(path);
      86           0 :                 return NULL;
      87             :         }
      88           0 :         close(fd); /* the mapping below is made from the path, not from this descriptor */
      89           0 :         ptr = GDKmmap(path, mod, size);
      90           0 :         GDKfree(path);
      91           0 :         if (ptr == NULL) {
      92             :                 return NULL;
      93             :         }
      94           0 :         if (return_size != NULL) {
      95           0 :                 *return_size = size; /* always the requested size, since no rounding is applied */
      96             :         }
      97             :         return ptr;
      98             : }
      99             : 
     100             : //! Unmap and delete a memory mapped file that was created through GDKinitmmap
     101             : /* ptr: Pointer to the mapping (as returned by GDKinitmmap)
     102             :  * size: Size of the mapping
     103             :  * id: Identifier of the file (the same id that was given to GDKinitmmap)
     104             :  * return: GDK_SUCCEED if the mapping was unmapped and the file removed, GDK_FAIL otherwise
     105             : */
     106             : gdk_return
     107           0 : GDKreleasemmap(void *ptr, size_t size, size_t id)
     108             : {
     109             :         char address[100];
     110             :         char *path;
     111             :         int ret;
     112           0 :         GDKmmapfile(address, sizeof(address), id); /* rebuild the file name used by GDKinitmmap */
     113           0 :         if (GDKmunmap(ptr, size) != GDK_SUCCEED) {
     114             :                 return GDK_FAIL;
     115             :         }
     116           0 :         path = GDKfilepath(0, BATDIR, address, "tmp");
     117           0 :         if (path == NULL) {
     118             :                 return GDK_FAIL; /* the mapping is already gone here, but the file is left on disk */
     119             :         }
     120             :         ret = MT_remove(path);
     121           0 :         if (ret < 0)
     122           0 :                 GDKsyserror("cannot remove '%s'", path);
     123           0 :         GDKfree(path);
     124           0 :         return ret < 0 ? GDK_FAIL : GDK_SUCCEED;
     125             : }
     126             : 
     127             : //! Format the file name of a memory mapped file (as created by GDKinitmmap): pymmap followed by the decimal id
     128             : /* buffer: The buffer to write the name to
     129             :  * max: The size of the buffer in bytes; snprintf truncates the name if it does not fit
     130             :  * id: Identifier of the file
     131             :  * return: Always GDK_SUCCEED; the snprintf result is not checked, so truncation is not reported */
     132             : gdk_return
     133           0 : GDKmmapfile(str buffer, size_t max, size_t id)
     134             : {
     135           0 :         snprintf(buffer, max, "pymmap%zu", id);
     136           0 :         return GDK_SUCCEED;
     137             : }
     138             : /* Helper shared by GDKcreatesem and GDKgetsem: calls semget on the key derived from id; returns GDK_FAIL (after GDKsyserror) if semget fails */
     139             : static gdk_return
     140           0 : interprocess_init_semaphore(int id, int count, int flags, int *semid)
     141             : {
     142             :         key_t key;
     143             :         ftok_enhanced(id, &key);
     144           0 :         *semid = semget(key, count, flags | 0666); /* 0666: permission bits or-ed into the caller's flags; *semid is written even when semget fails */
     145           0 :         if (*semid < 0) {
     146           0 :                 GDKsyserror("semget failed");
     147             :                 return GDK_FAIL;
     148             :         }
     149             :         return GDK_SUCCEED;
     150             : }
     151             : 
     152             : //! Create an interprocess semaphore set (semget with IPC_CREAT; IPC_EXCL is not passed)
     153             : /* id: identifier (obtain from GDKuniqueid)
     154             :  * count: number of semaphores in the set
     155             :  * semid: receives the semaphore identifier (only valid if the function returns GDK_SUCCEED; on failure it holds the negative semget result)
     156             :  * return: GDK_SUCCEED, or GDK_FAIL if semget failed */
     157             : gdk_return
     158           0 : GDKcreatesem(int id, int count, int *semid)
     159             : {
     160           0 :         return interprocess_init_semaphore(id, count, IPC_CREAT, semid);
     161             : }
     162             : 
     163             : //! Get an interprocess semaphore set that was already created using GDKcreatesem (semget without IPC_CREAT)
     164             : /* id: identifier (the same id that was passed to GDKcreatesem)
     165             :  * count: number of semaphores in the set
     166             :  * semid: receives the semaphore identifier (only valid if the function returns GDK_SUCCEED; on failure it holds the negative semget result)
     167             :  * return: GDK_SUCCEED, or GDK_FAIL if semget failed */
     168             : gdk_return
     169           0 : GDKgetsem(int id, int count, int *semid)
     170             : {
     171           0 :         return interprocess_init_semaphore(id, count, 0, semid);
     172             : }
     173             : 
     174             : //! Get the current value of one semaphore of an interprocess semaphore set (semctl GETVAL)
     175             : /* sem_id: semaphore identifier (obtained from GDKcreatesem or GDKgetsem)
     176             :  * number: the semaphore number (must be less than 'count' given when creating the semaphore)
     177             :  * semval: receives the value of the semaphore (only valid if the function returns GDK_SUCCEED; on failure it holds the negative semctl result)
     178             :  * return: GDK_SUCCEED, or GDK_FAIL if semctl failed */
     179             : gdk_return
     180           0 : GDKgetsemval(int sem_id, int number, int *semval)
     181             : {
     182           0 :         *semval = semctl(sem_id, number, GETVAL, 0);
     183           0 :         if (*semval < 0) {
     184           0 :                 GDKsyserror("semctl failed");
     185             :                 return GDK_FAIL;
     186             :         }
     187             :         return GDK_SUCCEED;
     188             : }
     189             : 
     190             : //! Change the value of an interprocess semaphore (a single semop call)
     191             : /* sem_id: semaphore identifier (obtained from GDKcreatesem or GDKgetsem)
     192             :  * number: the semaphore number (must be less than 'count' given when creating the semaphore)
     193             :  * change: The change to apply to the semaphore value (passed to semop as sem_op)
     194             :  * return: GDK_SUCCEED, or GDK_FAIL if semop failed */
     195             : gdk_return
     196           0 : GDKchangesemval(int sem_id, int number, int change)
     197             : {
     198             :         struct sembuf buffer;
     199           0 :         buffer.sem_num = number;
     200           0 :         buffer.sem_op = change;
     201           0 :         buffer.sem_flg = 0; /* neither IPC_NOWAIT nor SEM_UNDO is requested */
     202             : 
     203           0 :         if (semop(sem_id, &buffer, 1) < 0) {
     204           0 :                 GDKsyserror("semop failed");
     205             :                 return GDK_FAIL;
     206             :         }
     207             :         return GDK_SUCCEED;
     208             : }
     209             : 
     210             : //! Change the value of an interprocess semaphore with a timeout
     211             : /* sem_id: semaphore identifier (obtained from GDKcreatesem or GDKgetsem)
     212             :  * number: the semaphore number (must be less than 'count' given when creating the semaphore)
     213             :  * change: The change to apply to the semaphore value
     214             :  * timeout_mseconds: The timeout in milliseconds (ignored when semtimedop is not available)
     215             :  * succeed: Set to true if the value was successfully changed, or false if the timeout was reached or the call was interrupted (EINTR)
     216             :  * return: GDK_FAIL if the semaphore operation failed, GDK_SUCCEED otherwise (a timeout is not an error) */
     217             : gdk_return
     218           0 : GDKchangesemval_timeout(int sem_id, int number, int change, int timeout_mseconds, bool *succeed)
     219             : {
     220             : #ifdef HAVE_SEMTIMEDOP
     221             :         // Some linux installations don't have semtimedop
     222             :         // The easiest solution is to just call semop instead (see the #else branch below)
     223             :         // The only reason we use semtimedop is to prevent deadlocks when there are segfaults in a subprocess, which really shouldn't happen anyway
     224             :         // So having semtimedop is not vital to the functioning of the program
     225             :         struct timespec timeout;
     226             :         struct sembuf buffer;
     227             :         buffer.sem_num = number;
     228             :         buffer.sem_op = change;
     229             :         buffer.sem_flg = 0;
     230             :         *succeed = false;
     231             : 
     232             :         timeout.tv_sec = (timeout_mseconds / 1000); /* whole seconds */
     233             :         timeout.tv_nsec = (timeout_mseconds % 1000) * 1000000; /* remaining milliseconds converted to nanoseconds (1 ms = 1000000 ns) */
     234             : 
     235             :         if (semtimedop(sem_id, &buffer, 1, &timeout) < 0) {
     236             :                 if (errno == EAGAIN || errno == EINTR) {
     237             :                         // operation timed out (or was interrupted); not an error, *succeed stays false
     238             :                         errno = 0;
     239             :                         return GDK_SUCCEED;
     240             :                 } else {
     241             :                         GDKsyserror("semtimedop failed");
     242             :                         return GDK_FAIL;
     243             :                 }
     244             :         }
     245             :         *succeed = true;
     246             :         return GDK_SUCCEED;
     247             : #else
     248             :         (void) timeout_mseconds; /* no semtimedop: fall back to GDKchangesemval, which has no timeout */
     249           0 :         *succeed = true;
     250           0 :         return GDKchangesemval(sem_id, number, change);
     251             : #endif
     252             : }
     253             : 
     254             : //! Destroy an interprocess semaphore set (semctl IPC_RMID)
     255             : /* sem_id: semaphore identifier (obtained from GDKcreatesem or GDKgetsem)
     256             :  * return: GDK_SUCCEED, or GDK_FAIL if semctl failed */
     257             : gdk_return
     258           0 : GDKreleasesem(int sem_id)
     259             : {
     260           0 :         if (semctl(sem_id, 0, IPC_RMID) < 0) {
     261           0 :                 GDKsyserror("semctl failed");
     262             :                 return GDK_FAIL;
     263             :         }
     264             :         return GDK_SUCCEED;
     265             : }
     266             : 
     267             : // align to 8 bytes: round sz up to the next multiple of 8 (argument parenthesized so any expression can be passed)
     268             : #define align(sz) (((sz) + 7) & ~7)
     269             : /* Number of bytes needed for bat and colname in the layout written by GDKbatcopy: every section is padded to a multiple of 8 bytes */
     270             : size_t
     271           0 : GDKbatcopysize(BAT *bat, str colname)
     272             : {
     273             :         size_t size = 0;
     274             : 
     275           0 :         size += align(strlen(colname) + 1);     //[COLNAME] name including its terminating NUL
     276           0 :         size += align(sizeof(BAT));     //[BAT] the BAT struct itself
     277           0 :         size += align(bat->twidth * BATcount(bat));  //[DATA] twidth bytes per value
     278             : 
     279           0 :         if (bat->tvheap != NULL) {
     280           0 :                 size += align(sizeof(Heap));    //[VHEAP] the Heap struct of the variable-sized heap
     281           0 :                 size += align(bat->tvheap->size); //[VHEAPDATA] its contents
     282             :         }
     283           0 :         return size;
     284             : }
     285             : /* Serialize colname and bat into dest as [COLNAME][BAT][DATA] plus, if bat has a tvheap, [VHEAP][VHEAPDATA]; returns the number of bytes used including padding (dest is presumably sized with GDKbatcopysize -- confirm at callers) */
     286             : size_t
     287           0 : GDKbatcopy(char *dest, BAT *bat, str colname)
     288             : {
     289           0 :         MT_lock_set(&bat->theaplock); /* held until all sections have been copied */
     290           0 :         size_t batsize = bat->twidth * BATcount(bat);
     291             :         size_t position = 0;
     292             : 
     293             :         //[COLNAME] the name including its terminating NUL
     294           0 :         memcpy(dest + position, colname, strlen(colname) + 1);
     295           0 :         position += align(strlen(colname) + 1); /* advance by the padded length; padding bytes are not written */
     296             :         //[BAT] raw copy of the BAT struct (pointer members are copied as-is)
     297           0 :         memcpy(dest + position, bat, sizeof(BAT));
     298           0 :         position += align(sizeof(BAT));
     299             :         //[DATA] the values starting at Tloc(bat, 0)
     300           0 :         memcpy(dest + position, Tloc(bat, 0), batsize);
     301           0 :         position += align(batsize);
     302           0 :         if (bat->tvheap != NULL) {
     303             :                 //[VHEAP] raw copy of the Heap struct
     304           0 :                 memcpy(dest + position, bat->tvheap, sizeof(Heap));
     305           0 :                 position += align(sizeof(Heap));
     306             :                 //[VHEAPDATA] tvheap->size bytes starting at tvheap->base
     307           0 :                 memcpy(dest + position, bat->tvheap->base, bat->tvheap->size);
     308           0 :                 position += align(bat->tvheap->size);
     309             :         }
     310           0 :         MT_lock_unset(&bat->theaplock);
     311           0 :         return position;
     312             : }
     313             : /* Read back a column written by GDKbatcopy: *colname and *bat are set to point into src (nothing is copied); returns the number of bytes consumed from src */
     314             : size_t
     315           0 : GDKbatread(char *src, BAT **bat, str *colname)
     316             : {
     317             :         size_t position = 0;
     318             :         BAT *b;
     319             :         //load the data for this column from shared memory
     320             :         //[COLNAME] NUL-terminated name, used in place
     321           0 :         *colname = src + position;
     322           0 :         position += align(strlen(*colname) + 1);
     323             :         //[BAT] the BAT struct is used in place inside src
     324           0 :         b = (BAT *) (src + position);
     325           0 :         position += align(sizeof(BAT));
     326             :         //[DATA] NOTE(review): b->theap is the pointer value copied by GDKbatcopy and is dereferenced here -- confirm it is valid in the reading process
     327           0 :         b->theap->base = (void *) (src + position);
     328           0 :         position += align(b->twidth * BATcount(b));
     329           0 :         if (b->tvheap != NULL) {
     330             :                 //[VHEAP] replace the copied pointer by the Heap struct stored in src
     331           0 :                 b->tvheap = (Heap *) (src + position);
     332           0 :                 position += align(sizeof(Heap));
     333             :                 //[VHEAPDATA] point the heap at its contents stored in src
     334           0 :                 b->tvheap->base = (void *) (src + position);
     335           0 :                 position += align(b->tvheap->size);
     336             :         }
     337           0 :         *bat = b;
     338           0 :         return position;
     339             : }
     340             : 
     341             : #endif

Generated by: LCOV version 1.14