Package wtf :: Package opi :: Module daemon
[hide private]
[frames] | no frames]

Source Code for Module wtf.opi.daemon

  1  # -*- coding: ascii -*- 
  2  # 
  3  # Copyright 2006-2012 
  4  # Andr\xe9 Malo or his licensors, as applicable 
  5  # 
  6  # Licensed under the Apache License, Version 2.0 (the "License"); 
  7  # you may not use this file except in compliance with the License. 
  8  # You may obtain a copy of the License at 
  9  # 
 10  #     http://www.apache.org/licenses/LICENSE-2.0 
 11  # 
 12  # Unless required by applicable law or agreed to in writing, software 
 13  # distributed under the License is distributed on an "AS IS" BASIS, 
 14  # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 15  # See the License for the specific language governing permissions and 
 16  # limitations under the License. 
 17  """ 
 18  Daemon Integration 
 19  ================== 
 20   
 21  This module implements the daemon handling. 
 22  """ 
 23  __author__ = u"Andr\xe9 Malo" 
 24  __docformat__ = "restructuredtext en" 
 25   
 26  import errno as _errno 
 27  import fcntl as _fcntl 
 28  import os as _os 
 29  import signal as _signal 
 30  import sys as _sys 
 31  import time as _time 
 32  import warnings as _warnings 
 33   
 34  from wtf import Error, WtfWarning 
 35  from wtf import autoreload as _reload 
 36  from wtf import opi as _opi 
 37  from wtf import osutil as _osutil 
 38  from wtf.cmdline import CommandlineError 
 39  from wtf.config import ConfigurationError 
 40  from wtf.opi import listener as _listener 
 41  from wtf.opi import worker as _worker 
 42   
 43   
class SignalError(Error):
    """ Sending a signal to another daemon process failed """
46
class PidfileError(Error):
    """ Base error for all problems related to the pidfile """
49
class PidfileValidError(PidfileError):
    """ The pidfile exists already and is valid (in use) """
52
class PidfileGarbageError(PidfileError):
    """ The content of the pidfile could not be interpreted as a pid """
55
class PidfileEmptyError(PidfileGarbageError):
    """ The pidfile had no content at all """
58
class PidfileWarning(WtfWarning):
    """ Category of warnings emitted for pidfile issues """
61 62
class SigTerm(SystemExit):
    """ Raised when a SIGTERM was received """
65
class SigHup(SystemExit):
    """ Raised when a SIGHUP was received """
