From: Kevin M. Rosenberg Date: Tue, 8 Jul 2003 16:12:40 +0000 (+0000) Subject: r5259: *** empty log message *** X-Git-Tag: v1.96~166 X-Git-Url: http://git.kpe.io/?p=kmrcl.git;a=commitdiff_plain;h=7a31a7ff629ae760d9c3e3abedf6e03605f83f23 r5259: *** empty log message *** --- diff --git a/kmrcl.asd b/kmrcl.asd index 90afbe6..29d0060 100644 --- a/kmrcl.asd +++ b/kmrcl.asd @@ -7,7 +7,7 @@ ;;;; Programmer: Kevin M. Rosenberg ;;;; Date Started: Apr 2000 ;;;; -;;;; $Id: kmrcl.asd,v 1.37 2003/06/25 20:28:08 kevin Exp $ +;;;; $Id: kmrcl.asd,v 1.38 2003/07/08 16:11:19 kevin Exp $ ;;;; ;;;; This file, part of KMRCL, is Copyright (c) 2002 by Kevin M. Rosenberg ;;;; @@ -50,8 +50,12 @@ #+kmr-mop (:file "attrib-class" :depends-on ("seqs" "mop")) (:file "equal" :depends-on ("macros" #+kmr-mop "mop")) (:file "web-utils" :depends-on ("macros" "strings")) - (:file "xml-utils" :depends-on ("macros"))) - ) + (:file "xml-utils" :depends-on ("macros")) + (:file "sockets" :depends-on ("macros")) + (:file "processes" :depends-on ("macros")) + (:file "listener" :depends-on ("sockets" "processes")) + )) + (defmethod perform ((o test-op) (c (eql (find-system 'kmrcl)))) (operate 'load-op 'kmrcl-tests) diff --git a/listener.lisp b/listener.lisp new file mode 100644 index 0000000..5995405 --- /dev/null +++ b/listener.lisp @@ -0,0 +1,153 @@ +;;;; -*- Mode: Lisp; Syntax: ANSI-Common-Lisp; Base: 10; Package: modlisp -*- +;;;; ************************************************************************* +;;;; FILE IDENTIFICATION +;;;; +;;;; Name: listener.lisp +;;;; Purpose: Listener and worker processes +;;;; Programmer: Kevin M. Rosenberg +;;;; Date Started: Dec 2002 +;;;; +;;;; $Id: listener.lisp,v 1.1 2003/07/08 16:12:40 kevin Exp $ +;;;; ************************************************************************* + +(in-package #:kmrcl) + +;;; Variables and data structures for Listener + +(defvar *listener-count* 0 + "used to name listeners") + +(defvar *worker-count* 0 + "used to name workers") + +(defvar *active-listeners* nil + "List of active listeners") + +(defclass listener () + ((port :initarg :port :accessor port) + (function :initarg :function :accessor listener-function + :initform nil) + (function-args :initarg :function-args :accessor function-args + :initform nil) + (process :initarg :process :accessor process) + (socket :initarg :socket :accessor socket) + (workers :initform nil :accessor workers + :documentation "list of worker threads") + (name :initform "" :accessor name :initarg :name) + (base-name :initform "listener" :accessor base-name :initarg :base-name) + (wait :initform nil :accessor wait :initarg :wait) + (catch-errors :initform nil :accessor catch-errors :initarg :catch-errors) + (format :initform :text :accessor listener-format :initarg :format))) + +(defclass worker () + ((listener :initarg :listener :accessor listener :initform nil) + (connection :initarg :connection :accessor connection :initform nil) + (name :initarg :name :accessor name :initform nil) + (thread-fun :initarg :thread-fun :accessor thread-fun :initform nil) + (process :initarg :process :accessor process :initform nil))) + + +;; High-level API + +(defun init/listener (listener state) + (check-type listener listener) + (case state + (:start + (when (member listener *active-listeners*) + (warn "~&listener already started") + (return-from init/listener listener)) + (handler-case + (progn + (setf (name listener) (next-server-name (base-name listener))) + (make-socket-server listener)) + (error (e) + (format t "~&Error while trying to start listener~& ~A" e) + (decf *listener-count*) + nil) + (:no-error (res) + (declare (ignore res)) + (push listener *active-listeners*) + listener))) + (:stop + (unless (member listener *active-listeners*) + (warn "~&listener is not in active list") + (return-from init/listener listener)) + (dolist (worker (workers listener)) + (close-active-socket (connection worker)) + (destroy-process (process worker))) + (setf (workers listener) nil) + (with-slots (process socket) listener + (errorset (close-passive-socket socket) t) + (errorset (destroy-process process) t)) + (setq *active-listeners* (remove listener *active-listeners*))) + (:restart + (init/listener listener :stop) + (init/listener listener :start)))) + +(defun stop-all/listener () + (dolist (listener *active-listeners*) + (ignore-errors + (init/listener listener :stop)))) + +;; Low-level functions + +(defun next-server-name (base-name) + (format nil "~A-socket-server-~D" base-name (incf *listener-count*))) + +(defun next-worker-name (base-name) + (format nil "~A-worker-~D" base-name (incf *worker-count*))) + +(defun make-socket-server (listener) + (setf (socket listener) (create-inet-listener + (port listener) + :format (listener-format listener))) + (setf (process listener) (make-process + (name listener) + #'(lambda () (start-socket-server listener)))) + listener) + + +(defmethod initialize-instance :after + ((self worker) &key listener connection name &allow-other-keys) + (unless connection + (error "connection not provided to modlisp-worker")) + (setf (slot-value self 'listener) listener) + (setf (slot-value self 'name) name) + (setf (slot-value self 'connection) connection) + (setf (slot-value self 'thread-fun) + #'(lambda () + (unwind-protect + (if (catch-errors listener) + (handler-case + (apply (listener-function listener) + connection + (function-args listener)) + (error (e) + (cmsg "Error ~A [~A]" e name))) + (apply (listener-function listener) + connection + (function-args listener))) + (progn + (errorset (close-active-socket connection) nil) + (cmsg-c :threads "~A ended" name) + (setf (workers listener) + (remove self (workers listener)))))))) + +(defun start-socket-server (listener) + (unwind-protect + (loop + (let ((connection (accept-tcp-connection (socket listener)))) + (if (wait listener) + (unwind-protect + (apply (listener-function listener) + connection + (function-args listener)) + (errorset (close connection) nil)) + (let ((worker (make-instance 'worker :listener listener + :connection connection + :name (next-worker-name + (base-name listener))))) + (setf (process worker) + (make-process (name worker) (thread-fun worker))) + (push worker (workers listener)))))) + (errorset (close-passive-socket (socket listener)) nil))) diff --git a/package.lisp b/package.lisp index 0aa1228..60a96e7 100644 --- a/package.lisp +++ b/package.lisp @@ -7,7 +7,7 @@ ;;;; Programmer: Kevin M. Rosenberg ;;;; Date Started: Apr 2000 ;;;; -;;;; $Id: package.lisp,v 1.46 2003/07/08 00:12:51 kevin Exp $ +;;;; $Id: package.lisp,v 1.47 2003/07/08 16:11:19 kevin Exp $ ;;;; ;;;; This file, part of KMRCL, is Copyright (c) 2002 by Kevin M. Rosenberg ;;;; @@ -203,6 +203,15 @@ #:get-output-stream-data #:dump-output-stream-data #:make-byte-array-input-stream + + ;; sockets.lisp + #:make-active-socket + #:close-active-socket + + ;; listener.lisp + #:init/listener + #:stop-all/listener + #:listener )) diff --git a/processes.lisp b/processes.lisp new file mode 100644 index 0000000..0c9175f --- /dev/null +++ b/processes.lisp @@ -0,0 +1,49 @@ +;;;; -*- Mode: Lisp; Syntax: ANSI-Common-Lisp; Base: 10; Package: modlisp -*- +;;;; ************************************************************************* +;;;; FILE IDENTIFICATION +;;;; +;;;; Name: processes.lisp +;;;; Purpose: Multiprocessing functions +;;;; Programmer: Kevin M. Rosenberg +;;;; Date Started: June 2003 +;;;; +;;;; $Id: processes.lisp,v 1.1 2003/07/08 16:12:40 kevin Exp $ +;;;; ************************************************************************* + +(in-package #:kmrcl) + + +(defun make-process (name func) + #+cmu (mp:make-process func :name name) + #+allegro (mp:process-run-function name func) + #+lispworks (mp:process-run-function name nil func) + #+sb-thread (sb-thread:make-thread func) + #+clisp (funcall func) + ) + +(defun destroy-process (process) + #+cmu (mp:destroy-process process) + #+allegro (mp:process-kill process) + #+sbcl-thread (sb-thread:destroy-thread process) + #+lispworks (mp:process-kill process) + ) + +(defun make-lock (name) + #+allegro (mp:make-process-lock :name name) + #+cmu (mp:make-lock name) + #+lispworks (mp:make-lock :name name) + #+sbcl-thread (sb-thread:make-mutex :name name) + ) + +(defmacro with-lock-held ((lock) &body body) + #+allegro + `(mp:with-process-lock (,lock) ,@body) + #+cmu + `(mp:with-lock-held (,lock) ,@body) + #+lispworks + `(mp:with-lock (,lock) ,@body) + #+sbcl-thread + `(sb-thread:with-recursive-lock (,lock) ,@body) + ) + + diff --git a/sockets.lisp b/sockets.lisp new file mode 100644 index 0000000..116e155 --- /dev/null +++ b/sockets.lisp @@ -0,0 +1,297 @@ +;;;; -*- Mode: Lisp; Syntax: ANSI-Common-Lisp; Base: 10; Package: modlisp -*- +;;;; ************************************************************************* +;;;; FILE IDENTIFICATION +;;;; +;;;; Name: sockets.lisp +;;;; Purpose: Socket functions +;;;; Programmer: Kevin M. Rosenberg with excerpts from portableaserve +;;;; Date Started: Jun 2003 +;;;; +;;;; $Id: sockets.lisp,v 1.1 2003/07/08 16:12:40 kevin Exp $ +;;;; ************************************************************************* + +(in-package #:kmrcl) + +;; Sockets + +#+lispworks +(progn + +(define-condition lw-stream-error (error) + ((stream :initarg :stream + :reader stream-error-stream) + (action :initarg :action + :reader stream-error-action) + (code :initarg :code + :reader stream-error-code) + (identifier :initarg :identifier + :reader stream-error-identifier)) + (:report (lambda (condition stream) + (format stream "A stream error occured (action=~A identifier=~A code=~A stream=~S)." + (stream-error-action condition) + (stream-error-identifier condition) + (stream-error-code condition) + (stream-error-stream condition))))) + +(define-condition socket-error (lw-stream-error) + () + (:report (lambda (condition stream) + (format stream "A socket error occured (action=~A identifier=~A code=~A stream=~S)." + (stream-error-action condition) + (stream-error-identifier condition) + (stream-error-code condition) + (stream-error-stream condition))))) + +(defclass socket () + ((passive-socket :type fixnum + :initarg :passive-socket + :reader socket-os-fd))) + + (defclass passive-socket (socket) + ((element-type :type (member signed-byte unsigned-byte base-char) + :initarg :element-type + :reader element-type) + (port :type fixnum + :initarg :port + :reader local-port))) + + (defmethod print-object ((passive-socket passive-socket) stream) + (print-unreadable-object (passive-socket stream :type t :identity nil) + (format stream "@~d on port ~d" (socket-os-fd passive-socket) (local-port passive-socket)))) + + (defclass binary-socket-stream (comm:socket-stream) ()) +(defclass input-binary-socket-stream (binary-socket-stream)()) +(defclass output-binary-socket-stream (binary-socket-stream)()) +(defclass bidirectional-binary-socket-stream (input-binary-socket-stream output-binary-socket-stream)()) + +#+unix +(defun %socket-error-identifier (code) + (case code + (32 :x-broken-pipe) + (98 :address-in-use) + (99 :address-not-available) + (100 :network-down) + (102 :network-reset) + (103 :connection-aborted) + (104 :connection-reset) + (105 :no-buffer-space) + (108 :shutdown) + (110 :connection-timed-out) + (111 :connection-refused) + (112 :host-down) + (113 :host-unreachable) + (otherwise :unknown))) + +#+win32 +(defun %socket-error-identifier (code) + (case code + (10048 :address-in-use) + (10049 :address-not-available) + (10050 :network-down) + (10052 :network-reset) + (10053 :connection-aborted) + (10054 :connection-reset) + (10055 :no-buffer-space) + (10058 :shutdown) + (10060 :connection-timed-out) + (10061 :connection-refused) + (10064 :host-down) + (10065 :host-unreachable) + (otherwise :unknown))) + +(defun socket-error (stream error-code action format-string &rest format-args) + (let ((code (if (numberp error-code) error-code #+unix(lw:errno-value)))) + (error 'socket-error :stream stream :code code + :identifier (if (keywordp error-code) + error-code + (%socket-error-identifier error-code)) + :action action + :format-control "~A occured while doing socket IO (~?)" + :format-arguments (list 'socket-error format-string format-args)))) + + +(defmethod comm::socket-error ((stream binary-socket-stream) error-code format-string &rest format-args) + (apply #'socket-error stream error-code :IO format-string format-args)) + +(defmethod stream-input-available ((fd fixnum)) + (comm::socket-listen fd)) + +(defmethod stream-input-available ((stream stream::os-file-handle-stream)) + (stream-input-available (stream::os-file-handle-stream-file-handle stream))) + +(defmethod stream-input-available ((stream comm:socket-stream)) + (or (comm::socket-listen (comm:socket-stream-socket stream)) + (listen stream))) + +(defmethod stream-input-available ((stream passive-socket)) + (comm::socket-listen (socket-os-fd stream))) + +(defun %new-passive-socket (local-port) + (multiple-value-bind (socket error-location error-code) + (comm::create-tcp-socket-for-service local-port) + (cond (socket socket) + (t (error 'socket-error :action error-location :code error-code :identifier :unknown))))) + +) ;; lispworks + +#+sbcl +(defun listen-to-inet-port (&key (port 0) (kind :stream) (reuse nil)) + "Create, bind and listen to an inet socket on *:PORT. +setsockopt SO_REUSEADDR if :reuse is not nil" + (let ((socket (make-instance 'sb-bsd-sockets:inet-socket + :type :stream + :protocol :tcp))) + (if reuse + (setf (sb-bsd-sockets:sockopt-reuse-address socket) t)) + (sb-bsd-sockets:socket-bind + socket (sb-bsd-sockets:make-inet-address "0.0.0.0") port) + (sb-bsd-sockets:socket-listen socket 15) + socket)) + +(defun create-inet-listener (port &key (format :text) (reuse-address t)) + #+cmu (ext:create-inet-listener port) + #+allegro + (socket:make-socket :connect :passive :local-port port :format :binary + :address-family + (if (stringp port) + :file + (if (or (null port) (integerp port)) + :internet + (error "illegal value for port: ~s" port))) + :reuse-address reuse-address) + #+sbcl + (listen-to-inet-port :port port :reuse reuse-address) + #+clisp (ext:socket-server port) + #+lispworks + (let ((comm::*use_so_reuseaddr* reuse-address)) + (make-instance 'passive-socket + :port port + :passive-socket (%new-passive-socket port) + :element-type (case format + (:text 'base-char)))) + ) + +(defun make-fd-stream (socket &key input output element-type) + #+cmu + (sys:make-fd-stream socket :input input :output output + :element-type element-type) + #+sbcl + (sb-bsd-sockets:socket-make-stream socket :input input :output output + :element-type element-type) + #-(or cmu sbcl) (declare (ignore input output element-type)) + #-(or cmu sbcl) socket + ) + + +(defun accept-tcp-connection (listener) + #+cmu + (progn + (mp:process-wait-until-fd-usable listener :input) + (sys:make-fd-stream + (nth-value 0 (ext:accept-tcp-connection listener)) + :input t :output t)) + #+sbcl + (when (sb-sys:wait-until-fd-usable + (sb-bsd-sockets:socket-file-descriptor listener) :input) + (sb-bsd-sockets:socket-make-stream + (sb-bsd-sockets:socket-accept listener) + :element-type 'base-char + :input t :output t)) + #+allegro + (socket:accept-connection listener) + #+clisp + (ext:socket-accept listener) + #+lispworks + (progn + (loop while (not (stream-input-available listener)) + do (sleep 1)) + (make-instance 'bidirectional-binary-socket-stream + :socket (comm::get-fd-from-socket + (socket-os-fd listener)) + :direction :io + :element-type (element-type listener))) + + ) + + +(defmacro errorset (form display) + `(handler-case + ,form + (error (e) + (declare (ignorable e)) + (when ,display + (format t "~&Error: ~A~%" e))))) + +(defun close-passive-socket (socket) + #+allegro (close socket) + #+cmu (unix:unix-close socket) + #+sbcl (sb-unix:unix-close + (sb-bsd-sockets:socket-file-descriptor socket)) + #+clisp (close socket) + #+lispworks (comm::close-socket (socket-os-fd socket)) + ) + + +(defun close-active-socket (socket) + (close socket)) + +#+sbcl +(defun ipaddr-to-dotted (ipaddr &key values) + "Convert from 32-bit integer to dotted string." + (declare (type (unsigned-byte 32) ipaddr)) + (let ((a (logand #xff (ash ipaddr -24))) + (b (logand #xff (ash ipaddr -16))) + (c (logand #xff (ash ipaddr -8))) + (d (logand #xff ipaddr))) + (if values + (values a b c d) + (format nil "~d.~d.~d.~d" a b c d)))) + +#+sbcl +(defun dotted-to-ipaddr (dotted &key (errorp t)) + "Convert from dotted string to 32-bit integer." + (declare (string dotted)) + (if errorp + (let ((ll (string-tokens (substitute #\Space #\. dotted)))) + (+ (ash (first ll) 24) (ash (second ll) 16) + (ash (third ll) 8) (fourth ll))) + (ignore-errors + (let ((ll (string-tokens (substitute #\Space #\. dotted)))) + (+ (ash (first ll) 24) (ash (second ll) 16) + (ash (third ll) 8) (fourth ll)))))) + +#+sbcl +(defun ipaddr-to-hostname (ipaddr &key ignore-cache) + (when ignore-cache + (warn ":IGNORE-CACHE keyword in IPADDR-TO-HOSTNAME not supported.")) + (sb-bsd-sockets:host-ent-name + (sb-bsd-sockets:get-host-by-address + (sb-bsd-sockets:make-inet-address ipaddr)))) + +#+sbcl +(defun lookup-hostname (host &key ignore-cache) + (when ignore-cache + (warn ":IGNORE-CACHE keyword in LOOKUP-HOSTNAME not supported.")) + (if (stringp host) + (sb-bsd-sockets:host-ent-address + (sb-bsd-sockets:get-host-by-name host)) + (dotted-to-ipaddr (ipaddr-to-dotted host)))) + + +(defun make-active-socket (server port) + #+allegro (socket:make-socket :remote-host server + :remote-port port) + #+lispworks (comm:open-tcp-stream server port) + #+sbcl (let ((socket (make-instance 'sb-bsd-sockets:inet-socket + :type :stream + :protocol :tcp))) + (sb-bsd-sockets:socket-connect + socket (lookup-hostname server) port) + (sb-bsd-sockets:socket-make-stream socket + :input t + :output t + :element-type 'base-char)) + #+cmu + (sys:make-fd-stream (ext:connect-to-inet-socket host port) + :input t :output t :element-type 'base-char) + )