r24958 MediaWiki - Code Review archive

Repository:MediaWiki
Revision:r24957‎ | r24958 | r24959 >
Date:18:41, 20 August 2007
Author:mikeb
Status:old
Tags:
Comment:
** The video recode daemon. See README for usage. **
* Also a continuation of my AV support via uploadverification hook, mime plugin, and beginnings of a media handler (AV.php)
* Added sending POST content to the Http class, cURL and PHP-based
* Added an uploadComplete hook in SpecialUpload.php that adds video files to the recode queue/initializes the recode process
Modified paths:
  • /branches/mikeb/encoder_example.c (modified) (history)
  • /branches/mikeb/encoder_example.c.patch (added) (history)
  • /branches/mikeb/phase3/includes/DefaultSettings.php (modified) (history)
  • /branches/mikeb/phase3/includes/HttpFunctions.php (modified) (history)
  • /branches/mikeb/phase3/includes/SpecialUpload.php (modified) (history)
  • /branches/mikeb/phase3/includes/media/AV.php (modified) (history)
  • /branches/mikeb/phase3/includes/mime.info (modified) (history)
  • /branches/mikeb/queue (added) (history)
  • /branches/mikeb/queue/README (added) (history)
  • /branches/mikeb/queue/db_schema.txt (added) (history)
  • /branches/mikeb/queue/notify.php (added) (history)
  • /branches/mikeb/queue/recode-common.php (added) (history)
  • /branches/mikeb/queue/recode-config.php (added) (history)
  • /branches/mikeb/queue/recoded.php (added) (history)

Diff [purge]

Index: branches/mikeb/phase3/includes/SpecialUpload.php
@@ -1350,5 +1350,116 @@
13511351
13521352 function recodePut($img)
13531353 {
1354 - var_dump($img);
 1354+ if(! is_object($img))
 1355+ {
 1356+ return; //probably should throw some error
 1357+ }
 1358+
 1359+ //if this is a re-upload of a file already running or enqueued, cancel it
 1360+ $key = $img->getTitle()->getDBkey();
 1361+
 1362+ $db = wfGetDB(DB_SLAVE);
 1363+ $deleted = false;
 1364+
 1365+ $enq = $db->selectRow('avrecode_queue', "q_img_name", "q_img_name = '$key'", __FUNCTION__);
 1366+ if($enq !== false)
 1367+ {
 1368+ //drop old job from queue.
 1369+ $dbw = wfGetDB(DB_MASTER);
 1370+ $dbw->delete( 'avrecode_queue', array( 'q_img_name' => $key ), __FUNCTION__ );
 1371+ $affected = $dbw->affectedRows();
 1372+ $dbw->commit(); //because job queue does.
 1373+ if($affected) $deleted = true;
 1374+ }
 1375+
 1376+ if(!$deleted) //if we didn't delete it, it might be a running job right now
 1377+ {
 1378+ //look for it as a currently running job
 1379+ $enq = $db->selectRow('avrecode_farm', 'notify_address', "img_name = '$key'", __FUNCTION__);
 1380+ if($enq !== false)
 1381+ {
 1382+ /*
 1383+ * stop running recode.
 1384+ * to ensure this node is back in the usable pool when we look for idle
 1385+ * nodes to do the new upload, this MUST block until the node has
 1386+ * finished updating the db.
 1387+ */
 1388+
 1389+ Http::addNameValuePair('cancel', $key);
 1390+ $result = explode("\n", Http::post($enq->notify_address, 5), 2);
 1391+ if($result[0] == 'success')
 1392+ {
 1393+ //be happy
 1394+ } else if($result[0] == 'error')
 1395+ {
 1396+ //a log entry should probably be silently written with $result[1].
 1397+ //this can theoretically occur if the job finished after the
 1398+ //above query ran, which isn't anything to worry about.
 1399+ } else {
 1400+ //we didn't even communicate with the recoding daemon properly.
 1401+ //this shouldn't happen...network problem? Crashed daemon?
 1402+ }
 1403+ }
 1404+ }
 1405+
 1406+ // the audio/video specific code
 1407+ if(
 1408+ is_object($img)
 1409+ && $img->getMediaType() == MEDIATYPE_VIDEO
 1410+ //|| $img->getMediaType() == MEDIATYPE_AUDIO
 1411+ )
 1412+ {
 1413+ //add to queue
 1414+ $dbw = wfGetDB(DB_MASTER);
 1415+
 1416+ $retryCount = 0;
 1417+ do
 1418+ {
 1419+ //get next number in line
 1420+ $max = $dbw->selectRow('avrecode_queue', 'MAX(q_order) + 1 AS m', array(), __FUNCTION__);
 1421+ $max = $max->m;
 1422+
 1423+ $ins = $dbw->insert('avrecode_queue', array('q_order' => $max, 'q_img_name' => $key));
 1424+ $insCount = $dbw->affectedRows();
 1425+ $retryCount++;
 1426+ } while((!$ins || $insCount == 0) && $retryCount < 4 );
 1427+
 1428+ if(!$ins || $insCount == 0)
 1429+ {
 1430+ //error, could not add to queue. Key exists from nearly concurrent upload?
 1431+ } else {
 1432+ //try to assign to an idle node
 1433+ raiseIdleNode();
 1434+ }
 1435+ }
 1436+}
 1437+
 1438+function raiseIdleNode($except = array())
 1439+{
 1440+ //echo "in raiseIdleNode ";
 1441+ $conds = array("img_name = ''");
 1442+ foreach($except AS $tmp)
 1443+ {
 1444+ $conds[] = "notify_address <> '$tmp'";
 1445+ }
 1446+
 1447+ $dbw = wfGetDB(DB_MASTER);
 1448+ // using a slave here would be a good way to get a populated queue and
 1449+ // recode nodes sitting idly by
 1450+ $node = $dbw->selectRow('avrecode_farm', 'notify_address', $conds);
 1451+
 1452+ if(is_object($node))
 1453+ {
 1454+ //echo ("contacting " . $node->notify_address);
 1455+ //there is a free node to process the recode immediately
 1456+ Http::addNameValuePair('recode', NULL);
 1457+ $result = explode("\n", Http::post($node->notify_address, 4), 2);
 1458+ if($result[0] == 'success')
 1459+ {
 1460+ //the node acknowledges and will check the queue for jobs.
 1461+ } else {
 1462+ $except[] = $node->notify_address;
 1463+ raiseIdleNode($except);
 1464+ }
 1465+ }
13551466 }
\ No newline at end of file
Index: branches/mikeb/phase3/includes/media/AV.php
@@ -492,7 +492,7 @@
493493
494494 protected function mimeInfo()
495495 {
496 - return 'video/x-ms-asf [VIDEO]';
 496+ return "video/x-ms-asf [VIDEO]\nvideo/quicktime [VIDEO]";
497497 }
498498
499499 public function getMediaType($file, $mime)
@@ -514,4 +514,35 @@
515515 }
516516 }
517517 }
 518+
 519+class VideoHandler extends ImageHandler
 520+{
 521+ function canRender()
 522+ {
 523+ return false;
 524+ }
 525+
 526+ function doTransform( $image, $dstPath, $dstUrl, $params, $flags = 0 )
 527+ {
 528+ return false;
 529+ }
 530+
 531+ function getThumbType($ext, $mime)
 532+ {
 533+ return array('ogg', 'video/ogg');
 534+ }
 535+
 536+ function getImageSize( $image, $path )
 537+ {
 538+ $out = array();
 539+ $inspector = new CompositeAVInspector($path);
 540+ $out[0] = $inspector->getFrameWidth();
 541+ $out[1] = $inspector->getFrameHeight();
 542+ $out[2] = NULL;
 543+ $out[3] = 'height="' . $out[1] . '" width="' . $out[0] . '"';
 544+ $out[4] = MimeMagic::singleton()->guessMimeType($path);
 545+
 546+ return $out;
 547+ }
 548+}
518549 ?>
\ No newline at end of file
Index: branches/mikeb/phase3/includes/DefaultSettings.php
@@ -1557,8 +1557,6 @@
15581558 'image/x-ms-bmp' => 'BmpHandler',
15591559 'image/svg+xml' => 'SvgHandler',
15601560 'image/vnd.djvu' => 'DjVuHandler',
1561 - MEDIATYPE_VIDEO => 'VideoHandler',
1562 - MEDIATYPE_AUDIO => 'AudioHandler'
15631561 );
15641562
15651563
Index: branches/mikeb/phase3/includes/HttpFunctions.php
@@ -4,6 +4,8 @@
55 * Various HTTP related functions
66 */
77 class Http {
 8+ private static $data = array();
 9+
810 static function get( $url, $timeout = 'default' ) {
911 return Http::request( "GET", $url, $timeout );
1012 }
@@ -13,6 +15,40 @@
1416 }
1517
1618 /**
 19+ * Add to data sent with the next request. Applies to GET and POST.
 20+ * @param string $name Must be valid for inclusion in a url query.
 21+ * @param string $value Should not be pre-urlencoded.
 22+ */
 23+ static function addNameValuePair($name, $value)
 24+ {
 25+ Http::$data[$name] = urlencode($value);
 26+ }
 27+
 28+ /**
 29+ * Forget previous calls to addNameValuePair.
 30+ */
 31+ static function resetNameValuePairs()
 32+ {
 33+ Http::$data = array();
 34+ }
 35+
 36+ private static function makeQueryString()
 37+ {
 38+ if(count(Http::$data))
 39+ {
 40+ list($name, $val) = each(Http::$data);
 41+ $out = $name . '=' . $val;
 42+ while(list($name, $val) = each(Http::$data))
 43+ {
 44+ $out .= '&' . $name . '=' . $val;
 45+ }
 46+ return $out;
 47+ } else {
 48+ return '';
 49+ }
 50+ }
 51+
 52+ /**
1753 * Get the contents of a file by HTTP
1854 *
1955 * if $timeout is 'default', $wgHTTPTimeout is used
@@ -35,9 +71,18 @@
3672 curl_setopt( $c, CURLOPT_TIMEOUT, $timeout );
3773 curl_setopt( $c, CURLOPT_USERAGENT, "MediaWiki/$wgVersion" );
3874 if ( $method == 'POST' )
 75+ {
3976 curl_setopt( $c, CURLOPT_POST, true );
40 - else
 77+ if(count(Http::$data))
 78+ {
 79+ curl_setopt( $c, CURLOPT_POSTFIELDS, Http::makeQueryString() );
 80+ }
 81+ } else if ( $method == 'GET' && count(Http::$data))
 82+ {
 83+ curl_setopt( $c, CURLOPT_URL, $url . '?' . Http::makeQueryString() );
 84+ } else {
4185 curl_setopt( $c, CURLOPT_CUSTOMREQUEST, $method );
 86+ }
4287
4388 # Set the referer to $wgTitle, even in command-line mode
4489 # This is useful for interwiki transclusion, where the foreign
@@ -61,14 +106,29 @@
62107 } else {
63108 # Otherwise use file_get_contents, or its compatibility function from GlobalFunctions.php
64109 # This may take 3 minutes to time out, and doesn't have local fetch capabilities
 110+ $httpOpts = array( 'method' => $method );
 111+ if(count(Http::$data))
 112+ {
 113+ if($method == 'POST')
 114+ {
 115+ $httpOpts['header'] = 'Content-type: application/x-www-form-urlencoded';
 116+ $httpOpts['content'] = Http::makeQueryString();
 117+ } else if($method == 'GET')
 118+ {
 119+ $url .= '?' . Http::makeQueryString();
 120+ }
 121+ }
65122
66 - $opts = array('http' => array( 'method' => $method ) );
 123+ $opts = array('http' => $httpOpts);
67124 $ctx = stream_context_create($opts);
68125
69126 $url_fopen = ini_set( 'allow_url_fopen', 1 );
70127 $text = file_get_contents( $url, false, $ctx );
71128 ini_set( 'allow_url_fopen', $url_fopen );
72129 }
 130+
 131+ Http::resetNameValuePairs();
 132+
73133 return $text;
74134 }
75135
@@ -104,4 +164,3 @@
105165 return false;
106166 }
107167 }
108 -?>
Index: branches/mikeb/phase3/includes/mime.info
@@ -45,8 +45,8 @@
4646 model/iges [MULTIMEDIA]
4747 model/mesh [MULTIMEDIA]
4848 model/vrml [MULTIMEDIA]
49 -video/quicktime [MULTIMEDIA]
50 -video/x-msvideo [MULTIMEDIA]
 49+video/quicktime [VIDEO]
 50+video/x-msvideo [VIDEO]
5151
5252 text/plain [TEXT]
5353 text/html application/xhtml+xml [TEXT]
Index: branches/mikeb/queue/db_schema.txt
@@ -0,0 +1,22 @@
 2+CREATE TABLE avrecode_queue (
 3+ q_order smallint unsigned auto_increment,
 4+ q_img_name varchar(255),
 5+ PRIMARY KEY(q_img_name),
 6+ UNIQUE KEY(q_order)
 7+);
 8+
 9+CREATE TABLE avrecode_farm (
 10+ notify_address varchar(255),
 11+ last_claimed timestamp,
 12+ img_name varchar(255) NOT NULL DEFAULT '',
 13+ PRIMARY KEY(notify_address)
 14+);
 15+
 16+CREATE TABLE avrecode (
 17+ img_name varchar(255),
 18+ KEY(img_name),
 19+ container_format char(3),
 20+ bitrate smallint unsigned,
 21+ status ENUM('PENDING', 'AVAILABLE', 'FAILED') NOT NULL DEFAULT 'PENDING',
 22+ UNIQUE KEY(img_name, bitrate, container_format)
 23+);
\ No newline at end of file
Property changes on: branches/mikeb/queue/db_schema.txt
___________________________________________________________________
Name: svn:eol-style
124 + native
Index: branches/mikeb/queue/recode-config.php
@@ -0,0 +1,63 @@
 2+<?php
 3+/*
 4+* $wgVideoBitrates and $wgVideoWidths must be parallel arrays,
 5+* with each bitrate being encoded at the corresponding width.
 6+*/
 7+$wgVideoBitrates = array(150, 300, 700);
 8+$wgVideoWidths = array(175, 300, 640);
 9+
 10+$wgRecodeAudio = array(
 11+ 'ogg' => array(
 12+ 'classname' => 'OggAudioFormat',
 13+ 32, 128
 14+ )
 15+);
 16+
 17+$wgRecodeVideo = array(
 18+ 'ogg' => array(
 19+ 'classname' => 'OggVideoFormat',
 20+ array('width' => 175, 'bitrate' => 150),
 21+ array('width' => 320, 'bitrate' => 300),
 22+ array('width' => 640, 'bitrate' => 700)
 23+ )
 24+);
 25+
 26+/*
 27+* IP addresses of hosts that may process uploads and need to notify recoded
 28+* of new jobs.
 29+*/
 30+$acceptIPs = array('127.0.0.1', '192.168.123.133');
 31+
 32+/*
 33+* The path to encoder_example
 34+*/
 35+$encoder_directory = '/data/build/libtheora-1.0alpha7/examples';
 36+
 37+if(! defined("NOTIFY_SCRIPT_URL"))
 38+{
 39+ /* the below is a default that will probably be wrong for your install.
 40+ * Set this value correctly to point to the notify script that will
 41+ * accompany the daemon on this machine.
 42+ */
 43+ define("NOTIFY_SCRIPT_URL", 'http://' . trim(`hostname`) . '/queue/notify.php');
 44+}
 45+
 46+define("MW_INSTALL_PATH", "/data/www/wiki/phase3");
 47+
 48+if(! defined("SIGUSR1"))
 49+{
 50+ /* PHP will not define the signal constants for notify.php because it
 51+ * runs under a web server. You may need to change this value to
 52+ * match your system.
 53+ */
 54+ define("SIGUSR1", 10);
 55+}
 56+
 57+##################### DO NOT EDIT BELOW #####################
 58+define("RECODE_DAEMON_LOCKFILE", "MWRECODED_LOCKFILE");
 59+define("RECODE_NOTIFY_LOCKFILE", "MWRECODEN_LOCKFILE");
 60+define("RECODE_DAEMON_OUTPIPE", "MWRECODED_OUTPIPE.fifo");
 61+define("RECODE_DAEMON_INPIPE", "MWRECODED_INPIPE.fifo");
 62+define("RECODE_DAEMON_STATUS_ERR", 1);
 63+
 64+require('recode-common.php');
