Skip to content

Commit

Permalink
fix thread shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
barmintor committed Apr 8, 2013
1 parent a028cd0 commit 0e47c87
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 12 deletions.
26 changes: 26 additions & 0 deletions src/main/java/org/fcrepo/federation/bagit/BagItConnector.java
Expand Up @@ -10,6 +10,11 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import javax.jcr.NamespaceRegistry;
import javax.jcr.RepositoryException;
Expand Down Expand Up @@ -68,6 +73,8 @@ public class BagItConnector extends FileSystemConnector {
private String directoryAbsolutePath;

private int directoryAbsolutePathLength;

private ExecutorService threadPool;

@Override
public void initialize(NamespaceRegistry registry,
Expand Down Expand Up @@ -99,6 +106,17 @@ public void initialize(NamespaceRegistry registry,

setExtraPropertiesStore(new BagItExtraPropertiesStore());
getLogger().trace("Initialized.");
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(1);
threadPool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, workQueue);
getLogger().trace("Threadpool initialized.");
threadPool.execute(new ManifestMonitor(this));
getLogger().trace("Monitor thread queued.");
}

@Override
public void shutdown() {
threadPool.shutdown();
getLogger().trace("Threadpool shutdown.");
}

@Override
Expand Down Expand Up @@ -206,5 +224,13 @@ public void updateDocument(DocumentChanges documentChanges) {
File getBagItDirectory() {
return this.directory;
}

void changeManifest(File file) {
// do some manifest stuff
}

void changeTagFile(File file) {
// do some tagFile stuff
}

}
15 changes: 12 additions & 3 deletions src/main/java/org/fcrepo/federation/bagit/BagItWatchService.java
Expand Up @@ -45,6 +45,14 @@ public class BagItWatchService implements WatchService {
delegate = FileSystems.getDefault().newWatchService();
}

/**
* Constructor to facilitate testing
* @param delegate
*/
BagItWatchService(WatchService delegate) {
this.delegate = delegate;
}

public BagItWatchService(File bagItDir) throws IOException {
this();
for (File file: bagItDir.listFiles()) {
Expand Down Expand Up @@ -80,13 +88,13 @@ public WatchKey take() throws InterruptedException {
public void monitorTagFile(File input) throws IOException {
Path path = Paths.get(input.toURI());
if (!tagFiles.contains(path)) tagFiles.add(path);
path.register(this, ENTRY_MODIFY);
path.register(delegate, ENTRY_MODIFY);
}

public void monitorManifest(File input) throws IOException {
Path path = Paths.get(input.toURI());
if (!manifests.contains(path)) manifests.add(path);
path.register(this, ENTRY_MODIFY);
path.register(delegate, ENTRY_MODIFY);
}

boolean isManifest(String fileName) {
Expand Down Expand Up @@ -146,7 +154,8 @@ public Collection<File> apply(File input) {
LineNumberReader lnr = new LineNumberReader(new FileReader(input));
String line = null;
while((line = lnr.readLine()) != null) {
File file = new File(input.getParentFile(), line);
String fileName = line.split(" ")[0];
File file = new File(input.getParentFile(), fileName);
result.add(file);
}
return result;
Expand Down
28 changes: 19 additions & 9 deletions src/main/java/org/fcrepo/federation/bagit/ManifestMonitor.java
Expand Up @@ -5,44 +5,54 @@
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class ManifestMonitor implements Runnable {

private BagItConnector connector;

private BagItWatchService watchService;

private boolean run;

private volatile boolean shutdown;
public ManifestMonitor(BagItConnector connector) throws IOException {
this.connector = connector;
this.watchService = new BagItWatchService(connector.getBagItDirectory());
this.shutdown = false;
}

@Override
public void run() {
while(!Thread.interrupted()) {
WatchKey key = watchService.poll();
while(!this.shutdown) {
try {
WatchKey key = watchService.poll(1, TimeUnit.SECONDS);
List<WatchEvent<?>> events = key.pollEvents();
boolean manifest = false;
boolean tagManifest = false;
Path path;
Path path = null;
for (WatchEvent<?> event: events) {
Path context = (Path)event.context();
if (watchService.isManifest(context)) {
manifest = true;
path= context;
path = context;
} else if (watchService.isTagManifest(context)) {
tagManifest = true;
path= context;
path = context;
}
}
if (manifest) {
//connector.manifestChanged(path.toFile());
connector.changeManifest(path.toFile());
} else if (tagManifest) {
//connector.tagFileChanged(path.toFile());
connector.changeTagFile(path.toFile());
}
} catch (InterruptedException e) {
this.shutdown = true;
}
}
}

public void shutdown() {
this.shutdown = true;
}

}

0 comments on commit 0e47c87

Please sign in to comment.