In the I/O examples in Chapter
9: Java we dealt primarily with sources and destinations that
include the console, files, or anouther object. Piped streams deal
instead with I/O between threads. Think of one thread writing data
into one end of a pipe and another thread reading data out of the
end of the pipe.
You could do thread I/O yourself by, say, having one thread write
to an array and then flag it when it is finished so the second thread
could then read from that array. But this is messy and difficult
to coordinate. Piped streams allow for the I/O classes to take care
of all of this rather forcing you to deal with it.
The tutorial at How
to Use Pipe Streams - Sun Java Tutorial provides a simple example
of tying three threads together. We modified that example as shown
below to illustrate how one might carry out a sequence of analysis
steps. The first thread does a calculation on data read from a file
and then writes the result of the calculation to a piped output
stream. The second thread reads this data from the other end of
this pipe and does a second level of calculations on it. (In this
case the trivial calculations are for illustrative purposes only.
The piped streams are set up before the threads are created and
started. In the main() method, the analysis
sequence is directed by this single line of code
analyzeOut
(analyze2 (analyze1 (file_in)));
The file is fed into the input of the first analysis thread, whose
output then goes to the second analysis thread. Finally, the output
of the latter goes to a static method for sending the output of
the second thread to the console.
The first thread is set up and started in the static method analyze1(),
which is passed a reference to the input file stream. The first
step is to buffer than stream. A PipedWriter
is created and then wrapped with a PipedReader.
The Analyze1Thread is then created and
started. It will write its output to the PipedWriter
and that data will be read via the PipedReader
that is returned via this method.
//
Buffer the input data.
BufferedReader buf_in = new BufferedReader
(source);
// Create a pipe for data sent out by the
Analyze2Thread
PipedWriter pipe_out = new PipedWriter ();
// What goes into the pipe can be read by
the other thread via pipe_in
PipedReader pipe_in = new PipedReader (pipe_out);
// Start the thread for the second level
analysis. It
// will take data from buf_in and write
it into pipe_out.
new Analyze1Thread (pipe_out,
buf_in).start ();
// The other thread will be able to read
via pipe_in what
// was written into pipe_out.
return pipe_in;
The second thread is set up in a similar way in the static method
analyze1().
RandomFileTest.java
Analyze1Thread
+ Analyze2Thread
|
import
java.io.*;
import java.util.*;
/**
* Demonstrate piped streams. Data is read
from file and
* read and "analysized" by one thread, which
in turn passes
* its result via a pipe to a second thread
for further analysis.
* This thread in turn passes its data via
a pipe to a method
* for output to a file.
**/
public class PipedAnalyzer {
public static void main(String[] args) throws
IOException {
File file = null;
// Get the file from the argument
line.
if (args.length > 0) file = new
File (args[0]);
if (file == null) {
System.out.println
("Input data file required");
System.exit (0);
}
// Create a file reader for the
data file.
FileReader file_in = new FileReader
(file);
// Carry out the two levels of
analysis and then
// write out the results.
analyzeOut (analyze2 (analyze1
(file_in)));
// Close the file stream.
file_in.close();
} // main
/** Set up the piped streams for the first
thread. Then create and
* start the thread.
**/
public static Reader analyze1 (Reader source)
throws IOException
{
// Buffer the input data.
BufferedReader buf_in = new BufferedReader
(source);
// Create a pipe for data sent
out by the Analyze1Thread
PipedWriter pipe_out = new PipedWriter
();
// What goes into the pipe can
be read by the other thread via pipe_in
PipedReader pipe_in = new PipedReader
(pipe_out);
// Start the thread for the first
level analysis. It
// will take data from buf_in
and write it into pipe_out.
new Analyze1Thread (pipe_out,
buf_in).start ();
// The other thread will be able
to read via pipe_in what
// was written into pipe_out.
return pipe_in;
} // analyze1
/** Set up the piped streams for the second
thread. Then create and
* start the thread.
**/
public static Reader analyze2 (Reader source)
throws IOException {
// Buffer the input data.
BufferedReader buf_in = new BufferedReader
(source);
// Create a pipe for data sent
out by the Analyze2Thread
PipedWriter pipe_out = new PipedWriter
();
// What goes into the pipe can
be read by the other thread via pipe_in
PipedReader pipe_in = new PipedReader
(pipe_out);
// Start the thread for the second
level analysis. It
// will take data from buf_in
and write it into pipe_out.
new Analyze2Thread (pipe_out,
buf_in).start ();
// The other thread will be able
to read via pipe_in what
// was written into pipe_out.
return pipe_in;
} // analyze2
/**
* Read the output from the final
analysis step and print to
* the console.
**/
public static void analyzeOut (Reader source)
throws IOException {
// Buffer the input data.
BufferedReader buf_in = new BufferedReader
(source);
System.out.println ("PipedAnalysis
Output:");
String result;
while ((result = buf_in.readLine
()) != null)
System.out.println
(result);
buf_in.close ();
} // analyzeOut
} // PipedAnalyzer
|
import
java.io.*;
import java.util.*;
/**
* Helper class for PipedAnalyzer.
* Illustrates how a first level of analysis
* would be done on data in a sequence of threads
* connected by piped streams.
**/
class Analyze1Thread extends Thread {
private PrintWriter fOut;
private Scanner fScanner;
public Analyze1Thread (Writer out, BufferedReader
in) {
fOut = new PrintWriter
(out);
fScanner =
new Scanner (in);
}
/** Read through the data until it runs out.
* Do a calculation upon each value
and
* send the result to the output stream.
**/
public void run () {
double d = 0.0;
if (fOut != null && fScanner != null)
{
try {
while (true)
{
d
= fScanner.nextDouble ();
//
Send as string to the PrintWriter stream
fOut.printf
("%12.5f %n",calculate (d));
fOut.flush
();
}
}
catch (NoSuchElementException
nse)
{} // data finished
catch (Exception e) {
System.err.println("Analyzer1Thread error = " + e);
}
}
fOut.close ();
} // run
/** Do trivial calculation. **/
double calculate (double d) {
return d*d;
}
} // Analyze1Thread |
import
java.io.*;
import java.util.*;
/**
* Helper class for PipedAnalyzer.
* Illustrates the second level of analysis
* on data in a sequence of threads connected
* by piped streams.
**/
class Analyze2Thread extends Thread {
private PrintWriter fOut;
private Scanner fScanner;
public Analyze2Thread (Writer out, BufferedReader
in) {
fOut = new PrintWriter
(out);
fScanner =
new Scanner (in);
} // ctor
/** Read through the data until it runs out.
* Do a calculation upon each value
and
* send the result to the output stream.
**/
public void run () {
double d = 0.0;
if (fOut != null && fScanner != null)
{
try {
while (true)
{
d
= fScanner.nextDouble ();
//
Print as string to the PrintWriter stream.
fOut.printf
("%12.5f %n",calculate (d));
fOut.flush
();
}
}
catch (NoSuchElementException
nse)
{} // data finished
catch (Exception e) {
System.err.println("Analyzer1Thread error = " + e);
}
}
fOut.close ();
} // run
/** Do trivial calculation. **/
double calculate (double d) {
return 2.0 * d;
}
} // Analyze2Thread |
We use the input file dataInput.txt,
which holds
2.0
3.4
454.0
2.1
13.9
632.34
10
99.99
in the following:
c:> java
PipedAnalyzer dataInput.txt
produces the output
PipedAnalysis
Output:
8.00000
23.12000
412232.00000
8.82000
386.42000
799707.75120
200.00000
19996.00020
For real data analysi, it would be more likely that you would deal
with binary data. It is left to the student to convert the above
example to work with binary data in the file and the pipes.
References & Web
Resources
Most recent update: Oct. 1, 2005
|