/*
  File: LinkedQueue.java

  Originally written by Doug Lea and released into the public domain.
  This may be used for any purposes whatsoever without acknowledgment.
  Thanks for the assistance and support of Sun Microsystems Labs,
  and everyone contributing, testing, and using this code.

  History:
  Date       Who                What
  11Jun1998  dl               Create public version
  25aug1998  dl               added peek
  10dec1998  dl               added isEmpty
  10oct1999  dl               lock on node object to ensure visibility
*/

package linkedqueue;

/**
 * A linked list based channel implementation.
 * The algorithm avoids contention between puts
 * and takes when the queue is not empty. 
 * Normally a put and a take can proceed simultaneously. 
 * (Although it does not allow multiple concurrent puts or takes.)
 * This class tends to perform more efficently than
 * other Channel implementations in producer/consumer
 * applications.
 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
 **/

public class LinkedQueue {

  //@ instance invariant head != null;
  //@ instance invariant last != null;
  //@ instance invariant putLock != null;
  //@ instance invariant waitingForTake >= 0;
  //@ instance invariant \reach(head).has(last);

  /** 
   * Dummy header node of list. The first actual node, if it exists, is always 
   * at head_.next. After each take, the old first node becomes the head.
   **/
  protected /*@ spec_public rep @*/ LinkedNode head_;         

  /**
   * Helper monitor for managing access to last node.
   **/
  protected /*@ spec_public rep @*/ final Object putLock_ = new Object(); 

  /** 
   * The last node of list. Put() appends to list, so modifies last_
   **/
  protected /*@ spec_public rep @*/ LinkedNode last_;         

  /**
   * The number of threads waiting for a take.
   * Notifications are provided in put only if greater than zero.
   * The bookkeeping is worth it here since in reasonably balanced
   * usages, the notifications will hardly ever be necessary, so
   * the call overhead to notify can be eliminated.
   **/
  protected /*@ spec_public rep @*/ int waitingForTake_ = 0;  

  /*@  behavior
     @   assignable head, last, putLock, waitingForTake;
     @   ensures \fresh(head, putLock) && head.next == null;
     @*/
  public /*@ atomic @*/ LinkedQueue() {
	head_ = new LinkedNode(null); 
	last_ = head_;
  }

  /*@ behavior
     @   requires x != null;
     @   assignable last, last.next;
     @   ensures last.value == x && \fresh(last);
     @*/
  protected /*@ atomic @*/ void insert(Object x) { 
	synchronized(putLock_) {
	  LinkedNode p = new LinkedNode(x);
	  synchronized(last_) {
		last_.next = p;
		last_ = p;
	  }
	  if (waitingForTake_ > 0)
		putLock_.notify();
	}
  }

  /** Main mechanics for take/poll **/
  
  /*@ behavior
     @   assignable head, head.next.value;
     @   ensures \result == null || (\exists LinkedNode n;
     @        \old(\reach(head)).has(n); n.value == \result && !(\reach(head).has(n)));
     @*/
  protected /*@ atomic @*/ synchronized Object extract() {
	synchronized(head_) {
	  Object x = null;
	  LinkedNode first = head_.next;
	  if (first != null) {
		x = first.value;
		first.value = null;
		head_ = first; 
	  }
	  return x;
	}
  }

  /*@ public behavior
     @    requires \thread_safe(x) && x != null;
     @    ensures last.value == x;
     @also
     @ public behavior
     @    requires \thread_safe && x == null;
     @    signals (Exception e) e instanceof IllegalArgumentException;
     @*/
  public /*@ atomic @*/ void put(Object x) {
	if (x == null) throw new IllegalArgumentException();
	insert(x); 
  }

  /*@ public behavior
     @   locks this.putLock_;
     @   when !this.isEmpty();
	 @   ensures \result == \old(head.next.value) && \old(head.next).value == null;
	 @   signals (Exception ie) ie instanceof InterruptedException;
     @*/
  public /*@ atomic @*/ Object take() {
	Object x = extract();
	if (x != null)
	  return x;
	else { 
	  synchronized(putLock_) {
		try {
		  ++waitingForTake_;
		  for (;;) {
			/*@ commit: @*/ x = extract();
			if (x != null) {
			  --waitingForTake_;
			  return x;
			}
			else {
			  putLock_.wait(); 
			}
		  }
		}
		catch(InterruptedException ex) { 
		  --waitingForTake_; 
		  putLock_.notify();
		  throw new RuntimeException(); 
		}
	  }
	}
  }

  /*@ public behavior
     @   requires head.next != null;
     @   ensures \result == head.next;
     @also
     @ public behavior
     @   requires head.next == null;
     @   ensures \result == null;
     @*/
  public /*@ atomic @*/ Object peek() {
	synchronized(head_) {
	  LinkedNode first = head_.next;
	  if (first != null) 
		return first.value;
	  else 
		return null;
	}
  }    

  /*@ public behavior
     @   ensures \result <==> head.next == null;
     @*/
  public /*@ atomic @*/ boolean isEmpty() {
	synchronized(head_) {
	  return head_.next == null;
	}
  }    

}


