"""Create test problems for tensor factorizations."""
import logging
import math
from dataclasses import dataclass, field
from typing import Callable, Optional, Tuple, Union, cast, overload
import numpy as np
from numpy_groupies import aggregate as accumarray
import pyttb as ttb
from pyttb.pyttb_utils import Shape, parse_shape
# Callable that produces a factor matrix (or weight vector) of the requested shape.
solution_generator = Callable[[Tuple[int, ...]], np.ndarray]
# Callable that produces a Tucker core of the requested shape; may return a
# pyttb tensor/sptensor or a raw numpy array (wrapped into a tensor later).
core_generator_t = Callable[
    [Tuple[int, ...]], Union[ttb.tensor, ttb.sptensor, np.ndarray]
]
def randn(shape: Tuple[int, ...]) -> np.ndarray:
    """Stub for MATLAB randn.

    Draws i.i.d. samples from the standard normal distribution.

    TODO move somewhere shareable.
    """
    return np.random.normal(0.0, 1.0, size=shape)
[docs]
@dataclass
class BaseProblem:
    """Parameters general to all solutions.

    Attributes
    ----------
    shape:
        Tensor shape for generated problem.
    factor_generator:
        Method to generate factor matrices.
    symmetric:
        List of modes that should be symmetric.
        For instance, `[(1,2), (3,4)]` specifies that
        modes 1 and 2 have identical factor matrices, and modes 3 and 4
        also have identical factor matrices.
    num_factors:
        Number of factors.
    noise:
        Amount of Gaussian noise to add to solution.
        If data is sparse noise is only added to nonzero entries.
    """

    shape: Shape = field(metadata={"doc": "A shape"})
    factor_generator: solution_generator = randn
    symmetric: Optional[list[Tuple[int, int]]] = None
    num_factors: Union[int, list[int], None] = None
    noise: float = 0.10

    def __post_init__(self):
        # Normalize any accepted Shape representation to a tuple of ints.
        # Use the directly-imported helper for consistency with the rest of
        # this module (see _create_missing_data_pattern).
        self.shape = parse_shape(self.shape)
        if not 0.0 <= self.noise <= 1.0:
            raise ValueError(f"Noise must be in [0,1] but got {self.noise}")
[docs]
@dataclass
class CPProblem(BaseProblem):
    """Parameters specifying CP Solutions.

    Attributes
    ----------
    shape:
        Tensor shape for generated problem.
    factor_generator:
        Method to generate factor matrices.
    symmetric:
        List of modes that should be symmetric.
        For instance, `[(1,2), (3,4)]` specifies that
        modes 1 and 2 have identical factor matrices, and modes 3 and 4
        also have identical factor matrices.
    num_factors:
        Number of factors.
    noise:
        Amount of Gaussian noise to add to solution.
        If data is sparse noise is only added to nonzero entries.
    weight_generator:
        Method to generate weights for ktensor solution.
    sparse_generation:
        Number of nonzeros to sample when generating sparse data;
        a value in [0, 1) is interpreted as a ratio of total entries.
    """

    # NOTE inherited attributes are manually copy pasted, keep aligned between problems
    num_factors: int = 2
    weight_generator: solution_generator = np.random.random
    # TODO: This is in DataParams in MATLAB, but only works for CP problems so
    # feels more reasonable here
    sparse_generation: Optional[float] = None
[docs]
@dataclass
class TuckerProblem(BaseProblem):
    """Parameters specifying Tucker Solutions.

    Attributes
    ----------
    shape:
        Tensor shape for generated problem.
    factor_generator:
        Method to generate factor matrices.
    symmetric:
        List of modes that should be symmetric.
        For instance, `[(1,2), (3,4)]` specifies that
        modes 1 and 2 have identical factor matrices, and modes 3 and 4
        also have identical factor matrices.
    num_factors:
        Number of factors per mode (defaults to [2, 2, 2]).
    noise:
        Amount of Gaussian noise to add to solution.
        If data is sparse noise is only added to nonzero entries.
    core_generator:
        Method to generate weights for ttensor solution.
    """

    num_factors: Optional[list[int]] = None
    core_generator: core_generator_t = randn

    def __post_init__(self):
        super().__post_init__()
        # Explicit None check: `or` would also silently replace other falsy
        # values (e.g. an accidental empty list) with the default.
        if self.num_factors is None:
            self.num_factors = [2, 2, 2]
