Skip to content

Commit

Permalink
Reimplement messaging to use JMS1 instead of JMS2.
Browse files Browse the repository at this point in the history
This allows us to work with Hornetq 2.3, and thus EAP.
  • Loading branch information
tobias committed Jul 22, 2015
1 parent e895a4a commit 7ec100f
Show file tree
Hide file tree
Showing 24 changed files with 809 additions and 474 deletions.
Expand Up @@ -20,24 +20,27 @@
import org.jboss.as.messaging.jms.JMSQueueService;
import org.jboss.as.messaging.jms.JMSServices;
import org.jboss.as.messaging.jms.JMSTopicService;
import org.jboss.msc.service.Service;
import org.jboss.msc.service.ServiceController;
import org.jboss.msc.service.ServiceName;
import org.jboss.msc.service.Service;
import org.jboss.msc.value.Value;
import org.projectodd.wunderboss.Options;
import org.projectodd.wunderboss.WunderBoss;
import org.projectodd.wunderboss.as.MSCService;
import org.projectodd.wunderboss.messaging.jms2.JMSDestination;
import org.projectodd.wunderboss.messaging.hornetq.HQMessaging;
import org.projectodd.wunderboss.messaging.jms.DestinationUtil;
import org.projectodd.wunderboss.messaging.jms.JMSDestination;
import org.slf4j.Logger;

import javax.jms.Queue;
import javax.jms.Topic;
import javax.naming.Context;
import javax.naming.NamingException;