68 69
70 -class DaemonOPI(object):
71 """ 72 Implement daemonized application 73 74 :See: `wtf.opi.OPIInterface` 75 76 :IVariables: 77 - `_work`: Specialized doer based on the running mode 78 - `pidfile`: pidfile object 79 80 :Types: 81 - `_work`: ``callable`` 82 - `pidfile`: `Pidfile` 83 """ 84 __implements__ = [_opi.OPIInterface] 85 pidfile = None 86
87 - def __init__(self, config, opts, args):
88 self.config = config 89 self.opts = opts 90 self.args = args 91 92 if not opts.keep_descriptors: 93 _osutil.close_descriptors() 94 95 if 'pidfile' not in config.wtf: 96 raise ConfigurationError("Missing pidfile configuration") 97 self.pidfile = Pidfile( 98 _os.path.normpath(_os.path.join(config.ROOT, config.wtf.pidfile)) 99 ) 100 101 if 'detach' not in config.wtf: 102 raise ConfigurationError("Missing detach configuration") 103 self._work = config.wtf.detach and \ 104 DetachedRunner(self).run or Runner(self).run 105 106 self.mode = _opi.OPIInterface.MODE_THREADED 107 self.errorlog = config.wtf('errorlog') 108 if self.errorlog is not None: 109 self.errorlog = _os.path.normpath( 110 _os.path.join(config.ROOT, self.errorlog) 111 )
112
113 - def work(self):
114 """ :see: `wtf.opi.OPIInterface` """ 115 try: 116 return self._work() 117 except PidfileValidError, e: 118 raise _opi.OPIDone(str(e)) 119 except PidfileError, e: 120 raise ConfigurationError(str(e))
121 122
class Runner(object):
    """
    Foreground runner: socket setup, worker loop and signal handling

    :CVariables:
     - `detached`: Is the runner detached?

    :IVariables:
     - `_daemonopi`: DaemonOPI instance

    :Types:
     - `detached`: ``bool``
     - `_daemonopi`: `DaemonOPI`
    """
    detached = False

    def __init__(self, daemonopi):
        """
        Initialization

        :Parameters:
         - `daemonopi`: `DaemonOPI` instance

        :Types:
         - `daemonopi`: `DaemonOPI`
        """
        self._daemonopi = daemonopi

    def run(self, prerun=None, parent_cleanup=None, child_cleanup=None,
            logrotate=None):
        """
        Create the listener socket and run the worker until terminated

        The worker is (re)started in a loop: `SigTerm` ends the loop,
        `SigHup` triggers `logrotate` (if given) and a requested reload
        simply restarts the worker. The worker is shut down and the socket
        is closed in any case.

        :Parameters:
         - `prerun`: Optional initializer/finalizer executed before actually
           starting up. Called in the worker child (if any).
         - `parent_cleanup`: Optional function which is called in the parent
           after a main worker child fork happens.
         - `child_cleanup`: Optional function which is called in the child
           after a main worker child fork happens.
         - `logrotate`: Optional log rotation function

        :Types:
         - `prerun`: ``callable``
         - `parent_cleanup`: ``callable``
         - `child_cleanup`: ``callable``
         - `logrotate`: ``callable``

        :Exceptions:
         - `ConfigurationError`: No listen address given or configured
         - `RuntimeError`: The worker returned without being asked to
        """
        # pylint: disable = R0912

        opi = self._daemonopi

        # The command line option takes precedence over the configuration
        bind = opi.opts.listen
        if not bind:
            try:
                bind = opi.config.wtf.listen
            except KeyError:
                raise ConfigurationError("Missing listen configuration")

        sock = _listener.ListenerSocket(bind, basedir=opi.config.ROOT)
        try:
            baseworker = _worker.factory(opi.config, opi.opts, opi.args)
            worker = baseworker.setup(
                sock, prerun, parent_cleanup, child_cleanup
            )
            try:
                running = True
                while running:
                    try:
                        # handlers disarm themselves when fired, so they
                        # are installed again on every round
                        self._setup_signals(baseworker)
                        worker.run()
                    except SigTerm:
                        running = False # just shut down
                    except SigHup:
                        if logrotate is not None:
                            print >> _sys.stderr, "Reopening log file now."
                            logrotate()
                    except _reload.ReloadRequested:
                        print >> _sys.stderr, "Restarting the worker now."
                    else:
                        raise RuntimeError("Worker finished unexpectedly...")
            finally:
                worker.shutdown()
        finally:
            sock.close()

    def _setup_signals(self, baseworker):
        """
        Install the TERM, HUP and INT signal handlers

        Undetached: TERM and HUP raise `SigTerm`, INT raises
        `wtf.opi.OPIDone`. Detached: TERM raises `SigTerm`, HUP raises
        `SigHup` if the worker model asks for it (``sig_hup``) and is
        ignored otherwise, INT is ignored.

        :Parameters:
         - `baseworker`: The worker model instance

        :Types:
         - `baseworker`: `worker.WorkerInterface`
        """
        ignore = _signal.SIG_IGN

        def on_term(*args):
            """ TERM handler """
            _signal.signal(_signal.SIGTERM, ignore)
            raise SigTerm()

        def on_hup(*args):
            """ HUP handler """
            _signal.signal(_signal.SIGHUP, ignore)
            raise SigHup()

        def on_int(*args):
            """ INT handler """
            _signal.signal(_signal.SIGINT, ignore)
            raise _opi.OPIDone()

        _signal.signal(_signal.SIGTERM, on_term)

        if self.detached:
            if baseworker.sig_hup:
                huphandler = on_hup
            else:
                huphandler = ignore
            inthandler = ignore
        else:
            # job control sends HUP on shell exit -> terminate
            huphandler = on_term
            inthandler = on_int

        _signal.signal(_signal.SIGHUP, huphandler)
        _signal.signal(_signal.SIGINT, inthandler)
