Line data Source code
1 : /*
2 : * This Source Code Form is subject to the terms of the Mozilla Public
3 : * License, v. 2.0. If a copy of the MPL was not distributed with this
4 : * file, You can obtain one at http://mozilla.org/MPL/2.0/.
5 : *
6 : * Copyright 1997 - July 2008 CWI, August 2008 - 2021 MonetDB B.V.
7 : */
8 :
9 : /*
10 : * (c) Martin Kersten
11 : * This module provides an advisary lock manager for SQL transactions
12 : * that prefer waiting over transaction failures due to OCC
13 : * The table may only grow with lockable items
14 : * It could be extended with a semaphore for queue management
15 : */
16 : #include "monetdb_config.h"
17 : #include "gdk.h"
18 : #include "mal_exception.h"
19 : #include "mal_interpreter.h"
20 : #include "gdk_time.h"
21 :
22 : #define LOCKTIMEOUT (20 * 1000)
23 : #define LOCKDELAY 20
24 :
25 : typedef struct{
26 : Client cntxt; // user holding the write lock
27 : lng start; // time when it started
28 : lng retention; // time when the lock is released
29 : lng total; // accumulated lock time
30 : int used; // how often it used, for balancing
31 : int locked; // writelock set or not
32 : } OLTPlockRecord;
33 :
34 : static OLTPlockRecord oltp_locks[MAXOLTPLOCKS];
35 : static int oltp_delay;
36 :
37 : /*
38 : static void
39 : OLTPdump_(Client cntxt, str msg)
40 : {
41 : int i;
42 :
43 : mnstr_printf(cntxt->fdout,"%s",msg);
44 : for(i=0; i< MAXOLTPLOCKS; i++)
45 : if( oltp_locks[i].locked)
46 : mnstr_printf(cntxt->fdout,"#[%i] %3d\n",i, (oltp_locks[i].cntxt ? oltp_locks[i].cntxt->idxx: -1));
47 : }
48 : */
49 :
50 : static str
51 0 : OLTPreset(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
52 : { int i;
53 : (void) cntxt;
54 : (void) mb;
55 : (void) stk;
56 : (void) pci;
57 :
58 0 : MT_lock_set(&mal_oltpLock);
59 :
60 0 : for( i=0; i<MAXOLTPLOCKS; i++){
61 0 : oltp_locks[i].locked = 0;
62 0 : oltp_locks[i].cntxt = 0;
63 0 : oltp_locks[i].start = 0;
64 0 : oltp_locks[i].used = 0;
65 0 : oltp_locks[i].retention = 0;
66 : }
67 0 : MT_lock_unset(&mal_oltpLock);
68 0 : return MAL_SUCCEED;
69 : }
70 :
71 : static str
72 0 : OLTPenable(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
73 : {
74 : (void) mb;
75 : (void) stk;
76 : (void) pci;
77 : (void) cntxt;
78 :
79 0 : oltp_delay = TRUE;
80 0 : return MAL_SUCCEED;
81 : }
82 :
83 : static str
84 0 : OLTPdisable(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
85 : {
86 0 : OLTPreset(cntxt, mb, stk,pci);
87 0 : oltp_delay = FALSE;
88 : (void) cntxt;
89 0 : return MAL_SUCCEED;
90 : }
91 :
92 : static str
93 0 : OLTPinit(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
94 : {
95 : // nothing to be done right now
96 0 : return OLTPreset(cntxt,mb,stk,pci);
97 : }
98 :
99 : // The locking is based in the hash-table.
100 : // It contains all write locks outstanding
101 : // A transaction may proceed if no element in its read set is locked
102 :
103 : static str
104 0 : OLTPlock(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
105 : {
106 : int i,lck;
107 0 : int clk, wait= GDKms();
108 : lng now;
109 : const char *sql, *cpy;
110 :
111 : (void) stk;
112 0 : if ( oltp_delay == FALSE )
113 : return MAL_SUCCEED;
114 :
115 0 : TRC_DEBUG(MAL_SERVER, "%6d lock request for client: %d, pc %d", GDKms(), cntxt->idx, pci->pc);
116 :
117 : do{
118 0 : MT_lock_set(&mal_oltpLock);
119 0 : clk = GDKms();
120 0 : now = GDKusec();
121 : // check if all write locks are available
122 0 : for( i=1; i< pci->argc; i++){
123 0 : lck= getVarConstant(mb, getArg(pci,i)).val.ival;
124 0 : if ( lck > 0 && (oltp_locks[lck].locked || oltp_locks[lck].retention > now ))
125 : break;
126 : }
127 :
128 0 : if( i == pci->argc ){
129 0 : TRC_DEBUG(MAL_SERVER, "OLTP '%6d' set lock for client: %d\n", GDKms(), cntxt->idx);
130 :
131 0 : for( i=1; i< pci->argc; i++){
132 0 : lck= getVarConstant(mb, getArg(pci,i)).val.ival;
133 : // only set the write locks
134 0 : if( lck > 0){
135 0 : oltp_locks[lck].cntxt = cntxt;
136 0 : oltp_locks[lck].start = now;
137 0 : oltp_locks[lck].locked = 1;
138 0 : oltp_locks[lck].retention = 0;
139 : }
140 : }
141 0 : MT_lock_unset(&mal_oltpLock);
142 0 : return MAL_SUCCEED;
143 : } else {
144 0 : MT_lock_unset(&mal_oltpLock);
145 0 : TRC_DEBUG(MAL_SERVER, "%d delay imposed for client: %d\n", GDKms(), cntxt->idx);
146 0 : MT_sleep_ms(LOCKDELAY);
147 : }
148 0 : } while( clk - wait < LOCKTIMEOUT);
149 :
150 0 : TRC_DEBUG(MAL_SERVER, "%6d proceed query for client: %d\n", GDKms(), cntxt->idx);
151 :
152 : // if the time out is related to a copy_from query, we should not start it either.
153 0 : sql = getName("sql");
154 0 : cpy = getName("copy_from");
155 :
156 0 : for( i = 0; i < mb->stop; i++)
157 0 : if( getFunctionId(getInstrPtr(mb,i)) == cpy && getModuleId(getInstrPtr(mb,i)) == sql ){
158 0 : TRC_DEBUG(MAL_SERVER, "%6d bail out a concurrent copy into: %d\n", GDKms(), cntxt->idx);
159 0 : throw(SQL,"oltp.lock","Conflicts with other write operations\n");
160 : }
161 : return MAL_SUCCEED;
162 : }
163 :
164 : static str
165 0 : OLTPrelease(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
166 : {
167 : int i,lck;
168 : lng delay,clk;
169 :
170 : (void) cntxt;
171 : (void) stk;
172 0 : if ( oltp_delay == FALSE )
173 : return MAL_SUCCEED;
174 :
175 0 : MT_lock_set(&mal_oltpLock);
176 0 : clk = GDKusec();
177 :
178 0 : TRC_DEBUG(MAL_SERVER, "%6d release the locks: %d", GDKms(), cntxt->idx);
179 :
180 0 : for( i=1; i< pci->argc; i++){
181 0 : lck= getVarConstant(mb, getArg(pci,i)).val.ival;
182 0 : if( lck > 0){
183 0 : oltp_locks[lck].total += clk - oltp_locks[lck].start;
184 0 : oltp_locks[lck].used ++;
185 0 : oltp_locks[lck].cntxt = 0;
186 0 : oltp_locks[lck].start = 0;
187 0 : oltp_locks[lck].locked = 0;
188 0 : delay = oltp_locks[lck].total/ oltp_locks[lck].used;
189 0 : if( delay > LOCKDELAY || delay < LOCKDELAY/10)
190 : delay = LOCKDELAY;
191 0 : oltp_locks[lck].retention = clk + delay;
192 0 : TRC_DEBUG(MAL_SERVER, "Retention period for lock: %d " LLFMT"\n", lck, delay);
193 : }
194 : }
195 0 : MT_lock_unset(&mal_oltpLock);
196 0 : return MAL_SUCCEED;
197 : }
198 :
199 : static str
200 0 : OLTPtable(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
201 : {
202 : BAT *bs= NULL, *bu= NULL, *bl= NULL, *bq= NULL, *bc = NULL;
203 0 : bat *started = getArgReference_bat(stk,pci,0);
204 0 : bat *userid = getArgReference_bat(stk,pci,1);
205 0 : bat *lockid = getArgReference_bat(stk,pci,2);
206 0 : bat *used = getArgReference_bat(stk,pci,3);
207 : int i;
208 : timestamp tsn;
209 :
210 : (void) cntxt;
211 : (void) mb;
212 :
213 0 : bs = COLnew(0, TYPE_timestamp, 0, TRANSIENT);
214 0 : bu = COLnew(0, TYPE_str, 0, TRANSIENT);
215 0 : bl = COLnew(0, TYPE_int, 0, TRANSIENT);
216 0 : bc = COLnew(0, TYPE_int, 0, TRANSIENT);
217 0 : bq = COLnew(0, TYPE_str, 0, TRANSIENT);
218 :
219 0 : if( bs == NULL || bu == NULL || bl == NULL || bq == NULL || bc == NULL){
220 0 : if( bs) BBPunfix(bs->batCacheid);
221 0 : if( bl) BBPunfix(bl->batCacheid);
222 0 : if( bu) BBPunfix(bu->batCacheid);
223 0 : if( bc) BBPunfix(bc->batCacheid);
224 0 : if( bq) BBPunfix(bq->batCacheid);
225 0 : throw(MAL,"oltp.table", GDK_EXCEPTION);
226 : }
227 0 : for( i = 0; i < MAXOLTPLOCKS; i++) {
228 0 : if (oltp_locks[i].used ){
229 0 : tsn = oltp_locks[i].start ? timestamp_fromusec(oltp_locks[i].start) : timestamp_nil;
230 0 : if (BUNappend(bs, &tsn, false) != GDK_SUCCEED ||
231 0 : BUNappend(bu, oltp_locks[i].cntxt ? oltp_locks[i].cntxt->username : str_nil, false) != GDK_SUCCEED ||
232 0 : BUNappend(bl, &i, false) != GDK_SUCCEED ||
233 0 : BUNappend(bc, &oltp_locks[i].used, false) != GDK_SUCCEED)
234 0 : goto bailout;
235 : }
236 : }
237 : //OLTPdump_(cntxt,"#lock table\n");
238 0 : BBPkeepref(*started = bs->batCacheid);
239 0 : BBPkeepref(*userid = bu->batCacheid);
240 0 : BBPkeepref(*lockid = bl->batCacheid);
241 0 : BBPkeepref(*used = bc->batCacheid);
242 0 : return MAL_SUCCEED;
243 : bailout:
244 0 : BBPunfix(bs->batCacheid);
245 0 : BBPunfix(bl->batCacheid);
246 0 : BBPunfix(bu->batCacheid);
247 0 : BBPunfix(bc->batCacheid);
248 0 : BBPunfix(bq->batCacheid);
249 0 : throw(MAL, "oltp.table", GDK_EXCEPTION);
250 : }
251 :
252 : static str
253 0 : OLTPis_enabled(int *ret) {
254 0 : *ret = oltp_delay;
255 0 : return MAL_SUCCEED;
256 : }
257 :
258 : #include "mel.h"
259 : mel_func oltp_init_funcs[] = {
260 : pattern("oltp", "init", OLTPinit, true, "Initialize the lock table", noargs),
261 : pattern("oltp", "enable", OLTPenable, true, "Enable the OLTP delay monitor", noargs),
262 : pattern("oltp", "disable", OLTPdisable, true, "Disable the OLTP delay monitor", noargs),
263 : pattern("oltp", "reset", OLTPreset, true, "Reset the OLTP lock table", noargs),
264 : pattern("oltp", "lock", OLTPlock, true, "Wait for all write locks needed", args(1,2, arg("",void),vararg("lck",int))),
265 : pattern("oltp", "release", OLTPrelease, true, "Release for all write locks needed", args(1,2, arg("",void),vararg("lck",int))),
266 : pattern("oltp", "table", OLTPtable, true, "Show status of lock table", args(4,4, batarg("start",timestamp),batarg("usr",str),batarg("unit",int),batarg("cnt",int))),
267 : command("oltp", "isenabled", OLTPis_enabled, true, "Query the OLTP state", args(1,1, arg("",int))),
268 : { .imp=NULL }
269 : };
270 : #include "mal_import.h"
271 : #ifdef _MSC_VER
272 : #undef read
273 : #pragma section(".CRT$XCU",read)
274 : #endif
275 257 : LIB_STARTUP_FUNC(init_oltp_mal)
276 257 : { mal_module("oltp", NULL, oltp_init_funcs); }
|