Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Identify ResponseRouters by context id as well as queue [IMMUTANT-537]
  • Loading branch information
tobias committed Mar 17, 2015
1 parent 51856eb commit 3dff330
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 7 deletions.
Expand Up @@ -22,6 +22,7 @@
import javax.jms.JMSContext;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class HQContext implements HQSpecificContext {

Expand All @@ -44,6 +45,7 @@ public HQContext(JMSContext jmsContext,
this.mode = mode;
this.remote = remote;
this.parentContext = parent;
this.id = UUID.randomUUID().toString();

if (parentContext != null) {
parentContext.addCloseable(this);
Expand All @@ -67,6 +69,11 @@ public static int modeToJMSMode(Mode mode) {
return jmsMode;
}

/**
 * Returns the identifier of this context.
 *
 * The value is the random UUID string stored in the final {@code id} field
 * when the context is constructed, so it does not change for the lifetime
 * of this instance.
 *
 * @return this context's id
 */
@Override
public String id() {
return this.id;
}

@Override
public Mode mode() {
return this.mode;
Expand Down Expand Up @@ -153,6 +160,11 @@ public HQSpecificContext createChildContext(Mode mode) {
}

class NonClosing implements HQSpecificContext {
/**
 * Delegates to the enclosing {@code HQContext} instance, so this object
 * reports the same id as the context it was created from.
 *
 * @return the id of the enclosing {@code HQContext}
 */
@Override
public String id() {
return HQContext.this.id();
}

@Override
public Mode mode() {
return HQContext.this.mode();
Expand Down Expand Up @@ -223,6 +235,7 @@ public HQSpecificContext createChildContext(Mode mode) {
}
}

private final String id;
private final Mode mode;
private final boolean remote;
private final JMSContext jmsContext;
Expand Down
Expand Up @@ -41,11 +41,13 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

public class HQMessaging implements Messaging {

public static final String REMOTE_TYPE_WILDFLY = "hornetq_wildfly";
public static final String REMOTE_TYPE_STANDALONE = "hornetq_standalone";
public static final String BROKER_ID = UUID.randomUUID().toString();

public HQMessaging(String name, Options<CreateOption> options) {
this.name = name;
Expand Down
Expand Up @@ -73,15 +73,15 @@ public Response request(Object content, Codec codec,
Map<MessageOpOption, Object> options) throws Exception {
final Options<MessageOpOption> opts = new Options<>(options);
final String id = UUID.randomUUID().toString();
//TODO: there's probably a better way to get this
final String nodeId = System.getProperty("jboss.node.name", "node1");
final HQSpecificContext context = (HQSpecificContext)opts.get(MessageOpOption.CONTEXT);
final String nodeId = context != null ? context.id() : HQMessaging.BROKER_ID;
final HQResponse response = new HQResponse();
Options<ListenOption> routerOpts = new Options<>();
routerOpts.put(ListenOption.SELECTOR,
HQMessage.REQUEST_NODE_ID_PROPERTY + " = '" + nodeId + "' AND " +
HQMessage.SYNC_RESPONSE_PROPERTY + " = TRUE");
if (opts.has(MessageOpOption.CONTEXT)) {
routerOpts.put(ListenOption.CONTEXT, opts.get(MessageOpOption.CONTEXT));
if (context != null) {
routerOpts.put(ListenOption.CONTEXT, context);
}

ResponseRouter.routerFor(this, codecs, routerOpts).registerResponse(id, response);
Expand Down
Expand Up @@ -22,6 +22,8 @@
import javax.jms.JMSContext;

public interface HQSpecificContext extends Context {
String id();

JMSContext jmsContext();

Messaging broker();
Expand Down
Expand Up @@ -37,7 +37,7 @@ public ResponseRouter(String id) {
@Override
public Reply onMessage(Message msg, Context ignored) throws Exception {
String id = ((HQMessage)msg).requestID();
HQResponse response = responses.remove(id);
HQResponse response = this.responses.remove(id);
if (response == null) {
throw new IllegalStateException("No responder for id " + id);
}
Expand All @@ -54,14 +54,22 @@ public void registerResponse(String id, HQResponse response) {

public synchronized static ResponseRouter routerFor(HQQueue queue, Codecs codecs,
Options<Destination.ListenOption> options) {
ResponseRouter router = routers.get(queue.name());
String id = queue.name();
HQSpecificContext givenContext = (HQSpecificContext)options.get(Destination.ListenOption.CONTEXT);
if (givenContext != null) {
id += ":" + givenContext.id();
}
ResponseRouter router = routers.get(id);
if (router == null) {
router = new ResponseRouter(queue.name());
router = new ResponseRouter(id);
try {
router.setEnclosingListener(queue.listen(router, codecs, options));
} catch (Exception e) {
throw new RuntimeException(e);
}
if (givenContext != null) {
givenContext.addCloseable(router);
}
queue.broker().addCloseableForDestination(queue, router);
routers.put(router.id(), router);
}
Expand Down

0 comments on commit 3dff330

Please sign in to comment.