Line data Source code
1 : /*
2 : * This Source Code Form is subject to the terms of the Mozilla Public
3 : * License, v. 2.0. If a copy of the MPL was not distributed with this
4 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 : *
6 : * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
7 : */
8 :
9 : #include "monetdb_config.h"
10 : #include "mal.h"
11 : #include "mal_interpreter.h"
12 : #include "mal_authorize.h"
13 : #include "mal_client.h"
14 : #include "mal_runtime.h"
15 : #include "gdk_time.h"
16 :
17 : /* (c) M.L. Kersten
18 : * The queries currently in execution are returned to the front-end for managing expensive ones.
19 : */
20 :
21 : static str
22 2 : SYSMONstatistics(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
23 : {
24 : BAT *user, *querycount, *totalticks, *started, *finished, *maxquery, *maxticks;
25 2 : bat *u = getArgReference_bat(stk,pci,0);
26 2 : bat *c = getArgReference_bat(stk,pci,1);
27 2 : bat *t = getArgReference_bat(stk,pci,2);
28 2 : bat *s = getArgReference_bat(stk,pci,3);
29 2 : bat *f = getArgReference_bat(stk,pci,4);
30 2 : bat *m = getArgReference_bat(stk,pci,5);
31 2 : bat *q = getArgReference_bat(stk,pci,6);
32 : size_t i;
33 2 : timestamp tsn = timestamp_nil;
34 : str msg = MAL_SUCCEED;
35 :
36 : (void) mb;
37 2 : user = COLnew(0, TYPE_str, usrstatscnt, TRANSIENT);
38 2 : querycount = COLnew(0, TYPE_lng, usrstatscnt, TRANSIENT);
39 2 : totalticks = COLnew(0, TYPE_lng, usrstatscnt, TRANSIENT);
40 2 : started = COLnew(0, TYPE_timestamp, usrstatscnt, TRANSIENT);
41 2 : finished = COLnew(0, TYPE_timestamp, usrstatscnt, TRANSIENT);
42 2 : maxticks = COLnew(0, TYPE_lng, usrstatscnt, TRANSIENT);
43 2 : maxquery = COLnew(0, TYPE_str, usrstatscnt, TRANSIENT);
44 2 : if (user == NULL || querycount == NULL || totalticks == NULL || started == NULL || finished == NULL || maxquery == NULL || maxticks == NULL){
45 0 : BBPreclaim(user);
46 0 : BBPreclaim(started);
47 0 : BBPreclaim(querycount);
48 0 : BBPreclaim(totalticks);
49 0 : BBPreclaim(finished);
50 0 : BBPreclaim(maxticks);
51 0 : BBPreclaim(maxquery);
52 0 : throw(MAL, "SYSMONstatistics", SQLSTATE(HY013) MAL_MALLOC_FAIL);
53 : }
54 :
55 2 : MT_lock_set(&mal_delayLock);
56 : // FIXME: what if there are multiple users with ADMIN privilege?
57 11 : for (i = 0 && cntxt->user == MAL_ADMIN; i < usrstatscnt; i++) {
58 : /* We can stop at the first empty entry */
59 10 : if (USRstats[i].username == NULL) break;
60 :
61 9 : if (BUNappend(user, USRstats[i].username, false) != GDK_SUCCEED) {
62 0 : msg = createException(MAL, "SYSMONstatistics", "Failed to append 'user'");
63 0 : goto bailout;
64 : }
65 9 : if (BUNappend(querycount, &USRstats[i].querycount, false) != GDK_SUCCEED){
66 0 : msg = createException(MAL, "SYSMONstatistics", "Failed to append 'querycount'");
67 0 : goto bailout;
68 : }
69 9 : if (BUNappend(totalticks, &USRstats[i].totalticks, false) != GDK_SUCCEED){
70 0 : msg = createException(MAL, "SYSMONstatistics", "Failed to append 'totalticks'");
71 0 : goto bailout;
72 : }
73 : /* convert number of seconds into a timestamp */
74 9 : if (USRstats[i].maxquery != 0){
75 9 : tsn = timestamp_fromtime(USRstats[i].started);
76 9 : if (is_timestamp_nil(tsn)) {
77 0 : msg = createException(MAL, "SYSMONstatistics", SQLSTATE(22003) "failed to convert start time");
78 0 : goto bailout;
79 : }
80 9 : if (BUNappend(started, &tsn, false) != GDK_SUCCEED){
81 0 : msg = createException(MAL, "SYSMONstatistics", "Failed to append 'started'");
82 0 : goto bailout;
83 : }
84 :
85 9 : if (USRstats[i].finished == 0) {
86 0 : tsn = timestamp_nil;
87 : } else {
88 9 : tsn = timestamp_fromtime(USRstats[i].finished);
89 9 : if (is_timestamp_nil(tsn)) {
90 0 : msg = createException(MAL, "SYSMONstatistics", SQLSTATE(22003) "failed to convert finish time");
91 0 : goto bailout;
92 : }
93 : }
94 9 : if (BUNappend(finished, &tsn, false) != GDK_SUCCEED){
95 0 : msg = createException(MAL, "SYSMONstatistics", "Failed to append 'finished'");
96 0 : goto bailout;
97 : }
98 : } else {
99 0 : tsn = timestamp_nil;
100 0 : if (BUNappend(started, &tsn, false) != GDK_SUCCEED){
101 0 : msg = createException(MAL, "SYSMONstatistics", "Failed to append 'started'");
102 0 : goto bailout;
103 : }
104 0 : if (BUNappend(finished, &tsn, false) != GDK_SUCCEED){
105 0 : msg = createException(MAL, "SYSMONstatistics", "Failed to append 'finished'");
106 0 : goto bailout;
107 : }
108 : }
109 :
110 9 : if (BUNappend(maxticks, &USRstats[i].maxticks, false) != GDK_SUCCEED){
111 0 : msg = createException(MAL, "SYSMONstatistics", "Failed to append 'maxticks'");
112 0 : goto bailout;
113 : }
114 9 : if( USRstats[i].maxquery == 0){
115 0 : if (BUNappend(maxquery, "none", false) != GDK_SUCCEED ){
116 0 : msg = createException(MAL, "SYSMONstatistics", "Failed to append 'maxquery' 1");
117 0 : goto bailout;
118 : }
119 : }else {
120 9 : if (BUNappend(maxquery, USRstats[i].maxquery, false) != GDK_SUCCEED ){
121 0 : msg = createException(MAL, "SYSMONstatistics", "Failed to append 'maxquery' 2");
122 0 : goto bailout;
123 : }
124 : }
125 : }
126 2 : MT_lock_unset(&mal_delayLock);
127 2 : BBPkeepref(*u = user->batCacheid);
128 2 : BBPkeepref(*c = querycount->batCacheid);
129 2 : BBPkeepref(*t = totalticks->batCacheid);
130 2 : BBPkeepref(*s = started->batCacheid);
131 2 : BBPkeepref(*f = finished->batCacheid);
132 2 : BBPkeepref(*m = maxticks->batCacheid);
133 2 : BBPkeepref(*q = maxquery->batCacheid);
134 2 : return MAL_SUCCEED;
135 :
136 0 : bailout:
137 0 : MT_lock_unset(&mal_delayLock);
138 0 : BBPunfix(user->batCacheid);
139 0 : BBPunfix(querycount->batCacheid);
140 0 : BBPunfix(totalticks->batCacheid);
141 0 : BBPunfix(started->batCacheid);
142 0 : BBPunfix(finished->batCacheid);
143 0 : BBPunfix(maxticks->batCacheid);
144 0 : BBPunfix(maxquery->batCacheid);
145 0 : return msg;
146 : }
147 :
148 : static str
149 44 : SYSMONqueue(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
150 : {
151 : BAT *tag, *sessionid, *user, *started, *status, *query, *finished, *workers, *memory;
152 44 : bat *t = getArgReference_bat(stk,pci,0);
153 44 : bat *s = getArgReference_bat(stk,pci,1);
154 44 : bat *u = getArgReference_bat(stk,pci,2);
155 44 : bat *sd = getArgReference_bat(stk,pci,3);
156 44 : bat *ss = getArgReference_bat(stk,pci,4);
157 44 : bat *q = getArgReference_bat(stk,pci,5);
158 44 : bat *f = getArgReference_bat(stk,pci,6);
159 44 : bat *w = getArgReference_bat(stk,pci,7);
160 44 : bat *m = getArgReference_bat(stk,pci,8);
161 : lng qtag;
162 : int wrk, mem;
163 : BUN sz;
164 : timestamp tsn;
165 : str msg = MAL_SUCCEED;
166 :
167 : (void) mb;
168 44 : sz = (BUN) qsize; // reserve space for all tuples in QRYqueue
169 44 : tag = COLnew(0, TYPE_lng, sz, TRANSIENT);
170 44 : sessionid = COLnew(0, TYPE_int, sz, TRANSIENT);
171 44 : user = COLnew(0, TYPE_str, sz, TRANSIENT);
172 44 : started = COLnew(0, TYPE_timestamp, sz, TRANSIENT);
173 44 : status = COLnew(0, TYPE_str, sz, TRANSIENT);
174 44 : query = COLnew(0, TYPE_str, sz, TRANSIENT);
175 44 : finished = COLnew(0, TYPE_timestamp, sz, TRANSIENT);
176 44 : workers = COLnew(0, TYPE_int, sz, TRANSIENT);
177 44 : memory = COLnew(0, TYPE_int, sz, TRANSIENT);
178 44 : if ( tag == NULL || sessionid == NULL || user == NULL || query == NULL || started == NULL || finished == NULL || workers == NULL || memory == NULL){
179 0 : BBPreclaim(tag);
180 0 : BBPreclaim(sessionid);
181 0 : BBPreclaim(user);
182 0 : BBPreclaim(started);
183 0 : BBPreclaim(status);
184 0 : BBPreclaim(query);
185 0 : BBPreclaim(finished);
186 0 : BBPreclaim(workers);
187 0 : BBPreclaim(memory);
188 0 : throw(MAL, "SYSMONqueue", SQLSTATE(HY013) MAL_MALLOC_FAIL);
189 : }
190 :
191 44 : MT_lock_set(&mal_delayLock);
192 29652 : for (size_t i = qtail; i != qhead; i++){
193 29612 : if ( i == qsize){
194 : i = 0;
195 40 : if( i == qhead)
196 : break;
197 : }
198 29608 : if( QRYqueue[i].query && (cntxt->user == MAL_ADMIN ||
199 14 : strcmp(cntxt->username, QRYqueue[i].username) == 0) ){
200 29498 : qtag = (lng) QRYqueue[i].tag;
201 29498 : if (BUNappend(tag, &qtag, false) != GDK_SUCCEED)
202 0 : goto bailout;
203 :
204 29498 : if (BUNappend(user, QRYqueue[i].username, false) != GDK_SUCCEED) {
205 0 : goto bailout;
206 : }
207 :
208 29498 : if (BUNappend(sessionid, &(QRYqueue[i].idx), false) != GDK_SUCCEED) {
209 0 : goto bailout;
210 : }
211 :
212 58996 : if (BUNappend(query, QRYqueue[i].query, false) != GDK_SUCCEED ||
213 29498 : BUNappend(status, QRYqueue[i].status, false) != GDK_SUCCEED)
214 0 : goto bailout;
215 :
216 : /* convert number of seconds into a timestamp */
217 29498 : tsn = timestamp_fromtime(QRYqueue[i].start);
218 29498 : if (is_timestamp_nil(tsn)) {
219 0 : msg = createException(MAL, "SYSMONqueue", SQLSTATE(22003) "cannot convert time");
220 0 : goto bailout;
221 : }
222 29498 : if (BUNappend(started, &tsn, false) != GDK_SUCCEED)
223 0 : goto bailout;
224 :
225 29498 : if (QRYqueue[i].finished == 0) {
226 26437 : tsn = timestamp_nil;
227 : } else {
228 3061 : tsn = timestamp_fromtime(QRYqueue[i].finished);
229 3061 : if (is_timestamp_nil(tsn)) {
230 0 : msg = createException(MAL, "SYSMONqueue", SQLSTATE(22003) "cannot convert time");
231 0 : goto bailout;
232 : }
233 : }
234 29498 : if (BUNappend(finished, &tsn, false) != GDK_SUCCEED)
235 0 : goto bailout;
236 :
237 29498 : wrk = QRYqueue[i].workers;
238 29498 : mem = QRYqueue[i].memory;
239 58996 : if ( BUNappend(workers, &wrk, false) != GDK_SUCCEED ||
240 29498 : BUNappend(memory, &mem, false) != GDK_SUCCEED)
241 0 : goto bailout;
242 : }
243 : }
244 44 : MT_lock_unset(&mal_delayLock);
245 44 : BBPkeepref( *t =tag->batCacheid);
246 44 : BBPkeepref( *s =sessionid->batCacheid);
247 44 : BBPkeepref( *u =user->batCacheid);
248 44 : BBPkeepref( *sd =started->batCacheid);
249 44 : BBPkeepref( *ss =status->batCacheid);
250 44 : BBPkeepref( *q =query->batCacheid);
251 44 : BBPkeepref( *f =finished->batCacheid);
252 44 : BBPkeepref( *w =workers->batCacheid);
253 44 : BBPkeepref( *m =memory->batCacheid);
254 44 : return MAL_SUCCEED;
255 :
256 0 : bailout:
257 0 : MT_lock_unset(&mal_delayLock);
258 0 : BBPunfix(tag->batCacheid);
259 0 : BBPunfix(sessionid->batCacheid);
260 0 : BBPunfix(user->batCacheid);
261 0 : BBPunfix(started->batCacheid);
262 0 : BBPunfix(status->batCacheid);
263 0 : BBPunfix(query->batCacheid);
264 0 : BBPunfix(finished->batCacheid);
265 0 : BBPunfix(workers->batCacheid);
266 0 : BBPunfix(memory->batCacheid);
267 0 : return msg ? msg : createException(MAL, "SYSMONqueue", SQLSTATE(HY013) MAL_MALLOC_FAIL);
268 : }
269 :
270 : static str
271 0 : SYSMONpause(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
272 : {
273 : bool set = false;
274 : lng tag = 0;
275 :
276 0 : switch(getArgType(mb,pci,1)){
277 0 : case TYPE_bte: tag = *getArgReference_bte(stk,pci,1); break;
278 0 : case TYPE_sht: tag = *getArgReference_sht(stk,pci,1); break;
279 0 : case TYPE_int: tag = *getArgReference_int(stk,pci,1); break;
280 0 : case TYPE_lng: tag = *getArgReference_lng(stk,pci,1); break;
281 0 : default:
282 0 : throw(MAL, "SYSMONpause", SQLSTATE(42000) "SYSMONpause requires a 64-bit integer");
283 : }
284 0 : if (tag < 1)
285 0 : throw(MAL, "SYSMONpause", SQLSTATE(42000) "Tag must be positive");
286 0 : MT_lock_set(&mal_delayLock);
287 0 : for (size_t i = qtail; i != qhead; i++){
288 0 : if( i == qsize){
289 : i = 0;
290 0 : if( i == qhead)
291 : break;
292 : }
293 0 : if( (lng) QRYqueue[i].tag == tag && cntxt->user == MAL_ADMIN && QRYqueue[i].stk){
294 0 : QRYqueue[i].stk->status = 'p';
295 0 : QRYqueue[i].status = "paused";
296 : set = true;
297 : }
298 : }
299 0 : MT_lock_unset(&mal_delayLock);
300 0 : return set ? MAL_SUCCEED : createException(MAL, "SYSMONpause", SQLSTATE(42000) "Tag " LLFMT " unknown", tag);
301 : }
302 :
303 : static str
304 0 : SYSMONresume(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
305 : {
306 : bool set = false;
307 : lng tag = 0;
308 :
309 0 : switch(getArgType(mb,pci,1)){
310 0 : case TYPE_bte: tag = *getArgReference_bte(stk,pci,1); break;
311 0 : case TYPE_sht: tag = *getArgReference_sht(stk,pci,1); break;
312 0 : case TYPE_int: tag = *getArgReference_int(stk,pci,1); break;
313 0 : case TYPE_lng: tag = *getArgReference_lng(stk,pci,1); break;
314 0 : default:
315 0 : throw(MAL, "SYSMONresume", SQLSTATE(42000) "SYSMONresume requires a 64-bit integer");
316 : }
317 0 : if (tag < 1)
318 0 : throw(MAL, "SYSMONresume", SQLSTATE(42000) "Tag must be positive");
319 0 : MT_lock_set(&mal_delayLock);
320 0 : for (size_t i = qtail; i == qhead; i++){
321 0 : if( i == qsize){
322 : i = 0;
323 0 : if ( i== qhead)
324 : break;
325 : }
326 0 : if( (lng)QRYqueue[i].tag == tag && cntxt->user == MAL_ADMIN && QRYqueue[i].stk){
327 0 : QRYqueue[i].stk->status = 0;
328 0 : QRYqueue[i].status = "running";
329 : set = true;
330 : }
331 : }
332 0 : MT_lock_unset(&mal_delayLock);
333 0 : return set ? MAL_SUCCEED : createException(MAL, "SYSMONresume", SQLSTATE(42000) "Tag " LLFMT " unknown", tag);
334 : }
335 :
336 : static str
337 0 : SYSMONstop(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
338 : {
339 : bool set = false;
340 : lng tag = 0;
341 :
342 0 : switch(getArgType(mb,pci,1)){
343 0 : case TYPE_bte: tag = *getArgReference_bte(stk,pci,1); break;
344 0 : case TYPE_sht: tag = *getArgReference_sht(stk,pci,1); break;
345 0 : case TYPE_int: tag = *getArgReference_int(stk,pci,1); break;
346 0 : case TYPE_lng: tag = *getArgReference_lng(stk,pci,1); break;
347 0 : default:
348 0 : throw(MAL, "SYSMONstop", SQLSTATE(42000) "SYSMONstop requires a 64-bit integer");
349 : }
350 0 : if (tag < 1)
351 0 : throw(MAL, "SYSMONstop", SQLSTATE(42000) "Tag must be positive");
352 0 : MT_lock_set(&mal_delayLock);
353 0 : for (size_t i = qtail; i != qhead; i++){
354 0 : if( i == qsize){
355 : i = 0;
356 0 : if( i == qhead)
357 : break;
358 : }
359 0 : if( (lng) QRYqueue[i].tag == tag && cntxt->user == MAL_ADMIN && QRYqueue[i].stk){
360 0 : QRYqueue[i].stk->status = 'q';
361 0 : QRYqueue[i].status = "stopping";
362 : set = true;
363 : }
364 : }
365 0 : MT_lock_unset(&mal_delayLock);
366 0 : return set ? MAL_SUCCEED : createException(MAL, "SYSMONstop", SQLSTATE(42000) "Tag " LLFMT " unknown", tag);
367 : }
368 :
369 : #include "mel.h"
370 : mel_func sysmon_init_funcs[] = {
371 : pattern("sysmon", "pause", SYSMONpause, false, "Suspend a running query", args(0,1, arg("id",sht))),
372 : pattern("sysmon", "pause", SYSMONpause, false, "Suspend a running query", args(0,1, arg("id",int))),
373 : pattern("sysmon", "pause", SYSMONpause, false, "Suspend a running query", args(0,1, arg("id",lng))),
374 : pattern("sysmon", "resume", SYSMONresume, false, "Resume processing of a query ", args(0,1, arg("id",sht))),
375 : pattern("sysmon", "resume", SYSMONresume, false, "Resume processing of a query ", args(0,1, arg("id",int))),
376 : pattern("sysmon", "resume", SYSMONresume, false, "Resume processing of a query ", args(0,1, arg("id",lng))),
377 : pattern("sysmon", "stop", SYSMONstop, false, "Stop a single query a.s.a.p.", args(0,1, arg("id",sht))),
378 : pattern("sysmon", "stop", SYSMONstop, false, "Stop a single query a.s.a.p.", args(0,1, arg("id",int))),
379 : pattern("sysmon", "stop", SYSMONstop, false, "Stop a single query a.s.a.p.", args(0,1, arg("id",lng))),
380 : pattern("sysmon", "queue", SYSMONqueue, false, "A queue of queries that are currently being executed or recently finished", args(9,9, batarg("tag",lng),batarg("sessionid",int),batarg("user",str),batarg("started",timestamp),batarg("status",str),batarg("query",str),batarg("finished",timestamp),batarg("workers",int),batarg("memory",int))),
381 : pattern("sysmon", "user_statistics", SYSMONstatistics, false, "", args(7,7, batarg("user",str),batarg("querycount",lng),batarg("totalticks",lng),batarg("started",timestamp),batarg("finished",timestamp),batarg("maxticks",lng),batarg("maxquery",str))),
382 : { .imp=NULL }
383 : };
384 : #include "mal_import.h"
385 : #ifdef _MSC_VER
386 : #undef read
387 : #pragma section(".CRT$XCU",read)
388 : #endif
389 255 : LIB_STARTUP_FUNC(init_sysmon_mal)
390 255 : { mal_module("sysmon", NULL, sysmon_init_funcs); }
|