LCOV - code coverage report
Current view: top level - monetdb5/modules/mal - sysmon.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 101 238 42.4 %
Date: 2021-01-13 20:07:21 Functions: 3 6 50.0 %

          Line data    Source code
       1             : /*
       2             :  * This Source Code Form is subject to the terms of the Mozilla Public
       3             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       4             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       5             :  *
       6             :  * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
       7             :  */
       8             : 
       9             : #include "monetdb_config.h"
      10             : #include "mal.h"
      11             : #include "mal_interpreter.h"
      12             : #include "mal_authorize.h"
      13             : #include "mal_client.h"
      14             : #include "mal_runtime.h"
      15             : #include "gdk_time.h"
      16             : 
      17             : /* (c) M.L. Kersten
      18             :  * The queries currently in execution are returned to the front-end for managing expensive ones.
      19             : */
      20             : 
      21             : static str
      22           2 : SYSMONstatistics(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
      23             : {
      24             :         BAT *user, *querycount, *totalticks, *started, *finished, *maxquery, *maxticks;
      25           2 :         bat *u = getArgReference_bat(stk,pci,0);
      26           2 :         bat *c = getArgReference_bat(stk,pci,1);
      27           2 :         bat *t = getArgReference_bat(stk,pci,2);
      28           2 :         bat *s = getArgReference_bat(stk,pci,3);
      29           2 :         bat *f = getArgReference_bat(stk,pci,4);
      30           2 :         bat *m = getArgReference_bat(stk,pci,5);
      31           2 :         bat *q = getArgReference_bat(stk,pci,6);
      32             :         size_t i;
      33           2 :         timestamp tsn = timestamp_nil;
      34             :         str msg = MAL_SUCCEED;
      35             : 
      36             :         (void) mb;
      37           2 :         user = COLnew(0, TYPE_str, usrstatscnt, TRANSIENT);
      38           2 :         querycount = COLnew(0, TYPE_lng, usrstatscnt, TRANSIENT);
      39           2 :         totalticks = COLnew(0, TYPE_lng, usrstatscnt, TRANSIENT);
      40           2 :         started = COLnew(0, TYPE_timestamp, usrstatscnt, TRANSIENT);
      41           2 :         finished = COLnew(0, TYPE_timestamp, usrstatscnt, TRANSIENT);
      42           2 :         maxticks = COLnew(0, TYPE_lng, usrstatscnt, TRANSIENT);
      43           2 :         maxquery = COLnew(0, TYPE_str, usrstatscnt, TRANSIENT);
      44           2 :         if (user == NULL || querycount == NULL || totalticks == NULL || started == NULL || finished == NULL || maxquery == NULL || maxticks == NULL){
      45           0 :                 BBPreclaim(user);
      46           0 :                 BBPreclaim(started);
      47           0 :                 BBPreclaim(querycount);
      48           0 :                 BBPreclaim(totalticks);
      49           0 :                 BBPreclaim(finished);
      50           0 :                 BBPreclaim(maxticks);
      51           0 :                 BBPreclaim(maxquery);
      52           0 :                 throw(MAL, "SYSMONstatistics", SQLSTATE(HY013) MAL_MALLOC_FAIL);
      53             :         }
      54             : 
      55           2 :         MT_lock_set(&mal_delayLock);
      56             :         // FIXME: what if there are multiple users with ADMIN privilege?
      57          11 :         for (i = 0 && cntxt->user == MAL_ADMIN; i < usrstatscnt; i++) {
      58             :                 /* We can stop at the first empty entry */
      59          10 :                 if (USRstats[i].username == NULL) break;
      60             : 
      61           9 :                 if (BUNappend(user, USRstats[i].username, false) != GDK_SUCCEED) {
      62           0 :                         msg = createException(MAL, "SYSMONstatistics", "Failed to append 'user'");
      63           0 :                         goto bailout;
      64             :                 }
      65           9 :                 if (BUNappend(querycount, &USRstats[i].querycount, false) != GDK_SUCCEED){
      66           0 :                         msg = createException(MAL, "SYSMONstatistics", "Failed to append 'querycount'");
      67           0 :                         goto bailout;
      68             :                 }
      69           9 :                 if (BUNappend(totalticks, &USRstats[i].totalticks, false) != GDK_SUCCEED){
      70           0 :                         msg = createException(MAL, "SYSMONstatistics", "Failed to append 'totalticks'");
      71           0 :                         goto bailout;
      72             :                 }
      73             :                 /* convert number of seconds into a timestamp */
      74           9 :                 if (USRstats[i].maxquery != 0){
      75           9 :                         tsn = timestamp_fromtime(USRstats[i].started);
      76           9 :                         if (is_timestamp_nil(tsn)) {
      77           0 :                                 msg = createException(MAL, "SYSMONstatistics", SQLSTATE(22003) "failed to convert start time");
      78           0 :                                 goto bailout;
      79             :                         }
      80           9 :                         if (BUNappend(started, &tsn, false) != GDK_SUCCEED){
      81           0 :                                 msg = createException(MAL, "SYSMONstatistics", "Failed to append 'started'");
      82           0 :                                 goto bailout;
      83             :                         }
      84             : 
      85           9 :                         if (USRstats[i].finished == 0) {
      86           0 :                                 tsn = timestamp_nil;
      87             :                         } else {
      88           9 :                                 tsn = timestamp_fromtime(USRstats[i].finished);
      89           9 :                                 if (is_timestamp_nil(tsn)) {
      90           0 :                                         msg = createException(MAL, "SYSMONstatistics", SQLSTATE(22003) "failed to convert finish time");
      91           0 :                                         goto bailout;
      92             :                                 }
      93             :                         }
      94           9 :                         if (BUNappend(finished, &tsn, false) != GDK_SUCCEED){
      95           0 :                                 msg = createException(MAL, "SYSMONstatistics", "Failed to append 'finished'");
      96           0 :                                 goto bailout;
      97             :                         }
      98             :                 } else {
      99           0 :                         tsn = timestamp_nil;
     100           0 :                         if (BUNappend(started, &tsn, false) != GDK_SUCCEED){
     101           0 :                                 msg = createException(MAL, "SYSMONstatistics", "Failed to append 'started'");
     102           0 :                                 goto bailout;
     103             :                         }
     104           0 :                         if (BUNappend(finished, &tsn, false) != GDK_SUCCEED){
     105           0 :                                 msg = createException(MAL, "SYSMONstatistics", "Failed to append 'finished'");
     106           0 :                                 goto bailout;
     107             :                         }
     108             :                 }
     109             : 
     110           9 :                 if (BUNappend(maxticks, &USRstats[i].maxticks, false) != GDK_SUCCEED){
     111           0 :                         msg = createException(MAL, "SYSMONstatistics", "Failed to append 'maxticks'");
     112           0 :                         goto bailout;
     113             :                 }
     114           9 :                 if( USRstats[i].maxquery == 0){
     115           0 :                         if (BUNappend(maxquery, "none", false) != GDK_SUCCEED ){
     116           0 :                                 msg = createException(MAL, "SYSMONstatistics", "Failed to append 'maxquery' 1");
     117           0 :                                 goto bailout;
     118             :                         }
     119             :                 }else {
     120           9 :                         if (BUNappend(maxquery, USRstats[i].maxquery, false) != GDK_SUCCEED ){
     121           0 :                                 msg = createException(MAL, "SYSMONstatistics", "Failed to append 'maxquery' 2");
     122           0 :                                 goto bailout;
     123             :                         }
     124             :                 }
     125             :         }
     126           2 :         MT_lock_unset(&mal_delayLock);
     127           2 :         BBPkeepref(*u = user->batCacheid);
     128           2 :         BBPkeepref(*c = querycount->batCacheid);
     129           2 :         BBPkeepref(*t = totalticks->batCacheid);
     130           2 :         BBPkeepref(*s = started->batCacheid);
     131           2 :         BBPkeepref(*f = finished->batCacheid);
     132           2 :         BBPkeepref(*m = maxticks->batCacheid);
     133           2 :         BBPkeepref(*q = maxquery->batCacheid);
     134           2 :         return MAL_SUCCEED;
     135             : 
     136           0 : bailout:
     137           0 :         MT_lock_unset(&mal_delayLock);
     138           0 :         BBPunfix(user->batCacheid);
     139           0 :         BBPunfix(querycount->batCacheid);
     140           0 :         BBPunfix(totalticks->batCacheid);
     141           0 :         BBPunfix(started->batCacheid);
     142           0 :         BBPunfix(finished->batCacheid);
     143           0 :         BBPunfix(maxticks->batCacheid);
     144           0 :         BBPunfix(maxquery->batCacheid);
     145           0 :         return msg;
     146             : }
     147             : 
     148             : static str
     149          44 : SYSMONqueue(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     150             : {
     151             :         BAT *tag, *sessionid, *user, *started, *status, *query, *finished, *workers, *memory;
     152          44 :         bat *t = getArgReference_bat(stk,pci,0);
     153          44 :         bat *s = getArgReference_bat(stk,pci,1);
     154          44 :         bat *u = getArgReference_bat(stk,pci,2);
     155          44 :         bat *sd = getArgReference_bat(stk,pci,3);
     156          44 :         bat *ss = getArgReference_bat(stk,pci,4);
     157          44 :         bat *q = getArgReference_bat(stk,pci,5);
     158          44 :         bat *f = getArgReference_bat(stk,pci,6);
     159          44 :         bat *w = getArgReference_bat(stk,pci,7);
     160          44 :         bat *m = getArgReference_bat(stk,pci,8);
     161             :         lng qtag;
     162             :         int wrk, mem;
     163             :         BUN sz;
     164             :         timestamp tsn;
     165             :         str msg = MAL_SUCCEED;
     166             : 
     167             :         (void) mb;
     168          44 :         sz = (BUN) qsize;       // reserve space for all tuples in QRYqueue
     169          44 :         tag = COLnew(0, TYPE_lng, sz, TRANSIENT);
     170          44 :         sessionid = COLnew(0, TYPE_int, sz, TRANSIENT);
     171          44 :         user = COLnew(0, TYPE_str, sz, TRANSIENT);
     172          44 :         started = COLnew(0, TYPE_timestamp, sz, TRANSIENT);
     173          44 :         status = COLnew(0, TYPE_str, sz, TRANSIENT);
     174          44 :         query = COLnew(0, TYPE_str, sz, TRANSIENT);
     175          44 :         finished = COLnew(0, TYPE_timestamp, sz, TRANSIENT);
     176          44 :         workers = COLnew(0, TYPE_int, sz, TRANSIENT);
     177          44 :         memory = COLnew(0, TYPE_int, sz, TRANSIENT);
     178          44 :         if ( tag == NULL || sessionid == NULL || user == NULL || query == NULL || started == NULL || finished == NULL || workers == NULL || memory == NULL){
     179           0 :                 BBPreclaim(tag);
     180           0 :                 BBPreclaim(sessionid);
     181           0 :                 BBPreclaim(user);
     182           0 :                 BBPreclaim(started);
     183           0 :                 BBPreclaim(status);
     184           0 :                 BBPreclaim(query);
     185           0 :                 BBPreclaim(finished);
     186           0 :                 BBPreclaim(workers);
     187           0 :                 BBPreclaim(memory);
     188           0 :                 throw(MAL, "SYSMONqueue", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     189             :         }
     190             : 
     191          44 :         MT_lock_set(&mal_delayLock);
     192       29652 :         for (size_t i = qtail; i != qhead; i++){
     193       29612 :                 if ( i == qsize){
     194             :                         i = 0;
     195          40 :                         if( i == qhead)
     196             :                                 break;
     197             :                 }
     198       29608 :                 if( QRYqueue[i].query && (cntxt->user == MAL_ADMIN || 
     199          14 :                                         strcmp(cntxt->username, QRYqueue[i].username) == 0) ){
     200       29498 :                         qtag = (lng) QRYqueue[i].tag;
     201       29498 :                         if (BUNappend(tag, &qtag, false) != GDK_SUCCEED)
     202           0 :                                 goto bailout;
     203             : 
     204       29498 :                         if (BUNappend(user, QRYqueue[i].username, false) != GDK_SUCCEED) {
     205           0 :                                 goto bailout;
     206             :                         }
     207             : 
     208       29498 :                         if (BUNappend(sessionid, &(QRYqueue[i].idx), false) != GDK_SUCCEED) {
     209           0 :                                 goto bailout;
     210             :                         }
     211             : 
     212       58996 :                         if (BUNappend(query, QRYqueue[i].query, false) != GDK_SUCCEED ||
     213       29498 :                                 BUNappend(status, QRYqueue[i].status, false) != GDK_SUCCEED)
     214           0 :                                 goto bailout;
     215             : 
     216             :                         /* convert number of seconds into a timestamp */
     217       29498 :                         tsn = timestamp_fromtime(QRYqueue[i].start);
     218       29498 :                         if (is_timestamp_nil(tsn)) {
     219           0 :                                 msg = createException(MAL, "SYSMONqueue", SQLSTATE(22003) "cannot convert time");
     220           0 :                                 goto bailout;
     221             :                         }
     222       29498 :                         if (BUNappend(started, &tsn, false) != GDK_SUCCEED)
     223           0 :                                 goto bailout;
     224             : 
     225       29498 :                         if (QRYqueue[i].finished == 0) {
     226       26437 :                                 tsn = timestamp_nil;
     227             :                         } else {
     228        3061 :                                 tsn = timestamp_fromtime(QRYqueue[i].finished);
     229        3061 :                                 if (is_timestamp_nil(tsn)) {
     230           0 :                                         msg = createException(MAL, "SYSMONqueue", SQLSTATE(22003) "cannot convert time");
     231           0 :                                         goto bailout;
     232             :                                 }
     233             :                         }
     234       29498 :                         if (BUNappend(finished, &tsn, false) != GDK_SUCCEED)
     235           0 :                                 goto bailout;
     236             : 
     237       29498 :                         wrk = QRYqueue[i].workers;
     238       29498 :                         mem = QRYqueue[i].memory;
     239       58996 :                         if ( BUNappend(workers, &wrk, false) != GDK_SUCCEED ||
     240       29498 :                                  BUNappend(memory, &mem, false) != GDK_SUCCEED)
     241           0 :                                 goto bailout;
     242             :                 }
     243             :         }
     244          44 :         MT_lock_unset(&mal_delayLock);
     245          44 :         BBPkeepref( *t =tag->batCacheid);
     246          44 :         BBPkeepref( *s =sessionid->batCacheid);
     247          44 :         BBPkeepref( *u =user->batCacheid);
     248          44 :         BBPkeepref( *sd =started->batCacheid);
     249          44 :         BBPkeepref( *ss =status->batCacheid);
     250          44 :         BBPkeepref( *q =query->batCacheid);
     251          44 :         BBPkeepref( *f =finished->batCacheid);
     252          44 :         BBPkeepref( *w =workers->batCacheid);
     253          44 :         BBPkeepref( *m =memory->batCacheid);
     254          44 :         return MAL_SUCCEED;
     255             : 
     256           0 :   bailout:
     257           0 :         MT_lock_unset(&mal_delayLock);
     258           0 :         BBPunfix(tag->batCacheid);
     259           0 :         BBPunfix(sessionid->batCacheid);
     260           0 :         BBPunfix(user->batCacheid);
     261           0 :         BBPunfix(started->batCacheid);
     262           0 :         BBPunfix(status->batCacheid);
     263           0 :         BBPunfix(query->batCacheid);
     264           0 :         BBPunfix(finished->batCacheid);
     265           0 :         BBPunfix(workers->batCacheid);
     266           0 :         BBPunfix(memory->batCacheid);
     267           0 :         return msg ? msg : createException(MAL, "SYSMONqueue", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     268             : }
     269             : 
     270             : static str
     271           0 : SYSMONpause(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     272             : {
     273             :         bool set = false;
     274             :         lng tag = 0;
     275             : 
     276           0 :         switch(getArgType(mb,pci,1)){
     277           0 :         case TYPE_bte: tag = *getArgReference_bte(stk,pci,1); break;
     278           0 :         case TYPE_sht: tag = *getArgReference_sht(stk,pci,1); break;
     279           0 :         case TYPE_int: tag = *getArgReference_int(stk,pci,1); break;
     280           0 :         case TYPE_lng: tag = *getArgReference_lng(stk,pci,1); break;
     281           0 :         default:
     282           0 :                 throw(MAL, "SYSMONpause", SQLSTATE(42000) "SYSMONpause requires a 64-bit integer");
     283             :         }
     284           0 :         if (tag < 1)
     285           0 :                 throw(MAL, "SYSMONpause", SQLSTATE(42000) "Tag must be positive");
     286           0 :         MT_lock_set(&mal_delayLock);
     287           0 :         for (size_t i = qtail; i != qhead; i++){
     288           0 :                 if( i == qsize){
     289             :                         i = 0;
     290           0 :                         if( i == qhead)
     291             :                                 break;
     292             :                 }
     293           0 :                 if( (lng) QRYqueue[i].tag == tag && cntxt->user == MAL_ADMIN && QRYqueue[i].stk){
     294           0 :                         QRYqueue[i].stk->status = 'p';
     295           0 :                         QRYqueue[i].status = "paused";
     296             :                         set = true;
     297             :                 }
     298             :         }
     299           0 :         MT_lock_unset(&mal_delayLock);
     300           0 :         return set ? MAL_SUCCEED : createException(MAL, "SYSMONpause", SQLSTATE(42000) "Tag " LLFMT " unknown", tag);
     301             : }
     302             : 
     303             : static str
     304           0 : SYSMONresume(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     305             : {
     306             :         bool set = false;
     307             :         lng tag = 0;
     308             : 
     309           0 :         switch(getArgType(mb,pci,1)){
     310           0 :         case TYPE_bte: tag = *getArgReference_bte(stk,pci,1); break;
     311           0 :         case TYPE_sht: tag = *getArgReference_sht(stk,pci,1); break;
     312           0 :         case TYPE_int: tag = *getArgReference_int(stk,pci,1); break;
     313           0 :         case TYPE_lng: tag = *getArgReference_lng(stk,pci,1); break;
     314           0 :         default:
     315           0 :                 throw(MAL, "SYSMONresume", SQLSTATE(42000) "SYSMONresume requires a 64-bit integer");
     316             :         }
     317           0 :         if (tag < 1)
     318           0 :                 throw(MAL, "SYSMONresume", SQLSTATE(42000) "Tag must be positive");
     319           0 :         MT_lock_set(&mal_delayLock);
     320           0 :         for (size_t i = qtail; i == qhead; i++){
     321           0 :                 if( i == qsize){
     322             :                         i = 0;
     323           0 :                         if ( i== qhead)
     324             :                                 break;
     325             :                 }
     326           0 :                 if( (lng)QRYqueue[i].tag == tag && cntxt->user == MAL_ADMIN && QRYqueue[i].stk){
     327           0 :                         QRYqueue[i].stk->status = 0;
     328           0 :                         QRYqueue[i].status = "running";
     329             :                         set = true;
     330             :                 }
     331             :         }
     332           0 :         MT_lock_unset(&mal_delayLock);
     333           0 :         return set ? MAL_SUCCEED : createException(MAL, "SYSMONresume", SQLSTATE(42000) "Tag " LLFMT " unknown", tag);
     334             : }
     335             : 
     336             : static str
     337           0 : SYSMONstop(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     338             : {
     339             :         bool set = false;
     340             :         lng tag = 0;
     341             : 
     342           0 :         switch(getArgType(mb,pci,1)){
     343           0 :         case TYPE_bte: tag = *getArgReference_bte(stk,pci,1); break;
     344           0 :         case TYPE_sht: tag = *getArgReference_sht(stk,pci,1); break;
     345           0 :         case TYPE_int: tag = *getArgReference_int(stk,pci,1); break;
     346           0 :         case TYPE_lng: tag = *getArgReference_lng(stk,pci,1); break;
     347           0 :         default:
     348           0 :                 throw(MAL, "SYSMONstop", SQLSTATE(42000) "SYSMONstop requires a 64-bit integer");
     349             :         }
     350           0 :         if (tag < 1)
     351           0 :                 throw(MAL, "SYSMONstop", SQLSTATE(42000) "Tag must be positive");
     352           0 :         MT_lock_set(&mal_delayLock);
     353           0 :         for (size_t i = qtail; i != qhead; i++){
     354           0 :                 if( i == qsize){
     355             :                         i = 0;
     356           0 :                         if( i == qhead)
     357             :                                 break;
     358             :                 }
     359           0 :                 if( (lng) QRYqueue[i].tag == tag && cntxt->user == MAL_ADMIN && QRYqueue[i].stk){
     360           0 :                         QRYqueue[i].stk->status = 'q';
     361           0 :                         QRYqueue[i].status = "stopping";
     362             :                         set = true;
     363             :                 }
     364             :         }
     365           0 :         MT_lock_unset(&mal_delayLock);
     366           0 :         return set ? MAL_SUCCEED : createException(MAL, "SYSMONstop", SQLSTATE(42000) "Tag " LLFMT " unknown", tag);
     367             : }
     368             : 
     369             : #include "mel.h"
     370             : mel_func sysmon_init_funcs[] = {
     371             :  pattern("sysmon", "pause", SYSMONpause, false, "Suspend a running query", args(0,1, arg("id",sht))),
     372             :  pattern("sysmon", "pause", SYSMONpause, false, "Suspend a running query", args(0,1, arg("id",int))),
     373             :  pattern("sysmon", "pause", SYSMONpause, false, "Suspend a running query", args(0,1, arg("id",lng))),
     374             :  pattern("sysmon", "resume", SYSMONresume, false, "Resume processing of a query ", args(0,1, arg("id",sht))),
     375             :  pattern("sysmon", "resume", SYSMONresume, false, "Resume processing of a query ", args(0,1, arg("id",int))),
     376             :  pattern("sysmon", "resume", SYSMONresume, false, "Resume processing of a query ", args(0,1, arg("id",lng))),
     377             :  pattern("sysmon", "stop", SYSMONstop, false, "Stop a single query a.s.a.p.", args(0,1, arg("id",sht))),
     378             :  pattern("sysmon", "stop", SYSMONstop, false, "Stop a single query a.s.a.p.", args(0,1, arg("id",int))),
     379             :  pattern("sysmon", "stop", SYSMONstop, false, "Stop a single query a.s.a.p.", args(0,1, arg("id",lng))),
     380             :  pattern("sysmon", "queue", SYSMONqueue, false, "A queue of queries that are currently being executed or recently finished", args(9,9, batarg("tag",lng),batarg("sessionid",int),batarg("user",str),batarg("started",timestamp),batarg("status",str),batarg("query",str),batarg("finished",timestamp),batarg("workers",int),batarg("memory",int))),
     381             :  pattern("sysmon", "user_statistics", SYSMONstatistics, false, "", args(7,7, batarg("user",str),batarg("querycount",lng),batarg("totalticks",lng),batarg("started",timestamp),batarg("finished",timestamp),batarg("maxticks",lng),batarg("maxquery",str))),
     382             :  { .imp=NULL }
     383             : };
     384             : #include "mal_import.h"
     385             : #ifdef _MSC_VER
     386             : #undef read
     387             : #pragma section(".CRT$XCU",read)
     388             : #endif
     389         255 : LIB_STARTUP_FUNC(init_sysmon_mal)
     390         255 : { mal_module("sysmon", NULL, sysmon_init_funcs); }

Generated by: LCOV version 1.14