Skip to content

Commit

Permalink
Support idle timeouts for async channels [IMMUTANT-523]
Browse files Browse the repository at this point in the history
This uses the built-in idle facilities for WebSockets, and provides a
reaper thread for HTTP streams.
  • Loading branch information
tobias committed Aug 4, 2015
1 parent cbd9cb7 commit 03e3c64
Show file tree
Hide file tree
Showing 6 changed files with 150 additions and 4 deletions.
Expand Up @@ -68,6 +68,7 @@ public void onError(WebSocketChannel _, Throwable error) {
/**
 * Records the channel that backs this websocket and then pushes any
 * configured idle timeout down to it.
 *
 * @param channel the backing channel; it is cast to {@code WebSocketChannel}
 */
@Override
public void setUnderlyingChannel(final Object channel) {
    final WebSocketChannel webSocketChannel = (WebSocketChannel) channel;
    this.underlyingChannel = webSocketChannel;
    setTimeoutOnUnderlyingChannel();
}

@Override
Expand Down Expand Up @@ -117,6 +118,14 @@ public void onError(WebSocketChannel channel, Void context, Throwable throwable)
return true;
}

/**
 * Applies the configured idle timeout to the underlying channel.
 *
 * Nothing happens while no underlying channel is attached, or while the
 * configured timeout is not greater than {@code -1}.
 */
@Override
protected void setTimeoutOnUnderlyingChannel() {
    if (this.underlyingChannel == null) {
        return;
    }
    if (idleTimeout() <= -1) {
        return;
    }
    this.underlyingChannel.setIdleTimeout(idleTimeout());
}

