Skip to content

AFUNIXSocketChannel read returns 0 on EoF (should be -1 to align with Tcp Socket) #171

@aiquestion

Description

@aiquestion

Describe the bug
we found that AFUNIXSocketChannel read returns 0 on EoF (should be -1 to align with Tcp Socket).
And it should return -1 acoording to the doc in SocketChannel and Tcp Socket will return -1.

Image

My version is : 2.10.1
running on Linux x86

seems in the code: Java_org_newsclub_net_unix_NativeUnixSocket_receive

JNIEXPORT jint JNICALL Java_org_newsclub_net_unix_NativeUnixSocket_receive
(JNIEnv *env, jclass clazz CK_UNUSED, jobject fd, jobject buffer, jint offset, jint length, jobject addressBuffer, jint opt, jobject ancSupp, jint hardTimeoutMillis) {

    .....
    memset(senderBuf, 0, senderBufLen);
    ssize_t count = recvmsg_wrapper(env, handle, dataBufferRef.buf, length, senderBuf, &senderBufLen, opt, ancSupp);

    int theError;
    if(count == -1) {
        theError = errno;
    } else if(count == 0) {
        //##COMMENT## recv in kernal will return 0 when EoF
        // check if non-blocking below
        theError = EWOULDBLOCK;
    } else {
        return (jint)count;
    }

    if(checkNonBlocking0(handle, theError, opt)) {
        if(theError == 0 || theError == EAGAIN || theError == EWOULDBLOCK || theError == ETIMEDOUT
           || theError == EINTR) {
            // just return 0
            //##COMMENT## here we will return 0
        } else {
            _throwErrnumException(env, theError, fd);
        }
        return 0;
    } else if(theError == EWOULDBLOCK) {
.....
     }else {
.....
}

    return 0;
}

To Reproduce
here is my reproduce code:

package org.example.uds;

import org.newsclub.net.unix.AFUNIXSelectorProvider;
import org.newsclub.net.unix.AFUNIXServerSocketChannel;
import org.newsclub.net.unix.AFUNIXSocketAddress;
import org.newsclub.net.unix.AFUNIXSocketChannel;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.time.Instant;
import java.util.*;

public class UdsServer {
    private static final String SOCKET_PATH = "/tmp/test-uds.sock";
    private static boolean SHORT_CONN = false;
    private static final Map<Channel, Session> sessions = new HashMap<>();

    static class Session {
        long createTime;
        ByteBuffer buffer = ByteBuffer.allocate(1024);

        Session() {
            this.createTime = System.currentTimeMillis();
        }
    }

    public static void main(String[] args) throws IOException {
        for (String arg : args) {
            if (arg.startsWith("--short-conn=")) {
                SHORT_CONN = Boolean.parseBoolean(arg.split("=")[1]);
            }
        }
        log("Starting server. Short connection mode: " + SHORT_CONN);

        File socketFile = new File(SOCKET_PATH);
        // Ensure clean start
        if (socketFile.exists()) {
             socketFile.delete();
        }
        socketFile.deleteOnExit();

        AFUNIXSelectorProvider provider = AFUNIXSelectorProvider.provider();
        ServerSocketChannel serverChannel = provider.openServerSocketChannel();
        serverChannel.bind(AFUNIXSocketAddress.of(socketFile));
        serverChannel.configureBlocking(false);

        Selector selector = provider.openSelector();
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);

        log("Listening on " + SOCKET_PATH);

