codestory

Die Anleitung zu Java BlockingQueue

  1. BlockingQueue
  2. BlockingQueue methods
  3. Zum Beispiel:
  4. drainTo(Collection<? super E>)
  5. drainTo(Collection<? super E>, int)
  6. offer(E, long, TimeUnit)
  7. poll(long, TimeUnit)
  8. put(E e)
  9. remainingCapacity()
  10. take()

1. BlockingQueue

BlockingQueue ist eine Unterinterface von Queue, die zusätzliche Operationen bereitstellt und in Situationen nützlich ist, in denen die Warteschlange leer oder voller Elemente ist.
public interface BlockingQueue<E> extends Queue<E>
Der Unterschied zwischen Queue und BlockingQueue wird durch die bereitgestellten Methoden deutlich:
Interface
Queue<E>
Interface
BlockingQueue<E>
Action
Throws exception
Special value
Blocks
Times out
Insert
boolean add(e)
boolean offer(e)
void put(e)
boolean offer(e, time, unit)
Remove
E remove()
E poll()
E take()
E poll(time, unit)
Examine
E element()
E peek()
take()/poll(time,unit)
Wie wir wissen, geben die Methoden remove(), element(), poll() und peek() der Interface Queue das Element am Anfang der Warteschlange zurück, das entweder sofort eine Ausnahme auslöst oder null zurückgibt, wenn die Warteschlange dies nicht tut beliebige Elemente enthalten. Solche Operationen sind in einer Multithreading-Umgebung nicht gut genug, daher bietet die Interface BlockingQueue neue Methoden take() und poll(time,unit).
  • take(): Gibt das Kopfelement zurück und entfernt es aus der Warteschlange. Wenn die Warteschlange leer ist, wartet die Methode, bis ein Element in der Warteschlange verfügbar ist.
  • poll(timeout,unit): Gibt das Kopfelement zurück und entfernt es aus der Warteschlange. Wenn die Warteschlange leer ist, wartet die Methode, bis ein Element für eine bestimmte Zeit verfügbar ist. Wenn das Timeout ohne verfügbare Elemente endet, gibt die Methode null zurück.
put(e)/offer(e,time,unit)
Die Methoden add(e) und offer(e) der Interface Queue werden verwendet, um der Queue ein Element hinzuzufügen. Sie lösen entweder sofort eine Ausnahme aus oder geben false zurück, wenn die Warteschlange voll ist. Die Interface BlockingQueue bietet die Methoden put(e) und offer(e,timeout,unit) für denselben Zweck, aber sie haben speziellere Funktionen.
  • put(e): Fügt ein Element in die Warteschlange ein. Wenn die Warteschlange voll ist, wartet diese Methode, bis ein Platz zum Einfügen verfügbar ist.
  • offer(e,timeout,unit): Fügt ein Element in die Warteschlange ein. Wenn die Warteschlange voll ist, wartet die Methode für eine bestimmte Zeit auf einen verfügbaren Platz zum Einfügen. Wenn das Timeout ohne verfügbaren Speicherplatz endet, wird keine Aktion ausgeführt und die Methode gibt false zurück.
Die Hierarchie der Klassen und Interface in Bezug auf die Interface BlockingQueue:
Die Eigenschaften von BlockingQueue:
  • BlockingQueue akzeptiert keine Elemente null. Wenn Sie dieser Warteschlange absichtlich ein Element null hinzufügen, wird eine NullPointerException ausgelöst.
  • Die Kapazität einer BlockingQueue kann begrenzt werden. Die Methode remainingCapacity() gibt die verbleibende Kapazität dieser Warteschlange oder Integer.MAX_VALUE zurück, wenn die Kapazität der Warteschlange nicht begrenzt ist.
  • BlockingQueue wird häufig in die Anwendungen Producer & Consumer verwendet. BlockingQueue ist ein Nachkomme der Interface Collection, daher wird auch die Methode remove(e) unterstützt. Solche Verfahren arbeiten jedoch ineffizient und nur für den gelegentlichen Gebrauch. Entfernen Sie beispielsweise ein defektes Produkt aus der Warteschlange.
  • BlockingQueue ist eine Thread-sichere Warteschlange. Alle Warteschlangenmethoden sind atomare Operationen. Von der Interface Collection geerbte Methoden wie addAll, containsAll, retainAll und removeAll sind jedoch nicht unbedingt atomare Operationen, sondern hängen von der Klasse ab, die die Interface BockingQueue implementiert. So könnte beispielsweise der Aufruf von addAll(aCollection) eine Ausnahme auslösen, wenn ein anderer Thread gleichzeitig ein Element zur aCollection hinzufügt.
  • BlockingQueue unterstützt keine Methoden wie "close" oder "shutdown", wenn der Producer beispielsweise ein Signal senden möchte, dass keine weiteren "Produkte" zur Warteschlange hinzugefügt werden. Der Bedarf und die Verwendung dieser Funktionen sind abhängig von der Implementierung. Die Lösung könnte sein: Ein letztes und spezielles "Produkt" wird der Warteschlange hinzugefügt, um dem Consumer mitzuteilen, dass dies das letzte Produkt ist, das der Warteschlange hinzugefügt wird.

