Changeset 637 for trunk/user/sort


Ignore:
Timestamp:
Jul 18, 2019, 2:06:55 PM (5 years ago)
Author:
alain
Message:

Introduce the non-standard pthread_parallel_create() system call
and re-write the <fft> and <sort> applications to improve the
intrinsic paralelism in applications.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/user/sort/sort.c

    r636 r637  
    5454#include <hal_macros.h>
    5555
    56 #define ARRAY_LENGTH        2048       // number of items
    57 #define MAX_THREADS         1024       // 16 * 16 * 4
    58 
    59 #define USE_DQT_BARRIER     1          // use DQT barrier if non zero
    60 #define DISPLAY_ARRAY       0          // display items values before and after
    61 #define DEBUG_MAIN          0          // trace main function
    62 #define DEBUG_SORT          0          // trace sort function
    63 #define CHECK_RESULT        0          // for debug
    64 #define INSTRUMENTATION     1          // register computation times on file
    65 
    66 /////////////////////////////////////////////////////////////
    67 // argument for the sort() function (one thread per core)
    68 /////////////////////////////////////////////////////////////
     56#define ARRAY_LENGTH        2048            // number of items
     57#define MAX_THREADS         1024            // 16 * 16 * 4
     58
     59#define X_MAX               16              // max number of clusters in a row
     60#define Y_MAX               16              // max number of clusters in a column
     61#define CORES_MAX           4               // max number of cores in a cluster
     62#define CLUSTERS_MAX        X_MAX * Y_MAX
     63
     64#define USE_DQT_BARRIER     1               // use DQT barrier if non zero
     65#define DISPLAY_ARRAY       0               // display items values before and after
     66#define DEBUG_MAIN          0               // trace main function
     67#define DEBUG_SORT          0               // trace sort function
     68#define CHECK_RESULT        0               // for debug
     69#define INSTRUMENTATION     1               // register computation times on file
     70
     71///////////////////////////////////////////////////////////////////////////////////
     72//            Arguments for the sort() function
     73///////////////////////////////////////////////////////////////////////////////////
    6974
    7075typedef struct
    7176{
    72     unsigned int threads;       // total number of threads
    73     unsigned int thread_uid;    // thread user index (0 to threads -1)
    74     unsigned int main_uid;      // main thread user index
     77    unsigned int        tid;                // continuous thread index
     78    unsigned int        threads;            // total number of threads
     79    pthread_barrier_t * parent_barrier;     // pointer on termination barrier
    7580}
    76 args_t;
    77 
    78 //////////////////////////////////////////
    79 //      Global variables
    80 //////////////////////////////////////////
     81sort_args_t;
     82
     83////////////////////////////////////////////////////////////////////////////////////
     84//            Sort specific global variables
     85////////////////////////////////////////////////////////////////////////////////////
    8186
    8287int                 array0[ARRAY_LENGTH];    // values to sort
     
    8590pthread_barrier_t   barrier;                 // synchronisation variables
    8691
    87 pthread_t           trdid[MAX_THREADS];      // kernel identifiers
    88 pthread_attr_t      attr[MAX_THREADS];       // thread attributes
    89 args_t              arg[MAX_THREADS];        // sort function arguments
     92/////////////////////////////////////////////////////////////////////////////////////
     93//             Global variables required by parallel_pthread_create()
     94/////////////////////////////////////////////////////////////////////////////////////
     95
     96// 2D arrays of input arguments for the <sort> threads
     97// These arrays are initialised by the application main thread
     98
     99sort_args_t       sort_args[CLUSTERS_MAX][CORES_MAX];  // sort function arguments
     100sort_args_t     * sort_ptrs[CLUSTERS_MAX][CORES_MAX];  // pointers on arguments
     101
     102// 1D array of barriers to allow the <sort> threads to signal termination
     103// this array is initialised by the pthread_parallel_create() function
     104 
     105pthread_barrier_t parent_barriers[CLUSTERS_MAX];       // termination barrier
     106
    90107
    91108////////////////////////////////////
     
    157174}  // end merge()
    158175
    159 //////////////////////////////////////
    160 static void sort( const args_t * ptr )
     176//////////////////////////////
     177void sort( sort_args_t * ptr )
    161178{
    162     unsigned int       i;
    163     unsigned long long cycle;
    164     unsigned int       cxy;
    165     unsigned int       lid;
    166 
    167     int              * src_array  = NULL;
    168     int              * dst_array  = NULL;
    169 
    170     // get core coordinates an date
    171     get_core( &cxy , &lid );
    172     get_cycle( &cycle );
    173 
    174     unsigned int  thread_uid = ptr->thread_uid;
    175     unsigned int  threads    = ptr->threads;
    176     unsigned int  main_uid   = ptr->main_uid;
    177 
    178 #if DISPLAY_ARRAY
    179 unsigned int n;
    180 if( thread_uid == main_uid )
    181 {
    182     printf("\n*** array before sort\n");
    183     for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , array0[n] );
    184 }
     179    unsigned int        i;
     180    int               * src_array  = NULL;
     181    int               * dst_array  = NULL;
     182
     183    // get arguments
     184    unsigned int        tid            = ptr->tid;
     185    unsigned int        threads        = ptr->threads;
     186    pthread_barrier_t * parent_barrier = ptr->parent_barrier;
     187
     188    unsigned int        items      = ARRAY_LENGTH / threads;
     189    unsigned int        stages     = __builtin_ctz( threads ) + 1;
     190
     191#if DEBUG_SORT
     192printf("\n[sort] start : ptr %x / tid %d / threads %d / barrier %x\n",
     193ptr, tid, threads, parent_barrier );
     194#endif
     195
     196    bubbleSort( array0, items, items * tid );
     197
     198#if DEBUG_SORT
     199printf("\n[sort] thread[%d] : stage 0 completed\n", tid );
    185200#endif
    186201
     
    189204
    190205#if DEBUG_SORT
    191 if( thread_uid == 0 )
    192 printf("\n[sort] thread[%d] exit barrier 0\n", thread_uid );
    193 #endif
    194 
    195     unsigned int  items      = ARRAY_LENGTH / threads;
    196     unsigned int  stages     = __builtin_ctz( threads ) + 1;
    197 
    198 #if DEBUG_SORT
    199 if( thread_uid == 0 )
    200 printf("\n[sort] thread[%d] : start\n", thread_uid );
    201 #endif
    202 
    203     bubbleSort( array0, items, items * thread_uid );
    204 
    205 #if DEBUG_SORT
    206 if( thread_uid == 0 )
    207 printf("\n[sort] thread[%d] : stage 0 completed\n", thread_uid );
    208 #endif
    209 
    210     /////////////////////////////////
    211     pthread_barrier_wait( &barrier );
    212 
    213 #if DEBUG_SORT
    214 if( thread_uid == 0 )
    215 printf("\n[sort] thread[%d] exit barrier 0\n", thread_uid );
    216 #endif
    217 
    218 #if DISPLAY_ARRAY
    219 if( thread_uid == main_uid )
    220 {
    221     printf("\n*** array after bubble sort\n");
    222     for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , array0[n] );
    223 }
     206printf("\n[sort] thread[%d] exit barrier 0\n", tid );
    224207#endif
    225208
     
    239222        }
    240223
    241         if( (thread_uid & ((1<<i)-1)) == 0 )
    242         {
    243 
    244 #if DEBUG_SORT
    245 if( thread_uid == 0 )
    246 printf("\n[sort] thread[%d] : stage %d start\n", thread_uid , i );
     224        if( (tid & ((1<<i)-1)) == 0 )
     225        {
     226
     227#if DEBUG_SORT
     228printf("\n[sort] thread[%d] : stage %d start\n", tid , i );
    247229#endif
    248230            merge( src_array,
    249231                   dst_array,
    250232                   items << (i-1),
    251                    items * thread_uid,
    252                    items * (thread_uid + (1 << (i-1))),
    253                    items * thread_uid );
    254 
    255 #if DEBUG_SORT
    256 if( thread_uid == 0 )
    257 printf("\n[sort] thread[%d] : stage %d completed\n", thread_uid , i );
     233                   items * tid,
     234                   items * (tid + (1 << (i-1))),
     235                   items * tid );
     236
     237#if DEBUG_SORT
     238printf("\n[sort] thread[%d] : stage %d completed\n", tid , i );
    258239#endif
    259240        }
     
    263244
    264245#if DEBUG_SORT
    265 if( thread_uid == 0 )
    266 printf("\n[sort] thread[%d] exit barrier %d\n", thread_uid , i );
    267 #endif
    268 
    269 #if DISPLAY_ARRAY
    270 if( thread_uid == main_uid )
    271 {
    272     printf("\n*** array after merge %d\n", i );
    273     for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , dst_array[n] );
    274 }
     246printf("\n[sort] thread[%d] exit barrier %d\n", tid , i );
    275247#endif
    276248
    277249    }  // en for stages
    278250
    279     // all threads but the main thread exit
    280     if( thread_uid != main_uid ) pthread_exit( NULL );
     251    // sort thread signal completion to main thread
     252    pthread_barrier_wait( parent_barrier );
     253
     254#if DEBUG_SORT
     255printf("\n[sort] thread[%d] exit\n", tid );
     256#endif
     257
     258    // sort thread exit
     259    pthread_exit( NULL );
    281260
    282261} // end sort()
     
    291270    unsigned int           ncores;             // number of cores per cluster
    292271    unsigned int           total_threads;      // total number of threads
    293     unsigned int           thread_uid;         // user defined thread index
    294     unsigned int           main_cxy;           // cluster identifier for main
    295     unsigned int           main_x;             // X coordinate for main thread
    296     unsigned int           main_y;             // Y coordinate for main thread
    297     unsigned int           main_lid;           // core local index for main thread
    298     unsigned int           main_uid;           // thread user index for main thread
    299     unsigned int           x;                  // X coordinate for a thread
    300     unsigned int           y;                  // Y coordinate for a thread
     272    unsigned int           x;                  // X coordinate for a sort thread
     273    unsigned int           y;                  // Y coordinate for a sort thread
     274    unsigned int           cxy;                // cluster identifier for a sort thead
    301275    unsigned int           lid;                // core local index for a thread
     276    unsigned int           tid;                // sort thread continuous index
     277    pthread_barrierattr_t  barrier_attr;       // barrier attributes (used for DQT)
    302278    unsigned int           n;                  // index in array to sort
    303     pthread_barrierattr_t  barrier_attr;       // barrier attributes
    304279
    305280    unsigned long long     start_cycle;
     
    314289    total_threads = x_size * y_size * ncores;
    315290
    316     // get core coordinates and user index for the main thread
    317     get_core( &main_cxy , & main_lid );
    318     main_x   = HAL_X_FROM_CXY( main_cxy );
    319     main_y   = HAL_Y_FROM_CXY( main_cxy );
    320     main_uid = (((main_x * y_size) + main_y) * ncores) + main_lid;
     291    // compute covering DQT size an level
     292    unsigned int z = (x_size > y_size) ? x_size : y_size;
     293    unsigned int root_level = (z == 1) ? 0 : (z == 2) ? 1 : (z == 4) ? 2 : (z == 8) ? 3 : 4;
    321294
    322295    // checks number of threads
     
    326299         (total_threads != 512) && (total_threads != 1024) )
    327300    {
    328         printf("\n[sort error] number of cores must be power of 2\n");
     301        printf("\n[sort] ERROR : number of cores must be power of 2\n");
    329302        exit( 0 );
    330303    }
     
    333306    if ( ARRAY_LENGTH % total_threads)
    334307    {
    335         printf("\n[sort error] array size must be multiple of number of threads\n");
     308        printf("\n[sort] ERROR : array size must be multiple of number of threads\n");
    336309        exit( 0 );
    337310    }
     
    355328    if( error )
    356329    {
    357         printf("\n[sort error] cannot initialise barrier\n" );
     330        printf("\n[sort] ERROR : cannot initialise barrier\n" );
    358331        exit( 0 );
    359332    }
     
    370343    }
    371344
     345#if DISPLAY_ARRAY
     346    printf("\n*** array before sort\n");
     347    for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , array0[n] );
     348#endif
     349
    372350#if DEBUG_MAIN
    373351printf("\n[sort] main completes array init\n");
    374352#endif
    375353
    376     // launch other threads to execute sort() function
    377     // on cores other than the core running the main thread
    378     for ( x = 0 ; x < x_size ; x++ )
    379     {
    380         for ( y = 0 ; y < y_size ; y++ )
    381         {
     354    // build array of arguments for the <sort> threads
     355    for (x = 0 ; x < x_size ; x++)
     356    {
     357        for (y = 0 ; y < y_size ; y++)
     358        {
     359            // compute cluster identifier
     360            cxy = HAL_CXY_FROM_XY( x , y );
     361
    382362            for ( lid = 0 ; lid < ncores ; lid++ )
    383363            {
    384                 // compute thread user index (continuous index)
    385                 thread_uid = (((x * y_size) + y) * ncores) + lid;
    386 
    387                 // set arguments for all threads
    388                 arg[thread_uid].threads      = total_threads;
    389                 arg[thread_uid].thread_uid   = thread_uid;
    390                 arg[thread_uid].main_uid     = main_uid;
    391 
    392                 // set thread attributes for all threads
    393                 attr[thread_uid].attributes = PT_ATTR_CLUSTER_DEFINED | PT_ATTR_CORE_DEFINED;
    394                 attr[thread_uid].cxy        = HAL_CXY_FROM_XY( x , y );
    395                 attr[thread_uid].lid        = lid;
    396 
    397                 if( thread_uid != main_uid )
    398                 {
    399                     if ( pthread_create( &trdid[thread_uid],  // buffer for kernel identifier
    400                                          &attr[thread_uid],   // thread attributes
    401                                          &sort,               // entry function
    402                                          &arg[thread_uid] ) ) // sort arguments
    403                     {
    404                         printf("\n[sort error] main cannot create thread %x \n", thread_uid );
    405                         exit( 0 );
    406                     }
    407 
    408 #if (DEBUG_MAIN & 1)
    409 printf("\n[sort] main created thread %x \n", thread_uid );
    410 #endif
    411                 }
     364                // compute thread continuous index
     365                tid = (((x * y_size) + y) * ncores) + lid;
     366
     367                // initialize 2D array of arguments
     368                sort_args[cxy][lid].tid            = tid;
     369                sort_args[cxy][lid].threads        = total_threads;
     370                sort_args[cxy][lid].parent_barrier = &parent_barriers[cxy];
     371
     372                // initialize 2D array of pointers
     373                sort_ptrs[cxy][lid] = &sort_args[cxy][lid];
    412374            }
    413375        }
    414376    }
    415    
     377
    416378    ///////////////////////////
    417379    get_cycle( &seq_end_cycle );
     
    422384#endif
    423385
    424     // the main thread run also the sort() function
    425     sort( &arg[main_uid] );
    426 
    427     // wait other threads completion
    428     for ( x = 0 ; x < x_size ; x++ )
    429     {
    430         for ( y = 0 ; y < y_size ; y++ )
    431         {
    432             for ( lid = 0 ; lid < ncores ; lid++ )
    433             {
    434                 // compute thread continuous index
    435                 thread_uid = (((x * y_size) + y) * ncores) + lid;
    436 
    437                 if( thread_uid != main_uid )
    438                 {
    439                     if( pthread_join( trdid[thread_uid] , NULL ) )
    440                     {
    441                         printf("\n[fft error] in main thread %d joining thread %d\n",
    442                         main_uid , thread_uid );
    443                         exit( 0 );
    444                     }
    445                    
    446 #if (DEBUG_MAIN & 1)
    447 printf("\n[fft] main thread %d joined thread %d\n", main_uid, thread_uid );
    448 #endif
    449 
    450                 }
    451             }
    452         }
     386    // create and execute the working threads
     387    if( pthread_parallel_create( root_level,
     388                                 &sort,
     389                                 &sort_ptrs[0][0],
     390                                 &parent_barriers[0] ) )
     391    {
     392        printf("\n[sort] ERROR : cannot create threads\n");
     393        exit( 0 );
    453394    }
    454395
     
    456397    get_cycle( &para_end_cycle );
    457398
    458     printf("\n[sort] main completes parallel sort at cycle %d\n",
    459     (unsigned int)para_end_cycle );
     399#if DEBUG_main
     400printf("\n[sort] main completes parallel sort at cycle %d\n",
     401(unsigned int)para_end_cycle );
     402#endif
    460403
    461404    // destroy barrier
    462405    pthread_barrier_destroy( &barrier );
     406
     407#if DISPLAY_ARRAY
     408    printf("\n*** array after merge %d\n", i );
     409    for( n=0; n<ARRAY_LENGTH; n++) printf("array[%d] = %d\n", n , dst_array[n] );
     410#endif
    463411
    464412#if CHECK_RESULT
     
    492440    // build file name
    493441    if( USE_DQT_BARRIER )
    494     snprintf( name , 64 , "sort_dqt_%d_%d_%d", ARRAY_LENGTH, x_size * y_size, ncores );
     442    snprintf( name , 64 , "p_sort_dqt_%d_%d_%d", ARRAY_LENGTH, x_size * y_size, ncores );
    495443    else
    496     snprintf( name , 64 , "sort_smp_%d_%d_%d", ARRAY_LENGTH, x_size * y_size, ncores );
     444    snprintf( name , 64 , "p_sort_smp_%d_%d_%d", ARRAY_LENGTH, x_size * y_size, ncores );
    497445
    498446    // build file pathname
     
    515463    if( stream == NULL )
    516464    {
    517         printf("\n[sort error] cannot open instrumentation file <%s>\n", path );
     465        printf("\n[sort] ERROR : cannot open instrumentation file <%s>\n", path );
    518466        exit(0);
    519467    }
     
    532480    if( ret < 0 )
    533481    {
    534         printf("\n[sort error] cannot write to instrumentation file <%s>\n", path );
     482        printf("\n[sort] ERROR : cannot write to instrumentation file <%s>\n", path );
    535483        exit(0);
    536484    }
     
    548496    if( ret )
    549497    {
    550         printf("\n[sort error] cannot close instrumentation file <%s>\n", path );
     498        printf("\n[sort] ERROR : cannot close instrumentation file <%s>\n", path );
    551499        exit(0);
    552500    }
Note: See TracChangeset for help on using the changeset viewer.