wxPython & asyncio

since the AI upgrade 3.11 of asyncio the invocation is straight forward… :sunglasses:
a somewhat more illustrating version: by increasing the number of threads (line 113) the Gui gets more sticky (not so with the 100,000 tasks in one thread) :joy:

import datetime
import threading as thd
import asyncio
import wx

# not using wx.CallAfter runs into trouble at times
# using wx.CallAfter makes the difference less, but still above 10x

async def main(evh, single_thread):

    async def display_date():
        while True:
            now = f'{datetime.datetime.now()}   {thd.get_ident()}'
            evh.disp_async(now)
            # wx.CallAfter(evh.disp_async, now)
            await asyncio.sleep(1)

    async def exception():
        n = 0
        try:
            while True:
                now = f'{datetime.datetime.now()}   {thd.get_ident()} +++++'
                evh.disp_async(now)
                # wx.CallAfter(evh.disp_async, now)
                await asyncio.sleep(1)
                n += 1
                if n > 5: raise Exception
        except Exception:
            evh.disp_status('exception raised +++++')

    async with asyncio.TaskGroup() as tg:
        evh.btn_stop.Enable(True)
        if single_thread:
            for _ in range(100000):
                task = tg.create_task(display_date())
                evh.reg_task((tg, task))
        else:
            task = tg.create_task(display_date())
            evh.reg_task((tg,task))
            task = tg.create_task(exception())
            evh.reg_task((tg,task))
    evh.btn_stop.Enable(False)
    evh.btn_single_thread.Enable(True)
    evh.btn_start.Enable(True)

class Gui(wx.Frame):

    def __init__(self, parent):
        super().__init__(parent, title='wx & asyncio')

        self.sb = self.CreateStatusBar(style=wx.SB_FLAT)
        pnl = wx.Panel(self)
        vbox = wx.BoxSizer(wx.VERTICAL)
        hbox = wx.BoxSizer(wx.HORIZONTAL)
        self.btn_single_thread = wx.Button(pnl, id=1,
                                            label='one thread for each task')
        hbox.Add(self.btn_single_thread, 0, wx.LEFT|wx.TOP, 10)
        vbox.Add(hbox)
        hbox = wx.BoxSizer(wx.HORIZONTAL)
        self.btn_start = wx.Button(pnl, id=2, label='Start')
        hbox.Add(self.btn_start, 0, wx.LEFT|wx.TOP, 10)
        self.btn_stop = wx.Button(pnl, id= 3, label='Stop')
        hbox.Add(self.btn_stop, 0, wx.LEFT|wx.TOP, 10)
        hbox.AddSpacer(20)
        hbox.Add(wx.Button(pnl, label='task - thread info'),
                                    0, wx.LEFT|wx.TOP, 10)
        vbox.Add(hbox)
        hbox = wx.BoxSizer(wx.HORIZONTAL)
        self.disp_text =  wx.StaticText(pnl)
        hbox.Add(self.disp_text, 0, wx.LEFT|wx.TOP, 10)
        vbox.Add(hbox)
        pnl.SetSizer(vbox)
        self.Bind(wx.EVT_BUTTON, self.button)
        self.Bind(wx.EVT_CLOSE, self.on_quit)

        self.Centre()
        self.Show()
        self.btn_stop.Enable(False)

        self.tasks = []
        self.thd_t_lock = thd.Lock()
        self.single_thread = False
        self.thd_async = None

    def on_quit(self, _):
        for entry in self.tasks:
            if entry[1].done() or entry[1].cancelled():
                continue
            entry[1].cancel()
        if self.thd_async and self.thd_async.is_alive():
            self.disp_status('some tasks still running.. try again')
        else:
            self.Destroy()

    def button(self, evt):
        self.disp_status('')
        if evt.Id == 1:
            btn = evt.GetEventObject()
            if self.single_thread:
                btn.SetLabel('one thread for each task')
                self.single_thread = False
            else:
                btn.SetLabel('one thread for all tasks')
                self.single_thread = True
        elif evt.Id == 2:
            if not (self.thd_async and self.thd_async.is_alive()):
                self.btn_start.Enable(False)
                self.btn_single_thread.Enable(False)
                self.tasks.clear()
                if self.single_thread:
                    self.thd_async = RunAsync(self, self.single_thread)
                else:
                    for _ in range(500):
                        self.thd_async = RunAsync(self, self.single_thread)
        elif evt.Id == 3:
            for entry in self.tasks:
                entry[1].cancel()
            self.disp_async('')
        else:
            n = 0
            for entry in self.tasks:
                if not entry[1].done():
                    n += 1
            self.disp_status(
                f'tasks not done {n:,} - threads {thd.active_count()}')

    def reg_task(self, task):
        with self.thd_t_lock:
            self.tasks.append(task)

    def disp_async(self, txt):
        self.disp_text.SetLabel(txt)

    def disp_status(self, status):
        self.sb.SetStatusText(status)

class RunAsync(thd.Thread):

    def __init__(self, evh, single_thread):
        # super().__init__(daemon=True)
        super().__init__()
        self.evh = evh
        self.single_thread = single_thread
        self.start()

    def run(self):
        asyncio.run(main(self.evh, self.single_thread))

app = wx.App()
Gui(None)
app.MainLoop()