Sehen Sie mehr:
  • The concept of Atomic operations in computer science

2. BlockingQueue methods

Liste der Methoden der Interface BlockingQueue<E>:
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
int remainingCapacity();
int drainTo(Collection<? super E> c);  
int drainTo(Collection<? super E> c, int maxElements);
Methoden, die von der Interface Queue<E> geerbt werden
boolean add(E e);
boolean offer(E e);
E remove();
E poll();
E element();
E peek();
Die Methoden, die von der Interface Collection<E> geerbt werden:
int size();
boolean isEmpty();
boolean contains(Object o);
Iterator<E> iterator();
Object[] toArray();
<T> T[] toArray(T[] a);
boolean add(E e);
boolean remove(Object o);
boolean containsAll(Collection<?> c);  
boolean addAll(Collection<? extends E> c);
boolean removeAll(Collection<?> c);
boolean retainAll(Collection<?> c);
void clear();  
boolean equals(Object o);
int hashCode();

default <T> T[] toArray(IntFunction<T[]> generator)  
default boolean removeIf(Predicate<? super E> filter)
default Spliterator<E> spliterator()  
default Stream<E> stream()  
default Stream<E> parallelStream()

3. Zum Beispiel:

Das Modell Producer/Consumer ist ein gutes Beispiel für die Verwendung der Interface BlockingQueue. Von Herstellern erstellte Produkte werden einer Warteschlange hinzugefügt, bevor sie von Verbrauchern herausgenommen werden.
  • Die Threads Producer rufen die Methode BlockingQueue.put(e) auf, um Produkte zu einer BlockingQueue hinzuzufügen. Wenn die Warteschlange voll ist, wartet die Methode put(e), bis Platz verfügbar ist.
  • Die Threads Consumer rufen die Methode BlockingQueue.take() auf, um Produkte aus der Warteschlange abzurufen. Wenn die Warteschlange leer ist, wartet diese Methode, bis das Produkt verfügbar ist.
Sehen Sie sich den vollständigen Code des Beispiels an:
Die Klasse Product simuliert ein Produkt.
Product.java
package org.o7planning.blockingqueue.ex;

public class Product {
    private String name;
    private int serial;

    public Product(String name, int serial) {
        this.name = name;
        this.serial = serial;
    }
    public String getInfo() {
        return "Product: " + this.name + ". Serial: " + this.serial;
    }
}
Die Klasse Consumer simuliert den Verbraucher.
Consumer.java
package org.o7planning.blockingqueue.ex;

import java.util.concurrent.BlockingQueue;

public class Consumer implements Runnable {
    private String consumerName;
    private final BlockingQueue<Product> queue;

    public Consumer(String consumerName, BlockingQueue<Product> q) {
        this.consumerName = consumerName;
        this.queue = q;
    }
    @Override
    public void run() {
        try {
            while (true) {
                this.consume(queue.take());
            }
        } catch (InterruptedException ex) {
        }
    }  
    private void consume(Product x) {
        System.out.println(" --> " + this.consumerName + " >> Consume: " + x.getInfo());
    }
}
Die Klasse Producer simuliert den Producer.
Producer.java
package org.o7planning.blockingqueue.ex;

import java.util.concurrent.BlockingQueue;

public class Producer implements Runnable {
    private static int serial = 1;

    private final String producerName;
    private final BlockingQueue<Product> queue;
    private final int delay; // Seconds

