DecompressionThread.java

package edu.rice.cs.hpc.traceviewer.db.remote;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.InflaterInputStream;

import edu.rice.cs.hpc.traceviewer.data.db.TraceDataByRank;
import edu.rice.cs.hpc.traceviewer.data.db.DataRecord;
import edu.rice.cs.hpc.traceviewer.painter.ImageTraceAttributes;
import edu.rice.cs.hpc.traceviewer.services.ProcessTimelineService;
import edu.rice.cs.hpc.traceviewer.data.graph.CallPath;
import edu.rice.cs.hpc.traceviewer.data.timeline.ProcessTimeline;
import edu.rice.cs.hpc.traceviewer.data.util.Constants;
import edu.rice.cs.hpc.traceviewer.data.util.Debugger;

// Perhaps this would all be more suited to a ThreadPool.

/*
 * Philip 5/29/13: Moved rendering code to the canvases to align the remote
 * version with changes made to the local version. This used to be responsible
 * for rendering and decompressing, but now is not. I'm keeping the WorkItemToDo
 * structure just in case this expands again.
 * Philip 7/23/13: Got rid of WorkItemToDo. This thread looks like it's only going
 * to be for decompression now that we have a decent way to do the rendering in
 * parallel without waiting for all threads to be decompressed.
 */

public class DecompressionThread extends Thread {

    final private ConcurrentLinkedQueue<DecompressionItemToDo> workToDo;
    final private ConcurrentLinkedQueue<Integer> timelinesAvailableForRendering;

    // Variables for decompression
    final ProcessTimelineService timelineServ;
    final HashMap<Integer, CallPath> scopeMap;

    final ImageTraceAttributes attributes;

    private final IThreadListener listener;

    // The low two bytes select the compression type; the upper two bytes are
    // reserved for format versions.
    public final static int COMPRESSION_TYPE_MASK = 0xFFFF;
    public final static short ZLIB_COMPRESSSED    = 1;

    static boolean first = true;

    final private AtomicInteger ranksRemainingToDecompress;

    /********
     * Constructor for the decompression thread.
     * Despite its name, this class is not only for decompressing data from the
     * server: it mainly accepts trace data, and if that data is compressed it
     * automatically decompresses it before handing it off for rendering.
     *
     * @param ptlService
     * @param _scopeMap
     * @param attributes
     * @param workToDo  queue of work to do
     * @param timelinesAvailableForRendering
     * @param ranksRemainingToDecompress
     * @param listener
     */
    public DecompressionThread(
            ProcessTimelineService ptlService,
            HashMap<Integer, CallPath> _scopeMap,
            ImageTraceAttributes attributes,
            ConcurrentLinkedQueue<DecompressionItemToDo> workToDo, 
            ConcurrentLinkedQueue<Integer> timelinesAvailableForRendering,
            AtomicInteger ranksRemainingToDecompress,
            IThreadListener listener) {
        timelineServ    = ptlService;
        scopeMap        = _scopeMap;

        this.attributes = attributes;
        this.workToDo   = workToDo;
        this.timelinesAvailableForRendering = timelinesAvailableForRendering;
        this.ranksRemainingToDecompress     = ranksRemainingToDecompress;
        this.listener   = listener;
    }
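
    /*
     * Wiring sketch (illustrative, not part of this class): the caller (in
     * practice something like the remote data retriever) is assumed to create
     * the shared queues and the rank counter, start one or more decompression
     * threads, and enqueue one work item per trace line as packets arrive.
     * Names such as numWorkers, numRanks, ptlService, scopeMap, attributes and
     * listener stand in for objects the caller already has.
     *
     *   ConcurrentLinkedQueue<DecompressionItemToDo> workToDo =
     *           new ConcurrentLinkedQueue<DecompressionItemToDo>();
     *   ConcurrentLinkedQueue<Integer> readyToRender = new ConcurrentLinkedQueue<Integer>();
     *   AtomicInteger ranksLeft = new AtomicInteger(numRanks);
     *
     *   for (int i = 0; i < numWorkers; i++) {
     *       new DecompressionThread(ptlService, scopeMap, attributes,
     *               workToDo, readyToRender, ranksLeft, listener).start();
     *   }
     *   // as each trace line arrives from the server:
     *   workToDo.add(new DecompressionItemToDo(packet, itemCount, t0, tn, rank, compressionType));
     */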


    /*  public static Integer getNextTimelineToRender() {
        return timelinesAvailableForRendering.poll();
    }*/
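
    /*
     * The commented-out helper above hints at how the rendering side is meant to
     * drain timelinesAvailableForRendering. A minimal consumer loop might look
     * like the sketch below (an assumption for illustration; the actual painting
     * code lives in the canvas classes):
     *
     *   Integer line;
     *   while ((line = timelinesAvailableForRendering.poll()) != null) {
     *       // look up the ProcessTimeline stored for this line via the
     *       // ProcessTimelineService and hand it to the renderer
     *   }
     */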

    @Override
    public void run() {
        int i = 0;
        while (ranksRemainingToDecompress.get() > 0)
        {
            DecompressionItemToDo wi = workToDo.poll();
            if (wi == null)
            {
                if ( i++ > RemoteDataRetriever.getTimeOut() ) {
                    // time out
                    break;
                }
                // There is still work that needs to get done, but none of it is
                // available to be worked on at the moment. Wait a little and try again.
                try {
                    Thread.sleep( RemoteDataRetriever.getTimeSleep() );

                } catch (InterruptedException e) {
                    // interrupted while waiting for more work to arrive
                    e.printStackTrace();
                    break;
                }
            } else {
                i = 0;
                if (first) {
                    first = false;
                    Debugger.printTimestampDebug("First decompression beginning.");
                }
                ranksRemainingToDecompress.getAndDecrement();
                DecompressionItemToDo toDecomp = wi; // cast no longer needed since WorkItemToDo was removed
                try {
                    decompress(toDecomp);
                } catch (IOException e) {
                    // error in decompression
                    Debugger.printDebug(1, "IO Exception in decompression algorithm.");
                    e.printStackTrace();
                    break;
                }
            }
        }
        if (ranksRemainingToDecompress.get() > 0) {
            listener.notify("Decompression error due to time out");
        }
    }

    private void decompress(DecompressionItemToDo toDecomp) throws IOException
    {
        DataRecord[] ranksData = readTimeCPIDArray(toDecomp.packet, toDecomp.itemCount, toDecomp.startTime, toDecomp.endTime, toDecomp.compressed);
        TraceDataByRank dataAsTraceDBR = new TraceDataByRank(ranksData);

        int lineNumber = toDecomp.rankNumber;

        // laks attempts to fix, 2015.02.06: I think the ProcessTimeline class expects
        // the number of horizontal pixels in the 4th parameter instead of the number
        // of processors.
        // TODO: need to check

        ProcessTimeline ptl = new ProcessTimeline(dataAsTraceDBR, scopeMap, lineNumber, 
                attributes.numPixelsH, attributes.getTimeInterval(), attributes.getTimeBegin());

        timelineServ.setProcessTimeline(lineNumber, ptl);
        timelinesAvailableForRendering.add(lineNumber);
    }

    private DataRecord[] readTimeCPIDArray(byte[] packedTraceLine, int length, long t0, long tn, int compressed) throws IOException {

        DataInputStream decompressor;
        if ((compressed & COMPRESSION_TYPE_MASK) == ZLIB_COMPRESSSED)
            decompressor = new DataInputStream(new InflaterInputStream(new ByteArrayInputStream(packedTraceLine)));
        else
            decompressor = new DataInputStream(new ByteArrayInputStream(packedTraceLine));
        DataRecord[] toReturn = new DataRecord[length];
        long currentTime = t0;
        for (int i = 0; i < toReturn.length; i++) {
            // There are more efficient ways to send the timestamps. Namely,
            // instead of sending t_n - t_(n-1), we could send (t_n - t_(n-1)) - T,
            // where T is the expected delta, calculated as (t_n - t_0)/(length - 1).
            // That difference will certainly fit in three bytes and often in two.
            // Because of the zlib layer on top, though, the actual savings may be
            // marginal, which is why the simpler encoding is used for now. This is
            // left as a possible extension via the compression type flag.
            int deltaT = decompressor.readInt();
            currentTime += deltaT;
            int CPID = decompressor.readInt();
            /*if (CPID <= 0)
                System.out.println("CPID too small");*/
            toReturn[i] = new DataRecord(currentTime, CPID, Constants.dataIdxNULL);
        }
        decompressor.close();
        return toReturn;
    }
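
    /*
     * For reference, a minimal sketch of the packing side that readTimeCPIDArray()
     * expects. This is an assumption reconstructed from the reader above, not the
     * actual server code: each record is a 4-byte delta from the previous timestamp
     * followed by a 4-byte CPID, and the whole line may be wrapped in a zlib
     * (Deflater) stream. It would need java.io.ByteArrayOutputStream,
     * java.io.DataOutputStream and java.util.zip.DeflaterOutputStream.
     *
     *   static byte[] packTimeCPIDArray(long t0, long[] times, int[] cpids, boolean compress)
     *           throws IOException {
     *       ByteArrayOutputStream bytes = new ByteArrayOutputStream();
     *       DataOutputStream out = new DataOutputStream(
     *               compress ? new DeflaterOutputStream(bytes) : bytes);
     *       long previous = t0;                  // the reader accumulates deltas starting from t0
     *       for (int i = 0; i < times.length; i++) {
     *           out.writeInt((int) (times[i] - previous));
     *           out.writeInt(cpids[i]);
     *           previous = times[i];
     *       }
     *       out.close();                         // finishes the zlib stream when compressing
     *       return bytes.toByteArray();
     *   }
     */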



    public static class DecompressionItemToDo {
        final byte[] packet;
        final int itemCount;    // the number of time-CPID pairs
        final long startTime, endTime;
        final int rankNumber;
        final int compressed;

        public DecompressionItemToDo(byte[] _packet, int _itemCount, long _startTime, long _endTime, int _rankNumber, int _compressionType) {
            packet     = _packet;
            itemCount  = _itemCount;
            startTime  = _startTime;
            endTime    = _endTime;
            rankNumber = _rankNumber;
            compressed = _compressionType;
        }
    }
}
