# Well, if the customers don't know what they want, the magic is all yours.
# It may not all be fun, but comments are always welcome.
# (As a highlight: the TPE is managing thread reuse.)
import threading as thd
import random
import wx
class Gui(wx.Frame):
    """Main window that spawns progress-bar jobs and guards against early exit.

    Every click on 'Add Thread' appends a gauge/label row and starts a ``Job``
    thread for it. ``ref_cnt`` counts the jobs that have not yet reported
    completion through ``layout_result``; while it is non-zero a close request
    only shows a temporary warning in the status bar.
    """

    def __init__(self):
        """Build the widgets, bind the handlers and show the frame."""
        super().__init__(None, wx.ID_ANY, size=(600, 400),
                         title='Thread Testing')
        self.sb = self.CreateStatusBar(style=wx.SB_FLAT)
        # Pending wx.CallLater that clears the status-bar warning, else None.
        self.sb_timer = None
        self.pnl = wx.Panel(self)

        vbox = wx.BoxSizer(wx.VERTICAL)
        hbox = wx.BoxSizer(wx.HORIZONTAL)
        btn = wx.Button(self.pnl, wx.ID_ANY, 'Add Thread')
        btn.Bind(wx.EVT_BUTTON, self.add_gauge)
        hbox.Add(btn)
        btn = wx.Button(self.pnl, wx.ID_ANY, 'Thread Info')
        btn.Bind(wx.EVT_BUTTON, self.thread_info)
        hbox.Add(btn)
        vbox.Add(hbox)

        # One horizontal row (gauge + label) is appended here per job.
        self.results_vbox = wx.BoxSizer(wx.VERTICAL)
        vbox.Add(self.results_vbox, 0, wx.EXPAND)
        self.pnl.SetSizer(vbox)

        self.Bind(wx.EVT_CLOSE, self.evt_close)
        # Shared with every Job; set once the frame is really closing.
        self.exit = thd.Event()
        # thread_num: running job label; ref_cnt: jobs not yet completed.
        self.thread_num, self.ref_cnt = 0, 0
        self.Show()

    def add_gauge(self, _):
        """Add a gauge/label row to the panel and start a Job driving it."""
        hbox = wx.BoxSizer(wx.HORIZONTAL)
        gauge = wx.Gauge(self.pnl)
        hbox.Add(gauge, 1, wx.EXPAND | wx.RIGHT, 35)
        text = wx.StaticText(self.pnl)
        self.thread_num += 1
        text.SetLabel(f'Job: {self.thread_num}')
        hbox.Add(text, 0, wx.EXPAND | wx.RIGHT, 15)
        self.results_vbox.Add(hbox, 0, wx.EXPAND | wx.LEFT, 15)
        self.pnl.Layout()
        self.ref_cnt += 1
        Job(self.exit, self.layout_result, gauge, text).start()

    def layout_result(self):
        """Re-layout the result rows and mark one job as finished."""
        self.results_vbox.Layout()
        self.ref_cnt -= 1

    def thread_info(self, _):
        """Print every thread currently known to the threading module."""
        for entry in thd.enumerate():
            print(entry)

    def evt_close(self, _):
        """Close the frame, or warn in the status bar while jobs still run."""
        if self.ref_cnt:
            if self.sb_timer is None:
                self.sb.SetBackgroundColour('red')
                self.sb.SetStatusText('jobs are still running..')
                # wx.CallLater invokes the callback on the GUI thread; a
                # threading.Timer would touch the status bar from a worker
                # thread, which wx does not support.
                self.sb_timer = wx.CallLater(1500, self.auto_reset_sb)
        else:
            self.exit.set()
            if self.sb_timer is not None:
                # Make sure the reset cannot fire on a destroyed status bar.
                self.sb_timer.Stop()
                self.sb_timer = None
            self.Destroy()

    def auto_reset_sb(self):
        """Clear the 'jobs are still running' warning from the status bar."""
        # wx.NullColour restores the default background colour.
        self.sb.SetBackgroundColour(wx.NullColour)
        self.sb.SetStatusText('')
        self.sb_timer = None
class Job(thd.Thread):
    """Worker thread that advances one gauge from 0 to 100 at a random pace.

    The thread itself never touches a widget: every gauge/label access is
    handed to the GUI thread through ``wx.CallAfter``, because wx widgets may
    only be used from the thread that runs the main loop.
    """

    def __init__(self, evt, lot_res, gauge, text):
        """Store the collaborators used by ``run``.

        Args:
            evt: ``threading.Event``; when set, the job stops early.
            lot_res: callable invoked (on the GUI thread) once the job is done.
            gauge: the ``wx.Gauge`` to advance.
            text: the ``wx.StaticText`` that receives the 'COMPLETED' label.
        """
        super().__init__()
        self.exit = evt
        self.layout_result = lot_res
        self.gauge = gauge
        self.text = text

    def run(self):
        """Step the gauge until it reaches 100 or the exit event is set."""
        sleep = random.randint(1, 10) / 10   # seconds between steps
        rate = random.randint(10, 100) / 10  # gauge units added per step
        completed = 0
        while True:
            # Event.wait() doubles as an interruptible sleep and returns
            # True as soon as the exit event is set.
            if self.exit.wait(sleep):
                break
            if completed < 100:
                wx.CallAfter(self.gauge.SetValue, int(completed))
                completed += rate
            else:
                # Queued after all earlier SetValue calls, so it runs last.
                wx.CallAfter(self._finish)
                break

    def _finish(self):
        """Show the completed state and notify the owner (GUI thread only)."""
        if self.gauge.GetValue() != 100:
            self.gauge.SetValue(100)
        self.text.SetLabel('COMPLETED')
        self.layout_result()
# Script entry: create the wx application object first, then the main frame
# (which shows itself from its constructor), and finally hand control to the
# event loop until the frame is destroyed.
app = wx.App()
main_window = Gui()
app.MainLoop()