\ No newline at end of file
Property changes on: branches/mikeb/queue/recode-config.php
___________________________________________________________________
Name: svn:eol-style
165 + native
Index: branches/mikeb/queue/recoded.php
@@ -0,0 +1,889 @@
 2+<?php
 3+
 4+if(!function_exists("pcntl_fork"))
 5+{
 6+ trigger_error("Process control functions unavailable; cannot daemonize.\n Perhaps you are trying to run the daemon using a non CLI flavor of PHP, or you did not configure php with --enable-pcntl", E_USER_ERROR);
 7+}
 8+
 9+//ensure working directory is same as where recoded resides
 10+chdir(dirname(__FILE__));
 11+require('recode-config.php');
 12+
 13+//ensure the needed MPlayer A/V pipes are in place
 14+if(! verifyFifo('stream.wav'))
 15+{
 16+ trigger_error("Cannot create or use audio fifo stream.wav. Check file perms?", E_USER_ERROR);
 17+}
 18+
 19+if(! verifyFifo('stream.yuv'))
 20+{
 21+ trigger_error("Cannot create or use video fifo stream.yuv. Check file perms?", E_USER_ERROR);
 22+}
 23+
 24+/*
 25+* include the necessary features from MediaWiki...database and FileRepo,
 26+* Title and some GlobalFunctions.
 27+* FileRepo requires a significant chunk of the MW environment to work
 28+*/
 29+define("MEDIAWIKI", true);
 30+require(MW_INSTALL_PATH . '/includes/Defines.php');
 31+require(MW_INSTALL_PATH .'/LocalSettings.php');
 32+require(MW_INSTALL_PATH . '/includes/ProfilerStub.php');
 33+require(MW_INSTALL_PATH . '/includes/AutoLoader.php');
 34+require(MW_INSTALL_PATH . '/includes/GlobalFunctions.php');
 35+require(MW_INSTALL_PATH . '/includes/Setup.php');
 36+
 37+$lfh = fopen(RECODE_DAEMON_LOCKFILE, 'a');
 38+if(!$lfh)
 39+{
 40+ trigger_error("Unable to open the RECODE_DAEMON_LOCKFILE \"" . RECODE_DAEMON_LOCKFILE . "\" for writing", E_USER_ERROR);
 41+}
 42+
 43+chmod(RECODE_DAEMON_LOCKFILE, 0666);
 44+
 45+/* if starting interactive, first action is to wait for commands on the notify
 46+* pipe. Otherwise, the daemon just looks for unclaimed jobs in queue.
 47+*/
 48+if($argv[1] == 'interactive') $interactive = true; else $interactive = false;
 49+
 50+$pid = pcntl_fork();
 51+if ($pid == -1) {
 52+ trigger_error("could not fork", E_USER_ERROR);
 53+} else if ($pid) {
 54+ exit(0); // we are the parent
 55+}
 56+
 57+// else we are the child
 58+
 59+/*
 60+* The file must be locked here, we can't do it before we've forked.
 61+* The lock seems to be lost if it is acquired before fork()ing, and
 62+* a subsequent call to flock within the forked process does nothing. Php bug?
 63+*/
 64+if(!flock($lfh, LOCK_EX + LOCK_NB))
 65+{
 66+ //already locked
 67+ trigger_error("Cannot obtain lock on " . RECODE_DAEMON_LOCKFILE . ". Is another instance of recoded already running?", E_USER_ERROR);
 68+}
 69+
 70+//make an initial connection to the pipes
 71+if(! verifyFifo(RECODE_DAEMON_INPIPE))
 72+{
 73+ trigger_error("Named pipe " . RECODE_DAEMON_INPIPE . " could not be created", E_USER_ERROR);
 74+}
 75+if(! verifyFifo(RECODE_DAEMON_OUTPIPE))
 76+{
 77+ trigger_error("Named pipe " . RECODE_DAEMON_OUTPIPE . " could not be created", E_USER_ERROR);
 78+}
 79+
 80+ftruncate($lfh, 0);
 81+fseek($lfh, 0);
 82+fwrite($lfh, posix_getpid());
 83+fflush($lfh);
 84+
 85+declare(ticks=1); //enables asynch. signal handling
 86+pcntl_signal(SIGUSR1, 'handleSignal');
 87+pcntl_signal(SIGTERM, 'handleSignal');
 88+pcntl_signal(SIGCHLD, 'handleChildTerm');
 89+
 90+define("IN", 0);
 91+define("OUT", 1);
 92+define("ERR", 2);
 93+
 94+$children = array(); //holds vital info for accessing child procs.
 95+
 96+// detach from the controlling terminal
 97+if (!posix_setsid()) {
 98+ trigger_error("could not detach from terminal", E_USER_ERROR);
 99+}
 100+
 101+//fatal errors can leave messes if we don't tell children to quit
 102+register_shutdown_function("fatalError", "PHP has seen fit to terminate the daemon");
 103+set_error_handler("phpErrorHandler");
 104+
 105+
 106+$readPipes = array();
 107+$writePipes = array();
 108+$junk = NULL; // select params 2 & 3 are by reference and thus must be variable
 109+
 110+if($interactive)
 111+{
 112+ echo 'opening outpipe for writing';
 113+ $outfh = fopen(RECODE_DAEMON_OUTPIPE, 'w');
 114+ echo ' opening inpipe for reading';
 115+ $infh = fopen(RECODE_DAEMON_INPIPE, 'r');
 116+ echo ' pipes open';
 117+ $readPipes[] = $infh;
 118+
 119+ $interactive = false;
 120+
 121+ message_send($outfh, 'pid=' . posix_getpid());
 122+} else {
 123+ $outfh = -1;
 124+ $infh = -1;
 125+ seekJob();
 126+}
 127+
 128+foreach(childProcess::$childrenByPid AS $child)
 129+{
 130+
 131+ $readPipes[] = $child->io[OUT][childProcess::PIPE];
 132+ $readPipes[] = $child->io[ERR][childProcess::PIPE];
 133+}
 134+
 135+// MAIN LOOP
 136+while(true)
 137+{
 138+ //recode_log("--------------LOOP-------------");
 139+ $pipeCount = stream_select($readPipes, $junk, $junk, NULL);
 140+ if(!$pipeCount)
 141+ {
 142+ //get here if stream_select is interrupted by an incoming signal
 143+ $readPipes = array();
 144+ }
 145+
 146+ foreach($readPipes AS $read)
 147+ {
 148+ if(isset(childProcess::$childrenByPipe[(int)$read]))
 149+ {
 150+ $child = childProcess::$childrenByPipe[(int)$read];
 151+ if((int) $read == (int) $child->io[OUT][childProcess::PIPE])
 152+ {
 153+ $pipeIndex = OUT;
 154+ } else {
 155+ $pipeIndex = ERR;
 156+ }
 157+ $callback = $child->io[$pipeIndex][childProcess::HANDLER];
 158+ //recode_log("Calling " . $callback);
 159+
 160+ $callback($child->io[$pipeIndex][childProcess::PIPE]);
 161+ } else {
 162+ if((int) $read == (int) $infh)
 163+ {
 164+ handleNotification();
 165+ } else {
 166+ unset(childPRocess::$childrenByPipe[(int)$read]);
 167+ }
 168+ }
 169+ }
 170+
 171+ //set up for next loop
 172+ $readPipes = array();
 173+
 174+ /* infh is added to the watch list here and in handleSignal to ensure it is * included regardless of when a SIGUSR1 is handled.
 175+ */
 176+ if(is_resource($infh))
 177+ {
 178+ $readPipes[] = $infh;
 179+ }
 180+
 181+ foreach(childProcess::$childrenByPid AS $pid => $child)
 182+ {
 183+ if(!is_resource($child->io[OUT][childProcess::PIPE]))
 184+ {
 185+ unset(childProcess::$childrenByPid[$pid]);
 186+ continue;
 187+ }
 188+
 189+ if(!feof($child->io[OUT][childProcess::PIPE]))
 190+ $readPipes[] = $child->io[OUT][childProcess::PIPE];
 191+
 192+ if(!feof($child->io[ERR][childProcess::PIPE]))
 193+ $readPipes[] = $child->io[ERR][childProcess::PIPE];
 194+ }
 195+}
 196+
 197+function piperead($pipe)
 198+{
 199+ if(!is_resource($pipe)) throw new Exception();
 200+ static $pipeBuffers = array();
 201+ if(@isset($pipeBuffers[(int)$pipe]))
 202+ {
 203+ $out = $pipeBuffers[(int)$pipe];
 204+ } else {
 205+ $out = '';
 206+ }
 207+ $maxRead = 8192;
 208+ stream_set_blocking($pipe, 0);
 209+
 210+ do
 211+ {
 212+ $read = fread($pipe, $maxRead);
 213+ $out .= $read;
 214+ } while(strlen($read) == $maxRead);
 215+ $endC = substr($out, -1);
 216+ //only return complete lines
 217+ if($endC != "\n" || $endC != "\r")
 218+ {
 219+ //find last end of line, and only return up through there
 220+ $lastLB = strrpos($out, "\n");
 221+ if($lastLB === false)
 222+ {
 223+ $lastLB = strrpos($out, "\r");
 224+ }
 225+ if(! $lastLB)
 226+ {
 227+ $pipeBuffers[(int)$pipe] = $out;
 228+ return '';
 229+ } else {
 230+ $pipeBuffers[(int)$pipe] = substr($out, $lastLB + 1);
 231+ return substr($out, 0, $lastLB + 1);
 232+ }
 233+ } else {
 234+ return $out;
 235+ }
 236+}
 237+
 238+function handleNotification()
 239+{
 240+ global $infh, $outfh;
 241+ $data = null;
 242+ if(! message_readresponse($infh, $data))
 243+ {
 244+ message_send($outfh, false);
 245+ recode_log("Failed reading (misformed?) message on notify pipe. $data");
 246+ //handleSignal(SIGUSR1); //pipes are out of sync, have to reset them
 247+ } else {
 248+ recode_log("New message: " . $data);
 249+ $msg = explode(' ', $data, 2);
 250+ recode_log("Action is " . $msg[0]);
 251+ $action = trim($msg[0]);
 252+ switch($action)
 253+ {
 254+ case 'cancel':
 255+ $cancelFlag = true;
 256+ if(abortRecode(trim($msg[1])))
 257+ {
 258+ message_send($outfh, $data);
 259+ } else {
 260+ message_send($outfh, 'Job "' . trim($msg[1]) . '" is not currently processing on this node, cannot cancel it.');
 261+ }
 262+ break;
 263+
 264+ case 'recode':
 265+ message_send($outfh, $data);
 266+ seekJob();
 267+ break;
 268+
 269+ default:
 270+ message_send($outfh, false);
 271+ }
 272+ }
 273+ fclose($infh);
 274+ fclose($outfh);
 275+ $infh = -1;
 276+ $outfh = -1;
 277+}
 278+
 279+function seekJob()
 280+{
 281+ global $currentJob;
 282+ if($currentJob)
 283+ {
 284+ return false;
 285+ }
 286+ $dbm = wfGetDB(DB_MASTER);
 287+ $job = $dbm->selectField('avrecode_queue', 'q_img_name', 'q_order = (select min(q_order) from avrecode_queue)', __FUNCTION__);
 288+ if(strlen($job))
 289+ {
 290+ $dbm->delete('avrecode_queue', array("q_img_name = '$job'"), __FUNCTION__);
 291+ if($dbm->affectedRows())
 292+ {
 293+ $dbm->immediateCommit();
 294+ if(! startRecode($job))
 295+ {
 296+ return seekJob();
 297+ } else {
 298+ return true;
 299+ }
 300+ } else {
 301+ return seekJob();
 302+ }
 303+ } else {
 304+ //empty queue
 305+ return false;
 306+ }
 307+}
 308+
 309+function startRecode($name)
 310+{
 311+ /*$wgLocalFileRepo = array(
 312+ 'class' => 'LocalRepo',
 313+ 'name' => '?',
 314+ 'url' => 'http://mikeb.servehttp.com:8080/wiki/phase3/images',
 315+ 'hashLevels' => 2,
 316+ 'directory' => '/data/www/wiki/phase3/images'
 317+ );
 318+ */
 319+
 320+ $title = Title::makeTitleSafe( NS_IMAGE, $name );
 321+ $original = wfFindFile($title);
 322+ $title = $title->getDBkey(); //not the best naming here, but oh well
 323+ if(!is_object($original))
 324+ {
 325+ recode_log("Enqueued job \"$title\" does not exist!");
 326+ return false;
 327+ }
 328+
 329+ $mediatype = $original->getMediaType();
 330+ if($mediatype == MEDIATYPE_VIDEO)
 331+ {
 332+ global $wgVideoBitrates;
 333+ global $wgVideoWidths;
 334+ $bitrates = $wgVideoBitrates;
 335+ $widths = $wgVideoWidths;
 336+ } /* else if($mediatype == MEDIATYPE_AUDIO)
 337+ {
 338+ global $wgAudioBitrates;
 339+ $bitrates = $wgAudioBitrates;
 340+ $widths = array();
 341+ }*/
 342+ if(JobStrategy::buildStrategy($original, $bitrates, $widths))
 343+ {
 344+ global $currentJob;
 345+ $currentJob = $title;
 346+ $dbm = wfGetDB(DB_MASTER);
 347+ $dbm->update('avrecode_farm', array('img_name' => $title), array('notify_address' => NOTIFY_SCRIPT_URL), __FUNCTION__);
 348+
 349+ JobStrategy::runNextPart();
 350+ return true;
 351+ }
 352+}
 353+
 354+function abortRecode($title)
 355+{
 356+ global $currentJob;
 357+ if(strcmp($title, $currentJob) === 0)
 358+ {
 359+ if(! sigkill_children())
 360+ fatalError("Can't quit child processe(s), giving up.");
 361+
 362+ $currentJob = false;
 363+ //mark all jobs associated with this file as failed
 364+ $dbm = wfGetDB(DB_MASTER);
 365+ $dbm->update('avrecode', array('status' => 'FAILED'), array('img_name' => $title), __FUNCTION__);
 366+
 367+ $dbm->update('avrecode_farm', array('img_name' => ''), array('notify_address' => NOTIFY_SCRIPT_URL), __FUNCTION__);
 368+
 369+ return true;
 370+ } else {
 371+ return false;
 372+ }
 373+}
 374+
 375+function sigkill_children()
 376+{
 377+ foreach(childProcess::$childrenByPid AS $pid => $child)
 378+ {
 379+ if(! posix_kill($pid, SIGKILL))
 380+ {
 381+ return false;
 382+ }
 383+ }
 384+ return true;
 385+}
 386+
 387+function recode_log($entry)
 388+{
 389+ echo ("recoded: " . $entry . "\n");
 390+}
 391+
 392+function fatalError($err)
 393+{
 394+ static $called = false;
 395+ //prevents fatalError from rerunning by register_shutdown_function
 396+ if(!$called)
 397+ {
 398+ $called = true;
 399+ //eventually will clean up everything
 400+ recode_log ("FATAL: " . $err);
 401+
 402+ global $children;
 403+ if(is_array($children) && count($children))
 404+ {
 405+ recode_log ("Sending children the term signal...");
 406+ foreach($children AS $pid => $child)
 407+ {
 408+ posix_kill($pid, SIGTERM);
 409+ proc_close($child->handle);
 410+ }
 411+ }
 412+
 413+ global $lfh;
 414+ if($lfh)
 415+ {
 416+ fclose($lfh);
 417+ unlink(RECODE_DAEMON_LOCKFILE);
 418+ }
 419+ die(1);
 420+ }
 421+}
 422+
 423+function handleSignal($sig)
 424+{
 425+ if($sig == SIGUSR1)
 426+ {
 427+ //reconnect with notify i/o pipes
 428+ global $infh, $outfh;
 429+ @fclose($infh);
 430+ @fclose($outfh);
 431+ $outfh = @fopen(RECODE_DAEMON_OUTPIPE, 'w');
 432+ $infh = @fopen(RECODE_DAEMON_INPIPE, 'r');
 433+ global $readPipes;
 434+ $readPipes[] = $infh;
 435+ } else {
 436+ fatalError("Signal received");
 437+ }
 438+}
 439+
 440+function handleChildTerm($sig)
 441+{
 442+ $status;
 443+ $pid = pcntl_wait($status);
 444+
 445+ if($pid > 0)
 446+ {
 447+ if(isset(childProcess::$childrenByPid[$pid]))
 448+ {
 449+ switch(childProcess::$childrenByPid[$pid]->type)
 450+ {
 451+ case childProcess::TYPE_MPLYR:
 452+ //todo: add mechanism for identifying undesired terminations
 453+ break;
 454+
 455+ case childProcess::TYPE_ENCODER:
 456+ //recode_log("Encoder terminated...");
 457+ onEncoderOutput(childProcess::$childrenByPid[$pid]->io[OUT][childProcess::PIPE]);
 458+ onEncoderOutput(childProcess::$childrenByPid[$pid]->io[ERR][childProcess::PIPE]);
 459+ break;
 460+
 461+ default:
 462+ fatalError('A child with unrecognized classification "' . $children[$pid]->name . '" has terminated. Don\'t know how to recover. The lack of a classification is a bug.');
 463+ }
 464+
 465+ childProcess::$childrenByPid[$pid]->destruct();
 466+
 467+ global $cancelFlag;
 468+ if($cancelFlag)
 469+ {
 470+ if(count(childProcess::$childrenByPid) === 0)
 471+ {
 472+ $cancelFlag = false;
 473+ seekJob();
 474+ }
 475+ }
 476+ } else {
 477+ $how = (pcntl_wifexited($status)) ? 'cleanly' : 'badly (' . pcntl_wexitstatus($status) . ')';
 478+
 479+ fatalError("An unregistered child process has quit $how. This shouldn't happen; if you're seeing this message you have encountered a bug. Child pid $pid");
 480+ }
 481+ }
 482+}
 483+
 484+function phpErrorHandler($level, $string, $file, $line)
 485+{
 486+ if($level == E_USER_ERROR)
 487+ {
 488+ fatalError($string . " in " . $file . " on line " . $line);
 489+ } else {
 490+ recode_log($string . " in " . $file . " on line " . $line);
 491+ }
 492+ return true;
 493+}
 494+
 495+class childProcess
 496+{
 497+ static $childrenByPid = array();
 498+ static $childrenByPipe = array();
 499+
 500+ //for the benefits of a clearly defined type
 501+ public $pid;
 502+ public $type;
 503+ public $name;
 504+ public $io;
 505+ public $handle;
 506+
 507+ const TYPE_MPLYR = 1;
 508+ const TYPE_ENCODER = 2;
 509+ const PIPE = 0;//Index of the stream resource for entries in the io 2d array
 510+ const HANDLER = 1;//Index of the correspondig read event handler callback
 511+
 512+ function __construct($type, $name, &$io, &$handle)
 513+ {
 514+ $info = proc_get_status($handle);
 515+
 516+ $this->pid = $info['pid'];
 517+ $this->type = $type;
 518+ $this->name = $name;
 519+ $this->io = &$io;
 520+ $this->handle = &$handle;
 521+
 522+ childProcess::$childrenByPid[$this->pid] = $this;
 523+
 524+ $outpipe = $io[OUT][childProcess::PIPE];
 525+ $errpipe = $io[ERR][childProcess::PIPE];
 526+ childProcess::$childrenByPipe[(int) $outpipe] = $this;
 527+ childProcess::$childrenByPipe[(int) $errpipe] = $this;
 528+ }
 529+
 530+ function destruct()
 531+ {
 532+ unset(childProcess::$childrenByPid[$this->pid]);
 533+ unset(childProcess::$childrenByPipe[(int) $this->io[OUT][childProcess::PIPE]]);
 534+ unset(childProcess::$childrenByPipe[(int) $this->io[ERR][childProcess::PIPE]]);
 535+ }
 536+}
 537+
 538+class JobStrategy
 539+{
 540+ public static $parts = array();
 541+ private static $fileTitle; //used for readability of logs
 542+ private static $origFile;
 543+
 544+ private $sourcefile;
 545+ private $width;
 546+ public $bitrate;
 547+ private $mplyr_opts;
 548+ private $enc_opts;
 549+ private $enc_videofile;
 550+ private $enc_audiofile;
 551+ private $thumbfile;
 552+
 553+ public static function buildStrategy($original, $newBitrates, $newWidths = array())
 554+ {
 555+ JobStrategy::$parts = array();
 556+ JobStrategy::$origFile = $original;
 557+ JobStrategy::$fileTitle = $original->getTitle()->getText();
 558+
 559+ // make sure we don't regenerate any existing versions
 560+ $dbs = wfGetDb(DB_SLAVE);
 561+ $key = $original->getTitle()->getDBkey();
 562+ $result = $dbs->select('avrecode', array('bitrate', 'status'), array("img_name = '$key'", 'container_format = "ogg"'), __METHOD__);
 563+
 564+ $existings = array();
 565+ while($row = $dbs->fetchRow($result))
 566+ {
 567+ //if status is pending but no nodes are claiming it,
 568+ //delete it from the table so this node can try it
 569+ if($row['status'] == 'PENDING')
 570+ {
 571+ $dbm = wfGetDB(DB_MASTER);
 572+ $r = $dbm->selectRow('avrecode_farm', array('notify_address'), array("img_name = '$key'"), __METHOD__);
 573+ if(! is_object($r))
 574+ {
 575+ $dbm->delete('avrecode', array("img_name = '$key'", "bitrate = '" . $row['bitrate'] . "'", "container_format = 'ogg'"));
 576+ } else {
 577+ $existings[] = $row['bitrate'];
 578+ }
 579+ } else {
 580+ $existings[] = $row['bitrate'];
 581+ }
 582+ }
 583+
 584+ if(count($existings))
 585+ {
 586+ recode_log("Not remaking " . count($existings) . " existing versions of " . JobStrategy::$fileTitle . ":");
 587+ $length = count($newBitrates);
 588+ for($i = 0; $i < $length; $i++)
 589+ {
 590+ if(in_array($newBitrates[$i], $existings))
 591+ {
 592+ recode_log("\t" . $newBitrates[$i] . " kbps");
 593+ unset($newBitrates[$i]);
 594+ unset($newWidths[$i]);
 595+ }
 596+ }
 597+ }
 598+
 599+ if(! count($newBitrates))
 600+ {
 601+ global $currentJob;
 602+ $currentJob = false;
 603+ return false;
 604+ }
 605+
 606+ $origWidth = $original->getWidth();
 607+ $mediatype = $original->getMediaType();
 608+
 609+ if($mediatype == MEDIATYPE_AUDIO)
 610+ {
 611+ foreach($newBitrates AS $newBitrate)
 612+ {
 613+ if($newBitrate <= $origBitrate && isset($newBitrate))
 614+ {
 615+ JobStrategy::$parts[] = new JobStrategy($newBitrate);
 616+ }
 617+ }
 618+ } else if($mediatype == MEDIATYPE_VIDEO)
 619+ {
 620+ reset($newWidths);
 621+ reset($newBitrates);
 622+ $currWidth = current($newWidths);
 623+ $currBitrate = current($newBitrates);
 624+ do
 625+ {
 626+ if($currWidth <= $origWidth)
 627+ {
 628+ JobStrategy::$parts[] = new JobStrategy($currBitrate, $currWidth);
 629+ }
 630+ } while(($currWidth = next($newWidths)) && ($currBitrate = next($newBitrates)));
 631+
 632+ if(! count(JobStrategy::$parts) && ! count($existings))
 633+ {
 634+ /* the clip has a smaller frame size than the smallest preset.
 635+ * Make a single version in the original frame size and lowest
 636+ * preset bitrate, unless it happens to be ogg theora already, in
 637+ * which case no recoding will take place
 638+ */
 639+ $mime = $original->getMimeType();
 640+ if(strcasecmp($mime, 'video/ogg') === 0 || strcasecmp($mime, 'application/ogg') === 0)
 641+ {
 642+ /*probably the presentation code should just know to use the
 643+ original when these contitions are true.
 644+ */
 645+ return true;
 646+ } else {
 647+ reset($newBitrates);
 648+ $min = current($newBitrates);
 649+ while($br = next($newBitrates))
 650+ {
 651+ if($br < $min) $min = $br;
 652+ }
 653+ JobStrategy::$parts[] = new JobStrategy($min, $origWidth);
 654+ }
 655+ }
 656+ } else {
 657+ recode_log("File \"" . JobStrategy::$fileTitle . "\" is not a supported media type.");
 658+ recode_log($mediatype);
 659+ global $currentJob;
 660+ $currentJob = false;
 661+ return false;
 662+ }
 663+
 664+ if(count(JobStrategy::$parts))
 665+ {
 666+ /* Sort the parts. When done, the parts will be descending by width when
 667+ * available (ie video), or bitrate otherwise. Because the initial
 668+ * part's decompressed video output may be cached, this ensures the
 669+ * initial part is the largest, avoiding upscaling in subsequent parts.
 670+ */
 671+ usort(JobStrategy::$parts, array(__CLASS__, "uSortCmp"));
 672+
 673+ if($original->isLocal())
 674+ {
 675+ $addy = $original->getPath();
 676+ } else {
 677+ $addy = $original->getUrl();
 678+ }
 679+ JobStrategy::$parts[0]->sourcefile = $addy;
 680+ JobStrategy::$parts[0]->enc_videofile = 'stream.yuv';
 681+ JobStrategy::$parts[0]->enc_audiofile = 'stream.wav';
 682+
 683+ if(count(JobStrategy::$parts) > 1)
 684+ {
 685+ //tell encoder in first part to cache
 686+ JobStrategy::$parts[0]->enc_opts .= " -c";
 687+ for($i = 1; $i < count(JobStrategy::$parts); $i++)
 688+ {
 689+ JobStrategy::$parts[$i]->sourcefile = 'cache_stream.yuv';
 690+ JobStrategy::$parts[$i]->enc_audiofile = 'cache_stream.wav';
 691+ JobStrategy::$parts[$i]->enc_videofile = 'stream.yuv';
 692+ JobStrategy::$parts[$i]->mplyr_opts = '-speed 100';
 693+ }
 694+ }
 695+
 696+ //write rows for each part to avrecode.
 697+ $dbm = wfGetDB(DB_MASTER);
 698+ foreach(JobStrategy::$parts AS $p)
 699+ {
 700+ $dbm->insert('avrecode', array('img_name' => $original->getTitle()->getDBKey(), 'container_format' => 'ogg', 'bitrate' => $p->bitrate), __METHOD__);
 701+ }
 702+
 703+ reset(JobStrategy::$parts);
 704+ }
 705+ return true;
 706+ }
 707+
 708+ public static function uSortCmp(JobStrategy $a, JobStrategy $b)
 709+ {
 710+ if($a->width)
 711+ {
 712+ return $b->width - $a->width;
 713+ } else if($b->width)
 714+ {
 715+ return -1;
 716+ } else {
 717+ return $b->bitrate - $a->bitrate;
 718+ }
 719+ }
 720+
 721+ public function __construct($bitrate, $width = null)
 722+ {
 723+ $this->bitrate = $bitrate;
 724+ $this->width = $width;
 725+
 726+ $thumbName = "{$bitrate}kbps-";
 727+ $dotPos = strrpos(JobStrategy::$origFile->getName(), '.');
 728+ if($dotPos)
 729+ {
 730+ $thumbName .= substr(JobStrategy::$origFile->getName(), 0, $dotPos);
 731+ }
 732+ $thumbName .= '.ogg';
 733+
 734+ $this->thumbfile = JobStrategy::$origFile->getThumbPath() . '/' . $thumbName;
 735+ recode_log("new part br:" . $this->bitrate . ", w:" . $width);
 736+ }
 737+
 738+ public static function runNextPart($started = false)
 739+ {
 740+ if($started)
 741+ {
 742+ foreach(childProcess::$childrenByPid AS $pid => $child)
 743+ {
 744+ posix_kill($pid, SIGTERM);
 745+ proc_close($child->handle);
 746+ //removing from childrenByPid is not appropriate until SIGCHLD
 747+ }
 748+
 749+ $strategy = next(JobStrategy::$parts);
 750+ } else {
 751+ $started = true;
 752+ $strategy = current(JobStrategy::$parts);
 753+ }
 754+
 755+ if(! $strategy)
 756+ {
 757+ //no more parts
 758+ recode_log("No more parts, looking for new job...");
 759+ global $currentJob;
 760+ $currentJob = false;
 761+ $dbm = wfGetDB(DB_MASTER);
 762+ $dbm->update('avrecode_farm', array('img_name' => ''), array('notify_address' => NOTIFY_SCRIPT_URL), __METHOD__);
 763+ return seekJob();
 764+ }
 765+
 766+ $descriptorspec = array(
 767+ IN => array("pipe", "r"),
 768+ OUT => array("pipe", "w"),
 769+ ERR => array("pipe", "w")
 770+ );
 771+
 772+ $mplyr_io = array();
 773+ $enc_io = array();
 774+
 775+ //start new decoder as per $strategy settings.
 776+ /* if it were made possible to manipulate the software scaler in slave
 777+ * mode, restarting MPlayer like this wouldn't be necessary.
 778+ */
 779+ $mplyr_command = 'mplayer -idle -slave -ao pcm:waveheader:fast:file=stream.wav -vo yuv4mpeg -vf scale=' . $strategy->width . ':-2 -af format=u16le ' . $strategy->mplyr_opts;
 780+ //recode_log("Starting MPlayer with: " . $mplyr_command);
 781+ $mplyr_resource = proc_open($mplyr_command, $descriptorspec, $mplyr_io);
 782+ $mplyr_io[IN] = array($mplyr_io[IN], 'no-mplayer-in-handler');
 783+ $mplyr_io[OUT] = array($mplyr_io[OUT], 'onMplayerOutput');
 784+ $mplyr_io[ERR] = array($mplyr_io[ERR], 'onMplayerError');
 785+ new childProcess(childProcess::TYPE_MPLYR, 'MPlayer (' . $strategy->__toString() . ')', $mplyr_io, $mplyr_resource);
 786+ //implicit calling of toString supposedly works as of PHP 5.2
 787+
 788+ /**
 789+ * wait for it to load up. The below sync command and sync response is an
 790+ * ugly hack for detecting completion of mplayer's startup sequence. Mplayer
 791+ * only writes the sync response to standard error when it has finished loading.
 792+ */
 793+ $sync_cmd = "v\n";
 794+ $sync_resp = "Command volume requires at least 1 arguments, we found only 0 so far.\n";
 795+ fwrite($mplyr_io[IN][childProcess::PIPE], $sync_cmd);
 796+ $mplyr_verbosity = '';
 797+ do
 798+ {
 799+ $waitSet = array($mplyr_io[ERR][childProcess::PIPE]);
 800+ $junk = array();
 801+ $startTime = microtime(true);
 802+ $r = stream_select($waitSet, $junk, $junk, 25);
 803+ $select_wait = microtime(true) - $startTime;
 804+ if(!$r)
 805+ {
 806+ if($select_wait > 24)
 807+ {
 808+ fatalError("Mplayer did not start successfully. Any MPlayer error output follows:\n" . $mplyr_verbosity);
 809+ } //otherwise it might have just gotten interrupted by a signal
 810+ } else {
 811+ $mplyr_verbosity .= piperead($mplyr_io[ERR][childProcess::PIPE]);
 812+ }
 813+ } while(strcmp(substr($mplyr_verbosity, -strlen($sync_resp)), $sync_resp)!==0);
 814+ unset($mplyr_verbosity);
 815+ //MPlayer's ready to go!
 816+ fwrite($mplyr_io[IN][childProcess::PIPE], "loadfile " . $strategy->sourcefile . "\n");
 817+
 818+ //and the encoder...
 819+ $output = $strategy->thumbfile;
 820+ // ^ the big todo: send recoded files to repo
 821+ $thumbdir = dirname($strategy->thumbfile);
 822+ if(! is_dir($thumbdir) && ! wfMkdirParents($thumbdir))
 823+ {
 824+ //we can't write where we need to, this daemon is useless until fixed.
 825+ abortRecode();
 826+ fatalError("Cannot create thumbnail directory " . $thumbdir);
 827+ }
 828+ global $encoder_directory;
 829+ $enc_command = $encoder_directory . '/encoder_example ' .
 830+ $strategy->enc_opts . ' -a 2 -V ' . $strategy->bitrate . ' ' . $strategy->enc_audiofile . ' ' . $strategy->enc_videofile . ' > ' . $output;
 831+ recode_log("Starting encoder with: " . $enc_command);
 832+ $enc_resource = proc_open($enc_command,
 833+ $descriptorspec, $enc_io);
 834+
 835+ $enc_io[IN] = array($enc_io[IN], 'no-encoder-in-handler');
 836+ $enc_io[OUT] = array($enc_io[OUT], 'onEncoderOutput');
 837+ $enc_io[ERR] = array($enc_io[ERR], 'onEncoderOutput');
 838+
 839+ new childProcess(childProcess::TYPE_ENCODER, "Encoder (" . $strategy->__toString() . ")", $enc_io, $enc_resource);
 840+ }
 841+
 842+ public function __toString()
 843+ {
 844+ return ('"' . JobStrategy::$fileTitle . "\", w:$this->width;b:$this->bitrate;source:$this->sourcefile");
 845+ }
 846+}
 847+
 848+function onMplayerOutput($pipe)
 849+{
 850+ piperead($pipe);
 851+}
 852+
 853+function onMplayerError($pipe)
 854+{
 855+ $error = piperead($pipe);
 856+ if(strlen($error))
 857+ {
 858+ if(substr($error, 0, 9) == "SwScaler:")
 859+ {
 860+ return;
 861+ }
 862+ recode_log("On Mplayer error pipe: " . piperead($pipe));
 863+ }
 864+}
 865+
 866+function onEncoderOutput($pipe)
 867+{
 868+ $lines = explode("\n", piperead($pipe));
 869+ foreach($lines AS $line)
 870+ {
 871+ if($line == "done.")
 872+ {
 873+ global $cancelFlag;
 874+ if(!$cancelFlag)
 875+ {
 876+ recode_log("Part done. Running next...");
 877+
 878+ //mark this part as available in avrecode.
 879+ $dbm = wfGetDB(DB_MASTER);
 880+ $currStrategy = current(JobStrategy::$parts);
 881+ global $currentJob;
 882+ $dbm->update('avrecode', array('status' => 'AVAILABLE'), array('img_name' => $currentJob, 'bitrate' => $currStrategy->bitrate, 'container_format' => 'ogg'), __FUNCTION__);
 883+
 884+ JobStrategy::runNextPart(true);
 885+ }
 886+ } else {
 887+ //recode_log("e: " . $line);
 888+ }
 889+ }
 890+}
