diff options
author | Sergey Nazaryev <sergey@nazaryev.ru> | 2016-05-16 13:51:57 +0000 |
---|---|---|
committer | Sergey Nazaryev <sergey@nazaryev.ru> | 2016-05-16 13:51:57 +0000 |
commit | 12e6c17bda6c0245ef50157f7fd55afbd692f2d4 (patch) | |
tree | daec2042c3f5854c401aa9ca4c4ab549a9d3a4b7 | |
parent | b773e91c55a2a5183931c3a7642ecf84e3720022 (diff) | |
download | pa5-12e6c17bda6c0245ef50157f7fd55afbd692f2d4.zip pa5-12e6c17bda6c0245ef50157f7fd55afbd692f2d4.tar.gz pa5-12e6c17bda6c0245ef50157f7fd55afbd692f2d4.tar.bz2 |
Removed unused parts from lab2&3
-rw-r--r-- | bank_robbery.c | 20 | ||||
-rw-r--r-- | banking.c | 39 | ||||
-rw-r--r-- | banking.h | 103 | ||||
-rw-r--r-- | cs.c | 84 | ||||
-rw-r--r-- | queue.c | 87 | ||||
-rw-r--r-- | queue.h | 21 |
6 files changed, 192 insertions, 162 deletions
diff --git a/bank_robbery.c b/bank_robbery.c deleted file mode 100644 index 03bbbbf..0000000 --- a/bank_robbery.c +++ /dev/null @@ -1,20 +0,0 @@ -/** - * @file bank_robbery.c - * @Author Michael Kosyakov and Evgeniy Ivanov (ifmo.distributedclass@gmail.com) - * @date March, 2014 - * @brief Toy implementation of bank_robbery(), don't do it in real life ;) - * - * Students must not modify this file! - */ - -#include "banking.h" - -void bank_robbery(void * parent_data, local_id max_id) -{ - for (int i = 1; i < max_id; ++i) { - transfer(parent_data, i, i + 1, i); - } - if (max_id > 1) { - transfer(parent_data, max_id, 1, 1); - } -} diff --git a/banking.c b/banking.c deleted file mode 100644 index 8ae96ea..0000000 --- a/banking.c +++ /dev/null @@ -1,39 +0,0 @@ -#include <string.h> - -#include "banking.h" -#include "ipc.h" - -#include "dist.h" -#include "lamport.h" - -void transfer(void * parent_data, local_id src, local_id dst, balance_t amount) -{ - my_info_t *me = (my_info_t *) parent_data; - if( me->dist_info == NULL ) - return; - - TransferOrder order = { - .s_src = src, - .s_dst = dst, - .s_amount = amount - }; - - lamport_increase( &gl_lamport_time ); - Message msg = { - .s_header = { - .s_magic = MESSAGE_MAGIC, - .s_payload_len = sizeof( order ), - .s_type = TRANSFER, - .s_local_time = get_lamport_time() - } - }; - memcpy( msg.s_payload, &order, sizeof( order ) ); - - send( me, src, &msg ); - if( !receive( me, dst, &msg ) ) { - lamport_message_handler( &gl_lamport_time, msg.s_header.s_local_time ); - if( msg.s_header.s_type != ACK ) - return; - } - return; -} diff --git a/banking.h b/banking.h deleted file mode 100644 index d4e51b3..0000000 --- a/banking.h +++ /dev/null @@ -1,103 +0,0 @@ -/** - * @file banking.h - * @Author Michael Kosyakov and Evgeniy Ivanov (ifmo.distributedclass@gmail.com) - * @date March, 2014 - * @brief Definitions of data structures and functions related to banking - * - * Students must not modify this file! - */ - -#ifndef __IFMO_DISTRIBUTED_CLASS_BANKING__H -#define __IFMO_DISTRIBUTED_CLASS_BANKING__H - -#include "ipc.h" - -typedef int16_t balance_t; - -/** - * 1. "Main process" sends TransferOrder to process with id=s_src. - * 2. s_src decreases its balance by s_amount and sends TransferOrder to s_dst. - * 3. s_dst increases its balance by s_amount. - * 4. s_dst sends ACK to "main process". - */ -typedef struct { - local_id s_src; ///< transfer from process with this ID - local_id s_dst; ///< transfer to process with this ID - balance_t s_amount; ///< $$$ -} __attribute__((packed)) TransferOrder; - -typedef struct { - balance_t s_balance; - timestamp_t s_time; ///< physical time in PA2 or Lamport's scalar - ///< time in PA3 - balance_t s_balance_pending_in; ///< $$$ sent at t <= s_time, but - ///< received at t > s_time. PA3 only, - ///< in other labs must be 0 -} __attribute__((packed)) BalanceState; - -enum { - MAX_T = 255 ///< max possible value of timestamp returned by get_lamport_time() - ///< or get_physical_time() -}; - -/** - * Describes balance state of process with id=s_id at each time t >= 0 - * and t < s_history_len - */ -typedef struct { - local_id s_id; - uint8_t s_history_len; - BalanceState s_history[MAX_T + 1]; ///< Must be used as a buffer, unused - ///< part of array shouldn't be transfered -} __attribute__((packed)) BalanceHistory; - -/** - * Should contain balance histories of all processes in the distributed system - * except parrent process. - */ -typedef struct { - uint8_t s_history_len; ///< should be equal to the number of children - BalanceHistory s_history[MAX_PROCESS_ID + 1]; -} AllHistory; - -//------------------------------------------------------------------------------ -// Functions below must be implemented by students -//------------------------------------------------------------------------------ - -/** Transfer amount from src to dst. - * - * @param parent_data Any data structure implemented by students to perform I/O - */ -void transfer(void * parent_data, local_id src, local_id dst, balance_t amount); - -//------------------------------------------------------------------------------ -// Functions below are implemented by lector, test implementations are -// provided to students for testing purposes -//------------------------------------------------------------------------------ - -/** Perform a number of transfers between various children with ids [1;max_id] - * - * @param parent_data Any data structure implemented by students to perform I/O, - * will be passed to transfer() - * @param max_id max id of existing process, so that (max_id + 1) is the total - * number of processes - */ -void bank_robbery(void * parent_data, local_id max_id); - -/** - * Returs the value of Lamport's clock. - */ -timestamp_t get_lamport_time(); - -/** Returns physical time. - * - * Emulates physical clock (for each process). - */ -timestamp_t get_physical_time(); - -/** Pretty print for BalanceHistories. - * - */ -void print_history(const AllHistory * history); - -#endif // __IFMO_DISTRIBUTED_CLASS_BANKING__H @@ -0,0 +1,84 @@ +#include "pa2345.h" +#include "queue.h" +#include "dist.h" +#include "lamport.h" + +// TODO: remove global variable +queue_t q = { + .front = NULL, + .rear = NULL +}; + +int request_cs(const void * self) { + my_info_t *me = (my_info_t *) self; + //fprintf( stderr, "REQUEST_CS: Process %d sent message\n", me->id ); + node_t *a; + int replied = 0; + int msg_author = 0; + + lamport_increase( &gl_lamport_time ); + Message msg = { + .s_header = { + .s_magic = MESSAGE_MAGIC, + .s_payload_len = 0, + .s_type = CS_REQUEST, + .s_local_time = get_lamport_time() + } + }; + send_multicast( me, &msg ); + + //fprintf( stderr, "ENQUEUE: Process %d adding to queue\n", me->id ); + enqueue( &q, me->id, get_lamport_time() ); + while( replied < (me->dist_info->workers - 1) || ((a = front(&q)) && a->id != me->id )) { + //fprintf( stderr, "\n\n\nIN CYCLE: Process %d\n (replied = %d, workers = %d)\n\n\n", me->id, replied, me->dist_info->workers-1 ); + if( !receive_any( me, &msg ) ) { + //fprintf( stderr, "RECEIVE_ANY: Process %d\n", me->id ); + lamport_message_handler( &gl_lamport_time, msg.s_header.s_local_time ); + msg_author = me->msg_author; + switch( msg.s_header.s_type ) { + case CS_REQUEST: + //fprintf( stderr, "CS_REQUEST: Process %d received message from %d\n", me->id, msg_author ); + enqueue( &q, msg_author, msg.s_header.s_local_time ); + + lamport_increase( &gl_lamport_time ); + msg.s_header.s_type = CS_REPLY; + msg.s_header.s_local_time = get_lamport_time(); + + send( me, msg_author, &msg ); + break; + case CS_REPLY: + //fprintf( stderr, "\n\n CS_REPLY: Process %d received message from %d\n \n\n", me->id, msg_author ); + replied++; + break; + case CS_RELEASE: + //fprintf( stderr, "CS_RELEASE: Process %d received message from %d\n", me->id, msg_author ); + dequeue( &q ); + break; + case DONE: + //fprintf( stderr, "DONE: Process %d received message from %d\n", me->id, msg_author ); + me->dist_info->workers--; + break; + } + } + } + + return 0; +} + +int release_cs(const void * self) { + my_info_t *me = (my_info_t *) self; + + dequeue( &q ); + + lamport_increase( &gl_lamport_time ); + Message msg = { + .s_header = { + .s_magic = MESSAGE_MAGIC, + .s_payload_len = 0, + .s_type = CS_RELEASE, + .s_local_time = get_lamport_time() + } + }; + send_multicast( me, &msg ); + return 0; +} @@ -0,0 +1,87 @@ +#include <stdlib.h> +#include "queue.h" + +void enqueue(queue_t *q, local_id id, timestamp_t time) { + if( q == NULL ) + return; + + node_t *node = (node_t *) malloc(sizeof(*node)); + node->id = id; + node->time = time; + node->next = NULL; + + if(q->front == NULL && q->rear == NULL){ + q->front = q->rear = node; + return; + } + + node_t *current = q->front; + node_t *previous = NULL; + + while(current != NULL) { + if( current->time > time || + ( current->time == time && id < current->id ) ) { + + node->next = current; + if( previous ) + previous->next = node; + + if( current == q->front ) + q->front = node; + + node = NULL; + break; + } else { + previous = current; + current = current->next; + } + } + + if( node ) { + q->rear->next = node; + q->rear = node; + node = NULL; + } +} + +void dequeue(queue_t *q) { + struct Node* temp = q->front; + if(q->front == NULL) + return; + + if(q->front == q->rear) + q->front = q->rear = NULL; + else + q->front = q->front->next; + + free(temp); +} + +node_t *front(queue_t *q) { + if( q == NULL ) + return NULL; + + return q->front; +} + +queue_t *queue() { + queue_t *q = (queue_t*) malloc(sizeof(*q)); + if( q == NULL ) + return NULL; + + q->front = NULL; + q->rear = NULL; + return q; +} + +void free_queue(queue_t *q) { + if( q == NULL ) + return; + + struct Node* temp = q->front; + while(temp != NULL) { + temp = temp->next; + free(temp); + } + free(q); +} @@ -0,0 +1,21 @@ +#ifndef __IFMO_DISTRIBUTED_CLASS_QUEUE__H +#define __IFMO_DISTRIBUTED_CLASS_QUEUE__H + +#include "ipc.h" + +typedef struct Node { + local_id id; + timestamp_t time; + struct Node* next; +} node_t; + +typedef struct Queue { + struct Node* front; + struct Node* rear; +} queue_t; + +void dequeue(queue_t *q); +void enqueue(queue_t *q, local_id id, timestamp_t time); +node_t *front(queue_t *q); + +#endif |