RemoteDBOpener.java

Go to the documentation of this file.
00001 package edu.rice.cs.hpc.traceviewer.db.remote;
00002 
00003 import java.io.BufferedInputStream;
00004 import java.io.BufferedOutputStream;
00005 import java.io.ByteArrayInputStream;
00006 import java.io.DataInputStream;
00007 import java.io.DataOutputStream;
00008 import java.io.IOException;
00009 import java.io.InputStream;
00010 import java.net.InetSocketAddress;
00011 import java.net.Socket;
00012 import java.net.SocketAddress;
00013 import java.net.SocketException;
00014 import java.net.UnknownHostException;
00015 import java.util.zip.GZIPInputStream;
00016 
00017 import org.eclipse.jface.action.IStatusLineManager;
00018 import org.eclipse.ui.IWorkbenchWindow;
00019 
00020 import com.jcraft.jsch.JSchException;
00021 import edu.rice.cs.hpc.data.experiment.InvalExperimentException;
00022 import edu.rice.cs.hpc.data.experiment.extdata.TraceName;
00023 import edu.rice.cs.hpc.remote.tunnel.LocalTunneling;
00024 import edu.rice.cs.hpc.remote.tunnel.RemoteUserInfo;
00025 import edu.rice.cs.hpc.traceviewer.spaceTimeData.SpaceTimeDataController;
00026 import edu.rice.cs.hpc.traceviewer.data.util.Constants;
00027 import edu.rice.cs.hpc.traceviewer.data.util.Debugger;
00028 import edu.rice.cs.hpc.traceviewer.db.AbstractDBOpener;
00029 import edu.rice.cs.hpc.traceviewer.db.DatabaseAccessInfo;
00030 import edu.rice.cs.hpc.traceviewer.db.TraceDatabase;
00039 public class RemoteDBOpener extends AbstractDBOpener 
00040 {
00041     // -----------------
00042     // constants
00043     // -----------------
00044     
00045     private static final int PROTOCOL_VERSION = 0x00010001;
00046     private static final String LOCALHOST = "localhost";
00047 
00048     // -----------------
00049     // static variables
00050     // -----------------
00051     // TODO: static variables are discouraged in Eclipse since
00052     //       it isn't suitable for multiple instances of applications
00053     // -----------------
00054 
00055     static private Socket serverConnection = null;
00056     static private LocalTunneling tunnelMain, tunnelXML;
00057     static private RemoteUserInfo remoteUserInfo;
00058 
00059     // -----------------
00060     // object variables
00061     // -----------------
00062     
00063     private final DatabaseAccessInfo connectionInfo;
00064 
00065     private DataOutputStream sender;
00066     private DataInputStream receiver;
00067 
00068 
00069     /**************
00070      * constructor
00071      * 
00072      * @param connectionInfo
00073      */
00074     public RemoteDBOpener(DatabaseAccessInfo connectionInfo) {
00075         this.connectionInfo = connectionInfo;
00076     }
00077 
00078     // --------------------------------------------------------------------------------------
00079     // override methods
00080     // --------------------------------------------------------------------------------------
00081     
00082     @Override
00083     /*
00084      * (non-Javadoc)
00085      * @see edu.rice.cs.hpc.traceviewer.db.AbstractDBOpener#openDBAndCreateSTDC(org.eclipse.ui.IWorkbenchWindow, 
00086      * org.eclipse.jface.action.IStatusLineManager)
00087      */
00088     public SpaceTimeDataController openDBAndCreateSTDC(
00089             IWorkbenchWindow window, IStatusLineManager statusMgr) 
00090             throws InvalExperimentException, Exception 
00091     {
00092 
00093         // --------------------------------------------------------------
00094         // step 1 : create a SSH tunnel for the main port if necessary
00095         // --------------------------------------------------------------
00096         
00097         int port = Integer.parseInt(connectionInfo.serverPort);
00098         boolean use_tunnel = connectionInfo.isTunnelEnabled();
00099         String host = connectionInfo.serverName;
00100         
00101         int num_attempts = 3;
00102         
00103         {
00104             if  (use_tunnel) {
00105                 // we need to setup the SSH tunnel
00106                 tunnelMain = createSSHTunnel(window, tunnelMain, port);
00107                 host = LOCALHOST;
00108             }
00109             
00110             // --------------------------------------------------------------
00111             // step 2 : initial contact to the server.
00112             //          if there's no reply or I/O error, we quit
00113             // --------------------------------------------------------------
00114             
00115             connectToServer(window, host, port);
00116 
00117             // --------------------------------------------------------------
00118             // step 3 : send OPEN information including the database to open
00119             // --------------------------------------------------------------
00120             
00121             if (sendOpenDB(connectionInfo.databasePath))
00122             {
00123                 // communication has been done successfully
00124                 num_attempts     = 0;
00125             } else 
00126             {   // problem with the communication
00127                 // try to reset the connection if we can solve this by resetting the channel
00128                 remoteUserInfo   = null;
00129                 tunnelMain       = null;
00130                 serverConnection = null;
00131                 
00132                 num_attempts--;
00133             }
00134         } while (num_attempts > 0);
00135 
00136         // --------------------------------------------------------------
00137         // step 4 : Blocking waiting the reply from the server. 
00138         //          A better way is to wait a
00139         // --------------------------------------------------------------
00140         
00141         int traceCount;
00142         int messageTag = RemoteDataRetriever.waitAndReadInt(receiver);
00143         int xmlMessagePortNumber;
00144         int compressionType;
00145         TraceName[] valuesX;
00146 
00147         if (messageTag == Constants.DB_OK)// DBOK
00148         {
00149             xmlMessagePortNumber  = receiver.readInt();
00150             traceCount = receiver.readInt();
00151             compressionType = receiver.readInt();
00152             valuesX = formatTraceNames(traceCount);
00153             
00154         } else 
00155         {
00156             //If the message is not a DBOK, it must be a NODB 
00157             //Right now, the error code isn't used, but it is there for the future
00158             int errorCode = receiver.readInt();
00159             String errorMessage="The server could not find traces in the directory:\n"
00160                 + connectionInfo.databasePath + "\nPlease select a directory that contains traces.\nError code: " + errorCode ;
00161             throw new IOException(errorMessage);
00162         }
00163         
00164         // --------------------------------------------------------------
00165         // step 5 : create a SSH tunnel for XML port if necessary
00166         // --------------------------------------------------------------
00167         
00168         Debugger.printDebug(2, "About to connect to socket "+ xmlMessagePortNumber + " at "+ System.nanoTime());
00169         
00170         if (use_tunnel &&  (port != xmlMessagePortNumber)) {
00171             // only create SSH tunnel if the XML socket has different port number
00172             tunnelXML = createSSHTunnel(window, tunnelXML, xmlMessagePortNumber);           
00173         }
00174         
00175         statusMgr.setMessage("Receiving XML stream");
00176         
00177         InputStream xmlStream = getXmlStream(host, port, xmlMessagePortNumber);
00178         
00179         if (xmlStream == null) {//null if getting it failed
00180             String errorMessage="Error communicating with server:\nCould not receive XML stream. \nPlease try again.";
00181             throw new IOException(errorMessage);
00182         }
00183 
00184         // --------------------------------------------------------------
00185         // step 6 : prepare communication stream fir sending & receiving 
00186         // --------------------------------------------------------------
00187         
00188         RemoteDataRetriever dataRetriever = new RemoteDataRetriever(serverConnection,
00189                  window.getShell(), compressionType);
00190         
00191         SpaceTimeDataControllerRemote stData = new SpaceTimeDataControllerRemote(dataRetriever, window, statusMgr,
00192                 xmlStream, connectionInfo.databasePath + " on " + host, traceCount, valuesX, sender);
00193 
00194         sendInfoPacket(sender, stData);
00195         
00196         return stData;  
00197     }
00198     
00199 
00200     @Override
00201     /*
00202      * (non-Javadoc)
00203      * @see edu.rice.cs.hpc.traceviewer.db.AbstractDBOpener#end()
00204      */
00205     public void end() {
00206         try {
00207             // closing I/O and network connection
00208             sender.close();
00209             receiver.close();
00210             
00211             serverConnection.close();
00212             if (tunnelMain != null) {
00213                 try {
00214                     tunnelMain.disconnect();
00215                 } catch (JSchException e) {
00216                     System.err.println("Warning: Cannot close the SSH tunnel !");
00217                     e.printStackTrace();
00218                 }
00219             }
00220             
00221         } catch (IOException e) {
00222             // TODO Auto-generated catch block
00223             e.printStackTrace();
00224         }
00225     }
00226 
00227     // --------------------------------------------------------------------------------------
00228     // private methods
00229     // --------------------------------------------------------------------------------------
00230     
00231     /*****
00232      * a wrapper for tunneling() function to throw an IOException
00233      * 
00234      * @param window : the reference of the current workbench window
00235      * @param tunnel : the local tunnel, if the tunnel is null, we'll create a new one. Otherwise,
00236      *                  just use this argument. The caller needs to assign with the return tunnel
00237      * @param port   : the local and the remote port
00238      *  
00239      * @throws JSchException 
00240      */
00241     private LocalTunneling createSSHTunnel(IWorkbenchWindow window, LocalTunneling tunnel, int port) 
00242             throws JSchException
00243     {
00244         if (tunnel == null)
00245         {
00246             if (remoteUserInfo == null)
00247             {
00248                 remoteUserInfo = new RemoteUserInfo(window.getShell());
00249             }
00250             remoteUserInfo.setInfo( connectionInfo.sshTunnelUsername, 
00251                                     connectionInfo.sshTunnelHostname, port);
00252             tunnel = new LocalTunneling(remoteUserInfo);
00253         }
00254         
00255         tunnel.connect(connectionInfo.sshTunnelUsername, connectionInfo.sshTunnelHostname, 
00256                 connectionInfo.serverName, port);
00257         
00258         System.out.println("tunnel: " + tunnel);
00259         return tunnel;
00260     }
00261     
00262 
00263     
00264     /******
00265      * 
00266      * @param traceCount
00267      * @return
00268      * @throws IOException
00269      */
00270     private TraceName[] formatTraceNames(int traceCount) throws IOException {
00271         TraceName[] names  = new TraceName[traceCount];
00272         for (int i = 0; i < names.length; i++) {
00273             int processID = receiver.readInt();
00274             int threadID = receiver.readShort();
00275             names[i] = new TraceName(processID, threadID);
00276         }
00277         return names;
00278     }
00279 
00280     
00281     /***************
00282      * Get XML data from the server
00283      * 
00284      * @param serverURL
00285      * @param port
00286      * @param xmlMessagePortNumber
00287      * 
00288      * @return XML data in zipped format
00289      * 
00290      * @throws IOException
00291      */
00292     private GZIPInputStream getXmlStream(String serverURL, int port, int xmlMessagePortNumber)
00293             throws IOException {
00294 
00295         byte[] compressedXMLMessage;
00296         DataInputStream dxmlReader;
00297         if (xmlMessagePortNumber == port)
00298         {
00299             dxmlReader = receiver;
00300 
00301         }
00302         else
00303         {
00304             Socket xmlConnection = new Socket();
00305             SocketAddress xmlAddress = new InetSocketAddress(serverURL, xmlMessagePortNumber);
00306             xmlConnection.connect(xmlAddress, 1000);
00307             BufferedInputStream buf = new BufferedInputStream(xmlConnection.getInputStream());
00308             dxmlReader = new DataInputStream(buf);
00309         }
00310 
00311         int exml = dxmlReader.readInt();
00312         if (exml != Constants.XML_HEADER) 
00313         {
00314             Debugger.printDebug(0,"Expected XML Message (" + Constants.XML_HEADER
00315                     + ")  on data socket, got " + exml);
00316             return null;
00317         }
00318         int size = dxmlReader.readInt();
00319         
00320         compressedXMLMessage = new byte[size];
00321         int numRead = 0;
00322         while (numRead < size)
00323         {
00324             numRead += dxmlReader.read(compressedXMLMessage, numRead, size- numRead);
00325         }
00326         GZIPInputStream xmlStream = new GZIPInputStream(new 
00327                 ByteArrayInputStream(compressedXMLMessage));
00328         return xmlStream;
00329     }
00330 
00331     
00332     /*****************
00333      * Try to connect to a remote server
00334      * 
00335      * @param window
00336      * @param serverURL
00337      * @param port
00338      * @throws UnknownHostException
00339      * @throws IOException
00340      */
00341     private void connectToServer(IWorkbenchWindow window, String serverURL, int port) 
00342             throws UnknownHostException, IOException 
00343     {
00344         if (serverConnection != null && !serverConnection.isClosed()) 
00345         {
00346             InetSocketAddress addr = new InetSocketAddress(serverURL, port);
00347             SocketAddress sockAddr = serverConnection.getRemoteSocketAddress();
00348             
00349             if (sockAddr.equals(addr)) 
00350             {
00351                 //Connecting to same server, don't do anything.
00352                 initDataIOStream();
00353                 return;
00354             } else {
00355                 //Connecting to a different server
00356                 TraceDatabase.removeInstance(window);
00357             }
00358         }
00359         serverConnection = new Socket(serverURL, port);
00360         initDataIOStream();
00361         System.out.println("connect: " + serverConnection.getRemoteSocketAddress());
00362     }
00363     
00364     
00365     private void initDataIOStream() 
00366             throws IOException
00367     {
00368         sender = new DataOutputStream(new BufferedOutputStream(
00369                 serverConnection.getOutputStream()));
00370         receiver = new DataInputStream(new BufferedInputStream(
00371                 serverConnection.getInputStream()));
00372     }
00373 
00374     /*******
00375      * sending info message to the server
00376      * 
00377      * @param _sender
00378      * @param stData
00379      * @throws IOException
00380      *******/
00381     private void sendInfoPacket(DataOutputStream _sender,
00382             SpaceTimeDataControllerRemote stData) throws IOException {
00383 
00384         // The server
00385         // needs information (min & max time, etc.) that can only be gotten (as
00386         // far as I know) from the
00387         // XML processing that happens in the SpaceTimeDataController
00388         // constructor, so we construct it, get what we need, then pass in the
00389         // RemoteDataRetriever as soon as possible.
00390 
00391         /*
00392          * Then: overallMinTime (long) overallMaxTime (long) headerSize (int)
00393          */
00394         sender.writeInt(Constants.INFO);
00395         sender.writeLong(stData.getMinBegTime());
00396         sender.writeLong(stData.getMaxEndTime());
00397         sender.writeInt(stData.getHeaderSize());
00398         sender.flush();
00399 
00400     }
00401 
00402     
00403     /*******
00404      * sending a message to the server to open a database
00405      * 
00406      * @param serverPathToDB
00407      * 
00408      * @throws  SocketException when fail to send via SSH tunnel
00409      *          IOException for general cases
00410      *******/
00411     private boolean sendOpenDB(String serverPathToDB) 
00412             throws IOException, SocketException 
00413             {
00414         sender.writeInt(Constants.OPEN);
00415         sender.writeInt(PROTOCOL_VERSION);
00416         int len = serverPathToDB.length();
00417         sender.writeShort(len);
00418         for (int i = 0; i < len; i++) {
00419             int charVal = serverPathToDB.charAt(i);
00420             if (charVal > 0xFF)
00421                 System.out.println("Path to database cannot contain special characters");
00422             sender.writeByte(charVal);
00423         }
00424         
00425         try {
00426             // TODO Warning this flush may cause @see SocketException broken pipe
00427             sender.flush();
00428         } catch (SocketException e) {
00429             return false;
00430         }
00431 
00432         Debugger.printDebug(0,"Open database message sent");
00433         
00434         return true;
00435     }   
00436 }
00437 
00438 

Generated on 5 May 2015 for HPCVIEWER by  doxygen 1.6.1