codestory

Die Anleitung zu Java TransferQueue

Folge uns auf unserer fanpage, um jedes Mal benachrichtigt zu werden, wenn es neue Artikel gibt. Facebook

1- TransferQueue

Als Unterinterface von BlockingQueue verfügt TransferQueue über alle Funktionen der übergeordneten Interface und bietet darüber hinaus die Möglichkeit, dem Producer zu erlauben, zu warten, bis der Consumer das "Produkt" (Element) erhält. TransferQueue ist in einigen Anwendungstypen nützlich, beispielsweise in Messaging-Anwendungen.

public interface TransferQueue<E> extends BlockingQueue<E>
Im Vergleich zu BlockingQueue<E> bietet TransferQueue<E> einige weitere Methoden, darunter:

void transfer(E e) throws InterruptedException;
boolean tryTransfer(E e);
boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
boolean hasWaitingConsumer();
int getWaitingConsumerCount();
transfer(e):
Fügen Sie dieser TransferQueue ein Element hinzu und warten Sie, bis es von einem wartenden Consumer über die Methode BlockingQueue.take() oder BlockingQueue.poll(timeout,unit) empfangen wird.
tryTransfer(e):
Die Methode tryTransfer(e) fügt dieser TransferQueue nur dann ein Element hinzu, wenn ein Consumer darauf wartet, das Element über die Methode BlockingQueue.take() oder BlockingQueue.poll(timeout,unit) zu empfangen, und stellen Sie sicher, dass der Consumer dies erhält Element sofort. Andernfalls gibt die Methode false zurück und es wird keine andere Aktion ausgeführt.
tryTransfer(e, timeout, unit):
Die Methode tryTransfer(e,timeout,unit) fügt dieser TransferQueue nur dann ein Element hinzu, wenn innerhalb einer festgelegten Wartezeit ein Consumer auf den Empfang des Elements über die  Methode BlockingQueue.take() oder BlockingQueue.poll(timeout,unit) wartet und Stellen Sie sicher, dass der Verbraucher dieses Element erhält. Andernfalls gibt die Methode false zurück und es wird keine andere Aktion ausgeführt.

2- TransferQueue methods

Die Methoden werden in der Interface TransferQueue<E> definiert:

void transfer(E e) throws InterruptedException;
boolean tryTransfer(E e);
boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
boolean hasWaitingConsumer();
int getWaitingConsumerCount();
Methoden, die von der Interface BlockingQueue<E> geerbt werden:

void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit) throws InterruptedException;
int remainingCapacity();
int drainTo(Collection<? super E> c);  
int drainTo(Collection<? super E> c, int maxElements);
Methoden, die von der Interface Queue<E> geerbt werden:

boolean add(E e);
boolean offer(E e);
E remove();
E poll();
E element();
E peek();
Methoden, die von der Interface Collection<E> geerbt werden:

int size();
boolean isEmpty();
boolean contains(Object o);
Iterator<E> iterator();
Object[] toArray();
<T> T[] toArray(T[] a);
boolean add(E e);
boolean remove(Object o);
boolean containsAll(Collection<?> c);  
boolean addAll(Collection<? extends E> c);
boolean removeAll(Collection<?> c);
boolean retainAll(Collection<?> c);
void clear();  
boolean equals(Object o);
int hashCode();

default <T> T[] toArray(IntFunction<T[]> generator)  
default boolean removeIf(Predicate<? super E> filter)
default Spliterator<E> spliterator()  
default Stream<E> stream()  
default Stream<E> parallelStream()

3- Zum Beispiel:

Im folgenden Beispiel sendet der Producer Nachrichten über die Methode TransferQueue.transfer(e) an den Consumer.
Wenn Sie sich die Ausgabe dieses Beispiels ansehen, sehen Sie Folgendes: Wenn alle Consumer(s) damit beschäftigt sind, die Nachrichten zu konsumieren (was bedeutet, dass sich kein Consumer im Wartezustand befindet), wird die Methode TransferQueue.transfer(e) blockiert (fall in den Wartezustand).
TransferQueue_transfer_ex1.java

package org.o7planning.transferqueue.aa;

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

public class TransferQueue_transfer_ex1 {

