codestory

Die Anleitung zu Java Pipe.SourceChannel

  1. Pipe.SourceChannel
  2. Examples

1. Pipe.SourceChannel

Angenommen, Sie entwickeln eine Anwendung Multithreading und haben 2 unabhängige Thread(s), Thread-A und Thread-B. Die Frage ist:
  • Was ist zu tun, damit Daten, die auf Thread-A erscheinen, automatisch auf Thread-B übertragen werden?
Pipe.SinkChannel und Pipe.SourceChannel sind zwei Klassen, die erstellt wurden, um die oben erwähnte Situation zu behandeln. Jedes Mal, wenn Daten in Pipe.SinkChannel geschrieben werden, erscheinen sie automatisch auf Pipe.SourceChannel, dies wird als Pipe-Effekt bezeichnet.
Die Klasse Pipe.SinkChannel ist eine abstrakte Klasse, die innerhalb der Klasse Pipe definiert ist und die Interface WritableByteChannel- und GatheringByteChannel implementiert. Es fungiert als Schreibkanal.
public abstract static class SinkChannel extends AbstractSelectableChannel
                        implements WritableByteChannel, GatheringByteChannel
Die Klasse Pipe.SourceChannel ist eine abstrakte Klasse, die in der Klasse Pipe definiert ist und die Interface ReadableByteChannel und ScatteringByteChannel implementiert. Es fungiert als Lesekanal.
public abstract static class SourceChannel extends AbstractSelectableChannel
                        implements ReadableByteChannel, ScatteringByteChannel

2. Examples

In diesem Beispiel schreiben wir Nachrichten in einen Pipe.SinkChannel (gesteuert von ThreadA). Sie werden automatisch auf dem Pipe.SourceChannel (von ThreadB gesteuert) angezeigt.
Pipe_ex1.java
package org.o7planning.pipe.sourcechannel.ex;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

public class Pipe_ex1 {

    public static void main(String[] args) throws IOException, InterruptedException {
        Pipe pipe = Pipe.open();

        ThreadA threadA = new ThreadA(pipe);
        ThreadB threadB = new ThreadB(pipe);

        threadA.start();
        threadB.start();
        threadA.join(); // Waits for this thread to die.
        threadB.join(); // Waits for this thread to die.
        System.out.println();
        System.out.println("Done!");
    }
}

//
class ThreadA extends Thread {
    private Pipe pipe;

    public ThreadA(Pipe pipe) {
        this.pipe = pipe;
    }

    @Override
    public void run() {
        try (Pipe.SinkChannel skChannel = this.pipe.sink();) { // try
            String[] messages = new String[] { "Hello\n", "Hi\n", "Bye\n" };

            ByteBuffer buffer = ByteBuffer.allocate(512);

            for (String msg : messages) {
                // Set position = 0; limit = capacity;
                buffer.clear();
                buffer.put(msg.getBytes("UTF-8"));
                buffer.flip();
                while (buffer.hasRemaining()) {
                    skChannel.write(buffer);
                }
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

//
class ThreadB extends Thread {
    private Pipe pipe;

    public ThreadB(Pipe pipe) {
        this.pipe = pipe;
    }

    @Override
    public void run() {
        try (Pipe.SourceChannel srcChannel = this.pipe.source();) { // try
            ByteBuffer buffer = ByteBuffer.allocate(512);

            while (srcChannel.read(buffer) != -1) {
                buffer.flip(); // Set limit = current position; position = 0;
                ByteArrayOutputStream baos = new ByteArrayOutputStream();

                while (buffer.hasRemaining()) {
                    byte b = buffer.get();
                    if (b != '\n') {
                        baos.write(b);
                    } else {
                        String s = baos.toString("UTF-8");
                        System.out.println(s);
                    }
                }
                buffer.clear(); // Set position =0; limit = capacity;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Output: