Advanced Pipelines
Beyond simple command chaining, pipelines can stream, branch, and merge data between processes. Getting good results depends on understanding buffering, process overhead, and error handling.
Pipeline Architecture
A pipeline connects multiple processes through anonymous pipes:
| command1 | command2 | command3
|
Internally:
| command1.stdout → pipe1 → command2.stdin
command2.stdout → pipe2 → command3.stdin
|
Each process runs concurrently, enabling streaming processing.
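One way to see that the stages really run concurrently is to pipe an endless producer into a consumer that stops early. In the minimal sketch below, `yes` would never finish on its own; the pipeline ends only because `head` exits after three lines and the next write by `yes` fails with SIGPIPE. The variable name `first_three` is illustrative.

```shell
# yes prints "tick" forever; head reads three lines and exits.
# Once head closes its end of the pipe, yes is terminated by SIGPIPE.
first_three=$(yes "tick" | head -n 3)
printf '%s\n' "$first_three"
```

If the stages ran one after another, `head` would never start, because `yes` never completes.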
Buffering Behavior
Understanding buffering is crucial for pipeline performance:
Buffer Types
| Buffer Type | Size | Trigger | When Used |
|---|---|---|---|
| Full | Typically 4-8 KB | Buffer fills | stdout to a file or pipe |
| Line | N/A | Newline | stdout to a terminal |
| None | 0 | Immediate | stderr |
Buffering Effects
| # Appears immediately: echo writes each line at once and cat forwards it as it arrives
for i in {1..5}; do
  echo "Line $i"
  sleep 1
done | cat
# Fully buffered (appears all at once): grep block-buffers its output when stdout is a pipe
for i in {1..5}; do
  echo "Line $i"
  sleep 1
done | grep "Line" | cat
|
Force Line Buffering
| # Use stdbuf (GNU coreutils) to force line-buffered output
stdbuf -oL command | while IFS= read -r line; do
  echo "Got: $line"
done
# Or use unbuffer (from expect package)
unbuffer command | process
|
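The effect of output buffering on latency can be measured directly. The sketch below is only an illustration: `first_line_delay` is a hypothetical helper, and it assumes a `grep` that supports `--line-buffered` (GNU and BSD grep do). It times how long the first line takes to get through a filter while the producer is still running.

```shell
# Print the whole seconds that pass before the first line gets through a filter.
# Usage: first_line_delay FILTER [ARGS...]
first_line_delay() {
  start=$(date +%s)
  { echo "Line 1"; sleep 2; echo "Line 2"; } \
    | "$@" \
    | { IFS= read -r _first; echo $(( $(date +%s) - start )); }
}

buffered=$(first_line_delay grep Line)                  # grep holds output until it exits
streamed=$(first_line_delay grep --line-buffered Line)  # grep flushes after every line
echo "block-buffered: ${buffered}s, line-buffered: ${streamed}s"
```

With block buffering the first line only arrives once the producer has finished, so the first figure should be about two seconds larger than the second.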
Complex Pipeline Patterns
Tee Operations
Branch output to multiple destinations:
| # Save to file AND continue processing
command | tee output.log | further_processing
# Multiple branches
command | tee >(process1) >(process2) >(process3) > final_output
# Append mode
command | tee -a output.log | filter
|
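As a small self-contained illustration of the first pattern, the sketch below saves a copy of the stream while a later stage counts its lines. The directory and file names are placeholders created with `mktemp`.

```shell
workdir=$(mktemp -d)

# tee writes every line to copy.log and also passes it on to wc
line_count=$(printf '%s\n' alpha beta gamma | tee "$workdir/copy.log" | wc -l)
saved_copy=$(cat "$workdir/copy.log")

echo "lines counted: $line_count"
rm -rf "$workdir"
```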
Combining Streams
Merge stdout and stderr:
| # Method 1: Redirect stderr to stdout
command 2>&1 | grep ERROR
# Method 2: Bash shorthand
command |& grep ERROR
# Method 3: Separate handling
# (fd 3 carries stderr; note that handle_stdout's own output also reaches handle_stderr)
{
  command 2>&3 | handle_stdout
} 3>&1 | handle_stderr
|
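The order of redirections decides what ends up in the pipe. The following sketch uses a hypothetical function, `emit`, that writes one line to each stream, and shows both merging the streams and piping stderr alone.

```shell
# Hypothetical command that writes one line to each stream
emit() {
  echo "to stdout"
  echo "to stderr" >&2
}

# Merge: both lines travel through the pipe
merged=$(emit 2>&1 | sort)

# stderr only: point stderr at the pipe first, then discard stdout
errors_only=$(emit 2>&1 >/dev/null | tr '[:lower:]' '[:upper:]')

printf '%s\n' "$merged" "$errors_only"
```

Redirections are processed left to right, so `2>&1 >/dev/null` duplicates the pipe onto stderr before stdout is sent elsewhere.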
Process Substitution
Treat command output as files:
| # Compare sorted outputs
diff <(sort file1.txt) <(sort file2.txt)
# Feed generated data
while IFS= read -r line; do
process "$line"
done < <(generate_data_stream)
|
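A practical reason to prefer `done < <(...)` over piping into `while` is variable scope. The sketch below assumes Bash with default settings (the `lastpipe` option off); the counter names are illustrative.

```shell
# Pipeline form: the loop runs in a subshell, so its counter is lost
piped_count=0
printf '%s\n' a b c | while IFS= read -r line; do
  piped_count=$((piped_count + 1))
done

# Process substitution: the loop runs in the current shell
direct_count=0
while IFS= read -r line; do
  direct_count=$((direct_count + 1))
done < <(printf '%s\n' a b c)

echo "piped: $piped_count, direct: $direct_count"
```

Under those assumptions the first counter is still 0 after the loop, while the second holds 3.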
Output Process Substitution
| # Input form, for contrast: read two command outputs as if they were files
paste <(cut -d',' -f1 data.csv) <(cut -d',' -f2 data.csv)
# Output form: send stdout and stderr to separate processors
command > >(logger) 2> >(error_handler)
|
Named Pipes (FIFOs)
Persistent communication channels:
| # Create named pipe
mkfifo /tmp/mypipe
trap "rm -f /tmp/mypipe" EXIT
# Producer (background)
produce_data > /tmp/mypipe &
# Consumer
consume_data < /tmp/mypipe
|
Useful for:
- Producer-consumer patterns
- Inter-script communication
- Streaming data between unrelated processes
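The producer and consumer pattern above can be tried end to end with ordinary tools. This is a minimal sketch: `printf` stands in for the producer, `wc -l` for the consumer, and the FIFO lives in a temporary directory rather than at a fixed path.

```shell
fifo_dir=$(mktemp -d)
fifo="$fifo_dir/demo.fifo"
mkfifo "$fifo"

# Producer (background): blocks until a reader opens the FIFO
printf '%s\n' one two three > "$fifo" &

# Consumer: reads until the producer closes its end
received=$(wc -l < "$fifo")
wait

echo "lines received: $received"
rm -rf "$fifo_dir"
```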
Minimize Pipeline Stages
Each | creates a subprocess:
| # ❌ 3 processes
cat file | grep pattern | wc -l
# ✅ 1 process
grep -c pattern file
|
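A quick check that the two forms agree, using a throwaway sample file (the file contents and variable names are made up for the illustration):

```shell
sample=$(mktemp)
printf '%s\n' "error: disk" "ok" "error: net" > "$sample"

three_stage=$(cat "$sample" | grep error | wc -l)   # three processes
single=$(grep -c error "$sample")                   # one process

echo "three-stage: $three_stage, single: $single"
rm -f "$sample"
```

Both count matching lines, so the shorter form is a drop-in replacement here.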
Use Built-ins When Possible
| # ❌ External command
cat file | while read line; do echo "$line"; done
# ✅ Built-in (no subshell)
while IFS= read -r line; do echo "$line"; done < file
|
Batch Processing
Reduce process creation overhead:
| # ❌ Many processes
for file in *.txt; do
  wc -l "$file"
done
# ✅ Single process
wc -l *.txt
|
Advanced Pipeline Examples
Multi-stage Log Analysis
| # Extract, filter, aggregate, and format
# Assumes the common access log layout: $1 = client IP, $7 = request path
grep "POST /api" access.log \
  | awk '{print $1, $7}' \
  | sort \
  | uniq -c \
  | sort -nr \
  | head -10 \
  | awk '{printf "%-5d %-15s %s\n", $1, $2, $3}'
|
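The counting idiom at the heart of this pipeline (`sort | uniq -c | sort -nr`) can be tried on a few lines of made-up data. The sample below is hypothetical and much simpler than a real access log: the client address is the first field.

```shell
# Hypothetical three-line sample in place of access.log
sample_log='10.0.0.1 POST /api/users
10.0.0.2 POST /api/orders
10.0.0.1 POST /api/users'

# sort groups identical lines, uniq -c counts each group, sort -nr ranks them
top_client=$(printf '%s\n' "$sample_log" \
  | awk '{print $1}' \
  | sort \
  | uniq -c \
  | sort -nr \
  | head -n 1 \
  | awk '{print $2, $1}')

echo "$top_client"
```

`uniq -c` only merges adjacent duplicates, which is why the plain `sort` has to come first.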
Parallel Branch Processing
| {
echo "Header"
cat part1.txt
cat part2.txt
} | sort \
| tee >(wc -l > count.txt) \
| gzip > archive.gz
|
Real-time Monitoring
| # Tail log and process in real-time
tail -f /var/log/app.log \
| grep --line-buffered ERROR \
| while IFS= read -r line; do
send_alert "$line"
done
|
Error Handling in Pipelines
Pipeline Exit Status
By default, a pipeline's exit status is that of its last command:
| false | true # Exit status: 0 (misleading!)
# Fix with pipefail
set -o pipefail
false | true # Exit status: 1 (correct!)
|
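When it matters which stage failed, Bash also records every stage's exit status in the `PIPESTATUS` array. The sketch below runs each case in a command substitution so that `pipefail` does not leak into the rest of the script; the variable names are illustrative.

```shell
# Last command wins by default
default_status=$(false | true; echo "$?")

# With pipefail, the last non-zero status is reported instead
pipefail_status=$(set -o pipefail; false | true; echo "$?")

# PIPESTATUS (Bash-specific) lists the status of every stage, in order
stage_statuses=$(false | true | false; echo "${PIPESTATUS[*]}")

echo "default: $default_status, pipefail: $pipefail_status, stages: $stage_statuses"
```

`PIPESTATUS` is overwritten by the next command, so it has to be read immediately after the pipeline.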
Error Propagation
| # Handle errors in pipeline stages
{
command1 || { echo "command1 failed" >&2; exit 1; }
} | {
command2 || { echo "command2 failed" >&2; exit 1; }
} | command3
|
Timeout Protection
| # Prevent hanging pipelines: timeout applies to command1 only;
# later stages finish once they see end-of-input
timeout 30s command1 | command2 | command3
|
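To tell a timeout apart from an ordinary failure, check the first stage's exit status. This sketch assumes GNU coreutils `timeout`, which exits with status 124 when the time limit is hit, and Bash for `PIPESTATUS`; `sleep 5` stands in for a slow producer.

```shell
# sleep 5 is cut off after 1 second; cat then sees end-of-input and exits
producer_status=$(timeout 1 sleep 5 | cat; echo "${PIPESTATUS[0]}")

if [ "$producer_status" -eq 124 ]; then
  echo "producer timed out"
fi
```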
Portability Considerations
| Feature | POSIX | Bash | Zsh |
|---|---|---|---|
| Basic pipes | ✅ | ✅ | ✅ |
| Process substitution | ❌ | ✅ | ✅ |
| |& shorthand | ❌ | ✅ | ✅ |
| Named pipes (mkfifo) | ✅ | ✅ | ✅ |
| stdbuf | ❌ | External (GNU coreutils) | External (GNU coreutils) |
Summary
- Understand buffering to control timing
- Use tee for branching pipelines
- Leverage process substitution for complex flows
- Optimize by reducing subprocesses
- Handle errors with pipefail and timeouts
- Monitor performance with profiling tools
- Design for portability when needed
Continue to: Expansions