LCOV - code coverage report
Current view: top level - gdk - gdk_interprocess.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 0 128 0.0 %
Date: 2020-06-29 20:00:14 Functions: 0 14 0.0 %

          Line data    Source code
       1             : /*
       2             :  * This Source Code Form is subject to the terms of the Mozilla Public
       3             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       4             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       5             :  *
       6             :  * Copyright 1997 - July 2008 CWI, August 2008 - 2020 MonetDB B.V.
       7             :  */
       8             : 
       9             : #include "monetdb_config.h"
      10             : 
      11             : #ifdef HAVE_FORK
      12             : 
      13             : #include "gdk_interprocess.h"
      14             : #include "gdk.h"
      15             : #include "gdk_private.h"
      16             : 
      17             : #include <string.h>
      18             : 
      19             : #include <sys/types.h>
      20             : #include <sys/ipc.h>
      21             : #include <sys/shm.h>
      22             : #include <sys/wait.h>
      23             : #include <unistd.h>
      24             : #include <sys/mman.h>
      25             : #include <sys/stat.h>
      26             : #include <fcntl.h>
      27             : #include <sched.h>
      28             : #include <sys/sem.h>
      29             : #include <time.h>
      30             : 
      31             : static ATOMIC_TYPE interprocess_unique_id = ATOMIC_VAR_INIT(1);
      32             : static key_t base_key = 800000000;
      33             : 
      34             : // Regular ftok produces too many collisions
      35             : static inline void
      36           0 : ftok_enhanced(int id, key_t * return_key)
      37             : {
      38           0 :         *return_key = base_key + id;
      39             : }
      40             : 
      41             : //! Obtain a set of unique identifiers that can be used to create memory mapped files or semaphores
      42             : /* offset: The amount of unique identifiers necessary
      43             :  * return: The first unique identifier reserved. The consecutive [offset] identifiers are also reserved.
      44             :  *  (ex. if offset = 5 and the return value is 10, then the identifiers 10-14 are reserved)
      45             : */
      46             : size_t
      47           0 : GDKuniqueid(size_t offset)
      48             : {
      49           0 :         return (size_t) ATOMIC_ADD(&interprocess_unique_id, (ATOMIC_BASE_TYPE) offset);
      50             : }
      51             : 
      52             : //! Create a memory mapped file if it does not exist and open it
      53             : /* id: The unique identifier of the memory mapped file (use GDKuniquemmapid to get a unique identifier)
      54             :  * size: Minimum required size of the file
      55             :  * return: Return value pointing into the file, NULL if not successful
      56             : */
      57             : void *
      58           0 : GDKinitmmap(size_t id, size_t size, size_t *return_size)
      59             : {
      60           0 :         char address[100];
      61           0 :         void *ptr;
      62           0 :         int fd;
      63           0 :         int mod = MMAP_READ | MMAP_WRITE | MMAP_SEQUENTIAL | MMAP_SYNC | MAP_SHARED;
      64           0 :         char *path;
      65             : 
      66           0 :         GDKmmapfile(address, sizeof(address), id);
      67             : 
      68             :         /* round up to multiple of GDK_mmap_pagesize with a
      69             :          * minimum of one
      70             :          size = (maxsize + GDK_mmap_pagesize - 1) & ~(GDK_mmap_pagesize - 1);
      71             :          if (size == 0)
      72             :          size = GDK_mmap_pagesize; */
      73           0 :         path = GDKfilepath(0, BATDIR, address, "tmp");
      74           0 :         if (path == NULL) {
      75             :                 return NULL;
      76             :         }
      77           0 :         fd = GDKfdlocate(NOFARM, path, "wb", NULL);
      78           0 :         if (fd < 0) {
      79           0 :                 GDKfree(path);
      80           0 :                 return NULL;
      81             :         }
      82           0 :         if (GDKextendf(fd, size, path) != GDK_SUCCEED) {
      83           0 :                 close(fd);
      84           0 :                 GDKfree(path);
      85           0 :                 return NULL;
      86             :         }
      87           0 :         close(fd);
      88           0 :         ptr = GDKmmap(path, mod, size);
      89           0 :         GDKfree(path);
      90           0 :         if (ptr == NULL) {
      91             :                 return NULL;
      92             :         }
      93           0 :         if (return_size != NULL) {
      94           0 :                 *return_size = size;
      95             :         }
      96             :         return ptr;
      97             : }
      98             : 
      99             : //! Release a memory mapped file that was created through GDKinitmmap
     100             : /* ptr: Pointer to the file
     101             :  * size: Size of the file
     102             :  * id: Identifier of the file
     103             :  * return: GDK_SUCCEED if successful, GDK_FAIL if not successful
     104             : */
     105             : gdk_return
     106           0 : GDKreleasemmap(void *ptr, size_t size, size_t id)
     107             : {
     108           0 :         char address[100];
     109           0 :         char *path;
     110           0 :         int ret;
     111           0 :         GDKmmapfile(address, sizeof(address), id);
     112           0 :         if (GDKmunmap(ptr, size) != GDK_SUCCEED) {
     113             :                 return GDK_FAIL;
     114             :         }
     115           0 :         path = GDKfilepath(0, BATDIR, address, "tmp");
     116           0 :         if (path == NULL) {
     117             :                 return GDK_FAIL;
     118             :         }
     119           0 :         ret = remove(path);
     120           0 :         if (ret < 0)
     121           0 :                 GDKsyserror("cannot remove '%s'", path);
     122           0 :         GDKfree(path);
     123           0 :         return ret < 0 ? GDK_FAIL : GDK_SUCCEED;
     124             : }
     125             : 
     126             : //! snprintf the file name of a memory mapped file (as created by GDKinitmmap)
     127             : /* buffer: The buffer to write the name to
     128             :  * max: The maxsize of the buffer (should be at least ~10 characters)
     129             :  * id: Identifier of the file
     130             : */
     131             : gdk_return
     132           0 : GDKmmapfile(str buffer, size_t max, size_t id)
     133             : {
     134           0 :         snprintf(buffer, max, "pymmap%zu", id);
     135           0 :         return GDK_SUCCEED;
     136             : }
     137             : 
     138             : static gdk_return
     139           0 : interprocess_init_semaphore(int id, int count, int flags, int *semid)
     140             : {
     141           0 :         key_t key;
     142           0 :         ftok_enhanced(id, &key);
     143           0 :         *semid = semget(key, count, flags | 0666);
     144           0 :         if (*semid < 0) {
     145           0 :                 GDKsyserror("semget failed");
     146           0 :                 return GDK_FAIL;
     147             :         }
     148             :         return GDK_SUCCEED;
     149             : }
     150             : 
     151             : //! Create an interprocess semaphore
     152             : /* id: identifier (obtain from GDKuniqueid)
     153             :  * count: amount of semaphores
     154             :  * semid: identifier of the created semaphore (only set if function returns GDK_SUCCEED)
     155             :  */
     156             : gdk_return
     157           0 : GDKcreatesem(int id, int count, int *semid)
     158             : {
     159           0 :         return interprocess_init_semaphore(id, count, IPC_CREAT, semid);
     160             : }
     161             : 
     162             : //! Get an interprocess semaphore that was already created using GDKcreatesem
     163             : /* id: identifier (obtain from GDKuniqueid)
     164             :  * count: amount of semaphores
     165             :  * semid: identifier of the semaphore (only set if function returns GDK_SUCCEED)
     166             :  */
     167             : gdk_return
     168           0 : GDKgetsem(int id, int count, int *semid)
     169             : {
     170           0 :         return interprocess_init_semaphore(id, count, 0, semid);
     171             : }
     172             : 
     173             : //! Gets the value of an interprocess semaphore
     174             : /* sem_id: semaphore identifier (obtained from GDKcreatesem or GDKgetsem)
     175             :  * number: the semaphore number (must be less than 'count' given when creating the semaphore)
     176             :  * semval: the value of the semaphore (only set if function returns GDK_SUCCEED)
     177             :  */
     178             : gdk_return
     179           0 : GDKgetsemval(int sem_id, int number, int *semval)
     180             : {
     181           0 :         *semval = semctl(sem_id, number, GETVAL, 0);
     182           0 :         if (*semval < 0) {
     183           0 :                 GDKsyserror("semctl failed");
     184           0 :                 return GDK_FAIL;
     185             :         }
     186             :         return GDK_SUCCEED;
     187             : }
     188             : 
     189             : //! Change the value of an interprocess semaphore
     190             : /* sem_id: semaphore identifier (obtained from GDKcreatesem or GDKgetsem)
     191             :  * number: the semaphore number (must be less than 'count' given when creating the semaphore)
     192             :  * change: The change to apply to the semaphore value
     193             :  */
     194             : gdk_return
     195           0 : GDKchangesemval(int sem_id, int number, int change)
     196             : {
     197           0 :         struct sembuf buffer;
     198           0 :         buffer.sem_num = number;
     199           0 :         buffer.sem_op = change;
     200           0 :         buffer.sem_flg = 0;
     201             : 
     202           0 :         if (semop(sem_id, &buffer, 1) < 0) {
     203           0 :                 GDKsyserror("semop failed");
     204           0 :                 return GDK_FAIL;
     205             :         }
     206             :         return GDK_SUCCEED;
     207             : }
     208             : 
     209             : //! Change the value of an interprocess semaphore with a timeout
     210             : /* sem_id: semaphore identifier (obtained from GDKcreatesem or GDKgetsem)
     211             :  * number: the semaphore number (must be less than 'count' given when creating the semaphore)
     212             :  * change: The change to apply to the semaphore value
     213             :  * timeout_mseconds: The timeout in milliseconds
     214             :  * succeed: Set to true if the value was successfully changed, or false if the timeout was reached
     215             :  */
     216             : gdk_return
     217           0 : GDKchangesemval_timeout(int sem_id, int number, int change, int timeout_mseconds, bool *succeed)
     218             : {
     219             : #ifdef HAVE_SEMTIMEDOP
     220             :         // Some linux installations don't have semtimedop
     221             :         // The easiest solution is to just call semop instead
     222             :         // The only reason we use semtimedop is to prevent deadlocks when there are segfaults in a subprocess, which really shouldn't happen anyway
     223             :         // So having semtimedop is not vital to the functioning of the program
     224           0 :         struct timespec timeout;
     225           0 :         struct sembuf buffer;
     226           0 :         buffer.sem_num = number;
     227           0 :         buffer.sem_op = change;
     228           0 :         buffer.sem_flg = 0;
     229           0 :         *succeed = false;
     230             : 
     231           0 :         timeout.tv_sec = (timeout_mseconds / 1000);
     232           0 :         timeout.tv_nsec = (timeout_mseconds % 1000) * 1000;
     233             : 
     234           0 :         if (semtimedop(sem_id, &buffer, 1, &timeout) < 0) {
     235           0 :                 if (errno == EAGAIN || errno == EINTR) {
     236             :                         // operation timed out; not an error
     237           0 :                         errno = 0;
     238           0 :                         return GDK_SUCCEED;
     239             :                 } else {
     240           0 :                         GDKsyserror("semtimedop failed");
     241           0 :                         return GDK_FAIL;
     242             :                 }
     243             :         }
     244           0 :         *succeed = true;
     245           0 :         return GDK_SUCCEED;
     246             : #else
     247             :         (void) timeout_mseconds;
     248             :         *succeed = true;
     249             :         return GDKchangesemval(sem_id, number, change);
     250             : #endif
     251             : }
     252             : 
     253             : //! Destroy an interprocess semaphore
     254             : /* sem_id: semaphore identifier (obtained from GDKcreatesem or GDKgetsem)
     255             :  */
     256             : gdk_return
     257           0 : GDKreleasesem(int sem_id)
     258             : {
     259           0 :         if (semctl(sem_id, 0, IPC_RMID) < 0) {
     260           0 :                 GDKsyserror("semctl failed");
     261           0 :                 return GDK_FAIL;
     262             :         }
     263             :         return GDK_SUCCEED;
     264             : }
     265             : 
     266             : // align to 8 bytes
     267             : #define align(sz) ((sz + 7) & ~7)
     268             : 
     269             : size_t
     270           0 : GDKbatcopysize(BAT *bat, str colname)
     271             : {
     272           0 :         size_t size = 0;
     273             : 
     274           0 :         size += align(strlen(colname) + 1);     //[COLNAME]
     275           0 :         size += align(sizeof(BAT));     //[BAT]
     276           0 :         size += align(bat->twidth * BATcount(bat));  //[DATA]
     277             : 
     278           0 :         if (bat->tvheap != NULL) {
     279           0 :                 size += align(sizeof(Heap));    //[VHEAP]
     280           0 :                 size += align(bat->tvheap->size); //[VHEAPDATA]
     281             :         }
     282           0 :         return size;
     283             : }
     284             : 
     285             : size_t
     286           0 : GDKbatcopy(char *dest, BAT *bat, str colname)
     287             : {
     288           0 :         size_t batsize = bat->twidth * BATcount(bat);
     289           0 :         size_t position = 0;
     290             : 
     291             :         //[COLNAME]
     292           0 :         memcpy(dest + position, colname, strlen(colname) + 1);
     293           0 :         position += align(strlen(colname) + 1);
     294             :         //[BAT]
     295           0 :         memcpy(dest + position, bat, sizeof(BAT));
     296           0 :         position += align(sizeof(BAT));
     297             :         //[DATA]
     298           0 :         memcpy(dest + position, Tloc(bat, 0), batsize);
     299           0 :         position += align(batsize);
     300           0 :         if (bat->tvheap != NULL) {
     301             :                 //[VHEAP]
     302           0 :                 memcpy(dest + position, bat->tvheap, sizeof(Heap));
     303           0 :                 position += align(sizeof(Heap));
     304             :                 //[VHEAPDATA]
     305           0 :                 memcpy(dest + position, bat->tvheap->base, bat->tvheap->size);
     306           0 :                 position += align(bat->tvheap->size);
     307             :         }
     308           0 :         return position;
     309             : }
     310             : 
     311             : size_t
     312           0 : GDKbatread(char *src, BAT **bat, str *colname)
     313             : {
     314           0 :         size_t position = 0;
     315           0 :         BAT *b;
     316             :         //load the data for this column from shared memory
     317             :         //[COLNAME]
     318           0 :         *colname = src + position;
     319           0 :         position += align(strlen(*colname) + 1);
     320             :         //[BAT]
     321           0 :         b = (BAT *) (src + position);
     322           0 :         position += align(sizeof(BAT));
     323             :         //[DATA]
     324           0 :         b->theap.base = (void *) (src + position);
     325           0 :         position += align(b->twidth * BATcount(b));
     326           0 :         if (b->tvheap != NULL) {
     327             :                 //[VHEAP]
     328           0 :                 b->tvheap = (Heap *) (src + position);
     329           0 :                 position += align(sizeof(Heap));
     330             :                 //[VHEAPDATA]
     331           0 :                 b->tvheap->base = (void *) (src + position);
     332           0 :                 position += align(b->tvheap->size);
     333             :         }
     334           0 :         *bat = b;
     335           0 :         return position;
     336             : }
     337             : 
     338             : #endif

Generated by: LCOV version 1.14