r7061: initial property settings
[kmrcl.git] / listener.lisp
1 ;;;; -*- Mode: Lisp; Syntax: ANSI-Common-Lisp; Base: 10; Package: modlisp -*-
2 ;;;; *************************************************************************
3 ;;;; FILE IDENTIFICATION
4 ;;;;
5 ;;;; Name:          listener.lisp
6 ;;;; Purpose:       Listener and worker processes
7 ;;;; Programmer:    Kevin M. Rosenberg
8 ;;;; Date Started:  Jun 2003
9 ;;;;
10 ;;;; $Id$
11 ;;;; *************************************************************************
12
13 (in-package #:kmrcl)
14
15 ;;; Variables and data structures for Listener
16
17 (defvar *listener-count* 0
18   "used to name listeners")
19
20 (defvar *worker-count* 0
21   "used to name workers")
22
23 (defvar *active-listeners* nil
24     "List of active listeners")
25
26 (defclass listener ()
27   ((port :initarg :port :accessor port) 
28    (function :initarg :function :accessor listener-function
29              :initform nil)
30    (function-args :initarg :function-args :accessor function-args
31                   :initform nil)
32    (process :initarg :process :accessor process :initform nil)
33    (socket :initarg :socket :accessor socket :initform nil)
34    (workers :initform nil :accessor workers
35             :documentation "list of worker threads")
36    (name :initform "" :accessor name :initarg :name)
37    (base-name :initform "listener" :accessor base-name :initarg :base-name)
38    (wait :initform nil :accessor wait :initarg :wait)
39    (timeout :initform nil :accessor timeout :initarg :timeout)
40    (number-fixed-workers :initform nil :accessor number-fixed-workers
41                          :initarg :number-fixed-workers)
42    (catch-errors :initform nil :accessor catch-errors :initarg :catch-errors)
43    (remote-host-checker :initform nil :accessor remote-host-checker
44                         :initarg :remote-host-checker)
45    (format :initform :text :accessor listener-format :initarg :format)))
46
47 (defclass fixed-worker ()
48   ((listener :initarg :listener :accessor listener :initform nil)
49    (name :initarg :name :accessor name :initform nil)
50    (process :initarg :process :accessor process :initform nil)))
51
52 (defclass worker (fixed-worker)
53   ((connection :initarg :connection :accessor connection :initform nil)
54    (thread-fun :initarg :thread-fun :accessor thread-fun :initform nil)))
55
56
57 (defmethod print-object ((obj listener) s)
58   (print-unreadable-object (obj s :type t :identity nil)
59     (format s "port ~A" (port obj))))
60
61 (defmethod print-object ((obj fixed-worker) s)
62   (print-unreadable-object (obj s :type t :identity nil)
63     (format s "port ~A" (port (listener obj)))))
64   
65 ;; High-level API
66
67 (defun init/listener (listener state)
68   (check-type listener listener)
69   (case state
70     (:start
71      (when (member listener *active-listeners*)
72        (cmsg "~&listener ~A already initialized" listener)
73        (return-from init/listener))
74      (when (listener-startup listener)
75        (push listener *active-listeners*)
76        listener))
77     (:stop
78      (unless (member listener *active-listeners*)
79        (cmsg "~&listener ~A is not in active list" listener)
80        (return-from init/listener listener))
81      (listener-shutdown listener)
82      (setq *active-listeners* (remove listener *active-listeners*)))
83     (:restart
84      (init/listener listener :stop)
85      (init/listener listener :start))))
86
87 (defun stop-all/listener ()
88   (dolist (listener *active-listeners*)
89     (ignore-errors
90        (init/listener listener :stop))))
91
92 (defun listener-startup (listener)
93   (handler-case
94       (progn
95         (setf (name listener) (next-server-name (base-name listener)))
96         (make-socket-server listener))
97     (error (e)
98       (format t "~&Error while trying to start listener on port ~A~&  ~A" 
99               (port listener) e)
100       (decf *listener-count*)
101       nil)
102     (:no-error (res)
103       (declare (ignore res))
104       listener)))
105
106 (defun listener-shutdown (listener)
107   (dolist (worker (workers listener))
108     (when (and (typep worker 'worker)
109                (connection worker))
110       (errorset (close-active-socket
111                  (connection worker)) nil)
112       (setf (connection worker) nil))
113     (when (process worker)
114       (errorset (destroy-process (process worker)) nil)
115       (setf (process worker) nil)))
116   (setf (workers listener) nil)
117   (with-slots (process socket) listener
118     (when socket
119       (errorset (close-passive-socket socket) nil)
120       (setf socket nil))
121     (when process
122       (errorset (destroy-process process) nil)
123       (setf process nil))))
124
125 ;; Low-level functions
126
127 (defun next-server-name (base-name)
128   (format nil "~D-~A-socket-server" (incf *listener-count*) base-name)) 
129
130 (defun next-worker-name (base-name)
131   (format nil "~D-~A-worker"  (incf *worker-count*) base-name))
132
133 (defun make-socket-server (listener)
134   #+lispworks
135   (progn
136     (setf (process listener)
137       (comm:start-up-server :process-name (name listener)
138                             :service (port listener) 
139                             :function
140                             #'(lambda (handle) 
141                                 (lw-worker handle listener)))))
142   #-lispworks
143   (progn
144     (setf (socket listener) (create-inet-listener
145                              (port listener)
146                              :format (listener-format listener)))
147     (if (number-fixed-workers listener)
148         (start-fixed-number-of-workers listener)
149         (setf (process listener) (make-process
150                                   (name listener)
151                                   #'(lambda ()
152                                       (start-socket-server listener))))))
153   listener)
154
155
156 (defmethod initialize-instance :after
157     ((self worker) &key listener connection name &allow-other-keys)
158   (flet ((do-work ()
159            (apply (listener-function listener)
160                   connection
161                   (function-args listener))))
162     (unless connection
163       (error "connection not provided to modlisp-worker"))
164     (setf (slot-value self 'listener) listener)
165     (setf (slot-value self 'name) name)
166     (setf (slot-value self 'connection) connection)
167     (setf (slot-value self 'thread-fun)
168           #'(lambda ()
169               (unwind-protect
170                    (if (catch-errors listener)
171                        (handler-case
172                            (if (timeout listener)
173                                (with-timeout ((timeout listener))
174                                  (do-work))
175                                (do-work))
176                          (error (e)
177                            (cmsg "Error ~A [~A]" e name)))
178                        (if (timeout listener)
179                            (with-timeout ((timeout listener))
180                              (do-work))
181                            (do-work)))
182                 (progn
183                   (errorset (finish-output connection) nil)
184                   (errorset (close-active-socket connection) nil)
185                   (cmsg-c :threads "~A ended" name)
186                   (setf (workers listener)
187                         (remove self (workers listener)))))))))
188
189 (defun accept-and-check-tcp-connection (listener)
190   (multiple-value-bind (conn socket) (accept-tcp-connection (socket listener))
191     (when (and (remote-host-checker listener)
192                (not (funcall (remote-host-checker listener)
193                              (remote-host socket))))
194       (cmsg-c :thread "Deny connection from ~A" (remote-host conn))
195       (errorset (close-active-socket conn) nil)
196       (setq conn nil))
197     conn))
198
199 (defun start-socket-server (listener)
200   (unwind-protect
201       (loop 
202        (let ((connection (accept-and-check-tcp-connection listener)))
203          (when connection
204            (if (wait listener)
205                (unwind-protect
206                     (apply (listener-function listener)
207                            connection
208                            (function-args listener))
209                  (progn
210                    (errorset (finish-output connection) nil)
211                    (errorset (close-active-socket connection) nil)))
212                (let ((worker (make-instance 'worker :listener listener
213                                             :connection connection
214                                             :name (next-worker-name
215                                                    (base-name listener)))))
216                  (setf (process worker)
217                        (make-process (name worker) (thread-fun worker)))
218                  (push worker (workers listener)))))))
219     (errorset (close-passive-socket (socket listener)) nil)))
220
221 #+lispworks
222 (defun lw-worker (handle listener)
223   (let ((connection (make-instance 'comm:socket-stream
224                       :socket handle
225                       :direction :io
226                       :element-type 'base-char)))
227     (if (wait listener)
228         (progn
229           (apply (listener-function listener)
230                  connection
231                  (function-args listener))
232           (finish-output connection))
233         (let ((worker (make-instance 'worker :listener listener
234                                      :connection connection
235                                      :name (next-worker-name
236                                             (base-name listener)))))
237           (setf (process worker)
238                 (make-process (name worker) (thread-fun worker)))
239           (push worker (workers listener))))))
240
241 ;; Fixed pool of workers
242
243 (defun start-fixed-number-of-workers (listener)
244   (dotimes (i (number-fixed-workers listener))
245     (let ((name (next-worker-name (base-name listener))))
246       (push
247        (make-instance 'fixed-worker
248                       :name name
249                       :listener listener
250                       :process
251                       (make-process
252                        name #'(lambda () (fixed-worker name listener))))
253        (workers listener)))))
254
255
256 (defun fixed-worker (name listener)
257   (loop 
258    (let ((connection (accept-and-check-tcp-connection listener)))
259      (when connection
260        (flet ((do-work ()
261                 (apply (listener-function listener)
262                        connection
263                        (function-args listener))))
264          (unwind-protect
265               (handler-case
266                   (if (catch-errors listener)
267                       (handler-case
268                           (if (timeout listener)
269                               (with-timeout ((timeout listener))
270                                 (do-work))
271                               (do-work))
272                         (error (e)
273                           (cmsg "Error ~A [~A]" e name)))
274                       (if (timeout listener)
275                           (with-timeout ((timeout listener))
276                             (do-work))
277                           (do-work)))
278                 (error (e)
279                   (format t "Error: ~A" e)))
280            (errorset (finish-output connection) nil)
281            (errorset (close connection) nil)))))))
282