1
- use managed:: Managed ;
1
+ use managed:: { Managed , ManagedSlice } ;
2
2
3
3
use { Error , Result } ;
4
4
use super :: Resettable ;
5
5
6
6
/// A ring buffer.
7
7
#[ derive( Debug ) ]
8
8
pub struct RingBuffer < ' a , T : ' a > {
9
- storage : Managed < ' a , [ T ] > ,
9
+ storage : ManagedSlice < ' a , T > ,
10
10
read_at : usize ,
11
- length : usize ,
11
+ length : usize ,
12
12
}
13
13
14
14
impl < ' a , T : ' a > RingBuffer < ' a , T > {
15
15
/// Create a ring buffer with the given storage.
16
16
///
17
17
/// During creation, every element in `storage` is reset.
18
18
pub fn new < S > ( storage : S ) -> RingBuffer < ' a , T >
19
- where S : Into < Managed < ' a , [ T ] > > , T : Resettable ,
19
+ where S : Into < ManagedSlice < ' a , T > > ,
20
20
{
21
- let mut storage = storage. into ( ) ;
22
- for elem in storage. iter_mut ( ) {
23
- elem. reset ( ) ;
24
- }
25
-
26
21
RingBuffer {
27
- storage : storage,
22
+ storage : storage. into ( ) ,
28
23
read_at : 0 ,
29
24
length : 0 ,
30
25
}
31
26
}
32
27
33
- fn mask ( & self , index : usize ) -> usize {
34
- index % self . storage . len ( )
28
+ /// Clear the ring buffer.
29
+ pub fn clear ( & mut self ) {
30
+ self . read_at = 0 ;
31
+ self . length = 0 ;
32
+ }
33
+
34
+ /// Clear the ring buffer, and reset every element.
35
+ pub fn reset ( & mut self )
36
+ where T : Resettable {
37
+ self . clear ( ) ;
38
+ for elem in self . storage . iter_mut ( ) {
39
+ elem. reset ( ) ;
40
+ }
35
41
}
36
42
37
- fn incr ( & self , index : usize ) -> usize {
38
- self . mask ( index + 1 )
43
+ /// Return the current number of elements in the ring buffer.
44
+ pub fn len ( & self ) -> usize {
45
+ self . length
46
+ }
47
+
48
+ /// Return the maximum number of elements in the ring buffer.
49
+ pub fn capacity ( & self ) -> usize {
50
+ self . storage . len ( )
51
+ }
52
+
53
+ /// Return the number of elements that can be added to the ring buffer.
54
+ pub fn window ( & self ) -> usize {
55
+ self . capacity ( ) - self . len ( )
39
56
}
40
57
41
58
/// Query whether the buffer is empty.
42
59
pub fn empty ( & self ) -> bool {
43
- self . length == 0
60
+ self . len ( ) == 0
44
61
}
45
62
46
63
/// Query whether the buffer is full.
47
64
pub fn full ( & self ) -> bool {
48
- self . length == self . storage . len ( )
49
- }
50
-
51
- /// Enqueue an element into the buffer, and return a pointer to it, or return
52
- /// `Err(Error::Exhausted)` if the buffer is full.
53
- pub fn enqueue < ' b > ( & ' b mut self ) -> Result < & ' b mut T > {
54
- if self . full ( ) { return Err ( Error :: Exhausted ) }
55
-
56
- let index = self . mask ( self . read_at + self . length ) ;
57
- self . length += 1 ;
58
- Ok ( & mut self . storage [ index] )
65
+ self . window ( ) == 0
59
66
}
67
+ }
60
68
61
- /// Call `f` with a buffer element, and enqueue the element if `f` returns successfully, or
62
- /// return `Err(Error::Exhausted)` if the buffer is full.
69
+ // This is the "discrete" ring buffer interface: it operates with single elements,
70
+ // and boundary conditions (empty/full) are errors.
71
+ impl < ' a , T : ' a > RingBuffer < ' a , T > {
72
+ /// Call `f` with a single buffer element, and enqueue the element if `f`
73
+ /// returns successfully, or return `Err(Error::Exhausted)` if the buffer is full.
63
74
pub fn try_enqueue < ' b , R , F > ( & ' b mut self , f : F ) -> Result < R >
64
75
where F : FnOnce ( & ' b mut T ) -> Result < R > {
65
76
if self . full ( ) { return Err ( Error :: Exhausted ) }
66
77
67
- let index = self . mask ( self . read_at + self . length ) ;
78
+ let index = ( self . read_at + self . length ) % self . capacity ( ) ;
68
79
match f ( & mut self . storage [ index] ) {
69
80
Ok ( result) => {
70
81
self . length += 1 ;
@@ -74,15 +85,10 @@ impl<'a, T: 'a> RingBuffer<'a, T> {
74
85
}
75
86
}
76
87
77
- /// Dequeue an element from the buffer, and return a mutable reference to it, or return
78
- /// `Err(Error::Exhausted)` if the buffer is empty.
79
- pub fn dequeue ( & mut self ) -> Result < & mut T > {
80
- if self . empty ( ) { return Err ( Error :: Exhausted ) }
81
-
82
- let read_at = self . read_at ;
83
- self . length -= 1 ;
84
- self . read_at = self . incr ( self . read_at ) ;
85
- Ok ( & mut self . storage [ read_at] )
88
+ /// Enqueue a single element into the buffer, and return a pointer to it,
89
+ /// or return `Err(Error::Exhausted)` if the buffer is full.
90
+ pub fn enqueue < ' b > ( & ' b mut self ) -> Result < & ' b mut T > {
91
+ self . try_enqueue ( Ok )
86
92
}
87
93
88
94
/// Call `f` with a buffer element, and dequeue the element if `f` returns successfully, or
@@ -91,7 +97,7 @@ impl<'a, T: 'a> RingBuffer<'a, T> {
91
97
where F : FnOnce ( & ' b mut T ) -> Result < R > {
92
98
if self . empty ( ) { return Err ( Error :: Exhausted ) }
93
99
94
- let next_at = self . incr ( self . read_at ) ;
100
+ let next_at = ( self . read_at + 1 ) % self . capacity ( ) ;
95
101
match f ( & mut self . storage [ self . read_at ] ) {
96
102
Ok ( result) => {
97
103
self . length -= 1 ;
@@ -101,32 +107,91 @@ impl<'a, T: 'a> RingBuffer<'a, T> {
101
107
Err ( error) => Err ( error)
102
108
}
103
109
}
110
+
111
+ /// Dequeue an element from the buffer, and return a mutable reference to it, or return
112
+ /// `Err(Error::Exhausted)` if the buffer is empty.
113
+ pub fn dequeue ( & mut self ) -> Result < & mut T > {
114
+ self . try_dequeue ( Ok )
115
+ }
104
116
}
105
117
106
- #[ cfg( test) ]
107
- mod test {
108
- use super :: * ;
118
+ // This is the "continuous" ring buffer interface: it operates with element slices,
119
+ // and boundary conditions (empty/full) simply result in empty slices.
120
+ impl < ' a , T : ' a > RingBuffer < ' a , T > {
121
+ fn clamp_writer ( & self , mut size : usize ) -> ( usize , usize ) {
122
+ let write_at = ( self . read_at + self . length ) % self . capacity ( ) ;
123
+ // We can't enqueue more than there is free space.
124
+ let free = self . capacity ( ) - self . length ;
125
+ if size > free { size = free }
126
+ // We can't contiguously enqueue past the beginning of the storage.
127
+ let until_end = self . capacity ( ) - write_at;
128
+ if size > until_end { size = until_end }
129
+
130
+ ( write_at, size)
131
+ }
109
132
110
- impl Resettable for usize {
111
- fn reset ( & mut self ) {
112
- * self = 0 ;
113
- }
133
+ pub ( crate ) fn enqueue_slice < ' b > ( & ' b mut self , size : usize ) -> & ' b mut [ T ] {
134
+ let ( write_at , size ) = self . clamp_writer ( size ) ;
135
+ self . length += size ;
136
+ & mut self . storage [ write_at..write_at + size ]
114
137
}
115
138
116
- const SIZE : usize = 5 ;
139
+ pub ( crate ) fn enqueue_slice_all ( & mut self , data : & [ T ] )
140
+ where T : Copy {
141
+ let data = {
142
+ let mut dest = self . enqueue_slice ( data. len ( ) ) ;
143
+ let ( data, rest) = data. split_at ( dest. len ( ) ) ;
144
+ dest. copy_from_slice ( data) ;
145
+ rest
146
+ } ;
147
+ // Retry, in case we had a wraparound.
148
+ let mut dest = self . enqueue_slice ( data. len ( ) ) ;
149
+ let ( data, _) = data. split_at ( dest. len ( ) ) ;
150
+ dest. copy_from_slice ( data) ;
151
+ }
117
152
118
- fn buffer ( ) -> RingBuffer < ' static , usize > {
119
- let mut storage = vec ! [ ] ;
120
- for i in 0 ..SIZE {
121
- storage. push ( i + 10 ) ;
122
- }
153
+ fn clamp_reader ( & self , offset : usize , mut size : usize ) -> ( usize , usize ) {
154
+ let read_at = ( self . read_at + offset) % self . capacity ( ) ;
155
+ // We can't read past the end of the queued data.
156
+ if offset > self . length { return ( read_at, 0 ) }
157
+ // We can't dequeue more than was queued.
158
+ let clamped_length = self . length - offset;
159
+ if size > clamped_length { size = clamped_length }
160
+ // We can't contiguously dequeue past the end of the storage.
161
+ let until_end = self . capacity ( ) - read_at;
162
+ if size > until_end { size = until_end }
163
+
164
+ ( read_at, size)
165
+ }
166
+
167
+ pub ( crate ) fn dequeue_slice ( & mut self , size : usize ) -> & [ T ] {
168
+ let ( read_at, size) = self . clamp_reader ( 0 , size) ;
169
+ self . read_at = ( self . read_at + size) % self . capacity ( ) ;
170
+ self . length -= size;
171
+ & self . storage [ read_at..read_at + size]
172
+ }
123
173
124
- RingBuffer :: new ( storage)
174
+ pub ( crate ) fn peek ( & self , offset : usize , size : usize ) -> & [ T ] {
175
+ let ( read_at, size) = self . clamp_reader ( offset, size) ;
176
+ & self . storage [ read_at..read_at + size]
125
177
}
178
+ }
179
+
180
+ impl < ' a , T : ' a > From < ManagedSlice < ' a , T > > for RingBuffer < ' a , T > {
181
+ fn from ( slice : ManagedSlice < ' a , T > ) -> RingBuffer < ' a , T > {
182
+ RingBuffer :: new ( slice)
183
+ }
184
+ }
185
+
186
+ #[ cfg( test) ]
187
+ mod test {
188
+ use super :: * ;
189
+
190
+ const SIZE : usize = 5 ;
126
191
127
192
#[ test]
128
193
pub fn test_buffer ( ) {
129
- let mut buf = buffer ( ) ;
194
+ let mut buf = RingBuffer :: new ( vec ! [ 0 ; SIZE ] ) ;
130
195
assert ! ( buf. empty( ) ) ;
131
196
assert ! ( !buf. full( ) ) ;
132
197
assert_eq ! ( buf. dequeue( ) , Err ( Error :: Exhausted ) ) ;
@@ -152,7 +217,7 @@ mod test {
152
217
153
218
#[ test]
154
219
pub fn test_buffer_try ( ) {
155
- let mut buf = buffer ( ) ;
220
+ let mut buf = RingBuffer :: new ( vec ! [ 0 ; SIZE ] ) ;
156
221
assert ! ( buf. empty( ) ) ;
157
222
assert ! ( !buf. full( ) ) ;
158
223
assert_eq ! ( buf. try_dequeue( |_| unreachable!( ) ) as Result <( ) >,
1 commit comments
whitequark commentedon Sep 8, 2017
cc @batonius on this and next two commits