New upstream and debian-specific file updates
[kmrcl.git] / listener.lisp
index 59954057c4c98eeb7c3acccac29c89d8187c0d05..6c511cf9260b08976e2658af04b0ce106f7689f9 100644 (file)
@@ -5,9 +5,15 @@
 ;;;; Name:          listener.lisp
 ;;;; Purpose:       Listener and worker processes
 ;;;; Programmer:    Kevin M. Rosenberg
-;;;; Date Started:  Dec 2002
+;;;; Date Started:  Jun 2003
 ;;;;
-;;;; $Id: listener.lisp,v 1.1 2003/07/08 16:12:40 kevin Exp $
+;;;; $Id$
+;;;;
+;;;; This file, part of KMRCL, is Copyright (c) 2002-2003 by Kevin M. Rosenberg
+;;;;
+;;;; KMRCL users are granted the rights to distribute and use this software
+;;;; as governed by the terms of the Lisp Lesser GNU Public License
+;;;; (http://opensource.franz.com/preamble.html), also known as the LLGPL.
 ;;;; *************************************************************************
 
 (in-package #:kmrcl)
     "List of active listeners")
 
 (defclass listener ()
-  ((port :initarg :port :accessor port) 
+  ((port :initarg :port :accessor port)
    (function :initarg :function :accessor listener-function
-            :initform nil)
+             :initform nil)
    (function-args :initarg :function-args :accessor function-args
-                 :initform nil)
-   (process :initarg :process :accessor process)
-   (socket :initarg :socket :accessor socket)
+                  :initform nil)
+   (process :initarg :process :accessor process :initform nil)
+   (socket :initarg :socket :accessor socket :initform nil)
    (workers :initform nil :accessor workers
-           :documentation "list of worker threads")
+            :documentation "list of worker threads")
    (name :initform "" :accessor name :initarg :name)
    (base-name :initform "listener" :accessor base-name :initarg :base-name)
    (wait :initform nil :accessor wait :initarg :wait)
+   (timeout :initform nil :accessor timeout :initarg :timeout)
+   (number-fixed-workers :initform nil :accessor number-fixed-workers
+                         :initarg :number-fixed-workers)
    (catch-errors :initform nil :accessor catch-errors :initarg :catch-errors)
+   (remote-host-checker :initform nil :accessor remote-host-checker
+                        :initarg :remote-host-checker)
    (format :initform :text :accessor listener-format :initarg :format)))
 
-(defclass worker ()
+(defclass fixed-worker ()
   ((listener :initarg :listener :accessor listener :initform nil)
-   (connection :initarg :connection :accessor connection :initform nil)
    (name :initarg :name :accessor name :initform nil)
-   (thread-fun :initarg :thread-fun :accessor thread-fun :initform nil)
    (process :initarg :process :accessor process :initform nil)))
 
+(defclass worker (fixed-worker)
+  ((connection :initarg :connection :accessor connection :initform nil)
+   (thread-fun :initarg :thread-fun :accessor thread-fun :initform nil)))
+
+
+(defmethod print-object ((obj listener) s)
+  (print-unreadable-object (obj s :type t :identity nil)
+    (format s "port ~A" (port obj))))
+
+(defmethod print-object ((obj fixed-worker) s)
+  (print-unreadable-object (obj s :type t :identity nil)
+    (format s "port ~A" (port (listener obj)))))
 
 ;; High-level API
 
   (case state
     (:start
      (when (member listener *active-listeners*)
-       (warn "~&listener already started")
-       (return-from init/listener listener))
-     (handler-case
-        (progn
-          (setf (name listener) (next-server-name (base-name listener)))
-          (make-socket-server listener))
-       (error (e)
-        (format t "~&Error while trying to start listener~&  ~A" e)
-        (decf *listener-count*)
-        nil)
-       (:no-error (res)
-        (declare (ignore res))
-        (push listener *active-listeners*)
-        listener)))
+       (cmsg "~&listener ~A already initialized" listener)
+       (return-from init/listener))
+     (when (listener-startup listener)
+       (push listener *active-listeners*)
+       listener))
     (:stop
      (unless (member listener *active-listeners*)
-       (warn "~&listener is not in active list")
+       (cmsg "~&listener ~A is not in active list" listener)
        (return-from init/listener listener))
-     (dolist (worker (workers listener))
-       (close-active-socket (connection worker))
-       (destroy-process (process worker)))
-     (setf (workers listener) nil)
-     (with-slots (process socket) listener
-       (errorset (close-passive-socket socket) t)
-       (errorset (destroy-process process) t))
+     (listener-shutdown listener)
      (setq *active-listeners* (remove listener *active-listeners*)))
     (:restart
      (init/listener listener :stop)
     (ignore-errors
        (init/listener listener :stop))))
 