[docs]
@dataclass
class ExistingSolution:
    """Parameters for using an existing tensor solution.

    Attributes
    ----------
    solution:
        Pre-existing tensor solution (ktensor or ttensor).
    noise:
        Amount of Gaussian noise to add to solution.
        If data is sparse noise is only added to nonzero entries.
    """

    solution: Union[ttb.ktensor, ttb.ttensor]
    noise: float = 0.10

    def __post_init__(self):
        # Validate noise up front so generation code can assume [0, 1].
        if not 0.0 <= self.noise <= 1.0:
            raise ValueError(f"Noise must be in [0,1] but got {self.noise}")

    @property
    def symmetric(self) -> None:
        """Get the symmetric modes from the solution."""
        # ExistingSolution doesn't support symmetry constraints; always
        # reporting None keeps the interface aligned with BaseProblem.
        return None
[docs]
@dataclass
class ExistingTuckerSolution(ExistingSolution):
    """Parameters for using an existing Tucker tensor solution.

    Attributes
    ----------
    solution:
        Pre-existing ttensor solution.
    noise:
        Amount of Gaussian noise to add to solution.
        If data is sparse noise is only added to nonzero entries.
    """

    solution: ttb.ttensor
[docs]
@dataclass
class ExistingCPSolution(ExistingSolution):
    """Parameters for using an existing CP (ktensor) solution.

    Attributes
    ----------
    solution:
        Pre-existing ktensor solution.
    noise:
        Amount of Gaussian noise to add to solution.
        If data is sparse noise is only added to nonzero entries.
    sparse_generation:
        Generate a sparse tensor that can be scaled so that the
        column factors and weights are stochastic. Provide a number
        of nonzeros to be inserted. A value in range [0,1) will be
        interpreted as a ratio.
    """

    solution: ttb.ktensor
    sparse_generation: Optional[float] = None
[docs]
@dataclass
class MissingData:
    """Parameters to control missing data.

    Attributes
    ----------
    missing_ratio:
        Proportion of missing data.
    missing_pattern:
        An explicit tensor representing missing data locations.
    sparse_model:
        Whether to generate sparse rather than dense missing data pattern.
        Only useful for large tensors that don't easily fit in memory and
        when missing ratio > 0.8.
    """

    missing_ratio: float = 0.0
    missing_pattern: Optional[Union[ttb.sptensor, ttb.tensor]] = None
    sparse_model: bool = False

    def __post_init__(self):
        if not 0.0 <= self.missing_ratio <= 1.0:
            raise ValueError(
                f"Missing ratio must be in [0,1] but got {self.missing_ratio}"
            )
        # Ratio and explicit pattern are mutually exclusive specifications.
        if self.missing_ratio > 0.0 and self.missing_pattern is not None:
            raise ValueError(
                "Can't set ratio and explicit pattern to specify missing data. "
                "Select one or the other."
            )

    def has_missing(self) -> bool:
        """Check if any form of missing data is requested."""
        return self.missing_ratio > 0.0 or self.missing_pattern is not None

    def raise_symmetric(self):
        """Raise for unsupported symmetry request."""
        if self.missing_ratio:
            raise ValueError("Can't generate a symmetric problem with missing data.")
        if self.sparse_model:
            raise ValueError("Can't generate sparse symmetric problem.")

    def get_pattern(self, shape: Shape) -> Union[None, ttb.tensor, ttb.sptensor]:
        """Generate a tensor pattern of missing data."""
        if self.missing_pattern is not None:
            if self.missing_pattern.shape != shape:
                raise ValueError(
                    "Missing pattern and problem shapes are not compatible."
                )
            return self.missing_pattern
        if self.missing_ratio == 0.0:
            # All usages of this are internal, should we just rule out this situation?
            return None
        sparse_model = self.sparse_model
        if self.missing_ratio < 0.8 and sparse_model:
            logging.warning(
                "Setting sparse to false because there are"
                " fewer than 80% missing elements."
            )
            # Previously this only warned but still generated a sparse
            # pattern; now actually fall back to the dense model.
            sparse_model = False
        return _create_missing_data_pattern(
            shape, self.missing_ratio, sparse_model
        )
