r5259: *** empty log message ***
authorKevin M. Rosenberg <kevin@rosenberg.net>
Tue, 8 Jul 2003 16:12:40 +0000 (16:12 +0000)
committerKevin M. Rosenberg <kevin@rosenberg.net>
Tue, 8 Jul 2003 16:12:40 +0000 (16:12 +0000)
kmrcl.asd
listener.lisp [new file with mode: 0644]
package.lisp
processes.lisp [new file with mode: 0644]
sockets.lisp [new file with mode: 0644]

index 90afbe68a30791f1d25ef5a4599214ce79158006..29d0060ff555a91035427babb9575937e6627dd9 100644 (file)
--- a/kmrcl.asd
+++ b/kmrcl.asd
@@ -7,7 +7,7 @@
 ;;;; Programmer:    Kevin M. Rosenberg
 ;;;; Date Started:  Apr 2000
 ;;;;
-;;;; $Id: kmrcl.asd,v 1.37 2003/06/25 20:28:08 kevin Exp $
+;;;; $Id: kmrcl.asd,v 1.38 2003/07/08 16:11:19 kevin Exp $
 ;;;;
 ;;;; This file, part of KMRCL, is Copyright (c) 2002 by Kevin M. Rosenberg
 ;;;;
      #+kmr-mop (:file "attrib-class" :depends-on ("seqs" "mop"))
      (:file "equal" :depends-on ("macros" #+kmr-mop "mop"))
      (:file "web-utils" :depends-on ("macros" "strings"))
-     (:file "xml-utils" :depends-on ("macros")))
-    )
+     (:file "xml-utils" :depends-on ("macros"))
+     (:file "sockets" :depends-on ("macros"))
+     (:file "processes" :depends-on ("macros"))
+     (:file "listener" :depends-on ("sockets" "processes"))
+     ))
+
 
 (defmethod perform ((o test-op) (c (eql (find-system 'kmrcl))))
   (operate 'load-op 'kmrcl-tests)
diff --git a/listener.lisp b/listener.lisp
new file mode 100644 (file)
index 0000000..5995405
--- /dev/null
@@ -0,0 +1,153 @@
+;;;; -*- Mode: Lisp; Syntax: ANSI-Common-Lisp; Base: 10; Package: modlisp -*-
+;;;; *************************************************************************
+;;;; FILE IDENTIFICATION
+;;;;
+;;;; Name:          listener.lisp
+;;;; Purpose:       Listener and worker processes
+;;;; Programmer:    Kevin M. Rosenberg
+;;;; Date Started:  Dec 2002
+;;;;
+;;;; $Id: listener.lisp,v 1.1 2003/07/08 16:12:40 kevin Exp $
+;;;; *************************************************************************
+
+(in-package #:kmrcl)
+
+;;; Variables and data structures for Listener
+
+(defvar *listener-count* 0
+  "used to name listeners")
+
+(defvar *worker-count* 0
+  "used to name workers")
+
+(defvar *active-listeners* nil
+    "List of active listeners")
+
+(defclass listener ()
+  ((port :initarg :port :accessor port) 
+   (function :initarg :function :accessor listener-function
+            :initform nil)
+   (function-args :initarg :function-args :accessor function-args
+                 :initform nil)
+   (process :initarg :process :accessor process)
+   (socket :initarg :socket :accessor socket)
+   (workers :initform nil :accessor workers
+           :documentation "list of worker threads")
+   (name :initform "" :accessor name :initarg :name)
+   (base-name :initform "listener" :accessor base-name :initarg :base-name)
+   (wait :initform nil :accessor wait :initarg :wait)
+   (catch-errors :initform nil :accessor catch-errors :initarg :catch-errors)
+   (format :initform :text :accessor listener-format :initarg :format)))
+
+(defclass worker ()
+  ((listener :initarg :listener :accessor listener :initform nil)
+   (connection :initarg :connection :accessor connection :initform nil)
+   (name :initarg :name :accessor name :initform nil)
+   (thread-fun :initarg :thread-fun :accessor thread-fun :initform nil)
+   (process :initarg :process :accessor process :initform nil)))
+
+
+;; High-level API
+
+(defun init/listener (listener state)
+  (check-type listener listener)
+  (case state
+    (:start
+     (when (member listener *active-listeners*)
+       (warn "~&listener already started")
+       (return-from init/listener listener))
+     (handler-case
+        (progn
+          (setf (name listener) (next-server-name (base-name listener)))
+          (make-socket-server listener))
+       (error (e)
+        (format t "~&Error while trying to start listener~&  ~A" e)
+        (decf *listener-count*)
+        nil)
+       (:no-error (res)
+        (declare (ignore res))
+        (push listener *active-listeners*)
+        listener)))
+    (:stop
+     (unless (member listener *active-listeners*)
+       (warn "~&listener is not in active list")
+       (return-from init/listener listener))
+     (dolist (worker (workers listener))
+       (close-active-socket (connection worker))
+       (destroy-process (process worker)))
+     (setf (workers listener) nil)
+     (with-slots (process socket) listener
+       (errorset (close-passive-socket socket) t)
+       (errorset (destroy-process process) t))
+     (setq *active-listeners* (remove listener *active-listeners*)))
+    (:restart
+     (init/listener listener :stop)
+     (init/listener listener :start))))
+
+(defun stop-all/listener ()
+  (dolist (listener *active-listeners*)
+    (ignore-errors
+       (init/listener listener :stop))))
+
+;; Low-level functions
+
+(defun next-server-name (base-name)
+  (format nil "~A-socket-server-~D" base-name (incf *listener-count*))) 
+
+(defun next-worker-name (base-name)
+  (format nil "~A-worker-~D" base-name (incf *worker-count*)))
+
+(defun make-socket-server (listener)
+  (setf (socket listener) (create-inet-listener
+                          (port listener)
+                          :format (listener-format listener)))
+  (setf (process listener) (make-process
+                           (name listener)
+                           #'(lambda () (start-socket-server listener))))
+  listener)
+
+
+(defmethod initialize-instance :after
+    ((self worker) &key listener connection name &allow-other-keys)
+  (unless connection
+    (error "connection not provided to modlisp-worker"))
+  (setf (slot-value self 'listener) listener)
+  (setf (slot-value self 'name) name)
+  (setf (slot-value self 'connection) connection)
+  (setf (slot-value self 'thread-fun)
+       #'(lambda ()
+           (unwind-protect
+               (if (catch-errors listener)
+                   (handler-case
+                       (apply (listener-function listener)
+                              connection
+                              (function-args listener))
+                     (error (e)
+                       (cmsg "Error ~A [~A]" e name)))
+                 (apply (listener-function listener)
+                        connection
+                        (function-args listener)))
+         (progn
+           (errorset (close-active-socket connection) nil)
+           (cmsg-c :threads "~A ended" name)
+           (setf (workers listener)
+                 (remove self (workers listener))))))))
+
+(defun start-socket-server (listener)
+  (unwind-protect
+      (loop 
+       (let ((connection (accept-tcp-connection (socket listener))))
+        (if (wait listener)
+            (unwind-protect
+                 (apply (listener-function listener)
+                        connection
+                        (function-args listener))
+              (errorset (close connection) nil))
+            (let ((worker (make-instance 'worker :listener listener
+                                         :connection connection
+                                         :name (next-worker-name
+                                                (base-name listener)))))
+             (setf (process worker)
+               (make-process (name worker) (thread-fun worker)))
+             (push worker (workers listener))))))
+    (errorset (close-passive-socket (socket listener)) nil)))
index 0aa12281c1b46f0953abc9819777c3c9eb8b0bda..60a96e7c1c0ded47b5aa9ea60037861a7b74e396 100644 (file)
@@ -7,7 +7,7 @@
 ;;;; Programmer:    Kevin M. Rosenberg
 ;;;; Date Started:  Apr 2000
 ;;;;
-;;;; $Id: package.lisp,v 1.46 2003/07/08 00:12:51 kevin Exp $
+;;;; $Id: package.lisp,v 1.47 2003/07/08 16:11:19 kevin Exp $
 ;;;;
 ;;;; This file, part of KMRCL, is Copyright (c) 2002 by Kevin M. Rosenberg
 ;;;;
    #:get-output-stream-data
    #:dump-output-stream-data
    #:make-byte-array-input-stream
+
+   ;; sockets.lisp
+   #:make-active-socket
+   #:close-active-socket
+   
+   ;; listener.lisp
+   #:init/listener
+   #:stop-all/listener
+   #:listener
    ))
 
 
diff --git a/processes.lisp b/processes.lisp
new file mode 100644 (file)
index 0000000..0c9175f
--- /dev/null
@@ -0,0 +1,49 @@
+;;;; -*- Mode: Lisp; Syntax: ANSI-Common-Lisp; Base: 10; Package: modlisp -*-
+;;;; *************************************************************************
+;;;; FILE IDENTIFICATION
+;;;;
+;;;; Name:          processes.lisp
+;;;; Purpose:       Multiprocessing functions
+;;;; Programmer:    Kevin M. Rosenberg
+;;;; Date Started:  June 2003
+;;;;
+;;;; $Id: processes.lisp,v 1.1 2003/07/08 16:12:40 kevin Exp $
+;;;; *************************************************************************
+
+(in-package #:kmrcl)
+
+
+(defun make-process (name func)
+  #+cmu (mp:make-process func :name name)
+  #+allegro (mp:process-run-function name func)
+  #+lispworks (mp:process-run-function name nil func)
+  #+sb-thread (sb-thread:make-thread func)
+  #+clisp (funcall func)
+  )
+
+(defun destroy-process (process)
+  #+cmu (mp:destroy-process process)
+  #+allegro (mp:process-kill process)
+  #+sbcl-thread (sb-thread:destroy-thread process)
+  #+lispworks (mp:process-kill process)
+  )
+
+(defun make-lock (name)
+  #+allegro (mp:make-process-lock :name name)
+  #+cmu (mp:make-lock name)
+  #+lispworks (mp:make-lock :name name)
+  #+sbcl-thread (sb-thread:make-mutex :name name)
+  )
+
+(defmacro with-lock-held ((lock) &body body)
+  #+allegro
+  `(mp:with-process-lock (,lock) ,@body)
+  #+cmu
+  `(mp:with-lock-held (,lock) ,@body)
+  #+lispworks
+  `(mp:with-lock (,lock) ,@body)
+  #+sbcl-thread
+  `(sb-thread:with-recursive-lock (,lock) ,@body)
+  )
+
+
diff --git a/sockets.lisp b/sockets.lisp
new file mode 100644 (file)
index 0000000..116e155
--- /dev/null
@@ -0,0 +1,297 @@
+;;;; -*- Mode: Lisp; Syntax: ANSI-Common-Lisp; Base: 10; Package: modlisp -*-
+;;;; *************************************************************************
+;;;; FILE IDENTIFICATION
+;;;;
+;;;; Name:          sockets.lisp
+;;;; Purpose:       Socket functions
+;;;; Programmer:    Kevin M. Rosenberg with excerpts from portableaserve
+;;;; Date Started:  Jun 2003
+;;;;
+;;;; $Id: sockets.lisp,v 1.1 2003/07/08 16:12:40 kevin Exp $
+;;;; *************************************************************************
+
+(in-package #:kmrcl)
+
+;; Sockets
+
+#+lispworks
+(progn
+  
+(define-condition lw-stream-error (error)
+  ((stream :initarg :stream
+           :reader stream-error-stream)
+   (action :initarg :action
+           :reader stream-error-action)
+   (code :initarg :code
+         :reader stream-error-code)
+   (identifier :initarg :identifier
+               :reader stream-error-identifier))
+  (:report (lambda (condition stream)
+             (format stream "A stream error occured (action=~A identifier=~A code=~A stream=~S)."
+                     (stream-error-action condition)
+                     (stream-error-identifier condition)
+                     (stream-error-code condition)
+                     (stream-error-stream condition)))))
+
+(define-condition socket-error (lw-stream-error)
+  ()
+  (:report (lambda (condition stream)
+             (format stream "A socket error occured (action=~A identifier=~A code=~A stream=~S)."
+                     (stream-error-action condition)
+                     (stream-error-identifier condition)
+                     (stream-error-code condition)
+                     (stream-error-stream condition)))))
+
+(defclass socket ()
+    ((passive-socket :type fixnum
+                    :initarg :passive-socket
+                    :reader socket-os-fd)))
+  
+  (defclass passive-socket (socket)
+    ((element-type :type (member signed-byte unsigned-byte base-char)
+                  :initarg :element-type
+                  :reader element-type)
+     (port :type fixnum
+          :initarg :port
+          :reader local-port)))
+
+  (defmethod print-object ((passive-socket passive-socket) stream)
+    (print-unreadable-object (passive-socket stream :type t :identity nil)
+      (format stream "@~d on port ~d" (socket-os-fd passive-socket) (local-port passive-socket))))
+
+  (defclass binary-socket-stream (comm:socket-stream) ())
+(defclass input-binary-socket-stream (binary-socket-stream)())
+(defclass output-binary-socket-stream (binary-socket-stream)())
+(defclass bidirectional-binary-socket-stream (input-binary-socket-stream output-binary-socket-stream)())
+
+#+unix
+(defun %socket-error-identifier (code)
+  (case code
+    (32 :x-broken-pipe)
+    (98 :address-in-use)
+    (99 :address-not-available)
+    (100 :network-down)
+    (102 :network-reset)
+    (103 :connection-aborted)
+    (104 :connection-reset)
+    (105 :no-buffer-space)
+    (108 :shutdown)
+    (110 :connection-timed-out)
+    (111 :connection-refused)
+    (112 :host-down)
+    (113 :host-unreachable)
+    (otherwise :unknown)))
+
+#+win32
+(defun %socket-error-identifier (code)
+  (case code
+    (10048 :address-in-use)
+    (10049 :address-not-available)
+    (10050 :network-down)
+    (10052 :network-reset)
+    (10053 :connection-aborted)
+    (10054 :connection-reset)
+    (10055 :no-buffer-space)
+    (10058 :shutdown)
+    (10060 :connection-timed-out)
+    (10061 :connection-refused)
+    (10064 :host-down)
+    (10065 :host-unreachable)
+    (otherwise :unknown)))
+
+(defun socket-error (stream error-code action format-string &rest format-args)
+  (let ((code (if (numberp error-code) error-code #+unix(lw:errno-value))))
+    (error 'socket-error :stream stream :code code
+           :identifier (if (keywordp error-code)
+                           error-code
+                         (%socket-error-identifier error-code))
+           :action action
+           :format-control "~A occured while doing socket IO (~?)"
+           :format-arguments (list 'socket-error format-string format-args))))
+
+
+(defmethod comm::socket-error ((stream binary-socket-stream) error-code format-string &rest format-args)
+  (apply #'socket-error stream error-code :IO format-string format-args))
+
+(defmethod stream-input-available ((fd fixnum))
+  (comm::socket-listen fd))
+
+(defmethod stream-input-available ((stream stream::os-file-handle-stream))
+  (stream-input-available (stream::os-file-handle-stream-file-handle stream)))
+
+(defmethod stream-input-available ((stream comm:socket-stream))
+  (or (comm::socket-listen (comm:socket-stream-socket stream))
+      (listen stream)))
+
+(defmethod stream-input-available ((stream passive-socket))
+  (comm::socket-listen (socket-os-fd stream)))
+
+(defun %new-passive-socket (local-port)
+  (multiple-value-bind (socket error-location error-code)
+      (comm::create-tcp-socket-for-service local-port)
+    (cond (socket socket)
+         (t (error 'socket-error :action error-location :code error-code :identifier :unknown)))))
+
+) ;; lispworks
+
+#+sbcl
+(defun listen-to-inet-port (&key (port 0) (kind :stream) (reuse nil))
+  "Create, bind and listen to an inet socket on *:PORT.
+setsockopt SO_REUSEADDR if :reuse is not nil"
+  (let ((socket (make-instance 'sb-bsd-sockets:inet-socket
+                              :type :stream
+                              :protocol :tcp)))
+    (if reuse
+        (setf (sb-bsd-sockets:sockopt-reuse-address socket) t))
+    (sb-bsd-sockets:socket-bind 
+     socket (sb-bsd-sockets:make-inet-address "0.0.0.0") port)
+    (sb-bsd-sockets:socket-listen socket 15)
+    socket))
+
+(defun create-inet-listener (port &key (format :text) (reuse-address t))
+  #+cmu (ext:create-inet-listener port)
+  #+allegro
+  (socket:make-socket :connect :passive :local-port port :format :binary
+                     :address-family 
+                     (if (stringp port)
+                         :file
+                       (if (or (null port) (integerp port))
+                           :internet
+                         (error "illegal value for port: ~s" port)))
+                     :reuse-address reuse-address)
+  #+sbcl
+  (listen-to-inet-port :port port :reuse reuse-address)
+  #+clisp (ext:socket-server port)
+  #+lispworks
+  (let ((comm::*use_so_reuseaddr* reuse-address))
+    (make-instance 'passive-socket
+                  :port port
+                  :passive-socket (%new-passive-socket port)
+                  :element-type (case format
+                                  (:text 'base-char))))
+  )
+
+(defun make-fd-stream (socket &key input output element-type)
+  #+cmu
+  (sys:make-fd-stream socket :input input :output output
+                     :element-type element-type)
+  #+sbcl
+  (sb-bsd-sockets:socket-make-stream socket :input input :output output
+                                    :element-type element-type)
+  #-(or cmu sbcl) (declare (ignore input output element-type))
+  #-(or cmu sbcl) socket
+  )
+
+
+(defun accept-tcp-connection (listener)
+  #+cmu
+  (progn
+    (mp:process-wait-until-fd-usable listener :input)
+    (sys:make-fd-stream
+     (nth-value 0 (ext:accept-tcp-connection listener))
+     :input t :output t))
+  #+sbcl
+  (when (sb-sys:wait-until-fd-usable
+        (sb-bsd-sockets:socket-file-descriptor listener) :input)
+    (sb-bsd-sockets:socket-make-stream 
+     (sb-bsd-sockets:socket-accept listener)
+     :element-type 'base-char
+     :input t :output t))
+  #+allegro
+  (socket:accept-connection listener)
+  #+clisp
+  (ext:socket-accept listener)
+  #+lispworks
+  (progn
+    (loop while (not (stream-input-available listener))
+         do (sleep 1))
+    (make-instance 'bidirectional-binary-socket-stream
+                  :socket (comm::get-fd-from-socket
+                           (socket-os-fd listener))
+                  :direction :io
+                  :element-type (element-type listener)))
+    
+  )
+
+
+(defmacro errorset (form display)
+  `(handler-case
+    ,form
+    (error (e)
+     (declare (ignorable e))
+     (when ,display
+       (format t "~&Error: ~A~%" e)))))
+
+(defun close-passive-socket (socket)
+  #+allegro (close socket)
+  #+cmu (unix:unix-close socket)
+  #+sbcl (sb-unix:unix-close
+         (sb-bsd-sockets:socket-file-descriptor socket))
+  #+clisp (close socket)
+  #+lispworks (comm::close-socket (socket-os-fd socket))
+  )
+
+
+(defun close-active-socket (socket)
+  (close socket))
+
+#+sbcl
+(defun ipaddr-to-dotted (ipaddr &key values)
+  "Convert from 32-bit integer to dotted string."
+  (declare (type (unsigned-byte 32) ipaddr))
+  (let ((a (logand #xff (ash ipaddr -24)))
+       (b (logand #xff (ash ipaddr -16)))
+       (c (logand #xff (ash ipaddr -8)))
+       (d (logand #xff ipaddr)))
+    (if values
+       (values a b c d)
+      (format nil "~d.~d.~d.~d" a b c d))))
+
+#+sbcl
+(defun dotted-to-ipaddr (dotted &key (errorp t))
+  "Convert from dotted string to 32-bit integer."
+  (declare (string dotted))
+  (if errorp
+      (let ((ll (string-tokens (substitute #\Space #\. dotted))))
+       (+ (ash (first ll) 24) (ash (second ll) 16)
+          (ash (third ll) 8) (fourth ll)))
+    (ignore-errors
+       (let ((ll (string-tokens (substitute #\Space #\. dotted))))
+         (+ (ash (first ll) 24) (ash (second ll) 16)
+            (ash (third ll) 8) (fourth ll))))))
+
+#+sbcl
+(defun ipaddr-to-hostname (ipaddr &key ignore-cache)
+  (when ignore-cache
+    (warn ":IGNORE-CACHE keyword in IPADDR-TO-HOSTNAME not supported."))
+  (sb-bsd-sockets:host-ent-name
+   (sb-bsd-sockets:get-host-by-address
+    (sb-bsd-sockets:make-inet-address ipaddr))))
+
+#+sbcl
+(defun lookup-hostname (host &key ignore-cache)
+  (when ignore-cache
+    (warn ":IGNORE-CACHE keyword in LOOKUP-HOSTNAME not supported."))
+  (if (stringp host)
+      (sb-bsd-sockets:host-ent-address
+       (sb-bsd-sockets:get-host-by-name host))
+      (dotted-to-ipaddr (ipaddr-to-dotted host))))
+
+
+(defun make-active-socket (server port)
+  #+allegro (socket:make-socket :remote-host server
+                               :remote-port port)
+  #+lispworks (comm:open-tcp-stream server port)
+  #+sbcl (let ((socket (make-instance 'sb-bsd-sockets:inet-socket
+                                     :type :stream
+                                     :protocol :tcp)))
+          (sb-bsd-sockets:socket-connect
+           socket (lookup-hostname server) port)
+          (sb-bsd-sockets:socket-make-stream socket
+                                             :input t
+                                             :output t
+                                             :element-type 'base-char))
+  #+cmu 
+  (sys:make-fd-stream (ext:connect-to-inet-socket host port)
+                     :input t :output t :element-type 'base-char)
+  )