Merge pull request #9752 from annando/fork
authorHypolite Petovan <hypolite@mrpetovan.com>
Wed, 6 Jan 2021 14:22:10 +0000 (09:22 -0500)
committerGitHub <noreply@github.com>
Wed, 6 Jan 2021 14:22:10 +0000 (09:22 -0500)
We can now fork worker

bin/daemon.php
bin/worker.php
index.php
src/App/Mode.php
src/Core/Process.php
src/Core/Worker.php
static/defaults.config.php

index d08aa37..fcdd735 100755 (executable)
@@ -29,6 +29,7 @@ if (php_sapi_name() !== 'cli') {
 }
 
 use Dice\Dice;
+use Friendica\App\Mode;
 use Friendica\Core\Logger;
 use Friendica\Core\Worker;
 use Friendica\Database\DBA;
@@ -65,6 +66,8 @@ if (DI::mode()->isInstall()) {
        die("Friendica isn't properly installed yet.\n");
 }
 
+DI::mode()->setExecutor(Mode::DAEMON);
+
 DI::config()->load();
 
 if (empty(DI::config()->get('system', 'pidfile'))) {
@@ -144,34 +147,35 @@ Logger::notice('Starting worker daemon.', ["pid" => $pid]);
 if (!$foreground) {
        echo "Starting worker daemon.\n";
 
-       // Switch over to daemon mode.
-       if ($pid = pcntl_fork()) {
-               return;     // Parent
-       }
-
-       fclose(STDIN);  // Close all of the standard
-
-       // Enabling this seem to block a running php process with 100% CPU usage when there is an outpout
-       // fclose(STDOUT); // file descriptors as we
-       // fclose(STDERR); // are running as a daemon.
-
        DBA::disconnect();
 
+       // Fork a daemon process
+       $pid = pcntl_fork();
+       if ($pid == -1) {
+               echo "Daemon couldn't be forked.\n";
+               Logger::warning('Could not fork daemon');
+               exit(1);
+       } elseif ($pid) {
+               // The parent process continues here
+               echo 'Child process started with pid ' . $pid . ".\n";
+               Logger::notice('Child process started', ['pid' => $pid]);
+               file_put_contents($pidfile, $pid);
+               exit(0);
+       }
+
+       // We now are in the child process
        register_shutdown_function('shutdown');
 
+       // Make the child the main process, detach it from the terminal
        if (posix_setsid() < 0) {
                return;
        }
 
-       if ($pid = pcntl_fork()) {
-               return;     // Parent
-       }
+       // Closing all existing connections with the outside
+       fclose(STDIN);
 
-       $pid = getmypid();
-       file_put_contents($pidfile, $pid);
-
-       // We lose the database connection upon forking
-       DBA::reconnect();
+       // And now connect the database again
+       DBA::connect();
 }
 
 DI::config()->set('system', 'worker_daemon_mode', true);
@@ -219,6 +223,11 @@ while (true) {
                $sleep = min(1000000, round(log10($arg) * 1000000, 0));
                usleep($sleep);
 
+               $pid = pcntl_waitpid(-1, $status, WNOHANG);
+               if ($pid > 0) {
+                       Logger::info('Children quit via pcntl_waitpid', ['pid' => $pid, 'status' => $status]);
+               }
+
                $timeout = ($seconds >= $wait_interval);
        } while (!$timeout && !Worker::IPCJobsExists());
 
index 5698cf1..52400a0 100755 (executable)
@@ -28,7 +28,7 @@ if (php_sapi_name() !== 'cli') {
 
 use Dice\Dice;
 use Friendica\App;
-use Friendica\Core\Process;
+use Friendica\App\Mode;
 use Friendica\Core\Update;
 use Friendica\Core\Worker;
 use Friendica\DI;
@@ -59,6 +59,8 @@ $dice = $dice->addRule(LoggerInterface::class,['constructParams' => ['worker']])
 DI::init($dice);
 $a = DI::app();
 
+DI::mode()->setExecutor(Mode::WORKER);
+
 // Check the database structure and possibly fixes it
 Update::check($a->getBasePath(), true, DI::mode());
 
index fdb15fd..baa6818 100644 (file)
--- a/index.php
+++ b/index.php
@@ -36,6 +36,8 @@ $dice = $dice->addRule(Friendica\App\Mode::class, ['call' => [['determineRunMode
 
 $a = \Friendica\DI::app();
 
+\Friendica\DI::mode()->setExecutor(\Friendica\App\Mode::INDEX);
+
 $a->runFrontend(
        $dice->create(\Friendica\App\Module::class),
        $dice->create(\Friendica\App\Router::class),
index e19de8f..8aa812c 100644 (file)
@@ -38,6 +38,11 @@ class Mode
        const DBCONFIGAVAILABLE   = 4;
        const MAINTENANCEDISABLED = 8;
 
+       const UNDEFINED = 0;
+       const INDEX = 1;
+       const DAEMON = 2;
+       const WORKER = 3;
+
        const BACKEND_CONTENT_TYPES = ['application/jrd+json', 'text/xml',
                'application/rss+xml', 'application/atom+xml', 'application/activity+json'];
 
@@ -47,6 +52,12 @@ class Mode
         */
        private $mode;
 
+       /***
+        * @var int Who executes this Application
+        *
+        */
+       private $executor = self::UNDEFINED;
+
        /**
         * @var bool True, if the call is a backend call
         */
@@ -163,6 +174,31 @@ class Mode
                return ($this->mode & $mode) > 0;
        }
 
+       /**
+        * Set the execution mode
+        *
+        * @param integer $executor Execution Mode
+        * @return void
+        */
+       public function setExecutor(int $executor)
+       {
+               $this->executor = $executor;
+
+               // Daemon and worker are always backend
+               if (in_array($executor, [self::DAEMON, self::WORKER])) {
+                       $this->isBackend = true;
+               }
+       }
+
+       /*isBackend = true;*
+        * get the execution mode
+        *
+        * @return int Execution Mode
+        */
+       public function getExecutor()
+       {
+               return $this->executor;
+       }
 
        /**
         * Install mode is when the local config file is missing or the DB schema hasn't been installed yet.
index 919d37d..447d312 100644 (file)
@@ -77,6 +77,17 @@ class Process
                $this->pid = $pid;
        }
 
+       /**
+        * Set the process id
+        *
+        * @param integer $pid
+        * @return void
+        */
+       public function setPid(int $pid)
+       {
+               $this->pid = $pid;
+       }
+
        /**
         * Log active processes into the "process" table
         */
index 1759fae..504a7b9 100644 (file)
@@ -21,6 +21,7 @@
 
 namespace Friendica\Core;
 
+use Friendica\App\Mode;
 use Friendica\Core;
 use Friendica\Database\DBA;
 use Friendica\DI;
@@ -49,6 +50,7 @@ class Worker
        private static $lock_duration = 0;
        private static $last_update;
        private static $state;
+       private static $daemon_mode = null;
 
        /**
         * Processes the tasks that are in the workerqueue table
@@ -95,6 +97,10 @@ class Worker
 
                // We fetch the next queue entry that is about to be executed
                while ($r = self::workerProcess()) {
+                       if (self::IPCJobsExists(getmypid())) {
+                               self::IPCDeleteJobState(getmypid());
+                       }
+
                        // Don't refetch when a worker fetches tasks for multiple workers
                        $refetched = DI::config()->get('system', 'worker_multiple_fetch');
                        foreach ($r as $entry) {
@@ -145,13 +151,17 @@ class Worker
                        if (time() > ($starttime + (DI::config()->get('system', 'cron_interval') * 60))) {
                                Logger::info('Process lifetime reached, respawning.');
                                self::unclaimProcess();
-                               self::spawnWorker();
+                               if (self::isDaemonMode()) {
+                                       self::IPCSetJobState(true);
+                               } else {
+                                       self::spawnWorker();
+                               }
                                return;
                        }
                }
 
                // Cleaning up. Possibly not needed, but it doesn't harm anything.
-               if (DI::config()->get('system', 'worker_daemon_mode', false)) {
+               if (self::isDaemonMode()) {
                        self::IPCSetJobState(false);
                }
                Logger::info("Couldn't select a workerqueue entry, quitting process", ['pid' => getmypid()]);
@@ -189,7 +199,7 @@ class Worker
                        Logger::warning('Maximum processes reached, quitting.');
                        return false;
                }
-               
+
                return true;
        }
 
@@ -774,7 +784,7 @@ class Worker
                        // Are there fewer workers running as possible? Then fork a new one.
                        if (!DI::config()->get("system", "worker_dont_fork", false) && ($queues > ($active + 1)) && self::entriesExists()) {
                                Logger::info("There are fewer workers as possible, fork a new worker.", ['active' => $active, 'queues' => $queues]);
-                               if (DI::config()->get('system', 'worker_daemon_mode', false)) {
+                               if (self::isDaemonMode()) {
                                        self::IPCSetJobState(true);
                                } else {
                                        self::spawnWorker();
@@ -783,7 +793,7 @@ class Worker
                }
 
                // if there are too much worker, we don't spawn a new one.
-               if (DI::config()->get('system', 'worker_daemon_mode', false) && ($active > $queues)) {
+               if (self::isDaemonMode() && ($active > $queues)) {
                        self::IPCSetJobState(false);
                }
 
@@ -1184,6 +1194,67 @@ class Worker
                self::killStaleWorkers();
        }
 
+       /**
+        * Fork a child process
+        *
+        * @param boolean $do_cron
+        * @return void
+        */
+       private static function forkProcess(bool $do_cron)
+       {
+               if (DI::process()->isMinMemoryReached()) {
+                       Logger::warning('Memory limit reached - quitting');
+                       return;
+               }
+
+               // Children inherit their parent's database connection.
+               // To avoid problems we disconnect and connect both parent and child
+               DBA::disconnect();
+               $pid = pcntl_fork();
+               if ($pid == -1) {
+                       DBA::connect();
+                       Logger::warning('Could not spawn worker');
+                       return;
+               } elseif ($pid) {
+                       // The parent process continues here
+                       DBA::connect();
+
+                       self::IPCSetJobState(true, $pid);
+                       Logger::info('Spawned new worker', ['pid' => $pid]);
+
+                       $cycles = 0;
+                       while (self::IPCJobsExists($pid) && (++$cycles < 100)) {
+                               usleep(10000);
+                       }
+
+                       Logger::info('Spawned worker is ready', ['pid' => $pid, 'wait_cycles' => $cycles]);
+                       return;
+               }
+
+               // We now are in the new worker
+               $pid = getmypid();
+
+               DBA::connect();
+               /// @todo Reinitialize the logger to set a new process_id and uid
+               DI::process()->setPid($pid);
+
+               $cycles = 0;
+               while (!self::IPCJobsExists($pid) && (++$cycles < 100)) {
+                       usleep(10000);
+               }
+
+               Logger::info('Worker spawned', ['pid' => $pid, 'wait_cycles' => $cycles]);
+
+               self::processQueue($do_cron);
+
+               self::unclaimProcess();
+
+               self::IPCSetJobState(false, $pid);
+               DI::process()->end();
+               Logger::info('Worker ended', ['pid' => $pid]);
+               exit();
+       }
+
        /**
         * Spawns a new worker
         *
@@ -1193,16 +1264,14 @@ class Worker
         */
        public static function spawnWorker($do_cron = false)
        {
-               $command = 'bin/worker.php';
-
-               $args = ['no_cron' => !$do_cron];
-
-               $a = DI::app();
-               $process = new Core\Process(DI::logger(), DI::mode(), DI::config(), DI::modelProcess(), $a->getBasePath(), getmypid());
-               $process->run($command, $args);
-
-               // after spawning we have to remove the flag.
-               if (DI::config()->get('system', 'worker_daemon_mode', false)) {
+               if (self::isDaemonMode() && DI::config()->get('system', 'worker_fork')) {
+                       self::forkProcess($do_cron);
+               } else {
+                       $process = new Core\Process(DI::logger(), DI::mode(), DI::config(),
+                               DI::modelProcess(), DI::app()->getBasePath(), getmypid());
+                       $process->run('bin/worker.php', ['no_cron' => !$do_cron]);
+               }
+               if (self::isDaemonMode()) {
                        self::IPCSetJobState(false);
                }
        }
@@ -1294,7 +1363,7 @@ class Worker
                }
 
                // Set the IPC flag to ensure an immediate process execution via daemon
-               if (DI::config()->get('system', 'worker_daemon_mode', false)) {
+               if (self::isDaemonMode()) {
                        self::IPCSetJobState(true);
                }
 
@@ -1319,7 +1388,7 @@ class Worker
                }
 
                // Quit on daemon mode
-               if (DI::config()->get('system', 'worker_daemon_mode', false)) {
+               if (self::isDaemonMode()) {
                        return $added;
                }
 
@@ -1413,12 +1482,27 @@ class Worker
         * Set the flag if some job is waiting
         *
         * @param boolean $jobs Is there a waiting job?
+        * @param int $key Key number
         * @throws \Exception
         */
-       public static function IPCSetJobState($jobs)
+       public static function IPCSetJobState(bool $jobs, int $key = 0)
        {
                $stamp = (float)microtime(true);
-               DBA::update('worker-ipc', ['jobs' => $jobs], ['key' => 1], true);
+               DBA::replace('worker-ipc', ['jobs' => $jobs, 'key' => $key]);
+               self::$db_duration += (microtime(true) - $stamp);
+               self::$db_duration_write += (microtime(true) - $stamp);
+       }
+
+       /**
+        * Delete a key entry
+        *
+        * @param int $key Key number
+        * @throws \Exception
+        */
+       public static function IPCDeleteJobState(int $key)
+       {
+               $stamp = (float)microtime(true);
+               DBA::delete('worker-ipc', ['key' => $key]);
                self::$db_duration += (microtime(true) - $stamp);
                self::$db_duration_write += (microtime(true) - $stamp);
        }
@@ -1426,13 +1510,14 @@ class Worker
        /**
         * Checks if some worker job waits to be executed
         *
+        * @param int $key Key number
         * @return bool
         * @throws \Exception
         */
-       public static function IPCJobsExists()
+       public static function IPCJobsExists(int $key = 0)
        {
                $stamp = (float)microtime(true);
-               $row = DBA::selectFirst('worker-ipc', ['jobs'], ['key' => 1]);
+               $row = DBA::selectFirst('worker-ipc', ['jobs'], ['key' => $key]);
                self::$db_duration += (microtime(true) - $stamp);
 
                // When we don't have a row, no job is running
@@ -1443,6 +1528,51 @@ class Worker
                return (bool)$row['jobs'];
        }
 
+       /**
+        * Checks if the worker is running in the daemon mode.
+        *
+        * @return boolean
+        */
+       public static function isDaemonMode()
+       {
+               if (!is_null(self::$daemon_mode)) {
+                       return self::$daemon_mode;
+               }
+
+               if (DI::mode()->getExecutor() == Mode::DAEMON) {
+                       return true;
+               }
+
+               $daemon_mode = DI::config()->get('system', 'worker_daemon_mode', false, true);
+               if ($daemon_mode) {
+                       return $daemon_mode;
+               }
+
+               if (!function_exists('pcntl_fork')) {
+                       self::$daemon_mode = false;
+                       return false;
+               }
+
+               $pidfile = DI::config()->get('system', 'pidfile');
+               if (empty($pidfile)) {
+                       // No pid file, no daemon
+                       self::$daemon_mode = false;
+                       return false;
+               }
+
+               if (!is_readable($pidfile)) {
+                       // No pid file. We assume that the daemon had been intentionally stopped.
+                       self::$daemon_mode = false;
+                       return false;
+               }
+
+               $pid = intval(file_get_contents($pidfile));
+               $running = posix_kill($pid, 0);
+
+               self::$daemon_mode = $running;
+               return $running;
+       }
+
        /**
         * Test if the daemon is running. If not, it will be started
         *
index 310d1ea..2bc2bea 100644 (file)
@@ -538,6 +538,11 @@ return [
                // Number of worker tasks that are fetched in a single query.
                'worker_fetch_limit' => 1,
 
+               // worker_fork (Boolean)
+               // Experimental setting. Use pcntl_fork to spawn a new worker process.
+               // Does not work when "worker_multiple_fetch" is enabled (Needs more testing)
+               'worker_fork' => false,
+
                // worker_jpm (Boolean)
                // If enabled, it prints out the jobs per minute.
                'worker_jpm' => false,
@@ -555,6 +560,7 @@ return [
                // worker_multiple_fetch (Boolean)
                // When activated, the worker fetches jobs for multiple workers (not only for itself).
                // This is an experimental setting without knowing the performance impact.
+               // Does not work when "worker_fork" is enabled (Needs more testing)
                'worker_multiple_fetch' => false,
                
                // worker_defer_limit (Integer)