def _create_missing_data_pattern(
    shape: Shape, missing_ratio: float, sparse_model: bool = False
) -> Union[ttb.tensor, ttb.sptensor]:
    """Create a randomly missing element indicator tensor.

    Creates a binary tensor of specified size with 0's indicating missing data
    and 1's indicating valid data. Will only return a tensor that has at least
    one entry per N-1 dimensional slice.
    """
    shape = parse_shape(shape)
    ndim = len(shape)
    P = math.prod(shape)
    # Number of *valid* (non-missing) entries to generate.
    Q = math.ceil((1 - missing_ratio) * P)
    W: Union[ttb.tensor, ttb.sptensor]
    # Create tensor
    ## Keep iterating until tensor is created or we give up.
    # TODO: make range configurable?
    max_attempts = 20
    isokay = False
    for _ in range(max_attempts):
        if sparse_model:
            # Start with 50% more than Q random subs
            # Note in original matlab to work out expected value of a*Q to guarantee
            # Q unique entries
            subs = np.unique(
                np.floor(
                    np.random.random((int(np.ceil(1.5 * Q)), len(shape))).dot(
                        np.diag(shape)
                    )
                ),
                axis=0,
            ).astype(int)
            # Check if there are too many unique subs
            if len(subs) > Q:
                # TODO: check if note from matlab still relevant
                # Note in original matlab: unique orders the subs and would bias toward
                # first subs with lower values, so we sample to cut back
                idx = np.random.permutation(subs.shape[0])
                subs = subs[idx[:Q]]
            elif subs.shape[0] < Q:
                logging.warning(
                    f"Only generated {subs.shape[0]} of " f"{Q} desired subscripts"
                )
            W = ttb.sptensor(
                subs,
                np.ones(
                    (len(subs), 1),
                ),
                shape=shape,
            )
        else:
            # Compute the linear indices of the missing entries.
            idx = np.random.permutation(P)
            idx = idx[:Q]
            W = ttb.tenzeros(shape)
            W[idx] = 1
        # Check if W has any empty slices
        isokay = True
        for n in range(ndim):
            all_but_n = np.arange(W.ndims)
            all_but_n = np.delete(all_but_n, n)
            collapse_W = W.collapse(all_but_n)
            if isinstance(collapse_W, np.ndarray):
                isokay &= bool(np.all(collapse_W))
            else:
                isokay &= bool(np.all(collapse_W.double()))
        # Quit if okay
        if isokay:
            break
    if not isokay:
        # NOTE: previously interpolated the *builtin* `iter` here, producing
        # a garbled message; report the actual attempt limit instead.
        raise ValueError(
            f"After {max_attempts} iterations, cannot produce a tensor with "
            f"{missing_ratio*100} missing data without an empty slice."
        )
    return W
# Typed overloads for create_problem: the returned solution type follows the
# problem specification (CPProblem -> ktensor, TuckerProblem -> ttensor,
# ExistingSolution -> whichever solution type it wraps).
@overload
def create_problem(
    problem_params: CPProblem, missing_params: Optional[MissingData] = None
) -> Tuple[
    ttb.ktensor, Union[ttb.tensor, ttb.sptensor]
]: ...  # pragma: no cover see coveragepy/issues/970


