Przejdź do treści

📜 Log Processing Recipes

Efficient log processing transforms raw log data into actionable insights. This recipe provides patterns for collecting, parsing, filtering, and analyzing log data across different systems and formats.


🎯 Core Principles

Structured Log Processing Pipeline

Build modular log processing workflows that can be composed and reused.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
#!/bin/bash
# log-processing-pipeline.sh - Modular log processing framework

# Global configuration
LOG_PROCESSING_TEMP_DIR="/tmp/logproc_$$"
LOG_PROCESSING_OUTPUT_DIR="/var/log/processed"
LOG_PROCESSING_CONFIG_DIR="/etc/logproc"

# Cleanup function
cleanup() {
    rm -rf "$LOG_PROCESSING_TEMP_DIR"
}

trap cleanup EXIT INT TERM

# Initialize processing environment
init_log_processing() {
    mkdir -p "$LOG_PROCESSING_TEMP_DIR"
    mkdir -p "$LOG_PROCESSING_OUTPUT_DIR"

    # Load configuration if available
    if [ -f "$LOG_PROCESSING_CONFIG_DIR/config" ]; then
        source "$LOG_PROCESSING_CONFIG_DIR/config"
    fi
}

# Log source abstraction
get_log_sources() {
    local source_type="$1"
    local source_config="$2"

    case "$source_type" in
        file)
            echo "$source_config"
            ;;
        directory)
            find "$source_config" -name "*.log" -type f
            ;;
        journal)
            echo "journalctl"
            ;;
        syslog)
            echo "/var/log/syslog"
            ;;
        *)
            echo "Unsupported source type: $source_type" >&2
            return 1
            ;;
    esac
}

# Log reader with format detection
read_logs() {
    local source="$1"
    local format="${2:-auto}"

    case "$format" in
        json)
            cat "$source"
            ;;
        csv)
            cat "$source"
            ;;
        syslog)
            # Syslog format preprocessing
            awk '{print $1" "$2" "$3" "$4" "$5": "$6" "$7" "$8}' "$source"
            ;;
        apache)
            # Apache common log format
            cat "$source"
            ;;
        auto)
            # Auto-detect format
            detect_log_format "$source"
            ;;
        *)
            cat "$source"
            ;;
    esac
}

# Format detection
detect_log_format() {
    local source="$1"
    local sample_lines
    sample_lines=$(head -10 "$source")

    # Simple heuristic-based detection
    if echo "$sample_lines" | grep -E '^{.*}$' >/dev/null; then
        echo "$sample_lines"  # Already JSON
    elif echo "$sample_lines" | grep -E '^[0-9]{4}-[0-9]{2}-[0-9]{2}' >/dev/null; then
        # ISO date format
        echo "$sample_lines"
    elif echo "$sample_lines" | grep -E '^[A-Z][a-z]{2} [0-9]{1,2} [0-9]{2}:[0-9]{2}:[0-9]{2}' >/dev/null; then
        # Syslog format
        echo "$sample_lines" | awk '{print $1" "$2" "$3" "$4" "$5": "$6" "$7" "$8}'
    else
        # Default passthrough
        echo "$sample_lines"
    fi
}

🔧 Log Parsing and Filtering

Field Extraction and Transformation

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# log-parser.sh - Advanced log parsing utilities

# Extract fields using various methods
extract_fields() {
    local input="$1"
    local delimiter="$2"
    local field_spec="$3"  # comma-separated field numbers or names

    case "$delimiter" in
        regex)
            # Use regex groups
            echo "$input" | sed -E "$field_spec"
            ;;
        json)
            # Parse JSON fields
            if command -v jq >/dev/null 2>&1; then
                echo "$input" | jq -r "$field_spec"
            else
                echo "Error: jq required for JSON parsing" >&2
                return 1
            fi
            ;;
        *)
            # Standard delimiter-based extraction
            echo "$input" | cut -d"$delimiter" -f"$field_spec"
            ;;
    esac
}

