Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Add completion handler for send, and allow all channels to have error handlers.

The completion handler receives the error if one occurs during a send. If
an error occurs and no completion handler is set for the send, or if the
completion handler itself throws, the top-level error handler is notified.
  • Loading branch information
tobias committed Jan 21, 2015
1 parent 9c6aaf0 commit 71d9a7d
Show file tree
Hide file tree
Showing 10 changed files with 111 additions and 60 deletions.
Expand Up @@ -21,11 +21,11 @@
public interface Channel {
void notifyOpen(Object context);

boolean isOpen();
void notifyError(Throwable error);

boolean send(Object message) throws Exception;
boolean isOpen();

boolean send(Object message, boolean shouldClose) throws Exception;
boolean send(Object message, boolean shouldClose, OnComplete callback) throws Exception;

void close() throws IOException;

Expand All @@ -34,6 +34,14 @@ interface OnOpen {
}

interface OnClose {
void handle(Channel channel, int code, String reason);
void handle(Channel channel, Object code, String reason);
}

/**
 * Callback notified when an error is reported on a channel.
 */
interface OnError {
/**
 * Handles an error reported on the given channel.
 *
 * @param channel the channel the error is being reported for
 * @param error the error that was reported
 */
void handle(Channel channel, Throwable error);
}

/**
 * Callback notified when a send finishes.
 */
interface OnComplete {
/**
 * Handles completion of a send.
 *
 * @param error the error raised during the send; callers in this change
 *              pass {@code null} when no error was recorded
 */
void handle(Throwable error);
}
}
Expand Up @@ -17,5 +17,4 @@
package org.projectodd.wunderboss.web.async;

public interface HttpChannel extends Channel {
static int NORMAL_CLOSURE = 1000;
}
Expand Up @@ -16,15 +16,14 @@

package org.projectodd.wunderboss.web.async;

import org.projectodd.wunderboss.web.async.websocket.Util;

import java.io.IOException;
import java.io.OutputStream;

