--- /dev/null
+;;;; -*- Mode: Lisp; Syntax: ANSI-Common-Lisp; Base: 10; Package: modlisp -*-
+;;;; *************************************************************************
+;;;; FILE IDENTIFICATION
+;;;;
+;;;; Name: listener.lisp
+;;;; Purpose: Listener and worker processes
+;;;; Programmer: Kevin M. Rosenberg
+;;;; Date Started: Dec 2002
+;;;;
+;;;; $Id: listener.lisp,v 1.1 2003/07/08 16:12:40 kevin Exp $
+;;;; *************************************************************************
+
+(in-package #:kmrcl)
+
+;;; Variables and data structures for Listener
+
+(defvar *listener-count* 0
+ "used to name listeners")
+
+(defvar *worker-count* 0
+ "used to name workers")
+
+(defvar *active-listeners* nil
+ "List of active listeners")
+
+(defclass listener ()
+ ((port :initarg :port :accessor port)
+ (function :initarg :function :accessor listener-function
+ :initform nil)
+ (function-args :initarg :function-args :accessor function-args
+ :initform nil)
+ (process :initarg :process :accessor process)
+ (socket :initarg :socket :accessor socket)
+ (workers :initform nil :accessor workers
+ :documentation "list of worker threads")
+ (name :initform "" :accessor name :initarg :name)
+ (base-name :initform "listener" :accessor base-name :initarg :base-name)
+ (wait :initform nil :accessor wait :initarg :wait)
+ (catch-errors :initform nil :accessor catch-errors :initarg :catch-errors)
+ (format :initform :text :accessor listener-format :initarg :format)))
+
+(defclass worker ()
+ ((listener :initarg :listener :accessor listener :initform nil)
+ (connection :initarg :connection :accessor connection :initform nil)
+ (name :initarg :name :accessor name :initform nil)
+ (thread-fun :initarg :thread-fun :accessor thread-fun :initform nil)
+ (process :initarg :process :accessor process :initform nil)))
+
+
+;; High-level API
+
+(defun init/listener (listener state)
+ (check-type listener listener)
+ (case state
+ (:start
+ (when (member listener *active-listeners*)
+ (warn "~&listener already started")
+ (return-from init/listener listener))
+ (handler-case
+ (progn
+ (setf (name listener) (next-server-name (base-name listener)))
+ (make-socket-server listener))
+ (error (e)
+ (format t "~&Error while trying to start listener~& ~A" e)
+ (decf *listener-count*)
+ nil)
+ (:no-error (res)
+ (declare (ignore res))
+ (push listener *active-listeners*)
+ listener)))
+ (:stop
+ (unless (member listener *active-listeners*)
+ (warn "~&listener is not in active list")
+ (return-from init/listener listener))
+ (dolist (worker (workers listener))
+ (close-active-socket (connection worker))
+ (destroy-process (process worker)))
+ (setf (workers listener) nil)
+ (with-slots (process socket) listener
+ (errorset (close-passive-socket socket) t)
+ (errorset (destroy-process process) t))
+ (setq *active-listeners* (remove listener *active-listeners*)))
+ (:restart
+ (init/listener listener :stop)
+ (init/listener listener :start))))
+
+(defun stop-all/listener ()
+ (dolist (listener *active-listeners*)
+ (ignore-errors
+ (init/listener listener :stop))))
+
+;; Low-level functions
+
+(defun next-server-name (base-name)
+ (format nil "~A-socket-server-~D" base-name (incf *listener-count*)))
+
+(defun next-worker-name (base-name)
+ (format nil "~A-worker-~D" base-name (incf *worker-count*)))
+
+(defun make-socket-server (listener)
+ (setf (socket listener) (create-inet-listener
+ (port listener)
+ :format (listener-format listener)))
+ (setf (process listener) (make-process
+ (name listener)
+ #'(lambda () (start-socket-server listener))))
+ listener)
+
+
+(defmethod initialize-instance :after
+ ((self worker) &key listener connection name &allow-other-keys)
+ (unless connection
+ (error "connection not provided to modlisp-worker"))
+ (setf (slot-value self 'listener) listener)
+ (setf (slot-value self 'name) name)
+ (setf (slot-value self 'connection) connection)
+ (setf (slot-value self 'thread-fun)
+ #'(lambda ()
+ (unwind-protect
+ (if (catch-errors listener)
+ (handler-case
+ (apply (listener-function listener)
+ connection
+ (function-args listener))
+ (error (e)
+ (cmsg "Error ~A [~A]" e name)))
+ (apply (listener-function listener)
+ connection
+ (function-args listener)))
+ (progn
+ (errorset (close-active-socket connection) nil)
+ (cmsg-c :threads "~A ended" name)
+ (setf (workers listener)
+ (remove self (workers listener))))))))
+
+(defun start-socket-server (listener)
+ (unwind-protect
+ (loop
+ (let ((connection (accept-tcp-connection (socket listener))))
+ (if (wait listener)
+ (unwind-protect
+ (apply (listener-function listener)
+ connection
+ (function-args listener))
+ (errorset (close connection) nil))
+ (let ((worker (make-instance 'worker :listener listener
+ :connection connection
+ :name (next-worker-name
+ (base-name listener)))))
+ (setf (process worker)
+ (make-process (name worker) (thread-fun worker)))
+ (push worker (workers listener))))))
+ (errorset (close-passive-socket (socket listener)) nil)))
--- /dev/null
+;;;; -*- Mode: Lisp; Syntax: ANSI-Common-Lisp; Base: 10; Package: modlisp -*-
+;;;; *************************************************************************
+;;;; FILE IDENTIFICATION
+;;;;
+;;;; Name: sockets.lisp
+;;;; Purpose: Socket functions
+;;;; Programmer: Kevin M. Rosenberg with excerpts from portableaserve
+;;;; Date Started: Jun 2003
+;;;;
+;;;; $Id: sockets.lisp,v 1.1 2003/07/08 16:12:40 kevin Exp $
+;;;; *************************************************************************
+
+(in-package #:kmrcl)
+
+;; Sockets
+
+#+lispworks
+(progn
+
+(define-condition lw-stream-error (error)
+ ((stream :initarg :stream
+ :reader stream-error-stream)
+ (action :initarg :action
+ :reader stream-error-action)
+ (code :initarg :code
+ :reader stream-error-code)
+ (identifier :initarg :identifier
+ :reader stream-error-identifier))
+ (:report (lambda (condition stream)
+ (format stream "A stream error occured (action=~A identifier=~A code=~A stream=~S)."
+ (stream-error-action condition)
+ (stream-error-identifier condition)
+ (stream-error-code condition)
+ (stream-error-stream condition)))))
+
+(define-condition socket-error (lw-stream-error)
+ ()
+ (:report (lambda (condition stream)
+ (format stream "A socket error occured (action=~A identifier=~A code=~A stream=~S)."
+ (stream-error-action condition)
+ (stream-error-identifier condition)
+ (stream-error-code condition)
+ (stream-error-stream condition)))))
+
+(defclass socket ()
+ ((passive-socket :type fixnum
+ :initarg :passive-socket
+ :reader socket-os-fd)))
+
+ (defclass passive-socket (socket)
+ ((element-type :type (member signed-byte unsigned-byte base-char)
+ :initarg :element-type
+ :reader element-type)
+ (port :type fixnum
+ :initarg :port
+ :reader local-port)))
+
+ (defmethod print-object ((passive-socket passive-socket) stream)
+ (print-unreadable-object (passive-socket stream :type t :identity nil)
+ (format stream "@~d on port ~d" (socket-os-fd passive-socket) (local-port passive-socket))))
+
+ (defclass binary-socket-stream (comm:socket-stream) ())
+(defclass input-binary-socket-stream (binary-socket-stream)())
+(defclass output-binary-socket-stream (binary-socket-stream)())
+(defclass bidirectional-binary-socket-stream (input-binary-socket-stream output-binary-socket-stream)())
+
+#+unix
+(defun %socket-error-identifier (code)
+ (case code
+ (32 :x-broken-pipe)
+ (98 :address-in-use)
+ (99 :address-not-available)
+ (100 :network-down)
+ (102 :network-reset)
+ (103 :connection-aborted)
+ (104 :connection-reset)
+ (105 :no-buffer-space)
+ (108 :shutdown)
+ (110 :connection-timed-out)
+ (111 :connection-refused)
+ (112 :host-down)
+ (113 :host-unreachable)
+ (otherwise :unknown)))
+
+#+win32
+(defun %socket-error-identifier (code)
+ (case code
+ (10048 :address-in-use)
+ (10049 :address-not-available)
+ (10050 :network-down)
+ (10052 :network-reset)
+ (10053 :connection-aborted)
+ (10054 :connection-reset)
+ (10055 :no-buffer-space)
+ (10058 :shutdown)
+ (10060 :connection-timed-out)
+ (10061 :connection-refused)
+ (10064 :host-down)
+ (10065 :host-unreachable)
+ (otherwise :unknown)))
+
+(defun socket-error (stream error-code action format-string &rest format-args)
+ (let ((code (if (numberp error-code) error-code #+unix(lw:errno-value))))
+ (error 'socket-error :stream stream :code code
+ :identifier (if (keywordp error-code)
+ error-code
+ (%socket-error-identifier error-code))
+ :action action
+ :format-control "~A occured while doing socket IO (~?)"
+ :format-arguments (list 'socket-error format-string format-args))))
+
+
+(defmethod comm::socket-error ((stream binary-socket-stream) error-code format-string &rest format-args)
+ (apply #'socket-error stream error-code :IO format-string format-args))
+
+(defmethod stream-input-available ((fd fixnum))
+ (comm::socket-listen fd))
+
+(defmethod stream-input-available ((stream stream::os-file-handle-stream))
+ (stream-input-available (stream::os-file-handle-stream-file-handle stream)))
+
+(defmethod stream-input-available ((stream comm:socket-stream))
+ (or (comm::socket-listen (comm:socket-stream-socket stream))
+ (listen stream)))
+
+(defmethod stream-input-available ((stream passive-socket))
+ (comm::socket-listen (socket-os-fd stream)))
+
+(defun %new-passive-socket (local-port)
+ (multiple-value-bind (socket error-location error-code)
+ (comm::create-tcp-socket-for-service local-port)
+ (cond (socket socket)
+ (t (error 'socket-error :action error-location :code error-code :identifier :unknown)))))
+
+) ;; lispworks
+
+#+sbcl
+(defun listen-to-inet-port (&key (port 0) (kind :stream) (reuse nil))
+ "Create, bind and listen to an inet socket on *:PORT.
+setsockopt SO_REUSEADDR if :reuse is not nil"
+ (let ((socket (make-instance 'sb-bsd-sockets:inet-socket
+ :type :stream
+ :protocol :tcp)))
+ (if reuse
+ (setf (sb-bsd-sockets:sockopt-reuse-address socket) t))
+ (sb-bsd-sockets:socket-bind
+ socket (sb-bsd-sockets:make-inet-address "0.0.0.0") port)
+ (sb-bsd-sockets:socket-listen socket 15)
+ socket))
+
+(defun create-inet-listener (port &key (format :text) (reuse-address t))
+ #+cmu (ext:create-inet-listener port)
+ #+allegro
+ (socket:make-socket :connect :passive :local-port port :format :binary
+ :address-family
+ (if (stringp port)
+ :file
+ (if (or (null port) (integerp port))
+ :internet
+ (error "illegal value for port: ~s" port)))
+ :reuse-address reuse-address)
+ #+sbcl
+ (listen-to-inet-port :port port :reuse reuse-address)
+ #+clisp (ext:socket-server port)
+ #+lispworks
+ (let ((comm::*use_so_reuseaddr* reuse-address))
+ (make-instance 'passive-socket
+ :port port
+ :passive-socket (%new-passive-socket port)
+ :element-type (case format
+ (:text 'base-char))))
+ )
+
+(defun make-fd-stream (socket &key input output element-type)
+ #+cmu
+ (sys:make-fd-stream socket :input input :output output
+ :element-type element-type)
+ #+sbcl
+ (sb-bsd-sockets:socket-make-stream socket :input input :output output
+ :element-type element-type)
+ #-(or cmu sbcl) (declare (ignore input output element-type))
+ #-(or cmu sbcl) socket
+ )
+
+
+(defun accept-tcp-connection (listener)
+ #+cmu
+ (progn
+ (mp:process-wait-until-fd-usable listener :input)
+ (sys:make-fd-stream
+ (nth-value 0 (ext:accept-tcp-connection listener))
+ :input t :output t))
+ #+sbcl
+ (when (sb-sys:wait-until-fd-usable
+ (sb-bsd-sockets:socket-file-descriptor listener) :input)
+ (sb-bsd-sockets:socket-make-stream
+ (sb-bsd-sockets:socket-accept listener)
+ :element-type 'base-char
+ :input t :output t))
+ #+allegro
+ (socket:accept-connection listener)
+ #+clisp
+ (ext:socket-accept listener)
+ #+lispworks
+ (progn
+ (loop while (not (stream-input-available listener))
+ do (sleep 1))
+ (make-instance 'bidirectional-binary-socket-stream
+ :socket (comm::get-fd-from-socket
+ (socket-os-fd listener))
+ :direction :io
+ :element-type (element-type listener)))
+
+ )
+
+
+(defmacro errorset (form display)
+ `(handler-case
+ ,form
+ (error (e)
+ (declare (ignorable e))
+ (when ,display
+ (format t "~&Error: ~A~%" e)))))
+
+(defun close-passive-socket (socket)
+ #+allegro (close socket)
+ #+cmu (unix:unix-close socket)
+ #+sbcl (sb-unix:unix-close
+ (sb-bsd-sockets:socket-file-descriptor socket))
+ #+clisp (close socket)
+ #+lispworks (comm::close-socket (socket-os-fd socket))
+ )
+
+
+(defun close-active-socket (socket)
+ (close socket))
+
+#+sbcl
+(defun ipaddr-to-dotted (ipaddr &key values)
+ "Convert from 32-bit integer to dotted string."
+ (declare (type (unsigned-byte 32) ipaddr))
+ (let ((a (logand #xff (ash ipaddr -24)))
+ (b (logand #xff (ash ipaddr -16)))
+ (c (logand #xff (ash ipaddr -8)))
+ (d (logand #xff ipaddr)))
+ (if values
+ (values a b c d)
+ (format nil "~d.~d.~d.~d" a b c d))))
+
+#+sbcl
+(defun dotted-to-ipaddr (dotted &key (errorp t))
+ "Convert from dotted string to 32-bit integer."
+ (declare (string dotted))
+ (if errorp
+ (let ((ll (string-tokens (substitute #\Space #\. dotted))))
+ (+ (ash (first ll) 24) (ash (second ll) 16)
+ (ash (third ll) 8) (fourth ll)))
+ (ignore-errors
+ (let ((ll (string-tokens (substitute #\Space #\. dotted))))
+ (+ (ash (first ll) 24) (ash (second ll) 16)
+ (ash (third ll) 8) (fourth ll))))))
+
+#+sbcl
+(defun ipaddr-to-hostname (ipaddr &key ignore-cache)
+ (when ignore-cache
+ (warn ":IGNORE-CACHE keyword in IPADDR-TO-HOSTNAME not supported."))
+ (sb-bsd-sockets:host-ent-name
+ (sb-bsd-sockets:get-host-by-address
+ (sb-bsd-sockets:make-inet-address ipaddr))))
+
+#+sbcl
+(defun lookup-hostname (host &key ignore-cache)
+ (when ignore-cache
+ (warn ":IGNORE-CACHE keyword in LOOKUP-HOSTNAME not supported."))
+ (if (stringp host)
+ (sb-bsd-sockets:host-ent-address
+ (sb-bsd-sockets:get-host-by-name host))
+ (dotted-to-ipaddr (ipaddr-to-dotted host))))
+
+
+(defun make-active-socket (server port)
+ #+allegro (socket:make-socket :remote-host server
+ :remote-port port)
+ #+lispworks (comm:open-tcp-stream server port)
+ #+sbcl (let ((socket (make-instance 'sb-bsd-sockets:inet-socket
+ :type :stream
+ :protocol :tcp)))
+ (sb-bsd-sockets:socket-connect
+ socket (lookup-hostname server) port)
+ (sb-bsd-sockets:socket-make-stream socket
+ :input t
+ :output t
+ :element-type 'base-char))
+ #+cmu
+ (sys:make-fd-stream (ext:connect-to-inet-socket host port)
+ :input t :output t :element-type 'base-char)
+ )