
Source Code for Module wtf.opi.worker.threaded

  1  # -*- coding: ascii -*- 
  2  # 
  3  # Copyright 2006-2012 
  4  # Andr\xe9 Malo or his licensors, as applicable 
  5  # 
  6  # Licensed under the Apache License, Version 2.0 (the "License"); 
  7  # you may not use this file except in compliance with the License. 
  8  # You may obtain a copy of the License at 
  9  # 
 10  #     http://www.apache.org/licenses/LICENSE-2.0 
 11  # 
 12  # Unless required by applicable law or agreed to in writing, software 
 13  # distributed under the License is distributed on an "AS IS" BASIS, 
 14  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 15  # See the License for the specific language governing permissions and 
 16  # limitations under the License. 
 17  """ 
 18  Threaded Worker Model 
 19  ===================== 
 20   
 21  This module implements the threadpool handling. 
 22  """ 
 23  __author__ = u"Andr\xe9 Malo" 
 24  __docformat__ = "restructuredtext en" 
 25   
 26  import collections as _collections 
 27  import errno as _errno 
 28  import os as _os 
 29  import signal as _signal 
 30  import socket as _socket 
 31  import sys as _sys 
 32  import thread as _thread 
 33  import threading as _threading 
 34  import time as _time 
 35  import traceback as _traceback 
 36   
 37  from wtf import autoreload as _reload 
 38  from wtf import impl as _impl 
 39  from wtf import osutil as _osutil 
 40  from wtf import app as _app 
 41  from wtf.opi import worker as _worker 
 42   
 43   
 44  class SigTerm(SystemExit): 
 45      """ SIGTERM received """ 
 46   
 47   
 48  class ThreadedWorker(object): 
 49      """ 
 50      Implement threadpool worker model 
 51   
 52      :See: `wtf.opi.worker.WorkerInterface` 
 53      """ 
 54      __implements__ = [_worker.WorkerInterface] 
 55      sig_hup = True 
 56   
 57      def __init__(self, config, opts, args): 
 58          """ 
 59          Initialization 
 60   
 61          :See: `wtf.opi.worker.WorkerInterface` 
 62          """ 
 63          self.config, self.opts, self.args = config, opts, args 
 64   
 65      def setup(self, sock, prerun, parent_cleanup, child_cleanup): 
 66          """ 
 67          Initialization 
 68   
 69          :See: `wtf.opi.worker.WorkerInterface` 
 70          """ 
 71          return WorkerChild(self, sock, prerun, parent_cleanup, child_cleanup) 
 72   
 73   
 74  class WorkerChild(object): 
 75      """ Worker pool implementation """ 
 76      __implements__ = [_worker.WorkerPoolInterface] 
 77      _pid, _usergroup = None, None 
 78   
 79      def __init__(self, model, sock, prerun, parent_cleanup, child_cleanup): 
 80          """ 
 81          Initialization 
 82   
 83          :Parameters: 
 84           - `model`: The worker model implementation 
 85           - `sock`: Main socket 
 86           - `prerun`: Prerunner (maybe ``None``) 
 87           - `parent_cleanup`: Parent cleanup function (maybe ``None``) 
 88           - `child_cleanup`: Child cleanup function (maybe ``None``) 
 89   
 90          :Types: 
 91           - `model`: `ThreadedWorker` 
 92           - `sock`: ``socket.socket`` 
 93           - `prerun`: ``callable`` 
 94           - `parent_cleanup`: ``callable`` 
 95           - `child_cleanup`: ``callable`` 
 96          """ 
 97          self.model = model 
 98          self.sock = sock 
 99          self.prerun = prerun 
100          self.parent_cleanup = parent_cleanup 
101          self.child_cleanup = child_cleanup 
102          if 'user' in model.config.wtf: 
103              self._usergroup = model.config.wtf.user, model.config.wtf.group 
104   
105      def run(self): 
106          """ 
107          Pool runner 
108   
109          :See: `wtf.opi.worker.WorkerPoolInterface` 
110          """ 
111          # pylint: disable = R0912 
112   
113          oldpid, self._pid = self._pid, None 
114          prerun, self.prerun = self.prerun, None 
115          parent_cleanup, self.parent_cleanup = self.parent_cleanup, None 
116          self._pid = _os.fork() 
117          if self._pid == 0: # child 
118              try: 
119                  try: 
120                      if self._usergroup: 
121                          _osutil.change_identity(*self._usergroup) 
122                      _signal.signal(_signal.SIGINT, _signal.SIG_IGN) 
123                      _signal.signal(_signal.SIGHUP, _signal.SIG_IGN) 
124                      if self.child_cleanup is not None: 
125                          self.child_cleanup() 
126   
127                      model = self.model 
128                      config, opts, args = model.config, model.opts, model.args 
129                      reload_checker = _reload.Autoreload(config, opts, args) 
130                      impl = _impl.factory(config, opts, args) 
131                      app = _app.factory(config, opts, args) 
132   
133                      try: 
134                          pool = ThreadPool( 
135                              self, reload_checker, impl, app.call 
136                          ) 
137                          if prerun is not None: 
138                              prerun() 
139                          pool.run() 
140                      finally: 
141                          app.shutdown() 
142                  except SystemExit, e: 
143                      _os._exit(e.code or 0) # pylint: disable = W0212 
144                  except: 
145                      _traceback.print_exc() 
146                      _os._exit(1) # pylint: disable = W0212 
147              finally: 
148                  _os._exit(0) # pylint: disable = W0212 
149          else: # parent 
150              if parent_cleanup is not None: 
151                  parent_cleanup() 
152              if oldpid is not None: 
153                  _os.kill(oldpid, _signal.SIGTERM) 
154                  _os.waitpid(oldpid, 0) 
155              self._pid, (_, code) = None, _os.waitpid(self._pid, 0) 
156              if _os.WIFEXITED(code): 
157                  code = _os.WEXITSTATUS(code) 
158                  if code == _reload.ReloadRequested.CODE: 
159                      raise _reload.ReloadRequested() 
160   
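# ---------------------------------------------------------------------------
# Illustrative sketch (not part of this module): the parent/child handshake
# in WorkerChild.run() above works entirely via the child's exit status --
# the child always leaves through _os._exit(), and one specific exit code
# means "please reload the application".  ``child_pid`` is a placeholder for
# the fork() result.

def _child_requested_reload(child_pid):
    """ Wait for the worker child and tell whether it asked for a reload """
    _, status = _os.waitpid(child_pid, 0)
    return (_os.WIFEXITED(status) and
            _os.WEXITSTATUS(status) == _reload.ReloadRequested.CODE)
# ---------------------------------------------------------------------------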
161      def shutdown(self): 
162          """ 
163          Pool shutdown 
164   
165          :See: `wtf.opi.worker.WorkerPoolInterface` 
166          """ 
167          oldpid, self._pid = self._pid, None 
168          if oldpid is not None: 
169              try: 
170                  _os.kill(oldpid, _signal.SIGTERM) 
171              except OSError, e: 
172                  if e[0] != _errno.ESRCH: 
173                      raise 
174              else: 
175                  _os.waitpid(oldpid, 0) 
176   
177   
178  class ThreadPool(object): 
179      """ 
180      Dynamic threadpool implementation 
181   
182      :IVariables: 
183       - `sock`: Main socket 
184       - `impl`: WSGI implementation 
185       - `app`: WSGI application 
186       - `maxthreads`: Hard limit on the number of threads 
187       - `maxspare`: Maximum number of idle threads 
188         (remaining ones are killed off) 
189       - `minspare`: Minimum number of idle threads 
190         (new threads are started if the threshold is reached) 
191       - `maxqueue`: Maximum number of jobs in the queue (if no thread is available). 
192         The queue blocks if maxqueue is reached. 
193   
194      :Types: 
195       - `sock`: ``socket.socket`` 
196       - `impl`: `wtf.impl.ServerInterface` 
197       - `app`: ``callable`` 
198       - `maxthreads`: ``int`` 
199       - `maxspare`: ``int`` 
200       - `minspare`: ``int`` 
201       - `maxqueue`: ``int`` 
202      """ 
203   
204      def __init__(self, workerchild, reload_checker, impl, app): 
205          """ 
206          Initialization 
207   
208          :Parameters: 
209           - `workerchild`: Worker pool implementation 
210           - `reload_checker`: Reload checker 
211           - `impl`: WSGI implementation 
212           - `app`: WSGI application 
213   
214          :Types: 
215           - `workerchild`: `WorkerChild` 
216           - `reload_checker`: `wtf.autoreload.Autoreload` 
217           - `impl`: `wtf.impl.ServerInterface` 
218           - `app`: ``callable`` 
219          """ 
220          config = workerchild.model.config 
221          self.sock = workerchild.sock 
222          self.reload_checker, self.impl, self.app = reload_checker, impl, app 
223          self.maxthreads = max(1, config.wtf('maxthreads', 5)) 
224          self.maxspare = min( 
225              self.maxthreads, max(1, config.wtf('maxspare', 4)) 
226          ) 
227          self.minspare = min( 
228              self.maxspare, max(1, config.wtf('minspare', 1)) 
229          ) 
230          self.maxqueue = max(1, config.wtf('maxqueue', 1)) 
231   
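# ---------------------------------------------------------------------------
# Illustrative sketch (not part of this module): the sizing knobs above come
# from the ``wtf`` config section and are clamped so that
# 1 <= minspare <= maxspare <= maxthreads and maxqueue >= 1.  The hypothetical
# helper below just repeats that clamping for a plain dict of raw values.

def _effective_pool_sizes(raw):
    """ Clamp raw config values the same way ThreadPool.__init__ does """
    maxthreads = max(1, raw.get('maxthreads', 5))
    maxspare = min(maxthreads, max(1, raw.get('maxspare', 4)))
    minspare = min(maxspare, max(1, raw.get('minspare', 1)))
    maxqueue = max(1, raw.get('maxqueue', 1))
    return maxthreads, maxspare, minspare, maxqueue

# For example, asking for a huge maxspare is cut down to maxthreads:
# _effective_pool_sizes({'maxspare': 20}) == (5, 5, 1, 1)
# ---------------------------------------------------------------------------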
232      def run(self): 
233          """ Run the pool infinitely """ 
234          def termhandler(*args): 
235              """ Act on SIGTERM """ 
236              _signal.signal(_signal.SIGTERM, _signal.SIG_IGN) 
237              raise SigTerm() 
238   
239          queue, accept = JobWorkerQueue(self), self.sock.accept 
240          need_reload = self.reload_checker.check 
241          try: 
242              try: 
243                  _signal.signal(_signal.SIGTERM, termhandler) 
244                  queue.startup() 
245                  while True: 
246                      task = accept() 
247                      changed = need_reload() 
248                      if changed: 
249                          print >> _sys.stderr, ( 
250                              "Application reload requested by mtime change " 
251                              "of module(s):\n * %s" % "\n * ".join(changed) 
252                          ) 
253                          queue.shutdown() 
254                          self._force_reload(task) 
255                          raise _reload.ReloadRequested() 
256                      queue.put_task(task) 
257              except SigTerm: 
258                  pass 
259          finally: 
260              queue.shutdown() 
261   
262      def _force_reload(self, accepted): 
263          """ 
264          Force the application reload and handle the one accepted socket 
265   
266          This method forks the process and proxies the socket to the new one. 
267   
268          :Parameters: 
269           - `accepted`: Accepted socket 
270   
271          :Types: 
272           - `accepted`: ``tuple`` 
273   
274          :Exceptions: 
275           - `ReloadRequested`: raised in parent to stop further processing 
276          """ 
277          # pylint: disable = R0912, R0914, R0915 
278   
279          osock, _ = accepted 
280          rfd, wfd = _os.pipe() # need to synchronize 
281   
282          if _os.fork() != 0: # parent 
283              osock.close() 
284              return 
285   
286          # child 
287          try: 
288              try: 
289                  _os.close(wfd) 
290                  _os.read(rfd, 1) # wait for EOF (parent died) 
291                  _os.close(rfd) 
292   
293                  _osutil.close_descriptors(osock.fileno()) 
294                  sockname = osock.getsockname() 
295                  if isinstance(sockname, str): 
296                      family = _socket.AF_UNIX 
297                  elif len(sockname) == 2: 
298                      family = _socket.AF_INET 
299                  else: 
300                      family = _socket.AF_INET6 
301   
302                  # dup here, to keep the descriptor IDs low. 
303                  sock = _socket.fromfd( 
304                      osock.fileno(), family, _socket.SOCK_STREAM 
305                  ) 
306                  osock.close() 
307                  psock = _socket.socket(family, _socket.SOCK_STREAM) 
308                  psock.settimeout(10) 
309                  psock.connect(sockname) 
310                  psock.settimeout(0) 
311                  sock.settimeout(0) 
312                  _osutil.disable_nagle(psock) 
313   
314                  import select as _select 
315                  rset = [sock.fileno(), psock.fileno()] 
316                  wset = set() 
317                  peers = { 
318                      sock.fileno(): psock.fileno(), 
319                      psock.fileno(): sock.fileno(), 
320                  } 
321                  socks = {sock.fileno(): sock, psock.fileno(): psock} 
322                  buf = {sock.fileno(): [], psock.fileno(): []} 
323                  wwait = {} 
324                  while rset or wset: 
325                      for fd, flag in wwait.iteritems(): 
326                          if flag: 
327                              wset.add(fd) 
328                          wwait[fd] = (wwait[fd] + 1) % 2 
329                      rfds, wfds, _ = _select.select(rset, wset, [], 1) 
330                      for fd in rfds: 
331                          sock = socks[fd] 
332                          try: 
333                              data = sock.recv(8192) 
334                              if data: 
335                                  buf[fd].append(data) 
336                                  wset.add(peers[fd]) 
337                              else: 
338                                  rset.remove(fd) 
339                                  wwait[fd] = 1 
340                                  sock.shutdown(_socket.SHUT_RD) 
341                          except _socket.error, e: 
342                              if e[0] != _errno.EAGAIN: 
343                                  raise 
344   
345                      for fd in wfds: 
346                          sock = socks[fd] 
347                          try: 
348                              data = ''.join(buf[peers[fd]]) 
349                              if data: 
350                                  numsent = sock.send(data) 
351                                  data = data[numsent:] 
352                              else: 
353                                  rset.remove(peers[fd]) 
354                                  socks[peers[fd]].shutdown(_socket.SHUT_RD) 
355                              if data: 
356                                  buf[peers[fd]] = [data] 
357                              else: 
358                                  buf[peers[fd]] = [] 
359                                  wset.remove(fd) 
360                                  if peers[fd] not in rset: 
361                                      sock.shutdown(_socket.SHUT_WR) 
362                          except _socket.error, e: 
363                              if e[0] != _errno.EAGAIN: 
364                                  raise 
365   
366                      if len(rset) + len(wset) == 1: 
367                          if sum(map(len, buf.itervalues())) > 0: 
368                              continue 
369                          break 
370   
371                  for sock in socks.itervalues(): 
372                      sock.close() 
373              except SystemExit: 
374                  raise 
375              except: 
376                  _traceback.print_exc() 
377          finally: 
378              _os._exit(0) # pylint: disable = W0212 
379   
380   
381  class Flags(object): 
382      """ 
383      Flag container for workers 
384   
385      :IVariables: 
386       - `_shutdown`: Shutdown flag and mutex (``(bool, threading.Lock)``) 
387      """ 
388      __implements__ = [_impl.FlagsInterface] 
389      multithread = True 
390      multiprocess = False 
391      run_once = False 
392   
393      def __init__(self, shutdown=False): 
394          """ 
395          Initialization 
396   
397          :Parameters: 
398           - `shutdown`: Initial state of shutdown flag 
399   
400          :Types: 
401           - `shutdown`: ``bool`` 
402          """ 
403          self._shutdown = bool(shutdown), _threading.Lock() 
404   
405      def shutdown(self, flag=None): 
406          """ 
407          Set and/or retrieve shutdown flag 
408   
409          :Parameters: 
410           - `flag`: The new shutdown flag value (or ``None``) 
411   
412          :Types: 
413           - `flag`: ``bool`` 
414   
415          :return: The previous state of the flag 
416          :rtype: ``bool`` 
417          """ 
418          # pylint: disable = W0221 
419          lock = self._shutdown[1] 
420          lock.acquire() 
421          try: 
422              oldflag = self._shutdown[0] 
423              if flag is not None: 
424                  self._shutdown = bool(flag), lock 
425              return oldflag 
426          finally: 
427              lock.release() 
428   
429   
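# ---------------------------------------------------------------------------
# Illustrative sketch (not part of this module): Flags.shutdown() is a
# combined getter/setter guarded by the lock -- it always returns the
# *previous* value and only changes the flag when an argument is passed.

flags = Flags()
assert flags.shutdown() is False      # plain read, flag stays False
assert flags.shutdown(True) is False  # set to True, the old value is returned
assert flags.shutdown() is True       # later readers (worker threads) see True
# ---------------------------------------------------------------------------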
430  class JobWorkerQueue(object): 
431      """ Combined management of jobs and workers """ 
432   
433      def __init__(self, pool): 
434          """ 
435          Initialization 
436   
437          :Parameters: 
438           - `pool`: thread pool instance 
439   
440          :Types: 
441           - `pool`: `ThreadPool` 
442          """ 
443          self.pool = pool 
444          self.flags = Flags() 
445          self._tasks = _collections.deque() 
446          self._runners = set() 
447          self._idle = set() 
448          self._lock = _threading.Lock() 
449          self._not_empty = _threading.Condition(self._lock) 
450          self._not_full = _threading.Condition(self._lock) 
451   
452      def startup(self): 
453          """ Start the queue """ 
454          self._not_full.acquire() 
455          try: 
456              while len(self._idle) < self.pool.maxspare: 
457                  TaskRunner(self).start() 
458          finally: 
459              self._not_full.release() 
460   
461      def shutdown(self): 
462          """ Shutdown the queue - finish all threads """ 
463          self.flags.shutdown(True) 
464          self._not_full.acquire() 
465          try: 
466              self._tasks.extendleft([None] * (len(self._runners) + 1)) 
467              self._not_empty.notify() 
468          finally: 
469              self._not_full.release() 
470   
471      def put_task(self, task): 
472          """ 
473          Put a new task into the queue 
474   
475          This function blocks until there's actually space in the queue. 
476   
477          :Parameters: 
478           - `task`: The task to put; if ``None``, the receiving runner 
479             should finish 
480   
481          :Types: 
482           - `task`: any 
483          """ 
484          self._not_full.acquire() 
485          try: 
486              while (len(self._idle) < self.pool.minspare and 
487                      len(self._runners) < self.pool.maxthreads): 
488                  TaskRunner(self).start() 
489   
490              # wait for space 
491              while True: 
492                  if self._idle \ 
493                          or len(self._runners) < self.pool.maxthreads \ 
494                          or len(self._tasks) < self.pool.maxqueue: 
495                      break 
496                  self._not_full.wait() 
497   
498              # ...and queue it 
499              self._tasks.appendleft(task) 
500              self._not_empty.notify() 
501          finally: 
502              self._not_full.release() 
503   
504      def get_task(self, runner): 
505          """ 
506          Get the next task out of the queue. 
507   
508          This function blocks until there's actually a task available. 
509   
510          :return: The new task; if ``None``, the receiving runner should finish 
511          :rtype: any 
512          """ 
513          self._not_empty.acquire() 
514          try: 
515              self._idle.add(runner) 
516              try: 
517                  while not self._tasks: 
518                      if len(self._idle) > self.pool.maxspare: 
519                          task = None 
520                          break 
521                      self._not_empty.wait() 
522                  else: 
523                      task = self._tasks.pop() 
524              finally: 
525                  self._idle.remove(runner) 
526                  self._not_full.notify() 
527              return task 
528          finally: 
529              self._not_empty.release() 
530   
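# ---------------------------------------------------------------------------
# Illustrative sketch (not part of this module): a condensed version of how
# ThreadPool.run() drives this queue, with signal handling and autoreload
# checking stripped.  ``pool`` stands for a ThreadPool instance; ``None`` is
# the sentinel telling an idle TaskRunner to finish.

def _drive(pool):
    """ Accept connections and feed them to the worker queue """
    queue = JobWorkerQueue(pool)
    queue.startup()                    # pre-starts up to `maxspare` runners
    try:
        while True:
            task = pool.sock.accept()  # (connection, peer address) tuple
            queue.put_task(task)       # blocks while runners and queue are full
    finally:
        queue.shutdown()               # floods the queue with None sentinels
# ---------------------------------------------------------------------------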
531      def register(self, runner): 
532          """ 
533          Add a runner to the set 
534   
535          :Parameters: 
536           - `runner`: The runner to add 
537   
538          :Types: 
539           - `runner`: `TaskRunner` 
540          """ 
541          self._runners.add(runner) 
542          self._idle.add(runner) 
543   
544      def unregister(self, runner): 
545          """ 
546          Remove a runner from the set 
547   
548          :Parameters: 
549           - `runner`: The runner to remove 
550   
551          :Types: 
552           - `runner`: `TaskRunner` 
553          """ 
554          self._runners -= set([runner]) 
555   
556   
557  class TaskRunner(object): 
558      """ Run tasks until requested to be finished """ 
559   
560      def __init__(self, queue): 
561          """ 
562          Initialization 
563   
564          :Parameters: 
565           - `queue`: Queue instance 
566   
567          :Types: 
568           - `queue`: `JobWorkerQueue` 
569          """ 
570          self._queue = queue 
571   
572      def start(self): 
573          """ Start the worker thread """ 
574          queue = self._queue 
575          get_task, flags, unregister = \ 
576              queue.get_task, queue.flags, queue.unregister 
577          handle, app = queue.pool.impl.handle, queue.pool.app 
578   
579          def work(): 
580              """ Wait for tasks and run them """ 
581              try: 
582                  while True: 
583                      task = get_task(self) 
584                      if task is None: # finish command 
585                          break 
586                      try: 
587                          handle(task, app, flags) 
588                      except: # pylint: disable = W0702 
589                          _sys.stderr.write( 
590                              "Uncaught exception in worker thread:\n" + 
591                              _traceback.format_exc() 
592                          ) 
593                          break 
594              finally: 
595                  unregister(self) 
596   
597          queue.register(self) 
598          try: 
599              _thread.start_new_thread(work, ()) 
600          except: 
601              unregister(self) 
602              raise 
603          _time.sleep(0.000001) # 1 usec, to let the thread run (Solaris hack) 
604   
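
The classes above are glued together by the surrounding worker machinery in
wtf.opi.worker. The following condensed sketch shows how a caller might drive
this module; the retry loop is an assumption about that calling layer, and
``config``, ``opts``, ``args`` and ``main_sock`` are placeholders, not names
defined in this file:

    worker = ThreadedWorker(config, opts, args)
    child = worker.setup(main_sock, None, None, None)   # -> WorkerChild
    try:
        while True:
            try:
                child.run()    # fork; the child runs the ThreadPool,
                               # the parent blocks in waitpid()
            except _reload.ReloadRequested:
                continue       # child exited with the reload code, fork again
            break
    finally:
        child.shutdown()       # SIGTERM + waitpid for a still-running child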