Piped Streams

In the I/O examples in Chapter 9: Java, we dealt primarily with sources and destinations such as the console, files, or another object. Piped streams deal instead with I/O between threads. Think of one thread writing data into one end of a pipe and another thread reading data out of the other end of the pipe.

You could do thread I/O yourself by, say, having one thread write to an array and then set a flag when it is finished so that the second thread could then read from that array. But this is messy and difficult to coordinate. Piped streams allow the I/O classes to take care of all of this rather than forcing you to deal with it.
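As a minimal sketch of the idea, the following connects a PipedWriter to a PipedReader and lets one thread write lines while the calling thread reads them. The class name PipeSketch and the method sendThroughPipe() are hypothetical names chosen for this illustration; they are not part of the example discussed on this page.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical demo class, not part of the original example. */
public class PipeSketch {

  /** Send each line through a pipe from a writer thread to the caller. */
  public static List<String> sendThroughPipe (List<String> lines)
       throws IOException {

    // Connect the two ends of the pipe.
    PipedWriter pipe_out = new PipedWriter ();
    PipedReader pipe_in  = new PipedReader (pipe_out);

    // Producer thread: writes into one end of the pipe. Closing the
    // PrintWriter closes the pipe, which signals end of data.
    Thread producer = new Thread (() -> {
      try (PrintWriter out = new PrintWriter (pipe_out)) {
        for (String line : lines) out.println (line);
      }
    });
    producer.start ();

    // Consumer (the calling thread): reads from the other end until
    // the writer closes the pipe.
    List<String> received = new ArrayList<> ();
    try (BufferedReader in = new BufferedReader (pipe_in)) {
      String line;
      while ((line = in.readLine ()) != null) received.add (line);
    }
    return received;
  }

  public static void main (String[] args) throws IOException {
    System.out.println (
        sendThroughPipe (Arrays.asList ("one", "two", "three")));
  }
}
```

No flag or shared array is needed here: the reader simply blocks until data arrives, and readLine() returns null once the writer has closed its end.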

The tutorial at How to Use Pipe Streams - Sun Java Tutorial provides a simple example of tying three threads together. We modified that example as shown below to illustrate how one might carry out a sequence of analysis steps. The first thread does a calculation on data read from a file and then writes the result of the calculation to a piped output stream. The second thread reads this data from the other end of this pipe and does a second level of calculations on it. (In this case the trivial calculations are for illustrative purposes only.)

The piped streams are set up before the threads are created and started. In the main() method, the analysis sequence is directed by this single line of code:

 analyzeOut (analyze2 (analyze1 (file_in)));

The file is fed into the input of the first analysis thread, whose output then goes to the second analysis thread. Finally, the output of the second thread goes to the static method analyzeOut(), which prints it to the console.

The first thread is set up and started in the static method analyze1(), which is passed a reference to the input file stream. The first step is to buffer that stream. A PipedWriter is created and then connected to a PipedReader. The Analyze1Thread is then created and started. It will write its output to the PipedWriter, and that data can be read via the PipedReader that this method returns.

    // Buffer the input data.
    BufferedReader buf_in = new BufferedReader (source);

    // Create a pipe for data sent out by the Analyze1Thread
    PipedWriter pipe_out = new PipedWriter ();

    // What goes into the pipe can be read by the other thread via pipe_in
    PipedReader pipe_in = new PipedReader (pipe_out);

    // Start the thread for the first level analysis. It
    // will take data from buf_in and write it into pipe_out.
    new Analyze1Thread  (pipe_out, buf_in).start ();

    // The other thread will be able to read via pipe_in what
    // was written into pipe_out.
    return pipe_in;

The second thread is set up in a similar way in the static method analyze2().
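Because the two setup methods differ only in which thread class they start, the same pattern could be written once and parameterized by the calculation. The following is only a sketch of that idea under stated assumptions: the class StageSketch and the methods stage() and run() are hypothetical, the calculation is passed in as a DoubleUnaryOperator, and Locale.US is fixed explicitly (an assumption added here) so that the text written by printf() is parsed the same way by the Scanner on any machine.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.io.PrintWriter;
import java.io.Reader;
import java.io.StringReader;
import java.util.Locale;
import java.util.Scanner;
import java.util.function.DoubleUnaryOperator;

/** Hypothetical generalization of analyze1()/analyze2(). */
public class StageSketch {

  /** Start a thread that reads doubles from source, applies calc,
    * and writes the results into a pipe. Returns the reading end.
   **/
  public static Reader stage (Reader source, DoubleUnaryOperator calc)
       throws IOException {
    PipedWriter pipe_out = new PipedWriter ();
    PipedReader pipe_in  = new PipedReader (pipe_out);

    new Thread (() -> {
      Scanner scanner = new Scanner (new BufferedReader (source));
      scanner.useLocale (Locale.US);
      // Closing the PrintWriter closes the pipe and ends the data.
      try (PrintWriter out = new PrintWriter (pipe_out)) {
        while (scanner.hasNextDouble ()) {
          double d = scanner.nextDouble ();
          out.printf (Locale.US, "%12.5f %n", calc.applyAsDouble (d));
        }
      }
    }).start ();

    return pipe_in;
  }

  /** Chain two stages (square, then double) and collect the output. */
  public static String run (String input) throws IOException {
    Reader result =
        stage (stage (new StringReader (input), d -> d * d), d -> 2.0 * d);
    StringBuilder sb = new StringBuilder ();
    try (BufferedReader in = new BufferedReader (result)) {
      String line;
      while ((line = in.readLine ()) != null)
        sb.append (line.trim ()).append ('\n');
    }
    return sb.toString ();
  }

  public static void main (String[] args) throws IOException {
    System.out.print (run ("2.0 3.4 10"));
  }
}
```

With this arrangement, adding a third analysis step would mean wrapping one more stage() call around the chain rather than writing another setup method and thread class.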

PipedAnalyzer.java
Analyze1Thread + Analyze2Thread

import java.io.*;
import java.util.*;

/**
  * Demonstrate piped streams. Data is read from a file and
  * "analyzed" by one thread, which in turn passes
  * its result via a pipe to a second thread for further analysis.
  * This thread in turn passes its data via a pipe to a method
  * for output to the console.
**/
public class PipedAnalyzer {

  public static void main(String[] args) throws IOException {

    File file = null;

     // Get the file from the argument line.
    if (args.length > 0) file = new File (args[0]);
    if (file == null) {
      System.out.println ("Input data file required");
      System.exit (0);
    }
    // Create a file reader for the data file.
    FileReader file_in = new FileReader (file);

    // Carry out the two levels of analysis and then
    // write out the results.
    analyzeOut (analyze2 (analyze1 (file_in)));

    // Close the file stream.
    file_in.close();

  } // main

  /** Set up the piped streams for the first thread. Then create and
    * start the thread.
   **/
  public static Reader analyze1 (Reader source)
       throws IOException {

    // Buffer the input data.
    BufferedReader buf_in = new BufferedReader (source);

    // Create a pipe for data sent out by the Analyze1Thread
    PipedWriter pipe_out = new PipedWriter ();

    // What goes into the pipe can be read by the other thread via pipe_in
    PipedReader pipe_in = new PipedReader (pipe_out);

    // Start the thread for the first level analysis. It
    // will take data from buf_in and write it into pipe_out.
    new Analyze1Thread (pipe_out, buf_in).start ();

    // The other thread will be able to read via pipe_in what
    // was written into pipe_out.
    return pipe_in;

  } // analyze1

  /** Set up the piped streams for the second thread. Then create and
    * start the thread.
   **/
  public static Reader analyze2 (Reader source) throws IOException {

    // Buffer the input data.
    BufferedReader buf_in = new BufferedReader (source);

    // Create a pipe for data sent out by the Analyze2Thread
    PipedWriter pipe_out = new PipedWriter ();

    // What goes into the pipe can be read by the other thread via pipe_in
    PipedReader pipe_in = new PipedReader (pipe_out);

    // Start the thread for the second level analysis. It
    // will take data from buf_in and write it into pipe_out.
    new Analyze2Thread  (pipe_out, buf_in).start ();

    // The other thread will be able to read via pipe_in what
    // was written into pipe_out.
    return pipe_in;

  } // analyze2

  /**
    * Read the output from the final analysis step and print to
    * the console.
   **/
  public static void analyzeOut (Reader source) throws IOException {

    // Buffer the input data.
    BufferedReader buf_in = new BufferedReader (source);
    System.out.println ("PipedAnalysis Output:");

    String result;
    while ((result = buf_in.readLine ()) != null)
         System.out.println (result);
    buf_in.close ();

  } // analyzeOut

} // PipedAnalyzer

import java.io.*;
import java.util.*;

/**
  * Helper class for PipedAnalyzer.
  * Illustrates how a first level of analysis
  * would be done on data in a sequence of threads
  * connected by piped streams.
**/
class Analyze1Thread extends Thread {
  private PrintWriter fOut;
  private Scanner fScanner;

  public Analyze1Thread (Writer out, BufferedReader in) {
      fOut = new PrintWriter (out);
      fScanner  = new Scanner (in);
  }

  /** Read through the data until it runs out.
    * Do a calculation upon each value and
    * send the result to the output stream.
   **/
   public void run () {
    double d = 0.0;
    if (fOut != null && fScanner != null) {
      try {
        while (true) {
          d = fScanner.nextDouble ();
          // Send as string to the PrintWriter stream
          fOut.printf ("%12.5f %n",calculate (d));
          fOut.flush ();
        }
      }
      catch (NoSuchElementException nse)
      {} // data finished
      catch (Exception e) {
             System.err.println("Analyze1Thread error = " + e);
      }
    }
    fOut.close ();
  } // run

  /** Do trivial calculation. **/
  double calculate (double d) {
    return d*d;
  }

} // Analyze1Thread

import java.io.*;
import java.util.*;

/**
  * Helper class for PipedAnalyzer.
  * Illustrates the second level of analysis
  * on data in a sequence of threads connected
  * by piped streams.
**/
class Analyze2Thread extends Thread {
  private PrintWriter fOut;
  private Scanner fScanner;

  public Analyze2Thread (Writer out, BufferedReader in) {
      fOut = new PrintWriter (out);
      fScanner  = new Scanner (in);
  } // ctor

  /** Read through the data until it runs out.
    * Do a calculation upon each value and
    * send the result to the output stream.
   **/
  public void run () {
    double d = 0.0;
    if (fOut != null && fScanner != null) {
      try {
        while (true) {
          d = fScanner.nextDouble ();
          // Print as string to the PrintWriter stream.
          fOut.printf ("%12.5f %n",calculate (d));
          fOut.flush ();
        }
      }
      catch (NoSuchElementException nse)
      {} // data finished
      catch (Exception e) {
             System.err.println("Analyze2Thread error = " + e);
      }
    }
    fOut.close ();
  } // run

  /** Do trivial calculation. **/
  double calculate (double d) {
    return 2.0 * d;
  }

} // Analyze2Thread

 

We use the input file dataInput.txt, which holds

2.0
3.4
454.0
2.1
13.9
632.34
10
99.99

Running the command

c:> java PipedAnalyzer dataInput.txt

produces the output

PipedAnalysis Output:
     8.00000
    23.12000
412232.00000
     8.82000
   386.42000
799707.75120
   200.00000
 19996.00020

For real data analysis, you would more likely deal with binary data. It is left to the student to convert the above example to work with binary data in the file and the pipes.
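As one possible starting point for that exercise, the character pipes can be swapped for PipedOutputStream and PipedInputStream, with DataOutputStream and DataInputStream handling the conversion between doubles and bytes. This is a sketch under stated assumptions, not a full solution: the class BinaryPipeSketch and its methods are hypothetical, and the input comes from an in-memory byte array rather than a binary file.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.function.DoubleUnaryOperator;

/** Hypothetical binary variant of the piped analysis chain. */
public class BinaryPipeSketch {

  /** Start a thread that reads binary doubles from source, applies
    * calc, and writes binary doubles into a pipe.
   **/
  public static InputStream stage (InputStream source,
                                   DoubleUnaryOperator calc)
       throws IOException {
    PipedOutputStream pipe_out = new PipedOutputStream ();
    PipedInputStream  pipe_in  = new PipedInputStream (pipe_out);

    new Thread (() -> {
      // Both streams are closed before the catch clauses run,
      // so the pipe is always closed when this stage finishes.
      try (DataInputStream in =
               new DataInputStream (new BufferedInputStream (source));
           DataOutputStream out = new DataOutputStream (pipe_out)) {
        while (true)
          out.writeDouble (calc.applyAsDouble (in.readDouble ()));
      }
      catch (EOFException eof)
      {} // data finished
      catch (IOException e) {
        System.err.println ("Binary stage error = " + e);
      }
    }).start ();

    return pipe_in;
  }

  /** Square and then double each value via two piped stages. */
  public static double[] run (double[] values) throws IOException {
    // Stand-in for a binary input file.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream ();
    DataOutputStream data = new DataOutputStream (bytes);
    for (double v : values) data.writeDouble (v);
    data.flush ();

    InputStream result = stage (
        stage (new ByteArrayInputStream (bytes.toByteArray ()), d -> d * d),
        d -> 2.0 * d);

    double[] out = new double[values.length];
    try (DataInputStream in = new DataInputStream (result)) {
      for (int i = 0; i < out.length; i++) out[i] = in.readDouble ();
    }
    return out;
  }

  public static void main (String[] args) throws IOException {
    for (double d : run (new double[] {2.0, 10.0}))
      System.out.println (d);
  }
}
```

Since the values never pass through text, no formatting or parsing is involved, and the end of the data is signaled by the EOFException that readDouble() throws once the upstream end of the pipe has been closed.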

Most recent update: Oct. 1, 2005

Java is a trademark of Sun Microsystems, Inc.