/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */


package java.util.concurrent;
import java.util.concurrent.locks.*;
import java.util.*;

/**
 * An unbounded {@linkplain BlockingQueue blocking queue} of <tt>Delayed</tt>
 * elements, in which an element can only be taken when its delay has expired.
 * The <em>head</em> of the queue is that <tt>Delayed</tt> element whose delay
 * expired furthest in the past - if no delay has expired there is no head and
 * <tt>poll</tt> will return <tt>null</tt>.
 * This queue does not permit <tt>null</tt> elements.
 * <p>This class implements all of the <em>optional</em> methods
 * of the {@link Collection} and {@link Iterator} interfaces.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../guide/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 */

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
	implements BlockingQueue<E> {

	// Lock held around every access to q in this class; the JML clause below names it as q's monitor.
	private /*@ spec_public rep @*/ transient final ReentrantLock lock = new ReentrantLock();
	// Condition created from lock: awaited by take() and the timed poll, signalled by offer, poll and drainTo.
	private /*@ spec_public rep @*/ transient final Condition available = lock.newCondition();
	// Backing priority queue that actually stores the elements.
	private /*@ spec_public rep @*/ final PriorityQueue<E> q = new PriorityQueue<E>();
	
	//@ monitors_for this.q <- lock;

	/**
	 * Creates a new <tt>DelayQueue</tt> that is initially empty.
	 * The lock, condition and backing priority queue are created by the
	 * field initializers, so the constructor body has nothing to do.
	 */
	public /*@ atomic @*/ DelayQueue() {}

	/**
	 * Creates a <tt>DelayQueue</tt> pre-populated with the elements of the
	 * supplied collection of {@link Delayed} instances.
	 *
	 * @param c the collection whose elements are copied into this queue
	 * @throws NullPointerException if <tt>c</tt> or any element within it
	 * is <tt>null</tt>
	 */
	/*@ public normal_behavior
	   @   requires \thread_safe(c) && c != null;
	   @   ensures (\forall E o; c.contains(o); this.q.contains(o));
	   @*/
	public /*@ atomic @*/ DelayQueue(Collection<? extends E> c) {
		// Bulk insertion is delegated to the inherited addAll.
		addAll(c);
	}

	/**
	 * Inserts the specified element into this delay queue. Waiting
	 * consumers are woken when the queue was empty beforehand or when the
	 * new element orders before the previous head.
	 *
	 * @param o the element to add
	 * @return <tt>true</tt>
	 * @throws NullPointerException if the specified element is <tt>null</tt>.
	 */
	/*@ public normal_behavior
	   @   requires o != null;
	   @   locks lock;
	   @   ensures this.q.contains(o);
	   @also
	   @ public exceptional_behavior
	   @   requires o == null;
	   @   signals (Exception npe) npe instanceof NullPointerException;
	   @*/
	public /*@ atomic @*/ boolean offer(E o) {
		final ReentrantLock guard = this.lock;
		guard.lock();
		try {
			// Capture the head as it was before the insertion.
			final E previousHead = q.peek();
			q.offer(o);
			final boolean headChanged =
				previousHead == null || o.compareTo(previousHead) < 0;
			if (headChanged) {
				available.signalAll();
			}
			return true;
		} finally {
			guard.unlock();
		}
	}


	/**
	 * Adds the specified element to this delay queue by delegating to
	 * {@link #offer(Delayed)}. Because the queue is unbounded, this method
	 * never blocks.
	 *
	 * @param o the element to add
	 * @throws NullPointerException if the specified element is <tt>null</tt>.
	 */
	/*@ public normal_behavior
	   @   requires o != null;
	   @   locks lock;
	   @   ensures this.q.contains(o);
	   @also
	   @ public exceptional_behavior
	   @   requires o == null;
	   @   signals (Exception npe) npe instanceof NullPointerException;
	   @*/
	public /*@ atomic @*/ void put(E o) {
		// offer always reports success, so its result is deliberately dropped.
		this.offer(o);
	}

	/**
	 * Inserts the specified element into this delay queue by delegating to
	 * {@link #offer(Delayed)}. Because the queue is unbounded, this method
	 * never blocks and the timeout arguments are not consulted.
	 *
	 * @param o the element to add
	 * @param timeout ignored, since the method never blocks
	 * @param unit ignored, since the method never blocks
	 * @return <tt>true</tt>
	 * @throws NullPointerException if the specified element is <tt>null</tt>.
	 */
	/*@ public normal_behavior
	   @   requires o != null;
	   @   locks lock;
	   @   ensures this.q.contains(o);
	   @also
	   @ public exceptional_behavior
	   @   requires o == null;
	   @   signals (Exception npe) npe instanceof NullPointerException;
	   @*/
	public /*@ atomic @*/ boolean offer(E o, long timeout, TimeUnit unit) {
		final boolean inserted = this.offer(o);
		return inserted;
	}

	/**
	 * Adds the specified element to this queue by delegating to
	 * {@link #offer(Delayed)}.
	 *
	 * @param o the element to add
	 * @return <tt>true</tt> (as per the general contract of
	 * <tt>Collection.add</tt>).
	 * @throws NullPointerException if the specified element is <tt>null</tt>.
	 */
	/*@ public normal_behavior
	   @   requires o != null;
	   @   locks lock;
	   @   ensures this.q.contains(o);
	   @also
	   @ public exceptional_behavior
	   @   requires o == null;
	   @   signals (Exception npe) npe instanceof NullPointerException;
	   @*/
	public /*@ atomic @*/ boolean add(E o) {
		final boolean inserted = this.offer(o);
		return inserted;
	}

	/**
	 * Retrieves and removes the head of this queue, waiting if necessary
	 * until the queue is non-empty and the delay of its head has expired.
	 *
	 * @return the head of this queue
	 * @throws InterruptedException if the thread is interrupted while
	 * acquiring the lock or while waiting
	 */
	/*@ public behavior
	   @   locks lock;
	   @   when this.q.queue[1] != null && this.q.queue[1].getDelay() == 0;
	   @   ensures \old(this.q.contains(\result)) && \result != null;
	   @   signals (Exception ie) ie instanceof InterruptedException;
	   @*/
	public /*@ atomic @*/ E take() throws InterruptedException {
		final ReentrantLock lock = this.lock;
		lock.lockInterruptibly();
		try {
			for (;;) {
				E first = q.peek();
				if (first == null) {
					// Empty queue: sleep until the condition is signalled.
					available.await();
				} else {
					long delay = first.getDelay(TimeUnit.NANOSECONDS);
					if (delay > 0) {
						// Head not yet due: sleep at most until it is. The
						// remaining-time result is not needed because the
						// next iteration re-reads the head and its delay.
						available.awaitNanos(delay);
					} else {
						//@ commit:
						E x = q.poll();
						assert x != null;
						if (q.size() != 0)
							available.signalAll(); // wake up other takers
						return x;
					}
				}
			}
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Retrieves and removes the head of this queue, waiting up to the
	 * given time for the queue to become non-empty and for the delay of
	 * its head to expire.
	 *
	 * @param time how long to wait before giving up, in units of
	 * <tt>unit</tt>
	 * @param unit the time unit of the <tt>time</tt> argument
	 * @return the head of this queue, or <tt>null</tt> if the waiting time
	 * elapses before an element with an expired delay is available
	 * @throws InterruptedException if the thread is interrupted while
	 * acquiring the lock or while waiting
	 * @throws NullPointerException if <tt>unit</tt> is <tt>null</tt>
	 */
	/*@ public behavior
	   @   locks lock;
	   @   ensures \old(this.q.queue[1] != null && this.q.queue[1].getDelay() == 0l) ==>
	   @                    \old(this.q.contains(\result)) && \result != null;
	   @   signals (Exception ie) ie instanceof InterruptedException;
	   @*/
	public /*@ atomic @*/ E poll(long time, TimeUnit unit) throws InterruptedException {
		// Convert before locking: a null unit must fail while the lock is
		// not held, otherwise the lock would never be released.
		long nanos = unit.toNanos(time);
		final ReentrantLock lock = this.lock;
		lock.lockInterruptibly();
		try {
			for (;;) {
				E first = q.peek();
				if (first == null) {
					if (nanos <= 0)
						return null;
					else
						nanos = available.awaitNanos(nanos);
				} else {
					long delay = first.getDelay(TimeUnit.NANOSECONDS);
					if (delay > 0) {
						// Budget exhausted while the head is still delayed:
						// give up instead of spinning on zero-length waits.
						if (nanos <= 0)
							return null;
						if (delay > nanos)
							delay = nanos;
						long timeLeft = available.awaitNanos(delay);
						// Charge only the time actually spent waiting.
						nanos -= delay - timeLeft;
					} else {
						E x = q.poll();
						assert x != null;
						if (q.size() != 0)
							available.signalAll(); // wake up other waiters
						return x;
					}
				}
			}
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Retrieves and removes the head of this queue without waiting.
	 *
	 * @return the head of this queue, or <tt>null</tt> if the queue is
	 * empty or the delay of its head has not yet expired
	 */
	// NOTE(review): the specification below lists InterruptedException,
	// yet this method declares no checked exception - confirm the spec.
	/*@ public behavior
	   @   locks lock;
	   @   when this.q.queue[1] != null && this.q.queue[1].getDelay() == 0;
	   @   ensures \old(this.q.contains(\result)) && \result != null && this.q.queue[1] != \result;
	   @   signals (Exception ie) ie instanceof InterruptedException;
	   @*/
	public /*@ atomic @*/ E poll() {
		final ReentrantLock guard = this.lock;
		guard.lock();
		try {
			final E head = q.peek();
			// Nothing to hand out: the queue is empty.
			if (head == null) {
				return null;
			}
			// Nothing to hand out: the head is still delayed.
			if (head.getDelay(TimeUnit.NANOSECONDS) > 0) {
				return null;
			}
			//@ commit:
			final E removed = q.poll();
			assert removed != null;
			if (q.size() != 0) {
				available.signalAll();
			}
			return removed;
		} finally {
			guard.unlock();
		}
	}

	/**
	 * Retrieves, but does not remove, the head of the backing priority
	 * queue. No delay check is made, so the returned element may not have
	 * expired yet.
	 *
	 * @return the head of this queue, or <tt>null</tt> if the queue is empty
	 */
	/*@ public normal_behavior
	   @   locks lock;
	   @   ensures \result == this.q.queue[1];
	   @*/
	public /*@ atomic @*/ E peek() {
		final ReentrantLock guard = this.lock;
		guard.lock();
		try {
			final E head = q.peek();
			return head;
		} finally {
			guard.unlock();
		}
	}

	/**
	 * Returns the number of elements currently held, counting both
	 * expired and unexpired ones.
	 *
	 * @return the size of the backing priority queue
	 */
	/*@ public normal_behavior
	   @   locks lock;
	   @   ensures \result == this.q.size;
	   @*/
	public /*@ atomic @*/ int size() {
		final ReentrantLock guard = this.lock;
		guard.lock();
		try {
			final int count = q.size();
			return count;
		} finally {
			guard.unlock();
		}
	}

	/**
	 * Moves every element whose delay has expired from this queue into
	 * the given collection, stopping at the first head that is still
	 * delayed. Each element is added to <tt>c</tt> before it is removed
	 * from this queue, so an exception thrown by <tt>c.add</tt> leaves
	 * that element in this queue.
	 *
	 * @param c the collection to transfer elements into
	 * @return the number of elements transferred
	 * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
	 * @throws IllegalArgumentException if <tt>c</tt> is this queue
	 */
	/*@ public normal_behavior
	   @   requires \thread_safe(c) && c != null & c != this;
	   @   locks lock;
	   @   ensures (\forall int i; i >= 1 && i <= this.q.size;
	   @                   c.contains(this.q.queue[i]));
	   @also
	   @ public exceptional_behavior
	   @   requires \thread_safe(c) && c == null;
	   @   signals (Exception npe) npe instanceof NullPointerException;
	   @also
	   @ public exceptional_behavior
	   @   requires \thread_safe(c) && c == this;
	   @   signals (Exception iae) iae instanceof IllegalArgumentException;
	   @*/
	public /*@ atomic @*/ int drainTo(Collection<? super E> c) {
		if (c == null)
			throw new NullPointerException();
		if (c == this)
			throw new IllegalArgumentException();
		final ReentrantLock lock = this.lock;
		lock.lock();
		try {
			int n = 0;
			for (;;) {
				E first = q.peek();
				if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
					break;
				// Add first, remove second: if c.add throws, the element
				// is still in this queue instead of being lost.
				c.add(first);
				q.poll();
				++n;
			}
			if (n > 0)
				available.signalAll();
			return n;
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Moves at most <tt>maxElements</tt> elements whose delay has expired
	 * from this queue into the given collection, stopping early at the
	 * first head that is still delayed. Each element is added to
	 * <tt>c</tt> before it is removed from this queue, so an exception
	 * thrown by <tt>c.add</tt> leaves that element in this queue.
	 *
	 * @param c the collection to transfer elements into
	 * @param maxElements the maximum number of elements to transfer;
	 * values of zero or less transfer nothing
	 * @return the number of elements transferred
	 * @throws NullPointerException if <tt>c</tt> is <tt>null</tt>
	 * @throws IllegalArgumentException if <tt>c</tt> is this queue
	 */
	public /*@ atomic @*/ int drainTo(Collection<? super E> c, int maxElements) {
		if (c == null)
			throw new NullPointerException();
		if (c == this)
			throw new IllegalArgumentException();
		if (maxElements <= 0)
			return 0;
		final ReentrantLock lock = this.lock;
		lock.lock();
		try {
			int n = 0;
			while (n < maxElements) {
				E first = q.peek();
				if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
					break;
				// Add first, remove second: if c.add throws, the element
				// is still in this queue instead of being lost.
				c.add(first);
				q.poll();
				++n;
			}
			if (n > 0)
				available.signalAll();
			return n;
		} finally {
			lock.unlock();
		}
	}

	/**
	 * Removes every element from this delay queue while holding the
	 * queue lock, so the removal is atomic with respect to the other
	 * operations of this class. The queue is empty once the call returns.
	 */
	/*@ public normal_behavior
	   @   locks lock;
	   @   ensures this.q.size == 0;
	   @*/
	public /*@ atomic @*/ void clear() {
		final ReentrantLock guard = this.lock;
		guard.lock();
		try {
			q.clear();
		} finally {
			guard.unlock();
		}
	}

	/**
	 * Reports the remaining capacity, which is always
	 * <tt>Integer.MAX_VALUE</tt> because a <tt>DelayQueue</tt> is not
	 * capacity constrained.
	 *
	 * @return <tt>Integer.MAX_VALUE</tt>
	 */
	//@ ensures \independent && \result == Integer.MAX_VALUE;
	public /*@ atomic @*/ int remainingCapacity() {
		final int unbounded = Integer.MAX_VALUE;
		return unbounded;
	}

	/**
	 * Returns an array holding all elements of this queue, produced by
	 * the backing priority queue while the queue lock is held.
	 *
	 * @return an array containing all of the elements in this queue
	 */
	/*@ public normal_behavior
	   @   locks lock;
	   @   ensures \result.length == this.q.size &&
	   @                 (\forall int i; i >= 1 && i <= this.q.size;
	   @                    \result[i - 1] == this.q.queue[i]);
	   @*/
	public /*@ atomic @*/ Object[] toArray() {
		final ReentrantLock guard = this.lock;
		guard.lock();
		try {
			final Object[] snapshot = q.toArray();
			return snapshot;
		} finally {
			guard.unlock();
		}
	}

	/**
	 * Returns an array holding all elements of this queue, delegating to
	 * the backing priority queue's <tt>toArray(T[])</tt> while the queue
	 * lock is held.
	 *
	 * @param array the array handed to the backing queue's
	 * <tt>toArray(T[])</tt>
	 * @return whatever array the backing queue's <tt>toArray(T[])</tt>
	 * returns
	 */
	/*@ public normal_behavior
	   @   requires \thread_safe(array) && array != null; 
	   @   locks lock;
	   @   ensures (\old(array.length) < this.q.size) ==> (array.length == this.q.size &&
	   @                 (\forall int i; i >= 1 && i <= this.q.size;
	   @                    \result[i - 1] == this.q.queue[i]));
	   @                (\old(array.length) >= this.q.size) ==> (array.length == \old(array.length) &&
	   @                 (\forall int i; i >= 1 && i <= this.q.size;
	   @                    \result[i - 1] == this.q.queue[i]));
	   @*/	
	public /*@ atomic @*/ <T> T[] toArray(T[] array) {
		final ReentrantLock guard = this.lock;
		guard.lock();
		try {
			final T[] filled = q.toArray(array);
			return filled;
		} finally {
			guard.unlock();
		}
	}

	/**
	 * Removes the given object from this queue via the backing priority
	 * queue's <tt>remove</tt>, regardless of whether its delay has expired.
	 *
	 * @param o the element to remove
	 * @return the result of the backing queue's <tt>remove</tt> call
	 */
	/*@ public normal_behavior
	   @   locks lock;
	   @   ensures \old(this.q.contains(o)) ==> \result;
	   @*/
	public /*@ atomic @*/ boolean remove(Object o) {
		final ReentrantLock guard = this.lock;
		guard.lock();
		try {
			final boolean removed = q.remove(o);
			return removed;
		} finally {
			guard.unlock();
		}
	}

	/**
	 * Returns an iterator over the elements in this queue, in no
	 * particular order. The iterator wraps the backing priority queue's
	 * iterator and is created while the queue lock is held. It is a
	 * "fail-fast" iterator that will throw
	 * {@link java.util.ConcurrentModificationException}
	 * upon detected interference.
	 *
	 * @return an iterator over the elements in this queue.
	 */
	public Iterator<E> iterator() {
		final ReentrantLock guard = this.lock;
		guard.lock();
		try {
			// Build the wrapper under the lock, like every other access to q.
			return new Itr(q.iterator());
		} finally {
			guard.unlock();
		}
	}

	private class Itr<E> implements Iterator<E> {
		private final Iterator<E> iter;
		Itr(Iterator<E> i) {
			iter = i;
		}

		public boolean hasNext() {
			return iter.hasNext();
		}

		public E next() {
			final ReentrantLock lock = DelayQueue.this.lock;
			lock.lock();
			try {
				return iter.next();
			} finally {
				lock.unlock();
			}
		}

		public void remove() {
			final ReentrantLock lock = DelayQueue.this.lock;
			lock.lock();
			try {
				iter.remove();
			} finally {
				lock.unlock();
			}
		}
	}

}