245 -class DetachedRunner(Runner):
246 """ Derived runner, which detaches itself from the terminal """ 247 detached = True 248
249 - def __init__(self, daemonopi):
250 """ 251 Initialization 252 253 :Parameters: 254 - `daemonopi`: `DaemonOPI` instance 255 256 :Types: 257 - `daemonopi`: `DaemonOPI` 258 """ 259 if len(daemonopi.args) <= 1: 260 raise CommandlineError("Missing argument(s)") 261 command = daemonopi.args[-1] 262 try: 263 self.run = dict( 264 start = self._start, 265 stop = self._stop, 266 logrotate = self._logreopen, 267 logreopen = self._logreopen, 268 )[command] 269 except KeyError: 270 raise CommandlineError("Unrecognized command: %s" % command) 271 super(DetachedRunner, self).__init__(daemonopi)
272
273 - def run(self): # pylint: disable = W0221, E0202
274 """ :see: Runner.run """ 275 raise AssertionError("DetachedRunner not properly initialized")
276
277 - def _start(self):
278 """ 279 Do the start command 280 281 Actually detach the current process from the terminal and start 282 the regular runner. 283 """ 284 rfd, wfd = map(_osutil.safe_fd, _os.pipe()) 285 pid = _os.fork() 286 287 # Parent 288 if pid: 289 _os.close(wfd) 290 _os.waitpid(pid, 0) 291 while True: 292 try: 293 success = _os.read(rfd, 1) 294 except OSError, e: 295 if e[0] == _errno.EINTR: 296 continue 297 raise 298 break 299 _os.close(rfd) 300 exit_code = not(bool(success)) 301 _os._exit(exit_code) # pylint: disable = W0212 302 303 # 1st Child 304 else: 305 _os.close(rfd) 306 _os.setsid() 307 _signal.signal(_signal.SIGHUP, _signal.SIG_IGN) 308 pid = _os.fork() 309 if pid: 310 _os._exit(0) # pylint: disable = W0212 311 312 # 2nd Child 313 else: 314 err_fd, logrotate = self._setup_detached() 315 prerun, pcleanup, ccleanup = \ 316 self._make_finalizers(err_fd, wfd) 317 return super(DetachedRunner, self).run( 318 prerun=prerun, 319 parent_cleanup=pcleanup, 320 child_cleanup=ccleanup, 321 logrotate=logrotate, 322 )
323
324 - def _make_finalizers(self, err_fd, wfd):
325 """ 326 Make finalizers 327 328 :Parameters: 329 - `err_fd`: Deferred error fd setup 330 - `wfd`: success fd to the parent 331 332 :Types: 333 - `err_fd`: `_DeferredStreamSetup` 334 - `wfd`: ``int`` 335 336 :return: Three callables, prerun, parent_cleanup and child_cleanup 337 :rtype: ``tuple`` 338 """ 339 def prerun(): 340 """ Finalize detaching setup """ 341 err_fd.finish() 342 while True: 343 try: 344 if _os.write(wfd, "!") > 0: 345 break 346 except OSError, e: 347 if e[0] == _errno.EPIPE: 348 print >> _sys.stderr, "Parent died, so do I." 349 _sys.exit(1) 350 elif e[0] == _errno.EINTR: 351 continue 352 raise 353 _os.close(wfd)
354 355 def parent_cleanup(): 356 """ Cleanup parent after fork """ 357 _os.close(wfd) 358 359 pidfile = self._daemonopi.pidfile 360 def child_cleanup(): 361 """ Cleanup child after fork """ 362 pidfile.close() 363 364 return prerun, parent_cleanup, child_cleanup 365
366 - def _setup_detached(self):
367 """ 368 Setup the detached main process 369 370 :return: The error stream setup object and a logrotator 371 (maybe ``None``). The setup is deferred so more errors go to 372 the caller's shell 373 :rtype: ``tuple`` 374 375 :Exceptions: 376 - `PidfileValidError`: The pidfile is used by another daemon 377 - `PidfileError`: Error while handling the pidfile 378 - `ConfigurationError`: Happening while setting up the streams 379 """ 380 pidfile = self._daemonopi.pidfile 381 if not pidfile.acquire(): 382 raise PidfileValidError( 383 "Pidfile %r already in use and locked. There's " 384 "probably another daemon running already." % 385 (pidfile.name,) 386 ) 387 try: 388 pidfile.read() # just to print a warning if it's rubbish 389 except PidfileEmptyError: 390 pass 391 pidfile.write(_os.getpid()) 392 self._setup_stream(0, '/dev/null', _os.O_RDONLY) 393 self._setup_stream(1, '/dev/null', _os.O_WRONLY) 394 if self._daemonopi.errorlog is not None: 395 err_fd = self._setup_stream(2, self._daemonopi.errorlog, 396 _os.O_WRONLY | _os.O_CREAT | _os.O_APPEND, defer=True) 397 def logrotate(): 398 """ Rotate error log """ 399 self._setup_stream(2, self._daemonopi.errorlog, 400 _os.O_WRONLY | _os.O_CREAT | _os.O_APPEND)
401 else: 402 logrotate, err_fd = None, self._setup_stream(2, '/dev/null', 403 _os.O_WRONLY, defer=True) 404 return err_fd, logrotate 405
406 - def _stop(self, signals=('TERM',)):
407 """ 408 Do the stop command 409 410 :Parameters: 411 - `signals`: List of signals to send (``('name', ...)``) 412 413 :Types: 414 - `signals`: ``tuple`` 415 416 :Exceptions: 417 - `AttributeError`: A signal name does not exist 418 - `SignalError`: Error sending the signal 419 """ 420 msg = "No daemon found running" 421 try: 422 locked = self._daemonopi.pidfile.acquire() 423 except PidfileError: 424 raise _opi.OPIDone(msg) 425 426 try: 427 if locked: 428 raise _opi.OPIDone(msg) 429 try: 430 pid = self._daemonopi.pidfile.read() 431 except PidfileGarbageError, e: 432 _warnings.warn(str(e), category=PidfileWarning) 433 raise _opi.OPIDone(msg) 434 435 for signame in signals: 436 self._signal(pid, signame) 437 raise _opi.OPIDone("Sent %s to pid %d" % ("/".join(signals), pid)) 438 finally: 439 self._daemonopi.pidfile.release()
440
441 - def _logreopen(self):
442 """ 443 Do the logrotate/logreopen command 444 445 This calls `_stop` with a different signal set. 446 """ 447 return self._stop(signals=('HUP', 'CONT'))
448
449 - def _signal(self, pid, signame):
450 """ 451 Send a signal named `signame` to process `pid`. 452 453 :Parameters: 454 - `pid`: pid 455 - `signame`: signal name 456 457 :Types: 458 - `pid`: ``int`` 459 - `signame`: ``str`` 460 461 :Exceptions: 462 - `AttributeError`: The signal name does not exist 463 - `SignalError`: Error sending the signal 464 """ 465 sig = getattr(_signal, "SIG%s" % signame) 466 try: 467 _os.kill(pid, sig) 468 except OSError, e: 469 if e.errno != _errno.ESRCH: 470 raise SignalError("Error sending signal %s to pid %d: %s" % ( 471 signame, pid, str(e)))
472
473 - def _setup_stream(self, fileno, filename, flags, defer=False):
474 """ 475 Setup a stream to point to a particular file 476 477 :Parameters: 478 - `fileno`: The descriptor number of the target stream 479 - `filename`: The name of the file to attach 480 - `flags`: Flags to be passed to the open(2) call 481 - `defer`: Actually dupe the file descriptor to fileno? 482 483 :Types: 484 - `fileno`: ``int`` 485 - `filename`: ``str`` 486 - `flags`: ``int`` 487 - `defer`: ``bool`` 488 489 :return: A `_DeferredStreamSetup` object if the setup is deferred, 490 ``None`` otherwise 491 :rtype: `_DeferredStreamSetup` 492 493 :Exceptions: 494 - `ConfigurationError`: Error while opening `filename` 495 - `OSError`: dup2(2) error 496 """ 497 try: 498 fd = _os.open(filename, flags, 0666) 499 except OSError, e: 500 raise ConfigurationError( 501 "Could not open %s: %s" % (filename, str(e))) 502 if defer: 503 return _DeferredStreamSetup(fd, fileno) 504 if fd != fileno: 505 try: 506 _os.dup2(fd, fileno) 507 finally: 508 _os.close(fd)
509 510
class _DeferredStreamSetup(object):
    """
    Finalizer for deferred setup streams

    Remembers an already opened descriptor and the descriptor number it
    should finally be duplicated to.

    :IVariables:
     - `_fd`: The descriptor to finalize
     - `_fileno`: The target descriptor number

    :Types:
     - `_fd`: ``int``
     - `_fileno`: ``int``
    """

    def __init__(self, fd, fileno):
        """
        Initialization

        :Parameters:
         - `fd`: The descriptor to finalize
         - `fileno`: The target descriptor number

        :Types:
         - `fd`: ``int``
         - `fileno`: ``int``
        """
        self._fd = fd
        self._fileno = fileno

    def finish(self):
        """
        Actually finalize the descriptor setup

        Duplicates the stored descriptor onto the target number and closes
        the stored one. Nothing happens if both are the same already.
        """
        source, target = self._fd, self._fileno
        if source == target:
            return
        try:
            _os.dup2(source, target)
            # only reached if dup2 succeeded
            self._fd = target
        finally:
            _os.close(source)
