diff options
author | Sergey Nazaryev <sergey@nazaryev.ru> | 2016-05-15 07:41:38 +0000 |
---|---|---|
committer | Sergey Nazaryev <sergey@nazaryev.ru> | 2016-05-15 07:41:38 +0000 |
commit | b773e91c55a2a5183931c3a7642ecf84e3720022 (patch) | |
tree | 2c9b08b8e6040fffc0df8911a69eb81f645be7bc | |
parent | 4f36da5df51ce984201b56b794f04fea8c8e9e9e (diff) | |
download | pa3-master.zip pa3-master.tar.gz pa3-master.tar.bz2 |
-rw-r--r-- | child.c | 82 | ||||
-rw-r--r-- | ipc.c | 19 | ||||
-rw-r--r-- | pa3.c | 2 |
3 files changed, 62 insertions, 41 deletions
@@ -10,7 +10,6 @@ void child_phase0( dist_info_t *info, uint8_t id ) { lamport_init( &gl_lamport_time ); - lamport_increase( &gl_lamport_time ); close_redundant_pipes( info, id ); } @@ -29,6 +28,7 @@ void child_phase1( dist_info_t *info, uint8_t id ) { info->parent_pid, info->balances[ id ] ); fputs( buf, info->events_log ); + fflush( info->events_log ); fputs( buf, stdout ); lamport_increase( &gl_lamport_time ); @@ -59,7 +59,9 @@ void child_phase1( dist_info_t *info, uint8_t id ) { snprintf( buf, sizeof(buf), log_received_all_started_fmt, get_lamport_time(), id ); fputs( buf, info->events_log ); + fflush( info->events_log ); fputs( buf, stdout ); + fflush( info->events_log ); } void child_phase2( dist_info_t *info, uint8_t id, BalanceHistory *history ) { @@ -76,9 +78,11 @@ void child_phase2( dist_info_t *info, uint8_t id, BalanceHistory *history ) { uint8_t last_order_time = 0; balanceState.s_balance = info->balances[ id ]; - balanceState.s_time = 0; + balanceState.s_time = last_order_time; balanceState.s_balance_pending_in = 0; + history->s_history[ last_order_time ] = balanceState; + history->s_history_len = 1; int running = 1; while( running ) { @@ -87,37 +91,35 @@ void child_phase2( dist_info_t *info, uint8_t id, BalanceHistory *history ) { switch( msg.s_header.s_type ) { case TRANSFER: { TransferOrder *order = (TransferOrder *) msg.s_payload; - uint8_t cur_order_time = get_lamport_time(); - - for( i = last_order_time + 1; i < cur_order_time; i++ ) { - balanceState.s_balance = info->balances[ id ]; - balanceState.s_time = i; - balanceState.s_balance_pending_in = 0; - history->s_history[ i ] = balanceState; - } - if( order->s_src == id ) { + lamport_increase( &gl_lamport_time ); + + info->balances[ id ] -= order->s_amount; + int oldBp = history->s_history[ history->s_history_len - 1 ].s_balance_pending_in; + balanceState.s_balance = info->balances[ id ]; + balanceState.s_time = get_lamport_time(); + balanceState.s_balance_pending_in = oldBp + order->s_amount; + + msg.s_header.s_local_time = get_lamport_time(); + send( &me, order->s_dst, &msg ); + snprintf( buf, sizeof(buf), log_transfer_out_fmt, get_lamport_time(), order->s_src, order->s_amount, order->s_dst ); fputs( buf, info->events_log ); + fflush( info->events_log ); fputs( buf, stdout ); - - info->balances[ id ] -= order->s_amount; - - //lamport_increase( &gl_lamport_time ); - //msg.s_header.s_local_time = get_lamport_time(); - send( &me, order->s_dst, &msg ); } else if( order->s_dst == id ) { - snprintf( buf, sizeof(buf), log_transfer_in_fmt, - get_lamport_time(), order->s_dst, - order->s_amount, order->s_src); - fputs( buf, info->events_log ); - fputs( buf, stdout ); - lamport_increase( &gl_lamport_time ); - Message stopMsg = { + info->balances[ id ] += order->s_amount; + int oldBp = history->s_history[ history->s_history_len - 1 ].s_balance_pending_in; + + balanceState.s_balance = info->balances[ id ]; + balanceState.s_time = get_lamport_time(); + balanceState.s_balance_pending_in = oldBp - order->s_amount; + + Message ackMsg = { .s_header = { .s_magic = MESSAGE_MAGIC, .s_payload_len = 0, @@ -125,17 +127,28 @@ void child_phase2( dist_info_t *info, uint8_t id, BalanceHistory *history ) { .s_local_time = get_lamport_time() } }; - info->balances[ id ] += order->s_amount; - send( &me, PARENT_ID, &stopMsg ); + send( &me, PARENT_ID, &ackMsg ); + + snprintf( buf, sizeof(buf), log_transfer_in_fmt, + get_lamport_time(), order->s_dst, + order->s_amount, order->s_src); + fputs( buf, info->events_log ); + fflush( info->events_log ); + fputs( buf, stdout ); } - balanceState.s_balance = info->balances[ id ]; - balanceState.s_time = cur_order_time; - balanceState.s_balance_pending_in = 0; + int cur_time = get_lamport_time(); + + history->s_history[ cur_time ] = balanceState; + history->s_history_len = cur_time + 1; - history->s_history[ cur_order_time ] = balanceState; - history->s_history_len = cur_order_time + 1; - last_order_time = cur_order_time; + for( i = last_order_time + 1; i < cur_time; i++ ) { + balanceState = history->s_history[ i-1 ]; + balanceState.s_time = i; + history->s_history[ i ] = balanceState; + } + + last_order_time = cur_time; break; } case STOP: @@ -159,6 +172,7 @@ void child_phase3( dist_info_t *info, uint8_t id, BalanceHistory *history ) { get_lamport_time(), id, info->balances[ id ] ); fputs( buf, info->events_log ); + fflush( info->events_log ); fputs( buf, stdout ); lamport_increase( &gl_lamport_time ); @@ -190,11 +204,13 @@ void child_phase3( dist_info_t *info, uint8_t id, BalanceHistory *history ) { snprintf( buf, sizeof(buf), log_received_all_done_fmt, get_lamport_time(), id ); fputs( buf, info->events_log ); + fflush( info->events_log ); fputs( buf, stdout ); int time = get_lamport_time(); for( i = history->s_history_len; i <= time; i++ ) { history->s_history[i].s_balance = history->s_history[ history->s_history_len - 1].s_balance; + history->s_history[i].s_balance_pending_in = history->s_history[ history->s_history_len - 1].s_balance_pending_in; history->s_history[i].s_time = i; } @@ -208,7 +224,7 @@ void child_phase3( dist_info_t *info, uint8_t id, BalanceHistory *history ) { .s_magic = MESSAGE_MAGIC, .s_payload_len = wr_size, .s_type = BALANCE_HISTORY, - .s_local_time = time + .s_local_time = get_lamport_time() } }; memcpy( historyMsg.s_payload, history, wr_size ); @@ -75,11 +75,13 @@ int receive(void * self, local_id from, Message * msg) { int descriptor = me->dist_info->pipes[idx][0]; char buf[MAX_MESSAGE_LEN]; - int size; - + int size_hdr, size_payload; + char *msgbuf = (char *) msg; while( 1 ) { - if( ( size = read( descriptor, buf, sizeof( buf ) ) ) > 0 ) { - memcpy( msg, buf, sizeof( *msg ) ); + if( ( size_hdr = read( descriptor, buf, sizeof( MessageHeader ) ) ) > 0 ) { + memcpy( msgbuf, buf, size_hdr ); + size_payload = read( descriptor, buf, msg->s_header.s_payload_len ); + memcpy( msgbuf + size_hdr, buf, size_payload ); return 0; } usleep(100000); // sleep 100 ms @@ -112,12 +114,15 @@ int receive_any(void * self, Message * msg) { } char buf[MAX_MESSAGE_LEN]; - int size; + int size_hdr, size_payload; + char *msgbuf = (char *) msg; while( 1 ) { for( i = 0; i < j; i++ ) { - if( ( size = read( descriptors[i], buf, sizeof( buf ) ) ) > 0 ) { - memcpy( msg, buf, sizeof( *msg ) ); + if( ( size_hdr = read( descriptors[i], buf, sizeof( MessageHeader ) ) ) > 0 ) { + memcpy( msgbuf, buf, size_hdr ); + size_payload = read( descriptors[i], buf, msg->s_header.s_payload_len ); + memcpy( msgbuf + size_hdr, buf, size_payload ); free( descriptors ); return 0; } @@ -56,7 +56,7 @@ int main(int argc, char* argv[]) { // open all (N * (N-1)) pipes for a full mesh topology for( i = 0; i < info.proccnt * (info.proccnt - 1); i++ ) { - pipe2( info.pipes[i], O_NONBLOCK | O_DIRECT ); + pipe2( info.pipes[i], O_NONBLOCK ); fprintf( info.pipes_log, "Opened pipe #%d (%d;%d)\n", i, info.pipes[i][0], info.pipes[i][1] ); } fclose( info.pipes_log ); |