Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Get messaging working inside EAP.
  • Loading branch information
tobias committed Jul 24, 2015
1 parent 7ec100f commit f365313
Show file tree
Hide file tree
Showing 13 changed files with 603 additions and 289 deletions.
@@ -0,0 +1,67 @@
/*
* Copyright 2015 Red Hat, Inc, and individual contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.projectodd.wunderboss.as;

import org.jboss.msc.service.ServiceName;
import org.jboss.msc.service.ServiceTarget;
import org.jboss.msc.value.Value;
import org.projectodd.wunderboss.messaging.jms.JMSDestination;

import javax.jms.Queue;
import javax.jms.Topic;

public abstract class ASDestinationManager {
final static String TIMEOUT_PROP = "wunderboss.messaging.destination-availability-timeout";

public ASDestinationManager(ServiceTarget target, ServiceName hqServiceName) {
this.serviceTarget = target;
this.hqServiceName = hqServiceName;
}

public abstract Queue installQueueService(String name, String jndiName, String selector,
boolean durable) throws Exception;

public abstract Topic installTopicService(String name, String jndiName) throws Exception;

public abstract void removeDestination(Value service, String name, String jndiName, JMSDestination.Type type);

protected ServiceTarget target() {
return this.serviceTarget;
}

protected ServiceName hqServiceName() {
return this.hqServiceName;
}

protected long destinationServiceTimeout() {
String timeout = System.getProperty(TIMEOUT_PROP);
if (timeout != null) {
return Long.parseLong(timeout);
} else {
return 60000;
}
}

protected void throwTimeout(String message) {
throw new RuntimeException("Gave up waiting for " + message + " after " +
destinationServiceTimeout() + "ms. If that time is too short, you can adjust with the " +
TIMEOUT_PROP + " system property.");
}

private final ServiceTarget serviceTarget;
private final ServiceName hqServiceName;
}
@@ -0,0 +1,121 @@
/*
* Copyright 2014-2015 Red Hat, Inc, and individual contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.projectodd.wunderboss.as;

import org.jboss.as.messaging.jms.JMSServices;
import org.jboss.msc.service.ServiceController;
import org.projectodd.wunderboss.Options;
import org.projectodd.wunderboss.WunderBoss;
import org.projectodd.wunderboss.messaging.hornetq.HQMessaging;
import org.projectodd.wunderboss.messaging.jms.DestinationUtil;
import org.projectodd.wunderboss.messaging.jms.JMSDestination;
import org.slf4j.Logger;

import javax.jms.Queue;
import javax.jms.Topic;
import javax.naming.Context;
import javax.naming.NamingException;

public class ASMessaging extends HQMessaging {

public ASMessaging(String name, MSCService service,
ASDestinationManager destinationManager, Options<CreateOption> options) {
super(name, options);
this.mscService = service;
this.context = service.namingContext();
this.destinationManager = destinationManager;
}

@Override
public synchronized void start() throws Exception {
started = true;
}

@Override
public synchronized void stop() throws Exception {
if (started) {
closeCloseables();
started = false;
}
}

@Override
protected Queue createQueue(final String name, final String selector, final boolean durable) throws Exception {
return this.destinationManager.installQueueService(name,
DestinationUtil.jndiName(name, JMSDestination.Type.QUEUE),
selector, durable);
}

@Override
protected Topic createTopic(final String name) throws Exception {
return this.destinationManager.installTopicService(name,
DestinationUtil.jndiName(name, JMSDestination.Type.TOPIC));
}

@Override
protected void destroyQueue(final String name) {
ServiceController controller = this.mscService.serviceRegistry()
.getService(JMSServices.getJmsQueueBaseServiceName(MSCService.hqServiceName()).append(name));
controller.setMode(ServiceController.Mode.REMOVE);

this.destinationManager.removeDestination(controller, name,
DestinationUtil.jndiName(name, JMSDestination.Type.QUEUE),
JMSDestination.Type.QUEUE);
}

@Override
protected void destroyTopic(final String name) {
ServiceController controller = this.mscService.serviceRegistry()
.getService(JMSServices.getJmsTopicBaseServiceName(MSCService.hqServiceName()).append(name));
controller.setMode(ServiceController.Mode.REMOVE);

this.destinationManager.removeDestination(controller, name,
DestinationUtil.jndiName(name, JMSDestination.Type.TOPIC),
JMSDestination.Type.TOPIC);
}

@Override
protected Object lookupJNDI(String jndiName) {
return lookupJNDIWithRetry(jndiName, 0);
}

private Object lookupJNDIWithRetry(String jndiName, int attempt) {
try {
return context.lookup(jndiName);
} catch (NamingException ex) {
if (ex.getCause() instanceof IllegalStateException
&& attempt < 100) {
//TODO: do this a better way
//the destination isn't yet available
try {
Thread.sleep(10);
} catch (InterruptedException ignored) {}

return lookupJNDIWithRetry(jndiName, attempt + 1);
}

return null;
}
}

private final MSCService mscService;
private final Context context;
private final ASDestinationManager destinationManager;

private final static Logger log = WunderBoss.logger("org.projectodd.wunderboss.as");

}
@@ -0,0 +1,44 @@
/*
* Copyright 2014-2015 Red Hat, Inc, and individual contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.projectodd.wunderboss.as;

import org.projectodd.wunderboss.ComponentProvider;
import org.projectodd.wunderboss.Options;
import org.projectodd.wunderboss.WunderBoss;
import org.projectodd.wunderboss.as.eap.EAPDestinationManager;
import org.projectodd.wunderboss.as.wildfly.WildFlyDestinationManager;
import org.projectodd.wunderboss.messaging.Messaging;

public class ASMessagingProvider implements ComponentProvider<Messaging> {

@Override
public Messaging create(String name, Options options) {
final MSCService service = (MSCService)WunderBoss.options().get(MSCService.KEY);
ASDestinationManager destManager;

if (ASUtils.containerType() == ASUtils.ContainerType.EAP) {
destManager = new EAPDestinationManager(service.serviceTarget(),
MSCService.hqServiceName(),
service.namingContext());
} else {
destManager = new WildFlyDestinationManager(service.serviceTarget(),
MSCService.hqServiceName());
}

return new ASMessaging(name, service, destManager, options);
}
}

0 comments on commit f365313

Please sign in to comment.