\ No newline at end of file
Property changes on: branches/mikeb/queue/recoded.php
___________________________________________________________________
Name: svn:eol-style
1891 + native
Index: branches/mikeb/queue/README
@@ -0,0 +1,80 @@
 2+Installation steps:
 3+1. Install MPlayer. Tested on Mplayer 1.0rc1-4.1.1
 4+2. Obtain libtheora source.
 5+3. In your libtheora source directory, apply the supplied patch to
 6+ examples/encoder_example. This will allow the encoder to work on data not
 7+ saved to disk (sent from mplayer over named pipes instead), and makes a few
 8+ additional customizations like an option to write a copy of the incoming pcm
 9+ and yuv data to disk so it can be used to process subsequent recodes of the
 10+ same file.
 11+4. Make libtheora (and its dependents, libogg, ...)
 12+5. If the machine to be used does not already have a functional MediaWiki
 13+ installation, install it.
 14+6. Review and edit recode-config.php as necessary.
 15+8. If this is the first recode node you are setting up, you will need to add
 16+ the tables described in db_schema.txt to your database.
 17+7. Add a row for this node to your avrecode_farm table. Setting
 18+ notify_address equal to the NOTIFY_SCRIPT_URL is sufficient.
 19+8. Ensure that the user you run Apache as has read/write permissions to the
 20+ directory you have located recoded.php in.
 21+9. Start recoded.php. To permit signaling between the notify script and the
 22+ daemon, it should be run by the same user that runs apache.
 23+
 24+If you have done everything correctly, the daemon will check the queue
 25+immediately upon startup and start working if it finds a job. When there are
 26+no jobs to do, the daemon will sit idly waiting for the notify script to tell
 27+it to check the queue again.
 28+
 29+For the recoding queue and daemon to integrate with MediaWiki, MediaWiki must
 30+be able to answer such simple questions as "is this upload a video file?" and
 31+if so, "do we know how to convert it to theora?" My proposed solution to this
 32+is my work adding "Mime Plugins" to MimeMagic.php, and, in the case of A/V
 33+files specifically, includes/media/AV.php. This provides an uploadverification
 34+hook, a mime plugin, and an elementary media handler for video files which
 35+collectively add a good measure of support for A/V files to MediaWiki. (The
 36+daemon will in fact break if this mechanism is not in place, as it will not
 37+be able to determine the video's width, and so will not know what maximum sizes
 38+of recoded video to produce.) The hook, plugin, and handler are all powered by
 39+my AVInspector classes, which I have written for both the Ffmpeg php api and
 40+Mplayer's companion script, 'midentify.' You must have at least one installed.
 41+Ffmpeg is faster, midentify is more accurate with a small subset of files -
 42+for the best of both worlds install both, they will automatically be used as
 43+necessary in an optimized fashion.
 44+Even with these A/V support features, as of this writing there is still a
 45+missing component: Binding of files to media handlers is currently done by
 46+the file's guessed mime type. This is...inconvenient, because there are so many
 47+possible mime types that the video media handler (and future audio media
 48+handler) should be assigned to, and you can also make an argument that it is
 49+incorrect by design due to the ambiguous application/ogg which could go to
 50+either. I will leave making changes to the media handler framework up to its
 51+author, but I would suggest that it be modified so that handlers may be
 52+selected based on a file's MEDIATYPE if no direct mime-type match is found.
 53+
 54+My goal was only to recode uploaded video, but of course handling audio is a
 55+conceptually very similar task. While audio recoding is not supported, you can
 56+find references to audio files throughout my code in anticipation of future
 57+audio support. The main missing element is teaching the daemon how to
 58+interface with a vorbis encoder - encoder_example makes theora files only.
 59+This shouldn't be much more than translating JobStrategy objects into whatever
 60+parameter string your encoder wants, and writing an onAudioEncoderOutput and/or
 61+onAudioEncoderError function to handle the status and error information the
 62+encoder produces.
 63+
 64+I put considerable effort into allowing the notify script to automatically
 65+start the daemon if it has not been started manually. There is code in both
 66+notify.php and recoded.php written with this intent, however it is still buggy
 67+at this time. Since this is a small convenience feature only, I am comfortable
 68+leaving its smoothing out to future improvements.
 69+
 70+By daemonizing the script that oversees the actions of MPlayer and the encoder,
 71+it is "theoretically" possible to employ a single persistent instance of
 72+MPlayer for all the decoding the daemon will do in its lifetime, a cleaner and
 73+more efficient approach than starting and killing instance after instance for
 74+every file processed. Unfortunately, the sole missing link to implementing the
 75+daemon in this way is that it is not currently possible to communicate new
 76+frame scaling dimensions to a persistent slave instance of MPlayer. Adding this
 77+capability to the MPlayer slave mode and updating recoded to take advantage of
 78+it is another obvious possible future improvement.
 79+
 80+Any further work by me on this project will be posted @
 81+svn.wikimedia.org/svnroot/mediawiki/branches/mikeb
