aboutsummaryrefslogtreecommitdiffstats
path: root/openresty-ext/src/assembly/resources/openresty/nginx/luaext/vendor/shcache.lua
blob: 32a9b6c57258570794cfcccd378e5d9e2b025c99 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
-- Copyright (C) 2013 Matthieu Tourne
-- @author Matthieu Tourne <matthieu@cloudflare.com>

-- small overlay over shdict, smart cache load mechanism

local M = {}

local resty_lock = require("resty.lock")

local DEBUG = false

-- defaults in secs
local DEFAULT_POSITIVE_TTL = 10     -- cache for, successful lookup
local DEFAULT_NEGATIVE_TTL = 2      -- cache for, failed lookup
local DEFAULT_ACTUALIZE_TTL = 2     -- stale data, actualize data for

-- default lock options, in secs
local function _get_default_lock_options()
   return {
      exptime = 1,     -- max wait if failing to call unlock()
      timeout = 0.5,   -- max waiting time of lock()
      max_step = 0.1,  -- max sleeping interval
   }
end

local function prequire(m)
  local ok, err_or_module = pcall(require, m)
  if not ok then
     return nil, err_or_module
  end
  return err_or_module
end

local conf = prequire("conf")
if conf then
   DEFAULT_NEGATIVE_TTL = conf.DEFAULT_NEGATIVE_TTL or DEFAULT_NEGATIVE_TTL
   DEFAULT_ACTUALIZE_TTL = conf.DEFAULT_ACTUALIZE_TTL or DEFAULT_ACTUALIZE_TTL
end

local band = bit.band
local bor = bit.bor
local st_format = string.format

-- there are only really 5 states total
                                 -- is_stale    is_neg  is_from_cache
local MISS_STATE = 0             -- 0           0       0
local HIT_POSITIVE_STATE = 1     -- 0           0       1
local HIT_NEGATIVE_STATE = 3     -- 0           1       1
local STALE_POSITIVE_STATE = 5   -- 1           0       1

-- stale negative doesn't really make sense, use HIT_NEGATIVE instead
-- local STALE_NEGATIVE_STATE = 7   -- 1           1       1

-- xor to set
local NEGATIVE_FLAG = 2
local STALE_FLAG = 4

local STATES = {
   [MISS_STATE] = 'MISS',
   [HIT_POSITIVE_STATE] = 'HIT',
   [HIT_NEGATIVE_STATE] = 'HIT_NEGATIVE',
   [STALE_POSITIVE_STATE] = 'STALE',
   -- [STALE_NEGATIVE_STATE] = 'STALE_NEGATIVE',
}

local function get_status(flags)
   return STATES[flags] or st_format('UNDEF (0x%x)', flags)
end

local EMPTY_DATA = '_EMPTY_'

-- install debug functions
if DEBUG then
   local resty_lock_lock = resty_lock.lock

   resty_lock.lock = function (...)
      local _, key = unpack({...})
      print("lock key: ", tostring(key))
      return resty_lock_lock(...)
   end

   local resty_lock_unlock = resty_lock.unlock

   resty_lock.unlock = function (...)
      print("unlock")
      return resty_lock_unlock(...)
   end
end


-- store the object in the context
-- useful for debugging and tracking cache status
local function _store_object(self, name)
   if DEBUG then
      print('storing shcache: ', name, ' into ngx.ctx')
   end

   local ngx_ctx = ngx.ctx

   if not ngx_ctx.shcache then
      ngx_ctx.shcache = {}
   end
   ngx_ctx.shcache[name] = self
end

local obj_mt = {
   __index = M,
}

-- default function for callbacks.encode / decode.
local function _identity(data)
   return data
end

