Skip to content

Commit

Permalink
Use underlying mechanisms for any channel timeout [IMMUTANT-523]
Browse files Browse the repository at this point in the history
This does away with the reaper thread, but treats timeout differently,
depending on the channel type:

* websockets use it as an idle timeout, i.e. measured from the last
  message received or the last send call
* http streams use it as an absolute timeout from opening
  • Loading branch information
tobias committed Aug 6, 2015
1 parent 42be74b commit 8de3c16
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 164 deletions.
Expand Up @@ -19,8 +19,11 @@
import io.undertow.server.HttpServerExchange;
import io.undertow.server.ServerConnection;
import org.projectodd.wunderboss.web.async.OutputStreamHttpChannel;
import org.xnio.XnioExecutor;

import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.TimeUnit;

public class UndertowHttpChannel extends OutputStreamHttpChannel {
public UndertowHttpChannel(final HttpServerExchange exchange,
Expand All @@ -42,6 +45,34 @@ public boolean asyncSendSupported() {
return true;
}

/**
 * Closes this channel and cancels any pending timeout task.
 *
 * @throws IOException if {@code super.close()} throws it
 */
@Override
public void close() throws IOException {
try {
super.close();
} finally {
// Runs even when super.close() throws, so a scheduled timeout task
// is not left behind for a channel that has been closed.
cancelTimeout();
}
}

/**
 * Cancels the pending timeout task, if one is scheduled.
 *
 * <p>When the task is removed successfully the stored key is cleared. Without
 * that, a later call would invoke {@code remove()} again on the same, already
 * removed key; that second call is expected to report failure (NOTE(review):
 * confirm against the XNIO implementation in use), which would make
 * {@code setTimeout} skip scheduling a new timeout.
 *
 * @return {@code true} if no timeout task was pending or it was removed;
 *         {@code false} if {@code key.remove()} reported that the task could
 *         not be removed
 */
private boolean cancelTimeout() {
    final XnioExecutor.Key key = this.timeoutKey;
    if (key == null) {
        return true;
    }

    if (key.remove()) {
        // Forget the key so it is never removed twice.
        this.timeoutKey = null;
        return true;
    }

    return false;
}

/**
 * Replaces any pending timeout with a new one.
 *
 * <p>The previous timeout task is cancelled first; if that cancellation fails,
 * nothing further happens. Otherwise, for a positive {@code timeout}, the
 * {@code closer} task is scheduled on the exchange's IO thread.
 *
 * @param timeout delay in milliseconds before {@code closer} runs; values
 *                {@code <= 0} schedule nothing
 */
@Override
public void setTimeout(long timeout) {
    final boolean previousCancelled = cancelTimeout();
    if (previousCancelled && timeout > 0) {
        this.timeoutKey = exchange.getIoThread()
                .executeAfter(closer, timeout, TimeUnit.MILLISECONDS);
    }
}

@Override
protected String getResponseCharset() {
// getResponseCharset claims to return ISO-8859-1 if a charset can't be found,
Expand Down Expand Up @@ -73,5 +104,5 @@ protected void execute(Runnable runnable) {
}

private final HttpServerExchange exchange;

private XnioExecutor.Key timeoutKey;
}
Expand Up @@ -68,6 +68,7 @@ public void onError(WebSocketChannel _, Throwable error) {
/**
 * Stores the underlying websocket channel and applies the configured timeout
 * to it.
 *
 * @param channel the underlying channel; it is cast to {@code WebSocketChannel}
 */
@Override
public void setUnderlyingChannel(final Object channel) {
    final WebSocketChannel webSocketChannel = (WebSocketChannel) channel;
    this.underlyingChannel = webSocketChannel;
    setTimeoutOnUnderlyingChannel();
}

@Override
Expand All @@ -84,8 +85,6 @@ public boolean send(final Object message,
return false;
}

updateLastActive();

final WebSocketCallback<Void> callback = new WebSocketCallback<Void>() {
@Override
public void complete(WebSocketChannel channel, Void context) {
Expand Down Expand Up @@ -129,11 +128,24 @@ public void close() {
notifyClose(CloseMessage.NORMAL_CLOSURE, "");
}

/**
 * Records the timeout and tries to apply it to the underlying channel.
 *
 * @param timeout the timeout value to remember and pass on
 */
@Override
public void setTimeout(long timeout) {
// Store first: the helper below reads this.timeout, and the value is
// applied again when the underlying channel is set.
this.timeout = timeout;
setTimeoutOnUnderlyingChannel();
}

/**
 * Applies the stored timeout as the idle timeout of the underlying channel.
 * Does nothing while no underlying channel is set or the stored timeout is
 * negative.
 */
private void setTimeoutOnUnderlyingChannel() {
    if (this.underlyingChannel == null || this.timeout < 0) {
        return;
    }

    this.underlyingChannel.setIdleTimeout(this.timeout);
}
/**
 * Closes this channel when the given error is an {@link IOException};
 * any other error is ignored here.
 *
 * @param error the error that was reported
 */
protected void maybeCloseOnError(Throwable error) {
    final boolean ioFailure = error instanceof IOException;
    if (!ioFailure) {
        return;
    }

    close();
}

private WebSocketChannel underlyingChannel;
}
private long timeout = -1;
}
Expand Up @@ -31,9 +31,7 @@ public interface Channel extends Attachments {

void close() throws IOException;

void setIdleTimeout(long timeout);

boolean closeIfIdleTimeoutExpired();
void setTimeout(long timeout);

interface OnOpen {
void handle(Channel channel, Object context);
Expand Down

This file was deleted.

Expand Up @@ -157,7 +157,6 @@ void enqueue(PendingSend data) {
protected void doSend(final byte[] data,
final boolean shouldClose,
final OnComplete onComplete) {
this.lastActive = System.currentTimeMillis();
Throwable ex = null;
try {
if (!headersSent) {
Expand Down Expand Up @@ -202,28 +201,6 @@ public void close() throws IOException {
notifyClose();
}

/**
 * Stores the idle timeout and, when it is positive, registers this channel
 * with the {@code IdleChannelReaper}.
 *
 * @param timeout the idle timeout to store; values {@code <= 0} only update
 *                the stored value
 */
@Override
public void setIdleTimeout(long timeout) {
    this.idleTimeout = timeout;

    if (this.idleTimeout <= 0) {
        return;
    }

    IdleChannelReaper.INSTANCE.watchChannel(this);
}

/**
 * Runs {@code closer} if the channel has a positive idle timeout, is open,
 * and its last activity is older than that timeout.
 *
 * @return {@code true} if {@code closer} was run, {@code false} otherwise
 */
@Override
public boolean closeIfIdleTimeoutExpired() {
    final boolean expired = this.idleTimeout > 0
            && isOpen()
            && this.lastActive + this.idleTimeout < System.currentTimeMillis();

    if (expired) {
        closer.run();
    }

    return expired;
}

protected Runnable closer = new Runnable() {
@Override
public void run() {
Expand All @@ -240,8 +217,6 @@ public void run() {
private boolean sendQueued = false;
private boolean headersSent = false;
private boolean closeNotified = false;
private long lastActive = System.currentTimeMillis();
private long idleTimeout = -1;
private OutputStream stream;
private final OnOpen onOpen;
private final OnError onError;
Expand Down
Expand Up @@ -19,6 +19,8 @@
import org.jboss.logging.Logger;

import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
Expand All @@ -37,6 +39,24 @@ public ServletHttpChannel(final HttpServletRequest request,
this.asyncContext = request.startAsync();
this.asyncContext.setTimeout(0);
this.asyncSupported = asyncSupported;
this.asyncContext.addListener(new AsyncListener() {
@Override
public void onComplete(AsyncEvent event) throws IOException {
}

@Override
public void onTimeout(AsyncEvent event) throws IOException {
close();
}

@Override
public void onError(AsyncEvent event) throws IOException {
}

@Override
public void onStartAsync(AsyncEvent event) throws IOException {
}
});
}

@Override
Expand Down Expand Up @@ -80,6 +100,13 @@ public void close() throws IOException {
super.close();
}

/**
 * Passes a non-negative timeout on to the async context; negative values
 * are ignored.
 *
 * @param timeout the timeout to set on the async context
 */
@Override
public void setTimeout(long timeout) {
    if (timeout < 0) {
        return;
    }

    this.asyncContext.setTimeout(timeout);
}

private final HttpServletResponse response;
private final AsyncContext asyncContext;
private final boolean asyncSupported;
Expand Down
Expand Up @@ -75,6 +75,7 @@ public void onError(final Session _, final Throwable error) {
/**
 * Stores the underlying session and applies the configured timeout to it.
 *
 * @param channel the underlying channel; it is cast to {@code Session}
 */
@Override
public void setUnderlyingChannel(Object channel) {
    final Session newSession = (Session) channel;
    this.session = newSession;
    setTimeoutOnSession();
}

@Override
Expand All @@ -91,8 +92,6 @@ public boolean send(Object message,
return false;
}

updateLastActive();

SendHandler handler = new SendHandler() {
@Override
public void onResult(SendResult sendResult) {
Expand Down Expand Up @@ -131,5 +130,19 @@ public void close() throws IOException {
}
}

/**
 * Records the timeout and tries to apply it to the session.
 *
 * @param timeout the timeout value to remember and pass on
 */
@Override
public void setTimeout(long timeout) {
// Store first: the helper below reads this.timeout, and the value is
// applied again when the session is set.
this.timeout = timeout;
setTimeoutOnSession();
}

/**
 * Applies the stored timeout as the session's max idle timeout. Does nothing
 * while no session is set or the stored timeout is negative.
 */
private void setTimeoutOnSession() {
    if (this.session == null || this.timeout < 0) {
        return;
    }

    this.session.setMaxIdleTimeout(this.timeout);
}

private Session session;
private long timeout = -1;
}
Expand Up @@ -16,7 +16,6 @@

package org.projectodd.wunderboss.web.async.websocket;

import org.projectodd.wunderboss.web.async.IdleChannelReaper;
import org.projectodd.wunderboss.web.async.WebsocketUtil;

import java.io.IOException;
Expand Down Expand Up @@ -71,7 +70,6 @@ protected void notifyClose(int code, String reason) {
}

protected void notifyMessage(Object message) {
updateLastActive();
if (this.onMessage != null) {
this.onMessage.handle(this, message);
}
Expand All @@ -81,47 +79,11 @@ protected void notifyComplete(OnComplete callback, Throwable error) {
WebsocketUtil.notifyComplete(this, callback, error);
}

/**
 * Stores the idle timeout and, when it is positive, registers this channel
 * with the {@code IdleChannelReaper}.
 *
 * @param timeout the idle timeout to store; values {@code <= 0} only update
 *                the stored value
 */
@Override
public void setIdleTimeout(long timeout) {
    this.idleTimeout = timeout;

    if (this.idleTimeout <= 0) {
        return;
    }

    IdleChannelReaper.INSTANCE.watchChannel(this);
}

/**
 * Closes the channel if it has a positive idle timeout, is open, and its
 * last activity is older than that timeout.
 *
 * @return {@code true} if a close was attempted, {@code false} otherwise
 */
@Override
public boolean closeIfIdleTimeoutExpired() {
    if (this.idleTimeout <= 0 || !isOpen()) {
        return false;
    }
    if (this.lastActive + this.idleTimeout >= System.currentTimeMillis()) {
        return false;
    }

    try {
        close();
    } catch (IOException ignored) {
        // Best effort: a failure to close is not reported to the caller.
    }

    return true;
}

/**
 * Sets the last-activity timestamp to the current wall-clock time in
 * milliseconds.
 */
protected void updateLastActive() {
this.lastActive = System.currentTimeMillis();
}

/**
 * Returns the currently stored idle timeout.
 *
 * @return the value of the {@code idleTimeout} field
 */
public long idleTimeout() {
return this.idleTimeout;
}

private final OnOpen onOpen;
private final OnClose onClose;
private final OnMessage onMessage;
private final OnError onError;
private final Map<Object, Object> attachments = new ConcurrentHashMap<>();
private long idleTimeout = -1;
private long lastActive = System.currentTimeMillis();
private boolean closeNotified = false;
private boolean openNotified = false;


}

0 comments on commit 8de3c16

Please sign in to comment.