Skip to content

Commit

Permalink
Default concurrency based on destination type [IMMUTANT-360]
Browse files Browse the repository at this point in the history
  • Loading branch information
jcrossley3 committed Apr 7, 2015
1 parent 2e59311 commit eafab77
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 2 deletions.
Expand Up @@ -27,7 +27,7 @@ public interface Destination {

/**
 * Option keys accepted in the options map passed to {@code listen(...)}.
 */
class ListenOption extends Option {
public static final ListenOption CONTEXT = opt("context", ListenOption.class);
// NOTE(review): diff markers were lost in this view. The next line appears to be the
// REMOVED (pre-commit) definition, which supplied the available processor count as the
// second argument to opt(...).
public static final ListenOption CONCURRENCY = opt("concurrency", Runtime.getRuntime().availableProcessors(), ListenOption.class);
// NOTE(review): this appears to be the ADDED (post-commit) definition. No second value is
// passed to opt(...); the listener start-up code in this commit falls back to the
// destination's defaultConcurrency() when the option value is null.
public static final ListenOption CONCURRENCY = opt("concurrency", ListenOption.class);
public static final ListenOption SELECTOR = opt("selector", ListenOption.class);
// Context.Mode.TRANSACTED is passed as the second argument; presumably the default mode — confirm against opt(...).
public static final ListenOption MODE = opt("mode", Context.Mode.TRANSACTED, ListenOption.class);
}
Expand Down
Expand Up @@ -57,6 +57,8 @@ public Destination jmsDestination() {

public abstract String jmsName();

/**
 * Returns the listener concurrency to use for this destination when the caller does not
 * supply a value for {@code ListenOption.CONCURRENCY}.
 *
 * @return the number of handlers to start by default
 */
public abstract int defaultConcurrency();

@Override
public Listener listen(MessageHandler handler, Codecs codecs, Map<ListenOption, Object> options) throws Exception {
Options<ListenOption> opts = new Options<>(options);
Expand Down
Expand Up @@ -118,4 +118,8 @@ public String fullName() {
return fullName(name());
}

/**
 * Default listener concurrency for this destination type: one per processor reported as
 * available to the JVM at the time of the call.
 * NOTE(review): the enclosing class is not visible in this hunk — presumably the queue
 * implementation; confirm.
 *
 * @return {@code Runtime.getRuntime().availableProcessors()}
 */
@Override
public int defaultConcurrency() {
return Runtime.getRuntime().availableProcessors();
}
}
Expand Up @@ -107,6 +107,11 @@ public Destination jmsDestination() {
return broker().lookupTopic(name());
}

/**
 * Default listener concurrency for this destination type: a single handler.
 * NOTE(review): the surrounding hunk resolves the destination via lookupTopic(...), so this
 * looks like the topic implementation; presumably 1 is chosen so that each published
 * message is not handled once per concurrent subscriber — confirm against IMMUTANT-360.
 *
 * @return always {@code 1}
 */
@Override
public int defaultConcurrency() {
return 1;
}

protected HQSpecificContext context(final String id, final Object context) throws Exception {
if (context != null) {
return ((HQSpecificContext)context).asNonCloseable();
Expand Down
Expand Up @@ -45,7 +45,8 @@ public MessageHandlerGroup(HQSpecificContext context,

public synchronized MessageHandlerGroup start() throws Exception {
if (!this.started) {
int concurrency = this.options.getInt(ListenOption.CONCURRENCY);
Integer option = this.options.getInt(ListenOption.CONCURRENCY);
int concurrency = option != null ? option : this.destination.defaultConcurrency();
log.info("Starting listener for '" + this.destination.name() + "' concurrency=" + concurrency);
while(concurrency-- > 0) {
HQSpecificContext subContext =
Expand Down

0 comments on commit eafab77

Please sign in to comment.