r9949: delete-directory-and-files update
[kmrcl.git] / listener.lisp
index d37d72d924c54f51a3c82c948ba149e5d71667d0..0b31cefd745d2eb174890d541bbb4505d9a3da4a 100644 (file)
@@ -5,9 +5,15 @@
 ;;;; Name:          listener.lisp
 ;;;; Purpose:       Listener and worker processes
 ;;;; Programmer:    Kevin M. Rosenberg
-;;;; Date Started:  Dec 2002
+;;;; Date Started:  Jun 2003
 ;;;;
-;;;; $Id: listener.lisp,v 1.2 2003/07/09 22:12:52 kevin Exp $
+;;;; $Id$
+;;;;
+;;;; This file, part of KMRCL, is Copyright (c) 2002-2003 by Kevin M. Rosenberg
+;;;;
+;;;; KMRCL users are granted the rights to distribute and use this software
+;;;; as governed by the terms of the Lisp Lesser GNU Public License
+;;;; (http://opensource.franz.com/preamble.html), also known as the LLGPL.
 ;;;; *************************************************************************
 
 (in-package #:kmrcl)
             :initform nil)
    (function-args :initarg :function-args :accessor function-args
                  :initform nil)
-   (process :initarg :process :accessor process)
-   (socket :initarg :socket :accessor socket)
+   (process :initarg :process :accessor process :initform nil)
+   (socket :initarg :socket :accessor socket :initform nil)
    (workers :initform nil :accessor workers
            :documentation "list of worker threads")
    (name :initform "" :accessor name :initarg :name)
    (base-name :initform "listener" :accessor base-name :initarg :base-name)
    (wait :initform nil :accessor wait :initarg :wait)
+   (timeout :initform nil :accessor timeout :initarg :timeout)
+   (number-fixed-workers :initform nil :accessor number-fixed-workers
+                        :initarg :number-fixed-workers)
    (catch-errors :initform nil :accessor catch-errors :initarg :catch-errors)
+   (remote-host-checker :initform nil :accessor remote-host-checker
+                       :initarg :remote-host-checker)
    (format :initform :text :accessor listener-format :initarg :format)))
 
-(defclass worker ()
+(defclass fixed-worker ()
   ((listener :initarg :listener :accessor listener :initform nil)
-   (connection :initarg :connection :accessor connection :initform nil)
    (name :initarg :name :accessor name :initform nil)
-   (thread-fun :initarg :thread-fun :accessor thread-fun :initform nil)
    (process :initarg :process :accessor process :initform nil)))
 
+(defclass worker (fixed-worker)
+  ((connection :initarg :connection :accessor connection :initform nil)
+   (thread-fun :initarg :thread-fun :accessor thread-fun :initform nil)))
+
+
 (defmethod print-object ((obj listener) s)
   (print-unreadable-object (obj s :type t :identity nil)
     (format s "port ~A" (port obj))))
 
-(defmethod print-object ((obj worker) s)
+(defmethod print-object ((obj fixed-worker) s)
   (print-unreadable-object (obj s :type t :identity nil)
     (format s "port ~A" (port (listener obj)))))
   
   (case state
     (:start
      (when (member listener *active-listeners*)
-       (warn "~&listener already started")
-       (return-from init/listener listener))
-     (handler-case
-        (progn
-          (setf (name listener) (next-server-name (base-name listener)))
-          (make-socket-server listener))
-       (error (e)
-        (format t "~&Error while trying to start listener~&  ~A" e)
-        (decf *listener-count*)
-        nil)
-       (:no-error (res)
-        (declare (ignore res))
-        (push listener *active-listeners*)
-        listener)))
+       (cmsg "~&listener ~A already initialized" listener)
+       (return-from init/listener))
+     (when (listener-startup listener)
+       (push listener *active-listeners*)
+       listener))
     (:stop
      (unless (member listener *active-listeners*)
-       (warn "~&listener is not in active list")
+       (cmsg "~&listener ~A is not in active list" listener)
        (return-from init/listener listener))
-     (dolist (worker (workers listener))
-       (close-active-socket (connection worker))
-       (destroy-process (process worker))
-       (setf (connection worker) nil)
-       (setf (process worker) nil))
-     (setf (workers listener) nil)
-     (with-slots (process socket) listener
-       (errorset (close-passive-socket socket) t)
-       (errorset (destroy-process process) t)
-       (setf process nil)
-       (setf socket nil))
+     (listener-shutdown listener)
      (setq *active-listeners* (remove listener *active-listeners*)))
     (:restart
      (init/listener listener :stop)
     (ignore-errors
        (init/listener listener :stop))))
 
