Ignore null cdata
[kmrcl.git] / listener.lisp
1 ;;;; -*- Mode: Lisp; Syntax: ANSI-Common-Lisp; Base: 10 -*-
2 ;;;; *************************************************************************
3 ;;;; FILE IDENTIFICATION
4 ;;;;
5 ;;;; Name:          listener.lisp
6 ;;;; Purpose:       Listener and worker processes
7 ;;;; Programmer:    Kevin M. Rosenberg
8 ;;;; Date Started:  Jun 2003
9 ;;;;
10 ;;;; This file, part of KMRCL, is Copyright (c) 2002-2003 by Kevin M. Rosenberg
11 ;;;;
12 ;;;; KMRCL users are granted the rights to distribute and use this software
13 ;;;; as governed by the terms of the Lisp Lesser GNU Public License
14 ;;;; (http://opensource.franz.com/preamble.html), also known as the LLGPL.
15 ;;;; *************************************************************************
16
17 (in-package #:kmrcl)
18
19 ;;; Variables and data structures for Listener
20
21 (defvar *listener-count* 0
22   "used to name listeners")
23
24 (defvar *worker-count* 0
25   "used to name workers")
26
27 (defvar *active-listeners* nil
28     "List of active listeners")
29
30 (defclass listener ()
31   ((port :initarg :port :accessor port)
32    (function :initarg :function :accessor listener-function
33              :initform nil)
34    (function-args :initarg :function-args :accessor function-args
35                   :initform nil)
36    (process :initarg :process :accessor process :initform nil)
37    (socket :initarg :socket :accessor socket :initform nil)
38    (workers :initform nil :accessor workers
39             :documentation "list of worker threads")
40    (name :initform "" :accessor name :initarg :name)
41    (base-name :initform "listener" :accessor base-name :initarg :base-name)
42    (wait :initform nil :accessor wait :initarg :wait)
43    (timeout :initform nil :accessor timeout :initarg :timeout)
44    (number-fixed-workers :initform nil :accessor number-fixed-workers
45                          :initarg :number-fixed-workers)
46    (catch-errors :initform nil :accessor catch-errors :initarg :catch-errors)
47    (remote-host-checker :initform nil :accessor remote-host-checker
48                         :initarg :remote-host-checker)
49    (format :initform :text :accessor listener-format :initarg :format)))
50
51 (defclass fixed-worker ()
52   ((listener :initarg :listener :accessor listener :initform nil)
53    (name :initarg :name :accessor name :initform nil)
54    (process :initarg :process :accessor process :initform nil)))
55
56 (defclass worker (fixed-worker)
57   ((connection :initarg :connection :accessor connection :initform nil)
58    (thread-fun :initarg :thread-fun :accessor thread-fun :initform nil)))
59
60
61 (defmethod print-object ((obj listener) s)
62   (print-unreadable-object (obj s :type t :identity nil)
63     (format s "port ~A" (port obj))))
64
65 (defmethod print-object ((obj fixed-worker) s)
66   (print-unreadable-object (obj s :type t :identity nil)
67     (format s "port ~A" (port (listener obj)))))
68
69 ;; High-level API
70
71 (defun init/listener (listener state)
72   (check-type listener listener)
73   (case state
74     (:start
75      (when (member listener *active-listeners*)
76        (cmsg "~&listener ~A already initialized" listener)
77        (return-from init/listener))
78      (when (listener-startup listener)
79        (push listener *active-listeners*)
80        listener))
81     (:stop
82      (unless (member listener *active-listeners*)
83        (cmsg "~&listener ~A is not in active list" listener)
84        (return-from init/listener listener))
85      (listener-shutdown listener)
86      (setq *active-listeners* (remove listener *active-listeners*)))
87     (:restart
88      (init/listener listener :stop)
89      (init/listener listener :start))))
90
91 (defun stop-all/listener ()
92   (dolist (listener *active-listeners*)
93     (ignore-errors
94        (init/listener listener :stop))))
95
96 (defun listener-startup (listener)
97   (handler-case
98       (progn
99         (setf (name listener) (next-server-name (base-name listener)))
100         (make-socket-server listener))
101     (error (e)
102       (format t "~&Error while trying to start listener on port ~A~&  ~A"
103               (port listener) e)
104       (decf *listener-count*)
105       nil)
106     (:no-error (res)
107       (declare (ignore res))
108       listener)))
109
110 (defun listener-shutdown (listener)
111   (dolist (worker (workers listener))
112     (when (and (typep worker 'worker)
113                (connection worker))
114       (errorset (close-active-socket
115                  (connection worker)) nil)
116       (setf (connection worker) nil))
117     (when (process worker)
118       (errorset (destroy-process (process worker)) nil)
119       (setf (process worker) nil)))
120   (setf (workers listener) nil)
121   (with-slots (process socket) listener
122     (when socket
123       (errorset (close-passive-socket socket) nil)
124       (setf socket nil))
125     (when process
126       (errorset (destroy-process process) nil)
127       (setf process nil))))
128
129 ;; Low-level functions
130
131 (defun next-server-name (base-name)
132   (format nil "~D-~A-socket-server" (incf *listener-count*) base-name))
133
134 (defun next-worker-name (base-name)
135   (format nil "~D-~A-worker"  (incf *worker-count*) base-name))
136
137 (defun make-socket-server (listener)
138   #+lispworks
139   (progn
140     (setf (process listener)
141       (comm:start-up-server :process-name (name listener)
142                             :service (port listener)
143                             :function
144                             #'(lambda (handle)
145                                 (lw-worker handle listener)))))
146   #-lispworks
147   (progn
148     (setf (socket listener) (create-inet-listener
149                              (port listener)
150                              :format (listener-format listener)))
151     (if (number-fixed-workers listener)
152         (start-fixed-number-of-workers listener)
153         (setf (process listener) (make-process
154                                   (name listener)
155                                   #'(lambda ()
156                                       (start-socket-server listener))))))
157   listener)
158
159
160 (defmethod initialize-instance :after
161     ((self worker) &key listener connection name &allow-other-keys)
162   (flet ((do-work ()
163            (apply (listener-function listener)
164                   connection
165                   (function-args listener))))
166     (unless connection
167       (error "connection not provided to modlisp-worker"))
168     (setf (slot-value self 'listener) listener)
169     (setf (slot-value self 'name) name)
170     (setf (slot-value self 'connection) connection)
171     (setf (slot-value self 'thread-fun)
172           #'(lambda ()
173               (unwind-protect
174                    (if (catch-errors listener)
175                        (handler-case
176                            (if (timeout listener)
177                                (with-timeout ((timeout listener))
178                                  (do-work))
179                                (do-work))
180                          (error (e)
181                            (cmsg "Error ~A [~A]" e name)))
182                        (if (timeout listener)
183                            (with-timeout ((timeout listener))
184                              (do-work))
185                            (do-work)))
186                 (progn
187                   (errorset (finish-output connection) nil)
188                   (errorset (close-active-socket connection) nil)
189                   (cmsg-c :threads "~A ended" name)
190                   (setf (workers listener)
191                         (remove self (workers listener)))))))))
192
193 (defun accept-and-check-tcp-connection (listener)
194   (multiple-value-bind (conn socket) (accept-tcp-connection (socket listener))
195     (when (and (remote-host-checker listener)
196                (not (funcall (remote-host-checker listener)
197                              (remote-host socket))))
198       (cmsg-c :thread "Deny connection from ~A" (remote-host conn))
199       (errorset (close-active-socket conn) nil)
200       (setq conn nil))
201     conn))
202
203 (defun start-socket-server (listener)
204   (unwind-protect
205       (loop
206        (let ((connection (accept-and-check-tcp-connection listener)))
207          (when connection
208            (if (wait listener)
209                (unwind-protect
210                     (apply (listener-function listener)
211                            connection
212                            (function-args listener))
213                  (progn
214                    (errorset (finish-output connection) nil)
215                    (errorset (close-active-socket connection) nil)))
216                (let ((worker (make-instance 'worker :listener listener
217                                             :connection connection
218                                             :name (next-worker-name
219                                                    (base-name listener)))))
220                  (setf (process worker)
221                        (make-process (name worker) (thread-fun worker)))
222                  (push worker (workers listener)))))))
223     (errorset (close-passive-socket (socket listener)) nil)))
224
225 #+lispworks
226 (defun lw-worker (handle listener)
227   (let ((connection (make-instance 'comm:socket-stream
228                       :socket handle
229                       :direction :io
230                       :element-type 'base-char)))
231     (if (wait listener)
232         (progn
233           (apply (listener-function listener)
234                  connection
235                  (function-args listener))
236           (finish-output connection))
237         (let ((worker (make-instance 'worker :listener listener
238                                      :connection connection
239                                      :name (next-worker-name
240                                             (base-name listener)))))
241           (setf (process worker)
242                 (make-process (name worker) (thread-fun worker)))
243           (push worker (workers listener))))))
244
245 ;; Fixed pool of workers
246
247 (defun start-fixed-number-of-workers (listener)
248   (dotimes (i (number-fixed-workers listener))
249     (let ((name (next-worker-name (base-name listener))))
250       (push
251        (make-instance 'fixed-worker
252                       :name name
253                       :listener listener
254                       :process
255                       (make-process
256                        name #'(lambda () (fixed-worker name listener))))
257        (workers listener)))))
258
259
260 (defun fixed-worker (name listener)
261   (loop
262    (let ((connection (accept-and-check-tcp-connection listener)))
263      (when connection
264        (flet ((do-work ()
265                 (apply (listener-function listener)
266                        connection
267                        (function-args listener))))
268          (unwind-protect
269               (handler-case
270                   (if (catch-errors listener)
271                       (handler-case
272                           (if (timeout listener)
273                               (with-timeout ((timeout listener))
274                                 (do-work))
275                               (do-work))
276                         (error (e)
277                           (cmsg "Error ~A [~A]" e name)))
278                       (if (timeout listener)
279                           (with-timeout ((timeout listener))
280                             (do-work))
281                           (do-work)))
282                 (error (e)
283                   (format t "Error: ~A" e)))
284            (errorset (finish-output connection) nil)
285            (errorset (close connection) nil)))))))
286