Source code for qick.qick_asm

"""
The interface for writing QICK programs.
This contains tools for managing the board configuration and the base class for QICK programs.
The assembly language for QICK programs is defined separately for the v1 and v2 tProcessors.
"""
import logging
import numpy as np
import json
from collections import namedtuple, OrderedDict, defaultdict
import operator
import functools
from abc import ABC, abstractmethod
from tqdm.auto import tqdm

from qick import obtain, get_version
from .helpers import to_int, cosine, gauss, triang, DRAG, decode_array

logger = logging.getLogger(__name__)

[docs]class QickConfig(): """Uses the QICK configuration to convert frequencies and clock delays. If running on the QICK, you don't need to use this class - the QickSoc class has all of the same methods. If running remotely, you may want to initialize a QickConfig from a JSON file. Parameters ---------- cfg : dict or str config dictionary, or path to JSON file Returns ------- """ def __init__(self, cfg=None): if cfg is not None: # we are getting an external config dictionary (e.g. from a Pyro server) if isinstance(cfg, str): with open(cfg) as f: self._cfg = json.load(f) else: self._cfg = cfg # compare the remote and local versions, warn on mismatch # if the remote library is so old that it doesn't have sw_version, get() will return None extversion = self._cfg.get('sw_version') ourversion = get_version() if extversion != ourversion: logger.warning("QICK library version mismatch: %s remote (the board), %s local (the PC)\n\ This may cause errors, usually KeyError in QickConfig initialization.\n\ If this happens, you must bring your versions in sync."%(extversion, ourversion)) def __str__(self): return self.description() def __getitem__(self, key): return self._cfg[key] def __setitem__(self, key, val): self._cfg[key] = val def _describe_dac(self, dacname): tile, block = [int(c) for c in dacname] if self['board']=='ZCU111': label = "DAC%d_T%d_CH%d or RF board output %d" % (tile + 228, tile, block, tile*4 + block) elif self['board']=='ZCU216': label = "%d_%d, on JHC%d" % (block, tile + 228, 1 + (block%2) + 2*(tile//2)) elif self['board']=='RFSoC4x2': label = {'00': 'DAC_B', '20': 'DAC_A'}[dacname] return "DAC tile %d, blk %d is %s" % (tile, block, label) def _describe_adc(self, adcname): tile, block = [int(c) for c in adcname] if self['board']=='ZCU111': rfbtype = "DC" if tile > 1 else "AC" label = "ADC%d_T%d_CH%d or RF board %s input %d" % (tile + 224, tile, block, rfbtype, (tile%2)*2 + block) elif self['board']=='ZCU216': label = "%d_%d, on JHC%d" % (block, tile + 224, 5 + (block%2) + 2*(tile//2)) elif self['board']=='RFSoC4x2': label = {'00': 'ADC_D', '01': 'ADC_C', '20': 'ADC_B', '21': 'ADC_A'}[adcname] return "ADC tile %d, blk %d is %s" % (tile, block, label)
[docs] def description(self): """Generate a printable description of the QICK configuration. Parameters ---------- Returns ------- str description """ tproc = self['tprocs'][0] lines = [] lines.append("\n\tBoard: " + self['board']) lines.append("\n\tSoftware version: " + self['sw_version']) lines.append("\tFirmware timestamp: " + self['fw_timestamp']) lines.append("\n\tGlobal clocks (MHz): tProcessor %.3f, RF reference %.3f" % ( tproc['f_time'], self['refclk_freq'])) lines.append("\n\t%d signal generator channels:" % (len(self['gens']))) for iGen, gen in enumerate(self['gens']): dacname = gen['dac'] dac = self['dacs'][dacname] buflen = gen['maxlen']/(gen['samps_per_clk']*gen['f_fabric']) lines.append("\t%d:\t%s - envelope memory %d samples (%.3f us)" % (iGen, gen['type'], gen['maxlen'], buflen)) lines.append("\t\tfs=%.3f MHz, fabric=%.3f MHz, %d-bit DDS, range=%.3f MHz" % (dac['fs'], gen['f_fabric'], gen['b_dds'], gen['f_dds'])) lines.append("\t\t" + self._describe_dac(dacname)) if self['iqs']: lines.append("\n\t%d constant-IQ outputs:" % (len(self['iqs']))) for iIQ, iq in enumerate(self['iqs']): dacname = iq['dac'] dac = self['dacs'][dacname] lines.append("\t%d:\tfs=%.3f MHz" % (iIQ, *dacname, iq['fs'])) lines.append("\t\t" + self._describe_dac(dacname)) lines.append("\n\t%d readout channels:" % (len(self['readouts']))) for iReadout, readout in enumerate(self['readouts']): adcname = readout['adc'] adc = self['adcs'][adcname] buflen = readout['buf_maxlen']/readout['f_output'] if 'tproc_ctrl' in readout: lines.append("\t%d:\t%s - configured by tProc output %d" % (iReadout, readout['ro_type'], readout['tproc_ctrl'])) else: lines.append("\t%d:\t%s - configured by PYNQ" % (iReadout, readout['ro_type'])) lines.append("\t\tfs=%.3f MHz, decimated=%.3f MHz, %d-bit DDS, range=%.3f MHz" % (adc['fs'], readout['f_output'], readout['b_dds'], readout['f_dds'])) lines.append("\t\tmaxlen %d accumulated, %d decimated (%.3f us)" % ( readout['avg_maxlen'], readout['buf_maxlen'], buflen)) lines.append("\t\ttriggered by %s %d, pin %d, feedback to tProc input %d" % ( readout['trigger_type'], readout['trigger_port'], readout['trigger_bit'], readout['tproc_ch'])) lines.append("\t\t" + self._describe_adc(adcname)) lines.append("\n\t%d digital output pins:" % (len(tproc['output_pins']))) for iPin, (porttype, port, pin, name) in enumerate(tproc['output_pins']): lines.append("\t%d:\t%s" % (iPin, name)) #lines.append("\t%d:\t%s (%s %d, pin %d)" % (iPin, name, porttype, port, pin)) lines.append("\n\ttProc %s: program memory %d words, data memory %d words" % (tproc['type'], tproc['pmem_size'], tproc['dmem_size'])) lines.append("\t\texternal start pin: %s" % (tproc['start_pin'])) bufnames = [ro['avgbuf_fullpath'] for ro in self['readouts']] if "ddr4_buf" in self._cfg: buf = self['ddr4_buf'] buflist = [bufnames.index(x) for x in buf['readouts']] buflen = buf['maxlen']/self['readouts'][buflist[0]]['f_fabric'] lines.append("\n\tDDR4 memory buffer: %d samples (%.3f sec), %d samples/transfer" % (buf['maxlen'], buflen/1e6, buf['burst_len'])) lines.append("\t\twired to readouts %s" % (buflist)) #lines.append("\t\twired to readouts %s, triggered by %s %d, pin %d" % ( # buflist, buf['trigger_type'], buf['trigger_port'], buf['trigger_bit'])) if "mr_buf" in self._cfg: buf = self['mr_buf'] buflist = [bufnames.index(x) for x in buf['readouts']] buflen = buf['maxlen']/self['adcs'][self['readouts'][buflist[0]]['adc']]['fs'] lines.append("\n\tMR buffer: %d samples (%.3f us), wired to readouts %s" % ( buf['maxlen'], buflen, buflist)) #lines.append("\n\tMR buffer: %d samples, wired to readouts %s, triggered by %s %d, pin %d" % ( # buf['maxlen'], buflist, buf['trigger_type'], buf['trigger_port'], buf['trigger_bit'])) return "\nQICK configuration:\n"+"\n".join(lines)
[docs] def get_cfg(self): """Return the QICK configuration dictionary. This contains everything you need to recreate the QickConfig. Parameters ---------- Returns ------- dict configuration dictionary """ return self._cfg
[docs] def dump_cfg(self): """Generate a JSON description of the QICK configuration. You can save this string to a file and load it to recreate the QickConfig. Parameters ---------- Returns ------- str configuration in JSON format """ return json.dumps(self._cfg, indent=4)
[docs] def calc_fstep_int(self, dict1, other_dicts): """Finds the multiplier that needs to be applied to a channel's frequency step size to allow this channel to be frequency-matched with another channel. Parameters ---------- dict1 : dict config dict for this channel other_dicts : list of dict config dict for the other channel(s) Returns ------- int frequency step multiplier for the first channel """ refclk = self['refclk_freq'] # Calculate least common multiple of sampling frequencies. alldicts = [dict1] + other_dicts # The DDS ranges are related to the refclk by fs_mult and fdds_div, both integers: f_dds = refclk*fs_mult/fdds_div # So we can find a common div: max_div = np.lcm.reduce([d['fdds_div'] for d in alldicts]) # and the max of the bit resolutions: b_max = max([d['b_dds'] for d in alldicts]) # so the frequency steps are both divisible by a "common divisor" of refclk/max_div/2**b_max # and these multipliers from the common divisor to the channel steps are always integer fsmults = [d['fs_mult'] * (max_div//d['fdds_div']) * 2**(b_max - d['b_dds']) for d in alldicts] # the LCM of those multipliers will give us a common multiple of the channel steps mult_lcm = np.lcm.reduce(fsmults) # so mult_lcm times the common divisor gives us a common step size that is divisible by all channel steps # we want the common step divided by the channel 1 step: return mult_lcm//fsmults[0]
[docs] def ch_fstep(self, dict1): """Finds the frequency step size of a single channel (generator or readout). Parameters ---------- dict1 : dict config dict for one channel Returns ------- float frequency step for this channel """ return dict1['fs_mult'] * (self['refclk_freq']/dict1['fdds_div']) / 2**dict1['b_dds']
[docs] def calc_fstep(self, dicts): """Finds the least common multiple of the frequency steps of one or more channels (typically two, a generator and a readout) For proper frequency matching, you should only use frequencies that are evenly divisible by this value. The order of the parameters does not matter. Parameters ---------- dicts : list of dict config dict for the channels Returns ------- float frequency step common to all channels """ fstep = self.ch_fstep(dicts[0]) if len(dicts) > 1: # find the multiplier from channel 1's minimum step size to the common step size step_int1 = self.calc_fstep_int(dicts[0], dicts[1:]) # multiply channel 1's step size by the multiplier fstep *= step_int1 return fstep
[docs] def roundfreq(self, f, dicts): """Round a frequency to the LCM of the frequency steps of one or more channels (typically two, a generator and a readout). Parameters ---------- f : float or array frequency (MHz) dicts : list of dict config dict for the channels Returns ------- float or array rounded frequency (MHz) """ fstep = self.calc_fstep(dicts) return np.round(f/fstep) * fstep
[docs] def freq2int(self, f, thisch, otherch=None): """Converts frequency in MHz to integer value suitable for writing to a register. This method works for both generators and readouts. If a gen will be connected to an RO, the two channels must have exactly the same frequency, and you must supply the config for the other channel. Parameters ---------- f : float frequency (MHz) thisch : dict config dict for the channel you're configuring otherch : dict config dict for a channel you will set to the same frequency Returns ------- int Re-formatted frequency """ if otherch is None: step_int = 1 else: step_int = self.calc_fstep_int(thisch, [otherch]) return to_int(f, 1/self.ch_fstep(thisch), parname='freq', quantize=step_int)
[docs] def int2freq(self, r, thisch): """Converts register value to MHz. This method works for both generators and readouts. Parameters ---------- r : int register value thisch : dict config dict for the channel you're configuring Returns ------- float Re-formatted frequency (MHz) """ return r / (2**thisch['b_dds'] / thisch['f_dds'])
[docs] def freq2reg(self, f, gen_ch=0, ro_ch=None): """Converts frequency in MHz to tProc generator register value. Parameters ---------- f : float frequency (MHz) gen_ch : int generator channel ro_ch : int readout channel (use None if you don't want to frequency-match to a readout) Returns ------- int Re-formatted frequency """ if ro_ch is None: rocfg = None else: rocfg = self['readouts'][ro_ch] gencfg = self['gens'][gen_ch] #if gencfg['type'] in ['axis_sg_int4_v1', 'axis_sg_mux4_v1', 'axis_sg_mux4_v2']: if gencfg['interpolation'] != 1: # because of the interpolation filter, there is no output power in the higher nyquist zones if f > gencfg['f_dds']/2 or f < -gencfg['f_dds']/2: raise RuntimeError("requested frequency %f is outside of the range [-fs/2, fs/2]"%(f)) return self.freq2int(f, gencfg, rocfg) % 2**gencfg['b_dds']
[docs] def freq2reg_adc(self, f, ro_ch=0, gen_ch=None): """Converts frequency in MHz to readout register value. Parameters ---------- f : float frequency (MHz) ro_ch : int readout channel gen_ch : int generator channel (use None if you don't want to frequency-match to a generator) Returns ------- int Re-formatted frequency """ if gen_ch is None: gencfg = None else: gencfg = self['gens'][gen_ch] rocfg = self['readouts'][ro_ch] return self.freq2int(f, rocfg, gencfg) % 2**rocfg['b_dds']
[docs] def reg2freq(self, r, gen_ch=0): """Converts frequency from format readable by generator to MHz. Parameters ---------- r : int frequency in generator format gen_ch : int generator channel Returns ------- float Re-formatted frequency in MHz """ gencfg = self['gens'][gen_ch] return self.int2freq(r, gencfg)
[docs] def reg2freq_adc(self, r, ro_ch=0): """Converts frequency from format readable by readout to MHz. Parameters ---------- r : int frequency in readout format ro_ch : int readout channel Returns ------- float Re-formatted frequency in MHz """ rocfg = self['readouts'][ro_ch] return self.int2freq(r, rocfg)
[docs] def adcfreq(self, f, gen_ch=0, ro_ch=0): """Takes a frequency and trims it to the closest DDS frequency valid for both channels. Parameters ---------- f : float frequency (MHz) gen_ch : int generator channel ro_ch : int readout channel Returns ------- float Re-formatted frequency """ return self.roundfreq(f, [self['gens'][gen_ch], self['readouts'][ro_ch]])
def _get_ch_cfg(self, gen_ch=None, ro_ch=None): """Helper method to grab the config dictionary for a generator or readout. Parameters ---------- gen_ch : int generator channel (index in 'gens' list) ro_ch : int readout channel (index in 'readouts' list) Returns ------- dict Config dictionary, or None if neither paramater was defined """ if gen_ch is not None and ro_ch is not None: raise RuntimeError("can't specify both gen_ch and ro_ch!") elif gen_ch is not None: return self['gens'][gen_ch] elif ro_ch is not None: return self['readouts'][ro_ch] else: return None def _get_mixer_cfg(self, gen_ch): """ Create a fake config dictionary for a generator's NCO, for use in frequency matching. """ gencfg = self['gens'][gen_ch] mixercfg = {} mixercfg['fs_mult'] = gencfg['fs_mult'] mixercfg['fdds_div'] = gencfg['fs_div'] mixercfg['b_dds'] = 48 return mixercfg
[docs] def deg2int(self, deg, thisch): """Converts phase in degrees to integer value suitable for writing to a register. This method works for both generators and readouts. Parameters ---------- deg : float phase (degrees) thisch : dict config dict for the channel you're configuring Returns ------- int Re-formatted phase """ return to_int(deg, 2**thisch['b_phase']/360, parname='phase') % 2**thisch['b_phase']
[docs] def int2deg(self, r, thisch): """Converts register value to degrees. This method works for both generators and readouts. Parameters ---------- r : int register value thisch : dict config dict for the channel you're configuring Returns ------- float Re-formatted phase (degrees) """ return r / (2**thisch['b_phase'] / 360)
[docs] def deg2reg(self, deg, gen_ch=0, ro_ch=None): """Converts degrees into phase register values; numbers greater than 360 will effectively be wrapped. Parameters ---------- deg : float Number of degrees gen_ch : int generator channel (index in 'gens' list) ro_ch : int readout channel (index in 'readouts' list) Returns ------- int Re-formatted number of degrees """ ch_cfg = self._get_ch_cfg(gen_ch=gen_ch, ro_ch=ro_ch) if ch_cfg is None: raise RuntimeError("must specify either gen_ch or ro_ch!") return self.deg2int(deg, ch_cfg)
[docs] def reg2deg(self, r, gen_ch=0, ro_ch=None): """Converts phase register values into degrees. Parameters ---------- reg : int Re-formatted number of degrees gen_ch : int generator channel (index in 'gens' list) ro_ch : int readout channel (index in 'readouts' list) Returns ------- float Number of degrees """ ch_cfg = self._get_ch_cfg(gen_ch=gen_ch, ro_ch=ro_ch) if ch_cfg is None: raise RuntimeError("must specify either gen_ch or ro_ch!") return self.int2deg(r, ch_cfg)
[docs] def cycles2us(self, cycles, gen_ch=None, ro_ch=None): """Converts clock cycles to microseconds. Uses tProc clock frequency by default. If gen_ch or ro_ch is specified, uses that generator/readout channel's fabric clock. Parameters ---------- cycles : int Number of clock cycles gen_ch : int generator channel (index in 'gens' list) ro_ch : int readout channel (index in 'readouts' list) Returns ------- float Number of microseconds """ if gen_ch is not None and ro_ch is not None: raise RuntimeError("can't specify both gen_ch and ro_ch!") if gen_ch is not None: fclk = self['gens'][gen_ch]['f_fabric'] elif ro_ch is not None: fclk = self['readouts'][ro_ch]['f_output'] else: fclk = self['tprocs'][0]['f_time'] return cycles/fclk
[docs] def us2cycles(self, us, gen_ch=None, ro_ch=None): """Converts microseconds to integer number of clock cycles. Uses tProc clock frequency by default. If gen_ch or ro_ch is specified, uses that generator/readout channel's fabric clock. Parameters ---------- us : float Number of microseconds gen_ch : int generator channel (index in 'gens' list) ro_ch : int readout channel (index in 'readouts' list) Returns ------- int Number of clock cycles """ if gen_ch is not None and ro_ch is not None: raise RuntimeError("can't specify both gen_ch and ro_ch!") if gen_ch is not None: fclk = self['gens'][gen_ch]['f_fabric'] elif ro_ch is not None: fclk = self['readouts'][ro_ch]['f_output'] else: fclk = self['tprocs'][0]['f_time'] #return np.int64(np.round(obtain(us)*fclk)) return to_int(obtain(us), fclk, parname='length')
[docs] def calc_mixer_freq(self, gen_ch, mixer_freq, nqz, ro_ch): """ Set the NCO frequency that will be mixed with the generator output. The RFdc driver does its own math to convert a frequency to a register value. (see XRFdc_SetMixerSettings in xrfdc_mixer.c, and "NCO Frequency Conversion" in PG269) This is what it does: 1. Add/subtract fs to get the frequency in the range of [-fs/2, fs/2]. 2. If the original frequency was not in [-fs/2, fs/2] and the DAC is configured for 2nd Nyquist zone, multiply by -1. 3. Convert to a 48-bit register value, rounding using C integer casting (i.e. round towards 0). Step 2 is not desirable for us, so we must undo it. The rounding gives unexpected results sometimes: it's hard to tell if a freq will get rounded up or down. This is important if the demanded frequency was rounded to a valid frequency for frequency matching. The safest way to get consistent behavior is to always round to a valid NCO frequency. We are trusting that the floating-point math is exact and a number we rounded here is still a round number in the RFdc driver. """ cfg = {} cfg['userval'] = mixer_freq gencfg = self['gens'][gen_ch] mixercfg = self._get_mixer_cfg(gen_ch) if ro_ch is None: rounded_f = self.roundfreq(mixer_freq, [mixercfg]) else: rounded_f = self.roundfreq(mixer_freq, [mixercfg, self['readouts'][ro_ch]]) cfg['rounded'] = rounded_f if abs(rounded_f) > gencfg['fs']/2 and nqz==2: cfg['setval'] = -rounded_f else: cfg['setval'] = rounded_f return cfg
[docs] def calc_muxgen_regs(self, gen_ch, freqs, gains, phases, ro_ch): """Calculate the register values to program into a multiplexed generator. """ gencfg = self['gens'][gen_ch] if gains is not None and len(gains) != len(freqs): raise RuntimeError("lengths of freqs and gains lists do not match") if phases is not None and len(phases) != len(freqs): raise RuntimeError("lengths of freqs and phases lists do not match") tones = [] for i, freq in enumerate(freqs): tone = {} tone['freq_int'] = self.freq2reg(freq, gen_ch=gen_ch, ro_ch=ro_ch) tone['freq_rounded'] = self.reg2freq(tone['freq_int'], gen_ch=gen_ch) if gencfg['has_gain']: gain = 1.0 if gains is None else gains[i] tone['gain_int'] = int(np.round(gain * gencfg['maxv'])) tone['gain_rounded'] = tone['gain_int']/gencfg['maxv'] if gencfg['has_phase']: phase = 0.0 if phases is None else phases[i] tone['phase_int'] = self.deg2reg(phase, gen_ch=gen_ch) tone['phase_rounded'] = self.reg2deg(tone['phase_int'], gen_ch=gen_ch) tones.append(tone) return tones
[docs] def calc_ro_regs(self, rocfg, phase, sel): """Calculate the settings to configure a readout. Returns ------- dict settings for QickSoc.config_readout() """ ro_regs = {} if rocfg['has_outsel']: ro_regs['sel'] = sel elif sel != 'product': raise RuntimeError("sel parameter was specified for readout %d, which doesn't support this parameter" % (ch)) # calculate phase register if 'b_phase' in rocfg: ro_regs['phase_int'] = self.deg2int(phase, rocfg) ro_regs['phase_rounded'] = self.int2deg(ro_regs['phase_int'], rocfg) elif phase != 0: raise RuntimeError("phase parameter was specified for readout %d, which doesn't support this parameter" % (ch)) return ro_regs
[docs] def calc_ro_freq(self, rocfg, ro_pars, ro_regs, mixer_freq): """Calculate the readout frequency and registers. """ gen_ch = ro_pars['gen_ch'] # now do frequency stuff if gen_ch is not None: # calculate the frequency that will be applied to the generator ro_regs['f_rounded'] = self.roundfreq(ro_pars['freq'], [self['gens'][gen_ch], rocfg]) if mixer_freq is not None: ro_regs['f_rounded'] += mixer_freq else: # round to RO frequency ro_regs['f_rounded'] = self.roundfreq(ro_pars['freq'], [rocfg]) # calculate the freq register(s) if 'pfb_nout' in rocfg: # for mux readout, this is complicated self._calc_pfbro_freq(rocfg, ro_regs) else: # for regular readout, this is easy ro_regs['f_int'] = self.freq2int(ro_regs['f_rounded'], rocfg)
def _calc_pfbro_freq(self, rocfg, ro_regs): """Calculate the PFB settings to configure a muxed readout. """ freq = ro_regs['f_rounded'] nqz = int(freq // (rocfg['fs']/2)) + 1 if nqz % 2 == 0: # even Nyquist zone freq *= -1 # fold into 1st nyquist zone freq %= rocfg['fs'] ro_regs['f_folded'] = freq # the PFB channels are separated by half the DDS range # round() gives you the single best channel # floor() and ceil() would give you the 2 best channels # if you have two RO frequencies close together, you might need to force one of them onto a non-optimal channel f_steps = int(np.round(freq/(rocfg['f_dds']/2))) f_dds = freq - f_steps*(rocfg['f_dds']/2) ro_regs['pfb_f'] = f_dds ro_regs['pfb_lo'] = (f_steps-0.5) * (rocfg['f_dds']/2) ro_regs['pfb_hi'] = (f_steps+0.5) * (rocfg['f_dds']/2) ro_regs['pfb_center'] = f_steps * (rocfg['f_dds']/2) ro_regs['pfb_lolo'] = max(f_steps-1, 0) * (rocfg['f_dds']/2) ro_regs['pfb_hihi'] = min(f_steps+1, rocfg['pfb_nch']) * (rocfg['f_dds']/2) ro_regs['pfb_ch'] = (rocfg['pfb_ch_offset'] + f_steps) % rocfg['pfb_nch'] ro_regs['f_int'] = self.freq2int(f_dds, rocfg)
[docs] def check_pfb_collisions(self, rocfg, cfg1, cfgs): """Check whether the specified PFB config collides or interefers with any others. If this PFB block can't put two readouts on the same channel, this method will raise an error on collisions. Possible crosstalk will be identified in warnings. Parameters ---------- rocfg : dict firmware config dictionary for the PFB or the readout chain cfg1 : dict PFB config to check cfgs : list of dict PFB configs to check cfg1 against """ for cfg2 in cfgs: if cfg2['f_rounded'] == cfg1['f_rounded']: # it's fine to set two PFB outputs to identical frequencies continue if cfg2['pfb_ch'] == cfg1['pfb_ch']: p = {k:[x[k] for x in [cfg1, cfg2]] for k in ['f_rounded', 'f_folded']} message = [] message.append('Two tones on same PFB channel:') message.append("You have readouts at frequencies %.3f and %.3f MHz."% (cfg1['f_rounded'], cfg2['f_rounded'])) message.append("(after rounding and accounting for the generator's digital mixer, if applicable).") message.append("Both map to the same PFB channel. In terms of the ADC's first Nyquist zone:") message.append("The tone frequencies are %.3f and %.3f MHz, and"% (cfg1['f_folded'], cfg2['f_folded'])) message.append("the PFB channel range is [%.3f, %.3f] MHz."% (cfg1['pfb_lo'], cfg2['pfb_hi'])) if rocfg['pfb_dds_on_output']: message.append("This is allowed, but you should expect crosstalk if you play one tone while reading out the other.") logger.warning('\n'.join(message)) else: message.append("The PFB used in your firmware does not allow reading out two tones on the same channel.") raise RuntimeError('\n'.join(message)) else: # no collision, but we must check if either tone is in the overlap region of the other tone's channel if np.abs(cfg2['f_folded'] - cfg1['pfb_center']) < rocfg['f_dds']/2: logger.warning("The readout at %.3f MHz may see some crosstalk from the tone at %.3f MHz." % (cfg1['f_rounded'], cfg2['f_rounded'])) #source = cfg2 #victim = cfg1 #message = [] #message.append("Possible PFB crosstalk:") #message.append("You have declared a readout at %s MHz, or %s MHz after Nyquist folding."% (victim['rounded'], victim['folded'])) #message.append("The PFB channel for this tone is sensitive over [%f, %f] MHz."% (victim['pfb_lolo'], victim['pfb_hihi'])) #message.append("You have declared another readout at %s MHz, or %s MHz after Nyquist folding."% (source['rounded'], source['folded'])) #message.append("The readout at %s MHz may see some crosstalk from the tone at %s MHz." % (victim['rounded'], source['rounded'])) #logger.warning('\n'.join(message)) if np.abs(cfg1['f_folded'] - cfg2['pfb_center']) < rocfg['f_dds']/2: logger.warning("The readout at %.3f MHz may see some crosstalk from the tone at %.3f MHz." % (cfg2['f_rounded'], cfg1['f_rounded']))
[docs]class DummyIp: """Stores the configuration constants for a firmware IP block. """ def __init__(self, iptype, fullpath): # config dictionary for QickConfig self._cfg = {'type': iptype, 'fullpath': fullpath} @property def cfg(self): return self._cfg def __getitem__(self, key): return self._cfg[key]
[docs]class AbsQickProgram: """Generic QICK program, including support for generator and readout configuration but excluding tProc-specific code. QickProgram/QickProgramV2 are the concrete subclasses for tProc v1/v2. The tProc executes binary machine code; you write declarations and ASM code (or macros that get expanded to ASM). So before a program gets run, you need to fill it with declarations and ASM, and they need to get compiled (converted to machine code). There are three ways to prepare a QickProgram for running: 1. External initialization: Create an empty program object. Write the program by calling declaration and ASM methods of the program object. The program will be compiled when you try to run, dump, or print it. 2. Internal initialization: Create a subclass which calls declaration and ASM methods as part of __init__(). When you create an instance of the subclass, it will automatically fill itself. Typically you won't subclass QickProgram directly, you will subclass something like AveragerProgram which does a lot of the work for you. The program will be compiled when you try to run, dump, or print. 3. Loading a dump: Create an empty program object. Call QickProgram.load_prog() to load the program definition from a dump. The program will be compiled as part of load_prog(). """ # Calls to these methods will be passed through to the soccfg object. soccfg_methods = ['freq2reg', 'freq2reg_adc', 'reg2freq', 'reg2freq_adc', 'cycles2us', 'us2cycles', 'deg2reg', 'reg2deg'] # duration units in declare_readout and envelope definitions are in user units (float, us), not raw (int, clock ticks) USER_DURATIONS = False def __init__(self, soccfg): """ Constructor method """ self.soccfg = soccfg self.tproccfg = self.soccfg['tprocs'][0] self._init_declarations() self._init_instructions() # Attributes to dump when saving the program to JSON. self.dump_keys = ['envelopes', 'ro_chs', 'gen_chs'] def _init_declarations(self): """Initialize data structures for keeping track of program declarations. Structures that are filled directly by user code or a make_program() should be initialized here. This will typically mean macros, channels and envelopes. Concrete subclasses will extend this method to add more data structures. This should be called at class initialization. If a program is filled using a make_program() that is called during compilation, this should also be called before make_program(). """ logger.debug("init_declarations") # Pulse envelopes. self.envelopes = [{"next_addr": 0, "envs": {}} for ch in self.soccfg['gens']] # readout channels to configure before running the program self.ro_chs = OrderedDict() # signal generator channels to configure before running the program self.gen_chs = OrderedDict() def _init_instructions(self): """Initialize data structures for keeping track of program instructions. Structures that are filled automatically at compilation should be initialized here. This will typically mean the ASM list. Concrete subclasses will extend this method to add more data structures. This should be called at class initialization and before compilation. """ logger.debug("init_instructions") # Timestamps, for keeping track of pulse and readout end times. self._gen_ts = [0]*len(self.soccfg['gens']) self._ro_ts = [0]*len(self.soccfg['readouts']) # binary program, ready to execute self.binprog = None def __getattr__(self, a): """ Include QickConfig methods as methods of the QickProgram. This allows e.g. this.freq2reg(f) instead of this.soccfg.freq2reg(f). :param a: Instruction name :type a: str :return: Instruction arguments :rtype: *args object """ if a in self.__class__.soccfg_methods: return getattr(self.soccfg, a) else: return object.__getattribute__(self, a)
[docs] @abstractmethod def compile(self): """Fills self.binprog with a binary representation of the program. """ ...
[docs] def dump_prog(self): """ Dump the program to a dictionary. This output contains all the information necessary to run the program. In other words, it will have the low-level ASM and pulse+envelope data, but not higher-level structures. Caution: don't modify the sub-dictionaries of this dict! You will be modifying the original program (this is not a deep copy). """ progdict = {} for key in self.dump_keys: progdict[key] = getattr(self, key) return progdict
[docs] def load_prog(self, progdict): """ Load the program from a dictionary. """ for key in self.dump_keys: setattr(self, key, progdict[key]) # tweak data structures that got screwed up by JSON: # in JSON, dict keys are always strings, so we must cast back to int self.gen_chs = OrderedDict([(int(k),v) for k,v in self.gen_chs.items()]) self.ro_chs = OrderedDict([(int(k),v) for k,v in self.ro_chs.items()]) # the envelope arrays need to be restored as numpy arrays with the proper type for iCh, envdict in enumerate(self.envelopes): for name, env in envdict['envs'].items(): env['data'] = decode_array(env['data'])
[docs] def config_all(self, soc, load_pulses=True, reset=False): """ Load the waveform memory, gens, ROs, and program memory as specified for this program. The decimated+accumulated buffers are not configured, since those should be re-configured for each acquisition. The tProc is set to internal start before any other configuration is done, to prevent spurious external starts. Parameters ---------- reset : bool Force-stop the tProc before loading the program. This option only affects tProc v1, where the reset takes several ms. For tProc v2, where reset is easy, we always do the reset. """ # compile() first, because envelopes might be declared in a make_program() inside _make_asm() if self.binprog is None: self.compile() # set tproc to internal-start, to prevent spurious starts soc.start_src("internal") # now stop the tproc (if the tproc supports it) soc.stop_tproc(lazy=not reset) # Load the pulses from the program into the soc if load_pulses: self.load_pulses(soc) # Configure signal generators self.config_gens(soc) # Configure the readout down converters self.config_readouts(soc) # Load the program into the tProc soc.load_bin_program(self.binprog)
[docs] def run(self, soc, load_prog=True, load_pulses=True, start_src="internal"): """Load the program into the tProcessor and start it. Because there is in general no way to tell when a program is done running, there is no guarantee that the program will be done before this method returns. If you want that guarantee, use run_rounds(). Parameters ---------- soc : QickSoc The QickSoc that will execute this program. load_prog : bool Load the program before starting the tProc. load_pulses : bool Load the generator envelopes before starting the tProc. If load_prog is False, load_pulses is ignored. start_src: str "internal" (tProc starts immediately) or "external" (each round waits for an external trigger). """ if load_prog: self.config_all(soc, load_pulses=load_pulses) # configure tproc for internal/external start soc.start_src(start_src) # run the assembly program # if start_src="external", it won't actually start until it sees a pulse soc.start_tproc()
[docs] def declare_readout(self, ch, length, freq=None, phase=0, sel='product', gen_ch=None): """Add a channel to the program's list of readouts. Duration units depend on the program type: tProc v1 programs use integer number of samples, tProc v2 programs use float us. Parameters ---------- ch : int readout channel number (index in 'readouts' list) freq : float downconverting frequency (MHz) phase : float phase (degrees) length : int or float readout length (number of decimated samples for tProc v1, us for tProc v2) sel : str output select ('product', 'dds', 'input') gen_ch : int generator channel (use None if you don't want the downconversion frequency to be rounded to a valid DAC frequency or be offset by the DAC mixer frequency) """ ro_cfg = self.soccfg['readouts'][ch] # the number of triggers per shot will be filled in later, by trigger() or set_read_per_shot() cfg = {'trigs': 0} if self.USER_DURATIONS: cfg['length'] = self.us2cycles(ro_ch=ch, us=length) else: cfg['length'] = length cfg['length_us'] = self.cycles2us(cfg['length'], ro_ch=ch) # this number comes from the fact that the ADC is 12 bit + 3 bits from decimation = 15 bit # and the sum buffer values are 32 bit signed # TODO: check this math if cfg['length'] > 2**(31-15): logger.warning(f'With the given readout length there is a possibility that the sum buffer will overflow giving invalid results.') if 'tproc_ctrl' not in ro_cfg: # readout is controlled by PYNQ if freq is None: raise RuntimeError("frequency must be declared for a PYNQ-configured readout") cfg['freq'] = freq cfg['gen_ch'] = gen_ch cfg['ro_config'] = self.soccfg.calc_ro_regs(ro_cfg, phase, sel) else: # readout is controlled by tProc if phase!=0 or sel!='product' or freq is not None or gen_ch is not None: raise RuntimeError("this is a tProc-configured readout - freq/phase/sel parameters are set using tProc instructions") self.ro_chs[ch] = cfg
[docs] def config_readouts(self, soc): """Configure the readout channels specified in this program. This is usually called as part of an acquire() method. Parameters ---------- soc : QickSoc the QickSoc that will execute this program """ # because readout freqs need to account for mixer freqs, we can only compute freq registers here, after we know all gens have been declared # store PFB parameters in PFB list so we can check for collisions and configure the PFB pfbs = defaultdict(list) for ch, cfg in self.ro_chs.items(): rocfg = self.soccfg['readouts'][ch] if 'tproc_ctrl' not in rocfg: if cfg['gen_ch'] is not None and cfg['gen_ch'] in self.gen_chs and 'mixer_freq' in self.gen_chs[cfg['gen_ch']]: mixer_freq = self.gen_chs[cfg['gen_ch']]['mixer_freq']['rounded'] else: mixer_freq = None # add frequency ro_regs = cfg['ro_config'] self.soccfg.calc_ro_freq(rocfg, cfg, ro_regs, mixer_freq) if 'pfb_port' in rocfg: # if this is a muxed readout, don't write the settings yet ro_regs['pfb_port'] = rocfg['pfb_port'] pfbname = rocfg['ro_fullpath'] self.soccfg.check_pfb_collisions(rocfg, ro_regs, pfbs[pfbname]) pfbs[pfbname].append(ro_regs) else: # if this is a standard readout, save the settings and write them to the readout soc.configure_readout(ch, cfg['ro_config']) # write the mux settings for pfbpath, pfb_regs in pfbs.items(): sels = [x.get('sel') for x in pfb_regs] # all sels should be the same (if has_outsel=False, get() will return None) if len(set(sels)) != 1: raise RuntimeError("all declared readouts on a muxed readout must have the same 'sel' setting, you have %s" % (sels)) soc.config_mux_readout(pfbpath, pfb_regs, sels[0])
[docs] def config_bufs(self, soc, enable_avg=True, enable_buf=True): """Configure the readout buffers specified in this program. This is usually called as part of an acquire() method. Parameters ---------- soc : QickSoc the QickSoc that will execute this program enable_avg : bool enable the accumulated (averaging) buffer enable_buf : bool enable the decimated (waveform) buffer """ for ch, cfg in self.ro_chs.items(): if enable_avg: soc.config_avg(ch, address=0, length=cfg['length'], enable=True) if enable_buf: soc.config_buf(ch, address=0, length=cfg['length'], enable=True)
[docs] def declare_gen(self, ch, nqz=1, mixer_freq=None, mux_freqs=None, mux_gains=None, mux_phases=None, ro_ch=None): """Add a channel to the program's list of signal generators. If this is a generator with a mixer (interpolated or muxed generator), you may define a mixer frequency. If this is a muxed generator, the mux_freqs list must be long enough to define all the tones you will play. (in other words, if your mask list ever enables tone 2 you must define at least 3 freqs+gains) If your mux gen supports gains and/or phases and you define them, those lists must be the same length. If you don't define gains or phases, they will be set to defaults (max positive gain, zero phase). Parameters ---------- ch : int generator channel (index in 'gens' list) nqz : int, optional Nyquist zone (must be 1 or 2). Setting the NQZ to 2 increases output power in the 2nd/3rd Nyquist zones. mixer_freq : float, optional Mixer frequency (in MHz) mux_freqs : list of float, optional Tone frequencies for the muxed generator (in MHz). Positive and negative values are allowed. mux_gains : list of float, optional Tone amplitudes for the muxed generator (in range -1 to 1). mux_phases : list of float, optional Phases for the muxed generator (in degrees). ro_ch : int, optional readout channel for frequency-matching mixer and mux freqs """ cfg = { 'nqz': nqz, 'ro_ch': ro_ch } gencfg = self.soccfg['gens'][ch] if gencfg['has_mixer']: if mixer_freq is None: raise RuntimeError("generator %d has a digital mixer, but no mixer_freq was defined" % (ch)) cfg['mixer_freq'] = self.soccfg.calc_mixer_freq(ch, mixer_freq, nqz, ro_ch) else: if mixer_freq is not None: logger.warning("generator %d doesn't have a digital mixer, but mixer_freq was defined" % (ch)) if 'n_tones' in gencfg: if mux_freqs is None: raise RuntimeError("generator %d is multiplexed, but no mux_freqs were defined" % (ch)) if mux_gains is not None and not gencfg['has_gain']: logger.warning("generator %d doesn't support gain config, but mux_gains was defined" % (ch)) if mux_phases is not None and not gencfg['has_phase']: logger.warning("generator %d doesn't support phase config, but mux_phases was defined" % (ch)) cfg['mux_tones'] = self.soccfg.calc_muxgen_regs(ch, mux_freqs, mux_gains, mux_phases, ro_ch) else: if any([x is not None for x in [mux_freqs, mux_gains, mux_phases]]): logger.warning("generator %d is not multiplexed, but mux parameters were defined" % (ch)) if ro_ch is not None and not gencfg['has_mixer'] and 'n_tones' not in gencfg: logger.warning("ro_ch was defined for generator %d, but it's not multiplexed and doesn't have a mixer, so it will do nothing" % (ch)) self.gen_chs[ch] = cfg
[docs] def config_gens(self, soc): """Configure the signal generators specified in this program. This is usually called as part of an acquire() method. Parameters ---------- soc : QickSoc the QickSoc that will execute this program """ for ch, cfg in self.gen_chs.items(): soc.set_nyquist(ch, cfg['nqz']) if 'mixer_freq' in cfg: soc.set_mixer_freq(ch, cfg['mixer_freq']['setval']) if 'mux_tones' in cfg: soc.config_mux_gen(ch, cfg['mux_tones'])
[docs] def add_envelope(self, ch, name, idata=None, qdata=None): """Adds a waveform to the list of envelope waveforms available for this channel. The I and Q arrays must be of equal length, and the length must be divisible by the samples-per-clock of this generator. Parameters ---------- ch : int generator channel (index in 'gens' list) name : str Name of the pulse idata : array I data Numpy array qdata : array Q data Numpy array """ gencfg = self.soccfg['gens'][ch] length = [len(d) for d in [idata, qdata] if d is not None] if len(length)==0: raise RuntimeError("Error: no data argument was supplied") # if both arrays were defined, they must be the same length if len(length)>1 and length[0]!=length[1]: raise RuntimeError("Error: I and Q envelope lengths must be equal") length = length[0] if (length % gencfg['samps_per_clk']) != 0: raise RuntimeError("Error: envelope lengths must be an integer multiple of %d"%(gencfg['samps_per_clk'])) # currently, all gens with envelopes use int16 for I and Q data = np.zeros((length, 2), dtype=np.int16) for i, d in enumerate([idata, qdata]): if d is not None: # range check if np.max(np.abs(d)) > gencfg['maxv']: raise ValueError("max abs val of envelope (%d) exceeds limit (%d)" % (np.max(np.abs(d)), gencfg['maxv'])) # copy data data[:,i] = np.round(d) self.envelopes[ch]['envs'][name] = {"data": data, "addr": self.envelopes[ch]['next_addr']} self.envelopes[ch]['next_addr'] += length
[docs] def add_cosine(self, ch, name, length, maxv=None, even_length=False): """Adds a cosine to the envelope library. The envelope will peak at length/2. Duration units depend on the program type: tProc v1 programs use integer number of fabric clocks, tProc v2 programs use float us. Parameters ---------- ch : int generator channel (index in 'gens' list) name : str Name of the envelope length : int Total envelope length (in fabric clocks or us) maxv : float Value at the peak (if None, the max value for this generator will be used) even_length : bool If length is in us, round the envelope length to an even number of fabric clock cycles. This is useful for flat_top pulses, where the envelope gets split into two halves. """ gencfg = self.soccfg['gens'][ch] if maxv is None: maxv = gencfg['maxv']*gencfg['maxv_scale'] samps_per_clk = gencfg['samps_per_clk'] # convert to integer number of fabric clocks if self.USER_DURATIONS: if even_length: lenreg = 2*self.us2cycles(gen_ch=ch, us=length/2) else: lenreg = self.us2cycles(gen_ch=ch, us=length) else: lenreg = np.round(length) # convert to number of samples lenreg *= samps_per_clk self.add_envelope(ch, name, idata=cosine(length=lenreg, maxv=maxv))
[docs] def add_gauss(self, ch, name, sigma, length, maxv=None, even_length=False): """Adds a Gaussian to the envelope library. The envelope will peak at length/2. Duration units depend on the program type: tProc v1 programs use integer number of fabric clocks, tProc v2 programs use float us. Parameters ---------- ch : int generator channel (index in 'gens' list) name : str Name of the envelope sigma : float Standard deviation of the Gaussian (in fabric clocks or us) length : int or float Total envelope length (in fabric clocks or us) maxv : float Value at the peak (if None, the max value for this generator will be used) even_length : bool If length is in us, round the envelope length to an even number of fabric clock cycles. This is useful for flat_top pulses, where the envelope gets split into two halves. """ gencfg = self.soccfg['gens'][ch] if maxv is None: maxv = gencfg['maxv']*gencfg['maxv_scale'] samps_per_clk = gencfg['samps_per_clk'] # convert to integer number of fabric clocks if self.USER_DURATIONS: if even_length: lenreg = 2*self.us2cycles(gen_ch=ch, us=length/2) else: lenreg = self.us2cycles(gen_ch=ch, us=length) sigreg = self.us2cycles(gen_ch=ch, us=sigma) else: lenreg = np.round(length) sigreg = np.round(sigma) # convert to number of samples lenreg *= samps_per_clk sigreg *= samps_per_clk self.add_envelope(ch, name, idata=gauss(mu=lenreg/2-0.5, si=sigreg, length=lenreg, maxv=maxv))
[docs] def add_DRAG(self, ch, name, sigma, length, delta, alpha=0.5, maxv=None, even_length=False): """Adds a DRAG to the envelope library. The envelope will peak at length/2. Parameters ---------- ch : int generator channel (index in 'gens' list) name : str Name of the envelope sigma : float or float Standard deviation of the Gaussian (in fabric clocks or us) length : int or float Total envelope length (in fabric clocks or us) maxv : float Value at the peak (if None, the max value for this generator will be used) delta : float anharmonicity of the qubit (units of MHz) alpha : float alpha parameter of DRAG (order-1 scale factor) even_length : bool If length is in us, round the envelope length to an even number of fabric clock cycles. This is useful for flat_top pulses, where the envelope gets split into two halves. """ gencfg = self.soccfg['gens'][ch] if maxv is None: maxv = gencfg['maxv']*gencfg['maxv_scale'] samps_per_clk = gencfg['samps_per_clk'] f_fabric = gencfg['f_fabric'] delta /= samps_per_clk*f_fabric # convert to integer number of fabric clocks if self.USER_DURATIONS: if even_length: lenreg = 2*self.us2cycles(gen_ch=ch, us=length/2) else: lenreg = self.us2cycles(gen_ch=ch, us=length) sigreg = self.us2cycles(gen_ch=ch, us=sigma) else: lenreg = np.round(length) sigreg = np.round(sigma) # convert to number of samples lenreg *= samps_per_clk sigreg *= samps_per_clk idata, qdata = DRAG(mu=lenreg/2-0.5, si=sigreg, length=lenreg, maxv=maxv, alpha=alpha, delta=delta) self.add_envelope(ch, name, idata=idata, qdata=qdata)
[docs] def add_triangle(self, ch, name, length, maxv=None, even_length=False): """Adds a triangle to the envelope library. The envelope will peak at length/2. Duration units depend on the program type: tProc v1 programs use integer number of fabric clocks, tProc v2 programs use float us. Parameters ---------- ch : int generator channel (index in 'gens' list) name : str Name of the envelope length : int or float Total envelope length (in fabric clocks or us) maxv : float Value at the peak (if None, the max value for this generator will be used) even_length : bool If length is in us, round the envelope length to an even number of fabric clock cycles. This is useful for flat_top pulses, where the envelope gets split into two halves. """ gencfg = self.soccfg['gens'][ch] if maxv is None: maxv = gencfg['maxv']*gencfg['maxv_scale'] samps_per_clk = gencfg['samps_per_clk'] # convert to integer number of fabric clocks if self.USER_DURATIONS: if even_length: lenreg = 2*self.us2cycles(gen_ch=ch, us=length/2) else: lenreg = self.us2cycles(gen_ch=ch, us=length) else: lenreg = np.round(length) # convert to number of samples lenreg *= samps_per_clk self.add_envelope(ch, name, idata=triang(length=lenreg, maxv=maxv))
[docs] def load_pulses(self, soc): """Loads pulses that were added using add_envelope into the SoC's signal generator memories. Parameters ---------- soc : Qick object Qick object """ for iCh, pulses in enumerate(self.envelopes): for name, pulse in pulses['envs'].items(): soc.load_pulse_data(iCh, data=pulse['data'], addr=pulse['addr'])
def reset_timestamps(self, gen_t0=None): # used by init and sync_all() self._gen_ts = [0]*len(self._gen_ts) if gen_t0 is None else gen_t0.copy() self._ro_ts = [0]*len(self._ro_ts) def decrement_timestamps(self, t): # used by sync() in v2 self._gen_ts = [max(0, x-t) for x in self._gen_ts] self._ro_ts = [max(0, x-t) for x in self._ro_ts] def get_timestamp(self, gen_ch=None, ro_ch=None): if gen_ch is not None and ro_ch is not None: raise RuntimeError("can't specify both gen_ch and ro_ch!") if gen_ch is not None: return self._gen_ts[gen_ch] elif ro_ch is not None: return self._ro_ts[ro_ch] else: raise RuntimeError("must specify gen_ch or ro_ch!") def set_timestamp(self, val, gen_ch=None, ro_ch=None): if gen_ch is not None and ro_ch is not None: raise RuntimeError("can't specify both gen_ch and ro_ch!") if gen_ch is not None: self._gen_ts[gen_ch] = val elif ro_ch is not None: self._ro_ts[ro_ch] = val else: raise RuntimeError("must specify gen_ch or ro_ch!") def get_max_timestamp(self, gens=True, ros=True, gen_t0=None): timestamps = [] if gens: if gen_t0 is None: timestamps += list(self._gen_ts) else: gen_ts_copy = np.copy(self._gen_ts) gen_t0_copy = np.copy(gen_t0) timestamps += list(np.maximum(gen_ts_copy - gen_t0_copy, 0)) if ros: timestamps += list(self._ro_ts) return max(timestamps)
[docs]class AcquireMixin: """Adds acquire() and acquire_decimated() methods for acquiring readout data, and run_rounds() for running repeatedly without acquisition. Program classes that use this mixin must call setup_acquire() after _init_prog() and before acquire()/acquire_decimated(). """ def __init__(self, *args, **kwargs): # pass through any init arguments super().__init__(*args, **kwargs) # Attributes to dump when saving the program to JSON. self.dump_keys += ['counter_addr', 'reads_per_shot', 'loop_dims', 'avg_level'] # measurements from the most recent acquisition # raw I/Q data without normalizing to window length or averaging over reps self.d_buf = None # shot-by-shot threshold classification self.shots = None def _init_declarations(self): super()._init_declarations() # tProc address of the rep counter, must be defined self.counter_addr = None # data dimensions, must be defined: # number of times each readout is triggered in a single shot self.reads_per_shot = None # list of loop dimensions, outermost loop first self.loop_dims = None # which loop level to average over (0 is outermost) self.avg_level = None
[docs] def setup_counter(self, counter_addr, loop_dims): """Set the parameters needed to track the progress of the program. This is a subset of setup_acquire(), appropriate for programs where you have no readouts. You should use this if you're updating a tProc counter and want to use it to track program progress. Parameters ---------- counter_addr : int The special tProc address holding the number of shots read out thus far. loop_dims : list of int List of loop dimensions, outermost loop first. """ self.counter_addr = counter_addr self.loop_dims = loop_dims
[docs] def setup_acquire(self, counter_addr, loop_dims, avg_level): """Set the parameters needed to define the data acquisition. Since the number of readouts per shot is set based on calls to trigger(), this should be called after the program has been fully defined. Parameters ---------- counter_addr : int The special tProc address holding the number of shots read out thus far. loop_dims : list of int List of loop dimensions, outermost loop first. avg_level : int Which loop level to average over (0 is outermost). """ # this doesn't work unless trigger macros have been processed, so compile if we haven't already compiled if self.binprog is None: self.compile() self.setup_counter(counter_addr, loop_dims) self.avg_level = avg_level self.reads_per_shot = [ro['trigs'] for ro in self.ro_chs.values()]
[docs] def set_reads_per_shot(self, reads_per_shot): """Override the default count of readout triggers per shot. This should be called after setup_acquire(). You probably shouldn't be using this method; the default value is usually correct. Parameters ---------- reads_per_shot : int or list of int Number of readout triggers per shot. If int, all declared readout channels use this value. """ try: self.reads_per_shot = [int(reads_per_shot)]*len(self.ro_chs) except TypeError: self.reads_per_shot = reads_per_shot
[docs] def get_raw(self): """Get the raw integer I/Q values (before normalizing to the readout window, averaging across reps, removing the readout offset, or thresholding). Returns ------- list of ndarray Array of I/Q values for each readout channel. """ return self.d_buf
[docs] def get_shots(self): """Get the shot-by-shot threshold decisions. Returns ------- list of ndarray Array of shots for each readout channel. """ return self.shots
[docs] def acquire(self, soc, soft_avgs=1, load_pulses=True, start_src="internal", threshold=None, angle=None, progress=True, remove_offset=True): """Acquire data using the accumulated readout. Parameters ---------- soc : QickSoc Qick object soft_avgs : int number of times to rerun the program, averaging results in software (aka "rounds") load_pulses : bool if True, load pulse envelopes start_src: str "internal" (tProc starts immediately) or "external" (each round waits for an external trigger) threshold : float or list of float The threshold(s) to apply to the I values after rotation. Length-normalized units (same units as the output of acquire()). If scalar, the same threshold will be applied to all readout channels. A list must have length equal to the number of declared readout channels. angle : float or list of float The angle to rotate the I/Q values by before applying the threshold. Units of radians. If scalar, the same angle will be applied to all readout channels. A list must have length equal to the number of declared readout channels. progress: bool if true, displays progress bar remove_offset: bool Some readouts (muxed and tProc-configured) introduce a small fixed offset to the I and Q values of every decimated sample. This subtracts that offset, if any, before returning the averaged IQ values or rotating to apply software thresholding. Returns ------- ndarray averaged IQ values (float) divided by the length of the RO window, and averaged over reps and rounds if threshold is defined, the I values will be the fraction of points over threshold dimensions for a simple averaging program: (n_ch, n_reads, 2) dimensions for a program with multiple expts/steps: (n_ch, n_reads, n_expts, 2) """ self.config_all(soc, load_pulses=load_pulses) if any([x is None for x in [self.counter_addr, self.loop_dims, self.avg_level]]): raise RuntimeError("data dimensions need to be defined with setup_acquire() before calling acquire()") # configure tproc for internal/external start soc.start_src(start_src) n_ro = len(self.ro_chs) total_count = functools.reduce(operator.mul, self.loop_dims) self.d_buf = [np.zeros((*self.loop_dims, nreads, 2), dtype=np.int32) for nreads in self.reads_per_shot] self.stats = [] # select which tqdm progress bar to show hiderounds = True hidereps = True if progress: if soft_avgs>1: hiderounds = False else: hidereps = False # avg_d doesn't have a specific shape here, so that it's easier for child programs to write custom _average_buf avg_d = None for ir in tqdm(range(soft_avgs), disable=hiderounds): # Configure and enable buffer capture. self.config_bufs(soc, enable_avg=True, enable_buf=False) count = 0 with tqdm(total=total_count, disable=hidereps) as pbar: soc.start_readout(total_count, counter_addr=self.counter_addr, ch_list=list(self.ro_chs), reads_per_shot=self.reads_per_shot) while count<total_count: new_data = obtain(soc.poll_data()) for new_points, (d, s) in new_data: for ii, nreads in enumerate(self.reads_per_shot): # use reshape to view the d_buf array in a shape that matches the raw data self.d_buf[ii].reshape((-1,2))[count*nreads:(count+new_points)*nreads] = d[ii] count += new_points self.stats.append(s) pbar.update(new_points) # if we're thresholding, apply the threshold before averaging if threshold is None: d_reps = self.d_buf round_d = self._average_buf(d_reps, self.reads_per_shot, length_norm=True, remove_offset=remove_offset) else: d_reps = [np.zeros_like(d) for d in self.d_buf] self.shots = self._apply_threshold(self.d_buf, threshold, angle, remove_offset=remove_offset) for i, ch_shot in enumerate(self.shots): d_reps[i][...,0] = ch_shot round_d = self._average_buf(d_reps, self.reads_per_shot, length_norm=False) # sum over rounds axis if avg_d is None: avg_d = round_d else: for ii, d in enumerate(round_d): avg_d[ii] += d # divide total by rounds for d in avg_d: d /= soft_avgs return avg_d
def _average_buf(self, d_reps: np.ndarray, reads_per_shot: list, length_norm: bool=True, remove_offset: bool=True) -> np.ndarray: """ calculate averaged data in a data acquire round. This function should be overwritten in the child qick program if the data is created in a different shape. :param d_reps: buffer data acquired in a round :param reads_per_shot: readouts per experiment :param length_norm: normalize by readout window length (disable for thresholded values) :param remove_offset: if normalizing by length, also subtract the readout's IQ offset if any :return: averaged iq data after each round. """ avg_d = [] for i_ch, (ro_ch, ro) in enumerate(self.ro_chs.items()): # average over the avg_level avg = d_reps[i_ch].sum(axis=self.avg_level) / self.loop_dims[self.avg_level] if length_norm: avg /= ro['length'] if remove_offset: offset = self.soccfg['readouts'][ro_ch]['iq_offset'] avg -= offset # the reads_per_shot axis should be the first one avg_d.append(np.moveaxis(avg, -2, 0)) return avg_d def _apply_threshold(self, d_buf, threshold, angle, remove_offset): """ This method converts the raw I/Q data to single shots according to the threshold and rotation angle Parameters ---------- d_buf : list of ndarray Raw IQ data threshold : float or list of float The threshold(s) to apply to the I values after rotation. Length-normalized units (same units as the output of acquire()). If scalar, the same threshold will be applied to all readout channels. A list must have length equal to the number of declared readout channels. angle : float or list of float The angle to rotate the I/Q values by before applying the threshold. Units of radians. If scalar, the same angle will be applied to all readout channels. A list must have length equal to the number of declared readout channels. remove_offset: bool Subtract the readout's IQ offset, if any. Returns ------- list of ndarray Single shot data """ # try to convert threshold to list of floats; if that fails, assume it's already a list try: thresholds = [float(threshold)]*len(self.ro_chs) except TypeError: thresholds = threshold # angle is 0 if not specified if angle is None: angle = 0.0 try: angles = [float(angle)]*len(self.ro_chs) except TypeError: angles = angle shots = [] for i_ch, (ro_ch, ro) in enumerate(self.ro_chs.items()): avg = d_buf[i_ch]/ro['length'] if remove_offset: offset = self.soccfg['readouts'][ro_ch]['iq_offset'] avg -= offset rotated = np.inner(avg, [np.cos(angles[i_ch]), np.sin(angles[i_ch])]) shots.append(np.heaviside(rotated - thresholds[i_ch], 0)) return shots
[docs] def get_time_axis(self, ro_index): """Get an array usable as the time axis for plotting decimated data. Parameters ---------- ro_index : int Index of the readout channel in this program. The first readout declared in your program has index 0 and it will have index 0 in the output array, etc. Returns ------- ndarray of float An array starting at 0 and spaced by the time (in us) per decimated sample. """ ch, ro = list(self.ro_chs.items())[ro_index] return self.soccfg.cycles2us(ro_ch=ch, cycles=np.arange(ro['length']))
[docs] def get_time_axis_ddr4(self, ro_ch, data): """Get an array usable as the time axis for plotting DDR4 data. Parameters ---------- ro_ch : int readout channel (index in 'readouts' list) data : ndarray DDR4 data array, the returned array will have the same length. Returns ------- ndarray of float An array starting at 0 and spaced by the time (in us) per decimated sample. """ return self.soccfg.cycles2us(ro_ch=ro_ch, cycles=np.arange(data.shape[0]))
[docs] def get_time_axis_mr(self, ro_ch, data): """Get an array usable as the time axis for plotting MR data. Parameters ---------- ro_ch : int readout channel (index in 'readouts' list) data : ndarray MR data array, the returned array will have the same length. Returns ------- ndarray of float An array starting at 0 and spaced by the time (in us) per MR sample. """ return np.arange(data.shape[0])/self.soccfg['readouts'][ro_ch]['fs']
[docs] def run_rounds(self, soc, rounds=1, load_pulses=True, start_src="internal", progress=True): """Run the program and wait until it completes, once or multiple times. No data will be saved. Parameters ---------- soc : QickSoc Qick object rounds : int number of times to rerun the program load_pulses : bool if True, load pulse envelopes start_src: str "internal" (tProc starts immediately) or "external" (each round waits for an external trigger) progress: bool if true, displays progress bar """ self.config_all(soc, load_pulses=load_pulses) if any([x is None for x in [self.counter_addr, self.loop_dims]]): raise RuntimeError("data dimensions need to be defined with setup_acquire() before calling run_rounds()") # configure tproc for internal/external start soc.start_src(start_src) total_count = functools.reduce(operator.mul, self.loop_dims) # select which tqdm progress bar to show hiderounds = True hidereps = True if progress: if rounds>1: hiderounds = False else: hidereps = False # run each round for ii in tqdm(range(rounds), disable=hiderounds): # make sure count variable is reset to 0 soc.set_tproc_counter(addr=self.counter_addr, val=0) # run the assembly program # if start_src="external", you must pulse the trigger input once for every round soc.start_tproc() count = 0 with tqdm(total=total_count, disable=hidereps) as pbar: while count < total_count: newcount = soc.get_tproc_counter(addr=self.counter_addr) pbar.update(newcount-count) count = newcount
[docs] def acquire_decimated(self, soc, soft_avgs, load_pulses=True, start_src="internal", progress=True, remove_offset=True): """Acquire data using the decimating readout. Parameters ---------- soc : QickSoc Qick object soft_avgs : int number of times to rerun the program, averaging results in software (aka "rounds") load_pulses : bool if True, load pulse envelopes start_src: str "internal" (tProc starts immediately) or "external" (each round waits for an external trigger) progress: bool if true, displays progress bar remove_offset: bool Subtract the readout's IQ offset, if any. Returns ------- list of ndarray decimated values, averaged over rounds (float) dimensions for a single-rep, single-read program : (length, 2) multi-rep or multi-read: (n_reps*n_reads, length, 2) multi-rep and multi-read: (n_reps, n_reads, length, 2) """ self.config_all(soc, load_pulses=load_pulses) if any([x is None for x in [self.counter_addr, self.loop_dims, self.avg_level]]): raise RuntimeError("data dimensions need to be defined with setup_acquire() before calling acquire_decimated()") # configure tproc for internal/external start soc.start_src(start_src) total_count = functools.reduce(operator.mul, self.loop_dims) # Initialize data buffers # buffer for decimated data dec_buf = [] for ch, ro in self.ro_chs.items(): maxlen = self.soccfg['readouts'][ch]['buf_maxlen'] if ro['length']*ro['trigs']*total_count > maxlen: raise RuntimeError("Warning: requested readout length (%d x %d trigs x %d reps) exceeds buffer size (%d)"%(ro['length'], ro['trigs'], total_count, maxlen)) dec_buf.append(np.zeros((ro['length']*total_count*ro['trigs'], 2), dtype=float)) # for each soft average, run and acquire decimated data for ii in tqdm(range(soft_avgs), disable=not progress): # buffer for accumulated data (for convenience/debug) self.d_buf = [] # Configure and enable buffer capture. self.config_bufs(soc, enable_avg=True, enable_buf=True) # make sure count variable is reset to 0 soc.set_tproc_counter(addr=self.counter_addr, val=0) # run the assembly program # if start_src="external", you must pulse the trigger input once for every round soc.start_tproc() count = 0 while count < total_count: count = soc.get_tproc_counter(addr=self.counter_addr) for ii, (ch, ro) in enumerate(self.ro_chs.items()): dec_buf[ii] += obtain(soc.get_decimated(ch=ch, address=0, length=ro['length']*ro['trigs']*total_count)) self.d_buf.append(obtain(soc.get_accumulated(ch=ch, address=0, length=ro['trigs']*total_count).reshape((*self.loop_dims, ro['trigs'], 2)))) onetrig = all([ro['trigs']==1 for ro in self.ro_chs.values()]) # average the decimated data result = [] for ii, (ch, ro) in enumerate(self.ro_chs.items()): d_avg = dec_buf[ii]/soft_avgs if remove_offset: offset = self.soccfg['readouts'][ch]['iq_offset'] d_avg -= offset if total_count == 1 and onetrig: # simple case: data is 1D (one rep and one shot), just average over rounds result.append(d_avg) else: # split the data into the individual reps if onetrig or total_count==1: d_reshaped = d_avg.reshape(total_count*ro['trigs'], -1, 2) else: d_reshaped = d_avg.reshape(total_count, ro['trigs'], -1, 2) result.append(d_reshaped) return result