;;; -*- Mode: Common-Lisp -*- ;;; Copyright (c) 2006, Abhijit 'quasi' Rao. All rights reserved. ;;; Copyright (c) 2006, Cleartrip Travel Services. ;;; Copyright (c) 2011 Kevin Rosenberg ;;; Redistribution and use in source and binary forms, with or without ;;; modification, are permitted provided that the following conditions ;;; are met: ;;; * Redistributions of source code must retain the above copyright ;;; notice, this list of conditions and the following disclaimer. ;;; * Redistributions in binary form must reproduce the above ;;; copyright notice, this list of conditions and the following ;;; disclaimer in the documentation and/or other materials ;;; provided with the distribution. ;;; THIS SOFTWARE IS PROVIDED BY THE AUTHOR 'AS IS' AND ANY EXPRESSED ;;; OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED ;;; WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ;;; ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY ;;; DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL ;;; DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE ;;; GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS ;;; INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, ;;; WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING ;;; NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS ;;; SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
(in-package #:memcache)

;; Prints NAME, HOST, PORT and the server storage size (bytes divided by
;; 1024 twice).  Every slot is guarded with SLOT-BOUNDP so that printing a
;; partially initialised instance does not signal.
(defmethod print-object ((mc memcache) stream)
  (print-unreadable-object (mc stream :type t :identity t)
    (format stream "~A on ~A:~A ~AMB"
            (when (slot-boundp mc 'name) (name mc))
            (when (slot-boundp mc 'host) (host mc))
            (when (slot-boundp mc 'port) (port mc))
            (when (and (slot-boundp mc 'memcached-server-storage-size)
                       (numberp (slot-value mc 'memcached-server-storage-size)))
              (/ (memcached-server-storage-size mc) 1024 1024)))))

;; Creates the connection pool, tries to fill it, and records the server's
;; storage limit.  Errors from pool initialisation or from the stats query
;; are deliberately swallowed; the storage size is set to -1 when the stats
;; could not be obtained.
(defmethod initialize-instance :after ((memcache memcache) &rest initargs)
  (declare (ignore initargs))
  (setf (slot-value memcache 'pool)
        (make-instance 'memcache-connection-pool
                       :name (concatenate 'simple-string
                                          (name memcache)
                                          " - Connection Pool")
                       :max-capacity (pool-size memcache)))
  (handler-case (mc-pool-init :memcache memcache)
    (error () nil))
  (let ((stats (handler-case (mc-stats :memcache memcache)
                 (error () nil))))
    (setf (slot-value memcache 'memcached-server-storage-size)
          (if stats
              (mc-stats-limit-maxbytes stats)
              -1))))

(defun make-memcache-instance (&key (host "127.0.0.1") (port 11211)
                                 (name "Memcache") (pool-size 5))
  "Creates an instance of class MEMCACHE which represents a memcached server."
  (make-instance 'memcache
                 :name name
                 :host host
                 :port port
                 :pool-size pool-size))

(defmacro with-pool-maybe ((stream memcache use-pool) &body body)
  "Runs BODY with STREAM bound to the socket stream of a connection to MEMCACHE.

When USE-POOL is true the connection is taken from the pool (with retries
when *POOL-GET-TRYS?* is true); otherwise a fresh connection is made with
MC-MAKE-POOL-ITEM.  If no connection could be obtained BODY is not run and
NIL is returned.

Cleanup:
 - pooled connection, BODY finished normally: returned to the pool;
 - pooled connection, BODY signalled an ERROR: the connection is chucked
   from the pool (and NOT returned to it), then the error is re-signalled;
 - non-pooled connection: the socket is always closed."
  (let ((mc (gensym "MEMCACHE-"))
        (up (gensym "USE-POOL-"))
        (us (gensym "USOCKET-"))
        (chucked (gensym "CHUCKED-")))
    `(let* ((,mc ,memcache)
            (,up ,use-pool)
            (,chucked nil)
            (,us (if ,up
                     (if *pool-get-trys?*
                         (mc-get-from-pool-with-try :memcache ,mc)
                         (mc-get-from-pool :memcache ,mc))
                     (mc-make-pool-item :memcache ,mc))))
       (unwind-protect
            (when ,us
              (let ((,stream (usocket:socket-stream ,us)))
                (handler-case (progn ,@body)
                  (error (c)
                    (when ,up
                      ;; Flag first, so the cleanup form never re-enqueues
                      ;; this connection even if chucking itself fails.
                      (setf ,chucked t)
                      (mc-chuck-from-pool ,us ,mc))
                    (error c)))))
         (cond ((null ,us))             ; nothing was acquired, nothing to release
               ((not ,up)
                ;; Close the socket itself, not the memcache object.
                (ignore-errors (usocket:socket-close ,us)))
               ((not ,chucked)
                (mc-put-in-pool ,us :memcache ,mc)))))))

(defun write-string-bytes (string stream)
  "Writes each character of STRING to the binary STREAM as its CHAR-CODE."
  (loop for char across string
        do (write-byte (char-code char) stream)))

(defun send-mc-command (s &rest args &aux started)
  "Writes the non-NIL ARGS to the binary stream S separated by single spaces,
followed by +CRLF+, then forces output.  Strings are written as-is, characters
as one byte, anything else via PRINC-TO-STRING.  NIL arguments are skipped."
  (dolist (arg args)
    (unless (null arg)
      (if started
          (write-byte (char-code #\space) s)
          (setq started t))
      (typecase arg
        #+nil
        (keyword (if (eq :no-reply arg)
                     (write-string-bytes "noreply" s)
                     (write-string-bytes (string-downcase (symbol-name arg)) s)))
        (string (write-string-bytes arg s))
        (character (write-byte (char-code arg) s))
        (t (write-string-bytes (princ-to-string arg) s)))))
  (write-string-bytes +crlf+ s)
  (force-output s))

;;;
;;;
;;; Memcached API functionality
;;;
;;;

(defun mc-store (key data &key (memcache *memcache*)
                            ((:command command) :set)
                            ((:exptime exptime) 0)
                            ((:use-pool use-pool) *use-pool*)
                            (cas-unique)
                            (flags 0)
                            no-reply)
  "Stores data in the memcached server using the :command command.
key => key by which the data is stored. This is of type SIMPLE-STRING
data => data to be stored into the cache. Data is a sequence of
        (UNSIGNED-BYTE 8); its LENGTH is sent as the byte count
memcache => The instance of class memcache which represents the memcached
            we want to use.
command => The storage command we want to use. One of :set, :add,
           :replace, :append, :prepend or :cas.
exptime => The time in seconds when this data expires. 0 is never expire.
cas-unique => Integer, required when command is :cas (an error is
              signalled otherwise).
flags => Value sent in the flags field of the command line.
no-reply => When true, noreply is appended to the command and no response
            line is read; no values are returned.
Otherwise returns the response line read from the server."
  (declare (type fixnum exptime) (type simple-string key))
  (when (and (eq command :cas) (not (integerp cas-unique)))
    (error "CAS command, but CAS-UNIQUE not set."))
  (let ((len (length data)))
    (with-pool-maybe (s memcache use-pool)
      (send-mc-command s
                       (ecase command
                         (:set "set")
                         (:add "add")
                         (:replace "replace")
                         (:append "append")
                         (:prepend "prepend")
                         (:cas "cas"))
                       key flags exptime len
                       (when (eq command :cas) cas-unique)
                       (when no-reply "noreply"))
      (write-sequence data s)
      ;; A command with no arguments writes just the terminating CRLF.
      (send-mc-command s)
      (if no-reply
          (values)
          (read-crlf-line s)))))

(defun mc-get (key-or-keys &key (memcache *memcache*) (use-pool *use-pool*)
                             (command :get))
  "Retrieves the value(s) for KEY-OR-KEYS from the memcached server.
key-or-keys => a single key, or a list of keys
memcache => The instance of class memcache which represents the memcached
            we want to use.
command => :get or :gets
Each hit is a list (key flags value), or (key flags value cas-unique) for
:gets, where value is a vector of (UNSIGNED-BYTE 8).
Returns a list of such lists when KEY-OR-KEYS is a list, otherwise the
single entry (or NIL)."
  (let* ((multp (listp key-or-keys))
         (keys-list (if multp key-or-keys (list key-or-keys)))
         (res
           (with-pool-maybe (s memcache use-pool)
             (apply 'send-mc-command s
                    (ecase command
                      (:get "get")
                      (:gets "gets"))
                    keys-list)
             (loop for x = (read-crlf-line s)
                   until (string-equal x "END")
                   collect (let* ((status-line (delimited-string-to-list x))
                                  (flags (parse-integer (third status-line)))
                                  (len (parse-integer (fourth status-line)))
                                  (cas-unique (when (eq command :gets)
                                                (parse-integer (fifth status-line))))
                                  (seq (make-sequence '(vector (unsigned-byte 8)) len)))
                             (read-sequence seq s)
                             ;; Consume the line terminator after the data block.
                             (read-crlf-line s)
                             (if (eq command :gets)
                                 (list (second status-line) flags seq cas-unique)
                                 (list (second status-line) flags seq)))))))
    (if multp
        res
        (car res))))

(defun mc-del (key &key (memcache *memcache*) ((:time time) 0)
                     (use-pool *use-pool*) (no-reply))
  "Deletes a particular 'key' and its associated data from the memcached server.
Returns the server's response line, or no values when NO-REPLY is true."
  (declare (type fixnum time))
  (with-pool-maybe (s memcache use-pool)
    (send-mc-command s "delete" key time (when no-reply "noreply"))
    (if no-reply
        (values)
        (read-crlf-line s))))

(defun incr-or-decr (cmd key delta memcache use-pool no-reply)
  "Sends CMD (\"incr\" or \"decr\") for KEY with DELTA.  Returns the response
parsed as an integer when possible, otherwise the raw response line; NIL when
NO-REPLY is true."
  (declare (type fixnum delta))
  (let* ((res (with-pool-maybe (s memcache use-pool)
                (send-mc-command s cmd key delta (if no-reply "noreply"))
                (if no-reply
                    (values)
                    (read-crlf-line s))))
         (int (unless no-reply
                (ignore-errors (parse-integer res)))))
    (or int res)))

(defun mc-version (&key (memcache *memcache*) (use-pool *use-pool*))
  "Sends the version command and returns the second field of the response
when its first field is VERSION, otherwise NIL."
  (let* ((raw (with-pool-maybe (s memcache use-pool)
                (send-mc-command s "version")
                (read-crlf-line s)))
         (split (delimited-string-to-list raw)))
    (when (string-equal (first split) "VERSION")
      (second split))))

(defun mc-verbosity (v &key (memcache *memcache*) (use-pool *use-pool*) (no-reply))
  "Sends the verbosity command with level V.  Returns the response line, or
NIL when NO-REPLY is true."
  (declare (type integer v))
  ;; VALUES collapses the zero-value no-reply case to a single NIL.
  (values
   (with-pool-maybe (s memcache use-pool)
     (send-mc-command s "verbosity" v (when no-reply "noreply"))
     (if no-reply
         (values)
         (read-crlf-line s)))))

(defun mc-flush-all (&key (time nil) (memcache *memcache*) (use-pool *use-pool*)
                       (no-reply))
  "Sends the flush_all command, with TIME as its argument when TIME is non-NIL.
Returns the response line, or NIL when NO-REPLY is true."
  (declare (type (or null integer) time))
  (values
   (with-pool-maybe (s memcache use-pool)
     ;; SEND-MC-COMMAND skips NIL arguments, so a NIL TIME is simply omitted.
     (send-mc-command s "flush_all" time (when no-reply "noreply"))
     (if no-reply
         (values)
         (read-crlf-line s)))))

(defun mc-incr (key &key (memcache *memcache*) (delta 1) (use-pool *use-pool*)
                      (no-reply))
  "Implements the INCR command.  Increments the value of a key.
Please read memcached documentation for more information.
key is a string
delta is an integer"
  (incr-or-decr "incr" key delta memcache use-pool no-reply))

(defun mc-decr (key &key (memcache *memcache*) (delta 1) (use-pool *use-pool*)
                      (no-reply))
  "Implements the DECR command.  Decrements the value of a key.
Please read memcached documentation for more information."
  (incr-or-decr "decr" key delta memcache use-pool no-reply))

(defun mc-stats-raw (&key (memcache *memcache*) (use-pool *use-pool*) args
                     &aux results)
  "Returns raw stats data from the memcached server, to be used by the
mc-stats function: a list of the response lines in order, up to and including
the END or ERROR line.  ARGS may be NIL, a single argument, or a list of
arguments to send after the stats command."
  (with-pool-maybe (s memcache use-pool)
    (apply #'send-mc-command s "stats"
           (if (listp args) args (list args)))
    (loop for line = (read-crlf-line s)
          do (push line results)
          until (or (string-equal "END" line)
                    (string-equal "ERROR" line))))
  (nreverse results))

(defun mc-get-stat (key stats)
  "Looks up KEY (a keyword, or a string converted with ENSURE-KEYWORD) in the
all-stats alist of the memcache-stats struct STATS."
  (when (stringp key)
    (setq key (ensure-keyword key)))
  (get-alist key (mc-stats-all-stats stats)))

;;; Collects statistics from the memcached server
(defun mc-stats (&key (memcache *memcache*) (use-pool *use-pool*))
  "Returns a struct of type memcache-stats which contains internal statistics
from the memcached server instance.  Please refer to documentation of
memcache-stats for detailed information about each slot"
  (let* ((result (mc-stats-raw :memcache memcache :use-pool use-pool))
         ;; Keep only lines of the form STAT <name> <value>.
         (split (loop for x in result
                      for xx = (delimited-string-to-list x)
                      when (and (string= (first xx) "STAT") (second xx))
                        collect (cons (second xx) (third xx))))
         ;; Sorted descending here; the PUSH below reverses the order again.
         (all-stats (sort split (lambda (a b)
                                  (string-greaterp (car a) (car b)))))
         (results nil))
    (dolist (r all-stats)
      (push (cons (ensure-keyword (car r))
                  (let* ((val (cdr r))
                         (int (ignore-errors (parse-integer val)))
                         (float (unless int
                                  (ignore-errors (parse-float val)))))
                    (cond ((integerp int) int)
                          ((numberp float) float)
                          (t val))))
            results))
    (make-memcache-stats
     :all-stats results
     :pid (get-alist :pid results)
     :uptime (get-alist :uptime results)
     :time (get-alist :time results)
     :version (get-alist :version results)
     :rusage-user (get-alist :rusage_user results)
     :rusage-system (get-alist :rusage_system results)
     :curr-items (get-alist :curr_items results)
     :curr-items-total (get-alist :curr_items_tot results)
     :curr-connections (get-alist :curr_connections results)
     :total-connections (get-alist :total_connections results)
     :connection-structures (get-alist :connection_structures results)
     :cmd-get (get-alist :cmd_get results)
     :cmd-set (get-alist :cmd_set results)
     :get-hits (get-alist :get_hits results)
     :get-misses (get-alist :get_misses results)
     :bytes-read (get-alist :bytes_read results)
     :bytes-written (get-alist :bytes_written results)
     :limit-maxbytes (get-alist :limit_maxbytes results))))

;;; Error Conditions

(define-condition memcached-server-unreachable (error)
  ((error :initarg :error)))

(define-condition memcache-pool-empty (error)
  ())

(define-condition cannot-make-pool-object (error)
  ((error :initarg :error)))

(define-condition bad-pool-object (error)
  ())

;;;
;;;
;;; Memcached Pooled Access
;;;
;;;

(defclass memcache-connection-pool ()
  ((name
    :initarg :name
    :reader name
    :initform "Connection Pool"
    :type simple-string
    :documentation "Name of this pool")
   (pool
    :initform (make-queue)
    :accessor pool)
   (pool-lock
    :reader pool-lock
    :initform (make-lock "Memcache Connection Pool Lock"))
   (max-capacity
    :initarg :max-capacity
    :reader max-capacity
    :initform 2
    :type fixnum
    :documentation "Total capacity of the pool to hold pool objects")
   (current-size
    :accessor current-size
    :initform 0)
   (currently-in-use
    :accessor currently-in-use
    :initform 0
    :type fixnum
    :documentation "Pool objects currently in Use")
   (total-uses
    :accessor total-uses
    :initform 0
    :documentation "Total uses of the pool")
   (total-created
    :accessor total-created
    :initform 0
    :type fixnum
    :documentation "Total pool objects created")
   (pool-grow-requests
    :initform 0
    :accessor pool-grow-requests
    :type fixnum
    :documentation "Pool Grow Request pending Action")
   (pool-grow-lock
    :initform (make-lock "Pool Grow Lock")
    :reader pool-grow-lock))
  (:documentation "A memcached connection pool object"))

;; Prints the pool's capacity and in-use count, guarding against unbound slots.
(defmethod print-object ((mcp memcache-connection-pool) stream)
  (print-unreadable-object (mcp stream :type t :identity t)
    (format stream "Capacity:~d, Currently in use:~d"
            (when (slot-boundp mcp 'max-capacity) (max-capacity mcp))
            (when (slot-boundp mcp 'currently-in-use) (currently-in-use mcp)))))

(defun mc-put-in-pool (conn &key (memcache *memcache*))
  "Enqueues CONN in MEMCACHE's pool and decrements the in-use count, under the
pool lock."
  (with-lock-held ((pool-lock (pool memcache)))
    (enqueue (pool (pool memcache)) conn)
    (decf (currently-in-use (pool memcache)))))

(defun mc-get-from-pool (&key (memcache *memcache*))
  "Returns a pool object from pool.  Signals MEMCACHE-POOL-EMPTY when the
pool queue is empty."
  (let (pool-object (state t))
    (with-lock-held ((pool-lock (pool memcache)))
      (if (queue-empty-p (pool (pool memcache)))
          (setf state nil)
          (progn
            (incf (currently-in-use (pool memcache)))
            (incf (total-uses (pool memcache)))
            (setf pool-object (dequeue (pool (pool memcache)))))))
    ;; The error is signalled outside the lock.
    (if state
        pool-object
        (error 'memcache-pool-empty))))

(defun mc-get-from-pool-with-try (&key (memcache *memcache*) (tries 5)
                                    (try-interval 1))
  "Like MC-GET-FROM-POOL, but when the pool is empty warns, sleeps
TRY-INTERVAL seconds and tries again, up to TRIES attempts.  Returns the
connection, or NIL when every attempt found the pool empty."
  (let ((tr 1))
    (loop
      (when (> tr tries)
        (return nil))
      (let ((conn (handler-case (mc-get-from-pool :memcache memcache)
                    (memcache-pool-empty () nil))))
        (cond (conn
               (return conn))
              (t
               (incf tr)
               (warn "memcache ~a : Connection Pool Empty! I will try again after ~d secs."
                     (name memcache) try-interval)
               (process-sleep try-interval)))))))

(defun mc-pool-init (&key (memcache *memcache*))
  "Cleans up the pool for this particular instance of memcache & reinits it
with POOL-SIZE number of objects required by this pool"
  (mc-pool-cleanup memcache)
  (dotimes (i (pool-size memcache))
    (mc-pool-grow-request memcache))
  (mc-pool-grow memcache))

(defun mc-make-pool-item (&key (memcache *memcache*))
  "Opens a binary socket connection to MEMCACHE's host and port.
Signals MEMCACHED-SERVER-UNREACHABLE on a usocket socket error and
CANNOT-MAKE-POOL-OBJECT on any other error."
  (handler-case (usocket:socket-connect (host memcache) (port memcache)
                                        :element-type '(unsigned-byte 8))
    (usocket:socket-error (e)
      (error 'memcached-server-unreachable :error e))
    (error (e)
      (error 'cannot-make-pool-object :error e))))

(defun mc-pool-grow (memcache)
  "Creates one connection per pending grow request and enqueues each in the
pool, updating the pool counters.  Holds the pool grow lock throughout and
takes the pool lock for each enqueue."
  (with-lock-held ((pool-grow-lock (pool memcache)))
    (let* ((grow-count (pool-grow-requests (pool memcache)))
           (pool-item-list
             (remove nil
                     (loop repeat grow-count
                           collect (mc-make-pool-item :memcache memcache)))))
      (dolist (item pool-item-list)
        (with-lock-held ((pool-lock (pool memcache)))
          (enqueue (pool (pool memcache)) item)
          (incf (total-created (pool memcache)))
          (incf (current-size (pool memcache))))
        (decf (pool-grow-requests (pool memcache)))))))

(defun mc-destroy-pool-item (pool-item)
  "Closes the socket POOL-ITEM, ignoring any error."
  (ignore-errors (usocket:socket-close pool-item)))

(defun mc-pool-grow-request (memcache)
  "Registers one pending grow request when current size plus pending requests
is below the pool's max capacity; otherwise warns that the pool is at
capacity."
  (with-lock-held ((pool-grow-lock (pool memcache)))
    (if (> (max-capacity (pool memcache))
           (+ (current-size (pool memcache))
              (pool-grow-requests (pool memcache))))
        (incf (pool-grow-requests (pool memcache)))
        (warn "memcache: Pool is at capacity."))))

(defun mc-chuck-from-pool (object memcache)
  "Destroys the connection OBJECT, decrements the pool's current size and
re-initialises the whole pool."
  (mc-destroy-pool-item object)
  (with-lock-held ((pool-lock (pool memcache)))
    (decf (current-size (pool memcache))))
  #|(loop while (mc-pool-grow-request memcache))
  (mc-pool-grow memcache)|#
  (mc-pool-init :memcache memcache))

(defun mc-pool-cleanup (memcache)
  "Destroys every connection queued in MEMCACHE's pool and resets all pool
counters to 0."
  ;; Lock order matches MC-POOL-GROW (grow lock first, then pool lock) so the
  ;; two functions cannot deadlock against each other.
  (with-lock-held ((pool-grow-lock (pool memcache)))
    (with-lock-held ((pool-lock (pool memcache)))
      (loop until (queue-empty-p (pool (pool memcache)))
            do (mc-destroy-pool-item (dequeue (pool (pool memcache)))))
      (setf (current-size (pool memcache)) 0
            (currently-in-use (pool memcache)) 0
            (pool-grow-requests (pool memcache)) 0
            (total-created (pool memcache)) 0
            (total-uses (pool memcache)) 0))))