StarPU Internal Handbook
workers.h
1/* StarPU --- Runtime system for heterogeneous multicore architectures.
2 *
3 * Copyright (C) 2008-2021 Université de Bordeaux, CNRS (LaBRI UMR 5800), Inria
4 * Copyright (C) 2013 Thibaut Lambert
5 * Copyright (C) 2016 Uppsala University
6 *
7 * StarPU is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU Lesser General Public License as published by
9 * the Free Software Foundation; either version 2.1 of the License, or (at
10 * your option) any later version.
11 *
12 * StarPU is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
15 *
16 * See the GNU Lesser General Public License in COPYING.LGPL for more details.
17 */
18
19#ifndef __WORKERS_H__
20#define __WORKERS_H__
21
23/* @{ */
24
25#include <limits.h>
26
27#include <starpu.h>
28#include <common/config.h>
29#include <common/timing.h>
30#include <common/fxt.h>
31#include <common/thread.h>
32#include <common/utils.h>
33#include <core/jobs.h>
35#include <core/sched_policy.h>
36#include <core/topology.h>
37#include <core/errorcheck.h>
38#include <core/sched_ctx.h>
39#include <core/sched_ctx_list.h>
40#include <core/simgrid.h>
41#ifdef STARPU_HAVE_HWLOC
42#include <hwloc.h>
43#endif
44
45#include <core/drivers.h>
48
49#ifdef STARPU_USE_MIC
51#endif /* STARPU_USE_MIC */
52
53#ifdef STARPU_USE_MPI_MASTER_SLAVE
55#endif
56
58
60
61#include <starpu_parameters.h>
62
63#define STARPU_MAX_PIPELINE 4
64
/* Three-state initialization marker. UNINITIALIZED is pinned to 0, so
 * zero-filled storage reads as "not initialized". */