    public static void main(String[] args) {
        TransferQueue<String> queue = new LinkedTransferQueue<String>();

        queue.add("NORMAL-MESSAGE 1");
        queue.add("NORMAL-MESSAGE 2");
        queue.add("NORMAL-MESSAGE 3");

        Producer producer1 = new Producer(queue);
        Consumer consumer1 = new Consumer(queue);
        Consumer consumer2 = new Consumer(queue);

        // Start..
        producer1.start();
        
        consumer1.start();
        consumer2.start();
    }
}

class Producer extends Thread {
    final TransferQueue<String> queue;
    private static int count = 1;

    Producer(TransferQueue<String> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                String message = "IMPORTANT-MESSAGE " + count++;
                System.out.println("[PRODUCER] Transfering: " + message);
                this.queue.transfer(message);
                System.out.println("[PRODUCER] Transfered: " + message + " (**)");
            }
        } catch (InterruptedException ex) {
        }
    }
}
class Consumer extends Thread {
    final TransferQueue<String> queue;

    Consumer(TransferQueue<String> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            while (true) {
                String message = queue.take();
                this.longConsume(message);
            }
        } catch (InterruptedException ex) {
        }
    }
    // Need 2 seconds to consume the message.
    private void longConsume(String message) throws InterruptedException  {
        System.out.println(" [CONSUMER] Consuming: " + message);
        Thread.sleep(2 * 1000); // 2 seconds.
        System.out.println(" [CONSUMER] Consumed: " + message);
    }
}
Output:

[PRODUCER] Transfering: IMPORTANT-MESSAGE 1
 [CONSUMER] Consuming: NORMAL-MESSAGE 1
 [CONSUMER] Consuming: NORMAL-MESSAGE 2
 [CONSUMER] Consumed: NORMAL-MESSAGE 1
 [CONSUMER] Consumed: NORMAL-MESSAGE 2
 [CONSUMER] Consuming: NORMAL-MESSAGE 3
 [CONSUMER] Consuming: IMPORTANT-MESSAGE 1
[PRODUCER] Transfered: IMPORTANT-MESSAGE 1 (**)
[PRODUCER] Transfering: IMPORTANT-MESSAGE 2
 [CONSUMER] Consumed: NORMAL-MESSAGE 3
 [CONSUMER] Consuming: IMPORTANT-MESSAGE 2
 [CONSUMER] Consumed: IMPORTANT-MESSAGE 1
[PRODUCER] Transfered: IMPORTANT-MESSAGE 2 (**)
[PRODUCER] Transfering: IMPORTANT-MESSAGE 3
[PRODUCER] Transfered: IMPORTANT-MESSAGE 3 (**)
[PRODUCER] Transfering: IMPORTANT-MESSAGE 4
 [CONSUMER] Consuming: IMPORTANT-MESSAGE 3
 [CONSUMER] Consumed: IMPORTANT-MESSAGE 2
 [CONSUMER] Consuming: IMPORTANT-MESSAGE 4
[PRODUCER] Transfered: IMPORTANT-MESSAGE 4 (**)
[PRODUCER] Transfering: IMPORTANT-MESSAGE 5
 [CONSUMER] Consumed: IMPORTANT-MESSAGE 3
 [CONSUMER] Consuming: IMPORTANT-MESSAGE 5
[PRODUCER] Transfered: IMPORTANT-MESSAGE 5 (**)
...
Das folgende Beispiel zeigt, wie die Methode TransferQueue.tryTransfer(e) verwendet wird. In diesem Beispiel erstellt der Producer Nachrichten und versucht, diese an den wartenden Consumer zu übertragen.
Wenn Sie sich die Ausgabe dieses Beispiels ansehen, werden Sie feststellen, dass viele vom Producer erstellte Nachrichten ignoriert werden, da zum Zeitpunkt des Aufrufs der Methode TransferQueue.tryTransfer(e) kein Consumer wartet.
TransferQueue_tryTransfer_ex1.java

package org.o7planning.transferqueue.bb;

import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;

public class TransferQueue_tryTransfer_ex1 {