        while (true) {
            if (selector.select() == 0) {
                continue;
            }

            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();

                if (!key.isValid()) continue;

                try {
                    if (key.isAcceptable()) {
                        handleAccept(serverChannel, selector);
                    } else if (key.isReadable()) {
                        handleRead(key);
                    } else if (key.isWritable()) {
                        handleWrite(key);
                    }
                } catch (IOException e) {
                    log("Error handling key: " + e.getMessage());
                    key.cancel();
                    try {
                         key.channel().close();
                    } catch (IOException ignored) {}
                }
            }
        }
    }

    private static void handleAccept(ServerSocketChannel serverChannel, Selector selector) throws IOException {
        SocketChannel client = serverChannel.accept();
        if (client == null) return;
        client.configureBlocking(false);
        client.register(selector, SelectionKey.OP_READ);
        sessions.put(client, new Session());
        log("New connection accepted: " + client);
    }

    private static void handleRead(SelectionKey key) throws IOException {
        SocketChannel client = (SocketChannel) key.channel();
        Session session = sessions.get(client);
        if (session == null) {
            client.close();
            return;
        }

        session.buffer.clear();
        int totalRead = 0;
        while (true) {
            int read = client.read(session.buffer);
            log("Read return value: " + read + " from " + client);
            
            if (read > 0) {
                totalRead += read;
                if (!session.buffer.hasRemaining()) {
                     // Buffer full, stop reading for now (simplified)
                     break;
                }
            } else if (read == -1) {
                closeSession(client);
                return;
            } else if (read == 0) {
                log("Read 0 bytes.");
                break;
            }
        }

        if (totalRead > 0) {
            session.buffer.flip();
            client.register(key.selector(), SelectionKey.OP_WRITE);
            log("Registered OP_WRITE for echo. Total read: " + totalRead);
        } else {
             // If we read 0 bytes and didn't get -1, we keep OP_READ interest (already set)
        }
    }

    private static void handleWrite(SelectionKey key) throws IOException {
        SocketChannel client = (SocketChannel) key.channel();
        Session session = sessions.get(client);
        if (session == null) return;

        if (session.buffer.hasRemaining()) {
            client.write(session.buffer);
        }

        if (!session.buffer.hasRemaining()) {
            if (SHORT_CONN) {
                log("Short connection mode: closing after write.");
                closeSession(client);
            } else {
                client.register(key.selector(), SelectionKey.OP_READ);
                log("Echo complete, registered OP_READ.");
            }
        }
    }

    private static void closeSession(SocketChannel client) throws IOException {
        Session session = sessions.remove(client);
        if (session != null) {
            long duration = System.currentTimeMillis() - session.createTime;
            log("Closing session. Duration: " + duration + "ms. Client: " + client);
        } else {
            log("Closing unknown session: " + client);
        }
        client.close();
    }

    private static void log(String msg) {
        System.out.println(String.format("[%s] %s", Instant.now(), msg));
    }
}

Test client

package org.example.uds;

import org.newsclub.net.unix.AFUNIXSocketAddress;
import org.newsclub.net.unix.AFUNIXSocketChannel;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class UdsClient {
    private static final String SOCKET_PATH = "/tmp/test-uds.sock";

    public static void main(String[] args) throws InterruptedException, IOException {
        File socketFile = new File(SOCKET_PATH);
        AFUNIXSocketAddress address = AFUNIXSocketAddress.of(socketFile);

        System.out.println("=== Test Case 1: Send and immediately close ===");
        runCase1(address);
        Thread.sleep(2000); // Wait for server to process
    }

    private static void runCase1(AFUNIXSocketAddress address) throws IOException {
        try (AFUNIXSocketChannel channel = AFUNIXSocketChannel.open(address)) {
            String msg = "Hello Case 1";
            channel.write(ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8)));
            System.out.println("Sent: " + msg);
            // Close immediately by try-with-resources
        }
        System.out.println("Closed connection immediately.");
    }
}

After run it prints:

[2025-12-31T06:12:13.426030Z] Read return value: 0 from org.newsclub.net.unix.AFUNIXSocketChannel@58165325[local=org.newsclub.net.unix.AFUNIXSocketAddress[path=/tmp/test-uds.sock];remote=null]
[2025-12-31T06:12:13.426309Z] Read 0 bytes.

Metadata

Metadata

Assignees

Labels

bugThe issue describes a bug in the code

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions