Source code for alhambra.endreorder

# End reordering code to optimize placement.

import random as pyrand
from copy import deepcopy

import numpy as np
import numpy.ma as ma
import numpy.random as random
import stickydesign as sd

from . import anneal
from . import sensitivity as sens


[docs]def flatten(seq): for item in seq: if is_string_like(item) or not iterable(item): yield item else: for subitem in flatten(item): yield subitem
[docs]def ecomp(x): if x[-1] == "/": return x[:-1] else: return x + "/"
[docs]class FseqState: def __init__(self, seqs=None): if not seqs: self.seqs = {} else: self.seqs = seqs
[docs] def copy(self): return FseqState({"DT": self.seqs["DT"].copy(), "TD": self.seqs["TD"].copy()})
[docs]class cachedarray: def __init__(self, func, shape): self.arr = ma.masked_all(shape) self.func = func
[docs] def __getitem__(self, index): if self.arr.mask[index]: self.arr[index] = self.func(*index) return self.arr[index]
from copy import deepcopy
[docs]class FastState: def __init__(self, state): self.state = state
[docs] def copy(self): return deepcopy(self)
[docs] def __getitem__(self, it): return self.state[it]
[docs]class EndSystemFseq: def __init__( self, tilesys, newends=None, pairs=None, energetics=None, inputpairs=False, multiscore=False, ): tilesys = deepcopy(tilesys) self.ends = tilesys.ends self.tiles = tilesys.tiles self.tilesystem = tilesys self.ef = energetics self.multiscore = multiscore if not pairs: pairs = sens.consolidate_pairs( sens.senspairs(tilesys), comcomp=1, onlytop=True ) if inputpairs: inputpairs = [ tuple(z[:-1] for z, i in zip(x["ends"], x["input"]) if i) for x in self.tiles ] print(inputpairs) self.inputpairs = inputpairs self.names = {} fseqsTD, self.names["TD"] = ( list(x) for x in zip( *[ [end["fseq"].lower(), end["name"]] for end in self.ends if end["type"] == "TD" ] ) ) fseqsDT, self.names["DT"] = ( list(x) for x in zip( *[ [end["fseq"].lower(), end["name"]] for end in self.ends if end["type"] == "DT" ] ) ) self.seqs = {} self.seqs["TD"] = sd.endarray(fseqsTD, "TD") self.seqs["DT"] = sd.endarray(fseqsDT, "DT") self.initstate = FastState( { "DT": np.arange(0, len(self.seqs["DT"])), "TD": np.arange(0, len(self.seqs["TD"])), } ) self.enlocs = {} for i, endn in enumerate(self.names["TD"]): self.enlocs[endn] = (i, "TD") for i, endn in enumerate(self.names["DT"]): self.enlocs[endn] = (i, "DT") # ends that can be reordered. if newends: self.mutableTD = [ i for i, t in [self.enlocs[x] for x in newends] if t == "TD" ] self.mutableDT = [ i for i, t in [self.enlocs[x] for x in newends] if t == "DT" ] else: self.mutableTD = range(0, len(fseqsTD)) self.mutableDT = range(0, len(fseqsDT)) self.pairdict = {} for pairclass, memberset in pairs.items(): for x, y in memberset: self.pairdict[(x, ecomp(y))] = pairclass self.pairdict[(y, ecomp(x))] = pairclass tdsh = (len(self.seqs["TD"]), len(self.seqs["TD"])) dtsh = (len(self.seqs["DT"]), len(self.seqs["DT"])) if not multiscore: # Get the mean non-spurious interaction self.meangse = 0.5 * ( np.mean(self.ef.matching_uniform(self.seqs["TD"])) + np.mean(self.ef.matching_uniform(self.seqs["DT"])) ) self.mult = { "1NGO": np.exp(-2.0 * self.meangse), "2NGO": np.exp(-1.65 * self.meangse), "1GO": np.exp(-1.5 * self.meangse), "2GO": np.exp(-1.1 * self.meangse), "I": np.exp(-1.0 * self.meangse), } self.ecache_cc = { "TD": cachedarray( lambda x, y: self.ef.uniform( self.seqs["TD"][x : x + 1].comps, self.seqs["TD"][y : y + 1].comps, ), tdsh, ), "DT": cachedarray( lambda x, y: self.ef.uniform( self.seqs["DT"][x : x + 1].comps, self.seqs["DT"][y : y + 1].comps, ), dtsh, ), } self.ecache_ce = { "TD": cachedarray( lambda x, y: self.ef.uniform( self.seqs["TD"][x : x + 1].comps, self.seqs["TD"][y : y + 1].ends, ), tdsh, ), "DT": cachedarray( lambda x, y: self.ef.uniform( self.seqs["DT"][x : x + 1].comps, self.seqs["DT"][y : y + 1].ends, ), dtsh, ), } self.ecache_ec = { "TD": cachedarray( lambda x, y: self.ef.uniform( self.seqs["TD"][x : x + 1].ends, self.seqs["TD"][y : y + 1].comps, ), tdsh, ), "DT": cachedarray( lambda x, y: self.ef.uniform( self.seqs["DT"][x : x + 1].ends, self.seqs["DT"][y : y + 1].comps, ), dtsh, ), } self.ecache_ee = { "TD": cachedarray( lambda x, y: self.ef.uniform( self.seqs["TD"][x : x + 1].ends, self.seqs["TD"][y : y + 1].ends ), tdsh, ), "DT": cachedarray( lambda x, y: self.ef.uniform( self.seqs["DT"][x : x + 1].ends, self.seqs["DT"][y : y + 1].ends ), dtsh, ), } else: self.ecache_cc = [] self.ecache_ee = [] self.ecache_ce = [] self.ecache_ec = [] self.mult = [] self.meangse = [] for ef in self.ef: self.meangse.append( 0.5 * ( np.mean(ef.matching_uniform(self.seqs["TD"])) + np.mean(ef.matching_uniform(self.seqs["DT"])) ) ) self.mult.append( { "1NGO": np.exp(-2.0 * self.meangse[-1]), "2NGO": np.exp(-1.65 * self.meangse[-1]), "1GO": np.exp(-1.5 * self.meangse[-1]), "2GO": np.exp(-1.1 * self.meangse[-1]), "I": np.exp(-1.0 * self.meangse[-1]), } ) self.ecache_cc.append( { "TD": cachedarray( lambda x, y: ef.uniform( self.seqs["TD"][x : x + 1].comps, self.seqs["TD"][y : y + 1].comps, ), tdsh, ), "DT": cachedarray( lambda x, y: ef.uniform( self.seqs["DT"][x : x + 1].comps, self.seqs["DT"][y : y + 1].comps, ), dtsh, ), } ) self.ecache_ce.append( { "TD": cachedarray( lambda x, y: ef.uniform( self.seqs["TD"][x : x + 1].comps, self.seqs["TD"][y : y + 1].ends, ), tdsh, ), "DT": cachedarray( lambda x, y: ef.uniform( self.seqs["DT"][x : x + 1].comps, self.seqs["DT"][y : y + 1].ends, ), dtsh, ), } ) self.ecache_ec.append( { "TD": cachedarray( lambda x, y: ef.uniform( self.seqs["TD"][x : x + 1].ends, self.seqs["TD"][y : y + 1].comps, ), tdsh, ), "DT": cachedarray( lambda x, y: ef.uniform( self.seqs["DT"][x : x + 1].ends, self.seqs["DT"][y : y + 1].comps, ), dtsh, ), } ) self.ecache_ee.append( { "TD": cachedarray( lambda x, y: ef.uniform( self.seqs["TD"][x : x + 1].ends, self.seqs["TD"][y : y + 1].ends, ), tdsh, ), "DT": cachedarray( lambda x, y: ef.uniform( self.seqs["DT"][x : x + 1].ends, self.seqs["DT"][y : y + 1].ends, ), dtsh, ), } )
[docs] def slowseqs(self, state): "Give the state as the slow version would have" return {"DT": self.seqs["DT"][state["DT"]], "TD": self.seqs["TD"][state["TD"]]}
[docs] def mutate(self, state): # Start by deciding to swap TD or DT ends. if random.rand() > 1.0 * len(self.mutableTD) / ( len(self.mutableDT) + len(self.mutableTD) ): a = random.choice(self.mutableDT) b = random.choice(self.mutableDT) state["DT"][[a, b]] = state["DT"][[b, a]] else: a = random.choice(self.mutableTD) b = random.choice(self.mutableTD) state["TD"][[a, b]] = state["TD"][[b, a]]
[docs] def score(self, state): sc = 0.0 if self.inputpairs: if self.multiscore: ip = np.zeros((len(self.ef), len(self.inputpairs))) else: ip = np.zeros(len(self.inputpairs)) for i, (xn, yn) in enumerate(self.inputpairs): if xn[-1] == "/": xn = xn[:-1] if yn[-1] == "/": yn = yn[:-1] xi, xt = self.enlocs[xn] yi, yt = self.enlocs[yn] if self.multiscore: scc = 0 for j in range(len(self.ef)): ip[j][i] = np.abs( ( self.ecache_ec[j][xt][state[xt][xi], state[xt][xi]] + self.ecache_ec[j][yt][state[yt][yi], state[yt][yi]] ) ) else: ip[i] = np.abs( ( self.ecache_ec[xt][state[xt][xi], state[xt][xi]] + self.ecache_ec[yt][state[yt][yi], state[yt][yi]] ) ) if self.multiscore: sc += np.mean( np.array([z["I"] for z in self.mult]) * np.exp(np.ptp(ip, axis=1)) ) else: sc += self.mult["I"] * np.exp(np.ptp(ip)) for (xn, yn), pairclass in self.pairdict.items(): # set comp flags xc = False yc = False if xn[-1] == "/": xc = True xn = xn[:-1] if yn[-1] == "/": yc = True yn = yn[:-1] # get end indexes and types xi, xt = self.enlocs[xn] yi, yt = self.enlocs[yn] # print "%s, %s: (%s %s)" % (xn,yn,xt,yt) # skip if not same type if xt != yt: continue if not self.multiscore: if yc and xc: val = self.ecache_cc[xt][ state[xt][xi], state[yt][yi] ] # self.ef.uniform(state.seqs[xt][xi:xi+1].comps,state.seqs[yt][yi:yi+1].comps)[0] elif xc: val = self.ecache_ce[xt][ state[xt][xi], state[yt][yi] ] # self.ef.uniform(state.seqs[xt][xi:xi+1].comps,state.seqs[yt][yi:yi+1].ends)[0] elif yc: val = self.ecache_ec[xt][ state[xt][xi], state[yt][yi] ] # state[f.uniform(state.seqs[xt][xi:xi+1].ends,state.seqs[yt][yi:yi+1].comps)[0] else: val = self.ecache_ee[xt][ state[xt][xi], state[yt][yi] ] # self.ef.uniform(state.seqs[xt][xi:xi+1].ends,state.seqs[yt][yi:yi+1].ends)[0] sc += self.mult[pairclass] * np.exp(val) else: scc = 0 for i in range(len(self.ef)): if yc and xc: val = self.ecache_cc[i][xt][ state[xt][xi], state[yt][yi] ] # self.ef.uniform(state.seqs[xt][xi:xi+1].comps,state.seqs[yt][yi:yi+1].comps)[0] elif xc: val = self.ecache_ce[i][xt][ state[xt][xi], state[yt][yi] ] # self.ef.uniform(state.seqs[xt][xi:xi+1].comps,state.seqs[yt][yi:yi+1].ends)[0] elif yc: val = self.ecache_ec[i][xt][ state[xt][xi], state[yt][yi] ] # state[f.uniform(state.seqs[xt][xi:xi+1].ends,state.seqs[yt][yi:yi+1].comps)[0] else: val = self.ecache_ee[i][xt][ state[xt][xi], state[yt][yi] ] # self.ef.uniform(state.seqs[xt][xi:xi+1].ends,state.seqs[yt][yi:yi+1].ends)[0] scc += self.mult[i][pairclass] * np.exp(val) sc += scc / len(self.ef) return sc
[docs]wcd = { "a": "t", "b": "v", "c": "g", "d": "h", "g": "c", "h": "d", "k": "m", "m": "k", "n": "n", "s": "s", "t": "a", "v": "b", "w": "w", }
[docs]def wc(seqstr): return "".join(wcd[x] for x in reversed(seqstr))