    public static void main(String[] args) {
        TransferQueue<String> queue = new LinkedTransferQueue<String>();
        queue.add("NORMAL-MESSAGE 1");
        queue.add("NORMAL-MESSAGE 2");
        queue.add("NORMAL-MESSAGE 3");

        Producer producer1 = new Producer(queue);
        Consumer consumer1 = new Consumer(queue);
        Consumer consumer2 = new Consumer(queue);

        // Start..
        producer1.start();
        consumer1.start();
        consumer2.start();
    }
}
class Producer extends Thread {
    final TransferQueue<String> queue;
    private static int count = 1;

    public Producer(TransferQueue<String> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            while (true) {
                String message = "IMPORTANT-MESSAGE " + count++;
                this.queue.tryTransfer(message); // Calling tryTransfer method.
                Thread.sleep(1 * 1000); // 1 seconds.
            }
        } catch (InterruptedException ex) {
        }
    }
}
class Consumer extends Thread {
    final TransferQueue<String> queue;

    Consumer(TransferQueue<String> queue) {
        this.queue = queue;
    }
    @Override
    public void run() {
        try {
            while (true) {
                String message = queue.take();
                System.out.println(">> " + message);
                Thread.sleep(3 * 1000); // 3 seconds
            }
        } catch (InterruptedException ex) {
        }
    }
}
Output:

>> NORMAL-MESSAGE 1
>> NORMAL-MESSAGE 2
>> NORMAL-MESSAGE 3
>> IMPORTANT-MESSAGE 4
>> IMPORTANT-MESSAGE 7
>> IMPORTANT-MESSAGE 8
>> IMPORTANT-MESSAGE 10
>> IMPORTANT-MESSAGE 11
>> IMPORTANT-MESSAGE 13
>> IMPORTANT-MESSAGE 14
>> IMPORTANT-MESSAGE 16
>> IMPORTANT-MESSAGE 17
Vom Producer erstellte Nachrichten wurden ignoriert:
  • IMPORTANT-MESSAGE 1
  • IMPORTANT-MESSAGE 2
  • IMPORTANT-MESSAGE 3
  • IMPORTANT-MESSAGE 5
  • IMPORTANT-MESSAGE 6
  • ...

4- getWaitingConsumerCount()


int getWaitingConsumerCount();
Gibt die geschätzte Anzahl von Consumern zurück, die darauf warten, ein Element von dieser TransferQueue über die Methode BlockingQueue.take() oder BlockingQueue.poll(timeout,unit) zu empfangen.
Der Rückgabewert ist eine Annäherung an einen momentanen Sachverhalt, der ungenau sein kann, wenn Verbraucher das Warten beendet oder aufgegeben haben. Der Wert kann für Überwachung und Heuristik nützlich sein, jedoch nicht für die Synchronisationssteuerung. Die Implementierung dieser Methode sind wahrscheinlich merklich langsamer als die von hasWaitingConsumer().

5- hasWaitingConsumer()


boolean hasWaitingConsumer();
Gibt true zurück, wenn mindestens ein Consumer darauf wartet, ein Element über die Methode BlockingQueue.take() oder BlockingQueue.poll(timeout,unit) zu empfangen. Der Rückgabewert repräsentiert einen vorübergehenden Zustand.

6- transfer(E)


void transfer(E e) throws InterruptedException;
Fügen Sie dieser TransferQueue ein Element hinzu und warten Sie, bis es von einem wartenden Consumer über die Methode BlockingQueue.take() oder BlockingQueue.poll(timeout,unit) empfangen wird.

7- tryTransfer(E)


boolean tryTransfer(E e);
Die Methode tryTransfer(e) fügt dieser TransferQueue nur dann ein Element hinzu, wenn ein Consumer darauf wartet, das Element über die Methode BlockingQueue.take() oder BlockingQueue.poll(timeout,unit) zu empfangen, und stellen Sie sicher, dass der Consumer dies erhält Element sofort. Andernfalls gibt die Methode false zurück und es wird keine andere Aktion ausgeführt.

8- tryTransfer(E, long, TimeUnit)


boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
Die Methode tryTransfer(e,timeout,unit) fügt dieser TransferQueue nur dann ein Element hinzu, wenn innerhalb einer festgelegten Wartezeit ein Consumer auf den Empfang des Elements über die  Methode BlockingQueue.take() oder BlockingQueue.poll(timeout,unit) wartet und Stellen Sie sicher, dass der Verbraucher dieses Element erhält. Andernfalls gibt die Methode false zurück und es wird keine andere Aktion ausgeführt.