1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
"""
Threaded Worker Model
=====================

This module implements the threadpool based worker model.
"""
# Module metadata (author name and the markup format used by the docstrings).
__author__ = u"Andr\xe9 Malo"
__docformat__ = "restructuredtext en"
25
26 import collections as _collections
27 import errno as _errno
28 import os as _os
29 import signal as _signal
30 import socket as _socket
31 import sys as _sys
32 import thread as _thread
33 import threading as _threading
34 import time as _time
35 import traceback as _traceback
36
37 from wtf import autoreload as _reload
38 from wtf import impl as _impl
39 from wtf import osutil as _osutil
40 from wtf import app as _app
41 from wtf.opi import worker as _worker
42
43
# NOTE(review): the class header was lost in this copy of the file. The name
# is taken from the ``raise SigTerm()`` / ``except SigTerm`` uses in this
# module; the ``SystemExit`` base class is an assumption - confirm upstream.
class SigTerm(SystemExit):
    """ SIGTERM received """
46
47
# NOTE(review): the ``class`` line and the ``__init__`` signature were lost in
# this copy of the file. They are reconstructed from the names used elsewhere
# in this module (``ThreadedWorker`` in the `WorkerChild` docs; ``config``,
# ``opts`` and ``args`` from the assignment below). The ``object`` base is
# an assumption - confirm upstream.
class ThreadedWorker(object):
    """
    Implement threadpool worker model

    :See: `wtf.opi.worker.WorkerInterface`

    :IVariables:
     - `config`: Configuration, as passed to the constructor
     - `opts`: Options, as passed to the constructor
     - `args`: Arguments, as passed to the constructor
    """
    __implements__ = [_worker.WorkerInterface]
    # presumably tells the caller that this model wants SIGHUP handling;
    # the consumer of this flag is not part of this module - verify there.
    sig_hup = True

    def __init__(self, config, opts, args):
        """
        Initialization

        The three parameters are stored unmodified on the instance.

        :See: `wtf.opi.worker.WorkerInterface`
        """
        self.config, self.opts, self.args = config, opts, args

    def setup(self, sock, prerun, parent_cleanup, child_cleanup):
        """
        Create the worker pool for this model

        All parameters are handed over to `WorkerChild` as they are.

        :See: `wtf.opi.worker.WorkerInterface`

        :return: The new worker pool
        :rtype: `WorkerChild`
        """
        return WorkerChild(self, sock, prerun, parent_cleanup, child_cleanup)
