What is Logstash?
Logstash is an open-source server-side data processing pipeline that ingests data from multiple sources simultaneously, transforms it, and then sends it to your favorite "stash." As a core component of the ELK Stack (Elasticsearch, Logstash, and Kibana), Logstash serves as the data collection and log-parsing engine that helps organizations centralize, transform, and route data for analysis and storage.
Why Do We Need Logstash?
Data Fragmentation Challenge
Modern applications generate logs across distributed systems, making data collection and analysis complex. Without a centralized processing pipeline, organizations face scattered data sources that are difficult to monitor and analyze effectively.
Real-Time Processing Requirements
Traditional batch processing methods cannot meet the demands of real-time monitoring and alerting. Logstash provides streaming data processing capabilities that enable immediate data transformation and routing.
Data Format Standardization
Raw log data comes in various formats from different sources. Logstash normalizes and enriches this data, making it searchable and analyzable in downstream systems like Elasticsearch.
Logstash Architecture
Three-Stage Pipeline
Logstash operates through a three-stage pipeline architecture:
Input Stage: Collects data from various sources including files, databases, message queues, and cloud services.
Filter Stage: Transforms, parses, and enriches data using plugins like Grok, Mutate, and Date filters.
Output Stage: Routes processed data to destinations such as Elasticsearch, databases, or external systems.
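As a concrete illustration of the three stages, here is a minimal, hypothetical pipeline configuration (the file name and field value are arbitrary): it reads lines from standard input, enriches each event in the filter stage, and prints the result to standard output.
# minimal-pipeline.conf (illustrative sketch)
input {
  stdin { }                                        # Input stage: read events typed into the console
}
filter {
  mutate {
    add_field => { "pipeline_stage" => "demo" }    # Filter stage: enrich each event with a field
  }
}
output {
  stdout { codec => rubydebug }                    # Output stage: pretty-print the processed event
}
Running bin/logstash -f minimal-pipeline.conf and typing a line into the console should print the event back with the added field, which is a quick way to verify a new installation.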
Plugin Architecture
Logstash's functionality extends through a rich ecosystem of plugins:
- Input Plugins: File, Beats, TCP, HTTP, Database connectors
- Filter Plugins: Grok, JSON, CSV, GeoIP, Date parsing
- Output Plugins: Elasticsearch, Kafka, Email, S3, databases
Event Processing Flow
Events flow through the pipeline where they are buffered, processed in batches, and routed based on configuration rules, ensuring efficient resource utilization and fault tolerance.
Key Features of Logstash
As described in the official Logstash documentation, Logstash is an open source data collection engine with real-time pipelining capabilities that can dynamically unify data from disparate sources and normalize it into the destinations of your choice. Its key features include:
Real-Time Data Processing Pipeline
Logstash operates as a powerful data collection engine with real-time pipelining capabilities, enabling continuous processing of streaming data from multiple sources simultaneously. The pipeline architecture processes events in near real-time, making it ideal for monitoring, alerting, and immediate data analysis scenarios where timely insights are critical for operational decision-making.
Dynamic Data Unification and Normalization
Logstash can dynamically unify data from disparate sources and normalize the data into destinations of your choice. This capability allows organizations to cleanse and democratize all their data for diverse advanced downstream analytics and visualization use cases, regardless of the original data format or source system.
Extensive Plugin Ecosystem
While Logstash originally drove innovation in log collection, its capabilities extend well beyond that use case. Any type of event can be enriched and transformed with a broad array of input, filter, and output plugins, with many native codecs further simplifying the ingestion process. This extensibility accelerates insights by harnessing a greater volume and variety of data sources.
Flexible Input Processing
Logstash supports diverse input sources including files, databases, message queues, HTTP endpoints, cloud services, and the Beats family of lightweight data shippers. The input stage can handle multiple concurrent data streams with different formats, protocols, and delivery mechanisms, providing comprehensive data collection capabilities.
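As a hedged sketch of this flexibility, the input block below combines a file tail, a Beats listener, and a TCP socket with a JSON-lines codec in a single pipeline; the paths, ports, and type labels are placeholders rather than recommendations.
input {
  file {
    path => "/var/log/app/*.log"    # tail application log files
    type => "applog"
  }
  beats {
    port => 5044                    # receive events from Filebeat or other Beats shippers
    type => "beats"
  }
  tcp {
    port => 5000
    codec => json_lines             # decode newline-delimited JSON as it arrives
    type => "tcp-json"
  }
}
The type field set on each input can then drive conditional logic in later filter and output stages.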
Advanced Filtering and Transformation
The filter stage provides sophisticated data processing capabilities through plugins like Grok for pattern matching, Mutate for field manipulation, Date for timestamp parsing, JSON for structured data handling, and GeoIP for location enrichment. These filters enable complex data transformation, parsing, and enrichment operations that prepare raw data for analysis.
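The sketch below chains several of these filters together; it assumes legacy (non-ECS) field names such as clientip and bytes, in line with the Apache example later in this article, so treat it as an illustration rather than a drop-in configuration.
filter {
  grok {
    match => { "message" => "%{COMBINEDAPACHELOG}" }    # parse the raw log line into named fields
  }
  date {
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]  # use the log's own timestamp as @timestamp
  }
  geoip {
    source => "clientip"                                # enrich the event with location data
    target => "geoip"
    ecs_compatibility => disabled                       # keep the legacy geoip field layout
  }
  mutate {
    convert => { "bytes" => "integer" }                 # fix field types for numeric aggregations
    remove_field => [ "message" ]                       # drop the raw line once it has been parsed
  }
}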
Multi-Destination Output Routing
Logstash can route processed data to multiple destinations simultaneously, including Elasticsearch for search and analytics, databases for persistence, message queues for further processing, cloud storage services, and external APIs. Conditional routing logic allows different data types to be sent to appropriate destinations based on content, metadata, or processing results.
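A hedged example of such routing is sketched below, with hypothetical field names, index names, and file paths: error events go to a dedicated Elasticsearch index and are also archived to a local file, while everything else lands in a default index.
output {
  if [level] == "ERROR" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "app-errors-%{+YYYY.MM.dd}"                    # separate index for error events
    }
    file {
      path => "/var/log/archive/errors-%{+YYYY-MM-dd}.log"    # keep a local archive copy
    }
  } else {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "app-logs-%{+YYYY.MM.dd}"
    }
  }
}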
Common Use Cases for Logstash
Log Aggregation and Analysis
Centralize logs from web servers, applications, and infrastructure components for unified monitoring and troubleshooting.
Security Information and Event Management (SIEM)
Process security logs, correlate events, and enrich data with threat intelligence for comprehensive security monitoring.
Metrics Processing
Transform and route application and infrastructure metrics to monitoring platforms for performance analysis and alerting.
Data Migration and ETL
Extract data from legacy systems, transform formats, and load into modern data platforms or data warehouses; a JDBC-based sketch of this flow follows the use cases below.
Real-time Data Streaming
Process streaming data from message queues, IoT sensors, or application events for immediate analysis and action.
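To make the data migration and ETL use case above more concrete, here is a sketch built around the JDBC input plugin; the driver path, connection string, credentials, query, and index name are assumptions chosen for illustration.
input {
  jdbc {
    jdbc_driver_library => "/opt/drivers/postgresql.jar"          # hypothetical path to the JDBC driver
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "jdbc:postgresql://db-host:5432/legacy"
    jdbc_user => "etl_user"
    jdbc_password => "changeme"
    statement => "SELECT id, name, updated_at FROM customers"     # extract
    schedule => "*/15 * * * *"                                    # poll every 15 minutes
  }
}
filter {
  mutate {
    rename => { "name" => "customer_name" }                       # transform: normalize field names
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "customers"                                          # load into the target platform
  }
}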
Implementation Examples
Basic Log Processing Pipeline
# /etc/logstash/conf.d/apache-logs.conf
input {
file {
path => "/var/log/apache2/access.log"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
date {
match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
}
mutate {
convert => { "bytes" => "integer" }
convert => { "response" => "integer" }
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "apache-logs-%{+YYYY.MM.dd}"
}
stdout {
codec => rubydebug
}
}
Multi-Input Pipeline Configuration
# /etc/logstash/pipelines.yml
- pipeline.id: web-logs
  path.config: "/etc/logstash/conf.d/web-*.conf"
  pipeline.workers: 2
- pipeline.id: database-logs
  path.config: "/etc/logstash/conf.d/db-*.conf"
  pipeline.workers: 1
Advanced JSON Processing
input {
beats {
port => 5044
}
}
filter {
if [fields][service] == "api" {
json {
source => "message"
}
if [response_time] {
mutate {
convert => { "response_time" => "float" }
}
}
if [response_time] > 1000 {
mutate {
add_tag => ["slow_response"]
}
}
}
}
output {
if "slow_response" in [tags] {
elasticsearch {
hosts => ["localhost:9200"]
index => "slow-api-logs-%{+YYYY.MM.dd}"
}
} else {
elasticsearch {
hosts => ["localhost:9200"]
index => "api-logs-%{+YYYY.MM.dd}"
}
}
}
Logstash Doris Output Plugin Example
The Logstash Doris output plugin calls the Doris Stream Load HTTP interface to write data into Doris in real time, offering capabilities such as multi-threaded concurrency, failure retries, custom Stream Load formats and parameters, and control of the output write speed.
Installing the Plugin
Obtaining the Plugin
You can download the plugin from the official website or compile it from the source code yourself.
Download from the official website:
- Installation package without dependencies: https://apache-doris-releases.oss-accelerate.aliyuncs.com/extension/logstash-output-doris-1.2.0.gem
- Installation package with dependencies: https://apache-doris-releases.oss-accelerate.aliyuncs.com/extension/logstash-output-doris-1.2.0.zip
Compile from source code:
cd extension/logstash/
gem build logstash-output-doris.gemspec
Then install the gem into Logstash:
${LOGSTASH_HOME}/bin/logstash-plugin install logstash-output-doris-1.2.0.gem
Validating logstash-output-doris-1.2.0.gem
Installing logstash-output-doris
Installation successful
Create Doris table
The example uses GitHub events archive data, which can be downloaded as follows:
wget https://data.gharchive.org/2024-01-01-15.json.gz
CREATE DATABASE log_db;
USE log_db;
CREATE TABLE github_events
(
`created_at` DATETIME,
`id` BIGINT,
`type` TEXT,
`public` BOOLEAN,
`actor.id` BIGINT,
`actor.login` TEXT,
`actor.display_login` TEXT,
`actor.gravatar_id` TEXT,
`actor.url` TEXT,
`actor.avatar_url` TEXT,
`repo.id` BIGINT,
`repo.name` TEXT,
`repo.url` TEXT,
`payload` TEXT,
`host` TEXT,
`path` TEXT,
INDEX `idx_id` (`id`) USING INVERTED,
INDEX `idx_type` (`type`) USING INVERTED,
INDEX `idx_actor.id` (`actor.id`) USING INVERTED,
INDEX `idx_actor.login` (`actor.login`) USING INVERTED,
INDEX `idx_repo.id` (`repo.id`) USING INVERTED,
INDEX `idx_repo.name` (`repo.name`) USING INVERTED,
INDEX `idx_host` (`host`) USING INVERTED,
INDEX `idx_path` (`path`) USING INVERTED,
INDEX `idx_payload` (`payload`) USING INVERTED PROPERTIES("parser" = "unicode", "support_phrase" = "true")
)
ENGINE = OLAP
DUPLICATE KEY(`created_at`)
PARTITION BY RANGE(`created_at`) ()
DISTRIBUTED BY RANDOM BUCKETS 10
PROPERTIES (
"replication_num" = "1",
"compaction_policy" = "time_series",
"enable_single_replica_compaction" = "true",
"dynamic_partition.enable" = "true",
"dynamic_partition.create_history_partition" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.start" = "-30",
"dynamic_partition.end" = "1",
"dynamic_partition.prefix" = "p",
"dynamic_partition.buckets" = "10",
"dynamic_partition.replication_num" = "1"
);
Logstash configuration
input {
file {
path => "/tmp/github_events/2024-04-01-23.json"
codec => json
}
}
output {
doris {
http_hosts => ["http://fe1:8630", "http://fe2:8630", "http://fe3:8630"]
user => "root"
password => ""
db => "log_db"
table => "github_events"
headers => {
"format" => "json"
"read_json_by_line" => "true"
"load_to_single_tablet" => "true"
}
mapping => {
"created_at" => "%{created_at}"
"id" => "%{id}"
"type" => "%{type}"
"public" => "%{public}"
"actor.id" => "%{[actor][id]}"
"actor.login" => "%{[actor][login]}"
"actor.display_login" => "%{[actor][display_login]}"
"actor.gravatar_id" => "%{[actor][gravatar_id]}"
"actor.url" => "%{[actor][url]}"
"actor.avatar_url" => "%{[actor][avatar_url]}"
"repo.id" => "%{[repo][id]}"
"repo.name" => "%{[repo][name]}"
"repo.url" => "%{[repo][url]}"
"payload" => "%{[payload]}"
"host" => "%{[host][name]}"
"path" => "%{[log][file][path]}"
}
log_request => true
}
}
Docker Deployment Example
# docker-compose.yml
version: '3.7'
services:
  logstash:
    image: docker.elastic.co/logstash/logstash:8.11.0
    container_name: logstash
    environment:
      - "LS_JAVA_OPTS=-Xmx1g -Xms1g"
    volumes:
      - ./logstash/config/logstash.yml:/usr/share/logstash/config/logstash.yml:ro
      - ./logstash/pipeline:/usr/share/logstash/pipeline:ro
      - /var/log:/var/log:ro
    ports:
      - "5044:5044"
      - "5000:5000/tcp"
      - "5000:5000/udp"
    networks:
      - elk
    depends_on:
      - elasticsearch
networks:
  elk:
    driver: bridge
Key Takeaways
Logstash serves as a critical data processing pipeline component that bridges the gap between raw data sources and analytical platforms. Its flexible plugin architecture enables seamless integration with diverse data sources while powerful transformation capabilities ensure data quality and consistency. For organizations implementing observability solutions or building data pipelines, Logstash provides the reliability, scalability, and functionality needed for effective data processing. The tool's integration with the broader Elastic Stack ecosystem makes it an essential component for log management, security monitoring, and real-time analytics workflows.
Frequently Asked Questions
Q: How does Logstash differ from Beats?
A: Beats are lightweight data shippers that collect and forward data, while Logstash is a more powerful data processing engine that can transform and enrich data. Beats often send data to Logstash for complex processing.
Q: Can Logstash handle high-volume data streams?
A: Yes, Logstash supports horizontal scaling through multiple pipelines, worker threads, and persistent queues. It can process thousands of events per second with proper configuration and hardware resources.
Q: What happens if Logstash fails during processing?
A: Logstash provides persistent queues that store events to disk, ensuring data durability. Dead letter queues handle events that fail processing, preventing data loss and enabling error investigation.
Q: Is Logstash suitable for real-time processing?
A: Logstash processes data in near real-time with minimal latency. While not designed for microsecond-level processing, it handles streaming data effectively for most monitoring and analytics use cases.
Q: How do I monitor Logstash performance?
A: Logstash provides monitoring APIs and integrates with Elasticsearch monitoring. Key metrics include pipeline throughput, queue sizes, CPU usage, and processing latency.
Additional Resources & Next Steps
Learn More
- Getting started with Logstash
- Installing Logstash
- Stashing Your First Event
- Parsing Logs with Logstash
- Stitching Together Multiple Input and Output Plugins
- How Logstash Works
- Setting up and running Logstash
- Working with plugins
Related Articles
- A Practical Introduction to Logstash
- Introducing the Logstash HTTP input plugin
- Introducing Multiple Pipelines in Logstash
- Logstash Persistent Queue
- You know, for visualizing your Logstash pipelines