-- shdict: ngx.shared.DICT, created by the lua_shared_dict directive
-- callbacks: see shcache state machine for user defined functions
--    * callbacks.external_lookup is required
--    * callbacks.encode    : optional encoding before saving to shmem
--    * callbacks.decode    : optional decoding when retreiving from shmem
-- opts:
--   * opts.positive_ttl    : save a valid external loookup for, in seconds
--   * opts.positive_ttl    : save a invalid loookup for, in seconds
--   * opts.actualize_ttl   : re-actualize a stale record for, in seconds
--   * opts.lock_options    : set option to lock see : http://github.com/agentzh/lua-resty-lock
--                            for more details.
--   * opts.locks_shdict    : specificy the name of the shdict containing the locks
--                            (useful if you might have locks key collisions)
--                            uses "locks" by default.
--   * opts.name            : if shcache object is named, it will automatically
--                            register itself in ngx.ctx.shcache (useful for logging).
local function new(self, shdict, callbacks, opts)
   if not shdict then
      return nil, "shdict does not exist"
   end

   -- check that callbacks.external_lookup is set
   if not callbacks or not callbacks.external_lookup then
      return nil, "no external_lookup function defined"
   end

   if not callbacks.encode then
      callbacks.encode = _identity
   end

   if not callbacks.decode then
      callbacks.decode = _identity
   end

   local opts = opts or {}

   -- merge default lock options with the ones passed to new()
   local lock_options = _get_default_lock_options()
   if opts.lock_options then
      for k, v in pairs(opts.lock_options) do
         lock_options[k] = v
      end
   end

   local name = opts.name

   local obj = {
      shdict = shdict,
      callbacks = callbacks,

      positive_ttl = opts.positive_ttl or DEFAULT_POSITIVE_TTL,
      negative_ttl = opts.negative_ttl or DEFAULT_NEGATIVE_TTL,

      -- ttl to actualize stale data to
      actualize_ttl = opts.actualize_ttl or DEFAULT_ACTUALIZE_TTL,

      lock_options = lock_options,

      locks_shdict = opts.lock_shdict or "locks",

      -- STATUS --

      from_cache = false,
      cache_status = 'UNDEF',
      cache_state = MISS_STATE,
      lock_status = 'NO_LOCK',

      -- shdict:set() pushed out another value
      forcible_set = false,

      -- cache hit on second attempt (post lock)
      hit2 = false,

      name = name,
   }

   local locks = ngx.shared[obj.locks_shdict]

   -- check for existence, locks is not directly used
   if not locks then
      ngx.log(ngx.CRIT, 'shared mem locks is missing.\n',
              '## add to you lua conf: lua_shared_dict locks 5M; ##')
       return nil
   end

   local self = setmetatable(obj, obj_mt)

   -- if the shcache object is named
   -- keep track of the object in the context
   -- (useful for gathering stats at log phase)
   if name then
      _store_object(self, name)
   end

   return self
end
M.new = new

-- acquire a lock
local function _get_lock(self)
   local lock = self.lock
   if not lock then
      lock = resty_lock:new(self.locks_shdict, self.lock_options)
      self.lock = lock
   end
   return lock
end

-- remove the lock if there is any
local function _unlock(self)
   local lock = self.lock
   if lock then
      local ok, err = lock:unlock()
      if not ok then
         ngx.log(ngx.ERR, "failed to unlock :" , err)
      end
      self.lock = nil
   end
end

local function _return(self, data, flags)
   -- make sure we remove the locks if any before returning data
   _unlock(self)

   -- set cache status
   local cache_status = get_status(self.cache_state)

   if cache_status == 'MISS' and not data then
      cache_status = 'NO_DATA'
   end

   self.cache_status = cache_status

   return data, self.from_cache
end

local function _set(self, ...)
   if DEBUG then
      local key, data, ttl, flags = unpack({...})
      print("saving key: ", key, ", for: ", ttl)
   end

   local ok, err, forcible = self.shdict:set(...)

   self.forcible_set = forcible

   if not ok then
      local key, data, ttl, flags = unpack({...})
      ngx.log(ngx.ERR, 'failed to set key: ', key, ', err: ', err)
   end

   return ok
end

-- check if the data returned by :get() is considered empty
local function _is_empty(data, flags)
   return flags and band(flags, NEGATIVE_FLAG) and data == EMPTY_DATA
end

-- save positive, encode the data if needed before :set()
local function _save_positive(self, key, data)
   if DEBUG then
      print("key: ", key, ". save positive, ttl: ", self.positive_ttl)
   end
   data = self.callbacks.encode(data)
   return _set(self, key, data, self.positive_ttl, HIT_POSITIVE_STATE)
