00001 
00002 
00003 
00004 
00005 
00006 
00007 
00008 
00009 
00010 
00011 
00012 
00013 
00014 
00015 
00016 
00017 
00018 
00019 
00020 
00021 
00022 
00023 
00024 
00025 
00026 
00027 
00028 
00029 
00030 
00031 
00032 
00033 
00034 
00035 
00036 
00037 
00038 
00039 
00040 
00041 
00042 
00043 
00044 
00045 
00046 
00047 
00048 
00049 
00050 
00051 
00052 
00053 
00054 
00055 
00056 
00057 
00058 
00059 
00060 
00061 
00062 
00063 
00064 
00065 
00066 
00067 
00068 
00069 
00070 
00071 
00072 
00073 
00074 
00075 
00076 
00077 
00078 
00079 
00080 
00081 
00082 
00083 
00084 
00085 
00086 
00087 
00088 
00089 
00090 #include <sys/types.h>
00091 #include <sys/socket.h>
00092 #include <sys/times.h>
00093 #include <time.h>
00094 #include <netinet/in.h>
00095 #include <unistd.h>
00096 #include <netdb.h>
00097 #include <errno.h>
00098 #include <string.h>
00099 #include <signal.h>
00100 #include "all.h"
00101 #include "param.h"
00102 #include "mpeg.h"
00103 #include "prototypes.h"
00104 #include "parallel.h"
00105 #include "readframe.h"
00106 #include "fsize.h"
00107 #include "combine.h"
00108 #include "frames.h"
00109 
00110 
00111 #define MAX_IO_SERVERS  10
00112 #ifndef SOMAXCONN
00113 #define SOMAXCONN 5
00114 #endif
00115 
00116 
00117 
00118 
00119 
00120 #define TERMINATE_PID_SIGNAL    SIGTERM  
00121 #ifndef MAXARGS
00122 #define MAXARGS         1024   
00123 #endif
00124 
00125 
00126 
00127 
00128 
00129 static int32   diffTime;
00130 static char     rsh[256];
00131 static struct hostent *hostEntry = NULL;
00132 static boolean  *frameDone;
00133 static int      outputServerSocket;
00134 static int      decodeServerSocket;
00135 static boolean  parallelPerfect = FALSE;
00136 static  int     current_max_forked_pid=0;
00137 
00138 
00139 
00140 
00141 
00142 
00143 extern int yuvHeight, yuvWidth;
00144 extern  time_t  timeStart, timeEnd;
00145 extern char     statFileName[256];
00146 extern FILE *statFile;
00147 extern boolean  debugMachines;
00148 extern boolean debugSockets;
00149 int parallelTestFrames = 10;
00150 int parallelTimeChunks = 60;
00151 char *IOhostName;
00152 int ioPortNumber;
00153 int combinePortNumber;
00154 int decodePortNumber;
00155 boolean niceProcesses = FALSE;
00156 boolean forceIalign = FALSE;
00157 int         machineNumber = -1;
00158 boolean remoteIO = FALSE;
00159 boolean separateConversion;
00160 time_t  IOtime = 0;
00161 extern char encoder_name[];
00162 int     ClientPid[MAX_MACHINES+4];
00163 
00164 
00165 
00166 
00167 
00168 static void     TransmitPortNum _ANSI_ARGS_((char *hostName, int portNum,
00169                                                int ioPortNum));
00170 static void     EndIOServer _ANSI_ARGS_((void));
00171 static void SafeRead _ANSI_ARGS_((int fd, char *buf, int nbyte));
00172 static void SafeWrite _ANSI_ARGS_((int fd, char *buf, int nbyte));
00173 static int  CreateListeningSocket _ANSI_ARGS_((int *portNumber));
00174 static int  ConnectToSocket _ANSI_ARGS_((char *machineName, int portNum,
00175                                          struct hostent **hostEnt));
00176 static int safe_fork _ANSI_ARGS_((char *command));
00177 void cleanup_fork _ANSI_ARGS_ ((int dummy));
00178 
00179 
00180 
00181 
00182 
00183 
00184                         
00185 
00186 
00187 
00188 
00189 
00190 
00191 
00192 
00193 
00194 
00195 
00196 
00197 
00198 
00199 
00200 
00201 void
00202 SetIOConvert(separate)
00203     boolean separate;
00204 {
00205     separateConversion = separate;
00206 }
00207 
00208 
00209 
00210 
00211 
00212 
00213 
00214 
00215 
00216 
00217 
00218 
00219 
00220 
00221 void
00222 SetParallelPerfect(val)
00223 boolean val;
00224 {
00225     parallelPerfect = val;
00226 }
00227 
00228 
00229 
00230 
00231 
00232 
00233 
00234 
00235 
00236 
00237 
00238 
00239 
00240 
00241 void
00242 SetRemoteShell(shell)
00243     char *shell;
00244 {
00245     strcpy(rsh, shell);
00246 }
00247 
00248 
00249 
00250 
00251 
00252 
00253 
00254 
00255 
00256 
00257 
00258 
00259 
00260 
00261 void
00262   StartIOServer(numInputFiles, parallelHostName, portNum)
00263 int numInputFiles;
00264 char *parallelHostName;
00265 int portNum;
00266 {
00267   int       ioPortNum;
00268   int       serverSocket;
00269   int       otherSock, otherSize;
00270   struct sockaddr otherSocket;
00271   int32   buffer[8];
00272   boolean       done = FALSE;
00273   int       frameNumber;
00274   MpegFrame *frame;
00275   register int y;
00276   int       numBytes;
00277   unsigned char   *bigBuffer;
00278   unsigned char   smallBuffer[1000];
00279   int       bigBufferSize;
00280   FILE    *filePtr;
00281   uint32  data;
00282   char    inputFileName[1024];
00283   char    fileName[1024];
00284 
00285   bigBufferSize = 0;
00286   bigBuffer = NULL;
00287 
00288   
00289 
00290   serverSocket = CreateListeningSocket(&ioPortNum);
00291 
00292   if ( debugSockets ) {
00293     fprintf(stdout, "====I/O USING PORT %d\n", ioPortNum);
00294   }
00295 
00296   TransmitPortNum(parallelHostName, portNum, ioPortNum);
00297 
00298   otherSize = sizeof(otherSocket);
00299 
00300   if ( separateConversion ) {
00301     SetFileType(ioConversion);  
00302   } else {
00303     SetFileType(inputConversion);
00304   }
00305 
00306   
00307   while ( ! done ) {
00308     otherSock = accept(serverSocket, &otherSocket, &otherSize);
00309     if ( otherSock == -1 ) {
00310       fprintf(stderr, "ERROR:  I/O SERVER accept returned error %d\n", errno);
00311       exit(1);
00312     }
00313 
00314     SafeRead(otherSock, (char *)buffer, 4);
00315     frameNumber = ntohl(buffer[0]);
00316 
00317     if ( frameNumber == -1 ) {
00318       done = TRUE;
00319     } else if ( frameNumber == -2 ) {
00320       
00321       SafeRead(otherSock, (char *)buffer, 4);
00322       frameNumber = ntohl(buffer[0]);       
00323 
00324       if ( debugSockets ) {
00325         fprintf(stdout, "INPUT SERVER:  GETTING DECODED FRAME %d\n", frameNumber);
00326         fflush(stdout);
00327       }
00328 
00329       
00330       frame = Frame_New(frameNumber, 'i');
00331 
00332       Frame_AllocDecoded(frame, TRUE);
00333 
00334       for ( y = 0; y < Fsize_y; y++ ) {
00335         SafeRead(otherSock, (char *)frame->decoded_y[y], Fsize_x);
00336       }
00337 
00338       for (y = 0; y < (Fsize_y >> 1); y++) { 
00339         SafeRead(otherSock, (char *)frame->decoded_cb[y], (Fsize_x >> 1));
00340       }
00341 
00342       for (y = 0; y < (Fsize_y >> 1); y++) { 
00343         SafeRead(otherSock, (char *)frame->decoded_cr[y], (Fsize_x >> 1));
00344       }
00345 
00346       
00347       WriteDecodedFrame(frame);
00348 
00349       Frame_Free(frame);
00350     } else if ( frameNumber == -3 ) {
00351       
00352       SafeRead(otherSock, (char *)buffer, 4);
00353       frameNumber = ntohl(buffer[0]);       
00354 
00355       if ( debugSockets ) {
00356         fprintf(stdout, "INPUT SERVER:  READING DECODED FRAME %d from DISK\n", frameNumber);
00357         fflush(stdout);
00358       }
00359 
00360       
00361       frame = Frame_New(frameNumber, 'i');
00362 
00363       Frame_AllocDecoded(frame, TRUE);
00364 
00365       ReadDecodedRefFrame(frame, frameNumber);
00366 
00367       
00368       for ( y = 0; y < Fsize_y; y++ ) {
00369         SafeWrite(otherSock, (char *)frame->decoded_y[y], Fsize_x);
00370       }
00371 
00372       for (y = 0; y < (Fsize_y >> 1); y++) { 
00373         SafeWrite(otherSock, (char *)frame->decoded_cb[y], (Fsize_x >> 1));
00374       }
00375 
00376       for (y = 0; y < (Fsize_y >> 1); y++) { 
00377         SafeWrite(otherSock, (char *)frame->decoded_cr[y], (Fsize_x >> 1));
00378       }
00379 
00380       Frame_Free(frame);
00381     } else if ( frameNumber == -4 ) {
00382       
00383       SafeRead(otherSock, (char *)buffer, 8);
00384       frameNumber = buffer[0];
00385       frameNumber = ntohl(frameNumber);
00386 
00387       
00388       numBytes = buffer[1];
00389       numBytes = ntohl(numBytes);
00390 
00391       
00392       if ( numBytes > bigBufferSize ) {
00393         bigBufferSize = numBytes;
00394         if ( bigBuffer != NULL ) {
00395           free(bigBuffer);
00396         }
00397 
00398         bigBuffer = (unsigned char *) malloc(bigBufferSize*
00399                                              sizeof(unsigned char));
00400       }
00401 
00402       
00403       SafeRead(otherSock, (char *) bigBuffer, numBytes);
00404 
00405       
00406       sprintf(fileName, "%s.frame.%d", outputFileName, frameNumber);
00407       if ( (filePtr = fopen(fileName, "wb")) == NULL ) {
00408         fprintf(stderr, "ERROR:  Could not open output file(3):  %s\n",
00409                 fileName);
00410         exit(1);
00411       }
00412 
00413       
00414       fwrite(bigBuffer, sizeof(char), numBytes, filePtr);
00415 
00416       fclose(filePtr);
00417 
00418       if ( debugSockets ) {
00419         fprintf(stdout, "====I/O SERVER:  WROTE FRAME %d to disk\n",
00420                 frameNumber);
00421         fflush(stdout);
00422       }
00423     } else {
00424       if ( debugSockets ) {
00425         fprintf(stdout, "I/O SERVER GETTING FRAME %d\n", frameNumber);
00426         fflush(stdout);
00427       }
00428 
00429       
00430       frame = Frame_New(frameNumber, 'i');
00431 
00432       if ( separateConversion ) {
00433         GetNthInputFileName(inputFileName, frameNumber);
00434 
00435         
00436         filePtr = ReadIOConvert(inputFileName);
00437         do {
00438           numBytes = fread(smallBuffer, 1, 1000, filePtr);
00439 
00440           if ( numBytes > 0 ) {
00441             data = numBytes;
00442             data = htonl(data);
00443             SafeWrite(otherSock, (char *)&data, 4);
00444             SafeWrite(otherSock, (char *)smallBuffer, numBytes);
00445           }
00446         }
00447         while ( numBytes == 1000 );
00448 
00449         if ( strcmp(ioConversion, "*") == 0 ) {
00450           fclose(filePtr);
00451         } else {
00452           pclose(filePtr);
00453         }
00454       } else {
00455         GetNthInputFileName(inputFileName, frameNumber);
00456         ReadFrame(frame, inputFileName, inputConversion, TRUE);
00457 
00458         
00459         for (y = 0; y < yuvHeight; y++) { 
00460           SafeWrite(otherSock, (char *)frame->orig_y[y], yuvWidth);
00461         }
00462 
00463         for (y = 0; y < (yuvHeight >> 1); y++) { 
00464           SafeWrite(otherSock, (char *)frame->orig_cb[y], yuvWidth >> 1);
00465         }
00466 
00467         for (y = 0; y < (yuvHeight >> 1); y++) { 
00468           SafeWrite(otherSock, (char *)frame->orig_cr[y], yuvWidth >> 1);
00469         }
00470 
00471         
00472 
00473         SafeRead(otherSock, (char *)buffer, 4);
00474         
00475       }
00476 
00477       if ( debugSockets ) {
00478         fprintf(stdout, "====I/O SERVER:  READ FRAME %d\n",
00479                 frameNumber);
00480       }
00481 
00482       Frame_Free(frame);
00483     }
00484 
00485     close(otherSock);
00486   }
00487 
00488   close(serverSocket);
00489 
00490   if ( debugSockets ) {
00491     fprintf(stdout, "====I/O SERVER:  Shutting Down\n");
00492   }
00493 }
00494 
00495 
00496 
00497 
00498 
00499 
00500 
00501 
00502 
00503 
00504 
00505 
00506 
00507 
00508 void
00509 SendRemoteFrame(frameNumber, bb)
00510     int frameNumber;
00511     BitBucket *bb;
00512 {
00513     int clientSocket;
00514     u_long  data;
00515     int     negativeFour = -4;
00516     time_t  tempTimeStart, tempTimeEnd;
00517 
00518     time(&tempTimeStart);
00519 
00520     clientSocket = ConnectToSocket(IOhostName, ioPortNumber, &hostEntry);
00521 
00522     data = htonl(negativeFour);
00523     SafeWrite(clientSocket, (char *)&data, 4);
00524 
00525     data = htonl(frameNumber);
00526     SafeWrite(clientSocket, (char *)&data, 4);
00527 
00528     if ( frameNumber != -1 ) {
00529         
00530         data = (bb->totalbits+7)>>3;
00531         data = htonl(data);
00532         SafeWrite(clientSocket, (char *)&data, 4);
00533 
00534         
00535         Bitio_WriteToSocket(bb, clientSocket);
00536     }
00537 
00538     close(clientSocket);
00539 
00540     time(&tempTimeEnd);
00541     IOtime += (tempTimeEnd-tempTimeStart);
00542 }
00543 
00544 
00545 
00546 
00547 
00548 
00549 
00550 
00551 
00552 
00553 
00554 
00555 
00556 
00557 
00558 
00559 void
00560 NoteFrameDone(frameStart, frameEnd)
00561     int frameStart;
00562     int frameEnd;
00563 {
00564     int clientSocket;
00565     u_long  data;
00566     int     negativeTwo = -2;
00567     time_t  tempTimeStart, tempTimeEnd;
00568 
00569     time(&tempTimeStart);
00570 
00571     clientSocket = ConnectToSocket(IOhostName, combinePortNumber, &hostEntry);
00572 
00573     data = negativeTwo;
00574     data = htonl(negativeTwo);
00575     SafeWrite(clientSocket, (char *)&data, 4);
00576 
00577     data = htonl(frameStart);
00578     SafeWrite(clientSocket, (char *)&data, 4);
00579 
00580     data = htonl(frameEnd);
00581     SafeWrite(clientSocket, (char *)&data, 4);
00582 
00583     close(clientSocket);
00584 
00585     time(&tempTimeEnd);
00586     IOtime += (tempTimeEnd-tempTimeStart);
00587 }
00588 
00589 
00590 
00591 
00592 
00593 
00594 
00595 
00596 
00597 
00598 
00599 
00600 
00601 void
00602   GetRemoteFrame(frame, frameNumber)
00603 MpegFrame *frame;
00604 int frameNumber;
00605 {
00606   FILE    *filePtr;
00607   int   clientSocket;
00608   unsigned char   smallBuffer[1000];
00609   register int y;
00610   int       numBytes;
00611   u_long  data;
00612   char    fileName[256];
00613 
00614   Fsize_Note(frameNumber, yuvWidth, yuvHeight);
00615 
00616   if ( debugSockets ) {
00617     fprintf(stdout, "MACHINE %s REQUESTING connection for FRAME %d\n",
00618             getenv("HOST"), frameNumber);
00619     fflush(stdout);
00620   }
00621 
00622   clientSocket = ConnectToSocket(IOhostName, ioPortNumber, &hostEntry);
00623 
00624   data = frameNumber;
00625   data = htonl(data);
00626   SafeWrite(clientSocket, (char *)&data, 4);
00627 
00628   if ( frameNumber != -1 ) {
00629     if ( separateConversion ) {
00630       sprintf(fileName, "/tmp/foobar%d", machineNumber);
00631       filePtr = fopen(fileName, "wb");
00632 
00633       
00634       do {
00635         SafeRead(clientSocket, (char *)&numBytes, 4);
00636         numBytes = ntohl(numBytes);
00637 
00638         SafeRead(clientSocket, (char *)smallBuffer, numBytes);
00639 
00640         fwrite(smallBuffer, 1, numBytes, filePtr);
00641       } while ( numBytes == 1000 );
00642       fflush(filePtr);
00643       fclose(filePtr);
00644 
00645       
00646       ReadFrame(frame, fileName, slaveConversion, FALSE);
00647     } else {
00648       Frame_AllocYCC(frame);
00649 
00650       if ( debugSockets ) {
00651         fprintf(stdout, "MACHINE %s allocated YCC FRAME %d\n",
00652                 getenv("HOST"), frameNumber);
00653         fflush(stdout);
00654       }
00655 
00656       
00657       for (y = 0; y < yuvHeight; y++) { 
00658         SafeRead(clientSocket, (char *)frame->orig_y[y], yuvWidth);
00659       }
00660 
00661       for (y = 0; y < (yuvHeight >> 1); y++) { 
00662         SafeRead(clientSocket, (char *)frame->orig_cb[y], yuvWidth>>1);
00663       }
00664 
00665       for (y = 0; y < (yuvHeight >> 1); y++) { 
00666         SafeRead(clientSocket, (char *)frame->orig_cr[y], yuvWidth>>1);
00667       }
00668     }
00669   }
00670 
00671   data = 0;
00672   data = htonl(data);
00673   SafeWrite(clientSocket, (char *)&data, 4);
00674 
00675   close(clientSocket);
00676 
00677   if ( debugSockets ) {
00678     fprintf(stdout, "MACHINE %s READ COMPLETELY FRAME %d\n",
00679             getenv("HOST"), frameNumber);
00680     fflush(stdout);
00681   }
00682 }
00683 
00684 
00685 
00686 
00687 
00688 
00689 
00690 
00691 
00692 
00693 
00694 
00695 
00696 
00697 
00698 void
00699   StartCombineServer(numInputFiles, outputFileName, parallelHostName, portNum)
00700 int numInputFiles;
00701 char *outputFileName;
00702 char *parallelHostName;
00703 int portNum;
00704 {
00705   int       combinePortNum;
00706   FILE    *ofp;
00707   
00708   
00709   
00710   outputServerSocket = CreateListeningSocket(&combinePortNum);
00711   
00712   if ( debugSockets ) {
00713     fprintf(stdout, "====OUTPUT USING PORT %d\n", combinePortNum);
00714   }
00715   
00716   TransmitPortNum(parallelHostName, portNum, combinePortNum);
00717   
00718   frameDone = (boolean *) malloc(numInputFiles*sizeof(boolean));
00719   memset((char *)frameDone, 0, numInputFiles*sizeof(boolean));
00720   
00721   if ( (ofp = fopen(outputFileName, "wb")) == NULL ) {
00722     fprintf(stderr, "ERROR:  Could not open output file!!\n");
00723     fflush(stderr);
00724     exit(1);
00725   }
00726   FramesToMPEG(numInputFiles, outputFileName, ofp, TRUE);
00727   
00728   if ( debugSockets ) {
00729     fprintf(stdout, "====COMBINE SERVER:  Shutting Down\n");
00730     fflush(stdout);
00731   }
00732   
00733   
00734   TransmitPortNum(parallelHostName, portNum, combinePortNum);
00735   
00736   close(outputServerSocket);
00737 }
00738 
00739 
00740 
00741 
00742 
00743 
00744 
00745 
00746 
00747 
00748 
00749 
00750 
00751 
00752 void
00753   WaitForOutputFile(number)
00754 int number;
00755 {
00756   int       otherSock;
00757   static int otherSize = sizeof(struct sockaddr);
00758   struct sockaddr otherSocket;
00759   int       frameNumber;
00760   int32   buffer[8];
00761   int frameStart, frameEnd;
00762 
00763   while ( ! frameDone[number] ) {
00764     otherSock = accept(outputServerSocket, &otherSocket, &otherSize);
00765     if ( otherSock == -1 ) {
00766       fprintf(stderr, "ERROR:  Combine SERVER accept returned error %d\n", errno);
00767       exit(1);
00768     }
00769 
00770     SafeRead(otherSock, (char *)buffer, 4);
00771     frameNumber = ntohl(buffer[0]);
00772 
00773     if ( frameNumber == -2 ) {
00774       
00775 
00776       SafeRead(otherSock, (char *)buffer, 8);
00777       frameStart = buffer[0];
00778       frameStart = ntohl(frameStart);
00779       frameEnd = buffer[1];
00780       frameEnd = ntohl(frameEnd);
00781 
00782       for ( frameNumber = frameStart; frameNumber <= frameEnd;
00783            frameNumber++ ) {
00784         frameDone[frameNumber] = TRUE;
00785       }
00786     }
00787 
00788     close(otherSock);
00789   }
00790 
00791   if ( debugSockets ) {
00792     fprintf(stdout, "WAIT FOR FRAME %d over\n", number);
00793     fflush(stdout);
00794   }
00795 }
00796 
00797 
00798 
00799 
00800 
00801 
00802 
00803 
00804 
00805 
00806 
00807 
00808 
00809 
00810 
00811 
00812 
00813 
00814 void
00815   StartMasterServer(numInputFiles, paramFile, outputFileName)
00816 int numInputFiles;
00817 char *paramFile;
00818 char *outputFileName;
00819 {
00820   FILE    *filePtr;
00821   register int ind, ind2;
00822   int       framesPerMachine;
00823   char    command[1024];
00824   char    *hostName;
00825   int       portNum;
00826   int       serverSocket;
00827   boolean finished[MAX_MACHINES];
00828   int       numFinished;
00829   int       otherSock, otherSize;
00830   struct sockaddr otherSocket;
00831   int       seconds;
00832   int32   buffer[8];
00833   int ioPortNum[MAX_IO_SERVERS];
00834   int       combinePortNum, decodePortNum;
00835   int       nextFrame;
00836   int       startFrames[MAX_MACHINES];
00837   int       numFrames[MAX_MACHINES];
00838   int       lastNumFrames[MAX_MACHINES];
00839   int       numSeconds[MAX_MACHINES];
00840   float   fps[MAX_MACHINES];
00841   int       numMachinesToEstimate;
00842   float   framesPerSecond;
00843   float   totalFPS, localFPS;
00844   int       framesDone;
00845   float   avgFPS;
00846   char    niceNess[256];
00847   int32   startFrame, endFrame;
00848   int numInputPorts = 0;
00849   int   numRemote = SOMAXCONN;
00850   int totalRemote = 0;
00851   time_t  startUpBegin, startUpEnd;
00852   time_t  shutDownBegin, shutDownEnd;
00853   float   timeChunk;
00854 
00855   time(&startUpBegin);
00856 
00857   if ( niceProcesses ) {
00858     sprintf(niceNess, "nice");
00859   } else {
00860     niceNess[0] = '\0';
00861   }
00862 
00863   time(&timeStart);
00864 
00865   PrintStartStats(-1, 0);
00866 
00867   
00868   hostName = getenv("HOST");
00869 
00870   if ( hostName == NULL ) {
00871     fprintf(stderr, "ERROR:  Set HOST environment variable\n");
00872     exit(1);
00873   }
00874 
00875   hostEntry = gethostbyname(hostName);
00876   if ( hostEntry == NULL ) {
00877     fprintf(stderr, "ERROR:  Could not find host %s in database\n",
00878             hostName);
00879     exit(1);
00880   }
00881 
00882   hostName = hostEntry->h_name;
00883 
00884   serverSocket = CreateListeningSocket(&portNum);
00885   if ( debugSockets ) {
00886     fprintf(stdout, "---USING PORT %d\n", portNum);
00887   }
00888 
00889   
00890   sprintf(command, "%s -max_machines %d -output_server %s %d %d %s",
00891           encoder_name, numMachines, hostName, portNum, numInputFiles, paramFile);
00892   safe_fork(command);
00893 
00894   
00895   otherSize = sizeof(otherSocket);
00896   otherSock = accept(serverSocket, &otherSocket, &otherSize);
00897   if ( otherSock == -1 ) {
00898     fprintf(stderr, "ERROR:  MASTER SERVER accept returned error %d\n", errno);
00899     exit(1);
00900   }
00901 
00902   SafeRead(otherSock, (char *)(&combinePortNum), 4);
00903   combinePortNum = ntohl(combinePortNum);
00904   combinePortNumber = combinePortNum;
00905   close(otherSock);
00906 
00907   if ( debugSockets ) {
00908     fprintf(stdout, "---MASTER SERVER:  Combine port number = %d\n",
00909             combinePortNum);
00910   }
00911 
00912   
00913   if ( referenceFrame == DECODED_FRAME ) {
00914     sprintf(command, "%s -max_machines %d -decode_server %s %d %d %s",
00915             encoder_name, numMachines, hostName, portNum, numInputFiles, paramFile);
00916     safe_fork(command);
00917 
00918     
00919     otherSize = sizeof(otherSocket);
00920     otherSock = accept(serverSocket, &otherSocket, &otherSize);
00921     if ( otherSock == -1 ) {
00922       fprintf(stderr, "ERROR:  MASTER SERVER accept returned error %d\n", errno);
00923       exit(1);
00924     }
00925 
00926     SafeRead(otherSock, (char *)(&decodePortNum), 4);
00927     decodePortNum = ntohl(decodePortNum);
00928     close(otherSock);
00929 
00930     if ( debugSockets ) {
00931       fprintf(stdout, "---MASTER SERVER:  Decode port number = %d\n",
00932               decodePortNum);
00933     }
00934   }
00935 
00936   
00937 
00938   framesPerMachine = numInputFiles/numMachines;
00939 
00940   numFinished = 0;
00941 
00942   
00943   for ( ind = 0; ind < numMachines; ind++ ) {
00944     fps[ind] = -1.0;            
00945     if ( remote[ind] ) {
00946       totalRemote++;
00947     }
00948   }
00949 
00950   
00951   nextFrame = 0;
00952   for ( ind = 0; ind < numMachines; ind++ ) {
00953     if ( (totalRemote != 0) && (numRemote == SOMAXCONN) ) {
00954       
00955       sprintf(command, "%s -max_machines %d -io_server %s %d %s",
00956               encoder_name, numMachines, hostName, portNum, paramFile);
00957       safe_fork(command);
00958 
00959       
00960       otherSize = sizeof(otherSocket);
00961       otherSock = accept(serverSocket, &otherSocket, &otherSize);
00962       if ( otherSock == -1 ) {
00963         fprintf(stderr, "ERROR:  MASTER SERVER accept returned error %d\n", errno);
00964         exit(1);
00965       }
00966 
00967       SafeRead(otherSock, (char *)(&ioPortNum[numInputPorts]), 4);
00968       ioPortNum[numInputPorts] = ntohl(ioPortNum[numInputPorts]);
00969       close(otherSock);
00970 
00971       if ( debugSockets ) {
00972         fprintf(stdout, "---MASTER SERVER:  I/O port number = %d\n",
00973                 ioPortNum[numInputPorts]);
00974       }
00975 
00976       numInputPorts++;
00977       numRemote = 0;
00978     }
00979 
00980     finished[ind] = FALSE;
00981     numSeconds[ind] = 0;
00982 
00983     startFrame = nextFrame;
00984     if ( parallelPerfect ) {
00985       endFrame = startFrame+((numInputFiles-startFrame)/
00986                              (numMachines-ind))  -1;
00987 
00988       if ( forceIalign ) {
00989         while (FType_Type(endFrame) != 'i') {endFrame++;}
00990       }
00991 
00992       
00993       if ( endFrame < startFrame ) {
00994         endFrame = startFrame;
00995       }
00996 
00997       
00998       if ( endFrame >= numInputFiles ) {
00999         endFrame = numInputFiles-1;
01000       }
01001     } else if ( forceIalign ) {
01002       endFrame = startFrame+framePatternLen-1;
01003       while (FType_Type(endFrame) != 'i') {endFrame++;}
01004     } else {
01005       endFrame = startFrame+parallelTestFrames-1;
01006     }
01007             
01008     if ( remote[ind] ) {
01009       sprintf(command, "%s %s -l %s %s %s -child %s %d %d %d %d %d %d -frames %d %d %s",
01010               rsh,
01011               machineName[ind], userName[ind], niceNess,
01012               executable[ind],
01013               hostName, portNum, ioPortNum[numInputPorts-1],
01014               combinePortNum, decodePortNum, ind,
01015               remote[ind],
01016               startFrame, endFrame,
01017               remoteParamFile[ind]);
01018       numRemote++;
01019       totalRemote--;
01020     } else {
01021       sprintf(command, "%s %s -l %s %s %s -child %s %d %d %d %d %d %d -frames %d %d %s",
01022               rsh,
01023               machineName[ind], userName[ind], niceNess,
01024               executable[ind],
01025               hostName, portNum, ioPortNum[numInputPorts-1],
01026               combinePortNum, decodePortNum, ind,
01027               remote[ind],
01028               startFrame, endFrame,
01029               paramFile);
01030     }
01031 
01032     if ( debugMachines ) {
01033       fprintf(stdout, "---%s:  frames %d to %d\n",
01034               machineName[ind],
01035               startFrame, endFrame);
01036     }
01037         
01038 
01039     safe_fork(command);
01040 
01041     nextFrame = endFrame+1;
01042     startFrames[ind] = startFrame;
01043     numFrames[ind] = endFrame-startFrame+1;
01044     lastNumFrames[ind] = endFrame-startFrame+1;
01045   }
01046 
01047   framesDone = 0;
01048 
01049   time(&startUpEnd);
01050 
01051   
01052   while ( numFinished != numMachines ) {
01053     otherSize = sizeof(otherSocket);
01054     otherSock = accept(serverSocket, &otherSocket, &otherSize);
01055     if ( otherSock == -1 ) {
01056       fprintf(stderr, "ERROR:  MASTER SERVER 2 accept returned error %d\n", errno);
01057       exit(1);
01058     }
01059 
01060     SafeRead(otherSock, (char *)buffer, 8);
01061 
01062     ind = ntohl(buffer[0]);
01063     seconds = ntohl(buffer[1]);
01064 
01065     NoteFrameDone(startFrames[ind],
01066                   startFrames[ind]+lastNumFrames[ind]-1);
01067 
01068     numSeconds[ind] += seconds;
01069     fps[ind] = (float)numFrames[ind]/(float)numSeconds[ind];
01070 
01071     if ( seconds != 0 )
01072       framesPerSecond = (float)lastNumFrames[ind]/(float)seconds;
01073     else
01074       framesPerSecond = (float)lastNumFrames[ind]*2.0;
01075 
01076     framesDone += lastNumFrames[ind];
01077 
01078     if ( nextFrame >= numInputFiles ) {
01079       buffer[0] = htonl(-1);
01080       buffer[1] = htonl(0);
01081       SafeWrite(otherSock, (char *)buffer, 8);
01082       numFinished++;
01083 
01084       if ( debugMachines ) {
01085         fprintf(stdout, "---%s FINISHED job (%f fps) (%d/%d done):  DONE\n",
01086                 machineName[ind], framesPerSecond, numFinished,
01087                 numMachines);
01088       }
01089     } else {
01090       if (numSeconds[ind] != 0) {
01091         avgFPS = (float)numFrames[ind]/(float)numSeconds[ind];
01092       } else {
01093         avgFPS = 0.1;           
01094       }
01095 
01096       startFrame = nextFrame;
01097 
01098       if ( parallelTimeChunks == -1 ) { 
01099         
01100         
01101         totalFPS = 0.0;
01102         numMachinesToEstimate = 0;
01103         for ( ind2 = 0; ind2 < numMachines; ind2++ ) {
01104           if ( fps[ind2] < 0.0 ) {
01105             numMachinesToEstimate++;
01106           } else {
01107             totalFPS += fps[ind2];
01108           }
01109         }
01110 
01111         totalFPS = (float)numMachines*
01112           (totalFPS/(float)(numMachines-numMachinesToEstimate));
01113 
01114         timeChunk = (float)(numInputFiles-nextFrame)/totalFPS;
01115 
01116         fprintf(stdout, "ASSIGNING %s %.2f seconds of work\n",
01117                 machineName[ind], timeChunk);
01118         fflush(stdout);
01119         endFrame = nextFrame +
01120           (int)((float)timeChunk*avgFPS) - 1;
01121       } else {
01122         endFrame = nextFrame +
01123           (int)((float)parallelTimeChunks*avgFPS) - 1;
01124       }
01125 
01126       if ( forceIalign ) {
01127         while (FType_Type(endFrame) != 'i') {endFrame++;}
01128       }
01129 
01130       if ( endFrame < startFrame ) { 
01131         endFrame = startFrame;
01132       }
01133       if ( endFrame >= numInputFiles ) {
01134         endFrame = numInputFiles-1;
01135       }
01136 
01137       nextFrame = endFrame+1;
01138 
01139       startFrames[ind] = startFrame;
01140       numFrames[ind] += (endFrame-startFrame+1);
01141       lastNumFrames[ind] = (endFrame-startFrame+1);
01142 
01143       if ( debugMachines ) {
01144         fprintf(stdout, "---%s FINISHED job (%f fps) (%d/%d done):  next:  %d to %d\n",
01145                 machineName[ind], framesPerSecond, numFinished,
01146                 numMachines, startFrame, endFrame);
01147       }
01148 
01149       buffer[0] = htonl(startFrame);
01150       buffer[1] = htonl(endFrame);
01151 
01152       SafeWrite(otherSock, (char *)buffer, 8);
01153     }
01154 
01155     close(otherSock);
01156 
01157     if ( debugMachines ) {
01158       fprintf(stdout, "---FRAMES DONE:  %d\tFARMED OUT:  %d\tLEFT:  %d\n",
01159               framesDone, nextFrame-framesDone, numInputFiles-nextFrame);
01160     }
01161   }
01162 
01163   time(&shutDownBegin);
01164 
01165   
01166   IOhostName = hostName;
01167   for ( ind = 0; ind < numInputPorts; ind++ ) {
01168     ioPortNumber = ioPortNum[ind];
01169     EndIOServer();
01170   }
01171 
01172   
01173   otherSize = sizeof(otherSocket);
01174   otherSock = accept(serverSocket, &otherSocket, &otherSize);
01175   if ( otherSock == -1 ) {
01176     fprintf(stderr, "ERROR:  MASTER SERVER accept returned error %d\n", errno);
01177     exit(1);
01178   }
01179 
01180   SafeRead(otherSock, (char *)buffer, 4);
01181   close(otherSock);
01182     
01183   close(serverSocket);
01184 
01185   time(&timeEnd);
01186   diffTime = (int32)(timeEnd-timeStart);
01187 
01188   time(&shutDownEnd);
01189 
01190   for ( ind2 = 0; ind2 < 2; ind2++ ) {
01191     if ( ind2 == 0 ) {
01192       filePtr = stdout;
01193     } else if ( statFile != NULL ) {
01194       filePtr = statFile;
01195     } else {
01196       continue;
01197     }
01198 
01199     fprintf(filePtr, "\n\n");
01200     fprintf(filePtr, "PARALLEL SUMMARY\n");
01201     fprintf(filePtr, "----------------\n");
01202     fprintf(filePtr, "\n");
01203     fprintf(filePtr, "START UP TIME:  %d seconds\n",
01204             (int)startUpEnd-(int)startUpBegin);
01205     fprintf(filePtr, "SHUT DOWN TIME:  %d seconds\n",
01206             (int)shutDownEnd-(int)shutDownBegin);
01207 
01208     fprintf(filePtr, "%14s\tFrames\tSeconds\tFrames Per Second\tSelf Time\n",
01209             "MACHINE");
01210     fprintf(filePtr, "--------------\t------\t-------\t-----------------\t---------\n");
01211     totalFPS = 0.0;
01212     for ( ind = 0; ind < numMachines; ind++ ) {
01213       localFPS = (float)numFrames[ind]/(float)numSeconds[ind];
01214       fprintf(filePtr, "%14s\t%d\t%d\t%f\t\t%d\n",
01215               machineName[ind], numFrames[ind], numSeconds[ind],
01216               localFPS, (int)((float)numInputFiles/localFPS));
01217       totalFPS += localFPS;
01218     }
01219 
01220     fprintf(filePtr, "--------------\t------\t-------\t-----------------\t---------\n");
01221 
01222     fprintf(filePtr, "%14s\t\t%d\t%f\n", "OPTIMAL", 
01223             (int)((float)numInputFiles/totalFPS),
01224             totalFPS);
01225     fprintf(filePtr, "%14s\t\t%d\t%f\n", "ACTUAL", diffTime, 
01226             (float)numInputFiles/(float)diffTime);
01227 
01228     fprintf(filePtr, "\n\n");
01229   }
01230 
01231   if ( statFile != NULL ) {
01232     fclose(statFile);
01233   }
01234 }
01235 
01236 
01237 
01238 
01239 
01240 
01241 
01242 
01243 
01244 
01245 
01246 
01247 
01248 boolean
01249   NotifyMasterDone(hostName, portNum, machineNumber, seconds, frameStart,
01250                    frameEnd)
01251 char *hostName;
01252 int portNum;
01253 int machineNumber;
01254 int seconds;
01255 int *frameStart;
01256 int *frameEnd;
01257 {
01258   int   clientSocket;
01259   int32   buffer[8];
01260   time_t  tempTimeStart, tempTimeEnd;
01261 
01262   time(&tempTimeStart);
01263 
01264   clientSocket = ConnectToSocket(hostName, portNum, &hostEntry);
01265 
01266   buffer[0] = htonl(machineNumber);
01267   buffer[1] = htonl(seconds);
01268 
01269   SafeWrite(clientSocket, (char *)buffer, 8);
01270 
01271   SafeRead(clientSocket, (char *)buffer, 8);
01272   *frameStart = ntohl(buffer[0]);
01273   *frameEnd = ntohl(buffer[1]);
01274 
01275   close(clientSocket);
01276 
01277   time(&tempTimeEnd);
01278   IOtime += (tempTimeEnd-tempTimeStart);
01279 
01280   return ((*frameStart) >= 0);
01281 }
01282 
01283 
01284 
01285 
01286 
01287 
01288 
01289 
01290 
01291 
01292 
01293 
01294 
01295 
01296 
01297 
01298 void
01299   StartDecodeServer(numInputFiles, decodeFileName, parallelHostName, portNum)
01300 int numInputFiles;
01301 char *decodeFileName;
01302 char *parallelHostName;
01303 int portNum;
01304 {
01305   int       otherSock, otherSize;
01306   struct sockaddr otherSocket;
01307   int       decodePortNum;
01308   int32   buffer[8];
01309   int       frameReady;
01310   boolean *ready;
01311   int       *waitMachine;
01312   int       *waitPort;
01313   int       *waitList;
01314   int       slaveNumber;
01315   int       slavePort;
01316   int       waitPtr;
01317   struct hostent *nullHost = NULL;
01318   int       clientSocket;
01319 
01320   
01321 
01322   ready = (boolean *) calloc(numInputFiles, sizeof(boolean));
01323   waitMachine = (int *) calloc(numInputFiles, sizeof(int));
01324   waitPort = (int *) malloc(numMachines*sizeof(int));
01325   waitList = (int *) calloc(numMachines, sizeof(int));
01326 
01327   
01328 
01329   decodeServerSocket = CreateListeningSocket(&decodePortNum);
01330 
01331   if ( debugSockets ) {
01332     fprintf(stdout, "====DECODE USING PORT %d\n", decodePortNum);
01333   }
01334 
01335   TransmitPortNum(parallelHostName, portNum, decodePortNum);
01336 
01337   frameDone = (boolean *) malloc(numInputFiles*sizeof(boolean));
01338   memset((char *)frameDone, 0, numInputFiles*sizeof(boolean));
01339 
01340   
01341   while ( TRUE ) {
01342     otherSize = sizeof(otherSocket);
01343     otherSock = accept(decodeServerSocket, &otherSocket, &otherSize);
01344     if ( otherSock == -1 ) {
01345       fprintf(stderr, "ERROR:  DECODE SERVER accept returned error %d\n", errno);
01346       exit(1);
01347     }
01348 
01349     SafeRead(otherSock, (char *)buffer, 4);
01350     frameReady = buffer[0];
01351     frameReady = ntohl(frameReady);
01352 
01353     if ( frameReady == -2 ) {
01354       SafeRead(otherSock, (char *)buffer, 4);
01355       frameReady = buffer[0];
01356       frameReady = ntohl(frameReady);
01357 
01358       if ( debugSockets ) {
01359         fprintf(stdout, "====DECODE SERVER:  REQUEST FOR %d\n", frameReady);
01360         fflush(stdout);     
01361       }
01362 
01363       
01364       buffer[0] = frameDone[frameReady];
01365       buffer[0] = htonl(buffer[0]);
01366       SafeWrite(otherSock, (char *)buffer, 4);
01367 
01368       if ( ! frameDone[frameReady] ) {
01369         
01370         SafeRead(otherSock, (char *)buffer, 8);
01371         slaveNumber = buffer[0];
01372         slaveNumber = ntohl(slaveNumber);
01373         slavePort = buffer[1];
01374         slavePort = ntohl(slavePort);
01375 
01376         if ( debugSockets ) {
01377           fprintf(stdout, "WAITING:  SLAVE %d, PORT %d\n",
01378                   slaveNumber, slavePort);
01379         }
01380 
01381         waitPort[slaveNumber] = slavePort;
01382         if ( waitMachine[frameReady] == 0 ) {
01383           waitMachine[frameReady] = slaveNumber+1;
01384         } else {
01385           
01386           
01387           waitPtr = waitMachine[frameReady]-1;
01388           while ( waitList[waitPtr] != 0 ) {
01389             waitPtr = waitList[waitPtr]-1;
01390           }
01391 
01392           waitList[waitPtr] = slaveNumber+1;
01393           waitList[slaveNumber] = 0;
01394         }
01395       }
01396     } else {
01397       frameDone[frameReady] = TRUE;
01398 
01399       if ( debugSockets ) {
01400         fprintf(stdout, "====DECODE SERVER:  FRAME %d READY\n", frameReady);
01401         fflush(stdout);
01402       }
01403 
01404       if ( waitMachine[frameReady] ) {
01405         
01406         waitPtr = waitMachine[frameReady]-1;
01407         while ( waitPtr >= 0 ) {
01408           clientSocket = ConnectToSocket(machineName[waitPtr],
01409                                          waitPort[waitPtr],
01410                                          &nullHost);
01411           close(clientSocket);
01412           waitPtr = waitList[waitPtr]-1;
01413         }
01414       }
01415     }
01416 
01417     close(otherSock);
01418   }
01419 
01420   if ( debugSockets ) {
01421     fprintf(stdout, "====DECODE SERVER:  Shutting Down\n");
01422     fflush(stdout);
01423   }
01424 
01425   
01426   TransmitPortNum(parallelHostName, portNum, decodePortNum);
01427 
01428   close(decodeServerSocket);
01429 }
01430 
01431 
01432 
01433 
01434 
01435 
01436 
01437 
01438 
01439 
01440 
01441 
01442 
01443 
01444 
01445 
01446 
01447 
01448 
01449 static void
01450   TransmitPortNum(hostName, portNum, newPortNum)
01451 char *hostName;
01452 int portNum;
01453 int newPortNum;
01454 {
01455   int   clientSocket;
01456   u_long  data;
01457 
01458   clientSocket = ConnectToSocket(hostName, portNum, &hostEntry);
01459 
01460   data = htonl(newPortNum);
01461   SafeWrite(clientSocket, (char *) &data, 4);
01462 
01463   close(clientSocket);
01464 }
01465 
01466 
01467 
01468 
01469 
01470 
01471 
01472 
01473 
01474 
01475 
01476 
01477 
01478 
01479 static void
01480   SafeRead(fd, buf, nbyte)
01481 int fd;
01482 char *buf;
01483 int nbyte;
01484 {
01485   int numRead;
01486   int result;
01487 
01488   numRead = 0;
01489 
01490   while ( numRead != nbyte ) {
01491     result = read(fd, &buf[numRead], nbyte-numRead);
01492 
01493     if ( result == -1 ) {
01494       fprintf(stderr, "ERROR:  read (of %d bytes (total %d) ) returned error %d\n",
01495               nbyte-numRead, nbyte, errno);
01496       exit(1);
01497     }
01498     numRead += result;
01499   }
01500 }
01501 
01502 
01503 
01504 
01505 
01506 
01507 
01508 
01509 
01510 
01511 
01512 
01513 
01514 
01515 static void
01516   SafeWrite(fd, buf, nbyte)
01517 int fd;
01518 char *buf;
01519 int nbyte;
01520 {
01521   int numWritten;
01522   int result;
01523 
01524   numWritten = 0;
01525 
01526   while ( numWritten != nbyte ) {
01527     result = write(fd, &buf[numWritten], nbyte-numWritten);
01528 
01529     if ( result == -1 ) {
01530       fprintf(stderr, "ERROR:  read (of %d bytes (total %d) ) returned error %d\n",
01531               nbyte-numWritten, nbyte, errno);
01532       exit(1);
01533     }
01534     numWritten += result;
01535   }
01536 }
01537 
01538 
01539 
01540 
01541 
01542 
01543 
01544 
01545 
01546 
01547 
01548 
01549 
01550 
01551 static void
01552   EndIOServer()
01553 {
01554   
01555   GetRemoteFrame(NULL, -1);
01556 }
01557 
01558 
01559 
01560 
01561 
01562 
01563 
01564 
01565 
01566 
01567 
01568 
01569 
01570 
01571 void
01572   NotifyDecodeServerReady(id)
01573 int id;
01574 {
01575   int   clientSocket;
01576   u_long  data;
01577   time_t  tempTimeStart, tempTimeEnd;
01578 
01579   time(&tempTimeStart);
01580 
01581   clientSocket = ConnectToSocket(IOhostName, decodePortNumber, &hostEntry);
01582 
01583   data = htonl(id);
01584   SafeWrite(clientSocket, (char *)&data, 4);
01585 
01586   close(clientSocket);
01587 
01588   time(&tempTimeEnd);
01589   IOtime += (tempTimeEnd-tempTimeStart);
01590 }
01591 
01592 
01593 
01594 
01595 
01596 
01597 
01598 
01599 
01600 
01601 
01602 
01603 
01604 void
01605   WaitForDecodedFrame(id)
01606 int id;
01607 {
01608   int   clientSocket;
01609   u_long  data;
01610   int       negativeTwo = -2;
01611   int     ready;
01612 
01613   
01614   if ( debugSockets ) {
01615     fprintf(stdout, "WAITING FOR DECODED FRAME %d\n", id);
01616   }
01617 
01618   clientSocket = ConnectToSocket(IOhostName, decodePortNumber, &hostEntry);
01619 
01620   
01621   data = negativeTwo;
01622   data = htonl(negativeTwo);
01623   SafeWrite(clientSocket, (char *)&data, 4);
01624 
01625   data = htonl(id);
01626   SafeWrite(clientSocket, (char *)&data, 4);
01627 
01628   SafeRead(clientSocket, (char *)&data, 4);
01629   ready = data;
01630   ready = ntohl(ready);
01631 
01632   if ( ! ready ) {
01633     int     waitSocket;
01634     int     waitPort;
01635     int     otherSock, otherSize;
01636     struct sockaddr otherSocket;
01637 
01638     
01639     waitSocket = CreateListeningSocket(&waitPort);
01640 
01641     
01642     data = machineNumber;
01643     data = ntohl(data);
01644     SafeWrite(clientSocket, (char *)&data, 4);
01645 
01646     data = waitPort;
01647     data = ntohl(data);
01648     SafeWrite(clientSocket, (char *)&data, 4);
01649 
01650     close(clientSocket);
01651 
01652     if ( debugSockets ) {
01653       fprintf(stdout, "SLAVE:  WAITING ON SOCKET %d\n", waitPort);
01654       fflush(stdout);
01655     }
01656 
01657     otherSize = sizeof(otherSocket);
01658     otherSock = accept(waitSocket, &otherSocket, &otherSize);
01659     if ( otherSock == -1 ) {
01660       fprintf(stderr, "ERROR:  I/O SERVER accept returned error %d\n", errno);
01661       exit(1);
01662     }
01663 
01664     
01665     
01666 
01667     close(otherSock);
01668 
01669     close(waitSocket);
01670   } else {
01671     close(clientSocket);
01672   }
01673 
01674   if ( debugSockets ) {
01675     fprintf(stdout, "YE-HA FRAME %d IS NOW READY\n", id);
01676   }
01677 }
01678 
01679 
01680 
01681 
01682 
01683 
01684 
01685 
01686 
01687 
01688 
01689 
01690 
01691 static int
01692   CreateListeningSocket(portNumber)
01693 int *portNumber;
01694 {
01695   int       resultSocket;
01696   u_short tempShort;
01697   int       result;
01698   struct sockaddr_in    nameEntry;
01699 
01700   resultSocket = socket(AF_INET, SOCK_STREAM, 0);
01701   if ( resultSocket == -1 ) {
01702     fprintf(stderr, "ERROR:  Call to socket() gave error %d\n", errno);
01703     exit(1);
01704   }
01705 
01706   memset((char *) &nameEntry, 0, sizeof(nameEntry));
01707   nameEntry.sin_family = AF_INET;
01708 
01709   
01710   (*portNumber) = 2048;
01711   do {
01712     (*portNumber)++;
01713     tempShort = (*portNumber);
01714     nameEntry.sin_port = htons(tempShort);
01715     result = bind(resultSocket, (struct sockaddr *) &nameEntry,
01716                   sizeof(struct sockaddr));
01717   }
01718   while ( result == -1 );
01719 
01720   
01721 
01722 
01723   result = listen(resultSocket, SOMAXCONN);
01724   if ( result == -1 ) {
01725     fprintf(stderr, "ERROR:  call to listen() gave error %d\n", errno);
01726     exit(1);
01727   }
01728 
01729   return resultSocket;
01730 }
01731 
01732 
01733 
01734 
01735 
01736 
01737 
01738 
01739 
01740 
01741 
01742 
01743 
01744 
01745 
01746 static int
01747   ConnectToSocket(machineName, portNum, hostEnt)
01748 char *machineName;
01749 int     portNum;
01750 struct hostent **hostEnt;
01751 {
01752   int   resultSocket;
01753   int       result;
01754   u_short           tempShort;
01755   struct sockaddr_in  nameEntry;
01756 
01757   if ( (*hostEnt) == NULL ) {
01758     (*hostEnt) = gethostbyname(machineName);
01759     if ( (*hostEnt) == NULL ) {
01760       fprintf(stderr, "ERROR:  Couldn't get host by name (%s)\n",
01761               machineName);
01762       exit(1);
01763     }
01764   }
01765 
01766   resultSocket = socket(AF_INET, SOCK_STREAM, 0);
01767   if ( resultSocket == -1 ) {
01768     fprintf(stderr, "ERROR:  socket returned error %d\n", errno);
01769     exit(1);
01770   }
01771 
01772   nameEntry.sin_family = AF_INET;
01773   memset((void *) nameEntry.sin_zero, 0, 8);
01774   memcpy((void *) &(nameEntry.sin_addr.s_addr),
01775          (void *) (*hostEnt)->h_addr_list[0],
01776          (size_t) (*hostEnt)->h_length);
01777   tempShort = portNum;
01778   nameEntry.sin_port = htons(tempShort);
01779 
01780   result = connect(resultSocket, (struct sockaddr *) &nameEntry,
01781                    sizeof(struct sockaddr));
01782   if ( result == -1 ) {
01783     fprintf(stderr, "ERROR:  connect (ConnectToSocket, port %d) from machine %s returned error %d\n",
01784             portNum, getenv("HOST"), errno);
01785     exit(1);
01786   }
01787 
01788   return resultSocket;
01789 }
01790 
01791 
01792 
01793 
01794 
01795 
01796 
01797 
01798 
01799 
01800 
01801 
01802 
01803 void
01804   SendDecodedFrame(frame)
01805 MpegFrame *frame;
01806 {
01807   int   clientSocket;
01808   register int y;
01809   int       negativeTwo = -2;
01810   uint32  data;
01811 
01812   
01813   clientSocket = ConnectToSocket(IOhostName, ioPortNumber, &hostEntry);
01814 
01815   data = negativeTwo;
01816   data = htonl(data);
01817   SafeWrite(clientSocket, (char *)&data, 4);
01818 
01819   data = frame->id;
01820   data = htonl(data);
01821   SafeWrite(clientSocket, (char *)&data, 4);
01822 
01823   for ( y = 0; y < Fsize_y; y++ ) {
01824     SafeWrite(clientSocket, (char *)frame->decoded_y[y], Fsize_x);
01825   }
01826 
01827   for (y = 0; y < (Fsize_y >> 1); y++) { 
01828     SafeWrite(clientSocket, (char *)frame->decoded_cb[y], (Fsize_x >> 1));
01829   }
01830 
01831   for (y = 0; y < (Fsize_y >> 1); y++) { 
01832     SafeWrite(clientSocket, (char *)frame->decoded_cr[y], (Fsize_x >> 1));
01833   }
01834 
01835   close(clientSocket);
01836 }
01837 
01838 
01839 
01840 
01841 
01842 
01843 
01844 
01845 
01846 
01847 
01848 
01849 
01850 void
01851   GetRemoteDecodedRefFrame(frame, frameNumber)
01852 MpegFrame *frame;
01853 int frameNumber;
01854 {
01855   int   clientSocket;
01856   register int y;
01857   int       negativeThree = -3;
01858   uint32  data;
01859 
01860   
01861   clientSocket = ConnectToSocket(IOhostName, ioPortNumber, &hostEntry);
01862 
01863   
01864   data = negativeThree;
01865   data = htonl(data);
01866   SafeWrite(clientSocket, (char *)&data, 4);
01867 
01868   data = frame->id;
01869   data = htonl(data);
01870   SafeWrite(clientSocket, (char *)&data, 4);
01871 
01872   for ( y = 0; y < Fsize_y; y++ ) {
01873     SafeRead(clientSocket, (char *)frame->decoded_y[y], Fsize_x);
01874   }
01875 
01876   for (y = 0; y < (Fsize_y >> 1); y++) { 
01877     SafeRead(clientSocket, (char *)frame->decoded_cb[y], (Fsize_x >> 1));
01878   }
01879 
01880   for (y = 0; y < (Fsize_y >> 1); y++) { 
01881     SafeRead(clientSocket, (char *)frame->decoded_cr[y], (Fsize_x >> 1));
01882   }
01883 
01884   close(clientSocket);
01885     
01886 }
01887 
01888 
01889 
01890 
01891 
01892 
01893 
01894 
01895 
01896 
01897 
01898 
01899 
01900 
01901 
01902 
01903 
01904 
01905 
01906 
01907 void cleanup_fork( dummy )                      
01908      int dummy;
01909 {
01910   register int i;
01911   for (i = 0;  i < current_max_forked_pid;  ++i ) {
01912 
01913 #ifdef DEBUG_FORK
01914     fprintf(stderr, "cleanup_fork: killing PID %d\n", ClientPid[i]);
01915 #endif
01916 
01917     if (kill(ClientPid[i], TERMINATE_PID_SIGNAL)) {
01918       fprintf(stderr, "cleanup_fork: killed PID=%d failed (errno %d)\n", 
01919               ClientPid[i], errno);
01920     }
01921   }
01922 }
01923 
01924 
01925 
01926 
01927 
01928 
01929 
01930 
01931 
01932 
01933 
01934 
01935 static int safe_fork(command)           
01936      char *command;
01937 {
01938   static int init=0;
01939   char *argis[MAXARGS];
01940   register int i=1;
01941   
01942   if (!(argis[0] = strtok(command, " \t"))) return(0); 
01943   while ((argis[i] = strtok(NULL, " \t")) && i < MAXARGS) ++i;
01944   argis[i] = NULL;
01945   
01946 #ifdef DEBUG_FORK
01947   {register int i=0; 
01948    fprintf(stderr, "Command %s becomes:\n", command);
01949    while(argis[i]) {fprintf(stderr, "--%s--\n", argis[i]); ++i;} }
01950 #endif
01951   
01952   if (!init) {                  
01953     signal (SIGQUIT, cleanup_fork);
01954     signal (SIGTERM, cleanup_fork);
01955     signal (SIGINT , cleanup_fork);
01956     init=1;
01957   }
01958   
01959   if (-1 == (ClientPid[current_max_forked_pid] = fork()) )  {
01960     perror("safe_fork: fork failed ");
01961     return(-1);
01962   }
01963   if( !ClientPid[current_max_forked_pid]) { 
01964     execvp(argis[0], argis );
01965     perror("safe_fork child: exec failed ");
01966     exit(1);
01967   }
01968 #ifdef DEBUG_FORK
01969   fprintf(stderr, "parallel: forked PID=%d\n", ClientPid[current_max_forked_pid]);
01970 #endif
01971   current_max_forked_pid++;
01972   return(0);
01973 }