source: trunk/kernel/kern/pipe.c @ 683

Last change on this file since 683 was 683, checked in by alain, 3 years ago

All modifications required to support the <tcp_chat> application
including error recovery in case of packet loss.A

File size: 7.4 KB
Line 
1/*
2 * pipe.c - single writer, single reader pipe implementation           
3 *
4 * Author     Alain Greiner     (2016,2017,2018,2019,2020)
5 *
6 * Copyright (c) UPMC Sorbonne Universites
7 *
8 * This file is part of ALMOS-MKH.
9 *
10 * ALMOS-MKH is free software; you can redistribute it and/or modify it
11 * under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; version 2.0 of the License.
13 *
14 * ALMOS-MKH is distributed in the hope that it will be useful, but
15 * WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17 * General Public License for more details.
18 *
19 * You should have received a copy of the GNU General Public License
20 * along with ALMOS-MKH; if not, write to the Free Software Foundation,
21 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
22 */
23
24#include <kernel_config.h>
25#include <hal_kernel_types.h>
26#include <printk.h>
27#include <thread.h>
28#include <pipe.h>
29
30///////////////////////////////////
31pipe_t * pipe_create( cxy_t    cxy,
32                      uint32_t size )
33{
34    remote_buf_t * buf;
35    pipe_t       * pipe;
36    error_t        error;
37
38    // 2. allocate memory for remote_buf descriptor
39    buf = remote_buf_alloc( cxy );
40
41    if( buf == NULL )
42    {
43       return NULL;
44    }
45
46    // 1. initialize it, and allocate memory for associated data buffer
47    error = remote_buf_init( XPTR( cxy , buf ),
48                             bits_log2( size ) );
49    if( error )
50    {
51        remote_buf_destroy( XPTR( cxy , buf ) );
52        return NULL;
53    }
54
55    // 3. allocate memory for pipe descriptor
56    pipe = kmem_remote_alloc( cxy , bits_log2(sizeof(pipe_t)) , AF_ZERO );
57
58    if( pipe == NULL )
59    {
60        remote_buf_destroy( XPTR( cxy , buf ) );
61        return NULL;
62    }
63
64    // initialise it
65    hal_remote_spt( XPTR( cxy , &pipe->buffer ) , buf );
66
67    return pipe;
68
69}  // end pipe_create()
70
71///////////////////////////////////
72void pipe_destroy( xptr_t pipe_xp )
73{
74    pipe_t * pipe_ptr = GET_PTR( pipe_xp );
75    cxy_t    pipe_cxy = GET_CXY( pipe_xp );
76
77    // get pointer on remote_buf descriptor
78    remote_buf_t * buf = hal_remote_lpt( XPTR( pipe_cxy , &pipe_ptr->buffer ));
79
80    // release remote_buf descriptor and data buffer
81    remote_buf_destroy( XPTR( pipe_cxy , buf ) );
82
83    // release pipe descriptor
84    kmem_remote_free( pipe_cxy , pipe_ptr , bits_log2(sizeof(pipe_t)) );
85
86}  // end pipe_destroy()
87
88//////////////////////////////////////////
89void pipe_register_writer( xptr_t pipe_xp,
90                           xptr_t thread_xp )
91{
92    // get cluster and local pointer on pipe descriptor
93    cxy_t    pipe_cxy = GET_CXY( pipe_xp );
94    pipe_t * pipe_ptr = GET_PTR( pipe_xp );
95
96    // update "writer_xp" field
97    hal_remote_s64( XPTR( pipe_cxy , &pipe_ptr->writer_xp ) , thread_xp );
98
99}  // end pipe_register_writer()
100
101//////////////////////////////////////////
102void pipe_register_reader( xptr_t pipe_xp,
103                           xptr_t thread_xp )
104{
105    // get cluster and local pointer on pipe descriptor
106    cxy_t    pipe_cxy = GET_CXY( pipe_xp );
107    pipe_t * pipe_ptr = GET_PTR( pipe_xp );
108
109    // update "reader_xp" field
110    hal_remote_s64( XPTR( pipe_cxy , &pipe_ptr->reader_xp ) , thread_xp );
111
112}  // end pipe_register_reader()
113
114///////////////////////////////////////
115int pipe_user_move( bool_t   to_buffer,
116                    xptr_t   file_xp,
117                    void   * u_buf,
118                    uint32_t size )
119{
120    int        nbytes;      // number of bytes actually moved
121
122    thread_t * this  = CURRENT_THREAD;
123   
124assert( __FUNCTION__, ( file_xp != XPTR_NULL ) , "file_xp == XPTR_NULL" );
125
126#if DEBUG_PIPE_MOVE
127uint32_t   cycle = (uint32_t)hal_get_cycles();
128if( DEBUG_PIPE_MOVE < cycle )
129printk("\n[%s] thread[%x,%x] enter / to_user %d / cycle %d\n",
130__FUNCTION__, this->process->pid, this->trdid, to_buffer, cycle );
131#endif
132
133    // get cluster and local pointer on file descriptor
134    cxy_t        file_cxy = GET_CXY( file_xp );
135    vfs_file_t * file_ptr = GET_PTR( file_xp );
136
137    // get local pointer on remote pipe, and extended pointers on reader & writer
138    pipe_t * pipe_ptr = hal_remote_lpt( XPTR( file_cxy , &file_ptr->pipe ));
139    xptr_t reader_xp  = hal_remote_l64( XPTR( file_cxy , &pipe_ptr->reader_xp ));
140    xptr_t writer_xp  = hal_remote_l64( XPTR( file_cxy , &pipe_ptr->writer_xp ));
141
142assert( __FUNCTION__ , (pipe_ptr != NULL)       , "pipe_ptr cannot be NULL" );
143assert( __FUNCTION__ , (reader_xp != XPTR_NULL) , "reader_xp cannot be NULL" );
144assert( __FUNCTION__ , (writer_xp != XPTR_NULL) , "writer_xp cannot be NULL" );
145
146    // get pointers on associated remote_buf
147    remote_buf_t * buf_ptr = hal_remote_lpt( XPTR( file_cxy , &pipe_ptr->buffer ));
148    xptr_t         buf_xp  = XPTR( file_cxy , buf_ptr );
149
150    ///////////////// pipe read
151    if( to_buffer )
152    {
153
154assert( __FUNCTION__, (reader_xp == XPTR(local_cxy , this)) , "illegal reader thread" );
155
156        // wait remote_buf_t non empty
157        while( 1 )
158        {
159            // get remote_buf status
160            int status = (int)remote_buf_status( buf_xp );
161
162            // move data when remote_buf non empty
163            if( status )   
164            {
165                // compute min(status , count)
166                nbytes = (status < (int)size) ? status : (int)size;
167
168                // move nbytes to user buffer
169                remote_buf_get_to_user( buf_xp,
170                                        u_buf,
171                                        nbytes );
172
173                // unblock writer thread
174                thread_unblock( writer_xp , THREAD_BLOCKED_IO );
175
176                // exit loop
177                break; 
178            }
179            else  // block and deschedule reader thread when remote_buf empty
180            {
181                thread_block( reader_xp , THREAD_BLOCKED_IO );
182                sched_yield( "pipe empty" );
183            }
184        }  // end while
185    }
186    ////////////////// pipe write
187    else
188    {
189
190assert( __FUNCTION__, (writer_xp == XPTR(local_cxy , this)) , "illegal writer thread" );
191
192        // wait remote_buf_t non full
193        while( 1 )
194        {
195            // get remote_buf status
196            int status = (int)remote_buf_status( buf_xp );
197
198            // get remote_buf length
199            int buflen  = 1 << hal_remote_l32( XPTR( file_cxy , &buf_ptr->order )); 
200
201            // move data when remote_buf non full
202            if( status < buflen )   
203            {
204                // compute min( buflen - status , size)
205                nbytes = ((buflen - status) < (int)size) ? (buflen - status) : (int)size;
206
207                // move nbytes from user buffer
208                remote_buf_put_from_user( buf_xp,
209                                          u_buf,
210                                          nbytes );
211
212                // unblock reader thread
213                thread_unblock( reader_xp , THREAD_BLOCKED_IO );
214
215                // exit loop
216                break; 
217            }
218            else  // block and deschedule writer thread when remote_buf full
219            {
220                thread_block( writer_xp , THREAD_BLOCKED_IO );
221                sched_yield( "pipe full" );
222            }
223        }  // end while
224    }
225
226#if DEBUG_PIPE_MOVE
227cycle = (uint32_t)hal_get_cycles();
228if( DEBUG_PIPE_MOVE < cycle )
229printk("\n[%s] thread[%x,%x] exit / to_user %d / nbytes %d / cycle %d\n",
230__FUNCTION__, this->process->pid, this->trdid, to_buffer, nbytes, cycle );
231#endif
232
233    return nbytes;
234
235}  // end pipe_user_move()
236
237
238
239
Note: See TracBrowser for help on using the repository browser.