enum initialization
{
	UNINITIALIZED = 0,
	CHANGING,
	INITIALIZED
};
66
67struct _starpu_ctx_change_list;
68
71 struct _starpu_machine_config *config;
72 starpu_pthread_mutex_t mutex;
73 enum starpu_worker_archtype arch;
74 uint32_t worker_mask;
75 struct starpu_perfmodel_arch perf_arch;
76 starpu_pthread_t worker_thread;
77 unsigned devid;
78 unsigned subworkerid;
79 int bindid;
84 starpu_pthread_cond_t started_cond;
85 starpu_pthread_cond_t ready_cond;
86 unsigned memory_node;
92 starpu_pthread_cond_t sched_cond;
93 starpu_pthread_mutex_t sched_mutex;
95#ifdef STARPU_SPINLOCK_CHECK
96 const char *relax_on_file;
97 int relax_on_line;
98 const char *relax_on_func;
99 const char *relax_off_file;
100 int relax_off_line;
101 const char *relax_off_func;
102#endif
120 starpu_pthread_t thread_changing_ctx;
128 struct _starpu_ctx_change_list ctx_change_list;
129 struct starpu_task_list local_tasks;
130 struct starpu_task **local_ordered_tasks;
134 struct starpu_task *current_task;
135 struct starpu_task *current_tasks[STARPU_MAX_PIPELINE];
136#ifdef STARPU_SIMGRID
137 starpu_pthread_wait_t wait;
138#endif
139
140 struct timespec cl_start;
141 struct timespec cl_end;
142 unsigned char first_task;
143 unsigned char ntasks;
144 unsigned char pipeline_length;
145 unsigned char pipeline_stuck;
147 unsigned worker_is_running;
148 unsigned worker_is_initialized;
151 char name[128];
152 char short_name[32];
153 unsigned run_by_starpu;
154 struct _starpu_driver_ops *driver_ops;
155
156 struct _starpu_sched_ctx_list *sched_ctx_list;
157 int tmp_sched_ctx;
158 unsigned nsched_ctxs;
159 struct _starpu_barrier_counter tasks_barrier;
161 unsigned has_prev_init;
163 unsigned removed_from_ctx[STARPU_NMAX_SCHED_CTXS+1];
164
169 struct starpu_task *task_transferring;
175 unsigned shares_tasks_lists[STARPU_NMAX_SCHED_CTXS+1];
176
177 unsigned poped_in_ctx[STARPU_NMAX_SCHED_CTXS+1];
183 unsigned reverse_phase[2];
184
188 struct _starpu_sched_ctx *stream_ctx;
189
190#ifdef __GLIBC__
191 cpu_set_t cpu_set;
192#endif /* __GLIBC__ */
193#ifdef STARPU_HAVE_HWLOC
194 hwloc_bitmap_t hwloc_cpu_set;
195 hwloc_obj_t hwloc_obj;
196#endif
197
199 char padding[STARPU_CACHELINE_SIZE];
200);
201
203{
204 struct starpu_perfmodel_arch perf_arch;
205 uint32_t worker_mask;
206 int worker_size;
207 unsigned memory_node;
208 int combined_workerid[STARPU_NMAXWORKERS];
209#ifdef STARPU_USE_MP
210 int count;
211 starpu_pthread_mutex_t count_mutex;
212#endif
213
214#ifdef __GLIBC__
215 cpu_set_t cpu_set;
216#endif /* __GLIBC__ */
217#ifdef STARPU_HAVE_HWLOC
218 hwloc_bitmap_t hwloc_cpu_set;
219#endif
220
223 char padding[STARPU_CACHELINE_SIZE];
224};
225
231{
232 starpu_pthread_mutex_t mutex;
233 starpu_pthread_t worker_thread;
234 unsigned nworkers;
235 unsigned started;
236 void *retval;
237 struct _starpu_worker *workers;
238 starpu_pthread_cond_t ready_cond;
239 unsigned set_is_initialized;
240};
241
242#ifdef STARPU_USE_MPI_MASTER_SLAVE
243extern struct _starpu_worker_set mpi_worker_set[STARPU_MAXMPIDEVS];
244#endif
245
247{
249 unsigned nworkers;
250
253
254 unsigned nsched_ctxs;
255
256#ifdef STARPU_HAVE_HWLOC
258 hwloc_topology_t hwtopology;
259#endif
261 struct starpu_tree *tree;
262
266 unsigned nhwcpus;
267
271 unsigned nhwpus;
272
276 unsigned nhwcudagpus;
277
282
286 unsigned nhwmpi;
287
289 unsigned ncpus;
290
292 unsigned ncudagpus;
293 unsigned nworkerpercuda;
294 int cuda_th_per_stream;
295 int cuda_th_per_dev;
296
298 unsigned nopenclgpus;
299
301 unsigned nmpidevices;
302 unsigned nhwmpidevices;
303
304 unsigned nhwmpicores[STARPU_MAXMPIDEVS];
305 unsigned nmpicores[STARPU_MAXMPIDEVS];
306
310 unsigned nmicdevices;
311
312 unsigned nhwmiccores[STARPU_MAXMICDEVS];
313 unsigned nmiccores[STARPU_MAXMICDEVS];
314
322 unsigned workers_bindid[STARPU_NMAXWORKERS];
323
330 unsigned workers_cuda_gpuid[STARPU_NMAXWORKERS];
331
338 unsigned workers_opencl_gpuid[STARPU_NMAXWORKERS];
339
340 /*** Indicates the successive MIC devices that should be used
341 * by the MIC driver. It is either filled according to the
342 * user's explicit parameters (from starpu_conf) or according
343 * to the STARPU_WORKERS_MICID env. variable. Otherwise, they
344 * are taken in ID order. */
348 unsigned workers_mpi_ms_deviceid[STARPU_NMAXWORKERS];
349};
350
352{
353 struct _starpu_machine_topology topology;
354
355#ifdef STARPU_HAVE_HWLOC
356 int cpu_depth;
357 int pu_depth;
358#endif
359
362 char currently_bound[STARPU_NMAXWORKERS];
363 char currently_shared[STARPU_NMAXWORKERS];
364
367
370
373
376
387
389 char padding1[STARPU_CACHELINE_SIZE];
390
393 struct _starpu_worker workers[STARPU_NMAXWORKERS];
394
397 struct _starpu_combined_worker combined_workers[STARPU_NMAX_COMBINEDWORKERS];
398
399 starpu_pthread_mutex_t submitted_mutex;
400
402 char padding2[STARPU_CACHELINE_SIZE];
403
405 struct
406 {
407 int *workerids;
408 unsigned nworkers;
410 unsigned nbindid;
415 uint32_t worker_mask;
416
418 struct starpu_conf conf;
419
421 unsigned running;
422
423 int disable_kernels;
424
428
430 struct _starpu_sched_ctx sched_ctxs[STARPU_NMAX_SCHED_CTXS+1];
431
433 unsigned submitting;
434
435 int watchdog_ok;
436};
437
438extern int _starpu_worker_parallel_blocks;
439
440extern struct _starpu_machine_config _starpu_config STARPU_ATTRIBUTE_INTERNAL;
441extern int _starpu_keys_initialized STARPU_ATTRIBUTE_INTERNAL;
442extern starpu_pthread_key_t _starpu_worker_key STARPU_ATTRIBUTE_INTERNAL;
443extern starpu_pthread_key_t _starpu_worker_set_key STARPU_ATTRIBUTE_INTERNAL;
444
446void _starpu_set_argc_argv(int *argc, char ***argv);
/* Getters matching _starpu_set_argc_argv() (presumably they hand back the
 * pointers stored there -- confirm against the definitions).
 * Declared with (void) so that these are real prototypes: an empty
 * parameter list leaves the arguments unchecked before C23. */
int *_starpu_get_argc(void);
char ***_starpu_get_argv(void);
449
451void _starpu_conf_check_environment(struct starpu_conf *conf);
452
454void _starpu_may_pause(void);
455
457static inline unsigned _starpu_machine_is_running(void)
458{
459 unsigned ret;
460 /* running is just protected by a memory barrier */
461 STARPU_RMB();
462
463 ANNOTATE_HAPPENS_AFTER(&_starpu_config.running);
464 ret = _starpu_config.running;
465 ANNOTATE_HAPPENS_BEFORE(&_starpu_config.running);
466 return ret;
467}
468
469
471void _starpu_worker_init(struct _starpu_worker *workerarg, struct _starpu_machine_config *pconfig);
472
474uint32_t _starpu_worker_exists(struct starpu_task *);
475
477uint32_t _starpu_can_submit_cuda_task(void);
478
480uint32_t _starpu_can_submit_cpu_task(void);
481
483uint32_t _starpu_can_submit_opencl_task(void);
484
487unsigned _starpu_worker_can_block(unsigned memnode, struct _starpu_worker *worker);
488
492void _starpu_block_worker(int workerid, starpu_pthread_cond_t *cond, starpu_pthread_mutex_t *mutex);
493
495void _starpu_driver_start(struct _starpu_worker *worker, unsigned fut_key, unsigned sync);
497void _starpu_worker_start(struct _starpu_worker *worker, unsigned fut_key, unsigned sync);
498
499static inline unsigned _starpu_worker_get_count(void)
500{
501 return _starpu_config.topology.nworkers;
502}
503#define starpu_worker_get_count _starpu_worker_get_count
504
508static inline void _starpu_set_local_worker_key(struct _starpu_worker *worker)
509{
510 STARPU_ASSERT(_starpu_keys_initialized);
511 STARPU_PTHREAD_SETSPECIFIC(_starpu_worker_key, worker);
512}
513
516static inline struct _starpu_worker *_starpu_get_local_worker_key(void)
517{
518 if (!_starpu_keys_initialized)
519 return NULL;
520 return (struct _starpu_worker *) STARPU_PTHREAD_GETSPECIFIC(_starpu_worker_key);
521}
522
526static inline void _starpu_set_local_worker_set_key(struct _starpu_worker_set *worker)
527{
528 STARPU_ASSERT(_starpu_keys_initialized);
529 STARPU_PTHREAD_SETSPECIFIC(_starpu_worker_set_key, worker);
530}
531
534static inline struct _starpu_worker_set *_starpu_get_local_worker_set_key(void)
535{
536 if (!_starpu_keys_initialized)
537 return NULL;
538 return (struct _starpu_worker_set *) STARPU_PTHREAD_GETSPECIFIC(_starpu_worker_set_key);
539}
540
543static inline struct _starpu_worker *_starpu_get_worker_struct(unsigned id)
544{
545 STARPU_ASSERT(id < starpu_worker_get_count());
546 return &_starpu_config.workers[id];
547}
548
551static inline struct _starpu_sched_ctx *_starpu_get_sched_ctx_struct(unsigned id)
552{
553 return (id > STARPU_NMAX_SCHED_CTXS) ? NULL : &_starpu_config.sched_ctxs[id];
554}
555
556struct _starpu_combined_worker *_starpu_get_combined_worker_struct(unsigned id);
557
560static inline struct _starpu_machine_config *_starpu_get_machine_config(void)
561{
562 return &_starpu_config;
563}
564
566static inline int _starpu_get_disable_kernels(void)
567{
568 return _starpu_config.disable_kernels;
569}
570
572static inline enum _starpu_worker_status _starpu_worker_get_status(int workerid)
573{
574 return _starpu_config.workers[workerid].status;
575}
576
579static inline void _starpu_worker_set_status(int workerid, enum _starpu_worker_status status)
580{
581 _starpu_config.workers[workerid].status = status;
582}
583
585static inline struct _starpu_sched_ctx* _starpu_get_initial_sched_ctx(void)
586{
587 return &_starpu_config.sched_ctxs[STARPU_GLOBAL_SCHED_CTX];
588}
589
590int starpu_worker_get_nids_by_type(enum starpu_worker_archtype type, int *workerids, int maxsize);
591
594int starpu_worker_get_nids_ctx_free_by_type(enum starpu_worker_archtype type, int *workerids, int maxsize);
595
596static inline unsigned _starpu_worker_mutex_is_sched_mutex(int workerid, starpu_pthread_mutex_t *mutex)
597{
598 struct _starpu_worker *w = _starpu_get_worker_struct(workerid);
599 return &w->sched_mutex == mutex;
600}
601
602static inline int _starpu_worker_get_nsched_ctxs(int workerid)
603{
604 return _starpu_config.workers[workerid].nsched_ctxs;
605}
606
608static inline unsigned _starpu_get_nsched_ctxs(void)
609{
610 /* topology.nsched_ctxs may be increased asynchronously in sched_ctx_create */
611 STARPU_RMB();
612 return _starpu_config.topology.nsched_ctxs;
613}
614
616static inline int _starpu_worker_get_id(void)
617{
618 struct _starpu_worker * worker;
619
620 worker = _starpu_get_local_worker_key();
621 if (worker)
622 {
623 return worker->workerid;
624 }
625 else
626 {
627 /* there is no worker associated to that thread, perhaps it is
628 * a thread from the application or this is some SPU worker */
629 return -1;
630 }
631}
632#define starpu_worker_get_id _starpu_worker_get_id
633
/* Like starpu_worker_get_id(), but asserts that the caller is a worker.
 * `f` and `l` are the file name and line printed in the assertion message. */
static inline unsigned __starpu_worker_get_id_check(const char *f, int l)
{
	/* keep both parameters "used" even if the assertion macro discards them */
	(void) f;
	(void) l;

	int workerid = starpu_worker_get_id();
	STARPU_ASSERT_MSG(workerid>=0, "%s:%d Cannot be called from outside a worker\n", f, l);
	return (unsigned) workerid;
}
#define _starpu_worker_get_id_check(f,l) __starpu_worker_get_id_check(f,l)
645
646enum starpu_node_kind _starpu_worker_get_node_kind(enum starpu_worker_archtype type);
647
648void _starpu_worker_set_stream_ctx(unsigned workerid, struct _starpu_sched_ctx *sched_ctx);
649
650struct _starpu_sched_ctx* _starpu_worker_get_ctx_stream(unsigned stream_workerid);
651
/* Ask `worker` to enter the "blocked in parallel" state and wait until it
 * acknowledges.
 *
 * Sets the global _starpu_worker_parallel_blocks flag, waits for any pending
 * unblock request to drain, then (when the reference count equals 1) raises
 * state_block_in_parallel_req, waits for state_block_in_parallel_ack and
 * finally clears both flags.
 *
 * All waits use worker->sched_cond with worker->sched_mutex, so the caller
 * presumably holds worker->sched_mutex -- confirm against callers. */
657static inline void _starpu_worker_request_blocking_in_parallel(struct _starpu_worker * const worker)
658{
659 _starpu_worker_parallel_blocks = 1;
660 /* flush pending requests to start on a fresh transaction epoch */
661 while (worker->state_unblock_in_parallel_req)
662 STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
663
664 /* announce blocking intent */
665 STARPU_ASSERT(worker->block_in_parallel_ref_count < UINT_MAX);
/* NOTE(review): the listing jumps from 665 to 667 and no update of
 * block_in_parallel_ref_count is visible here, although the test below
 * expects a 0 -> 1 transition; a statement appears to be missing from
 * this copy -- confirm against the original header. */
667
668 if (worker->block_in_parallel_ref_count == 1)
669 {
670 /* only the transition from 0 to 1 triggers the block_in_parallel_req */
671
672 STARPU_ASSERT(!worker->state_blocked_in_parallel);
673 STARPU_ASSERT(!worker->state_block_in_parallel_req);
674 STARPU_ASSERT(!worker->state_block_in_parallel_ack);
675 STARPU_ASSERT(!worker->state_unblock_in_parallel_req);
676 STARPU_ASSERT(!worker->state_unblock_in_parallel_ack);
677
678 /* trigger the block_in_parallel_req */
679 worker->state_block_in_parallel_req = 1;
680 STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
681#ifdef STARPU_SIMGRID
682 starpu_pthread_queue_broadcast(&_starpu_simgrid_task_queue[worker->workerid]);
683#endif
684
685 /* wait for block_in_parallel_req to be processed */
686 while (!worker->state_block_in_parallel_ack)
687 STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
688
689 STARPU_ASSERT(worker->block_in_parallel_ref_count >= 1);
690 STARPU_ASSERT(worker->state_block_in_parallel_req);
691 STARPU_ASSERT(worker->state_blocked_in_parallel);
692
693 /* reset block_in_parallel_req state flags */
694 worker->state_block_in_parallel_req = 0;
695 worker->state_block_in_parallel_ack = 0;
696
697 /* broadcast block_in_parallel_req state flags reset */
698 STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
699 }
700}
701
/* Ask `worker` to leave the "blocked in parallel" state and wait until it
 * acknowledges.
 *
 * Waits for any pending block request to drain; does nothing further when
 * the worker is not currently blocked. The unblock handshake itself only
 * runs when the reference count equals 1.
 *
 * All waits use worker->sched_cond with worker->sched_mutex, so the caller
 * presumably holds worker->sched_mutex -- confirm against callers. */
706static inline void _starpu_worker_request_unblocking_in_parallel(struct _starpu_worker * const worker)
707{
708 /* flush pending requests to start on a fresh transaction epoch */
709 while (worker->state_block_in_parallel_req)
710 STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
711
712 /* unblocking may be requested unconditionally
713 * thus, check if unblocking is really needed */
714 if (worker->state_blocked_in_parallel)
715 {
716 if (worker->block_in_parallel_ref_count == 1)
717 {
718 /* only the transition from 1 to 0 triggers the unblock_in_parallel_req */
719
720 STARPU_ASSERT(!worker->state_block_in_parallel_req);
721 STARPU_ASSERT(!worker->state_block_in_parallel_ack);
722 STARPU_ASSERT(!worker->state_unblock_in_parallel_req);
723 STARPU_ASSERT(!worker->state_unblock_in_parallel_ack);
724
725 /* trigger the unblock_in_parallel_req */
/* NOTE(review): listing line 726 is absent; nothing visible here sets
 * state_unblock_in_parallel_req although it is asserted further down --
 * confirm against the original header. */
727 STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
728
729 /* wait for the unblock_in_parallel_req to be processed */
730 while (!worker->state_unblock_in_parallel_ack)
731 STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
732
733 STARPU_ASSERT(worker->state_unblock_in_parallel_req);
734 STARPU_ASSERT(!worker->state_blocked_in_parallel);
735
736 /* reset unblock_in_parallel_req state flags */
/* NOTE(review): listing lines 737-738 are absent; the flag resets
 * announced by the comment above are not visible in this copy. */
739
740 /* broadcast unblock_in_parallel_req state flags reset */
741 STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
742 }
743
744 /* announce unblocking complete */
745 STARPU_ASSERT(worker->block_in_parallel_ref_count > 0);
/* NOTE(review): listing line 746 is absent; no update of
 * block_in_parallel_ref_count is visible after this assertion. */
747 }
748}
749
/* Serve pending block requests addressed to `worker`.
 *
 * While state_block_in_parallel_req is set: mark the worker as blocked,
 * acknowledge the request, sleep on sched_cond until an unblock request
 * shows up, then clear the blocked state and notify again.
 *
 * The wait uses worker->sched_mutex, so the caller presumably holds it --
 * confirm against callers. */
755static inline void _starpu_worker_process_block_in_parallel_requests(struct _starpu_worker * const worker)
756{
757 while (worker->state_block_in_parallel_req)
758 {
759 STARPU_ASSERT(!worker->state_blocked_in_parallel);
760 STARPU_ASSERT(!worker->state_block_in_parallel_ack);
761 STARPU_ASSERT(!worker->state_unblock_in_parallel_req);
762 STARPU_ASSERT(!worker->state_unblock_in_parallel_ack);
763 STARPU_ASSERT(worker->block_in_parallel_ref_count > 0);
764
765 /* enter effective blocked state */
766 worker->state_blocked_in_parallel = 1;
767
768 /* notify block_in_parallel_req processing */
769 worker->state_block_in_parallel_ack = 1;
770 STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
771
772 /* block */
773 while (!worker->state_unblock_in_parallel_req)
774 STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
775
776 STARPU_ASSERT(worker->state_blocked_in_parallel);
777 STARPU_ASSERT(!worker->state_block_in_parallel_req);
778 STARPU_ASSERT(!worker->state_block_in_parallel_ack);
779 STARPU_ASSERT(!worker->state_unblock_in_parallel_ack);
780 STARPU_ASSERT(worker->block_in_parallel_ref_count > 0);
781
782 /* leave effective blocked state */
783 worker->state_blocked_in_parallel = 0;
784
785 /* notify unblock_in_parallel_req processing */
/* NOTE(review): listing line 786 is absent; nothing visible here sets
 * state_unblock_in_parallel_ack, which the requester waits on --
 * confirm against the original header. */
787 STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
788 }
789}
790
/* Mark the start of a scheduling operation on `worker`.
 *
 * Waits until no context change is announced (state_changing_ctx_notice),
 * serving pending block requests in one of the two branches, then sets
 * state_sched_op_pending and resets state_relax_refcnt to 0.
 * With STARPU_SPINLOCK_CHECK the call site (file, line, func) is stored in
 * the relax_on_* fields; the macro at the end supplies these arguments.
 *
 * The waits use worker->sched_mutex, so the caller presumably holds it --
 * confirm against callers. */
807#ifdef STARPU_SPINLOCK_CHECK
808static inline void __starpu_worker_enter_sched_op(struct _starpu_worker * const worker, const char*file, int line, const char* func)
809#else
810static inline void _starpu_worker_enter_sched_op(struct _starpu_worker * const worker)
811#endif
812{
813 STARPU_ASSERT(!worker->state_sched_op_pending);
/* NOTE(review): listing line 814 is absent; the condition guarding the
 * block below (and paired with the `else` further down) is not visible
 * in this copy. */
815 {
816 /* process pending block requests before entering a sched_op region */
817 _starpu_worker_process_block_in_parallel_requests(worker);
818 while (worker->state_changing_ctx_notice)
819 {
820 STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
821
822 /* new block requests may have been triggered during the wait,
823 * need to check again */
824 _starpu_worker_process_block_in_parallel_requests(worker);
825 }
826 }
827 else
828 {
829 /* if someone observed the worker state since the last call, postpone block request
830 * processing for one sched_op turn more, because the observer will not have seen
831 * new block requests between its observation and now.
832 *
833 * however, the worker still has to wait for context change operations to complete
834 * before entering sched_op again*/
835 while (worker->state_changing_ctx_notice)
836 {
837 STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
838 }
839 }
840
841 /* no block request and no ctx change ahead,
842 * enter sched_op */
843 worker->state_sched_op_pending = 1;
/* NOTE(review): listing line 844 is absent; a statement may be missing
 * between these two assignments. */
845 worker->state_relax_refcnt = 0;
846#ifdef STARPU_SPINLOCK_CHECK
847 worker->relax_on_file = file;
848 worker->relax_on_line = line;
849 worker->relax_on_func = func;
850#endif
851}
852#ifdef STARPU_SPINLOCK_CHECK
853#define _starpu_worker_enter_sched_op(worker) __starpu_worker_enter_sched_op((worker), __FILE__, __LINE__, __starpu_func__)
854#endif
855
/* Mark the end of a scheduling operation on `worker`.
 *
 * Asserts that a sched_op is pending, sets state_relax_refcnt to 1, clears
 * state_sched_op_pending and broadcasts on sched_cond so that waiters
 * re-check the worker state.
 * With STARPU_SPINLOCK_CHECK the call site (file, line, func) is stored in
 * the relax_off_* fields; the macro at the end supplies these arguments. */
861#ifdef STARPU_SPINLOCK_CHECK
862static inline void __starpu_worker_leave_sched_op(struct _starpu_worker * const worker, const char*file, int line, const char* func)
863#else
864static inline void _starpu_worker_leave_sched_op(struct _starpu_worker * const worker)
865#endif
866{
867 STARPU_ASSERT(worker->state_sched_op_pending);
868 worker->state_relax_refcnt = 1;
869#ifdef STARPU_SPINLOCK_CHECK
870 worker->relax_off_file = file;
871 worker->relax_off_line = line;
872 worker->relax_off_func = func;
873#endif
874 worker->state_sched_op_pending = 0;
875 STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
/* NOTE(review): listing line 876 is absent; a trailing statement may be
 * missing from this copy -- confirm against the original header. */
877}
878#ifdef STARPU_SPINLOCK_CHECK
879#define _starpu_worker_leave_sched_op(worker) __starpu_worker_leave_sched_op((worker), __FILE__, __LINE__, __starpu_func__)
880#endif
881
882static inline int _starpu_worker_sched_op_pending(void)
883{
884 int workerid = starpu_worker_get_id();
885 if (workerid == -1)
886 return 0;
887 struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
888 STARPU_ASSERT(worker != NULL);
889 return worker->state_sched_op_pending;
890}
891
901static inline void _starpu_worker_enter_changing_ctx_op(struct _starpu_worker * const worker)
902{
903 STARPU_ASSERT(!starpu_pthread_equal(worker->thread_changing_ctx, starpu_pthread_self()));
904 /* flush pending requests to start on a fresh transaction epoch */
905 while (worker->state_changing_ctx_notice)
906 STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
907
908 /* announce changing_ctx intent
909 *
910 * - an already started sched_op is allowed to complete
911 * - no new sched_op may be started
912 */
913 worker->state_changing_ctx_notice = 1;
914
915 worker->thread_changing_ctx = starpu_pthread_self();
916
917 /* allow for an already started sched_op to complete */
918 if (worker->state_sched_op_pending)
919 {
920 /* request sched_op to broadcast when way is cleared */
921 worker->state_changing_ctx_waiting = 1;
922
923 /* wait for sched_op completion */
924 STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
925#ifdef STARPU_SIMGRID
926 starpu_pthread_queue_broadcast(&_starpu_simgrid_task_queue[worker->workerid]);
927#endif
928 do
929 {
930 STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
931 }
932 while (worker->state_sched_op_pending);
933
934 /* reset flag so other sched_ops wont have to broadcast state */
935 worker->state_changing_ctx_waiting = 0;
936 }
937}
938
943static inline void _starpu_worker_leave_changing_ctx_op(struct _starpu_worker * const worker)
944{
945 worker->thread_changing_ctx = (starpu_pthread_t)0;
946 worker->state_changing_ctx_notice = 0;
947 STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
948}
949
952#ifdef STARPU_SPINLOCK_CHECK
953static inline void __starpu_worker_relax_on(const char*file, int line, const char* func)
954#else
955static inline void _starpu_worker_relax_on(void)
956#endif
957{
958 struct _starpu_worker *worker = _starpu_get_local_worker_key();
959 if (worker == NULL)
960 return;
961 if (!worker->state_sched_op_pending)
962 return;
963 STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
964#ifdef STARPU_SPINLOCK_CHECK
965 STARPU_ASSERT_MSG(worker->state_relax_refcnt<UINT_MAX, "relax last turn on in %s (%s:%d)\n", worker->relax_on_func, worker->relax_on_file, worker->relax_on_line);
966#else
967 STARPU_ASSERT(worker->state_relax_refcnt<UINT_MAX);
968#endif
969 worker->state_relax_refcnt++;
970#ifdef STARPU_SPINLOCK_CHECK
971 worker->relax_on_file = file;
972 worker->relax_on_line = line;
973 worker->relax_on_func = func;
974#endif
975 STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
976 STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
977}
978#ifdef STARPU_SPINLOCK_CHECK
979#define _starpu_worker_relax_on() __starpu_worker_relax_on(__FILE__, __LINE__, __starpu_func__)
980#endif
981#define starpu_worker_relax_on _starpu_worker_relax_on
982
984#ifdef STARPU_SPINLOCK_CHECK
985static inline void __starpu_worker_relax_on_locked(struct _starpu_worker *worker, const char*file, int line, const char* func)
986#else
987static inline void _starpu_worker_relax_on_locked(struct _starpu_worker *worker)
988#endif
989{
990 if (!worker->state_sched_op_pending)
991 return;
992#ifdef STARPU_SPINLOCK_CHECK
993 STARPU_ASSERT_MSG(worker->state_relax_refcnt<UINT_MAX, "relax last turn on in %s (%s:%d)\n", worker->relax_on_func, worker->relax_on_file, worker->relax_on_line);
994#else
995 STARPU_ASSERT(worker->state_relax_refcnt<UINT_MAX);
996#endif
997 worker->state_relax_refcnt++;
998#ifdef STARPU_SPINLOCK_CHECK
999 worker->relax_on_file = file;
1000 worker->relax_on_line = line;
1001 worker->relax_on_func = func;
1002#endif
1003 STARPU_PTHREAD_COND_BROADCAST(&worker->sched_cond);
1004}
1005#ifdef STARPU_SPINLOCK_CHECK
1006#define _starpu_worker_relax_on_locked(worker) __starpu_worker_relax_on_locked(worker,__FILE__, __LINE__, __starpu_func__)
1007#endif
1008
1009#ifdef STARPU_SPINLOCK_CHECK
1010static inline void __starpu_worker_relax_off(const char*file, int line, const char* func)
1011#else
1012static inline void _starpu_worker_relax_off(void)
1013#endif
1014{
1015 int workerid = starpu_worker_get_id();
1016 if (workerid == -1)
1017 return;
1018 struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1019 STARPU_ASSERT(worker != NULL);
1020 if (!worker->state_sched_op_pending)
1021 return;
1022 STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
1023#ifdef STARPU_SPINLOCK_CHECK
1024 STARPU_ASSERT_MSG(worker->state_relax_refcnt>0, "relax last turn off in %s (%s:%d)\n", worker->relax_on_func, worker->relax_on_file, worker->relax_on_line);
1025#else
1026 STARPU_ASSERT(worker->state_relax_refcnt>0);
1027#endif
1028 worker->state_relax_refcnt--;
1029#ifdef STARPU_SPINLOCK_CHECK
1030 worker->relax_off_file = file;
1031 worker->relax_off_line = line;
1032 worker->relax_off_func = func;
1033#endif
1034 STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
1035}
1036#ifdef STARPU_SPINLOCK_CHECK
1037#define _starpu_worker_relax_off() __starpu_worker_relax_off(__FILE__, __LINE__, __starpu_func__)
1038#endif
1039#define starpu_worker_relax_off _starpu_worker_relax_off
1040
1041#ifdef STARPU_SPINLOCK_CHECK
1042static inline void __starpu_worker_relax_off_locked(const char*file, int line, const char* func)
1043#else
1044static inline void _starpu_worker_relax_off_locked(void)
1045#endif
1046{
1047 int workerid = starpu_worker_get_id();
1048 if (workerid == -1)
1049 return;
1050 struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1051 STARPU_ASSERT(worker != NULL);
1052 if (!worker->state_sched_op_pending)
1053 return;
1054#ifdef STARPU_SPINLOCK_CHECK
1055 STARPU_ASSERT_MSG(worker->state_relax_refcnt>0, "relax last turn off in %s (%s:%d)\n", worker->relax_on_func, worker->relax_on_file, worker->relax_on_line);
1056#else
1057 STARPU_ASSERT(worker->state_relax_refcnt>0);
1058#endif
1059 worker->state_relax_refcnt--;
1060#ifdef STARPU_SPINLOCK_CHECK
1061 worker->relax_off_file = file;
1062 worker->relax_off_line = line;
1063 worker->relax_off_func = func;
1064#endif
1065}
1066#ifdef STARPU_SPINLOCK_CHECK
1067#define _starpu_worker_relax_off_locked() __starpu_worker_relax_off_locked(__FILE__, __LINE__, __starpu_func__)
1068#endif
1069
1070static inline int _starpu_worker_get_relax_state(void)
1071{
1072 int workerid = starpu_worker_get_id();
1073 if (workerid < 0)
1074 return 1;
1075 struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1076 STARPU_ASSERT(worker != NULL);
1077 return worker->state_relax_refcnt != 0;
1078}
1079#define starpu_worker_get_relax_state _starpu_worker_get_relax_state
1080
1085static inline void _starpu_worker_lock(int workerid)
1086{
1087 struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1088 STARPU_ASSERT(worker != NULL);
1089 int cur_workerid = starpu_worker_get_id();
1090 if (workerid != cur_workerid)
1091 {
1092 starpu_worker_relax_on();
1093
1094 STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
1095 while (!worker->state_relax_refcnt)
1096 {
1097 STARPU_PTHREAD_COND_WAIT(&worker->sched_cond, &worker->sched_mutex);
1098 }
1099 }
1100 else
1101 {
1102 STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
1103 }
1104}
1105
1106static inline int _starpu_worker_trylock(int workerid)
1107{
1108 struct _starpu_worker *cur_worker = _starpu_get_local_worker_key();
1109 int cur_workerid = cur_worker->workerid;
1110 struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1111 STARPU_ASSERT(worker != NULL);
1112
1113 /* Start with ourself */
1114 int ret = STARPU_PTHREAD_MUTEX_TRYLOCK_SCHED(&cur_worker->sched_mutex);
1115 if (ret)
1116 return ret;
1117 if (workerid == cur_workerid)
1118 /* We only needed to lock ourself */
1119 return 0;
1120
1121 /* Now try to lock the other worker */
1122 ret = STARPU_PTHREAD_MUTEX_TRYLOCK_SCHED(&worker->sched_mutex);
1123 if (!ret)
1124 {
1125 /* Good, check that it is relaxed */
1126 ret = !worker->state_relax_refcnt;
1127 if (ret)
1128 STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
1129 }
1130 if (!ret)
1131 _starpu_worker_relax_on_locked(cur_worker);
1132 STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&cur_worker->sched_mutex);
1133 return ret;
1134}
1135
1136static inline void _starpu_worker_unlock(int workerid)
1137{
1138 struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1139 STARPU_ASSERT(worker != NULL);
1140 STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
1141 int cur_workerid = starpu_worker_get_id();
1142 if (workerid != cur_workerid)
1143 {
1144 starpu_worker_relax_off();
1145 }
1146}
1147
1148static inline void _starpu_worker_lock_self(void)
1149{
1150 int workerid = starpu_worker_get_id_check();
1151 struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1152 STARPU_ASSERT(worker != NULL);
1153 STARPU_PTHREAD_MUTEX_LOCK_SCHED(&worker->sched_mutex);
1154}
1155
1156static inline void _starpu_worker_unlock_self(void)
1157{
1158 int workerid = starpu_worker_get_id_check();
1159 struct _starpu_worker *worker = _starpu_get_worker_struct(workerid);
1160 STARPU_ASSERT(worker != NULL);
1161 STARPU_PTHREAD_MUTEX_UNLOCK_SCHED(&worker->sched_mutex);
1162}
1163
/* Call starpu_wake_worker_locked() on `workerid` between
 * _starpu_worker_lock() and _starpu_worker_unlock(), and return its result. */