72
73
# NOTE(review): the ``class`` line and the ``run`` / ``shutdown`` signatures
# were lost in this copy of the file. The class name comes from
# ``ThreadedWorker.setup``; the method names follow the "Pool runner" /
# "Pool shutdown" docstrings - confirm against
# `wtf.opi.worker.WorkerPoolInterface`.
class WorkerChild(object):
    """
    Worker pool implementation

    The actual pool runs in a forked child process; this object lives in
    the parent, starts the child and waits for it.

    :IVariables:
     - `_pid`: PID of the currently running child (or ``None``)
     - `_usergroup`: ``(user, group)`` to switch to inside the child
       (or ``None``)
    """
    __implements__ = [_worker.WorkerPoolInterface]
    _pid, _usergroup = None, None

    def __init__(self, model, sock, prerun, parent_cleanup, child_cleanup):
        """
        Initialization

        :Parameters:
         - `model`: The worker model implementation
         - `sock`: Main socket
         - `prerun`: Prerunner (maybe ``None``)
         - `parent_cleanup`: Parent cleanup function (maybe ``None``)
         - `child_cleanup`: Child cleanup function (maybe ``None``)

        :Types:
         - `model`: `ThreadedWorker`
         - `sock`: ``socket.socket``
         - `prerun`: ``callable``
         - `parent_cleanup`: ``callable``
         - `child_cleanup`: ``callable``
        """
        self.model = model
        self.sock = sock
        self.prerun = prerun
        self.parent_cleanup = parent_cleanup
        self.child_cleanup = child_cleanup
        if 'user' in model.config.wtf:
            self._usergroup = model.config.wtf.user, model.config.wtf.group

    def run(self):
        """
        Pool runner

        Forks a child which runs the thread pool, and waits for it in the
        parent. The prerunner and the parent cleanup function are consumed
        by the first call; later calls see ``None`` for both.

        :See: `wtf.opi.worker.WorkerPoolInterface`

        :Exceptions:
         - `wtf.autoreload.ReloadRequested`: The child exited with the
           reload exit code
        """
        oldpid, self._pid = self._pid, None
        prerun, self.prerun = self.prerun, None
        parent_cleanup, self.parent_cleanup = self.parent_cleanup, None
        self._pid = _os.fork()
        if self._pid == 0:
            # Child process. It never returns from here: every path below
            # ends in os._exit().
            try:
                try:
                    if self._usergroup:
                        _osutil.change_identity(*self._usergroup)
                    _signal.signal(_signal.SIGINT, _signal.SIG_IGN)
                    _signal.signal(_signal.SIGHUP, _signal.SIG_IGN)
                    if self.child_cleanup is not None:
                        self.child_cleanup()

                    model = self.model
                    config, opts, args = model.config, model.opts, model.args
                    reload_checker = _reload.Autoreload(config, opts, args)
                    impl = _impl.factory(config, opts, args)
                    app = _app.factory(config, opts, args)

                    try:
                        pool = ThreadPool(
                            self, reload_checker, impl, app.call
                        )
                        if prerun is not None:
                            prerun()
                        pool.run()
                    finally:
                        app.shutdown()
                except SystemExit as e:
                    _os._exit(e.code or 0)
                except:
                    _traceback.print_exc()
                    _os._exit(1)
            finally:
                _os._exit(0)
        else:
            # Parent process
            if parent_cleanup is not None:
                parent_cleanup()
            if oldpid is not None:
                # A child from a previous run is still on record. It may be
                # gone already, so a vanished process is tolerated the same
                # way shutdown() tolerates it.
                self._terminate(oldpid)

            # self._pid stays set while waiting, so shutdown() can still
            # find the child; it is reset only after waitpid returned.
            _, status = _os.waitpid(self._pid, 0)
            self._pid = None
            if _os.WIFEXITED(status):
                if _os.WEXITSTATUS(status) == _reload.ReloadRequested.CODE:
                    raise _reload.ReloadRequested()

    def shutdown(self):
        """
        Pool shutdown

        Sends SIGTERM to the running child (if any) and waits for it.

        :See: `wtf.opi.worker.WorkerPoolInterface`
        """
        oldpid, self._pid = self._pid, None
        if oldpid is not None:
            self._terminate(oldpid)

    @staticmethod
    def _terminate(pid):
        """
        Send SIGTERM to a child process and wait for it

        A process which does not exist anymore (``ESRCH``) is silently
        ignored; in that case nothing is waited for.

        :Parameters:
         - `pid`: PID of the child

        :Types:
         - `pid`: ``int``

        :Exceptions:
         - `OSError`: The signal could not be delivered for any reason
           other than ``ESRCH``
        """
        try:
            _os.kill(pid, _signal.SIGTERM)
        except OSError as e:
            if e.errno != _errno.ESRCH:
                raise
        else:
            _os.waitpid(pid, 0)
