ThreadLevelDataFile.java

Go to the documentation of this file.
00001 package edu.rice.cs.hpc.viewer.metric;
00002 
00003 import java.io.IOException;
00004 import java.util.ArrayList;
00005 import java.util.ConcurrentModificationException;
00006 
00007 import org.eclipse.jface.action.IStatusLineManager;
00008 
00009 import edu.rice.cs.hpc.common.ui.TimelineProgressMonitor;
00010 import edu.rice.cs.hpc.common.ui.Util;
00011 import edu.rice.cs.hpc.data.experiment.extdata.FileDB2;
00012 import edu.rice.cs.hpc.data.util.Constants;
00013 import edu.rice.cs.hpc.data.util.LargeByteBuffer;
00014 
00015 
00016 /*****************************************
00017  * class to manage data on thread level of a specific experiment
00018  * 
00019  * @author laksonoadhianto
00020  *
00021  */
00022 public class ThreadLevelDataFile extends FileDB2 {
00023 
00024     // header bytes to skip
00025     static private final int HEADER_LONG    =   32;
00026     static int recordSz = Constants.SIZEOF_LONG + Constants.SIZEOF_LONG;
00027 
00028     public void open(String filename) throws IOException
00029     {
00030         super.open(filename, HEADER_LONG, recordSz);
00031     }
00040     public double[] getMetrics(long nodeIndex, int metricIndex, int numMetrics, IStatusLineManager statusMgr) {
00041     
00042         final double []metrics = new double[getNumberOfRanks()];
00043         TimelineProgressMonitor monitor = null;
00044         if (statusMgr != null) {
00045             monitor = new TimelineProgressMonitor(statusMgr);
00046         }
00047 
00048         final int numWork = getNumberOfRanks();
00049         int num_threads = Math.min(numWork, Runtime.getRuntime().availableProcessors());
00050         final int numWorkPerThreads = (int) Math.ceil((float)numWork / (float)num_threads);
00051         ArrayList<DataReadThread> listThreads = new ArrayList<DataReadThread>(num_threads);
00052         
00053         if (monitor != null) {
00054             monitor.beginProgress(numWork, "Reading data ...", "Metric raw data", Util.getActiveShell());
00055         }
00056         
00057         // --------------------------------------------------------------
00058         // assign each thread for a range of files to gather the data
00059         // --------------------------------------------------------------
00060         for (int i=0; i<num_threads; i++) {
00061             
00062             final int start = i * numWorkPerThreads;
00063             final int end = Math.min(start+numWorkPerThreads, numWork);
00064             
00065             DataReadThread thread = new DataReadThread(nodeIndex, metricIndex, numMetrics, start, end,
00066                     monitor, metrics);
00067             thread.start();
00068             listThreads.add(thread);
00069         }
00070         
00071         // --------------------------------------------------------------
00072         // wait until all threads finish
00073         // --------------------------------------------------------------
00074         while (!listThreads.isEmpty()) {
00075             try {
00076                 for (DataReadThread thread : listThreads) {
00077                     try {
00078                         Thread.sleep(30);
00079                         if (monitor != null) {
00080                             monitor.reportProgress();
00081                         }
00082                     }
00083                     catch (InterruptedException e) {
00084                         e.printStackTrace();
00085                     }
00086                     if (!thread.isAlive()) {
00087                         // The thread has done his job
00088                         // remove it from the list
00089                         listThreads.remove(thread);
00090                         
00091                         // this break is ugly, but needed to avoid using the "old" list
00092                         // we will use the "new" list in the next while iteration
00093                         break;
00094                     }
00095                 }
00096             } catch (ConcurrentModificationException e) {
00097                 // we just remove a thread while iterating a list
00098                 // it's considered as normal behavior. nothing to do.
00099                 System.err.println(e.getMessage());
00100             }
00101         }
00102 
00103         if (monitor != null) {
00104             monitor.endProgress();
00105         }
00106         
00107         return metrics;
00108     }
00109 
00110 
00118     private long getFilePosition(long nodeIndex, int metricIndex, int num_metrics) {
00119         return ((nodeIndex-1) * num_metrics * Constants.SIZEOF_LONG) + (metricIndex * Constants.SIZEOF_LONG) +
00120             // header to skip
00121             HEADER_LONG;
00122     }
00123     
00124     
00125     /***
00126      * Thread helper class to read a range of files
00127      *
00128      */
00129     private class DataReadThread extends Thread 
00130     {
00131         final private long _nodeIndex;
00132         final private int _metricIndex;
00133         final private int _numMetrics;
00134         final private int _indexFileStart, _indexFileEnd;
00135         final private TimelineProgressMonitor _monitor;
00136         final private double _metrics[];
00137         
00138         /***
00139          * Initialization for reading a range of file from indexFileStart to indexFileEnd
00140          * The caller has to create a thread and collect the output from metrics[] variable
00141          * 
00142          * Note: the output metrics has to have the same range as indexFileStart ... indexFileEnd
00143          * 
00144          * @param nodeIndex:    cct node index
00145          * @param metricIndex:  metric index
00146          * @param numMetrics:   number of metrics
00147          * @param indexFileStart:   the beginning of file index
00148          * @param indexFileEnd:     the end of file index
00149          * @param monitor:      monitor for long process
00150          * @param metrics:      output to gather metrics
00151          */
00152         public DataReadThread(long nodeIndex, int metricIndex, int numMetrics,
00153                 int indexFileStart, int indexFileEnd, TimelineProgressMonitor monitor,
00154                 double metrics[]) {
00155             _nodeIndex = nodeIndex;
00156             _metricIndex = metricIndex;
00157             _numMetrics = numMetrics;
00158             _indexFileStart = indexFileStart;
00159             _indexFileEnd = indexFileEnd;
00160             _monitor = monitor;
00161             _metrics = metrics;
00162         }
00163         
00164         public void run() {
00165             final long pos_relative = getFilePosition(_nodeIndex, _metricIndex, _numMetrics);
00166             final LargeByteBuffer masterBuff = getMasterBuffer();
00167             final long offsets[] = getOffsets();
00168             
00169             for (int i=_indexFileStart; i<_indexFileEnd; i++) {
00170                 final long pos_absolute = offsets[i] + pos_relative;
00171                 try {
00172                     _metrics[i] = (double)masterBuff.getDouble(pos_absolute);
00173                 } catch (IOException e) {
00174                     // TODO Auto-generated catch block
00175                     e.printStackTrace();
00176                     break;
00177                 }
00178                 if (_monitor != null) {
00179                     _monitor.announceProgress();
00180                 }
00181             }
00182         }
00183     }
00184 
00185 }

Generated on 5 May 2015 for HPCVIEWER by  doxygen 1.6.1