Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Address intermittent AtomJMSIT test errors
  • Loading branch information
Andrew Woods committed Oct 11, 2013
1 parent 9e7a174 commit b673667
Showing 1 changed file with 69 additions and 60 deletions.
Expand Up @@ -25,31 +25,24 @@
import static org.modeshape.jcr.api.JcrConstants.JCR_CONTENT;
import static org.modeshape.jcr.api.JcrConstants.JCR_DATA;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

import javax.inject.Inject;
import javax.jcr.ItemExistsException;
import javax.jcr.LoginException;
import javax.jcr.Node;
import javax.jcr.PathNotFoundException;
import javax.jcr.Repository;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import javax.jcr.lock.LockException;
import javax.jcr.nodetype.ConstraintViolationException;
import javax.jcr.nodetype.NoSuchNodeTypeException;
import javax.jcr.version.VersionException;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.apache.abdera.Abdera;
import org.apache.abdera.model.Category;
import org.apache.abdera.model.Entry;
import org.apache.abdera.parser.Parser;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.fcrepo.jms.legacy.LegacyMethod;
import org.junit.After;
Expand Down Expand Up @@ -78,9 +71,7 @@ public class AtomJMSIT implements MessageListener {

private MessageConsumer consumer;

static Parser parser = new Abdera().getParser();

private volatile Entry entry;
private volatile Set<Entry> entries;

final private Logger logger = LoggerFactory.getLogger(AtomJMSIT.class);

Expand All @@ -92,7 +83,7 @@ public void acquireConnection() throws JMSException {
session = connection.createSession(false, AUTO_ACKNOWLEDGE);
consumer = session.createConsumer(session.createTopic("fedora"));
consumer.setMessageListener(this);
entry = null;
entries = new HashSet<>();
}

@After
Expand All @@ -104,24 +95,28 @@ public void releaseConnection() throws JMSException {
}

@Test
public void testAtomStream() throws LoginException, RepositoryException,
public void testAtomStream() throws RepositoryException,
InterruptedException {
Session session = repository.login();
session.getRootNode().addNode("test1").addMixin(FEDORA_OBJECT);
session.save();

waitForEntry();

waitForEntry(1);
session.logout();

if (entry == null) fail("Waited a second, got no messages");
List<Category> categories = copyOf(entry.getCategories("xsd:string"));
final String title = entry.getTitle();
if (entries.isEmpty()) fail("Waited a second, got no messages");

String title = null;
String path = null;
for (Category cat : categories) {
if (cat.getLabel().equals("fedora-types:pid")) {
logger.debug("Found Category with term: " + cat.getTerm());
path = cat.getTerm();
for (Entry entry : entries) {
List<Category> categories = copyOf(entry.getCategories("xsd:string"));
String p = null;
for (Category cat : categories) {
if (cat.getLabel().equals("fedora-types:pid")) {
logger.debug("Found Category with term: " + cat.getTerm());
path = cat.getTerm();
title = entry.getTitle();
}
}
}
assertEquals("Got wrong pid!", "test1", path);
Expand All @@ -130,32 +125,37 @@ public void testAtomStream() throws LoginException, RepositoryException,


@Test
public void testAtomStreamNodePath() throws LoginException, RepositoryException,
public void testAtomStreamNodePath() throws RepositoryException,
InterruptedException {
Session session = repository.login();
session.getRootNode().addNode("test1/sigma").addMixin(FEDORA_OBJECT);
session.save();

waitForEntry();

waitForEntry(1);
session.logout();

if (entry == null) fail("Waited a second, got no messages");
List<Category> categories = copyOf(entry.getCategories("xsd:string"));
if (entries.isEmpty())
fail("Waited a second, got no messages");

String path = null;
for (Category cat : categories) {
if (cat.getLabel().equals("path")) {
logger.debug("Found Category with term: " + cat.getTerm());
path = cat.getTerm();
for (Entry entry : entries) {
List<Category> categories = copyOf(entry.getCategories("xsd:string"));
String p = null;
for (Category cat : categories) {
if (cat.getLabel().equals("path")) {
logger.debug("Found Category with term: " + cat.getTerm());
p = cat.getTerm();
}
}
if (p.equals("/test1/sigma")) {
path = p;
}
}
assertEquals("Got wrong path!", "/test1/sigma", path);
}

@Test
public void testDatastreamTerm() throws NoSuchNodeTypeException,
VersionException, ConstraintViolationException, LockException,
ItemExistsException, PathNotFoundException, RepositoryException,
public void testDatastreamTerm() throws RepositoryException,
InterruptedException {
logger.trace("BEGIN: testDatastreamTerm()");
Session session = repository.login();
Expand All @@ -164,20 +164,24 @@ public void testDatastreamTerm() throws NoSuchNodeTypeException,
session.save();
logger.trace("testDatastreamTerm called session.save()");

waitForEntry();
if (entry == null) fail("Waited a second, got no messages");
List<Category> categories = copyOf(entry.getCategories("xsd:string"));
entry = null;
String path = null;
waitForEntry(1);
if (entries.isEmpty()) fail("Waited a second, got no messages");

logger.trace("Matched {} categories with scheme xsd:string", categories
.size());
for (Category cat : categories) {
if (cat.getLabel().equals("fedora-types:pid")) {
logger.debug("Found Category with term: " + cat.getTerm());
path = cat.getTerm();
String path = null;
for (Entry entry : entries) {
List<Category> categories = copyOf(entry.getCategories("xsd:string"));

logger.trace("Matched {} categories with scheme xsd:string",
categories
.size());
for (Category cat : categories) {
if (cat.getLabel().equals("fedora-types:pid")) {
logger.debug("Found Category with term: " + cat.getTerm());
path = cat.getTerm();
}
}
}
entries.clear();
assertEquals("Got wrong object PID!", "testDatastreamTerm", path);

final Node ds = object.addNode("DATASTREAM");
Expand All @@ -188,17 +192,21 @@ public void testDatastreamTerm() throws NoSuchNodeTypeException,
session.logout();
logger.trace("testDatastreamTerm called session.logout()");

waitForEntry();
if (entry == null) fail("Waited a second, got no messages");
categories = copyOf(entry.getCategories("xsd:string"));
entry = null;

logger.trace("Matched {} categories with scheme xsd:string", categories
.size());
for (Category cat : categories) {
if (cat.getLabel().equals("fedora-types:dsID")) {
logger.debug("Found Category with term: " + cat.getTerm());
path = cat.getTerm();
waitForEntry(2);
if (entries.isEmpty()) fail("Waited a second, got no messages");

path = null;
for (Entry entry : entries) {
List<Category> categories = copyOf(entry.getCategories("xsd:string"));

logger.trace("Matched {} categories with scheme xsd:string",
categories
.size());
for (Category cat : categories) {
if (cat.getLabel().equals("fedora-types:dsID")) {
logger.debug("Found Category with term: " + cat.getTerm());
path = cat.getTerm();
}
}
}

Expand All @@ -214,7 +222,8 @@ public void onMessage(Message message) {
try {
if (LegacyMethod.canParse(message)) {
LegacyMethod legacy = new LegacyMethod(tMessage.getText());
entry = legacy.getEntry();
Entry entry = legacy.getEntry();
entries.add(entry);
logger.debug("Parsed Entry: {}", entry.toString());
} else {
logger.warn("Could not parse message: {}", message);
Expand All @@ -228,9 +237,9 @@ public void onMessage(Message message) {
}
}

private void waitForEntry() throws InterruptedException {
private void waitForEntry(int size) throws InterruptedException {
for (int i = 0; i < 5; i++) {
if (entry == null) { // must not have rec'vd event yet
if (entries.size() < size) { // must not have rec'vd event yet
synchronized (this) {
this.wait(1000);
}
Expand Down

0 comments on commit b673667

Please sign in to comment.