X-Git-Url: http://git.kpe.io/?p=kmrcl.git;a=blobdiff_plain;f=listener.lisp;h=6c511cf9260b08976e2658af04b0ce106f7689f9;hp=0b31cefd745d2eb174890d541bbb4505d9a3da4a;hb=03712fbb06acbb103602bae10f41aeae7fa05127;hpb=739b14ee8844dc777b174105646df3abcb865282 diff --git a/listener.lisp b/listener.lisp index 0b31cef..6c511cf 100644 --- a/listener.lisp +++ b/listener.lisp @@ -30,24 +30,24 @@ "List of active listeners") (defclass listener () - ((port :initarg :port :accessor port) + ((port :initarg :port :accessor port) (function :initarg :function :accessor listener-function - :initform nil) + :initform nil) (function-args :initarg :function-args :accessor function-args - :initform nil) + :initform nil) (process :initarg :process :accessor process :initform nil) (socket :initarg :socket :accessor socket :initform nil) (workers :initform nil :accessor workers - :documentation "list of worker threads") + :documentation "list of worker threads") (name :initform "" :accessor name :initarg :name) (base-name :initform "listener" :accessor base-name :initarg :base-name) (wait :initform nil :accessor wait :initarg :wait) (timeout :initform nil :accessor timeout :initarg :timeout) (number-fixed-workers :initform nil :accessor number-fixed-workers - :initarg :number-fixed-workers) + :initarg :number-fixed-workers) (catch-errors :initform nil :accessor catch-errors :initarg :catch-errors) (remote-host-checker :initform nil :accessor remote-host-checker - :initarg :remote-host-checker) + :initarg :remote-host-checker) (format :initform :text :accessor listener-format :initarg :format))) (defclass fixed-worker () @@ -67,7 +67,7 @@ (defmethod print-object ((obj fixed-worker) s) (print-unreadable-object (obj s :type t :identity nil) (format s "port ~A" (port (listener obj))))) - + ;; High-level API (defun init/listener (listener state) @@ -98,11 +98,11 @@ (defun listener-startup (listener) (handler-case (progn - (setf (name listener) (next-server-name (base-name listener))) - (make-socket-server listener)) + (setf (name listener) (next-server-name (base-name listener))) + (make-socket-server listener)) (error (e) - (format t "~&Error while trying to start listener on port ~A~& ~A" - (port listener) e) + (format t "~&Error while trying to start listener on port ~A~& ~A" + (port listener) e) (decf *listener-count*) nil) (:no-error (res) @@ -112,9 +112,9 @@ (defun listener-shutdown (listener) (dolist (worker (workers listener)) (when (and (typep worker 'worker) - (connection worker)) + (connection worker)) (errorset (close-active-socket - (connection worker)) nil) + (connection worker)) nil) (setf (connection worker) nil)) (when (process worker) (errorset (destroy-process (process worker)) nil) @@ -131,7 +131,7 @@ ;; Low-level functions (defun next-server-name (base-name) - (format nil "~D-~A-socket-server" (incf *listener-count*) base-name)) + (format nil "~D-~A-socket-server" (incf *listener-count*) base-name)) (defun next-worker-name (base-name) (format nil "~D-~A-worker" (incf *worker-count*) base-name)) @@ -141,62 +141,62 @@ (progn (setf (process listener) (comm:start-up-server :process-name (name listener) - :service (port listener) - :function - #'(lambda (handle) - (lw-worker handle listener))))) + :service (port listener) + :function + #'(lambda (handle) + (lw-worker handle listener))))) #-lispworks (progn (setf (socket listener) (create-inet-listener - (port listener) - :format (listener-format listener))) + (port listener) + :format (listener-format listener))) (if (number-fixed-workers listener) - (start-fixed-number-of-workers listener) - (setf (process listener) (make-process - (name listener) - #'(lambda () - (start-socket-server listener)))))) + (start-fixed-number-of-workers listener) + (setf (process listener) (make-process + (name listener) + #'(lambda () + (start-socket-server listener)))))) listener) (defmethod initialize-instance :after ((self worker) &key listener connection name &allow-other-keys) (flet ((do-work () - (apply (listener-function listener) - connection - (function-args listener)))) + (apply (listener-function listener) + connection + (function-args listener)))) (unless connection (error "connection not provided to modlisp-worker")) (setf (slot-value self 'listener) listener) (setf (slot-value self 'name) name) (setf (slot-value self 'connection) connection) (setf (slot-value self 'thread-fun) - #'(lambda () - (unwind-protect - (if (catch-errors listener) - (handler-case - (if (timeout listener) - (with-timeout ((timeout listener)) - (do-work)) - (do-work)) - (error (e) - (cmsg "Error ~A [~A]" e name))) - (if (timeout listener) - (with-timeout ((timeout listener)) - (do-work)) - (do-work))) - (progn - (errorset (finish-output connection) nil) - (errorset (close-active-socket connection) nil) - (cmsg-c :threads "~A ended" name) - (setf (workers listener) - (remove self (workers listener))))))))) + #'(lambda () + (unwind-protect + (if (catch-errors listener) + (handler-case + (if (timeout listener) + (with-timeout ((timeout listener)) + (do-work)) + (do-work)) + (error (e) + (cmsg "Error ~A [~A]" e name))) + (if (timeout listener) + (with-timeout ((timeout listener)) + (do-work)) + (do-work))) + (progn + (errorset (finish-output connection) nil) + (errorset (close-active-socket connection) nil) + (cmsg-c :threads "~A ended" name) + (setf (workers listener) + (remove self (workers listener))))))))) (defun accept-and-check-tcp-connection (listener) (multiple-value-bind (conn socket) (accept-tcp-connection (socket listener)) (when (and (remote-host-checker listener) - (not (funcall (remote-host-checker listener) - (remote-host socket)))) + (not (funcall (remote-host-checker listener) + (remote-host socket)))) (cmsg-c :thread "Deny connection from ~A" (remote-host conn)) (errorset (close-active-socket conn) nil) (setq conn nil)) @@ -204,45 +204,45 @@ (defun start-socket-server (listener) (unwind-protect - (loop + (loop (let ((connection (accept-and-check-tcp-connection listener))) - (when connection - (if (wait listener) - (unwind-protect - (apply (listener-function listener) - connection - (function-args listener)) - (progn - (errorset (finish-output connection) nil) - (errorset (close-active-socket connection) nil))) - (let ((worker (make-instance 'worker :listener listener - :connection connection - :name (next-worker-name - (base-name listener))))) - (setf (process worker) - (make-process (name worker) (thread-fun worker))) - (push worker (workers listener))))))) + (when connection + (if (wait listener) + (unwind-protect + (apply (listener-function listener) + connection + (function-args listener)) + (progn + (errorset (finish-output connection) nil) + (errorset (close-active-socket connection) nil))) + (let ((worker (make-instance 'worker :listener listener + :connection connection + :name (next-worker-name + (base-name listener))))) + (setf (process worker) + (make-process (name worker) (thread-fun worker))) + (push worker (workers listener))))))) (errorset (close-passive-socket (socket listener)) nil))) #+lispworks (defun lw-worker (handle listener) (let ((connection (make-instance 'comm:socket-stream - :socket handle - :direction :io - :element-type 'base-char))) + :socket handle + :direction :io + :element-type 'base-char))) (if (wait listener) - (progn - (apply (listener-function listener) - connection - (function-args listener)) - (finish-output connection)) - (let ((worker (make-instance 'worker :listener listener - :connection connection - :name (next-worker-name - (base-name listener))))) - (setf (process worker) - (make-process (name worker) (thread-fun worker))) - (push worker (workers listener)))))) + (progn + (apply (listener-function listener) + connection + (function-args listener)) + (finish-output connection)) + (let ((worker (make-instance 'worker :listener listener + :connection connection + :name (next-worker-name + (base-name listener))))) + (setf (process worker) + (make-process (name worker) (thread-fun worker))) + (push worker (workers listener)))))) ;; Fixed pool of workers @@ -251,38 +251,38 @@ (let ((name (next-worker-name (base-name listener)))) (push (make-instance 'fixed-worker - :name name - :listener listener - :process - (make-process - name #'(lambda () (fixed-worker name listener)))) + :name name + :listener listener + :process + (make-process + name #'(lambda () (fixed-worker name listener)))) (workers listener))))) (defun fixed-worker (name listener) - (loop + (loop (let ((connection (accept-and-check-tcp-connection listener))) (when connection (flet ((do-work () - (apply (listener-function listener) - connection - (function-args listener)))) - (unwind-protect - (handler-case - (if (catch-errors listener) - (handler-case - (if (timeout listener) - (with-timeout ((timeout listener)) - (do-work)) - (do-work)) - (error (e) - (cmsg "Error ~A [~A]" e name))) - (if (timeout listener) - (with-timeout ((timeout listener)) - (do-work)) - (do-work))) - (error (e) - (format t "Error: ~A" e))) - (errorset (finish-output connection) nil) - (errorset (close connection) nil))))))) - + (apply (listener-function listener) + connection + (function-args listener)))) + (unwind-protect + (handler-case + (if (catch-errors listener) + (handler-case + (if (timeout listener) + (with-timeout ((timeout listener)) + (do-work)) + (do-work)) + (error (e) + (cmsg "Error ~A [~A]" e name))) + (if (timeout listener) + (with-timeout ((timeout listener)) + (do-work)) + (do-work))) + (error (e) + (format t "Error: ~A" e))) + (errorset (finish-output connection) nil) + (errorset (close connection) nil))))))) +