RemoteDataRetriever.java

Go to the documentation of this file.
00001 package edu.rice.cs.hpc.traceviewer.db.remote;
00002 
00003 import java.io.BufferedInputStream;
00004 import java.io.BufferedOutputStream;
00005 import java.io.DataInputStream;
00006 import java.io.DataOutputStream;
00007 import java.io.IOException;
00008 import java.net.Socket;
00009 import java.util.HashMap;
00010 import java.util.concurrent.ConcurrentLinkedQueue;
00011 
00012 import org.eclipse.core.runtime.IProgressMonitor;
00013 import org.eclipse.core.runtime.IStatus;
00014 import org.eclipse.core.runtime.Status;
00015 import org.eclipse.core.runtime.jobs.Job;
00016 import org.eclipse.swt.widgets.Shell;
00017 import edu.rice.cs.hpc.traceviewer.data.graph.CallPath;
00018 import edu.rice.cs.hpc.traceviewer.data.util.Constants;
00019 import edu.rice.cs.hpc.traceviewer.data.util.Debugger;
00020 import edu.rice.cs.hpc.traceviewer.db.remote.DecompressionThread.DecompressionItemToDo;
00021 import edu.rice.cs.hpc.traceviewer.painter.ImageTraceAttributes;
00022 
00033 public class RemoteDataRetriever {
00034     
00035     // -------------------------------------
00036     // Constants
00037     // -------------------------------------
00038     
00039     //For more information on message structure, see protocol documentation at the end of this file. 
00040     private static final int DATA = 0x44415441;
00041     private static final int HERE = 0x48455245;
00042     
00043     /****
00044      * time out counter is based on TIME_SLEEP ms unit
00045      */
00046     private static final int TIME_OUT = 2000;
00047     
00048     private static final int TIME_SLEEP = 50;
00049     
00050     // -------------------------------------
00051     // Variables
00052     // -------------------------------------
00053 
00054     private final Socket socket;
00055     DataInputStream receiver;
00056     BufferedInputStream rcvBacking;
00057     DataOutputStream sender;
00058     
00059     final int compressionType;
00060 
00061     /******
00062      * Constructor for communicating with remote data server
00063      * 
00064      * @param _serverConnection : connection socket
00065      * @param _statusMgr : line manager
00066      * @param _shell : window shell
00067      * @param _compressionType : type of compression, see {@link DecompressionThread.COMPRESSION_TYPE_MASK}
00068      * 
00069      * @throws IOException
00070      */
00071     public RemoteDataRetriever(Socket _serverConnection, Shell _shell, int _compressionType) throws IOException {
00072         socket = _serverConnection;
00073         
00074         compressionType = _compressionType;
00075         
00076         rcvBacking      = new BufferedInputStream(socket.getInputStream());
00077         receiver        = new DataInputStream(rcvBacking);
00078         
00079         sender          = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream()));
00080     }
00081 
00082     //TODO: I think these are all inclusive, but check.
00094     public void getData( ImageTraceAttributes attributes, 
00095             HashMap<Integer, CallPath> _scopeMap, 
00096             final ConcurrentLinkedQueue<DecompressionItemToDo> workToDo) throws IOException
00097     {
00098         //Make the call
00099         //Check to make sure the server is sending back data
00100         //Wait/Receive/Parse:
00101                 //          Make into TimeCPID[]
00102                 //          Make into DataByRank
00103                 //          Make into ProcessTimeline
00104                 //          Put into appropriate place in array
00105         //When all are done, return the array
00106         // int P0, int Pn, long t0, long tn, int vertRes, int horizRes,
00107         int P0 = attributes.getProcessBegin();
00108         int Pn = attributes.getProcessEnd();
00109         long t0 = attributes.getTimeBegin();
00110         long tn = attributes.getTimeEnd();
00111         int vertRes = attributes.numPixelsV;
00112         int horizRes = attributes.numPixelsH;
00113         
00114         Debugger.printTimestampDebug("Requesting data");
00115         requestData(P0, Pn, t0, tn, vertRes, horizRes);
00116         Debugger.printTimestampDebug("Data request finished");
00117         
00118         int responseCommand = waitAndReadInt(receiver);     
00119         if (responseCommand != HERE)//"HERE" in ASCII
00120             throw new IOException("The server did not send back data");
00121     
00122         Debugger.printTimestampDebug("Data receive begin");
00123         
00124         final int ranksExpected = Math.min(Pn-P0, vertRes);
00125 
00126         Job unpacker = new Job("Receiving data") {
00127 
00128             @Override
00129             public IStatus run(IProgressMonitor monitor) {
00130                 DataInputStream dataReader;
00131                 int ranksReceived = 0;
00132                 dataReader = receiver;
00133                 boolean first = true;
00134                 
00135                 try {
00136                     while (ranksReceived < ranksExpected) {
00137 
00138                         int rankNumber = dataReader.readInt();
00139                         if (first){
00140                             Debugger.printTimestampDebug("First real data byte received.");
00141                             first = false;
00142                         }
00143                         int length = dataReader.readInt();// Number of CPID's
00144 
00145                         long startTimeForThisTimeline = dataReader.readLong();
00146                         long endTimeForThisTimeline = dataReader.readLong();
00147                         int compressedSize = dataReader.readInt();
00148                         
00149                         // when there's network issue, the value of compressedSize can be negative
00150                         // this has happened when the server process was suspended and the user keeps
00151                         //  asking data from the client to the server
00152                         if (compressedSize >0) {
00153                             byte[] compressedTraceLine = new byte[compressedSize];
00154 
00155                             int numRead = 0;
00156                             while (numRead < compressedSize) {
00157                                 numRead += dataReader.read(compressedTraceLine,
00158                                         numRead, compressedSize - numRead);
00159 
00160                             }
00161 
00162                             workToDo.add(
00163                                 new DecompressionThread.DecompressionItemToDo(
00164                                             compressedTraceLine, length,
00165                                             startTimeForThisTimeline,
00166                                             endTimeForThisTimeline, rankNumber,
00167                                             compressionType));
00168 
00169                             ranksReceived++;
00170                             monitor.worked(1);
00171                         }
00172                     }
00173                 } catch (IOException e) {
00174                     //Should we provide some UI notification to the user?
00175                     e.printStackTrace();
00176                 }
00177                 monitor.done();
00178                 //Updating the progress doesn't work anyways and will throw
00179                 //an exception because this is a different thread
00180                 //monitor.endProgress();
00181                 Debugger.printTimestampDebug("Data receive end");
00182                 return Status.OK_STATUS;
00183             }
00184         };
00185         unpacker.setUser(true);
00186         unpacker.schedule();
00187         
00188     }
00189 
00190     
00191     private void requestData(int P0, int Pn, long t0, long tn, int vertRes,
00192             int horizRes) throws IOException {
00193         sender.writeInt(DATA);
00194         sender.writeInt(P0);
00195         sender.writeInt(Pn);
00196         sender.writeLong(t0);
00197         sender.writeLong(tn);
00198         sender.writeInt(vertRes);
00199         sender.writeInt(horizRes);
00200         //That's it for the message
00201         sender.flush();
00202     }
00203 
00204 
00205     static int waitAndReadInt(DataInputStream receiver)
00206             throws IOException {
00207         int nextCommand;
00208         int timeout = 0;
00209         // Sometime the buffer is filled with 0s for some reason. This flushes
00210         // them out. This is awful, but otherwise we just get 0s
00211 
00212         while (receiver.available() <= 4
00213                 || ((nextCommand = receiver.readInt()) == 0)) {
00214 
00215             if (timeout++ > TIME_OUT) {
00216                 throw new IOException("Timeout: no response from the server.");
00217             }
00218             try {
00219                 Thread.sleep(TIME_SLEEP);
00220             } catch (InterruptedException e) {
00221 
00222                 e.printStackTrace();
00223             }
00224         }
00225         if (receiver.available() < 4)// There certainly isn't a message
00226                                         // available, since every message is at
00227                                         // least 4 bytes, but the next time the
00228                                         // buffer has anything there will be a
00229                                         // message
00230         {
00231             timeout = 0;
00232             if (timeout++ > TIME_OUT) {
00233                 throw new IOException("Timeout while waiting for command: no response from the server.");
00234             }
00235 
00236             receiver.read(new byte[receiver.available()]);// Flush the rest of
00237                                                             // the buffer
00238             while (receiver.available() <= 0) {
00239 
00240                 try {
00241                     Thread.sleep(TIME_SLEEP);
00242                 } catch (InterruptedException e) {
00243                     e.printStackTrace();
00244                 }
00245             }
00246             nextCommand = receiver.readInt();
00247         }
00248         return nextCommand;
00249     }
00250     public void closeConnection() throws IOException {
00251         sender.writeInt(Constants.DONE);
00252         sender.flush();
00253         sender.close();
00254         receiver.close();
00255         socket.close();
00256     }
00257     
00258     
00259     static public int getTimeSleep() {
00260         return TIME_SLEEP;
00261     }
00262     
00263     static public int getTimeOut() {
00264         return TIME_OUT;
00265     }
00266 } 

Generated on 5 May 2015 for HPCVIEWER by  doxygen 1.6.1