Source code for attune.curve._topas

import numpy as np
import pathlib
import warnings
from ._base import Curve
from ._dependent import Setpoints, Dependent
import WrightTools as wt

__all__ = ["TopasCurve"]

TOPAS_C_motor_names = {
    0: ["Crystal_1", "Delay_1", "Crystal_2", "Delay_2"],
    1: ["Mixer_1"],
    2: ["Mixer_2"],
    3: ["Mixer_3"],
}

# [num_between, motor_names]
TOPAS_C_interactions = {
    "NON-NON-NON-Sig": [8, TOPAS_C_motor_names[0]],
    "NON-NON-NON-Idl": [8, TOPAS_C_motor_names[0]],
    "NON-NON-SH-Sig": [11, TOPAS_C_motor_names[1]],
    "NON-SH-NON-Sig": [11, TOPAS_C_motor_names[2]],
    "NON-NON-SH-Idl": [11, TOPAS_C_motor_names[1]],
    "NON-NON-SF-Sig": [11, TOPAS_C_motor_names[1]],
    "NON-NON-SF-Idl": [11, TOPAS_C_motor_names[1]],
    "NON-SH-SH-Sig": [11, TOPAS_C_motor_names[2]],
    "SH-SH-NON-Sig": [11, TOPAS_C_motor_names[3]],
    "NON-SH-SH-Idl": [11, TOPAS_C_motor_names[2]],
    "SH-NON-SH-Idl": [11, TOPAS_C_motor_names[3]],
    "DF1-NON-NON-Sig": [10, TOPAS_C_motor_names[3]],
}

TOPAS_800_motor_names = {
    0: ["Crystal", "Amplifier", "Grating"],
    1: [""],
    2: [""],
    3: ["NDFG_Crystal", "NDFG_Mirror", "NDFG_Delay"],
}

# [num_between, motor_names]
TOPAS_800_interactions = {
    "NON-NON-NON-Sig": [8, TOPAS_800_motor_names[0]],
    "NON-NON-NON-Idl": [8, TOPAS_800_motor_names[0]],
    "DF1-NON-NON-Sig": [7, TOPAS_800_motor_names[3]],
    "DF2-NON-NON-Sig": [7, TOPAS_800_motor_names[3]],
}

TOPAS_interaction_by_kind = {"TOPAS-C": TOPAS_C_interactions, "TOPAS-800": TOPAS_800_interactions}


