diff options
author | Sergey Nazaryev <sergey@nazaryev.ru> | 2016-04-24 13:13:18 +0000 |
---|---|---|
committer | Sergey Nazaryev <sergey@nazaryev.ru> | 2016-04-24 13:13:18 +0000 |
commit | 4ac1cfd94f5d0ada8234b63ba6e92da0f5a3d770 (patch) | |
tree | 5527df827bfcba6927b453cec653de292b615d1b | |
parent | 51aeb1fb630713d6bbb9ac1c403f5068d22b7dc5 (diff) | |
download | pa4-4ac1cfd94f5d0ada8234b63ba6e92da0f5a3d770.zip pa4-4ac1cfd94f5d0ada8234b63ba6e92da0f5a3d770.tar.gz pa4-4ac1cfd94f5d0ada8234b63ba6e92da0f5a3d770.tar.bz2 |
First release of PA2 lab
-rw-r--r-- | .version | 1 | ||||
-rw-r--r-- | Makefile | 4 | ||||
-rw-r--r-- | banking.c | 38 | ||||
-rw-r--r-- | child.c | 132 | ||||
-rw-r--r-- | dist.h | 2 | ||||
-rw-r--r-- | ipc.c | 3 | ||||
-rw-r--r-- | pa1.h | 28 | ||||
-rw-r--r-- | pa23.c | 9 | ||||
-rw-r--r-- | parent.c | 46 |
9 files changed, 206 insertions, 57 deletions
diff --git a/.version b/.version deleted file mode 100644 index e673a57..0000000 --- a/.version +++ /dev/null @@ -1 +0,0 @@ -38f0c5ce6aa00c87b4fb0155fc3889e64d1ee93d @@ -3,10 +3,10 @@ CC = gcc all: pa2 -pa2: pa23.o parent.o child.o ipc.o dist.o +pa2: bank_robbery.o banking.o pa23.o parent.o child.o ipc.o dist.o $(CC) -o $@ $^ -L. -lruntime -*.o: *.c +%.o: %.c $(CC) -c -o $@ $< $(CFLAGS) clean: diff --git a/banking.c b/banking.c new file mode 100644 index 0000000..2562645 --- /dev/null +++ b/banking.c @@ -0,0 +1,38 @@ +#include <string.h> + +#include "banking.h" +#include "ipc.h" + +#include "dist.h" + +void transfer(void * parent_data, local_id src, local_id dst, balance_t amount) +{ + my_info_t *me = (my_info_t *) parent_data; + if( me->dist_info == NULL ) + return; + + TransferOrder order = { + .s_src = src, + .s_dst = dst, + .s_amount = amount + }; + + Message msg = { + .s_header = { + .s_magic = MESSAGE_MAGIC, + .s_payload_len = sizeof( order ), + .s_type = TRANSFER, + .s_local_time = get_physical_time() + } + }; + memcpy( msg.s_payload, &order, sizeof( order ) ); + + send( me, src, &msg ); + if( !receive( me, dst, &msg ) ) { + if( msg.s_header.s_type != ACK ) { + return; + } + } + + return; +} @@ -1,9 +1,11 @@ #include <stdint.h> #include <string.h> -#include "dist.h" +#include "banking.h" #include "ipc.h" -#include "pa1.h" +#include "pa2345.h" + +#include "dist.h" void child_phase0( dist_info_t *info, uint8_t id ) { close_redundant_pipes( info, id ); @@ -11,7 +13,7 @@ void child_phase0( dist_info_t *info, uint8_t id ) { void child_phase1( dist_info_t *info, uint8_t id ) { pid_t child_pid = getpid(); - char buf[512]; + char buf[MAX_MESSAGE_LEN]; uint8_t i; my_info_t me = { @@ -19,14 +21,17 @@ void child_phase1( dist_info_t *info, uint8_t id ) { .dist_info = info }; - int size = snprintf( buf, sizeof(buf), log_started_fmt, id, child_pid, info->parent_pid ); + int size = snprintf( buf, sizeof(buf), log_started_fmt, get_physical_time(), + id, child_pid, + info->parent_pid, + info->balances[ id ] ); Message msg = { .s_header = { .s_magic = MESSAGE_MAGIC, .s_payload_len = size + 1, .s_type = STARTED, - .s_local_time = 0 + .s_local_time = get_physical_time() } }; @@ -48,15 +53,88 @@ void child_phase1( dist_info_t *info, uint8_t id ) { } } - snprintf( buf, sizeof(buf), log_received_all_started_fmt, id ); + snprintf( buf, sizeof(buf), log_received_all_started_fmt, get_physical_time(), id ); fputs( buf, info->events_log ); fputs( buf, stdout ); } -void child_phase2( dist_info_t *info, uint8_t id ) { +void child_phase2( dist_info_t *info, uint8_t id, BalanceHistory *history ) { + Message msg; + BalanceState balanceState; + char buf[MAX_MESSAGE_LEN]; + + my_info_t me = { + .id = id, + .dist_info = info + }; + + uint8_t i; + uint8_t last_order_time = -1; + + int running = 1; + while( running ) { + if( !receive_any( &me, &msg ) ) { + switch( msg.s_header.s_type ) { + case TRANSFER: { + if( msg.s_payload == NULL ) + return; + + TransferOrder *order = (TransferOrder *) msg.s_payload; + uint8_t cur_order_time = msg.s_header.s_local_time; + + for( i = last_order_time + 1; i < cur_order_time; i++ ) { + balanceState.s_balance = info->balances[ id ]; + balanceState.s_time = i; + balanceState.s_balance_pending_in = 0; + history->s_history[ i ] = balanceState; + } + + if( order->s_src == id ) { + snprintf( buf, sizeof(buf), log_transfer_out_fmt, + get_physical_time(), order->s_src, + order->s_amount, order->s_dst ); + fputs( buf, info->events_log ); + fputs( buf, stdout ); + + info->balances[ id ] -= order->s_amount; + send( &me, order->s_dst, &msg ); + } else if( order->s_dst == id ) { + snprintf( buf, sizeof(buf), log_transfer_in_fmt, + get_physical_time(), order->s_dst, + order->s_amount, order->s_src); + fputs( buf, info->events_log ); + fputs( buf, stdout ); + + Message stopMsg = { + .s_header = { + .s_magic = MESSAGE_MAGIC, + .s_payload_len = 0, + .s_type = ACK, + .s_local_time = get_physical_time() + } + }; + info->balances[ id ] += order->s_amount; + send( &me, PARENT_ID, &stopMsg ); + } + + balanceState.s_balance = info->balances[ id ]; + balanceState.s_time = cur_order_time; + balanceState.s_balance_pending_in = 0; + + history->s_history[ cur_order_time ] = balanceState; + history->s_history_len = cur_order_time + 1; + last_order_time = cur_order_time; + break; + } + case STOP: + running = 0; + break; + } + } + } } -void child_phase3( dist_info_t *info, uint8_t id ) { +void child_phase3( dist_info_t *info, uint8_t id, BalanceHistory *history ) { char buf[MAX_MESSAGE_LEN]; uint8_t i; @@ -65,14 +143,16 @@ void child_phase3( dist_info_t *info, uint8_t id ) { .dist_info = info }; - int size = snprintf( buf, sizeof(buf), log_done_fmt, id ); + int size = snprintf( buf, sizeof(buf), log_done_fmt, + get_physical_time(), id, + info->balances[ id ] ); Message msg = { .s_header = { .s_magic = MESSAGE_MAGIC, .s_payload_len = size + 1, .s_type = DONE, - .s_local_time = 0 + .s_local_time = get_physical_time() } }; @@ -94,12 +174,38 @@ void child_phase3( dist_info_t *info, uint8_t id ) { } } - snprintf( buf, sizeof(buf), log_received_all_done_fmt, id ); + snprintf( buf, sizeof(buf), log_received_all_done_fmt, + get_physical_time(), id ); fputs( buf, info->events_log ); fputs( buf, stdout ); + + int time = get_physical_time(); + for( i = history->s_history_len; i < time; i++ ) { + history->s_history[i].s_balance = history->s_history[ history->s_history_len - 1].s_balance; + history->s_history[i].s_time = i; + } + + history->s_history_len = time; + + int wr_size = history->s_history_len * sizeof(history->s_history[0]); + Message historyMsg = { + .s_header = { + .s_magic = MESSAGE_MAGIC, + .s_payload_len = wr_size, + .s_type = BALANCE_HISTORY, + .s_local_time = time + } + }; + + memcpy( historyMsg.s_payload, history, wr_size ); + send( &me, PARENT_ID, &historyMsg ); } void child_workflow( dist_info_t *info, uint8_t id ) { + BalanceHistory history = { + .s_id = id + }; + /* CHILD PHASE 0: Close all redundant descriptors */ child_phase0(info, id); @@ -108,9 +214,9 @@ void child_workflow( dist_info_t *info, uint8_t id ) { child_phase1(info, id); /* CHILD PHASE 2: Do some useful work */ - child_phase2(info, id); + child_phase2(info, id, &history); /* CHILD PHASE 3: Send DONE message for all, receive DONE messages from * other childs and exit */ - child_phase3(info, id); + child_phase3(info, id, &history); } @@ -2,6 +2,7 @@ #define __IFMO_DISTRIBUTED_CLASS_DIST__H #include "pa23.h" +#include "banking.h" #define MAX_PIPES ( MAX_PROCCNT ) * ( MAX_PROCCNT - 1 ) @@ -9,6 +10,7 @@ typedef struct { int pipes[MAX_PIPES][2]; uint8_t proccnt; uint8_t x; + balance_t balances[MAX_X]; FILE* events_log; FILE* pipes_log; @@ -28,7 +28,8 @@ int send(void * self, local_id dst, const Message * msg) { int wr_dscr = me->dist_info->pipes[idx][1]; size_t wr_size = sizeof(msg->s_header) + msg->s_header.s_payload_len; - if( write( wr_dscr, &msg, wr_size ) != wr_size ) + + if( write( wr_dscr, msg, wr_size ) != wr_size ) return 1; return 0; @@ -1,28 +0,0 @@ -/** - * @file pa1.h - * @Author Michael Kosyakov and Evgeniy Ivanov (ifmo.distributedclass@gmail.com) - * @date March, 2014 - * @brief Constants for programming assignment 1 - * - * Students must not modify this file! - */ - -#ifndef __IFMO_DISTRIBUTED_CLASS_PA1__H -#define __IFMO_DISTRIBUTED_CLASS_PA1__H - -/* %1d - local id, %5d - PID, e.g. - * Process 1 (pid 12341, parent 12340) has STARTED\n - */ -static const char * const log_started_fmt = - "Process %1d (pid %5d, parent %5d) has STARTED\n"; - -static const char * const log_received_all_started_fmt = - "Process %1d received all STARTED messages\n"; - -static const char * const log_done_fmt = - "Process %1d has DONE its work\n"; - -static const char * const log_received_all_done_fmt = - "Process %1d received all DONE messages\n"; - -#endif // __IFMO_DISTRIBUTED_CLASS_PA1__H @@ -46,6 +46,7 @@ int main(int argc, char* argv[]) { for( i = 1; i <= info.x; i++ ) { int balance = atoi(argv[3 + i - 1]); + info.balances[ i ] = balance; } info.proccnt = info.x + 1; @@ -80,11 +81,3 @@ int main(int argc, char* argv[]) { parent_workflow( &info ); return 0; } - -//int main(int argc, char * argv[]) -//{ -// //bank_robbery(parent_data); -// //print_history(all); -// -// return 0; -//} @@ -1,11 +1,45 @@ #include <stdint.h> #include <sys/wait.h> #include <stdlib.h> +#include <string.h> #include "dist.h" #include "ipc.h" #include "banking.h" +void parent_phase4( my_info_t *me, dist_info_t *info ) { + Message msg; + BalanceHistory *b_history; + AllHistory history; + + uint8_t i; + for( i = 1; i <= info->x; i++ ) { + if( !receive( me, i, &msg ) ) { + //TODO: what if s_payload_len more than s_history[i]? + b_history = (BalanceHistory *) msg.s_payload; + memcpy( &history.s_history[i-1], b_history, sizeof( *b_history ) ); + } + } + + history.s_history_len = info->x; + print_history( &history ); +} + +void parent_phase2( my_info_t *me, dist_info_t *info ) { + bank_robbery( me, info->x ); + + Message msg = { + .s_header = { + .s_magic = MESSAGE_MAGIC, + .s_payload_len = 0, + .s_type = STOP, + .s_local_time = get_physical_time() + } + }; + + send_multicast( me, &msg ); +} + void parent_workflow( dist_info_t *info ) { Message msg; uint8_t i, receive_cnt; @@ -30,10 +64,11 @@ void parent_workflow( dist_info_t *info ) { } } } - - //bank_robbery(); + + // PARENT PHASE 2: Bank robbery & send STOP for all childs + parent_phase2( &me, info ); - // PARENT PHASE 2: Receive all DONE messages + // PARENT PHASE 3: Receive all DONE messages for( i = 1; i <= info->x; i++ ) { if( !receive( &me, i, &msg ) ) { if( msg.s_header.s_type != DONE ) { @@ -43,7 +78,10 @@ void parent_workflow( dist_info_t *info ) { } } - // PARENT PHASE 3: Wait for the end of all processes + // PARENT PHASE 4: Receive all BALANCE_HISTORY and print all history + parent_phase4( &me, info ); + + // PARENT PHASE 5: Wait for the end of all processes receive_cnt = info->x; while( receive_cnt != 0 ) { if( (wpid = wait(&status)) > 0 ) { |