Source code for qick.streamer

from threading import Thread, Event
from queue import Queue
import time
import numpy as np
import traceback

# This code originally used Process not Thread.
# Process is much slower to start (Process.start() is ~100 ms, Thread.start() is a few ms)
# The process-safe versions of Queue and Event are also significantly slower.
# On the other hand, CPU-bound Python threads can't run in parallel ("global interpreter lock").
# The overall problem is not CPU-bound - we should always be limited by tProc execution.
# In the worst case where the tProc is running fast, we should actually be waiting for IO a lot (due to the DMA).
# So we think it's safe to use threads.
# However, this is a complicated problem and we may ultimately need to mess around with sys.setswitchinterval() or go back to Process.
# To use Process instead of Thread, use the following import and change WORKERTYPE.
#from multiprocessing import Process, Queue, Event

[docs]class DataStreamer(): """ Uses a separate thread to read data from the average buffers. The class methods define the readout loop and initialization of the worker thread. The QickSoc methods start_readout() and poll_data() are the external interface to the streamer. We don't lock the QickSoc or the IPs. The user is responsible for not disrupting a readout in progress. :param soc: The QickSoc object. :type soc: QickSoc """ #WORKERTYPE = Process WORKERTYPE = Thread def __init__(self, soc): self.soc = soc self.start_worker() def start_worker(self): # Initialize flags and queues. # Passes run commands from the main thread to the worker thread. self.job_queue = Queue() # Passes data from the worker thread to the main thread. self.data_queue = Queue() # Passes exceptions from the worker thread to the main thread. self.error_queue = Queue() # The main thread can use this flag to tell the worker thread to stop. # The main thread clears the flag when starting readout. self.stop_flag = Event() # The worker thread uses this to tell the main thread when it's done. # The main thread clears the flag when starting readout. self.done_flag = Event() self.done_flag.set() # Process object for the streaming readout. # daemon=True means the readout thread will be killed if the parent is killed self.readout_worker = self.WORKERTYPE(target=self._run_readout, daemon=True) self.readout_worker.start()
[docs] def stop_readout(self): """ Signal the readout loop to break. """ self.stop_flag.set()
[docs] def readout_running(self): """ Test if the readout loop is running. :return: readout thread status :rtype: bool """ return not self.done_flag.is_set()
[docs] def data_available(self): """ Test if data is available in the queue. :return: data queue status :rtype: bool """ return not self.data_queue.empty()
def _run_readout(self): """ Worker thread for the streaming readout :param total_count: Number of data points expected :type addr: int :param counter_addr: Data memory address for the loop counter :type counter_addr: int :param ch_list: List of readout channels :type addr: list of int :param reads_per_count: Number of data points to expect per counter increment :type reads_per_count: list of int """ while True: try: # wait for a job total_shots, counter_addr, ch_list, reads_per_count, stride = self.job_queue.get(block=True) #print("streamer loop: start", total_count) shots = 0 last_shots = 0 # how many shots worth of data to transfer at a time if stride is None: stride = int(0.1 * self.soc.get_avg_max_length(0)/max(reads_per_count)) # bigger stride is more efficient, but the transfer size must never exceed AVG_MAX_LENGTH, so the stride should be set with some safety margin # make sure count variable is reset to 0 before starting processor self.soc.set_tproc_counter(addr=counter_addr, val=0) stats = [] t_start = time.time() # if the tproc is configured for internal start, this will start the program # for external start, the program will not start until a start pulse is received self.soc.start_tproc() # Keep streaming data until you get all of it while last_shots < total_shots: if self.stop_flag.is_set(): print("streamer loop: got stop flag") break shots = self.soc.get_tproc_counter(addr=counter_addr) # wait until either you've gotten a full stride of measurements or you've finished (so you don't go crazy trying to download every measurement) if shots >= min(last_shots+stride, total_shots): newshots = shots-last_shots # buffer for each channel d_buf = [None for nreads in reads_per_count] # for each adc channel get the single shot data and add it to the buffer for iCh, ch in enumerate(ch_list): newpoints = newshots*reads_per_count[iCh] if newpoints >= self.soc.get_avg_max_length(ch): raise RuntimeError("Overflowed the averages buffer (%d unread samples >= buffer size %d)." % (newpoints, self.soc.get_avg_max_length(ch)) + "\nYou need to slow down the tProc by increasing relax_delay." + "\nIf the TQDM progress bar is enabled, disabling it may help.") addr = last_shots * reads_per_count[iCh] % self.soc.get_avg_max_length(ch) data = self.soc.get_accumulated(ch=ch, address=addr, length=newpoints) d_buf[iCh] = data last_shots += newshots stats = (time.time()-t_start, shots, addr, newshots) self.data_queue.put((newshots, (d_buf, stats))) #if last_count==total_count: print("streamer loop: normal completion") except Exception as e: print("streamer loop: got exception") # traceback.print_exc() # pass the exception to the main thread self.error_queue.put(e) # put dummy data in the data queue, to trigger a poll_data read self.data_queue.put((0, (None, None))) finally: # we should set the done flag regardless of whether we completed readout, used the stop flag, or errored out self.done_flag.set() # set tproc for internal start so we don't run the program repeatedly (this also clears the internal-start register) self.soc.start_src("internal")