Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 65 additions & 1 deletion src/unix_stream.carp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,19 @@
(let [n (send-bytes- stream data)]
(if (= n -1) (Result.Error (System.error-text)) (Result.Success n))))

(private send-len-)
(hidden send-len-)
(register send-len-
(Fn [&UnixStream &String Int] Int)
"UnixStream_send_MINUS_len_")

(doc send-len "sends a string with known length (avoids strlen). `len` must be
between 0 and the string’s length; returns an error otherwise.
Returns bytes sent or an error.")
(defn send-len [stream msg len]
(let [n (send-len- stream msg len)]
(if (= n -1) (Result.Error (System.error-text)) (Result.Success n))))

(doc read
"reads up to 4096 bytes from the stream. Returns the data as a string, or an error.
Returns an empty string on connection close.")
Expand All @@ -83,7 +96,11 @@ Returns bytes read (0 = connection closed), or an error.")
(if (= n -1) (Result.Error (System.error-text)) (Result.Success n))))

(register close (Fn [UnixStream] ()))
(doc close "closes the stream, releasing the file descriptor.")
(doc close "closes the stream, consuming it.")

(doc close! "closes the stream by reference, for use when the stream lives
in a collection. Sets the fd to -1 to prevent double-close.")
(register close! (Fn [&UnixStream] ()) "UnixStream_close_MINUS_ref")

(doc shutdown "shuts down the stream. 0 = reads, 1 = writes, 2 = both.")
(register shutdown- (Fn [&UnixStream Int] ()) "UnixStream_shutdown_")
Expand All @@ -98,12 +115,59 @@ Returns bytes read (0 = connection closed), or an error.")
(doc set-timeout "sets read and write timeouts in seconds.")
(register set-timeout (Fn [&UnixStream Int] ()))

(doc set-nonblocking "puts the socket into non-blocking mode. After this
call, `send-nb` and `read-append-nb` are the appropriate I/O entry points;
the blocking variants will return `EAGAIN` instead of waiting.")
(register set-nonblocking (Fn [&UnixStream] ()))

(private send-nb-)
(hidden send-nb-)
(register send-nb-
(Fn [&UnixStream &(Array Byte) Int] Int)
"UnixStream_send_MINUS_nb_")

(doc send-nb "non-blocking send. Sends as many bytes as the kernel will
accept right now from `data`, starting at `offset`.

Returns `(Result Int String)`. The `Int` is the number of bytes actually
written, which may be `0` if the socket is not currently writable. Re-arm
write interest on the next event-loop iteration in that case.")
(defn send-nb [stream data offset]
(let [n (send-nb- stream data offset)]
(if (= n -1) (Result.Error (System.error-text)) (Result.Success n))))

(private read-append-nb-)
(hidden read-append-nb-)
(register read-append-nb-
(Fn [&UnixStream &(Array Byte)] Int)
"UnixStream_read_MINUS_append_MINUS_nb_")

(doc read-blocked "sentinel returned by `read-append-nb` when no data is
currently available on a non-blocking socket.")
(def read-blocked -2)

(doc read-append-nb "non-blocking append-read. Reads whatever the kernel
has buffered into `buf`, growing it as needed.

Returns `(Result Int String)`. The `Int` is one of:

- `> 0` bytes appended to `buf`,
- `0` peer closed cleanly (EOF),
- `read-blocked` (-2) socket has no data right now, retry on the next
readable event.")
(defn read-append-nb [stream buf]
(let [n (read-append-nb- stream buf)]
(if (= n -1) (Result.Error (System.error-text)) (Result.Success n))))

(doc peer-path "returns the path of the remote peer socket.")
(register peer-path (Fn [&UnixStream] String))

(register copy (Fn [&UnixStream] UnixStream))
(implements copy UnixStream.copy)

(register prn (Fn [UnixStream] String) "UnixStream_prn_")
(implements prn UnixStream.prn)

(defn poll-fd [s] (fd- s))
(implements poll-fd UnixStream.poll-fd)

Expand Down
43 changes: 43 additions & 0 deletions src/unix_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,49 @@ String UnixStream_peer_MINUS_path(UnixStream* s) {
return str;
}

void UnixStream_close_MINUS_ref(UnixStream* s) {
if (s->fd >= 0) { close(s->fd); s->fd = -1; }
}

void UnixStream_set_MINUS_nonblocking(UnixStream* s) {
int flags = fcntl(s->fd, F_GETFL, 0);
if (flags >= 0) fcntl(s->fd, F_SETFL, flags | O_NONBLOCK);
}

int UnixStream_send_MINUS_len_(UnixStream* s, String* msg, int len) {
if (len < 0 || len > (int)strlen(*msg)) return -1;
return (int)send_all(s->fd, *msg, (size_t)len);
}

int UnixStream_send_MINUS_nb_(UnixStream* s, Array* data, int offset) {
if (offset < 0 || offset >= data->len) return 0;
ssize_t n = send(s->fd, (char*)data->data + offset,
(size_t)(data->len - offset), 0);
if (n >= 0) return (int)n;
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) return 0;
return -1;
}

int UnixStream_read_MINUS_append_MINUS_nb_(UnixStream* s, Array* buf) {
if ((int)(buf->capacity - buf->len) < SOCK_BUF_SIZE) {
int new_cap = (buf->len + SOCK_BUF_SIZE) * 2;
buf->data = CARP_REALLOC(buf->data, new_cap);
buf->capacity = new_cap;
}
ssize_t r = read(s->fd, (char*)buf->data + buf->len, SOCK_BUF_SIZE);
if (r > 0) { buf->len += (int)r; return (int)r; }
if (r == 0) return 0;
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) return -2;
return -1;
}

String UnixStream_prn_(UnixStream s) {
size_t len = (size_t)snprintf(NULL, 0, "UnixStream(%d)", s.fd);
String r = CARP_MALLOC(len + 1);
snprintf(r, len + 1, "UnixStream(%d)", s.fd);
return r;
}

UnixStream UnixStream_copy(UnixStream* s) {
UnixStream c;
c.fd = s->fd;
Expand Down
118 changes: 117 additions & 1 deletion test/unix_test.carp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,112 @@
(UnixListener.close listener)
false))))))))