+(defun listener-startup (listener)
+  (handler-case
+      (progn
+       (setf (name listener) (next-server-name (base-name listener)))
+       (make-socket-server listener))
+    (error (e)
+      (format t "~&Error while trying to start listener on port ~A~&  ~A" 
+             (port listener) e)
+      (decf *listener-count*)
+      nil)
+    (:no-error (res)
+      (declare (ignore res))
+      listener)))
+
+(defun listener-shutdown (listener)
+  (dolist (worker (workers listener))
+    (when (and (typep worker 'worker)
+              (connection worker))
+      (errorset (close-active-socket
+                (connection worker)) nil)
+      (setf (connection worker) nil))
+    (when (process worker)
+      (errorset (destroy-process (process worker)) nil)
+      (setf (process worker) nil)))
+  (setf (workers listener) nil)
+  (with-slots (process socket) listener
+    (when socket
+      (errorset (close-passive-socket socket) nil)
+      (setf socket nil))
+    (when process
+      (errorset (destroy-process process) nil)
+      (setf process nil))))
+
 ;; Low-level functions
 
 (defun next-server-name (base-name)
-  (format nil "~A-socket-server-~D" base-name (incf *listener-count*))) 
+  (format nil "~D-~A-socket-server" (incf *listener-count*) base-name)) 
 
 (defun next-worker-name (base-name)
-  (format nil "~A-worker-~D" base-name (incf *worker-count*)))
+  (format nil "~D-~A-worker"  (incf *worker-count*) base-name))
 
 (defun make-socket-server (listener)
-  (setf (socket listener) (create-inet-listener
-                          (port listener)
-                          :format (listener-format listener)))
-  (setf (process listener) (make-process
-                           (name listener)
-                           #'(lambda () (start-socket-server listener))))
+  #+lispworks
+  (progn
+    (setf (process listener)
+      (comm:start-up-server :process-name (name listener)
+                           :service (port listener) 
+                           :function
+                           #'(lambda (handle) 
+                               (lw-worker handle listener)))))
+  #-lispworks
+  (progn
+    (setf (socket listener) (create-inet-listener
+                            (port listener)
+                            :format (listener-format listener)))
+    (if (number-fixed-workers listener)
+       (start-fixed-number-of-workers listener)
+       (setf (process listener) (make-process
+                                 (name listener)
+                                 #'(lambda ()
+                                     (start-socket-server listener))))))
   listener)
 
 
 (defmethod initialize-instance :after
     ((self worker) &key listener connection name &allow-other-keys)
-  (unless connection
-    (error "connection not provided to modlisp-worker"))
-  (setf (slot-value self 'listener) listener)
-  (setf (slot-value self 'name) name)
-  (setf (slot-value self 'connection) connection)
-  (setf (slot-value self 'thread-fun)
-       #'(lambda ()
-           (unwind-protect
-               (if (catch-errors listener)
-                   (handler-case
-                       (apply (listener-function listener)
-                              connection
-                              (function-args listener))
-                     (error (e)
-                       (cmsg "Error ~A [~A]" e name)))
-                 (apply (listener-function listener)
-                        connection
-                        (function-args listener)))
-         (progn
-           (errorset (close-active-socket connection) nil)
-           (cmsg-c :threads "~A ended" name)
-           (setf (workers listener)
-                 (remove self (workers listener))))))))
+  (flet ((do-work ()
+          (apply (listener-function listener)
+                 connection
+                 (function-args listener))))
+    (unless connection
+      (error "connection not provided to modlisp-worker"))
+    (setf (slot-value self 'listener) listener)
+    (setf (slot-value self 'name) name)
+    (setf (slot-value self 'connection) connection)
+    (setf (slot-value self 'thread-fun)
+         #'(lambda ()
+             (unwind-protect
+                  (if (catch-errors listener)
+                      (handler-case
+                          (if (timeout listener)
+                              (with-timeout ((timeout listener))
+                                (do-work))
+                              (do-work))
+                        (error (e)
+                          (cmsg "Error ~A [~A]" e name)))
+                      (if (timeout listener)
+                          (with-timeout ((timeout listener))
+                            (do-work))
+                          (do-work)))
+               (progn
+                 (errorset (finish-output connection) nil)
+                 (errorset (close-active-socket connection) nil)
+                 (cmsg-c :threads "~A ended" name)
+                 (setf (workers listener)
+                       (remove self (workers listener)))))))))
+
+(defun accept-and-check-tcp-connection (listener)
+  (multiple-value-bind (conn socket) (accept-tcp-connection (socket listener))
+    (when (and (remote-host-checker listener)
+              (not (funcall (remote-host-checker listener)
+                            (remote-host socket))))
+      (cmsg-c :thread "Deny connection from ~A" (remote-host conn))
+      (errorset (close-active-socket conn) nil)
+      (setq conn nil))
+    conn))
 
 (defun start-socket-server (listener)
   (unwind-protect
       (loop 
-       (let ((connection (accept-tcp-connection (socket listener))))
-        (if (wait listener)
-            (unwind-protect
-                 (apply (listener-function listener)
-                        connection
-                        (function-args listener))
-              (errorset (close connection) nil))
-            (let ((worker (make-instance 'worker :listener listener
-                                         :connection connection
-                                         :name (next-worker-name
-                                                (base-name listener)))))
-             (setf (process worker)
-               (make-process (name worker) (thread-fun worker)))
-             (push worker (workers listener))))))
+       (let ((connection (accept-and-check-tcp-connection listener)))
+        (when connection
+          (if (wait listener)
+              (unwind-protect
+                   (apply (listener-function listener)
+                          connection
+                          (function-args listener))
+                (progn
+                  (errorset (finish-output connection) nil)
+                  (errorset (close-active-socket connection) nil)))
+              (let ((worker (make-instance 'worker :listener listener
+                                           :connection connection
+                                           :name (next-worker-name
+                                                  (base-name listener)))))
+                (setf (process worker)
+                      (make-process (name worker) (thread-fun worker)))
+                (push worker (workers listener)))))))
     (errorset (close-passive-socket (socket listener)) nil)))
+
+#+lispworks
+(defun lw-worker (handle listener)
+  (let ((connection (make-instance 'comm:socket-stream
+                     :socket handle
+                     :direction :io
+                     :element-type 'base-char)))
+    (if (wait listener)
+       (progn
+         (apply (listener-function listener)
+                connection
+                (function-args listener))
+         (finish-output connection))
+       (let ((worker (make-instance 'worker :listener listener
+                                    :connection connection
+                                    :name (next-worker-name
+                                           (base-name listener)))))
+         (setf (process worker)
+               (make-process (name worker) (thread-fun worker)))
+         (push worker (workers listener))))))
+
+;; Fixed pool of workers
+
+(defun start-fixed-number-of-workers (listener)
+  (dotimes (i (number-fixed-workers listener))
+    (let ((name (next-worker-name (base-name listener))))
+      (push
+       (make-instance 'fixed-worker
+                     :name name
+                     :listener listener
+                     :process
+                     (make-process
+                      name #'(lambda () (fixed-worker name listener))))
+       (workers listener)))))
+
+
+(defun fixed-worker (name listener)
+  (loop 
+   (let ((connection (accept-and-check-tcp-connection listener)))
+     (when connection
+       (flet ((do-work ()
+               (apply (listener-function listener)
+                      connection
+                      (function-args listener))))
+        (unwind-protect
+             (handler-case
+                 (if (catch-errors listener)
+                     (handler-case
+                         (if (timeout listener)
+                             (with-timeout ((timeout listener))
+                               (do-work))
+                             (do-work))
+                       (error (e)
+                         (cmsg "Error ~A [~A]" e name)))
+                     (if (timeout listener)
+                         (with-timeout ((timeout listener))
+                           (do-work))
+                         (do-work)))
+               (error (e)
+                 (format t "Error: ~A" e)))
+          (errorset (finish-output connection) nil)
+          (errorset (close connection) nil)))))))
+