Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 73 additions & 38 deletions src/test/java/io/socket/client/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,61 +18,88 @@ public abstract class Connection {

final static int TIMEOUT = 7_000;
final static int PORT = 3000;
final static int NO_RECOVERY_PORT = 3001;

private Process serverProcess;
private Process noRecoveryServerProcess;
private ExecutorService serverService;
private Future serverOutput;
private Future serverError;
private Future<?> serverOutput;
private Future<?> serverError;
private Future<?> noRecoveryServerOutput;
private Future<?> noRecoveryServerError;

@Before
public void startServer() throws IOException, InterruptedException {
logger.fine("Starting server ...");
logger.fine("Starting servers...");

// Start main server
final CountDownLatch latch = new CountDownLatch(1);
serverProcess = Runtime.getRuntime().exec(
String.format("node src/test/resources/server.js %s", nsp()), createEnv());
serverProcess = startServerProcess("node src/test/resources/server.js %s", PORT);
serverService = Executors.newCachedThreadPool();
serverOutput = serverService.submit(new Runnable() {
@Override
public void run() {
BufferedReader reader = new BufferedReader(
new InputStreamReader(serverProcess.getInputStream()));
String line;
try {
line = reader.readLine();
latch.countDown();
do {
logger.fine("SERVER OUT: " + line);
} while ((line = reader.readLine()) != null);
} catch (IOException e) {
logger.warning(e.getMessage());
}
serverOutput = startServerOutput(serverProcess, "MAIN", latch);
serverError = startServerError(serverProcess, "MAIN");

// Start no-recovery server
final CountDownLatch noRecoveryLatch = new CountDownLatch(1);
noRecoveryServerProcess = startServerProcess("node src/test/resources/server_no_recovery.js %s", NO_RECOVERY_PORT);
noRecoveryServerOutput = startServerOutput(noRecoveryServerProcess, "NO_RECOVERY", noRecoveryLatch);
noRecoveryServerError = startServerError(noRecoveryServerProcess, "NO_RECOVERY");

// Wait for both servers to start
latch.await(3000, TimeUnit.MILLISECONDS);
noRecoveryLatch.await(3000, TimeUnit.MILLISECONDS);
}

private Process startServerProcess(String script, int port) throws IOException {
return Runtime.getRuntime().exec(String.format(script, nsp()), createEnv(port));
}

private Future<?> startServerOutput(Process process, String serverName, CountDownLatch latch) {
return serverService.submit(() -> {
BufferedReader reader = new BufferedReader(
new InputStreamReader(process.getInputStream()));
String line;
try {
line = reader.readLine();
latch.countDown();
do {
logger.fine(serverName + " SERVER OUT: " + line);
} while ((line = reader.readLine()) != null);
} catch (IOException e) {
logger.warning(e.getMessage());
}
});
serverError = serverService.submit(new Runnable() {
@Override
public void run() {
BufferedReader reader = new BufferedReader(
new InputStreamReader(serverProcess.getErrorStream()));
String line;
try {
while ((line = reader.readLine()) != null) {
logger.fine("SERVER ERR: " + line);
}
} catch (IOException e) {
logger.warning(e.getMessage());
}

private Future<?> startServerError(Process process, String serverName) {
return serverService.submit(() -> {
BufferedReader reader = new BufferedReader(
new InputStreamReader(process.getErrorStream()));
String line;
try {
while ((line = reader.readLine()) != null) {
logger.fine(serverName + " SERVER ERR: " + line);
}
} catch (IOException e) {
logger.warning(e.getMessage());
}
});
latch.await(3000, TimeUnit.MILLISECONDS);
}

@After
public void stopServer() throws InterruptedException {
logger.fine("Stopping server ...");
logger.fine("Stopping servers...");

// Stop main server
serverProcess.destroy();
serverOutput.cancel(false);
serverError.cancel(false);

// Stop no-recovery server
noRecoveryServerProcess.destroy();
noRecoveryServerOutput.cancel(false);
noRecoveryServerError.cancel(false);

serverService.shutdown();
serverService.awaitTermination(3000, TimeUnit.MILLISECONDS);
}
Expand All @@ -90,11 +117,16 @@ Socket client(IO.Options opts) {
}

Socket client(String path, IO.Options opts) {
return IO.socket(URI.create(uri() + path), opts);
int port = opts.port != -1 ? opts.port : PORT;
return IO.socket(URI.create(uri(port) + path), opts);
}

URI uri() {
return URI.create("http://localhost:" + PORT);
return uri(PORT);
}

URI uri(int port) {
return URI.create("http://localhost:" + port);
}

String nsp() {
Expand All @@ -108,16 +140,19 @@ IO.Options createOptions() {
}

String[] createEnv() {
return createEnv(PORT);
}

String[] createEnv(int port) {
Map<String, String> env = new HashMap<>(System.getenv());
env.put("DEBUG", "socket.io:*");
env.put("PORT", String.valueOf(PORT));
env.put("PORT", String.valueOf(port));
String[] _env = new String[env.size()];
int i = 0;
for (String key : env.keySet()) {
_env[i] = key + "=" + env.get(key);
i++;
}
return _env;

}
}
7 changes: 1 addition & 6 deletions src/test/java/io/socket/client/SocketTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,7 @@ public void shouldChangeSocketIdUponReconnection() throws InterruptedException {

IO.Options opts = createOptions();
opts.forceNew = true;
try {
JSONObject auth = new JSONObject();
auth.put("noRecovery", true);
opts.auth = auth;
} catch (JSONException ignored) {
}
opts.port = Connection.NO_RECOVERY_PORT;

socket = client(opts);
socket.once(Socket.EVENT_CONNECT, new Emitter.Listener() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@
public class ConnectionFailure {

public static void main(String[] args) throws URISyntaxException {
int port = Integer.parseInt(System.getenv("PORT"));
port++;
int port = 60_000;
IO.Options options = new IO.Options();
options.forceNew = true;
options.reconnection = false;
Expand Down
8 changes: 0 additions & 8 deletions src/test/resources/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,6 @@ var port = process.env.PORT || 3000;
var nsp = process.argv[2] || '/';
var slice = Array.prototype.slice;

// Disable recovery on demand
io.use((socket, next) => {
if (socket.handshake.auth?.noRecovery === true) {
socket.handshake.auth._pid = 'invalid-' + Date.now();
}
next();
});

const fooNsp = io.of('/foo');

fooNsp.on('connection', (socket) => {
Expand Down
31 changes: 31 additions & 0 deletions src/test/resources/server_no_recovery.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
var fs = require('fs');

var server;
if (process.env.SSL) {
server = require('https').createServer({
key: fs.readFileSync(__dirname + '/key.pem'),
cert: fs.readFileSync(__dirname + '/cert.pem')
});
} else {
server = require('http').createServer();
}

// Create server without connection state recovery
var io = require('socket.io')(server, {
pingInterval: 2000
});

var port = process.env.PORT || 3001; // Different port to avoid conflicts
var nsp = process.argv[2] || '/';

server.listen(port, () => {
console.log(`Test server without recovery running on port ${port}`);
});

io.of(nsp).on('connection', (socket) => {
console.log(`New connection: ${socket.id}`);

socket.on('disconnect', () => {
console.log(`Client disconnected: ${socket.id}`);
});
});
Loading