LCOV - code coverage report
Current view: top level - monetdb5/mal - mal_dataflow.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 372 493 75.5 %
Date: 2021-09-14 22:17:06 Functions: 12 15 80.0 %

          Line data    Source code
       1             : /*
       2             :  * This Source Code Form is subject to the terms of the Mozilla Public
       3             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       4             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       5             :  *
       6             :  * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
       7             :  */
       8             : 
       9             : /*
      10             :  * (author) M Kersten, S Mullender
      11             :  * Dataflow processing only works on a code
      12             :  * sequence that does not include additional (implicit) flow of control
      13             :  * statements and, ideally, consist of expensive BAT operations.
      14             :  * The dataflow portion is identified as a guarded block,
      15             :  * whose entry is controlled by the function language.dataflow();
      16             :  *
      17             :  * The dataflow worker tries to follow the sequence of actions
      18             :  * as layed out in the plan, but abandon this track when it hits
      19             :  * a blocking operator, or an instruction for which not all arguments
      20             :  * are available or resources become scarce.
      21             :  *
      22             :  * The flow graphs is organized such that parallel threads can
      23             :  * access it mostly without expensive locking and dependent
      24             :  * variables are easy to find..
      25             :  */
      26             : #include "monetdb_config.h"
      27             : #include "mal_dataflow.h"
      28             : #include "mal_exception.h"
      29             : #include "mal_private.h"
      30             : #include "mal_internal.h"
      31             : #include "mal_runtime.h"
      32             : #include "mal_resource.h"
      33             : #include "mal_function.h"
      34             : 
      35             : #define DFLOWpending 0          /* runnable */
      36             : #define DFLOWrunning 1          /* currently in progress */
      37             : #define DFLOWwrapup  2          /* done! */
      38             : #define DFLOWretry   3          /* reschedule */
      39             : #define DFLOWskipped 4          /* due to errors */
      40             : 
      41             : /* The per instruction status of execution */
      42             : typedef struct FLOWEVENT {
      43             :         struct DATAFLOW *flow;/* execution context */
      44             :         int pc;         /* pc in underlying malblock */
      45             :         int blocks;     /* awaiting for variables */
      46             :         sht state;      /* of execution */
      47             :         lng clk;
      48             :         sht cost;
      49             :         lng hotclaim;   /* memory foot print of result variables */
      50             :         lng argclaim;   /* memory foot print of arguments */
      51             :         lng maxclaim;   /* memory foot print of  largest argument, counld be used to indicate result size */
      52             : } *FlowEvent, FlowEventRec;
      53             : 
      54             : typedef struct queue {
      55             :         int size;       /* size of queue */
      56             :         int last;       /* last element in the queue */
      57             :         int exitcount;  /* how many threads should exit */
      58             :         FlowEvent *data;
      59             :         MT_Lock l;      /* it's a shared resource, ie we need locks */
      60             :         MT_Sema s;      /* threads wait on empty queues */
      61             : } Queue;
      62             : 
      63             : /*
      64             :  * The dataflow dependency is administered in a graph list structure.
      65             :  * For each instruction we keep the list of instructions that
      66             :  * should be checked for eligibility once we are finished with it.
      67             :  */
      68             : typedef struct DATAFLOW {
      69             :         Client cntxt;   /* for debugging and client resolution */
      70             :         MalBlkPtr mb;   /* carry the context */
      71             :         MalStkPtr stk;
      72             :         int start, stop;    /* guarded block under consideration*/
      73             :         FlowEvent status;   /* status of each instruction */
      74             :         ATOMIC_PTR_TYPE error;          /* error encountered */
      75             :         int *nodes;         /* dependency graph nodes */
      76             :         int *edges;         /* dependency graph */
      77             :         MT_Lock flowlock;   /* lock to protect the above */
      78             :         Queue *done;        /* instructions handled */
      79             : } *DataFlow, DataFlowRec;
      80             : 
      81             : static struct worker {
      82             :         MT_Id id;
      83             :         enum {IDLE, WAITING, RUNNING, FREE, EXITED } flag;
      84             :         ATOMIC_PTR_TYPE cntxt; /* client we do work for (NULL -> any) */
      85             :         char *errbuf;              /* GDKerrbuf so that we can allocate before fork */
      86             :         MT_Sema s;
      87             :         int self;
      88             :         int next;
      89             : } workers[THREADS];
      90             : /* heads of two mutually exclusive linked lists, both using the .next
      91             :  * field in the worker struct */
      92             : static int exited_workers = -1; /* to be joined threads */
      93             : static int idle_workers = -1;   /* idle workers (no thread associated) */
      94             : static int free_workers = -1;   /* free workers (thread doing nothing) */
      95             : static int free_count = 0;              /* number of free threads */
      96             : static int free_max = 0;                /* max number of spare free threads */
      97             : 
      98             : static Queue *todo = 0; /* pending instructions */
      99             : 
     100             : static ATOMIC_TYPE exiting = ATOMIC_VAR_INIT(0);
     101             : static MT_Lock dataflowLock = MT_LOCK_INITIALIZER(dataflowLock);
     102             : static void stopMALdataflow(void);
     103             : 
     104             : void
     105         264 : mal_dataflow_reset(void)
     106             : {
     107         264 :         stopMALdataflow();
     108         264 :         memset((char*) workers, 0,  sizeof(workers));
     109         264 :         idle_workers = -1;
     110         264 :         exited_workers = -1;
     111         264 :         if( todo) {
     112         264 :                 GDKfree(todo->data);
     113         264 :                 MT_lock_destroy(&todo->l);
     114         264 :                 MT_sema_destroy(&todo->s);
     115         264 :                 GDKfree(todo);
     116             :         }
     117         264 :         todo = 0;       /* pending instructions */
     118         264 :         ATOMIC_SET(&exiting, 0);
     119         264 : }
     120             : 
     121             : /*
     122             :  * Calculate the size of the dataflow dependency graph.
     123             :  */
     124             : static int
     125             : DFLOWgraphSize(MalBlkPtr mb, int start, int stop)
     126             : {
     127             :         int cnt = 0;
     128             :         int i;
     129             : 
     130    17217704 :         for (i = start; i < stop; i++)
     131    17091761 :                 cnt += getInstrPtr(mb, i)->argc;
     132             :         return cnt;
     133             : }
     134             : 
     135             : /*
     136             :  * The dataflow execution is confined to a barrier block.
     137             :  * Within the block there are multiple flows, which, in principle,
     138             :  * can be executed in parallel.
     139             :  */
     140             : 
     141             : static Queue*
     142      126208 : q_create(int sz, const char *name)
     143             : {
     144      126208 :         Queue *q = (Queue*)GDKmalloc(sizeof(Queue));
     145             : 
     146      126208 :         if (q == NULL)
     147             :                 return NULL;
     148      126208 :         *q = (Queue) {
     149      126208 :                 .size = ((sz << 1) >> 1), /* we want a multiple of 2 */
     150             :         };
     151      126208 :         q->data = (FlowEvent*) GDKmalloc(sizeof(FlowEvent) * q->size);
     152      126208 :         if (q->data == NULL) {
     153           0 :                 GDKfree(q);
     154           0 :                 return NULL;
     155             :         }
     156             : 
     157      126208 :         MT_lock_init(&q->l, name);
     158      126208 :         MT_sema_init(&q->s, 0, name);
     159      126208 :         return q;
     160             : }
     161             : 
     162             : static void
     163      125943 : q_destroy(Queue *q)
     164             : {
     165      125943 :         assert(q);
     166      125943 :         MT_lock_destroy(&q->l);
     167      125943 :         MT_sema_destroy(&q->s);
     168      125943 :         GDKfree(q->data);
     169      125943 :         GDKfree(q);
     170      125943 : }
     171             : 
     172             : /* keep a simple LIFO queue. It won't be a large one, so shuffles of requeue is possible */
     173             : /* we might actually sort it for better scheduling behavior */
     174             : static void
     175    26309471 : q_enqueue_(Queue *q, FlowEvent d)
     176             : {
     177    26309471 :         assert(q);
     178    26309471 :         assert(d);
     179    26309471 :         if (q->last == q->size) {
     180           2 :                 q->size <<= 1;
     181           2 :                 q->data = (FlowEvent*) GDKrealloc(q->data, sizeof(FlowEvent) * q->size);
     182           2 :                 assert(q->data);
     183             :         }
     184    26309471 :         q->data[q->last++] = d;
     185    26309471 : }
     186             : static void
     187    26271168 : q_enqueue(Queue *q, FlowEvent d)
     188             : {
     189    26271168 :         assert(q);
     190    26271168 :         assert(d);
     191    26271168 :         MT_lock_set(&q->l);
     192    26305363 :         q_enqueue_(q, d);
     193    26310023 :         MT_lock_unset(&q->l);
     194    26300022 :         MT_sema_up(&q->s);
     195    26220860 : }
     196             : 
     197             : /*
     198             :  * A priority queue over the hot claims of memory may
     199             :  * be more effective. It priorizes those instructions
     200             :  * that want to use a big recent result
     201             :  */
     202             : 
     203             : static void
     204           0 : q_requeue_(Queue *q, FlowEvent d)
     205             : {
     206             :         int i;
     207             : 
     208           0 :         assert(q);
     209           0 :         assert(d);
     210           0 :         if (q->last == q->size) {
     211             :                 /* enlarge buffer */
     212           0 :                 q->size <<= 1;
     213           0 :                 q->data = (FlowEvent*) GDKrealloc(q->data, sizeof(FlowEvent) * q->size);
     214           0 :                 assert(q->data);
     215             :         }
     216           0 :         for (i = q->last; i > 0; i--)
     217           0 :                 q->data[i] = q->data[i - 1];
     218           0 :         q->data[0] = d;
     219           0 :         q->last++;
     220           0 : }
     221             : static void
     222           0 : q_requeue(Queue *q, FlowEvent d)
     223             : {
     224           0 :         assert(q);
     225           0 :         assert(d);
     226           0 :         MT_lock_set(&q->l);
     227           0 :         q_requeue_(q, d);
     228           0 :         MT_lock_unset(&q->l);
     229           0 :         MT_sema_up(&q->s);
     230           0 : }
     231             : 
     232             : static FlowEvent
     233    26258437 : q_dequeue(Queue *q, Client cntxt)
     234             : {
     235             :         FlowEvent r = NULL, s = NULL;
     236             :         //int i;
     237             : 
     238    26258437 :         assert(q);
     239    26258437 :         MT_sema_down(&q->s);
     240    26345881 :         if (ATOMIC_GET(&exiting))
     241             :                 return NULL;
     242    26344028 :         MT_lock_set(&q->l);
     243    26416036 :         if( cntxt == NULL && q->exitcount > 0){
     244      125943 :                 q->exitcount--;
     245      125943 :                 MT_lock_unset(&q->l);
     246      125943 :                 return NULL;
     247             :         }
     248             :         {
     249             :                 int i, minpc;
     250             : 
     251    26290093 :                 minpc = q->last -1;
     252    26290093 :                 s = q->data[minpc];
     253             :                 /* for long "queues", just grab the first eligible entry we encounter */
     254    26290093 :                 if (q->last < 1024) {
     255   442947005 :                         for (i = q->last - 1; i >= 0; i--) {
     256   416663590 :                                 if ( cntxt ==  NULL || q->data[i]->flow->cntxt == cntxt) {
     257             :                                         /* for shorter "queues", find the oldest eligible entry */
     258   416661770 :                                         r = q->data[i];
     259   416661770 :                                         if (s && r && s->pc > r->pc) {
     260             :                                                 minpc = i;
     261             :                                                 s = r;
     262             :                                         }
     263             :                                 }
     264             :                         }
     265             :                 }
     266    26290093 :                 if (minpc >= 0) {
     267    26244127 :                         r = q->data[minpc];
     268             :                         i = minpc;
     269    26244127 :                         q->last--;
     270    26244127 :                         memmove(q->data + i, q->data + i + 1, (q->last - i) * sizeof(q->data[0]));
     271             :                 }
     272             :         }
     273    26290093 :         MT_lock_unset(&q->l);
     274    26243649 :         return r;
     275             : }
     276             : 
     277             : /*
     278             :  * We simply move an instruction into the front of the queue.
     279             :  * Beware, we assume that variables are assigned a value once, otherwise
     280             :  * the order may really create errors.
     281             :  * The order of the instructions should be retained as long as possible.
     282             :  * Delay processing when we run out of memory.  Push the instruction back
     283             :  * on the end of queue, waiting for another attempt. Problem might become
     284             :  * that all threads but one are cycling through the queue, each time
     285             :  * finding an eligible instruction, but without enough space.
     286             :  * Therefore, we wait for a few milliseconds as an initial punishment.
     287             :  *
     288             :  * The process could be refined by checking for cheap operations,
     289             :  * i.e. those that would require no memory at all (aggr.count)
     290             :  * This, however, would lead to a dependency to the upper layers,
     291             :  * because in the kernel we don't know what routines are available
     292             :  * with this property. Nor do we maintain such properties.
     293             :  */
     294             : 
     295             : static void
     296        2345 : DFLOWworker(void *T)
     297             : {
     298             :         struct worker *t = (struct worker *) T;
     299             : #ifdef _MSC_VER
     300             :         srand((unsigned int) GDKusec());
     301             : #endif
     302        2345 :         assert(t->errbuf != NULL);
     303        2345 :         GDKsetbuf(t->errbuf);                /* where to leave errors */
     304        2346 :         t->errbuf = NULL;
     305             : 
     306             :         for (;;) {
     307             :                 DataFlow flow;
     308             :                 FlowEvent fe = 0, fnxt = 0;
     309             :                 str error = 0;
     310             :                 int i;
     311             :                 lng claim;
     312             :                 Client cntxt;
     313             :                 InstrPtr p;
     314             : 
     315      127792 :                 GDKclrerr();
     316             : 
     317      111975 :                 if (t->flag == WAITING) {
     318             :                         /* wait until we are allowed to start working */
     319      125943 :                         MT_sema_down(&t->s);
     320      125943 :                         t->flag = RUNNING;
     321             :                 }
     322      111975 :                 assert(t->flag == RUNNING);
     323      111975 :                 cntxt = ATOMIC_PTR_GET(&t->cntxt);
     324             :                 while (1) {
     325    16996663 :                         if (fnxt == 0) {
     326     9467417 :                                 MT_thread_setworking(NULL);
     327     9463057 :                                 cntxt = ATOMIC_PTR_GET(&t->cntxt);
     328     9463057 :                                 fe = q_dequeue(todo, cntxt);
     329     9499237 :                                 if (fe == NULL) {
     330      146322 :                                         if (cntxt) {
     331             :                                                 /* we're not done yet with work for the current
     332             :                                                  * client (as far as we know), so give up the CPU
     333             :                                                  * and let the scheduler enter some more work, but
     334             :                                                  * first compensate for the down we did in
     335             :                                                  * dequeue */
     336       18920 :                                                 MT_sema_up(&todo->s);
     337       18920 :                                                 MT_sleep_ms(1);
     338       18909 :                                                 continue;
     339             :                                         }
     340             :                                         /* no more work to be done: exit */
     341             :                                         break;
     342             :                                 }
     343     9352915 :                                 if (fe->flow->cntxt && fe->flow->cntxt->mythread)
     344     9351657 :                                         MT_thread_setworking(fe->flow->cntxt->mythread->name);
     345             :                         } else
     346             :                                 fe = fnxt;
     347    16884041 :                         if (ATOMIC_GET(&exiting)) {
     348             :                                 break;
     349             :                         }
     350             :                         fnxt = 0;
     351    16884041 :                         assert(fe);
     352    16884041 :                         flow = fe->flow;
     353    16884041 :                         assert(flow);
     354             : 
     355             :                         /* whenever we have a (concurrent) error, skip it */
     356    16884041 :                         if (ATOMIC_PTR_GET(&flow->error)) {
     357        2211 :                                 q_enqueue(flow->done, fe);
     358        2212 :                                 continue;
     359             :                         }
     360             : 
     361    16881830 :                         p= getInstrPtr(flow->mb,fe->pc);
     362    16881830 :                         claim = fe->argclaim;
     363    16881830 :                         if (MALadmission_claim(flow->cntxt, flow->mb, flow->stk, p, claim)) {
     364             :                                 // never block on deblockdataflow()
     365           0 :                                 if( p->fcn != (MALfcn) deblockdataflow){
     366           0 :                                         fe->hotclaim = 0;   /* don't assume priority anymore */
     367           0 :                                         fe->maxclaim = 0;
     368           0 :                                         if (todo->last == 0)
     369           0 :                                                 MT_sleep_ms(DELAYUNIT);
     370           0 :                                         q_requeue(todo, fe);
     371           0 :                                         continue;
     372             :                                 }
     373             :                         }
     374    16895812 :                         error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc + 1, flow->stk, 0, 0);
     375             :                         /* release the memory claim */
     376    16827950 :                         MALadmission_release(flow->cntxt, flow->mb, flow->stk, p,  claim);
     377             : 
     378    16827149 :                         MT_lock_set(&flow->flowlock);
     379    16963601 :                         fe->state = DFLOWwrapup;
     380    16963601 :                         MT_lock_unset(&flow->flowlock);
     381    16949611 :                         if (error) {
     382             :                                 void *null = NULL;
     383             :                                 /* only collect one error (from one thread, needed for stable testing) */
     384         408 :                                 if (!ATOMIC_PTR_CAS(&flow->error, &null, error))
     385           6 :                                         freeException(error);
     386             :                                 /* after an error we skip the rest of the block */
     387         408 :                                 q_enqueue(flow->done, fe);
     388         408 :                                 continue;
     389             :                         }
     390             : 
     391             :                         /* see if you can find an eligible instruction that uses the
     392             :                          * result just produced. Then we can continue with it right away.
     393             :                          * We are just looking forward for the last block, which means we
     394             :                          * are safe from concurrent actions. No other thread can steal it,
     395             :                          * because we hold the logical lock.
     396             :                          * All eligible instructions are queued
     397             :                          */
     398    16949203 :                         p = getInstrPtr(flow->mb, fe->pc);
     399    16949203 :                         assert(p);
     400    16949203 :                         fe->hotclaim = 0;
     401    16949203 :                         fe->maxclaim = 0;
     402             : 
     403    35521512 :                         for (i = 0; i < p->retc; i++){
     404             :                                 lng footprint;
     405    18620798 :                                 footprint = getMemoryClaim(flow->mb, flow->stk, p, i, FALSE);
     406    18572309 :                                 fe->hotclaim += footprint;
     407    18572309 :                                 if( footprint > fe->maxclaim)
     408     5857890 :                                         fe->maxclaim = footprint;
     409             :                         }
     410             : 
     411             : /* Try to get rid of the hot potato or locate an alternative to proceed.
     412             :  */
     413             : #define HOTPOTATO
     414             : #ifdef HOTPOTATO
     415             :                         /* HOT potato choice */
     416             :                         int last = 0, nxt = -1;
     417             :                         lng nxtclaim = -1;
     418             : 
     419    16900714 :                         MT_lock_set(&flow->flowlock);
     420    66519519 :                         for (last = fe->pc - flow->start; last >= 0 && (i = flow->nodes[last]) > 0; last = flow->edges[last]){
     421    49556325 :                                 if (flow->status[i].state == DFLOWpending && flow->status[i].blocks == 1) {
     422             :                                         /* find the one with the largest footprint */
     423    14654852 :                                         if( nxt == -1){
     424             :                                                 nxt = i;
     425     7607630 :                                                 nxtclaim = flow->status[i].argclaim;
     426             :                                         }
     427    14654852 :                                         if( flow->status[i].argclaim > nxtclaim){
     428             :                                                 nxt = i;
     429             :                                                 nxtclaim =  flow->status[i].argclaim;
     430             :                                         }
     431             :                                 }
     432             :                         }
     433             :                         /* hot potato can not be removed, use alternative to proceed */
     434    16963194 :                         if( nxt >= 0){
     435     7607630 :                                 flow->status[nxt].state = DFLOWrunning;
     436     7607630 :                                 flow->status[nxt].blocks = 0;
     437     7607630 :                                 flow->status[nxt].hotclaim = fe->hotclaim;
     438     7607630 :                                 flow->status[nxt].argclaim += fe->hotclaim;
     439     7607630 :                                 if( flow->status[nxt].maxclaim < fe->maxclaim)
     440     3098586 :                                         flow->status[nxt].maxclaim = fe->maxclaim;
     441             :                                 fnxt = flow->status + nxt;
     442             :                         }
     443    16963194 :                         MT_lock_unset(&flow->flowlock);
     444             : #endif
     445             : 
     446    16949232 :                         q_enqueue(flow->done, fe);
     447    16865779 :                         if ( fnxt == 0 && malProfileMode) {
     448           0 :                                 profilerHeartbeatEvent("wait");
     449             :                         }
     450             :                 }
     451      127402 :                 MT_lock_set(&dataflowLock);
     452      127785 :                 if (GDKexiting() || ATOMIC_GET(&exiting)) {
     453        1872 :                         MT_lock_unset(&dataflowLock);
     454        1862 :                         break;
     455             :                 }
     456      125913 :                 if (free_count >= free_max) {
     457           0 :                         t->flag = EXITED;
     458           0 :                         t->next = exited_workers;
     459           0 :                         exited_workers = t->self;
     460           0 :                         MT_lock_unset(&dataflowLock);
     461           0 :                         break;
     462             :                 }
     463      125913 :                 free_count++;
     464      125913 :                 t->flag = FREE;
     465      125913 :                 assert(free_workers != t->self);
     466      125913 :                 t->next = free_workers;
     467      125913 :                 free_workers = t->self;
     468      125913 :                 MT_lock_unset(&dataflowLock);
     469      125913 :                 MT_sema_down(&t->s);
     470      125913 :                 if (GDKexiting() || ATOMIC_GET(&exiting))
     471             :                         break;
     472      125446 :                 assert(t->flag == WAITING);
     473             :         }
     474        2328 :         GDKfree(GDKerrbuf);
     475        2339 :         GDKsetbuf(0);
     476        2332 : }
     477             : 
     478             : /*
     479             :  * Create an interpreter pool.
     480             :  * One worker will adaptively be available for each client.
     481             :  * The remainder are taken from the GDKnr_threads argument and
     482             :  * typically is equal to the number of cores
     483             :  * The workers are assembled in a local table to enable debugging.
     484             :  */