\ No newline at end of file
Index: branches/mikeb/queue/recode-common.php
@@ -0,0 +1,69 @@
 2+<?php
 3+function verifyFifo($name)
 4+{
 5+ if(! file_exists($name))
 6+ {
 7+ //no fifo (or file), attempt to create one
 8+ if(function_exists("posix_mkfifo"))
 9+ {
 10+ if(! posix_mkfifo($name, 0700))
 11+ {
 12+ return false;
 13+ } else {
 14+ return true;
 15+ }
 16+ } else {
 17+ //try to do it via shell
 18+ if(strlen(trim(shell_exec("mkfifo " . escapeshellarg($name)))) == 0)
 19+ {
 20+ chmod($name, 0700);
 21+ return true;
 22+ } else {
 23+ return false;
 24+ }
 25+ }
 26+ } else if(filetype($name) != 'fifo')
 27+ {
 28+ return false;
 29+ } else {
 30+ return true;
 31+ }
 32+}
 33+
 34+function message_send($fh, $data)
 35+{
 36+ if(!is_resource($fh)) return false;
 37+ $data = serialize($data);
 38+ $msgLength = strlen($data);
 39+ if($msgLength > 9999)
 40+ {
 41+ return false;
 42+ } else {
 43+ $msgLength = str_pad($msgLength, 4, "0", STR_PAD_LEFT);
 44+ if(! fwrite($fh, $msgLength)) return false;
 45+ if(! fwrite($fh, $data)) return false;
 46+ return true;
 47+ }
 48+}
 49+
 50+function message_readresponse($fh, &$data)
 51+{
 52+ if(!is_resource($fh)) return false;
 53+ //get message length
 54+ $bytes = fread($fh, 4);
 55+ if(!$bytes || $bytes != intval($bytes))
 56+ {
 57+ $data = "Expecting byte count, got: $bytes";
 58+ return false;
 59+ }
 60+ $bytes = intval($bytes);
 61+ $read = fread($fh, $bytes);
 62+ if(!$read || $bytes != strlen($read))
 63+ {
 64+ $data = "Fread stopped short of $bytes bytes. Got: $read";
 65+ return false;
 66+ } else {
 67+ $data = unserialize($read);
 68+ return true;
 69+ }
 70+}
