Skip to content

Commit

Permalink
This closes #3309
Browse files Browse the repository at this point in the history
  • Loading branch information
aljoscha committed Jun 13, 2017
2 parents fe3d554 + 4d18606 commit 7126fdc
Show file tree
Hide file tree
Showing 4 changed files with 232 additions and 251 deletions.
Expand Up @@ -27,6 +27,7 @@
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;

import com.google.common.collect.Iterables;
import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -570,4 +571,43 @@ public void testMergeLatestWatermarkIntoSource() throws Exception {
assertThat(value1.read(), equalTo(null));
assertThat(value2.read(), equalTo(null));
}

/**
 * Exercises the {@link ReadableState} handles handed out by a {@link SetState}.
 *
 * <p>The test expects a {@code contains} handle that was requested before the element was added
 * to still read {@code false} afterwards, and expects an element inserted through
 * {@code addIfAbsent} to be reported as present by a handle requested after the insertion.
 */
@Test
public void testSetReadable() throws Exception {
  SetState<String> set = underTest.state(NAMESPACE_1, STRING_SET_ADDR);

  // contains: the handle is requested first, the set is mutated afterwards, and only then is
  // the handle read. The expected result is false.
  ReadableState<Boolean> containsA = set.contains("A");
  set.add("A");
  assertFalse(containsA.read());

  // addIfAbsent: a handle requested after the insertion is expected to read true.
  set.addIfAbsent("B");
  ReadableState<Boolean> containsB = set.contains("B");
  assertTrue(containsB.read());
}

/**
 * Exercises the {@link ReadableState} handles handed out by a {@link MapState}.
 *
 * <p>Three expectations are checked, in this order:
 *
 * <ul>
 *   <li>{@code keys()}, {@code values()} and {@code entries()} handles requested before a
 *       {@code put} are expected to be non-empty when read after it.
 *   <li>A {@code get} handle requested before the key was put is expected to read {@code null}
 *       even after the key has been put.
 *   <li>A value stored through {@code putIfAbsent} is expected to be returned by a {@code get}
 *       handle requested afterwards.
 * </ul>
 */
@Test
public void testMapReadable() throws Exception {
  MapState<String, Integer> map = underTest.state(NAMESPACE_1, STRING_MAP_ADDR);

  // Iterable views: each handle should expose an iterable view of the map's contents. The views
  // are expected to be backed by the map, so an entry put after the handles were requested must
  // show up when they are read.
  ReadableState<Iterable<String>> keyView = map.keys();
  ReadableState<Iterable<Integer>> valueView = map.values();
  ReadableState<Iterable<Map.Entry<String, Integer>>> entryView = map.entries();
  map.put("A", 1);
  assertFalse(Iterables.isEmpty(keyView.read()));
  assertFalse(Iterables.isEmpty(valueView.read()));
  assertFalse(Iterables.isEmpty(entryView.read()));

  // get: the handle is requested before the key exists; reading it after the put is expected
  // to yield null.
  ReadableState<Integer> valueOfB = map.get("B");
  map.put("B", 2);
  assertNull(valueOfB.read());

  // putIfAbsent: the key is absent, so the value is stored and a later get should return it.
  map.putIfAbsent("C", 3);
  ReadableState<Integer> valueOfC = map.get("C");
  assertThat(valueOfC.read(), equalTo(3));
}

}
1 change: 0 additions & 1 deletion runners/flink/pom.xml
Expand Up @@ -91,7 +91,6 @@
<excludedGroups>
org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders,
org.apache.beam.sdk.testing.LargeKeys$Above100MB,
org.apache.beam.sdk.testing.UsesSetState,
org.apache.beam.sdk.testing.UsesCommittedMetrics,
org.apache.beam.sdk.testing.UsesTestStream,
org.apache.beam.sdk.testing.UsesSplittableParDo
Expand Down

0 comments on commit 7126fdc

Please sign in to comment.