Friday, September 5, 2014


Getting PyQt5 Coroutines Without Dragging In Another Event Loop

Suppose you are in the middle of implementing a big project in Python with PyQt5.  You find yourself needing to perform consecutive asynchronous operations without blocking, and you want to avoid writing a state machine to pick up where you left off after each asynchronous operation completes.

The most sensible representation of sequential asynchronous operations is no different than the most sensible representation of sequential synchronous operations: sequential calls within a function.  This is accomplished by using coroutines.

Ok, Python's new async library (Tulip) has coroutines.  Not ok: Tulip's coroutines are dispatched by its own event loop.

Ok, gevent has coroutines.  Not ok: gevent coroutines are dispatched by gevent's event loop.

Ok, this that and the other big fat framework has coroutines.  Not ok: the same sad story of requiring an event loop that is not Qt's event loop!

You could write a Qt event dispatcher in C++ enabling Qt to sit on someone else's event loop - for example, the event loop your async library obstinately insists upon having.  This is a huge pain.  Qt's event loop is perfectly fine.  Another event loop is neither wanted nor needed.  A whole raft of AsyncServer and Protocol classes and Sockets and Streams are neither wanted nor needed.  Again, we're happy with Qt's event loop.  We're happy with Qt's sockets.  We're smitten with QSerialPort - it can signal me when data arrives on the port, which pyserial certainly can not.  Can all these other event loops and networking classes please get lost and let me do my own coroutine dispatch upon Qt signal receipt?

In fact, yes.  Here's how to do it: use greenlets and dispatch them from your Qt slots.  It works everywhere (even in IPython after doing %gui qt) it's easy to understand, it's easy to explain, it doesn't bring in another godforsaken event loop, and it takes only a few lines of code.

Here's a worthlessly simple example from an interactive IPython command line session:

In [1]: import greenlet

In [2]: from PyQt5 import Qt

In [3]: %gui qt

In [4]: button = Qt.QPushButton('resume')

In [5]:

In [6]: def f0():
   ...:     print('kicking off an async operation.  The async operation is waiting-for-you-to-press-button.')
   ...:     greenlet.getcurrent().parent.switch()
   ...:     print('ok, you have pressed the button.  Press it again.')
   ...:     greenlet.getcurrent().parent.switch()
   ...:     print('and you pressed it again.  I think we are done here.')

In [7]: g0 = greenlet.greenlet(f0)

In [8]: def onPress():
   ...:     g0.switch()

In [9]: button.clicked.connect(onPress)

In [10]: g0.switch()
kicking off an async operation.  The async operation is waiting-for-you-to-press-button.
Out[10]: ()

In [11]: ok, you have pressed the button.  Press it again.
and you pressed it again.  I think we are done here.

In [11]: g0 = greenlet.greenlet(f0)

In [12]: g0.switch()
kicking off an async operation.  The async operation is waiting-for-you-to-press-button.
Out[12]: ()

In [13]: 1+1
Out[13]: 2

In [14]: 2+2
Out[14]: 4

In [15]: print('hi mom')
hi mom

In [16]: ok, you have pressed the button.  Press it again.

In [16]: 3+3
Out[16]: 6

In [17]: print('hi dad')
hi dad

In [18]: and you pressed it again.  I think we are done here.

You may ask, "what is greenlet.getcurrent().parent.switch()doing?"  It switches back to the implicit always extant greenlet that, thanks to the %gui qt command, is running the Qt event loop and associated input hook and whatnot.  It doesn't matter what it's doing, really.  Only that we switched back to it matters.

Because there are only two kinds of example in this world, here's one of the second kind: a worthlessly over-complicated example.  This one is meant to execute as a stand-alone program:

import greenlet
from PyQt5 import Qt
import sys
from acquisition.pedals.pedals import Pedals

class WaitDialog(Qt.QDialog):
    def __init__(self, dialog, pedalIdx, mainGt):
        self._dialog = dialog
        self._pedalIdx = pedalIdx
        self._mainGt = mainGt
        self.setWindowTitle('Green Qt Test')
        self.layout().addWidget(Qt.QLabel('Waiting for pedal {} up...'.format(self._pedalIdx)))
        self._cancelBtn = Qt.QPushButton('cancel')
        self.waitForPedalGt = greenlet.greenlet(self.waitForPedal)

    def waitForPedal(self):
        if self._mainGt.switch():
            Qt.QMessageBox.information(self, 'Green Qt Test', 'Pedal {} up received.'.format(self._pedalIdx))
        self._dialog.pedalWaitDlgs[self._pedalIdx] = None

    def cancel(self):

