since the AI upgrade 3.11 of asyncio the invocation is straight forward…
a somewhat more illustrating version: by increasing the number of threads (line 113) the Gui gets more sticky (not so with the 100,000 tasks in one thread)
import datetime
import threading as thd
import asyncio
import wx
# not using wx.CallAfter runs into trouble at times
# using wx.CallAfter makes the difference less, but still above 10x
async def main(evh, single_thread):
async def display_date():
while True:
now = f'{datetime.datetime.now()} {thd.get_ident()}'
evh.disp_async(now)
# wx.CallAfter(evh.disp_async, now)
await asyncio.sleep(1)
async def exception():
n = 0
try:
while True:
now = f'{datetime.datetime.now()} {thd.get_ident()} +++++'
evh.disp_async(now)
# wx.CallAfter(evh.disp_async, now)
await asyncio.sleep(1)
n += 1
if n > 5: raise Exception
except Exception:
evh.disp_status('exception raised +++++')
async with asyncio.TaskGroup() as tg:
evh.btn_stop.Enable(True)
if single_thread:
for _ in range(100000):
task = tg.create_task(display_date())
evh.reg_task((tg, task))
else:
task = tg.create_task(display_date())
evh.reg_task((tg,task))
task = tg.create_task(exception())
evh.reg_task((tg,task))
evh.btn_stop.Enable(False)
evh.btn_single_thread.Enable(True)
evh.btn_start.Enable(True)
class Gui(wx.Frame):
def __init__(self, parent):
super().__init__(parent, title='wx & asyncio')
self.sb = self.CreateStatusBar(style=wx.SB_FLAT)
pnl = wx.Panel(self)
vbox = wx.BoxSizer(wx.VERTICAL)
hbox = wx.BoxSizer(wx.HORIZONTAL)
self.btn_single_thread = wx.Button(pnl, id=1,
label='one thread for each task')
hbox.Add(self.btn_single_thread, 0, wx.LEFT|wx.TOP, 10)
vbox.Add(hbox)
hbox = wx.BoxSizer(wx.HORIZONTAL)
self.btn_start = wx.Button(pnl, id=2, label='Start')
hbox.Add(self.btn_start, 0, wx.LEFT|wx.TOP, 10)
self.btn_stop = wx.Button(pnl, id= 3, label='Stop')
hbox.Add(self.btn_stop, 0, wx.LEFT|wx.TOP, 10)
hbox.AddSpacer(20)
hbox.Add(wx.Button(pnl, label='task - thread info'),
0, wx.LEFT|wx.TOP, 10)
vbox.Add(hbox)
hbox = wx.BoxSizer(wx.HORIZONTAL)
self.disp_text = wx.StaticText(pnl)
hbox.Add(self.disp_text, 0, wx.LEFT|wx.TOP, 10)
vbox.Add(hbox)
pnl.SetSizer(vbox)
self.Bind(wx.EVT_BUTTON, self.button)
self.Bind(wx.EVT_CLOSE, self.on_quit)
self.Centre()
self.Show()
self.btn_stop.Enable(False)
self.tasks = []
self.thd_t_lock = thd.Lock()
self.single_thread = False
self.thd_async = None
def on_quit(self, _):
for entry in self.tasks:
if entry[1].done() or entry[1].cancelled():
continue
entry[1].cancel()
if self.thd_async and self.thd_async.is_alive():
self.disp_status('some tasks still running.. try again')
else:
self.Destroy()
def button(self, evt):
self.disp_status('')
if evt.Id == 1:
btn = evt.GetEventObject()
if self.single_thread:
btn.SetLabel('one thread for each task')
self.single_thread = False
else:
btn.SetLabel('one thread for all tasks')
self.single_thread = True
elif evt.Id == 2:
if not (self.thd_async and self.thd_async.is_alive()):
self.btn_start.Enable(False)
self.btn_single_thread.Enable(False)
self.tasks.clear()
if self.single_thread:
self.thd_async = RunAsync(self, self.single_thread)
else:
for _ in range(500):
self.thd_async = RunAsync(self, self.single_thread)
elif evt.Id == 3:
for entry in self.tasks:
entry[1].cancel()
self.disp_async('')
else:
n = 0
for entry in self.tasks:
if not entry[1].done():
n += 1
self.disp_status(
f'tasks not done {n:,} - threads {thd.active_count()}')
def reg_task(self, task):
with self.thd_t_lock:
self.tasks.append(task)
def disp_async(self, txt):
self.disp_text.SetLabel(txt)
def disp_status(self, status):
self.sb.SetStatusText(status)
class RunAsync(thd.Thread):
def __init__(self, evh, single_thread):
# super().__init__(daemon=True)
super().__init__()
self.evh = evh
self.single_thread = single_thread
self.start()
def run(self):
asyncio.run(main(self.evh, self.single_thread))
app = wx.App()
Gui(None)
app.MainLoop()