import java.util.*; /** software library */ public class comm implements Runnable { private int ID ; /* unique id */ private int CTR; /* to count number of messages sent . will increase monotonically */ private int MID; /* Message id. used for tracking sequence of messages : abcast,abcast_ack,abcast2 */ final static int READ=1; final static int WRITE=2; final static int ABCAST=0; final static int ABCAST_ACK=1; final static int ABCAST2=2; final static int PROBE=3; final static int PROBE_ACK=4; final static int UNDELIVERABLE=0; final static int DELIVERABLE=1; private int[] CLIENT_CTR; private int[] SERVER_ID; private Abcast a; private Message m; private AbcastAck aack; private Abcast2 a2; private CoreMessage cm; private Sender s; private Receiver r; private Hashtable causalCheck = new Hashtable(); /* contains client_id and last message delivered -- both string objects */ private Hashtable causalQueue = new Hashtable(); /* contains "client_id + messagenum" and Message object */ private Hashtable abcastQueue = new Hashtable(); /* contains client_id and Message object */ private Hashtable abcastAckSet = new Hashtable(); /* contains abcast_ack received by the clients sent by the servers */ /* contains "client_id + message_ID" and SetObject */ public comm(int ID) { this.ID=ID; /* initialise this client/server with a unique identitification */ } public void run() { } /* send ABCAST Message */ public void read(int obj_id, String query ) { cm = new CoreMessage(obj_id,query); a=new Abcast(cm,READ); m=new Message(ID,ABCAST,this.getMessageID(),this.mapGid(obj_id),a,this.getCounterValue()); this.send(m,this.mapGid(obj_id)); } /* ----------------------------------------------------------------- */ public synchronized int getCounterValue() { return CTR++; /* vector time of the process */ /* increments every time a message is sent */ } public synchronized int getMessageID() { return MID++; /* message identification */ } public int mapGid(int obj_id) { return 0; /* should return the group id to which object obj_id is associated */ } public AbstractSet mapGidtoSet(int obj_id) { AbstractSet abs = new AbstractSet(); /* should return all elements of the group */ abs.add(new Integer(1)); abs.add(new Integer(2)); abs.add(new Integer(3)); return abs; } /* ------------------------------------------------------------- */ public void send(Message m, int gid) { //s.sendTo(M); // this mechanism will be determined later. // low level interface. /* this primitive has to differentiate between multicast and unicast. both have gid example Absact send and AbcastAck sent */ } /* ------------------------------------------------------------- */ public void write(CoreMessage cm,int operation) { } /* ------------------------------------------------------------- */ /* this will receive all messages and prcess them apropriately */ public void receive(Message m) { switch (m.getmessagetype()) { case ABCAST:{ processAbcast(m);}break; case ABCAST_ACK:{ processAbcastAck(m);}break; case ABCAST2: {processAbcast2(m);} break; case PROBE: {processProbe(m);} break; case PROBE_ACK: {processProbeAck(m);} break; } } /* ------------------------------------------------------------- */ /** server executes this methods */ public void processAbcast(Message m) { /* Phase -I check for causal order */ if (causalCheck.containsKey(""+m.getmessagesource())) { int x = Integer.parseInt((String)causalCheck.get(""+m.getmessagesource())); if( x == m.getmessagenum() - 1) { causalCheck.put(""+m.getmessagesource(),""+(x+1)); /* Phase -II Atomicity and total ordering */ ((Abcast)m.getmessageobj()).setmessagestatus(UNDELIVERABLE); abcastQueue.put(""+m.getmessagesource(),m); sendAbcastAck(m,this.getCounterValue()); //check list of waiting messages in causal_queue check(m); /* recursive call */ } else { causalQueue.put(""+m.getmessagesource()+"#"+m.getmessagenum(),m); } } else { causalCheck.put(""+m.getmessagesource(),""+m.getmessagenum()); /* Phase -II Atomicity and total ordering */ //second phase starts for the first message recived from a client ((Abcast)m.getmessageobj()).setmessagestatus(UNDELIVERABLE); abcastQueue.put(""+m.getmessagesource(),m); sendAbcastAck(m,this.getCounterValue()); } } /* ------------------------------------------------------------- */ public void processAbcast2(Message m) { /* Phase -I check for causal order */ if (causalCheck.containsKey(""+m.getmessagesource())) { int x = Integer.parseInt((String)causalCheck.get(""+m.getmessagesource())); if( x == m.getmessagenum() - 1) { causalCheck.put(""+m.getmessagesource(),""+(x+1)); /* Phase -2 Atomicity and total ordering */ /* data structure to be defined */ /* ((Abcast)m.getmessageobj()).setmessagestatus(DELIVERABLE); abcastQueue.put(""+m.getmessagesource(),m); */ } else { causalQueue.put(""+m.getmessagesource()+"#"+m.getmessagenum(),m); } } else { /* redo it */ causalCheck.put(""+m.getmessagesource(),""+m.getmessagenum()); //second phase starts for the first message recived from a client ((Abcast)m.getmessageobj()).setmessagestatus(UNDELIVERABLE); abcastQueue.put(""+m.getmessagesource(),m); sendAbcastAck(m,CTR); CTR++; } } /* ------------------------------------------------------------- */ public void sendAbcastAck(Message m,int priority) { AbcastAck aack = new AbcastAck(priority); Message mack = new Message(ID,ABCAST_ACK,m.getmessageid(),m.getgid(),aack,priority /*CTR*/); /* here priority and CTR will have the same value */ send(mack,m.getmessagesource()); /* send this message to only one source */ } public void processAbcastAck(Message m) { } public void sendAbcast2(Message m) { } public void processProbe(Message m) { } public void processProbeAck(Message m) { } /* ------------------------------------------------------------- */ public void check(Message m) { /* i think m.getmessageid() should be incremented by 1 */ if(causalQueue.contains(m.getmessagesource()+"#"+(m.getmessagenum()+1))) { causalCheck.put(""+m.getmessagesource(),""+(m.getmessagenum()+1)); //second phase starts Message temp=(Message)causalQueue.get(m.getmessagesource()+"#"+(m.getmessagenum() +1 )); ((Abcast)temp.getmessageobj()).setmessagestatus(UNDELIVERABLE); abcastQueue.put(""+temp.getmessagesource(),temp); // check this later sendAbcastAck(temp,this.getCounterValue()); check(temp); } else { return; } } /* ------------------------------------------------------------- */ }