"""
- Coarsen rate matrices
- Coarsen transition matrix
"""
from __future__ import print_function
import os
import numpy as np
from dcma import tools
class Transitions(object):
    """Class representing a transition matrix and context information
    (mainly edges and lag time).

    The stored values are exposed read-only through the properties
    ``matrix``, ``lag_time``, ``edges``, ``bin_widths``, ``bin_centers``
    and ``weight``.
    """

    # --- CONSTRUCTORS ---
    def __init__(self, matrix, lag_time, edges, weight=1.0):
        """
        Args:
            matrix: The transition matrix; ``len(matrix)`` must equal
                ``len(edges) - 1``.
            lag_time: Lag time of the matrix; must be positive.
            edges: Bin edges belonging to the matrix.
            weight (float): Non-negative weight, used when the edges of
                several instances are averaged during addition.

        Raises:
            AssertionError: If one of the conditions above is violated.
        """
        assert lag_time > 0.0
        self._matrix = matrix
        self._lag_time = lag_time
        self._edges = edges
        # Widths and centers are derived from the edges once and cached.
        self._bin_widths = tools.bin_edges_to_widths(edges)
        self._bin_centers = tools.bin_edges_to_centers(edges)
        assert weight >= 0.0
        self._weight = weight
        assert len(edges) == len(matrix) + 1

    @staticmethod
    def from_file(filename, weight=1.0):
        """
        Read a transition matrix and context information from file.

        The leading block of lines starting with ``#`` is treated as the
        header. A ``#lt`` line provides the lag time and a ``#edges`` line
        provides the whitespace-separated bin edges; both are required.
        Everything after the header is parsed with ``numpy.loadtxt``.

        Args:
            filename (str): Path of an existing file.
            weight (float): Weight handed to the constructor.

        Returns:
            Transitions: The instance built from the file contents.

        Raises:
            AssertionError: If the file does not exist or the header lacks
                the ``#lt`` or ``#edges`` entry.
        """
        assert os.path.isfile(filename)
        lag_time = None
        edges = None
        n_comment_lines = 0
        with open(filename, "r") as f:
            for line in f:
                # The header ends at the first line that is not a comment.
                if not line.startswith("#"):
                    break
                n_comment_lines += 1
                if line.startswith("#lt"):
                    lag_time = float(line.replace("#lt", "").strip())
                elif line.startswith("#edges"):
                    fields = line.replace("#edges", "").strip().split()
                    edges = np.array([float(e) for e in fields])
        assert lag_time is not None
        assert edges is not None
        transition_matrix = np.loadtxt(filename, skiprows=n_comment_lines)
        return Transitions(transition_matrix, lag_time, edges, weight)

    # --- PROPERTIES ---
    @property
    def matrix(self):
        """The transition matrix."""
        return self._matrix

    @property
    def lag_time(self):
        """The lag time of the matrix."""
        return self._lag_time

    @property
    def bin_widths(self):
        """Bin widths derived from the edges."""
        return self._bin_widths

    @property
    def bin_centers(self):
        """Bin centers derived from the edges."""
        return self._bin_centers

    @property
    def edges(self):
        """The bin edges."""
        return self._edges

    @property
    def weight(self):
        """The weight of this instance."""
        return self._weight

    def __add__(self, other):
        """Return a new instance combining ``self`` and ``other``.

        The matrices are summed element-wise, the edges are averaged using
        the two weights, and the weights are added. Neither operand is
        modified.

        Raises:
            AssertionError: If lag times differ by 0.5 or more, the number
                of edges differs, or ``tools.are_edges_consistent`` rejects
                the two sets of edges.
        """
        assert abs(other.lag_time - self.lag_time) < 0.5, "Can only add transition matrices with same lag time."
        assert len(self.edges) == len(other.edges), "Can only add transition matrices with same number of bins."
        assert tools.are_edges_consistent([self.edges, other.edges], [self.weight, other.weight]), \
            "Edges have to be consistent in order to add transition matrices."
        weights = [self.weight, other.weight]
        return Transitions(
            matrix=self.matrix + other.matrix,
            lag_time=self.lag_time,
            edges=np.average([self.edges, other.edges], axis=0, weights=weights),
            weight=self.weight + other.weight,
        )

    def __radd__(self, other):
        """Reflected addition, which makes ``sum()`` work on instances.

        ``sum()`` starts with ``0 + first``; in that case ``self`` is
        returned unchanged. For any other operand ``other`` is accumulated
        into ``self`` IN PLACE and ``self`` is returned.

        NOTE(review): the lag-time tolerance here (0.001) is stricter than
        the one in ``__add__`` (0.5); confirm which one is intended.
        """
        if other == 0:  # first iteration of sum()
            return self
        assert abs(other.lag_time - self.lag_time) < 0.001, "Can only add transition matrices with same lag time."
        assert len(self.edges) == len(other.edges), "Can only add transition matrices with same number of bins."
        assert tools.are_edges_consistent([self.edges, other.edges], [self.weight, other.weight]), \
            "Edges have to be consistent in order to add transition matrices."
        self._matrix += other.matrix
        # The average must use the weights from before the update below.
        self._edges = np.average([self.edges, other.edges], axis=0, weights=[self.weight, other.weight])
        self._weight += other.weight
        # Recompute the cached quantities from the new edges (the helpers
        # were previously called without their argument).
        self._bin_widths = tools.bin_edges_to_widths(self._edges)
        self._bin_centers = tools.bin_edges_to_centers(self._edges)
        return self

    def __eq__(self, other):
        """Compare matrix, edges, weight, bin widths and bin centers.

        Array-valued fields are compared with ``numpy.isclose``; the weights
        must agree to within 1e-7. The lag time is not part of the
        comparison. Operands that are not ``Transitions`` instances yield
        ``NotImplemented`` so that Python falls back to its default handling
        instead of raising ``AttributeError``.
        """
        if not isinstance(other, Transitions):
            return NotImplemented
        return bool(
            np.isclose(other.matrix, self._matrix).all()
            and np.isclose(other.edges, self._edges).all()
            and abs(other.weight - self._weight) < 1e-7
            and np.isclose(other.bin_widths, self._bin_widths).all()
            and np.isclose(other.bin_centers, self._bin_centers).all()
        )

    # --- METHODS ---
    def coarsen(self, new_edges):
        """Not implemented yet; always raises ``NotImplementedError``."""
        raise NotImplementedError

    def save(self, filename, dt=1.0, count="pbc"):
        """
        save transition matrix to file

        The file starts with the header lines ``#lt``, ``#count``, ``#dt``,
        ``#dn`` and ``#edges``, followed by one line per matrix row. Matrix
        entries are converted with ``int()`` before being written.

        Args:
            filename (str): Path of the file to write (overwritten if it
                exists).
            dt (float): The dt that is written into the header.
            count (str): That count that is written into the header.
        """
        edges = " ".join([str(e) for e in self.edges])
        lines = [
            "#lt {}".format(self.lag_time),
            "#count {}".format(count),
            "#dt {}".format(dt),
            "#dn {}".format(int(self.lag_time)),
            "#edges {}".format(edges),
        ]
        lines += [" ".join("{:d}".format(int(el)) for el in row) for row in self.matrix]
        with open(filename, 'w') as fp:
            # Text mode translates "\n" to the platform line ending itself;
            # writing os.linesep here would yield "\r\r\n" on Windows.
            fp.writelines(line + "\n" for line in lines)
def collapse_transition_matrices(transition_matrices):
    """
    Merge transition matrices that share a lag time.

    Lag times are rounded to one decimal place to decide which matrices
    belong together; matrices in the same group are combined with ``+=``.

    Args:
        transition_matrices: A non-empty list of transition matrices. Several
            of them may have the same lag time.

    Returns:
        A list holding exactly one transition matrix per (rounded) lag time.
    """
    assert len(transition_matrices) > 0, "transition_matices must be non-empty list"
    by_lag_time = {}
    for transitions in transition_matrices:
        bucket = round(transitions.lag_time, 1)
        if bucket not in by_lag_time:
            # First matrix seen for this lag time: store it as-is.
            by_lag_time[bucket] = transitions
            continue
        by_lag_time[bucket] += transitions
    return list(by_lag_time.values())
def coarsen_rate_matrix(rate, new_edges):
    """Placeholder: not implemented, always raises ``NotImplementedError``.

    Args:
        rate: Unused.
        new_edges: Unused.
    """
    raise NotImplementedError()