# Timestamp normalization
normalize_timestamp() {
    local timestamp="$1"
    local input_format="$2"

    case "$input_format" in
        iso)
            echo "$timestamp"  # Already in ISO format
            ;;
        unix)
            date -d "@$timestamp" -u +"%Y-%m-%dT%H:%M:%SZ"
            ;;
        syslog)
            # Parse syslog timestamp (e.g., "Jan  1 12:00:00")
            date -d "$timestamp" -u +"%Y-%m-%dT%H:%M:%SZ"
            ;;
        apache)
            # Apache log format timestamp
            date -d "$timestamp" -u +"%Y-%m-%dT%H:%M:%SZ"
            ;;
        *)
            # Try to parse automatically
            date -d "$timestamp" -u +"%Y-%m-%dT%H:%M:%SZ" 2>/dev/null || echo "$timestamp"
            ;;
    esac
}

# Log filtering with conditions
filter_logs() {
    local input_file="$1"
    local filter_conditions="$2"  # key=value pairs separated by ;

    # Parse filter conditions
    IFS=';' read -ra conditions <<< "$filter_conditions"

    local awk_script=""
    for condition in "${conditions[@]}"; do
        local key
        local value
        key=$(echo "$condition" | cut -d'=' -f1)
        value=$(echo "$condition" | cut -d'=' -f2)

        # Build AWK condition
        if [ -n "$awk_script" ]; then
            awk_script="$awk_script && \$$key ~ /$value/"
        else
            awk_script="\$$key ~ /$value/"
        fi
    done

    # Apply filter
    awk "$awk_script" "$input_file"
}

# Log enrichment with additional data
enrich_logs() {
    local input_file="$1"
    local enrichment_rules="$2"

    # Example: Add hostname, environment info
    awk -v hostname="$(hostname)" -v timestamp="$(date -u +%Y-%m-%dT%H:%M:%SZ)" '
    {
        print $0 " hostname=" hostname " processed_at=" timestamp
    }' "$input_file"
}

📊 Log Aggregation and Analysis

Statistical Analysis and Reporting

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# log-analytics.sh - Log analysis and reporting tools

# Count occurrences of patterns
count_patterns() {
    local input_file="$1"
    local pattern="$2"
    local field="${3:-0}"  # Field number to count (0 for whole line)

    if [ "$field" -eq 0 ]; then
        grep "$pattern" "$input_file" | wc -l
    else
        grep "$pattern" "$input_file" | cut -d' ' -f"$field" | sort | uniq -c | sort -nr
    fi
}

# Calculate statistics for numeric fields
calculate_statistics() {
    local input_file="$1"
    local field="$2"

    # Extract numeric values and calculate stats
    local values
    values=$(awk -v fld="$field" '{print $fld}' "$input_file" | grep -E '^[0-9]+\.?[0-9]*$')

    if [ -z "$values" ]; then
        echo "No numeric values found in field $field"
        return 1
    fi

    # Calculate basic statistics
    echo "$values" | awk '
    BEGIN {
        sum = 0; count = 0; min = ""; max = "";
    }
    {
        sum += $1; count++;
        if (min == "" || $1 < min) min = $1;
        if (max == "" || $1 > max) max = $1;
    }
    END {
        if (count > 0) {
            avg = sum / count;
            printf "Count: %d\nSum: %.2f\nAverage: %.2f\nMin: %.2f\nMax: %.2f\n", count, sum, avg, min, max);
        }
    }'
}

# Time-series analysis
analyze_time_series() {
    local input_file="$1"
    local time_field="$2"
    local value_field="$3"
    local interval="${4:-3600}"  # Default 1 hour intervals

    # Group by time intervals and calculate aggregates
    awk -v time_fld="$time_field" -v val_fld="$value_field" -v interval="$interval" '
    {
        # Parse timestamp and convert to epoch
        timestamp = $time_fld;
        gsub(/[-:TZ]/, " ", timestamp);
        epoch = mktime(timestamp);

        # Calculate interval bucket
        bucket = int(epoch / interval) * interval;

        # Accumulate values
        sum[bucket] += $val_fld;
        count[bucket]++;
    }
    END {
        # Output results
        for (bucket in sum) {
            avg = sum[bucket] / count[bucket];
            print strftime("%Y-%m-%d %H:%M:%S", bucket) " " sum[bucket] " " count[bucket] " " avg;
        }
    }' "$input_file" | sort
}