@Override
public void close() {
if (isOpen()) {
Expand All @@ -134,4 +143,4 @@ protected void maybeCloseOnError(Throwable error) {
}

private WebSocketChannel underlyingChannel;
}
}
Expand Up @@ -31,6 +31,8 @@ public interface Channel extends Attachments {

/**
 * Closes this channel.
 *
 * @throws IOException declared so implementations may report a failed close
 */
void close() throws IOException;

/**
 * Sets the idle timeout for this channel.
 *
 * @param timeout the idle timeout; presumably in milliseconds (the HTTP
 *                stream implementation compares it against
 *                {@code System.currentTimeMillis()}) - confirm for the
 *                websocket implementations
 */
void setIdleTimeout(long timeout);

/**
 * Callback that receives a channel together with a context object.
 * Presumably invoked when the channel is opened - confirm against callers.
 */
interface OnOpen {
void handle(Channel channel, Object context);
}
Expand Down
@@ -0,0 +1,80 @@
/*
* Copyright 2015 Red Hat, Inc, and individual contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.projectodd.wunderboss.web.async;

import org.jboss.logging.Logger;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class IdleChannelReaper implements Runnable {

public final static IdleChannelReaper INSTANCE = new IdleChannelReaper();

@Override
public void run() {
log.debug("starting idle channel reaper thread");
while(true) {
if (Thread.currentThread().isInterrupted()) {
log.debug("exiting idle channel reaper thread");

return;
}

try {
checkChannels();
} catch (Exception e) {
e.printStackTrace();
}

try {
Thread.sleep(10);
} catch (InterruptedException _) {
Thread.currentThread().interrupt();
}
}

}

private synchronized void checkChannels() {
List<OutputStreamHttpChannel> removals = new ArrayList<>();
for (OutputStreamHttpChannel each : channels) {
if (each.closeIfIdleTimeoutExpired()) {
log.debug("closed idle " + each);
removals.add(each);
}
}

channels.removeAll(removals);
}

public synchronized void watchChannel(OutputStreamHttpChannel channel) {
log.debug("watching for idleness: " + channel);
this.channels.add(channel);
if (!this.running) {
(new Thread(this, "idle-channel-reaper")).start();
this.running = true;
}

}

private boolean running = false;
private final Set<OutputStreamHttpChannel> channels = new HashSet<>();
private final Logger log = Logger.getLogger("org.projectodd.wunderboss.web.async");
}
Expand Up @@ -26,7 +26,8 @@

public abstract class OutputStreamHttpChannel implements HttpChannel {

public OutputStreamHttpChannel(final OnOpen onOpen, final OnError onError, final OnClose onClose) {
public OutputStreamHttpChannel(final OnOpen onOpen, final OnError onError,
final OnClose onClose) {
this.onOpen = onOpen;
this.onError = onError;
this.onClose = onClose;
Expand Down Expand Up @@ -68,11 +69,11 @@ public void notifyError(Throwable error) {
}

protected void notifyClose() {
if (!closeNotified &&
if (!this.closeNotified &&
this.onClose != null) {
this.closeNotified = true;
this.onClose.handle(this, null, null);
}
closeNotified = true;
}

@Override
Expand Down Expand Up @@ -156,6 +157,7 @@ void enqueue(PendingSend data) {
protected void doSend(final byte[] data,
final boolean shouldClose,
final OnComplete onComplete) {
this.lastActive = System.currentTimeMillis();
Throwable ex = null;
try {
if (!headersSent) {
Expand Down Expand Up @@ -200,6 +202,27 @@ public void close() throws IOException {
notifyClose();
}

/**
 * Stores the idle timeout and, when it is positive, registers this channel
 * with the shared {@code IdleChannelReaper} so it is checked for idleness.
 *
 * @param timeout the idle timeout, compared against
 *                {@code System.currentTimeMillis()} deltas; values of zero
 *                or less do not register the channel
 */
@Override
public void setIdleTimeout(long timeout) {
    this.idleTimeout = timeout;
    if (timeout <= 0) {
        return;
    }
    IdleChannelReaper.INSTANCE.watchChannel(this);
}

/**
 * Closes this channel when it has been idle for longer than its idle
 * timeout.
 *
 * @return {@code true} if the timeout had expired and the closer was run;
 *         {@code false} if no positive timeout is set, the channel is not
 *         open, or the timeout has not yet elapsed
 */
public boolean closeIfIdleTimeoutExpired() {
    if (this.idleTimeout <= 0 || !isOpen()) {
        return false;
    }

    // Compare elapsed time instead of computing lastActive + idleTimeout:
    // the sum overflows for very large timeouts and would wrap negative,
    // closing the channel immediately.
    final long idleFor = System.currentTimeMillis() - this.lastActive;
    if (idleFor <= this.idleTimeout) {
        return false;
    }

    closer.run();

    return true;
}

protected Runnable closer = new Runnable() {
@Override
public void run() {
Expand All @@ -216,6 +239,8 @@ public void run() {
private boolean sendQueued = false;
private boolean headersSent = false;
private boolean closeNotified = false;
// Both fields are read by closeIfIdleTimeoutExpired(), which the idle channel
// reaper calls from its own thread, while lastActive is written by whichever
// thread runs doSend(). volatile makes those writes visible to the reaper and
// keeps the 64-bit reads and writes atomic.
private volatile long lastActive = System.currentTimeMillis();
private volatile long idleTimeout;
private OutputStream stream;
private final OnOpen onOpen;
private final OnError onError;
Expand Down
Expand Up @@ -75,6 +75,7 @@ public void onError(final Session _, final Throwable error) {
/**
 * Records the session that backs this websocket and then pushes any
 * configured idle timeout down to it.
 *
 * @param channel the backing channel; it is cast to {@code Session}
 */
@Override
public void setUnderlyingChannel(Object channel) {
    final Session underlyingSession = (Session) channel;
    this.session = underlyingSession;
    setTimeoutOnUnderlyingChannel();
}

@Override
Expand Down Expand Up @@ -129,5 +130,13 @@ public void close() throws IOException {
}
}

/**
 * Applies the configured idle timeout to the session as its maximum idle
 * timeout.
 *
 * Nothing happens while no session is attached, or while the configured
 * timeout is not greater than {@code -1}.
 */
@Override
protected void setTimeoutOnUnderlyingChannel() {
    if (this.session == null) {
        return;
    }
    if (idleTimeout() <= -1) {
        return;
    }
    this.session.setMaxIdleTimeout(idleTimeout());
}

private Session session;
}
Expand Up @@ -24,6 +24,8 @@
public abstract class WebsocketChannelSkeleton implements WebsocketChannel {


// Starts at -1; the setTimeoutOnUnderlyingChannel() implementations only
// apply values greater than -1, so -1 acts as "no timeout configured".
private long idleTimeout = -1;

public WebsocketChannelSkeleton(final OnOpen onOpen,
final OnError onError,
final OnClose onClose,
Expand Down Expand Up @@ -78,11 +80,30 @@ protected void notifyComplete(OnComplete callback, Throwable error) {
WebsocketUtil.notifyComplete(this, callback, error);
}

/**
 * Stores the idle timeout and applies it to the underlying channel.
 *
 * @param timeout the idle timeout; must not be negative
 * @throws IllegalArgumentException if {@code timeout} is less than zero
 */
@Override
public void setIdleTimeout(long timeout) {
    final boolean negative = timeout < 0;
    if (negative) {
        final String message = "Idle timeout must be 0 or greater, was:" + timeout;
        throw new IllegalArgumentException(message);
    }

    this.idleTimeout = timeout;
    setTimeoutOnUnderlyingChannel();
}

/**
 * Returns the idle timeout currently stored on this channel.
 *
 * @return the stored idle timeout
 */
public long idleTimeout() {
    return idleTimeout;
}

/**
 * Hook for subclasses to apply the stored idle timeout to their underlying
 * channel. Called by {@code setIdleTimeout} after the new value is stored.
 */
protected abstract void setTimeoutOnUnderlyingChannel();

private final OnOpen onOpen;
private final OnClose onClose;
private final OnMessage onMessage;
private final OnError onError;
private final Map<Object, Object> attachments = new ConcurrentHashMap<>();
private boolean closeNotified = false;
private boolean openNotified = false;


}

0 comments on commit 03e3c64

Please sign in to comment.