@overload
def create_problem(
    problem_params: TuckerProblem,
    missing_params: Optional[MissingData] = None,
) -> Tuple[ttb.ttensor, ttb.tensor]: ...  # pragma: no cover see coveragepy/issues/970


@overload
def create_problem(
    problem_params: ExistingSolution,
    missing_params: Optional[MissingData] = None,
) -> Tuple[
    Union[ttb.ktensor, ttb.ttensor], Union[ttb.tensor, ttb.sptensor]
]: ...  # pragma: no cover see coveragepy/issues/970
[docs]
def create_problem(
    problem_params: Union[CPProblem, TuckerProblem, ExistingSolution],
    missing_params: Optional[MissingData] = None,
) -> Tuple[Union[ttb.ktensor, ttb.ttensor], Union[ttb.tensor, ttb.sptensor]]:
    """Generate a problem and solution.

    Arguments
    ---------
    problem_params:
        Parameters related to the problem to generate, or an existing solution.
    missing_params:
        Parameters to control missing data in the generated data/solution.

    Examples
    --------
    Base example params

    >>> shape = (5, 4, 3)

    Generate a CP problem

    >>> cp_specific_params = CPProblem(shape=shape, num_factors=3, noise=0.1)
    >>> no_missing_data = MissingData()
    >>> solution, data = create_problem(cp_specific_params, no_missing_data)
    >>> diff = (solution.full() - data).norm() / solution.full().norm()
    >>> bool(np.isclose(diff, 0.1))
    True

    Generate Tucker Problem

    >>> tucker_specific_params = TuckerProblem(shape, num_factors=[3, 3, 2], noise=0.1)
    >>> solution, data = create_problem(tucker_specific_params, no_missing_data)
    >>> diff = (solution.full() - data).norm() / solution.full().norm()
    >>> bool(np.isclose(diff, 0.1))
    True

    Use existing solution

    >>> factor_matrices = [np.random.random((dim, 3)) for dim in shape]
    >>> weights = np.random.random(3)
    >>> existing_ktensor = ttb.ktensor(factor_matrices, weights)
    >>> existing_params = ExistingSolution(existing_ktensor, noise=0.1)
    >>> solution, data = create_problem(existing_params, no_missing_data)
    >>> assert solution is existing_ktensor
    """
    if missing_params is None:
        missing_params = MissingData()
    if problem_params.symmetric is not None:
        # Symmetry is incompatible with missing data / sparse patterns.
        missing_params.raise_symmetric()
    solution = generate_solution(problem_params)
    data: Union[ttb.tensor, ttb.sptensor]
    if (
        isinstance(problem_params, (CPProblem, ExistingCPSolution))
        and problem_params.sparse_generation is not None
    ):
        if missing_params.has_missing():
            raise ValueError(
                f"Can't combine missing data {MissingData.__name__} and "
                f"sparse generation {CPProblem.__name__}."
            )
        solution = cast(ttb.ktensor, solution)
        # Sparse generation resamples entries, so the solution is rescaled too.
        solution, data = generate_data_sparse(solution, problem_params)
    elif missing_params.has_missing():
        pattern = missing_params.get_pattern(solution.shape)
        data = generate_data(solution, problem_params, pattern)
    else:
        data = generate_data(solution, problem_params)
    return solution, data
