|
1 | 1 | import numpy as np
|
2 | 2 | from scipy.interpolate import splrep, splev, spalde
|
| 3 | +from scipy.special import binom |
| 4 | + |
| 5 | + |
| 6 | +class UnivariateMultiSpline: |
| 7 | + """Multidimensional wrapper around `scipy.interpolate.sp*` functions. |
| 8 | + `scipy.inteprolate.splprep` unfortunately does only up to 12 dimsions. |
| 9 | + """ |
| 10 | + def __init__(self, x, y, order=4): |
| 11 | + self.order = order |
| 12 | + self.s = [splrep(x, yi, k=order - 1) for yi in y] |
| 13 | + |
| 14 | + def lev(self, x, *a, **k): |
| 15 | + return np.array([splev(x, si) for si in self.s]) |
| 16 | + |
| 17 | + def alde(self, x): |
| 18 | + u = np.array([spalde(x, si) for si in self.s]) |
| 19 | + if len(x) == 1: |
| 20 | + u = u[:, None, :] |
| 21 | + return u |
| 22 | + |
| 23 | + def __call__(self, x, use_alde=True): |
| 24 | + if use_alde: |
| 25 | + u = self.alde(x)[:, :, :self.order] |
| 26 | + s = (len(self.s), len(x), self.order) |
| 27 | + assert u.shape == s, (u.shape, s) |
| 28 | + return u.transpose(2, 0, 1) |
| 29 | + else: |
| 30 | + return np.array([self.lev(x, der=i) for i in range(self.order)]) |
| 31 | + |
| 32 | + |
| 33 | +class UnivariateMultiSparseSpline(UnivariateMultiSpline): |
| 34 | + def __init__(self, d, x0=None, order=4): |
| 35 | + self.order = order |
| 36 | + self.n = sorted(set(n for x, n, y in d)) |
| 37 | + self.s = [] |
| 38 | + for n in self.n: |
| 39 | + x, y = np.array([(x, y) for x, ni, y in d if n == ni]).T |
| 40 | + if x0 is not None: |
| 41 | + y0 = splev(x0, splrep(x, y, k=order - 1)) |
| 42 | + x, y = x0, y0 |
| 43 | + s = splrep(x, y, k=order - 1) |
| 44 | + self.s.append(s) |
| 45 | + |
| 46 | + |
| 47 | +def pad_const(x, n, axis=0): |
| 48 | + """Prefix and postfix the array `x` by `n` repetitions of the first and |
| 49 | + last vlaue along `axis`. |
| 50 | + """ |
| 51 | + a = np.repeat(x.take([0], axis), n, axis) |
| 52 | + b = np.repeat(x.take([-1], axis), n, axis) |
| 53 | + xp = np.concatenate([a, x, b], axis) |
| 54 | + s = list(x.shape) |
| 55 | + s[axis] += 2*n |
| 56 | + assert xp.shape == tuple(s), (x.shape, s, xp.shape) |
| 57 | + return xp |
| 58 | + |
| 59 | + |
| 60 | +def build_segment(durations, coefficients, target="bias", |
| 61 | + variable="amplitude"): |
| 62 | + """Build a wavesynth-style segment from homogeneous duration and |
| 63 | + coefficient data. |
| 64 | +
|
| 65 | + :param durations: 1D sequence of line durations. |
| 66 | + :param coefficients: 3D array with shape `(n, m, len(x))`, |
| 67 | + with `n` being the interpolation order + 1 and `m` the number of |
| 68 | + channels. |
| 69 | + :param target: The target component of the channel to affect. |
| 70 | + :param variable: The variable within the target component. |
| 71 | + """ |
| 72 | + for dxi, yi in zip(durations, coefficients.transpose()): |
| 73 | + d = {"duration": int(dxi)} |
| 74 | + d["channel_data"] = cd = [] |
| 75 | + for yij in yi: |
| 76 | + cdj = [] |
| 77 | + for yijk in reversed(yij): |
| 78 | + if cdj or abs(yijk): |
| 79 | + cdj.append(float(yijk)) |
| 80 | + cdj.reverse() |
| 81 | + cd.append({target: {variable: cdj}}) |
| 82 | + yield d |
3 | 83 |
|
4 | 84 |
|
5 | 85 | class CoefficientSource:
|
6 |
| - def get_times(self, t, speed, clock): |
7 |
| - pass |
8 |
| - |
9 |
| - def get_coefficients(self, t, speed): |
10 |
| - pass |
11 |
| - |
12 |
| - def get_program(self, dt, u): |
13 |
| - pass |
14 |
| - |
15 |
| - |
16 |
| -def _round_times(times, sample_times=None): |
17 |
| - times = np.asanyarray(times) |
18 |
| - if sample_times is None: |
19 |
| - sample_times = np.rint(times) |
20 |
| - duration = np.diff(sample_times) |
21 |
| - sample_times = sample_times[:-1] |
22 |
| - assert np.all(duration >= 0) |
23 |
| - assert np.all(duration < (1 << 16)) |
24 |
| - return times, sample_times, duration |
25 |
| - |
26 |
| - |
27 |
| -def _interpolate(time, data, sample_times, order=3): |
28 |
| - # FIXME: this does not ensure that the spline does not clip |
29 |
| - spline = splrep(time, data, k=order or 1) |
30 |
| - # FIXME: this could be faster but needs k knots outside t_eval |
31 |
| - # dv = np.array(spalde(t_eval, s)) |
32 |
| - coeffs = np.array([splev(sample_times, spline, der=i, ext=0) |
33 |
| - for i in range(order + 1)]).T |
34 |
| - return coeffs |
35 |
| - |
36 |
| - |
37 |
| - |
38 |
| -def _zip_program(times, channels, target): |
39 |
| - for tc in zip(times, *channels): |
40 |
| - yield { |
41 |
| - "duration": tc[0], |
42 |
| - "channel_data": tc[1:], |
43 |
| - } |
44 |
| -# FIXME: this does not handle: |
45 |
| -# `clear` (clearing the phase accumulator) |
46 |
| -# `silence` (stopping the dac clock) |
47 |
| - |
48 |
| - |
49 |
| -def interpolate_channels(times, data, sample_times=None, **kwargs): |
50 |
| - if len(times) == 1: |
51 |
| - return _zip_program(np.array([1]), data[:, :, None]) |
52 |
| - data = np.asanyarray(data) |
53 |
| - assert len(times) == len(data) |
54 |
| - times, sample_times, duration = _round_times(times, sample_times) |
55 |
| - channel_coeff = [_interpolate(sample_times, i, **kwargs) for i in data.T] |
56 |
| - return _zip_program(duration, np.array(channel_coeff)) |
57 |
| - # v = np.clip(v/self.max_out, -1, 1) |
58 |
| - # |
59 |
| - # |
| 86 | + def crop_x(self, start, stop, num=2): |
| 87 | + """Return an array of valid sample positions. |
| 88 | +
|
| 89 | + This function needs to be implemented only if this |
| 90 | + `CoefficientSource` does not support sampling at arbitrary |
| 91 | + positions. |
| 92 | +
|
| 93 | + :param start: First sample position. |
| 94 | + :param stop: Last sample position. |
| 95 | + :param num: Number of samples between `start` and `stop`. |
| 96 | + :return: Array of sample positions. `start` and `stop` should be |
| 97 | + returned as the first and last value in the array respectively. |
| 98 | + """ |
| 99 | + return np.linspace(start, stop, num) |
| 100 | + |
| 101 | + def scale_x(self, x, scale): |
| 102 | + """Scale and round sample positions. |
| 103 | +
|
| 104 | + :param x: Input sample positions in data space. |
| 105 | + :param scale: Data space position to cycles conversion scale, |
| 106 | + in units of x-units per clock cycle. |
| 107 | + :return: `x_sample`, the rounded sample positions and `durations`, the |
| 108 | + integer durations of the individual samples in cycles. |
| 109 | + """ |
| 110 | + raise NotImplementedError |
| 111 | + |
| 112 | + def __call__(self, x, **kwargs): |
| 113 | + """Perform sampling and return coefficients. |
| 114 | +
|
| 115 | + :param x: Sample positions. |
| 116 | + :return: `y` the array of coefficients. `y.shape == (order, n, len(x))` |
| 117 | + with `n` being the number of channels.""" |
| 118 | + raise NotImplementedError |
| 119 | + |
| 120 | + def get_segment_data(self, start, stop, scale, cutoff=1e-12, |
| 121 | + min_duration=1, min_length=20, |
| 122 | + target="bias", variable="amplitude"): |
| 123 | + """Build wavesynth segment data. |
| 124 | +
|
| 125 | + :param start: see `crop_x()`. |
| 126 | + :param stop: see `crop_x()`. |
| 127 | + :param scale: see `scale_x()`. |
| 128 | + :param num: see `crop_x()`. |
| 129 | + :param cutoff: coefficient cutoff towards zero to compress data. |
| 130 | + :param min_duration: Minimum duration of a line. |
| 131 | + :param min_length: Minimum segment length to space triggers. |
| 132 | + """ |
| 133 | + x = self.crop_x(start, stop) |
| 134 | + x_sample, durations = self.scale_x(x, scale) |
| 135 | + coefficients = self(x_sample) |
| 136 | + np.clip(np.fabs(durations), min_duration, None, out=durations) |
| 137 | + if len(durations) == 1: |
| 138 | + durations[0] = max(durations[0], min_length) |
| 139 | + if start == stop: |
| 140 | + coefficients = coefficients[:1] |
| 141 | + # rescale coefficients accordingly |
| 142 | + coefficients *= (scale*np.sign(durations))**np.arange( |
| 143 | + coefficients.shape[0])[:, None, None] |
| 144 | + if cutoff: |
| 145 | + coefficients[np.fabs(coefficients) < cutoff] = 0 |
| 146 | + return build_segment(durations, coefficients, target=target, |
| 147 | + variable=variable) |
| 148 | + |
| 149 | + def extend_segment(self, segment, *args, **kwargs): |
| 150 | + """Extend a wavesynth segment. |
| 151 | +
|
| 152 | + See `get_segment()` for arguments. |
| 153 | + """ |
| 154 | + for line in self.get_segment_data(*args, **kwargs): |
| 155 | + segment.add_line(**line) |
| 156 | + |
| 157 | + |
| 158 | +class SplineSource(CoefficientSource): |
| 159 | + def __init__(self, x, y, order=4, pad_dx=1.): |
| 160 | + """ |
| 161 | + :param x: 1D sample positions. |
| 162 | + :param y: 2D sample values. |
| 163 | + """ |
| 164 | + self.x = np.asanyarray(x) |
| 165 | + assert self.x.ndim == 1 |
| 166 | + self.y = np.asanyarray(y) |
| 167 | + assert self.y.ndim == 2 |
| 168 | + |
| 169 | + if pad_dx is not None: |
| 170 | + a = np.arange(-order, 0)*pad_dx + self.x[0] |
| 171 | + b = self.x[-1] + np.arange(1, order + 1)*pad_dx |
| 172 | + self.x = np.r_[a, self.x, b] |
| 173 | + self.y = pad_const(self.y, order, axis=1) |
| 174 | + |
| 175 | + assert self.y.shape[1] == self.x.shape[0] |
| 176 | + self.spline = UnivariateMultiSpline(self.x, self.y, order) |
| 177 | + |
| 178 | + def crop_x(self, start, stop): |
| 179 | + ia, ib = np.searchsorted(self.x, (start, stop)) |
| 180 | + if start > stop: |
| 181 | + x = self.x[ia - 1:ib - 1:-1] |
| 182 | + else: |
| 183 | + x = self.x[ia:ib] |
| 184 | + return np.r_[start, x, stop] |
| 185 | + |
| 186 | + def scale_x(self, x, scale, nudge=1e-9): |
| 187 | + # We want to only sample a spline at t_knot + epsilon |
| 188 | + # where the highest order derivative has just jumped |
| 189 | + # and is valid at least up to the next knot after t_knot. |
| 190 | + # |
| 191 | + # To ensure that we are on the right side of a knot: |
| 192 | + # * only ever increase t when rounding (for increasing t) |
| 193 | + # * or only ever decrease it (for decreasing t) |
| 194 | + # |
| 195 | + # The highest derivative is discontinuous at t |
| 196 | + # and the correct value for a segment is obtained |
| 197 | + # for t_int >= t_float == t_knot (and v.v. for t decreasing). |
| 198 | + x = x/scale |
| 199 | + inc = np.diff(x) >= 0 |
| 200 | + inc = np.r_[inc, inc[-1]] |
| 201 | + x = np.where(inc, np.ceil(x + nudge), np.floor(x - nudge)) |
| 202 | + if len(x) > 1 and x[0] == x[1]: |
| 203 | + x = x[1:] |
| 204 | + if len(x) > 1 and x[-2] == x[-1]: |
| 205 | + x = x[:-1] |
| 206 | + x_sample = x[:-1]*scale |
| 207 | + durations = np.diff(x.astype(np.int)) |
| 208 | + return x_sample, durations |
| 209 | + |
| 210 | + def __call__(self, x): |
| 211 | + return self.spline(x) |
| 212 | + |
| 213 | + |
| 214 | +class ComposingSplineSource(SplineSource): |
| 215 | + # TODO |
| 216 | + def __init__(self, x, y, components, order=4, pad_dx=1.): |
| 217 | + self.x = np.asanyarray(x) |
| 218 | + assert self.x.ndim == 1 |
| 219 | + self.y = np.asanyarray(y) |
| 220 | + assert self.y.ndim == 3 |
| 221 | + |
| 222 | + if pad_dx is not None: |
| 223 | + a = np.arange(-order, 0)*pad_dx + self.x[0] |
| 224 | + b = self.x[-1] + np.arange(1, order + 1)*pad_dx |
| 225 | + self.x = np.r_[a, self.x, b] |
| 226 | + self.y = pad_const(self.y, order, axis=2) |
| 227 | + |
| 228 | + assert self.y.shape[2] == self.x.shape[0] |
| 229 | + self.splines = [UnivariateMultiSpline(self.x, yi, order) |
| 230 | + for yi in self.y] |
| 231 | + |
| 232 | + # need to resample/upsample the shim splines to the master spline knots |
| 233 | + # shim knot spacings can span an master spline knot and thus would |
| 234 | + # cross a highest order derivative boundary |
| 235 | + self.components = UnivariateMultiSparseSpline( |
| 236 | + components, self.x, order) |
| 237 | + |
| 238 | + def __call__(self, t, gain={}, offset={}): |
| 239 | + der = list((set(self.components.n) | set(offset)) & set(self.der)) |
| 240 | + u = np.zeros((self.splines[0].order, len(self.splines[0].s), len(t))) |
| 241 | + # der, order, ele, t |
| 242 | + p = np.array([self.splines[i](t) for i in der]) |
| 243 | + # order, der, None, t |
| 244 | + s = self.components(t) |
| 245 | + s_gain = np.array([gain.get(_, 1.) for _ in self.components.n]) |
| 246 | + s = s[:, :, None, :]*s_gain[None, :, None, None] |
| 247 | + for k, v in offset.items(): |
| 248 | + if v and k in self.der: |
| 249 | + u += v*p[self.der.index(k)] |
| 250 | + ps = p[[self.der.index(_) for _ in self.shims.der]] |
| 251 | + for i in range(u.shape[1]): |
| 252 | + for j in range(i + 1): |
| 253 | + u[i] += binom(i, j)*(s[j]*ps[:, i - j]).sum(0) |
| 254 | + return u # (order, ele, t) |
| 255 | + |
60 | 256 |
|
61 | 257 | def discrete_compensate(c):
|
62 | 258 | """Compensate spline coefficients for discrete accumulators
|
63 | 259 |
|
64 |
| - Given continuous time b-spline coefficients, this function |
| 260 | + Given continuous-time b-spline coefficients, this function |
65 | 261 | compensates for the effect of discrete time steps in the
|
66 | 262 | target devices.
|
67 | 263 |
|
|
0 commit comments