Facebook
Twitter
Google+
Kommentare
0

Multiprocessing mit einer simplen Job Queue und PCNTL

Hallo zusammen,

da ich in letztens auf der Suche nach einer Möglichkeit war die Laufzeit eines langwierigen Konsolen Scripts zu beschleunigen, wurde ich auf die Process Control kurz PCNTL Extension aufmerksam.

Hier mal frei nach php.net: Die PCTNL Extension in PHP implementiert die Unix Prozesserzeugung, die Prozessausführung, die Signalverwaltung und die Prozessbeendigung.
Mir ging es dabei letztendlich um die Prozesserzeugung, also um die Möglichkeit von einem Hauptprozess mehrere Kindprozesse abzweigen zu lassen (forken).

Hier die Ausgangssituation:
Ich hatte ein Skript dass sich von ca. 50 verschiedenen Urls Bilder abholte. Bisher lief das ganze schön in einem Skript Url für Url durch, was auf Dauer ziemlich langsam wurde.

Das ganze sah ungefähr so aus.

$urls = getImageUrls();

$client = new Zend_Http_Client();

foreach ($urls as $url) {
    $client->setUri($url);
    $content = new Zend_Dom_Query($client->request()->getBody());

    // ... zeugs um die bildurls aus dem request body heraus zu parsen

    foreach ($imageUrls as $imageUrl) {
        $file = $client->setUri($imageUrl)->request()->getBody();
        file_put_contents('irgendeinname', $file);
    }
}

Mein erster Ansatz mit der PCNTL Extension war nun jede URL als eigenen Prozess in einer Art Queue laufen zu lassen.

$queue = getImageUrls();

$client = new Zend_Http_Client();

for ($i = 0; $i < count($queue); $i++) {
    $pid = pcntl_fork();

    if (!$pid) {
      $client->setUri($queue[$i]);
      $content = new Zend_Dom_Query($client->request()->getBody());

      // ... zeugs um die bildurls aus dem request body heraus zu parsen

        foreach ($imageUrls as $imageUrl) {
            $file = $client->setUri($imageUrl)->request()->getBody();
            file_put_contents('irgendeinname', $file);
        }
    }
}

Das war schön und gut, aber berücksichtigte nicht die Tatsache, dass ein geforkter Prozess immer eine Kopie das aktuell laufenden Eltern Prozesses war.
Daraus ergaben sich natürlich Probleme im Bezug auf alle benutzten Variablen. Ich musste also dafür sorgen, dass jeder Kindprozess seine eigenen Variablen benutzte.

Also entschloss ich mich dazu, eine simple MultiprocessQueue zu bauen, die Jobs ausführen konnte.
Wobei jeder Job eine eigene Instanz einer beliebigen Jobklasse ist, die von einer abstrakten Jobklasse abgeleitet wurde.

Wichtig ist an dieser Stelle noch anzumerken, dass die PCNTL Extension nicht auf Windows Systemen läuft, sondern ausschließlich auf Unix Systemen. Des weiteren sollte sie nicht in einer Webserverumgebung ausgeführt werden um unerwünschte Nebeneffekte zu vermeiden.

Um nun mein Ziel zu erreichen, erstellte ich zunächst eine abstrakte Job Klasse die im Prinzip so aussah:

abstract class DT_Multiprocess_Job_Abstract
{
    const STATUS_PENDING = 1;
    const STATUS_RUNNING = 2;
    const STATUS_FINISHED = 3;

    protected $_jobId;
    protected $_status = self::STATUS_PENDING;

    public function execute ()
    {
        throw new DT_Multiprocess_Exception(__FUNCTION__
            . ' must be implemented');
    }

    public final function setJobId ($jobId)
    {
        $this->_jobId = $jobId;
    }

    public final function getJobId ()
    {
        return $this->_jobId;
    }

    public final function setStatus ($status)
    {
        $this->_status = $status;
    }

    public final function getStatus ()
    {
        return $this->_status;
    }
}

Und dazu die Queue Klasse, die von DT_Multiprocess_Job_Abstract abgeleitete Klassen ausführen konnte.

Hier ein kurzer Auszug der wichtigsten Funktionen:

class DT_Multiprocess_Job_Queue
{
    /**
     * jobs
     *
     * @var array
     */
    protected $_jobs;

...
    /**
     * Adds a new job to the queue
     *
     * Adds a new job to the queue. The job has to extend from
     * DT_Multiprocess_Job_Abstract in order to be accepted.
     *
     * @param DT_Core_Job_Abstract $job
     * @return void
     */
    public function addJob (DT_Multiprocess_Job_Abstract $job)
    {
        $this->_jobs[] = $job;
    }

    /**
     * forks the available jobs and executes them
     *
     * @param void
     */
    public function work ()
    {
       for ($jobId = 0; $jobId < count($this->_jobs); $jobId++) {
             $pid = pcntl_fork();

            if (!$pid) {
                $this->_jobs[$jobId]->setJobId($jobId);
                $this->_jobs[$jobId]->execute();
                exit($jobId);
            }
        }

        // wait for all childs to finish
        while (pcntl_waitpid(0, $status) != -1) {
            $status = pcntl_wexitstatus($status);
            $this->_jobs[$status]->setStatus(
                DT_Multiprocess_Job_Abstract::STATUS_FINISHED
            );
        }
    }
}

Nun konnte ich einen konkreten Job implementieren und Instanzen von diesem von der Queue abarbeiten lassen.
Hier vereinfacht der Bildjob:

class Job_ExampleImageFetching extends DT_Multiprocess_Job_Abstract
{
    protected $_url;

...

    public function __construct ($url)
    {
          $this->_url = $url;
    }

    public function execute()
    {
        $this->setStatus(self::STATUS_RUNNING);
        $client = new Zend_Http_Client();
        $client->setUri($this->_url);
        $content = new Zend_Dom_Query($client->request()->getBody());
         // ... zeugs um die bildurls aus dem request body heraus zu parsen

        foreach ($imageUrls as $imageUrl) {
            $file = $client->setUri($imageUrl)->request()->getBody();
            file_put_contents('irgendeinname', $file);
    }

...
}

Daraus ergab sich dann folgender Workflow zur Abarbeitung der Queue:

    $imageUrls = getImageUrls();
    $jobQueue = new DT_Multiprocess_Job_Queue();
    foreach ($imageUrls as $url) {
        $jobQueue->addJob(new Job_ExampleImageFetching($url));
    }
    $jobQueue->work();

That’s it ;)
Die Ausführungszeit des Konsolenskriptes hat sich drastisch reduziert und ich war glücklich.
Wer sich das genauer anschauen will, kann sich die Klassen auf GitHub anschauen. bald die Klassen auf GitHub anschauen ich versuche sie die Woche noch hoch zu pushen.

In dem Sinne viel Spaß beim forken und bis zum nächsten Mal.

Über den Autor

devtalk

Link erfolgreich vorgeschlagen.

Vielen Dank, dass du einen Link vorgeschlagen hast. Wir werden ihn sobald wie möglich prüfen. Schließen