def generate_solution_factors(base_params: BaseProblem) -> list[np.ndarray]:
    """Generate the factor matrices for either type of solution.

    Raises
    ------
    ValueError
        If num_factors is unset or doesn't match the number of modes.
    """
    # Get shape of final tensor (normalized to a tuple by __post_init__).
    shape = cast(Tuple[int, ...], base_params.shape)
    # Get shape of factors: a scalar count applies to every mode.
    if isinstance(base_params.num_factors, int):
        nfactors = [base_params.num_factors] * len(shape)
    elif base_params.num_factors is not None:
        nfactors = base_params.num_factors
    else:
        raise ValueError("Num_factors shouldn't be none.")
    if len(nfactors) != len(shape):
        raise ValueError(
            "Num_factors should be the same dimensions as shape but got "
            f"{nfactors} and {shape}"
        )
    factor_matrices = []
    for shape_i, nfactors_i in zip(shape, nfactors):
        factor_matrices.append(base_params.factor_generator((shape_i, nfactors_i)))
    if base_params.symmetric is not None:
        # Copy the first factor of each symmetric group into the other modes.
        for grp in base_params.symmetric:
            for j in range(1, len(grp)):
                factor_matrices[grp[j]] = factor_matrices[grp[0]]
    return factor_matrices
@overload
def generate_solution(
    problem_params: TuckerProblem,
) -> ttb.ttensor: ...


@overload
def generate_solution(
    problem_params: CPProblem,
) -> ttb.ktensor: ...


@overload
def generate_solution(
    problem_params: ExistingSolution,
) -> Union[ttb.ktensor, ttb.ttensor]: ...


def generate_solution(
    problem_params: Union[CPProblem, TuckerProblem, ExistingSolution],
) -> Union[ttb.ktensor, ttb.ttensor]:
    """Generate problem solution."""
    # An existing solution needs no construction work at all.
    if isinstance(problem_params, ExistingSolution):
        return problem_params.solution
    matrices = generate_solution_factors(problem_params)
    # Assemble the final model from the generated factors.
    if isinstance(problem_params, TuckerProblem):
        core_shape = tuple(cast(list[int], problem_params.num_factors))
        raw_core = problem_params.core_generator(core_shape)
        if not isinstance(raw_core, (ttb.tensor, ttb.sptensor)):
            # Plain numpy output gets wrapped into a dense tensor core.
            raw_core = ttb.tensor(raw_core)
        return ttb.ttensor(raw_core, matrices)
    if isinstance(problem_params, CPProblem):
        lambdas = problem_params.weight_generator((problem_params.num_factors,))
        return ttb.ktensor(matrices, lambdas)
    raise ValueError(f"Unsupported problem parameter type: {type(problem_params)=}")
@overload
def generate_data(
    solution: Union[ttb.ktensor, ttb.ttensor],
    problem_params: Union[BaseProblem, ExistingSolution],
    pattern: Optional[ttb.tensor] = None,
) -> ttb.tensor: ...  # pragma: no cover see coveragepy/issues/970


@overload
def generate_data(
    solution: Union[ttb.ktensor, ttb.ttensor],
    problem_params: Union[BaseProblem, ExistingSolution],
    pattern: ttb.sptensor,
) -> ttb.sptensor: ...  # pragma: no cover see coveragepy/issues/970


def generate_data(
    solution: Union[ttb.ktensor, ttb.ttensor],
    problem_params: Union[BaseProblem, ExistingSolution],
    pattern: Optional[Union[ttb.tensor, ttb.sptensor]] = None,
) -> Union[ttb.tensor, ttb.sptensor]:
    """Generate problem data."""
    shape = solution.shape
    Rdm: Union[ttb.tensor, ttb.sptensor]
    # Build the noise term (Rdm) and the noiseless data (Z), restricted to the
    # missing-data pattern when one is supplied.
    if pattern is None:
        # TODO don't we already have a randn tensor method?
        Rdm = ttb.tensor(randn(shape))
        Z = solution.full()
    elif isinstance(pattern, ttb.sptensor):
        # Noise only on the observed (nonzero) entries.
        Rdm = ttb.sptensor(pattern.subs, randn((pattern.nnz, 1)), pattern.shape)
        Z = pattern * solution
    elif isinstance(pattern, ttb.tensor):
        Rdm = pattern * ttb.tensor(randn(shape))
        Z = pattern * solution.full()
    else:
        raise ValueError(f"Unsupported sparsity pattern of type {type(pattern)}")
    symmetric_modes = problem_params.symmetric
    if symmetric_modes is not None:
        # TODO Note in MATLAB code to follow up
        Rdm = Rdm.symmetrize(np.array(symmetric_modes))
    D = Z + problem_params.noise * Z.norm() * Rdm / Rdm.norm()
    # Make sure the final result is definitely symmetric
    if symmetric_modes is not None:
        D = D.symmetrize(np.array(symmetric_modes))
    return D
