! This file is an example of using fthreads.

! The Project Settings *MUST* include
! "allow recursive procedures"
! "variables default to automatic"
! "use multithreaded libraries"
! Expect random memory related errors if not.

! This module holds the global variables for the example.

module test_data

! This module needs the public thread
! and synchronization object definitions
! so it must include the fthreads module.

! By having each category of variable as a separate type,
! the compiler may make some checks for correct usage.

use fthreads

! logu is a logical unit to receive trace messages.

integer, save :: logu = 8                   ! test output

! These are some thread variables.
! Within the example program, these variables
! will represent the various threads
! throughout the program and its procedures.

! The first thread is the master thread,
! assigned the constant value primary defined in fthreads.
! (The primary thread data structures
! are initialized by fthread_init().)

type( thread_t) :: master                   ! thread 0

! These threads are the worker threads,
! which will perform the actual calculations.

type( thread_t) :: work1                    ! thread 1
type( thread_t) :: work2                    ! thread 2
type( thread_t) :: work3                    ! thread 3
type( thread_t) :: work4                    ! thread 4

! The trace variable records the actions of the program
! without the overhead of I/O and only records a predefined
! number of messages to avoid overwhelming the programmer
! with a lengthy logfile. It is initialized by calling
! trace_init().

type( trace_t) :: thread_log                ! record actions

! These variables represent the synchronization objects
! the program will use.

! This mutex allows the program to single thread execution
! of I/O statements. This is not strictly necessary for
! a thread-safe I/O library (such as is available with CVF),
! but it's a good habit to acquire. It is initialized by
! calling mutex_init().

type( mutex_t) :: m_io                      ! single thread io

! The software pipeline example needs two events to synchronize
! use of the buffer to pass data down the pipeline. They are
! initialized by calling event_init().

! This event is posted when the next buffer is ready for use.

type( event_t) :: e_rnready                 ! signal got more rn

! This event is posted when the last buffer may be reused.

type( event_t) :: e_rnmore                  ! signal need more rn

! This flag is how producer() signals consumer()
! that there is no more work.

logical, save :: rn_done

! This is the double buffer used for the software pipeline
! between producer and consumer.

integer, parameter :: num_buffers = 2       ! number of buffers
integer, parameter :: rn_size = 100000      ! number of rn's per buffer

real, dimension( rn_size, num_buffers) :: rn   ! a lot of rn's

! The barrier is used to keep the worker threads synchronized
! when updating the array in the solution of Poisson's equation.
! It is initialized by calling barrier_init().

type( barrier_t) :: b_switch                ! switch direction in poisson solver

! The size of the array to be smoothed by poisson().

integer, parameter :: p_size = 1000

integer, save :: p_to = 1                   ! sides of poisson buffer
integer, save :: p_from = 2                 ! sides of poisson buffer

real, dimension( p_size, p_size, 2), save :: p_buff

! Used to gather the residuals from p_buff.

real, dimension( 4), save :: max_err

! Flag to all worker threads to quit poisson().

logical, save :: go_exit = .false.

! End of the data shared by all threads.

end module test_data

! To report bugs, suggest enhancements, etc. to the Authors,
! Contact:
! Purple Sage Computing Solutions, Inc.
! send email to dnagle@erols.com
! or fax to 703 471 0684 (USA)
! or mail to 12142 Purple Sage Ct.
! Reston, VA 20194-5621 USA

! Windows wants a main program called WinMain, which takes
! exactly four arguments and returns an integer result.

integer function WinMain( hIbV, hPIbV, lpCLbV, nCS)

! The compiler must decorate the name.

!DEC$ ATTRIBUTES STDCALL,ALIAS : '_WinMain@16' :: WinMain

! **********************************************************************

! test the fthreads library

! **********************************************************************

! This program uses standard_types to establish kinds correctly
! and for a few other purposes.

use standard_types

! This program uses the fthreads procedures,
! so it must include the fthreads module.

use fthreads

! This program uses the variables declared in the above module.

use test_data

! This program ignores the Windows arguments to the main program,
! but it must declare them to be 32 bit integers.

integer( int_k) :: hIbV, hPIbV, lpCLbV, nCS

! **********************************************************************

! test_fthreads data

! **********************************************************************

! local variables

! The program will print the number of threads running,
! and the status of several calls.

integer :: nprocs, istat, iprt

! The program prints the name of its primary thread,
! so here's the string which passes the name
! from the thread_status() procedure to the write() statement.

character( len= 20) :: tname

! This is the name of the integer function
! the program uses as a thread procedure.
! The program passes this name to the thread_create()
! procedure to execute a thread.

integer, external :: work

! **********************************************************************

! test_fthreads text

! **********************************************************************

continue                                    ! test_fthreads

! The program "uses" the main program arguments
! to quiet compiler complaints regarding unused arguments.
! This is Windows hocus-pocus which you may safely ignore.

hIbV = hIbV
hPIbV = hPIbV
lpCLbV = lpCLbV
nCS = nCS

! Since there's only one thread (the master thread)
! executing now, there's no need to single thread I/O.

open( unit= logu, file= 'test.log', status= 'replace')

! This call initializes the trace buffer variable.
! It sets the number of messages held to 100,
! indicates that the mutex built into the trace variable
! is to be used so the program can use the same
! trace variable in multiple threads, identifies the
! trace variable, and requests the status of the initialization
! so the program can print it.

! initialize trace buffer

call trace_init( 100, .true., thread_log, istat)

! Write the status of the trace variable initialization.

write( unit= logu, fmt= *) 'trace_init status: ', istat

! Find out how many physical processors Windows thinks are present.

nprocs = fthread_cpus()

! And print the resulting number.

write( unit= logu, fmt= *) 'number of processors: ', nprocs

! Here's where the program initializes the fthreads data structures.
! This call sets the number of threads to be the number of processors,
! and asks for four teams (which aren't used in this example), four barriers,
! four events, and four mutexes. It names the master thread "Boss".
! The actions of fthread_init() are recorded in the trace variable,
! and the program will print the status.

call fthread_init( nprocs, 4, 4, 4, 4, &
                   'Boss', thread_log, istat)

! Print the status returned by fthread_init().

write( unit= logu, fmt= *) 'fthread_init status: ', istat

! The primary thread's thread variable is initialized
! by using the fthreads constant primary.

master = primary

! Print the primary thread's id.

write( unit= logu, fmt= *) 'thread id: ', thread_id( master)

! ----------------------------------------------------------------------

! Get the name of the primary thread
! and log the operation.

call thread_status( master, name= tname, flag= istat, trace_v= thread_log)

! Print the master thread's name.

write( unit= logu, fmt= *) 'thread name, get status: ', tname, istat

! ----------------------------------------------------------------------

! The program must create all the synchronization objects to be used
! because once thread_create() is called, multiple threads will be executing.

! ----------------------------------------------------------------------

! Make a mutex which will be used to single thread I/O.
! Note that the mutex won't be used until there's a second thread running.

call mutex_init( m_io, flag= istat, trace_v= thread_log)

! Print the results of mutex_init().

write( unit= logu, fmt= *) 'mutex create status, id: ', istat, mutex_id( m_io)

! ----------------------------------------------------------------------

! Make an event which producer will use to signal consumer
! when another buffer is ready for use.

call event_init( e_rnready, flag= istat, trace_v= thread_log)

! Print the results of event_init().

write( unit= logu, fmt= *) 'event create status, id: ', istat, event_id( e_rnready)

! ----------------------------------------------------------------------

! Make an event which consumer will use to signal producer
! when another buffer is needed.

call event_init( e_rnmore, flag= istat, trace_v= thread_log)

! Print the results of event_init().

write( unit= logu, fmt= *) 'event create status, id: ', istat, event_id( e_rnmore)

! ----------------------------------------------------------------------

! Make a barrier which will be used to synchronize
! the updating of the matrix.

call barrier_init( b_switch, all_workers, flag= istat, trace_v= thread_log)

! Print the results of barrier_init().

write( unit= logu, fmt= *) 'barrier create status, id: ', istat, barrier_id( b_switch)

! ----------------------------------------------------------------------

! Make another thread to actually process data.
! From this point until the thread_wait() calls,
! one must assume there are several threads executing.

! The new thread's variable is work1, it will execute integer function work,
! its name is Work_#1, the status of the action will be checked,
! and the call to thread_create() will be entered in the trace variable.

call thread_create( work1, work, "Work_#1", flag= istat, trace_v= thread_log)

! Now, there might be two threads running,
! or thread work1 might have finished already.

! ----------------------------------------------------------------------

! Ensure single thread access to I/O

call mutex_lock( m_io)

! Print the status of the previous thread_create() call.

write( unit= logu, fmt= *) 'thread create status:', istat

! Release the mutex for further use.

call mutex_unlock( m_io)

! ----------------------------------------------------------------------

! Make a second worker thread.

! The new thread's variable is work2, it will execute integer function work,
! its name is Work_#2, the status of the action will be checked,
! and the call to thread_create() will be entered in the trace variable.

call thread_create( work2, work, "Work_#2", flag= istat, trace_v= thread_log)

! ----------------------------------------------------------------------

! Ensure single thread access to I/O

call mutex_lock( m_io)

! Print the status of the previous thread_create() call.

write( unit= logu, fmt= *) 'thread create status:', istat

! Release the mutex for further use.

call mutex_unlock( m_io)

! ----------------------------------------------------------------------

! Make a third worker thread.

! The new thread's variable is work3, it will execute integer function work,
! its name is Work_#3, the status of the action will be checked,
! and the call to thread_create() will be entered in the trace variable.

call thread_create( work3, work, "Work_#3", flag= istat, trace_v= thread_log)

! ----------------------------------------------------------------------

! Ensure single thread access to I/O

call mutex_lock( m_io)

! Print the status of the previous thread_create() call.

write( unit= logu, fmt= *) 'thread create status:', istat

! Release the mutex for further use.

call mutex_unlock( m_io)

! ----------------------------------------------------------------------

! Make a fourth worker thread.

! The new thread's variable is work4, it will execute integer function work,
! its name is Work_#4, the status of the action will be checked,
! and the call to thread_create() will be entered in the trace variable.

call thread_create( work4, work, "Work_#4", flag= istat, trace_v= thread_log)

! ----------------------------------------------------------------------

! Ensure single thread access to I/O

call mutex_lock( m_io)

! Print the status of the previous thread_create() call.

write( unit= logu, fmt= *) 'thread create status:', istat

! Release the mutex for further use.

call mutex_unlock( m_io)

! ----------------------------------------------------------------------

! Now verify that there are five threads running,
! master and four workers.

nprocs = fthread_count()

! Ensure single thread access to I/O

call mutex_lock( m_io)

! Print the number of threads running as found by the fthread_count() call.

write( unit= logu, fmt= *) 'number of threads: ', nprocs

! Release the mutex for further use.

call mutex_unlock( m_io)

! ----------------------------------------------------------------------

! Wait here until thread 1 finishes executing.
! The call to thread_wait() will return when work,
! executing in thread 1, returns.

call thread_wait( work1, flag= istat, trace_v= thread_log)

! Get the user and system times for thread 1.

call thread_status( work1, et= te, st= ts)

! ----------------------------------------------------------------------

! Ensure single thread access to I/O

call mutex_lock( m_io)

! Print the thread_wait() status.

write( unit= logu, fmt= *) 'thread_wait status: ', istat

! Print the thread 1 times.

write( unit= logu, fmt= *) 'thread ut, kt:', te, ts

! Release the mutex for further use.

call mutex_unlock( m_io)

! ----------------------------------------------------------------------

! Wait here until thread 2 finishes executing.

call thread_wait( work2, flag= istat, trace_v= thread_log)

! Get the user and system times for thread 2.

call thread_status( work2, et= te, st= ts)

! ----------------------------------------------------------------------

! Ensure single thread access to I/O

call mutex_lock( m_io)

! Print the thread_wait() status.

write( unit= logu, fmt= *) 'thread_wait status: ', istat

! Print the thread 2 times.

write( unit= logu, fmt= *) 'thread ut, kt:', te, ts

! Release the mutex for further use.

call mutex_unlock( m_io)

! ----------------------------------------------------------------------

! Wait here until thread 3 finishes executing.

call thread_wait( work3, flag= istat, trace_v= thread_log)

! Get the user and system times for thread 3.

call thread_status( work3, et= te, st= ts)

! ----------------------------------------------------------------------

! Ensure single thread access to I/O

call mutex_lock( m_io)

! Print the thread_wait() status.

write( unit= logu, fmt= *) 'thread_wait status: ', istat

! Print the thread 3 times.

write( unit= logu, fmt= *) 'thread ut, kt:', te, ts

! Release the mutex for further use.

call mutex_unlock( m_io)

! ----------------------------------------------------------------------

! Wait here until thread 4 finishes executing.

call thread_wait( work4, flag= istat, trace_v= thread_log)

! Get the user and system times for thread 4.

call thread_status( work4, et= te, st= ts)

! ----------------------------------------------------------------------

! Since all the worker threads have finished,
! it's no longer necessary to guard I/O with the mutex.

! Print the thread_wait() status.

write( unit= logu, fmt= *) 'thread_wait status: ', istat

! Print the thread 4 times.

write( unit= logu, fmt= *) 'thread ut, kt:', te, ts

! ----------------------------------------------------------------------

! Get the mutex statistics.

call mutex_status( m_io, locked= i, unlocked= j, flag= istat, trace_v= thread_log)

! Print the mutex statistics.

write( unit= logu, fmt= *) 'mutex_get locks, unlocks, status: ', i, j, istat

! ----------------------------------------------------------------------

! Clear the mutex variable; it's no longer needed.

call mutex_del( m_io, flag= istat, trace_v= thread_log)

! Verify that mutex_del() worked.

write( unit= logu, fmt= *) 'mutex_del status: ', istat

! ----------------------------------------------------------------------

! Get the event statistics.

call event_status( e_rnready, waited= i, cleared= j, posted= k, &
                   flag= istat, trace_v= thread_log)

! Print the event statistics.

write( unit= logu, fmt= *) 'event_get ready waited, cleared, posted, status: ', i, j, k, istat

! ----------------------------------------------------------------------

! Get the event statistics.

call event_status( e_rnmore, waited= i, cleared= j, posted= k, &
                   flag= istat, trace_v= thread_log)

! Print the event statistics.

write( unit= logu, fmt= *) 'event_get more waited, cleared, posted, status: ', i, j, k, istat

! ----------------------------------------------------------------------

! Clear the event variable, it's no longer needed.

call event_del( e_rnready, flag= istat, trace_v= thread_log)

! Verify that event_del() worked.

write( unit= logu, fmt= *) 'event_del status: ', istat

! ----------------------------------------------------------------------

! Clear the other event variable, it's no longer needed.

call event_del( e_rnmore, flag= istat, trace_v= thread_log)

! Verify that event_del() worked.

write( unit= logu, fmt= *) 'event_del status: ', istat

! ----------------------------------------------------------------------

! Get the barrier statistics.

call barrier_status( b_switch, number= i, synced= j, flag= istat, trace_v= thread_log)

! Print the barrier statistics.

write( unit= logu, fmt= *) 'barrier_get, height, syncs, status: ', i, j, istat

! ----------------------------------------------------------------------

! Clear the barrier variable, it's no longer needed.

call barrier_del( b_switch, flag= istat, trace_v= thread_log)

! Verify that barrier_del() worked.

write( unit= logu, fmt= *) 'barrier_del status: ', istat

! ----------------------------------------------------------------------

! All statistics are gathered, so the fthreads data structures
! aren't needed anymore.
! Deallocate them.

call fthread_end( master, flag= istat, trace_v= thread_log)

! Verify that fthread_end() worked.

write( unit= logu, fmt= *) 'fthread_end status: ', istat

! ----------------------------------------------------------------------

! Print the trace variable messages.

call trace_print( logu, trace_v= thread_log, flag= istat, printed= iprt)

! Print the number of messages printed and the status.

write( unit= logu, fmt= *) 'trace_print number, status: ', iprt, istat

! Close the logfile.

close( unit= logu, status= 'keep')

! Return to Windows successfully.

WinMain = 0

return                                      ! test_fthreads

end function WinMain                        ! eof

! **********************************************************************

! This function is the code each worker thread executes.
! It calls the routines which do the actual computations.

integer function work( th)

use fthreads
use test_data

type( thread_t), intent( in) :: th

! work()

continue                                    ! work()

! Print a message identifying this thread.

call mutex_lock( m_io)
write( logu, *) ' arg: ', thread_id( th)
call mutex_unlock( m_io)

! Only two of the four threads will execute producer() and consumer().

if( thread_id( th) == 3) call producer()    ! make rn's
if( thread_id( th) == 4) call consumer()    ! use rn's

! All the workers will execute poisson().

call poisson( th)                           ! all threads

! Return successfully.

work = 0

! This return ends the thread.

return                                      ! work()

! work()

end function work

! **********************************************************************

! Procedure producer() is the source of random numbers
! used by consumer(). Conceptually, the rn's could be
! any data which must be processed through several steps.
! This strategy is called a software pipeline.

subroutine producer()

use fthreads
use test_data

! This integer points to the side of the buffer
! to be filled with rns.

integer, save :: filling

! producer()

continue                                    ! producer()

! These next lines initialize the loop
! of repeatedly processing data in batches.

! The last buffer to be sent to consumer()
! must be signaled to consumer(). Otherwise,
! consumer() will attempt to process a buffer
! which is not data.

rn_done = .false.

! First, producer() will fill side 1 of the buffer.

filling = 1

! Fill the buffer with rns.

call random_number( rn( :, filling))

! Signal to consumer() that it may process one side
! of the buffer now.

call event_post( e_rnready)

! Wait until consumer() signals that it's ready
! to have the other side filled (which it will do
! right away).

call event_wait( e_rnmore)

! Clear the event so it can be posted again.

call event_clear( e_rnmore)

! The beginning of the main sending loop.

get_rns: do i = 1, 10                       ! or pick another upper limit

! Compute the other side of the buffer.

   filling = filling + 1
   if( filling > num_buffers ) filling = 1

! Fill the other side of the buffer.

   call random_number( rn( :, filling))

! Signal consumer() that the buffer is filled.

   call event_post( e_rnready)

! Wait to be told to fill the next side.

   call event_wait( e_rnmore)

! Clear the event so it can be posted again.

   call event_clear( e_rnmore)

! The end of the main sending loop.

enddo get_rns

! Signal consumer() that there's no more work to do.

rn_done = .true.

! Signal consumer() to run one last time.

call event_post( e_rnready)

! Return successfully.

return                                      ! producer()

! producer()

end subroutine producer

! **********************************************************************

subroutine consumer()

use fthreads
use test_data

! This integer points to the side of the buffer
! to be used as input.

integer, save :: using

! consumer()

continue                                    ! consumer()

! These next lines initialize the loop
! of repeatedly processing data in batches.

! First, consumer() will use side 1 of the buffer.

using = 1

! Wait for producer() to fill side 1 of the buffer.

call event_wait( e_rnready)

! Clear the event so it can be posted again.

call event_clear( e_rnready)

! The beginning of the main receiving loop.

use_rns: do

! Compute using the data provided by producer().

   ave = sum( rn( :, using)) / rn_size

! Make some output to show the work was done.

   call mutex_lock( m_io)

! The side should alternate 1, 2, 1, 2, ...
! and the average should be about one half.

   write( logu, *) using, ave

! Release the mutex.

   call mutex_unlock( m_io)

! Signal producer() that consumer() needs another buffer.

   call event_post( e_rnmore)

! Wait until producer() has made another buffer.

   call event_wait( e_rnready)

! Clear the event so it can be posted again.

   call event_clear( e_rnready)

! Check whether producer() has set the all done flag
! and exit if it is set.

   if( rn_done ) exit use_rns

! Compute the other side of the buffer.

   using = using + 1
   if( using > num_buffers ) using = 1

! The end of the main receiving loop.

enddo use_rns

! Return successfully.

return                                      ! consumer()

! consumer()

end subroutine consumer

! **********************************************************************

subroutine poisson( th)

use fthreads
use test_data

type( thread_t), intent( in) :: th

! poisson() local

integer :: i, ibeg, iend, me

! poisson()

continue                                    ! poisson()

! poisson() will use a divide-and-conquer strategy,
! so each thread needs to know its identity
! in order to execute its portion of the whole.

me = thread_id( th)

! Only worker 1 will set the initial conditions
! and the boundary condition.

ic_bc: if( me == 1 )then                    ! thread 1 only

! The array is zero.

   p_buff = 0.                              ! sets ic

! Except along the top and left edges, where it is one.

   p_buff( :, 1, :) = 1.                    ! and bc
   p_buff( 1, :, :) = 1.

! Print an identifying message.

   call mutex_lock( m_io)
   write( unit= logu, fmt= *) "set ic, bc ", me
   call mutex_unlock( m_io)

endif ic_bc

! Each thread must compute the limits of its portion.

ibeg = max( ( p_size / 4) * ( me - 1) + 1, 2)
iend = min( ( p_size / 4) * ( me), p_size - 1)

! Print a confirming message.

call mutex_lock( m_io)
write( unit= logu, fmt= *) "thread, ibeg, iend ", me, ibeg, iend
call mutex_unlock( m_io)

! All workers wait here for all workers to get here
! because the iteration can't start until the array is initialized.

call barrier_sync( b_switch)

! This is a local counter which is kept in sync
! by the barrier.

i = 0

! The main smoothing loop.
! The exit criterion is the convergence criterion.

smooth: do                                  ! forever

   i = i + 1                                ! count iterations

! Smooth the array.
! Each thread has its own ix and iy; p_to and p_from
! are updated only by thread 1.

   do iy = ibeg, iend
      do ix = 2, p_size - 1

         p_buff( ix, iy, p_to) = .25 * ( &
            p_buff( ix - 1, iy, p_from) + &
            p_buff( ix + 1, iy, p_from) + &
            p_buff( ix, iy - 1, p_from) + &
            p_buff( ix, iy + 1, p_from) )

      enddo
   enddo

! Each thread computes the largest residual
! in its own portion of the array.

   max_err( me) = &
      maxval( abs( p_buff( :, ibeg: iend, p_to) - p_buff( :, ibeg: iend, p_from) ))

! Print a diagnostic message.
! In a production code, this would probably be removed.

   call mutex_lock( m_io)
   write( logu, *) "done step: thread, iteration ", me, i
   call mutex_unlock( m_io)

! Synchronize all threads when all have completed the iteration.

   call barrier_sync( b_switch)             ! wait for all to finish step

! Worker 1 will compute the global maximum residual.

   if( me == 1 )then                        ! thread 1

! Note that go_exit is a global variable declared in the module.

      go_exit = maxval( max_err) < .01      ! maximum residual

   endif

! Print a diagnostic message.
! In a production code, this would probably be removed.

   call mutex_lock( m_io)
   write( logu, *) "past go_exit: thread, iteration ", me, i
   call mutex_unlock( m_io)

! All worker threads must wait until the exit switch is known to be set.

   call barrier_sync( b_switch)             ! wait for #1 to compute go_exit

! All threads will quit upon signal.

   if( go_exit ) exit smooth

! Worker 1 will switch p_to and p_from.
! The integer j is wasted storage in the other threads.

   switch: if( me == 1 )then                ! thread 1

      j = p_to                              ! switch from & to
      p_to = p_from
      p_from = j

   endif switch

! Print a diagnostic message.
! In a production code, this would probably be removed.

   call mutex_lock( m_io)
   write( logu, *) "past switch: thread, iteration", me, i
   call mutex_unlock( m_io)

! All threads must wait until thread 1 has swapped p_to and p_from.

   call barrier_sync( b_switch)             ! wait for residual, switch

! The end of the main smoothing loop.

enddo smooth

! Print a diagnostic message.
! In a production code, this would probably be removed.

call mutex_lock( m_io)
write( logu, *) "thread, iterations ", me, i
call mutex_unlock( m_io)

! Return successfully.

return                                      ! poisson()

! poisson()

end subroutine poisson
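
! **********************************************************************

! The following is only a sketch added for illustration. It is not part
! of the original example and nothing above calls it. It restates the
! strip decomposition that poisson() uses to compute ibeg and iend,
! but for a hypothetical number of workers nw instead of the
! hard-coded four. The name strip_limits and all of its arguments
! are assumptions made for this sketch; they are not fthreads names.

subroutine strip_limits( n, nw, me, ibeg, iend)

! n is the extent of the array, nw is the number of workers,
! and me is the number of this worker (1 through nw).

integer, intent( in) :: n, nw, me

! ibeg and iend bound the interior columns assigned to worker me.

integer, intent( out) :: ibeg, iend

continue                                    ! strip_limits()

! Each worker takes n / nw consecutive columns, clipped so that
! the boundary columns 1 and n are never updated.

ibeg = max( ( n / nw) * ( me - 1) + 1, 2)
iend = min( ( n / nw) * me, n - 1)

! Unlike poisson(), this sketch hands any remainder columns to the
! last worker, so no interior column is skipped when nw does not
! divide n evenly.

if( me == nw ) iend = n - 1

return                                      ! strip_limits()

end subroutine strip_limits

! With n = 1000 and nw = 4, workers 1 through 4 get columns
! 2 to 250, 251 to 500, 501 to 750 and 751 to 999,
! which are the same limits poisson() computes.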