import java.util.*; /** software library */ public class comm { private int port ; /* port on which client will send */ private int ID ; /* unique id . will get from the server at initialization*/ private int CTR; /* to count number of messages sent . will increase monotonically */ private int MID; /* Message id. used for tracking sequence of messages : abcast,abcast_ack,abcast2 */ final static int READ=1; final static int WRITE=2; final static int QUERYRESULT=-1; final static int ABCAST=0; final static int ABCAST_ACK=1; final static int ABCAST2=2; final static int PROBE=3; final static int PROBE_ACK=4; final static int VIEWREQUEST=1111; final static int VIEWRESULT=2222; final static int UNDELIVERABLE=0; final static int DELIVERABLE=1; final static int CLIENT=0; final static int SERVER=1; private Abcast a; private Message m,mab2,newm; private AbcastAck aack; private Abcast2 a2,ab2; private CoreMessage cm; private String v,u,s; Object o; private Hashtable causalCheck; /* contains client_id and last message delivered -- both string objects */ private Hashtable causalQueue ; /* contains "client_id + messagenum" and Message object */ private TreeMap abcastQueue ; /* contains "priority + client_id " and Message object */ private Hashtable abcastAckSet ; /* contains abcast_ack received by the clients sent by the servers */ /* contains "client_id + message_ID" and SetObject */ SetObject so; /* contains priority, HashSet containing unique identity String */ /* each client will have one */ /* an entry will be added when an Abcast message is sent */ HashSet viewset; Hashtable viewcheck; Hashtable viewpriority; /* synchronised methods --- producer consumer */ DataCA dca; /* queue , item produced by comm , consumed by comm */ DataAC dac; /* queue , item produced by App , consumed by comm */ DataCT dct; /* queue , item produced by comm , consumed by transport */ DataTC dtc; /* queue , item produced by transport , consumed by comm */ /* initialization of comm module */ /* this module is called by the application layer */ public comm(int ID, int port, int X, int ReceiverType, DataAC dac, DataCA dca) // int X is boolean to differentiate between client and server. { this.ID=ID; /* initialise this client/server with a unique identitification */ this.CTR=1; this.MID=1; causalCheck = new Hashtable(); causalQueue = new Hashtable(); abcastQueue = new TreeMap(new CompareString()); abcastAckSet = new Hashtable(); this.dac=dac; this.dca=dca; dtc=new DataTC(); dct=new DataCT(); viewset=new HashSet(); viewcheck = new Hashtable(); viewpriority = new Hashtable(); Thread sender=new Thread(new Send(port,dct)); /* instantiate the sender , which will open the send port */ /* instantiate the receiver , which will open the receive port at address (port + 1) from listening */ /* threee types of receivers : */ /* point to point receive ,multicast receive , and both multicast and point to point */ if (ReceiverType==0) { Thread t=new Thread(new ReceiveP2P((port+1),dtc)); t.start(); } else if (ReceiverType==1) { Thread t=new Thread(new ReceiveMulticast(dtc)); t.start(); } else { Thread t1=new Thread(new ReceiveP2P((port+1),dtc)); Thread t2=new Thread(new ReceiveMulticast(dtc)); t1.start(); t2.start(); } sender.start(); /* start sender thread : single port for all types ( multicast and point to point ) */ /* these threads start two threads of the commm module */ /* one will consume from the buffer produced by the application layer */ /* other will consume from the buffer produced by the transport layer below */ if (ReceiverType==0) { this.init(); //for client only! } Thread cfa=new Thread(new ConsumeFromApp(dac,this)); cfa.start(); Thread cfd=new Thread(new ConsumeFromT(dtc,this)); cfd.start(); } /* ----------------------------------------------------------------- */ public synchronized int getCounterValue() { return CTR++; /* vector time of the process */ /* increments every time a message is sent */ } /* ----------------------------------------------------------------- */ public synchronized int getMessageID() { return MID++; /* message identification */ } /* ----------------------------------------------------------------- */ /* Application layer will issue a read request,by passing an instance of CoreMessage to the comm layer */ public void processMessage(CoreMessage cm) { try { // Debug.dump("Processing CoreMessage "); if(cm.getType() == READ) { /* these lines are added just to change the order of messages */ /* ------------------------------------------------------------------------- int qwc = this.getCounterValue(); if (qwc == 2) { qwc = 3 ; } else if (qwc == 3) { qwc = 2 ; }; int qwm = this.getMessageID(); if (qwm == 2) { qwm = 3 ; } else if (qwm == 3) { qwm = 2 ; }; ------------------------------------------------------------------------- */ a=new Abcast(cm,READ); m=new Message(ID,ABCAST,this.getMessageID(),0,a,this.getCounterValue(),-1,0); // UNCOMENT IT LATER /* ------------------------------------------------------------------------- m=new Message(ID,ABCAST,qwm,0,a,qwc,-1,0); // comment it later ------------------------------------------------------------------------- */ HashSet tempviewset=(HashSet)viewset.clone(); Iterator itr=viewset.iterator(); while(itr.hasNext()) { tempviewset.add(itr.next()); } viewcheck.put(""+m.getmessageid(),tempviewset); viewpriority.put(""+m.getmessageid(),""+0); /* populate the hash set , for later processing */ dct.Produce(m); } else /* ) */ { /* include WRITE OPERATION */ } } catch(Exception e) { System.out.println("ProcessMessage: "+e); } } /** New method added to enable server to send independent messages*/ void processServerMessage(Message m) { dct.Produce(m); } /* ------------------------------------------------------------- */ /* read write primitives invoked by this layer and passed to the lower layer */ /** server executes this methods */ public void processAbcast(Message m) { try { /* Phase -I check for causal order */ // Debug.dump("Causal Check"); if (causalCheck.containsKey(""+m.getmessagesource())) { int x = Integer.parseInt((String)causalCheck.get(""+m.getmessagesource())); if( x == m.getmessagenum() - 1) { // System.out.println( "message in order "); causalCheck.put(""+m.getmessagesource(),""+(x+1)); // Phase -II Atomicity and total ordering ((Abcast)m.getmessageobj()).setmessagestatus(UNDELIVERABLE); u = new String (0+"A"+m.getmessagesource()+"A"+m.getmessageid()); // 0 stands for initial proiority , which will be changed later abcastQueue.put(u,m); // change it later // System.out.println(abcastQueue.size()); sendAbcastAck(m,this.getCounterValue()); //check list of waiting messages in causal_queue check(m); // recursive call } else { //System.out.println( "message out of order "); causalQueue.put(""+m.getmessagesource()+"A"+m.getmessagenum(),m); } } else { causalCheck.put(""+m.getmessagesource(),""+m.getmessagenum()); // Phase -II Atomicity and total ordering //second phase starts for the first message recived from a client ((Abcast)m.getmessageobj()).setmessagestatus(UNDELIVERABLE); u = new String (0 +"A"+ m.getmessagesource()+"A"+m.getmessageid()); // System.out.println(u); abcastQueue.put(u,m); // change it later sendAbcastAck(m,this.getCounterValue()); } } catch(Exception e) { System.out.println("processAbcast :"+e); } } /* ------------------------------------------------------------- */ public void check(Message m) { try { // System.out.println("causal queue check entered "); if(causalQueue.containsKey(m.getmessagesource()+"A"+(m.getmessagenum()+1))) { // System.out.println("if of causal check entered "); causalCheck.put(""+m.getmessagesource(),""+(m.getmessagenum()+1)); //second phase starts Message temp=(Message)causalQueue.get(m.getmessagesource()+"A"+(m.getmessagenum() +1 )); ((Abcast)temp.getmessageobj()).setmessagestatus(UNDELIVERABLE); u = new String (0+"A"+temp.getmessagesource()+"A"+temp.getmessageid()); // System.out.println("check function"+u); abcastQueue.put(u ,temp); // check this later sendAbcastAck(temp,this.getCounterValue()); check(temp); } else { return; } } catch(Exception e) { System.out.println("Error in check :"+e); } } /* ------------------------------------------------------------- */ public void processAbcast2(Message m) { try { /* Phase -I check for causal order */ System.out.println("abcast2 received"); if (causalCheck.containsKey(""+m.getmessagesource())) { int x = Integer.parseInt((String)causalCheck.get(""+m.getmessagesource())); // System.out.println("processab2 "+x); if( x == (m.getmessagenum() - 1)) { causalCheck.put(""+m.getmessagesource(),""+(x+1)); // Phase -2 Atomicity and total ordering v = new String(0+"A"+m.getmessagesource()+"A"+m.getmessageid()); /* 0 stands for priority */ /* v = new String( ((Abcast)newm.getmessageobj()).getpriority() + "A"+ m.getmessagesource() + "A" + m.getmessageid() ); */ //System.out.println(abcastQueue.containsKey(v)); //System.out.println(abcastQueue.size()+"Size in pab2"); //System.out.println(v+abcastQueue.containsKey(v)); newm = (Message)abcastQueue.remove(v); // System.out.println(v+abcastQueue.containsKey(v)); //System.out.println("enter1"); ((Abcast)newm.getmessageobj()).setpriority( ((Abcast2)m.getmessageobj()).getpriority()); //System.out.println("enter2"); ((Abcast)newm.getmessageobj()).setmessagestatus(DELIVERABLE); //System.out.println("enter3"); abcastQueue.put(v,newm); //System.out.println("enter4"); processAbcastQueue(); } else { causalQueue.put(""+m.getmessagesource()+"A"+m.getmessagenum(),m); } } else { causalCheck.put(""+m.getmessagesource(),""+m.getmessagenum()); //second phase starts for the first message recived from a client v = new String(0+"A"+ m.getmessagesource() + "A" + m.getmessageid()); newm = (Message)abcastQueue.remove(v); ((Abcast)newm.getmessageobj()).setpriority( ((Abcast2)m.getmessageobj()).getpriority()); ((Abcast)newm.getmessageobj()).setmessagestatus(DELIVERABLE); /* v = new String( ((Abcast)newm.getmessageobj()).getpriority() + "A"+ m.getmessagesource() + "A" + m.getmessageid() ); */ abcastQueue.put(v,newm); processAbcastQueue(); } } catch(Exception e) { System.out.println("processAbcast2 :"+e); } } public synchronized void processAbcastQueue() { try { /* to deliver message whose status bit is "DELIVERABLE" */ boolean flag = true; Message dm; while (flag) { // Debug.dump(" this abcast2 is going to be sent to the app layer "); //System.out.println(abcastQueue.size()+"abcast que"); if(!abcastQueue.isEmpty()) { dm = (Message) abcastQueue.get(abcastQueue.lastKey()); // System.out.println("here"); if( ((Abcast)dm.getmessageobj()).getmessagestatus() == DELIVERABLE ) { // dm = (Message) abcastQueue.remove(abcastQueue.lastKey()); //System.out.println("enter 1"); dca.Produce((Message) abcastQueue.remove(abcastQueue.lastKey()) ); // System.out.println("Message delivered to App layer"+abcastQueue.size()); // flag=false; //#####################JEKKIN#############IS THIS CORRECT } else //potential problem { flag=false; //System.out.println("problemo"); } } else { flag=false; } } } catch(Exception e) { System.out.println("processAbcastQueue :"+e); } } /* ------------------------------------------------------------- */ public void sendAbcastAck(Message m,int priority) { try { // System.out.println("Abcast_aCk sent"); AbcastAck aack = new AbcastAck(priority); Message mack = new Message(ID,ABCAST_ACK,m.getmessageid(),m.getgid(),aack,priority,m.getmessagesource(),-1); /* here priority and CTR will have the same value */ dct.Produce(mack); // Debug.dump(DataSR_comm.Comm_SR," Abcast Ack added to queue "); /* send this message to only one source */ } catch(Exception r) { System.out.println("sendAbcastAck :"+r); } } public void processAbcastAck(Message m) { int currentp,ackp,currentmax; try { // s = new String(m.getmessagesource()+"A"+m.getmessageid()); // PROBLEM AREA System.out.println("AbcastAck received-Viewsetsize: "+viewset.size()); ((HashSet)viewcheck.get(""+m.getmessageid())).remove(""+m.getmessagesource()); currentp=Integer.parseInt((String)viewpriority.get(""+m.getmessageid())); ackp=((AbcastAck)(m.getmessageobj())).getpriority(); viewpriority.put(""+m.getmessageid(),""+Math.max(currentp,ackp)); currentmax=Math.max(currentp,ackp); if(((HashSet)viewcheck.get(""+m.getmessageid())).size()==0) { Abcast2 ab2 = new Abcast2(currentmax); ab2 = new Abcast2(1); // 1 is the priority mab2 = new Message(ID,ABCAST2,m.getmessageid(),0,ab2,this.getCounterValue(),-1,0); //gid is hard-coded as 0 System.out.println("ABCAST2 sent"); dct.Produce(mab2); // Debug.dump(DataSR_comm.Comm_SR, " after adding ABCAST2 to comm->SR queue "); } } catch(Exception e) { System.out.println("processAbacstAck :"+e); } } void init() { boolean ontime=true; m = new Message(ID,VIEWREQUEST,0,0,"VIEWREQUEST",0,-1,0); dct.Produce(m); try { while(ontime || !dtc.TC.isEmpty()) { Message m=(Message)dtc.Consume(); processViewResult(m); Thread.sleep(2000); ontime=false; } } catch(Exception e) { System.out.println(e); } } public void processProbe(Message m) { } public void processProbeAck(Message m) { } public void processViewRequest(Message m) { m.setmessagetype(VIEWRESULT); m.setmessagedest(m.getmessagesource()); m.setmessagesource(this.ID); m.setmessagegrp(-1); dct.Produce(m); } public void processViewResult(Message m) { viewset.add(""+m.getmessagesource()); System.out.println(viewset.size()); } }