def prosample(nsamples: int, prob: np.ndarray) -> np.ndarray:
    """Proportional Sampling."""
    # Cumulative bin edges [0, p0, p0+p1, ...], capped at 1.
    edges = np.concatenate(([0.0], np.asarray(prob)))
    edges = np.minimum(np.cumsum(edges), 1)
    # Force the final edge to 1 so every uniform draw lands in some bin.
    edges[-1] = 1
    draws = np.random.random(nsamples)
    # digitize yields 1-based bin indices; shift to 0-based component indices.
    return np.digitize(draws, bins=edges) - 1
def generate_data_sparse(
    solution: ttb.ktensor,
    problem_params: Union[CPProblem, ExistingCPSolution],
) -> Tuple[ttb.ktensor, ttb.sptensor]:
    """Generate sparse CP data from a given solution.

    Samples `sparse_generation` entries proportionally to the normalized
    solution; the returned solution is rescaled to match the sampled data.

    Arguments
    ---------
    solution:
        Nonnegative ktensor to sample from.
    problem_params:
        CP parameters; `sparse_generation` must be set.

    Returns
    -------
    Rescaled solution and the sampled sparse data tensor.
    """
    # Error check on solution: sampling requires a valid probability model.
    if np.any(solution.weights < 0):
        raise ValueError("All weights must be nonnegative.")
    if any(np.any(factor < 0) for factor in solution.factor_matrices):
        raise ValueError("All factor matrices must be nonnegative.")
    if problem_params.symmetric is not None:
        logging.warning("Symmetric constraints have been ignored.")
    if problem_params.sparse_generation is None:
        raise ValueError("Cannot generate sparse data without sparse_generation set.")
    # Convert solution to probability tensor
    # NOTE: Make copy since normalize modifies in place
    P = solution.copy().normalize(mode=0)
    eta = np.sum(P.weights)
    P.weights /= eta
    # Determine how many samples per component
    nedges = problem_params.sparse_generation
    if nedges < 1:
        # Values in [0, 1) are interpreted as a ratio of total entries.
        nedges = np.round(nedges * math.prod(P.shape)).astype(int)
    nedges = int(nedges)
    nd = P.ndims
    nc = P.ncomponents
    csample = prosample(nedges, P.weights)
    # TODO check this
    csums = accumarray(csample, 1, size=nc)
    # Determine the subscripts for each randomly sampled entry
    shape = solution.shape
    subs: list[np.ndarray] = []
    for c in range(nc):
        nsample = csums[c]
        if nsample == 0:
            continue
        subs.append(np.zeros((nsample, nd), dtype=int))
        for d in range(nd):
            subs[-1][:, d] = prosample(nsample, P.factor_matrices[d][:, c])
    # TODO could sum csums and allocate in place with slicing
    allsubs = np.vstack(subs)
    # Assemble final tensor. Note that duplicates are summed.
    # TODO should we have sptenones for purposes like this?
    Z = ttb.sptensor.from_aggregator(
        allsubs,
        np.ones(
            (len(allsubs), 1),
        ),
        shape=shape,
    )
    # Rescale S so that it is proportional to the number of edges inserted
    solution = P
    solution.weights *= nedges
    # TODO no noise introduced in this special case in MATLAB
    return solution, Z