176
177
# NOTE(review): the ``class`` line and the ``run`` / ``_force_reload``
# signatures were lost in this copy of the file, and all indentation was
# collapsed. Names are reconstructed from their uses in this module
# (``ThreadPool(...)``, ``pool.run()``, ``self._force_reload(task)``); the
# nesting is reconstructed from the statement order - confirm upstream.
class ThreadPool(object):
    """
    Dynamic threadpool implementation

    :IVariables:
     - `sock`: Main socket
     - `reload_checker`: Reload checker
     - `impl`: WSGI implementation
     - `app`: WSGI application
     - `maxthreads`: Hard limit of number of threads
     - `maxspare`: Maximum number of idle threads
       (remaining ones are killed off)
     - `minspare`: Minimum number of idle threads
       (new threads are started if the threshold is reached)
     - `maxqueue`: Maximum of jobs in the queue (if no thread is available).
       The queue blocks if maxqueue is reached.

    :Types:
     - `sock`: ``socket.socket``
     - `reload_checker`: `wtf.autoreload.Autoreload`
     - `impl`: `wtf.impl.ServerInterface`
     - `app`: ``callable``
     - `maxthreads`: ``int``
     - `maxspare`: ``int``
     - `minspare`: ``int``
     - `maxqueue`: ``int``
    """

    def __init__(self, workerchild, reload_checker, impl, app):
        """
        Initialization

        The limits are read from the configuration and normalized, so that
        ``1 <= minspare <= maxspare <= maxthreads`` and ``maxqueue >= 1``.

        :Parameters:
         - `workerchild`: Worker pool implementation
         - `reload_checker`: Reload checker
         - `impl`: WSGI implementation
         - `app`: WSGI application

        :Types:
         - `workerchild`: `WorkerChild`
         - `reload_checker`: `wtf.autoreload.Autoreload`
         - `impl`: `wtf.impl.ServerInterface`
         - `app`: ``callable``
        """
        config = workerchild.model.config
        self.sock = workerchild.sock
        self.reload_checker, self.impl, self.app = reload_checker, impl, app
        self.maxthreads = max(1, config.wtf('maxthreads', 5))
        self.maxspare = min(
            self.maxthreads, max(1, config.wtf('maxspare', 4))
        )
        self.minspare = min(
            self.maxspare, max(1, config.wtf('minspare', 1))
        )
        self.maxqueue = max(1, config.wtf('maxqueue', 1))

    def run(self):
        """
        Run the pool infinitely

        Accepts connections and feeds them to the job queue until either
        SIGTERM arrives (the method returns then) or the reload checker
        reports changed modules.

        :Exceptions:
         - `wtf.autoreload.ReloadRequested`: The reload checker reported
           changed modules
        """
        def termhandler(*args):
            """ Act on SIGTERM """
            # ignore further SIGTERMs while the pool is winding down
            _signal.signal(_signal.SIGTERM, _signal.SIG_IGN)
            raise SigTerm()

        queue, accept = JobWorkerQueue(self), self.sock.accept
        need_reload = self.reload_checker.check
        try:
            try:
                _signal.signal(_signal.SIGTERM, termhandler)
                queue.startup()
                while True:
                    task = accept()
                    changed = need_reload()
                    if changed:
                        _sys.stderr.write(
                            "Application reload requested by mtime change "
                            "of module(s):\n * %s" % "\n * ".join(changed)
                            + "\n"
                        )
                        queue.shutdown()
                        # the connection accepted just now is handed to a
                        # proxy process instead of a worker thread
                        self._force_reload(task)
                        raise _reload.ReloadRequested()
                    queue.put_task(task)
            except SigTerm:
                pass
        finally:
            queue.shutdown()

    def _force_reload(self, accepted):
        """
        Force the application reload and handle the one accepted socket

        This method forks the process. The parent closes its copy of the
        accepted socket and returns; it is the caller which raises
        `ReloadRequested` afterwards. The forked child never returns: it
        waits until the pipe shared with the parent reports EOF, connects
        to the address the accepted socket was bound to and proxies all
        data between both sockets. It finally ends with ``os._exit(0)``.

        :Parameters:
         - `accepted`: Accepted socket

        :Types:
         - `accepted`: ``tuple``
        """
        osock, _ = accepted
        rfd, wfd = _os.pipe()

        if _os.fork() != 0:
            # Parent. The write end of the pipe is deliberately left open
            # here: the child blocks on the read end until every copy of
            # the write end is closed.
            # presumably that happens when this process exits - verify.
            osock.close()
            return

        # Child
        try:
            try:
                _os.close(wfd)
                _os.read(rfd, 1)  # returns once the write end is closed
                _os.close(rfd)

                # presumably closes every descriptor except the given one;
                # see wtf.osutil - TODO confirm
                _osutil.close_descriptors(osock.fileno())
                sockname = osock.getsockname()
                # The address family is derived from the address shape
                if isinstance(sockname, str):
                    family = _socket.AF_UNIX
                elif len(sockname) == 2:
                    family = _socket.AF_INET
                else:
                    family = _socket.AF_INET6

                sock = _socket.fromfd(
                    osock.fileno(), family, _socket.SOCK_STREAM
                )
                osock.close()
                psock = _socket.socket(family, _socket.SOCK_STREAM)
                psock.settimeout(10)
                psock.connect(sockname)
                psock.settimeout(0)
                sock.settimeout(0)
                _osutil.disable_nagle(psock)

                import select as _select
                # rset/wset: descriptors to watch for reading/writing
                # peers:     descriptor -> descriptor of the other side
                # buf:       descriptor -> chunks read from it, not yet
                #            written to its peer
                # wwait:     descriptors which hit EOF on read; they are put
                #            into wset on every second round
                rset = [sock.fileno(), psock.fileno()]
                wset = set()
                peers = {
                    sock.fileno(): psock.fileno(),
                    psock.fileno(): sock.fileno(),
                }
                socks = {sock.fileno(): sock, psock.fileno(): psock}
                buf = {sock.fileno(): [], psock.fileno(): []}
                wwait = {}
                while rset or wset:
                    # NOTE(review): the toggle is assumed to run for every
                    # entry, not only for flagged ones (indentation lost).
                    for fd, flag in wwait.iteritems():
                        if flag:
                            wset.add(fd)
                        wwait[fd] = (wwait[fd] + 1) % 2
                    rfds, wfds, _ = _select.select(rset, wset, [], 1)
                    for fd in rfds:
                        sock = socks[fd]
                        try:
                            data = sock.recv(8192)
                            if data:
                                buf[fd].append(data)
                                wset.add(peers[fd])
                            else:
                                # EOF on this side
                                rset.remove(fd)
                                wwait[fd] = 1
                                sock.shutdown(_socket.SHUT_RD)
                        except _socket.error as e:
                            if e.args[0] != _errno.EAGAIN:
                                raise

                    for fd in wfds:
                        sock = socks[fd]
                        try:
                            data = ''.join(buf[peers[fd]])
                            if data:
                                numsent = sock.send(data)
                                data = data[numsent:]
                            else:
                                # NOTE(review): list.remove raises
                                # ValueError if the peer was removed from
                                # rset before; looks reachable when this
                                # branch runs twice for one fd - confirm.
                                rset.remove(peers[fd])
                                socks[peers[fd]].shutdown(_socket.SHUT_RD)
                            if data:
                                # keep the unsent remainder
                                buf[peers[fd]] = [data]
                            else:
                                buf[peers[fd]] = []
                                wset.remove(fd)
                                if peers[fd] not in rset:
                                    sock.shutdown(_socket.SHUT_WR)
                        except _socket.error as e:
                            if e.args[0] != _errno.EAGAIN:
                                raise

                    if len(rset) + len(wset) == 1:
                        # only one descriptor left: go on only while there
                        # are still buffered chunks
                        if sum(map(len, buf.itervalues())) > 0:
                            continue
                        break

                for sock in socks.itervalues():
                    sock.close()
            except SystemExit:
                raise
            except:
                _traceback.print_exc()
        finally:
            # the child exits with status 0 in any case
            _os._exit(0)
