#include #include #include "banking.h" #include "ipc.h" #include "pa2345.h" #include "dist.h" void child_phase0( dist_info_t *info, uint8_t id ) { close_redundant_pipes( info, id ); } void child_phase1( dist_info_t *info, uint8_t id ) { pid_t child_pid = getpid(); char buf[MAX_MESSAGE_LEN]; uint8_t i; my_info_t me = { .id = id, .dist_info = info }; int size = snprintf( buf, sizeof(buf), log_started_fmt, get_physical_time(), id, child_pid, info->parent_pid, info->balances[ id ] ); Message msg = { .s_header = { .s_magic = MESSAGE_MAGIC, .s_payload_len = size + 1, .s_type = STARTED, .s_local_time = get_physical_time() } }; fputs( buf, info->events_log ); fputs( buf, stdout ); strncpy( msg.s_payload, buf, sizeof( msg.s_payload ) ); send_multicast( &me, &msg ); for( i = 1; i <= info->x; i++ ) { if( i == me.id ) continue; if( !receive( &me, i, &msg ) ) { if( msg.s_header.s_type != STARTED ) { fprintf( stderr, "Message type INVALID (not STARTED)!\n" ); break; } } } snprintf( buf, sizeof(buf), log_received_all_started_fmt, get_physical_time(), id ); fputs( buf, info->events_log ); fputs( buf, stdout ); } void child_phase2( dist_info_t *info, uint8_t id, BalanceHistory *history ) { Message msg; BalanceState balanceState; char buf[MAX_MESSAGE_LEN]; my_info_t me = { .id = id, .dist_info = info }; uint8_t i; uint8_t last_order_time = 0; balanceState.s_balance = info->balances[ id ]; balanceState.s_time = 0; balanceState.s_balance_pending_in = 0; history->s_history[ last_order_time ] = balanceState; int running = 1; while( running ) { if( !receive_any( &me, &msg ) ) { switch( msg.s_header.s_type ) { case TRANSFER: { TransferOrder *order = (TransferOrder *) msg.s_payload; uint8_t cur_order_time = get_physical_time(); for( i = last_order_time + 1; i < cur_order_time; i++ ) { balanceState.s_balance = info->balances[ id ]; balanceState.s_time = i; balanceState.s_balance_pending_in = 0; history->s_history[ i ] = balanceState; } if( order->s_src == id ) { snprintf( buf, sizeof(buf), log_transfer_out_fmt, get_physical_time(), order->s_src, order->s_amount, order->s_dst ); fputs( buf, info->events_log ); fputs( buf, stdout ); info->balances[ id ] -= order->s_amount; msg.s_header.s_local_time = get_physical_time(); send( &me, order->s_dst, &msg ); } else if( order->s_dst == id ) { snprintf( buf, sizeof(buf), log_transfer_in_fmt, get_physical_time(), order->s_dst, order->s_amount, order->s_src); fputs( buf, info->events_log ); fputs( buf, stdout ); Message stopMsg = { .s_header = { .s_magic = MESSAGE_MAGIC, .s_payload_len = 0, .s_type = ACK, .s_local_time = get_physical_time() } }; info->balances[ id ] += order->s_amount; send( &me, PARENT_ID, &stopMsg ); } balanceState.s_balance = info->balances[ id ]; balanceState.s_time = cur_order_time; balanceState.s_balance_pending_in = 0; history->s_history[ cur_order_time ] = balanceState; history->s_history_len = cur_order_time + 1; last_order_time = cur_order_time; break; } case STOP: running = 0; break; } } } } void child_phase3( dist_info_t *info, uint8_t id, BalanceHistory *history ) { char buf[MAX_MESSAGE_LEN]; uint8_t i; my_info_t me = { .id = id, .dist_info = info }; int size = snprintf( buf, sizeof(buf), log_done_fmt, get_physical_time(), id, info->balances[ id ] ); Message msg = { .s_header = { .s_magic = MESSAGE_MAGIC, .s_payload_len = size + 1, .s_type = DONE, .s_local_time = get_physical_time() } }; fputs( buf, info->events_log ); fputs( buf, stdout ); strncpy( msg.s_payload, buf, sizeof( msg.s_payload ) ); send_multicast( &me, &msg ); for( i = 1; i <= info->x; i++ ) { if( i == me.id ) continue; if( !receive( &me, i, &msg ) ) { if( msg.s_header.s_type != DONE ) { fprintf( stderr, "Message type INVALID (not DONE)!\n" ); break; } } } snprintf( buf, sizeof(buf), log_received_all_done_fmt, get_physical_time(), id ); fputs( buf, info->events_log ); fputs( buf, stdout ); int time = get_physical_time(); for( i = history->s_history_len; i <= time; i++ ) { history->s_history[i].s_balance = history->s_history[ history->s_history_len - 1].s_balance; history->s_history[i].s_time = i; } history->s_history_len = time+1; int wr_size = history->s_history_len * sizeof(history->s_history[0]); Message historyMsg = { .s_header = { .s_magic = MESSAGE_MAGIC, .s_payload_len = wr_size, .s_type = BALANCE_HISTORY, .s_local_time = time } }; memcpy( historyMsg.s_payload, history, wr_size ); send( &me, PARENT_ID, &historyMsg ); } void child_workflow( dist_info_t *info, uint8_t id ) { BalanceHistory history = { .s_id = id }; /* CHILD PHASE 0: Close all redundant descriptors */ child_phase0(info, id); /* CHILD PHASE 1: Send STARTED message for all, receive STARTED * messages from other childs */ child_phase1(info, id); /* CHILD PHASE 2: Do some useful work */ child_phase2(info, id, &history); /* CHILD PHASE 3: Send DONE message for all, receive DONE messages from * other childs and exit */ child_phase3(info, id, &history); }