class Dialog(Qt.QDialog):
    def __init__(self, mainGt):
        self._mainGt = mainGt
        self.setWindowTitle('Green Qt Test')
        layout = Qt.QVBoxLayout()

        self.quitButton = Qt.QPushButton('quit')

        self.pedalWaitBtns = [None, None]
        self.pedalWaitDlgs = [None, None]

        self.pedalWaitBtns[0] = Qt.QPushButton('wait for pedal 0 up')
        self.pedalWaitBtns[0].clicked.connect(lambda: self.pedalWaitBtnClickedSlot(0))

        self.pedalWaitBtns[1] = Qt.QPushButton('wait for pedal 1 up')
        self.pedalWaitBtns[1].clicked.connect(lambda: self.pedalWaitBtnClickedSlot(1))

        self.pedals = Pedals()

    def pedalWaitBtnClickedSlot(self, pedalIdx):
        if self.pedalWaitDlgs[pedalIdx] is None:
            self.pedalWaitDlgs[pedalIdx] = WaitDialog(self, pedalIdx, self._mainGt)

    def pedalUpChangedSlot(self, pedalIdx, isUp):
        if isUp and self.pedalWaitDlgs[pedalIdx] is not None:

if __name__ == '__main__':
    app = Qt.QApplication(sys.argv)

    def eventLoop():

    eventLoopGt = greenlet.greenlet(eventLoop)
    dialog = Dialog(eventLoopGt)

Another worthlessly over-complicated example, this one meant to be run from within any context, including IPython interactive session:

import greenlet
import numpy
from PyQt5 import Qt
import sys
import time

def _brenner(im, direction):
    if direction == 'h':
        xo = 2
        yo = 0
    elif direction == 'v':
        xo = 0
        yo = 2
        raise ValueError('direction must be h or v.')
    iml = numpy.pad(im[0:im.shape[0]-yo, 0:im.shape[1]-xo], ((yo, 0), (xo, 0)), mode='constant')
    imr = im.copy()
    if direction == 'h':
        imr[:, :xo] = 0
        imr[:yo, :] = 0
    return iml - imr

def brennerFocusMeasure(im):
    imh = _brenner(im, 'h')
    imv = _brenner(im, 'v')
    return numpy.sqrt(imh**2 + imv**2)