\ No newline at end of file
Property changes on: branches/mikeb/queue/recode-common.php
___________________________________________________________________
Name: svn:eol-style
171 + native
Index: branches/mikeb/queue/notify.php
@@ -0,0 +1,205 @@
 2+<?php
 3+require('recode-config.php');
 4+
 5+if(! in_array($_SERVER['REMOTE_ADDR'], $acceptIPs) && false)
 6+ spewError("The request originated from an unauthorized host.");
 7+
 8+if(isset($_POST['stop']))
 9+{
 10+ //ensure the current job is the one this request intended to stop
 11+ if(strlen($_POST['stop']))
 12+ {
 13+ cancel($_POST['stop']);
 14+ } else {
 15+ spewError("Stop job request is missing the targeted job name.");
 16+ }
 17+}
 18+
 19+if(isset($_POST['recode']))
 20+{
 21+ startRecode($_POST['recode']);
 22+}
 23+
 24+function spewError($msg = "Unspecified error")
 25+{
 26+ header("Content-type: text/plain");
 27+ echo "error\n";
 28+ echo "$msg";
 29+ exit(1);
 30+}
 31+
 32+function spewSuccess($msg = "")
 33+{
 34+ header("Content-type: text/plain");
 35+ echo "success\n";
 36+ echo $msg;
 37+ exit(0);
 38+}
 39+
 40+function startRecode()
 41+{
 42+ $result = message("recode");
 43+ if($result && $result['status'] == RECODE_DAEMON_STATUS_OK
 44+ && $result['response'])
 45+ {
 46+ spewSuccess($result['response']);
 47+ } else {
 48+ spewError($result['comment']);
 49+ }
 50+}
 51+
 52+function cancel($name)
 53+{
 54+ $message = "cancel $name";
 55+ $result = message($message);
 56+ if($result['status'] == RECODE_DAEMON_STATUS_ERR)
 57+ {
 58+ spewError("Unable to stop job \"$name\". " . $result['comment']);
 59+ } else {
 60+ if(strcmp($result['response'], $message) === 0)
 61+ {
 62+ spewSuccess("Job \"$name\" has been stopped.");
 63+ } else {
 64+ spewError("Unable to stop job \"$name\". " . $result['response']);
 65+ }
 66+ }
 67+}
 68+
 69+/**
 70+* Sends a message to the daemon and gets its response.
 71+* @param string $msg What will be sent to the daemon.
 72+* @return array Associative array with at least the index 'status',
 73+* set to one of the RECODE_DAEMON_STATUS constants.
 74+*/
 75+function message($msg)
 76+{
 77+ static $daemonPid = false;
 78+ static $outfh;
 79+ static $infh;
 80+
 81+ if(!$daemonPid)
 82+ {
 83+ //wait for exclusive access to the daemon
 84+ $lfh = fopen(RECODE_NOTIFY_LOCKFILE, 'a');
 85+ $reps = 0;
 86+ do
 87+ {
 88+ $wait = flock($lfh, LOCK_EX + LOCK_NB);
 89+ $wait = true;
 90+ $reps++;
 91+ sleep(1);
 92+ } while(!$wait && $reps < 4);
 93+ if(!$wait)
 94+ {
 95+ spewError("Timeout waiting for exclusive access to the daemon");
 96+ }
 97+
 98+ //examine the daemon's own lockfile to ensure one is running
 99+ $dlfh = fopen(RECODE_DAEMON_LOCKFILE, 'a');
 100+ if(!$dlfh)
 101+ {
 102+ spewError("Cannot access daemon's lockfile. Check file permissions.");
 103+ }
 104+ if(flock($dlfh, LOCK_EX + LOCK_NB))
 105+ {
 106+ //no daemon already running!
 107+ flock($dlfh, LOCK_UN);
 108+ echo 'no daemon already! ';
 109+ //ensure that the communication pipes exist
 110+ if(! verifyFifo(RECODE_DAEMON_OUTPIPE))
 111+ {
 112+ return array('status' => RECODE_DAEMON_STATUS_ERR, 'comment' => "Daemon communication error: Couldn't establish outut pipe.");
 113+ }
 114+ if(! verifyFifo(RECODE_DAEMON_INPIPE))
 115+ {
 116+ return array('status' => RECODE_DAEMON_STATUS_ERR, 'comment' => "Daemon communication error: Couldn't establish input pipe.");
 117+ }
 118+
 119+ //wait for the daemon to come up on the output fifo
 120+
 121+ $retval = null;
 122+ echo 'starting daemon ';
 123+ $retval = popen('php ./recoded.php interactive', 'r');
 124+ echo 'daemon is independent ';
 125+ if(! is_resource($retval))
 126+ {
 127+ spewError("Couldn't start recoded! Popen failed, check your PHP CLI installation?");
 128+ } else {
 129+ echo 'notify opening outpipe for reading';
 130+ $outfh = fopen(RECODE_DAEMON_OUTPIPE, 'r');
 131+ echo ' notify opening inpipe for writing';
 132+ $infh = fopen(RECODE_DAEMON_INPIPE, 'w');
 133+ echo ' pipes open';
 134+ $junk = array();
 135+ $read = array($outfh);
 136+ $count = stream_select($read, $junk, $junk, 12);
 137+ if(!$count)
 138+ {
 139+ return array('status' => RECODE_DAEMON_STATUS_ERR, 'comment' => "Timeout waiting for daemon startup.");
 140+ } else {
 141+ $data = null;
 142+ $startupMsg = message_readresponse($outfh, $data);
 143+ if(!$startupMsg || !$data)
 144+ {
 145+ return array('status' => RECODE_DAEMON_STATUS_ERR, 'comment' => "Bad message or failed read during daemon startup.");
 146+ } else {
 147+ $m = array();
 148+ if(! preg_match("/pid=(\d*)/", $data, $m))
 149+ {
 150+ return array('status' => RECODE_DAEMON_STATUS_ERR, 'comment' => "Daemon sent unexpected message at startup.");
 151+ }
 152+
 153+ $daemonPid = $m[1];
 154+ }
 155+ }
 156+ }
 157+
 158+ } else {
 159+ $daemonPid = intval(file_get_contents(RECODE_DAEMON_LOCKFILE));
 160+ }
 161+
 162+ if(!$daemonPid)
 163+ {
 164+ return array('status' => RECODE_DAEMON_STATUS_ERR, 'comment' => "Couldn't find daemon's pid.");
 165+ } else {
 166+ //connect pipes, our end
 167+ if(!$outfh)
 168+ {
 169+ //signal daemon to connect to pipes on its end
 170+ posix_kill($daemonPid, SIGUSR1) or die('cant signal :(');
 171+ //shell_exec("/bin/kill -SIGUSR1 " . $daemonPid);
 172+ $outfh = fopen(RECODE_DAEMON_OUTPIPE, 'r');
 173+ }
 174+ if(!$infh)
 175+ {
 176+ $infh = fopen(RECODE_DAEMON_INPIPE, 'w');
 177+ }
 178+ }
 179+ }
 180+
 181+ message_send($infh, $msg);
 182+ $data = null;
 183+ if(message_readresponse($outfh, $data))
 184+ {
 185+ return array('status' => RECODE_DAEMON_STATUS_OK, 'response' => $data);
 186+ } else {
 187+ return array('status' => RECODE_DAEMON_STATUS_ERR, 'comment' => 'Daemon communication fault. ' . $data);
 188+ }
 189+}
 190+
 191+if(!count($_POST))
 192+{
 193+?>
 194+<html>
 195+<body>
 196+<form method="post">
 197+stop: <input type="text" name="stop">
 198+<input type="submit">
 199+</form>
 200+<form method="post">
 201+check queue: <input name="recode" type="submit">
 202+</form>
 203+</body>
 204+</html>
 205+<?php
 206+}
