📜 Log Processing Recipes
Efficient log processing transforms raw log data into actionable insights. This recipe provides patterns for collecting, parsing, filtering, and analyzing log data across different systems and formats.
🎯 Core Principles
Structured Log Processing Pipeline
Build modular log processing workflows that can be composed and reused.
#!/bin/bash
# log-processing-pipeline.sh - Modular log processing framework
# Global configuration
# NOTE(review): the temp directory name is predictable (/tmp + PID); consider
# creating it with `mktemp -d` if this runs on a shared host.
LOG_PROCESSING_TEMP_DIR="/tmp/logproc_$$"
LOG_PROCESSING_OUTPUT_DIR="/var/log/processed"
LOG_PROCESSING_CONFIG_DIR="/etc/logproc"

#######################################
# Remove the per-process temporary directory.
# Globals:   LOG_PROCESSING_TEMP_DIR (read)
# Arguments: none
# Returns:   status of rm, or 0 when the variable is empty/unset
#######################################
cleanup() {
  # Guard against an empty/unset variable so rm -rf never runs on nothing.
  if [[ -n "${LOG_PROCESSING_TEMP_DIR:-}" ]]; then
    rm -rf -- "$LOG_PROCESSING_TEMP_DIR"
  fi
}

# Run cleanup exactly once, on exit. A handler that only cleaned up on
# INT/TERM would let the script keep running after the signal, so these
# handlers exit instead (with the conventional 128+signal status), which in
# turn fires the EXIT trap.
trap cleanup EXIT
trap 'exit 130' INT
trap 'exit 143' TERM
# Prepare the working directories and pull in the optional site configuration.
# Globals: LOG_PROCESSING_TEMP_DIR, LOG_PROCESSING_OUTPUT_DIR,
#          LOG_PROCESSING_CONFIG_DIR (all read)
init_log_processing() {
  mkdir -p "$LOG_PROCESSING_TEMP_DIR"
  mkdir -p "$LOG_PROCESSING_OUTPUT_DIR"

  # Nothing more to do when no configuration file is installed
  [ -f "$LOG_PROCESSING_CONFIG_DIR/config" ] || return 0

  # The config file is executed in the current shell, so it can override globals
  . "$LOG_PROCESSING_CONFIG_DIR/config"
}
# Resolve a (type, config) pair into the list of log sources, one per line.
#   $1 - source type: file | directory | journal | syslog
#   $2 - type-specific value (a file path, or a directory to search)
# Prints the sources on stdout; returns 1 for an unknown type.
get_log_sources() {
  local kind="$1"
  local target="$2"

  if [[ "$kind" == "file" ]]; then
    # A single file is its own source
    echo "$target"
  elif [[ "$kind" == "directory" ]]; then
    # Every regular *.log file below the directory
    find "$target" -name "*.log" -type f
  elif [[ "$kind" == "journal" ]]; then
    echo "journalctl"
  elif [[ "$kind" == "syslog" ]]; then
    echo "/var/log/syslog"
  else
    echo "Unsupported source type: $kind" >&2
    return 1
  fi
}
# Emit the contents of a log source, optionally pre-processed for its format.
#   $1 - path of the log to read
#   $2 - format name (default: auto)
# Only two formats get special treatment: "syslog" is reshaped with awk and
# "auto" is delegated to detect_log_format. Every other value (json, csv,
# apache, or anything unknown) is passed through unchanged.
read_logs() {
  local log_path="$1"
  local log_format="${2:-auto}"

  if [[ "$log_format" == "syslog" ]]; then
    # Syslog format preprocessing: keeps the first eight fields only
    awk '{print $1" "$2" "$3" "$4" "$5": "$6" "$7" "$8}' "$log_path"
  elif [[ "$log_format" == "auto" ]]; then
    # Auto-detect format
    detect_log_format "$log_path"
  else
    cat "$log_path"
  fi
}
#######################################
# Detect a log's format from its first 10 lines and emit the WHOLE log,
# pre-processed for the detected format.
#
# Previously only the 10 sampled lines were printed, so callers using the
# "auto" format silently lost everything after line 10. The input is now
# read in a single pass: the first 10 lines are buffered, the format is
# decided, then the buffered lines and the rest of the input are emitted.
#
# Detection order (first rule matched by ANY sampled line wins):
#   1. JSON   - line is "{...}"                 -> passthrough
#   2. ISO    - line starts with YYYY-MM-DD     -> passthrough
#   3. syslog - line starts "Mon D HH:MM:SS"    -> first 8 fields, ": " after
#               field 5 (same reshaping as the explicit syslog reader)
#   4. otherwise                                -> passthrough
# The syslog rule accepts one or more spaces before the day so that
# space-padded days ("Jan  1") are recognised.
#
# Arguments: $1 - path of the log to read
# Outputs:   the (possibly reshaped) log on stdout
# Returns:   awk's exit status (non-zero if the source cannot be read)
#######################################
detect_log_format() {
  local source="$1"

  awk '
    # Print one line according to the detected format.
    function emit(line) {
      if (fmt == "syslog") {
        $0 = line
        print $1" "$2" "$3" "$4" "$5": "$6" "$7" "$8
      } else {
        print line
      }
    }
    # Decide the format from the buffered sample, honouring rule priority.
    function decide(    i) {
      fmt = "plain"
      for (i = 1; i <= buffered; i++)
        if (buf[i] ~ /^[{].*[}]$/) { fmt = "json"; return }
      for (i = 1; i <= buffered; i++)
        if (buf[i] ~ /^[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]/) { fmt = "iso"; return }
      for (i = 1; i <= buffered; i++)
        if (buf[i] ~ /^[A-Z][a-z][a-z] +[0-9][0-9]? [0-9][0-9]:[0-9][0-9]:[0-9][0-9]/) { fmt = "syslog"; return }
    }
    # Decide, then release everything buffered so far.
    function flush_sample(    i) {
      decide()
      for (i = 1; i <= buffered; i++) emit(buf[i])
      decided = 1
    }
    !decided {
      buf[++buffered] = $0
      if (buffered == 10) flush_sample()
      next
    }
    { emit($0) }
    # Inputs shorter than the sample size are decided at end of input.
    END { if (!decided) flush_sample() }
  ' "$source"
}
|
🔧 Log Parsing and Filtering
# log-parser.sh - Advanced log parsing utilities
# Pull fields out of a single log line.
#   $1 - the input text
#   $2 - extraction mode: "regex", "json", or a literal delimiter for cut
#   $3 - mode-specific spec: a sed -E script, a jq filter, or a cut field list
# Prints the extracted text on stdout; returns 1 when jq is needed but missing.
extract_fields() {
  local input="$1" delimiter="$2" field_spec="$3"

  # Regex mode: the spec is a complete sed -E script (e.g. s/.../\1/)
  if [[ "$delimiter" == "regex" ]]; then
    echo "$input" | sed -E "$field_spec"
    return
  fi

  # Anything other than json is treated as a cut delimiter
  if [[ "$delimiter" != "json" ]]; then
    echo "$input" | cut -d"$delimiter" -f"$field_spec"
    return
  fi

  # JSON mode needs jq on PATH
  if ! command -v jq >/dev/null 2>&1; then
    echo "Error: jq required for JSON parsing" >&2
    return 1
  fi
  echo "$input" | jq -r "$field_spec"
}
#######################################
# Convert a timestamp to ISO-8601 UTC (YYYY-MM-DDTHH:MM:SSZ).
# Relies on `date -d`, i.e. GNU date.
#
# Arguments:
#   $1 - the timestamp text
#   $2 - input format: iso | unix | syslog | apache | anything else (optional)
# Formats:
#   iso    - printed unchanged
#   unix   - seconds since the epoch
#   syslog - e.g. "Jan 1 12:00:00"; the text carries no year or zone, so
#            date fills in its own defaults
#   apache - e.g. "10/Oct/2000:13:55:36 -0700", optionally wrapped in [ ].
#            date cannot parse this form directly, so it is first rewritten
#            to "10 Oct 2000 13:55:36 -0700".
#   other  - best effort; the input is echoed back unchanged if date cannot
#            parse it
# Outputs: the normalized timestamp on stdout
# Returns: date's exit status for unix/syslog/apache; 0 otherwise
#######################################
normalize_timestamp() {
  local timestamp="$1"
  local input_format="${2:-}"
  local out_format='+%Y-%m-%dT%H:%M:%SZ'
  local parseable

  case "$input_format" in
    iso)
      echo "$timestamp" # Already in ISO format
      ;;
    unix)
      date -u -d "@$timestamp" "$out_format"
      ;;
    syslog)
      date -u -d "$timestamp" "$out_format"
      ;;
    apache)
      # Strip optional surrounding brackets, then turn the two "/" and the
      # first ":" (date/time separator) into spaces.
      parseable=${timestamp#\[}
      parseable=${parseable%\]}
      parseable=${parseable//\// }
      parseable=${parseable/:/ }
      date -u -d "$parseable" "$out_format"
      ;;
    *)
      # Try to parse automatically; fall back to the raw input
      date -u -d "$timestamp" "$out_format" 2>/dev/null || echo "$timestamp"
      ;;
  esac
}
#######################################
# Print the lines of a whitespace-delimited log whose fields match ALL of
# the given conditions.
#
# Arguments:
#   $1 - input file
#   $2 - conditions: "key=regex" pairs separated by ";"
#        key   - a field number (0 = whole line) or NF (last field)
#        regex - an awk regular expression; everything after the FIRST "="
#                belongs to it, so it may itself contain "=" and "/"
# Empty conditions (e.g. a trailing ";") are ignored; with no conditions at
# all every line is printed.
#
# The conditions are handed to awk through the environment and matched as
# data. They are never spliced into the awk program text, so a value cannot
# break the program or inject awk code.
# NOTE(review): keys other than a field number or NF are now rejected;
# confirm no caller relied on passing an arbitrary awk expression as key.
#
# Outputs: matching lines on stdout
# Returns: 0 on success, 2 if a condition is malformed
#######################################
filter_logs() {
  local input_file="$1"
  local filter_conditions="${2:-}"

  FILTER_LOGS_SPEC="$filter_conditions" awk '
    BEGIN {
      total = split(ENVIRON["FILTER_LOGS_SPEC"], parts, ";")
      for (i = 1; i <= total; i++) {
        if (parts[i] == "") continue
        eq = index(parts[i], "=")
        key = (eq > 0) ? substr(parts[i], 1, eq - 1) : ""
        if (key != "NF" && key !~ /^[0-9]+$/) {
          printf "filter_logs: invalid condition: %s\n", parts[i] > "/dev/stderr"
          exit 2
        }
        keys[++n] = key
        pats[n] = substr(parts[i], eq + 1)
      }
    }
    {
      # A line is printed only if every condition matches.
      for (i = 1; i <= n; i++) {
        value = (keys[i] == "NF") ? $NF : $(keys[i] + 0)
        if (value !~ pats[i]) next
      }
      print
    }
  ' "$input_file"
}
# Append processing metadata to every line of a log file.
#   $1 - input file
#   $2 - enrichment rules (accepted but currently unused)
# Each output line is the input line followed by " hostname=<host>" and
# " processed_at=<UTC timestamp taken once, when the function starts>".
enrich_logs() {
  local input_file="$1"
  local enrichment_rules="$2"

  awk -v hostname="$(hostname)" -v timestamp="$(date -u +%Y-%m-%dT%H:%M:%SZ)" \
    '{ printf "%s hostname=%s processed_at=%s\n", $0, hostname, timestamp }' \
    "$input_file"
}
|
📊 Log Aggregation and Analysis
Statistical Analysis and Reporting
# log-analytics.sh - Log analysis and reporting tools
# Count lines matching a grep pattern.
#   $1 - input file
#   $2 - grep pattern
#   $3 - field number to break the count down by (default 0 = whole line)
# With field 0 a single total is printed. Otherwise the matching lines are
# split on single spaces and a "count value" table for that field is printed,
# most frequent first.
count_patterns() {
  local input_file="$1" pattern="$2"
  local field="${3:-0}"

  if [ "$field" -eq 0 ]; then
    # Whole-line mode: just the number of matching lines
    grep "$pattern" "$input_file" | wc -l
    return
  fi

  # Per-field mode: frequency table of the selected column
  grep "$pattern" "$input_file" \
    | cut -d' ' -f"$field" \
    | sort \
    | uniq -c \
    | sort -nr
}
#######################################
# Print count, sum, average, min and max of a numeric column.
#
# Arguments:
#   $1 - input file (whitespace-delimited)
#   $2 - field number holding the values
# Values are considered numeric when the whole field is an optionally signed
# decimal number (e.g. 12, -3, 4.5); everything else (such as "-" or "n/a")
# is skipped. Negative values are included; they used to be dropped.
#
# The file is processed in a single awk pass, so values are never
# accumulated in a shell variable.
#
# Outputs: five "Name: value" lines on stdout; the "no values" diagnostic
#          goes to stderr so it cannot end up inside a redirected report
# Returns: 0 on success, 1 if the field contains no numeric values
#######################################
calculate_statistics() {
  local input_file="$1"
  local field="$2"

  if ! awk -v fld="$field" '
    $fld ~ /^-?[0-9]+\.?[0-9]*$/ {
      value = $fld + 0   # force numeric comparison below
      if (count == 0 || value < min) min = value
      if (count == 0 || value > max) max = value
      sum += value
      count++
    }
    END {
      if (count == 0) exit 1
      printf "Count: %d\nSum: %.2f\nAverage: %.2f\nMin: %.2f\nMax: %.2f\n", count, sum, sum / count, min, max
    }' "$input_file"; then
    echo "No numeric values found in field $field" >&2
    return 1
  fi
}
#######################################
# Aggregate a numeric column into fixed-width time buckets.
# Requires an awk that provides mktime() and strftime() (e.g. gawk).
#
# Arguments:
#   $1 - input file (whitespace-delimited)
#   $2 - field number of the timestamp, in a form that becomes
#        "YYYY MM DD HH MM SS" once "-", ":", "T" and "Z" are replaced by
#        spaces (e.g. 2024-01-01T10:15:00Z)
#   $3 - field number of the value to aggregate
#   $4 - bucket width in seconds (default 3600)
# Lines whose timestamp cannot be parsed (mktime returns -1) are skipped;
# previously they were all accumulated into a bogus bucket at the epoch.
#
# Outputs: one "bucket_start sum count average" line per bucket, sorted
# Returns: 1 if the interval is not a positive number, else sort's status
#######################################
analyze_time_series() {
  local input_file="$1"
  local time_field="$2"
  local value_field="$3"
  local interval="${4:-3600}" # Default 1 hour intervals

  # A zero or non-numeric interval would make the bucket computation divide
  # by zero, so reject it up front.
  if ! awk -v i="$interval" 'BEGIN { exit !(i + 0 > 0) }'; then
    echo "analyze_time_series: interval must be a positive number of seconds: $interval" >&2
    return 1
  fi

  awk -v time_fld="$time_field" -v val_fld="$value_field" -v interval="$interval" '
    {
      # Parse timestamp and convert to epoch
      timestamp = $time_fld
      gsub(/[-:TZ]/, " ", timestamp)
      epoch = mktime(timestamp)
      if (epoch == -1) next   # unparsable timestamp

      # Calculate interval bucket
      bucket = int(epoch / interval) * interval

      # Accumulate values
      sum[bucket] += $val_fld
      count[bucket]++
    }
    END {
      for (bucket in sum) {
        avg = sum[bucket] / count[bucket]
        print strftime("%Y-%m-%d %H:%M:%S", bucket) " " sum[bucket] " " count[bucket] " " avg
      }
    }' "$input_file" | sort
}
# Rank the distinct values of a column by how often they occur.
#   $1 - input file (whitespace-delimited)
#   $2 - field number to rank
#   $3 - how many rows to print (default 10)
#   $4 - "desc" for most frequent first (default); any other value gives
#        least frequent first
# Prints "count value" rows as produced by uniq -c.
top_n_analysis() {
  local input_file="$1" field="$2"
  local count="${3:-10}"
  local sort_order="${4:-desc}" # asc or desc

  # Only the direction of the final numeric sort depends on the order
  local rank_flags="-n"
  if [ "$sort_order" = "desc" ]; then
    rank_flags="-nr"
  fi

  awk -v fld="$field" '{print $fld}' "$input_file" \
    | sort \
    | uniq -c \
    | sort "$rank_flags" \
    | head -n "$count"
}
|
🔄 Log Rotation and Archiving
Automated Log Management
# log-rotation.sh - Log rotation and archival system
#######################################
# Archive and truncate *.log files that are too large or too old.
#
# Arguments:
#   $1 - directory to search (recursively) for *.log files
#   $2 - size threshold: an integer with optional K, M or G suffix
#        (default 100M); a bare integer means bytes
#   $3 - age threshold in days since last modification (default 7)
#   $4 - archive directory (default: <log_dir>/archive)
# A file is rotated when its size exceeds $2 OR its age exceeds $3. Rotation
# gzips the file to <archive>/<name>_<mtime stamp>.log.gz and then truncates
# the original in place.
# NOTE(review): lines written between the gzip and the truncate are lost;
# confirm that is acceptable for the writers of these logs.
#
# Portability: file size and mtime are read with GNU `stat -c` first and BSD
# `stat -f` second. Trying the BSD form first on GNU systems prints
# filesystem information to stdout, which corrupted the captured value.
#
# Outputs: one line per rotated file on stdout, failures on stderr
# Returns: 1 if the size threshold is malformed or the archive directory
#          cannot be created; 0 otherwise
#######################################
rotate_logs() {
  local log_dir="$1"
  local max_size="${2:-100M}"
  local max_age="${3:-7}" # days
  local archive_dir="${4:-$log_dir/archive}"
  local size_number size_multiplier max_bytes now
  local log_file file_size file_mtime file_age timestamp base_name archive_name

  # Convert the size threshold to bytes without depending on bc
  case "$max_size" in
    *[Kk]) size_multiplier=1024 ;;
    *[Mm]) size_multiplier=$((1024 * 1024)) ;;
    *[Gg]) size_multiplier=$((1024 * 1024 * 1024)) ;;
    *) size_multiplier=1 ;;
  esac
  size_number="${max_size%[KkMmGg]}"
  if [[ ! "$size_number" =~ ^[0-9]+$ ]]; then
    echo "Invalid max size: $max_size" >&2
    return 1
  fi
  # 10# forces base 10 so values like "08" are not read as octal
  max_bytes=$((10#$size_number * size_multiplier))

  mkdir -p "$archive_dir" || return 1
  now=$(date +%s)

  # NUL-delimited so file names with spaces or newlines survive intact
  while IFS= read -r -d '' log_file; do
    file_size=$(stat -c %s "$log_file" 2>/dev/null || stat -f %z "$log_file" 2>/dev/null) || continue
    file_mtime=$(stat -c %Y "$log_file" 2>/dev/null || stat -f %m "$log_file" 2>/dev/null) || continue
    file_age=$(((now - file_mtime) / 86400))

    # Check rotation criteria
    if [ "$file_size" -gt "$max_bytes" ] || [ "$file_age" -gt "$max_age" ]; then
      # Stamp the archive with the file's modification time (GNU, then BSD date)
      timestamp=$(date -d "@$file_mtime" +%Y%m%d_%H%M%S 2>/dev/null \
        || date -r "$file_mtime" +%Y%m%d_%H%M%S)
      base_name=$(basename "$log_file" .log)
      archive_name="$archive_dir/${base_name}_${timestamp}.log.gz"

      # Rotate and compress
      if gzip -c "$log_file" > "$archive_name"; then
        echo "Rotated and compressed: $log_file -> $archive_name"
        : > "$log_file" # Truncate original file
      else
        echo "Failed to rotate: $log_file" >&2
        rm -f -- "$archive_name" # do not leave a partial archive behind
      fi
    fi
  done < <(find "$log_dir" -name '*.log' -type f -print0)
}
#######################################
# Delete *.log.gz archives older than the retention period.
#
# Arguments:
#   $1 - archive directory (searched recursively)
#   $2 - retention in whole days (default 30)
# A file is removed when its modification time is more than $2 * 86400
# seconds in the past. The cutoff is computed with shell arithmetic rather
# than GNU-only `date -d "N days ago"`, and the mtime is read with GNU
# `stat -c` first, BSD `stat -f` second (the BSD form first prints
# filesystem information on GNU systems and corrupts the value).
#
# Outputs: "Removed old archive: <path>" per deleted file on stdout
# Returns: 1 if the retention value is not a non-negative integer
#######################################
cleanup_old_archives() {
  local archive_dir="$1"
  local retention_days="${2:-30}"
  local cutoff_date archive_file file_date

  if [[ ! "$retention_days" =~ ^[0-9]+$ ]]; then
    echo "Invalid retention period: $retention_days" >&2
    return 1
  fi
  cutoff_date=$(($(date +%s) - 10#$retention_days * 86400))

  # NUL-delimited so unusual file names are handled correctly
  while IFS= read -r -d '' archive_file; do
    file_date=$(stat -c %Y "$archive_file" 2>/dev/null || stat -f %m "$archive_file" 2>/dev/null) || continue
    if [ "$file_date" -lt "$cutoff_date" ]; then
      if rm -f -- "$archive_file"; then
        echo "Removed old archive: $archive_file"
      fi
    fi
  done < <(find "$archive_dir" -name '*.log.gz' -type f -print0)
}
# Gzip already-rotated logs (names ending in .log.0 or .log.1) in place.
#   $1 - directory to search (recursively)
#   $2 - gzip compression level (default 6)
# Reports each file on stdout when compressed, or on stderr when gzip fails.
compress_logs() {
  local log_dir="$1"
  local compression_level="${2:-6}"
  local level_flag="-$compression_level"

  find "$log_dir" -name "*.log.1" -o -name "*.log.0" | while read -r rotated_file; do
    if ! gzip "$level_flag" "$rotated_file"; then
      echo "Failed to compress: $rotated_file" >&2
      continue
    fi
    echo "Compressed: $rotated_file"
  done
}
|
🎨 Advanced Log Processing Features
Real-time Log Streaming
# log-streaming.sh - Real-time log processing
#######################################
# Follow a log file and run every new line through a processor command.
# Does not return on its own: it runs for as long as `tail -f` does.
#
# Arguments:
#   $1 - log file to follow
#   $2 - processor: a shell command line that reads one line on stdin and
#        writes the processed result to stdout
#   $3 - optional file to append the processed lines to
# Outputs: processed lines on stdout (and appended to $3 when given)
#
# Lines are read with IFS cleared so leading and trailing whitespace is
# preserved, and written with printf so content such as "-n" or "-e" is
# emitted literally.
#######################################
stream_logs() {
  local source="$1"
  local processor="$2"
  local output_file="${3:-}"
  local line processed_line

  # Tail the log file and process new entries
  tail -f "$source" | while IFS= read -r line; do
    # SECURITY: $processor is executed as shell code via eval. It must come
    # from a trusted caller, never from log content or other untrusted input.
    processed_line=$(printf '%s\n' "$line" | eval "$processor")

    # Output processed line
    printf '%s\n' "$processed_line"

    # Optionally save to file
    if [ -n "$output_file" ]; then
      printf '%s\n' "$processed_line" >> "$output_file"
    fi
  done
}
#######################################
# Skeleton for correlating several log sources.
#
# Each source is followed with a background `tail -f` into its own file in a
# private temporary directory. The correlation step itself is a placeholder.
#
# Arguments: $@ - log files to follow
# Returns:   1 if the temporary directory cannot be created; 0 otherwise
#
# Resource handling: the background tails are stopped and the temporary
# directory is removed before the function returns. Previously the tails
# were never stopped, and cleanup was deferred to a `trap ... EXIT` that
# replaced whatever EXIT trap the calling script had already installed.
# NOTE(review): once real correlation logic is added, an interrupt during it
# would skip this cleanup; confirm how the caller wants signals handled.
#######################################
correlate_logs() {
  local sources=("$@")
  local temp_dir i
  local -a tail_pids=()

  temp_dir=$(mktemp -d) || return 1

  # Stream each source to temporary files
  for i in "${!sources[@]}"; do
    tail -f "${sources[$i]}" > "$temp_dir/source_$i.log" &
    tail_pids+=("$!")
  done

  # Correlation logic would go here
  # This is a simplified example

  # Cleanup: stop the followers first, then remove their output
  if ((${#tail_pids[@]} > 0)); then
    kill "${tail_pids[@]}" 2>/dev/null
    wait "${tail_pids[@]}" 2>/dev/null
  fi
  rm -rf -- "$temp_dir"
}
#######################################
# Raise alerts for patterns that appear in the most recent log lines.
#
# Arguments:
#   $1 - log file to inspect
#   $2 - rules file, one rule per line: pattern|severity|message
#   $3 - alert destination, passed through to send_alert
# For each rule, the pattern (a grep regex) is counted within the LAST 100
# LINES of the log. Previously the whole file was searched and only the
# match list was trimmed, so stale matches kept alerting. An alert
# "[severity] message (Count: n)" is sent when the count is above zero.
#
# Rules with an empty pattern are skipped (an empty regex matches every
# line), and a final rule without a trailing newline is still honoured.
#######################################
generate_alerts() {
  local log_file="$1"
  local alert_patterns_file="$2"
  local alert_destination="${3:-}"
  local pattern severity message count

  # Read alert patterns
  while IFS='|' read -r pattern severity message || [[ -n "$pattern" ]]; do
    [[ -n "$pattern" ]] || continue

    # Count occurrences in recent logs. grep -c exits 1 on zero matches,
    # which must not abort callers running under `set -e`.
    count=$(tail -n 100 "$log_file" | grep -c -- "$pattern") || true
    count=${count:-0}

    # Generate alert if threshold exceeded
    if [ "$count" -gt 0 ]; then
      send_alert "[$severity] $message (Count: $count)" "$alert_destination"
    fi
  done < "$alert_patterns_file"
}
#######################################
# Deliver an alert message to a destination.
#
# Arguments:
#   $1 - message text
#   $2 - destination: email | slack | syslog | anything else (print to stdout)
# Environment (optional overrides of the built-in placeholders):
#   LOG_ALERT_EMAIL         - recipient for "email"
#                             (default admin@example.com)
#   LOG_ALERT_SLACK_WEBHOOK - webhook URL for "slack"
#                             (default the YOUR/SLACK/WEBHOOK placeholder)
# Returns: exit status of the delivery command
#
# For Slack the message is JSON-escaped before being embedded in the
# payload; unescaped quotes, backslashes or newlines (common in log text)
# would otherwise produce invalid JSON.
#######################################
send_alert() {
  local message="$1"
  local destination="${2:-}"
  local email_to="${LOG_ALERT_EMAIL:-admin@example.com}"
  local slack_webhook="${LOG_ALERT_SLACK_WEBHOOK:-https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK}"
  local payload

  case "$destination" in
    email)
      printf '%s\n' "$message" | mail -s "Log Alert" "$email_to"
      ;;
    slack)
      # Slack webhook integration. Backslashes must be escaped first so the
      # escapes added afterwards are not doubled.
      payload=$message
      payload=${payload//\\/\\\\}
      payload=${payload//\"/\\\"}
      payload=${payload//$'\n'/\\n}
      payload=${payload//$'\r'/\\r}
      payload=${payload//$'\t'/\\t}
      curl -X POST -H 'Content-type: application/json' \
        --data "{\"text\":\"$payload\"}" \
        "$slack_webhook"
      ;;
    syslog)
      logger -p user.alert -- "$message"
      ;;
    *)
      printf '%s\n' "$message"
      ;;
  esac
}
|
🧾 Summary Best Practices
Log Processing Guidelines
- Structured Input: Use structured logging formats (JSON, CSV) when possible
- Modular Design: Build composable processing functions
- Error Handling: Handle malformed or missing data gracefully
- Performance: Optimize for large log volumes
- Security: Sanitize sensitive data in logs
- Retention: Implement appropriate log retention policies
- Monitoring: Monitor log processing pipelines themselves
Sample Log Processing Workflow
# example-workflow.sh - Complete log processing example
#######################################
# Example end-to-end workflow for a web-server access log.
#
# Globals:   LOG_PROCESSING_TEMP_DIR (read) - scratch directory
# Arguments:
#   $1 - access log; fields 1, 4, 9 and 10 are taken as client, timestamp,
#        status and size
#   $2 - output directory for the reports
# Produces in $2: top_errors.txt, response_stats.txt and
# processed_<YYYYMMDD>.log.gz.
#
# Error lines are selected by testing the status column exactly. The earlier
# substring match (" 500", " 404", " 403") also hit the size column, e.g. a
# successful response of 5000 bytes.
#######################################
process_web_server_logs() {
  local log_file="$1"
  local output_dir="$2"
  local parsed_log="$LOG_PROCESSING_TEMP_DIR/parsed.log"
  local error_log="$LOG_PROCESSING_TEMP_DIR/errors.log"

  # Both directories must exist before anything is redirected into them
  mkdir -p "$LOG_PROCESSING_TEMP_DIR" "$output_dir" || return 1

  # Step 1: Parse and normalize (client, timestamp, status, size; brackets removed)
  awk '{print $1" "$4" "$9" "$10}' "$log_file" \
    | sed 's/\[//g; s/\]//g' > "$parsed_log"

  # Step 2: Filter errors on the status column (field 3 of the parsed log)
  awk '$3 ~ /^(500|404|403)$/' "$parsed_log" > "$error_log"

  # Step 3: Analyze error patterns
  top_n_analysis "$error_log" 1 20 > "$output_dir/top_errors.txt"

  # Step 4: Generate statistics
  calculate_statistics "$parsed_log" 4 > "$output_dir/response_stats.txt"

  # Step 5: Archive processed logs
  gzip -c "$parsed_log" > "$output_dir/processed_$(date +%Y%m%d).log.gz"

  echo "Log processing completed. Results in $output_dir"
}
|
🧠 Complete Log Processing Script
#!/bin/bash
# complete-log-processor.sh - Production-ready log processing system
# Strict mode: stop on errors, on unset variables, and on failures anywhere
# in a pipeline.
set -o errexit -o nounset -o pipefail
# Load the processing modules into this shell. The names contain no slash,
# so they are resolved by the shell's normal lookup for sourced files.
. log-processing-pipeline.sh
. log-parser.sh
. log-analytics.sh
. log-rotation.sh
. log-streaming.sh
#######################################
# Command dispatcher.
#
# Arguments:
#   $1   - action: process | analyze | rotate | stream | correlate
#   $2.. - passed through unchanged to the handler for that action
# Returns: the handler's exit status, or 1 (with a usage message on stderr)
#          when the action is missing or unknown
#
# The action is read with a default and the shift is conditional, so
# calling with no arguments prints the usage message even under
# `set -euo pipefail` instead of dying with "unbound variable".
#######################################
main() {
  local action="${1:-}"
  if (($# > 0)); then
    shift
  fi

  case "$action" in
    process)
      process_logs "$@"
      ;;
    analyze)
      analyze_logs "$@"
      ;;
    rotate)
      rotate_logs "$@"
      ;;
    stream)
      stream_logs "$@"
      ;;
    correlate)
      correlate_logs "$@"
      ;;
    *)
      echo "Usage: $0 {process|analyze|rotate|stream|correlate} [options]" >&2
      return 1
      ;;
  esac
}
#######################################
# Run the full pipeline over every log source of the given type.
#
# Globals:   LOG_PROCESSING_TEMP_DIR (read) - where per-source results go
# Arguments:
#   $1 - source type   (passed to get_log_sources)
#   $2 - source config (passed to get_log_sources)
#   $3 - output directory for the analytics
# Returns: 1 on missing arguments or when the sources cannot be resolved
#
# get_log_sources prints one source per line. The list is loaded into an
# array line by line, so paths containing spaces or glob characters stay
# intact; the earlier unquoted `for ... in $sources` split them into pieces.
#######################################
process_logs() {
  if (($# < 3)); then
    echo "Usage: process_logs <source_type> <source_config> <output_dir>" >&2
    return 1
  fi

  local source_type="$1"
  local source_config="$2"
  local output_dir="$3"
  local sources log_source processed_file
  local -a source_list=()

  init_log_processing

  # Get log sources
  sources=$(get_log_sources "$source_type" "$source_config") || return 1
  mapfile -t source_list <<< "$sources"

  # Process each source
  for log_source in "${source_list[@]}"; do
    # An empty result list yields a single empty entry; skip it
    [[ -n "$log_source" ]] || continue

    echo "Processing source: $log_source"
    processed_file="$LOG_PROCESSING_TEMP_DIR/processed_$(basename "$log_source").log"

    # Read and parse logs
    read_logs "$log_source" \
      | extract_fields_stdin "|" "1,3,5" \
      | normalize_timestamps \
      | enrich_with_metadata > "$processed_file"

    # Generate analytics
    generate_analytics "$processed_file" "$output_dir"
  done

  echo "Log processing completed"
}
# Run main function
# Entry point: forward the script's command-line arguments unchanged to main,
# which dispatches on the first one.
main "$@"
|
🧾 See Also