Source code for e2cnn.diffops.utils


from typing import List, Union, Iterable, Tuple
import os
import warnings
import pickle

import numpy as np
import scipy.special

try:
    from sympy.calculus.finite_diff import finite_diff_weights
    _SYMPY_AVAILABLE = True
except ImportError:
    _SYMPY_AVAILABLE = False

try:
    from rbf.pde.fd import weight_matrix # type: ignore
    from rbf.basis import set_symbolic_to_numeric_method, get_rbf # type: ignore

    _RBF_AVAILABLE = True

    set_symbolic_to_numeric_method('lambdify')
    _gaussian = get_rbf("ga")

except ImportError:
    _RBF_AVAILABLE = False

_DIFFOP_CACHE = {}
_1D_KERNEL_CACHE = {}

[docs]def load_cache(path: str = "./.e2cnn_cache/diffops.pickle"): """ Load cached PDO discretizations from disk. The cache should have been previously created using :func:`~e2cnn.diffops.store_cache`. Args: path (str, optional): the path to the file with discretizations. """ if os.path.exists(path): print("Loading cached Diffops") with open(path, "rb") as f: _DIFFOP_CACHE = pickle.load(f) else: print("Diffop cache not found, skipping")
[docs]def store_cache(path: str = "./.e2cnn_cache/diffops.pickle"): """ Cache PDO discretizations on disk. The cache can later be loaded using :func:`~e2cnn.diffops.load_cache`. This will speed up network instantiation as long as the architecture does not change too much. Args: path (str, optional): the path to the file with discretizations. """ directory = os.path.dirname(path) os.makedirs(directory, exist_ok=True) with open(path, "w+b") as f: pickle.dump(_DIFFOP_CACHE, f)
def discretize_homogeneous_polynomial( points: np.ndarray, coefficients: np.ndarray, smoothing: float = None, phi: str = "ga", method: str = "fd", ) -> np.ndarray: r"""Discretize a homogeneous partial differential operator. Homogeneous means that all terms have the same derivative order. See :meth:`e2cnn.diffops.DiffopBasis.sample` for details. .. warning:: The discretization relies on two external packages: `sympy <https://docs.sympy.org/>`_ and `rbf <https://rbf.readthedocs.io>`_. If they are not available, an error is raised. """ assert isinstance(points, np.ndarray) assert points.shape[0] == 2 num_points = points.shape[1] targets = np.array([[0, 0]]) key = (points.tobytes(), coefficients.tobytes(), smoothing, phi, method) if key in _DIFFOP_CACHE: return _DIFFOP_CACHE[key] n = len(coefficients) - 1 nonzero = (coefficients != 0) nonzero_indices = [k for k in range(n + 1) if nonzero[k]] diffs = [[n - k, k] for k in nonzero_indices] if not diffs: # we have the zero operator return np.zeros(num_points) if method == "gauss": if smoothing is None: raise ValueError("smoothing must be set when method = 'gauss'") if not _RBF_AVAILABLE: raise RuntimeError("Using derivatives of Gaussians for discretization " "requires the RBF package, please install it. " "See https://github.com/treverhines/RBF for instructions.") out = gaussian_derivative(coefficients, points, smoothing) elif method == "fd": if not _SYMPY_AVAILABLE: raise RuntimeError("Using finite difference discretization " "requires sympy, please install it.") # For FD, points needs to be a regular grid, and we need to # extract its projection onto the two axes. axis_points = (np.unique(points[0]), np.unique(points[1])) # Here, we just check that everything went as expected # (i.e. that the input had the expected format). # The minus sign is the convention that get_grid_coords # uses, which is what the modules in the nn package use. check_points = np.stack( np.meshgrid(axis_points[0], -axis_points[1], indexing="xy") ).reshape(2, -1) if not np.array_equal(check_points, points): print(axis_points) print(points) print(check_points) raise ValueError("Format of points doesn't match the one needed for FD.") kernels = np.stack( [ # type checkers get confused by expanding diff, but we know # that it has length 2. discretize_2d_monomial(*diff, axis_points) # type: ignore for diff in diffs ] ).reshape(-1, num_points) out = np.sum( coefficients[nonzero][:, None] * kernels, axis=0 ) elif method == "rbffd": if not _RBF_AVAILABLE: raise RuntimeError("Using RBF-FD for discretization " "requires the RBF package, please install it. " "See https://github.com/treverhines/RBF for instructions.") out = weight_matrix(targets, points.T, num_points, diffs, coefficients[nonzero], phi=phi) if np.any(np.isnan(out.data)): raise Exception(f"NaNs encountered while discretizing diffop {display_diffop(coefficients)} on {num_points} points.\n" f"Diffop passed to RBF: {diffs} with coefficients {coefficients[nonzero]}") if np.all(out.data == 0): warnings.warn(f"Zero filter encountered while discretizing diffop {display_diffop(coefficients)} on {num_points} points. " "This might indicate that the kernel size is too small for this differential operator.\n" f"Diffop passed to RBF: {diffs} with coefficients {coefficients[nonzero]}", RuntimeWarning) if np.any(np.abs(out.data) > 1e2): warnings.warn(f"Large filter values encountered while discretizing diffop {display_diffop(coefficients)} on {num_points} points. " "A larger kernel size or different RBF might help.\n" f"Max abs filter value: {np.max(np.abs(out))}\n" f"Diffop passed to RBF: {diffs} with coefficients {coefficients[nonzero]}", RuntimeWarning) out = out.toarray().flatten() else: raise ValueError(f"Discretization method {method} not recognized. " "Choices are 'fd', 'gauss' and 'rbffd'.") _DIFFOP_CACHE[key] = out return out def discretize_1d_monomial(n: int, points: List[float]) -> np.ndarray: """Discretize the differential operator d^n/dx^n as a convolutional kernel.""" # calculating the finite difference coefficients using sympy is fast, # but given that this function is called extremely often when sampling # a basis, it's still a bottleneck. Even with the cache on the level # of differential operators (because that one only caches something # if exactly the same operator appears multiple times). # So we use an additional cache here, which caches single monomials. key = (n, tuple(points)) if key not in _1D_KERNEL_CACHE: assert n < len(points), (f"Can't discretize differential operator of order {n} on {len(points)} points, " f"at least {n + 1} points are needed") _1D_KERNEL_CACHE[key] = fd_weights(n, points) return _1D_KERNEL_CACHE[key] def fd_weights(n, points): weights = finite_diff_weights(n, points, 0) # first -1 is because we want the highest order (n), # second -1 means we want the most accurate approximation (using all points) return np.array(weights[-1][-1], dtype=float) def discretize_2d_monomial(n_x: int, n_y: int, points: Tuple[List[float], List[float]], ) -> np.ndarray: """Discretize the differential operator d^{n_x + n_y}/{dx^n_x dy^n_y}.""" x_kernel = discretize_1d_monomial(n_x, points[0]) y_kernel = discretize_1d_monomial(n_y, points[1]) return x_kernel[:, None] * y_kernel[None, :] def multiply_polynomials(a: np.ndarray, b: np.ndarray): """Multiply two homogeneous polynomials. Args: a (ndarray): coefficients of x^n, x^{n - 1}y, ..., y^n for the first polynomial b (ndarray): coefficients for the second polynomial Returns: coefficients of the product """ n, m = len(a), len(b) # compute the Cauchy product return np.array([ sum(a[l] * b[k - l] for l in range(k + 1) if l < n and (k - l) < m) for k in range(n + m - 1) ]) def expand_binomial(a, b, exponent): """Expand (ax + by)^exponent in terms of monomials and return their coefficients.""" ks = np.arange(exponent + 1) binom_coeffs = scipy.special.binom(exponent, ks) return binom_coeffs * a ** (exponent - ks) * b ** ks def transform_polynomial(coefficients: np.ndarray, matrix: np.ndarray): """Calculate the coefficients of P(Ax), where P is the homogeneous polynomial defined by the given coefficients and A is the given matrix. Args: coefficients: ndarray of shape(..., n + 1), where the last axis enumerates coefficients for x^n, x^{n - 1}y, ..., y^n. The other axes are batch dimension. matrix: ndarray of shape (2, 2)""" n = coefficients.shape[-1] - 1 batch_shape = coefficients.shape[:-1] # the transformed polynomial will have the same degree: transformed = np.zeros(batch_shape + (n + 1, )) # now we iterate over all n + 1 coefficients: for i in range(n + 1): # we have a term coeff * x^{n - i} * y^i # First, we calculate the transformed versions of the x and y terms: # (x')^{n - i} = (A_11 x + A_21 y)^{n - i} x_trafo = expand_binomial(matrix[0, 0], matrix[1, 0], n - i) # (y')^i = (A_12 x + A_22 y)^i y_trafo = expand_binomial(matrix[0, 1], matrix[1, 1], i) # then we combine them xy_trafo = multiply_polynomials(x_trafo, y_trafo) # and finally scale by the coefficient # broadcasting: (*batch_shape, 1) * (1, ..., 1, n + 1) -> (*batch_shape, n + 1) transformed += coefficients[..., i, None] * xy_trafo[(None, ) * len(batch_shape)] return transformed def gaussian_derivative(coefficients: np.ndarray, points: np.ndarray, std: float = 1.): n = len(coefficients) - 1 num_points = points.shape[1] nonzero = (coefficients != 0) nonzero_indices = [k for k in range(n + 1) if nonzero[k]] diffs = [[n - k, k] for k in nonzero_indices] if not diffs: # we have the zero operator return np.zeros(num_points) center = np.zeros((1, 2)) kernels = np.stack( [_gaussian(points.T, center, 1/std**2, np.array(diff)).flatten() for diff in diffs] ) return np.sum( coefficients[nonzero][:, None] * kernels, axis=0 ) def homogenized_cheby(n: int, kind: str = "t") -> np.ndarray: """Compute the coefficients for the homogenized version of T_n or U'_n. Args: n: degree of the polynomial. May be negative, in that case the degree will be abs(n) but the sign may differ (see notes for exact definition) kind: Either 't' for the first kind or 'u' for the second kind Returns: ndarray with coefficients, ordered in descending order of the power of x, i.e. x^n, x^{n - 1}y, ..., y^n.""" sign = np.sign(n) n = abs(n) kind = kind.lower() result = np.zeros(n + 1) if kind == "t": q = n // 2 ks = np.arange(q + 1) # these are the coefficients of x^{n - 2k}y^{2k}, from k = 0 to n/2 result[::2] = scipy.special.binom(n, 2 * ks) * (-1) ** ks elif kind == "u": q = (n - 1) // 2 ks = np.arange(q + 1) # these are the coefficients of x^{n - 2k - 1}y^{2k + 1}, from k = 0 to (n - 1)/2 result[1::2] = sign * scipy.special.binom(n, 2 * ks + 1) * (-1) ** ks else: raise ValueError("kind must be either 'u' or 't'") return result def laplacian_power(k: int): """Compute the coefficients of x^n, x^{n - 1}y, ..., y^n for the k-th power of the Laplacian.""" result = np.zeros(2*k + 1) # The k-th power is given by (x^2 + y^2)^k = sum_{i = 0}^k {k choose i} x^{2i}y^{2(k - i)}. # So the coefficient of x^{2i}y^{2(k - i)} is the binomial coefficient, and the coefficients # at odd indices are zero result[::2] = scipy.special.binom(k, np.arange(k + 1)) return result def display_diffop(coefficients: np.ndarray): """Show a homogeneous differential operator as a pretty string.""" out = "" n = len(coefficients) - 1 for k, coeff in enumerate(coefficients): if isinstance(coeff, float) and coeff.is_integer(): coeff = int(coeff) if isinstance(coeff, float): coeff = round(coeff, 2) if coeff == 0: continue if out == "": if coeff == -1: out += "-" elif coeff != 1: out += f"{coeff}" if coeff in {1, -1} and n == 0: # constant term, we can't drop the 1 out += "1" elif coeff == 1: out += " + " elif coeff == -1: out += " - " elif coeff > 0: out += f" + {coeff}" if not isinstance(coeff, int): out += " " elif coeff < 0: out += f" - {abs(coeff)}" if not isinstance(coeff, int): out += " " if n - k > 0: out += "x" + prettify_exponent(n - k) if n - k > 3 and k > 0: out += " " if k > 0: out += "y" + prettify_exponent(k) if out == "": out = "0" return out def prettify_exponent(k): if k == 1: return "" if k == 2: return "²" if k == 3: return "³" return f"^{k}" def eval_polys(coefficients, points): """Evaluate homogeneous polynomials on a set of points. Args: coefficients: list of ndarrays of shape (..., n + 1) with coefficients of x^n, x^{n - 1}y, ..., y^n points: ndarray with shape (2, N) Returns: ndarray with shape (L, ..., N) where N is the number of points and L the length of the input list """ results = [] for element in coefficients: dims = len(element.shape) nones = (dims - 1) * (None, ) n = element.shape[-1] - 1 ks = np.arange(n + 1).reshape(1, -1) xs = points[0].reshape(-1, 1) ys = points[1].reshape(-1, 1) monomials = xs**(n - ks) * ys**ks results.append((element[..., None, :] * monomials[nones]).sum(axis=-1)) return np.stack(results) def required_points(order: int, accuracy: int) -> int: """Compute the number of points necessary to achieve an approximation of the given accuracy. Important note: this function assumes that the points will arranged symmetrically around 0. Corollary 7 from https://arxiv.org/pdf/1102.3203.pdf then says that the order of accuracy may be boosted by 1 compared to the usual one, which is exploited in this function to return a lower number of points when possible. Args: order (int): order of the differential operator accuracy (int): desired accuracy (e.g. 2 for approximation to quadratic order) Returns: number of required sampling points """ # The usual formula for the required number of points: N = order + accuracy # The remaining question is whether we can reduce this number by # 1 and still retain the desired accuracy, an effect called "boosted order of accuracy". # Corollary 7 from https://arxiv.org/pdf/1102.3203.pdf says that # boosting happens if the number of points minus the order is odd. # We want to check whether N - 1 still has the desired accuracy, # i.e. whether (N - 1) - order, or simply accuracy - 1, is odd: if (accuracy - 1) % 2: return N - 1 return N def largest_possible_order(num_points: int, accuracy: int) -> int: """Compute the largest diffop order such that the given accuracy is satisfied. See ``required_points`` for details.""" order = num_points - accuracy assert order >= 0 # check if the next-larger order would be boosted: if (num_points - (order + 1)) % 2: return order + 1 return order def guaranteed_accuracy(num_points: int, order: int) -> int: """Compute the accuracy that is guaranteed for the given order and number of points. See ``required_points`` for details.""" accuracy = num_points - order assert accuracy >= 0 # check if boosting happens: if accuracy % 2: return accuracy + 1 return accuracy def symmetric_points(n: int, dilation: float = 1) -> List[float]: # return e.g. [-1, 0, 1] for n = 3 and [-0.5, 0.5] for n = 2, etc. points = range(n) offset = (n - 1) / 2 return [(x - offset) * dilation for x in points]