class LinearSearchAutofocuser:
    def __init__(self, camera, zDrive, minZ, maxZ, stepsPerRound, numberOfRounds, focusMeasure=brennerFocusMeasure, rw=None):
        self._running = False
        self._camera = camera
        self._zDrive = zDrive
        self._zRange = (minZ, maxZ)
        self._stepsPerRound = stepsPerRound
        self._numberOfRounds = numberOfRounds
        self._focusMeasure = focusMeasure
        self._rw = rw
        self.idleGt = greenlet.getcurrent()
        self.runGt = greenlet.greenlet(self._run)
        self.bestZ = None

    def abort(self):
        if self._running:

    def _run(self):
        t0 = time.time()
        self.bestZ = None
        self._running = True

        curZRange = self._zRange
        buffers = [self._camera.makeAcquisitionBuffer() for i in range(self._stepsPerRound)]
        for roundIdx in range(0, self._numberOfRounds):
            print('Starting autofocus round {}.'.format(roundIdx))
            fmvs = []
            stepIdxsDone = []
            if roundIdx == 0:
                # The first round computes focus measures for every step in the Z range, including endpoints
                steps = numpy.linspace(*curZRange, num=self._stepsPerRound)
                # Every subsequent round's Z range is the interval between the previous round's Z steps bracketing the Z step
                # with the highest focus measure value:
                # ----- Previous Z step above best previous Z
                #   | --- Current Z step
                #   | --- "
                #   | --- "
                # ----- Previous Z step with best focus measure value
                #   | --- Current Z step
                #   | --- "
                #   | --- "
                # ----- Previous Z step below best previous Z
                # So, if a subsequent round's step sequence is computed in the same manner as the first round's step sequence,
                # it will include the previous round's bracketing Z step positions as endpoints, repeating the focus measure
                # computation for those positions, increasing the computational expense of the linear search.  This is most clearly
                # illustrated by the case where stepsPerRound is 3 and first round best Z step is the middle position: in this case,
                # the linear search will repeat the same calculations at the same Z step positions for an arbitrary number of rounds,
                # failing to refine the best Z step position.
                # This is avoided while maintaining uniform step size by treating subsequent Z ranges as an open interval bounded
                # by bracketing Z step positions.  If stepsPerRound is odd, the best previous Z step position focus measure is
                # still recomputed, but this is considered acceptable.
                steps = numpy.linspace(*curZRange, num=self._stepsPerRound+2)[1:-1]
            self._camera.shutter = self._camera.Shutter.Rolling
            self._camera.triggerMode = self._camera.TriggerMode.Software
            self._camera.cycleMode = self._camera.CycleMode.Fixed
            self._camera.frameCount = self._stepsPerRound

            for buffer in buffers:

            for stepIdx, z in enumerate(steps):
                self._zDrive.pos = z
                # Return to main event loop (or whatever was the enclosing greenlet when this class was instantiated) until
                # the stage stops moving, aborting and cleaning up if resumed with switch(False)
                if not self.idleGt.switch():
                    print('Autofocus aborted.')
                    self._running = False
                    raise greenlet.GreenletExit()
                # Resuming after stage has moved
                actualZ = self._zDrive.pos
                if abs(actualZ - z) > self._zDrive._factor - sys.float_info.epsilon:
                    w = 'Current Z position ({0}) does not match requested Z position ({1}).  '
                    w+= 'The autofocus step for {1} is being skipped.  This can occur if the requested Z position '
                    w+= 'is out of range or if the scope\'s Z position controller has been moved during the '
                    w+= 'auto-focus operation.'
                    print(w.format(actualZ, z))
                # Stage Z position is actually what we requested.  Command exposure.
                print('exposed ', stepIdx)

            # Retrieve, show, and process resulting exposures
            for bufferIdx, stepIdx in enumerate(stepIdxsDone):
                print('got buffer', bufferIdx)
                buffer = buffers[bufferIdx]
                if self._rw is not None:
                im = buffer.astype(numpy.float32) / 65535
                fmv = (self._focusMeasure(im)**2).sum()
                print('round={:02}, z={:<10}, focus_measure={}'.format(roundIdx, steps[stepIdx], fmv))
                fmvs.append((steps[stepIdx], fmv))


            if len(fmvs) == 0:
                print('Failed to move to any of the Z step positions.')
                self._running = False
                raise greenlet.GreenletExit()
            if len(fmvs) == 1:
                print('Successfully moved to only one of the Z step positions, making that position the best found.')
                self.bestZ = fmvs[0][0]
                self._running = False
                raise greenlet.GreenletExit()

            bestZIdx = numpy.array([fmv[1] for fmv in fmvs], dtype=numpy.float64).argmax()

            if roundIdx + 1 < self._numberOfRounds:
                # Next round, search the range between the steps adjacent to the best step
                curZRange = ( steps[max(bestZIdx - 1, 0)],
                              steps[min(bestZIdx + 1, len(fmvs) - 1)] )
                # There is no next round.  Store result.
                self.bestZ = steps[bestZIdx]

        print('Autofocus completed ({}s).'.format(time.time() - t0))
        self._running = False

    def _zDrivePosChanged(self, _):
        if self._running and not self._zDrive.moving:

def linearSearchAutofocus(camera, zDrive, minZ, maxZ, stepsPerRound, numberOfRounds, focusMeasure=brennerFocusMeasure, rw=None):
    autofocuser = _Autofocuser(camera, zDrive, minZ, maxZ, stepsPerRound, numberOfRounds, rw)
    # blocks until the greenlet is dead and is the correct call to use when blocking behavior is desired
    return autofocuser.bestZ


Post a Comment

Subscribe to Post Comments [Atom]

<< Home


July 2009   August 2009   September 2009   October 2009   November 2009   December 2009   January 2010   September 2010   December 2010   January 2011   February 2011   April 2011   June 2011   August 2011   February 2012   June 2012   July 2012   August 2012   October 2012   November 2012   January 2014   April 2014   June 2014   August 2014   September 2014   October 2014   January 2015   March 2015   April 2015   June 2015   November 2015   December 2015   January 2016   June 2016   August 2016   January 2017   March 2017   April 2018  

This page is powered by Blogger. Isn't yours?

Subscribe to Posts [Atom]