r2753: move files
[clsql.git] / db-postgresql-socket / postgresql-socket-api.cl
diff --git a/db-postgresql-socket/postgresql-socket-api.cl b/db-postgresql-socket/postgresql-socket-api.cl
new file mode 100644 (file)
index 0000000..b64bdea
--- /dev/null
@@ -0,0 +1,832 @@
+;;;; -*- Mode: LISP; Syntax: ANSI-Common-Lisp; Base: 10 -*-
+;;;; *************************************************************************
+;;;; FILE IDENTIFICATION
+;;;;
+;;;; Name:          postgresql-socket.cl
+;;;; Purpose:       Low-level PostgreSQL interface using sockets
+;;;; Programmers:   Kevin M. Rosenberg based on
+;;;;                Original code by Pierre R. Mai 
+;;;;                
+;;;; Date Started:  Feb 2002
+;;;;
+;;;; $Id: postgresql-socket-api.cl,v 1.1 2002/09/18 07:43:41 kevin Exp $
+;;;;
+;;;; This file, part of CLSQL, is Copyright (c) 2002 by Kevin M. Rosenberg
+;;;; and Copyright (c) 1999-2001 by Pierre R. Mai
+;;;;
+;;;; CLSQL users are granted the rights to distribute and use this software
+;;;; as governed by the terms of the Lisp Lesser GNU Public License
+;;;; (http://opensource.franz.com/preamble.html), also known as the LLGPL.
+;;;; *************************************************************************
+
+
+;;;; Changes by Kevin Rosenberg
+;;;;  - Added socket open functions for Allegro and Lispworks
+;;;;  - Changed CMUCL FFI to UFFI
+;;;;  - Added necessary (force-output) for socket streams on 
+;;;;     Allegro and Lispworks
+;;;;  - Added initialization variable
+;;;;  - Added field type processing
+
+(declaim (optimize (debug 3) (speed 3) (safety 1) (compilation-speed 0)))
+(in-package :postgresql-socket)
+
+(uffi:def-enum pgsql-ftype
+    ((:bytea 17)
+     (:int2 21)
+     (:int4 23)
+     (:int8 20)
+     (:float4 700)
+     (:float8 701)))
+
+(defmethod database-type-library-loaded ((database-type
+                                         (eql :postgresql-socket)))
+  "T if foreign library was able to be loaded successfully. Always true for
+socket interface"
+  t)
+                                     
+
+;;; Message I/O stuff
+
+(defmacro define-message-constants (description &rest clauses)
+  (assert (evenp (length clauses)))
+  (loop with seen-characters = nil
+       for (name char) on clauses by #'cddr
+       for char-code = (char-code char)
+       for doc-string = (format nil "~A (~:C): ~A" description char name)
+       if (member char seen-characters)
+       do (error "Duplicate message type ~@C for group ~A" char description)
+       else
+       collect
+       `(defconstant ,name ,char-code ,doc-string)
+       into result-clauses
+       and do (push char seen-characters)
+      finally
+       (return `(progn ,@result-clauses))))
+
+(eval-when (:compile-toplevel :load-toplevel :execute)
+(define-message-constants "Backend Message Constants"
+  +ascii-row-message+ #\D
+  +authentication-message+ #\R
+  +backend-key-message+ #\K
+  +binary-row-message+ #\B
+  +completed-response-message+ #\C
+  +copy-in-response-message+ #\G
+  +copy-out-response-message+ #\H
+  +cursor-response-message+ #\P
+  +empty-query-response-message+ #\I
+  +error-response-message+ #\E
+  +function-response-message+ #\V
+  +notice-response-message+ #\N
+  +notification-response-message+ #\A
+  +ready-for-query-message+ #\Z
+  +row-description-message+ #\T))
+
+(defgeneric send-socket-value (type socket value))
+
+(defmethod send-socket-value ((type (eql 'int32)) socket (value integer))
+  (write-byte (ldb (byte 8 24) value) socket)
+  (write-byte (ldb (byte 8 16) value) socket)
+  (write-byte (ldb (byte 8 8) value) socket)
+  (write-byte (ldb (byte 8 0) value) socket))
+
+(defmethod send-socket-value ((type (eql 'int16)) socket (value integer))
+  (write-byte (ldb (byte 8 8) value) socket)
+  (write-byte (ldb (byte 8 0) value) socket))
+
+(defmethod send-socket-value ((type (eql 'int8)) socket (value integer))
+  (write-byte (ldb (byte 8 0) value) socket))
+
+(defmethod send-socket-value ((type (eql 'string)) socket (value string))
+  (loop for char across value
+       for code = (char-code char)
+       do (write-byte code socket)
+       finally (write-byte 0 socket)))
+
+(defmethod send-socket-value ((type (eql 'limstring)) socket (value string))
+  (loop for char across value
+       for code = (char-code char)
+       do (write-byte code socket)))
+
+(defmethod send-socket-value ((type (eql 'byte)) socket (value integer))
+  (write-byte value socket))
+
+(defmethod send-socket-value ((type (eql 'byte)) socket (value character))
+  (write-byte (char-code value) socket))
+
+(defmethod send-socket-value ((type (eql 'byte)) socket value)
+  (write-sequence value socket))
+
+(defgeneric read-socket-value (type socket))
+
+(defmethod read-socket-value ((type (eql 'int32)) socket)
+  (let ((result 0))
+    (setf (ldb (byte 8 24) result) (read-byte socket))
+    (setf (ldb (byte 8 16) result) (read-byte socket))
+    (setf (ldb (byte 8 8) result) (read-byte socket))
+    (setf (ldb (byte 8 0) result) (read-byte socket))
+    result))
+
+(defmethod read-socket-value ((type (eql 'int16)) socket)
+  (let ((result 0))
+    (setf (ldb (byte 8 8) result) (read-byte socket))
+    (setf (ldb (byte 8 0) result) (read-byte socket))
+    result))
+
+(defmethod read-socket-value ((type (eql 'int8)) socket)
+  (read-byte socket))
+
+(defmethod read-socket-value ((type (eql 'string)) socket)
+  (with-output-to-string (out)
+    (loop for code = (read-byte socket)
+         until (zerop code)
+         do (write-char (code-char code) out))))
+
+(defgeneric skip-socket-value (type socket))
+
+(defmethod skip-socket-value ((type (eql 'int32)) socket)
+  (dotimes (i 4) (read-byte socket)))
+
+(defmethod skip-socket-value ((type (eql 'int16)) socket)
+  (dotimes (i 2) (read-byte socket)))
+
+(defmethod skip-socket-value ((type (eql 'int8)) socket)
+  (read-byte socket))
+
+(defmethod skip-socket-value ((type (eql 'string)) socket)
+  (loop until (zerop (read-byte socket))))
+
+(defmacro define-message-sender (name (&rest args) &rest clauses)
+  (loop with socket-var = (gensym)
+       for (type value) in clauses
+       collect
+       `(send-socket-value ',type ,socket-var ,value)
+       into body
+      finally
+       (return
+         `(defun ,name (,socket-var ,@args)
+            ,@body))))
+
+(defun pad-limstring (string limit)
+  (let ((result (make-string limit :initial-element #\NULL)))
+    (loop for char across string
+         for index from 0 below limit
+         do (setf (char result index) char))
+    result))
+
+(define-message-sender send-startup-message
+    (database user &optional (command-line "") (backend-tty ""))
+  (int32 296)                           ; Length
+  (int32 #x00020000)                    ; Version 2.0
+  (limstring (pad-limstring database 64))
+  (limstring (pad-limstring user 32))
+  (limstring (pad-limstring command-line 64))
+  (limstring (pad-limstring "" 64))     ; Unused
+  (limstring (pad-limstring backend-tty 64)))
+
+(define-message-sender send-terminate-message ()
+  (byte #\X))
+
+(define-message-sender send-unencrypted-password-message (password)
+  (int32 (+ 5 (length password)))
+  (string password))
+
+(define-message-sender send-query-message (query)
+  (byte #\Q)
+  (string query))
+
+(define-message-sender send-encrypted-password-message (crypted-password)
+  (int32 (+ 5 (length crypted-password)))
+  (string crypted-password))
+
+(define-message-sender send-cancel-request (pid key)
+  (int32 16)                            ; Length
+  (int32 80877102)                      ; Magic
+  (int32 pid)
+  (int32 key))
+
+
+(defun read-socket-sequence (string stream)
+"KMR -- Added to support reading from binary stream into a string"
+  (declare (optimize (speed 3) (safety 0))
+          (string string))
+  (dotimes (i (length string))
+    (declare (fixnum i))
+    (setf (char string i) (code-char (read-byte stream))))
+  string)
+
+
+;;; Support for encrypted password transmission
+
+(defvar *crypt-library-loaded* nil)
+
+(defun crypt-password (password salt)
+  "Encrypt a password for transmission to a PostgreSQL server."
+  (unless *crypt-library-loaded*
+    (uffi:load-foreign-library 
+     (uffi:find-foreign-library "libcrypt"
+                          '("/usr/lib/" "/usr/local/lib/" "/lib/"))
+     :supporting-libaries '("c"))
+    (eval '(uffi:def-function "crypt" 
+           ((key :cstring)
+            (salt :cstring))
+           :returning :cstring))
+    (setq *crypt-library-loaded* t))
+   (uffi:with-cstring (password-cstring password)
+     (uffi:with-cstring (salt-cstring salt)
+       (uffi:convert-from-cstring 
+       (funcall (fdefinition 'crypt) password-cstring salt-cstring)))))
+;;; Condition hierarchy
+
+(define-condition postgresql-condition (condition)
+  ((connection :initarg :connection :reader postgresql-condition-connection)
+   (message :initarg :message :reader postgresql-condition-message))
+  (:report
+   (lambda (c stream)
+     (format stream "~@<~A occurred on connection ~A. ~:@_Reason: ~A~:@>"
+            (type-of c)
+            (postgresql-condition-connection c)
+            (postgresql-condition-message c)))))
+
+(define-condition postgresql-error (error postgresql-condition)
+  ())
+
+(define-condition postgresql-fatal-error (postgresql-error)
+  ())
+
+(define-condition postgresql-login-error (postgresql-fatal-error)
+  ())
+
+(define-condition postgresql-warning (warning postgresql-condition)
+  ())
+
+(define-condition postgresql-notification (postgresql-condition)
+  ()
+  (:report
+   (lambda (c stream)
+     (format stream "~@<Asynchronous notification on connection ~A: ~:@_~A~:@>"
+            (postgresql-condition-connection c)
+            (postgresql-condition-message c)))))
+
+;;; Structures
+
+(defstruct postgresql-connection
+  host
+  port
+  database
+  user
+  password
+  options
+  tty
+  socket
+  pid
+  key)
+
+(defstruct postgresql-cursor
+  connection
+  name
+  fields)
+
+;;; Socket stuff
+
+(defconstant +postgresql-server-default-port+ 5432
+  "Default port of PostgreSQL server.")
+
+(defvar *postgresql-server-socket-timeout* 60
+  "Timeout in seconds for reads from the PostgreSQL server.")
+
+
+#+cmu
+(defun open-postgresql-socket (host port)
+  (etypecase host
+    (pathname
+     ;; Directory to unix-domain socket
+     (ext:connect-to-unix-socket
+      (namestring
+       (make-pathname :name ".s.PGSQL" :type (princ-to-string port)
+                     :defaults host))))
+    (string
+     (ext:connect-to-inet-socket host port))))
+
+#+cmu
+(defun open-postgresql-socket-stream (host port)
+  (system:make-fd-stream
+   (open-postgresql-socket host port)
+   :input t :output t :element-type '(unsigned-byte 8)
+   :buffering :none
+   :timeout *postgresql-server-socket-timeout*))
+
+#+allegro
+(defun open-postgresql-socket-stream (host port)
+  (etypecase host
+    (pathname
+     (let ((path (namestring
+                 (make-pathname :name ".s.PGSQL" :type (princ-to-string port)
+                                :defaults host))))
+       (socket:make-socket :type :stream :address-family :file
+                          :connect :active
+                          :remote-filename path :local-filename path)))
+    (string
+     (socket:with-pending-connect
+        (mp:with-timeout (*postgresql-server-socket-timeout* (error "connect failed"))
+          (socket:make-socket :type :stream :address-family :internet
+                              :remote-port port :remote-host host
+                              :connect :active :nodelay t))))
+    ))
+
+#+lispworks
+(defun open-postgresql-socket-stream (host port)
+  (etypecase host
+    (pathname
+     (error "File sockets not supported on Lispworks."))
+    (string
+     (comm:open-tcp-stream host port :direction :io :element-type '(unsigned-byte 8)
+                          :read-timeout *postgresql-server-socket-timeout*))
+    ))
+
+;;; Interface Functions
+
+(defun open-postgresql-connection (&key (host (cmucl-compat:required-argument))
+                                       (port +postgresql-server-default-port+)
+                                       (database (cmucl-compat:required-argument))
+                                       (user (cmucl-compat:required-argument))
+                                       options tty password)
+  "Open a connection to a PostgreSQL server with the given parameters.
+Note that host, database and user arguments must be supplied.
+
+If host is a pathname, it is assumed to name a directory containing
+the local unix-domain sockets of the server, with port selecting which
+of those sockets to open.  If host is a string, it is assumed to be
+the name of the host running the PostgreSQL server.  In that case a
+TCP connection to the given port on that host is opened in order to
+communicate with the server.  In either case the port argument
+defaults to `+postgresql-server-default-port+'.
+
+Password is the clear-text password to be passed in the authentication
+phase to the server.  Depending on the server set-up, it is either
+passed in the clear, or encrypted via crypt and a server-supplied
+salt.  In that case the alien function specified by `*crypt-library*'
+and `*crypt-function-name*' is used for encryption.
+
+Note that all the arguments (including the clear-text password
+argument) are stored in the `postgresql-connection' structure, in
+order to facilitate automatic reconnection in case of communication
+troubles."
+  (reopen-postgresql-connection
+   (make-postgresql-connection :host host :port port
+                              :options (or options "") :tty (or tty "")
+                              :database database :user user
+                              :password (or password ""))))
+
+(defun reopen-postgresql-connection (connection)
+  "Reopen the given PostgreSQL connection.  Closes any existing
+connection, if it is still open."
+  (when (postgresql-connection-open-p connection)
+    (close-postgresql-connection connection))
+  (let ((socket (open-postgresql-socket-stream 
+                 (postgresql-connection-host connection)
+                 (postgresql-connection-port connection))))
+    (unwind-protect
+        (progn
+          (setf (postgresql-connection-socket connection) socket)
+          (send-startup-message socket
+                                (postgresql-connection-database connection)
+                                (postgresql-connection-user connection)
+                                (postgresql-connection-options connection)
+                                (postgresql-connection-tty connection))
+          (force-output socket)
+          (loop
+              (case (read-socket-value 'int8 socket)
+                (#.+authentication-message+
+                 (case (read-socket-value 'int32 socket)
+                   (0 (return))
+                   ((1 2)
+                    (error 'postgresql-login-error
+                           :connection connection
+                           :message
+                           "Postmaster expects unsupported Kerberos authentication."))
+                   (3
+                    (send-unencrypted-password-message
+                     socket
+                     (postgresql-connection-password connection)))
+                   (4
+                    (let ((salt (make-string 2)))
+                      (read-socket-sequence salt socket)
+                      (send-encrypted-password-message
+                       socket
+                       (crypt-password
+                        (postgresql-connection-password connection) salt))))
+                   (t
+                    (error 'postgresql-login-error
+                           :connection connection
+                           :message
+                           "Postmaster expects unknown authentication method."))))
+                (#.+error-response-message+
+                 (let ((message (read-socket-value 'string socket)))
+                   (error 'postgresql-login-error
+                          :connection connection :message message)))
+                (t
+                 (error 'postgresql-login-error
+                        :connection connection
+                        :message
+                        "Received garbled message from Postmaster"))))
+          ;; Start backend communication
+          (force-output socket)
+          (loop
+              (case (read-socket-value 'int8 socket)
+                (#.+backend-key-message+
+                 (setf (postgresql-connection-pid connection)
+                       (read-socket-value 'int32 socket)
+                       (postgresql-connection-key connection)
+                       (read-socket-value 'int32 socket)))
+                (#.+ready-for-query-message+
+                 (setq socket nil)
+                 (return connection))
+                (#.+error-response-message+
+                 (let ((message (read-socket-value 'string socket)))
+                   (error 'postgresql-login-error
+                          :connection connection
+                          :message message)))
+                (#.+notice-response-message+
+                 (let ((message (read-socket-value 'string socket)))
+                   (warn 'postgresql-warning :connection connection
+                         :message message)))
+                (t
+                 (error 'postgresql-login-error
+                        :connection connection
+                        :message
+                        "Received garbled message from Postmaster")))))
+      (when socket
+       (close socket)))))
+
+(defun close-postgresql-connection (connection &optional abort)
+  (unless abort
+    (ignore-errors
+      (send-terminate-message (postgresql-connection-socket connection))))
+  (close (postgresql-connection-socket connection)))
+
+(defun postgresql-connection-open-p (connection)
+  (let ((socket (postgresql-connection-socket connection)))
+    (and socket (streamp socket) (open-stream-p socket))))
+
+(defun ensure-open-postgresql-connection (connection)
+  (unless (postgresql-connection-open-p connection)
+    (reopen-postgresql-connection connection)))
+
+(defun process-async-messages (connection)
+  (assert (postgresql-connection-open-p connection))
+  ;; Process any asnychronous messages
+  (loop with socket = (postgresql-connection-socket connection)
+       while (listen socket)
+       do
+       (case (read-socket-value 'int8 socket)
+         (#.+notice-response-message+
+          (let ((message (read-socket-value 'string socket)))
+            (warn 'postgresql-warning :connection connection
+                  :message message)))
+         (#.+notification-response-message+
+          (let ((pid (read-socket-value 'int32 socket))
+                (message (read-socket-value 'string socket)))
+            (when (= pid (postgresql-connection-pid connection))
+              (signal 'postgresql-notification :connection connection
+                      :message message))))
+         (t
+          (close-postgresql-connection connection)
+          (error 'postgresql-fatal-error :connection connection
+                 :message "Received garbled message from backend")))))
+
+(defun start-query-execution (connection query)
+  (ensure-open-postgresql-connection connection)
+  (process-async-messages connection)
+  (send-query-message (postgresql-connection-socket connection) query)
+  (force-output (postgresql-connection-socket connection)))
+
+(defun wait-for-query-results (connection)
+  (assert (postgresql-connection-open-p connection))
+  (let ((socket (postgresql-connection-socket connection))
+       (cursor-name nil)
+       (error nil))
+    (loop
+       (case (read-socket-value 'int8 socket)
+         (#.+completed-response-message+
+          (return (values :completed (read-socket-value 'string socket))))
+         (#.+cursor-response-message+
+          (setq cursor-name (read-socket-value 'string socket)))
+         (#.+row-description-message+
+          (let* ((count (read-socket-value 'int16 socket))
+                 (fields
+                  (loop repeat count
+                    collect
+                    (list
+                     (read-socket-value 'string socket)
+                     (read-socket-value 'int32 socket)
+                     (read-socket-value 'int16 socket)
+                     (read-socket-value 'int32 socket)))))
+            (return
+              (values :cursor
+                      (make-postgresql-cursor :connection connection
+                                              :name cursor-name
+                                              :fields fields)))))
+         (#.+copy-in-response-message+
+          (return :copy-in))
+         (#.+copy-out-response-message+
+          (return :copy-out))
+         (#.+ready-for-query-message+
+          (when error
+            (error error))
+          (return nil))
+         (#.+error-response-message+
+          (let ((message (read-socket-value 'string socket)))
+            (setq error
+                  (make-condition 'postgresql-error
+                                  :connection connection :message message))))
+         (#.+notice-response-message+
+          (let ((message (read-socket-value 'string socket)))
+            (warn 'postgresql-warning
+                  :connection connection :message message)))
+         (#.+notification-response-message+
+          (let ((pid (read-socket-value 'int32 socket))
+                (message (read-socket-value 'string socket)))
+            (when (= pid (postgresql-connection-pid connection))
+              (signal 'postgresql-notification :connection connection
+                      :message message))))
+         (t
+          (close-postgresql-connection connection)
+          (error 'postgresql-fatal-error :connection connection
+                 :message "Received garbled message from backend"))))))
+
+(defun read-null-bit-vector (socket count)
+  (let ((result (make-array count :element-type 'bit)))
+    (dotimes (offset (ceiling count 8))
+      (loop with byte = (read-byte socket)
+           for index from (* offset 8) below (min count (* (1+ offset) 8))
+           for weight downfrom 7
+           do (setf (aref result index) (ldb (byte 1 weight) byte))))
+    result))
+
+
+(defun read-field (socket type)
+  (let ((length (- (read-socket-value 'int32 socket) 4)))
+    (case type
+      ((:int32 :int64)
+       (read-integer-from-socket socket length))
+      (:double
+       (read-double-from-socket socket length))
+      (t
+       (let ((result (make-string length)))
+        (read-socket-sequence result socket)
+        result)))))
+
+(uffi:def-constant +char-code-zero+ (char-code #\0))
+(uffi:def-constant +char-code-minus+ (char-code #\-))
+(uffi:def-constant +char-code-plus+ (char-code #\+))
+(uffi:def-constant +char-code-period+ (char-code #\.))
+(uffi:def-constant +char-code-lower-e+ (char-code #\e))
+(uffi:def-constant +char-code-upper-e+ (char-code #\E))
+
+(defun read-integer-from-socket (socket length)
+  (declare (fixnum length))
+  (if (zerop length)
+      nil
+    (let ((val 0)
+         (first-char (read-byte socket))
+         (minusp nil))
+      (declare (fixnum first-char))
+      (decf length) ;; read first char
+      (cond
+       ((= first-char +char-code-minus+)
+       (setq minusp t))
+       ((= first-char +char-code-plus+)
+       )               ;; nothing to do
+       (t
+       (setq val (- first-char +char-code-zero+))))
+      
+      (dotimes (i length)
+       (declare (fixnum i))
+       (setq val (+
+                  (* 10 val)
+                  (- (read-byte socket) +char-code-zero+))))
+      (if minusp
+         (- val)
+       val))))
+
+(defmacro ascii-digit (int)
+  (let ((offset (gensym)))
+    `(let ((,offset (- ,int +char-code-zero+)))
+      (declare (fixnum ,int ,offset))
+      (if (and (>= ,offset 0)
+              (< ,offset 10))
+         ,offset
+         nil))))
+      
+(defun read-double-from-socket (socket length)
+  (declare (fixnum length))
+  (let ((before-decimal 0)
+       (after-decimal 0)
+       (decimal-count 0)
+       (exponent 0)
+       (decimalp nil)
+       (minusp nil)
+       (result nil)
+       (char (read-byte socket)))
+    (declare (fixnum char exponent decimal-count))
+    (decf length) ;; already read first character
+    (cond
+      ((= char +char-code-minus+)
+       (setq minusp t))
+      ((= char +char-code-plus+)
+       )
+      ((= char +char-code-period+)
+       (setq decimalp t))
+      (t
+       (setq before-decimal (ascii-digit char))
+       (unless before-decimal
+        (error "Unexpected value"))))
+    
+    (block loop
+      (dotimes (i length)
+       (setq char (read-byte socket))
+       ;;      (format t "~&len:~D, i:~D, char:~D, minusp:~A, decimalp:~A" length i char minusp decimalp)
+       (let ((weight (ascii-digit char)))
+         (cond 
+          ((and weight (not decimalp)) ;; before decimal point
+           (setq before-decimal (+ weight (* 10 before-decimal))))
+          ((and weight decimalp) ;; after decimal point
+           (setq after-decimal (+ weight (* 10 after-decimal)))
+           (incf decimal-count))
+          ((and (= char +char-code-period+))
+           (setq decimalp t))
+          ((or (= char +char-code-lower-e+)          ;; E is for exponent
+               (= char +char-code-upper-e+))
+           (setq exponent (read-integer-from-socket socket (- length i 1)))
+           (setq exponent (or exponent 0))
+           (return-from loop))
+         (t 
+          (break "Unexpected value"))
+         )
+       )))
+    (setq result (* (+ (coerce before-decimal 'double-float)
+                      (* after-decimal 
+                         (expt 10 (- decimal-count))))
+                   (expt 10 exponent)))
+    (if minusp
+       (- result)
+       result)))
+       
+      
+#+ignore
+(defun read-double-from-socket (socket length)
+  (let ((result (make-string length)))
+    (read-socket-sequence result socket)
+    (let ((*read-default-float-format* 'double-float))
+      (read-from-string result))))
+
+(defun read-cursor-row (cursor types)
+  (let* ((connection (postgresql-cursor-connection cursor))
+        (socket (postgresql-connection-socket connection))
+        (fields (postgresql-cursor-fields cursor)))
+    (assert (postgresql-connection-open-p connection))
+    (loop
+       (let ((code (read-socket-value 'int8 socket)))
+         (case code
+           (#.+ascii-row-message+
+            (return
+              (loop with count = (length fields)
+                    with null-vector = (read-null-bit-vector socket count)
+                    repeat count
+                    for null-bit across null-vector
+                    for i from 0
+                    for null-p = (zerop null-bit)
+                    if null-p
+                    collect nil
+                    else
+                    collect
+                    (read-field socket (nth i types)))))
+           (#.+binary-row-message+
+            (error "NYI"))
+           (#.+completed-response-message+
+            (return (values nil (read-socket-value 'string socket))))
+           (#.+error-response-message+
+            (let ((message (read-socket-value 'string socket)))
+              (error 'postgresql-error
+                     :connection connection :message message)))
+           (#.+notice-response-message+
+            (let ((message (read-socket-value 'string socket)))
+              (warn 'postgresql-warning
+                    :connection connection :message message)))
+           (#.+notification-response-message+
+            (let ((pid (read-socket-value 'int32 socket))
+                  (message (read-socket-value 'string socket)))
+              (when (= pid (postgresql-connection-pid connection))
+                (signal 'postgresql-notification :connection connection
+                        :message message))))
+           (t
+            (close-postgresql-connection connection)
+            (error 'postgresql-fatal-error :connection connection
+                   :message "Received garbled message from backend")))))))
+
+(defun map-into-indexed (result-seq func seq)
+  (dotimes (i (length seq))
+    (declare (fixnum i))
+    (setf (elt result-seq i)
+         (funcall func (elt seq i) i)))
+  result-seq)
+
+(defun copy-cursor-row (cursor sequence types)
+  (let* ((connection (postgresql-cursor-connection cursor))
+        (socket (postgresql-connection-socket connection))
+        (fields (postgresql-cursor-fields cursor)))
+    (assert (= (length fields) (length sequence)))
+    (loop
+       (let ((code (read-socket-value 'int8 socket)))
+         (case code
+           (#.+ascii-row-message+
+            (return
+              #+ignore
+              (let* ((count (length sequence))
+                     (null-vector (read-null-bit-vector socket count)))
+                (dotimes (i count)
+                  (declare (fixnum i))
+                  (if (zerop (elt null-vector i))
+                      (setf (elt sequence i) nil)
+                      (let ((value (read-field socket (nth i types))))
+                        (setf (elt sequence i) value)))))
+              (map-into-indexed
+               sequence
+               #'(lambda (null-bit i)
+                   (if (zerop null-bit)
+                       nil
+                       (read-field socket (nth i types))))
+               (read-null-bit-vector socket (length sequence)))))
+           (#.+binary-row-message+
+            (error "NYI"))
+           (#.+completed-response-message+
+            (return (values nil (read-socket-value 'string socket))))
+           (#.+error-response-message+
+            (let ((message (read-socket-value 'string socket)))
+              (error 'postgresql-error
+                     :connection connection :message message)))
+           (#.+notice-response-message+
+            (let ((message (read-socket-value 'string socket)))
+              (warn 'postgresql-warning
+                    :connection connection :message message)))
+           (#.+notification-response-message+
+            (let ((pid (read-socket-value 'int32 socket))
+                  (message (read-socket-value 'string socket)))
+              (when (= pid (postgresql-connection-pid connection))
+                (signal 'postgresql-notification :connection connection
+                        :message message))))
+           (t
+            (close-postgresql-connection connection)
+            (error 'postgresql-fatal-error :connection connection
+                   :message "Received garbled message from backend")))))))
+
+(defun skip-cursor-row (cursor)
+  (let* ((connection (postgresql-cursor-connection cursor))
+        (socket (postgresql-connection-socket connection))
+        (fields (postgresql-cursor-fields cursor)))
+    (loop
+       (let ((code (read-socket-value 'int8 socket)))
+         (case code
+           (#.+ascii-row-message+
+            (loop for null-bit across
+                  (read-null-bit-vector socket (length fields))
+                  do
+                  (unless (zerop null-bit)
+                    (let* ((length (read-socket-value 'int32 socket)))
+                      (loop repeat (- length 4) do (read-byte socket)))))
+            (return t))
+           (#.+binary-row-message+
+            (error "NYI"))
+           (#.+completed-response-message+
+            (return (values nil (read-socket-value 'string socket))))
+           (#.+error-response-message+
+            (let ((message (read-socket-value 'string socket)))
+              (error 'postgresql-error
+                     :connection connection :message message)))
+           (#.+notice-response-message+
+            (let ((message (read-socket-value 'string socket)))
+              (warn 'postgresql-warning
+                    :connection connection :message message)))
+           (#.+notification-response-message+
+            (let ((pid (read-socket-value 'int32 socket))
+                  (message (read-socket-value 'string socket)))
+              (when (= pid (postgresql-connection-pid connection))
+                (signal 'postgresql-notification :connection connection
+                        :message message))))
+           (t
+            (close-postgresql-connection connection)
+            (error 'postgresql-fatal-error :connection connection
+                   :message "Received garbled message from backend")))))))
+
+(defun run-query (connection query &optional (types nil))
+  (start-query-execution connection query)
+  (multiple-value-bind (status cursor)
+      (wait-for-query-results connection)
+    (assert (eq status :cursor))
+    (loop for row = (read-cursor-row cursor types)
+         while row
+         collect row
+         finally
+         (wait-for-query-results connection))))