379
380
# NOTE(review): the ``class`` line and both method signatures were lost in
# this copy of the file. The class name comes from ``Flags()`` in
# `JobWorkerQueue`; the defaults ``shutdown=False`` and ``flag=None`` are
# inferred from that argument-less call and from the docstrings - confirm.
class Flags(object):
    """
    Flag container for workers

    :IVariables:
     - `_shutdown`: Shutdown flag and mutex (``(bool, threading.Lock)``)
    """
    __implements__ = [_impl.FlagsInterface]
    multithread = True
    multiprocess = False
    run_once = False

    def __init__(self, shutdown=False):
        """
        Initialization

        :Parameters:
         - `shutdown`: Initial state of shutdown flag

        :Types:
         - `shutdown`: ``bool``
        """
        self._shutdown = bool(shutdown), _threading.Lock()

    def shutdown(self, flag=None):
        """
        Set and/or retrieve shutdown flag

        :Parameters:
         - `flag`: The new shutdown flag value (or ``None``)

        :Types:
         - `flag`: ``bool``

        :return: The previous state of the flag
        :rtype: ``bool``
        """
        lock = self._shutdown[1]
        lock.acquire()
        try:
            oldflag = self._shutdown[0]
            if flag is not None:
                # the tuple is replaced as a whole; the lock object is kept
                self._shutdown = bool(flag), lock
            return oldflag
        finally:
            lock.release()
