Commit 65b16ffc by Maiyannah Bishop

Merge branch 'expiry-respawn' into 'nightly'

Add queue daemon expiry respawn from takeshitakenji

See merge request !45
2 parents 7e9258cd c7ee75bd
......@@ -241,6 +241,9 @@ sending out SMS email or XMPP messages, for off-line processing. See
other server required), "stomp" for a stomp server, and "redis" for a Redis
server.
* items_to_handle (int): How many items to handle before a daemon process exits.
Default to unlimited.
* stomp_server (string, default null): "broker URI" for stomp server.
Something like "tcp://hostname:61613". More complicated ones are possible;
see your stomp server's documentation for details.
......
......@@ -131,6 +131,7 @@ class DBQueueManager extends QueueManager
$this->_log(LOG_ERR, "[{$qi->transport}:$rep] Exception (".get_class($e).') thrown: '._ve($e->getMessage()));
$result = false;
}
$this->recordItemHandled();
if ($result) {
$this->_log(LOG_INFO, "[{$qi->transport}:$rep] Successfully handled item");
$this->_done($qi);
......@@ -186,4 +187,4 @@ class DBQueueManager extends QueueManager
// END OF FILE
// ============================================================================
?>
\ No newline at end of file
?>
......@@ -69,6 +69,7 @@
// o protected $groups = array();
// o protected $activeGroups = array();
// o protected $ignoredTransports = array();
// o protected $itemsUntilExpiration = null;
abstract class QueueManager extends IoManager
{
static $qm = null;
......@@ -78,6 +79,7 @@ abstract class QueueManager extends IoManager
protected $groups = array();
protected $activeGroups = array();
protected $ignoredTransports = array();
protected $itemsUntilExpiration = null;
// ------------------------------------------------------------------------
// Function: __construct
......@@ -88,6 +90,48 @@ abstract class QueueManager extends IoManager
}
// ------------------------------------------------------------------------
// Function: setItemsUntilExpiration
// Set a value for the number of items to process before respawning a daemon
// process
//
// Parameters:
// o int $itemsUntilExpiration - number of items until daemon process is respawned
public function setItemsUntilExpiration($itemsUntilExpiration) {
// Don't allow negative values
if ($itemsUntilExpiration > 0)
$this->itemsUntilExpiration = $itemsUntilExpiration;
}
// ------------------------------------------------------------------------
// Function: recordItemHandled
// Keeps track of the number of items left to process before the daemon
// process has to be respawned.
//
// Returns:
// o true if limit has not been reached, false otherwise
public function recordItemHandled() {
if ($this->itemsUntilExpiration !== null) {
$this->_log(LOG_DEBUG, "Items until expiration: $this->itemsUntilExpiration");
// Don't keep decrementing after hitting zero.
if ($this->itemsUntilExpiration <= 0 || $this->itemsUntilExpiration-- <= 0)
return false;
}
return true;
}
// ------------------------------------------------------------------------
// Function: isExpired
// Return true if the maximum number of items for a given daemon process
// has been reached.
//
// Returns:
// o boolean true if there are no more items until expiration or if a limit
// has not been set.
public function isExpired() {
return ($this->itemsUntilExpiration !== null && $this->itemsUntilExpiration <= 0);
}
// ------------------------------------------------------------------------
// Function: _log
// Log a string to the common log
//
......@@ -156,6 +200,7 @@ abstract class QueueManager extends IoManager
$enabled = common_config('queue', 'enabled');
$type = common_config('queue', 'subsystem');
$itemsUntilExpiration = common_config('queue', 'items_to_handle');
if (!$enabled) {
// does everything immediately
......@@ -174,6 +219,7 @@ abstract class QueueManager extends IoManager
default:
throw new ServerException("No queue manager class for type '$type'");
}
self::$qm->setItemsUntilExpiration($itemsUntilExpiration);
}
}
}
......@@ -455,4 +501,4 @@ abstract class QueueManager extends IoManager
// END OF FILE
// ============================================================================
?>
\ No newline at end of file
?>
......@@ -217,8 +217,8 @@ class RedisQueueManager extends QueueManager {
$result = $handler->handle($item);
} catch (NoQueueHandlerException $e) {
$this->_log(LOG_WARNING, "[$transport:{$rep}] No handler for queue $transport; discarding.");
return $this->_done($queue_item, $transport);
$this->_log(LOG_WARNING, "[$transport:{$rep}] No handler for queue $transport; discarding.");
return $this->_done($queue_item, $transport);
} catch (NoResultException $e) {
$this->_log(LOG_ERR, "[$transport:$rep] ".get_class($e).' thrown ('._ve($e->getMessage()).'), ignoring queue_item '._ve($queue_item->id));
......@@ -232,6 +232,7 @@ class RedisQueueManager extends QueueManager {
$this->_log(LOG_ERR, "[$transport:$rep] Exception (".get_class($e).') thrown: '._ve($e->getMessage()));
$result = false;
}
$this->recordItemHandled();
if ($result) {
$this->_log(LOG_INFO, "[$transport:$rep] Successfully handled item");
......@@ -294,4 +295,4 @@ class RedisQueueManager extends QueueManager {
// END OF FILE
// ============================================================================
?>
\ No newline at end of file
?>
......@@ -627,6 +627,7 @@ class StompQueueManager extends QueueManager {
$this->_log(LOG_ERR, "Exception on queue $queue: " . $e->getMessage());
$ok = false;
}
$this->recordItemHandled();
if ($ok) {
$this->_log(LOG_INFO, "Successfully handled $info");
......
......@@ -103,6 +103,7 @@ $default =
'queue' =>
array('enabled' => true,
'daemon' => false, # Use queuedaemon. Default to false
'items_to_handle' => null, # How many items to handle before a daemon process exits. Default to unlimited
'subsystem' => 'db', # default to database, or 'stomp' or 'redis'
'stomp_server' => null,
'queue_basename' => '/queue/statusnet/',
......
......@@ -126,11 +126,13 @@ class QueueDaemon extends SpawningDaemon
class QueueMaster extends IoMaster
{
protected $processManager;
protected $queueManager;
function __construct($id, $processManager)
{
parent::__construct($id);
$this->processManager = $processManager;
$this->queueManager = null;
}
/**
......@@ -140,9 +142,9 @@ class QueueMaster extends IoMaster
{
$managers = array();
if (Event::handle('StartQueueDaemonIoManagers', array(&$managers))) {
$qm = QueueManager::get();
$qm->setActiveGroup('main');
$managers[] = $qm;
$this->queueManager = QueueManager::get();
$this->queueManager->setActiveGroup('main');
$managers[] = $this->queueManager;
$managers[] = $this->processManager;
}
Event::handle('EndQueueDaemonIoManagers', array(&$managers));
......@@ -151,6 +153,14 @@ class QueueMaster extends IoMaster
$this->instantiate($manager);
}
}
function idle() {
if ($this->queueManager !== null && $this->queueManager->isExpired()) {
common_log(LOG_INFO, 'Queue daemon has expired as dictated by configuration, requesting a respawn.');
$this->requestRestart();
} else
IoMaster::idle();
}
}
if (have_option('i')) {
......
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!