545 546
547 -class Pidfile(object):
548 """ 549 PID file representation 550 551 :CVariables: 552 - `_O_FLAGS`: opening flags for the pid file 553 - `_L_FLAGS`: locking flags for the pid file 554 555 :IVariables: 556 - `name`: The name of the pidfile 557 558 :Types: 559 - `name`: ``str`` 560 - `_O_FLAGS`: ``int`` 561 - `_L_FLAGS`: ``int`` 562 """ 563 _O_FLAGS = _os.O_RDWR | _os.O_CREAT 564 _L_FLAGS = _fcntl.LOCK_EX | _fcntl.LOCK_NB 565 566 _fp, _locked, name = None, False, None 567
568 - def __init__(self, name):
569 """ 570 Initialization 571 572 :Parameters: 573 - `name`: The name of the pid file 574 575 :Types: 576 - `name`: ``str`` 577 """ 578 self.name = name
579
580 - def __del__(self):
581 self.release()
582
583 - def release(self):
584 """ Close the pid file, remove it and release any lock """ 585 try: 586 name, self.name = self.name, None 587 if name is not None and self._fp is not None and self._locked: 588 _osutil.unlink_silent(name) 589 finally: 590 fp, self._fp, self._locked = self._fp, None, False 591 if fp is not None: 592 fp.close()
593
594 - def close(self):
595 """ Close the pidfile (do not release it) """ 596 fp, self.name, self._fp, self._locked = self._fp, None, None, False 597 if fp is not None: 598 fp.close()
599
600 - def acquire(self):
601 """ 602 Open the pid file and acquire a lock 603 604 :return: Did we acquire the lock? 605 :rtype: ``bool`` 606 607 :Exceptions: 608 - `PidfileError`: An error happened while opening or locking 609 """ 610 self._fp = self._open() 611 return self._lock()
612
613 - def read(self):
614 """ 615 Read the pid from the file 616 617 :return: The pid from the file 618 :rtype: ``int`` 619 620 :Exceptions: 621 - `AssertionError`: The pid file was not open 622 - `PidfileEmptyError`: The pid file was empty 623 - `PidfileGarbageError`: The pid file contained garbage 624 """ 625 fp = self._fp 626 if fp is None: 627 raise AssertionError("Pidfile not open") 628 629 try: 630 fp.seek(0) 631 pid = fp.read() 632 if not pid: 633 raise PidfileEmptyError("Pidfile %r was empty" % (self.name,)) 634 try: 635 return int(pid.rstrip()) 636 except (ValueError, TypeError), e: 637 raise PidfileGarbageError( 638 "Pidfile %r contained garbage (huh?)" % (self.name,) 639 ) 640 except (OSError, IOError), e: 641 raise PidfileError(str(e))
642
643 - def write(self, pid):
644 """ 645 Write a pid into the file 646 647 :Parameters: 648 - `pid`: The PID to write 649 650 :Types: 651 - `pid`: ``int`` 652 653 :Exceptions: 654 - `AssertionError`: The pid file was not open or not locked 655 - `PidfileError`: I/O error while writing the file 656 """ 657 fp = self._fp 658 if fp is None: 659 raise AssertionError("Pidfile not open") 660 elif not self._locked: 661 raise AssertionError("Pidfile not locked") 662 663 try: 664 fp.seek(0) 665 fp.truncate() 666 fp.write("%d\n" % pid) 667 fp.flush() 668 except IOError, e: 669 raise PidfileError(str(e))
670
671 - def _open(self):
672 """ 673 Open the pid file 674 675 :return: The file object 676 :rtype: ``file`` 677 678 :Exceptions: 679 - `PidfileError`: The file could not be opened 680 """ 681 fp = self._fp 682 if fp is not None: 683 return fp 684 try: 685 fd = _osutil.safe_fd(_os.open(self.name, self._O_FLAGS, 0666)) 686 try: 687 _osutil.close_on_exec(fd) 688 return _os.fdopen(fd, 'w+') 689 except: # pylint: disable = W0702 690 e = _sys.exc_info() 691 try: 692 _os.close(fd) 693 finally: 694 try: 695 raise e[0], e[1], e[2] 696 finally: 697 del e 698 except (OSError, IOError), e: 699 raise PidfileError(str(e))
700
701 - def _lock(self):
702 """ 703 Try locking the pid file 704 705 :return: Did we get the lock? 706 :rtype: ``bool`` 707 708 :Exceptions: 709 - `AssertionError`: Pidfile is not open 710 - `PidfileError`: Some I/O error occurred while accessing the 711 descriptor 712 """ 713 fp = self._fp 714 if fp is None: 715 raise AssertionError("Pidfile not open") 716 717 locked, tries = False, 2 718 while True: 719 try: 720 _fcntl.lockf(fp.fileno(), self._L_FLAGS) 721 except IOError, e: 722 if e.errno not in (_errno.EACCES, _errno.EAGAIN): 723 raise PidfileError(str(e)) 724 tries -= 1 725 if tries <= 0: 726 break 727 _time.sleep(0.1) 728 else: 729 locked = True 730 break 731 self._locked = locked 732 return locked
733