428
429
# NOTE(review): the ``class`` line and all method signatures were lost in
# this copy of the file. They are reconstructed from the call sites in this
# module (``JobWorkerQueue(self)``, ``queue.startup()``,
# ``queue.shutdown()``, ``queue.put_task(task)``, ``get_task(self)``,
# ``queue.register(self)``, ``unregister(self)``) - confirm upstream.
class JobWorkerQueue(object):
    """
    Combined management of jobs and workers

    :IVariables:
     - `pool`: thread pool instance
     - `flags`: Flag container handed to the workers
     - `_tasks`: Task queue (new tasks are added left, taken from right)
     - `_runners`: All registered runners
     - `_idle`: Runners, which are currently not working on a task
     - `_lock`: Mutex shared by both conditions
     - `_not_empty`: Condition, notified when a task was queued
     - `_not_full`: Condition, notified when a runner fetched a task
    """

    def __init__(self, pool):
        """
        Initialization

        :Parameters:
         - `pool`: thread pool instance

        :Types:
         - `pool`: `ThreadPool`
        """
        self.pool = pool
        self.flags = Flags()
        self._tasks = _collections.deque()
        self._runners = set()
        self._idle = set()
        self._lock = _threading.Lock()
        self._not_empty = _threading.Condition(self._lock)
        self._not_full = _threading.Condition(self._lock)

    def startup(self):
        """ Start the queue """
        self._not_full.acquire()
        try:
            # every started runner registers itself as idle, which makes
            # this loop terminate
            while len(self._idle) < self.pool.maxspare:
                TaskRunner(self).start()
        finally:
            self._not_full.release()

    def shutdown(self):
        """ Shutdown the queue - finish all threads """
        self.flags.shutdown(True)
        self._not_full.acquire()
        try:
            # one ``None`` per runner (plus one) asks each of them to finish
            self._tasks.extendleft([None] * (len(self._runners) + 1))
            # wake up *all* waiting runners. A plain notify() wakes only
            # one of them and leaves the others blocked forever.
            self._not_empty.notify_all()
        finally:
            self._not_full.release()

    def put_task(self, task):
        """
        Put a new task into the queue

        This function blocks until there's actually space in the queue.

        :Parameters:
         - `task`: The task to put, if ``None``, the receiving runner
           should finish

        :Types:
         - `task`: any
        """
        self._not_full.acquire()
        try:
            # refill the idle runners up to minspare, bounded by maxthreads
            while (len(self._idle) < self.pool.minspare and
                    len(self._runners) < self.pool.maxthreads):
                TaskRunner(self).start()

            # wait for either an idle runner, room for more runners or
            # room in the queue
            while True:
                if self._idle \
                        or len(self._runners) < self.pool.maxthreads \
                        or len(self._tasks) < self.pool.maxqueue:
                    break
                self._not_full.wait()

            self._tasks.appendleft(task)
            self._not_empty.notify()
        finally:
            self._not_full.release()

    def get_task(self, runner):
        """
        Get the next task out of the queue.

        This function blocks until there's actually a task available. While
        waiting the runner counts as idle. If the queue is empty and there
        are more than ``maxspare`` idle runners, ``None`` is returned
        without waiting.

        :Parameters:
         - `runner`: The runner asking for a task

        :Types:
         - `runner`: `TaskRunner`

        :return: The new task, if ``None``, the receiving runner should finish
        :rtype: any
        """
        self._not_empty.acquire()
        try:
            self._idle.add(runner)
            try:
                while not self._tasks:
                    if len(self._idle) > self.pool.maxspare:
                        task = None
                        break
                    self._not_empty.wait()
                else:
                    task = self._tasks.pop()
            finally:
                self._idle.remove(runner)
            self._not_full.notify()
            return task
        finally:
            self._not_empty.release()

    def register(self, runner):
        """
        Add a runner to the set

        The runner counts as idle from now on.

        :Parameters:
         - `runner`: The runner to add

        :Types:
         - `runner`: `TaskRunner`
        """
        self._runners.add(runner)
        self._idle.add(runner)

    def unregister(self, runner):
        """
        Remove a runner from the list

        Unknown runners are silently ignored. No lock is acquired here,
        because `TaskRunner.start` calls this method while `startup` or
        `put_task` already hold it.

        :Parameters:
         - `runner`: The runner to remove

        :Types:
         - `runner`: `TaskRunner`
        """
        self._runners.discard(runner)
        # register() marks the runner as idle. If its thread never came up,
        # nothing else would remove that marker again.
        self._idle.discard(runner)
555
556
# NOTE(review): the ``class`` line and both method signatures were lost in
# this copy of the file. They are reconstructed from the call
# ``TaskRunner(self).start()`` in `JobWorkerQueue` - confirm upstream.
class TaskRunner(object):
    """
    Run tasks until requested to be finished

    :IVariables:
     - `_queue`: Queue instance
    """

    def __init__(self, queue):
        """
        Initialization

        :Parameters:
         - `queue`: Queue instance

        :Types:
         - `queue`: `JobWorkerQueue`
        """
        self._queue = queue

    def start(self):
        """
        Start the worker thread

        The runner registers itself at the queue and starts a new thread,
        which fetches tasks from the queue and passes them to the WSGI
        implementation. The thread ends if it gets ``None`` as task or if
        handling a task raised an exception. The runner is unregistered
        again when the thread ends or if it could not be started at all.
        """
        queue = self._queue
        get_task, flags, unregister = \
            queue.get_task, queue.flags, queue.unregister
        handle, app = queue.pool.impl.handle, queue.pool.app

        def work():
            """ Wait for tasks and run them """
            try:
                while True:
                    task = get_task(self)
                    if task is None:
                        break
                    try:
                        handle(task, app, flags)
                    except:
                        # last line of defense in this thread: report the
                        # problem and let the thread end
                        _sys.stderr.write(
                            "Uncaught exception in worker thread:\n" +
                            _traceback.format_exc()
                        )
                        break
            finally:
                unregister(self)

        queue.register(self)
        try:
            _thread.start_new_thread(work, ())
        except:
            unregister(self)
            raise
        # presumably a minimal sleep to let the new thread get scheduled
        _time.sleep(0.000001)
604