Navigation Menu

Skip to content

Commit

Permalink
Don't use the built-in idle timeout for websockets
Browse files Browse the repository at this point in the history
Under EAP, the configured timeout appears to be only a minimum - the actual
timeout can fire many seconds later, which is too wide a margin. So we
now use the reaper thread for all channel timeouts.
  • Loading branch information
tobias committed Aug 6, 2015
1 parent 03e3c64 commit 44666b5
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 32 deletions.
Expand Up @@ -68,7 +68,6 @@ public void onError(WebSocketChannel _, Throwable error) {
/**
 * Stores the given object as this channel's underlying channel.
 *
 * @param channel the underlying channel; it is cast to {@code WebSocketChannel}
 *                without a prior type check, so any other type fails the cast
 */
@Override
public void setUnderlyingChannel(final Object channel) {
this.underlyingChannel = (WebSocketChannel) channel;
// NOTE(review): this listing is a diff rendered without +/- markers. The hunk
// header (7 lines -> 6 lines) suggests the call below is the one line this
// commit removes - confirm against the raw diff.
setTimeoutOnUnderlyingChannel();
}

@Override
Expand All @@ -85,6 +84,8 @@ public boolean send(final Object message,
return false;
}

updateLastActive();

final WebSocketCallback<Void> callback = new WebSocketCallback<Void>() {
@Override
public void complete(WebSocketChannel channel, Void context) {
Expand Down Expand Up @@ -118,14 +119,6 @@ public void onError(WebSocketChannel channel, Void context, Throwable throwable)
return true;
}

/**
 * Applies this channel's idle timeout to the underlying channel.
 *
 * Does nothing when no underlying channel has been set, or when
 * {@code idleTimeout()} is -1 or lower.
 *
 * NOTE(review): the hunk header (14 lines -> 6 lines) suggests this whole
 * override is deleted by the commit - confirm against the raw diff.
 */
@Override
protected void setTimeoutOnUnderlyingChannel() {
if (this.underlyingChannel != null
&& idleTimeout() > -1) {
// Hands the timeout to the underlying channel's own idle-timeout setting.
this.underlyingChannel.setIdleTimeout(idleTimeout());
}
}

@Override
public void close() {
if (isOpen()) {
Expand Down
Expand Up @@ -33,6 +33,8 @@ public interface Channel extends Attachments {

void setIdleTimeout(long timeout);

boolean closeIfIdleTimeoutExpired();

/**
 * Callback interface with a single {@code handle} method.
 * Presumably invoked when a channel is opened - verify against the callers.
 */
interface OnOpen {
/**
 * @param channel the channel the callback is invoked for
 * @param context an additional object passed with the callback; its
 *                meaning is not visible here - TODO confirm
 */
void handle(Channel channel, Object context);
}
Expand Down
Expand Up @@ -53,8 +53,8 @@ public void run() {
}

private synchronized void checkChannels() {
List<OutputStreamHttpChannel> removals = new ArrayList<>();
for (OutputStreamHttpChannel each : channels) {
List<Channel> removals = new ArrayList<>();
for (Channel each : channels) {
if (each.closeIfIdleTimeoutExpired()) {
log.debug("closed idle " + each);
removals.add(each);
Expand All @@ -64,7 +64,7 @@ private synchronized void checkChannels() {
channels.removeAll(removals);
}

public synchronized void watchChannel(OutputStreamHttpChannel channel) {
public synchronized void watchChannel(Channel channel) {
log.debug("watching for idleness: " + channel);
this.channels.add(channel);
if (!this.running) {
Expand All @@ -75,6 +75,6 @@ public synchronized void watchChannel(OutputStreamHttpChannel channel) {
}

private boolean running = false;
private final Set<OutputStreamHttpChannel> channels = new HashSet<>();
private final Set<Channel> channels = new HashSet<>();
private final Logger log = Logger.getLogger("org.projectodd.wunderboss.web.async");
}
Expand Up @@ -211,6 +211,7 @@ public void setIdleTimeout(long timeout) {
}
}

@Override
public boolean closeIfIdleTimeoutExpired() {
if (this.idleTimeout > 0 &&
isOpen() &&
Expand Down Expand Up @@ -240,7 +241,7 @@ public void run() {
private boolean headersSent = false;
private boolean closeNotified = false;
private long lastActive = System.currentTimeMillis();
private long idleTimeout;
private long idleTimeout = -1;
private OutputStream stream;
private final OnOpen onOpen;
private final OnError onError;
Expand Down
Expand Up @@ -75,7 +75,6 @@ public void onError(final Session _, final Throwable error) {
/**
 * Stores the given object as this channel's session.
 *
 * @param channel the underlying channel; it is cast to {@code Session}
 *                without a prior type check, so any other type fails the cast
 */
@Override
public void setUnderlyingChannel(Object channel) {
this.session = (Session)channel;
// NOTE(review): this listing is a diff rendered without +/- markers. The hunk
// header (7 lines -> 6 lines) suggests the call below is the one line this
// commit removes - confirm against the raw diff.
setTimeoutOnUnderlyingChannel();
}

@Override
Expand All @@ -92,6 +91,8 @@ public boolean send(Object message,
return false;
}

updateLastActive();

SendHandler handler = new SendHandler() {
@Override
public void onResult(SendResult sendResult) {
Expand Down Expand Up @@ -130,13 +131,5 @@ public void close() throws IOException {
}
}

/**
 * Applies this channel's idle timeout to the session.
 *
 * Does nothing when no session has been set, or when
 * {@code idleTimeout()} is -1 or lower.
 *
 * NOTE(review): the hunk header (13 lines -> 5 lines) suggests this whole
 * override is deleted by the commit - confirm against the raw diff.
 */
@Override
protected void setTimeoutOnUnderlyingChannel() {
if (this.session != null
&& idleTimeout() > -1) {
// Hands the timeout to the session's own max-idle-timeout setting.
this.session.setMaxIdleTimeout(idleTimeout());
}
}

private Session session;
}
Expand Up @@ -16,16 +16,16 @@

package org.projectodd.wunderboss.web.async.websocket;

import org.projectodd.wunderboss.web.async.IdleChannelReaper;
import org.projectodd.wunderboss.web.async.WebsocketUtil;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public abstract class WebsocketChannelSkeleton implements WebsocketChannel {


private long idleTimeout = -1;

public WebsocketChannelSkeleton(final OnOpen onOpen,
final OnError onError,
final OnClose onClose,
Expand Down Expand Up @@ -71,6 +71,7 @@ protected void notifyClose(int code, String reason) {
}

protected void notifyMessage(Object message) {
updateLastActive();
if (this.onMessage != null) {
this.onMessage.handle(this, message);
}
Expand All @@ -82,26 +83,43 @@ protected void notifyComplete(OnComplete callback, Throwable error) {

@Override
public void setIdleTimeout(long timeout) {
if (timeout < 0) {
throw new IllegalArgumentException("Idle timeout must be 0 or greater, was:" +
timeout);
this.idleTimeout = timeout;

if (idleTimeout > 0) {
IdleChannelReaper.INSTANCE.watchChannel(this);
}
}

this.idleTimeout = timeout;
setTimeoutOnUnderlyingChannel();
/**
 * Closes this channel if it has been idle for longer than its idle timeout.
 *
 * The check only applies when the idle timeout is greater than zero and the
 * channel is still open. Idleness is measured in milliseconds from the
 * {@code lastActive} timestamp.
 *
 * @return {@code true} if the channel was open and idle past its timeout
 *         (a close was attempted, even if it threw); {@code false} otherwise
 */
@Override
public boolean closeIfIdleTimeoutExpired() {
    if (this.idleTimeout <= 0 || !isOpen()) {
        return false;
    }

    // Compare elapsed time against the timeout instead of computing
    // lastActive + idleTimeout: that sum silently wraps negative for very
    // large timeouts, which would make the channel look expired immediately.
    final long idleFor = System.currentTimeMillis() - this.lastActive;
    if (idleFor <= this.idleTimeout) {
        return false;
    }

    try {
        close();
    } catch (IOException ignored) {
        // Deliberately best-effort: the channel is being discarded as idle,
        // and it is still reported as expired to the caller.
    }

    return true;
}

/**
 * Records the current wall-clock time (milliseconds) as this channel's
 * moment of last activity.
 */
protected void updateLastActive() {
    final long now = System.currentTimeMillis();
    this.lastActive = now;
}

/**
 * Returns the idle timeout currently configured on this channel.
 *
 * @return the value of the {@code idleTimeout} field, which is initialized to -1
 */
public long idleTimeout() {
    final long configuredTimeout = this.idleTimeout;
    return configuredTimeout;
}

protected abstract void setTimeoutOnUnderlyingChannel();

private final OnOpen onOpen;
private final OnClose onClose;
private final OnMessage onMessage;
private final OnError onError;
private final Map<Object, Object> attachments = new ConcurrentHashMap<>();
private long idleTimeout = -1;
private long lastActive = System.currentTimeMillis();
private boolean closeNotified = false;
private boolean openNotified = false;

Expand Down

0 comments on commit 44666b5

Please sign in to comment.