1 ;;;; -*- Mode: Lisp; Syntax: ANSI-Common-Lisp; Base: 10 -*-
2 ;;;; *************************************************************************
3 ;;;; FILE IDENTIFICATION
5 ;;;; Name: listener.lisp
6 ;;;; Purpose: Listener and worker processes
7 ;;;; Programmer: Kevin M. Rosenberg
8 ;;;; Date Started: Jun 2003
10 ;;;; This file, part of KMRCL, is Copyright (c) 2002-2003 by Kevin M. Rosenberg
12 ;;;; KMRCL users are granted the rights to distribute and use this software
13 ;;;; as governed by the terms of the Lisp Lesser GNU Public License
14 ;;;; (http://opensource.franz.com/preamble.html), also known as the LLGPL.
15 ;;;; *************************************************************************
19 ;;; Variables and data structures for Listener
21 (defvar *listener-count* 0
22 "used to name listeners")
24 (defvar *worker-count* 0
25 "used to name workers")
27 (defvar *active-listeners* nil
28 "List of active listeners")
31 ((port :initarg :port :accessor port)
32 (function :initarg :function :accessor listener-function
34 (function-args :initarg :function-args :accessor function-args
36 (process :initarg :process :accessor process :initform nil)
37 (socket :initarg :socket :accessor socket :initform nil)
38 (workers :initform nil :accessor workers
39 :documentation "list of worker threads")
40 (name :initform "" :accessor name :initarg :name)
41 (base-name :initform "listener" :accessor base-name :initarg :base-name)
42 (wait :initform nil :accessor wait :initarg :wait)
43 (timeout :initform nil :accessor timeout :initarg :timeout)
44 (number-fixed-workers :initform nil :accessor number-fixed-workers
45 :initarg :number-fixed-workers)
46 (catch-errors :initform nil :accessor catch-errors :initarg :catch-errors)
47 (remote-host-checker :initform nil :accessor remote-host-checker
48 :initarg :remote-host-checker)
49 (format :initform :text :accessor listener-format :initarg :format)))
51 (defclass fixed-worker ()
52 ((listener :initarg :listener :accessor listener :initform nil)
53 (name :initarg :name :accessor name :initform nil)
54 (process :initarg :process :accessor process :initform nil)))
56 (defclass worker (fixed-worker)
57 ((connection :initarg :connection :accessor connection :initform nil)
58 (socket :initarg :socket :accessor socket :initform nil)
59 (thread-fun :initarg :thread-fun :accessor thread-fun :initform nil)))
62 (defmethod print-object ((obj listener) s)
63 (print-unreadable-object (obj s :type t :identity nil)
64 (format s "port ~A" (port obj))))
66 (defmethod print-object ((obj fixed-worker) s)
67 (print-unreadable-object (obj s :type t :identity nil)
68 (format s "port ~A" (port (listener obj)))))
72 (defun init/listener (listener state)
73 (check-type listener listener)
76 (when (member listener *active-listeners*)
77 (cmsg "~&listener ~A already initialized" listener)
78 (return-from init/listener))
79 (when (listener-startup listener)
80 (push listener *active-listeners*)
83 (unless (member listener *active-listeners*)
84 (cmsg "~&listener ~A is not in active list" listener)
85 (return-from init/listener listener))
86 (listener-shutdown listener)
87 (setq *active-listeners* (remove listener *active-listeners*)))
89 (init/listener listener :stop)
90 (init/listener listener :start))))
92 (defun stop-all/listener ()
93 (dolist (listener *active-listeners*)
95 (init/listener listener :stop))))
97 (defun listener-startup (listener)
100 (setf (name listener) (next-server-name (base-name listener)))
101 (make-socket-server listener))
103 (format t "~&Error while trying to start listener on port ~A~& ~A"
105 (decf *listener-count*)
108 (declare (ignore res))
111 (defun listener-shutdown (listener)
112 (dolist (worker (workers listener))
113 (when (and (typep worker 'worker)
115 (errorset (close-active-socket
116 (socket worker)) nil)
117 (setf (connection worker) nil)
118 (setf (socket worker) nil))
119 (when (process worker)
120 (errorset (destroy-process (process worker)) nil)
121 (setf (process worker) nil)))
122 (setf (workers listener) nil)
123 (with-slots (process socket) listener
125 (errorset (close-passive-socket socket) nil)
128 (errorset (destroy-process process) nil)
129 (setf process nil))))
131 ;; Low-level functions
133 (defun next-server-name (base-name)
134 (format nil "~D-~A-socket-server" (incf *listener-count*) base-name))
136 (defun next-worker-name (base-name)
137 (format nil "~D-~A-worker" (incf *worker-count*) base-name))
139 (defun make-socket-server (listener)
142 (setf (process listener)
143 (comm:start-up-server :process-name (name listener)
144 :service (port listener)
147 (lw-worker handle listener)))))
150 (setf (socket listener) (create-inet-listener
152 :format (listener-format listener)))
153 (if (number-fixed-workers listener)
154 (start-fixed-number-of-workers listener)
155 (setf (process listener) (make-process
158 (start-socket-server listener))))))
162 (defmethod initialize-instance :after
163 ((self worker) &key listener connection socket name &allow-other-keys)
165 (apply (listener-function listener)
167 (function-args listener))))
169 (error "connection not provided to modlisp-worker"))
171 (error "socket not provided to modlisp-worker"))
172 (setf (slot-value self 'listener) listener)
173 (setf (slot-value self 'name) name)
174 (setf (slot-value self 'connection) connection)
175 (setf (slot-value self 'socket) socket)
176 (setf (slot-value self 'thread-fun)
179 (if (catch-errors listener)
181 (if (timeout listener)
182 (with-timeout ((timeout listener))
186 (cmsg "Error ~A [~A]" e name)))
187 (if (timeout listener)
188 (with-timeout ((timeout listener))
192 (errorset (finish-output connection) nil)
193 (errorset (close-active-socket socket) t)
194 (cmsg-c :threads "~A ended" name)
195 (setf (workers listener)
196 (remove self (workers listener)))))))))
198 (defun accept-and-check-tcp-connection (listener)
199 (multiple-value-bind (conn socket) (accept-tcp-connection (socket listener))
200 (when (and (remote-host-checker listener)
201 (not (funcall (remote-host-checker listener)
202 (remote-host socket))))
203 (cmsg-c :thread "Deny connection from ~A" (remote-host conn))
204 (errorset (close-active-socket socket) nil)
205 (setq conn nil socket nil))
206 (values conn socket)))
208 (defun start-socket-server (listener)
211 (multiple-value-bind (connection socket)
212 (accept-and-check-tcp-connection listener)
216 (apply (listener-function listener)
218 (function-args listener))
220 (errorset (finish-output connection) nil)
221 (errorset (close-active-socket connection) nil)))
222 (let ((worker (make-instance 'worker :listener listener
223 :connection connection
225 :name (next-worker-name
226 (base-name listener)))))
227 (setf (process worker)
228 (make-process (name worker) (thread-fun worker)))
229 (push worker (workers listener)))))))
230 (errorset (close-passive-socket (socket listener)) nil)))
233 (defun lw-worker (handle listener)
234 (let ((connection (make-instance 'comm:socket-stream
237 :element-type 'base-char)))
240 (apply (listener-function listener)
242 (function-args listener))
243 (finish-output connection))
244 (let ((worker (make-instance 'worker :listener listener
245 :connection connection
246 :name (next-worker-name
247 (base-name listener)))))
248 (setf (process worker)
249 (make-process (name worker) (thread-fun worker)))
250 (push worker (workers listener))))))
252 ;; Fixed pool of workers
254 (defun start-fixed-number-of-workers (listener)
255 (dotimes (i (number-fixed-workers listener))
256 (let ((name (next-worker-name (base-name listener))))
258 (make-instance 'fixed-worker
263 name #'(lambda () (fixed-worker name listener))))
264 (workers listener)))))
267 (defun fixed-worker (name listener)
269 (let ((connection (accept-and-check-tcp-connection listener)))
272 (apply (listener-function listener)
274 (function-args listener))))
277 (if (catch-errors listener)
279 (if (timeout listener)
280 (with-timeout ((timeout listener))
284 (cmsg "Error ~A [~A]" e name)))
285 (if (timeout listener)
286 (with-timeout ((timeout listener))
290 (format t "Error: ~A" e)))
291 (errorset (finish-output connection) nil)
292 (errorset (close connection) nil)))))))