[docs]class TopasCurve(Curve): @classmethod def read(cls, filepaths, interaction_string): """Create a curve object from a TOPAS crv file. Parameters ---------- filepaths : list of str [base, mixer 1, mixer 2, mixer 3] Paths to all crv files for OPA. Filepaths may be None if not needed / not applicable. interaction_string : str Interaction string for this curve, in the style of Light Conversion - e.g. 'NON-SF-NON-Sig'. Returns ------- WrightTools.tuning.curve.TopasCurve object """ return cls.read_all(filepaths)[interaction_string] @classmethod def read_all(cls, filepaths): curves = {} for f in filepaths: curves.update(cls._read_file(f)) for i, c in curves.items(): interaction = i.split("-") for j, part in enumerate(interaction): if part != "NON": interaction[j] = "NON" break interaction = "-".join(interaction) if interaction in curves: c.subcurve = curves[interaction] curves[interaction].supercurves.append(c) c.interpolate() return curves @classmethod def _read_file(cls, filepath): ds = np.DataSource(None) f = ds.open(str(filepath), "rt") head = f.readline().strip() if head != "600": warnings.warn(f"Unexpected header {head}, expected '600'") kind = f.readline().strip() config = {} if kind == "OPA/NOPA": kind = "OPA" if f.readline().strip() == "0" else "NOPA" config["use grating equation"] = f.readline().strip() == "1" config["grating motor index"] = int(f.readline()) config["grating constant"] = float(f.readline()) config["maximum grating position"] = float(f.readline()) n_motors = int(f.readline()) motor_indexes = np.fromstring(f.readline(), dtype=int, sep="\t") n_curves = int(f.readline().strip()) curves = {} for _ in range(n_curves): interaction_string = f.readline().strip() comment = "" for _a in range(int(f.readline())): comment += f.readline() # TODO: Check H/V polarization = "H" if int(f.readline()) else "V" pump_wavelength = float(f.readline()) f.readline() # n_motors offsets = np.fromstring(f.readline(), dtype=float, sep="\t") npts = int(f.readline()) strings = [f.readline().strip() for _ in range(npts)] arr = np.genfromtxt(strings, delimiter="\t", max_rows=npts, filling_values=np.nan).T source_setpoints = Dependent(arr[0], "source", "nm") setpoints = Setpoints(arr[1], "output", "nm") dependents = [] for i in range(n_motors): name = str(motor_indexes[i]) units = None dependents.append(Dependent(arr[i + 3], name, units, index=motor_indexes[i])) # TODO: figure out what namem should default to name = interaction_string curves[interaction_string] = cls( setpoints, dependents, name=name, interaction=interaction_string, kind=kind, source_setpoints=source_setpoints, polarization=polarization, pump_wavelength=pump_wavelength, config=config, motor_indexes=motor_indexes, comment=comment, offsets=offsets, ) for key,value in curves.items(): setattr(value, "siblings", [v for k,v in curves.items() if key != k]) setattr(value, "supercurves", []) f.close() return curves def save(self, save_directory, full=True): """Save a curve object. Parameters ---------- save_directory : string. Save directory. kind : string Curve kind. full : boolean Toggle saving subcurves. Returns ------- string Output path. """ # unpack curve = self.copy() curve.convert("nm") interaction_string = curve.interaction to_insert = {} if full: to_insert = curve._get_family_dict() to_insert[interaction_string] = curve if interaction_string == "NON-NON-NON-Idl": to_insert["NON-NON-NON-Sig"] = _convert(curve) to_insert["NON-NON-NON-Sig"].interaction = "NON-NON-NON-Sig" if interaction_string == "NON-NON-NON-Sig": to_insert["NON-NON-NON-Idl"] = _convert(curve) to_insert["NON-NON-NON-Idl"].interaction = "NON-NON-NON-Idl" save_directory = pathlib.Path(save_directory) save_directory.mkdir(parents=True, exist_ok=True) timestamp = wt.kit.TimeStamp().path out_paths = [] while len(to_insert): _, curve = to_insert.popitem() out_name = curve.kind + "- " + timestamp out_path = (save_directory / out_name).with_suffix(".crv") out_paths.append(out_path) all_sibs = [curve] if curve.siblings: all_sibs += curve.siblings with open(out_path, "w") as new_crv: _write_headers(new_crv, curve) new_crv.write(f"{len(all_sibs)}\n") for c in all_sibs: _write_curve(new_crv, c) to_insert.pop(c.interaction, None) return out_paths def _get_family_dict(self, start=None): if start is None: start = {} d = {k:v for k,v in start.items()} d.update({self.interaction: self}) if self.siblings: for s in self.siblings: if s.interaction not in d: d.update(s._get_family_dict(d)) if self.subcurve: if self.subcurve.interaction not in d: d.update(self.subcurve._get_family_dict(d)) if self.supercurves: for s in self.supercurves: if s.interaction not in d: d.update(s._get_family_dict(d)) return d
def _insert(curve): motor_indexes = curve.motor_indexes arr = np.empty((len(motor_indexes) + 3, len(curve.setpoints))) arr[0] = curve.source_setpoints[:] arr[1] = curve.setpoints[:] arr[2] = len(motor_indexes) print(motor_indexes) print([d.index for d in curve.dependents.values()]) for i, m in enumerate(motor_indexes): arr[3 + i] = next(d for d in curve.dependents.values() if d.index == m)[:] return arr.T def _convert(curve): curve = curve.copy() curve.setpoints.positions = 1 / ((1 / curve.pump_wavelength) - (1 / curve.setpoints[:])) curve.polarization = "V" if curve.polarization == "H" else "H" return curve def _write_headers(f, curve): f.write("600\n") if curve.kind in "OPA/NOPA": f.write("OPA/NOPA\n") f.write(f"{int(curve.kind=='NOPA')}\n") f.write(f"{int(curve.config.get('use grating equation', 0))}\n") f.write(f"{curve.config.get('grating motor index', -1)}\n") f.write(f"{curve.config.get('grating constant', 0)}\n") f.write(f"{curve.config.get('maximum grating position', 0)}\n") else: f.write(f"{curve.kind}\n") f.write(f"{len(curve.dependents)}\n") f.write("\t".join(str(i) for i in curve.motor_indexes)) f.write("\n") def _write_curve(f, curve): curve = curve.copy() curve.convert("nm") curve.sort() f.write(f"{curve.interaction}\n") if len(curve.comment) == 0 or curve.comment[-1] != "\n": curve.comment += "\n" num_lines = curve.comment.count("\n") f.write(f"{num_lines}\n") f.write(curve.comment) f.write(f"{int(curve.polarization=='H')}\n") f.write(f"{curve.pump_wavelength}\n") f.write(f"{len(curve.dependents)}\n") f.write("\t".join(str(i) for i in curve.offsets)) f.write("\n") array = _insert(curve) f.write(f"{len(array)}\n") fmt = ["%0.6f"] * len(array.T) fmt[2] = "%0.f" # this field is an int np.savetxt(f, array, fmt=fmt, delimiter="\t", newline="\n")