TSLL.java
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 |
/* Thread safe singly linked list that always contains a dummy item in front. Maintains a front and a back ref (to first and last item). Allows concurrent push and pop (list items are locked individually, so if back ref!= front ref, then push and pop can happen in parallel). Supports isEmpty and wait_pop, tested via Consumer/Producer. */ class TSItem<T> { public T data=null; public TSItem<T> next; } class TSQueue<T> { private TSItem<T> front=null; private TSItem<T> back=null; public TSQueue() { back=new TSItem<T>(); //at least one dummy element is always in the list! front=back; back.next=back; //end always points to itself (data=null) } public T pop_front() throws InterruptedException { synchronized(front) //bock front (may also block back), so as not to race on front/front.data/front.next { //front=front.next; //moved further down to avoid race T data=front.next.data; // get data (current front is dummy) front.next.data=null; // invalidate data (this item might be last and be fetched again) front=front.next; //current front is old or dummy element, move to next item, must be last (next is not locked) return data; } } public void push_back(T data) throws InterruptedException { //create new item (doesn't need lock..do as little inside lock as possible) TSItem<T> end=new TSItem<T>(); end.data=data; end.next=end; synchronized(this) //only required for wait_pop() { //block back (may also block front), so as not to race on back/back.next synchronized(back) { back.next=end; //fix next of new pre last back=end; //store new end } notify(); //tell the world that we have new items } } //additional functionality, for push based interaction public boolean isEmpty() { return front==front.next; } public T wait_pop() throws InterruptedException { while(true)//must retry after notify, someone else might "steal" the item we were notified on, so impl is not "fair" { synchronized(this) //no adding items in here (so that we don't miss notifies send after pop_front but before wait, below) { T data=pop_front(); if(data!=null) return data; else wait(); } } } } class Producer extends Thread { TSQueue<Integer> queue; int count; Producer(TSQueue<Integer> queue,int count) { this.queue=queue; this.count=count; } public void run() { try { for(int i=0;i<count;i++) { Thread.sleep((long)(Math.random()*5+10)); queue.push_back(i); System.out.println("p("+getId()+") "+i); } } catch(InterruptedException ie) {} } } class Consumer extends Thread { TSQueue<Integer> queue; int count; Consumer(TSQueue<Integer> queue,int count) { this.queue=queue; this.count=count; } public void run() { try { for(int i=0;i<count;i++) { Thread.sleep((long)(Math.random()*5+10)); int v=queue.wait_pop(); System.out.println("c("+getId()+") "+v); } } catch(InterruptedException ie) {} } } public class TSLL { public static void main(String args[]) throws InterruptedException { TSQueue<Integer> queue = new TSQueue<Integer>(); Producer p1=new Producer(queue,100);//concurrent push Producer p2=new Producer(queue,100); Consumer c1=new Consumer(queue,100);//concurrent pop Consumer c2=new Consumer(queue,100); c1.start(); c2.start(); p1.start(); p2.start(); c1.join(); c2.join(); } } |