r5277: *** empty log message ***
[kmrcl.git] / listener.lisp
1 ;;;; -*- Mode: Lisp; Syntax: ANSI-Common-Lisp; Base: 10; Package: modlisp -*-
2 ;;;; *************************************************************************
3 ;;;; FILE IDENTIFICATION
4 ;;;;
5 ;;;; Name:          listener.lisp
6 ;;;; Purpose:       Listener and worker processes
7 ;;;; Programmer:    Kevin M. Rosenberg
8 ;;;; Date Started:  Jun 2003
9 ;;;;
10 ;;;; $Id: listener.lisp,v 1.4 2003/07/11 02:37:33 kevin Exp $
11 ;;;; *************************************************************************
12
13 (in-package #:kmrcl)
14
15 ;;; Variables and data structures for Listener
16
17 (defvar *listener-count* 0
18   "used to name listeners")
19
20 (defvar *worker-count* 0
21   "used to name workers")
22
23 (defvar *active-listeners* nil
24     "List of active listeners")
25
26 (defclass listener ()
27   ((port :initarg :port :accessor port) 
28    (function :initarg :function :accessor listener-function
29              :initform nil)
30    (function-args :initarg :function-args :accessor function-args
31                   :initform nil)
32    (process :initarg :process :accessor process :initform nil)
33    (socket :initarg :socket :accessor socket :initform nil)
34    (workers :initform nil :accessor workers
35             :documentation "list of worker threads")
36    (name :initform "" :accessor name :initarg :name)
37    (base-name :initform "listener" :accessor base-name :initarg :base-name)
38    (wait :initform nil :accessor wait :initarg :wait)
39    (timeout :initform nil :accessor timeout :initarg :timeout)
40    (number-fixed-workers :initform nil :accessor number-fixed-workers
41                          :initarg :number-fixed-workers)
42    (catch-errors :initform nil :accessor catch-errors :initarg :catch-errors)
43    (format :initform :text :accessor listener-format :initarg :format)))
44
45 (defclass fixed-worker ()
46   ((listener :initarg :listener :accessor listener :initform nil)
47    (name :initarg :name :accessor name :initform nil)
48    (process :initarg :process :accessor process :initform nil)))
49
50 (defclass worker (fixed-worker)
51   ((connection :initarg :connection :accessor connection :initform nil)
52    (thread-fun :initarg :thread-fun :accessor thread-fun :initform nil)))
53
54
55 (defmethod print-object ((obj listener) s)
56   (print-unreadable-object (obj s :type t :identity nil)
57     (format s "port ~A" (port obj))))
58
59 (defmethod print-object ((obj fixed-worker) s)
60   (print-unreadable-object (obj s :type t :identity nil)
61     (format s "port ~A" (port (listener obj)))))
62   
63 ;; High-level API
64
65 (defun init/listener (listener state)
66   (check-type listener listener)
67   (case state
68     (:start
69      (when (member listener *active-listeners*)
70        (warn "~&listener already started")
71        (return-from init/listener listener))
72      (handler-case
73          (progn
74            (setf (name listener) (next-server-name (base-name listener)))
75            (make-socket-server listener))
76        (error (e)
77          (format t "~&Error while trying to start listener~&  ~A" e)
78          (decf *listener-count*)
79          nil)
80        (:no-error (res)
81          (declare (ignore res))
82          (push listener *active-listeners*)
83          listener)))
84     (:stop
85      (unless (member listener *active-listeners*)
86        (warn "~&listener is not in active list")
87        (return-from init/listener listener))
88      (dolist (worker (workers listener))
89        (when (and (typep worker 'worker)
90                   (connection worker))
91          (errorset (close-active-socket
92                     (connection worker)) nil)
93          (setf (connection worker) nil))
94        (when (process worker)
95          (errorset (destroy-process (process worker)) nil)
96          (setf (process worker) nil)))
97      (setf (workers listener) nil)
98      (with-slots (process socket) listener
99        (when socket
100          (errorset (close-passive-socket socket) nil)
101          (setf socket nil))
102        (when process
103          (errorset (destroy-process process) nil)
104          (setf process nil)))
105      (setq *active-listeners* (remove listener *active-listeners*)))
106     (:restart
107      (init/listener listener :stop)
108      (init/listener listener :start))))
109
110 (defun stop-all/listener ()
111   (dolist (listener *active-listeners*)
112     (ignore-errors
113        (init/listener listener :stop))))
114
115 ;; Low-level functions
116
117 (defun next-server-name (base-name)
118   (format nil "~A-socket-server-~D" base-name (incf *listener-count*))) 
119
120 (defun next-worker-name (base-name)
121   (format nil "~A-worker-~D" base-name (incf *worker-count*)))
122
123 (defun make-socket-server (listener)
124   #+lispworks
125   (progn
126     (setf (process listener)
127       (comm:start-up-server :process-name (name listener)
128                             :service (port listener) 
129                             :function
130                             #'(lambda (handle) 
131                                 (lw-worker handle listener)))))
132   #-lispworks
133   (progn
134     (setf (socket listener) (create-inet-listener
135                              (port listener)
136                              :format (listener-format listener)))
137     (if (number-fixed-workers listener)
138         (start-fixed-number-of-workers listener)
139         (setf (process listener) (make-process
140                                   (name listener)
141                                   #'(lambda ()
142                                       (start-socket-server listener))))))
143   listener)
144
145
146 (defmethod initialize-instance :after
147     ((self worker) &key listener connection name &allow-other-keys)
148   (flet ((do-work ()
149            (apply (listener-function listener)
150                   connection
151                   (function-args listener))))
152     (unless connection
153       (error "connection not provided to modlisp-worker"))
154     (setf (slot-value self 'listener) listener)
155     (setf (slot-value self 'name) name)
156     (setf (slot-value self 'connection) connection)
157     (setf (slot-value self 'thread-fun)
158           #'(lambda ()
159               (unwind-protect
160                    (if (catch-errors listener)
161                        (handler-case
162                            (if (timeout listener)
163                                (with-timeout ((timeout listener))
164                                  (do-work))
165                                (do-work))
166                          (error (e)
167                            (cmsg "Error ~A [~A]" e name)))
168                        (if (timeout listener)
169                            (with-timeout ((timeout listener))
170                              (do-work))
171                            (do-work)))
172                 (progn
173                   (errorset (finish-output connection) nil)
174                   (errorset (close-active-socket connection) nil)
175                   (cmsg-c :threads "~A ended" name)
176                   (setf (workers listener)
177                         (remove self (workers listener)))))))))
178
179 (defun start-socket-server (listener)
180   (unwind-protect
181       (loop 
182        (let ((connection (accept-tcp-connection (socket listener))))
183          (if (wait listener)
184              (unwind-protect
185                   (apply (listener-function listener)
186                          connection
187                          (function-args listener))
188                (progn
189                  (errorset (finish-output connection) nil)
190                  (errorset (close-active-socket connection) nil)))
191              (let ((worker (make-instance 'worker :listener listener
192                                           :connection connection
193                                           :name (next-worker-name
194                                                  (base-name listener)))))
195                (setf (process worker)
196                      (make-process (name worker) (thread-fun worker)))
197                (push worker (workers listener))))))
198     (errorset (close-passive-socket (socket listener)) nil)))
199
200 #+lispworks
201 (defun lw-worker (handle listener)
202   (let ((connection (make-instance 'comm:socket-stream
203                       :socket handle
204                       :direction :io
205                       :element-type 'base-char)))
206     (if (wait listener)
207         (progn
208           (apply (listener-function listener)
209                  connection
210                  (function-args listener))
211           (finish-output connection))
212         (let ((worker (make-instance 'worker :listener listener
213                                      :connection connection
214                                      :name (next-worker-name
215                                             (base-name listener)))))
216           (setf (process worker)
217                 (make-process (name worker) (thread-fun worker)))
218           (push worker (workers listener))))))
219
220 ;; Fixed pool of workers
221
222 (defun start-fixed-number-of-workers (listener)
223   (dotimes (i (number-fixed-workers listener))
224     (let ((name (next-worker-name (base-name listener))))
225       (push
226        (make-instance 'fixed-worker
227                       :name name
228                       :listener listener
229                       :process
230                       (make-process
231                        name #'(lambda () (fixed-worker name listener))))
232        (workers listener)))))
233
234
235 (defun fixed-worker (name listener)
236   (loop 
237    (let ((connection (accept-tcp-connection (socket listener))))
238      (flet ((do-work ()
239               (apply (listener-function listener)
240                      connection
241                      (function-args listener))))
242        (unwind-protect
243             (handler-case
244                 (if (catch-errors listener)
245                     (handler-case
246                         (if (timeout listener)
247                             (with-timeout ((timeout listener))
248                               (do-work))
249                             (do-work))
250                       (error (e)
251                         (cmsg "Error ~A [~A]" e name)))
252                     (if (timeout listener)
253                         (with-timeout ((timeout listener))
254                           (do-work))
255                         (do-work)))
256               (error (e)
257                 (format t "Error: ~A" e)))
258          (errorset (finish-output connection) nil)
259          (errorset (close connection) nil))))))