r5277: *** empty log message ***
[kmrcl.git] / listener.lisp
index 19fbdbe71b317e0e8d494045e283dacef1fb0000..dfd70f490985ffed9c1cb3d33a60a4d31fba0a86 100644 (file)
@@ -7,7 +7,7 @@
 ;;;; Programmer:    Kevin M. Rosenberg
 ;;;; Date Started:  Jun 2003
 ;;;;
-;;;; $Id: listener.lisp,v 1.3 2003/07/10 18:52:10 kevin Exp $
+;;;; $Id: listener.lisp,v 1.4 2003/07/11 02:37:33 kevin Exp $
 ;;;; *************************************************************************
 
 (in-package #:kmrcl)
    (name :initform "" :accessor name :initarg :name)
    (base-name :initform "listener" :accessor base-name :initarg :base-name)
    (wait :initform nil :accessor wait :initarg :wait)
+   (timeout :initform nil :accessor timeout :initarg :timeout)
+   (number-fixed-workers :initform nil :accessor number-fixed-workers
+                        :initarg :number-fixed-workers)
    (catch-errors :initform nil :accessor catch-errors :initarg :catch-errors)
    (format :initform :text :accessor listener-format :initarg :format)))
 
-(defclass worker ()
+(defclass fixed-worker ()
   ((listener :initarg :listener :accessor listener :initform nil)
-   (connection :initarg :connection :accessor connection :initform nil)
    (name :initarg :name :accessor name :initform nil)
-   (thread-fun :initarg :thread-fun :accessor thread-fun :initform nil)
    (process :initarg :process :accessor process :initform nil)))
 
+(defclass worker (fixed-worker)
+  ((connection :initarg :connection :accessor connection :initform nil)
+   (thread-fun :initarg :thread-fun :accessor thread-fun :initform nil)))
+
+
 (defmethod print-object ((obj listener) s)
   (print-unreadable-object (obj s :type t :identity nil)
     (format s "port ~A" (port obj))))
 
-(defmethod print-object ((obj worker) s)
+(defmethod print-object ((obj fixed-worker) s)
   (print-unreadable-object (obj s :type t :identity nil)
     (format s "port ~A" (port (listener obj)))))
   
        (warn "~&listener is not in active list")
        (return-from init/listener listener))
      (dolist (worker (workers listener))
-       (with-slots (connection process) worker
-        (when connection
-          (errorset (close-active-socket connection) nil)
-          (setf connection nil))
-        (when process
-          (errorset (destroy-process process) nil)
-          (setf process nil))))
+       (when (and (typep worker 'worker)
+                 (connection worker))
+        (errorset (close-active-socket
+                   (connection worker)) nil)
+        (setf (connection worker) nil))
+       (when (process worker)
+        (errorset (destroy-process (process worker)) nil)
+        (setf (process worker) nil)))
      (setf (workers listener) nil)
      (with-slots (process socket) listener
        (when socket
     (setf (socket listener) (create-inet-listener
                             (port listener)
                             :format (listener-format listener)))
-    (setf (process listener) (make-process
-                             (name listener)
-                             #'(lambda () (start-socket-server listener)))))
+    (if (number-fixed-workers listener)
+       (start-fixed-number-of-workers listener)
+       (setf (process listener) (make-process
+                                 (name listener)
+                                 #'(lambda ()
+                                     (start-socket-server listener))))))
   listener)
 
 
 (defmethod initialize-instance :after
     ((self worker) &key listener connection name &allow-other-keys)
-  (unless connection
-    (error "connection not provided to modlisp-worker"))
-  (setf (slot-value self 'listener) listener)
-  (setf (slot-value self 'name) name)
-  (setf (slot-value self 'connection) connection)
-  (setf (slot-value self 'thread-fun)
-       #'(lambda ()
-           (unwind-protect
-               (if (catch-errors listener)
-                   (handler-case
-                       (apply (listener-function listener)
-                              connection
-                              (function-args listener))
-                     (error (e)
-                       (cmsg "Error ~A [~A]" e name)))
-                 (apply (listener-function listener)
-                        connection
-                        (function-args listener)))
-         (progn
-           (errorset (close-active-socket connection) nil)
-           (cmsg-c :threads "~A ended" name)
-           (setf (workers listener)
-                 (remove self (workers listener))))))))
+  (flet ((do-work ()
+          (apply (listener-function listener)
+                 connection
+                 (function-args listener))))
+    (unless connection
+      (error "connection not provided to modlisp-worker"))
+    (setf (slot-value self 'listener) listener)
+    (setf (slot-value self 'name) name)
+    (setf (slot-value self 'connection) connection)
+    (setf (slot-value self 'thread-fun)
+         #'(lambda ()
+             (unwind-protect
+                  (if (catch-errors listener)
+                      (handler-case
+                          (if (timeout listener)
+                              (with-timeout ((timeout listener))
+                                (do-work))
+                              (do-work))
+                        (error (e)
+                          (cmsg "Error ~A [~A]" e name)))
+                      (if (timeout listener)
+                          (with-timeout ((timeout listener))
+                            (do-work))
+                          (do-work)))
+               (progn
+                 (errorset (finish-output connection) nil)
+                 (errorset (close-active-socket connection) nil)
+                 (cmsg-c :threads "~A ended" name)
+                 (setf (workers listener)
+                       (remove self (workers listener)))))))))
 
 (defun start-socket-server (listener)
   (unwind-protect
                  (apply (listener-function listener)
                         connection
                         (function-args listener))
-              (errorset (close connection) nil))
+              (progn
+                (errorset (finish-output connection) nil)
+                (errorset (close-active-socket connection) nil)))
             (let ((worker (make-instance 'worker :listener listener
                                          :connection connection
                                          :name (next-worker-name
                                                 (base-name listener)))))
-             (setf (process worker)
-               (make-process (name worker) (thread-fun worker)))
-             (push worker (workers listener))))))
+              (setf (process worker)
+                    (make-process (name worker) (thread-fun worker)))
+              (push worker (workers listener))))))
     (errorset (close-passive-socket (socket listener)) nil)))
 
 #+lispworks
                      :direction :io
                      :element-type 'base-char)))
     (if (wait listener)
-       (apply (listener-function listener)
-              connection
-              (function-args listener))
-      (let ((worker (make-instance 'worker :listener listener
-                                  :connection connection
-                                  :name (next-worker-name
-                                                 (base-name listener)))))
-       (setf (process worker)
-         (make-process (name worker) (thread-fun worker)))
-       (push worker (workers listener))))))
+       (progn
+         (apply (listener-function listener)
+                connection
+                (function-args listener))
+         (finish-output connection))
+       (let ((worker (make-instance 'worker :listener listener
+                                    :connection connection
+                                    :name (next-worker-name
+                                           (base-name listener)))))
+         (setf (process worker)
+               (make-process (name worker) (thread-fun worker)))
+         (push worker (workers listener))))))
+
+;; Fixed pool of workers
+
+(defun start-fixed-number-of-workers (listener)
+  (dotimes (i (number-fixed-workers listener))
+    (let ((name (next-worker-name (base-name listener))))
+      (push
+       (make-instance 'fixed-worker
+                     :name name
+                     :listener listener
+                     :process
+                     (make-process
+                      name #'(lambda () (fixed-worker name listener))))
+       (workers listener)))))
+
+
+(defun fixed-worker (name listener)
+  (loop 
+   (let ((connection (accept-tcp-connection (socket listener))))
+     (flet ((do-work ()
+             (apply (listener-function listener)
+                    connection
+                    (function-args listener))))
+       (unwind-protect
+           (handler-case
+               (if (catch-errors listener)
+                   (handler-case
+                       (if (timeout listener)
+                           (with-timeout ((timeout listener))
+                             (do-work))
+                           (do-work))
+                     (error (e)
+                       (cmsg "Error ~A [~A]" e name)))
+                   (if (timeout listener)
+                       (with-timeout ((timeout listener))
+                         (do-work))
+                       (do-work)))
+             (error (e)
+               (format t "Error: ~A" e)))
+        (errorset (finish-output connection) nil)
+        (errorset (close connection) nil))))))