# Top-N analysis
top_n_analysis() {
    local input_file="$1"
    local field="$2"
    local count="${3:-10}"
    local sort_order="${4:-desc}"  # asc or desc

    if [ "$sort_order" = "desc" ]; then
        awk -v fld="$field" '{print $fld}' "$input_file" | sort | uniq -c | sort -nr | head -n "$count"
    else
        awk -v fld="$field" '{print $fld}' "$input_file" | sort | uniq -c | sort -n | head -n "$count"
    fi
}

🔄 Log Rotation and Archiving

Automated Log Management

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
# log-rotation.sh - Log rotation and archival system

# Rotate logs based on size or age
rotate_logs() {
    local log_dir="$1"
    local max_size="${2:-100M}"
    local max_age="${3:-7}"  # days
    local archive_dir="${4:-$log_dir/archive}"

    mkdir -p "$archive_dir"

    # Find logs to rotate
    find "$log_dir" -name "*.log" -type f | while read -r log_file; do
        local file_size
        file_size=$(stat -f%z "$log_file" 2>/dev/null || stat -c%s "$log_file" 2>/dev/null)

        local file_age
        file_age=$(( ( $(date +%s) - $(stat -f %m "$log_file" 2>/dev/null || stat -c %Y "$log_file" 2>/dev/null) ) / 86400 ))

        # Check rotation criteria
        if [ "$file_size" -gt "$(echo "$max_size" | sed 's/M/*1024*1024/' | bc)" ] || [ "$file_age" -gt "$max_age" ]; then
            local timestamp
            timestamp=$(date -r "$log_file" +%Y%m%d_%H%M%S)
            local base_name
            base_name=$(basename "$log_file" .log)
            local archive_name="$archive_dir/${base_name}_${timestamp}.log.gz"

            # Rotate and compress
            if gzip -c "$log_file" > "$archive_name"; then
                echo "Rotated and compressed: $log_file -> $archive_name"
                > "$log_file"  # Truncate original file
            else
                echo "Failed to rotate: $log_file" >&2
            fi
        fi
    done
}

# Clean up old archives
cleanup_old_archives() {
    local archive_dir="$1"
    local retention_days="${2:-30}"

    local cutoff_date
    cutoff_date=$(date -d "$retention_days days ago" +%s)

    find "$archive_dir" -name "*.log.gz" -type f | while read -r archive_file; do
        local file_date
        file_date=$(stat -f %m "$archive_file" 2>/dev/null || stat -c %Y "$archive_file" 2>/dev/null)

        if [ "$file_date" -lt "$cutoff_date" ]; then
            rm -f "$archive_file"
            echo "Removed old archive: $archive_file"
        fi
    done
}

# Compress rotated logs
compress_logs() {
    local log_dir="$1"
    local compression_level="${2:-6}"

    find "$log_dir" -name "*.log.1" -o -name "*.log.0" | while read -r log_file; do
        if gzip -"$compression_level" "$log_file"; then
            echo "Compressed: $log_file"
        else
            echo "Failed to compress: $log_file" >&2
        fi
    done
}

🎨 Advanced Log Processing Features

Real-time Log Streaming

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# log-streaming.sh - Real-time log processing

# Stream and process logs in real-time
stream_logs() {
    local source="$1"
    local processor="$2"
    local output_file="$3"

    # Tail the log file and process new entries
    tail -f "$source" | while read -r line; do
        # Process each line
        processed_line=$(echo "$line" | eval "$processor")

        # Output processed line
        echo "$processed_line"

        # Optionally save to file
        if [ -n "$output_file" ]; then
            echo "$processed_line" >> "$output_file"
        fi
    done
}

# Log correlation across multiple sources
correlate_logs() {
    local sources=("$@")
    local temp_dir
    temp_dir=$(mktemp -d)

    # Stream each source to temporary files
    for i in "${!sources[@]}"; do
        local source="${sources[$i]}"
        tail -f "$source" > "$temp_dir/source_$i.log" &
    done

    # Correlation logic would go here
    # This is a simplified example

    # Cleanup
    trap "rm -rf $temp_dir" EXIT
}

