LCOV - code coverage report
Current view: top level - sql/backends/monet5/UDF/pyapi3 - pyapi3.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 447 783 57.1 %
Date: 2021-10-13 02:24:04 Functions: 12 12 100.0 %

          Line data    Source code
       1             : /*
       2             :  * This Source Code Form is subject to the terms of the Mozilla Public
       3             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       4             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       5             :  *
       6             :  * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
       7             :  */
       8             : 
       9             : #include "monetdb_config.h"
      10             : #include "pyapi.h"
      11             : #include "connection.h"
      12             : #include "mutils.h"
      13             : 
      14             : #include "unicode.h"
      15             : #include "pytypes.h"
      16             : #include "type_conversion.h"
      17             : #include "formatinput.h"
      18             : #include "conversion.h"
      19             : #include "gdk_interprocess.h"
      20             : 
      21             : #ifdef HAVE_FORK
      22             : // These libraries are used for PYTHON_MAP when forking is enabled [to start new
      23             : // processes and wait on them]
      24             : #include <sys/types.h>
      25             : #include <sys/wait.h>
      26             : #endif
      27             : 
      28             : const char *fork_disableflag = "disable_fork";
      29             : bool option_disable_fork = false;
      30             : 
      31             : static PyObject *marshal_module = NULL;
      32             : PyObject *marshal_loads = NULL;
      33             : 
      34             : typedef struct _AggrParams{
      35             :         PyInput **pyinput_values;
      36             :         void ****split_bats;
      37             :         size_t **group_counts;
      38             :         str **args;
      39             :         PyObject **connection;
      40             :         PyObject **function;
      41             :         PyObject **column_types_dict;
      42             :         PyObject **result_objects;
      43             :         str *pycall;
      44             :         str msg;
      45             :         size_t base;
      46             :         size_t additional_columns;
      47             :         size_t named_columns;
      48             :         size_t columns;
      49             :         size_t group_count;
      50             :         size_t group_start;
      51             :         size_t group_end;
      52             :         MT_Id thread;
      53             : } AggrParams;
      54             : 
      55             : static void ComputeParallelAggregation(AggrParams *p);
      56             : static str CreateEmptyReturn(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci,
      57             :                                                           size_t retcols, oid seqbase);
      58             : 
      59           1 : static const char *FunctionBasePath(void)
      60             : {
      61           1 :         const char *basepath = GDKgetenv("function_basepath");
      62           1 :         if (basepath == NULL) {
      63           1 :                 basepath = getenv("HOME");
      64             :         }
      65           1 :         if (basepath == NULL) {
      66             :                 basepath = "";
      67             :         }
      68           1 :         return basepath;
      69             : }
      70             : 
      71             : static MT_Lock pyapiLock = MT_LOCK_INITIALIZER(pyapiLock);
      72             : static bool pyapiInitialized = false;
      73             : 
      74          25 : bool PYAPI3PyAPIInitialized(void) {
      75          25 :         return pyapiInitialized;
      76             : }
      77             : 
      78             : #ifdef HAVE_FORK
      79             : static bool python_call_active = false;
      80             : #endif
      81             : 
      82             : #ifdef WIN32
      83             : static bool enable_zerocopy_input = true;
      84             : static bool enable_zerocopy_output = false;
      85             : #else
      86             : static bool enable_zerocopy_input = true;
      87             : static bool enable_zerocopy_output = true;
      88             : #endif
      89             : 
      90             : static str
      91             : PyAPIeval(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, bool grouped, bool mapped);
      92             : 
      93             : str
      94          89 : PYAPI3PyAPIevalStd(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
      95          89 :         return PyAPIeval(cntxt, mb, stk, pci, false, false);
      96             : }
      97             : 
      98             : str
      99          34 : PYAPI3PyAPIevalStdMap(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
     100          34 :         return PyAPIeval(cntxt, mb, stk, pci, false, true);
     101             : }
     102             : 
     103             : str
     104          20 : PYAPI3PyAPIevalAggr(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
     105          20 :         return PyAPIeval(cntxt, mb, stk, pci, true, false);
     106             : }
     107             : 
     108             : str
     109           5 : PYAPI3PyAPIevalAggrMap(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
     110           5 :         return PyAPIeval(cntxt, mb, stk, pci, true, true);
     111             : }
     112             : 
     113             : #define NP_SPLIT_BAT(tpe)                                                      \
     114             :         {                                                                          \
     115             :                 tpe ***ptr = (tpe ***)split_bats;                                      \
     116             :                 size_t *temp_indices;                                                  \
     117             :                 tpe *batcontent = (tpe *)basevals;                                     \
     118             :                 /* allocate space for split BAT */                                     \
     119             :                 for (group_it = 0; group_it < group_count; group_it++) {               \
     120             :                         ptr[group_it][i] =                                                 \
     121             :                                 GDKzalloc(group_counts[group_it] * sizeof(tpe));               \
     122             :                 }                                                                      \
     123             :                 /*iterate over the elements of the current BAT*/                       \
     124             :                 temp_indices = GDKzalloc(sizeof(lng) * group_count);                   \
     125             :                 if (BATtvoid(aggr_group)) {                                            \
     126             :                         for (element_it = 0; element_it < elements; element_it++) {        \
     127             :                                 /*append current element to proper group*/                     \
     128             :                                 ptr[element_it][i][temp_indices[element_it]++] =               \
     129             :                                         batcontent[element_it];                                    \
     130             :                         }                                                                  \
     131             :                 } else {                                                               \
     132             :                         for (element_it = 0; element_it < elements; element_it++) {        \
     133             :                                 /*group of current element*/                                   \
     134             :                                 oid group = aggr_group_arr[element_it];                        \
     135             :                                 /*append current element to proper group*/                     \
     136             :                                 ptr[group][i][temp_indices[group]++] = batcontent[element_it]; \
     137             :                         }                                                                  \
     138             :                 }                                                                      \
     139             :                 GDKfree(temp_indices);                                                 \
     140             :         }
     141             : 
     142             : //! The main PyAPI function, this function does everything PyAPI related
     143             : //! It takes as argument a bunch of input BATs, a python function, and outputs a
     144             : //! number of BATs
     145             : //! This function follows the following pipeline
     146             : //! [PARSE_CODE] Step 1: It parses the Python source code and corrects any wrong
     147             : //! indentation, or converts the source code into a PyCodeObject if the source
     148             : //! code is encoded as such
     149             : //! [CONVERT_BAT] Step 2: It converts the input BATs into Numpy Arrays
     150             : //! [EXECUTE_CODE] Step 3: It executes the Python code using the Numpy arrays as arguments
     151             : //! [RETURN_VALUES] Step 4: It collects the return values and converts them back into BATs
     152             : //! If 'mapped' is set to True, it will fork a separate process at [FORK_PROCESS] that executes Step 1-3, the process will then write the return values into memory mapped files and exit, then Step 4 is executed by the main process
     153         148 : static str PyAPIeval(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, bool grouped, bool mapped) {
     154             :         sql_func * sqlfun = NULL;
     155             :         str exprStr = NULL;
     156             : 
     157             :         const int additional_columns = 3;
     158             :         int i = 1, ai = 0;
     159         148 :         char *pycall = NULL;
     160             :         str *args;
     161         148 :         char *msg = MAL_SUCCEED;
     162             :         BAT *b = NULL;
     163             :         node *argnode = NULL;
     164             :         int seengrp = FALSE;
     165         148 :         PyObject *pArgs = NULL, *pColumns = NULL, *pColumnTypes = NULL,
     166             :                          *pConnection,
     167             :                          *pResult = NULL; // this is going to be the parameter tuple
     168         148 :         PyObject *code_object = NULL;
     169             :         PyReturn *pyreturn_values = NULL;
     170         148 :         PyInput *pyinput_values = NULL;
     171             :         oid seqbase = 0;
     172             : #ifdef HAVE_FORK
     173             :         char *mmap_ptr;
     174             :         QueryStruct *query_ptr = NULL;
     175         148 :         int query_sem = -1;
     176             :         int mmap_id = -1;
     177             :         size_t memory_size = 0;
     178             :         bool child_process = false;
     179         148 :         bool holds_gil = !mapped;
     180             :         void **mmap_ptrs = NULL;
     181             :         size_t *mmap_sizes = NULL;
     182             : #endif
     183             :         bool allow_loopback = !mapped;
     184             :         bit varres;
     185             :         int retcols;
     186             :         bool gstate = 0;
     187             :         int unnamedArgs = 0;
     188         148 :         bool parallel_aggregation = grouped && mapped, freeexprStr = false;
     189         148 :         int argcount = pci->argc;
     190             : 
     191         148 :         char *eval_additional_args[] = {"_columns", "_column_types", "_conn"};
     192             : 
     193             :         mapped = false;
     194             : 
     195             : #ifndef HAVE_FORK
     196             :         (void)mapped;
     197             : #endif
     198             : 
     199         148 :         if (!pyapiInitialized) {
     200           0 :                 throw(MAL, "pyapi3.eval", SQLSTATE(PY000) "Embedded Python is enabled but an error was "
     201             :                                                                  "thrown during initialization.");
     202             :         }
     203         148 :         if (!grouped) {
     204         123 :                 sql_subfunc *sqlmorefun =
     205         123 :                         (*(sql_subfunc **)getArgReference(stk, pci, pci->retc));
     206         123 :                 if (sqlmorefun) {
     207         123 :                         sqlfun = sqlmorefun->func;
     208             :                 }
     209             :         } else {
     210          25 :                 sqlfun = *(sql_func **)getArgReference(stk, pci, pci->retc);
     211             :         }
     212         148 :         exprStr = *getArgReference_str(stk, pci, pci->retc + 1);
     213         148 :         varres = sqlfun ? sqlfun->varres : 0;
     214         148 :         retcols = !varres ? pci->retc : -1;
     215             : 
     216         148 :         args = (str *)GDKzalloc(pci->argc * sizeof(str));
     217         148 :         pyreturn_values = GDKzalloc(pci->retc * sizeof(PyReturn));
     218         148 :         if (args == NULL || pyreturn_values == NULL) {
     219           0 :                 throw(MAL, "pyapi3.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL " arguments.");
     220             :         }
     221             : 
     222         148 :         if ((pci->argc - (pci->retc + 2)) * sizeof(PyInput) > 0) {
     223         108 :                 pyinput_values =
     224         108 :                         GDKzalloc((pci->argc - (pci->retc + 2)) * sizeof(PyInput));
     225             : 
     226         108 :                 if (pyinput_values == NULL) {
     227           0 :                         GDKfree(args);
     228           0 :                         GDKfree(pyreturn_values);
     229           0 :                         throw(MAL, "pyapi3.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL " input values.");
     230             :                 }
     231             :         }
     232             : 
     233             :         // Analyse the SQL_Func structure to get the parameter names
     234         148 :         if (sqlfun != NULL && sqlfun->ops->cnt > 0) {
     235          96 :                 unnamedArgs = pci->retc + 2;
     236          96 :                 argnode = sqlfun->ops->h;
     237         231 :                 while (argnode) {
     238         135 :                         char *argname = ((sql_arg *)argnode->data)->name;
     239         135 :                         args[unnamedArgs++] = GDKstrdup(argname);
     240         135 :                         argnode = argnode->next;
     241             :                 }
     242          96 :                 if (parallel_aggregation && unnamedArgs < pci->argc) {
     243           5 :                         argcount = unnamedArgs;
     244             :                 } else {
     245             :                         parallel_aggregation = false;
     246             :                 }
     247             :         } else {
     248             :                 parallel_aggregation = false;
     249             :         }
     250             : 
     251             :         // We name all the unknown arguments, if grouping is enabled the first
     252             :         // unknown argument that is the group variable, we name this 'aggr_group'
     253         511 :         for (i = pci->retc + 2; i < argcount; i++) {
     254         363 :                 if (args[i] == NULL) {
     255         228 :                         if (!seengrp && grouped) {
     256          11 :                                 args[i] = GDKstrdup("aggr_group");
     257             :                                 seengrp = TRUE;
     258             :                         } else {
     259             :                                 char argbuf[64];
     260         217 :                                 snprintf(argbuf, sizeof(argbuf), "arg%i", i - pci->retc - 1);
     261         217 :                                 args[i] = GDKstrdup(argbuf);
     262             :                         }
     263             :                 }
     264             :         }
     265             : 
     266             :         // Construct PyInput objects, we do this before any multiprocessing because
     267             :         // there is some locking going on in there, and locking + forking = bad idea
     268             :         // (a thread can fork while another process is in the lock, which means we
     269             :         // can get stuck permanently)
     270         148 :         argnode = sqlfun && sqlfun->ops->cnt > 0 ? sqlfun->ops->h : NULL;
     271         503 :         for (i = pci->retc + 2; i < argcount; i++) {
     272         360 :                 PyInput *inp = &pyinput_values[i - (pci->retc + 2)];
     273         360 :                 if (!isaBatType(getArgType(mb, pci, i))) {
     274          48 :                         inp->scalar = true;
     275          48 :                         inp->bat_type = getArgType(mb, pci, i);
     276          48 :                         inp->count = 1;
     277          48 :                         if (inp->bat_type == TYPE_str) {
     278           0 :                                 inp->dataptr = getArgReference_str(stk, pci, i);
     279             :                         } else {
     280          48 :                                 inp->dataptr = getArgReference(stk, pci, i);
     281             :                         }
     282             :                 } else {
     283         312 :                         b = BATdescriptor(*getArgReference_bat(stk, pci, i));
     284         312 :                         if (b == NULL) {
     285           0 :                                 msg = createException(
     286             :                                         MAL, "pyapi3.eval",
     287             :                                         SQLSTATE(PY000) "The BAT passed to the function (argument #%d) is NULL.\n",
     288           0 :                                         i - (pci->retc + 2) + 1);
     289           0 :                                 goto wrapup;
     290             :                         }
     291         312 :                         seqbase = b->hseqbase;
     292         312 :                         inp->count = BATcount(b);
     293         312 :                         inp->bat_type = b->ttype;
     294         312 :                         inp->bat = b;
     295         312 :                         if (inp->count == 0) {
     296             :                                 // one of the input BATs is empty, don't execute the function at
     297             :                                 // all
     298             :                                 // just return empty BATs
     299           5 :                                 msg = CreateEmptyReturn(mb, stk, pci, retcols, seqbase);
     300           5 :                                 goto wrapup;
     301             :                         }
     302             :                 }
     303         355 :                 if (argnode) {
     304         130 :                         inp->sql_subtype = &((sql_arg *)argnode->data)->type;
     305         130 :                         argnode = argnode->next;
     306             :                 }
     307             :         }
     308             : 
     309             : #ifdef HAVE_FORK
     310         143 :         if (!option_disable_fork) {
     311         143 :                 if (!mapped && !parallel_aggregation) {
     312         138 :                         MT_lock_set(&pyapiLock);
     313         138 :                         if (python_call_active) {
     314             :                                 mapped = true;
     315             :                                 holds_gil = false;
     316             :                         } else {
     317         138 :                                 python_call_active = true;
     318             :                                 holds_gil = true;
     319             :                         }
     320         138 :                         MT_lock_unset(&pyapiLock);
     321             :                 }
     322             :         } else {
     323             :                 mapped = false;
     324             :                 holds_gil = true;
     325             :         }
     326             : #endif
     327             : 
     328             : #ifdef HAVE_FORK
     329             :         /*[FORK_PROCESS]*/
     330         138 :         if (mapped) {
     331             :                 lng pid;
     332             :                 // we need 3 + pci->retc * 2 shared memory spaces
     333             :                 // the first is for the header information
     334             :                 // the second for query struct information
     335             :                 // the third is for query results
     336             :                 // the remaining pci->retc * 2 is one for each return BAT, and one for
     337             :                 // each return mask array
     338           0 :                 int mmap_count = 4 + pci->retc * 2;
     339             : 
     340             :                 // create initial shared memory
     341           0 :                 mmap_id = GDKuniqueid(mmap_count);
     342             : 
     343           0 :                 mmap_ptrs = GDKzalloc(mmap_count * sizeof(void *));
     344           0 :                 mmap_sizes = GDKzalloc(mmap_count * sizeof(size_t));
     345           0 :                 if (mmap_ptrs == NULL || mmap_sizes == NULL) {
     346           0 :                         msg = createException(MAL, "pyapi3.eval",
     347             :                                                                   SQLSTATE(HY013) MAL_MALLOC_FAIL " mmap values.");
     348           0 :                         goto wrapup;
     349             :                 }
     350             : 
     351           0 :                 memory_size =
     352           0 :                         pci->retc * sizeof(ReturnBatDescr); // the memory size for the
     353             :                                                                                                 // header files, each process
     354             :                                                                                                 // has one per return value
     355             : 
     356           0 :                 assert(memory_size > 0);
     357             :                 // create the shared memory for the header
     358           0 :                 MT_lock_set(&pyapiLock);
     359           0 :                 mmap_ptrs[0] = GDKinitmmap(mmap_id + 0, memory_size, &mmap_sizes[0]);
     360           0 :                 MT_lock_unset(&pyapiLock);
     361           0 :                 if (mmap_ptrs[0] == NULL) {
     362           0 :                         msg = createException(MAL, "pyapi3.eval", GDK_EXCEPTION);
     363           0 :                         goto wrapup;
     364             :                 }
     365             :                 mmap_ptr = mmap_ptrs[0];
     366             : 
     367             :                 // create the cross-process semaphore used for signaling queries
     368             :                 // we need two semaphores
     369             :                 // the main process waits on the first one (exiting when a query is
     370             :                 // requested or the child process is done)
     371             :                 // the forked process waits for the second one when it requests a query
     372             :                 // (waiting for the result of the query)
     373           0 :                 if (GDKcreatesem(mmap_id, 2, &query_sem) != GDK_SUCCEED) {
     374           0 :                         msg = createException(MAL, "pyapi3.eval", GDK_EXCEPTION);
     375           0 :                         goto wrapup;
     376             :                 }
     377             : 
     378             :                 // create the shared memory space for queries
     379           0 :                 MT_lock_set(&pyapiLock);
     380           0 :                 mmap_ptrs[1] = GDKinitmmap(mmap_id + 1, sizeof(QueryStruct),
     381             :                                                  &mmap_sizes[1]);
     382           0 :                 MT_lock_unset(&pyapiLock);
     383           0 :                 if (mmap_ptrs[1] == NULL) {
     384           0 :                         msg = createException(MAL, "pyapi3.eval", GDK_EXCEPTION);
     385           0 :                         goto wrapup;
     386             :                 }
     387             :                 query_ptr = mmap_ptrs[1];
     388           0 :                 query_ptr->pending_query = false;
     389           0 :                 query_ptr->query[0] = '\0';
     390           0 :                 query_ptr->mmapid = -1;
     391           0 :                 query_ptr->memsize = 0;
     392             : 
     393             :                 // fork
     394           0 :                 MT_lock_set(&pyapiLock);
     395           0 :                 gstate = Python_ObtainGIL(); // we need the GIL before forking,
     396             :                                                                          // otherwise it can get stuck in the forked
     397             :                                                                          // child
     398           0 :                 if ((pid = fork()) < 0) {
     399           0 :                         msg = createException(MAL, "pyapi3.eval", SQLSTATE(PY000) "Failed to fork process");
     400           0 :                         MT_lock_unset(&pyapiLock);
     401             : 
     402           0 :                         goto wrapup;
     403           0 :                 } else if (pid == 0) {
     404             :                         child_process = true;
     405             :                         query_ptr = NULL;
     406           0 :                         if ((query_ptr = GDKinitmmap(mmap_id + 1, sizeof(QueryStruct),
     407             :                                                                                  NULL)) == NULL) {
     408           0 :                                 msg = createException(MAL, "pyapi3.eval", GDK_EXCEPTION);
     409           0 :                                 goto wrapup;
     410             :                         }
     411             :                 } else {
     412           0 :                         gstate = Python_ReleaseGIL(gstate);
     413             :                 }
     414           0 :                 if (!child_process) {
     415             :                         // main process
     416             :                         int status;
     417             :                         bool success = true;
     418           0 :                         bool sem_success = false;
     419             :                         pid_t retcode = 0;
     420             : 
     421             :                         // release the GIL in the main process
     422           0 :                         MT_lock_unset(&pyapiLock);
     423             : 
     424             :                         while (true) {
     425             :                                 // wait for the child to finish
     426             :                                 // note that we use a timeout here in case the child crashes for
     427             :                                 // some reason
     428             :                                 // in this case the semaphore value is never increased, so we
     429             :                                 // would be stuck otherwise
     430           0 :                                 if (GDKchangesemval_timeout(query_sem, 0, -1, 100, &sem_success) != GDK_SUCCEED) {
     431           0 :                                         msg = createException(MAL, "pyapi3.eval", GDK_EXCEPTION);
     432           0 :                                         goto wrapup;
     433             :                                 }
     434           0 :                                 if (sem_success) {
     435             :                                         break;
     436             :                                 }
     437           0 :                                 retcode = waitpid(pid, &status, WNOHANG);
     438           0 :                                 if (retcode > 0)
     439             :                                         break; // we have successfully waited for the child to exit
     440           0 :                                 if (retcode < 0) {
     441             :                                         // error message
     442           0 :                                         const char *err = GDKstrerror(errno, (char[128]){0}, 128);
     443           0 :                                         sem_success = 0;
     444           0 :                                         errno = 0;
     445           0 :                                         msg = createException(
     446             :                                                 MAL, "waitpid",
     447             :                                                 SQLSTATE(PY000) "Error calling waitpid(" LLFMT ", &status, WNOHANG): %s",
     448             :                                                 pid, err);
     449             :                                         break;
     450             :                                 }
     451             :                         }
     452           0 :                         if (sem_success)
     453           0 :                                 waitpid(pid, &status, 0);
     454             : 
     455           0 :                         if (status != 0)
     456             :                                 success = false;
     457             : 
     458             :                         if (!success) {
     459             :                                 // a child failed, get the error message from the child
     460             :                                 ReturnBatDescr *descr = &(((ReturnBatDescr *)mmap_ptr)[0]);
     461             : 
     462           0 :                                 if (descr->bat_size == 0) {
     463           0 :                                         msg = createException(
     464             :                                                 MAL, "pyapi3.eval",
     465             :                                                 SQLSTATE(PY000) "Failure in child process with unknown error.");
     466           0 :                                 } else if ((mmap_ptrs[3] = GDKinitmmap(mmap_id + 3, descr->bat_size,
     467             :                                                                                                            &mmap_sizes[3])) != NULL) {
     468           0 :                                         msg = createException(MAL, "pyapi3.eval", SQLSTATE(PY000) "%s",
     469             :                                                                                   (char *)mmap_ptrs[3]);
     470             :                                 } else {
     471           0 :                                         msg = createException(MAL, "pyapi3.eval", GDK_EXCEPTION);
     472             :                                 }
     473           0 :                                 goto wrapup;
     474             :                         }
     475             : 
     476             :                         // collect return values
     477           0 :                         for (i = 0; i < pci->retc; i++) {
     478           0 :                                 PyReturn *ret = &pyreturn_values[i];
     479           0 :                                 ReturnBatDescr *descr = &(((ReturnBatDescr *)mmap_ptr)[i]);
     480             :                                 size_t total_size = 0;
     481             :                                 bool has_mask = false;
     482             :                                 ret->count = 0;
     483           0 :                                 ret->mmap_id = mmap_id + i + 3;
     484             :                                 ret->memory_size = 0;
     485             :                                 ret->result_type = 0;
     486             : 
     487           0 :                                 ret->count = descr->bat_count;
     488           0 :                                 total_size = descr->bat_size;
     489             : 
     490           0 :                                 ret->memory_size = descr->element_size;
     491           0 :                                 ret->result_type = descr->npy_type;
     492           0 :                                 has_mask = has_mask || descr->has_mask;
     493             : 
     494             :                                 // get the shared memory address for this return value
     495           0 :                                 assert(total_size > 0);
     496           0 :                                 MT_lock_set(&pyapiLock);
     497           0 :                                 mmap_ptrs[i + 3] = GDKinitmmap(mmap_id + i + 3, total_size,
     498           0 :                                                                                            &mmap_sizes[i + 3]);
     499           0 :                                 MT_lock_unset(&pyapiLock);
     500           0 :                                 if (mmap_ptrs[i + 3] == NULL) {
     501           0 :                                         msg = createException(MAL, "pyapi3.eval", GDK_EXCEPTION);
     502           0 :                                         goto wrapup;
     503             :                                 }
     504           0 :                                 ret->array_data = mmap_ptrs[i + 3];
     505           0 :                                 ret->mask_data = NULL;
     506           0 :                                 ret->numpy_array = NULL;
     507           0 :                                 ret->numpy_mask = NULL;
     508           0 :                                 ret->multidimensional = FALSE;
     509           0 :                                 if (has_mask) {
     510           0 :                                         size_t mask_size = ret->count * sizeof(bool);
     511             : 
     512           0 :                                         assert(mask_size > 0);
     513           0 :                                         MT_lock_set(&pyapiLock);
     514           0 :                                         mmap_ptrs[pci->retc + (i + 3)] = GDKinitmmap(
     515           0 :                                                 mmap_id + pci->retc + (i + 3), mask_size,
     516           0 :                                                 &mmap_sizes[pci->retc + (i + 3)]);
     517           0 :                                         MT_lock_unset(&pyapiLock);
     518           0 :                                         if (mmap_ptrs[pci->retc + (i + 3)] == NULL) {
     519           0 :                                                 msg = createException(MAL, "pyapi3.eval", GDK_EXCEPTION);
     520           0 :                                                 goto wrapup;
     521             :                                         }
     522           0 :                                         ret->mask_data = mmap_ptrs[pci->retc + (i + 3)];
     523             :                                 }
     524             :                         }
     525           0 :                         msg = MAL_SUCCEED;
     526             : 
     527           0 :                         goto returnvalues;
     528             :                 }
     529             :         }
     530             : #endif
     531             : 
     532             :         // After this point we will execute Python Code, so we need to acquire the
     533             :         // GIL
     534         143 :         if (!mapped) {
     535         143 :                 gstate = Python_ObtainGIL();
     536             :         }
     537             : 
     538         143 :         if (sqlfun) {
     539             :                 // Check if exprStr references to a file path or if it contains the
     540             :                 // Python code itself
     541             :                 // There is no easy way to check, so the rule is if it starts with '/'
     542             :                 // it is always a file path,
     543             :                 // Otherwise it's a (relative) file path only if it ends with '.py'
     544         143 :                 size_t length = strlen(exprStr);
     545         143 :                 if (exprStr[0] == '/' ||
     546         143 :                         (exprStr[length - 3] == '.' && exprStr[length - 2] == 'p' &&
     547           1 :                          exprStr[length - 1] == 'y')) {
     548             :                         FILE *fp;
     549             :                         char address[1000];
     550             :                         struct stat buffer;
     551             :                         ssize_t length;
     552           1 :                         if (exprStr[0] == '/') {
     553             :                                 // absolute path
     554           0 :                                 snprintf(address, 1000, "%s", exprStr);
     555             :                         } else {
     556             :                                 // relative path
     557           1 :                                 snprintf(address, 1000, "%s/%s", FunctionBasePath(), exprStr);
     558             :                         }
     559           1 :                         if (MT_stat(address, &buffer) < 0) {
     560           0 :                                 msg = createException(
     561             :                                         MAL, "pyapi3.eval",
     562             :                                         SQLSTATE(PY000) "Could not find Python source file \"%s\".", address);
     563           0 :                                 goto wrapup;
     564             :                         }
     565           1 :                         fp = fopen(address, "r");
     566           1 :                         if (fp == NULL) {
     567           0 :                                 msg = createException(
     568             :                                         MAL, "pyapi3.eval",
     569             :                                         SQLSTATE(PY000) "Could not open Python source file \"%s\".", address);
     570           0 :                                 goto wrapup;
     571             :                         }
     572           1 :                         if(fseek(fp, 0, SEEK_END) == -1) {
     573           0 :                                 msg = createException(
     574             :                                                 MAL, "pyapi3.eval",
     575             :                                                 SQLSTATE(PY000) "Failed to set file pointer on Python source file \"%s\".", address);
     576           0 :                                 goto wrapup;
     577             :                         }
     578           1 :                         if((length = ftell(fp)) == -1) {
     579           0 :                                 msg = createException(
     580             :                                                 MAL, "pyapi3.eval",
     581             :                                                 SQLSTATE(PY000) "Failed to set file pointer on Python source file \"%s\".", address);
     582           0 :                                 goto wrapup;
     583             :                         }
     584           1 :                         if(fseek(fp, 0, SEEK_SET) == -1) {
     585           0 :                                 msg = createException(
     586             :                                                 MAL, "pyapi3.eval",
     587             :                                                 SQLSTATE(PY000) "Failed to set file pointer on Python source file \"%s\".", address);
     588           0 :                                 goto wrapup;
     589             :                         }
     590           1 :                         exprStr = GDKzalloc(length + 1);
     591           1 :                         if (exprStr == NULL) {
     592           0 :                                 msg = createException(MAL, "pyapi3.eval",
     593             :                                                                           SQLSTATE(HY013) MAL_MALLOC_FAIL " function body string.");
     594           0 :                                 goto wrapup;
     595             :                         }
     596             :                         freeexprStr = true;
     597           1 :                         if (fread(exprStr, 1, (size_t) length, fp) != (size_t) length) {
     598           0 :                                 msg = createException(MAL, "pyapi3.eval",
     599             :                                                                           SQLSTATE(PY000) "Failed to read from file \"%s\".",
     600             :                                                                           address);
     601           0 :                                 goto wrapup;
     602             :                         }
     603           1 :                         fclose(fp);
     604             :                 }
     605             :         }
     606             : 
     607             :         /*[PARSE_CODE]*/
     608         143 :         pycall = FormatCode(exprStr, args, argcount, 4, &code_object, &msg,
     609             :                                                 eval_additional_args, additional_columns);
     610         143 :         if (pycall == NULL && code_object == NULL) {
     611           0 :                 if (msg == NULL) {
     612           0 :                         msg = createException(MAL, "pyapi3.eval",
     613             :                                                                   SQLSTATE(PY000) "Error while parsing Python code.");
     614             :                 }
     615           0 :                 goto wrapup;
     616             :         }
     617             : 
     618             :         /*[CONVERT_BAT]*/
     619             :         // Now we will do the input handling (aka converting the input BATs to numpy
     620             :         // arrays)
     621             :         // We will put the python arrays in a PyTuple object, we will use this
     622             :         // PyTuple object as the set of arguments to call the Python function
     623         143 :         pArgs = PyTuple_New(argcount - (pci->retc + 2) +
     624         143 :                                                 (code_object == NULL ? additional_columns : 0));
     625         143 :         pColumns = PyDict_New();
     626         143 :         pColumnTypes = PyDict_New();
     627             : #ifdef HAVE_FORK
     628         143 :         pConnection = Py_Connection_Create(cntxt, !allow_loopback, query_ptr, query_sem);
     629             : #else
     630             :         pConnection = Py_Connection_Create(cntxt, !allow_loopback, 0, 0);
     631             : #endif
     632             : 
     633             :         // Now we will loop over the input BATs and convert them to python objects
     634         498 :         for (i = pci->retc + 2; i < argcount; i++) {
     635             :                 PyObject *result_array;
     636             :                 // t_start and t_end hold the part of the BAT we will convert to a Numpy
     637             :                 // array, by default these hold the entire BAT [0 - BATcount(b)]
     638         355 :                 size_t t_start = 0, t_end = pyinput_values[i - (pci->retc + 2)].count;
     639             : 
     640             :                 // There are two possibilities, either the input is a BAT, or the input
     641             :                 // is a scalar
     642             :                 // If the input is a scalar we will convert it to a python scalar
     643             :                 // If the input is a BAT, we will convert it to a numpy array
     644         355 :                 if (pyinput_values[i - (pci->retc + 2)].scalar) {
     645          48 :                         result_array = PyArrayObject_FromScalar(
     646             :                                 &pyinput_values[i - (pci->retc + 2)], &msg);
     647             :                 } else {
     648         307 :                         int type = pyinput_values[i - (pci->retc + 2)].bat_type;
     649         307 :                         result_array = PyMaskedArray_FromBAT(
     650             :                                 &pyinput_values[i - (pci->retc + 2)], t_start, t_end, &msg,
     651         307 :                                 !enable_zerocopy_input && type != TYPE_void);
     652             :                 }
     653         355 :                 if (result_array == NULL) {
     654           0 :                         if (msg == MAL_SUCCEED) {
     655           0 :                                 msg = createException(MAL, "pyapi3.eval",
     656             :                                                                           SQLSTATE(PY000) "Failed to create Numpy Array from BAT.");
     657             :                         }
     658           0 :                         goto wrapup;
     659             :                 }
     660         355 :                 if (code_object == NULL) {
     661         355 :                         PyObject *arg_type = PyString_FromString(
     662         355 :                                 BatType_Format(pyinput_values[i - (pci->retc + 2)].bat_type));
     663         355 :                         PyDict_SetItemString(pColumns, args[i], result_array);
     664         355 :                         PyDict_SetItemString(pColumnTypes, args[i], arg_type);
     665             :                         Py_DECREF(arg_type);
     666             :                 }
     667         355 :                 pyinput_values[i - (pci->retc + 2)].result = result_array;
     668         355 :                 PyTuple_SetItem(pArgs, ai++, result_array);
     669             :         }
     670         143 :         if (code_object == NULL) {
     671         143 :                 PyTuple_SetItem(pArgs, ai++, pColumns);
     672         143 :                 PyTuple_SetItem(pArgs, ai++, pColumnTypes);
     673         143 :                 PyTuple_SetItem(pArgs, ai++, pConnection);
     674             :         }
     675             : 
     676             :         /*[EXECUTE_CODE]*/
     677             :         // Now it is time to actually execute the python code
     678             :         {
     679             :                 PyObject *pFunc, *pModule, *v, *d;
     680             : 
     681             :                 // First we will load the main module, this is required
     682         143 :                 pModule = PyImport_AddModule("__main__");
     683         143 :                 if (!pModule) {
     684           0 :                         msg = PyError_CreateException("Failed to load module", NULL);
     685          15 :                         goto wrapup;
     686             :                 }
     687             : 
     688             :                 // Now we will add the UDF to the main module
     689         143 :                 d = PyModule_GetDict(pModule);
     690         143 :                 if (code_object == NULL) {
     691         143 :                         v = PyRun_StringFlags(pycall, Py_file_input, d, d, NULL);
     692         143 :                         if (v == NULL) {
     693           2 :                                 msg = PyError_CreateException("Could not parse Python code",
     694             :                                                                                           pycall);
     695           2 :                                 goto wrapup;
     696             :                         }
     697             :                         Py_DECREF(v);
     698             : 
     699             :                         // Now we need to obtain a pointer to the function, the function is
     700             :                         // called "pyfun"
     701         141 :                         pFunc = PyObject_GetAttrString(pModule, "pyfun");
     702         141 :                         if (!pFunc || !PyCallable_Check(pFunc)) {
     703           0 :                                 msg = PyError_CreateException("Failed to load function", NULL);
     704           0 :                                 goto wrapup;
     705             :                         }
     706             :                 } else {
     707           0 :                         pFunc = PyFunction_New(code_object, d);
     708           0 :                         if (!pFunc || !PyCallable_Check(pFunc)) {
     709           0 :                                 msg = PyError_CreateException("Failed to load function", NULL);
     710           0 :                                 goto wrapup;
     711             :                         }
     712             :                 }
     713             : 
     714         141 :                 if (parallel_aggregation) {
     715             :                         // parallel aggregation, we run the function once for every group in
     716             :                         // parallel
     717             :                         BAT *aggr_group = NULL, *group_first_occurrence = NULL;
     718             :                         size_t group_count, elements, element_it, group_it;
     719           5 :                         size_t *group_counts = NULL;
     720             :                         oid *aggr_group_arr = NULL;
     721           5 :                         void ***split_bats = NULL;
     722           5 :                         int named_columns = unnamedArgs - (pci->retc + 2);
     723             :                         PyObject *aggr_result;
     724             : 
     725             :                         // release the GIL
     726           5 :                         gstate = Python_ReleaseGIL(gstate);
     727             : 
     728             :                         // the first unnamed argument has the group numbers for every row
     729             :                         aggr_group =
     730           5 :                                 BATdescriptor(*getArgReference_bat(stk, pci, unnamedArgs));
     731             :                         // the second unnamed argument has the first occurrence of every
     732             :                         // group number, we just use this to get the total amount of groups
     733             :                         // quickly
     734             :                         group_first_occurrence =
     735           5 :                                 BATdescriptor(*getArgReference_bat(stk, pci, unnamedArgs + 1));
     736           5 :                         group_count = BATcount(group_first_occurrence);
     737           5 :                         BBPunfix(group_first_occurrence->batCacheid);
     738           5 :                         elements = BATcount(aggr_group); // get the amount of groups
     739             : 
     740             :                         // now we count, for every group, how many elements it has
     741           5 :                         group_counts = GDKzalloc(group_count * sizeof(size_t));
     742           5 :                         if (group_counts == NULL) {
     743           0 :                                 msg = createException(MAL, "pyapi3.eval",
     744             :                                                                           SQLSTATE(HY013) MAL_MALLOC_FAIL " group count array.");
     745           0 :                                 goto aggrwrapup;
     746             :                         }
     747             : 
     748           5 :                         if (BATtvoid(aggr_group)) {
     749           2 :                                 for (element_it = 0; element_it < elements; element_it++) {
     750           1 :                                         group_counts[element_it]++;
     751             :                                 }
     752             :                         } else {
     753           4 :                                 aggr_group_arr = (oid *)aggr_group->theap->base + aggr_group->tbaseoff;
     754          32 :                                 for (element_it = 0; element_it < elements; element_it++) {
     755          28 :                                         group_counts[aggr_group_arr[element_it]]++;
     756             :                                 }
     757             :                         }
     758             : 
     759             :                         // now perform the actual splitting of the data, first construct
     760             :                         // room for splits for every group
     761             :                         // elements are structured as follows:
     762             :                         // split_bats [groupnr] [columnnr] [elementnr]
     763           5 :                         split_bats = GDKzalloc(group_count * sizeof(void *));
     764          22 :                         for (group_it = 0; group_it < group_count; group_it++) {
     765          17 :                                 split_bats[group_it] =
     766          17 :                                         GDKzalloc(sizeof(void *) * named_columns);
     767             :                         }
     768             : 
     769             :                         // now split the columns one by one
     770          10 :                         for (i = 0; i < named_columns; i++) {
     771           5 :                                 PyInput input = pyinput_values[i];
     772           5 :                                 if (!input.scalar) {
     773           5 :                                         BATiter bi = bat_iterator(input.bat);
     774           5 :                                         void *basevals = bi.base;
     775             : 
     776           5 :                                         switch (input.bat_type) {
     777           0 :                                                 case TYPE_void:
     778           0 :                                                         NP_SPLIT_BAT(oid);
     779           0 :                                                         break;
     780           0 :                                                 case TYPE_bit:
     781           0 :                                                         NP_SPLIT_BAT(bit);
     782           0 :                                                         break;
     783           0 :                                                 case TYPE_bte:
     784           0 :                                                         NP_SPLIT_BAT(bte);
     785           0 :                                                         break;
     786           0 :                                                 case TYPE_sht:
     787           0 :                                                         NP_SPLIT_BAT(sht);
     788           0 :                                                         break;
     789           2 :                                                 case TYPE_int:
     790          16 :                                                         NP_SPLIT_BAT(int);
     791           2 :                                                         break;
     792           0 :                                                 case TYPE_oid:
     793           0 :                                                         NP_SPLIT_BAT(oid);
     794           0 :                                                         break;
     795           0 :                                                 case TYPE_lng:
     796           0 :                                                         NP_SPLIT_BAT(lng);
     797           0 :                                                         break;
     798           0 :                                                 case TYPE_flt:
     799           0 :                                                         NP_SPLIT_BAT(flt);
     800           0 :                                                         break;
     801           2 :                                                 case TYPE_dbl:
     802          24 :                                                         NP_SPLIT_BAT(dbl);
     803           2 :                                                         break;
     804             : #ifdef HAVE_HGE
     805           0 :                                                 case TYPE_hge:
     806             :                                                         basevals =
     807             :                                                                 PyArray_BYTES((PyArrayObject *)input.result);
     808           0 :                                                         NP_SPLIT_BAT(dbl);
     809           0 :                                                         break;
     810             : #endif
     811           1 :                                                 case TYPE_str: {
     812           1 :                                                         PyObject ****ptr = (PyObject ****)split_bats;
     813             :                                                         size_t *temp_indices;
     814             :                                                         PyObject **batcontent = (PyObject **)PyArray_DATA(
     815             :                                                                 (PyArrayObject *)input.result);
     816             :                                                         // allocate space for split BAT
     817           5 :                                                         for (group_it = 0; group_it < group_count;
     818           4 :                                                                  group_it++) {
     819           4 :                                                                 ptr[group_it][i] =
     820           4 :                                                                         GDKzalloc(group_counts[group_it] *
     821             :                                                                                           sizeof(PyObject *));
     822             :                                                         }
     823             :                                                         // iterate over the elements of the current BAT
     824             :                                                         temp_indices =
     825           1 :                                                                 GDKzalloc(sizeof(PyObject *) * group_count);
     826           1 :                                                         if (BATtvoid(aggr_group)) {
     827           0 :                                                                 for (element_it = 0; element_it < elements;
     828           0 :                                                                          element_it++) {
     829             :                                                                         // append current element to proper group
     830           0 :                                                                         ptr[element_it][i][temp_indices[element_it]++] =
     831           0 :                                                                                 batcontent[element_it];
     832             :                                                                 }
     833             :                                                         } else {
     834           8 :                                                                 for (element_it = 0; element_it < elements;
     835           7 :                                                                          element_it++) {
     836             :                                                                         // group of current element
     837           7 :                                                                         oid group = aggr_group_arr[element_it];
     838             :                                                                         // append current element to proper group
     839           7 :                                                                         ptr[group][i][temp_indices[group]++] =
     840           7 :                                                                                 batcontent[element_it];
     841             :                                                                 }
     842             :                                                         }
     843           1 :                                                         GDKfree(temp_indices);
     844           1 :                                                         break;
     845             :                                                 }
     846           0 :                                                 default:
     847           0 :                                                         msg = createException(
     848             :                                                                 MAL, "pyapi3.eval", SQLSTATE(PY000) "Unrecognized BAT type %s",
     849             :                                                                 BatType_Format(input.bat_type));
     850           0 :                                                         bat_iterator_end(&bi);
     851           0 :                                                         goto aggrwrapup;
     852             :                                         }
     853           5 :                                         bat_iterator_end(&bi);
     854             :                                 }
     855             :                         }
     856             : 
     857             :                         {
     858             :                                 int res = 0;
     859             :                                 size_t threads = 8; // GDKgetenv("gdk_nr_threads");
     860             :                                 size_t thread_it;
     861             :                                 size_t result_it;
     862             :                                 AggrParams *parameters;
     863             :                                 PyObject **results;
     864             :                                 double current = 0.0;
     865             :                                 double increment;
     866             : 
     867             :                                 // if there are less groups than threads, limit threads to
     868             :                                 // amount of groups
     869           5 :                                 threads = group_count < threads ? group_count : threads;
     870             : 
     871           5 :                                 increment = (double)group_count / (double)threads;
     872             :                                 // start running the threads
     873           5 :                                 parameters = GDKzalloc(threads * sizeof(AggrParams));
     874           5 :                                 results = GDKzalloc(group_count * sizeof(PyObject *));
     875          22 :                                 for (thread_it = 0; thread_it < threads; thread_it++) {
     876          17 :                                         AggrParams *params = &parameters[thread_it];
     877          17 :                                         params->named_columns = named_columns;
     878          17 :                                         params->additional_columns = additional_columns;
     879          17 :                                         params->group_count = group_count;
     880          17 :                                         params->group_counts = &group_counts;
     881          17 :                                         params->pyinput_values = &pyinput_values;
     882          17 :                                         params->column_types_dict = &pColumnTypes;
     883          17 :                                         params->split_bats = &split_bats;
     884          17 :                                         params->base = pci->retc + 2;
     885          17 :                                         params->function = &pFunc;
     886          17 :                                         params->connection = &pConnection;
     887          17 :                                         params->pycall = &pycall;
     888          17 :                                         params->group_start = (size_t)floor(current);
     889          17 :                                         params->group_end = (size_t)floor(current += increment);
     890          17 :                                         params->args = &args;
     891          17 :                                         params->msg = NULL;
     892          17 :                                         params->result_objects = results;
     893          17 :                                         res = MT_create_thread(&params->thread,
     894             :                                                                                    (void (*)(void *)) &
     895             :                                                                                            ComputeParallelAggregation,
     896             :                                                                                    params, MT_THR_JOINABLE,
     897             :                                                                                    "pyapi_par_aggr");
     898          17 :                                         if (res != 0) {
     899           0 :                                                 msg = createException(MAL, "pyapi3.eval",
     900             :                                                                                           SQLSTATE(PY000) "Failed to start thread.");
     901           0 :                                                 goto aggrwrapup;
     902             :                                         }
     903             :                                 }
     904          22 :                                 for (thread_it = 0; thread_it < threads; thread_it++) {
     905          17 :                                         AggrParams params = parameters[thread_it];
     906          17 :                                         int res = MT_join_thread(params.thread);
     907          17 :                                         if (res != 0) {
     908           0 :                                                 msg = createException(MAL, "pyapi3.eval",
     909             :                                                                                           "Failed to join thread.");
     910           0 :                                                 goto aggrwrapup;
     911             :                                         }
     912             :                                 }
     913             : 
     914          22 :                                 for (thread_it = 0; thread_it < threads; thread_it++) {
     915          17 :                                         AggrParams params = parameters[thread_it];
     916          17 :                                         if (results[thread_it] == NULL || params.msg != NULL) {
     917           0 :                                                 msg = params.msg;
     918           0 :                                                 goto wrapup;
     919             :                                         }
     920             :                                 }
     921             : 
     922             :                                 // we need the GIL again to group the parameters
     923           5 :                                 gstate = Python_ObtainGIL();
     924             : 
     925           5 :                                 aggr_result = PyList_New(group_count);
     926          22 :                                 for (result_it = 0; result_it < group_count; result_it++) {
     927          17 :                                         PyList_SetItem(aggr_result, result_it, results[result_it]);
     928             :                                 }
     929           5 :                                 GDKfree(parameters);
     930           5 :                                 GDKfree(results);
     931             :                         }
     932           5 :                         pResult = PyList_New(1);
     933           5 :                         PyList_SetItem(pResult, 0, aggr_result);
     934             : 
     935           5 :                 aggrwrapup:
     936           5 :                         if (group_counts != NULL) {
     937           5 :                                 GDKfree(group_counts);
     938             :                         }
     939           5 :                         if (split_bats != NULL) {
     940          22 :                                 for (group_it = 0; group_it < group_count; group_it++) {
     941          17 :                                         if (split_bats[group_it] != NULL) {
     942          34 :                                                 for (i = 0; i < named_columns; i++) {
     943          17 :                                                         if (split_bats[group_it][i] != NULL) {
     944          17 :                                                                 GDKfree(split_bats[group_it][i]);
     945             :                                                         }
     946             :                                                 }
     947          17 :                                                 GDKfree(split_bats[group_it]);
     948             :                                         }
     949             :                                 }
     950           5 :                                 GDKfree(split_bats);
     951             :                         }
     952             :                         if (aggr_group != NULL) {
     953           5 :                                 BBPunfix(aggr_group->batCacheid);
     954             :                         }
     955           5 :                         if (msg != MAL_SUCCEED) {
     956           0 :                                 goto wrapup;
     957             :                         }
     958             :                 } else {
     959             :                         // The function has been successfully created/compiled, all that
     960             :                         // remains is to actually call the function
     961         136 :                         pResult = PyObject_CallObject(pFunc, pArgs);
     962             :                 }
     963             : 
     964         141 :                 Py_DECREF(pFunc);
     965             :                 Py_DECREF(pArgs);
     966             : 
     967         141 :                 if (PyErr_Occurred()) {
     968          10 :                         msg = PyError_CreateException("Python exception", pycall);
     969          10 :                         if (code_object == NULL) {
     970          10 :                                 PyRun_SimpleString("del pyfun");
     971             :                         }
     972          10 :                         goto wrapup;
     973             :                 }
     974             : 
     975             :                 // if (code_object == NULL) { PyRun_SimpleString("del pyfun"); }
     976             : 
     977         131 :                 if (PyDict_Check(pResult)) { // Handle dictionary returns
     978             :                         // For dictionary returns we need to map each of the (key,value)
     979             :                         // pairs to the proper return value
     980             :                         // We first analyze the SQL Function structure for a list of return
     981             :                         // value names
     982             :                         char **retnames = NULL;
     983          35 :                         if (!varres) {
     984          35 :                                 if (sqlfun != NULL) {
     985          35 :                                         retnames = GDKzalloc(sizeof(char *) * sqlfun->res->cnt);
     986          35 :                                         argnode = sqlfun->res->h;
     987         122 :                                         for (i = 0; i < sqlfun->res->cnt; i++) {
     988          87 :                                                 retnames[i] = ((sql_arg *)argnode->data)->name;
     989          87 :                                                 argnode = argnode->next;
     990             :                                         }
     991             :                                 } else {
     992           0 :                                         msg = createException(MAL, "pyapi3.eval",
     993             :                                                                                   SQLSTATE(PY000) "Return value is a dictionary, but "
     994             :                                                                                   "there is no sql function object, so "
     995             :                                                                                   "we don't know the return value "
     996             :                                                                                   "names and mapping cannot be done.");
     997           0 :                                         goto wrapup;
     998             :                                 }
     999             :                         } else {
    1000             :                                 // If there are a variable number of return types, we take the
    1001             :                                 // column names from the dictionary
    1002           0 :                                 PyObject *keys = PyDict_Keys(pResult);
    1003           0 :                                 retcols = (int)PyList_Size(keys);
    1004           0 :                                 retnames = GDKzalloc(sizeof(char *) * retcols);
    1005           0 :                                 for (i = 0; i < retcols; i++) {
    1006           0 :                                         PyObject *colname = PyList_GetItem(keys, i);
    1007           0 :                                         if (!PyString_CheckExact(colname)) {
    1008           0 :                                                 msg = createException(MAL, "pyapi3.eval",
    1009             :                                                                                           SQLSTATE(PY000) "Expected a string key in the "
    1010             :                                                                                           "dictionary, but received an "
    1011             :                                                                                           "object of type %s",
    1012             :                                                                                           colname->ob_type->tp_name);
    1013           0 :                                                 goto wrapup;
    1014             :                                         }
    1015           0 :                                         retnames[i] = (char *) PyUnicode_AsUTF8(colname);
    1016             :                                 }
    1017             :                         }
    1018             :                         pResult =
    1019          35 :                                 PyDict_CheckForConversion(pResult, retcols, retnames, &msg);
    1020          35 :                         if (retnames != NULL)
    1021          35 :                                 GDKfree(retnames);
    1022          96 :                 } else if (varres) {
    1023           0 :                         msg = createException(MAL, "pyapi3.eval",
    1024             :                                                                   SQLSTATE(PY000) "Expected a variable number return values, "
    1025             :                                                                   "but the return type was not a dictionary. "
    1026             :                                                                   "We require the return type to be a "
    1027             :                                                                   "dictionary for column naming purposes.");
    1028           0 :                         goto wrapup;
    1029             :                 } else {
    1030             :                         // Now we need to do some error checking on the result object,
    1031             :                         // because the result object has to have the correct type/size
    1032             :                         // We will also do some converting of result objects to a common
    1033             :                         // type (such as scalar -> [[scalar]])
    1034          96 :                         pResult = PyObject_CheckForConversion(pResult, retcols, NULL, &msg);
    1035             :                 }
    1036         131 :                 if (pResult == NULL) {
    1037           3 :                         goto wrapup;
    1038             :                 }
    1039             :         }
    1040             : 
    1041         128 :         if (varres) {
    1042           0 :                 GDKfree(pyreturn_values);
    1043           0 :                 pyreturn_values = GDKzalloc(retcols * sizeof(PyReturn));
    1044             :         }
    1045             : 
    1046             :         // Now we have executed the Python function, we have to collect the return
    1047             :         // values and convert them to BATs
    1048             :         // We will first collect header information about the Python return objects
    1049             :         // and extract the underlying C arrays
    1050             :         // We will store this header information in a PyReturn object
    1051             : 
    1052             :         // The reason we are doing this as a separate step is because this
    1053             :         // preprocessing requires us to call the Python API
    1054             :         // Whereas the actual returning does not require us to call the Python API
    1055             :         // This means we can do the actual returning without holding the GIL
    1056         128 :         if (!PyObject_PreprocessObject(pResult, pyreturn_values, retcols, &msg)) {
    1057           0 :                 goto wrapup;
    1058             :         }
    1059             : 
    1060             : #ifdef HAVE_FORK
    1061             :         /*[FORKED]*/
    1062             :         // This is where the child process stops executing
    1063             :         // We have successfully executed the Python function and converted the
    1064             :         // result object to a C array
    1065             :         // Now all that is left is to copy the C array to shared memory so the main
    1066             :         // process can read it and return it
    1067         128 :         if (mapped && child_process) {
    1068             :                 ReturnBatDescr *ptr;
    1069             : 
    1070             :                 // First we will fill in the header information, we will need to get a
    1071             :                 // pointer to the header data first
    1072             :                 // The main process has already created the header data for the child
    1073             :                 // process
    1074           0 :                 if ((mmap_ptrs[0] = GDKinitmmap(mmap_id + 0, memory_size, &mmap_sizes[0])) == NULL) {
    1075           0 :                         msg = createException(MAL, "pyapi3.eval", GDK_EXCEPTION);
    1076           0 :                         goto wrapup;
    1077             :                 }
    1078             : 
    1079             :                 // Now we will write data about our result (memory size, type, number of
    1080             :                 // elements) to the header
    1081             :                 ptr = (ReturnBatDescr *)mmap_ptrs[0];
    1082           0 :                 for (i = 0; i < retcols; i++) {
    1083           0 :                         PyReturn *ret = &pyreturn_values[i];
    1084           0 :                         ReturnBatDescr *descr = &ptr[i];
    1085             : 
    1086           0 :                         if (ret->result_type == NPY_OBJECT) {
    1087             :                                 // We can't deal with NPY_OBJECT arrays, because these are
    1088             :                                 // 'arrays of pointers', so we can't just copy the content of
    1089             :                                 // the array into shared memory
    1090             :                                 // So if we're dealing with a NPY_OBJECT array, we convert them
    1091             :                                 // to a Numpy Array of type NPY_<TYPE> that corresponds with the
    1092             :                                 // desired BAT type
    1093             :                                 // WARNING: Because we could be converting to a NPY_STRING or
    1094             :                                 // NPY_UNICODE array (if the desired type is TYPE_str or
    1095             :                                 // TYPE_hge), this means that memory usage can explode
    1096             :                                 //   because NPY_STRING/NPY_UNICODE arrays are 2D string arrays
    1097             :                                 //   with fixed string length (so if there's one very large
    1098             :                                 //   string the size explodes quickly)
    1099             :                                 //   if someone has some problem with memory size exploding when
    1100             :                                 //   using PYTHON_MAP but it being fine in regular PYTHON this
    1101             :                                 //   is probably the issue
    1102           0 :                                 PyInput *inp = &pyinput_values[i - (pci->retc + 2)];
    1103           0 :                                 int bat_type = inp->bat_type;
    1104           0 :                                 PyObject *new_array = PyArray_FromAny(
    1105             :                                         ret->numpy_array,
    1106           0 :                                         PyArray_DescrFromType(BatType_ToPyType(bat_type)), 1, 1,
    1107             :                                         NPY_ARRAY_CARRAY | NPY_ARRAY_FORCECAST, NULL);
    1108           0 :                                 if (new_array == NULL) {
    1109           0 :                                         msg = createException(MAL, "pyapi3.eval",
    1110             :                                                                                   SQLSTATE(PY000) "Could not convert the returned "
    1111             :                                                                                   "NPY_OBJECT array to the desired "
    1112             :                                                                                   "array of type %s.\n",
    1113             :                                                                                   BatType_Format(bat_type));
    1114           0 :                                         goto wrapup;
    1115             :                                 }
    1116           0 :                                 Py_DECREF(ret->numpy_array); // do we really care about cleaning
    1117             :                                                                                          // this up, considering this only
    1118             :                                                                                          // happens in a separate process
    1119             :                                                                                          // that will be exited soon anyway?
    1120           0 :                                 ret->numpy_array = new_array;
    1121           0 :                                 ret->result_type =
    1122           0 :                                         PyArray_DESCR((PyArrayObject *)ret->numpy_array)->type_num;
    1123           0 :                                 ret->memory_size =
    1124           0 :                                         PyArray_DESCR((PyArrayObject *)ret->numpy_array)->elsize;
    1125           0 :                                 ret->count = PyArray_DIMS((PyArrayObject *)ret->numpy_array)[0];
    1126           0 :                                 ret->array_data =
    1127             :                                         PyArray_DATA((PyArrayObject *)ret->numpy_array);
    1128             :                         }
    1129             : 
    1130           0 :                         descr->npy_type = ret->result_type;
    1131           0 :                         descr->element_size = ret->memory_size;
    1132           0 :                         descr->bat_count = ret->count;
    1133           0 :                         descr->bat_size = ret->memory_size * ret->count;
    1134           0 :                         descr->has_mask = ret->mask_data != NULL;
    1135             : 
    1136           0 :                         if (ret->count > 0) {
    1137           0 :                                 int memory_size = ret->memory_size * ret->count;
    1138             :                                 char *mem_ptr;
    1139             :                                 // now create shared memory for the return value and copy the
    1140             :                                 // actual values
    1141           0 :                                 assert(memory_size > 0);
    1142           0 :                                 if ((mmap_ptrs[i + 3] = GDKinitmmap(mmap_id + i + 3, memory_size,
    1143           0 :                                                                                                         &mmap_sizes[i + 3])) == NULL) {
    1144           0 :                                         msg = createException(MAL, "pyapi3.eval", GDK_EXCEPTION);
    1145           0 :                                         goto wrapup;
    1146             :                                 }
    1147             :                                 mem_ptr = mmap_ptrs[i + 3];
    1148             :                                 assert(mem_ptr);
    1149           0 :                                 memcpy(mem_ptr, PyArray_DATA((PyArrayObject *)ret->numpy_array),
    1150             :                                            memory_size);
    1151             : 
    1152           0 :                                 if (descr->has_mask) {
    1153             :                                         bool *mask_ptr;
    1154           0 :                                         int mask_size = ret->count * sizeof(bool);
    1155           0 :                                         assert(mask_size > 0);
    1156             :                                         // create a memory space for the mask
    1157           0 :                                         if ((mmap_ptrs[retcols + (i + 3)] = GDKinitmmap(
    1158           0 :                                                          mmap_id + retcols + (i + 3), mask_size,
    1159           0 :                                                          &mmap_sizes[retcols + (i + 3)])) == NULL) {
    1160           0 :                                                 msg = createException(MAL, "pyapi3.eval", GDK_EXCEPTION);
    1161           0 :                                                 goto wrapup;
    1162             :                                         }
    1163           0 :                                         mask_ptr = mmap_ptrs[retcols + i + 3];
    1164           0 :                                         assert(mask_ptr);
    1165           0 :                                         memcpy(mask_ptr, ret->mask_data, mask_size);
    1166             :                                 }
    1167             :                         }
    1168             :                 }
    1169             :                 // now free the main process from the semaphore
    1170           0 :                 if (GDKchangesemval(query_sem, 0, 1) != GDK_SUCCEED) {
    1171           0 :                         msg = createException(MAL, "pyapi3.eval", GDK_EXCEPTION);
    1172           0 :                         goto wrapup;
    1173             :                 }
    1174             :                 // Exit child process without an error code
    1175           0 :                 exit(0);
    1176             :         }
    1177             : #endif
    1178             :         // We are done executing Python code (aside from cleanup), so we can release
    1179             :         // the GIL
    1180         128 :         gstate = Python_ReleaseGIL(gstate);
    1181             : 
    1182             : #ifdef HAVE_FORK // This goto is only used for multiprocessing, if HAVE_FORK is
    1183             :                                  // set to 0 this is unused
    1184         128 : returnvalues:
    1185             : #endif
    1186             :         /*[RETURN_VALUES]*/
    1187         128 :         argnode = sqlfun && sqlfun->res ? sqlfun->res->h : NULL;
    1188         304 :         for (i = 0; i < retcols; i++) {
    1189         176 :                 PyReturn *ret = &pyreturn_values[i];
    1190             :                 int bat_type = TYPE_any;
    1191             :                 sql_subtype *sql_subtype =
    1192         176 :                         argnode ? &((sql_arg *)argnode->data)->type : NULL;
    1193         176 :                 if (!varres) {
    1194         176 :                         bat_type = getBatType(getArgType(mb, pci, i));
    1195             : 
    1196         176 :                         if (bat_type == TYPE_any || bat_type == TYPE_void) {
    1197           0 :                                 bat_type = PyType_ToBat(ret->result_type);
    1198           0 :                                 getArgType(mb, pci, i) = bat_type;
    1199             :                         }
    1200             :                 } else {
    1201           0 :                         bat_type = PyType_ToBat(ret->result_type);
    1202             :                 }
    1203             : 
    1204         176 :                 b = PyObject_ConvertToBAT(ret, sql_subtype, bat_type, i, seqbase, &msg,
    1205         176 :                                                                   !enable_zerocopy_output);
    1206         176 :                 if (b == NULL) {
    1207           0 :                         goto wrapup;
    1208             :                 }
    1209             : 
    1210         176 :                 msg = MAL_SUCCEED;
    1211         176 :                 if (isaBatType(getArgType(mb, pci, i))) {
    1212         158 :                         *getArgReference_bat(stk, pci, i) = b->batCacheid;
    1213         158 :                         BBPkeepref(b->batCacheid);
    1214             :                 } else { // single value return, only for non-grouped aggregations
    1215          18 :                         BATiter li = bat_iterator(b);
    1216          18 :                         if (bat_type != TYPE_str) {
    1217          18 :                                 if (VALinit(&stk->stk[pci->argv[i]], bat_type, li.base) ==
    1218             :                                         NULL)
    1219           0 :                                         msg = createException(MAL, "pyapi3.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL);
    1220             :                         } else {
    1221           0 :                                 if (VALinit(&stk->stk[pci->argv[i]], bat_type,
    1222           0 :                                                         BUNtail(li, 0)) == NULL)
    1223           0 :                                         msg = createException(MAL, "pyapi3.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL);
    1224             :                         }
    1225          18 :                         bat_iterator_end(&li);
    1226             :                 }
    1227         176 :                 if (argnode) {
    1228         176 :                         argnode = argnode->next;
    1229             :                 }
    1230             :         }
    1231         128 : wrapup:
    1232             : 
    1233             : #ifdef HAVE_FORK
    1234         148 :         if (mapped && child_process) {
    1235             :                 // If we get here, something went wrong in a child process
    1236             :                 char *error_mem;
    1237             :                 ReturnBatDescr *ptr;
    1238             : 
    1239             :                 // Now we exit the program with an error code
    1240           0 :                 if (GDKchangesemval(query_sem, 0, 1) != GDK_SUCCEED) {
    1241           0 :                         exit(1);
    1242             :                 }
    1243             : 
    1244           0 :                 assert(memory_size > 0);
    1245           0 :                 if ((mmap_ptrs[0] = GDKinitmmap(mmap_id + 0, memory_size, &mmap_sizes[0])) == NULL) {
    1246           0 :                         exit(1);
    1247             :                 }
    1248             : 
    1249             :                 // To indicate that we failed, we will write information to our header
    1250             :                 ptr = (ReturnBatDescr *)mmap_ptrs[0];
    1251           0 :                 for (i = 0; i < retcols; i++) {
    1252           0 :                         ReturnBatDescr *descr = &ptr[i];
    1253             :                         // We will write descr->npy_type to -1, so other processes can see
    1254             :                         // that we failed
    1255           0 :                         descr->npy_type = -1;
    1256             :                         // We will write the memory size of our error message to the
    1257             :                         // bat_size, so the main process can access the shared memory
    1258           0 :                         descr->bat_size = (strlen(msg) + 1) * sizeof(char);
    1259             :                 }
    1260             : 
    1261             :                 // Now create the shared memory to write our error message to
    1262             :                 // We can simply use the slot mmap_id + 3, even though this is normally
    1263             :                 // used for query return values
    1264             :                 // This is because, if the process fails, no values will be returned
    1265           0 :                 if ((error_mem = GDKinitmmap(mmap_id + 3,
    1266           0 :                                                                          (strlen(msg) + 1) * sizeof(char),
    1267             :                                                                          NULL)) == NULL) {
    1268           0 :                         exit(1);
    1269             :                 }
    1270           0 :                 strcpy(error_mem, msg);
    1271           0 :                 exit(1);
    1272             :         }
    1273             : #endif
    1274             : 
    1275             : #ifdef HAVE_FORK
    1276         148 :         if (holds_gil) {
    1277         141 :                 MT_lock_set(&pyapiLock);
    1278         141 :                 python_call_active = false;
    1279         141 :                 MT_lock_unset(&pyapiLock);
    1280             :         }
    1281             : 
    1282         148 :         if (mapped) {
    1283           0 :                 for (i = 0; i < retcols; i++) {
    1284           0 :                         PyReturn *ret = &pyreturn_values[i];
    1285           0 :                         if (ret->mmap_id < 0) {
    1286             :                                 // if we directly give the mmap file to a BAT, don't delete the
    1287             :                                 // MMAP file
    1288           0 :                                 mmap_ptrs[i + 3] = NULL;
    1289             :                         }
    1290             :                 }
    1291           0 :                 for (i = 0; i < 3 + pci->retc * 2; i++) {
    1292           0 :                         if (mmap_ptrs[i] != NULL) {
    1293           0 :                                 GDKreleasemmap(mmap_ptrs[i], mmap_sizes[i], mmap_id + i);
    1294             :                         }
    1295             :                 }
    1296           0 :                 if (mmap_ptrs)
    1297           0 :                         GDKfree(mmap_ptrs);
    1298           0 :                 if (mmap_sizes)
    1299           0 :                         GDKfree(mmap_sizes);
    1300           0 :                 if (query_sem > 0) {
    1301           0 :                         GDKreleasesem(query_sem);
    1302             :                 }
    1303             :         }
    1304             : #endif
    1305             :         // Actual cleanup
    1306             :         // Cleanup input BATs
    1307         526 :         for (i = pci->retc + 2; i < pci->argc; i++) {
    1308         378 :                 PyInput *inp = &pyinput_values[i - (pci->retc + 2)];
    1309         378 :                 if (inp->bat != NULL)
    1310         312 :                         BBPunfix(inp->bat->batCacheid);
    1311             :         }
    1312         148 :         if (pResult != NULL && gstate == 0) {
    1313             :                 // if there is a pResult here, we are running single threaded (LANGUAGE
    1314             :                 // PYTHON),
    1315             :                 // thus we need to free python objects, thus we need to obtain the GIL
    1316         128 :                 gstate = Python_ObtainGIL();
    1317             :         }
    1318         357 :         for (i = 0; i < retcols; i++) {
    1319         209 :                 PyReturn *ret = &pyreturn_values[i];
    1320             :                 // First clean up any return values
    1321         209 :                 if (!ret->multidimensional) {
    1322             :                         // Clean up numpy arrays, if they are there
    1323         209 :                         if (ret->numpy_array != NULL) {
    1324             :                                 Py_DECREF(ret->numpy_array);
    1325             :                         }
    1326         209 :                         if (ret->numpy_mask != NULL) {
    1327             :                                 Py_DECREF(ret->numpy_mask);
    1328             :                         }
    1329             :                 }
    1330             :         }
    1331         148 :         if (pResult != NULL) {
    1332             :                 Py_DECREF(pResult);
    1333             :         }
    1334         148 :         if (gstate != 0) {
    1335         143 :                 gstate = Python_ReleaseGIL(gstate);
    1336             :         }
    1337             : 
    1338             :         // Now release some GDK memory we allocated for strings and input values
    1339         148 :         GDKfree(pyreturn_values);
    1340         148 :         GDKfree(pyinput_values);
    1341        1031 :         for (i = 0; i < pci->argc; i++)
    1342         883 :                 if (args[i])
    1343         363 :                         GDKfree(args[i]);
    1344         148 :         GDKfree(args);
    1345         148 :         GDKfree(pycall);
    1346         148 :         if (freeexprStr)
    1347           1 :                 GDKfree(exprStr);
    1348             : 
    1349         148 :         return msg;
    1350             : }
    1351             : 
    1352             : str
    1353           8 : PYAPI3PyAPIprelude(void *ret) {
    1354             :         (void) ret;
    1355           8 :         MT_lock_set(&pyapiLock);
    1356           8 :         if (!pyapiInitialized) {
    1357           8 :                 wchar_t* program = Py_DecodeLocale("mserver5", NULL);
    1358           8 :                 wchar_t* argv[] = { program };
    1359             :                 str msg = MAL_SUCCEED;
    1360             :                 PyObject *tmp;
    1361           8 :                 Py_Initialize();
    1362           8 :                 PySys_SetArgvEx(1, argv, 0);
    1363           8 :                 _import_array();
    1364           8 :                 msg = _connection_init();
    1365           8 :                 if (msg != MAL_SUCCEED) {
    1366           0 :                         MT_lock_unset(&pyapiLock);
    1367           0 :                         return msg;
    1368             :                 }
    1369           8 :                 msg = _conversion_init();
    1370           8 :                 if (msg != MAL_SUCCEED) {
    1371           0 :                         MT_lock_unset(&pyapiLock);
    1372           0 :                         return msg;
    1373             :                 }
    1374           8 :                 _pytypes_init();
    1375           8 :                 _loader_init();
    1376           8 :                 tmp = PyString_FromString("marshal");
    1377           8 :                 marshal_module = PyImport_Import(tmp);
    1378             :                 Py_DECREF(tmp);
    1379           8 :                 if (marshal_module == NULL) {
    1380           0 :                         MT_lock_unset(&pyapiLock);
    1381           0 :                         return createException(MAL, "pyapi3.eval", SQLSTATE(PY000) "Failed to load Marshal module.");
    1382             :                 }
    1383           8 :                 marshal_loads = PyObject_GetAttrString(marshal_module, "loads");
    1384           8 :                 if (marshal_loads == NULL) {
    1385           0 :                         MT_lock_unset(&pyapiLock);
    1386           0 :                         return createException(MAL, "pyapi3.eval", SQLSTATE(PY000) "Failed to load function \"loads\" from Marshal module.");
    1387             :                 }
    1388           8 :                 if (PyRun_SimpleString("import numpy") != 0) {
    1389           0 :                         msg = PyError_CreateException("Failed to initialize embedded python", NULL);
    1390           0 :                         MT_lock_unset(&pyapiLock);
    1391           0 :                         return msg;
    1392             :                 }
    1393           8 :                 PyEval_SaveThread();
    1394             :                 if (msg != MAL_SUCCEED) {
    1395             :                         MT_lock_unset(&pyapiLock);
    1396             :                         return msg;
    1397             :                 }
    1398           8 :                 pyapiInitialized = true;
    1399           8 :                 fprintf(stdout, "# MonetDB/Python%d module loaded\n", 3);
    1400             :         }
    1401           8 :         MT_lock_unset(&pyapiLock);
    1402           8 :         option_disable_fork = GDKgetenv_istrue(fork_disableflag) || GDKgetenv_isyes(fork_disableflag);
    1403           8 :         return MAL_SUCCEED;
    1404             : }
    1405             : 
    1406          24 : char *PyError_CreateException(char *error_text, char *pycall)
    1407             : {
    1408          24 :         PyObject *py_error_type = NULL, *py_error_value = NULL,
    1409          24 :                          *py_error_traceback = NULL;
    1410             :         const char *py_error_string = NULL;
    1411          24 :         lng line_number = -1;
    1412             : 
    1413          24 :         PyErr_Fetch(&py_error_type, &py_error_value, &py_error_traceback);
    1414          24 :         if (py_error_value) {
    1415             :                 PyObject *error;
    1416          24 :                 PyErr_NormalizeException(&py_error_type, &py_error_value,
    1417             :                                                                  &py_error_traceback);
    1418          24 :                 error = PyObject_Str(py_error_value);
    1419             : 
    1420          24 :                 py_error_string = PyString_AS_STRING(error);
    1421          24 :                 Py_XDECREF(error);
    1422          24 :                 if (pycall != NULL && strlen(pycall) > 0) {
    1423          24 :                         if (py_error_traceback == NULL) {
    1424             :                                 // no traceback info, this means we are dealing with a parsing
    1425             :                                 // error
    1426             :                                 // line information should be in the error message
    1427           2 :                                 sscanf(py_error_string, "%*[^0-9]" LLSCN, &line_number);
    1428           2 :                                 if (line_number < 0)
    1429           0 :                                         goto finally;
    1430             :                         } else {
    1431          22 :                                 line_number =
    1432          22 :                                         ((PyTracebackObject *)py_error_traceback)->tb_lineno;
    1433             :                         }
    1434             : 
    1435             :                         // Now only display the line numbers around the error message, we
    1436             :                         // display 5 lines around the error message
    1437             :                         {
    1438             :                                 char linenr[32];
    1439             :                                 size_t nrpos, pos, i, j;
    1440             :                                 char lineinformation[5000]; // we only support 5000 characters
    1441             :                                                                                         // for 5 lines of the program,
    1442             :                                                                                         // should be enough
    1443             :                                 nrpos = 0; // Current line number
    1444             :                                 pos = 0; // Current position in the lineinformation result array
    1445        3509 :                                 for (i = 0; i < strlen(pycall); i++) {
    1446        3485 :                                         if (pycall[i] == '\n' || i == 0) {
    1447             :                                                 // Check if we have arrived at a new line, if we have
    1448             :                                                 // increment the line count
    1449         179 :                                                 nrpos++;
    1450             :                                                 // Now check if we should display this line
    1451         179 :                                                 if (nrpos >= ((size_t)line_number - 2) &&
    1452         109 :                                                         nrpos <= ((size_t)line_number + 2) && pos < 4997) {
    1453             :                                                         // We shouldn't put a newline on the first line we
    1454             :                                                         // encounter, only on subsequent lines
    1455          98 :                                                         if (nrpos > ((size_t)line_number - 2))
    1456          83 :                                                                 lineinformation[pos++] = '\n';
    1457          98 :                                                         if ((size_t)line_number == nrpos) {
    1458             :                                                                 // If this line is the 'error' line, add an
    1459             :                                                                 // arrow before it, otherwise just add spaces
    1460          24 :                                                                 lineinformation[pos++] = '>';
    1461          24 :                                                                 lineinformation[pos++] = ' ';
    1462             :                                                         } else {
    1463          74 :                                                                 lineinformation[pos++] = ' ';
    1464          74 :                                                                 lineinformation[pos++] = ' ';
    1465             :                                                         }
    1466          98 :                                                         snprintf(linenr, 32, "%zu", nrpos);
    1467         198 :                                                         for (j = 0; j < strlen(linenr); j++) {
    1468         100 :                                                                 lineinformation[pos++] = linenr[j];
    1469             :                                                         }
    1470          98 :                                                         lineinformation[pos++] = '.';
    1471          98 :                                                         lineinformation[pos++] = ' ';
    1472             :                                                 }
    1473             :                                         }
    1474        3485 :                                         if (pycall[i] != '\n' && nrpos >= (size_t)line_number - 2 &&
    1475        2064 :                                                 nrpos <= (size_t)line_number + 2 && pos < 4999) {
    1476             :                                                 // If we are on a line number that we have to display,
    1477             :                                                 // copy the text from this line for display
    1478        1993 :                                                 lineinformation[pos++] = pycall[i];
    1479             :                                         }
    1480             :                                 }
    1481          24 :                                 lineinformation[pos] = '\0';
    1482          24 :                                 return createException(MAL, "pyapi3.eval",  SQLSTATE(PY000) "%s\n%s\n%s",
    1483             :                                                                            error_text, lineinformation,
    1484             :                                                                            py_error_string);
    1485             :                         }
    1486             :                 }
    1487             :         } else {
    1488             :                 py_error_string = "";
    1489             :         }
    1490           0 : finally:
    1491           0 :         if (pycall == NULL)
    1492           0 :                 return createException(MAL, "pyapi3.eval", SQLSTATE(PY000) "%s\n%s", error_text,
    1493             :                                                            py_error_string);
    1494           0 :         return createException(MAL, "pyapi3.eval", SQLSTATE(PY000) "%s\n%s\n%s", error_text, pycall,
    1495             :                                                    py_error_string);
    1496             : }
    1497             : 
    1498          17 : static void ComputeParallelAggregation(AggrParams *p)
    1499             : {
    1500             :         int i;
    1501             :         size_t group_it, ai;
    1502             :         bool gstate = 0;
    1503             :         // now perform the actual aggregation
    1504             :         // we perform one aggregation per group
    1505             : 
    1506             :         // we need the GIL to execute the functions
    1507          17 :         gstate = Python_ObtainGIL();
    1508          34 :         for (group_it = p->group_start; group_it < p->group_end; group_it++) {
    1509             :                 // we first have to construct new
    1510             :                 PyObject *pArgsPartial =
    1511          17 :                         PyTuple_New(p->named_columns + p->additional_columns);
    1512          17 :                 PyObject *pColumnsPartial = PyDict_New();
    1513             :                 PyObject *result;
    1514          17 :                 size_t group_elements = (*p->group_counts)[group_it];
    1515             :                 ai = 0;
    1516             :                 // iterate over columns
    1517          34 :                 for (i = 0; i < (int)p->named_columns; i++) {
    1518             :                         PyObject *vararray = NULL;
    1519          17 :                         PyInput input = (*p->pyinput_values)[i];
    1520          17 :                         if (input.scalar) {
    1521             :                                 // scalar not handled yet
    1522             :                                 vararray = input.result;
    1523             :                         } else {
    1524          17 :                                 npy_intp elements[1] = {group_elements};
    1525          17 :                                 switch (input.bat_type) {
    1526           0 :                                         case TYPE_void:
    1527           0 :                                                 vararray = PyArray_New(
    1528             :                                                         &PyArray_Type, 1, elements,
    1529             : #if SIZEOF_OID == SIZEOF_INT
    1530             :                                                         NPY_UINT
    1531             : #else
    1532             :                                                         NPY_ULONGLONG
    1533             : #endif
    1534             :                                                         ,
    1535             :                                                         NULL,
    1536           0 :                                                         ((oid ***)(*p->split_bats))[group_it][i], 0,
    1537             :                                                         NPY_ARRAY_CARRAY || !NPY_ARRAY_WRITEABLE, NULL);
    1538           0 :                                                 break;
    1539           0 :                                         case TYPE_oid:
    1540           0 :                                                 vararray = PyArray_New(
    1541             :                                                         &PyArray_Type, 1, elements,
    1542             : #if SIZEOF_OID == SIZEOF_INT
    1543             :                                                         NPY_UINT32
    1544             : #else
    1545             :                                                         NPY_UINT64
    1546             : #endif
    1547             :                                                         ,
    1548             :                                                         NULL,
    1549           0 :                                                         ((oid ***)(*p->split_bats))[group_it][i], 0,
    1550             :                                                         NPY_ARRAY_CARRAY || !NPY_ARRAY_WRITEABLE, NULL);
    1551           0 :                                                 break;
    1552           0 :                                         case TYPE_bit:
    1553           0 :                                                 vararray = PyArray_New(
    1554             :                                                         &PyArray_Type, 1, elements, NPY_BOOL, NULL,
    1555           0 :                                                         ((bit ***)(*p->split_bats))[group_it][i], 0,
    1556             :                                                         NPY_ARRAY_CARRAY || !NPY_ARRAY_WRITEABLE, NULL);
    1557           0 :                                                 break;
    1558           0 :                                         case TYPE_bte:
    1559           0 :                                                 vararray = PyArray_New(
    1560             :                                                         &PyArray_Type, 1, elements, NPY_INT8, NULL,
    1561           0 :                                                         ((bte ***)(*p->split_bats))[group_it][i], 0,
    1562             :                                                         NPY_ARRAY_CARRAY || !NPY_ARRAY_WRITEABLE, NULL);
    1563           0 :                                                 break;
    1564           0 :                                         case TYPE_sht:
    1565           0 :                                                 vararray = PyArray_New(
    1566             :                                                         &PyArray_Type, 1, elements, NPY_INT16, NULL,
    1567           0 :                                                         ((sht ***)(*p->split_bats))[group_it][i], 0,
    1568             :                                                         NPY_ARRAY_CARRAY || !NPY_ARRAY_WRITEABLE, NULL);
    1569           0 :                                                 break;
    1570           5 :                                         case TYPE_int:
    1571           5 :                                                 vararray = PyArray_New(
    1572             :                                                         &PyArray_Type, 1, elements, NPY_INT32, NULL,
    1573           5 :                                                         ((int ***)(*p->split_bats))[group_it][i], 0,
    1574             :                                                         NPY_ARRAY_CARRAY || !NPY_ARRAY_WRITEABLE, NULL);
    1575           5 :                                                 break;
    1576           0 :                                         case TYPE_lng:
    1577           0 :                                                 vararray = PyArray_New(
    1578             :                                                         &PyArray_Type, 1, elements, NPY_INT64, NULL,
    1579           0 :                                                         ((lng ***)(*p->split_bats))[group_it][i], 0,
    1580             :                                                         NPY_ARRAY_CARRAY || !NPY_ARRAY_WRITEABLE, NULL);
    1581           0 :                                                 break;
    1582           0 :                                         case TYPE_flt:
    1583           0 :                                                 vararray = PyArray_New(
    1584             :                                                         &PyArray_Type, 1, elements, NPY_FLOAT32, NULL,
    1585           0 :                                                         ((flt ***)(*p->split_bats))[group_it][i], 0,
    1586             :                                                         NPY_ARRAY_CARRAY || !NPY_ARRAY_WRITEABLE, NULL);
    1587           0 :                                                 break;
    1588             : #ifdef HAVE_HGE
    1589           8 :                                         case TYPE_hge:
    1590             : #endif
    1591             :                                         case TYPE_dbl:
    1592           8 :                                                 vararray = PyArray_New(
    1593             :                                                         &PyArray_Type, 1, elements, NPY_FLOAT64, NULL,
    1594           8 :                                                         ((dbl ***)(*p->split_bats))[group_it][i], 0,
    1595             :                                                         NPY_ARRAY_CARRAY || !NPY_ARRAY_WRITEABLE, NULL);
    1596           8 :                                                 break;
    1597           4 :                                         case TYPE_str:
    1598           4 :                                                 vararray = PyArray_New(
    1599             :                                                         &PyArray_Type, 1, elements, NPY_OBJECT, NULL,
    1600           4 :                                                         ((PyObject ****)(*p->split_bats))[group_it][i], 0,
    1601             :                                                         NPY_ARRAY_CARRAY || !NPY_ARRAY_WRITEABLE, NULL);
    1602           4 :                                                 break;
    1603             :                                 }
    1604             : 
    1605          17 :                                 if (vararray == NULL) {
    1606           0 :                                         p->msg = createException(MAL, "pyapi3.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL
    1607             :                                                                                          " to create NumPy array.");
    1608           0 :                                         goto wrapup;
    1609             :                                 }
    1610             :                         }
    1611             :                         // fill in _columns array
    1612          17 :                         PyDict_SetItemString(pColumnsPartial, (*p->args)[p->base + i],
    1613             :                                                                  vararray);
    1614             : 
    1615          17 :                         PyTuple_SetItem(pArgsPartial, ai++, vararray);
    1616             :                 }
    1617             : 
    1618             :                 // additional parameters
    1619          17 :                 PyTuple_SetItem(pArgsPartial, ai++, pColumnsPartial);
    1620          17 :                 PyTuple_SetItem(pArgsPartial, ai++, *p->column_types_dict);
    1621          17 :                 Py_INCREF(*p->column_types_dict);
    1622          17 :                 PyTuple_SetItem(pArgsPartial, ai++, *p->connection);
    1623          17 :                 Py_INCREF(*p->connection);
    1624             : 
    1625             :                 // call the aggregation function
    1626          17 :                 result = PyObject_CallObject(*p->function, pArgsPartial);
    1627             :                 Py_DECREF(pArgsPartial);
    1628             : 
    1629          17 :                 if (result == NULL) {
    1630           0 :                         p->msg = PyError_CreateException("Python exception", *p->pycall);
    1631           0 :                         goto wrapup;
    1632             :                 }
    1633             :                 // gather results
    1634          17 :                 p->result_objects[group_it] = result;
    1635             :         }
    1636             : // release the GIL again
    1637          17 : wrapup:
    1638          17 :         gstate = Python_ReleaseGIL(gstate);
    1639          17 : }
    1640             : 
    1641           5 : static str CreateEmptyReturn(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci,
    1642             :                                                          size_t retcols, oid seqbase)
    1643             : {
    1644             :         str msg = MAL_SUCCEED;
    1645           5 :         void **res = GDKzalloc(retcols * sizeof(void*));
    1646             : 
    1647           5 :         if (!res) {
    1648           0 :                 msg = createException(MAL, "pyapi3.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL);
    1649           0 :                 goto bailout;
    1650             :         }
    1651             : 
    1652          10 :         for (size_t i = 0; i < retcols; i++) {
    1653           5 :                 if (isaBatType(getArgType(mb, pci, i))) {
    1654           4 :                         BAT *b = COLnew(seqbase, getBatType(getArgType(mb, pci, i)), 0, TRANSIENT);
    1655           4 :                         if (!b) {
    1656           0 :                                 msg = createException(MAL, "pyapi3.eval", GDK_EXCEPTION);
    1657           0 :                                 goto bailout;
    1658             :                         }
    1659           4 :                         ((BAT**)res)[i] = b;
    1660             :                 } else { // single value return, only for non-grouped aggregations
    1661             :                         // return NULL to conform to SQL aggregates
    1662             :                         int tpe = getArgType(mb, pci, i);
    1663           1 :                         if (!VALinit(&stk->stk[pci->argv[i]], tpe, ATOMnilptr(tpe))) {
    1664           0 :                                 msg = createException(MAL, "pyapi3.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL);
    1665           0 :                                 goto bailout;
    1666             :                         }
    1667           1 :                         ((ValPtr*)res)[i] = &stk->stk[pci->argv[i]];
    1668             :                 }
    1669             :         }
    1670             : 
    1671           5 : bailout:
    1672           5 :         if (res) {
    1673          10 :                 for (size_t i = 0; i < retcols; i++) {
    1674           5 :                         if (isaBatType(getArgType(mb, pci, i))) {
    1675           4 :                                 BAT *b = ((BAT**)res)[i];
    1676             : 
    1677           4 :                                 if (b && msg) {
    1678           0 :                                         BBPreclaim(b);
    1679           4 :                                 } else if (b) {
    1680           4 :                                         BBPkeepref(*getArgReference_bat(stk, pci, i) = b->batCacheid);
    1681             :                                 }
    1682           1 :                         } else if (msg) {
    1683           0 :                                 ValPtr pt = ((ValPtr*)res)[i];
    1684             : 
    1685           0 :                                 if (pt)
    1686           0 :                                         VALclear(pt);
    1687             :                         }
    1688             :                 }
    1689           5 :                 GDKfree(res);
    1690             :         }
    1691           5 :         return msg;
    1692             : }
    1693             : 
    1694             : #include "mel.h"
    1695             : static mel_func pyapi3_init_funcs[] = {
    1696             :  pattern("pyapi3", "eval", PYAPI3PyAPIevalStd, true, "Execute a simple Python script returning a single value", args(1,3, argany("",0),arg("fptr",ptr),arg("expr",str))),
    1697             :  pattern("pyapi3", "eval", PYAPI3PyAPIevalStd, true, "Execute a simple Python script value", args(1,4, varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))),
    1698             :  pattern("pyapi3", "subeval_aggr", PYAPI3PyAPIevalAggr, true, "grouped aggregates through Python", args(1,4, varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))),
    1699             :  pattern("pyapi3", "eval_aggr", PYAPI3PyAPIevalAggr, true, "grouped aggregates through Python", args(1,4, varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))),
    1700             :  pattern("pyapi3", "eval_loader", PYAPI3PyAPIevalLoader, true, "loader functions through Python", args(1,3, varargany("",0),arg("fptr",ptr),arg("expr",str))),
    1701             :  pattern("pyapi3", "eval_loader", PYAPI3PyAPIevalLoader, true, "loader functions through Python", args(1,4, varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))),
    1702             :  command("pyapi3", "prelude", PYAPI3PyAPIprelude, false, "", args(1,1, arg("",void))),
    1703             :  pattern("batpyapi3", "eval", PYAPI3PyAPIevalStd, true, "Execute a simple Python script value", args(1,4, varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))),
    1704             :  pattern("batpyapi3", "subeval_aggr", PYAPI3PyAPIevalAggr, true, "grouped aggregates through Python", args(1,4, varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))),
    1705             :  pattern("batpyapi3", "eval_aggr", PYAPI3PyAPIevalAggr, true, "grouped aggregates through Python", args(1,4, varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))),
    1706             :  pattern("batpyapi3", "eval_loader", PYAPI3PyAPIevalLoader, true, "loader functions through Python", args(1,3, varargany("",0),arg("fptr",ptr),arg("expr",str))),
    1707             :  pattern("batpyapi3", "eval_loader", PYAPI3PyAPIevalLoader, true, "loader functions through Python", args(1,4, varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))),
    1708             :  pattern("pyapi3map", "eval", PYAPI3PyAPIevalStdMap, false, "Execute a simple Python script returning a single value", args(1,3, argany("",0),arg("fptr",ptr),arg("expr",str))),
    1709             :  pattern("pyapi3map", "eval", PYAPI3PyAPIevalStdMap, false, "Execute a simple Python script value", args(1,4, varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))),
    1710             :  pattern("pyapi3map", "subeval_aggr", PYAPI3PyAPIevalAggrMap, false, "grouped aggregates through Python", args(1,4, varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))),
    1711             :  pattern("pyapi3map", "eval_aggr", PYAPI3PyAPIevalAggrMap, false, "grouped aggregates through Python", args(1,4, varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))),
    1712             :  pattern("batpyapi3map", "eval", PYAPI3PyAPIevalStdMap, false, "Execute a simple Python script value", args(1,4, varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))),
    1713             :  pattern("batpyapi3map", "subeval_aggr", PYAPI3PyAPIevalAggrMap, false, "grouped aggregates through Python", args(1,4, varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))),
    1714             :  pattern("batpyapi3map", "eval_aggr", PYAPI3PyAPIevalAggrMap, false, "grouped aggregates through Python", args(1,4, varargany("",0),arg("fptr",ptr),arg("expr",str),varargany("arg",0))),
    1715             :  { .imp=NULL }
    1716             : };
    1717             : #include "mal_import.h"
    1718             : #ifdef _MSC_VER
    1719             : #undef read
    1720             : #pragma section(".CRT$XCU",read)
    1721             : #endif
    1722           8 : LIB_STARTUP_FUNC(init_pyapi3_mal)
    1723           8 : { mal_module("pyapi3", NULL, pyapi3_init_funcs); }

Generated by: LCOV version 1.14