Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Allow sync sends to HTTP streams for WildFly 9.0.0.Alpha1 and up.
This uses the thread pool provided by AsyncContext.start() instead
of creating a component ThreadPool via WunderBoss.

We grab the WF version via JMX and stuff it in WunderBoss.options for
later use.
  • Loading branch information
tobias committed Jan 27, 2015
1 parent 2729289 commit 1158fca
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 43 deletions.
Expand Up @@ -20,7 +20,6 @@
import java.io.OutputStream;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

public abstract class OutputStreamHttpChannel implements HttpChannel {
Expand All @@ -37,7 +36,7 @@ public OutputStreamHttpChannel(final OnOpen onOpen, final OnError onError, final

protected abstract OutputStream getOutputStream() throws IOException;

protected abstract Executor getExecutor();
protected abstract void execute(Runnable runnable);

@Override
public void notifyOpen(final Object context) {
Expand Down Expand Up @@ -88,39 +87,39 @@ protected void send(PendingSend pending) {
doSend(pending.message, pending.shouldClose, pending.onComplete);
}

void enqueue(PendingSend data) {
//TODO: convert to do/while?
final Runnable worker = new Runnable() {
@Override
public void run() {
PendingSend pending;
//TODO: convert to do/while?
private final Runnable pumpWorker = new Runnable() {
@Override
public void run() {
PendingSend pending;
synchronized (workerRunning) {
pending = queue.poll();
if (pending == null) {
workerRunning.set(false);
}
}
while (pending != null) {
try {
send(pending);
} catch (Exception e) {
e.printStackTrace();
}
synchronized (workerRunning) {
pending = queue.poll();
if (pending == null) {
workerRunning.set(false);
}
}
while (pending != null) {
try {
send(pending);
} catch (Exception e) {
e.printStackTrace();
}
synchronized (workerRunning) {
pending = queue.poll();
if (pending == null) {
workerRunning.set(false);
}
}
}

}
};

}
};

void enqueue(PendingSend data) {
synchronized (workerRunning) {
queue.add(data);
if (workerRunning.compareAndSet(false, true)) {
getExecutor().execute(worker);
execute(pumpWorker);
}
}
}
Expand Down
Expand Up @@ -17,15 +17,15 @@
package org.projectodd.wunderboss.web.async;

import io.undertow.util.Headers;
import org.projectodd.wunderboss.ThreadPool;
import org.jboss.logging.Logger;
import org.projectodd.wunderboss.WunderBoss;

import javax.servlet.AsyncContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.Executor;
import java.lang.management.ManagementFactory;

public class ServletHttpChannel extends OutputStreamHttpChannel {

Expand Down Expand Up @@ -56,21 +56,41 @@ protected OutputStream getOutputStream() throws IOException {
}

@Override
protected Executor getExecutor() {
if (this.executor == null){
this.executor = WunderBoss.findOrCreateComponent(ThreadPool.class,
"http-stream-worker",
null);
protected void execute(Runnable runnable) {
this.asyncContext.start(runnable);
}

static boolean isAsync() {
if (asyncSupported == null) {
asyncSupported = false;
String version = WunderBoss.options().getString("wildfly-version", "");
String[] parts = version.split("\\.");
if (parts.length > 0) {
try {
if (Integer.parseInt(parts[0]) >= 9) {
asyncSupported = true;
}
} catch (NumberFormatException _) {
}
}

if (!asyncSupported) {
log.warn("NOTE: HTTP stream sends are synchronous in WildFly " + version +
". Use 9.0.0.Alpha1 or higher to have asynchronous sends.");
}
}

return this.executor;
return asyncSupported;
}


protected void enqueue(PendingSend pending) {
//TODO: be async in 9.x, sync in 8.x (due to https://issues.jboss.org/browse/WFLY-3715)
//super.enqueue(pending); // async
send(pending); // sync
@Override
void enqueue(PendingSend pending) {
//be async in 9.x, sync in 8.x (due to https://issues.jboss.org/browse/WFLY-3715)
if (isAsync()) {
super.enqueue(pending); // async
} else {
send(pending); // sync
}
}

@Override
Expand All @@ -81,5 +101,7 @@ public void close() throws IOException {

private final HttpServletResponse response;
private final AsyncContext asyncContext;
private Executor executor;
private static Boolean asyncSupported;

private static final Logger log = Logger.getLogger("org.projectodd.wunderboss.web.async");
}
Expand Up @@ -19,7 +19,6 @@
import io.undertow.server.HttpServerExchange;

import java.io.OutputStream;
import java.util.concurrent.Executor;

public class UndertowHttpChannel extends OutputStreamHttpChannel {
public UndertowHttpChannel(final HttpServerExchange exchange,
Expand Down Expand Up @@ -53,8 +52,11 @@ protected OutputStream getOutputStream() {
}

@Override
protected Executor getExecutor() {
return this.exchange.getConnection().getWorker();
protected void execute(Runnable runnable) {
this.exchange
.getConnection()
.getWorker()
.execute(runnable);
}

private final HttpServerExchange exchange;
Expand Down
Expand Up @@ -27,11 +27,16 @@
import org.projectodd.wunderboss.WunderBoss;
import org.projectodd.wunderboss.caching.Caching;
import org.projectodd.wunderboss.messaging.Messaging;
import org.projectodd.wunderboss.transactions.Transaction;
import org.projectodd.wunderboss.singleton.SingletonContext;
import org.projectodd.wunderboss.transactions.Transaction;

import javax.management.MBeanException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import javax.management.OperationsException;
import javax.management.ReflectionException;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.lang.management.ManagementFactory;

public class WildFlyService implements Service<WildFlyService> {
public static final String KEY = "wildfly-service";
Expand All @@ -53,6 +58,7 @@ public WildFlyService(String deploymentName, ServiceRegistry registry, ServiceTa
// TODO: Get rid of these options and just make them statics here
WunderBoss.putOption("deployment-name", this.deploymentName);
WunderBoss.putOption("service-registry", this.serviceRegistry);
WunderBoss.putOption("wildfly-version", getWildFlyVersion());
WunderBoss.putOption(KEY, this);
}

Expand Down Expand Up @@ -86,6 +92,27 @@ public WildFlyService getValue() throws IllegalStateException, IllegalArgumentEx
return this;
}

String getWildFlyVersion() {
//TODO: what about EAP?
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
String version = null;
try {
ObjectName name = new ObjectName("jboss.as:management-root=server");
// 9.x stores it under "productVersion"
version = (String)mbs.getAttribute(name, "productVersion");
if (version == null) {
// 8.x stores it under "releaseVersion"
version = (String)mbs.getAttribute(name, "releaseVersion");
}
} catch (OperationsException |
MBeanException |
ReflectionException ffs) {
ffs.printStackTrace();
}

return version;
}

public ServiceTarget serviceTarget() {
return this.serviceTarget;
}
Expand Down

0 comments on commit 1158fca

Please sign in to comment.