r5284: *** empty log message ***
[kmrcl.git] / listener.lisp
1 ;;;; -*- Mode: Lisp; Syntax: ANSI-Common-Lisp; Base: 10; Package: modlisp -*-
2 ;;;; *************************************************************************
3 ;;;; FILE IDENTIFICATION
4 ;;;;
5 ;;;; Name:          listener.lisp
6 ;;;; Purpose:       Listener and worker processes
7 ;;;; Programmer:    Kevin M. Rosenberg
8 ;;;; Date Started:  Jun 2003
9 ;;;;
10 ;;;; $Id: listener.lisp,v 1.5 2003/07/11 06:58:32 kevin Exp $
11 ;;;; *************************************************************************
12
13 (in-package #:kmrcl)
14
15 ;;; Variables and data structures for Listener
16
17 (defvar *listener-count* 0
18   "used to name listeners")
19
20 (defvar *worker-count* 0
21   "used to name workers")
22
23 (defvar *active-listeners* nil
24     "List of active listeners")
25
26 (defclass listener ()
27   ((port :initarg :port :accessor port) 
28    (function :initarg :function :accessor listener-function
29              :initform nil)
30    (function-args :initarg :function-args :accessor function-args
31                   :initform nil)
32    (process :initarg :process :accessor process :initform nil)
33    (socket :initarg :socket :accessor socket :initform nil)
34    (workers :initform nil :accessor workers
35             :documentation "list of worker threads")
36    (name :initform "" :accessor name :initarg :name)
37    (base-name :initform "listener" :accessor base-name :initarg :base-name)
38    (wait :initform nil :accessor wait :initarg :wait)
39    (timeout :initform nil :accessor timeout :initarg :timeout)
40    (number-fixed-workers :initform nil :accessor number-fixed-workers
41                          :initarg :number-fixed-workers)
42    (catch-errors :initform nil :accessor catch-errors :initarg :catch-errors)
43    (remote-host-checker :initform nil :accessor remote-host-checker
44                         :initarg :remote-host-checker)
45    (format :initform :text :accessor listener-format :initarg :format)))
46
47 (defclass fixed-worker ()
48   ((listener :initarg :listener :accessor listener :initform nil)
49    (name :initarg :name :accessor name :initform nil)
50    (process :initarg :process :accessor process :initform nil)))
51
52 (defclass worker (fixed-worker)
53   ((connection :initarg :connection :accessor connection :initform nil)
54    (thread-fun :initarg :thread-fun :accessor thread-fun :initform nil)))
55
56
57 (defmethod print-object ((obj listener) s)
58   (print-unreadable-object (obj s :type t :identity nil)
59     (format s "port ~A" (port obj))))
60
61 (defmethod print-object ((obj fixed-worker) s)
62   (print-unreadable-object (obj s :type t :identity nil)
63     (format s "port ~A" (port (listener obj)))))
64   
65 ;; High-level API
66
67 (defun init/listener (listener state)
68   (check-type listener listener)
69   (case state
70     (:start
71      (when (member listener *active-listeners*)
72        (warn "~&listener already started")
73        (return-from init/listener listener))
74      (handler-case
75          (progn
76            (setf (name listener) (next-server-name (base-name listener)))
77            (make-socket-server listener))
78        (error (e)
79          (format t "~&Error while trying to start listener~&  ~A" e)
80          (decf *listener-count*)
81          nil)
82        (:no-error (res)
83          (declare (ignore res))
84          (push listener *active-listeners*)
85          listener)))
86     (:stop
87      (unless (member listener *active-listeners*)
88        (warn "~&listener is not in active list")
89        (return-from init/listener listener))
90      (dolist (worker (workers listener))
91        (when (and (typep worker 'worker)
92                   (connection worker))
93          (errorset (close-active-socket
94                     (connection worker)) nil)
95          (setf (connection worker) nil))
96        (when (process worker)
97          (errorset (destroy-process (process worker)) nil)
98          (setf (process worker) nil)))
99      (setf (workers listener) nil)
100      (with-slots (process socket) listener
101        (when socket
102          (errorset (close-passive-socket socket) nil)
103          (setf socket nil))
104        (when process
105          (errorset (destroy-process process) nil)
106          (setf process nil)))
107      (setq *active-listeners* (remove listener *active-listeners*)))
108     (:restart
109      (init/listener listener :stop)
110      (init/listener listener :start))))
111
112 (defun stop-all/listener ()
113   (dolist (listener *active-listeners*)
114     (ignore-errors
115        (init/listener listener :stop))))
116
117 ;; Low-level functions
118
119 (defun next-server-name (base-name)
120   (format nil "~A-socket-server-~D" base-name (incf *listener-count*))) 
121
122 (defun next-worker-name (base-name)
123   (format nil "~A-worker-~D" base-name (incf *worker-count*)))
124
125 (defun make-socket-server (listener)
126   #+lispworks
127   (progn
128     (setf (process listener)
129       (comm:start-up-server :process-name (name listener)
130                             :service (port listener) 
131                             :function
132                             #'(lambda (handle) 
133                                 (lw-worker handle listener)))))
134   #-lispworks
135   (progn
136     (setf (socket listener) (create-inet-listener
137                              (port listener)
138                              :format (listener-format listener)))
139     (if (number-fixed-workers listener)
140         (start-fixed-number-of-workers listener)
141         (setf (process listener) (make-process
142                                   (name listener)
143                                   #'(lambda ()
144                                       (start-socket-server listener))))))
145   listener)
146
147
148 (defmethod initialize-instance :after
149     ((self worker) &key listener connection name &allow-other-keys)
150   (flet ((do-work ()
151            (apply (listener-function listener)
152                   connection
153                   (function-args listener))))
154     (unless connection
155       (error "connection not provided to modlisp-worker"))
156     (setf (slot-value self 'listener) listener)
157     (setf (slot-value self 'name) name)
158     (setf (slot-value self 'connection) connection)
159     (setf (slot-value self 'thread-fun)
160           #'(lambda ()
161               (unwind-protect
162                    (if (catch-errors listener)
163                        (handler-case
164                            (if (timeout listener)
165                                (with-timeout ((timeout listener))
166                                  (do-work))
167                                (do-work))
168                          (error (e)
169                            (cmsg "Error ~A [~A]" e name)))
170                        (if (timeout listener)
171                            (with-timeout ((timeout listener))
172                              (do-work))
173                            (do-work)))
174                 (progn
175                   (errorset (finish-output connection) nil)
176                   (errorset (close-active-socket connection) nil)
177                   (cmsg-c :threads "~A ended" name)
178                   (setf (workers listener)
179                         (remove self (workers listener)))))))))
180
181 (defun accept-and-check-tcp-connection (listener)
182   (multiple-value-bind (conn socket) (accept-tcp-connection (socket listener))
183     (when (and (remote-host-checker listener)
184                (not (funcall (remote-host-checker listener)
185                              (remote-host socket))))
186       (cmsg-c :thread "Deny connection from ~A" (remote-host conn))
187       (errorset (close-active-socket conn) nil)
188       (setq conn nil))
189     conn))
190
191 (defun start-socket-server (listener)
192   (unwind-protect
193       (loop 
194        (let ((connection (accept-and-check-tcp-connection listener)))
195          (when connection
196            (if (wait listener)
197                (unwind-protect
198                     (apply (listener-function listener)
199                            connection
200                            (function-args listener))
201                  (progn
202                    (errorset (finish-output connection) nil)
203                    (errorset (close-active-socket connection) nil)))
204                (let ((worker (make-instance 'worker :listener listener
205                                             :connection connection
206                                             :name (next-worker-name
207                                                    (base-name listener)))))
208                  (setf (process worker)
209                        (make-process (name worker) (thread-fun worker)))
210                  (push worker (workers listener)))))))
211     (errorset (close-passive-socket (socket listener)) nil)))
212
213 #+lispworks
214 (defun lw-worker (handle listener)
215   (let ((connection (make-instance 'comm:socket-stream
216                       :socket handle
217                       :direction :io
218                       :element-type 'base-char)))
219     (if (wait listener)
220         (progn
221           (apply (listener-function listener)
222                  connection
223                  (function-args listener))
224           (finish-output connection))
225         (let ((worker (make-instance 'worker :listener listener
226                                      :connection connection
227                                      :name (next-worker-name
228                                             (base-name listener)))))
229           (setf (process worker)
230                 (make-process (name worker) (thread-fun worker)))
231           (push worker (workers listener))))))
232
233 ;; Fixed pool of workers
234
235 (defun start-fixed-number-of-workers (listener)
236   (dotimes (i (number-fixed-workers listener))
237     (let ((name (next-worker-name (base-name listener))))
238       (push
239        (make-instance 'fixed-worker
240                       :name name
241                       :listener listener
242                       :process
243                       (make-process
244                        name #'(lambda () (fixed-worker name listener))))
245        (workers listener)))))
246
247
248 (defun fixed-worker (name listener)
249   (loop 
250    (let ((connection (accept-and-check-tcp-connection listener)))
251      (when connection
252        (flet ((do-work ()
253                 (apply (listener-function listener)
254                        connection
255                        (function-args listener))))
256          (unwind-protect
257               (handler-case
258                   (if (catch-errors listener)
259                       (handler-case
260                           (if (timeout listener)
261                               (with-timeout ((timeout listener))
262                                 (do-work))
263                               (do-work))
264                         (error (e)
265                           (cmsg "Error ~A [~A]" e name)))
266                       (if (timeout listener)
267                           (with-timeout ((timeout listener))
268                             (do-work))
269                           (do-work)))
270                 (error (e)
271                   (format t "Error: ~A" e)))
272            (errorset (finish-output connection) nil)
273            (errorset (close connection) nil)))))))
274