end

-- save negative, no encoding required (no data actually saved)
local function _save_negative(self, key)
   if DEBUG then
      print("key: ", key, ". save negative, ttl: ", self.negative_ttl)
   end
   return _set(self, key, EMPTY_DATA, self.negative_ttl, HIT_NEGATIVE_STATE)
end

-- save actualize, will boost a stale record to a live one
local function _save_actualize(self, key, data, flags)
   local new_flags = bor(flags, STALE_FLAG)

   if DEBUG then
      print("key: ", key, ". save actualize, ttl: ", self.actualize_ttl,
            ". new state: ", get_status(new_flags))
   end

   _set(self, key, data, self.actualize_ttl, new_flags)
   return new_flags
end

local function _process_cached_data(self, data, flags)
   if DEBUG then
      print("data: ", data, st_format(", flags: %x", flags))
   end

   self.cache_state = flags
   self.from_cache = true

   if _is_empty(data, flags) then
      -- empty cached data
      return nil
   else
      return self.callbacks.decode(data)
   end
end

-- wrapper to get data from the shdict
local function _get(self, key)
   -- always call get_stale() as it does not free element
   -- like get does on each call
   local data, flags, stale = self.shdict:get_stale(key)

   if data and stale then
      if DEBUG then
         print("found stale data for key : ", key)
      end

      self.stale_data = { data, flags }

      return nil, nil
   end

   return data, flags
end

local function _get_stale(self)
   local stale_data = self.stale_data
   if stale_data then
      return unpack(stale_data)
   end

   return nil, nil
end

local function load(self, key)
   -- start: check for existing cache
   local data, flags = _get(self, key)

   -- hit: process_cache_hit
   if data then
      data = _process_cached_data(self, data, flags)
      return _return(self, data)
   end

   -- miss: set lock

   -- lock: set a lock before performing external lookup
   local lock = _get_lock(self)
   local elapsed, err = lock:lock(key)

   if not elapsed then
      -- failed to acquire lock, still proceed normally to external_lookup
      -- unlock() might fail.
      ngx.log(ngx.ERR, "failed to acquire the lock: ", err)
      self.lock_status = 'ERROR'
      -- _unlock won't try to unlock() without a valid lock
      self.lock = nil
   else
      -- lock acquired successfuly

      if elapsed > 0 then

         -- elapsed > 0 => waited lock (other thread might have :set() the data)
         -- (more likely to get a HIT on cache_load 2)
         self.lock_status = 'WAITED'

      else

         -- elapsed == 0 => immediate lock
         -- it is less likely to get a HIT on cache_load 2
         -- but still perform it (race condition cases)
         self.lock_status = 'IMMEDIATE'
      end

      -- perform cache_load 2
      data, flags = _get(self, key)
      if data then
         -- hit2 : process cache hit

         self.hit2 = true

         -- unlock before de-serializing cached data
         _unlock(self)
         data = _process_cached_data(self, data, flags)
         return _return(self, data)
      end

      -- continue to external lookup
   end

   -- perform external lookup
   data, err = self.callbacks.external_lookup()

   if data then
      -- succ: save positive and return the data

      _save_positive(self, key, data)
      return _return(self, data)
   else
      ngx.log(ngx.WARN, 'external lookup failed: ', err)
   end

   -- external lookup failed
   -- attempt to load stale data
   data, flags = _get_stale(self)
   if data and not _is_empty(data, flags) then
      -- hit_stale + valid (positive) data

      flags = _save_actualize(self, key, data, flags)
      -- unlock before de-serializing data
      _unlock(self)
      data = _process_cached_data(self, data, flags)
      return _return(self, data)
   end

   if DEBUG and data then
      -- there is data, but it failed _is_empty() => stale negative data
      print('STALE_NEGATIVE data => cache as a new HIT_NEGATIVE')
   end

   -- nothing has worked, save negative and return empty
   _save_negative(self, key)
   return _return(self, nil)
end
M.load = load

return M