Hi, I want to make sure I’m not shooting myself in the foot before I get too far into my current project with wxPython. The code listed below works and meets my needs but I wanted to make sure I wasn’t just getting lucky each time.
import random
import threading
import time
import wx
from concurrent.futures import ThreadPoolExecutor
class MyForm(wx.Frame):
def __init__(self):
wx.Frame.__init__(self, None, wx.ID_ANY, title='Thread Testing')
self.SetInitialSize(wx.Size(600, 400))
self.threads_created = 0
self.Bind(wx.EVT_CLOSE, self.on_exit)
self.executor = ThreadPoolExecutor(max_workers=3)
self.panel = wx.Panel(self, wx.ID_ANY)
self.results_sizer = wx.BoxSizer(wx.VERTICAL)
self.sizer = wx.BoxSizer(wx.VERTICAL)
btn_sizer = wx.BoxSizer(wx.HORIZONTAL)
run_btn = wx.Button(self.panel, wx.ID_ANY, 'Add Thread')
self.Bind(wx.EVT_BUTTON, self.run, run_btn)
btn_sizer.Add(run_btn)
print_btn = wx.Button(self.panel, wx.ID_ANY, 'Print')
self.Bind(wx.EVT_BUTTON, self.print_to_terminal, print_btn)
btn_sizer.Add(print_btn)
self.sizer.Add(btn_sizer)
self.sizer.Add(self.results_sizer, 0, wx.EXPAND)
self.panel.SetSizer(self.sizer)
self.exit = False
def run(self, evt):
print('Run btn pressed.')
# Occurs in the main thread so no need for locks
self.threads_created += 1
# This could be a very long running task
t = threading.Thread(
target=self.create_and_initiate_gauge, args=(self.threads_created,)
)
t.start()
def create_and_initiate_gauge(self, thread_num):
# Doesn't modify main GUI; no need for CallAfter
sizer = wx.BoxSizer(wx.HORIZONTAL)
# Add Spacer
wx.CallAfter(sizer.AddSpacer, 15)
# Two-step Create Gauge
gauge = wx.Gauge()
wx.CallAfter(gauge.Create, self.panel)
wx.CallAfter(sizer.Add, gauge, 1, wx.EXPAND)
# Add Spacer
wx.CallAfter(sizer.AddSpacer, 35)
# Two-step create StaticText
text = wx.StaticText()
wx.CallAfter(text.Create, self.panel)
wx.CallAfter(text.SetLabel, f'Job: {thread_num}')
wx.CallAfter(sizer.Add, text, 0, wx.EXPAND)
# Add Spacer
wx.CallAfter(sizer.AddSpacer, 15)
# Add thread created Gauge and StaticText to main GUI
wx.CallAfter(self.results_sizer.Add, sizer, 0, wx.EXPAND)
# Update Layout
wx.CallAfter(self.sizer.Layout)
# Demonstrate widget passing between threads
self.executor.submit(self.update_gauge, gauge, text)
def update_gauge(self, gauge, text):
sleep = random.randint(1, 10) / 10
rate = random.randint(10, 100) / 10
completed = 0
while completed < 100:
if self.exit:
return
time.sleep(sleep)
wx.CallAfter(gauge.SetValue, int(completed))
completed += rate
if gauge.GetValue() != 100:
wx.CallAfter(gauge.SetValue, 100)
wx.CallAfter(text.SetLabel, 'COMPLETED')
wx.CallAfter(self.results_sizer.Layout)
def print_to_terminal(self, evt):
# Demonstrates the main GUI is not blocked
print('Print button pressed.')
def on_exit(self, evt):
# Stop and cleanup all executor tasks on Exit
self.exit = True
self.executor.shutdown(wait=False, cancel_futures=True)
self.Destroy()
if __name__ == '__main__':
app = wx.App()
frame = MyForm().Show()
app.MainLoop()```
Thanks for any help or advice.