Source code for sfepy.homogenization.coefs_base

import os
import time

import numpy as nm
import numpy.linalg as nla
import scipy as sc

from sfepy.base.base import output, assert_, get_default, debug, Struct
from sfepy.discrete.evaluate import eval_equations
from sfepy.solvers.ts import TimeStepper
from sfepy.discrete.fem.meshio import HDF5MeshIO
from sfepy.solvers import Solver, eig
from sfepy.linalg import MatrixAction
from utils import iter_sym, create_pis, create_scalar_pis

[docs]class MiniAppBase(Struct):
[docs] def any_from_conf(name, problem, kwargs): try: cls = kwargs['class'] except KeyError: raise KeyError("set 'class' for MiniApp %s!" % name) obj = cls(name, problem, kwargs) return obj
any_from_conf = staticmethod(any_from_conf) def __init__(self, name, problem, kwargs): Struct.__init__(self, name=name, problem=problem, **kwargs) if self.problem is not None: self.problem.clear_equations() self.set_default('requires', []) self.set_default('is_linear', False) self.set_default('dtype', nm.float64) self.set_default('term_mode', None) self.set_default('set_volume', 'total') # Application-specific options. self.app_options = self.process_options()
[docs] def process_options(self): """ Setup application-specific options. Subclasses should implement this method as needed. Returns ------- app_options : Struct instance The application options. """
[docs] def init_solvers(self, problem): """ Setup solvers. Use local options if these are defined, otherwise use the global ones. For linear problems, assemble the matrix and try to presolve the linear system. """ if hasattr(self, 'solvers'): opts = self.solvers else: opts = problem.conf.options problem.set_conf_solvers(problem.conf.solvers, opts) problem.init_solvers() if self.is_linear: output('linear problem, trying to presolve...') tt = time.clock() ev = problem.get_evaluator() state = problem.create_state() try: mtx_a = ev.eval_tangent_matrix(state(), is_full=True) except ValueError: output('matrix evaluation failed, giving up...') raise problem.set_linear(True) problem.try_presolve(mtx_a) output('...done in %.2f s' % (time.clock() - tt)) else: problem.set_linear(False)
def _get_volume(self, volume): if isinstance(volume, dict): return volume[self.set_volume] else: return volume
[docs]class CorrSolution(Struct): """ Class for holding solutions of corrector problems. """
[docs] def iter_solutions(self): if hasattr(self, 'components'): for indx in self.components: key = ('%d' * len(indx)) % indx yield key, self.states[indx] else: yield '', self.state
[docs]class CorrMiniApp(MiniAppBase): def __init__(self, name, problem, kwargs): MiniAppBase.__init__(self, name, problem, kwargs) self.output_dir = self.problem.output_dir self.set_default('save_name', '(not_set)') self.set_default('dump_name', self.save_name) self.set_default('dump_variables', []) self.set_default('save_variables', self.dump_variables) self.save_name = os.path.normpath(os.path.join(self.output_dir, self.save_name)) self.dump_name = os.path.normpath(os.path.join(self.output_dir, self.dump_name))
[docs] def setup_output(self, save_format=None, dump_format=None, post_process_hook=None, file_per_var=None): """Instance attributes have precedence!""" self.set_default('dump_format', dump_format) self.set_default('save_format', save_format) self.set_default('post_process_hook', post_process_hook) self.set_default('file_per_var', file_per_var)
[docs] def get_save_name_base(self): return self.save_name
[docs] def get_dump_name_base(self): return self.get_save_name_base()
[docs] def get_save_name(self): return '.'.join((self.get_save_name_base(), self.save_format))
[docs] def get_dump_name(self): if self.dump_format is not None: return '.'.join((self.get_dump_name_base(), self.dump_format)) else: return None
[docs] def get_output(self, corr_sol, is_dump=False, extend=True, variables=None): if variables is None: variables = self.problem.get_variables() to_output = variables.state_to_output if is_dump: var_names = self.dump_variables extend = False else: var_names = self.save_variables out = {} for key, sol in corr_sol.iter_solutions(): for var_name in var_names: if key: skey = var_name + '_' + key else: skey = var_name dof_vector = sol[var_name] if is_dump: var = variables[var_name] shape = (var.n_dof / var.n_components, var.n_components) out[skey] = Struct(name = 'dump', mode = 'nodes', data = dof_vector, dofs = var.dofs, shape = shape, var_name = var_name) else: aux = to_output(dof_vector, var_info={var_name: (True, var_name)}, extend=extend) if self.post_process_hook is not None: aux = self.post_process_hook(aux, self.problem, None, extend=extend) for _key, val in aux.iteritems(): if key: new_key = _key + '_' + key else: new_key = _key out[new_key] = val return out
[docs] def save(self, state, problem, variables=None): save_name = self.get_save_name() if save_name is not None: extend = not self.file_per_var out = self.get_output(state, extend=extend, variables=variables) problem.save_state(save_name, out=out, file_per_var=self.file_per_var) dump_name = self.get_dump_name() if dump_name is not None: problem.save_state(dump_name, out=self.get_output(state, is_dump=True, variables=variables), file_per_var=False)
[docs]class ShapeDimDim(CorrMiniApp): def __call__(self, problem=None, data=None): problem = get_default(problem, self.problem) clist, pis = create_pis(problem, self.variables[0]) corr_sol = CorrSolution(name=self.name, states=pis, components=clist) return corr_sol
[docs]class ShapeDim(CorrMiniApp): def __call__(self, problem=None, data=None): problem = get_default(problem, self.problem) clist, pis = create_scalar_pis(problem, self.variables[0]) corr_sol = CorrSolution(name=self.name, states=pis, components=clist) return corr_sol
[docs]class OnesDim(CorrMiniApp): def __call__(self, problem=None, data=None): problem = get_default(problem, self.problem) var_name = self.variables[0] var = problem.get_variables(auto_create=True)[var_name] dim = problem.domain.mesh.dim nnod = var.n_nod e00 = nm.zeros((nnod, dim), dtype=nm.float64) e1 = nm.ones((nnod,), dtype=nm.float64) ones = nm.zeros((dim,), dtype=nm.object) clist = [] for ir in range(dim): aux = e00.copy() aux[:,ir] = e1 ones[ir] = {var_name : nm.ascontiguousarray(aux)} clist.append('pi_%d' % (ir,)) corr_sol = CorrSolution(name=self.name, states=ones, components=clist) return corr_sol
[docs]class CopyData(CorrMiniApp): def __call__(self, problem=None, data=None): problem = get_default(problem, self.problem) var_name = self.variable clist = ['data'] dn = self.data if type(dn) is list: data = problem for ii in dn: data = data.get(ii, 'None') else: data = problem.get(dn, 'None') if type(data) is dict: corr_sol = CorrSolution(name=self.name, state=data, components=clist) else: if data.dtype == nm.object: corr_sol = CorrSolution(name=self.name, states=data, components=clist) else: ndof, ndim = data.shape state = {var_name: data.reshape((ndof * ndim,))} corr_sol = CorrSolution(name=self.name, state=state, components=clist) return corr_sol
[docs]class CorrNN(CorrMiniApp): """ __init__() kwargs: { 'ebcs' : [], 'epbcs' : [], 'equations' : {}, 'set_variables' : None, }, """
[docs] def set_variables_default(variables, ir, ic, set_var, data): for (var, req, comp) in set_var: variables[var].set_data(data[req].states[ir,ic][comp])
set_variables_default = staticmethod(set_variables_default) def __init__(self, name, problem, kwargs): """When dim is not in kwargs, problem dimension is used.""" CorrMiniApp.__init__(self, name, problem, kwargs) self.set_default('dim', problem.get_dim()) def __call__(self, problem=None, data=None): problem = get_default(problem, self.problem) problem.set_equations(self.equations) problem.select_bcs(ebc_names=self.ebcs, epbc_names=self.epbcs, lcbc_names=self.get('lcbcs', [])) problem.update_materials(problem.ts) self.init_solvers(problem) variables = problem.get_variables() states = nm.zeros((self.dim, self.dim), dtype=nm.object) clist = [] for ir in range(self.dim): for ic in range(self.dim): if isinstance(self.set_variables, list): self.set_variables_default(variables, ir, ic, self.set_variables, data) else: self.set_variables(variables, ir, ic, **data) state = problem.solve() assert_(state.has_ebc()) states[ir,ic] = state.get_parts() clist.append((ir, ic)) corr_sol = CorrSolution(name=self.name, states=states, components=clist) self.save(corr_sol, problem) return corr_sol
[docs]class CorrN(CorrMiniApp):
[docs] def set_variables_default(variables, ir, set_var, data): for (var, req, comp) in set_var: variables[var].set_data(data[req].states[ir][comp])
set_variables_default = staticmethod(set_variables_default) def __init__(self, name, problem, kwargs): """When dim is not in kwargs, problem dimension is used.""" CorrMiniApp.__init__(self, name, problem, kwargs) self.set_default('dim', problem.get_dim()) def __call__(self, problem=None, data=None): problem = get_default(problem, self.problem) problem.set_equations(self.equations) problem.select_bcs(ebc_names=self.ebcs, epbc_names=self.epbcs, lcbc_names=self.get('lcbcs', [])) problem.update_materials(problem.ts) self.init_solvers(problem) variables = problem.get_variables() states = nm.zeros((self.dim,), dtype=nm.object) clist = [] for ir in range(self.dim): if isinstance(self.set_variables, list): self.set_variables_default(variables, ir, self.set_variables, data) else: self.set_variables(variables, ir, **data) state = problem.solve() assert_(state.has_ebc()) states[ir] = state.get_parts() clist.append((ir,)) corr_sol = CorrSolution(name=self.name, states=states, components=clist) self.save(corr_sol, problem) return corr_sol
[docs]class CorrDimDim(CorrNN): pass
[docs]class CorrDim(CorrN): pass
[docs]class CorrOne(CorrMiniApp):
[docs] def set_variables_default(variables, set_var, data): for (var, req, comp) in set_var: variables[var].set_data(data[req].state[comp])
set_variables_default = staticmethod(set_variables_default) def __call__(self, problem=None, data=None): problem = get_default(problem, self.problem) problem.set_equations(self.equations) problem.select_bcs(ebc_names=self.ebcs, epbc_names=self.epbcs, lcbc_names=self.get('lcbcs', [])) problem.update_materials(problem.ts) self.init_solvers(problem) variables = problem.get_variables() if hasattr(self, 'set_variables'): if isinstance(self.set_variables, list): self.set_variables_default(variables, self.set_variables, data) else: self.set_variables(variables, **data) state = problem.solve() assert_(state.has_ebc()) corr_sol = CorrSolution(name=self.name, state=state.get_parts()) self.save(corr_sol, problem) return corr_sol
[docs]class CorrSetBCS(CorrMiniApp): def __call__(self, problem=None, data=None): from sfepy.base.base import select_by_names from sfepy.discrete.variables import Variables from sfepy.discrete.state import State from sfepy.discrete.conditions import Conditions problem = get_default(problem, self.problem) conf_ebc = select_by_names(problem.conf.ebcs, self.ebcs) conf_epbc = select_by_names(problem.conf.epbcs, self.epbcs) ebcs = Conditions.from_conf(conf_ebc, problem.domain.regions) epbcs = Conditions.from_conf(conf_epbc, problem.domain.regions) conf_variables = select_by_names(problem.conf.variables, self.variable) problem.set_variables(conf_variables) variables = Variables.from_conf(conf_variables, problem.fields) variables.equation_mapping(ebcs, epbcs, problem.ts, problem.functions) state = State(variables) state.fill(0.0) state.apply_ebc() corr_sol = CorrSolution(name=self.name, state=state.get_parts()) self.save(corr_sol, problem, variables) return corr_sol
[docs]class CorrEqPar(CorrOne): """ The corrector which equation can be parametrized via 'eq_pars', the dimension is given by the number of parameters. Example: 'equations': 'dw_diffusion.5.Y(mat.k, q, p) = dw_surface_integrate.5.%s(q)', 'eq_pars': ('bYMp', 'bYMm'), 'class': cb.CorrEqPar, """ def __init__(self, name, problem, kwargs): """When dim is not in kwargs, problem dimension is used.""" CorrMiniApp.__init__(self, name, problem, kwargs) self.set_default('dim', len(self.eq_pars)) def __call__(self, problem=None, data=None): problem = get_default(problem, self.problem) states = nm.zeros((self.dim,), dtype=nm.object) clist = [] eqns ={} for ir in range(self.dim): for key_eq, val_eq in self.equations.iteritems(): eqns[key_eq] = val_eq % self.eq_pars[ir] problem.set_equations(eqns) problem.select_bcs(ebc_names=self.ebcs, epbc_names=self.epbcs, lcbc_names=self.get('lcbcs', [])) problem.update_materials(problem.ts) self.init_solvers(problem) variables = problem.get_variables() if hasattr(self, 'set_variables'): if isinstance(self.set_variables, list): self.set_variables_default(variables, self.set_variables, data) else: self.set_variables(variables, **data) state = problem.solve() assert_(state.has_ebc()) states[ir] = state.get_parts() clist.append((ir,)) corr_sol = CorrSolution(name=self.name, states=states, components=clist) self.save(corr_sol, problem) return corr_sol
[docs]class PressureEigenvalueProblem(CorrMiniApp): """Pressure eigenvalue problem solver for time-dependent correctors."""
[docs] def presolve(self, mtx): """Prepare A^{-1} B^T for the Schur complement.""" mtx_a = mtx['A'] mtx_bt = mtx['BT'] output('full A size: %.3f MB' % (8.0 * nm.prod(mtx_a.shape) / 1e6)) output('full B size: %.3f MB' % (8.0 * nm.prod(mtx_bt.shape) / 1e6)) ls = Solver.any_from_conf(self.problem.ls_conf, presolve=True, mtx=mtx_a) if self.mode == 'explicit': tt = time.clock() mtx_aibt = nm.zeros(mtx_bt.shape, dtype=mtx_bt.dtype) for ic in xrange(mtx_bt.shape[1]): mtx_aibt[:,ic] = ls(mtx_bt[:,ic].toarray().squeeze()) output('mtx_aibt: %.2f s' % (time.clock() - tt)) action_aibt = MatrixAction.from_array(mtx_aibt) else: ## # c: 30.08.2007, r: 13.02.2008 def fun_aibt(vec): # Fix me for sparse mtx_bt... rhs = sc.dot(mtx_bt, vec) out = ls(rhs) return out action_aibt = MatrixAction.from_function(fun_aibt, (mtx_a.shape[0], mtx_bt.shape[1]), nm.float64) mtx['action_aibt'] = action_aibt
[docs] def solve_pressure_eigenproblem(self, mtx, eig_problem=None, n_eigs=0, check=False): """G = B*AI*BT or B*AI*BT+D""" def get_slice(n_eigs, nn): if n_eigs > 0: ii = slice(0, n_eigs) elif n_eigs < 0: ii = slice(nn + n_eigs, nn) else: ii = slice(0, 0) return ii eig_problem = get_default(eig_problem, self.eig_problem) n_eigs = get_default(n_eigs, self.n_eigs) check = get_default(check, self.check) mtx_c, mtx_b, action_aibt = mtx['C'], mtx['B'], mtx['action_aibt'] mtx_g = mtx_b * action_aibt.to_array() # mtx_b must be sparse! if eig_problem == 'B*AI*BT+D': mtx_g += mtx['D'].toarray() mtx['G'] = mtx_g output(mtx_c.shape, mtx_g.shape) eigs, mtx_q = eig(mtx_c.toarray(), mtx_g, method='eig.sgscipy') if check: ee = nm.diag(sc.dot(mtx_q.T * mtx_c, mtx_q)).squeeze() oo = nm.diag(sc.dot(sc.dot(mtx_q.T, mtx_g), mtx_q)).squeeze() try: assert_(nm.allclose(ee, eigs)) assert_(nm.allclose(oo, nm.ones_like(eigs))) except ValueError: debug() nn = mtx_c.shape[0] if isinstance(n_eigs, tuple): output('required number of eigenvalues: (%d, %d)' % n_eigs) if sum(n_eigs) < nn: ii0 = get_slice(n_eigs[0], nn) ii1 = get_slice(-n_eigs[1], nn) eigs = nm.concatenate((eigs[ii0], eigs[ii1])) mtx_q = nm.concatenate((mtx_q[:,ii0], mtx_q[:,ii1]), 1) else: output('required number of eigenvalues: %d' % n_eigs) if (n_eigs != 0) and (abs(n_eigs) < nn): ii = get_slice(n_eigs, nn) eigs = eigs[ii] mtx_q = mtx_q[:,ii] ## from sfepy.base.plotutils import pylab, iplot ## pylab.semilogy(eigs) ## pylab.figure(2) ## iplot(eigs) ## pylab.show() ## debug() out = Struct(eigs=eigs, mtx_q=mtx_q) return out
def __call__(self, problem=None, data=None): problem = get_default(problem, self.problem) problem.set_equations(self.equations) problem.select_bcs(ebc_names=self.ebcs, epbc_names=self.epbcs, lcbc_names=self.get('lcbcs', [])) problem.update_materials() mtx = problem.equations.eval_tangent_matrices(problem.create_state()(), problem.mtx_a, by_blocks=True) self.presolve(mtx) evp = self.solve_pressure_eigenproblem(mtx) return Struct(name=self.name, ebcs=self.ebcs, epbcs=self.epbcs, mtx=mtx, evp=evp)
[docs]class TCorrectorsViaPressureEVP(CorrMiniApp): """ Time correctors via the pressure eigenvalue problem. """
[docs] def setup_equations(self, equations, problem=None): """ Set equations, update boundary conditions and materials. """ problem = get_default(problem, self.problem) problem.set_equations(equations) problem.select_bcs(ebc_names=self.ebcs, epbc_names=self.epbcs, lcbc_names=self.get('lcbcs', [])) problem.update_materials() # Assume parameters constant in time.
[docs] def compute_correctors(self, evp, sign, state0, ts, dump_name, save_name, problem=None, vec_g=None): problem = get_default(problem, self.problem) eigs = evp.evp.eigs mtx_q = evp.evp.mtx_q mtx = evp.mtx nr, nc = mtx_q.shape if vec_g is not None: output('nonzero pressure EBC: max = %e, min = %e' \ % (vec_g.max(), vec_g.min())) one = nm.ones((nc,), dtype=nm.float64) vu, vp = self.dump_variables variables = problem.get_variables() var_u = variables[vu] var_p = variables[vp] ## # follow_epbc = False -> R1 = - R2 as required. ? for other correctors? vec_p0 = sign * var_p.get_reduced(state0[vp], follow_epbc=False) ## print state0 ## print vec_p0 ## print vec_p0.min(), vec_p0.max(), nla.norm(vec_p0) ## debug() # xi0 = Q^{-1} p(0) = Q^T G p(0) vec_xi0 = sc.dot(mtx_q.T, sc.dot(mtx['G'], vec_p0[:,nm.newaxis])).squeeze() action_aibt = mtx['action_aibt'] e_e_qg = 0.0 iee_e_qg = 0.0 format = '====== time %%e (step %%%dd of %%%dd) ====='\ % ((ts.n_digit,) * 2) for step, time in ts: output(format % (time, step + 1, ts.n_step)) e_e = nm.exp(- eigs * time) e_e_qp = e_e * vec_xi0 # exp(-Et) Q^{-1} p(0) if vec_g is not None: Qg = sc.dot(mtx_q.T, vec_g) e_e_qg = e_e * Qg iee_e_qg = ((one - e_e) / eigs) * Qg vec_p = sc.dot(mtx_q, e_e_qp + iee_e_qg) vec_dp = - sc.dot(mtx_q, (eigs * e_e_qp - e_e_qg)) vec_u = action_aibt(vec_dp) ## bbb = sc.dot(vec_dp.T, - mtx['C'] * vec_p0) vec_u = var_u.get_full(vec_u) vec_p = var_p.get_full(vec_p) # BC nodes - time derivative of constant is zero! vec_dp = var_p.get_full(vec_dp, force_value=0.0) ## aaa = sc.dot(vec_xi0.T, eigs * (eigs * e_e_qp)) ## print aaa ## print bbb self.save(dump_name, save_name, vec_u, vec_p, vec_dp, ts, problem)
[docs] def save(self, dump_name, save_name, vec_u, vec_p, vec_dp, ts, problem): """ 1. saves raw correctors into hdf5 files (filename) 2. saves correctors transformed to output for visualization """ vu, vp = self.dump_variables out = {vu : Struct(name='dump', mode='nodes', data=vec_u, dofs=None, var_name=vu), vp : Struct(name='dump', mode='nodes', data=vec_p, dofs=None, var_name=vp), 'd'+vp : Struct(name='dump', mode='nodes', data=vec_dp, dofs=None, var_name=vp)} problem.save_state(dump_name, out=out, file_per_var=False, ts=ts) # For visualization... out = {} extend = not self.file_per_var variables = self.problem.get_variables() to_output = variables.state_to_output out.update(to_output(vec_u, var_info={vu : (True, vu)}, extend=extend)) out.update(to_output(vec_p, var_info={vp : (True, vp)}, extend=extend)) out.update(to_output(vec_dp, var_info={vp : (True, 'd'+vp)}, extend=extend)) if self.post_process_hook is not None: out = self.post_process_hook(out, problem, {vu : vec_u, vp : vec_p, 'd'+vp : vec_dp}, extend=extend) problem.save_state(save_name, out=out, file_per_var=self.file_per_var, ts=ts)
[docs] def verify_correctors(self, sign, state0, filename, problem=None): problem = get_default(problem, self.problem) io = HDF5MeshIO(filename) ts = TimeStepper(*io.read_time_stepper()) ts.set_step(0) problem.equations.init_time(ts) variables = self.problem.get_variables() vu, vp = self.dump_variables vdp = self.verify_variables[-1] p0 = sign * state0[vp] format = '====== time %%e (step %%%dd of %%%dd) ====='\ % ((ts.n_digit,) * 2) vv = variables ok = True for step, time in ts: output(format % (time, step + 1, ts.n_step)) data = io.read_data(step) if step == 0: assert_(nm.allclose(data[vp].data, p0)) state0 = problem.create_state() state0.set_full(data[vu].data, vu) state0.set_full(data[vp].data, vp) vv[vdp].set_data(data['d'+vp].data) problem.update_time_stepper(ts) state = problem.solve(state0) state, state0 = state(), state0() err = nla.norm(state - state0) / nla.norm(state0) output(state.min(), state.max()) output(state0.min(), state0.max()) output('>>>>>', err) ok = ok and (err < 1e-12) problem.advance(ts) return ok
[docs]class CoefDummy(MiniAppBase): """ Dummy class serving for computing and returning its requirements. """ def __call__(self, volume=None, problem=None, data=None): return data
[docs]class TSTimes(MiniAppBase): """Coefficient-like class, returns times of the time stepper.""" def __call__(self, volume=None, problem=None, data=None): problem = get_default(problem, self.problem) return problem.get_time_solver().ts.times
[docs]class VolumeFractions(MiniAppBase): """Coefficient-like class, returns volume fractions of given regions within the whole domain.""" def __call__(self, volume=None, problem=None, data=None): problem = get_default(problem, self.problem) vf = {} for region_name in self.regions: vkey = 'volume_%s' % region_name key = 'fraction_%s' % region_name equations, variables = problem.create_evaluable(self.expression % region_name) val = eval_equations(equations, variables) vf[vkey] = nm.asarray(val, dtype=nm.float64) vf[key] = vf[vkey] / self._get_volume(volume) return vf
[docs]class CoefSymSym(MiniAppBase):
[docs] def set_variables_default(variables, ir, ic, mode, set_var, data): def get_corr_state(corr, ir, ic): if hasattr(corr, 'states'): return corr.states[ir,ic] else: return corr.state mode2var = {'row' : 0, 'col' : 1} idx = mode2var[mode] for (var, req, comp) in [set_var[idx]] + set_var[2:]: if type(req) is tuple: val = get_corr_state(data[req[0]], ir, ic)[comp].copy() for ii in req[1:]: val += get_corr_state(data[ii], ir, ic)[comp] else: val = get_corr_state(data[req], ir, ic)[comp] variables[var].set_data(val)
set_variables_default = staticmethod(set_variables_default) def __call__(self, volume, problem=None, data=None): problem = get_default(problem, self.problem) dim, sym = problem.get_dim(get_sym=True) coef = nm.zeros((sym, sym), dtype=self.dtype) term_mode = self.term_mode equations, variables = problem.create_evaluable(self.expression, term_mode=term_mode) for ir, (irr, icr) in enumerate(iter_sym(dim)): if isinstance(self.set_variables, list): self.set_variables_default(variables, irr, icr, 'row', self.set_variables, data) else: self.set_variables(variables, irr, icr, 'row', **data) for ic, (irc, icc) in enumerate(iter_sym(dim)): if isinstance(self.set_variables, list): self.set_variables_default(variables, irc, icc, 'col', self.set_variables, data) else: self.set_variables(variables, irc, icc, 'col', **data) val = eval_equations(equations, variables, term_mode=term_mode) coef[ir,ic] = val coef /= self._get_volume(volume) return coef
[docs]class CoefFMSymSym(MiniAppBase): """ Fading memory sym x sym coefficients. """ def __call__(self, volume, problem=None, data=None): problem = get_default(problem, self.problem) dim, sym = problem.get_dim(get_sym=True) filename = self.set_variables(None, None, None, 0, 0, 'filename', **data) ts = TimeStepper(*HDF5MeshIO(filename).read_time_stepper()) coef = nm.zeros((ts.n_step, sym, sym), dtype=self.dtype) term_mode = self.term_mode equations, variables = problem.create_evaluable(self.expression, term_mode=term_mode) for ir, (irr, icr) in enumerate(iter_sym(dim)): filename = self.set_variables(None, None, None, irr, icr, 'filename', **data) io = HDF5MeshIO(filename) for step, time in ts: self.set_variables(variables, io, step, None, None, 'row', **data) for ic, (irc, icc) in enumerate(iter_sym(dim)): self.set_variables(variables, None, None, irc, icc, 'col', **data) val = eval_equations(equations, variables, term_mode=term_mode) coef[step,ir,ic] = val coef /= self._get_volume(volume) return coef
[docs]class CoefDimSym(MiniAppBase): def __call__(self, volume, problem=None, data=None): problem = get_default(problem, self.problem) dim, sym = problem.get_dim(get_sym=True) coef = nm.zeros((dim, sym), dtype=self.dtype) term_mode = self.term_mode equations, variables = problem.create_evaluable(self.expression, term_mode=term_mode) for ir in range(dim): self.set_variables(variables, ir, None, 'row', **data) for ic, (irc, icc) in enumerate(iter_sym(dim)): self.set_variables(variables, irc, icc, 'col', **data) val = eval_equations(equations, variables, term_mode=term_mode) coef[ir,ic] = val coef /= self._get_volume(volume) return coef
[docs]class CoefNN(MiniAppBase):
[docs] def set_variables_default(variables, ir, ic, mode, set_var, data): def get_corr_state(corr, ii): if hasattr(corr, 'states'): return corr.states[ii] else: return corr.state mode2var = {'row' : 0, 'col' : 1} if mode == 'col': ir = ic idx = mode2var[mode] for (var, req, comp) in [set_var[idx]] + set_var[2:]: if type(req) is tuple: val = get_corr_state(data[req[0]], ir)[comp].copy() for ii in req[1:]: val += get_corr_state(data[ii], ir)[comp] else: val = get_corr_state(data[req], ir)[comp] variables[var].set_data(val)
set_variables_default = staticmethod(set_variables_default) def __init__(self, name, problem, kwargs): """When dim is not in kwargs, problem dimension is used.""" MiniAppBase.__init__(self, name, problem, kwargs) self.set_default('dim', problem.get_dim()) def __call__(self, volume, problem=None, data=None): problem = get_default(problem, self.problem) coef = nm.zeros((self.dim, self.dim), dtype=self.dtype) term_mode = self.term_mode equations, variables = problem.create_evaluable(self.expression, term_mode=term_mode) if isinstance(self.set_variables, list): for ir in range(self.dim): self.set_variables_default(variables, ir, None, 'row', self.set_variables, data) for ic in range(self.dim): self.set_variables_default(variables, None, ic, 'col', self.set_variables, data) val = eval_equations(equations, variables, term_mode=term_mode) coef[ir,ic] = val else: for ir in range(self.dim): self.set_variables(variables, ir, None, 'row', **data) for ic in range(self.dim): self.set_variables(variables, None, ic, 'col', **data) val = eval_equations(equations, variables, term_mode=term_mode) coef[ir,ic] = val coef /= self._get_volume(volume) return coef
[docs]class CoefN(MiniAppBase):
[docs] def set_variables_default(variables, ir, set_var, data): def get_corr_state(corr, ii): if hasattr(corr, 'states'): return corr.states[ii] else: return corr.state for (var, req, comp) in set_var: if type(req) is tuple: val = get_corr_state(data[req[0]], ir)[comp].copy() for ii in req[1:]: val += get_corr_state(data[ii], ir)[comp] else: val = get_corr_state(data[req], ir)[comp] variables[var].set_data(val)
set_variables_default = staticmethod(set_variables_default) def __init__(self, name, problem, kwargs): """When dim is not in kwargs, problem dimension is used.""" MiniAppBase.__init__(self, name, problem, kwargs) self.set_default('dim', problem.get_dim()) def __call__(self, volume, problem=None, data=None): problem = get_default(problem, self.problem) coef = nm.zeros((self.dim,), dtype=self.dtype) term_mode = self.term_mode equations, variables = problem.create_evaluable(self.expression, term_mode=term_mode) for ir in range(self.dim): if isinstance(self.set_variables, list): self.set_variables_default(variables, ir, self.set_variables, data) else: self.set_variables(variables, ir, **data) val = eval_equations(equations, variables, term_mode=term_mode) coef[ir] = val coef /= self._get_volume(volume) return coef
[docs]class CoefDimDim(CoefNN): pass
[docs]class CoefDim(CoefN): pass
[docs]class CoefSym(MiniAppBase):
[docs] def set_variables_default(variables, ir, ic, mode, set_var, data): def get_corr_state(corr, ir, ic): if hasattr(corr, 'states'): return corr.states[ir,ic] else: return corr.state if mode == 'row': for (var, req, comp) in set_var: if type(req) is tuple: val = get_corr_state(data[req[0]], ir, ic)[comp].copy() for ii in req[1:]: val += get_corr_state(data[ii], ir, ic)[comp] else: val = get_corr_state(data[req], ir, ic)[comp] variables[var].set_data(val)
set_variables_default = staticmethod(set_variables_default) def __call__(self, volume, problem=None, data=None): problem = get_default(problem, self.problem) dim, sym = problem.get_dim(get_sym=True) coef = nm.zeros((sym,), dtype=self.dtype) term_mode = self.term_mode equations, variables = problem.create_evaluable(self.expression, term_mode=term_mode) if isinstance(self.set_variables, list): self.set_variables_default(variables, None, None, 'col', self.set_variables, data) else: self.set_variables(variables, None, None, 'col', **data) for ii, (ir, ic) in enumerate(iter_sym(dim)): if isinstance(self.set_variables, list): self.set_variables_default(variables, ir, ic, 'row', self.set_variables, data) else: self.set_variables(variables, ir, ic, 'row', **data) val = eval_equations(equations, variables, term_mode=term_mode) coef[ii] = val coef /= self._get_volume(volume) return coef
[docs]class CoefFMSym(MiniAppBase): """ Fading memory sym coefficients. """ def __call__(self, volume, problem=None, data=None): problem = get_default(problem, self.problem) dim, sym = problem.get_dim(get_sym=True) filename = self.set_variables(None, 0, 0, 'filename', **data) ts = TimeStepper(*HDF5MeshIO(filename).read_time_stepper()) coef = nm.zeros((ts.n_step, sym), dtype=self.dtype) term_mode = self.term_mode equations, variables = problem.create_evaluable(self.expression, term_mode=term_mode) self.set_variables(variables, None, None, 'col', **data) for ii, (ir, ic) in enumerate(iter_sym(dim)): filename = self.set_variables(None, ir, ic, 'filename', **data) io = HDF5MeshIO(filename) for step, time in ts: self.set_variables(variables, io, step, 'row', **data) val = eval_equations(equations, variables, term_mode=term_mode) coef[step,ii] = val coef /= self._get_volume(volume) return coef
[docs]class CoefOne(MiniAppBase):
[docs] def set_variables_default(variables, set_var, data): for (var, req, comp) in set_var: variables[var].set_data(data[req].state[comp])
set_variables_default = staticmethod(set_variables_default) def __call__(self, volume, problem=None, data=None): problem = get_default(problem, self.problem) term_mode = self.term_mode equations, variables = problem.create_evaluable(self.expression, term_mode=term_mode) if isinstance(self.set_variables, list): self.set_variables_default(variables, self.set_variables, data) else: self.set_variables(variables, **data) val = eval_equations(equations, variables, term_mode=term_mode) coef = val / self._get_volume(volume) return coef
[docs]class CoefFMOne(MiniAppBase): """ Fading memory scalar coefficients. """ def __call__(self, volume, problem=None, data=None): problem = get_default(problem, self.problem) filename = self.set_variables(None, None, None, 'filename', **data) io = HDF5MeshIO(filename) ts = TimeStepper(*io.read_time_stepper()) coef = nm.zeros((ts.n_step, 1), dtype=self.dtype) term_mode = self.term_mode equations, variables = problem.create_evaluable(self.expression, term_mode=term_mode) self.set_variables(variables, None, None, 'col', **data) for step, time in ts: self.set_variables(variables, io, step, 'row', **data) val = eval_equations(equations, variables, term_mode=term_mode) coef[step] = val coef /= self._get_volume(volume) return coef
[docs]class CoefSum(MiniAppBase): def __call__(self, volume, problem=None, data=None): coef = nm.zeros_like(data[self.requires[0]]) for i in range(len(self.requires)): coef += data[self.requires[i]] return coef
[docs]class CoefEval(MiniAppBase): """ Evaluate expression. """ def __call__(self, volume, problem=None, data=None): expr = self.expression for i in range(len(self.requires)): expr = expr.replace(self.requires[i], "data['%s']" % self.requires[i]) coef = eval(expr) return coef
[docs]class CoefNone(MiniAppBase): def __call__(self, volume, problem=None, data=None): coef = 0.0 return coef
[docs]class CoefExprPar(MiniAppBase): """ The coefficient which expression can be parametrized via 'expr_pars', the dimension is given by the number of parameters. Example: 'expression': 'dw_surface_ndot.5.Ys(mat_norm.k%d, corr1)', 'expr_pars': [ii for ii in range(dim)], 'class': cb.CoefExprPar, """
[docs] def set_variables_default(variables, ir, set_var, data): for (var, req, comp) in set_var: if hasattr(data[req], 'states'): variables[var].set_data(data[req].states[ir][comp]) else: variables[var].set_data(data[req].state[comp])
set_variables_default = staticmethod(set_variables_default) def __init__(self, name, problem, kwargs): """When dim is not in kwargs, problem dimension is used.""" MiniAppBase.__init__(self, name, problem, kwargs) dim = len(self.expr_pars) self.set_default('dim', dim) def __call__(self, volume, problem=None, data=None): problem = get_default(problem, self.problem) coef = nm.zeros((self.dim,), dtype=self.dtype) term_mode = self.term_mode for ir in range(self.dim): expression = self.expression % self.expr_pars[ir] equations, variables = \ problem.create_evaluable(expression, term_mode=term_mode) if isinstance(self.set_variables, list): self.set_variables_default(variables, ir, self.set_variables, data) else: self.set_variables(variables, ir, **data) val = eval_equations(equations, variables, term_mode=term_mode) coef[ir] = val coef /= self._get_volume(volume) return coef