+(defun listener-startup (listener)
+  (handler-case
+      (progn
+        (setf (name listener) (next-server-name (base-name listener)))
+        (make-socket-server listener))
+    (error (e)
+      (format t "~&Error while trying to start listener on port ~A~&  ~A"
+              (port listener) e)
+      (decf *listener-count*)
+      nil)
+    (:no-error (res)
+      (declare (ignore res))
+      listener)))
+
+(defun listener-shutdown (listener)
+  (dolist (worker (workers listener))
+    (when (and (typep worker 'worker)
+               (connection worker))
+      (errorset (close-active-socket
+                 (connection worker)) nil)
+      (setf (connection worker) nil))
+    (when (process worker)
+      (errorset (destroy-process (process worker)) nil)
+      (setf (process worker) nil)))
+  (setf (workers listener) nil)
+  (with-slots (process socket) listener
+    (when socket
+      (errorset (close-passive-socket socket) nil)
+      (setf socket nil))
+    (when process
+      (errorset (destroy-process process) nil)
+      (setf process nil))))
+
 ;; Low-level functions
 
 (defun next-server-name (base-name)
-  (format nil "~A-socket-server-~D" base-name (incf *listener-count*))) 
+  (format nil "~D-~A-socket-server" (incf *listener-count*) base-name))
 
 (defun next-worker-name (base-name)
-  (format nil "~A-worker-~D" base-name (incf *worker-count*)))
+  (format nil "~D-~A-worker"  (incf *worker-count*) base-name))
 
 (defun make-socket-server (listener)
-  (setf (socket listener) (create-inet-listener
-                          (port listener)
-                          :format (listener-format listener)))
-  (setf (process listener) (make-process
-                           (name listener)
-                           #'(lambda () (start-socket-server listener))))
+  #+lispworks
+  (progn
+    (setf (process listener)
+      (comm:start-up-server :process-name (name listener)
+                            :service (port listener)
+                            :function
+                            #'(lambda (handle)
+                                (lw-worker handle listener)))))
+  #-lispworks
+  (progn
+    (setf (socket listener) (create-inet-listener
+                             (port listener)
+                             :format (listener-format listener)))
+    (if (number-fixed-workers listener)
+        (start-fixed-number-of-workers listener)
+        (setf (process listener) (make-process
+                                  (name listener)
+                                  #'(lambda ()
+                                      (start-socket-server listener))))))
   listener)
 
 
 (defmethod initialize-instance :after
     ((self worker) &key listener connection name &allow-other-keys)
-  (unless connection
-    (error "connection not provided to modlisp-worker"))
-  (setf (slot-value self 'listener) listener)
-  (setf (slot-value self 'name) name)
-  (setf (slot-value self 'connection) connection)
-  (setf (slot-value self 'thread-fun)
-       #'(lambda ()
-           (unwind-protect
-               (if (catch-errors listener)
-                   (handler-case
-                       (apply (listener-function listener)
-                              connection
-                              (function-args listener))
-                     (error (e)
-                       (cmsg "Error ~A [~A]" e name)))
-                 (apply (listener-function listener)
-                        connection
-                        (function-args listener)))
-         (progn
-           (errorset (close-active-socket connection) nil)
-           (cmsg-c :threads "~A ended" name)
-           (setf (workers listener)
-                 (remove self (workers listener))))))))
+  (flet ((do-work ()
+           (apply (listener-function listener)
+                  connection
+                  (function-args listener))))
+    (unless connection
+      (error "connection not provided to modlisp-worker"))
+    (setf (slot-value self 'listener) listener)
+    (setf (slot-value self 'name) name)
+    (setf (slot-value self 'connection) connection)
+    (setf (slot-value self 'thread-fun)
+          #'(lambda ()
+              (unwind-protect
+                   (if (catch-errors listener)
+                       (handler-case
+                           (if (timeout listener)
+                               (with-timeout ((timeout listener))
+                                 (do-work))
+                               (do-work))
+                         (error (e)
+                           (cmsg "Error ~A [~A]" e name)))
+                       (if (timeout listener)
+                           (with-timeout ((timeout listener))
+                             (do-work))
+                           (do-work)))
+                (progn
+                  (errorset (finish-output connection) nil)
+                  (errorset (close-active-socket connection) nil)
+                  (cmsg-c :threads "~A ended" name)
+                  (setf (workers listener)
+                        (remove self (workers listener)))))))))
+
+(defun accept-and-check-tcp-connection (listener)
+  (multiple-value-bind (conn socket) (accept-tcp-connection (socket listener))
+    (when (and (remote-host-checker listener)
+               (not (funcall (remote-host-checker listener)
+                             (remote-host socket))))
+      (cmsg-c :thread "Deny connection from ~A" (remote-host conn))
+      (errorset (close-active-socket conn) nil)
+      (setq conn nil))
+    conn))
 
 (defun start-socket-server (listener)
   (unwind-protect
-      (loop 
-       (let ((connection (accept-tcp-connection (socket listener))))
-        (if (wait listener)
-            (unwind-protect
-                 (apply (listener-function listener)
-                        connection
-                        (function-args listener))
-              (errorset (close connection) nil))
-            (let ((worker (make-instance 'worker :listener listener
-                                         :connection connection
-                                         :name (next-worker-name
-                                                (base-name listener)))))
-             (setf (process worker)
-               (make-process (name worker) (thread-fun worker)))
-             (push worker (workers listener))))))
+      (loop
+       (let ((connection (accept-and-check-tcp-connection listener)))
+         (when connection
+           (if (wait listener)
+               (unwind-protect
+                    (apply (listener-function listener)
+                           connection
+                           (function-args listener))
+                 (progn
+                   (errorset (finish-output connection) nil)
+                   (errorset (close-active-socket connection) nil)))
+               (let ((worker (make-instance 'worker :listener listener
+                                            :connection connection
+                                            :name (next-worker-name
+                                                   (base-name listener)))))
+                 (setf (process worker)
+                       (make-process (name worker) (thread-fun worker)))
+                 (push worker (workers listener)))))))
     (errorset (close-passive-socket (socket listener)) nil)))
+
+#+lispworks
+(defun lw-worker (handle listener)
+  (let ((connection (make-instance 'comm:socket-stream
+                      :socket handle
+                      :direction :io
+                      :element-type 'base-char)))
+    (if (wait listener)
+        (progn
+          (apply (listener-function listener)
+                 connection
+                 (function-args listener))
+          (finish-output connection))
+        (let ((worker (make-instance 'worker :listener listener
+                                     :connection connection
+                                     :name (next-worker-name
+                                            (base-name listener)))))
+          (setf (process worker)
+                (make-process (name worker) (thread-fun worker)))
+          (push worker (workers listener))))))
+
+;; Fixed pool of workers
+
+(defun start-fixed-number-of-workers (listener)
+  (dotimes (i (number-fixed-workers listener))
+    (let ((name (next-worker-name (base-name listener))))
+      (push
+       (make-instance 'fixed-worker
+                      :name name
+                      :listener listener
+                      :process
+                      (make-process
+                       name #'(lambda () (fixed-worker name listener))))
+       (workers listener)))))
+
+
+(defun fixed-worker (name listener)
+  (loop
+   (let ((connection (accept-and-check-tcp-connection listener)))
+     (when connection
+       (flet ((do-work ()
+                (apply (listener-function listener)
+                       connection
+                       (function-args listener))))
+         (unwind-protect
+              (handler-case
+                  (if (catch-errors listener)
+                      (handler-case
+                          (if (timeout listener)
+                              (with-timeout ((timeout listener))
+                                (do-work))
+                              (do-work))
+                        (error (e)
+                          (cmsg "Error ~A [~A]" e name)))
+                      (if (timeout listener)
+                          (with-timeout ((timeout listener))
+                            (do-work))
+                          (do-work)))
+                (error (e)
+                  (format t "Error: ~A" e)))
+           (errorset (finish-output connection) nil)
+           (errorset (close connection) nil)))))))
+