Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,8 @@
</execution>
</executions>
<configuration>
<source>1.8</source>
<target>1.8</target>
<source>1.7</source>
<target>1.7</target>
<compilerArgs>
<arg>-Xlint:unchecked</arg>
</compilerArgs>
Expand Down
192 changes: 113 additions & 79 deletions src/main/java/io/socket/client/Socket.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,19 +84,31 @@ private void subEvents() {

final Manager io = Socket.this.io;
Socket.this.subs = new LinkedList<On.Handle>() {{
add(On.on(io, Manager.EVENT_OPEN, args -> {
Socket.this.onopen();
add(On.on(io, Manager.EVENT_OPEN, new Listener() {
@Override
public void call(Object... args) {
Socket.this.onopen();
}
}));
add(On.on(io, Manager.EVENT_PACKET, args -> {
Socket.this.onpacket((Packet<?>) args[0]);
add(On.on(io, Manager.EVENT_PACKET, new Listener() {
@Override
public void call(Object... args) {
Socket.this.onpacket((Packet<?>) args[0]);
}
}));
add(On.on(io, Manager.EVENT_ERROR, args -> {
if (!Socket.this.connected) {
Socket.super.emit(EVENT_CONNECT_ERROR, args[0]);
add(On.on(io, Manager.EVENT_ERROR, new Listener() {
@Override
public void call(Object... args) {
if (!Socket.this.connected) {
Socket.super.emit(EVENT_CONNECT_ERROR, args[0]);
}
}
}));
add(On.on(io, Manager.EVENT_CLOSE, args -> {
Socket.this.onclose(args.length > 0 ? (String) args[0] : null);
add(On.on(io, Manager.EVENT_CLOSE, new Listener() {
@Override
public void call(Object... args) {
Socket.this.onclose(args.length > 0 ? (String) args[0] : null);
}
}));
}};
}
Expand All @@ -109,12 +121,15 @@ public boolean isActive() {
* Connects the socket.
*/
public Socket open() {
EventThread.exec(() -> {
if (Socket.this.connected || Socket.this.io.isReconnecting()) return;
EventThread.exec(new Runnable() {
@Override
public void run() {
if (Socket.this.connected || Socket.this.io.isReconnecting()) return;

Socket.this.subEvents();
Socket.this.io.open(); // ensure open
if (Manager.ReadyState.OPEN == Socket.this.io.readyState) Socket.this.onopen();
Socket.this.subEvents();
Socket.this.io.open(); // ensure open
if (Manager.ReadyState.OPEN == Socket.this.io.readyState) Socket.this.onopen();
}
});
return this;
}
Expand All @@ -133,7 +148,12 @@ public Socket connect() {
* @return a reference to this object.
*/
public Socket send(final Object... args) {
EventThread.exec(() -> Socket.this.emit(EVENT_MESSAGE, args));
EventThread.exec(new Runnable() {
@Override
public void run() {
Socket.this.emit(EVENT_MESSAGE, args);
}
});
return this;
}

Expand All @@ -150,23 +170,26 @@ public Emitter emit(final String event, final Object... args) {
throw new RuntimeException("'" + event + "' is a reserved event name");
}

EventThread.exec(() -> {
Ack ack;
Object[] _args;
int lastIndex = args.length - 1;

if (args.length > 0 && args[lastIndex] instanceof Ack) {
_args = new Object[lastIndex];
for (int i = 0; i < lastIndex; i++) {
_args[i] = args[i];
EventThread.exec(new Runnable() {
@Override
public void run() {
Ack ack;
Object[] _args;
int lastIndex = args.length - 1;

if (args.length > 0 && args[lastIndex] instanceof Ack) {
_args = new Object[lastIndex];
for (int i = 0; i < lastIndex; i++) {
_args[i] = args[i];
}
ack = (Ack) args[lastIndex];
} else {
_args = args;
ack = null;
}
ack = (Ack) args[lastIndex];
} else {
_args = args;
ack = null;
}

emit(event, _args, ack);
Socket.this.emit(event, _args, ack);
}
});
return this;
}
Expand All @@ -180,52 +203,55 @@ public Emitter emit(final String event, final Object... args) {
* @return a reference to this object.
*/
public Emitter emit(final String event, final Object[] args, final Ack ack) {
EventThread.exec(() -> {
JSONArray jsonArgs = new JSONArray();
jsonArgs.put(event);
EventThread.exec(new Runnable() {
@Override
public void run() {
JSONArray jsonArgs = new JSONArray();
jsonArgs.put(event);

if (args != null) {
for (Object arg : args) {
jsonArgs.put(arg);
if (args != null) {
for (Object arg : args) {
jsonArgs.put(arg);
}
}
}

Packet<JSONArray> packet = new Packet<>(Parser.EVENT, jsonArgs);
Packet<JSONArray> packet = new Packet<>(Parser.EVENT, jsonArgs);

if (ack != null) {
final int ackId = Socket.this.ids;
if (ack != null) {
final int ackId = Socket.this.ids;

logger.fine(String.format("emitting packet with ack id %d", ackId));
logger.fine(String.format("emitting packet with ack id %d", ackId));

if (ack instanceof AckWithTimeout) {
final AckWithTimeout ackWithTimeout = (AckWithTimeout) ack;
ackWithTimeout.schedule(new TimerTask() {
@Override
public void run() {
// remove the ack from the map (to prevent an actual acknowledgement)
acks.remove(ackId);
if (ack instanceof AckWithTimeout) {
final AckWithTimeout ackWithTimeout = (AckWithTimeout) ack;
ackWithTimeout.schedule(new TimerTask() {
@Override
public void run() {
// remove the ack from the map (to prevent an actual acknowledgement)
acks.remove(ackId);

// remove the packet from the buffer (if applicable)
Iterator<Packet<JSONArray>> iterator = sendBuffer.iterator();
while (iterator.hasNext()) {
if (iterator.next().id == ackId) {
iterator.remove();
// remove the packet from the buffer (if applicable)
Iterator<Packet<JSONArray>> iterator = sendBuffer.iterator();
while (iterator.hasNext()) {
if (iterator.next().id == ackId) {
iterator.remove();
}
}

ackWithTimeout.onTimeout();
}
});
}

ackWithTimeout.onTimeout();
}
});
Socket.this.acks.put(ackId, ack);
packet.id = ids++;
}

Socket.this.acks.put(ackId, ack);
packet.id = ids++;
}

if (Socket.this.connected) {
Socket.this.packet(packet);
} else {
Socket.this.sendBuffer.add(packet);
if (Socket.this.connected) {
Socket.this.packet(packet);
} else {
Socket.this.sendBuffer.add(packet);
}
}
});
return this;
Expand Down Expand Up @@ -376,23 +402,31 @@ private void onevent(Packet<JSONArray> packet) {

private Ack ack(final int id) {
final Socket self = this;
final boolean[] sent = new boolean[]{false};
return args -> EventThread.exec(() -> {
if (sent[0]) return;
sent[0] = true;
if (logger.isLoggable(Level.FINE)) {
logger.fine(String.format("sending ack %s", args.length != 0 ? args : null));
}
final boolean[] sent = new boolean[] {false};
return new Ack() {
@Override
public void call(final Object... args) {
EventThread.exec(new Runnable() {
@Override
public void run() {
if (sent[0]) return;
sent[0] = true;
if (logger.isLoggable(Level.FINE)) {
logger.fine(String.format("sending ack %s", args.length != 0 ? args : null));
}

JSONArray jsonArgs = new JSONArray();
for (Object arg : args) {
jsonArgs.put(arg);
}
JSONArray jsonArgs = new JSONArray();
for (Object arg : args) {
jsonArgs.put(arg);
}

Packet<JSONArray> packet = new Packet<>(Parser.ACK, jsonArgs);
packet.id = id;
self.packet(packet);
});
Packet<JSONArray> packet = new Packet<>(Parser.ACK, jsonArgs);
packet.id = id;
self.packet(packet);
}
});
}
};
}

private void onack(Packet<JSONArray> packet) {
Expand Down
52 changes: 29 additions & 23 deletions src/test/java/io/socket/client/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,34 +54,40 @@ private Process startServerProcess(String script, int port) throws IOException {
return Runtime.getRuntime().exec(String.format(script, nsp()), createEnv(port));
}

private Future<?> startServerOutput(Process process, String serverName, CountDownLatch latch) {
return serverService.submit(() -> {
BufferedReader reader = new BufferedReader(
new InputStreamReader(process.getInputStream()));
String line;
try {
line = reader.readLine();
latch.countDown();
do {
logger.fine(serverName + " SERVER OUT: " + line);
} while ((line = reader.readLine()) != null);
} catch (IOException e) {
logger.warning(e.getMessage());
private Future<?> startServerOutput(final Process process, final String serverName, final CountDownLatch latch) {
return serverService.submit(new Runnable() {
@Override
public void run() {
BufferedReader reader = new BufferedReader(
new InputStreamReader(process.getInputStream()));
String line;
try {
line = reader.readLine();
latch.countDown();
do {
logger.fine(serverName + " SERVER OUT: " + line);
} while ((line = reader.readLine()) != null);
} catch (IOException e) {
logger.warning(e.getMessage());
}
}
});
}

private Future<?> startServerError(Process process, String serverName) {
return serverService.submit(() -> {
BufferedReader reader = new BufferedReader(
new InputStreamReader(process.getErrorStream()));
String line;
try {
while ((line = reader.readLine()) != null) {
logger.fine(serverName + " SERVER ERR: " + line);
private Future<?> startServerError(final Process process, final String serverName) {
return serverService.submit(new Runnable() {
@Override
public void run() {
BufferedReader reader = new BufferedReader(
new InputStreamReader(process.getErrorStream()));
String line;
try {
while ((line = reader.readLine()) != null) {
logger.fine(serverName + " SERVER ERR: " + line);
}
} catch (IOException e) {
logger.warning(e.getMessage());
}
} catch (IOException e) {
logger.warning(e.getMessage());
}
});
}
Expand Down
Loading