public abstract class OutputStreamHttpChannel implements HttpChannel {

public OutputStreamHttpChannel(final OnOpen onOpen, final OnClose onClose) {
public OutputStreamHttpChannel(final OnOpen onOpen, final OnError onError, final OnClose onClose) {
this.onOpen = onOpen;
this.onError = onError;
this.onClose = onClose;
}

Expand All @@ -45,18 +44,22 @@ public void notifyOpen(final Object context) {
}

@Override
public boolean isOpen() {
return this.open;
public void notifyError(Throwable error) {
if (this.onError != null) {
this.onError.handle(this, error);
}
}

@Override
public boolean send(final Object message) throws Exception {
return send(message, true);
public boolean isOpen() {
return this.open;
}

// message must be String or byte[]. Allowing Object makes life easier from clojure
@Override
public boolean send(final Object message, final boolean shouldClose) throws Exception {
public boolean send(final Object message,
final boolean shouldClose,
final OnComplete onComplete) throws Exception {
if (!isOpen()) {
return false;
}
Expand All @@ -70,33 +73,37 @@ public boolean send(final Object message, final boolean shouldClose) throws Exce
throw Util.wrongMessageType(message.getClass());
}

if (!sendStarted) {
if (shouldClose) {
setContentLength(data.length);
}
this.stream = getOutputStream();
sendStarted = true;
}
Throwable ex = null;

try {
this.stream.write(data);
if (!shouldClose) {
this.stream.flush();
if (!sendStarted) {
if (shouldClose) {
setContentLength(data.length);
}
this.stream = getOutputStream();
sendStarted = true;
}
} catch (IOException e) {
// TODO: should we only deal with "Broken pipe" IOE's here? rethrow others?

try {
this.close();
} catch (IOException ignored) {
// undertow throws when you close with unwritten data,
// but the data can never be written - see UNDERTOW-368
this.stream.write(data);
if (!shouldClose) {
this.stream.flush();
}
} catch (IOException e) {
// TODO: should we only deal with "Broken pipe" IOE's here? rethrow others?
this.closer.run();
}
}

if (shouldClose) {
this.close();
if (shouldClose) {
this.closer.run();
}
} catch (Throwable e) {
this.closer.run();
ex = e;
}

Util.notifyComplete(this, onComplete, ex);

return true;
}

Expand All @@ -108,13 +115,26 @@ public void close() throws IOException {
this.open = false;

if (this.onClose != null) {
this.onClose.handle(this, NORMAL_CLOSURE, null);
this.onClose.handle(this, null, null);
}
}

/**
 * Closes this channel via {@link #close()}, deliberately discarding any
 * {@link IOException} thrown by the close.
 */
protected Runnable closer = new Runnable() {
@Override
public void run() {
try {
close();
} catch (IOException ignored) {
// undertow throws when you close with unwritten data,
// but the data can never be written - see UNDERTOW-368
}
}
};

private boolean open = false;
private boolean sendStarted = false;
private OutputStream stream;
private final OnOpen onOpen;
private final OnError onError;
private final OnClose onClose;
}
Expand Up @@ -27,8 +27,9 @@ public class ServletHttpChannel extends OutputStreamHttpChannel {
public ServletHttpChannel(final HttpServletRequest request,
final HttpServletResponse response,
final OnOpen onOpen,
final OnError onError,
final OnClose onClose){
super(onOpen, onClose);
super(onOpen, onError, onClose);
this.response = response;
request.startAsync();
}
Expand Down
Expand Up @@ -24,8 +24,9 @@
public class UndertowHttpChannel extends OutputStreamHttpChannel {
public UndertowHttpChannel(final HttpServerExchange exchange,
final OnOpen onOpen,
final OnError onError,
final OnClose onClose) {
super(onOpen, onClose);
super(onOpen, onError, onClose);
this.exchange = exchange.setPersistent(true).dispatch();
}

Expand Down
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package org.projectodd.wunderboss.web.async.websocket;
package org.projectodd.wunderboss.web.async;

import javax.websocket.MessageHandler;

Expand All @@ -41,4 +41,16 @@ static public RuntimeException wrongMessageType(Class clazz) {
return new IllegalArgumentException("message is neither a String or byte[], but is " +
clazz.getName());
}

/**
 * Reports the outcome of a send.
 *
 * <p>When a completion callback is supplied it receives {@code error}
 * (which may be {@code null}); any {@link Exception} the callback throws is
 * forwarded to the channel's error handler. When no callback is supplied,
 * a non-null {@code error} is forwarded to the channel's error handler and
 * a {@code null} error is ignored.
 *
 * @param channel the channel whose error handler acts as the fallback
 * @param callback the completion callback for the send, or {@code null}
 * @param error the error raised by the send, or {@code null}
 */
static public void notifyComplete(Channel channel, Channel.OnComplete callback, Throwable error) {
    if (callback == null) {
        // No per-send handler: only actual failures are surfaced.
        if (error != null) {
            channel.notifyError(error);
        }
        return;
    }

    try {
        callback.handle(error);
    } catch (Exception e) {
        // A failing completion handler is reported through the channel.
        channel.notifyError(e);
    }
}
}
Expand Up @@ -16,6 +16,8 @@

package org.projectodd.wunderboss.web.async.websocket;

import org.projectodd.wunderboss.web.async.Util;

import javax.websocket.CloseReason;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
Expand All @@ -29,10 +31,10 @@
public class JavaxWebsocketChannel extends WebsocketChannelSkeleton {

public JavaxWebsocketChannel(final OnOpen onOpen,
final OnError onError,
final OnClose onClose,
final OnMessage onMessage,
final OnError onError) {
super(onOpen, onClose, onMessage, onError);
final OnMessage onMessage) {
super(onOpen, onError, onClose, onMessage);
}

@Override
Expand Down Expand Up @@ -82,25 +84,27 @@ public boolean isOpen() {
}

@Override
public boolean send(Object message, final boolean shouldClose) throws Exception {
public boolean send(Object message,
final boolean shouldClose,
final OnComplete onComplete) throws Exception {
if (!isOpen()) {
return false;
}

SendHandler handler = new SendHandler() {
@Override
public void onResult(SendResult sendResult) {
Throwable ex = sendResult.getException();
if (sendResult.isOK()) {
if (shouldClose) {
try {
close();
} catch (IOException e) {
notifyError(e);
ex = e;
}
}
} else {
notifyError(sendResult.getException());
}
notifyComplete(onComplete, ex);
}
};

Expand Down
Expand Up @@ -22,17 +22,18 @@
import io.undertow.websockets.core.WebSocketChannel;
import io.undertow.websockets.core.WebSockets;
import io.undertow.websockets.spi.WebSocketHttpExchange;
import org.projectodd.wunderboss.web.async.Util;

import java.io.IOException;
import java.nio.ByteBuffer;

public class UndertowWebsocketChannel extends WebsocketChannelSkeleton {

public UndertowWebsocketChannel(final OnOpen onOpen,
final OnError onError,
final OnClose onClose,
final OnMessage onMessage,
final OnError onError) {
super(onOpen, onClose, onMessage, onError);
final OnMessage onMessage) {
super(onOpen, onError, onClose, onMessage);
}

@Override
Expand Down Expand Up @@ -75,26 +76,30 @@ public boolean isOpen() {
}

@Override
public boolean send(final Object message, final boolean shouldClose) throws Exception {
public boolean send(final Object message,
final boolean shouldClose,
final OnComplete onComplete) throws Exception {
if (!isOpen()) {
return false;
}

final WebSocketCallback<Void> callback = new WebSocketCallback<Void>() {
@Override
public void complete(WebSocketChannel channel, Void context) {
Exception ex = null;
if (shouldClose) {
try {
close();
} catch (IOException e) {
notifyError(e);
ex = e;
}
}
notifyComplete(onComplete, ex);
}

@Override
public void onError(WebSocketChannel channel, Void context, Throwable throwable) {
notifyError(throwable);
notifyComplete(onComplete, throwable);
}
};
if (message instanceof String) {
Expand Down
Expand Up @@ -28,8 +28,4 @@ public interface WebsocketChannel extends Channel {
/**
 * Callback for messages on a websocket channel.
 */
interface OnMessage {
/**
 * Handles a message for the given channel.
 *
 * @param channel the websocket channel the message belongs to
 * @param message the message payload (presumably a String or byte[],
 *                as with send; TODO confirm against the implementations)
 */
void handle(WebsocketChannel channel, Object message);
}

interface OnError {
void handle(WebsocketChannel channel, Throwable error);
}
}
Expand Up @@ -16,10 +16,15 @@

package org.projectodd.wunderboss.web.async.websocket;

import org.projectodd.wunderboss.web.async.Util;

public abstract class WebsocketChannelSkeleton implements WebsocketChannel {


public WebsocketChannelSkeleton(final OnOpen onOpen, final OnClose onClose, final OnMessage onMessage, final OnError onError) {
public WebsocketChannelSkeleton(final OnOpen onOpen,
final OnError onError,
final OnClose onClose,
final OnMessage onMessage) {
this.onOpen = onOpen;
this.onClose = onClose;
this.onMessage = onMessage;
Expand All @@ -46,8 +51,10 @@ public void notifyOpen(final Object context) {
}

@Override
public boolean send(Object message) throws Exception {
return send(message, false);
public void notifyError(Throwable error) {
if (this.onError != null) {
this.onError.handle(this, error);
}
}

protected void notifyClose(int code, String reason) {
Expand All @@ -64,10 +71,8 @@ protected void notifyMessage(Object message) {
}
}

protected void notifyError(Throwable error) {
if (this.onError != null) {
this.onError.handle(this, error);
}
protected void notifyComplete(OnComplete callback, Throwable error) {
Util.notifyComplete(this, callback, error);
}

private final OnOpen onOpen;
Expand Down

0 comments on commit 71d9a7d

Please sign in to comment.