Let the worker run for an hour in daemon mode
authorMichael <heluecht@pirati.ca>
Mon, 4 Jan 2021 09:20:44 +0000 (09:20 +0000)
committerMichael <heluecht@pirati.ca>
Mon, 4 Jan 2021 09:20:44 +0000 (09:20 +0000)
bin/daemon.php
src/Core/Worker.php

index cc86a9f..fcdd735 100755 (executable)
@@ -224,8 +224,10 @@ while (true) {
                usleep($sleep);
 
                $pid = pcntl_waitpid(-1, $status, WNOHANG);
-               Logger::info('Checked children status via pcntl_waitpid', ['pid' => $pid, 'status' => $status]);
-       
+               if ($pid > 0) {
+                       Logger::info('Children quit via pcntl_waitpid', ['pid' => $pid, 'status' => $status]);
+               }
+
                $timeout = ($seconds >= $wait_interval);
        } while (!$timeout && !Worker::IPCJobsExists());
 
index 559ae68..bb17e43 100644 (file)
@@ -94,67 +94,101 @@ class Worker
 
                $last_check = $starttime = time();
                self::$state = self::STATE_STARTUP;
+               $wait_interval = self::isDaemonMode() ? 360 : 10;
+               $start = time();
+
+               do {
+                       // We fetch the next queue entry that is about to be executed
+                       while ($r = self::workerProcess()) {
+                               // Don't refetch when a worker fetches tasks for multiple workers
+                               $refetched = DI::config()->get('system', 'worker_multiple_fetch');
+                               foreach ($r as $entry) {
+                                       // Assure that the priority is an integer value
+                                       $entry['priority'] = (int)$entry['priority'];
+
+                                       // The work will be done
+                                       if (!self::execute($entry)) {
+                                               Logger::notice('Process execution failed, quitting.');
+                                               return;
+                                       }
 
-               // We fetch the next queue entry that is about to be executed
-               while ($r = self::workerProcess()) {
-                       // Don't refetch when a worker fetches tasks for multiple workers
-                       $refetched = DI::config()->get('system', 'worker_multiple_fetch');
-                       foreach ($r as $entry) {
-                               // Assure that the priority is an integer value
-                               $entry['priority'] = (int)$entry['priority'];
-
-                               // The work will be done
-                               if (!self::execute($entry)) {
-                                       Logger::notice('Process execution failed, quitting.');
-                                       return;
+                                       // Trying to fetch new processes - but only once when successful
+                                       if (!$refetched && DI::lock()->acquire(self::LOCK_PROCESS, 0)) {
+                                               self::findWorkerProcesses();
+                                               DI::lock()->release(self::LOCK_PROCESS);
+                                               self::$state = self::STATE_REFETCH;
+                                               $refetched = true;
+                                       } else {
+                                               self::$state = self::STATE_SHORT_LOOP;
+                                       }
                                }
 
-                               // Trying to fetch new processes - but only once when successful
-                               if (!$refetched && DI::lock()->acquire(self::LOCK_PROCESS, 0)) {
-                                       self::findWorkerProcesses();
-                                       DI::lock()->release(self::LOCK_PROCESS);
-                                       self::$state = self::STATE_REFETCH;
-                                       $refetched = true;
-                               } else {
-                                       self::$state = self::STATE_SHORT_LOOP;
-                               }
-                       }
-
-                       // To avoid the quitting of multiple workers only one worker at a time will execute the check
-                       if ((time() > $last_check + 5) && !self::getWaitingJobForPID()) {
-                               self::$state = self::STATE_LONG_LOOP;
-
-                               if (DI::lock()->acquire(self::LOCK_WORKER, 0)) {
-                               // Count active workers and compare them with a maximum value that depends on the load
-                                       if (self::tooMuchWorkers()) {
-                                               Logger::notice('Active worker limit reached, quitting.');
+                               // To avoid the quitting of multiple workers only one worker at a time will execute the check
+                               if ((time() > $last_check + 5) && !self::getWaitingJobForPID()) {
+                                       self::$state = self::STATE_LONG_LOOP;
+
+                                       if (DI::lock()->acquire(self::LOCK_WORKER, 0)) {
+                                       // Count active workers and compare them with a maximum value that depends on the load
+                                               if (self::tooMuchWorkers()) {
+                                                       Logger::notice('Active worker limit reached, quitting.');
+                                                       DI::lock()->release(self::LOCK_WORKER);
+                                                       return;
+                                               }
+
+                                               // Check free memory
+                                               if (DI::process()->isMinMemoryReached()) {
+                                                       Logger::warning('Memory limit reached, quitting.');
+                                                       DI::lock()->release(self::LOCK_WORKER);
+                                                       return;
+                                               }
                                                DI::lock()->release(self::LOCK_WORKER);
-                                               return;
                                        }
+                                       $last_check = time();
+                               }
 
-                                       // Check free memory
-                                       if (DI::process()->isMinMemoryReached()) {
-                                               Logger::warning('Memory limit reached, quitting.');
-                                               DI::lock()->release(self::LOCK_WORKER);
-                                               return;
+                               // Quit the worker once every cron interval
+                               if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) {
+                                       Logger::info('Process lifetime reached, respawning.');
+                                       self::unclaimProcess();
+                                       if (self::isDaemonMode()) {
+                                               self::IPCSetJobState(true);
+                                       } else {
+                                               self::spawnWorker();
                                        }
-                                       DI::lock()->release(self::LOCK_WORKER);
+                                       return;
                                }
-                               $last_check = time();
+                               $start = time();
                        }
 
-                       // Quit the worker once every cron interval
-                       if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) {
-                               Logger::info('Process lifetime reached, respawning.');
-                               self::unclaimProcess();
-                               if (self::isDaemonMode()) {
-                                       self::IPCSetJobState(true);
-                               } else {
-                                       self::spawnWorker();
+                       $seconds = (time() - $start);
+
+                       // logarithmic wait time calculation.
+                       $arg = (($seconds + 1) / ($wait_interval / 9)) + 1;
+                       $sleep = min(1000000, round(log10($arg) * 1000000, 0));
+                       usleep($sleep);
+
+                       $timeout = ($seconds >= $wait_interval);
+                       Logger::info('Timeout', ['timeout' => $timeout, 'seconds' => $seconds, 'sleep' => $sleep]);
+
+                       if (!$timeout) {
+                               if (DI::process()->isMaxLoadReached()) {
+                                       Logger::notice('maximum load reached, quitting.');
+                                       return;
                                }
-                               return;
+
+                               // Kill stale processes every 5 minutes
+                               $last_cleanup = DI::config()->get('system', 'worker_last_cleaned', 0);
+                               if (time() > ($last_cleanup + 300)) {
+                                       DI::config()->set('system', 'worker_last_cleaned', time());
+                                       self::killStaleWorkers();
+                               }
+
+                               // Check if the system is ready
+                               if (!self::isReady()) {
+                                       return;
+                               }               
                        }
-               }
+               } while (!$timeout);
 
                // Cleaning up. Possibly not needed, but it doesn't harm anything.
                if (self::isDaemonMode()) {