# Alert generation from log patterns
generate_alerts() {
    local log_file="$1"
    local alert_patterns_file="$2"
    local alert_destination="$3"

    # Read alert patterns
    while IFS='|' read -r pattern severity message; do
        # Count occurrences in recent logs
        local count
        count=$(grep "$pattern" "$log_file" | tail -100 | wc -l)

        # Generate alert if threshold exceeded
        if [ "$count" -gt 0 ]; then
            local alert_message="[$severity] $message (Count: $count)"
            send_alert "$alert_message" "$alert_destination"
        fi
    done < "$alert_patterns_file"
}

# Send alert notifications
send_alert() {
    local message="$1"
    local destination="$2"

    case "$destination" in
        email)
            echo "$message" | mail -s "Log Alert" "admin@example.com"
            ;;
        slack)
            # Slack webhook integration
            curl -X POST -H 'Content-type: application/json' \
                --data "{\"text\":\"$message\"}" \
                "https://hooks.slack.com/services/YOUR/SLACK/WEBHOOK"
            ;;
        syslog)
            logger -p user.alert "$message"
            ;;
        *)
            echo "$message"
            ;;
    esac
}

🧾 Summary Best Practices

Log Processing Guidelines

  1. Structured Input: Use structured logging formats (JSON, CSV) when possible
  2. Modular Design: Build composable processing functions
  3. Error Handling: Handle malformed or missing data gracefully
  4. Performance: Optimize for large log volumes
  5. Security: Sanitize sensitive data in logs
  6. Retention: Implement appropriate log retention policies
  7. Monitoring: Monitor log processing pipelines themselves

Sample Log Processing Workflow

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# example-workflow.sh - Complete log processing example

process_web_server_logs() {
    local log_file="$1"
    local output_dir="$2"

    # Step 1: Parse and normalize
    cat "$log_file" | \
    awk '{print $1" "$4" "$9" "$10}' | \
    sed 's/\[//g; s/\]//g' > "$LOG_PROCESSING_TEMP_DIR/parsed.log"

    # Step 2: Filter errors
    grep " 500\| 404\| 403" "$LOG_PROCESSING_TEMP_DIR/parsed.log" > "$LOG_PROCESSING_TEMP_DIR/errors.log"

    # Step 3: Analyze error patterns
    top_n_analysis "$LOG_PROCESSING_TEMP_DIR/errors.log" 1 20 > "$output_dir/top_errors.txt"

    # Step 4: Generate statistics
    calculate_statistics "$LOG_PROCESSING_TEMP_DIR/parsed.log" 4 > "$output_dir/response_stats.txt"

    # Step 5: Archive processed logs
    gzip -c "$LOG_PROCESSING_TEMP_DIR/parsed.log" > "$output_dir/processed_$(date +%Y%m%d).log.gz"

    echo "Log processing completed. Results in $output_dir"
}

🧠 Complete Log Processing Script

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
#!/bin/bash
# complete-log-processor.sh - Production-ready log processing system

set -euo pipefail

# Source processing modules
source log-processing-pipeline.sh
source log-parser.sh
source log-analytics.sh
source log-rotation.sh
source log-streaming.sh

# Main processing function
main() {
    local action="$1"
    shift

    case "$action" in
        process)
            process_logs "$@"
            ;;
        analyze)
            analyze_logs "$@"
            ;;
        rotate)
            rotate_logs "$@"
            ;;
        stream)
            stream_logs "$@"
            ;;
        correlate)
            correlate_logs "$@"
            ;;
        *)
            echo "Usage: $0 {process|analyze|rotate|stream|correlate} [options]" >&2
            return 1
            ;;
    esac
}

# Process logs with full pipeline
process_logs() {
    local source_type="$1"
    local source_config="$2"
    local output_dir="$3"

    init_log_processing

    # Get log sources
    local sources
    sources=$(get_log_sources "$source_type" "$source_config")

    # Process each source
    for source in $sources; do
        echo "Processing source: $source"

        # Read and parse logs
        read_logs "$source" | \
        extract_fields_stdin "|" "1,3,5" | \
        normalize_timestamps | \
        enrich_with_metadata > "$LOG_PROCESSING_TEMP_DIR/processed_$(basename "$source").log"

        # Generate analytics
        generate_analytics "$LOG_PROCESSING_TEMP_DIR/processed_$(basename "$source").log" "$output_dir"
    done

    echo "Log processing completed"
}

# Run main function
main "$@"

🧾 See Also