static inline int _starpu_wake_worker_relax(int workerid)
{
	int woken;

	_starpu_worker_lock(workerid);
	woken = starpu_wake_worker_locked(workerid);
	_starpu_worker_unlock(workerid);

	return woken;
}
1171
1172int starpu_wake_worker_relax_light(int workerid);
1173
1178void _starpu_worker_refuse_task(struct _starpu_worker *worker, struct starpu_task *task);
1179
1180/* @}*/
1181
1182#endif // __WORKERS_H__
Definition: barrier_counter.h:27
_starpu_worker_status
Definition: errorcheck.h:26
void _starpu_worker_apply_deferred_ctx_changes(void)
Definition: sched_ctx_list.h:25
Definition: workers.h:203
char padding[STARPU_CACHELINE_SIZE]
Definition: workers.h:223
unsigned memory_node
Definition: workers.h:207
struct starpu_perfmodel_arch perf_arch
Definition: workers.h:204
uint32_t worker_mask
Definition: workers.h:205
Definition: drivers.h:24
Definition: workers.h:352
int mpi_nodeid
Definition: workers.h:386
struct _starpu_combined_worker combined_workers[STARPU_NMAX_COMBINEDWORKERS]
Definition: workers.h:397
int opencl_nodeid
Definition: workers.h:382
unsigned running
Definition: workers.h:421
int current_cuda_gpuid
Definition: workers.h:366
unsigned nbindid
Definition: workers.h:410
int pause_depth
Definition: workers.h:427
int current_opencl_gpuid
Definition: workers.h:369
int cpus_nodeid
Definition: workers.h:378
int mic_nodeid
Definition: workers.h:384
struct _starpu_machine_config::@4 * bindid_workers
struct _starpu_sched_ctx sched_ctxs[STARPU_NMAX_SCHED_CTXS+1]
Definition: workers.h:430
int current_bindid
Definition: workers.h:361
struct _starpu_worker workers[STARPU_NMAXWORKERS]
Definition: workers.h:393
uint32_t worker_mask
Definition: workers.h:415
int current_mic_deviceid
Definition: workers.h:372
int cuda_nodeid
Definition: workers.h:380
char padding2[STARPU_CACHELINE_SIZE]
Definition: workers.h:402
unsigned submitting
Definition: workers.h:433
char padding1[STARPU_CACHELINE_SIZE]
Definition: workers.h:389
int current_mpi_deviceid
Definition: workers.h:375
struct starpu_conf conf
Definition: workers.h:418
Definition: workers.h:247
struct starpu_tree * tree
Definition: workers.h:261
unsigned nmpidevices
Definition: workers.h:301
unsigned nhwmicdevices
Definition: workers.h:309
hwloc_topology_t hwtopology
Definition: workers.h:258
unsigned workers_mpi_ms_deviceid[STARPU_NMAXWORKERS]
Definition: workers.h:348
unsigned workers_cuda_gpuid[STARPU_NMAXWORKERS]
Definition: workers.h:330
unsigned nhwopenclgpus
Definition: workers.h:281
unsigned workers_opencl_gpuid[STARPU_NMAXWORKERS]
Definition: workers.h:338
unsigned nworkers
Definition: workers.h:249
unsigned ncudagpus
Definition: workers.h:292
unsigned nhwmpi
Definition: workers.h:286
unsigned ncpus
Definition: workers.h:289
unsigned ncombinedworkers
Definition: workers.h:252
unsigned nhwcudagpus
Definition: workers.h:276
unsigned nopenclgpus
Definition: workers.h:298
unsigned nhwmpicores[STARPU_MAXMPIDEVS]
Definition: workers.h:304
unsigned nhwpus
Definition: workers.h:271
unsigned workers_bindid[STARPU_NMAXWORKERS]
Definition: workers.h:322
unsigned nhwmiccores[STARPU_MAXMICDEVS]
Definition: workers.h:312
unsigned nhwcpus
Definition: workers.h:266
Definition: sched_ctx.h:46
unsigned id
Definition: sched_ctx.h:48
Definition: workers.h:231
unsigned started
Definition: workers.h:235
starpu_pthread_t worker_thread
Definition: workers.h:233
starpu_pthread_cond_t ready_cond
Definition: workers.h:238
Definition: workers.h:70
unsigned state_blocked_in_parallel_observed
Definition: workers.h:107
struct starpu_task * current_task
Definition: workers.h:134
unsigned state_block_in_parallel_ack
Definition: workers.h:109
unsigned state_relax_refcnt
Definition: workers.h:94
struct starpu_task * task_transferring
Definition: workers.h:169
unsigned state_changing_ctx_waiting
Definition: workers.h:104
unsigned subworkerid
Definition: workers.h:78
unsigned devid
Definition: workers.h:77
starpu_pthread_t thread_changing_ctx
Definition: workers.h:120
int current_rank
Definition: workers.h:82
unsigned state_unblock_in_parallel_req
Definition: workers.h:110
unsigned nb_buffers_transferred
Definition: workers.h:167
starpu_pthread_t worker_thread
Definition: workers.h:76
starpu_pthread_cond_t ready_cond
Definition: workers.h:85
unsigned memory_node
Definition: workers.h:86
int workerid
Definition: workers.h:80
unsigned nb_buffers_totransfer
Definition: workers.h:168
unsigned state_keep_awake
Definition: workers.h:150
struct _starpu_worker_set * set
Definition: workers.h:146
int worker_size
Definition: workers.h:83
unsigned pop_ctx_priority
Definition: workers.h:185
unsigned state_unblock_in_parallel_ack
Definition: workers.h:111
unsigned numa_memory_node
Definition: workers.h:87
unsigned char pipeline_stuck
Definition: workers.h:145
unsigned char pipeline_length
Definition: workers.h:144
unsigned nsched_ctxs
Definition: workers.h:158
unsigned spinning_backoff
Definition: workers.h:165
unsigned state_blocked_in_parallel
Definition: workers.h:106
unsigned current_ordered_task_order
Definition: workers.h:133
starpu_pthread_cond_t started_cond
Definition: workers.h:84
starpu_pthread_mutex_t sched_mutex
Definition: workers.h:93
enum starpu_worker_archtype arch
Definition: workers.h:73
unsigned run_by_starpu
Definition: workers.h:153
unsigned local_ordered_tasks_size
Definition: workers.h:131
struct starpu_task ** local_ordered_tasks
Definition: workers.h:130
unsigned is_slave_somewhere
Definition: workers.h:186
starpu_pthread_cond_t sched_cond
Definition: workers.h:92
unsigned has_prev_init
Definition: workers.h:161
unsigned char ntasks
Definition: workers.h:143
int bindid
Definition: workers.h:79
unsigned state_block_in_parallel_req
Definition: workers.h:108
unsigned state_changing_ctx_notice
Definition: workers.h:105
unsigned state_sched_op_pending
Definition: workers.h:103
int combined_workerid
Definition: workers.h:81
unsigned char first_task
Definition: workers.h:142
unsigned current_ordered_task
Definition: workers.h:132
unsigned block_in_parallel_ref_count
Definition: workers.h:119
uint32_t worker_mask
Definition: workers.h:74
enum _starpu_worker_status status
Definition: workers.h:149