/* Lazily initialize the dataflow machinery: create the shared 'todo'
 * queue and start the initial pool of generic worker threads.
 * Takes mal_contextLock and dataflowLock itself (in that order).
 * Returns 0 on success (including when another thread already won the
 * race and initialized), -1 when the queue or every worker thread
 * could not be created. */
static int
DFLOWinitialize(void)
{
	int i, limit;
	int created = 0;		/* number of worker threads actually started */
	static bool first = true;	/* true only on the very first call */

	MT_lock_set(&mal_contextLock);
	MT_lock_set(&dataflowLock);
	if (todo) {
		/* somebody else beat us to it */
		MT_lock_unset(&dataflowLock);
		MT_lock_unset(&mal_contextLock);
		return 0;
	}
	/* upper bound on workers kept on the free list; overridable via the
	 * dataflow_max_free setting, default max(4, GDKnr_threads) */
	free_max = GDKgetenv_int("dataflow_max_free", GDKnr_threads < 4 ? 4 : GDKnr_threads);
	todo = q_create(2048, "todo");
	if (todo == NULL) {
		MT_lock_unset(&dataflowLock);
		MT_lock_unset(&mal_contextLock);
		return -1;
	}
	assert(idle_workers == -1);
	/* push every worker slot onto the idle list (a singly linked list
	 * threaded through workers[].next, headed by idle_workers) */
	for (i = 0; i < THREADS; i++) {
		char name[MT_NAME_LEN];
		snprintf(name, sizeof(name), "DFLOWsema%d", i);
		MT_sema_init(&workers[i].s, 0, name);
		workers[i].flag = IDLE;
		workers[i].self = i;
		workers[i].next = idle_workers;
		idle_workers = i;
		if (first)				/* only initialize once */
			ATOMIC_PTR_INIT(&workers[i].cntxt, NULL);
	}
	first = false;
	/* start GDKnr_threads - 1 generic workers, capped by the size of
	 * the workers table */
	limit = GDKnr_threads ? GDKnr_threads - 1 : 0;
	if (limit > THREADS)
		limit = THREADS;
	while (limit > 0) {
		limit--;
		i = idle_workers;
		workers[i].errbuf = GDKmalloc(GDKMAXERRLEN);
		if (workers[i].errbuf == NULL) {
			/* slot stays on the idle list; just try the next iteration */
			TRC_CRITICAL(MAL_SERVER, "cannot allocate error buffer for worker");
			continue;
		}
		idle_workers = workers[i].next;
		workers[i].flag = RUNNING;
		ATOMIC_PTR_SET(&workers[i].cntxt, NULL);
		char name[MT_NAME_LEN];
		snprintf(name, sizeof(name), "DFLOWworker%d", i);
		if ((workers[i].id = THRcreate(DFLOWworker, (void *) &workers[i], MT_THR_JOINABLE, name)) == 0) {
			/* thread creation failed: undo and return slot to idle list */
			GDKfree(workers[i].errbuf);
			workers[i].errbuf = NULL;
			workers[i].flag = IDLE;
			workers[i].next = idle_workers;
			idle_workers = i;
		} else {
			created++;
		}
	}
	if (created == 0) {
		/* no threads created */
		q_destroy(todo);
		todo = NULL;
		MT_lock_unset(&dataflowLock);
		MT_lock_unset(&mal_contextLock);
		return -1;
	}
	MT_lock_unset(&dataflowLock);
	MT_lock_unset(&mal_contextLock);
	return 0;
}
     558             : 
     559             : /*
     560             :  * The dataflow administration is based on administration of
     561             :  * how many variables are still missing before it can be executed.
     562             :  * For each instruction we keep a list of instructions whose
     563             :  * blocking counter should be decremented upon finishing it.
     564             :  */
     565             : static str
     566      125943 : DFLOWinitBlk(DataFlow flow, MalBlkPtr mb, int size)
     567             : {
     568             :         int pc, i, j, k, l, n, etop = 0;
     569             :         int *assign;
     570             :         InstrPtr p;
     571             : 
     572      125943 :         if (flow == NULL)
     573           0 :                 throw(MAL, "dataflow", "DFLOWinitBlk(): Called with flow == NULL");
     574      125943 :         if (mb == NULL)
     575           0 :                 throw(MAL, "dataflow", "DFLOWinitBlk(): Called with mb == NULL");
     576      125943 :         assign = (int *) GDKzalloc(mb->vtop * sizeof(int));
     577      125943 :         if (assign == NULL)
     578           0 :                 throw(MAL, "dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     579      125943 :         etop = flow->stop - flow->start;
     580    17091761 :         for (n = 0, pc = flow->start; pc < flow->stop; pc++, n++) {
     581    16965818 :                 p = getInstrPtr(mb, pc);
     582    16965818 :                 if (p == NULL) {
     583           0 :                         GDKfree(assign);
     584           0 :                         throw(MAL, "dataflow", "DFLOWinitBlk(): getInstrPtr() returned NULL");
     585             :                 }
     586             : 
     587             :                 /* initial state, ie everything can run */
     588    16965818 :                 flow->status[n].flow = flow;
     589    16965818 :                 flow->status[n].pc = pc;
     590    16965818 :                 flow->status[n].state = DFLOWpending;
     591    16965818 :                 flow->status[n].cost = -1;
     592    16965818 :                 ATOMIC_PTR_SET(&flow->status[n].flow->error, NULL);
     593             : 
     594             :                 /* administer flow dependencies */
     595    75665959 :                 for (j = p->retc; j < p->argc; j++) {
     596             :                         /* list of instructions that wake n-th instruction up */
     597    58700141 :                         if (!isVarConstant(mb, getArg(p, j)) && (k = assign[getArg(p, j)])) {
     598    31273036 :                                 assert(k < pc); /* only dependencies on earlier instructions */
     599             :                                 /* add edge to the target instruction for wakeup call */
     600    31273036 :                                 k -= flow->start;
     601    31273036 :                                 if (flow->nodes[k]) {
     602             :                                         /* add wakeup to tail of list */
     603   283443974 :                                         for (i = k; flow->edges[i] > 0; i = flow->edges[i])
     604             :                                                 ;
     605    27367434 :                                         flow->nodes[etop] = n;
     606    27367434 :                                         flow->edges[etop] = -1;
     607    27367434 :                                         flow->edges[i] = etop;
     608    27367434 :                                         etop++;
     609             :                                         (void) size;
     610    27367434 :                                         if( etop == size){
     611             :                                                 int *tmp;
     612             :                                                 /* in case of realloc failure, the original
     613             :                                                  * pointers will be freed by the caller */
     614         213 :                                                 tmp = (int*) GDKrealloc(flow->nodes, sizeof(int) * 2 * size);
     615         213 :                                                 if (tmp == NULL) {
     616           0 :                                                         GDKfree(assign);
     617           0 :                                                         throw(MAL, "dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     618             :                                                 }
     619         213 :                                                 flow->nodes = tmp;
     620         213 :                                                 tmp = (int*) GDKrealloc(flow->edges, sizeof(int) * 2 * size);
     621         213 :                                                 if (tmp == NULL) {
     622           0 :                                                         GDKfree(assign);
     623           0 :                                                         throw(MAL, "dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     624             :                                                 }
     625         213 :                                                 flow->edges = tmp;
     626         213 :                                                 size *=2;
     627             :                                         }
     628             :                                 } else {
     629     3905602 :                                         flow->nodes[k] = n;
     630     3905602 :                                         flow->edges[k] = -1;
     631             :                                 }
     632             : 
     633    31273036 :                                 flow->status[n].blocks++;
     634             :                         }
     635             : 
     636             :                         /* list of instructions to be woken up explicitly */
     637    58700141 :                         if (!isVarConstant(mb, getArg(p, j))) {
     638             :                                 /* be careful, watch out for garbage collection interference */
     639             :                                 /* those should be scheduled after all its other uses */
     640    32962243 :                                 l = getEndScope(mb, getArg(p, j));
     641    32962243 :                                 if (l != pc && l < flow->stop && l > flow->start) {
     642             :                                         /* add edge to the target instruction for wakeup call */
     643    18286831 :                                         assert(pc < l); /* only dependencies on earlier instructions */
     644    18286831 :                                         l -= flow->start;
     645    18286831 :                                         if (flow->nodes[n]) {
     646             :                                                 /* add wakeup to tail of list */
     647    79218634 :                                                 for (i = n; flow->edges[i] > 0; i = flow->edges[i])
     648             :                                                         ;
     649     8849675 :                                                 flow->nodes[etop] = l;
     650     8849675 :                                                 flow->edges[etop] = -1;
     651     8849675 :                                                 flow->edges[i] = etop;
     652     8849675 :                                                 etop++;
     653     8849675 :                                                 if( etop == size){
     654             :                                                         int *tmp;
     655             :                                                         /* in case of realloc failure, the original
     656             :                                                          * pointers will be freed by the caller */
     657         207 :                                                         tmp = (int*) GDKrealloc(flow->nodes, sizeof(int) * 2 * size);
     658         207 :                                                         if (tmp == NULL) {
     659           0 :                                                                 GDKfree(assign);
     660           0 :                                                                 throw(MAL, "dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     661             :                                                         }
     662         207 :                                                         flow->nodes = tmp;
     663         207 :                                                         tmp = (int*) GDKrealloc(flow->edges, sizeof(int) * 2 * size);
     664         207 :                                                         if (tmp == NULL) {
     665           0 :                                                                 GDKfree(assign);
     666           0 :                                                                 throw(MAL, "dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     667             :                                                         }
     668         207 :                                                         flow->edges = tmp;
     669         207 :                                                         size *=2;
     670             :                                                 }
     671             :                                         } else {
     672     9437156 :                                                 flow->nodes[n] = l;
     673     9437156 :                                                 flow->edges[n] = -1;
     674             :                                         }
     675    18286831 :                                         flow->status[l].blocks++;
     676             :                                 }
     677             :                         }
     678             :                 }
     679             : 
     680    35616554 :                 for (j = 0; j < p->retc; j++)
     681    18650736 :                         assign[getArg(p, j)] = pc;  /* ensure recognition of dependency on first instruction and constant */
     682             :         }
     683      125943 :         GDKfree(assign);
     684             : 
     685      125943 :         return MAL_SUCCEED;
     686             : }
     687             : 
     688             : /*
     689             :  * Parallel processing is mostly driven by dataflow, but within this context
     690             :  * there may be different schemes to take instructions into execution.
     691             :  * The admission scheme (and wrapup) are the necessary scheduler hooks.
     692             :  * A scheduler registers the functions needed and should release them
     693             :  * at the end of the parallel block.
     694             :  * They take effect after we have ensured that the basic properties for
     695             :  * execution hold.
     696             :  */
/* Drive execution of a dataflow block: enqueue every instruction whose
 * dependency count is zero on the shared 'todo' queue, then repeatedly
 * dequeue finished events from flow->done, decrementing the block
 * counters of dependent instructions and enqueueing those that become
 * runnable, until all instructions have completed.
 * 'w' is the client-specific worker started by runMALdataflow(); it is
 * released (turned generic) once the loop is done.
 * Returns MAL_SUCCEED or the (exchanged) error collected in flow->error. */
static str
DFLOWscheduler(DataFlow flow, struct worker *w)
{
	int last;
	int i;
	int j;
	InstrPtr p;
	int tasks = 0, actions;		/* completed vs total instruction counts */
	str ret = MAL_SUCCEED;
	FlowEvent fe, f = 0;

	if (flow == NULL)
		throw(MAL, "dataflow", "DFLOWscheduler(): Called with flow == NULL");
	actions = flow->stop - flow->start;
	if (actions == 0)
		throw(MAL, "dataflow", "Empty dataflow block");
	/* initialize the eligible statements */
	fe = flow->status;

	MT_lock_set(&flow->flowlock);
	for (i = 0; i < actions; i++)
		if (fe[i].blocks == 0) {
			/* no producers to wait for: runnable right away */
			p = getInstrPtr(flow->mb, fe[i].pc);
			if (p == NULL) {
				MT_lock_unset(&flow->flowlock);
				throw(MAL, "dataflow", "DFLOWscheduler(): getInstrPtr(flow->mb,fe[i].pc) returned NULL");
			}
			/* sum the memory claims of all (non-result) arguments */
			fe[i].argclaim = 0;
			for (j = p->retc; j < p->argc; j++)
				fe[i].argclaim += getMemoryClaim(fe[0].flow->mb, fe[0].flow->stk, p, j, FALSE);
			q_enqueue(todo, flow->status + i);
			flow->status[i].state = DFLOWrunning;
		}
	MT_lock_unset(&flow->flowlock);
	/* wake up the client-specific worker to start consuming 'todo' */
	MT_sema_up(&w->s);

	while (actions != tasks) {
		f = q_dequeue(flow->done, NULL);
		if (ATOMIC_GET(&exiting))
			break;
		if (f == NULL)
			throw(MAL, "dataflow", "DFLOWscheduler(): q_dequeue(flow->done) returned NULL");

		/*
		 * When an instruction is finished we have to reduce the blocked
		 * counter for all dependent instructions.  For those where it
		 * drops to zero we can schedule it; we do it here instead of in
		 * the worker.
		 */

		MT_lock_set(&flow->flowlock);
		tasks++;
		/* walk f's wakeup list (head at nodes[pc - start], chained
		 * through edges[]) and unblock each dependent instruction */
		for (last = f->pc - flow->start; last >= 0 && (i = flow->nodes[last]) > 0; last = flow->edges[last])
			if (flow->status[i].state == DFLOWpending) {
				flow->status[i].argclaim += f->hotclaim;
				if (flow->status[i].blocks == 1) {
					/* last outstanding dependency: make it runnable */
					flow->status[i].state = DFLOWrunning;
					flow->status[i].blocks--;
					q_enqueue(todo, flow->status + i);
				} else {
					flow->status[i].blocks--;
				}
			}
		MT_lock_unset(&flow->flowlock);
	}
	/* release the worker from its specific task (turn it into a
	 * generic worker) */
	ATOMIC_PTR_SET(&w->cntxt, NULL);
	/* wrap up errors */
	assert(flow->done->last == 0);
	if ((ret = ATOMIC_PTR_XCG(&flow->error, NULL)) != NULL) {
		TRC_DEBUG(MAL_SERVER, "Errors encountered: %s\n", ret);
	}
	return ret;
}
     771             : 
     772             : /* We create a pool of GDKnr_threads-1 generic workers, that is,
     773             :  * workers that will take on jobs from any clients.  In addition, we
     774             :  * create a single specific worker per client (i.e. each time we enter
     775             :  * here).  This specific worker will only do work for the client for
     776             :  * which it was started.  In this way we can guarantee that there will
     777             :  * always be progress for the client, even if all other workers are
     778             :  * doing something big.
     779             :  *
     780             :  * When all jobs for a client have been done (there are no more
     781             :  * entries for the client in the queue), the specific worker turns
     782             :  * itself into a generic worker.  At the same time, we signal that one
     783             :  * generic worker should exit and this function returns.  In this way
     784             :  * we make sure that there are once again GDKnr_threads-1 generic
     785             :  * workers. */
     786             : str
     787      125943 : runMALdataflow(Client cntxt, MalBlkPtr mb, int startpc, int stoppc, MalStkPtr stk)
     788             : {
     789             :         DataFlow flow = NULL;
     790             :         str msg = MAL_SUCCEED;
     791             :         int size;
     792             :         bit *ret;
     793             :         int i;
     794             : 
     795             :         /* in debugging mode we should not start multiple threads */
     796      125943 :         if (stk == NULL)
     797           0 :                 throw(MAL, "dataflow", "runMALdataflow(): Called with stk == NULL");
     798      125943 :         ret = getArgReference_bit(stk,getInstrPtr(mb,startpc),0);
     799      125943 :         *ret = FALSE;
     800      125943 :         if (stk->cmd) {
     801           0 :                 *ret = TRUE;
     802           0 :                 return MAL_SUCCEED;
     803             :         }
     804             : 
     805      125943 :         assert(stoppc > startpc);
     806             : 
     807             :         /* check existence of workers */
     808      125943 :         if (todo == NULL) {
     809             :                 /* create thread pool */
     810         265 :                 if (GDKnr_threads <= 1 || DFLOWinitialize() < 0) {
     811             :                         /* no threads created, run serially */
     812           0 :                         *ret = TRUE;
     813           0 :                         return MAL_SUCCEED;
     814             :                 }
     815             :         }
     816      125943 :         assert(todo);
     817             :         /* in addition, create one more worker that will only execute
     818             :          * tasks for the current client to compensate for our waiting
     819             :          * until all work is done */
     820      125943 :         MT_lock_set(&dataflowLock);
     821             :         /* join with already exited threads */
     822      125943 :         while ((i = exited_workers) >= 0) {
     823           0 :                 assert(workers[i].flag == EXITED);
     824           0 :                 exited_workers = workers[i].next;
     825           0 :                 workers[i].flag = IDLE;
     826           0 :                 MT_lock_unset(&dataflowLock);
     827           0 :                 MT_join_thread(workers[i].id);
     828           0 :                 MT_lock_set(&dataflowLock);
     829           0 :                 workers[i].next = idle_workers;
     830           0 :                 idle_workers = i;
     831             :         }
     832      125943 :         assert(cntxt != NULL);
     833      125943 :         if ((i = free_workers) >= 0) {
     834      125446 :                 assert(workers[i].flag == FREE);
     835      125446 :                 assert(free_count > 0);
     836      125446 :                 free_count--;
     837      125446 :                 free_workers = workers[i].next;
     838      125446 :                 workers[i].flag = WAITING;
     839      125446 :                 ATOMIC_PTR_SET(&workers[i].cntxt, cntxt);
     840      125446 :                 if (stk->calldepth > 1) {
     841         254 :                         MT_Id pid = MT_getpid();
     842             : 
     843             :                         /* doing a recursive call: copy specificity from
     844             :                          * current worker to new worker */
     845      260350 :                         for (int j = 0; j < THREADS; j++) {
     846      260096 :                                 if (workers[j].flag == RUNNING && workers[j].id == pid) {
     847           0 :                                         ATOMIC_PTR_SET(&workers[i].cntxt,
     848             :                                                                    ATOMIC_PTR_GET(&workers[j].cntxt));
     849           0 :                                         break;
     850             :                                 }
     851             :                         }
     852             :                 }
     853      125446 :                 MT_sema_up(&workers[i].s);
     854         497 :         } else if ((i = idle_workers) >= 0) {
     855         497 :                 assert(workers[i].flag == IDLE);
     856         497 :                 ATOMIC_PTR_SET(&workers[i].cntxt, cntxt);
     857             :                 /* only create specific worker if we are not doing a
     858             :                  * recursive call */
     859         497 :                 if (stk->calldepth > 1) {
     860           0 :                         MT_Id pid = MT_getpid();
     861             : 
     862             :                         /* doing a recursive call: copy specificity from
     863             :                          * current worker to new worker */
     864           0 :                         for (int j = 0; j < THREADS; j++) {
     865           0 :                                 if (workers[j].flag == RUNNING && workers[j].id == pid) {
     866           0 :                                         ATOMIC_PTR_SET(&workers[i].cntxt,
     867             :                                                                    ATOMIC_PTR_GET(&workers[j].cntxt));
     868           0 :                                         break;
     869             :                                 }
     870             :                         }
     871             :                 }
     872         497 :                 idle_workers = workers[i].next;
     873         497 :                 workers[i].flag = WAITING;
     874             :                 char name[MT_NAME_LEN];
     875         497 :                 snprintf(name, sizeof(name), "DFLOWworker%d", i);
     876         497 :                 if ((workers[i].errbuf = GDKmalloc(GDKMAXERRLEN)) == NULL ||
     877         497 :                         (workers[i].id = THRcreate(DFLOWworker, (void *) &workers[i],
     878             :                                                                            MT_THR_JOINABLE, name)) == 0) {
     879             :                         /* cannot start new thread, run serially */
     880           0 :                         *ret = TRUE;
     881           0 :                         GDKfree(workers[i].errbuf);
     882           0 :                         workers[i].errbuf = NULL;
     883           0 :                         workers[i].flag = IDLE;
     884           0 :                         workers[i].next = idle_workers;
     885           0 :                         idle_workers = i;
     886           0 :                         MT_lock_unset(&dataflowLock);
     887           0 :                         return MAL_SUCCEED;
     888             :                 }
     889             :         }
     890      125943 :         MT_lock_unset(&dataflowLock);
     891      125943 :         if (i < 0) {
     892             :                 /* no empty thread slots found, run serially */
     893           0 :                 *ret = TRUE;
     894           0 :                 return MAL_SUCCEED;
     895             :         }
     896             : 
     897      125943 :         flow = (DataFlow)GDKzalloc(sizeof(DataFlowRec));
     898      125943 :         if (flow == NULL)
     899           0 :                 throw(MAL, "dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     900             : 
     901      125943 :         flow->cntxt = cntxt;
     902      125943 :         flow->mb = mb;
     903      125943 :         flow->stk = stk;
     904             : 
     905             :         /* keep real block count, exclude brackets */
     906      125943 :         flow->start = startpc + 1;
     907      125943 :         flow->stop = stoppc;
     908             : 
     909      125943 :         flow->done = q_create(stoppc- startpc+1, "flow->done");
     910      125943 :         if (flow->done == NULL) {
     911           0 :                 GDKfree(flow);
     912           0 :                 throw(MAL, "dataflow", "runMALdataflow(): Failed to create flow->done queue");
     913             :         }
     914             : 
     915      125943 :         flow->status = (FlowEvent)GDKzalloc((stoppc - startpc + 1) * sizeof(FlowEventRec));
     916      125943 :         if (flow->status == NULL) {
     917           0 :                 q_destroy(flow->done);
     918           0 :                 GDKfree(flow);
     919           0 :                 throw(MAL, "dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     920             :         }
     921             :         size = DFLOWgraphSize(mb, startpc, stoppc);
     922      125943 :         size += stoppc - startpc;
     923      125943 :         flow->nodes = (int*)GDKzalloc(sizeof(int) * size);
     924      125943 :         if (flow->nodes == NULL) {
     925           0 :                 GDKfree(flow->status);
     926           0 :                 q_destroy(flow->done);
     927           0 :                 GDKfree(flow);
     928           0 :                 throw(MAL, "dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     929             :         }
     930      125943 :         flow->edges = (int*)GDKzalloc(sizeof(int) * size);
     931      125943 :         if (flow->edges == NULL) {
     932           0 :                 GDKfree(flow->nodes);
     933           0 :                 GDKfree(flow->status);
     934           0 :                 q_destroy(flow->done);
     935           0 :                 GDKfree(flow);
     936           0 :                 throw(MAL, "dataflow", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     937             :         }
     938      125943 :         MT_lock_init(&flow->flowlock, "flow->flowlock");
     939      125943 :         ATOMIC_PTR_INIT(&flow->error, NULL);
     940      125943 :         msg = DFLOWinitBlk(flow, mb, size);
     941             : 
     942      125943 :         if (msg == MAL_SUCCEED)
     943      125943 :                 msg = DFLOWscheduler(flow, &workers[i]);
     944             : 
     945      125943 :         GDKfree(flow->status);
     946      125943 :         GDKfree(flow->edges);
     947      125943 :         GDKfree(flow->nodes);
     948      125943 :         q_destroy(flow->done);
     949      125943 :         MT_lock_destroy(&flow->flowlock);
     950             :         ATOMIC_PTR_DESTROY(&flow->error);
     951      125943 :         GDKfree(flow);
     952             : 
     953             :         /* we created one worker, now tell one worker to exit again */
     954      125943 :         MT_lock_set(&todo->l);
     955      125943 :         todo->exitcount++;
     956      125943 :         MT_lock_unset(&todo->l);
     957      125943 :         MT_sema_up(&todo->s);
     958             : 
     959      125943 :         return msg;
     960             : }
     961             : 
     962             : str
     963           0 : deblockdataflow( Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     964             : {
     965           0 :     int *ret = getArgReference_int(stk,pci,0);
     966           0 :     int *val = getArgReference_int(stk,pci,1);
     967             :     (void) cntxt;
     968             :     (void) mb;
     969           0 :     *ret = *val;
     970           0 :     return MAL_SUCCEED;
     971             : }
     972             : 
/* Shut down the dataflow worker pool: raise the global 'exiting' flag,
 * wake every running and free worker so it can observe the flag, then
 * join all non-idle worker threads and return their slots to the idle
 * list.  The dataflowLock is dropped around each join so workers that
 * need the lock to finish can make progress. */
static void
stopMALdataflow(void)
{
	int i;

	ATOMIC_SET(&exiting, 1);
	if (todo) {
		MT_lock_set(&dataflowLock);
		/* first wake up all running threads */
		for (i = 0; i < THREADS; i++) {
			if (workers[i].flag == RUNNING)
				MT_sema_up(&todo->s);
		}
		/* wake the parked (FREE) workers via their private semaphores */
		for (i = free_workers; i >= 0; i = workers[i].next) {
			MT_sema_up(&workers[i].s);
		}
		free_workers = -1;
		for (i = 0; i < THREADS; i++) {
			if (workers[i].flag != IDLE) {
				/* drop the lock while joining so the worker can exit */
				MT_lock_unset(&dataflowLock);
				MT_join_thread(workers[i].id);
				MT_lock_set(&dataflowLock);
				workers[i].flag = IDLE;
				workers[i].next = idle_workers;
				idle_workers = i;
			}
		}
		MT_lock_unset(&dataflowLock);
	}
}

Generated by: LCOV version 1.14