import java.lang.reflect.Method;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class WildFlyMessaging extends HQMessaging {

Expand Down Expand Up @@ -65,7 +68,8 @@ protected Queue createQueue(final String name, final String selector, final bool
Queue queue = (Queue) waitForValueAvailabilityChange(
installService(JMSQueueService.class, name, this.mscService.serviceTarget(),
hqServiceName(),
selector, durable, new String[]{JMSDestination.jndiName(name, "queue")}),
selector, durable,
new String[]{DestinationUtil.jndiName(name, JMSDestination.Type.QUEUE)}),
false);
if (queue == null) {
throwTimeout("creation of queue " + name);
Expand All @@ -79,7 +83,7 @@ protected Topic createTopic(final String name) throws Exception {
Topic topic = (Topic) waitForValueAvailabilityChange(
installService(JMSTopicService.class, name, hqServiceName(),
this.mscService.serviceTarget(),
new String[]{JMSDestination.jndiName(name, "topic")}),
new String[]{DestinationUtil.jndiName(name, JMSDestination.Type.TOPIC)}),
false);
if (topic == null) {
throwTimeout("creation of topic " + name);
Expand Down Expand Up @@ -190,6 +194,6 @@ private Service installService(Class clazz, Object... args) throws Exception {
private final MSCService mscService;
private final Context context;

private final static Logger log = WunderBoss.logger("org.projectodd.wunderboss.wildfly");
private final static Logger log = WunderBoss.logger("org.projectodd.wunderboss.as");
private final static String TIMEOUT_PROP = "wunderboss.messaging.destination-availability-timeout";
}
Expand Up @@ -23,19 +23,19 @@
import org.hornetq.jms.server.JMSServerManager;
import org.projectodd.wunderboss.Options;
import org.projectodd.wunderboss.WunderBoss;
import org.projectodd.wunderboss.messaging.jms2.JMSDestination;
import org.projectodd.wunderboss.messaging.jms2.JMSMessaging;
import org.projectodd.wunderboss.messaging.jms2.JMSQueue;
import org.projectodd.wunderboss.messaging.jms2.JMSTopic;
import org.projectodd.wunderboss.messaging.jms.DestinationUtil;
import org.projectodd.wunderboss.messaging.jms.JMSDestination;
import org.projectodd.wunderboss.messaging.jms.JMSMessagingSkeleton;
import org.slf4j.Logger;

import javax.jms.ConnectionFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HQMessaging extends JMSMessaging {
public class HQMessaging extends JMSMessagingSkeleton {

public static final String REMOTE_TYPE_WILDFLY = "hornetq_wildfly";
public static final String REMOTE_TYPE_STANDALONE = "hornetq_standalone";
Expand Down Expand Up @@ -95,14 +95,17 @@ public String name() {

protected ConnectionFactory createRemoteConnectionFactory(final Options<CreateContextOption> options) {
//TODO: possibly cache the remote cf's?
Map<String, Object> transportOpts = new HashMap<>();
transportOpts.put("host", options.getString(CreateContextOption.HOST));
transportOpts.put("port", options.getInt(CreateContextOption.PORT));
if (options.has(CreateContextOption.REMOTE_TYPE)) {
transportOpts.put("http-upgrade-enabled",
REMOTE_TYPE_WILDFLY.equals(options.getString(CreateContextOption.REMOTE_TYPE)));
}

TransportConfiguration config =
new TransportConfiguration("org.hornetq.core.remoting.impl.netty.NettyConnectorFactory",
new HashMap() {{
put("host", options.getString(CreateContextOption.HOST));
put("port", options.getInt(CreateContextOption.PORT));
put("http-upgrade-enabled",
REMOTE_TYPE_WILDFLY.equals(options.getString(CreateContextOption.REMOTE_TYPE)));
}});
transportOpts);
HornetQConnectionFactory hornetQcf = HornetQJMSClient
.createConnectionFactoryWithoutHA(options.has(CreateContextOption.XA) ?
JMSFactoryType.XA_CF :
Expand All @@ -120,15 +123,15 @@ protected ConnectionFactory createRemoteConnectionFactory(final Options<CreateCo
protected javax.jms.Topic createTopic(String name) throws Exception {
this.server
.serverManager()
.createTopic(false, name, JMSDestination.jndiName(name, "topic"));
.createTopic(false, name, DestinationUtil.jndiName(name, JMSDestination.Type.TOPIC));

return lookupTopic(name);
}

protected javax.jms.Queue createQueue(String name, String selector, boolean durable) throws Exception {
this.server
.serverManager()
.createQueue(false, name, selector, durable, JMSDestination.jndiName(name, "queue"));
.createQueue(false, name, selector, durable, DestinationUtil.jndiName(name, JMSDestination.Type.QUEUE));

return lookupQueue(name);
}
Expand All @@ -140,8 +143,8 @@ protected javax.jms.Topic lookupTopic(String name) {
jndiNames.addAll(Arrays.asList(this.server.serverManager().getJNDIOnTopic(name)));
}
jndiNames.add(name);
jndiNames.add(JMSTopic.jmsName(name));
jndiNames.add(JMSDestination.jndiName(name, "topic"));
jndiNames.add(DestinationUtil.jmsName(name, JMSDestination.Type.TOPIC));
jndiNames.add(DestinationUtil.jndiName(name, JMSDestination.Type.TOPIC));

return (javax.jms.Topic)lookupJNDI(jndiNames);
}
Expand All @@ -153,8 +156,8 @@ protected javax.jms.Queue lookupQueue(String name) {
jndiNames.addAll(Arrays.asList(this.server.serverManager().getJNDIOnQueue(name)));
}
jndiNames.add(name);
jndiNames.add(JMSQueue.jmsName(name));
jndiNames.add(JMSDestination.jndiName(name, "queue"));
jndiNames.add(DestinationUtil.jmsName(name, JMSDestination.Type.QUEUE));
jndiNames.add(DestinationUtil.jndiName(name, JMSDestination.Type.QUEUE));

return (javax.jms.Queue)lookupJNDI(jndiNames);
}
Expand Down
Expand Up @@ -82,6 +82,12 @@
(defn create-topic [name]
(.findOrCreateTopic default name nil))

(defmacro throws-with-cause [clazz & body]
`(try
~@body
(catch Exception e#
(is (instance? ~clazz (.getCause e#))))))

(deftest queue-creation-publish-receive-close
(let [queue (create-queue "a-queue")]

Expand All @@ -97,16 +103,16 @@

;; a stopped queue should no longer be avaiable
(.stop queue)
(is (thrown? javax.jms.InvalidDestinationRuntimeException
(is (thrown? javax.jms.InvalidDestinationException
(.receive queue codecs (coerce-receive-options {:timeout 1}))))))

(deftest publish-should-use-the-passed-context
(let [c (.createContext default nil)
q (create-queue "publish-c")]
(.close c)
(is (thrown? javax.jms.IllegalStateRuntimeException
(.publish q "boom" None/INSTANCE
(coerce-publish-options {:context c}))))))
(throws-with-cause javax.jms.IllegalStateException
(.publish q "boom" None/INSTANCE
(coerce-publish-options {:context c})))))

(deftest publish-should-encode-with-the-given-codec-and-receive-should-find-the-right-one
(let [q (create-queue)]
Expand All @@ -121,42 +127,42 @@
(.request q "boom" None/INSTANCE codecs
(coerce-publish-options {:context c}))
(catch Exception e
(is (instance? javax.jms.IllegalStateRuntimeException (.getCause e)))))))
(is (instance? javax.jms.IllegalStateException (-> e .getCause .getCause)))))))

(deftest receive-should-use-the-passed-context
(let [c (.createContext default nil)
q (create-queue "receive-context")]
(.close c)
(is (thrown? javax.jms.IllegalStateRuntimeException
(.receive q codecs (coerce-receive-options {:context c}))))))
(throws-with-cause javax.jms.IllegalStateException
(.receive q codecs (coerce-receive-options {:context c})))))

(deftest listen-should-use-the-passed-context
(let [c (.createContext default (coerce-context-options {:host "localhost"}))
q (create-queue "listen-context")]
(.close c)
(is (thrown? javax.jms.IllegalStateRuntimeException
(.listen q (handler identity) codecs (coerce-listen-options {:context c}))))))
(throws-with-cause javax.jms.IllegalStateException
(.listen q (handler identity) codecs (coerce-listen-options {:context c})))))

(deftest respond-should-use-the-passed-context
(let [c (.createContext default (coerce-context-options {:host "localhost"}))
q (create-queue "listen-context")]
(.close c)
(is (thrown? javax.jms.IllegalStateRuntimeException
(.respond q (handler identity) codecs (coerce-listen-options {:context c}))))))
(throws-with-cause javax.jms.IllegalStateException
(.respond q (handler identity) codecs (coerce-listen-options {:context c})))))

(deftest subscribe-should-use-the-passed-context
(let [c (.createContext default (coerce-context-options {:client_id "ham"}))
t (create-topic "subscribe-context")]
(.close c)
(is (thrown? javax.jms.IllegalStateRuntimeException
(.subscribe t "ham" (handler identity) codecs (coerce-subscribe-options {:context c}))))))
(throws-with-cause javax.jms.IllegalStateException
(.subscribe t "ham" (handler identity) codecs (coerce-subscribe-options {:context c})))))

(deftest unsubscribe-should-use-the-passed-context
(let [c (.createContext default (coerce-context-options {:client_id "ham"}))
t (create-topic "subscribe-context")]
(.close c)
(is (thrown? javax.jms.IllegalStateRuntimeException
(.unsubscribe t "ham" (coerce-unsubscribe-options {:context c}))))))
(throws-with-cause javax.jms.IllegalStateException
(.unsubscribe t "ham" (coerce-unsubscribe-options {:context c})))))

(deftest closing-a-listener-should-work
(let [queue (create-queue "listen-queue")]
Expand Down
Expand Up @@ -14,20 +14,17 @@
* limitations under the License.
*/

package org.projectodd.wunderboss.messaging.jms2;

import org.projectodd.wunderboss.messaging.Message;
import org.projectodd.wunderboss.messaging.Response;
package org.projectodd.wunderboss.messaging;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class JMSResponse implements Response {
public class ConcreteResponse implements Response {

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
throw new UnsupportedOperationException("HornetQ response futures can't be cancelled.");
throw new UnsupportedOperationException("Messaging response futures can't be cancelled.");
}

@Override
Expand Down Expand Up @@ -65,6 +62,7 @@ public synchronized Message get(long timeout, TimeUnit unit) throws InterruptedE
return this.value;
}

@Override
public synchronized void deliver(Message value) {
if (this.value != null) {
throw new IllegalStateException("Delivery to a future that is already delivered.");
Expand Down
Expand Up @@ -16,7 +16,9 @@

package org.projectodd.wunderboss.messaging;

public interface Context extends AutoCloseable {
public interface Context extends AutoCloseable, HasCloseables {

String id();

public enum Mode { AUTO_ACK, CLIENT_ACK, TRANSACTED }

Expand All @@ -30,7 +32,5 @@ public enum Mode { AUTO_ACK, CLIENT_ACK, TRANSACTED }

boolean enlist() throws Exception;

void addCloseable(AutoCloseable closeable);

boolean isRemote();
}
Expand Up @@ -22,7 +22,7 @@

import java.util.Map;

public interface Destination {
public interface Destination extends HasCloseables {
String name();

class ListenOption extends Option {
Expand Down Expand Up @@ -56,5 +56,7 @@ class ReceiveOption extends MessageOpOption {

Message receive(Codecs codecs, Map<MessageOpOption, Object> options) throws Exception;

int defaultConcurrency();

void stop() throws Exception;
}
@@ -0,0 +1,23 @@
/*
* Copyright 2014-2015 Red Hat, Inc, and individual contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.projectodd.wunderboss.messaging;

public interface HasCloseables {
void addCloseable(Object closeable);

void closeCloseables() throws Exception;
}
Expand Up @@ -23,6 +23,10 @@ public interface Message {

String contentType();

String requestID();

String requestNodeID();

Map<String, Object> properties();

Destination endpoint();
Expand Down

0 comments on commit 7ec100f

Please sign in to comment.