LCOV - code coverage report
Current view: top level - monetdb5/modules/mal - sysmon.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 105 242 43.4 %
Date: 2021-10-27 03:06:47 Functions: 3 6 50.0 %

          Line data    Source code
       1             : /*
       2             :  * This Source Code Form is subject to the terms of the Mozilla Public
       3             :  * License, v. 2.0.  If a copy of the MPL was not distributed with this
       4             :  * file, You can obtain one at http://mozilla.org/MPL/2.0/.
       5             :  *
       6             :  * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
       7             :  */
       8             : 
       9             : #include "monetdb_config.h"
      10             : #include "mal.h"
      11             : #include "mal_interpreter.h"
      12             : #include "mal_authorize.h"
      13             : #include "mal_client.h"
      14             : #include "mal_runtime.h"
      15             : #include "gdk_time.h"
      16             : #include "mal_exception.h"
      17             : 
      18             : /* (c) M.L. Kersten
      19             :  * The queries currently in execution are returned to the front-end for managing expensive ones.
      20             : */
      21             : 
      22             : static str
      23           1 : SYSMONstatistics(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
      24             : {
      25             :         BAT *user, *querycount, *totalticks, *started, *finished, *maxquery, *maxticks;
      26           1 :         bat *u = getArgReference_bat(stk,pci,0);
      27           1 :         bat *c = getArgReference_bat(stk,pci,1);
      28           1 :         bat *t = getArgReference_bat(stk,pci,2);
      29           1 :         bat *s = getArgReference_bat(stk,pci,3);
      30           1 :         bat *f = getArgReference_bat(stk,pci,4);
      31           1 :         bat *m = getArgReference_bat(stk,pci,5);
      32           1 :         bat *q = getArgReference_bat(stk,pci,6);
      33             :         size_t i;
      34           1 :         timestamp tsn = timestamp_nil;
      35             :         str msg = MAL_SUCCEED;
      36             : 
      37             :         (void) mb;
      38           1 :         user = COLnew(0, TYPE_str, usrstatscnt, TRANSIENT);
      39           1 :         querycount = COLnew(0, TYPE_lng, usrstatscnt, TRANSIENT);
      40           1 :         totalticks = COLnew(0, TYPE_lng, usrstatscnt, TRANSIENT);
      41           1 :         started = COLnew(0, TYPE_timestamp, usrstatscnt, TRANSIENT);
      42           1 :         finished = COLnew(0, TYPE_timestamp, usrstatscnt, TRANSIENT);
      43           1 :         maxticks = COLnew(0, TYPE_lng, usrstatscnt, TRANSIENT);
      44           1 :         maxquery = COLnew(0, TYPE_str, usrstatscnt, TRANSIENT);
      45           1 :         if (user == NULL || querycount == NULL || totalticks == NULL || started == NULL || finished == NULL || maxquery == NULL || maxticks == NULL){
      46           0 :                 BBPreclaim(user);
      47           0 :                 BBPreclaim(started);
      48           0 :                 BBPreclaim(querycount);
      49           0 :                 BBPreclaim(totalticks);
      50           0 :                 BBPreclaim(finished);
      51           0 :                 BBPreclaim(maxticks);
      52           0 :                 BBPreclaim(maxquery);
      53           0 :                 throw(MAL, "SYSMONstatistics", SQLSTATE(HY013) MAL_MALLOC_FAIL);
      54             :         }
      55             : 
      56           1 :         MT_lock_set(&mal_delayLock);
      57             :         // FIXME: what if there are multiple users with ADMIN privilege?
      58           6 :         for (i = 0 && cntxt->user == MAL_ADMIN; i < usrstatscnt; i++) {
      59             :                 /* We can stop at the first empty entry */
      60           6 :                 if (USRstats[i].username == NULL) break;
      61             : 
      62           5 :                 if (BUNappend(user, USRstats[i].username, false) != GDK_SUCCEED) {
      63           0 :                         msg = createException(MAL, "SYSMONstatistics", "Failed to append 'user'");
      64           0 :                         goto bailout;
      65             :                 }
      66           5 :                 if (BUNappend(querycount, &USRstats[i].querycount, false) != GDK_SUCCEED){
      67           0 :                         msg = createException(MAL, "SYSMONstatistics", "Failed to append 'querycount'");
      68           0 :                         goto bailout;
      69             :                 }
      70           5 :                 if (BUNappend(totalticks, &USRstats[i].totalticks, false) != GDK_SUCCEED){
      71           0 :                         msg = createException(MAL, "SYSMONstatistics", "Failed to append 'totalticks'");
      72           0 :                         goto bailout;
      73             :                 }
      74             :                 /* convert number of seconds into a timestamp */
      75           5 :                 if (USRstats[i].maxquery != 0){
      76           5 :                         tsn = timestamp_fromtime(USRstats[i].started);
      77           5 :                         if (is_timestamp_nil(tsn)) {
      78           0 :                                 msg = createException(MAL, "SYSMONstatistics", SQLSTATE(22003) "failed to convert start time");
      79           0 :                                 goto bailout;
      80             :                         }
      81           5 :                         if (BUNappend(started, &tsn, false) != GDK_SUCCEED){
      82           0 :                                 msg = createException(MAL, "SYSMONstatistics", "Failed to append 'started'");
      83           0 :                                 goto bailout;
      84             :                         }
      85             : 
      86           5 :                         if (USRstats[i].finished == 0) {
      87           0 :                                 tsn = timestamp_nil;
      88             :                         } else {
      89           5 :                                 tsn = timestamp_fromtime(USRstats[i].finished);
      90           5 :                                 if (is_timestamp_nil(tsn)) {
      91           0 :                                         msg = createException(MAL, "SYSMONstatistics", SQLSTATE(22003) "failed to convert finish time");
      92           0 :                                         goto bailout;
      93             :                                 }
      94             :                         }
      95           5 :                         if (BUNappend(finished, &tsn, false) != GDK_SUCCEED){
      96           0 :                                 msg = createException(MAL, "SYSMONstatistics", "Failed to append 'finished'");
      97           0 :                                 goto bailout;
      98             :                         }
      99             :                 } else {
     100           0 :                         tsn = timestamp_nil;
     101           0 :                         if (BUNappend(started, &tsn, false) != GDK_SUCCEED){
     102           0 :                                 msg = createException(MAL, "SYSMONstatistics", "Failed to append 'started'");
     103           0 :                                 goto bailout;
     104             :                         }
     105           0 :                         if (BUNappend(finished, &tsn, false) != GDK_SUCCEED){
     106           0 :                                 msg = createException(MAL, "SYSMONstatistics", "Failed to append 'finished'");
     107           0 :                                 goto bailout;
     108             :                         }
     109             :                 }
     110             : 
     111           5 :                 if (BUNappend(maxticks, &USRstats[i].maxticks, false) != GDK_SUCCEED){
     112           0 :                         msg = createException(MAL, "SYSMONstatistics", "Failed to append 'maxticks'");
     113           0 :                         goto bailout;
     114             :                 }
     115           5 :                 if( USRstats[i].maxquery == 0){
     116           0 :                         if (BUNappend(maxquery, "none", false) != GDK_SUCCEED ){
     117           0 :                                 msg = createException(MAL, "SYSMONstatistics", "Failed to append 'maxquery' 1");
     118           0 :                                 goto bailout;
     119             :                         }
     120             :                 }else {
     121           5 :                         if (BUNappend(maxquery, USRstats[i].maxquery, false) != GDK_SUCCEED ){
     122           0 :                                 msg = createException(MAL, "SYSMONstatistics", "Failed to append 'maxquery' 2");
     123           0 :                                 goto bailout;
     124             :                         }
     125             :                 }
     126             :         }
     127           1 :         MT_lock_unset(&mal_delayLock);
     128           1 :         BBPkeepref(*u = user->batCacheid);
     129           1 :         BBPkeepref(*c = querycount->batCacheid);
     130           1 :         BBPkeepref(*t = totalticks->batCacheid);
     131           1 :         BBPkeepref(*s = started->batCacheid);
     132           1 :         BBPkeepref(*f = finished->batCacheid);
     133           1 :         BBPkeepref(*m = maxticks->batCacheid);
     134           1 :         BBPkeepref(*q = maxquery->batCacheid);
     135           1 :         return MAL_SUCCEED;
     136             : 
     137           0 : bailout:
     138           0 :         MT_lock_unset(&mal_delayLock);
     139           0 :         BBPunfix(user->batCacheid);
     140           0 :         BBPunfix(querycount->batCacheid);
     141           0 :         BBPunfix(totalticks->batCacheid);
     142           0 :         BBPunfix(started->batCacheid);
     143           0 :         BBPunfix(finished->batCacheid);
     144           0 :         BBPunfix(maxticks->batCacheid);
     145           0 :         BBPunfix(maxquery->batCacheid);
     146           0 :         return msg;
     147             : }
     148             : 
     149             : static str
     150          27 : SYSMONqueue(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     151             : {
     152             :         BAT *tag, *sessionid, *user, *started, *status, *query, *finished, *workers, *memory;
     153          27 :         bat *t = getArgReference_bat(stk,pci,0);
     154          27 :         bat *s = getArgReference_bat(stk,pci,1);
     155          27 :         bat *u = getArgReference_bat(stk,pci,2);
     156          27 :         bat *sd = getArgReference_bat(stk,pci,3);
     157          27 :         bat *ss = getArgReference_bat(stk,pci,4);
     158          27 :         bat *q = getArgReference_bat(stk,pci,5);
     159          27 :         bat *f = getArgReference_bat(stk,pci,6);
     160          27 :         bat *w = getArgReference_bat(stk,pci,7);
     161          27 :         bat *m = getArgReference_bat(stk,pci,8);
     162             :         lng qtag;
     163             :         int wrk, mem;
     164             :         BUN sz;
     165             :         timestamp tsn;
     166             :         str msg = MAL_SUCCEED;
     167             : 
     168             :         (void) mb;
     169          27 :         sz = (BUN) qsize;       // reserve space for all tuples in QRYqueue
     170          27 :         tag = COLnew(0, TYPE_lng, sz, TRANSIENT);
     171          27 :         sessionid = COLnew(0, TYPE_int, sz, TRANSIENT);
     172          27 :         user = COLnew(0, TYPE_str, sz, TRANSIENT);
     173          27 :         started = COLnew(0, TYPE_timestamp, sz, TRANSIENT);
     174          27 :         status = COLnew(0, TYPE_str, sz, TRANSIENT);
     175          27 :         query = COLnew(0, TYPE_str, sz, TRANSIENT);
     176          27 :         finished = COLnew(0, TYPE_timestamp, sz, TRANSIENT);
     177          27 :         workers = COLnew(0, TYPE_int, sz, TRANSIENT);
     178          27 :         memory = COLnew(0, TYPE_int, sz, TRANSIENT);
     179          27 :         if ( tag == NULL || sessionid == NULL || user == NULL || query == NULL || started == NULL || finished == NULL || workers == NULL || memory == NULL){
     180           0 :                 BBPreclaim(tag);
     181           0 :                 BBPreclaim(sessionid);
     182           0 :                 BBPreclaim(user);
     183           0 :                 BBPreclaim(started);
     184           0 :                 BBPreclaim(status);
     185           0 :                 BBPreclaim(query);
     186           0 :                 BBPreclaim(finished);
     187           0 :                 BBPreclaim(workers);
     188           0 :                 BBPreclaim(memory);
     189           0 :                 throw(MAL, "SYSMONqueue", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     190             :         }
     191             : 
     192          27 :         MT_lock_set(&mal_delayLock);
     193        1448 :         for (size_t i = qtail; i != qhead; i++){
     194        1423 :                 if ( i == qsize){
     195             :                         i = 0;
     196          25 :                         if( i == qhead)
     197             :                                 break;
     198             :                 }
     199        1421 :                 if( QRYqueue[i].query && (cntxt->user == MAL_ADMIN ||
     200          14 :                                         strcmp(cntxt->username, QRYqueue[i].username) == 0) ){
     201        1329 :                         qtag = (lng) QRYqueue[i].tag;
     202        1329 :                         if (BUNappend(tag, &qtag, false) != GDK_SUCCEED)
     203           0 :                                 goto bailout;
     204             : 
     205        1329 :                         if (BUNappend(user, QRYqueue[i].username, false) != GDK_SUCCEED) {
     206           0 :                                 goto bailout;
     207             :                         }
     208             : 
     209        1329 :                         if (BUNappend(sessionid, &(QRYqueue[i].idx), false) != GDK_SUCCEED) {
     210           0 :                                 goto bailout;
     211             :                         }
     212             : 
     213        2658 :                         if (BUNappend(query, QRYqueue[i].query, false) != GDK_SUCCEED ||
     214        1329 :                                 BUNappend(status, QRYqueue[i].status, false) != GDK_SUCCEED)
     215           0 :                                 goto bailout;
     216             : 
     217             :                         /* convert number of seconds into a timestamp */
     218        1329 :                         tsn = timestamp_fromtime(QRYqueue[i].start);
     219        1329 :                         if (is_timestamp_nil(tsn)) {
     220           0 :                                 msg = createException(MAL, "SYSMONqueue", SQLSTATE(22003) "cannot convert time");
     221           0 :                                 goto bailout;
     222             :                         }
     223        1329 :                         if (BUNappend(started, &tsn, false) != GDK_SUCCEED)
     224           0 :                                 goto bailout;
     225             : 
     226        1329 :                         if (QRYqueue[i].finished == 0) {
     227          30 :                                 tsn = timestamp_nil;
     228             :                         } else {
     229        1299 :                                 tsn = timestamp_fromtime(QRYqueue[i].finished);
     230        1299 :                                 if (is_timestamp_nil(tsn)) {
     231           0 :                                         msg = createException(MAL, "SYSMONqueue", SQLSTATE(22003) "cannot convert time");
     232           0 :                                         goto bailout;
     233             :                                 }
     234             :                         }
     235        1329 :                         if (BUNappend(finished, &tsn, false) != GDK_SUCCEED)
     236           0 :                                 goto bailout;
     237             : 
     238        1329 :                         if( QRYqueue[i].mb)
     239          30 :                                 wrk = QRYqueue[i].mb->workers;
     240             :                         else
     241        1299 :                                 wrk = QRYqueue[i].workers;
     242        1329 :                         if( QRYqueue[i].mb)
     243          30 :                                 mem = (int)(1 + QRYqueue[i].mb->memory / LL_CONSTANT(1048576));
     244             :                         else
     245        1299 :                                 mem = (int)QRYqueue[i].memory;
     246        2658 :                         if ( BUNappend(workers, &wrk, false) != GDK_SUCCEED ||
     247        1329 :                                  BUNappend(memory, &mem, false) != GDK_SUCCEED)
     248           0 :                                 goto bailout;
     249             :                 }
     250             :         }
     251          27 :         MT_lock_unset(&mal_delayLock);
     252          27 :         BBPkeepref( *t =tag->batCacheid);
     253          27 :         BBPkeepref( *s =sessionid->batCacheid);
     254          27 :         BBPkeepref( *u =user->batCacheid);
     255          27 :         BBPkeepref( *sd =started->batCacheid);
     256          27 :         BBPkeepref( *ss =status->batCacheid);
     257          27 :         BBPkeepref( *q =query->batCacheid);
     258          27 :         BBPkeepref( *f =finished->batCacheid);
     259          27 :         BBPkeepref( *w =workers->batCacheid);
     260          27 :         BBPkeepref( *m =memory->batCacheid);
     261          27 :         return MAL_SUCCEED;
     262             : 
     263           0 :   bailout:
     264           0 :         MT_lock_unset(&mal_delayLock);
     265           0 :         BBPunfix(tag->batCacheid);
     266           0 :         BBPunfix(sessionid->batCacheid);
     267           0 :         BBPunfix(user->batCacheid);
     268           0 :         BBPunfix(started->batCacheid);
     269           0 :         BBPunfix(status->batCacheid);
     270           0 :         BBPunfix(query->batCacheid);
     271           0 :         BBPunfix(finished->batCacheid);
     272           0 :         BBPunfix(workers->batCacheid);
     273           0 :         BBPunfix(memory->batCacheid);
     274           0 :         return msg ? msg : createException(MAL, "SYSMONqueue", SQLSTATE(HY013) MAL_MALLOC_FAIL);
     275             : }
     276             : 
     277             : static str
     278           0 : SYSMONpause(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     279             : {
     280             :         bool set = false;
     281             :         lng tag = 0;
     282             : 
     283           0 :         switch(getArgType(mb,pci,1)){
     284           0 :         case TYPE_bte: tag = *getArgReference_bte(stk,pci,1); break;
     285           0 :         case TYPE_sht: tag = *getArgReference_sht(stk,pci,1); break;
     286           0 :         case TYPE_int: tag = *getArgReference_int(stk,pci,1); break;
     287           0 :         case TYPE_lng: tag = *getArgReference_lng(stk,pci,1); break;
     288           0 :         default:
     289           0 :                 throw(MAL, "SYSMONpause", SQLSTATE(42000) "SYSMONpause requires a 64-bit integer");
     290             :         }
     291           0 :         if (tag < 1)
     292           0 :                 throw(MAL, "SYSMONpause", SQLSTATE(42000) "Tag must be positive");
     293           0 :         MT_lock_set(&mal_delayLock);
     294           0 :         for (size_t i = qtail; i != qhead; i++){
     295           0 :                 if( i == qsize){
     296             :                         i = 0;
     297           0 :                         if( i == qhead)
     298             :                                 break;
     299             :                 }
     300           0 :                 if( (lng) QRYqueue[i].tag == tag && cntxt->user == MAL_ADMIN && QRYqueue[i].stk){
     301           0 :                         QRYqueue[i].stk->status = 'p';
     302           0 :                         QRYqueue[i].status = "paused";
     303             :                         set = true;
     304             :                 }
     305             :         }
     306           0 :         MT_lock_unset(&mal_delayLock);
     307           0 :         return set ? MAL_SUCCEED : createException(MAL, "SYSMONpause", SQLSTATE(42000) "Tag " LLFMT " unknown", tag);
     308             : }
     309             : 
     310             : static str
     311           0 : SYSMONresume(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     312             : {
     313             :         bool set = false;
     314             :         lng tag = 0;
     315             : 
     316           0 :         switch(getArgType(mb,pci,1)){
     317           0 :         case TYPE_bte: tag = *getArgReference_bte(stk,pci,1); break;
     318           0 :         case TYPE_sht: tag = *getArgReference_sht(stk,pci,1); break;
     319           0 :         case TYPE_int: tag = *getArgReference_int(stk,pci,1); break;
     320           0 :         case TYPE_lng: tag = *getArgReference_lng(stk,pci,1); break;
     321           0 :         default:
     322           0 :                 throw(MAL, "SYSMONresume", SQLSTATE(42000) "SYSMONresume requires a 64-bit integer");
     323             :         }
     324           0 :         if (tag < 1)
     325           0 :                 throw(MAL, "SYSMONresume", SQLSTATE(42000) "Tag must be positive");
     326           0 :         MT_lock_set(&mal_delayLock);
     327           0 :         for (size_t i = qtail; i == qhead; i++){
     328           0 :                 if( i == qsize){
     329             :                         i = 0;
     330           0 :                         if ( i== qhead)
     331             :                                 break;
     332             :                 }
     333           0 :                 if( (lng)QRYqueue[i].tag == tag && cntxt->user == MAL_ADMIN && QRYqueue[i].stk){
     334           0 :                         QRYqueue[i].stk->status = 0;
     335           0 :                         QRYqueue[i].status = "running";
     336             :                         set = true;
     337             :                 }
     338             :         }
     339           0 :         MT_lock_unset(&mal_delayLock);
     340           0 :         return set ? MAL_SUCCEED : createException(MAL, "SYSMONresume", SQLSTATE(42000) "Tag " LLFMT " unknown", tag);
     341             : }
     342             : 
     343             : static str
     344           0 : SYSMONstop(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
     345             : {
     346             :         bool set = false;
     347             :         lng tag = 0;
     348             : 
     349           0 :         switch(getArgType(mb,pci,1)){
     350           0 :         case TYPE_bte: tag = *getArgReference_bte(stk,pci,1); break;
     351           0 :         case TYPE_sht: tag = *getArgReference_sht(stk,pci,1); break;
     352           0 :         case TYPE_int: tag = *getArgReference_int(stk,pci,1); break;
     353           0 :         case TYPE_lng: tag = *getArgReference_lng(stk,pci,1); break;
     354           0 :         default:
     355           0 :                 throw(MAL, "SYSMONstop", SQLSTATE(42000) "SYSMONstop requires a 64-bit integer");
     356             :         }
     357           0 :         if (tag < 1)
     358           0 :                 throw(MAL, "SYSMONstop", SQLSTATE(42000) "Tag must be positive");
     359           0 :         MT_lock_set(&mal_delayLock);
     360           0 :         for (size_t i = qtail; i != qhead; i++){
     361           0 :                 if( i == qsize){
     362             :                         i = 0;
     363           0 :                         if( i == qhead)
     364             :                                 break;
     365             :                 }
     366           0 :                 if( (lng) QRYqueue[i].tag == tag && cntxt->user == MAL_ADMIN && QRYqueue[i].stk){
     367           0 :                         QRYqueue[i].stk->status = 'q';
     368           0 :                         QRYqueue[i].status = "stopping";
     369             :                         set = true;
     370             :                 }
     371             :         }
     372           0 :         MT_lock_unset(&mal_delayLock);
     373           0 :         return set ? MAL_SUCCEED : createException(MAL, "SYSMONstop", SQLSTATE(42000) "Tag " LLFMT " unknown", tag);
     374             : }
     375             : 
     376             : #include "mel.h"
     377             : mel_func sysmon_init_funcs[] = {
     378             :  pattern("sysmon", "pause", SYSMONpause, false, "Suspend a running query", args(0,1, arg("id",sht))),
     379             :  pattern("sysmon", "pause", SYSMONpause, false, "Suspend a running query", args(0,1, arg("id",int))),
     380             :  pattern("sysmon", "pause", SYSMONpause, false, "Suspend a running query", args(0,1, arg("id",lng))),
     381             :  pattern("sysmon", "resume", SYSMONresume, false, "Resume processing of a query ", args(0,1, arg("id",sht))),
     382             :  pattern("sysmon", "resume", SYSMONresume, false, "Resume processing of a query ", args(0,1, arg("id",int))),
     383             :  pattern("sysmon", "resume", SYSMONresume, false, "Resume processing of a query ", args(0,1, arg("id",lng))),
     384             :  pattern("sysmon", "stop", SYSMONstop, false, "Stop a single query a.s.a.p.", args(0,1, arg("id",sht))),
     385             :  pattern("sysmon", "stop", SYSMONstop, false, "Stop a single query a.s.a.p.", args(0,1, arg("id",int))),
     386             :  pattern("sysmon", "stop", SYSMONstop, false, "Stop a single query a.s.a.p.", args(0,1, arg("id",lng))),
     387             :  pattern("sysmon", "queue", SYSMONqueue, false, "A queue of queries that are currently being executed or recently finished", args(9,9, batarg("tag",lng),batarg("sessionid",int),batarg("user",str),batarg("started",timestamp),batarg("status",str),batarg("query",str),batarg("finished",timestamp),batarg("workers",int),batarg("memory",int))),
     388             :  pattern("sysmon", "user_statistics", SYSMONstatistics, false, "", args(7,7, batarg("user",str),batarg("querycount",lng),batarg("totalticks",lng),batarg("started",timestamp),batarg("finished",timestamp),batarg("maxticks",lng),batarg("maxquery",str))),
     389             :  { .imp=NULL }
     390             : };
     391             : #include "mal_import.h"
     392             : #ifdef _MSC_VER
     393             : #undef read
     394             : #pragma section(".CRT$XCU",read)
     395             : #endif
     396         257 : LIB_STARTUP_FUNC(init_sysmon_mal)
     397         257 : { mal_module("sysmon", NULL, sysmon_init_funcs); }

Generated by: LCOV version 1.14