/* * pipe.c - single writer, single reader pipe implementation * * Author Alain Greiner (2016,2017,2018,2019,2020) * * Copyright (c) UPMC Sorbonne Universites * * This file is part of ALMOS-MKH. * * ALMOS-MKH is free software; you can redistribute it and/or modify it * under the terms of the GNU General Public License as published by * the Free Software Foundation; version 2.0 of the License. * * ALMOS-MKH is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * General Public License for more details. * * You should have received a copy of the GNU General Public License * along with ALMOS-MKH; if not, write to the Free Software Foundation, * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ #include #include #include #include #include /////////////////////////////////// pipe_t * pipe_create( cxy_t cxy, uint32_t size ) { remote_buf_t * buf; pipe_t * pipe; error_t error; // 2. allocate memory for remote_buf descriptor buf = remote_buf_alloc( cxy ); if( buf == NULL ) { return NULL; } // 1. initialize it, and allocate memory for associated data buffer error = remote_buf_init( XPTR( cxy , buf ), bits_log2( size ) ); if( error ) { remote_buf_destroy( XPTR( cxy , buf ) ); return NULL; } // 3. allocate memory for pipe descriptor pipe = kmem_remote_alloc( cxy , bits_log2(sizeof(pipe_t)) , AF_ZERO ); if( pipe == NULL ) { remote_buf_destroy( XPTR( cxy , buf ) ); return NULL; } // initialise it hal_remote_spt( XPTR( cxy , &pipe->buffer ) , buf ); return pipe; } // end pipe_create() /////////////////////////////////// void pipe_destroy( xptr_t pipe_xp ) { pipe_t * pipe_ptr = GET_PTR( pipe_xp ); cxy_t pipe_cxy = GET_CXY( pipe_xp ); // get pointer on remote_buf descriptor remote_buf_t * buf = hal_remote_lpt( XPTR( pipe_cxy , &pipe_ptr->buffer )); // release remote_buf descriptor and data buffer remote_buf_destroy( XPTR( pipe_cxy , buf ) ); // release pipe descriptor kmem_remote_free( pipe_cxy , pipe_ptr , bits_log2(sizeof(pipe_t)) ); } // end pipe_destroy() ////////////////////////////////////////// void pipe_register_writer( xptr_t pipe_xp, xptr_t thread_xp ) { // get cluster and local pointer on pipe descriptor cxy_t pipe_cxy = GET_CXY( pipe_xp ); pipe_t * pipe_ptr = GET_PTR( pipe_xp ); // update "writer_xp" field hal_remote_s64( XPTR( pipe_cxy , &pipe_ptr->writer_xp ) , thread_xp ); } // end pipe_register_writer() ////////////////////////////////////////// void pipe_register_reader( xptr_t pipe_xp, xptr_t thread_xp ) { // get cluster and local pointer on pipe descriptor cxy_t pipe_cxy = GET_CXY( pipe_xp ); pipe_t * pipe_ptr = GET_PTR( pipe_xp ); // update "reader_xp" field hal_remote_s64( XPTR( pipe_cxy , &pipe_ptr->reader_xp ) , thread_xp ); } // end pipe_register_reader() /////////////////////////////////////// int pipe_user_move( bool_t to_buffer, xptr_t file_xp, void * u_buf, uint32_t size ) { int nbytes; // number of bytes actually moved thread_t * this = CURRENT_THREAD; assert( __FUNCTION__, ( file_xp != XPTR_NULL ) , "file_xp == XPTR_NULL" ); #if DEBUG_PIPE_MOVE uint32_t cycle = (uint32_t)hal_get_cycles(); if( DEBUG_PIPE_MOVE < cycle ) printk("\n[%s] thread[%x,%x] enter / to_user %d / cycle %d\n", __FUNCTION__, this->process->pid, this->trdid, to_buffer, cycle ); #endif // get cluster and local pointer on file descriptor cxy_t file_cxy = GET_CXY( file_xp ); vfs_file_t * file_ptr = GET_PTR( file_xp ); // get local pointer on remote pipe, and extended pointers on reader & writer pipe_t * pipe_ptr = hal_remote_lpt( XPTR( file_cxy , &file_ptr->pipe )); xptr_t reader_xp = hal_remote_l64( XPTR( file_cxy , &pipe_ptr->reader_xp )); xptr_t writer_xp = hal_remote_l64( XPTR( file_cxy , &pipe_ptr->writer_xp )); assert( __FUNCTION__ , (pipe_ptr != NULL) , "pipe_ptr cannot be NULL" ); assert( __FUNCTION__ , (reader_xp != XPTR_NULL) , "reader_xp cannot be NULL" ); assert( __FUNCTION__ , (writer_xp != XPTR_NULL) , "writer_xp cannot be NULL" ); // get pointers on associated remote_buf remote_buf_t * buf_ptr = hal_remote_lpt( XPTR( file_cxy , &pipe_ptr->buffer )); xptr_t buf_xp = XPTR( file_cxy , buf_ptr ); ///////////////// pipe read if( to_buffer ) { assert( __FUNCTION__, (reader_xp == XPTR(local_cxy , this)) , "illegal reader thread" ); // wait remote_buf_t non empty while( 1 ) { // get remote_buf status int status = (int)remote_buf_status( buf_xp ); // move data when remote_buf non empty if( status ) { // compute min(status , count) nbytes = (status < (int)size) ? status : (int)size; // move nbytes to user buffer remote_buf_get_to_user( buf_xp, u_buf, nbytes ); // unblock writer thread thread_unblock( writer_xp , THREAD_BLOCKED_IO ); // exit loop break; } else // block and deschedule reader thread when remote_buf empty { thread_block( reader_xp , THREAD_BLOCKED_IO ); sched_yield( "pipe empty" ); } } // end while } ////////////////// pipe write else { assert( __FUNCTION__, (writer_xp == XPTR(local_cxy , this)) , "illegal writer thread" ); // wait remote_buf_t non full while( 1 ) { // get remote_buf status int status = (int)remote_buf_status( buf_xp ); // get remote_buf length int buflen = 1 << hal_remote_l32( XPTR( file_cxy , &buf_ptr->order )); // move data when remote_buf non full if( status < buflen ) { // compute min( buflen - status , size) nbytes = ((buflen - status) < (int)size) ? (buflen - status) : (int)size; // move nbytes from user buffer remote_buf_put_from_user( buf_xp, u_buf, nbytes ); // unblock reader thread thread_unblock( reader_xp , THREAD_BLOCKED_IO ); // exit loop break; } else // block and deschedule writer thread when remote_buf full { thread_block( writer_xp , THREAD_BLOCKED_IO ); sched_yield( "pipe full" ); } } // end while } #if DEBUG_PIPE_MOVE cycle = (uint32_t)hal_get_cycles(); if( DEBUG_PIPE_MOVE < cycle ) printk("\n[%s] thread[%x,%x] exit / to_user %d / nbytes %d / cycle %d\n", __FUNCTION__, this->process->pid, this->trdid, to_buffer, nbytes, cycle ); #endif return nbytes; } // end pipe_user_move()