Line data Source code
1 : /*
2 : * This Source Code Form is subject to the terms of the Mozilla Public
3 : * License, v. 2.0. If a copy of the MPL was not distributed with this
4 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 : *
6 : * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
7 : */
8 :
9 : #include "monetdb_config.h"
10 : #include "mal.h"
11 : #include "mal_interpreter.h"
12 : #include "mal_authorize.h"
13 : #include "mal_client.h"
14 : #include "mal_runtime.h"
15 : #include "gdk_time.h"
16 : #include "mal_exception.h"
17 :
18 : /* (c) M.L. Kersten
19 : * The queries currently in execution are returned to the front-end for managing expensive ones.
20 : */
21 :
22 : static str
23 1 : SYSMONstatistics(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
24 : {
25 : BAT *user, *querycount, *totalticks, *started, *finished, *maxquery, *maxticks;
26 1 : bat *u = getArgReference_bat(stk,pci,0);
27 1 : bat *c = getArgReference_bat(stk,pci,1);
28 1 : bat *t = getArgReference_bat(stk,pci,2);
29 1 : bat *s = getArgReference_bat(stk,pci,3);
30 1 : bat *f = getArgReference_bat(stk,pci,4);
31 1 : bat *m = getArgReference_bat(stk,pci,5);
32 1 : bat *q = getArgReference_bat(stk,pci,6);
33 : size_t i;
34 1 : timestamp tsn = timestamp_nil;
35 : str msg = MAL_SUCCEED;
36 :
37 : (void) mb;
38 1 : user = COLnew(0, TYPE_str, usrstatscnt, TRANSIENT);
39 1 : querycount = COLnew(0, TYPE_lng, usrstatscnt, TRANSIENT);
40 1 : totalticks = COLnew(0, TYPE_lng, usrstatscnt, TRANSIENT);
41 1 : started = COLnew(0, TYPE_timestamp, usrstatscnt, TRANSIENT);
42 1 : finished = COLnew(0, TYPE_timestamp, usrstatscnt, TRANSIENT);
43 1 : maxticks = COLnew(0, TYPE_lng, usrstatscnt, TRANSIENT);
44 1 : maxquery = COLnew(0, TYPE_str, usrstatscnt, TRANSIENT);
45 1 : if (user == NULL || querycount == NULL || totalticks == NULL || started == NULL || finished == NULL || maxquery == NULL || maxticks == NULL){
46 0 : BBPreclaim(user);
47 0 : BBPreclaim(started);
48 0 : BBPreclaim(querycount);
49 0 : BBPreclaim(totalticks);
50 0 : BBPreclaim(finished);
51 0 : BBPreclaim(maxticks);
52 0 : BBPreclaim(maxquery);
53 0 : throw(MAL, "SYSMONstatistics", SQLSTATE(HY013) MAL_MALLOC_FAIL);
54 : }
55 :
56 1 : MT_lock_set(&mal_delayLock);
57 : // FIXME: what if there are multiple users with ADMIN privilege?
58 6 : for (i = 0 && cntxt->user == MAL_ADMIN; i < usrstatscnt; i++) {
59 : /* We can stop at the first empty entry */
60 6 : if (USRstats[i].username == NULL) break;
61 :
62 5 : if (BUNappend(user, USRstats[i].username, false) != GDK_SUCCEED) {
63 0 : msg = createException(MAL, "SYSMONstatistics", "Failed to append 'user'");
64 0 : goto bailout;
65 : }
66 5 : if (BUNappend(querycount, &USRstats[i].querycount, false) != GDK_SUCCEED){
67 0 : msg = createException(MAL, "SYSMONstatistics", "Failed to append 'querycount'");
68 0 : goto bailout;
69 : }
70 5 : if (BUNappend(totalticks, &USRstats[i].totalticks, false) != GDK_SUCCEED){
71 0 : msg = createException(MAL, "SYSMONstatistics", "Failed to append 'totalticks'");
72 0 : goto bailout;
73 : }
74 : /* convert number of seconds into a timestamp */
75 5 : if (USRstats[i].maxquery != 0){
76 5 : tsn = timestamp_fromtime(USRstats[i].started);
77 5 : if (is_timestamp_nil(tsn)) {
78 0 : msg = createException(MAL, "SYSMONstatistics", SQLSTATE(22003) "failed to convert start time");
79 0 : goto bailout;
80 : }
81 5 : if (BUNappend(started, &tsn, false) != GDK_SUCCEED){
82 0 : msg = createException(MAL, "SYSMONstatistics", "Failed to append 'started'");
83 0 : goto bailout;
84 : }
85 :
86 5 : if (USRstats[i].finished == 0) {
87 0 : tsn = timestamp_nil;
88 : } else {
89 5 : tsn = timestamp_fromtime(USRstats[i].finished);
90 5 : if (is_timestamp_nil(tsn)) {
91 0 : msg = createException(MAL, "SYSMONstatistics", SQLSTATE(22003) "failed to convert finish time");
92 0 : goto bailout;
93 : }
94 : }
95 5 : if (BUNappend(finished, &tsn, false) != GDK_SUCCEED){
96 0 : msg = createException(MAL, "SYSMONstatistics", "Failed to append 'finished'");
97 0 : goto bailout;
98 : }
99 : } else {
100 0 : tsn = timestamp_nil;
101 0 : if (BUNappend(started, &tsn, false) != GDK_SUCCEED){
102 0 : msg = createException(MAL, "SYSMONstatistics", "Failed to append 'started'");
103 0 : goto bailout;
104 : }
105 0 : if (BUNappend(finished, &tsn, false) != GDK_SUCCEED){
106 0 : msg = createException(MAL, "SYSMONstatistics", "Failed to append 'finished'");
107 0 : goto bailout;
108 : }
109 : }
110 :
111 5 : if (BUNappend(maxticks, &USRstats[i].maxticks, false) != GDK_SUCCEED){
112 0 : msg = createException(MAL, "SYSMONstatistics", "Failed to append 'maxticks'");
113 0 : goto bailout;
114 : }
115 5 : if( USRstats[i].maxquery == 0){
116 0 : if (BUNappend(maxquery, "none", false) != GDK_SUCCEED ){
117 0 : msg = createException(MAL, "SYSMONstatistics", "Failed to append 'maxquery' 1");
118 0 : goto bailout;
119 : }
120 : }else {
121 5 : if (BUNappend(maxquery, USRstats[i].maxquery, false) != GDK_SUCCEED ){
122 0 : msg = createException(MAL, "SYSMONstatistics", "Failed to append 'maxquery' 2");
123 0 : goto bailout;
124 : }
125 : }
126 : }
127 1 : MT_lock_unset(&mal_delayLock);
128 1 : BBPkeepref(*u = user->batCacheid);
129 1 : BBPkeepref(*c = querycount->batCacheid);
130 1 : BBPkeepref(*t = totalticks->batCacheid);
131 1 : BBPkeepref(*s = started->batCacheid);
132 1 : BBPkeepref(*f = finished->batCacheid);
133 1 : BBPkeepref(*m = maxticks->batCacheid);
134 1 : BBPkeepref(*q = maxquery->batCacheid);
135 1 : return MAL_SUCCEED;
136 :
137 0 : bailout:
138 0 : MT_lock_unset(&mal_delayLock);
139 0 : BBPunfix(user->batCacheid);
140 0 : BBPunfix(querycount->batCacheid);
141 0 : BBPunfix(totalticks->batCacheid);
142 0 : BBPunfix(started->batCacheid);
143 0 : BBPunfix(finished->batCacheid);
144 0 : BBPunfix(maxticks->batCacheid);
145 0 : BBPunfix(maxquery->batCacheid);
146 0 : return msg;
147 : }
148 :
149 : static str
150 27 : SYSMONqueue(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
151 : {
152 : BAT *tag, *sessionid, *user, *started, *status, *query, *finished, *workers, *memory;
153 27 : bat *t = getArgReference_bat(stk,pci,0);
154 27 : bat *s = getArgReference_bat(stk,pci,1);
155 27 : bat *u = getArgReference_bat(stk,pci,2);
156 27 : bat *sd = getArgReference_bat(stk,pci,3);
157 27 : bat *ss = getArgReference_bat(stk,pci,4);
158 27 : bat *q = getArgReference_bat(stk,pci,5);
159 27 : bat *f = getArgReference_bat(stk,pci,6);
160 27 : bat *w = getArgReference_bat(stk,pci,7);
161 27 : bat *m = getArgReference_bat(stk,pci,8);
162 : lng qtag;
163 : int wrk, mem;
164 : BUN sz;
165 : timestamp tsn;
166 : str msg = MAL_SUCCEED;
167 :
168 : (void) mb;
169 27 : sz = (BUN) qsize; // reserve space for all tuples in QRYqueue
170 27 : tag = COLnew(0, TYPE_lng, sz, TRANSIENT);
171 27 : sessionid = COLnew(0, TYPE_int, sz, TRANSIENT);
172 27 : user = COLnew(0, TYPE_str, sz, TRANSIENT);
173 27 : started = COLnew(0, TYPE_timestamp, sz, TRANSIENT);
174 27 : status = COLnew(0, TYPE_str, sz, TRANSIENT);
175 27 : query = COLnew(0, TYPE_str, sz, TRANSIENT);
176 27 : finished = COLnew(0, TYPE_timestamp, sz, TRANSIENT);
177 27 : workers = COLnew(0, TYPE_int, sz, TRANSIENT);
178 27 : memory = COLnew(0, TYPE_int, sz, TRANSIENT);
179 27 : if ( tag == NULL || sessionid == NULL || user == NULL || query == NULL || started == NULL || finished == NULL || workers == NULL || memory == NULL){
180 0 : BBPreclaim(tag);
181 0 : BBPreclaim(sessionid);
182 0 : BBPreclaim(user);
183 0 : BBPreclaim(started);
184 0 : BBPreclaim(status);
185 0 : BBPreclaim(query);
186 0 : BBPreclaim(finished);
187 0 : BBPreclaim(workers);
188 0 : BBPreclaim(memory);
189 0 : throw(MAL, "SYSMONqueue", SQLSTATE(HY013) MAL_MALLOC_FAIL);
190 : }
191 :
192 27 : MT_lock_set(&mal_delayLock);
193 1448 : for (size_t i = qtail; i != qhead; i++){
194 1423 : if ( i == qsize){
195 : i = 0;
196 25 : if( i == qhead)
197 : break;
198 : }
199 1421 : if( QRYqueue[i].query && (cntxt->user == MAL_ADMIN ||
200 14 : strcmp(cntxt->username, QRYqueue[i].username) == 0) ){
201 1329 : qtag = (lng) QRYqueue[i].tag;
202 1329 : if (BUNappend(tag, &qtag, false) != GDK_SUCCEED)
203 0 : goto bailout;
204 :
205 1329 : if (BUNappend(user, QRYqueue[i].username, false) != GDK_SUCCEED) {
206 0 : goto bailout;
207 : }
208 :
209 1329 : if (BUNappend(sessionid, &(QRYqueue[i].idx), false) != GDK_SUCCEED) {
210 0 : goto bailout;
211 : }
212 :
213 2658 : if (BUNappend(query, QRYqueue[i].query, false) != GDK_SUCCEED ||
214 1329 : BUNappend(status, QRYqueue[i].status, false) != GDK_SUCCEED)
215 0 : goto bailout;
216 :
217 : /* convert number of seconds into a timestamp */
218 1329 : tsn = timestamp_fromtime(QRYqueue[i].start);
219 1329 : if (is_timestamp_nil(tsn)) {
220 0 : msg = createException(MAL, "SYSMONqueue", SQLSTATE(22003) "cannot convert time");
221 0 : goto bailout;
222 : }
223 1329 : if (BUNappend(started, &tsn, false) != GDK_SUCCEED)
224 0 : goto bailout;
225 :
226 1329 : if (QRYqueue[i].finished == 0) {
227 30 : tsn = timestamp_nil;
228 : } else {
229 1299 : tsn = timestamp_fromtime(QRYqueue[i].finished);
230 1299 : if (is_timestamp_nil(tsn)) {
231 0 : msg = createException(MAL, "SYSMONqueue", SQLSTATE(22003) "cannot convert time");
232 0 : goto bailout;
233 : }
234 : }
235 1329 : if (BUNappend(finished, &tsn, false) != GDK_SUCCEED)
236 0 : goto bailout;
237 :
238 1329 : if( QRYqueue[i].mb)
239 30 : wrk = QRYqueue[i].mb->workers;
240 : else
241 1299 : wrk = QRYqueue[i].workers;
242 1329 : if( QRYqueue[i].mb)
243 30 : mem = (int)(1 + QRYqueue[i].mb->memory / LL_CONSTANT(1048576));
244 : else
245 1299 : mem = (int)QRYqueue[i].memory;
246 2658 : if ( BUNappend(workers, &wrk, false) != GDK_SUCCEED ||
247 1329 : BUNappend(memory, &mem, false) != GDK_SUCCEED)
248 0 : goto bailout;
249 : }
250 : }
251 27 : MT_lock_unset(&mal_delayLock);
252 27 : BBPkeepref( *t =tag->batCacheid);
253 27 : BBPkeepref( *s =sessionid->batCacheid);
254 27 : BBPkeepref( *u =user->batCacheid);
255 27 : BBPkeepref( *sd =started->batCacheid);
256 27 : BBPkeepref( *ss =status->batCacheid);
257 27 : BBPkeepref( *q =query->batCacheid);
258 27 : BBPkeepref( *f =finished->batCacheid);
259 27 : BBPkeepref( *w =workers->batCacheid);
260 27 : BBPkeepref( *m =memory->batCacheid);
261 27 : return MAL_SUCCEED;
262 :
263 0 : bailout:
264 0 : MT_lock_unset(&mal_delayLock);
265 0 : BBPunfix(tag->batCacheid);
266 0 : BBPunfix(sessionid->batCacheid);
267 0 : BBPunfix(user->batCacheid);
268 0 : BBPunfix(started->batCacheid);
269 0 : BBPunfix(status->batCacheid);
270 0 : BBPunfix(query->batCacheid);
271 0 : BBPunfix(finished->batCacheid);
272 0 : BBPunfix(workers->batCacheid);
273 0 : BBPunfix(memory->batCacheid);
274 0 : return msg ? msg : createException(MAL, "SYSMONqueue", SQLSTATE(HY013) MAL_MALLOC_FAIL);
275 : }
276 :
277 : static str
278 0 : SYSMONpause(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
279 : {
280 : bool set = false;
281 : lng tag = 0;
282 :
283 0 : switch(getArgType(mb,pci,1)){
284 0 : case TYPE_bte: tag = *getArgReference_bte(stk,pci,1); break;
285 0 : case TYPE_sht: tag = *getArgReference_sht(stk,pci,1); break;
286 0 : case TYPE_int: tag = *getArgReference_int(stk,pci,1); break;
287 0 : case TYPE_lng: tag = *getArgReference_lng(stk,pci,1); break;
288 0 : default:
289 0 : throw(MAL, "SYSMONpause", SQLSTATE(42000) "SYSMONpause requires a 64-bit integer");
290 : }
291 0 : if (tag < 1)
292 0 : throw(MAL, "SYSMONpause", SQLSTATE(42000) "Tag must be positive");
293 0 : MT_lock_set(&mal_delayLock);
294 0 : for (size_t i = qtail; i != qhead; i++){
295 0 : if( i == qsize){
296 : i = 0;
297 0 : if( i == qhead)
298 : break;
299 : }
300 0 : if( (lng) QRYqueue[i].tag == tag && cntxt->user == MAL_ADMIN && QRYqueue[i].stk){
301 0 : QRYqueue[i].stk->status = 'p';
302 0 : QRYqueue[i].status = "paused";
303 : set = true;
304 : }
305 : }
306 0 : MT_lock_unset(&mal_delayLock);
307 0 : return set ? MAL_SUCCEED : createException(MAL, "SYSMONpause", SQLSTATE(42000) "Tag " LLFMT " unknown", tag);
308 : }
309 :
310 : static str
311 0 : SYSMONresume(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
312 : {
313 : bool set = false;
314 : lng tag = 0;
315 :
316 0 : switch(getArgType(mb,pci,1)){
317 0 : case TYPE_bte: tag = *getArgReference_bte(stk,pci,1); break;
318 0 : case TYPE_sht: tag = *getArgReference_sht(stk,pci,1); break;
319 0 : case TYPE_int: tag = *getArgReference_int(stk,pci,1); break;
320 0 : case TYPE_lng: tag = *getArgReference_lng(stk,pci,1); break;
321 0 : default:
322 0 : throw(MAL, "SYSMONresume", SQLSTATE(42000) "SYSMONresume requires a 64-bit integer");
323 : }
324 0 : if (tag < 1)
325 0 : throw(MAL, "SYSMONresume", SQLSTATE(42000) "Tag must be positive");
326 0 : MT_lock_set(&mal_delayLock);
327 0 : for (size_t i = qtail; i == qhead; i++){
328 0 : if( i == qsize){
329 : i = 0;
330 0 : if ( i== qhead)
331 : break;
332 : }
333 0 : if( (lng)QRYqueue[i].tag == tag && cntxt->user == MAL_ADMIN && QRYqueue[i].stk){
334 0 : QRYqueue[i].stk->status = 0;
335 0 : QRYqueue[i].status = "running";
336 : set = true;
337 : }
338 : }
339 0 : MT_lock_unset(&mal_delayLock);
340 0 : return set ? MAL_SUCCEED : createException(MAL, "SYSMONresume", SQLSTATE(42000) "Tag " LLFMT " unknown", tag);
341 : }
342 :
343 : static str
344 0 : SYSMONstop(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
345 : {
346 : bool set = false;
347 : lng tag = 0;
348 :
349 0 : switch(getArgType(mb,pci,1)){
350 0 : case TYPE_bte: tag = *getArgReference_bte(stk,pci,1); break;
351 0 : case TYPE_sht: tag = *getArgReference_sht(stk,pci,1); break;
352 0 : case TYPE_int: tag = *getArgReference_int(stk,pci,1); break;
353 0 : case TYPE_lng: tag = *getArgReference_lng(stk,pci,1); break;
354 0 : default:
355 0 : throw(MAL, "SYSMONstop", SQLSTATE(42000) "SYSMONstop requires a 64-bit integer");
356 : }
357 0 : if (tag < 1)
358 0 : throw(MAL, "SYSMONstop", SQLSTATE(42000) "Tag must be positive");
359 0 : MT_lock_set(&mal_delayLock);
360 0 : for (size_t i = qtail; i != qhead; i++){
361 0 : if( i == qsize){
362 : i = 0;
363 0 : if( i == qhead)
364 : break;
365 : }
366 0 : if( (lng) QRYqueue[i].tag == tag && cntxt->user == MAL_ADMIN && QRYqueue[i].stk){
367 0 : QRYqueue[i].stk->status = 'q';
368 0 : QRYqueue[i].status = "stopping";
369 : set = true;
370 : }
371 : }
372 0 : MT_lock_unset(&mal_delayLock);
373 0 : return set ? MAL_SUCCEED : createException(MAL, "SYSMONstop", SQLSTATE(42000) "Tag " LLFMT " unknown", tag);
374 : }
375 :
376 : #include "mel.h"
377 : mel_func sysmon_init_funcs[] = {
378 : pattern("sysmon", "pause", SYSMONpause, false, "Suspend a running query", args(0,1, arg("id",sht))),
379 : pattern("sysmon", "pause", SYSMONpause, false, "Suspend a running query", args(0,1, arg("id",int))),
380 : pattern("sysmon", "pause", SYSMONpause, false, "Suspend a running query", args(0,1, arg("id",lng))),
381 : pattern("sysmon", "resume", SYSMONresume, false, "Resume processing of a query ", args(0,1, arg("id",sht))),
382 : pattern("sysmon", "resume", SYSMONresume, false, "Resume processing of a query ", args(0,1, arg("id",int))),
383 : pattern("sysmon", "resume", SYSMONresume, false, "Resume processing of a query ", args(0,1, arg("id",lng))),
384 : pattern("sysmon", "stop", SYSMONstop, false, "Stop a single query a.s.a.p.", args(0,1, arg("id",sht))),
385 : pattern("sysmon", "stop", SYSMONstop, false, "Stop a single query a.s.a.p.", args(0,1, arg("id",int))),
386 : pattern("sysmon", "stop", SYSMONstop, false, "Stop a single query a.s.a.p.", args(0,1, arg("id",lng))),
387 : pattern("sysmon", "queue", SYSMONqueue, false, "A queue of queries that are currently being executed or recently finished", args(9,9, batarg("tag",lng),batarg("sessionid",int),batarg("user",str),batarg("started",timestamp),batarg("status",str),batarg("query",str),batarg("finished",timestamp),batarg("workers",int),batarg("memory",int))),
388 : pattern("sysmon", "user_statistics", SYSMONstatistics, false, "", args(7,7, batarg("user",str),batarg("querycount",lng),batarg("totalticks",lng),batarg("started",timestamp),batarg("finished",timestamp),batarg("maxticks",lng),batarg("maxquery",str))),
389 : { .imp=NULL }
390 : };
391 : #include "mal_import.h"
392 : #ifdef _MSC_VER
393 : #undef read
394 : #pragma section(".CRT$XCU",read)
395 : #endif
396 257 : LIB_STARTUP_FUNC(init_sysmon_mal)
397 257 : { mal_module("sysmon", NULL, sysmon_init_funcs); }
|