+;;; -*- Mode: Common-Lisp -*-
+
+;;; Copyright (c) 2006, Abhijit 'quasi' Rao. All rights reserved.
+;;; Copyright (c) 2006, Cleartrip Travel Services.
+;;; Copyright (c) 2011 Kevin Rosenberg
+
+;;; Redistribution and use in source and binary forms, with or without
+;;; modification, are permitted provided that the following conditions
+;;; are met:
+
+;;; * Redistributions of source code must retain the above copyright
+;;; notice, this list of conditions and the following disclaimer.
+
+;;; * Redistributions in binary form must reproduce the above
+;;; copyright notice, this list of conditions and the following
+;;; disclaimer in the documentation and/or other materials
+;;; provided with the distribution.
+
+;;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR 'AS IS' AND ANY EXPRESSED
+;;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+;;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+;;; ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
+;;; DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+;;; DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
+;;; GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+;;; INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
+;;; WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+;;; NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+;;; SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+(in-package #:memcache)
+
+(defmethod print-object ((mc memcache) stream)
+ (print-unreadable-object (mc stream :type t :identity t)
+ (format stream "~A on ~A:~A ~AMB"
+ (when (slot-boundp mc 'name) (name mc))
+ (when (slot-boundp mc 'ip) (ip mc))
+ (when (slot-boundp mc 'port) (port mc))
+ (when (and (slot-boundp mc 'memcached-server-storage-size)
+ (numberp (slot-value mc 'memcached-server-storage-size)))
+ (/ (memcached-server-storage-size mc) 1024 1024)))))
+
+(defmethod initialize-instance :after ((memcache memcache) &rest initargs)
+ (declare (ignore initargs))
+ (setf (slot-value memcache 'pool) (make-instance 'memcache-connection-pool
+ :name (concatenate 'simple-string (name memcache) " - Connection Pool")
+ :max-capacity (pool-size memcache)))
+ (handler-case (mc-pool-init :memcache memcache)
+ (error () nil))
+ (let ((stats (handler-case (mc-stats :memcache memcache)
+ (error () nil))))
+ (if stats
+ (setf (slot-value memcache 'memcached-server-storage-size) (mc-stats-limit-maxbytes stats))
+ (setf (slot-value memcache 'memcached-server-storage-size) -1))))
+
+(defun make-memcache-instance (&key (ip "127.0.0.1") (port 11211)
+ (name "Memcache") (pool-size 5))
+ "Creates an instance of class MEMCACHE which represents a memcached server."
+ (make-instance 'memcache :name name :ip ip :port port :pool-size pool-size))
+
+
+(defmacro with-pool-maybe ((stream memcache use-pool) &body body)
+ "Macro to wrap the use-pool/dont-use-pool stuff and the cleanup
+around a body of actual action statements"
+ (let ((mc (gensym "MEMCACHE-"))
+ (up (gensym "USE-POOL-"))
+ (us (gensym "USOCKET-")))
+ `(let* ((,mc ,memcache)
+ (,up ,use-pool)
+ (,us (if ,up
+ (if *pool-get-trys?*
+ (mc-get-from-pool-with-try :memcache ,mc)
+ (mc-get-from-pool :memcache ,mc))
+ (mc-make-pool-item :memcache ,mc))))
+ (unwind-protect
+ (when ,us
+ (let ((,stream (usocket:socket-stream ,us)))
+ (handler-case
+ (progn ,@body)
+ (error (c)
+ (when ,up
+ (mc-chuck-from-pool ,us ,mc))
+ (error c)))))
+ (if ,up
+ (mc-put-in-pool ,us :memcache ,mc)
+ (ignore-errors (usocket:socket-close ,mc)))))))
+
+(defun send-mc-command (s &rest args &aux started)
+ (flet ((write-string-bytes (str stream)
+ (loop for char across str
+ do (write-byte (char-code char) stream))))
+ (dolist (arg args)
+ (unless (null arg)
+ (if started
+ (write-byte (char-code #\space) s)
+ (setq started t))
+ (typecase arg
+ (string (write-string-bytes arg s))
+ (character (write-byte (char-code arg) s))
+ (t (write-string-bytes (princ-to-string arg) s)))))
+ (write-string-bytes +crlf+ s)
+ (force-output s)))
+
+
+;;;
+;;;
+;;; Memcached API functionality
+;;;
+;;;
+
+(defun mc-store (key data &key (memcache *memcache*) ((:command command) :set) ((:exptime exptime) 0)
+ ((:use-pool use-pool) *use-pool*) (cas-unique) (flags 0))
+ "Stores data in the memcached server using the :command command.
+key => key by which the data is stored. this is of type SIMPLE-STRING
+data => data to be stored into the cache. data is a sequence of type (UNSIGNED-BYTE 8)
+length => size of data
+memcache => The instance of class memcache which represnts the memcached we want to use.
+command => The storage command we want to use. There are 3 available : set, add & replace.
+exptime => The time in seconds when this data expires. 0 is never expire."
+ (declare (type fixnum exptime) (type simple-string key))
+ (when (and (eq command :cas) (not (integerp cas-unique)))
+ (error "CAS command, but CAS-UNIQUE not set."))
+ (let ((len (length data)))
+ (with-pool-maybe (s memcache use-pool)
+ (send-mc-command
+ s
+ (ecase command
+ (:set "set")
+ (:add "add")
+ (:replace "replace")
+ (:append "append")
+ (:prepend "prepend")
+ (:cas "cas"))
+ key flags exptime len (when (eq command :cas) cas-unique))
+ (write-sequence data s)
+ (send-mc-command s)
+ (read-crlf-line s))))
+
+(defun mc-get (key-or-keys &key (memcache *memcache*) (use-pool *use-pool*)
+ (command :get))
+ "Retrive value for key from memcached server.
+keys-list => is a list of the keys, seperated by whitespace, by which data is stored in memcached
+memcache => The instance of class memcache which represnts the memcached we want to use.
+
+Returns a list of lists where each list has three elements key, flags, and value
+key is of type SIMPLE-STRING
+value is of type (UNSIGNED-BYTE 8)"
+ (let* ((multp (listp key-or-keys))
+ (keys-list (if multp key-or-keys (list key-or-keys)))
+ (res
+ (with-pool-maybe (s memcache use-pool)
+ (apply 'send-mc-command s (ecase command
+ (:get "get")
+ (:gets "gets"))
+ keys-list)
+ (loop for x = (read-crlf-line s)
+ until (string-equal x "END")
+ collect (let* ((status-line (delimited-string-to-list x))
+ (flags (parse-integer (third status-line)))
+ (len (parse-integer (fourth status-line)))
+ (cas-unique (when (eq command :gets)
+ (parse-integer (fifth status-line))))
+ (seq (make-sequence '(vector (unsigned-byte 8)) len)))
+ (read-sequence seq s)
+ (read-crlf-line s)
+ (if (eq command :gets)
+ (list (second status-line) flags seq cas-unique)
+ (list (second status-line) flags seq)))))))
+ (if multp
+ res
+ (car res))))
+
+(defun mc-del (key &key (memcache *memcache*) ((:time time) 0) (use-pool *use-pool*))
+ "Deletes a particular 'key' and it's associated data from the memcached server"
+ (declare (type fixnum time))
+ (with-pool-maybe (s memcache use-pool)
+ (send-mc-command s "delete" key time)
+ (read-crlf-line s)))
+
+(defun incr-or-decr (cmd key delta memcache use-pool)
+ (declare (type fixnum delta))
+ (let* ((res (with-pool-maybe (s memcache use-pool)
+ (send-mc-command s cmd key delta)
+ (read-crlf-line s)))
+ (int (ignore-errors (parse-integer res))))
+ (or int res)))
+
+(defun mc-incr (key &key (memcache *memcache*) (delta 1) (use-pool *use-pool*))
+ "Implements the INCR command. Increments the value of a key.
+Please read memcached documentation for more information.
+key is a string
+delta is an integer"
+ (incr-or-decr "incr" key delta memcache use-pool))
+
+(defun mc-decr (key &key (memcache *memcache*) (delta 1) (use-pool *use-pool*))
+ "Implements the DECR command. Decrements the value of a key.
+ Please read memcached documentation for more information."
+ (incr-or-decr "decr" key delta memcache use-pool))
+
+(defun mc-stats-raw (&key (memcache *memcache*) (use-pool *use-pool*) args
+ &aux results)
+ "Returns Raw stats data from memcached server to be used by the mc-stats function"
+ (with-pool-maybe (s memcache use-pool)
+ (send-mc-command s "stats" args)
+ (with-output-to-string (str)
+ (loop for line = (read-crlf-line s)
+ do (push line results)
+ until (or (string-equal "END" line)
+ (string-equal "ERROR" line)))))
+ (nreverse results))
+
+(defun mc-get-stat (key stats)
+ (when (stringp key) (setq key (ensure-keyword key)))
+ (get-alist key (mc-stats-all-stats stats)))
+
+;;; Collects statistics from the memcached server
+(defun mc-stats (&key (memcache *memcache*) (use-pool *use-pool*))
+ "Returns a struct of type memcache-stats which contains internal statistics from the
+memcached server instance. Please refer to documentation of memcache-stats for detailed
+information about each slot"
+ (let* ((result (mc-stats-raw :memcache memcache :use-pool use-pool))
+ (split (loop with xx = nil
+ for x in result
+ do (setf xx (delimited-string-to-list x))
+ when (and (string= (first xx) "STAT") (second xx))
+ collect (cons (second xx) (third xx))))
+ (all-stats (sort split (lambda (a b) (string-greaterp (car a) (car b)))))
+ (results))
+ (dolist (r all-stats)
+ (push (cons (ensure-keyword (car r))
+ (let* ((val (cdr r))
+ (int (ignore-errors (parse-integer val)))
+ (float (unless int (ignore-errors (parse-float val)))))
+ (cond
+ ((integerp int) int)
+ ((numberp float) float)
+ (t val))))
+ results))
+ (make-memcache-stats
+ :all-stats results
+ :pid (get-alist :pid results)
+ :uptime (get-alist :uptime results)
+ :time (get-alist :time results)
+ :version (get-alist :version results)
+ :rusage-user (get-alist :rusage_user results)
+ :rusage-system (get-alist :rusage_system results)
+ :curr-items (get-alist :curr_items results)
+ :curr-items-total (get-alist :curr_items_tot results)
+ :curr-connections (get-alist :curr_connections results)
+ :total-connections (get-alist :total_connections results)
+ :connection-structures (get-alist :connection_structures results)
+ :cmd-get (get-alist :cmd_get results)
+ :cmd-set (get-alist :cmd_set results)
+ :get-hits (get-alist :get_hits results)
+ :get-misses (get-alist :get_misses results)
+ :bytes-read (get-alist :bytes_read results)
+ :bytes-written (get-alist :bytes_written results)
+ :limit-maxbytes (get-alist :limit_maxbytes results)
+ )))
+
+
+;;; Error Conditions
+
+(define-condition memcached-server-unreachable (error)
+ ())
+
+(define-condition memcache-pool-empty (error)
+ ())
+
+(define-condition cannot-make-pool-object (error)
+ ())
+
+(define-condition bad-pool-object (error)
+ ())
+
+;;;
+;;;
+;;; Memcached Pooled Access
+;;;
+;;;
+
+(defclass memcache-connection-pool ()
+ ((name
+ :initarg :name
+ :reader name
+ :initform "Connection Pool"
+ :type simple-string
+ :documentation "Name of this pool")
+ (pool
+ :initform (make-queue)
+ :accessor pool)
+ (pool-lock
+ :reader pool-lock
+ :initform (make-lock "Memcache Connection Pool Lock"))
+ (max-capacity
+ :initarg :max-capacity
+ :reader max-capacity
+ :initform 2
+ :type fixnum
+ :documentation "Total capacity of the pool to hold pool objects")
+ (current-size
+ :accessor current-size
+ :initform 0)
+ (currently-in-use
+ :accessor currently-in-use
+ :initform 0
+ :type fixnum
+ :documentation "Pool objects currently in Use")
+ (total-uses
+ :accessor total-uses
+ :initform 0
+ :documentation "Total uses of the pool")
+ (total-created
+ :accessor total-created
+ :initform 0
+ :type fixnum
+ :documentation "Total pool objects created")
+ (pool-grow-requests
+ :initform 0
+ :accessor pool-grow-requests
+ :type fixnum
+ :documentation "Pool Grow Request pending Action")
+ (pool-grow-lock
+ :initform (make-lock "Pool Grow Lock")
+ :reader pool-grow-lock))
+ (:documentation "A memcached connection pool object"))
+
+(defmethod print-object ((mcp memcache-connection-pool) stream)
+ (print-unreadable-object (mcp stream :type t :identity t)
+ (format stream "Capacity:~d, Currently in use:~d"
+ (when (slot-boundp mcp 'max-capacity) (max-capacity mcp))
+ (when (slot-boundp mcp 'currently-in-use) (currently-in-use mcp)))))
+
+(defun mc-put-in-pool (conn &key (memcache *memcache*))
+ (with-lock-held ((pool-lock (pool memcache)))
+ (enqueue (pool (pool memcache)) conn)
+ (decf (currently-in-use (pool memcache)))))
+
+(defun mc-get-from-pool (&key (memcache *memcache*))
+ "Returns a pool object from pool."
+ (let (pool-object (state t))
+ (with-lock-held ((pool-lock (pool memcache)))
+ (if (queue-empty-p (pool (pool memcache)))
+ (setf state nil)
+ (progn (incf (currently-in-use (pool memcache)))
+ (incf (total-uses (pool memcache)))
+ (setf pool-object (dequeue (pool (pool memcache)))))))
+ (if state
+ pool-object
+ (error 'memcache-pool-empty))))
+
+(defun mc-get-from-pool-with-try (&key (memcache *memcache*) (tries 5) (try-interval 1))
+ ""
+ (let ((tr 1))
+ (loop
+ (progn (when (> tr tries)
+ (return nil))
+ (let ((conn (handler-case (mc-get-from-pool :memcache memcache)
+ (memcache-pool-empty () nil))))
+ (if (not conn)
+ (progn (incf tr)
+ (warn "memcache ~a : Connection Pool Empty! I will try again after ~d secs." (name memcache) try-interval)
+ (process-sleep try-interval))
+ (return conn)))))))
+
+(defun mc-pool-init (&key (memcache *memcache*))
+ "Cleans up the pool for this particular instance of memcache
+& reinits it with POOL-SIZE number of objects required by this pool"
+ (mc-pool-cleanup memcache)
+ (dotimes (i (pool-size memcache))
+ (mc-pool-grow-request memcache))
+ (mc-pool-grow memcache))
+
+(defun mc-make-pool-item (&key (memcache *memcache*))
+ (handler-case (usocket:socket-connect (ip memcache) (port memcache) :element-type '(unsigned-byte 8))
+ (usocket:socket-error () (error 'memcached-server-unreachable))
+ (error () (error 'cannot-make-pool-object))))
+
+(defun mc-pool-grow (memcache)
+ (let (grow-count pool-item-list)
+ (with-lock-held ((pool-grow-lock (pool memcache)))
+ (setf grow-count (pool-grow-requests (pool memcache)))
+ (setf pool-item-list (remove nil (loop for x from 1 to grow-count
+ collect (mc-make-pool-item :memcache memcache))))
+ (loop for x from 1 to (length pool-item-list)
+ do (with-lock-held ((pool-lock (pool memcache)))
+ (enqueue (pool (pool memcache)) (pop pool-item-list))
+ (incf (total-created (pool memcache)))
+ (incf (current-size (pool memcache))))
+ do (decf (pool-grow-requests (pool memcache)))))))
+
+(defun mc-destroy-pool-item (pool-item)
+ (ignore-errors (usocket:socket-close pool-item)))
+
+(defun mc-pool-grow-request (memcache)
+ (with-lock-held ((pool-grow-lock (pool memcache)))
+ (if (> (max-capacity (pool memcache)) (+ (current-size (pool memcache))
+ (pool-grow-requests (pool memcache))))
+ (incf (pool-grow-requests (pool memcache)))
+ (warn "memcache: Pool is at capacity."))))
+
+(defun mc-chuck-from-pool (object memcache)
+ (mc-destroy-pool-item object)
+ (with-lock-held ((pool-lock (pool memcache)))
+ (decf (current-size (pool memcache))))
+ #|(loop while (mc-pool-grow-request memcache))
+ (mc-pool-grow memcache)|#
+ (mc-pool-init :memcache memcache))
+
+(defun mc-pool-cleanup (memcache)
+ (with-lock-held ((pool-lock (pool memcache)))
+ (with-lock-held ((pool-grow-lock (pool memcache)))
+ (loop
+ when (queue-empty-p (pool (pool memcache)))
+ do (return)
+ else do (mc-destroy-pool-item (dequeue (pool (pool memcache)))))
+ (setf (current-size (pool memcache)) 0
+ (currently-in-use (pool memcache)) 0
+ (pool-grow-requests (pool memcache)) 0
+ (total-created (pool memcache)) 0
+ (total-uses (pool memcache)) 0))))