\ No newline at end of file
Property changes on: branches/mikeb/queue/notify.php
___________________________________________________________________
Name: svn:eol-style
1207 + native
Index: branches/mikeb/encoder_example.c.patch
@@ -0,0 +1,721 @@
 2+--- encoder_example.c 2007-07-17 19:00:15.609958500 -0500
 3+@@ -12,6 +12,7 @@
 4+
 5+ function: example encoder application; makes an Ogg Theora/Vorbis
 6+ file from YUV4MPEG2 and WAV input
 7++ last mod: $Id: encoder_example.c 11451 2006-05-28 18:18:26Z illiminable $
 8+
 9+ ********************************************************************/
 10+
 11+@@ -38,9 +39,6 @@
 12+ #include <stdio.h>
 13+ #ifndef WIN32
 14+ #include <unistd.h>
 15+-#include <sys/types.h>
 16+-#include <fcntl.h>
 17+-#include <errno.h>
 18+ #endif
 19+ #include <stdlib.h>
 20+ #include <string.h>
 21+@@ -69,7 +67,7 @@
 22+ }
 23+ #endif
 24+
 25+-const char *optstring = "o:a:A:v:V:s:S:f:F:c";
 26++const char *optstring = "o:a:A:v:V:s:S:f:F:";
 27+ struct option options [] = {
 28+ {"output",required_argument,NULL,'o'},
 29+ {"audio-rate-target",required_argument,NULL,'A'},
 30+@@ -83,37 +81,10 @@
 31+ {NULL,0,NULL,0}
 32+ };
 33+
 34+-struct bufferFifoNodeType
 35+-{
 36+- struct bufferFifoNodeType *next;
 37+- int size;
 38+- char *data; //a de-facto 64k block
 39+-};
 40+-
 41+-struct bufferFifoType
 42+-{
 43+- int totalBytes;
 44+- struct bufferFifoNodeType *head;
 45+- struct bufferFifoNodeType *tail;
 46+-};
 47+-
 48+-typedef struct bufferFifoType bufferFifo;
 49+-
 50+ /* You'll go to Hell for using globals. */
 51+
 52+-int *audio=NULL;
 53+-int audio_fd;
 54+-FILE *audio_cache_fd = NULL;
 55+-
 56+-int *video=NULL;
 57+-int video_fd;
 58+-FILE *video_cache_fd = NULL;
 59+-
 60+-int cache_input = 0;
 61+-FILE *cache_current;
 62+-
 63+-bufferFifo *audio_buffer=NULL;
 64+-bufferFifo *video_buffer=NULL;
 65++FILE *audio=NULL;
 66++FILE *video=NULL;
 67+
 68+ int audio_ch=0;
 69+ int audio_hz=0;
 70+@@ -171,376 +142,8 @@
 71+ exit(1);
 72+ }
 73+
 74+-int bufferFifoPut(bufferFifo *buffer, void *dataToAdd, int dataSize, char func)
 75+-{
 76+- //fprintf(stderr, "In bufferFifoPut\n");
 77+- if(dataSize <= 0) return 0;
 78+-
 79+- //if it will fit in the last node, decide whether to spend time copying
 80+- //or waste the rest of the last node's space and just point to another 64k block.
 81+-
 82+- /*fprintf(stderr, "Buffering ");
 83+- if(buffer == video_buffer)
 84+- {
 85+- fprintf(stderr, "video data...");
 86+- } else if (buffer==audio_buffer)
 87+- {
 88+- fprintf(stderr, "audio data...");
 89+- } else {
 90+- fprintf(stderr, "called by %c\n", func);
 91+- }
 92+- */
 93+-
 94+- int tailSpace;
 95+- if(buffer->tail != NULL)
 96+- {
 97+- //fprintf(stderr, "Accessing tail @ %p\n", buffer->tail);
 98+- tailSpace = 65536 - buffer->tail->size;
 99+- }
 100+- else tailSpace = 65536;
 101+-
 102+- //fprintf(stderr, "new data is %d, tailSpace is %d\n", dataSize, tailSpace);
 103+-
 104+- if(dataSize < 65536 / 2 && tailSpace - dataSize >= 0 && buffer->tail != NULL)
 105+- {
 106+- //fprintf(stderr, "Copying to end of last block...\n");
 107+- char *offset = buffer->tail->data + buffer->tail->size;
 108+- memcpy(offset, dataToAdd, dataSize);
 109+- free(dataToAdd);
 110+- buffer->tail->size += dataSize;
 111+- } else {
 112+- //fprintf(stderr, "Making new tail...\n");
 113+- //we'll create a new node.
 114+- struct bufferFifoNodeType *node = malloc( sizeof( struct bufferFifoNodeType ) );
 115+- //fprintf(stderr, "Building new node @ %p\n", node);
 116+-
 117+- //if we have a tail, link it to the new one
 118+- if(buffer->tail != NULL)
 119+- {
 120+- buffer->tail->next = node;
 121+- //fprintf(stderr, "Linked old tail @ %p to new tail @ %p\n", buffer->tail, node);
 122+- } else {
 123+- buffer->head = node;
 124+- }
 125+-
 126+- buffer->tail = node;
 127+-
 128+- buffer->tail->size = dataSize;
 129+- buffer->tail->data = dataToAdd;
 130+- buffer->tail->next = NULL;
 131+- }
 132+- buffer->totalBytes += dataSize;
 133+- //fprintf(stderr, "leaving bufferFifoPut, %d/%d\n", dataSize, buffer->totalBytes);
 134+- return dataSize;
 135+-}
 136+-
 137+-void waitFor(int *stream)
 138+-{
 139+- fd_set readStreams;
 140+- struct timeval tv;
 141+- int retval;
 142+- int nfds; //highest numbered file descriptor
 143+- if(*video > *audio) nfds = *video; else nfds = *audio;
 144+-
 145+- FD_ZERO(&readStreams);
 146+- FD_SET(*video, &readStreams);
 147+- FD_SET(*audio, &readStreams);
 148+-
 149+- {
 150+- tv.tv_sec = 7;
 151+- tv.tv_usec = 0;
 152+-
 153+- fd_set readyStreams = readStreams;
 154+- retval = select(nfds + 1, &readyStreams, NULL, NULL, &tv);
 155+- if(retval == -1)
 156+- {
 157+- fprintf(stderr, "Error while looking for new data to read.\n");
 158+- exit(1);
 159+- } else if(retval)
 160+- {
 161+- if(FD_ISSET(*stream, &readyStreams))
 162+- {
 163+- fcntl(*audio, F_SETFL, fcntl(*audio, F_GETFL) | O_NONBLOCK);
 164+- fcntl(*video, F_SETFL, fcntl(*video, F_GETFL) | O_NONBLOCK);
 165+- return;
 166+- } else {
 167+- //read the new data off the other stream until we get data on *stream
 168+- int otherfd;
 169+- bufferFifo *otherBuffer;
 170+- if(*video==*stream)
 171+- {
 172+- otherfd = *audio;
 173+- otherBuffer = audio_buffer;
 174+- } else {
 175+- otherfd = *video;
 176+- otherBuffer = video_buffer;
 177+- }
 178+-
 179+- void *readSpace = malloc ( 65536 );
 180+- int status = read(otherfd, readSpace, 65536);
 181+- if(status == -1)
 182+- {
 183+- fprintf(stderr, "Unknown error while trying to buffer more data\n");
 184+- exit(1);
 185+- }
 186+- bufferFifoPut(otherBuffer, readSpace, status, 'w');
 187+- }
 188+- } else {
 189+- fprintf(stderr, "ERROR:1:Timeout waiting for decoder to write new data!");
 190+- exit(1);
 191+- }
 192+- }
 193+-}
 194+-
 195+-int bufferFifoGet(bufferFifo *buffer, void *ptr, int dataSize)
 196+-{
 197+- //fprintf(stderr, "In bufferFifoGet\n");
 198+- int bytesPopped = 0;
 199+-
 200+- while(buffer->head != NULL && buffer->head->size <= dataSize - bytesPopped)
 201+- {
 202+- struct bufferFifoNodeType *currNode = buffer->head;
 203+- memcpy(ptr + bytesPopped, currNode->data, currNode->size);
 204+- bytesPopped += currNode->size;
 205+- free(currNode->data);
 206+-
 207+- buffer->head = currNode->next;
 208+-
 209+- //fprintf(stderr, "Freeing empty node at %p\n", currNode);
 210+- free(currNode);
 211+- }
 212+-
 213+- //at this point, either we've run out of buffered data or we need to add endy bits
 214+- if(buffer->head != NULL)
 215+- {
 216+- int endyBits = dataSize - bytesPopped;
 217+-
 218+- memcpy(ptr + bytesPopped, buffer->head->data, endyBits);
 219+- bytesPopped += endyBits;
 220+- buffer->head->size -= endyBits;
 221+- memmove(buffer->head->data, buffer->head->data + endyBits, buffer->head->size);
 222+- } else {
 223+- //we've consumed the tail node.
 224+- buffer->tail = NULL;
 225+- }
 226+-
 227+- buffer->totalBytes -= bytesPopped;
 228+- //fprintf(stderr, "Out bufferFifoGet\n");
 229+- if(buffer->tail != NULL)
 230+- {
 231+- //fprintf(stderr, "Tail @ %p\n", buffer->tail);
 232+- }
 233+-
 234+- return bytesPopped;
 235+-}
 236+-
 237+-int fiforead_hlpr(void *ptr, int totalNeeded, int *stream)
 238+-{
 239+- int bytesRead = 0;
 240+- int readStatus = 0;
 241+- int againCount = 0;
 242+- while(bytesRead < totalNeeded)
 243+- {
 244+- readStatus = read(*stream, ptr + bytesRead, totalNeeded - bytesRead);
 245+- //fprintf(stderr, "Read %d/%d bytes in fiforead_hlpr\n", readStatus, totalNeeded - bytesRead);
 246+- if(readStatus == -1)
 247+- {
 248+- if(errno == EAGAIN)
 249+- {
 250+- readStatus = 0; // for the benefit of the subtraction
 251+- if(againCount > 0 && againCount < 160)
 252+- {
 253+- //fprintf(stderr, "Multiple EAGAIN condition %d :(\n", againCount);
 254+- //don't try to read another page, there won't ever be any
 255+- } else if(againCount == 160)
 256+- {
 257+- /*we're not getting anywhere -- nothing's actively writing to the fifo.
 258+- * Only hope now is to put it back in blocking mode and hope the kernel
 259+- * wakes us back up someday...
 260+- */
 261+- waitFor(stream);
 262+-
 263+- //how about that...we're back!
 264+- againCount = 0;
 265+- continue;
 266+- }
 267+-
 268+- //could the problem be that the other fifo is full?
 269+- //get other stream
 270+- int other;
 271+- //fprintf(stderr, "Polling for data to buffer from other stream ");
 272+- if(stream == video)
 273+- {
 274+- other = *audio;
 275+- //fprintf(stderr, "(audio)\n");
 276+- } else {
 277+- other = *video;
 278+- //fprintf(stderr, "(video)\n");
 279+- }
 280+- char test;
 281+- int readOther = read(other, &test, 1);
 282+- if(readOther == -1)
 283+- {
 284+- if(errno == EAGAIN)
 285+- {
 286+- //fprintf(stderr, "Both input streams empty, waiting for more data...\n");
 287+- //nope, both fifo's empty. block target stream and wait.
 288+- waitFor(stream);
 289+- continue;
 290+- } else {
 291+- fprintf(stderr, "Error while buffering non-target stream. Code %d\n", errno);
 292+- exit(1);
 293+- }
 294+- } else {
 295+- //flush a bunch more out of the other named pipe.
 296+- char *readSpace = malloc( 65536 );
 297+- *readSpace = test;
 298+- //define other
 299+- readOther = read(other, readSpace + 1, 65535);
 300+- if(readOther == -1)
 301+- {
 302+- if(errno != EAGAIN)
 303+- {
 304+- fprintf(stderr, "Error while buffering non-target stream. Code %d\n", errno);
 305+- } else {
 306+- readOther = 0;
 307+- }
 308+- }
 309+- bufferFifo *otherBuff;
 310+- if(stream == video) otherBuff = audio_buffer; else otherBuff = video_buffer;
 311+- bufferFifoPut(otherBuff, readSpace, readOther + 1, 'h');
 312+-
 313+- //wait 1/50th sec for more data on the target stream
 314+- struct timespec remaining;
 315+- struct timespec wait = {0, 20000000};
 316+- nanosleep(&wait, &remaining );
 317+- }
 318+- againCount++;
 319+- } else {
 320+- fprintf(stderr, "Error reading from stream. Error code %d\n", errno);
 321+- exit(1);
 322+- }
 323+- } else if(readStatus == 0){
 324+- break; //actual eof.
 325+- } else {
 326+- bytesRead += readStatus;
 327+- againCount = 0;
 328+- }
 329+- } // end while
 330+- //fprintf(stderr, "Leaving fiforead_hlpr with %d bytes\n", bytesRead);
 331+- return bytesRead;
 332+-}
 333+-
 334+-int fiforead(void *ptr, int size, int nItems, int *stream)
 335+-{
 336+- int bufferedRead = 0;
 337+- int retval;
 338+- int cache_status;
 339+-
 340+- if(video != NULL && *stream == *video)
 341+- {
 342+- //check for buffered data
 343+- bufferedRead = bufferFifoGet(video_buffer, ptr, size * nItems);
 344+- if(bufferedRead < size * nItems)
 345+- {
 346+- //fprintf(stderr, "Reading from video stream...\n");
 347+- //fiforead_hlpr readBytes = [size * nItems - sizeof(buffer)] from fifo:
 348+- retval = bufferedRead + fiforead_hlpr(ptr + bufferedRead, size * nItems - bufferedRead, video);
 349+- } else {
 350+- retval = bufferedRead;
 351+- }
 352+-
 353+- if(cache_input)
 354+- {
 355+- cache_status = fwrite(ptr, 1, retval, video_cache_fd);
 356+- if( cache_status != retval)
 357+- {
 358+- fprintf(stderr, "ERROR:2:Failed writing video to decompressed cache. ");
 359+- cache_input = 0;
 360+- }
 361+- }
 362+-
 363+- return retval;
 364+- } else if(audio != NULL && *stream == *audio)
 365+- {
 366+- //fprintf(stderr, "Reading from audio stream...\n");
 367+- bufferedRead = bufferFifoGet(audio_buffer, ptr, size * nItems);
 368+- if(bufferedRead < size * nItems)
 369+- {
 370+- retval = bufferedRead + fiforead_hlpr(ptr + bufferedRead, size * nItems - bufferedRead, audio);
 371+- } else {
 372+- retval = bufferedRead;
 373+- }
 374+-
 375+- if(cache_input)
 376+- {
 377+- cache_status = fwrite(ptr, 1, retval, audio_cache_fd);
 378+- if( cache_status != retval)
 379+- {
 380+- fprintf(stderr, "ERROR:2:Failed writing audio to decompressed cache. ");
 381+- cache_input = 0;
 382+- }
 383+- }
 384+-
 385+- return retval;
 386+- } else {
 387+- //the stream hasn't yet been identified. Duplicate fread functionality to get all bits
 388+- int unbufferedRead = 0;
 389+- int rstatus;
 390+- while(unbufferedRead < size * nItems)
 391+- {
 392+- rstatus = read(*stream, ptr, size * nItems - unbufferedRead);
 393+- if(rstatus < 0)
 394+- {
 395+- fprintf(stderr, "Error while reading stream of unknown type. Error code %d\n", errno);
 396+- break;
 397+- } else if(rstatus == 0)
 398+- {
 399+- fprintf(stderr, "End of file while reading stream of unknown type\n");
 400+- if(cache_input) close(cache_input);
 401+- break;
 402+- } else {
 403+- unbufferedRead += rstatus;
 404+-
 405+- fd_set readStreams;
 406+- struct timeval tv;
 407+- int retval;
 408+-
 409+- FD_ZERO(&readStreams);
 410+- FD_SET(*stream, &readStreams);
 411+- tv.tv_sec = 15;
 412+- tv.tv_usec = 0;
 413+-
 414+- retval = select(*stream + 1, &readStreams, NULL, NULL, &tv);
 415+- if(retval == -1)
 416+- {
 417+- fprintf(stderr, "Error while looking for new data to read.\n");
 418+- exit(1);
 419+- } else if(retval == 0)
 420+- {
 421+- fprintf(stderr, "ERROR:1:Timeout waiting for decompressed data stream (id_file stage).\n");
 422+- exit(1);
 423+- }
 424+- }
 425+- }
 426+-
 427+- if(cache_input)
 428+- {
 429+- cache_status = fwrite(ptr, 1, unbufferedRead, cache_current);
 430+- if(cache_status != unbufferedRead)
 431+- {
 432+- fprintf(stderr, "ERROR:2:Could not write to decompressed cache.\n");
 433+- cache_input = 0;
 434+- }
 435+- }
 436+-
 437+- //fprintf(stderr, "Read %d bytes on unidentified stream\n", unbufferedRead);
 438+- return unbufferedRead;
 439+- }
 440+-}
 441+-
 442+ static void id_file(char *f){
 443+- int testFileDescriptor = -1, *test = &testFileDescriptor;
 444++ FILE *test;
 445+ unsigned char buffer[80];
 446+ int ret;
 447+ int tmp_video_hzn, tmp_video_hzd, tmp_video_an, tmp_video_ad;
 448+@@ -550,32 +153,16 @@
 449+
 450+ if(!strcmp(f,"-")){
 451+ /* stdin */
 452+- testFileDescriptor=0;
 453++ test=stdin;
 454+ }else{
 455+- testFileDescriptor=open(f,O_RDONLY);
 456+- if(testFileDescriptor == -1){
 457++ test=fopen(f,"rb");
 458++ if(!test){
 459+ fprintf(stderr,"Unable to open file %s.\n",f);
 460+ exit(1);
 461+- } else {
 462+- // below may be redundant since slow devices are nonblocking
 463+- fcntl(testFileDescriptor, F_SETFL, fcntl(testFileDescriptor, F_GETFL) & ~O_NONBLOCK);
 464+ }
 465+ }
 466+
 467+- if(cache_input)
 468+- {
 469+- char *cacheName = malloc(strlen(f) + 6 + 1);
 470+- strcat(cacheName, "cache_");
 471+- strcat(cacheName, f);
 472+- cache_current = fopen(cacheName, "w+b");
 473+- if(cache_input == -1)
 474+- {
 475+- fprintf(stderr, "ERROR:2:Unable to open decompression cache file %s\n", cacheName);
 476+- cache_input = 0;
 477+- }
 478+- }
 479+-
 480+- ret=fiforead(buffer, 1, 4, test);
 481++ ret=fread(buffer,1,4,test);
 482+ if(ret<4){
 483+ fprintf(stderr,"EOF determining file type of file %s.\n",f);
 484+ exit(1);
 485+@@ -592,18 +179,19 @@
 486+
 487+ /* Parse the rest of the header */
 488+
 489+- ret=fiforead(buffer, 1, 4, test);
 490+- ret=fiforead(buffer, 1, 4, test);
 491++ ret=fread(buffer,1,4,test);
 492++ ret=fread(buffer,1,4,test);
 493+ if(ret<4)goto riff_err;
 494+ if(!memcmp(buffer,"WAVE",4)){
 495+
 496+- while( (ret=fiforead(buffer, 1, 4, test)) && ret != 0){
 497++ while(!feof(test)){
 498++ ret=fread(buffer,1,4,test);
 499+ if(ret<4)goto riff_err;
 500+ if(!memcmp("fmt",buffer,3)){
 501+
 502+ /* OK, this is our audio specs chunk. Slurp it up. */
 503+
 504+- ret=fiforead(buffer, 1, 20, test);
 505++ ret=fread(buffer,1,20,test);
 506+ if(ret<20)goto riff_err;
 507+
 508+ extra_hdr_bytes = (buffer[0] + (buffer[1] << 8) +
 509+@@ -615,9 +203,7 @@
 510+ exit(1);
 511+ }
 512+
 513+- audio_fd = testFileDescriptor;
 514+- audio=&audio_fd;
 515+- audio_cache_fd = cache_current;
 516++ audio=test;
 517+ audio_ch=buffer[6]+(buffer[7]<<8);
 518+ audio_hz=buffer[8]+(buffer[9]<<8)+
 519+ (buffer[10]<<16)+(buffer[11]<<24);
 520+@@ -631,29 +217,27 @@
 521+ while(extra_hdr_bytes){
 522+ int read_size = (extra_hdr_bytes > sizeof(buffer)) ?
 523+ sizeof(buffer) : extra_hdr_bytes;
 524+- ret = fiforead(buffer, 1, read_size, test);
 525++ ret = fread(buffer, 1, read_size, test);
 526+
 527+ if (ret < read_size)
 528+ goto riff_err;
 529+ else
 530+ extra_hdr_bytes -= read_size;
 531+ }
 532+- fprintf(stderr, "Aligning to data...\n");
 533++
 534+ /* Now, align things to the beginning of the data */
 535+ /* Look for 'dataxxxx' */
 536+- while( (ret=fiforead(buffer, 1, 4, test)) && ret != 0){
 537+- if(ret<4)
 538+- { fprintf(stderr, "ret is %d\n", ret);
 539+- goto riff_err;}
 540++ while(!feof(test)){
 541++ ret=fread(buffer,1,4,test);
 542++ if(ret<4)goto riff_err;
 543+ if(!memcmp("data",buffer,4)){
 544+ /* We're there. Ignore the declared size for now. */
 545+- ret=fiforead(buffer, 1, 4, test);
 546++ ret=fread(buffer,1,4,test);
 547+ if(ret<4)goto riff_err;
 548+
 549+ fprintf(stderr,"File %s is 16 bit %d channel %d Hz RIFF WAV audio.\n",
 550+ f,audio_ch,audio_hz);
 551+
 552+- fcntl(testFileDescriptor, F_SETFL, fcntl(testFileDescriptor, F_GETFL) | O_NONBLOCK);
 553+ return;
 554+ }
 555+ }
 556+@@ -670,7 +254,7 @@
 557+ /* read until newline, or 80 cols, whichever happens first */
 558+ int i;
 559+ for(i=0;i<79;i++){
 560+- ret=fiforead(buffer+i, 1, 1, test);
 561++ ret=fread(buffer+i,1,1,test);
 562+ if(ret<1)goto yuv_err;
 563+ if(buffer[i]=='\n')break;
 564+ }
 565+@@ -711,14 +295,11 @@
 566+ exit(1);
 567+ }
 568+
 569+- video_fd = testFileDescriptor;
 570+- video=&video_fd;
 571+- video_cache_fd = cache_current;
 572++ video=test;
 573+
 574+ fprintf(stderr,"File %s is %dx%d %.02f fps YUV12 video.\n",
 575+ f,frame_x,frame_y,(double)video_hzn/video_hzd);
 576+
 577+- fcntl(testFileDescriptor, F_SETFL, fcntl(testFileDescriptor, F_GETFL) | O_NONBLOCK);
 578+ return;
 579+ }
 580+ }
 581+@@ -742,7 +323,7 @@
 582+ fprintf(stderr,"\r%c",spinascii[spinner]);
 583+ }
 584+
 585+-int fetch_and_process_audio(int *audio,ogg_page *audiopage,
 586++int fetch_and_process_audio(FILE *audio,ogg_page *audiopage,
 587+ ogg_stream_state *vo,
 588+ vorbis_dsp_state *vd,
 589+ vorbis_block *vb,
 590+@@ -760,7 +341,7 @@
 591+ /* read and process more audio */
 592+ signed char readbuffer[4096];
 593+ int toread=4096/2/audio_ch;
 594+- int bytesread=fiforead(readbuffer,1,toread*2*audio_ch,audio);
 595++ int bytesread=fread(readbuffer,1,toread*2*audio_ch,audio);
 596+ int sampread=bytesread/2/audio_ch;
 597+ float **vorbis_buffer;
 598+ int count=0;
 599+@@ -803,7 +384,7 @@
 600+ return audioflag;
 601+ }
 602+
 603+-int fetch_and_process_video(int *video,ogg_page *videopage,
 604++int fetch_and_process_video(FILE *video,ogg_page *videopage,
 605+ ogg_stream_state *to,
 606+ theora_state *td,
 607+ int videoflag){
 608+@@ -850,18 +431,18 @@
 609+
 610+ for(i=state;i<2;i++){
 611+ char c,frame[6];
 612+- int ret=fiforead(frame,1,6,video);
 613++ int ret=fread(frame,1,6,video);
 614+
 615+ /* match and skip the frame header */
 616+ if(ret<6)break;
 617+ if(memcmp(frame,"FRAME",5)){
 618+- fprintf(stderr,"Loss of framing in YUV input data..%s\n", frame);
 619++ fprintf(stderr,"Loss of framing in YUV input data\n");
 620+ exit(1);
 621+ }
 622+ if(frame[5]!='\n'){
 623+ int j;
 624+ for(j=0;j<79;j++)
 625+- if(fiforead(&c,1,1,video)&&c=='\n')break;
 626++ if(fread(&c,1,1,video)&&c=='\n')break;
 627+ if(j==79){
 628+ fprintf(stderr,"Error parsing YUV frame header\n");
 629+ exit(1);
 630+@@ -871,7 +452,7 @@
 631+ /* read the Y plane into our frame buffer with centering */
 632+ line=yuvframe[i]+video_x*frame_y_offset+frame_x_offset;
 633+ for(e=0;e<frame_y;e++){
 634+- ret=fiforead(line,1,frame_x,video);
 635++ ret=fread(line,1,frame_x,video);
 636+ if(ret!=frame_x) break;
 637+ line+=video_x;
 638+ }
 639+@@ -879,7 +460,7 @@
 640+ line=yuvframe[i]+(video_x*video_y)
 641+ +(video_x/2)*(frame_y_offset/2)+frame_x_offset/2;
 642+ for(e=0;e<frame_y/2;e++){
 643+- ret=fiforead(line,1,frame_x/2,video);
 644++ ret=fread(line,1,frame_x/2,video);
 645+ if(ret!=frame_x/2) break;
 646+ line+=video_x/2;
 647+ }
 648+@@ -887,7 +468,7 @@
 649+ line=yuvframe[i]+(video_x*video_y*5/4)
 650+ +(video_x/2)*(frame_y_offset/2)+frame_x_offset/2;
 651+ for(e=0;e<frame_y/2;e++){
 652+- ret=fiforead(line,1,frame_x/2,video);
 653++ ret=fread(line,1,frame_x/2,video);
 654+ if(ret!=frame_x/2) break;
 655+ line+=video_x/2;
 656+ }
 657+@@ -995,16 +576,6 @@
 658+
 659+ #endif
 660+
 661+- audio_buffer = malloc( sizeof(bufferFifo) );
 662+- audio_buffer->totalBytes = 0;
 663+- audio_buffer->head = NULL;
 664+- audio_buffer->tail = NULL;
 665+-
 666+- video_buffer = malloc( sizeof(bufferFifo) );
 667+- video_buffer->totalBytes = 0;
 668+- video_buffer->head = NULL;
 669+- video_buffer->tail = NULL;
 670+-
 671+ while((c=getopt_long(argc,argv,optstring,options,&long_option_index))!=EOF){
 672+ switch(c){
 673+ case 'o':
 674+@@ -1024,11 +595,6 @@
 675+ audio_r=-1;
 676+ break;
 677+
 678+- case 'c':
 679+- //copy input PCM/YUV to another file as it is read in
 680+- cache_input = 1;
 681+- break;
 682+-
 683+ case 'v':
 684+ video_q=rint(atof(optarg)*6.3);
 685+ if(video_q<0 || video_q>63){
 686+@@ -1151,16 +717,13 @@
 687+ /* initialize Vorbis too, assuming we have audio to compress. */
 688+ if(audio){
 689+ vorbis_info_init(&vi);
 690+- if(audio_hz > 192000){
 691+- audio_hz = audio_hz / 100;
 692+- }
 693+ if(audio_q>-99)
 694+ ret = vorbis_encode_init_vbr(&vi,audio_ch,audio_hz,audio_q);
 695+ else
 696+ ret = vorbis_encode_init(&vi,audio_ch,audio_hz,-1,audio_r,-1);
 697+ if(ret){
 698+ fprintf(stderr,"The Vorbis encoder could not set up a mode according to\n"
 699+- "the requested quality or bitrate.\nDetected audio at %d hz,\n%d\n", audio_hz, audio_r);
 700++ "the requested quality or bitrate.\n\n");
 701+ exit(1);
 702+ }
 703+
 704+@@ -1320,17 +883,6 @@
 705+
 706+ if(outfile && outfile!=stdout)fclose(outfile);
 707+
 708+-/*
 709+- if(audio_cache_fd)
 710+- {
 711+- fclose(audio_cache_fd);
 712+- }
 713+-
 714+- if(video_cache_fd)
 715+- {
 716+- fclose(video_cache_fd);
 717+- }
 718+-*/
 719+ fprintf(stderr,"\r \ndone.\n\n");
 720+
 721+ #ifdef THEORA_PERF_DATA
Index: branches/mikeb/encoder_example.c
@@ -69,7 +69,7 @@
7070 }
7171 #endif
7272
73 -const char *optstring = "o:a:A:v:V:s:S:f:F:";
 73+const char *optstring = "o:a:A:v:V:s:S:f:F:c";
7474 struct option options [] = {
7575 {"output",required_argument,NULL,'o'},
7676 {"audio-rate-target",required_argument,NULL,'A'},
@@ -103,10 +103,15 @@
104104
105105 int *audio=NULL;
106106 int audio_fd;
 107+FILE *audio_cache_fd = NULL;
107108
108109 int *video=NULL;
109110 int video_fd;
 111+FILE *video_cache_fd = NULL;
110112
 113+int cache_input = 0;
 114+FILE *cache_current;
 115+
111116 bufferFifo *audio_buffer=NULL;
112117 bufferFifo *video_buffer=NULL;
113118
@@ -231,10 +236,6 @@
232237
233238 void waitFor(int *stream)
234239 {
235 - //set blocking on streams
236 - //fcntl(*video, F_SETFL, fcntl(*video, F_GETFL) & ~O_NONBLOCK);
237 - //fcntl(*audio, F_SETFL, fcntl(*audio, F_GETFL) & ~O_NONBLOCK);
238 -
239240 fd_set readStreams;
240241 struct timeval tv;
241242 int retval;
@@ -285,7 +286,7 @@
286287 bufferFifoPut(otherBuffer, readSpace, status, 'w');
287288 }
288289 } else {
289 - fprintf(stderr, "Timeout waiting for decoder to write new data!");
 290+ fprintf(stderr, "ERROR:1:Timeout waiting for decoder to write new data!");
290291 exit(1);
291292 }
292293 }
@@ -333,8 +334,6 @@
334335 return bytesPopped;
335336 }
336337
337 -int fiforead(void *ptr, int size, int nItems, int *stream);
338 -
339338 int fiforead_hlpr(void *ptr, int totalNeeded, int *stream)
340339 {
341340 int bytesRead = 0;
@@ -435,6 +434,9 @@
436435 int fiforead(void *ptr, int size, int nItems, int *stream)
437436 {
438437 int bufferedRead = 0;
 438+ int retval;
 439+ int cache_status;
 440+
439441 if(video != NULL && *stream == *video)
440442 {
441443 //check for buffered data
@@ -443,20 +445,44 @@
444446 {
445447 //fprintf(stderr, "Reading from video stream...\n");
446448 //fiforead_hlpr readBytes = [size * nItems - sizeof(buffer)] from fifo:
447 - return bufferedRead + fiforead_hlpr(ptr + bufferedRead, size * nItems - bufferedRead, video);
 449+ retval = bufferedRead + fiforead_hlpr(ptr + bufferedRead, size * nItems - bufferedRead, video);
448450 } else {
449 - return bufferedRead;
 451+ retval = bufferedRead;
450452 }
 453+
 454+ if(cache_input)
 455+ {
 456+ cache_status = fwrite(ptr, 1, retval, video_cache_fd);
 457+ if( cache_status != retval)
 458+ {
 459+ fprintf(stderr, "ERROR:2:Failed writing video to decompressed cache. ");
 460+ cache_input = 0;
 461+ }
 462+ }
 463+
 464+ return retval;
451465 } else if(audio != NULL && *stream == *audio)
452466 {
453467 //fprintf(stderr, "Reading from audio stream...\n");
454468 bufferedRead = bufferFifoGet(audio_buffer, ptr, size * nItems);
455469 if(bufferedRead < size * nItems)
456470 {
457 - return bufferedRead + fiforead_hlpr(ptr + bufferedRead, size * nItems - bufferedRead, audio);
 471+ retval = bufferedRead + fiforead_hlpr(ptr + bufferedRead, size * nItems - bufferedRead, audio);
458472 } else {
459 - return bufferedRead;
 473+ retval = bufferedRead;
460474 }
 475+
 476+ if(cache_input)
 477+ {
 478+ cache_status = fwrite(ptr, 1, retval, audio_cache_fd);
 479+ if( cache_status != retval)
 480+ {
 481+ fprintf(stderr, "ERROR:2:Failed writing audio to decompressed cache. ");
 482+ cache_input = 0;
 483+ }
 484+ }
 485+
 486+ return retval;
461487 } else {
462488 //the stream hasn't yet been identified. Duplicate fread functionality to get all bits
463489 int unbufferedRead = 0;
@@ -471,17 +497,43 @@
472498 } else if(rstatus == 0)
473499 {
474500 fprintf(stderr, "End of file while reading stream of unknown type\n");
 501+ if(cache_input) close(cache_input);
475502 break;
476503 } else {
477504 unbufferedRead += rstatus;
478 - /*
479 - //wait 1/50th sec for more data on the target stream
480 - struct timespec remaining;
481 - struct timespec wait = {0, 20000000};
482 - nanosleep(&wait, &remaining );
483 - */
 505+
 506+ fd_set readStreams;
 507+ struct timeval tv;
 508+ int retval;
 509+
 510+ FD_ZERO(&readStreams);
 511+ FD_SET(*stream, &readStreams);
 512+ tv.tv_sec = 15;
 513+ tv.tv_usec = 0;
 514+
 515+ retval = select(*stream + 1, &readStreams, NULL, NULL, &tv);
 516+ if(retval == -1)
 517+ {
 518+ fprintf(stderr, "Error while looking for new data to read.\n");
 519+ exit(1);
 520+ } else if(retval == 0)
 521+ {
 522+ fprintf(stderr, "ERROR:1:Timeout waiting for decompressed data stream (id_file stage).\n");
 523+ exit(1);
 524+ }
484525 }
485526 }
 527+
 528+ if(cache_input)
 529+ {
 530+ cache_status = fwrite(ptr, 1, unbufferedRead, cache_current);
 531+ if(cache_status != unbufferedRead)
 532+ {
 533+ fprintf(stderr, "ERROR:2:Could not write to decompressed cache.\n");
 534+ cache_input = 0;
 535+ }
 536+ }
 537+
486538 //fprintf(stderr, "Read %d bytes on unidentified stream\n", unbufferedRead);
487539 return unbufferedRead;
488540 }
@@ -498,17 +550,31 @@
499551
500552 if(!strcmp(f,"-")){
501553 /* stdin */
502 - testFileDescriptor=3;
 554+ testFileDescriptor=0;
503555 }else{
504556 testFileDescriptor=open(f,O_RDONLY);
505557 if(testFileDescriptor == -1){
506558 fprintf(stderr,"Unable to open file %s.\n",f);
507559 exit(1);
508560 } else {
 561+ // below may be redundant since slow devices are nonblocking
509562 fcntl(testFileDescriptor, F_SETFL, fcntl(testFileDescriptor, F_GETFL) & ~O_NONBLOCK);
510563 }
511564 }
512565
 566+ if(cache_input)
 567+ {
 568+ char *cacheName = malloc(strlen(f) + 6 + 1);
 569+ strcat(cacheName, "cache_");
 570+ strcat(cacheName, f);
 571+ cache_current = fopen(cacheName, "w+b");
 572+ if(cache_input == -1)
 573+ {
 574+ fprintf(stderr, "ERROR:2:Unable to open decompression cache file %s\n", cacheName);
 575+ cache_input = 0;
 576+ }
 577+ }
 578+
513579 ret=fiforead(buffer, 1, 4, test);
514580 if(ret<4){
515581 fprintf(stderr,"EOF determining file type of file %s.\n",f);
@@ -528,19 +594,16 @@
529595
530596 ret=fiforead(buffer, 1, 4, test);
531597 ret=fiforead(buffer, 1, 4, test);
532 - fprintf(stderr, "Read %d bytes\n", ret);
533598 if(ret<4)goto riff_err;
534599 if(!memcmp(buffer,"WAVE",4)){
535600
536601 while( (ret=fiforead(buffer, 1, 4, test)) && ret != 0){
537 - fprintf(stderr, "Read %d bytes\n", ret);
538602 if(ret<4)goto riff_err;
539603 if(!memcmp("fmt",buffer,3)){
540604
541605 /* OK, this is our audio specs chunk. Slurp it up. */
542606
543607 ret=fiforead(buffer, 1, 20, test);
544 - fprintf(stderr, "Read %d bytes\n", ret);
545608 if(ret<20)goto riff_err;
546609
547610 extra_hdr_bytes = (buffer[0] + (buffer[1] << 8) +
@@ -554,6 +617,7 @@
555618
556619 audio_fd = testFileDescriptor;
557620 audio=&audio_fd;
 621+ audio_cache_fd = cache_current;
558622 audio_ch=buffer[6]+(buffer[7]<<8);
559623 audio_hz=buffer[8]+(buffer[9]<<8)+
560624 (buffer[10]<<16)+(buffer[11]<<24);
@@ -649,6 +713,7 @@
650714
651715 video_fd = testFileDescriptor;
652716 video=&video_fd;
 717+ video_cache_fd = cache_current;
653718
654719 fprintf(stderr,"File %s is %dx%d %.02f fps YUV12 video.\n",
655720 f,frame_x,frame_y,(double)video_hzn/video_hzd);
@@ -959,6 +1024,11 @@
9601025 audio_r=-1;
9611026 break;
9621027
 1028+ case 'c':
 1029+ //copy input PCM/YUV to another file as it is read in
 1030+ cache_input = 1;
 1031+ break;
 1032+
9631033 case 'v':
9641034 video_q=rint(atof(optarg)*6.3);
9651035 if(video_q<0 || video_q>63){
@@ -1250,6 +1320,17 @@
12511321
12521322 if(outfile && outfile!=stdout)fclose(outfile);
12531323
 1324+/*
 1325+ if(audio_cache_fd)
 1326+ {
 1327+ fclose(audio_cache_fd);
 1328+ }
 1329+
 1330+ if(video_cache_fd)
 1331+ {
 1332+ fclose(video_cache_fd);
 1333+ }
 1334+*/
12541335 fprintf(stderr,"\r \ndone.\n\n");
12551336
12561337 #ifdef THEORA_PERF_DATA

Status & tagging log