From: Kevin M. Rosenberg Date: Fri, 11 Jul 2003 02:38:00 +0000 (+0000) Subject: r5277: *** empty log message *** X-Git-Tag: v1.96~161 X-Git-Url: http://git.kpe.io/?p=kmrcl.git;a=commitdiff_plain;h=0dc565c13310ce9f59b42b4e4bdd9167e24ca756;ds=sidebyside r5277: *** empty log message *** --- diff --git a/listener.lisp b/listener.lisp index 19fbdbe..dfd70f4 100644 --- a/listener.lisp +++ b/listener.lisp @@ -7,7 +7,7 @@ ;;;; Programmer: Kevin M. Rosenberg ;;;; Date Started: Jun 2003 ;;;; -;;;; $Id: listener.lisp,v 1.3 2003/07/10 18:52:10 kevin Exp $ +;;;; $Id: listener.lisp,v 1.4 2003/07/11 02:37:33 kevin Exp $ ;;;; ************************************************************************* (in-package #:kmrcl) @@ -36,21 +36,27 @@ (name :initform "" :accessor name :initarg :name) (base-name :initform "listener" :accessor base-name :initarg :base-name) (wait :initform nil :accessor wait :initarg :wait) + (timeout :initform nil :accessor timeout :initarg :timeout) + (number-fixed-workers :initform nil :accessor number-fixed-workers + :initarg :number-fixed-workers) (catch-errors :initform nil :accessor catch-errors :initarg :catch-errors) (format :initform :text :accessor listener-format :initarg :format))) -(defclass worker () +(defclass fixed-worker () ((listener :initarg :listener :accessor listener :initform nil) - (connection :initarg :connection :accessor connection :initform nil) (name :initarg :name :accessor name :initform nil) - (thread-fun :initarg :thread-fun :accessor thread-fun :initform nil) (process :initarg :process :accessor process :initform nil))) +(defclass worker (fixed-worker) + ((connection :initarg :connection :accessor connection :initform nil) + (thread-fun :initarg :thread-fun :accessor thread-fun :initform nil))) + + (defmethod print-object ((obj listener) s) (print-unreadable-object (obj s :type t :identity nil) (format s "port ~A" (port obj)))) -(defmethod print-object ((obj worker) s) +(defmethod print-object ((obj fixed-worker) s) (print-unreadable-object (obj s :type t :identity nil) (format s "port ~A" (port (listener obj))))) @@ -80,13 +86,14 @@ (warn "~&listener is not in active list") (return-from init/listener listener)) (dolist (worker (workers listener)) - (with-slots (connection process) worker - (when connection - (errorset (close-active-socket connection) nil) - (setf connection nil)) - (when process - (errorset (destroy-process process) nil) - (setf process nil)))) + (when (and (typep worker 'worker) + (connection worker)) + (errorset (close-active-socket + (connection worker)) nil) + (setf (connection worker) nil)) + (when (process worker) + (errorset (destroy-process (process worker)) nil) + (setf (process worker) nil))) (setf (workers listener) nil) (with-slots (process socket) listener (when socket @@ -127,37 +134,47 @@ (setf (socket listener) (create-inet-listener (port listener) :format (listener-format listener))) - (setf (process listener) (make-process - (name listener) - #'(lambda () (start-socket-server listener))))) + (if (number-fixed-workers listener) + (start-fixed-number-of-workers listener) + (setf (process listener) (make-process + (name listener) + #'(lambda () + (start-socket-server listener)))))) listener) (defmethod initialize-instance :after ((self worker) &key listener connection name &allow-other-keys) - (unless connection - (error "connection not provided to modlisp-worker")) - (setf (slot-value self 'listener) listener) - (setf (slot-value self 'name) name) - (setf (slot-value self 'connection) connection) - (setf (slot-value self 'thread-fun) - #'(lambda () - (unwind-protect - (if (catch-errors listener) - (handler-case - (apply (listener-function listener) - connection - (function-args listener)) - (error (e) - (cmsg "Error ~A [~A]" e name))) - (apply (listener-function listener) - connection - (function-args listener))) - (progn - (errorset (close-active-socket connection) nil) - (cmsg-c :threads "~A ended" name) - (setf (workers listener) - (remove self (workers listener)))))))) + (flet ((do-work () + (apply (listener-function listener) + connection + (function-args listener)))) + (unless connection + (error "connection not provided to modlisp-worker")) + (setf (slot-value self 'listener) listener) + (setf (slot-value self 'name) name) + (setf (slot-value self 'connection) connection) + (setf (slot-value self 'thread-fun) + #'(lambda () + (unwind-protect + (if (catch-errors listener) + (handler-case + (if (timeout listener) + (with-timeout ((timeout listener)) + (do-work)) + (do-work)) + (error (e) + (cmsg "Error ~A [~A]" e name))) + (if (timeout listener) + (with-timeout ((timeout listener)) + (do-work)) + (do-work))) + (progn + (errorset (finish-output connection) nil) + (errorset (close-active-socket connection) nil) + (cmsg-c :threads "~A ended" name) + (setf (workers listener) + (remove self (workers listener))))))))) (defun start-socket-server (listener) (unwind-protect @@ -168,14 +185,16 @@ (apply (listener-function listener) connection (function-args listener)) - (errorset (close connection) nil)) + (progn + (errorset (finish-output connection) nil) + (errorset (close-active-socket connection) nil))) (let ((worker (make-instance 'worker :listener listener :connection connection :name (next-worker-name (base-name listener))))) - (setf (process worker) - (make-process (name worker) (thread-fun worker))) - (push worker (workers listener)))))) + (setf (process worker) + (make-process (name worker) (thread-fun worker))) + (push worker (workers listener)))))) (errorset (close-passive-socket (socket listener)) nil))) #+lispworks @@ -185,13 +204,56 @@ :direction :io :element-type 'base-char))) (if (wait listener) - (apply (listener-function listener) - connection - (function-args listener)) - (let ((worker (make-instance 'worker :listener listener - :connection connection - :name (next-worker-name - (base-name listener))))) - (setf (process worker) - (make-process (name worker) (thread-fun worker))) - (push worker (workers listener)))))) + (progn + (apply (listener-function listener) + connection + (function-args listener)) + (finish-output connection)) + (let ((worker (make-instance 'worker :listener listener + :connection connection + :name (next-worker-name + (base-name listener))))) + (setf (process worker) + (make-process (name worker) (thread-fun worker))) + (push worker (workers listener)))))) + +;; Fixed pool of workers + +(defun start-fixed-number-of-workers (listener) + (dotimes (i (number-fixed-workers listener)) + (let ((name (next-worker-name (base-name listener)))) + (push + (make-instance 'fixed-worker + :name name + :listener listener + :process + (make-process + name #'(lambda () (fixed-worker name listener)))) + (workers listener))))) + + +(defun fixed-worker (name listener) + (loop + (let ((connection (accept-tcp-connection (socket listener)))) + (flet ((do-work () + (apply (listener-function listener) + connection + (function-args listener)))) + (unwind-protect + (handler-case + (if (catch-errors listener) + (handler-case + (if (timeout listener) + (with-timeout ((timeout listener)) + (do-work)) + (do-work)) + (error (e) + (cmsg "Error ~A [~A]" e name))) + (if (timeout listener) + (with-timeout ((timeout listener)) + (do-work)) + (do-work))) + (error (e) + (format t "Error: ~A" e))) + (errorset (finish-output connection) nil) + (errorset (close connection) nil)))))) diff --git a/processes.lisp b/processes.lisp index 64921f7..547b862 100644 --- a/processes.lisp +++ b/processes.lisp @@ -7,7 +7,7 @@ ;;;; Programmer: Kevin M. Rosenberg ;;;; Date Started: June 2003 ;;;; -;;;; $Id: processes.lisp,v 1.2 2003/07/09 22:12:52 kevin Exp $ +;;;; $Id: processes.lisp,v 1.3 2003/07/11 02:37:33 kevin Exp $ ;;;; ************************************************************************* (in-package #:kmrcl) @@ -24,7 +24,7 @@ (defun destroy-process (process) #+cmu (mp:destroy-process process) #+allegro (mp:process-kill process) - #+sbcl-thread (sb-thread:destroy-thread process) + #+sb-thread (sb-thread:destroy-thread process) #+lispworks (mp:process-kill process) ) @@ -44,6 +44,19 @@ `(mp:with-lock (,lock) ,@body) #+sbcl-thread `(sb-thread:with-recursive-lock (,lock) ,@body) + #-(or allegro cmu lispworks sbcl-thread) + `(progn ,@body) ) +(defmacro with-timeout ((seconds) &body body) + #+allegro + `(mp:with-timeout (,seconds) ,@body) + #+cmu + `(mp:with-timeout (,seconds) ,@body) + #+sbcl-thread + `(sb-ext:with-timeout ,seconds ,@body) + #-(or allegro cmu sbcl-thread) + `(progn ,@body) + ) +