(defn unix-nb-roundtrip []
(let [path @"/tmp/carp_test_unix_nb.sock"]
(match (UnixListener.bind &path)
(Result.Error _) false
(Result.Success listener)
(match (UnixStream.connect &path)
(Result.Error _) (do (UnixListener.close listener) false)
(Result.Success client)
(match (UnixListener.accept &listener)
(Result.Error _)
(do
(UnixStream.close client)
(UnixListener.close listener)
false)
(Result.Success conn)
(let-do [buf (the (Array Byte) (Array.allocate 1024))
payload (the (Array Byte)
[(Char.to-byte \h) (Char.to-byte \i)])
ok true]
(UnixStream.set-nonblocking &client)
; nothing to read yet -> read-blocked sentinel
(match (UnixStream.read-append-nb &client &buf)
(Result.Success n)
(when (/= n UnixStream.read-blocked) (set! ok false))
(Result.Error _) (set! ok false))
; peer writes, we should be able to read it
(ignore (UnixStream.send &conn "ping"))
(System.sleep-micros 50000)
(match (UnixStream.read-append-nb &client &buf)
(Result.Success n) (when (/= n 4) (set! ok false))
(Result.Error _) (set! ok false))
; non-blocking send returns bytes written
(match (UnixStream.send-nb &client &payload 0)
(Result.Success n) (when (/= n 2) (set! ok false))
(Result.Error _) (set! ok false))
; send-nb with offset skips leading bytes
(match (UnixStream.send-nb &client &payload 1)
(Result.Success n) (when (/= n 1) (set! ok false))
(Result.Error _) (set! ok false))
; send-nb with negative offset returns 0 (guard)
(match (UnixStream.send-nb &client &payload -1)
(Result.Success n) (when (/= n 0) (set! ok false))
(Result.Error _) (set! ok false))
(UnixStream.close client)
(UnixStream.close conn)
(UnixListener.close listener)
ok))))))

(defn unix-send-len-test []
(let [path @"/tmp/carp_test_unix_sendlen.sock"]
(match (UnixListener.bind &path)
(Result.Error _) false
(Result.Success listener)
(match (UnixStream.connect &path)
(Result.Error _) (do (UnixListener.close listener) false)
(Result.Success client)
(match (UnixListener.accept &listener)
(Result.Error _)
(do
(UnixStream.close client)
(UnixListener.close listener)
false)
(Result.Success conn)
(do
; send only first 3 bytes of "hello"
(ignore (UnixStream.send-len &client "hello" 3))
(match (the (Result String String) (UnixStream.read &conn))
(Result.Success msg)
(let-do [ok (= &msg "hel")]
(UnixStream.close conn)
(UnixStream.close client)
(UnixListener.close listener)
ok)
_
(do
(UnixStream.close conn)
(UnixStream.close client)
(UnixListener.close listener)
false))))))))

(defn unix-close-ref-test []
(let [path @"/tmp/carp_test_unix_closeref.sock"]
(match (UnixListener.bind &path)
(Result.Error _) false
(Result.Success listener)
(match (UnixStream.connect &path)
(Result.Error _) (do (UnixListener.close listener) false)
(Result.Success client)
(match (UnixListener.accept &listener)
(Result.Error _)
(do
(UnixStream.close client)
(UnixListener.close listener)
false)
(Result.Success conn)
(do
; close! should work on a reference
(UnixStream.close! &conn)
; the stream is now closed; send should fail
(let-do [ok (match (UnixStream.send &conn "test")
(Result.Error _) true
_ false)]
(UnixStream.close client)
(UnixListener.close listener)
ok)))))))

(deftest test
(assert-true test
(match (UnixListener.bind "/tmp/carp_test_unix2.sock")
Expand All @@ -46,4 +152,14 @@
_ false)
"UnixStream.connect fails on nonexistent path")

(assert-true test (unix-roundtrip) "Unix socket roundtrip: send and receive"))
(assert-true test (unix-roundtrip) "Unix socket roundtrip: send and receive")

(assert-true test
(unix-nb-roundtrip)
"non-blocking I/O: read-blocked, then read 4 bytes, then send 2 bytes")

(assert-true test
(unix-send-len-test)
"send-len sends only the specified number of bytes")

(assert-true test (unix-close-ref-test) "close! closes stream by reference"))
Loading