    public Producer(String producerName, int delay, BlockingQueue<Product> q) {
        this.producerName = producerName;
        this.delay = delay;
        this.queue = q;
    }
    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep(this.delay * 1000); // 'delay' second.
                this.queue.put(this.produce());
            }
        } catch (InterruptedException ex) {
        }
    }
    private Product produce() {
        System.out.println("#" + this.producerName + " >> Create a new product!");
        return new Product("IPhone", serial++);
    }
}
Die Klasse Setup wird verwendet, um das Producer/Consumer-System zu bedienen:
Setup.java
package org.o7planning.blockingqueue.ex;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class Setup {
    public static void main(String[] args) {
        // Create a BlockingQueue with a capacity of 5.
        BlockingQueue<Product> q = new ArrayBlockingQueue<Product>(5);
        Producer producer1 = new Producer("Producer 01", 2, q);
        Producer producer2 = new Producer("Producer 02", 1, q);
        Consumer consumer1 = new Consumer("Consumer 01", q);
        Consumer consumer2 = new Consumer("Consumer 02", q);
        Consumer consumer3 = new Consumer("Consumer 03", q);

        // Starting the threads
        new Thread(producer1).start();
        new Thread(producer2).start();
        new Thread(consumer1).start();
        new Thread(consumer2).start();
        new Thread(consumer3).start();
    }
}
Führen Sie das obige Beispiel aus und Sie erhalten die folgende Ausgabe:
#Producer 02 >> Create a new product!
 --> Consumer 03 >> Consume: Product: IPhone. Serial: 1
#Producer 01 >> Create a new product!
 --> Consumer 01 >> Consume: Product: IPhone. Serial: 2
#Producer 02 >> Create a new product!
 --> Consumer 02 >> Consume: Product: IPhone. Serial: 3
#Producer 02 >> Create a new product!
 --> Consumer 03 >> Consume: Product: IPhone. Serial: 4
#Producer 01 >> Create a new product!
 --> Consumer 01 >> Consume: Product: IPhone. Serial: 5
#Producer 02 >> Create a new product!
 --> Consumer 02 >> Consume: Product: IPhone. Serial: 6
#Producer 02 >> Create a new product!
 --> Consumer 03 >> Consume: Product: IPhone. Serial: 7
...

4. drainTo(Collection<? super E>)

int drainTo(Collection<? super E> c);
Entfernt alle Elemente aus dieser BlockingQueue und fügt sie einer angegebenen Collection hinzu. Die Verwendung dieser Methode ist effizienter als der mehrfache Aufruf von poll() oder remove().
Die Methode drainTo(Collection) stellt sicher, dass entweder alle Elemente erfolgreich in die Collection verschoben werden oder im Fehlerfall keine Elemente in die Collection verschoben werden.

5. drainTo(Collection<? super E>, int)

int drainTo(Collection<? super E> c, int maxElements);
Entfernt bis zu maxElements Elemente aus dieser BlockingQueue und fügt sie einer angegebenen Collection hinzu. Die Verwendung dieser Methode ist effizienter als der mehrfache Aufruf von poll() oder remove().
Tritt ein Fehler auf, wird kein Element aus dieser BlockingQueue entfernt und der Collection wird kein Element hinzugefügt.

6. offer(E, long, TimeUnit)

boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
Fügt ein angegebenes Element in die Warteschlange ein. Wenn die Warteschlange voll ist, wartet die Methode in einer bestimmten Zeit auf einen verfügbaren Platz zum Einfügen. Wenn das Timeout ohne verfügbaren Speicherplatz endet, wird keine Aktion ausgeführt und die Methode gibt false zurück.
Zum Beispiel:
queue.offer(e, 5, TimeUnit.HOURS); // 5 hours.

7. poll(long, TimeUnit)

E poll(long timeout, TimeUnit unit) throws InterruptedException;
Gibt das Kopfelement zurück und entfernt es aus der Warteschlange. Wenn die Warteschlange leer ist, wartet die Methode, bis ein Element in einer bestimmten Zeit verfügbar ist. Wenn das Timeout ohne verfügbare Elemente endet, gibt die Methode null zurück.
Zum Beispiel:
E e = queue.offer(2, TimeUnit.HOURS); // 2 hours

8. put(E e)

void put(E e) throws InterruptedException;
Fügen Sie ein Element in die Warteschlange ein. Wenn die Warteschlange voll ist, wartet diese Methode, bis ein Platz zum Einfügen verfügbar ist.

9. remainingCapacity()

int remainingCapacity();
Gibt die verbleibende Kapazität dieser Warteschlange oder Integer.MAX_VALUE zurück, wenn die Kapazität der Warteschlange nicht begrenzt ist.
Die Klasse ArrayBlockingQueue ermöglicht das Erstellen einer BlockingQueue mit Angabe der maximalen Anzahl von Elementen.

10. take()

E take() throws InterruptedException;
Gibt das Kopfelement zurück und entfernt es aus der Warteschlange. Wenn die Warteschlange leer ist, wartet die Methode, bis ein Element in der Warteschlange verfügbar ist.