LCOV - code coverage report
Current view: top level - sql/backends/monet5/UDF/pyapi3 - pyapi3.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 492 846 58.2 %
Date: 2020-06-29 20:00:14 Functions: 10 10 100.0 %

          Line data    Source code
       1             : /*
       2             :  * This Source Code Form is subject to the terms of the Mozilla Public
       3             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       4             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       5             :  *
       6             :  * Copyright 1997 - July 2008 CWI, August 2008 - 2020 MonetDB B.V.
       7             :  */
       8             : 
       9             : #include "monetdb_config.h"
      10             : #include "pyapi.h"
      11             : #include "connection.h"
      12             : 
      13             : #include "unicode.h"
      14             : #include "pytypes.h"
      15             : #include "type_conversion.h"
      16             : #include "formatinput.h"
      17             : #include "conversion.h"
      18             : #include "gdk_interprocess.h"
      19             : 
      20             : #ifdef HAVE_FORK
      21             : // These headers are used for PYTHON_MAP when forking is enabled (to start new
      22             : // processes and wait on them)
      23             : #include <sys/types.h>
      24             : #include <sys/wait.h>
      25             : #endif
      26             : /* File-scope option flags and Python object handles; all start out as the defaults given here. */
      27             : const char *fork_disableflag = "disable_fork"; /* presumably the option name that switches on option_disable_fork -- TODO confirm */
      28             : bool option_disable_fork = false; /* when true, PyAPIeval forces mapped = false and so never enters its fork() branch */
      29             : 
      30             : static PyObject *marshal_module = NULL; /* presumably the imported Python "marshal" module -- TODO confirm */
      31             : PyObject *marshal_loads = NULL; /* presumably the marshal.loads callable -- TODO confirm */
      32             : /* Parameter bundle taken by ComputeParallelAggregation(AggrParams *p); field notes below are inferred from names only -- TODO confirm against that function. */
      33             : typedef struct _AggrParams{
      34             :         PyInput **pyinput_values;
      35             :         void ****split_bats;
      36             :         size_t **group_counts;
      37             :         str **args;
      38             :         PyObject **connection;
      39             :         PyObject **function;
      40             :         PyObject **column_types_dict;
      41             :         PyObject **result_objects;
      42             :         str *pycall;
      43             :         str msg; /* presumably the error message produced by the worker, if any -- TODO confirm */
      44             :         size_t base;
      45             :         size_t additional_columns;
      46             :         size_t named_columns;
      47             :         size_t columns;
      48             :         size_t group_count;
      49             :         size_t group_start; /* presumably the first group handled by this worker -- TODO confirm */
      50             :         size_t group_end; /* presumably the end of this worker's group range -- TODO confirm whether inclusive */
      51             :         MT_Id thread; /* presumably the id of the thread running this slice -- TODO confirm */
      52             : } AggrParams;
      53             : /* Forward declarations; neither definition appears in the portion of the file shown in this listing. */
      54             : static void ComputeParallelAggregation(AggrParams *p);
      55             : static void CreateEmptyReturn(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, /* called by PyAPIeval when an input BAT has count 0 */
      56             :                                                           size_t retcols, oid seqbase);
      57             : /* Returns the function base path: GDKgetenv("function_basepath"), else getenv("HOME"), else "" -- never NULL. */
      58           1 : static const char *FunctionBasePath(void)
      59             : {
      60           1 :         const char *basepath = GDKgetenv("function_basepath");
      61           1 :         if (basepath == NULL) {
      62           1 :                 basepath = getenv("HOME"); /* option not set: fall back to the process environment */
      63             :         }
      64           1 :         if (basepath == NULL) {
      65           0 :                 basepath = ""; /* last resort, so the caller always receives a valid string */
      66             :         }
      67           1 :         return basepath; /* not allocated in this function */
      68             : }
      69             : /* Module-wide lock and init flag. In PyAPIeval the lock is held around the python_call_active check, the GDKinitmmap() calls and the fork(). */
      70             : static MT_Lock pyapiLock = MT_LOCK_INITIALIZER("pyapiLock");
      71             : static bool pyapiInitialized = false; /* PyAPIeval throws if this is still false; the code that sets it is not visible in this listing */
      72             : /* Returns the current value of the file-scope pyapiInitialized flag. */
      73          25 : bool PYFUNCNAME(PyAPIInitialized)(void) {
      74          25 :         return pyapiInitialized;
      75             : }
      76             : /* File-scope mode flags (readers of the zero-copy flags are not visible in this part of the listing). */
      77             : #ifdef HAVE_FORK
      78             : static bool python_call_active = false; /* set under pyapiLock when a non-mapped call starts; a call that finds it already set switches itself to mapped mode */
      79             : #endif
      80             : 
      81             : #ifdef WIN32
      82             : static bool enable_zerocopy_input = true;
      83             : static bool enable_zerocopy_output = false; /* the only default that differs between the WIN32 and non-WIN32 branches */
      84             : #else
      85             : static bool enable_zerocopy_input = true;
      86             : static bool enable_zerocopy_output = true;
      87             : #endif
      88             : /* Forward declaration of the shared worker that the four PyAPIeval* entry points below forward to. */
      89             : static str
      90             : PyAPIeval(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, bool grouped, bool mapped);
      91             : /* Entry point: forwards to PyAPIeval with grouped = false, mapped = false and returns its result. */
      92             : str
      93          87 : PYFUNCNAME(PyAPIevalStd)(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
      94          87 :         return PyAPIeval(cntxt, mb, stk, pci, false, false);
      95             : }
      96             : /* Entry point: forwards to PyAPIeval with grouped = false, mapped = true. NOTE(review): PyAPIeval clears its local mapped flag near the top, after deriving other flags from it. */
      97             : str
      98          33 : PYFUNCNAME(PyAPIevalStdMap)(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
      99          33 :         return PyAPIeval(cntxt, mb, stk, pci, false, true);
     100             : }
     101             : /* Entry point: forwards to PyAPIeval with grouped = true, mapped = false and returns its result. */
     102             : str
     103          17 : PYFUNCNAME(PyAPIevalAggr)(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
     104          17 :         return PyAPIeval(cntxt, mb, stk, pci, true, false);
     105             : }
     106             : /* Entry point: forwards to PyAPIeval with grouped = true, mapped = true; that combination requests parallel aggregation, which PyAPIeval keeps only under further conditions. */
     107             : str
     108           4 : PYFUNCNAME(PyAPIevalAggrMap)(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci) {
     109           4 :         return PyAPIeval(cntxt, mb, stk, pci, true, true);
     110             : }
     111             : /* NP_SPLIT_BAT(tpe): scatters column i (basevals, read as tpe) into one array per group, using aggr_group_arr[] as each element's group id. Relies on split_bats, group_counts, group_count, elements, group_it and element_it from the expansion site. NOTE(review): no GDKzalloc result is checked for NULL here. */
     112             : #define NP_SPLIT_BAT(tpe)                                                      \
     113             :         {                                                                          \
     114             :                 tpe ***ptr = (tpe ***)split_bats;                                      \
     115             :                 size_t *temp_indices;                                                  \
     116             :                 tpe *batcontent = (tpe *)basevals;                                     \
     117             :                 /* one zeroed array of group_counts[g] elements per group */           \
     118             :                 for (group_it = 0; group_it < group_count; group_it++) {               \
     119             :                         ptr[group_it][i] =                                                 \
     120             :                                 GDKzalloc(group_counts[group_it] * sizeof(tpe));               \
     121             :                 }                                                                      \
     122             :                 /* per-group write cursors, sized from the pointer's element type */   \
     123             :                 temp_indices = GDKzalloc(sizeof(*temp_indices) * group_count);         \
     124             :                 for (element_it = 0; element_it < elements; element_it++) {            \
     125             :                         /* group of the current element */                                 \
     126             :                         oid group = aggr_group_arr[element_it];                            \
     127             :                         /* append the element at its group's cursor, then advance it */    \
     128             :                         ptr[group][i][temp_indices[group]++] = batcontent[element_it];     \
     129             :                 }                                                                      \
     130             :                 GDKfree(temp_indices);                                                 \
     131             :         }
     132             : 
     133             : //! The main PyAPI function, this function does everything PyAPI related
     134             : //! It takes as argument a bunch of input BATs, a python function, and outputs a
     135             : //! number of BATs
     136             : //! This function follows the following pipeline
     137             : //! [PARSE_CODE] Step 1: It parses the Python source code and corrects any wrong
     138             : //! indentation, or converts the source code into a PyCodeObject if the source
     139             : //! code is encoded as such
     140             : //! [CONVERT_BAT] Step 2: It converts the input BATs into Numpy Arrays
     141             : //! [EXECUTE_CODE] Step 3: It executes the Python code using the Numpy arrays as arguments
     142             : //! [RETURN_VALUES] Step 4: It collects the return values and converts them back into BATs
     143             : //! If 'mapped' is set to True, it will fork a separate process at [FORK_PROCESS] that executes Step 1-3, the process will then write the return values into memory mapped files and exit, then Step 4 is executed by the main process
     144         141 : static str PyAPIeval(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, bool grouped, bool mapped) {
     145         141 :         sql_func * sqlfun = NULL;
     146         141 :         str exprStr = NULL;
     147             : 
     148         141 :         const int additional_columns = 3;
     149         141 :         int i = 1, ai = 0;
     150         141 :         char *pycall = NULL;
     151         141 :         str *args;
     152         141 :         char *msg = MAL_SUCCEED;
     153         141 :         BAT *b = NULL;
     154         141 :         node *argnode = NULL;
     155         141 :         int seengrp = FALSE;
     156         141 :         PyObject *pArgs = NULL, *pColumns = NULL, *pColumnTypes = NULL,
     157             :                          *pConnection,
     158         141 :                          *pResult = NULL; // this is going to be the parameter tuple
     159         141 :         PyObject *code_object = NULL;
     160         141 :         PyReturn *pyreturn_values = NULL;
     161         141 :         PyInput *pyinput_values = NULL;
     162         141 :         oid seqbase = 0;
     163             : #ifdef HAVE_FORK
     164         141 :         char *mmap_ptr;
     165         141 :         QueryStruct *query_ptr = NULL;
     166         141 :         int query_sem = -1;
     167         141 :         int mmap_id = -1;
     168         141 :         size_t memory_size = 0;
     169         141 :         bool child_process = false;
     170         141 :         bool holds_gil = !mapped;
     171         141 :         void **mmap_ptrs = NULL;
     172         141 :         size_t *mmap_sizes = NULL;
     173             : #endif
     174         141 :         bool allow_loopback = !mapped;
     175         141 :         bit varres;
     176         141 :         int retcols;
     177         141 :         bool gstate = 0;
     178         141 :         int unnamedArgs = 0;
     179         141 :         bool parallel_aggregation = grouped && mapped;
     180         141 :         int argcount = pci->argc;
     181             : 
     182         141 :         char *eval_additional_args[] = {"_columns", "_column_types", "_conn"};
     183             : 
     184         141 :         mapped = false;
     185             : 
     186             : #ifndef HAVE_FORK
     187             :         (void)mapped;
     188             : #endif
     189             : 
     190         141 :         if (!pyapiInitialized) {
     191           0 :                 throw(MAL, "pyapi3.eval", SQLSTATE(PY000) "Embedded Python is enabled but an error was "
     192             :                                                                  "thrown during initialization.");
     193             :         }
     194         141 :         if (!grouped) {
     195         240 :                 sql_subfunc *sqlmorefun =
     196         120 :                         (*(sql_subfunc **)getArgReference(stk, pci, pci->retc));
     197         120 :                 if (sqlmorefun) {
     198         120 :                         sqlfun = sqlmorefun->func;
     199             :                 }
     200             :         } else {
     201          21 :                 sqlfun = *(sql_func **)getArgReference(stk, pci, pci->retc);
     202             :         }
     203         141 :         exprStr = *getArgReference_str(stk, pci, pci->retc + 1);
     204         141 :         varres = sqlfun ? sqlfun->varres : 0;
     205         141 :         retcols = !varres ? pci->retc : -1;
     206             : 
     207         141 :         args = (str *)GDKzalloc(pci->argc * sizeof(str));
     208         141 :         pyreturn_values = GDKzalloc(pci->retc * sizeof(PyReturn));
     209         141 :         if (args == NULL || pyreturn_values == NULL) {
     210           0 :                 throw(MAL, "pyapi3.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL " arguments.");
     211             :         }
     212             : 
     213         141 :         if ((pci->argc - (pci->retc + 2)) * sizeof(PyInput) > 0) {
     214         204 :                 pyinput_values =
     215         102 :                         GDKzalloc((pci->argc - (pci->retc + 2)) * sizeof(PyInput));
     216             : 
     217         102 :                 if (pyinput_values == NULL) {
     218           0 :                         GDKfree(args);
     219           0 :                         GDKfree(pyreturn_values);
     220           0 :                         throw(MAL, "pyapi3.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL " input values.");
     221             :                 }
     222             :         }
     223             : 
     224             :         // Analyse the SQL_Func structure to get the parameter names
     225         141 :         if (sqlfun != NULL && sqlfun->ops->cnt > 0) {
     226          89 :                 unnamedArgs = pci->retc + 2;
     227          89 :                 argnode = sqlfun->ops->h;
     228         217 :                 while (argnode) {
     229         128 :                         char *argname = ((sql_arg *)argnode->data)->name;
     230         128 :                         args[unnamedArgs++] = GDKstrdup(argname);
     231         128 :                         argnode = argnode->next;
     232             :                 }
     233          89 :                 if (parallel_aggregation && unnamedArgs < pci->argc) {
     234           4 :                         argcount = unnamedArgs;
     235             :                 } else {
     236             :                         parallel_aggregation = false;
     237             :                 }
     238             :         } else {
     239             :                 parallel_aggregation = false;
     240             :         }
     241             : 
     242             :         // We name all the unknown arguments, if grouping is enabled the first
     243             :         // unknown argument that is the group variable, we name this 'aggr_group'
     244         492 :         for (i = pci->retc + 2; i < argcount; i++) {
     245         351 :                 if (args[i] == NULL) {
     246         223 :                         if (!seengrp && grouped) {
     247           9 :                                 args[i] = GDKstrdup("aggr_group");
     248           9 :                                 seengrp = TRUE;
     249             :                         } else {
     250         214 :                                 char argbuf[64];
     251         214 :                                 snprintf(argbuf, sizeof(argbuf), "arg%i", i - pci->retc - 1);
     252         214 :                                 args[i] = GDKstrdup(argbuf);
     253             :                         }
     254             :                 }
     255             :         }
     256             : 
     257             :         // Construct PyInput objects, we do this before any multiprocessing because
     258             :         // there is some locking going on in there, and locking + forking = bad idea
     259             :         // (a thread can fork while another process is in the lock, which means we
     260             :         // can get stuck permanently)
     261         141 :         argnode = sqlfun && sqlfun->ops->cnt > 0 ? sqlfun->ops->h : NULL;
     262         490 :         for (i = pci->retc + 2; i < argcount; i++) {
     263         351 :                 PyInput *inp = &pyinput_values[i - (pci->retc + 2)];
     264         351 :                 if (!isaBatType(getArgType(mb, pci, i))) {
     265          47 :                         inp->scalar = true;
     266          47 :                         inp->bat_type = getArgType(mb, pci, i);
     267          47 :                         inp->count = 1;
     268          47 :                         if (inp->bat_type == TYPE_str) {
     269           0 :                                 inp->dataptr = getArgReference_str(stk, pci, i);
     270             :                         } else {
     271          47 :                                 inp->dataptr = getArgReference(stk, pci, i);
     272             :                         }
     273             :                 } else {
     274         304 :                         b = BATdescriptor(*getArgReference_bat(stk, pci, i));
     275         304 :                         if (b == NULL) {
     276           0 :                                 msg = createException(
     277             :                                         MAL, "pyapi3.eval",
     278             :                                         SQLSTATE(PY000) "The BAT passed to the function (argument #%d) is NULL.\n",
     279           0 :                                         i - (pci->retc + 2) + 1);
     280           0 :                                 goto wrapup;
     281             :                         }
     282         304 :                         seqbase = b->hseqbase;
     283         304 :                         inp->count = BATcount(b);
     284         304 :                         inp->bat_type = b->ttype;
     285         304 :                         inp->bat = b;
     286         304 :                         if (inp->count == 0) {
     287             :                                 // one of the input BATs is empty, don't execute the function at
     288             :                                 // all
     289             :                                 // just return empty BATs
     290           2 :                                 CreateEmptyReturn(mb, stk, pci, retcols, seqbase);
     291           2 :                                 goto wrapup;
     292             :                         }
     293             :                 }
     294         349 :                 if (argnode) {
     295         126 :                         inp->sql_subtype = &((sql_arg *)argnode->data)->type;
     296         126 :                         argnode = argnode->next;
     297             :                 }
     298             :         }
     299             : 
     300             : #ifdef HAVE_FORK
     301         139 :         if (!option_disable_fork) {
     302         139 :                 if (!mapped && !parallel_aggregation) {
     303         135 :                         MT_lock_set(&pyapiLock);
     304         135 :                         if (python_call_active) {
     305             :                                 mapped = true;
     306             :                                 holds_gil = false;
     307             :                         } else {
     308         135 :                                 python_call_active = true;
     309         135 :                                 holds_gil = true;
     310             :                         }
     311         135 :                         MT_lock_unset(&pyapiLock);
     312             :                 }
     313             :         } else {
     314             :                 mapped = false;
     315             :                 holds_gil = true;
     316             :         }
     317             : #endif
     318             : 
     319             : #ifdef HAVE_FORK
     320             :         /*[FORK_PROCESS]*/
     321         135 :         if (mapped) {
     322           0 :                 lng pid;
     323             :                 // we need 3 + pci->retc * 2 shared memory spaces
     324             :                 // the first is for the header information
     325             :                 // the second for query struct information
     326             :                 // the third is for query results
     327             :                 // the remaining pci->retc * 2 is one for each return BAT, and one for
     328             :                 // each return mask array
     329           0 :                 int mmap_count = 4 + pci->retc * 2;
     330             : 
     331             :                 // create initial shared memory
     332           0 :                 mmap_id = GDKuniqueid(mmap_count);
     333             : 
     334           0 :                 mmap_ptrs = GDKzalloc(mmap_count * sizeof(void *));
     335           0 :                 mmap_sizes = GDKzalloc(mmap_count * sizeof(size_t));
     336           0 :                 if (mmap_ptrs == NULL || mmap_sizes == NULL) {
     337           0 :                         msg = createException(MAL, "pyapi3.eval",
     338             :                                                                   SQLSTATE(HY013) MAL_MALLOC_FAIL " mmap values.");
     339           0 :                         goto wrapup;
     340             :                 }
     341             : 
     342           0 :                 memory_size =
     343           0 :                         pci->retc * sizeof(ReturnBatDescr); // the memory size for the
     344             :                                                                                                 // header files, each process
     345             :                                                                                                 // has one per return value
     346             : 
     347           0 :                 assert(memory_size > 0);
     348             :                 // create the shared memory for the header
     349           0 :                 MT_lock_set(&pyapiLock);
     350           0 :                 mmap_ptrs[0] = GDKinitmmap(mmap_id + 0, memory_size, &mmap_sizes[0]);
     351           0 :                 MT_lock_unset(&pyapiLock);
     352           0 :                 if (mmap_ptrs[0] == NULL) {
     353           0 :                         msg = createException(MAL, "pyapi3.eval", GDK_EXCEPTION);
     354           0 :                         goto wrapup;
     355             :                 }
     356           0 :                 mmap_ptr = mmap_ptrs[0];
     357             : 
     358             :                 // create the cross-process semaphore used for signaling queries
     359             :                 // we need two semaphores
     360             :                 // the main process waits on the first one (exiting when a query is
     361             :                 // requested or the child process is done)
     362             :                 // the forked process waits for the second one when it requests a query
     363             :                 // (waiting for the result of the query)
     364           0 :                 if (GDKcreatesem(mmap_id, 2, &query_sem) != GDK_SUCCEED) {
     365           0 :                         msg = createException(MAL, "pyapi3.eval", GDK_EXCEPTION);
     366           0 :                         goto wrapup;
     367             :                 }
     368             : 
     369             :                 // create the shared memory space for queries
     370           0 :                 MT_lock_set(&pyapiLock);
     371           0 :                 mmap_ptrs[1] = GDKinitmmap(mmap_id + 1, sizeof(QueryStruct),
     372             :                                                  &mmap_sizes[1]);
     373           0 :                 MT_lock_unset(&pyapiLock);
     374           0 :                 if (mmap_ptrs[1] == NULL) {
     375           0 :                         msg = createException(MAL, "pyapi3.eval", GDK_EXCEPTION);
     376           0 :                         goto wrapup;
     377             :                 }
     378           0 :                 query_ptr = mmap_ptrs[1];
     379           0 :                 query_ptr->pending_query = false;
     380           0 :                 query_ptr->query[0] = '\0';
     381           0 :                 query_ptr->mmapid = -1;
     382           0 :                 query_ptr->memsize = 0;
     383             : 
     384             :                 // fork
     385           0 :                 MT_lock_set(&pyapiLock);
     386           0 :                 gstate = Python_ObtainGIL(); // we need the GIL before forking,
     387             :                                                                          // otherwise it can get stuck in the forked
     388             :                                                                          // child
     389           0 :                 if ((pid = fork()) < 0) {
     390           0 :                         msg = createException(MAL, "pyapi3.eval", SQLSTATE(PY000) "Failed to fork process");
     391           0 :                         MT_lock_unset(&pyapiLock);
     392             : 
     393           0 :                         goto wrapup;
     394           0 :                 } else if (pid == 0) {
     395           0 :                         child_process = true;
     396           0 :                         query_ptr = NULL;
     397           0 :                         if ((query_ptr = GDKinitmmap(mmap_id + 1, sizeof(QueryStruct),
     398             :                                                                                  NULL)) == NULL) {
     399           0 :                                 msg = createException(MAL, "pyapi3.eval", GDK_EXCEPTION);
     400           0 :                                 goto wrapup;
     401             :                         }
     402             :                 } else {
     403           0 :                         gstate = Python_ReleaseGIL(gstate);
     404             :                 }
     405           0 :                 if (!child_process) {
     406             :                         // main process
     407           0 :                         int status;
     408           0 :                         bool success = true;
     409           0 :                         bool sem_success = false;
     410           0 :                         pid_t retcode = 0;
     411             : 
     412             :                         // release the GIL in the main process
     413           0 :                         MT_lock_unset(&pyapiLock);
     414             : 
     415           0 :                         while (true) {
     416             :                                 // wait for the child to finish
     417             :                                 // note that we use a timeout here in case the child crashes for
     418             :                                 // some reason
     419             :                                 // in this case the semaphore value is never increased, so we
     420             :                                 // would be stuck otherwise
     421           0 :                                 if (GDKchangesemval_timeout(query_sem, 0, -1, 100, &sem_success) != GDK_SUCCEED) {
     422           0 :                                         msg = createException(MAL, "pyapi3.eval", GDK_EXCEPTION);
     423           0 :                                         goto wrapup;
     424             :                                 }
     425           0 :                                 if (sem_success) {
     426             :                                         break;
     427             :                                 }
     428           0 :                                 retcode = waitpid(pid, &status, WNOHANG);
     429           0 :                                 if (retcode > 0)
     430             :                                         break; // we have successfully waited for the child to exit
     431           0 :                                 if (retcode < 0) {
     432             :                                         // error message
     433           0 :                                         const char *err = GDKstrerror(errno, (char[128]){0}, 128);
     434           0 :                                         sem_success = 0;
     435           0 :                                         errno = 0;
     436           0 :                                         msg = createException(
     437             :                                                 MAL, "waitpid",
     438             :                                                 SQLSTATE(PY000) "Error calling waitpid(" LLFMT ", &status, WNOHANG): %s",
     439             :                                                 pid, err);
     440           0 :                                         break;
     441             :                                 }
     442             :                         }
     443           0 :                         if (sem_success)
     444           0 :                                 waitpid(pid, &status, 0);
     445             : 
     446           0 :                         if (status != 0)
     447           0 :                                 success = false;
     448             : 
     449           0 :                         if (!success) {
     450             :                                 // a child failed, get the error message from the child
     451           0 :                                 ReturnBatDescr *descr = &(((ReturnBatDescr *)mmap_ptr)[0]);
     452             : 
     453           0 :                                 if (descr->bat_size == 0) {
     454           0 :                                         msg = createException(
     455             :                                                 MAL, "pyapi3.eval",
     456             :                                                 SQLSTATE(PY000) "Failure in child process with unknown error.");
     457           0 :                                 } else if ((mmap_ptrs[3] = GDKinitmmap(mmap_id + 3, descr->bat_size,
     458             :                                                                                                            &mmap_sizes[3])) != NULL) {
     459           0 :                                         msg = createException(MAL, "pyapi3.eval", SQLSTATE(PY000) "%s",
     460             :                                                                                   (char *)mmap_ptrs[3]);
     461             :                                 } else {
     462           0 :                                         msg = createException(MAL, "pyapi3.eval", GDK_EXCEPTION);
     463             :                                 }
     464           0 :                                 goto wrapup;
     465             :                         }
     466             : 
     467             :                         // collect return values
     468           0 :                         for (i = 0; i < pci->retc; i++) {
     469           0 :                                 PyReturn *ret = &pyreturn_values[i];
     470           0 :                                 ReturnBatDescr *descr = &(((ReturnBatDescr *)mmap_ptr)[i]);
     471           0 :                                 size_t total_size = 0;
     472           0 :                                 bool has_mask = false;
     473           0 :                                 ret->count = 0;
     474           0 :                                 ret->mmap_id = mmap_id + i + 3;
     475           0 :                                 ret->memory_size = 0;
     476           0 :                                 ret->result_type = 0;
     477             : 
     478           0 :                                 ret->count = descr->bat_count;
     479           0 :                                 total_size = descr->bat_size;
     480             : 
     481           0 :                                 ret->memory_size = descr->element_size;
     482           0 :                                 ret->result_type = descr->npy_type;
     483           0 :                                 has_mask = has_mask || descr->has_mask;
     484             : 
     485             :                                 // get the shared memory address for this return value
     486           0 :                                 assert(total_size > 0);
     487           0 :                                 MT_lock_set(&pyapiLock);
     488           0 :                                 mmap_ptrs[i + 3] = GDKinitmmap(mmap_id + i + 3, total_size,
     489           0 :                                                                                            &mmap_sizes[i + 3]);
     490           0 :                                 MT_lock_unset(&pyapiLock);
     491           0 :                                 if (mmap_ptrs[i + 3] == NULL) {
     492           0 :                                         msg = createException(MAL, "pyapi3.eval", GDK_EXCEPTION);
     493           0 :                                         goto wrapup;
     494             :                                 }
     495           0 :                                 ret->array_data = mmap_ptrs[i + 3];
     496           0 :                                 ret->mask_data = NULL;
     497           0 :                                 ret->numpy_array = NULL;
     498           0 :                                 ret->numpy_mask = NULL;
     499           0 :                                 ret->multidimensional = FALSE;
     500           0 :                                 if (has_mask) {
     501           0 :                                         size_t mask_size = ret->count * sizeof(bool);
     502             : 
     503           0 :                                         assert(mask_size > 0);
     504           0 :                                         MT_lock_set(&pyapiLock);
     505           0 :                                         mmap_ptrs[pci->retc + (i + 3)] = GDKinitmmap(
     506           0 :                                                 mmap_id + pci->retc + (i + 3), mask_size,
     507           0 :                                                 &mmap_sizes[pci->retc + (i + 3)]);
     508           0 :                                         MT_lock_unset(&pyapiLock);
     509           0 :                                         if (mmap_ptrs[pci->retc + (i + 3)] == NULL) {
     510           0 :                                                 msg = createException(MAL, "pyapi3.eval", GDK_EXCEPTION);
     511           0 :                                                 goto wrapup;
     512             :                                         }
     513           0 :                                         ret->mask_data = mmap_ptrs[pci->retc + (i + 3)];
     514             :                                 }
     515             :                         }
     516           0 :                         msg = MAL_SUCCEED;
     517             : 
     518           0 :                         goto returnvalues;
     519             :                 }
     520             :         }
     521             : #endif
     522             : 
     523             :         // After this point we will execute Python Code, so we need to acquire the
     524             :         // GIL
     525         139 :         if (!mapped) {
     526         139 :                 gstate = Python_ObtainGIL();
     527             :         }
     528             : 
     529         139 :         if (sqlfun) {
     530             :                 // Check if exprStr refers to a file path or if it contains the
     531             :                 // Python code itself.
     532             :                 // There is no easy way to check, so the rule is: if it starts with '/'
     533             :                 // it is always a file path;
     534             :                 // otherwise it is a (relative) file path only if it ends with '.py'.
     535         139 :                 size_t length = strlen(exprStr);
     536         139 :                 if (exprStr[0] == '/' ||
     537         139 :                         (exprStr[length - 3] == '.' && exprStr[length - 2] == 'p' &&
     538           1 :                          exprStr[length - 1] == 'y')) {
     539           1 :                         FILE *fp;
     540           1 :                         char address[1000];
     541           1 :                         struct stat buffer;
     542           1 :                         ssize_t length;
     543           1 :                         if (exprStr[0] == '/') {
     544             :                                 // absolute path
     545           0 :                                 snprintf(address, 1000, "%s", exprStr);
     546             :                         } else {
     547             :                                 // relative path
     548           1 :                                 snprintf(address, 1000, "%s/%s", FunctionBasePath(), exprStr);
     549             :                         }
     550           1 :                         if (stat(address, &buffer) < 0) {
     551           0 :                                 msg = createException(
     552             :                                         MAL, "pyapi3.eval",
     553             :                                         SQLSTATE(PY000) "Could not find Python source file \"%s\".", address);
     554           0 :                                 goto wrapup;
     555             :                         }
     556           1 :                         fp = fopen(address, "r");
     557           1 :                         if (fp == NULL) {
     558           0 :                                 msg = createException(
     559             :                                         MAL, "pyapi3.eval",
     560             :                                         SQLSTATE(PY000) "Could not open Python source file \"%s\".", address);
     561           0 :                                 goto wrapup;
     562             :                         }
     563           1 :                         if(fseek(fp, 0, SEEK_END) == -1) {
     564           0 :                                 msg = createException(
     565             :                                                 MAL, "pyapi3.eval",
     566             :                                                 SQLSTATE(PY000) "Failed to set file pointer on Python source file \"%s\".", address);
     567           0 :                                 goto wrapup;
     568             :                         }
     569           1 :                         if((length = ftell(fp)) == -1) {
     570           0 :                                 msg = createException(
     571             :                                                 MAL, "pyapi3.eval",
     572             :                                                 SQLSTATE(PY000) "Failed to set file pointer on Python source file \"%s\".", address);
     573           0 :                                 goto wrapup;
     574             :                         }
     575           1 :                         if(fseek(fp, 0, SEEK_SET) == -1) {
     576           0 :                                 msg = createException(
     577             :                                                 MAL, "pyapi3.eval",
     578             :                                                 SQLSTATE(PY000) "Failed to set file pointer on Python source file \"%s\".", address);
     579           0 :                                 goto wrapup;
     580             :                         }
     581           1 :                         exprStr = GDKzalloc(length + 1);
     582           1 :                         if (exprStr == NULL) {
     583           0 :                                 msg = createException(MAL, "pyapi3.eval",
     584             :                                                                           SQLSTATE(HY013) MAL_MALLOC_FAIL " function body string.");
     585           0 :                                 goto wrapup;
     586             :                         }
     587           1 :                         if (fread(exprStr, 1, (size_t) length, fp) != (size_t) length) {
     588           0 :                                 msg = createException(MAL, "pyapi3.eval",
     589             :                                                                           SQLSTATE(PY000) "Failed to read from file \"%s\".",
     590             :                                                                           address);
     591           0 :                                 goto wrapup;
     592             :                         }
     593           1 :                         fclose(fp);
     594             :                 }
     595             :         }
     596             : 
     597             :         /*[PARSE_CODE]*/
     598         139 :         pycall = FormatCode(exprStr, args, argcount, 4, &code_object, &msg,
     599             :                                                 eval_additional_args, additional_columns);
     600         139 :         if (pycall == NULL && code_object == NULL) {
     601           0 :                 if (msg == NULL) {
     602           0 :                         msg = createException(MAL, "pyapi3.eval",
     603             :                                                                   SQLSTATE(PY000) "Error while parsing Python code.");
     604             :                 }
     605           0 :                 goto wrapup;
     606             :         }
     607             : 
     608             :         /*[CONVERT_BAT]*/
     609             :         // Now we will do the input handling (i.e. converting the input BATs to numpy
     610             :         // arrays).
     611             :         // We will put the Python arrays in a PyTuple object and use this
     612             :         // PyTuple object as the set of arguments to call the Python function.
     613         139 :         pArgs = PyTuple_New(argcount - (pci->retc + 2) +
     614         139 :                                                 (code_object == NULL ? additional_columns : 0));
     615         139 :         pColumns = PyDict_New();
     616         139 :         pColumnTypes = PyDict_New();
     617             : #ifdef HAVE_FORK
     618         139 :         pConnection = Py_Connection_Create(cntxt, !allow_loopback, query_ptr, query_sem);
     619             : #else
     620             :         pConnection = Py_Connection_Create(cntxt, !allow_loopback, 0, 0);
     621             : #endif
     622             : 
     623             :         // Now we will loop over the input BATs and convert them to python objects
     624         488 :         for (i = pci->retc + 2; i < argcount; i++) {
     625         349 :                 PyObject *result_array;
     626             :                 // t_start and t_end hold the part of the BAT we will convert to a Numpy
     627             :                 // array, by default these hold the entire BAT [0 - BATcount(b)]
     628         349 :                 size_t t_start = 0, t_end = pyinput_values[i - (pci->retc + 2)].count;
     629             : 
     630             :                 // There are two possibilities, either the input is a BAT, or the input
     631             :                 // is a scalar
     632             :                 // If the input is a scalar we will convert it to a python scalar
     633             :                 // If the input is a BAT, we will convert it to a numpy array
     634         349 :                 if (pyinput_values[i - (pci->retc + 2)].scalar) {
     635          47 :                         result_array = PyArrayObject_FromScalar(
     636             :                                 &pyinput_values[i - (pci->retc + 2)], &msg);
     637             :                 } else {
     638         302 :                         int type = pyinput_values[i - (pci->retc + 2)].bat_type;
     639         302 :                         result_array = PyMaskedArray_FromBAT(
     640             :                                 &pyinput_values[i - (pci->retc + 2)], t_start, t_end, &msg,
     641         302 :                                 !enable_zerocopy_input && type != TYPE_void);
     642             :                 }
     643         349 :                 if (result_array == NULL) {
     644           0 :                         if (msg == MAL_SUCCEED) {
     645           0 :                                 msg = createException(MAL, "pyapi3.eval",
     646             :                                                                           SQLSTATE(PY000) "Failed to create Numpy Array from BAT.");
     647             :                         }
     648           0 :                         goto wrapup;
     649             :                 }
     650         349 :                 if (code_object == NULL) {
     651         349 :                         PyObject *arg_type = PyString_FromString(
     652         349 :                                 BatType_Format(pyinput_values[i - (pci->retc + 2)].bat_type));
     653         349 :                         PyDict_SetItemString(pColumns, args[i], result_array);
     654         349 :                         PyDict_SetItemString(pColumnTypes, args[i], arg_type);
     655         349 :                         Py_DECREF(arg_type);
     656             :                 }
     657         349 :                 pyinput_values[i - (pci->retc + 2)].result = result_array;
     658         349 :                 PyTuple_SetItem(pArgs, ai++, result_array);
     659             :         }
     660         139 :         if (code_object == NULL) {
     661         139 :                 PyTuple_SetItem(pArgs, ai++, pColumns);
     662         139 :                 PyTuple_SetItem(pArgs, ai++, pColumnTypes);
     663         139 :                 PyTuple_SetItem(pArgs, ai++, pConnection);
     664             :         }
     665             : 
     666             :         /*[EXECUTE_CODE]*/
     667             :         // Now it is time to actually execute the python code
     668             :         {
     669         139 :                 PyObject *pFunc, *pModule, *v, *d;
     670             : 
     671             :                 // First we will load the main module, this is required
     672         139 :                 pModule = PyImport_AddModule("__main__");
     673         139 :                 if (!pModule) {
     674           0 :                         msg = PyError_CreateException("Failed to load module", NULL);
     675          15 :                         goto wrapup;
     676             :                 }
     677             : 
     678             :                 // Now we will add the UDF to the main module
     679         139 :                 d = PyModule_GetDict(pModule);
     680         139 :                 if (code_object == NULL) {
     681         139 :                         v = PyRun_StringFlags(pycall, Py_file_input, d, d, NULL);
     682         139 :                         if (v == NULL) {
     683           2 :                                 msg = PyError_CreateException("Could not parse Python code",
     684             :                                                                                           pycall);
     685           2 :                                 goto wrapup;
     686             :                         }
     687         137 :                         Py_DECREF(v);
     688             : 
     689             :                         // Now we need to obtain a pointer to the function, the function is
     690             :                         // called "pyfun"
     691         137 :                         pFunc = PyObject_GetAttrString(pModule, "pyfun");
     692         137 :                         if (!pFunc || !PyCallable_Check(pFunc)) {
     693           0 :                                 msg = PyError_CreateException("Failed to load function", NULL);
     694           0 :                                 goto wrapup;
     695             :                         }
     696             :                 } else {
     697           0 :                         pFunc = PyFunction_New(code_object, d);
     698           0 :                         if (!pFunc || !PyCallable_Check(pFunc)) {
     699           0 :                                 msg = PyError_CreateException("Failed to load function", NULL);
     700           0 :                                 goto wrapup;
     701             :                         }
     702             :                 }
     703             : 
     704         137 :                 if (parallel_aggregation) {
     705             :                         // parallel aggregation, we run the function once for every group in
     706             :                         // parallel
     707           4 :                         BAT *aggr_group = NULL, *group_first_occurrence = NULL;
     708           4 :                         size_t group_count, elements, element_it, group_it;
     709           4 :                         size_t *group_counts = NULL;
     710           4 :                         oid *aggr_group_arr = NULL;
     711           4 :                         void ***split_bats = NULL;
     712           4 :                         int named_columns = unnamedArgs - (pci->retc + 2);
     713           4 :                         PyObject *aggr_result;
     714             : 
     715             :                         // release the GIL
     716           4 :                         gstate = Python_ReleaseGIL(gstate);
     717             : 
     718             :                         // the first unnamed argument has the group numbers for every row
     719           4 :                         aggr_group =
     720           4 :                                 BATdescriptor(*getArgReference_bat(stk, pci, unnamedArgs));
     721             :                         // the second unnamed argument has the first occurrence of every
     722             :                         // group number; we only use it to get the total number of groups
     723             :                         // quickly
     724           4 :                         group_first_occurrence =
     725           4 :                                 BATdescriptor(*getArgReference_bat(stk, pci, unnamedArgs + 1));
     726           4 :                         group_count = BATcount(group_first_occurrence);
     727           4 :                         BBPunfix(group_first_occurrence->batCacheid);
     728           4 :                         elements = BATcount(aggr_group); // number of rows (one group id per row)
     729             : 
     730             :                         // now we count, for every group, how many elements it has
     731           4 :                         group_counts = GDKzalloc(group_count * sizeof(size_t));
     732           4 :                         if (group_counts == NULL) {
     733           0 :                                 msg = createException(MAL, "pyapi3.eval",
     734             :                                                                           SQLSTATE(HY013) MAL_MALLOC_FAIL " group count array.");
     735           0 :                                 goto aggrwrapup;
     736             :                         }
     737             : 
     738           4 :                         aggr_group_arr = (oid *)aggr_group->theap.base;
     739          32 :                         for (element_it = 0; element_it < elements; element_it++) {
     740          28 :                                 group_counts[aggr_group_arr[element_it]]++;
     741             :                         }
     742             : 
     743             :                         // now perform the actual splitting of the data, first construct
     744             :                         // room for splits for every group
     745             :                         // elements are structured as follows:
     746             :                         // split_bats [groupnr] [columnnr] [elementnr]
     747           4 :                         split_bats = GDKzalloc(group_count * sizeof(void *));
     748          20 :                         for (group_it = 0; group_it < group_count; group_it++) {
     749          32 :                                 split_bats[group_it] =
     750          16 :                                         GDKzalloc(sizeof(void *) * named_columns);
     751             :                         }
     752             : 
     753             :                         // now split the columns one by one
     754           8 :                         for (i = 0; i < named_columns; i++) {
     755           4 :                                 PyInput input = pyinput_values[i];
     756           4 :                                 void *basevals = input.bat->theap.base;
     757             : 
     758           4 :                                 if (!input.scalar) {
     759           4 :                                         switch (input.bat_type) {
     760           0 :                                                 case TYPE_void:
     761           0 :                                                         NP_SPLIT_BAT(oid);
     762           0 :                                                         break;
     763           0 :                                                 case TYPE_bit:
     764           0 :                                                         NP_SPLIT_BAT(bit);
     765           0 :                                                         break;
     766           0 :                                                 case TYPE_bte:
     767           0 :                                                         NP_SPLIT_BAT(bte);
     768           0 :                                                         break;
     769           0 :                                                 case TYPE_sht:
     770           0 :                                                         NP_SPLIT_BAT(sht);
     771           0 :                                                         break;
     772           1 :                                                 case TYPE_int:
     773          13 :                                                         NP_SPLIT_BAT(int);
     774           1 :                                                         break;
     775           0 :                                                 case TYPE_oid:
     776           0 :                                                         NP_SPLIT_BAT(oid);
     777           0 :                                                         break;
     778           0 :                                                 case TYPE_lng:
     779           0 :                                                         NP_SPLIT_BAT(lng);
     780           0 :                                                         break;
     781           0 :                                                 case TYPE_flt:
     782           0 :                                                         NP_SPLIT_BAT(flt);
     783           0 :                                                         break;
     784           2 :                                                 case TYPE_dbl:
     785          26 :                                                         NP_SPLIT_BAT(dbl);
     786           2 :                                                         break;
     787             : #ifdef HAVE_HGE
     788           0 :                                                 case TYPE_hge:
     789           0 :                                                         basevals =
     790           0 :                                                                 PyArray_BYTES((PyArrayObject *)input.result);
     791           0 :                                                         NP_SPLIT_BAT(dbl);
     792           0 :                                                         break;
     793             : #endif
     794           1 :                                                 case TYPE_str: {
     795           1 :                                                         PyObject ****ptr = (PyObject ****)split_bats;
     796           1 :                                                         size_t *temp_indices;
     797           1 :                                                         PyObject **batcontent = (PyObject **)PyArray_DATA(
     798           1 :                                                                 (PyArrayObject *)input.result);
     799             :                                                         // allocate space for split BAT
     800           5 :                                                         for (group_it = 0; group_it < group_count;
     801           4 :                                                                  group_it++) {
     802           8 :                                                                 ptr[group_it][i] =
     803           4 :                                                                         GDKzalloc(group_counts[group_it] *
     804             :                                                                                           sizeof(PyObject *));
     805             :                                                         }
     806             :                                                         // iterate over the elements of the current BAT
     807           9 :                                                         temp_indices =
     808           1 :                                                                 GDKzalloc(sizeof(PyObject *) * group_count);
     809           8 :                                                         for (element_it = 0; element_it < elements;
     810           7 :                                                                  element_it++) {
     811             :                                                                 // group of current element
     812           7 :                                                                 oid group = aggr_group_arr[element_it];
     813             :                                                                 // append current element to proper group
     814           7 :                                                                 ptr[group][i][temp_indices[group]++] =
     815           7 :                                                                         batcontent[element_it];
     816             :                                                         }
     817           1 :                                                         GDKfree(temp_indices);
     818           1 :                                                         break;
     819             :                                                 }
     820           0 :                                                 default:
     821           0 :                                                         msg = createException(
     822             :                                                                 MAL, "pyapi3.eval", SQLSTATE(PY000) "Unrecognized BAT type %s",
     823             :                                                                 BatType_Format(input.bat_type));
     824           0 :                                                         goto aggrwrapup;
     825           4 :                                                         break;
     826             :                                         }
     827           0 :                                 }
     828             :                         }
     829             : 
     830             :                         {
     831           4 :                                 int res = 0;
     832           4 :                                 size_t threads = 8; // GDKgetenv("gdk_nr_threads");
     833           4 :                                 size_t thread_it;
     834           4 :                                 size_t result_it;
     835           4 :                                 AggrParams *parameters;
     836           4 :                                 PyObject **results;
     837           4 :                                 double current = 0.0;
     838           4 :                                 double increment;
     839             : 
     840             :                                 // if there are fewer groups than threads, limit the number
     841             :                                 // of threads to the number of groups
     842           4 :                                 threads = group_count < threads ? group_count : threads;
     843             : 
     844           4 :                                 increment = (double)group_count / (double)threads;
     845             :                                 // start running the threads
     846           4 :                                 parameters = GDKzalloc(threads * sizeof(AggrParams));
     847           4 :                                 results = GDKzalloc(group_count * sizeof(PyObject *));
     848          20 :                                 for (thread_it = 0; thread_it < threads; thread_it++) {
     849          16 :                                         AggrParams *params = &parameters[thread_it];
     850          16 :                                         params->named_columns = named_columns;
     851          16 :                                         params->additional_columns = additional_columns;
     852          16 :                                         params->group_count = group_count;
     853          16 :                                         params->group_counts = &group_counts;
     854          16 :                                         params->pyinput_values = &pyinput_values;
     855          16 :                                         params->column_types_dict = &pColumnTypes;
     856          16 :                                         params->split_bats = &split_bats;
     857          16 :                                         params->base = pci->retc + 2;
     858          16 :                                         params->function = &pFunc;
     859          16 :                                         params->connection = &pConnection;
     860          16 :                                         params->pycall = &pycall;
     861          16 :                                         params->group_start = (size_t)floor(current);
     862          16 :                                         params->group_end = (size_t)floor(current += increment);
     863          16 :                                         params->args = &args;
     864          16 :                                         params->msg = NULL;
     865          16 :                                         params->result_objects = results;
     866          16 :                                         res = MT_create_thread(&params->thread,
     867             :                                                                                    (void (*)(void *)) &
     868             :                                                                                            ComputeParallelAggregation,
     869             :                                                                                    params, MT_THR_JOINABLE,
     870             :                                                                                    "pyapi_par_aggr");
     871          16 :                                         if (res != 0) {
     872           0 :                                                 msg = createException(MAL, "pyapi3.eval",
     873             :                                                                                           SQLSTATE(PY000) "Failed to start thread.");
     874           0 :                                                 goto aggrwrapup;
     875             :                                         }
     876             :                                 }
     877          20 :                                 for (thread_it = 0; thread_it < threads; thread_it++) {
     878          16 :                                         AggrParams params = parameters[thread_it];
     879          16 :                                         int res = MT_join_thread(params.thread);
     880          16 :                                         if (res != 0) {
     881           0 :                                                 msg = createException(MAL, "pyapi3.eval",
     882             :                                                                                           "Failed to join thread.");
     883           0 :                                                 goto aggrwrapup;
     884             :                                         }
     885             :                                 }
     886             : 
     887          20 :                                 for (thread_it = 0; thread_it < threads; thread_it++) {
     888          16 :                                         AggrParams params = parameters[thread_it];
     889          16 :                                         if (results[thread_it] == NULL || params.msg != NULL) {
     890           0 :                                                 msg = params.msg;
     891           0 :                                                 goto wrapup;
     892             :                                         }
     893             :                                 }
     894             : 
     895             :                                 // we need the GIL again to collect the per-group results into a list
     896           4 :                                 gstate = Python_ObtainGIL();
     897             : 
     898           4 :                                 aggr_result = PyList_New(group_count);
     899          24 :                                 for (result_it = 0; result_it < group_count; result_it++) {
     900          16 :                                         PyList_SetItem(aggr_result, result_it, results[result_it]);
     901             :                                 }
     902           4 :                                 GDKfree(parameters);
     903           4 :                                 GDKfree(results);
     904             :                         }
     905           4 :                         pResult = PyList_New(1);
     906           4 :                         PyList_SetItem(pResult, 0, aggr_result);
     907             : 
     908           4 :                 aggrwrapup:
     909           4 :                         if (group_counts != NULL) {
     910           4 :                                 GDKfree(group_counts);
     911             :                         }
     912           4 :                         if (split_bats != NULL) {
     913          20 :                                 for (group_it = 0; group_it < group_count; group_it++) {
     914          16 :                                         if (split_bats[group_it] != NULL) {
     915          32 :                                                 for (i = 0; i < named_columns; i++) {
     916          16 :                                                         if (split_bats[group_it][i] != NULL) {
     917          16 :                                                                 GDKfree(split_bats[group_it][i]);
     918             :                                                         }
     919             :                                                 }
     920          16 :                                                 GDKfree(split_bats[group_it]);
     921             :                                         }
     922             :                                 }
     923           4 :                                 GDKfree(split_bats);
     924             :                         }
     925           4 :                         if (aggr_group != NULL) {
     926           4 :                                 BBPunfix(aggr_group->batCacheid);
     927             :                         }
     928           4 :                         if (msg != MAL_SUCCEED) {
     929           0 :                                 goto wrapup;
     930             :                         }
     931             :                 } else {
     932             :                         // The function has been successfully created/compiled, all that
     933             :                         // remains is to actually call the function
     934         133 :                         pResult = PyObject_CallObject(pFunc, pArgs);
     935             :                 }
     936             : 
     937         137 :                 Py_DECREF(pFunc);
     938         137 :                 Py_DECREF(pArgs);
     939             : 
     940         137 :                 if (PyErr_Occurred()) {
     941          10 :                         msg = PyError_CreateException("Python exception", pycall);
     942          10 :                         if (code_object == NULL) {
     943          10 :                                 PyRun_SimpleString("del pyfun");
     944             :                         }
     945          10 :                         goto wrapup;
     946             :                 }
     947             : 
     948             :                 // if (code_object == NULL) { PyRun_SimpleString("del pyfun"); }
     949             : 
     950         127 :                 if (PyDict_Check(pResult)) { // Handle dictionary returns
     951             :                         // For dictionary returns we need to map each of the (key,value)
     952             :                         // pairs to the proper return value
     953             :                         // We first analyze the SQL Function structure for a list of return
     954             :                         // value names
     955          35 :                         char **retnames = NULL;
     956          35 :                         if (!varres) {
     957          35 :                                 if (sqlfun != NULL) {
     958          35 :                                         retnames = GDKzalloc(sizeof(char *) * sqlfun->res->cnt);
     959          35 :                                         argnode = sqlfun->res->h;
     960         122 :                                         for (i = 0; i < sqlfun->res->cnt; i++) {
     961          87 :                                                 retnames[i] = ((sql_arg *)argnode->data)->name;
     962          87 :                                                 argnode = argnode->next;
     963             :                                         }
     964             :                                 } else {
     965           0 :                                         msg = createException(MAL, "pyapi3.eval",
     966             :                                                                                   SQLSTATE(PY000) "Return value is a dictionary, but "
     967             :                                                                                   "there is no sql function object, so "
     968             :                                                                                   "we don't know the return value "
     969             :                                                                                   "names and mapping cannot be done.");
     970           0 :                                         goto wrapup;
     971             :                                 }
     972             :                         } else {
     973             :                                 // If there is a variable number of return types, we take the
     974             :                                 // column names from the dictionary
     975           0 :                                 PyObject *keys = PyDict_Keys(pResult);
     976           0 :                                 retcols = (int)PyList_Size(keys);
     977           0 :                                 retnames = GDKzalloc(sizeof(char *) * retcols);
     978           0 :                                 for (i = 0; i < retcols; i++) {
     979           0 :                                         PyObject *colname = PyList_GetItem(keys, i);
     980           0 :                                         if (!PyString_CheckExact(colname)) {
     981           0 :                                                 msg = createException(MAL, "pyapi3.eval",
     982             :                                                                                           SQLSTATE(PY000) "Expected a string key in the "
     983             :                                                                                           "dictionary, but received an "
     984             :                                                                                           "object of type %s",
     985             :                                                                                           colname->ob_type->tp_name);
     986           0 :                                                 goto wrapup;
     987             :                                         }
     988             : #ifndef IS_PY3K
     989             :                                         retnames[i] = ((PyStringObject *)colname)->ob_sval;
     990             : #else
     991           0 :                                         retnames[i] = (char *) PyUnicode_AsUTF8(colname);
     992             : #endif
     993             :                                 }
     994             :                         }
     995          35 :                         pResult =
     996          35 :                                 PyDict_CheckForConversion(pResult, retcols, retnames, &msg);
     997          35 :                         if (retnames != NULL)
     998          35 :                                 GDKfree(retnames);
     999          92 :                 } else if (varres) {
    1000           0 :                         msg = createException(MAL, "pyapi3.eval",
    1001             :                                                                   SQLSTATE(PY000) "Expected a variable number return values, "
    1002             :                                                                   "but the return type was not a dictionary. "
    1003             :                                                                   "We require the return type to be a "
    1004             :                                                                   "dictionary for column naming purposes.");
    1005           0 :                         goto wrapup;
    1006             :                 } else {
    1007             :                         // Now we need to do some error checking on the result object,
    1008             :                         // because the result object has to have the correct type/size
    1009             :                         // We will also do some converting of result objects to a common
    1010             :                         // type (such as scalar -> [[scalar]])
    1011          92 :                         pResult = PyObject_CheckForConversion(pResult, retcols, NULL, &msg);
    1012             :                 }
    1013         127 :                 if (pResult == NULL) {
    1014           3 :                         goto wrapup;
    1015             :                 }
    1016             :         }
    1017             : 
    1018         124 :         if (varres) {
    1019           0 :                 GDKfree(pyreturn_values);
    1020           0 :                 pyreturn_values = GDKzalloc(retcols * sizeof(PyReturn));
    1021             :         }
    1022             : 
    1023             :         // Now we have executed the Python function, we have to collect the return
    1024             :         // values and convert them to BATs
    1025             :         // We will first collect header information about the Python return objects
    1026             :         // and extract the underlying C arrays
    1027             :         // We will store this header information in a PyReturn object
    1028             : 
    1029             :         // The reason we are doing this as a separate step is because this
    1030             :         // preprocessing requires us to call the Python API
    1031             :         // Whereas the actual returning does not require us to call the Python API
    1032             :         // This means we can do the actual returning without holding the GIL
    1033         124 :         if (!PyObject_PreprocessObject(pResult, pyreturn_values, retcols, &msg)) {
    1034           0 :                 goto wrapup;
    1035             :         }
    1036             : 
    1037             : #ifdef HAVE_FORK
    1038             :         /*[FORKED]*/
    1039             :         // This is where the child process stops executing
    1040             :         // We have successfully executed the Python function and converted the
    1041             :         // result object to a C array
    1042             :         // Now all that is left is to copy the C array to shared memory so the main
    1043             :         // process can read it and return it
    1044         124 :         if (mapped && child_process) {
    1045           0 :                 ReturnBatDescr *ptr;
    1046             : 
    1047             :                 // First we will fill in the header information, we will need to get a
    1048             :                 // pointer to the header data first
    1049             :                 // The main process has already created the header data for the child
    1050             :                 // process
    1051           0 :                 if ((mmap_ptrs[0] = GDKinitmmap(mmap_id + 0, memory_size, &mmap_sizes[0])) == NULL) {
    1052           0 :                         msg = createException(MAL, "pyapi3.eval", GDK_EXCEPTION);
    1053           0 :                         goto wrapup;
    1054             :                 }
    1055             : 
    1056             :                 // Now we will write data about our result (memory size, type, number of
    1057             :                 // elements) to the header
    1058           0 :                 ptr = (ReturnBatDescr *)mmap_ptrs[0];
    1059           0 :                 for (i = 0; i < retcols; i++) {
    1060           0 :                         PyReturn *ret = &pyreturn_values[i];
    1061           0 :                         ReturnBatDescr *descr = &ptr[i];
    1062             : 
    1063           0 :                         if (ret->result_type == NPY_OBJECT) {
    1064             :                                 // We can't deal with NPY_OBJECT arrays, because these are
    1065             :                                 // 'arrays of pointers', so we can't just copy the content of
    1066             :                                 // the array into shared memory
    1067             :                                 // So if we're dealing with a NPY_OBJECT array, we convert them
    1068             :                                 // to a Numpy Array of type NPY_<TYPE> that corresponds with the
    1069             :                                 // desired BAT type
    1070             :                                 // WARNING: Because we could be converting to a NPY_STRING or
    1071             :                                 // NPY_UNICODE array (if the desired type is TYPE_str or
    1072             :                                 // TYPE_hge), this means that memory usage can explode
    1073             :                                 //   because NPY_STRING/NPY_UNICODE arrays are 2D string arrays
    1074             :                                 //   with fixed string length (so if there's one very large
    1075             :                                 //   string the size explodes quickly)
    1076             :                                 //   if someone has some problem with memory size exploding when
    1077             :                                 //   using PYTHON_MAP but it being fine in regular PYTHON this
    1078             :                                 //   is probably the issue
    1079           0 :                                 PyInput *inp = &pyinput_values[i - (pci->retc + 2)];
    1080           0 :                                 int bat_type = inp->bat_type;
    1081           0 :                                 PyObject *new_array = PyArray_FromAny(
    1082             :                                         ret->numpy_array,
    1083           0 :                                         PyArray_DescrFromType(BatType_ToPyType(bat_type)), 1, 1,
    1084             :                                         NPY_ARRAY_CARRAY | NPY_ARRAY_FORCECAST, NULL);
    1085           0 :                                 if (new_array == NULL) {
    1086           0 :                                         msg = createException(MAL, "pyapi3.eval",
    1087             :                                                                                   SQLSTATE(PY000) "Could not convert the returned "
    1088             :                                                                                   "NPY_OBJECT array to the desired "
    1089             :                                                                                   "array of type %s.\n",
    1090             :                                                                                   BatType_Format(bat_type));
    1091           0 :                                         goto wrapup;
    1092             :                                 }
    1093           0 :                                 Py_DECREF(ret->numpy_array); // do we really care about cleaning
    1094             :                                                                                          // this up, considering this only
    1095             :                                                                                          // happens in a separate process
    1096             :                                                                                          // that will be exited soon anyway?
    1097           0 :                                 ret->numpy_array = new_array;
    1098           0 :                                 ret->result_type =
    1099           0 :                                         PyArray_DESCR((PyArrayObject *)ret->numpy_array)->type_num;
    1100           0 :                                 ret->memory_size =
    1101           0 :                                         PyArray_DESCR((PyArrayObject *)ret->numpy_array)->elsize;
    1102           0 :                                 ret->count = PyArray_DIMS((PyArrayObject *)ret->numpy_array)[0];
    1103           0 :                                 ret->array_data =
    1104           0 :                                         PyArray_DATA((PyArrayObject *)ret->numpy_array);
    1105             :                         }
    1106             : 
    1107           0 :                         descr->npy_type = ret->result_type;
    1108           0 :                         descr->element_size = ret->memory_size;
    1109           0 :                         descr->bat_count = ret->count;
    1110           0 :                         descr->bat_size = ret->memory_size * ret->count;
    1111           0 :                         descr->has_mask = ret->mask_data != NULL;
    1112             : 
    1113           0 :                         if (ret->count > 0) {
    1114           0 :                                 int memory_size = ret->memory_size * ret->count;
    1115           0 :                                 char *mem_ptr;
    1116             :                                 // now create shared memory for the return value and copy the
    1117             :                                 // actual values
    1118           0 :                                 assert(memory_size > 0);
    1119           0 :                                 if ((mmap_ptrs[i + 3] = GDKinitmmap(mmap_id + i + 3, memory_size,
    1120           0 :                                                                                                         &mmap_sizes[i + 3])) == NULL) {
    1121           0 :                                         msg = createException(MAL, "pyapi3.eval", GDK_EXCEPTION);
    1122           0 :                                         goto wrapup;
    1123             :                                 }
    1124           0 :                                 mem_ptr = mmap_ptrs[i + 3];
    1125           0 :                                 assert(mem_ptr);
    1126           0 :                                 memcpy(mem_ptr, PyArray_DATA((PyArrayObject *)ret->numpy_array),
    1127             :                                            memory_size);
    1128             : 
    1129           0 :                                 if (descr->has_mask) {
    1130           0 :                                         bool *mask_ptr;
    1131           0 :                                         int mask_size = ret->count * sizeof(bool);
    1132           0 :                                         assert(mask_size > 0);
    1133             :                                         // create a memory space for the mask
    1134           0 :                                         if ((mmap_ptrs[retcols + (i + 3)] = GDKinitmmap(
    1135           0 :                                                          mmap_id + retcols + (i + 3), mask_size,
    1136           0 :                                                          &mmap_sizes[retcols + (i + 3)])) == NULL) {
    1137           0 :                                                 msg = createException(MAL, "pyapi3.eval", GDK_EXCEPTION);
    1138           0 :                                                 goto wrapup;
    1139             :                                         }
    1140           0 :                                         mask_ptr = mmap_ptrs[retcols + i + 3];
    1141           0 :                                         assert(mask_ptr);
    1142           0 :                                         memcpy(mask_ptr, ret->mask_data, mask_size);
    1143             :                                 }
    1144             :                         }
    1145             :                 }
    1146             :                 // now free the main process from the semaphore
    1147           0 :                 if (GDKchangesemval(query_sem, 0, 1) != GDK_SUCCEED) {
    1148           0 :                         msg = createException(MAL, "pyapi3.eval", GDK_EXCEPTION);
    1149           0 :                         goto wrapup;
    1150             :                 }
    1151             :                 // Exit child process without an error code
    1152           0 :                 exit(0);
    1153             :         }
    1154             : #endif
    1155             :         // We are done executing Python code (aside from cleanup), so we can release
    1156             :         // the GIL
    1157         124 :         gstate = Python_ReleaseGIL(gstate);
    1158             : 
    1159             : #ifdef HAVE_FORK // This goto is only used for multiprocessing, if HAVE_FORK is
    1160             :                                  // set to 0 this is unused
    1161         124 : returnvalues:
    1162             : #endif
    1163             :         /*[RETURN_VALUES]*/
    1164         124 :         argnode = sqlfun && sqlfun->res ? sqlfun->res->h : NULL;
    1165         296 :         for (i = 0; i < retcols; i++) {
    1166         172 :                 PyReturn *ret = &pyreturn_values[i];
    1167         172 :                 int bat_type = TYPE_any;
    1168         344 :                 sql_subtype *sql_subtype =
    1169         172 :                         argnode ? &((sql_arg *)argnode->data)->type : NULL;
    1170         172 :                 if (!varres) {
    1171         172 :                         bat_type = getBatType(getArgType(mb, pci, i));
    1172             : 
    1173         172 :                         if (bat_type == TYPE_any || bat_type == TYPE_void) {
    1174           0 :                                 bat_type = PyType_ToBat(ret->result_type);
    1175           0 :                                 getArgType(mb, pci, i) = bat_type;
    1176             :                         }
    1177             :                 } else {
    1178           0 :                         bat_type = PyType_ToBat(ret->result_type);
    1179             :                 }
    1180             : 
    1181         344 :                 b = PyObject_ConvertToBAT(ret, sql_subtype, bat_type, i, seqbase, &msg,
    1182         172 :                                                                   !enable_zerocopy_output);
    1183         172 :                 if (b == NULL) {
    1184           0 :                         goto wrapup;
    1185             :                 }
    1186             : 
    1187         172 :                 msg = MAL_SUCCEED;
    1188         172 :                 if (isaBatType(getArgType(mb, pci, i))) {
    1189         155 :                         *getArgReference_bat(stk, pci, i) = b->batCacheid;
    1190         155 :                         BBPkeepref(b->batCacheid);
    1191             :                 } else { // single value return, only for non-grouped aggregations
    1192          17 :                         if (bat_type != TYPE_str) {
    1193          17 :                                 if (VALinit(&stk->stk[pci->argv[i]], bat_type, Tloc(b, 0)) ==
    1194             :                                         NULL)
    1195           0 :                                         msg = createException(MAL, "pyapi3.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL);
    1196             :                         } else {
    1197           0 :                                 BATiter li = bat_iterator(b);
    1198           0 :                                 if (VALinit(&stk->stk[pci->argv[i]], bat_type,
    1199           0 :                                                         BUNtail(li, 0)) == NULL)
    1200           0 :                                         msg = createException(MAL, "pyapi3.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL);
    1201             :                         }
    1202             :                 }
    1203         172 :                 if (argnode) {
    1204         172 :                         argnode = argnode->next;
    1205             :                 }
    1206             :         }
    1207         124 : wrapup:
    1208             : 
    1209             : #ifdef HAVE_FORK
    1210         141 :         if (mapped && child_process) {
    1211             :                 // If we get here, something went wrong in a child process
    1212           0 :                 char *error_mem;
    1213           0 :                 ReturnBatDescr *ptr;
    1214             : 
    1215             :                 // Free the main process from the semaphore; on failure exit with an error code
    1216           0 :                 if (GDKchangesemval(query_sem, 0, 1) != GDK_SUCCEED) {
    1217           0 :                         exit(1);
    1218             :                 }
    1219             : 
    1220           0 :                 assert(memory_size > 0);
    1221           0 :                 if ((mmap_ptrs[0] = GDKinitmmap(mmap_id + 0, memory_size, &mmap_sizes[0])) == NULL) {
    1222           0 :                         exit(1);
    1223             :                 }
    1224             : 
    1225             :                 // To indicate that we failed, we will write information to our header
    1226           0 :                 ptr = (ReturnBatDescr *)mmap_ptrs[0];
    1227           0 :                 for (i = 0; i < retcols; i++) {
    1228           0 :                         ReturnBatDescr *descr = &ptr[i];
    1229             :                         // We will write descr->npy_type to -1, so other processes can see
    1230             :                         // that we failed
    1231           0 :                         descr->npy_type = -1;
    1232             :                         // We will write the memory size of our error message to the
    1233             :                         // bat_size, so the main process can access the shared memory
    1234           0 :                         descr->bat_size = (strlen(msg) + 1) * sizeof(char);
    1235             :                 }
    1236             : 
    1237             :                 // Now create the shared memory to write our error message to
    1238             :                 // We can simply use the slot mmap_id + 3, even though this is normally
    1239             :                 // used for query return values
    1240             :                 // This is because, if the process fails, no values will be returned
    1241           0 :                 if ((error_mem = GDKinitmmap(mmap_id + 3,
    1242           0 :                                                                          (strlen(msg) + 1) * sizeof(char),
    1243             :                                                                          NULL)) == NULL) {
    1244           0 :                         exit(1);
    1245             :                 }
    1246           0 :                 strcpy(error_mem, msg);
    1247           0 :                 exit(1);
    1248             :         }
    1249             : #endif
    1250             : 
    1251             : #ifdef HAVE_FORK
    1252         141 :         if (holds_gil) {
    1253         135 :                 MT_lock_set(&pyapiLock);
    1254         135 :                 python_call_active = false;
    1255         135 :                 MT_lock_unset(&pyapiLock);
    1256             :         }
    1257             : 
    1258         141 :         if (mapped) {
    1259           0 :                 for (i = 0; i < retcols; i++) {
    1260           0 :                         PyReturn *ret = &pyreturn_values[i];
    1261           0 :                         if (ret->mmap_id < 0) {
    1262             :                                 // if we directly give the mmap file to a BAT, don't delete the
    1263             :                                 // MMAP file
    1264           0 :                                 mmap_ptrs[i + 3] = NULL;
    1265             :                         }
    1266             :                 }
    1267           0 :                 for (i = 0; i < 3 + pci->retc * 2; i++) {
    1268           0 :                         if (mmap_ptrs[i] != NULL) {
    1269           0 :                                 GDKreleasemmap(mmap_ptrs[i], mmap_sizes[i], mmap_id + i);
    1270             :                         }
    1271             :                 }
    1272           0 :                 if (mmap_ptrs)
    1273           0 :                         GDKfree(mmap_ptrs);
    1274           0 :                 if (mmap_sizes)
    1275           0 :                         GDKfree(mmap_sizes);
    1276           0 :                 if (query_sem > 0) {
    1277           0 :                         GDKreleasesem(query_sem);
    1278             :                 }
    1279             :         }
    1280             : #endif
    1281             :         // Actual cleanup
    1282             :         // Cleanup input BATs
    1283         504 :         for (i = pci->retc + 2; i < pci->argc; i++) {
    1284         363 :                 PyInput *inp = &pyinput_values[i - (pci->retc + 2)];
    1285         363 :                 if (inp->bat != NULL)
    1286         304 :                         BBPunfix(inp->bat->batCacheid);
    1287             :         }
    1288         141 :         if (pResult != NULL && gstate == 0) {
    1289             :                 // if there is a pResult here, we are running single threaded (LANGUAGE
    1290             :                 // PYTHON),
    1291             :                 // thus we need to free python objects, thus we need to obtain the GIL
    1292         124 :                 gstate = Python_ObtainGIL();
    1293             :         }
    1294         343 :         for (i = 0; i < retcols; i++) {
    1295         202 :                 PyReturn *ret = &pyreturn_values[i];
    1296             :                 // First clean up any return values
    1297         202 :                 if (!ret->multidimensional) {
    1298             :                         // Clean up numpy arrays, if they are there
    1299         202 :                         if (ret->numpy_array != NULL) {
    1300         172 :                                 Py_DECREF(ret->numpy_array);
    1301             :                         }
    1302         202 :                         if (ret->numpy_mask != NULL) {
    1303         202 :                                 Py_DECREF(ret->numpy_mask);
    1304             :                         }
    1305             :                 }
    1306             :         }
    1307         141 :         if (pResult != NULL) {
    1308         124 :                 Py_DECREF(pResult);
    1309             :         }
    1310         141 :         if (gstate != 0) {
    1311         139 :                 gstate = Python_ReleaseGIL(gstate);
    1312             :         }
    1313             : 
    1314             :         // Now release some GDK memory we allocated for strings and input values
    1315         141 :         GDKfree(pyreturn_values);
    1316         141 :         GDKfree(pyinput_values);
    1317        1129 :         for (i = 0; i < pci->argc; i++)
    1318         847 :                 if (args[i])
    1319         351 :                         GDKfree(args[i]);
    1320         141 :         GDKfree(args);
    1321         141 :         GDKfree(pycall);
    1322             : 
    1323         141 :         return msg;
    1324             : }
    1325             : 
    1326             : str
    1327           7 : PYFUNCNAME(PyAPIprelude)(void *ret) {
    1328           7 :         (void) ret;
    1329           7 :         MT_lock_set(&pyapiLock);
    1330           7 :         if (!pyapiInitialized) {
    1331             : #ifdef IS_PY3K
    1332           7 :                 wchar_t* program = Py_DecodeLocale("mserver5", NULL);
    1333           7 :                 wchar_t* argv[] = { program };
    1334             : #else
    1335             :                 char* argv[] = {"mserver5"};
    1336             : #endif
    1337           7 :                 str msg = MAL_SUCCEED;
    1338           7 :                 PyObject *tmp;
    1339           7 :                 Py_Initialize();
    1340           7 :                 PySys_SetArgvEx(1, argv, 0);
    1341           7 :                 _import_array();
    1342           7 :                 msg = _connection_init();
    1343           7 :                 if (msg != MAL_SUCCEED) {
    1344           0 :                         MT_lock_unset(&pyapiLock);
    1345           0 :                         return msg;
    1346             :                 }
    1347           7 :                 msg = _conversion_init();
    1348           7 :                 if (msg != MAL_SUCCEED) {
    1349           0 :                         MT_lock_unset(&pyapiLock);
    1350           0 :                         return msg;
    1351             :                 }
    1352           7 :                 _pytypes_init();
    1353           7 :                 _loader_init();
    1354           7 :                 tmp = PyString_FromString("marshal");
    1355           7 :                 marshal_module = PyImport_Import(tmp);
    1356           7 :                 Py_DECREF(tmp);
    1357           7 :                 if (marshal_module == NULL) {
    1358           0 :                         MT_lock_unset(&pyapiLock);
    1359           0 :                         return createException(MAL, "pyapi3.eval", SQLSTATE(PY000) "Failed to load Marshal module.");
    1360             :                 }
    1361           7 :                 marshal_loads = PyObject_GetAttrString(marshal_module, "loads");
    1362           7 :                 if (marshal_loads == NULL) {
    1363           0 :                         MT_lock_unset(&pyapiLock);
    1364           0 :                         return createException(MAL, "pyapi3.eval", SQLSTATE(PY000) "Failed to load function \"loads\" from Marshal module.");
    1365             :                 }
    1366           7 :                 if (PyRun_SimpleString("import numpy") != 0) {
    1367           0 :                         msg = PyError_CreateException("Failed to initialize embedded python", NULL);
    1368           0 :                         MT_lock_unset(&pyapiLock);
    1369           0 :                         return msg;
    1370             :                 }
    1371           7 :                 PyEval_SaveThread();
    1372           7 :                 if (msg != MAL_SUCCEED) {
    1373             :                         MT_lock_unset(&pyapiLock);
    1374             :                         return msg;
    1375             :                 }
    1376           7 :                 pyapiInitialized = true;
    1377           7 :                 fprintf(stdout, "# MonetDB/Python%d module loaded\n",
    1378             : #ifdef IS_PY3K
    1379             :                         3
    1380             : #else
    1381             :                         2
    1382             : #endif
    1383             :                 );
    1384             :         }
    1385           7 :         MT_lock_unset(&pyapiLock);
    1386           7 :         option_disable_fork = GDKgetenv_istrue(fork_disableflag) || GDKgetenv_isyes(fork_disableflag);
    1387           7 :         return MAL_SUCCEED;
    1388             : }
    1389             : 
    1390          24 : char *PyError_CreateException(char *error_text, char *pycall)
    1391             : {
    1392          24 :         PyObject *py_error_type = NULL, *py_error_value = NULL,
    1393          24 :                          *py_error_traceback = NULL;
    1394          24 :         const char *py_error_string = NULL;
    1395          24 :         lng line_number = -1;
    1396             : 
    1397          24 :         PyErr_Fetch(&py_error_type, &py_error_value, &py_error_traceback);
    1398          24 :         if (py_error_value) {
    1399          24 :                 PyObject *error;
    1400          24 :                 PyErr_NormalizeException(&py_error_type, &py_error_value,
    1401             :                                                                  &py_error_traceback);
    1402          24 :                 error = PyObject_Str(py_error_value);
    1403             : 
    1404          24 :                 py_error_string = PyString_AS_STRING(error);
    1405          24 :                 Py_XDECREF(error);
    1406          24 :                 if (pycall != NULL && strlen(pycall) > 0) {
    1407          24 :                         if (py_error_traceback == NULL) {
    1408             :                                 // no traceback info, this means we are dealing with a parsing
    1409             :                                 // error
    1410             :                                 // line information should be in the error message
    1411           2 :                                 sscanf(py_error_string, "%*[^0-9]" LLSCN, &line_number);
    1412           2 :                                 if (line_number < 0)
    1413           0 :                                         goto finally;
    1414             :                         } else {
    1415          22 :                                 line_number =
    1416          22 :                                         ((PyTracebackObject *)py_error_traceback)->tb_lineno;
    1417             :                         }
    1418             : 
    1419             :                         // Now only display the line numbers around the error message, we
    1420             :                         // display 5 lines around the error message
    1421             :                         {
    1422             :                                 char linenr[32];
    1423             :                                 size_t nrpos, pos, i, j;
    1424             :                                 char lineinformation[5000]; // we only support 5000 characters
    1425             :                                                                                         // for 5 lines of the program,
    1426             :                                                                                         // should be enough
    1427             :                                 nrpos = 0; // Current line number
    1428             :                                 pos = 0; // Current position in the lineinformation result array
    1429        3522 :                                 for (i = 0; i < strlen(pycall); i++) {
    1430        3498 :                                         if (pycall[i] == '\n' || i == 0) {
    1431             :                                                 // Check if we have arrived at a new line, if we have
    1432             :                                                 // increment the line count
    1433         179 :                                                 nrpos++;
    1434             :                                                 // Now check if we should display this line
    1435         179 :                                                 if (nrpos >= ((size_t)line_number - 2) &&
    1436         109 :                                                         nrpos <= ((size_t)line_number + 2) && pos < 4997) {
    1437             :                                                         // We shouldn't put a newline on the first line we
    1438             :                                                         // encounter, only on subsequent lines
    1439          98 :                                                         if (nrpos > ((size_t)line_number - 2))
    1440          83 :                                                                 lineinformation[pos++] = '\n';
    1441          98 :                                                         if ((size_t)line_number == nrpos) {
    1442             :                                                                 // If this line is the 'error' line, add an
    1443             :                                                                 // arrow before it, otherwise just add spaces
    1444          24 :                                                                 lineinformation[pos++] = '>';
    1445          24 :                                                                 lineinformation[pos++] = ' ';
    1446             :                                                         } else {
    1447          74 :                                                                 lineinformation[pos++] = ' ';
    1448          74 :                                                                 lineinformation[pos++] = ' ';
    1449             :                                                         }
    1450          98 :                                                         snprintf(linenr, 32, "%zu", nrpos);
    1451         198 :                                                         for (j = 0; j < strlen(linenr); j++) {
    1452         100 :                                                                 lineinformation[pos++] = linenr[j];
    1453             :                                                         }
    1454          98 :                                                         lineinformation[pos++] = '.';
    1455          98 :                                                         lineinformation[pos++] = ' ';
    1456             :                                                 }
    1457             :                                         }
    1458        3498 :                                         if (pycall[i] != '\n' && nrpos >= (size_t)line_number - 2 &&
    1459        2065 :                                                 nrpos <= (size_t)line_number + 2 && pos < 4999) {
    1460             :                                                 // If we are on a line number that we have to display,
    1461             :                                                 // copy the text from this line for display
    1462        1994 :                                                 lineinformation[pos++] = pycall[i];
    1463             :                                         }
    1464             :                                 }
    1465          24 :                                 lineinformation[pos] = '\0';
    1466          24 :                                 return createException(MAL, "pyapi3.eval",  SQLSTATE(PY000) "%s\n%s\n%s",
    1467             :                                                                            error_text, lineinformation,
    1468             :                                                                            py_error_string);
    1469             :                         }
    1470             :                 }
    1471             :         } else {
    1472             :                 py_error_string = "";
    1473             :         }
    1474           0 : finally:
    1475           0 :         if (pycall == NULL)
    1476           0 :                 return createException(MAL, "pyapi3.eval", SQLSTATE(PY000) "%s\n%s", error_text,
    1477             :                                                            py_error_string);
    1478           0 :         return createException(MAL, "pyapi3.eval", SQLSTATE(PY000) "%s\n%s\n%s", error_text, pycall,
    1479             :                                                    py_error_string);
    1480             : }
    1481             : 
    1482          16 : static void ComputeParallelAggregation(AggrParams *p)
    1483             : {
    1484          16 :         int i;
    1485          16 :         size_t group_it, ai;
    1486          16 :         bool gstate = 0;
    1487             :         // now perform the actual aggregation
    1488             :         // we perform one aggregation per group
    1489             : 
    1490             :         // we need the GIL to execute the functions
    1491          16 :         gstate = Python_ObtainGIL();
    1492          32 :         for (group_it = p->group_start; group_it < p->group_end; group_it++) {
    1493             :                 // we first have to construct new
    1494          16 :                 PyObject *pArgsPartial =
    1495          16 :                         PyTuple_New(p->named_columns + p->additional_columns);
    1496          16 :                 PyObject *pColumnsPartial = PyDict_New();
    1497          16 :                 PyObject *result;
    1498          16 :                 size_t group_elements = (*p->group_counts)[group_it];
    1499          16 :                 ai = 0;
    1500             :                 // iterate over columns
    1501          32 :                 for (i = 0; i < (int)p->named_columns; i++) {
    1502          16 :                         PyObject *vararray = NULL;
    1503          16 :                         PyInput input = (*p->pyinput_values)[i];
    1504          16 :                         if (input.scalar) {
    1505             :                                 // scalar not handled yet
    1506             :                                 vararray = input.result;
    1507             :                         } else {
    1508          16 :                                 npy_intp elements[1] = {group_elements};
    1509          16 :                                 switch (input.bat_type) {
    1510           0 :                                         case TYPE_void:
    1511           0 :                                                 vararray = PyArray_New(
    1512             :                                                         &PyArray_Type, 1, elements,
    1513             : #if SIZEOF_OID == SIZEOF_INT
    1514             :                                                         NPY_UINT
    1515             : #else
    1516             :                                                         NPY_ULONGLONG
    1517             : #endif
    1518             :                                                         ,
    1519             :                                                         NULL,
    1520           0 :                                                         ((oid ***)(*p->split_bats))[group_it][i], 0,
    1521             :                                                         NPY_ARRAY_CARRAY || !NPY_ARRAY_WRITEABLE, NULL);
    1522           0 :                                                 break;
    1523           0 :                                         case TYPE_oid:
    1524           0 :                                                 vararray = PyArray_New(
    1525             :                                                         &PyArray_Type, 1, elements,
    1526             : #if SIZEOF_OID == SIZEOF_INT
    1527             :                                                         NPY_UINT32
    1528             : #else
    1529             :                                                         NPY_UINT64
    1530             : #endif
    1531             :                                                         ,
    1532             :                                                         NULL,
    1533           0 :                                                         ((oid ***)(*p->split_bats))[group_it][i], 0,
    1534             :                                                         NPY_ARRAY_CARRAY || !NPY_ARRAY_WRITEABLE, NULL);
    1535           0 :                                                 break;
    1536           0 :                                         case TYPE_bit:
    1537           0 :                                                 vararray = PyArray_New(
    1538             :                                                         &PyArray_Type, 1, elements, NPY_BOOL, NULL,
    1539           0 :                                                         ((bit ***)(*p->split_bats))[group_it][i], 0,
    1540             :                                                         NPY_ARRAY_CARRAY || !NPY_ARRAY_WRITEABLE, NULL);
    1541           0 :                                                 break;
    1542           0 :                                         case TYPE_bte:
    1543           0 :                                                 vararray = PyArray_New(
    1544             :                                                         &PyArray_Type, 1, elements, NPY_INT8, NULL,
    1545           0 :                                                         ((bte ***)(*p->split_bats))[group_it][i], 0,
    1546             :                                                         NPY_ARRAY_CARRAY || !NPY_ARRAY_WRITEABLE, NULL);
    1547           0 :                                                 break;
    1548           0 :                                         case TYPE_sht:
    1549           0 :                                                 vararray = PyArray_New(
    1550             :                                                         &PyArray_Type, 1, elements, NPY_INT16, NULL,
    1551           0 :                                                         ((sht ***)(*p->split_bats))[group_it][i], 0,
    1552             :                                                         NPY_ARRAY_CARRAY || !NPY_ARRAY_WRITEABLE, NULL);
    1553           0 :                                                 break;
    1554           4 :                                         case TYPE_int:
    1555           4 :                                                 vararray = PyArray_New(
    1556             :                                                         &PyArray_Type, 1, elements, NPY_INT32, NULL,
    1557           4 :                                                         ((int ***)(*p->split_bats))[group_it][i], 0,
    1558             :                                                         NPY_ARRAY_CARRAY || !NPY_ARRAY_WRITEABLE, NULL);
    1559           4 :                                                 break;
    1560           0 :                                         case TYPE_lng:
    1561           0 :                                                 vararray = PyArray_New(
    1562             :                                                         &PyArray_Type, 1, elements, NPY_INT64, NULL,
    1563           0 :                                                         ((lng ***)(*p->split_bats))[group_it][i], 0,
    1564             :                                                         NPY_ARRAY_CARRAY || !NPY_ARRAY_WRITEABLE, NULL);
    1565           0 :                                                 break;
    1566           0 :                                         case TYPE_flt:
    1567           0 :                                                 vararray = PyArray_New(
    1568             :                                                         &PyArray_Type, 1, elements, NPY_FLOAT32, NULL,
    1569           0 :                                                         ((flt ***)(*p->split_bats))[group_it][i], 0,
    1570             :                                                         NPY_ARRAY_CARRAY || !NPY_ARRAY_WRITEABLE, NULL);
    1571           0 :                                                 break;
    1572             : #ifdef HAVE_HGE
    1573           8 :                                         case TYPE_hge:
    1574             : #endif
    1575             :                                         case TYPE_dbl:
    1576           8 :                                                 vararray = PyArray_New(
    1577             :                                                         &PyArray_Type, 1, elements, NPY_FLOAT64, NULL,
    1578           8 :                                                         ((dbl ***)(*p->split_bats))[group_it][i], 0,
    1579             :                                                         NPY_ARRAY_CARRAY || !NPY_ARRAY_WRITEABLE, NULL);
    1580           8 :                                                 break;
    1581           4 :                                         case TYPE_str:
    1582           4 :                                                 vararray = PyArray_New(
    1583             :                                                         &PyArray_Type, 1, elements, NPY_OBJECT, NULL,
    1584           4 :                                                         ((PyObject ****)(*p->split_bats))[group_it][i], 0,
    1585             :                                                         NPY_ARRAY_CARRAY || !NPY_ARRAY_WRITEABLE, NULL);
    1586           4 :                                                 break;
    1587             :                                 }
    1588             : 
    1589          16 :                                 if (vararray == NULL) {
    1590           0 :                                         p->msg = createException(MAL, "pyapi3.eval", SQLSTATE(HY013) MAL_MALLOC_FAIL
    1591             :                                                                                          " to create NumPy array.");
    1592           0 :                                         goto wrapup;
    1593             :                                 }
    1594             :                         }
    1595             :                         // fill in _columns array
    1596          16 :                         PyDict_SetItemString(pColumnsPartial, (*p->args)[p->base + i],
    1597             :                                                                  vararray);
    1598             : 
    1599          16 :                         PyTuple_SetItem(pArgsPartial, ai++, vararray);
    1600             :                 }
    1601             : 
    1602             :                 // additional parameters
    1603          16 :                 PyTuple_SetItem(pArgsPartial, ai++, pColumnsPartial);
    1604          16 :                 PyTuple_SetItem(pArgsPartial, ai++, *p->column_types_dict);
    1605          16 :                 Py_INCREF(*p->column_types_dict);
    1606          16 :                 PyTuple_SetItem(pArgsPartial, ai++, *p->connection);
    1607          16 :                 Py_INCREF(*p->connection);
    1608             : 
    1609             :                 // call the aggregation function
    1610          16 :                 result = PyObject_CallObject(*p->function, pArgsPartial);
    1611          16 :                 Py_DECREF(pArgsPartial);
    1612             : 
    1613          16 :                 if (result == NULL) {
    1614           0 :                         p->msg = PyError_CreateException("Python exception", *p->pycall);
    1615           0 :                         goto wrapup;
    1616             :                 }
    1617             :                 // gather results
    1618          16 :                 p->result_objects[group_it] = result;
    1619             :         }
    1620             : // release the GIL again
    1621          16 : wrapup:
    1622          16 :         gstate = Python_ReleaseGIL(gstate);
    1623          16 : }
    1624             : 
    1625             : static void CreateEmptyReturn(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci,
    1626             :                                                           size_t retcols, oid seqbase)
    1627             : {
    1628             :         size_t i;
    1629             :         for (i = 0; i < retcols; i++) {
    1630             :                 int bat_type = getBatType(getArgType(mb, pci, i));
    1631             :                 BAT *b = COLnew(seqbase, bat_type, 0, TRANSIENT);
    1632             :                 if (isaBatType(getArgType(mb, pci, i))) {
    1633             :                         *getArgReference_bat(stk, pci, i) = b->batCacheid;
    1634             :                         BBPkeepref(b->batCacheid);
    1635             :                 } else { // single value return, only for non-grouped aggregations
    1636             :                         VALinit(&stk->stk[pci->argv[i]], bat_type, Tloc(b, 0));
    1637             :                 }
    1638             :         }
    1639             : }

Generated by: LCOV version 1.14