New upstream and debian-specific file updates
[kmrcl.git] / listener.lisp
1 ;;;; -*- Mode: Lisp; Syntax: ANSI-Common-Lisp; Base: 10; Package: modlisp -*-
2 ;;;; *************************************************************************
3 ;;;; FILE IDENTIFICATION
4 ;;;;
5 ;;;; Name:          listener.lisp
6 ;;;; Purpose:       Listener and worker processes
7 ;;;; Programmer:    Kevin M. Rosenberg
8 ;;;; Date Started:  Jun 2003
9 ;;;;
10 ;;;; $Id$
11 ;;;;
12 ;;;; This file, part of KMRCL, is Copyright (c) 2002-2003 by Kevin M. Rosenberg
13 ;;;;
14 ;;;; KMRCL users are granted the rights to distribute and use this software
15 ;;;; as governed by the terms of the Lisp Lesser GNU Public License
16 ;;;; (http://opensource.franz.com/preamble.html), also known as the LLGPL.
17 ;;;; *************************************************************************
18
19 (in-package #:kmrcl)
20
21 ;;; Variables and data structures for Listener
22
23 (defvar *listener-count* 0
24   "used to name listeners")
25
26 (defvar *worker-count* 0
27   "used to name workers")
28
29 (defvar *active-listeners* nil
30     "List of active listeners")
31
32 (defclass listener ()
33   ((port :initarg :port :accessor port)
34    (function :initarg :function :accessor listener-function
35              :initform nil)
36    (function-args :initarg :function-args :accessor function-args
37                   :initform nil)
38    (process :initarg :process :accessor process :initform nil)
39    (socket :initarg :socket :accessor socket :initform nil)
40    (workers :initform nil :accessor workers
41             :documentation "list of worker threads")
42    (name :initform "" :accessor name :initarg :name)
43    (base-name :initform "listener" :accessor base-name :initarg :base-name)
44    (wait :initform nil :accessor wait :initarg :wait)
45    (timeout :initform nil :accessor timeout :initarg :timeout)
46    (number-fixed-workers :initform nil :accessor number-fixed-workers
47                          :initarg :number-fixed-workers)
48    (catch-errors :initform nil :accessor catch-errors :initarg :catch-errors)
49    (remote-host-checker :initform nil :accessor remote-host-checker
50                         :initarg :remote-host-checker)
51    (format :initform :text :accessor listener-format :initarg :format)))
52
53 (defclass fixed-worker ()
54   ((listener :initarg :listener :accessor listener :initform nil)
55    (name :initarg :name :accessor name :initform nil)
56    (process :initarg :process :accessor process :initform nil)))
57
58 (defclass worker (fixed-worker)
59   ((connection :initarg :connection :accessor connection :initform nil)
60    (thread-fun :initarg :thread-fun :accessor thread-fun :initform nil)))
61
62
63 (defmethod print-object ((obj listener) s)
64   (print-unreadable-object (obj s :type t :identity nil)
65     (format s "port ~A" (port obj))))
66
67 (defmethod print-object ((obj fixed-worker) s)
68   (print-unreadable-object (obj s :type t :identity nil)
69     (format s "port ~A" (port (listener obj)))))
70
71 ;; High-level API
72
73 (defun init/listener (listener state)
74   (check-type listener listener)
75   (case state
76     (:start
77      (when (member listener *active-listeners*)
78        (cmsg "~&listener ~A already initialized" listener)
79        (return-from init/listener))
80      (when (listener-startup listener)
81        (push listener *active-listeners*)
82        listener))
83     (:stop
84      (unless (member listener *active-listeners*)
85        (cmsg "~&listener ~A is not in active list" listener)
86        (return-from init/listener listener))
87      (listener-shutdown listener)
88      (setq *active-listeners* (remove listener *active-listeners*)))
89     (:restart
90      (init/listener listener :stop)
91      (init/listener listener :start))))
92
93 (defun stop-all/listener ()
94   (dolist (listener *active-listeners*)
95     (ignore-errors
96        (init/listener listener :stop))))
97
98 (defun listener-startup (listener)
99   (handler-case
100       (progn
101         (setf (name listener) (next-server-name (base-name listener)))
102         (make-socket-server listener))
103     (error (e)
104       (format t "~&Error while trying to start listener on port ~A~&  ~A"
105               (port listener) e)
106       (decf *listener-count*)
107       nil)
108     (:no-error (res)
109       (declare (ignore res))
110       listener)))
111
112 (defun listener-shutdown (listener)
113   (dolist (worker (workers listener))
114     (when (and (typep worker 'worker)
115                (connection worker))
116       (errorset (close-active-socket
117                  (connection worker)) nil)
118       (setf (connection worker) nil))
119     (when (process worker)
120       (errorset (destroy-process (process worker)) nil)
121       (setf (process worker) nil)))
122   (setf (workers listener) nil)
123   (with-slots (process socket) listener
124     (when socket
125       (errorset (close-passive-socket socket) nil)
126       (setf socket nil))
127     (when process
128       (errorset (destroy-process process) nil)
129       (setf process nil))))
130
131 ;; Low-level functions
132
133 (defun next-server-name (base-name)
134   (format nil "~D-~A-socket-server" (incf *listener-count*) base-name))
135
136 (defun next-worker-name (base-name)
137   (format nil "~D-~A-worker"  (incf *worker-count*) base-name))
138
139 (defun make-socket-server (listener)
140   #+lispworks
141   (progn
142     (setf (process listener)
143       (comm:start-up-server :process-name (name listener)
144                             :service (port listener)
145                             :function
146                             #'(lambda (handle)
147                                 (lw-worker handle listener)))))
148   #-lispworks
149   (progn
150     (setf (socket listener) (create-inet-listener
151                              (port listener)
152                              :format (listener-format listener)))
153     (if (number-fixed-workers listener)
154         (start-fixed-number-of-workers listener)
155         (setf (process listener) (make-process
156                                   (name listener)
157                                   #'(lambda ()
158                                       (start-socket-server listener))))))
159   listener)
160
161
162 (defmethod initialize-instance :after
163     ((self worker) &key listener connection name &allow-other-keys)
164   (flet ((do-work ()
165            (apply (listener-function listener)
166                   connection
167                   (function-args listener))))
168     (unless connection
169       (error "connection not provided to modlisp-worker"))
170     (setf (slot-value self 'listener) listener)
171     (setf (slot-value self 'name) name)
172     (setf (slot-value self 'connection) connection)
173     (setf (slot-value self 'thread-fun)
174           #'(lambda ()
175               (unwind-protect
176                    (if (catch-errors listener)
177                        (handler-case
178                            (if (timeout listener)
179                                (with-timeout ((timeout listener))
180                                  (do-work))
181                                (do-work))
182                          (error (e)
183                            (cmsg "Error ~A [~A]" e name)))
184                        (if (timeout listener)
185                            (with-timeout ((timeout listener))
186                              (do-work))
187                            (do-work)))
188                 (progn
189                   (errorset (finish-output connection) nil)
190                   (errorset (close-active-socket connection) nil)
191                   (cmsg-c :threads "~A ended" name)
192                   (setf (workers listener)
193                         (remove self (workers listener)))))))))
194
195 (defun accept-and-check-tcp-connection (listener)
196   (multiple-value-bind (conn socket) (accept-tcp-connection (socket listener))
197     (when (and (remote-host-checker listener)
198                (not (funcall (remote-host-checker listener)
199                              (remote-host socket))))
200       (cmsg-c :thread "Deny connection from ~A" (remote-host conn))
201       (errorset (close-active-socket conn) nil)
202       (setq conn nil))
203     conn))
204
205 (defun start-socket-server (listener)
206   (unwind-protect
207       (loop
208        (let ((connection (accept-and-check-tcp-connection listener)))
209          (when connection
210            (if (wait listener)
211                (unwind-protect
212                     (apply (listener-function listener)
213                            connection
214                            (function-args listener))
215                  (progn
216                    (errorset (finish-output connection) nil)
217                    (errorset (close-active-socket connection) nil)))
218                (let ((worker (make-instance 'worker :listener listener
219                                             :connection connection
220                                             :name (next-worker-name
221                                                    (base-name listener)))))
222                  (setf (process worker)
223                        (make-process (name worker) (thread-fun worker)))
224                  (push worker (workers listener)))))))
225     (errorset (close-passive-socket (socket listener)) nil)))
226
227 #+lispworks
228 (defun lw-worker (handle listener)
229   (let ((connection (make-instance 'comm:socket-stream
230                       :socket handle
231                       :direction :io
232                       :element-type 'base-char)))
233     (if (wait listener)
234         (progn
235           (apply (listener-function listener)
236                  connection
237                  (function-args listener))
238           (finish-output connection))
239         (let ((worker (make-instance 'worker :listener listener
240                                      :connection connection
241                                      :name (next-worker-name
242                                             (base-name listener)))))
243           (setf (process worker)
244                 (make-process (name worker) (thread-fun worker)))
245           (push worker (workers listener))))))
246
247 ;; Fixed pool of workers
248
249 (defun start-fixed-number-of-workers (listener)
250   (dotimes (i (number-fixed-workers listener))
251     (let ((name (next-worker-name (base-name listener))))
252       (push
253        (make-instance 'fixed-worker
254                       :name name
255                       :listener listener
256                       :process
257                       (make-process
258                        name #'(lambda () (fixed-worker name listener))))
259        (workers listener)))))
260
261
262 (defun fixed-worker (name listener)
263   (loop
264    (let ((connection (accept-and-check-tcp-connection listener)))
265      (when connection
266        (flet ((do-work ()
267                 (apply (listener-function listener)
268                        connection
269                        (function-args listener))))
270          (unwind-protect
271               (handler-case
272                   (if (catch-errors listener)
273                       (handler-case
274                           (if (timeout listener)
275                               (with-timeout ((timeout listener))
276                                 (do-work))
277                               (do-work))
278                         (error (e)
279                           (cmsg "Error ~A [~A]" e name)))
280                       (if (timeout listener)
281                           (with-timeout ((timeout listener))
282                             (do-work))
283                           (do-work)))
284                 (error (e)
285                   (format t "Error: ~A" e)))
286            (errorset (finish-output connection) nil)
287            (errorset (close connection) nil)))))))
288