Skip to content

Commit

Permalink
Multicast implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
mairbek committed Apr 12, 2013
1 parent 98f6cbc commit 0499cff
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 38 deletions.
Expand Up @@ -3,47 +3,14 @@
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subjects.DefaultSubject;
import rx.subjects.Subject;
import rx.util.functions.Func1;

public class ConnectableObservable<T, R> extends Observable<T> {
private final Observable<T> source;
private final Subject<T, R> subject;
public abstract class ConnectableObservable<T> extends Observable<T> {

public static <T, R> ConnectableObservable create(final Observable<T> source, final Subject<T, R> subject) {
return new ConnectableObservable<T, R>(source, subject, new Func1<Observer<T>, Subscription>() {
@Override
public Subscription call(Observer<T> observer) {
return subject.subscribe(observer);
}
});
}

protected ConnectableObservable(Observable<T> source, Subject<T, R> subject, Func1<Observer<T>, Subscription> onSubscribe) {
protected ConnectableObservable(Func1<Observer<T>, Subscription> onSubscribe) {
super(onSubscribe);
this.source = source;
this.subject = subject;
}

public Subscription connect() {
return source.subscribe(new Observer<T>() {
@Override
public void onCompleted() {
subject.onCompleted();
}

@Override
public void onError(Exception e) {
subject.onError(e);
}

@Override
public void onNext(T args) {
subject.onNext(args);
}
});
}

/**
 * Connects this observable; what "connecting" means is defined by each concrete subclass.
 *
 * NOTE(review): in the multicast implementation shown in this commit, connecting subscribes
 * a forwarding observer to the underlying source. Presumably unsubscribing the returned
 * subscription disconnects from that source - confirm against the implementations.
 *
 * @return the {@link Subscription} produced by the subclass when connecting
 */
public abstract Subscription connect();

}
44 changes: 42 additions & 2 deletions rxjava-core/src/main/java/rx/operators/OperatorMulticast.java
Expand Up @@ -3,11 +3,51 @@
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.observables.ConnectableObservable;
import rx.subjects.Subject;
import rx.util.functions.Func1;

public class OperatorMulticast {
public static <T> Func1<Observer<T>, Subscription> multicast(Observable<T> source, Func1<T, Boolean> predicate) {
return null;
/**
 * Creates a connectable observable that routes the given source through the given subject.
 *
 * @param source  the observable that is subscribed to when the result is connected
 * @param subject the subject that receives the source's notifications and that observers
 *                of the returned observable are subscribed to
 * @param <T>     the item type of the source (the subject's input type)
 * @param <R>     the item type delivered by the subject to observers
 * @return a new {@link ConnectableObservable} backed by {@code source} and {@code subject}
 */
public static <T, R> ConnectableObservable<R> multicast(Observable<T> source, final Subject<T, R> subject) {
    final ConnectableObservable<R> multicasted = new MulticastConnectableObservable<T, R>(source, subject);
    return multicasted;
}

/**
 * {@link ConnectableObservable} that multicasts a source sequence through a {@link Subject}.
 * <p>
 * Observers that subscribe to this observable are subscribed to the subject. This class
 * subscribes to the source only inside {@link #connect()}, where every notification from
 * the source is forwarded to the subject.
 *
 * @param <T> the type of items emitted by the source and accepted by the subject
 * @param <R> the type of items the subject delivers to observers of this observable
 */
private static class MulticastConnectableObservable<T, R> extends ConnectableObservable<R> {
    private final Observable<T> source;
    private final Subject<T, R> subject;

    /**
     * @param source  the observable to subscribe to when {@link #connect()} is called
     * @param subject the subject that observers of this observable are subscribed to and
     *                that receives the source's notifications
     */
    public MulticastConnectableObservable(Observable<T> source, final Subject<T, R> subject) {
        // The subscribe function handed to the superclass delegates straight to the subject,
        // so subscribing to this observable does not touch the source.
        super(new Func1<Observer<R>, Subscription>() {
            @Override
            public Subscription call(Observer<R> observer) {
                return subject.subscribe(observer);
            }
        });
        this.source = source;
        this.subject = subject;
    }

    /**
     * Subscribes a forwarding observer to the source; each {@code onNext}, {@code onError}
     * and {@code onCompleted} notification is passed on to the subject unchanged.
     *
     * @return the subscription returned by {@code source.subscribe(...)}
     */
    @Override
    public Subscription connect() {
        return source.subscribe(new Observer<T>() {
            @Override
            public void onCompleted() {
                subject.onCompleted();
            }

            @Override
            public void onError(Exception e) {
                subject.onError(e);
            }

            @Override
            public void onNext(T args) {
                subject.onNext(args);
            }
        });
    }
}


}

0 comments on commit 0499cff

Please sign in to comment.