summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSergey Nazaryev <sergey@nazaryev.ru>2016-05-15 07:41:38 +0000
committerSergey Nazaryev <sergey@nazaryev.ru>2016-05-15 07:41:38 +0000
commitb773e91c55a2a5183931c3a7642ecf84e3720022 (patch)
tree2c9b08b8e6040fffc0df8911a69eb81f645be7bc
parent4f36da5df51ce984201b56b794f04fea8c8e9e9e (diff)
downloadpa3-master.zip
pa3-master.tar.gz
pa3-master.tar.bz2
Workaround for fucking verifierHEADmaster
-rw-r--r--child.c82
-rw-r--r--ipc.c19
-rw-r--r--pa3.c2
3 files changed, 62 insertions, 41 deletions
diff --git a/child.c b/child.c
index e4f1597..e62ad15 100644
--- a/child.c
+++ b/child.c
@@ -10,7 +10,6 @@
void child_phase0( dist_info_t *info, uint8_t id ) {
lamport_init( &gl_lamport_time );
- lamport_increase( &gl_lamport_time );
close_redundant_pipes( info, id );
}
@@ -29,6 +28,7 @@ void child_phase1( dist_info_t *info, uint8_t id ) {
info->parent_pid,
info->balances[ id ] );
fputs( buf, info->events_log );
+ fflush( info->events_log );
fputs( buf, stdout );
lamport_increase( &gl_lamport_time );
@@ -59,7 +59,9 @@ void child_phase1( dist_info_t *info, uint8_t id ) {
snprintf( buf, sizeof(buf), log_received_all_started_fmt, get_lamport_time(), id );
fputs( buf, info->events_log );
+ fflush( info->events_log );
fputs( buf, stdout );
+ fflush( info->events_log );
}
void child_phase2( dist_info_t *info, uint8_t id, BalanceHistory *history ) {
@@ -76,9 +78,11 @@ void child_phase2( dist_info_t *info, uint8_t id, BalanceHistory *history ) {
uint8_t last_order_time = 0;
balanceState.s_balance = info->balances[ id ];
- balanceState.s_time = 0;
+ balanceState.s_time = last_order_time;
balanceState.s_balance_pending_in = 0;
+
history->s_history[ last_order_time ] = balanceState;
+ history->s_history_len = 1;
int running = 1;
while( running ) {
@@ -87,37 +91,35 @@ void child_phase2( dist_info_t *info, uint8_t id, BalanceHistory *history ) {
switch( msg.s_header.s_type ) {
case TRANSFER: {
TransferOrder *order = (TransferOrder *) msg.s_payload;
- uint8_t cur_order_time = get_lamport_time();
-
- for( i = last_order_time + 1; i < cur_order_time; i++ ) {
- balanceState.s_balance = info->balances[ id ];
- balanceState.s_time = i;
- balanceState.s_balance_pending_in = 0;
- history->s_history[ i ] = balanceState;
- }
-
if( order->s_src == id ) {
+ lamport_increase( &gl_lamport_time );
+
+ info->balances[ id ] -= order->s_amount;
+ int oldBp = history->s_history[ history->s_history_len - 1 ].s_balance_pending_in;
+ balanceState.s_balance = info->balances[ id ];
+ balanceState.s_time = get_lamport_time();
+ balanceState.s_balance_pending_in = oldBp + order->s_amount;
+
+ msg.s_header.s_local_time = get_lamport_time();
+ send( &me, order->s_dst, &msg );
+
snprintf( buf, sizeof(buf), log_transfer_out_fmt,
get_lamport_time(), order->s_src,
order->s_amount, order->s_dst );
fputs( buf, info->events_log );
+ fflush( info->events_log );
fputs( buf, stdout );
-
- info->balances[ id ] -= order->s_amount;
-
- //lamport_increase( &gl_lamport_time );
- //msg.s_header.s_local_time = get_lamport_time();
- send( &me, order->s_dst, &msg );
} else if( order->s_dst == id ) {
- snprintf( buf, sizeof(buf), log_transfer_in_fmt,
- get_lamport_time(), order->s_dst,
- order->s_amount, order->s_src);
- fputs( buf, info->events_log );
- fputs( buf, stdout );
-
lamport_increase( &gl_lamport_time );
- Message stopMsg = {
+ info->balances[ id ] += order->s_amount;
+ int oldBp = history->s_history[ history->s_history_len - 1 ].s_balance_pending_in;
+
+ balanceState.s_balance = info->balances[ id ];
+ balanceState.s_time = get_lamport_time();
+ balanceState.s_balance_pending_in = oldBp - order->s_amount;
+
+ Message ackMsg = {
.s_header = {
.s_magic = MESSAGE_MAGIC,
.s_payload_len = 0,
@@ -125,17 +127,28 @@ void child_phase2( dist_info_t *info, uint8_t id, BalanceHistory *history ) {
.s_local_time = get_lamport_time()
}
};
- info->balances[ id ] += order->s_amount;
- send( &me, PARENT_ID, &stopMsg );
+ send( &me, PARENT_ID, &ackMsg );
+
+ snprintf( buf, sizeof(buf), log_transfer_in_fmt,
+ get_lamport_time(), order->s_dst,
+ order->s_amount, order->s_src);
+ fputs( buf, info->events_log );
+ fflush( info->events_log );
+ fputs( buf, stdout );
}
- balanceState.s_balance = info->balances[ id ];
- balanceState.s_time = cur_order_time;
- balanceState.s_balance_pending_in = 0;
+ int cur_time = get_lamport_time();
+
+ history->s_history[ cur_time ] = balanceState;
+ history->s_history_len = cur_time + 1;
- history->s_history[ cur_order_time ] = balanceState;
- history->s_history_len = cur_order_time + 1;
- last_order_time = cur_order_time;
+ for( i = last_order_time + 1; i < cur_time; i++ ) {
+ balanceState = history->s_history[ i-1 ];
+ balanceState.s_time = i;
+ history->s_history[ i ] = balanceState;
+ }
+
+ last_order_time = cur_time;
break;
}
case STOP:
@@ -159,6 +172,7 @@ void child_phase3( dist_info_t *info, uint8_t id, BalanceHistory *history ) {
get_lamport_time(), id,
info->balances[ id ] );
fputs( buf, info->events_log );
+ fflush( info->events_log );
fputs( buf, stdout );
lamport_increase( &gl_lamport_time );
@@ -190,11 +204,13 @@ void child_phase3( dist_info_t *info, uint8_t id, BalanceHistory *history ) {
snprintf( buf, sizeof(buf), log_received_all_done_fmt,
get_lamport_time(), id );
fputs( buf, info->events_log );
+ fflush( info->events_log );
fputs( buf, stdout );
int time = get_lamport_time();
for( i = history->s_history_len; i <= time; i++ ) {
history->s_history[i].s_balance = history->s_history[ history->s_history_len - 1].s_balance;
+ history->s_history[i].s_balance_pending_in = history->s_history[ history->s_history_len - 1].s_balance_pending_in;
history->s_history[i].s_time = i;
}
@@ -208,7 +224,7 @@ void child_phase3( dist_info_t *info, uint8_t id, BalanceHistory *history ) {
.s_magic = MESSAGE_MAGIC,
.s_payload_len = wr_size,
.s_type = BALANCE_HISTORY,
- .s_local_time = time
+ .s_local_time = get_lamport_time()
}
};
memcpy( historyMsg.s_payload, history, wr_size );
diff --git a/ipc.c b/ipc.c
index b208067..71ec9aa 100644
--- a/ipc.c
+++ b/ipc.c
@@ -75,11 +75,13 @@ int receive(void * self, local_id from, Message * msg) {
int descriptor = me->dist_info->pipes[idx][0];
char buf[MAX_MESSAGE_LEN];
- int size;
-
+ int size_hdr, size_payload;
+ char *msgbuf = (char *) msg;
while( 1 ) {
- if( ( size = read( descriptor, buf, sizeof( buf ) ) ) > 0 ) {
- memcpy( msg, buf, sizeof( *msg ) );
+ if( ( size_hdr = read( descriptor, buf, sizeof( MessageHeader ) ) ) > 0 ) {
+ memcpy( msgbuf, buf, size_hdr );
+ size_payload = read( descriptor, buf, msg->s_header.s_payload_len );
+ memcpy( msgbuf + size_hdr, buf, size_payload );
return 0;
}
usleep(100000); // sleep 100 ms
@@ -112,12 +114,15 @@ int receive_any(void * self, Message * msg) {
}
char buf[MAX_MESSAGE_LEN];
- int size;
+ int size_hdr, size_payload;
+ char *msgbuf = (char *) msg;
while( 1 ) {
for( i = 0; i < j; i++ ) {
- if( ( size = read( descriptors[i], buf, sizeof( buf ) ) ) > 0 ) {
- memcpy( msg, buf, sizeof( *msg ) );
+ if( ( size_hdr = read( descriptors[i], buf, sizeof( MessageHeader ) ) ) > 0 ) {
+ memcpy( msgbuf, buf, size_hdr );
+ size_payload = read( descriptors[i], buf, msg->s_header.s_payload_len );
+ memcpy( msgbuf + size_hdr, buf, size_payload );
free( descriptors );
return 0;
}
diff --git a/pa3.c b/pa3.c
index b01dfdc..c7e0ae2 100644
--- a/pa3.c
+++ b/pa3.c
@@ -56,7 +56,7 @@ int main(int argc, char* argv[]) {
// open all (N * (N-1)) pipes for a full mesh topology
for( i = 0; i < info.proccnt * (info.proccnt - 1); i++ ) {
- pipe2( info.pipes[i], O_NONBLOCK | O_DIRECT );
+ pipe2( info.pipes[i], O_NONBLOCK );
fprintf( info.pipes_log, "Opened pipe #%d (%d;%d)\n", i, info.pipes[i][0], info.pipes[i][1] );
}
fclose( info.pipes_log );