/* Hust * 2001 * Channel Independent Fair Queueing: * Its error free scheduling algorithm:Worst-case weighted fair Queueing. * Zeng Kai * All of the code for cifq is in this file. Make sure to * add cifq.o to the Makefile. * See http://www.cs.cmu.edu/~cheeko/wf2q+/ for further details * and references. */ #include #include #include "queue.h" #include "delay.h" #include "ranvar.h" /* helpful functions */ #define max(x,y) (xy)?y:x #define MAXDOUBLE 1000000000000.0 #define DELTA 100.0 /* maximum numbers of flows */ #define MAXFLOWS 5 /* default size of the per-flow queue (in bytes) */ #define DEF_QUEUE_SIZE 10000 /* default flow weight */ #define DEF_FLOW_WEIGHT 0.0 /* when no packet can send ,assume send DELTA on R rate*/ #define R 2000000.0 class LinkDelay; class CIFQ : public Queue { public: CIFQ(); virtual int command(int argc, const char*const* argv); Packet *deque(void); void enque(Packet *pkt); private: Packet *wf2q_deque(void); void wf2q_enque(Packet *pkt); int active(int flowid); /* judge the flow is active or not */ void leave(int flowid); /* when an empty and non-leading session leave,modify other active session's lag */ double sumw(void); /* sum of the weights of the active session */ int cansend(int flowid); /* determine whether packet of flow flowid can be send */ protected: /* flow structure */ struct flowState { PacketQueue q_; /* packet queue associated to the corresponding flow */ int qmaxSize_; /* maximum queue size (in bytes) */ int qcrtSize_; /* current queue size (in bytes) */ double weight_; /* Weight of the flow */ double S_; /* Starting time of flow , not checked for wraparound*/ double F_; /* Ending time of flow, not checked for wraparound */ double lag_; /* identify the difference between real sys and preference sys */ int rate_; /*uniform error rate*/ RandomVariable *ranvar_; /* the underlying random variate generator*/ } fs_[MAXFLOWS]; double V; /* Virtual time , not checked for wraparound!*/ LinkDelay* link_; /* To get the txt time of a packet */ }; static class CIFQClass : public TclClass { public: CIFQClass() : TclClass("Queue/CIFQ") {} TclObject* create(int argc, const char*const* argv) { return (new CIFQ); } } class_cifq; CIFQ::CIFQ() { /* initialize flow's structure */ for (int i = 0; i < MAXFLOWS; ++i) { fs_[i].qmaxSize_ = DEF_QUEUE_SIZE; fs_[i].qcrtSize_ = 0; fs_[i].weight_ = DEF_FLOW_WEIGHT; fs_[i].S_ = 0.0; fs_[i].F_ = 0.0; fs_[i].lag_ = 0.0; //fs_[i].send_ = 1; fs_[i].ranvar_ =NULL; } V = 0.0; link_ = NULL; } /* * entry points from OTcL to set per flow state variables * - $q set-queue-size flow_id flow_queue_size * - $q set-flow-weight flow_id flow_weight * - $q ranvar flowid [new RandomVariable/Uniform] rate * * NOTE: $q represents the discipline queue variable in OTcl. */ int CIFQ::command(int argc, const char*const* argv) { //Tcl& tcl = Tcl::instance(); if (argc == 4) { if (strcmp(argv[1], "set-queue-size") == 0) { int flowId = atoi(argv[2]); if (flowId >= MAXFLOWS) { fprintf(stderr, "CIFQ: Flow id=%d too large; it should be < %d!\n", abort } fs_[flowId].qmaxSize_ = atoi(argv[3]); return (TCL_OK); } else if (strcmp(argv[1], "set-flow-weight") == 0) { int flowId = atoi(argv[2]); if (flowId >= MAXFLOWS) { fprintf(stderr, "CIFQ: Flow id=%d too large; it should be < %d!\n", flowId, MAXFLOWS); abort(); } double flowweight = atof(argv[3]); if (flowweight <= 0) { fprintf(stderr, "CIFQ: Flow Weight=%.4f > 0\n", flowweight); abort(); } fs_[flowId].weight_ = flowweight; return (TCL_OK); } } if (argc==5) { if (strcmp(argv[1], "ranvar") == 0) { int flowId = atoi(argv[2]); if (flowId >= MAXFLOWS) { fprintf(stderr, "CIFQ: Flow id=%d too large; it should be < %d!\n", flowId, MAXFLOWS); abort(); } fs_[flowId].ranvar_ = (RandomVariable*) TclObject::lookup(argv[3]); fs_[flowId].rate_ =atoi(argv[4]); return (TCL_OK); } } if (argc == 3){ if (strcmp(argv[1], "link") == 0) { LinkDelay* del = (LinkDelay*)TclObject::lookup(argv[2]); if (del == 0) { fprintf(stderr, "CIFQ: no LinkDelay object %s\n", argv[2]); return(TCL_ERROR); } // set ptc now link_ = del; return (TCL_OK); } } return (Queue::command(argc, argv)); } /* * identify the flow state * */ int CIFQ::active(int flowid) { if (fs_[flowid].qcrtSize_||(!fs_[flowid].qcrtSize_&&fs_[flowid].lag_<0.0)) return 1; else return 0; } /* * when an empty and non-leading session leave,modify other active session's lag * */ void CIFQ::leave(int flowid) { printf("ok"); int m; for (m=0;m=0.0) leave(m); } } } /* *calculate the sum of the weights of the active session,return the sum * */ double CIFQ::sumw(void) { double sum=0.0; int n; for (n=0;nvalue() : Random::uniform(); return (u >= fs_[flowid].rate_); } /* * Receive a new packet. * * */ void CIFQ::enque(Packet *pkt) { wf2q_enque(pkt); } void CIFQ::wf2q_enque(Packet* pkt) { hdr_cmn* hdr = hdr_cmn::access(pkt); hdr_ip* hip = hdr_ip::access(pkt); int flowId = hip->flowid(); int pktSize = hdr->size(); if (flowId >= MAXFLOWS) { fprintf(stderr, "CIFQ::enqueue-Flow id=%d too large; it should be < %d!\n", flowId, MAXFLOWS); drop(pkt); } /* if buffer full, drop the packet; else enqueue it */ if (fs_[flowId].qcrtSize_ + pktSize > fs_[flowId].qmaxSize_) { /* If the queue is not large enough for this packet */ drop(pkt); } else { if (!fs_[flowId].qcrtSize_) { /* If queue for the flow is empty, calculate start and finish times */ fs_[flowId].S_ = max(V, fs_[flowId].F_); fs_[flowId].F_ = fs_[flowId].S_ + ((double)pktSize/fs_[flowId].weight_); /* the weight_ parameter better not be 0! */ /* update system virutal clock */ double minS = fs_[flowId].S_; for (int i = 0; i < MAXFLOWS; ++i) { if (active(i)) // original wf2q:if (fs_[i].qcrtSize_) if (fs_[i].S_ < minS) minS = fs_[i].S_; } V = max(minS, V); } fs_[flowId].q_.enque(pkt); fs_[flowId].qcrtSize_ += pktSize; } } /* * Dequeue the packet. */ Packet* CIFQ::deque() { Packet *pkt = NULL, *nextPkt; Packet *pktj=NULL, *nextPktj; int i,j; int pktSize; int pktjSize; double minF = MAXDOUBLE; int flow = -1; int maxlagflow= -1; double W = 0.0; double maxlag; /* look for the candidate flow with the earliest finish time */ for (i = 0; i< MAXFLOWS; i++){ if (!active(i)) continue; if (fs_[i].S_ <= V) if (fs_[i].F_ < minF){ flow = i; minF = fs_[i].F_; } } if (flow == -1 || minF == MAXDOUBLE) return (pkt); if (fs_[flow].lag_>=0.0&&cansend(flow)) { pkt = fs_[flow].q_.deque(); pktSize = ((hdr_cmn*)hdr_cmn::access(pkt))->size(); fs_[flow].qcrtSize_ -= pktSize; /* Set the start and the finish times of the remaining packets in the * queue */ nextPkt = fs_[flow].q_.head(); if (nextPkt) { fs_[flow].S_ = fs_[flow].F_; fs_[flow].F_ = fs_[flow].S_ + ((((hdr_cmn*)hdr_cmn::access(nextPkt))->size())/fs_[flow].weight_); /* the weight parameter better not be 0 */ } /* update the virtual clock */ double minS = fs_[flow].S_; for (i = 0; i < MAXFLOWS; ++i) { W += fs_[i].weight_; if (fs_[i].qcrtSize_) if (fs_[i].S_ < minS) minS = fs_[i].S_; } V = max(minS, (V + ((double)pktSize/W))); if (fs_[flow].lag_>=0.0&&!fs_[flow].qcrtSize_) leave (flow); return(pkt); } else /*search for the session that lag most and active and can send*/ { j=0; while(jsize(); fs_[maxlagflow].qcrtSize_ -= pktSize; pkt = fs_[flow].q_.deque(); pktSize = ((hdr_cmn*)hdr_cmn::access(pkt))->size(); fs_[flow].lag_+=pktSize ; fs_[maxlagflow].lag_-=pktjSize; /* update the virtual clock */ double minS = fs_[flow].S_; for (i = 0; i < MAXFLOWS; ++i) { W += fs_[i].weight_; if (fs_[i].qcrtSize_) if (fs_[i].S_ < minS) minS = fs_[i].S_; } V = max(minS, (V + ((double)pktSize/W))); if (!fs_[maxlagflow].qcrtSize_&&fs_[maxlagflow].lag_>+0) leave(maxlagflow); return(pktj); } else/*there is no active session ready to send*/ { /* Set the start and the finish times of the session i */ fs_[flow].S_ = fs_[flow].F_; fs_[flow].F_ = fs_[flow].S_ + (DELTA/fs_[flow].weight_); /* the weight parameter better not be 0 */ /* update the virtual clock */ double minS = fs_[flow].S_; for (i = 0; i < MAXFLOWS; ++i) { W += fs_[i].weight_; if (fs_[i].qcrtSize_) if (fs_[i].S_ < minS) minS = fs_[i].S_; } V = max(minS, (V + (DELTA/W))); pkt = fs_[flow].q_.deque(); pktSize = ((hdr_cmn*)hdr_cmn::access(pkt))->size(); if (fs_[flow].lag_<0.0&&!pktSize)/*flow is leading and unbacklogged*/ /*search for session that lag most and active*/ j=0; while(j=0.0&&!fs_[flow].qcrtSize_) leave (flow); }