HPCToolkit
threadmgr.c
Go to the documentation of this file.
1 // -*-Mode: C++;-*- // technically C99
2 
3 // * BeginRiceCopyright *****************************************************
4 //
5 // $HeadURL$
6 // $Id$
7 //
8 // --------------------------------------------------------------------------
9 // Part of HPCToolkit (hpctoolkit.org)
10 //
11 // Information about sources of support for research and development of
12 // HPCToolkit is at 'hpctoolkit.org' and in 'README.Acknowledgments'.
13 // --------------------------------------------------------------------------
14 //
15 // Copyright ((c)) 2002-2019, Rice University
16 // All rights reserved.
17 //
18 // Redistribution and use in source and binary forms, with or without
19 // modification, are permitted provided that the following conditions are
20 // met:
21 //
22 // * Redistributions of source code must retain the above copyright
23 // notice, this list of conditions and the following disclaimer.
24 //
25 // * Redistributions in binary form must reproduce the above copyright
26 // notice, this list of conditions and the following disclaimer in the
27 // documentation and/or other materials provided with the distribution.
28 //
29 // * Neither the name of Rice University (RICE) nor the names of its
30 // contributors may be used to endorse or promote products derived from
31 // this software without specific prior written permission.
32 //
33 // This software is provided by RICE and contributors "as is" and any
34 // express or implied warranties, including, but not limited to, the
35 // implied warranties of merchantability and fitness for a particular
36 // purpose are disclaimed. In no event shall RICE or contributors be
37 // liable for any direct, indirect, incidental, special, exemplary, or
38 // consequential damages (including, but not limited to, procurement of
39 // substitute goods or services; loss of use, data, or profits; or
40 // business interruption) however caused and on any theory of liability,
41 // whether in contract, strict liability, or tort (including negligence
42 // or otherwise) arising in any way out of the use of this software, even
43 // if advised of the possibility of such damage.
44 //
45 // ******************************************************* EndRiceCopyright *
46 
47 //******************************************************************************
48 // File: threadmgr.c:
49 // Purpose: maintain information about the number of live threads
50 //******************************************************************************
51 
52 
53 
54 //******************************************************************************
55 // system include files
56 //******************************************************************************
57 
58 #include <stdint.h>
59 #include <stdlib.h>
60 
61 #include <pthread.h>
62 #include <sys/sysinfo.h>
63 
64 //******************************************************************************
65 // local include files
66 //******************************************************************************
67 #include "threadmgr.h"
68 #include "thread_data.h"
69 #include "write_data.h"
70 #include "trace.h"
71 #include "sample_sources_all.h"
72 
74 #include <lib/prof-lean/spinlock.h>
75 
76 #include <include/queue.h>
77 
78 #include <monitor.h>
79 
80 //******************************************************************************
81 // macro
82 //******************************************************************************
83 
84 #define HPCRUN_OPTION_MERGE_THREAD "HPCRUN_MERGE_THREADS"
85 #define HPCRUN_THREADS_DEBUG 0
86 
87 //******************************************************************************
88 // data structure
89 //******************************************************************************
90 
91 typedef struct thread_list_s {
93  SLIST_ENTRY(thread_list_s) entries;
95 
96 //******************************************************************************
97 // private data
98 //******************************************************************************
99 static atomic_int_least32_t threadmgr_active_threads = ATOMIC_VAR_INIT(1); // one for the process main thread
100 
101 static atomic_int_least32_t threadmgr_num_threads = ATOMIC_VAR_INIT(1); // number of logical threads
102 
103 #if HPCRUN_THREADS_DEBUG
104 static atomic_int_least32_t threadmgr_tot_threads = ATOMIC_VAR_INIT(1); // number of total threads
105 #endif
106 
107 static SLIST_HEAD(thread_list_head, thread_list_s) list_thread_head =
108  SLIST_HEAD_INITIALIZER(thread_list_head);
109 
110 static spinlock_t threaddata_lock = SPINLOCK_UNLOCKED;
111 
112 //******************************************************************************
113 // private operations
114 //******************************************************************************
115 
116 static void
117 adjust_thread_count(int32_t val)
118 {
120 }
121 
122 static int32_t
124 {
126 }
127 
128 
129 static int32_t
131 {
133 }
134 
135 static bool
137 {
139 }
140 
141 static thread_data_t *
143 {
145 
146  // we need to set the thread data immediately here, since the following
147  // statements will call get_thread_data() under the hood
148 
150 
151  // ----------------------------------------
152  // initialize thread_data here. Before calling hpcrun_thread_data_init,
153  // make sure hpcrun_set_thread_data has already been called to set the variable.
154  // ----------------------------------------
156 
157  // ----------------------------------------
158  // set up initial 'epoch'
159  // ----------------------------------------
160  TMSG(EPOCH,"process init setting up initial epoch/loadmap");
161  hpcrun_epoch_init(thr_ctxt);
162 
163  // ----------------------------------------
164  // opening trace file
165  // ----------------------------------------
167 
168  return data;
169 }
170 
171 static void
173 {
174  hpcrun_write_profile_data( current_data );
175  hpcrun_trace_close( current_data );
176 }
177 
178 
179 static thread_list_t *
181 {
182  spinlock_lock(&threaddata_lock);
183 
184  thread_list_t *item = NULL;
185 
186  if (!SLIST_EMPTY(&list_thread_head)) {
187  item = SLIST_FIRST(&list_thread_head);
188  SLIST_REMOVE_HEAD(&list_thread_head, entries);
189  }
190 
191  spinlock_unlock(&threaddata_lock);
192 
193  return item;
194 }
195 
196 
197 static void*
199 {
200  thread_list_t *data = (thread_list_t *) arg;
201 
202  while (data != NULL) {
204  finalize_thread_data(cptd);
205 
206  TMSG(PROCESS, "%d: write thread data", cptd->id);
207 
208  data = grab_thread_data();
209  }
210  return NULL;
211 }
212 
213 //******************************************************************************
214 // interface operations
215 //******************************************************************************
216 
217 void
219 {
220  adjust_thread_count(1);
221 }
222 
223 
224 void
226 {
227  adjust_thread_count(-1);
228 }
229 
230 
231 int
233 {
235 }
236 
243 int
245 {
246  static int compact_thread = -1;
247 
248  if (compact_thread >= 0) {
249  return compact_thread;
250  }
251 
252  char *env_option = getenv(HPCRUN_OPTION_MERGE_THREAD);
253  if (env_option) {
254  compact_thread = atoi(env_option);
255  EMSG("hpcrun compact thread: %d", compact_thread);
256  } else {
257  compact_thread = OPTION_COMPACT_THREAD;
258  }
259  return compact_thread;
260 }
261 
262 
263 /***
264  * Get a pointer to the thread-local data.
265  * Two possibilities:
266  * - if we don't want compact threads, we just allocate and return
267  * - if we want compact threads, we check whether there is unused thread data:
268  * - if there is unused thread data, we reuse it
269  * - if there is no thread data available, we allocate a new one
270  *
271  * Returns true if new thread data was allocated,
272  * false if existing data was reused
273  *****/
274 bool
276 {
277  // -----------------------------------------------------------------
278  // if we don't want to coalesce threads, just allocate the data and return
279  // -----------------------------------------------------------------
280 
281  if (!is_compact_thread()) {
282  *data = allocate_and_init_thread_data(id, thr_ctxt);
283  return true;
284  }
285 
286  // -----------------------------------------------------------------
287  // try to grab existing unused context. If no context available,
288  // we need to allocate a new one.
289  // -----------------------------------------------------------------
290 
292  *data = item ? item->thread_data : NULL;
293  bool need_to_allocate = (*data == NULL);
294 
295  if (need_to_allocate) {
296 
298  *data = allocate_and_init_thread_data(id, thr_ctxt);
299  }
300 
301 #if HPCRUN_THREADS_DEBUG
302  atomic_fetch_add_explicit(&threadmgr_tot_threads, 1, memory_order_relaxed);
303 #endif
304 
305  return need_to_allocate;
306 }
307 
308 
309 void
311 {
312 
313  // ---------------------------------------------------------------------
314  // case 1: non-compact threads:
315  // in non-compact thread mode, we write the profile data,
316  // close the trace file, and exit
317  // ---------------------------------------------------------------------
318 
320 
323 
324  return;
325  }
326 
327  // ---------------------------------------------------------------------
328  // case 2: compact threads mode
329  // enqueue the thread data in the list to be reused by
330  // other thread (if any). The thread data will be written
331  // to the file at the end of the process
332  // ---------------------------------------------------------------------
333 
334  // step 1: enqueue thread data into the free list
335 
336  spinlock_lock(&threaddata_lock);
337 
338  thread_list_t *list_item = (thread_list_t *) hpcrun_malloc(sizeof(thread_list_t));
339  list_item->thread_data = data;
340  SLIST_INSERT_HEAD(&list_thread_head, list_item, entries);
341 
342  spinlock_unlock(&threaddata_lock);
343 
344  // step 2: get the dummy node that marks the end of the thread trace
345 
347  hpcrun_trace_append(&(data->core_profile_trace_data), node, 0);
348 
349  TMSG(PROCESS, "%d: release thread data", data->core_profile_trace_data.id);
350 }
351 
352 
353 void
355 {
356  int num_cores = get_nprocs();
357  int num_log_thr = get_num_logical_threads();
358  int max_iter = num_cores < num_log_thr ? num_cores : num_log_thr;
359  int num_threads = 0;
360 
361  // -----------------------------------------------------------------
362  // make sure monitoring of new threads is disabled
363  // -----------------------------------------------------------------
364 
365  monitor_disable_new_threads();
366 
367  // -----------------------------------------------------------------
368  // Create threads to distribute the finalization work
369  // The number of created threads is the minimum of:
370  // - the number of thread data in the queue
371  // - the number of cores
372  // - the number of logical threads
373  // -----------------------------------------------------------------
374 
375  pthread_t threads[max_iter];
376  pthread_attr_t attr;
377 
378  pthread_attr_init(&attr);
379  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
380 
381  for (int i=0; !SLIST_EMPTY(&list_thread_head) && i < max_iter; i++)
382  {
383  thread_list_t * item = grab_thread_data();
384 
385  int rc = pthread_create( &threads[i], &attr, finalize_all_thread_data,
386  (void*) item);
387  if (rc) {
388  EMSG("Error cannot create thread %d with return code: %d",
389  (item ? item->thread_data->core_profile_trace_data.id : -1), rc);
390  break;
391  }
392  num_threads++;
393  }
394 
395  // -----------------------------------------------------------------
396  // wait until all worker threads finish the work
397  // -----------------------------------------------------------------
398 
399  pthread_attr_destroy(&attr);
400  void *status;
401 
402  for(int i=0; i<num_threads; i++) {
403  int rc = pthread_join(threads[i], &status);
404  if (rc) {
405  EMSG("Error: return code from pthread_join: %d for thread #%d", rc, i);
406  }
407  }
408 
409  // -----------------------------------------------------------------
410  // re-enable monitoring of new threads
411  // -----------------------------------------------------------------
412 
413  monitor_enable_new_threads();
414 
415  // -----------------------------------------------------------------
416  // The main thread (thread 0) may not be in the list:
417  // sequential or pure MPI programs have no thread data in this queue.
418  // Hence, we need to process it specifically here.
419  // -----------------------------------------------------------------
420 
421  if (td && td->core_profile_trace_data.id == 0) {
422 
424 
425  TMSG(PROCESS, "%d: write thread data, finally", td->core_profile_trace_data.id);
426  }
427 #if HPCRUN_THREADS_DEBUG
428  int tot_threads = atomic_load_explicit(&threadmgr_tot_threads, memory_order_relaxed);
429  EMSG("Total threads: %d, logical threads: %d", tot_threads, num_threads);
430 #endif
431 }
432 
433 
static atomic_int_least32_t threadmgr_num_threads
Definition: threadmgr.c:101
void hpcrun_threadMgr_data_fini(thread_data_t *td)
Definition: threadmgr.c:354
void hpcrun_set_thread_data(thread_data_t *td)
Definition: thread_data.c:116
static SLIST_HEAD(thread_list_head, thread_list_s)
Definition: threadmgr.c:107
#define SLIST_HEAD_INITIALIZER(head)
Definition: queue.h:190
static int32_t get_num_logical_threads()
Definition: threadmgr.c:130
struct thread_list_s thread_list_t
static thread_list_t * grab_thread_data()
Definition: threadmgr.c:180
void hpcrun_trace_open(core_profile_trace_data_t *cptd)
Definition: trace.c:124
#define atomic_fetch_add_explicit(object, operand, order)
Definition: stdatomic.h:366
#define ATOMIC_VAR_INIT(value)
Definition: stdatomic.h:132
void hpcrun_threadmgr_thread_delete()
Definition: threadmgr.c:225
thread_data_t * thread_data
Definition: threadmgr.c:92
int hpcrun_threadmgr_thread_count()
Definition: threadmgr.c:232
static void spinlock_unlock(spinlock_t *l)
Definition: spinlock.h:96
void hpcrun_threadMgr_data_put(epoch_t *epoch, thread_data_t *data)
Definition: threadmgr.c:310
void hpcrun_trace_append(core_profile_trace_data_t *cptd, cct_node_t *node, uint metric_id)
Definition: trace.c:173
#define SLIST_REMOVE_HEAD(head, field)
Definition: queue.h:293
static int32_t adjust_num_logical_threads(int32_t val)
Definition: threadmgr.c:123
#define OPTION_NO_COMPACT_THREAD
Definition: threadmgr.h:65
static atomic_int_least32_t threadmgr_active_threads
Definition: threadmgr.c:99
static bool is_compact_thread()
Definition: threadmgr.c:136
size_t hpcrun_get_num_sample_sources(void)
cct_node_t * node
Definition: cct.c:128
#define OPTION_COMPACT_THREAD
Definition: threadmgr.h:66
void hpcrun_thread_data_init(int id, cct_ctxt_t *thr_ctxt, int is_child, size_t n_sources)
Definition: thread_data.c:265
#define atomic_load_explicit(object, order)
Definition: stdatomic.h:378
void hpcrun_trace_close(core_profile_trace_data_t *cptd)
Definition: trace.c:195
int hpcrun_write_profile_data(core_profile_trace_data_t *cptd)
Definition: write_data.c:329
cct_bundle_t csdata
Definition: epoch.h:65
bool hpcrun_threadMgr_data_get(int id, cct_ctxt_t *thr_ctxt, thread_data_t **data)
Definition: threadmgr.c:275
void hpcrun_threadmgr_thread_new()
Definition: threadmgr.c:218
void hpcrun_epoch_init(cct_ctxt_t *ctxt)
Definition: epoch.c:72
static void finalize_thread_data(core_profile_trace_data_t *current_data)
Definition: threadmgr.c:172
#define EMSG
Definition: messages.h:70
#define SLIST_FIRST(head)
Definition: queue.h:231
#define SLIST_EMPTY(head)
Definition: queue.h:229
static void * finalize_all_thread_data(void *arg)
Definition: threadmgr.c:198
core_profile_trace_data_t core_profile_trace_data
Definition: thread_data.h:168
cct_node_t * hpcrun_cct_bundle_get_nothread_node(cct_bundle_t *cct)
Definition: cct_bundle.c:180
Definition: epoch.h:64
void * hpcrun_malloc(size_t size)
Definition: mem.c:275
int hpcrun_threadMgr_compact_thread()
Definition: threadmgr.c:244
static void spinlock_lock(spinlock_t *l)
Definition: spinlock.h:111
#define TMSG(f,...)
Definition: messages.h:93
#define NULL
Definition: ElfHelper.cpp:85
Definition: cct.c:96
#define SLIST_ENTRY(type)
Definition: queue.h:193
#define SPINLOCK_UNLOCKED
Definition: spinlock.h:84
#define HPCRUN_OPTION_MERGE_THREAD
Definition: threadmgr.c:84
static thread_data_t * allocate_and_init_thread_data(int id, cct_ctxt_t *thr_ctxt)
Definition: threadmgr.c:142
thread_data_t * hpcrun_allocate_thread_data(int id)
Definition: thread_data.c:208
#define SLIST_INSERT_HEAD(head, elm, field)
Definition: queue.h:267