Real-time insights: Telemetry Pipeline

WIP
Tangential topic
Ponder on
Alternatives

0. Overview

0.1. Architecture

A telemetry pipeline is a system that collects, ingests, processes, stores, and analyzes telemetry data (metrics, logs, traces) from various sources in real-time or near real-time to provide insights into the performance and health of applications and infrastructure.

Figure 0: Barebone Telemetry Pipeline Architecture

It typically involves tools like Telegraf for data collection, Kafka for ingestion, Flink for processing, and Cassandra and VictoriaMetrics for storage and analysis.

Figure 1: Detailed Telemetry Pipeline Architecture


0.2. Stages
  • Collection: Telemetry data is collected from various sources using agents like Telegraf and Fluentd.

  • Ingestion: Data is ingested through message brokers such as Apache Kafka or Kinesis to handle high throughput.

  • Processing: Real-time processing is done using stream processing frameworks like Apache Flink for filtering, aggregating, and enriching data.

  • Storage and Analysis: Processed data is stored in systems like Cassandra, ClickHouse and Elasticsearch, and analyzed using tools like Grafana and Kibana for visualization and alerting.


1. Collection

1.1. Collection Agent

To start, we'll use Telegraf, a versatile open-source agent that collects metrics from various sources and writes them to different outputs. Telegraf supports a wide range of input and output plugins, making it easy to gather data from sensors, servers, GPS systems, and more.

Figure 2: Telegraf for collecting metrics & data

For this example, we'll focus on collecting the CPU temperature and Fan speed from a macOS system using the exec plugin in Telegraf. And leverage the osx-cpu-temp command line tool to fetch the CPU temperature.

🌵 Inlets allows devices behind firewalls or NAT to securely expose local services to the public internet by tunneling traffic through a public-facing Inlets server


1.2. Dependencies

1.3. Events

Here's a custom script to get the CPU and Fan Speed:

#!/bin/bash
timestamp=$(date +%s)000000000
hostname=$(hostname | tr "[:upper:]" "[:lower:]")
cpu=$(osx-cpu-temp -c | sed -e 's/\([0-9.]*\).*/\1/')
fans=$(osx-cpu-temp -f | grep '^Fan' | sed -e 's/^Fan \([0-9]\) - \([a-zA-Z]*\) side *at \([0-9]*\) RPM (\([0-9]*\)%).*/\1,\2,\3,\4/')
echo "cpu_temp,device_id=$hostname temp=$cpu $timestamp"
for f in $fans; do
  side=$(echo "$f" | cut -d, -f2 | tr "[:upper:]" "[:lower:]")
  rpm=$(echo "$f" | cut -d, -f3)
  pct=$(echo "$f" | cut -d, -f4)
  echo "fan_speed,device_id=$hostname,side=$side rpm=$rpm,percent=$pct $timestamp"
done

Output Format: measurement,host=foo,tag=measure val1=5,val2=3234.34 1609459200000000000

  • The output is of Line protocol syntax

  • Where measurement is the “table” (“measurement" in InfluxDB terms) to which the metrics are written.

  • host=foo,tag=measure are tags to can group and filter by.

  • val1=5,val2=3234.34 are values, to display in graphs.

  • 1716425990000000000 is the current unix timestamp + 9 x "0" — representing nanosecond timestamp.

Sample Output: cpu_temp,device_id=adeshs-mbp temp=0.0 1716425990000000000


1.4. Configuration

The location of telegraf.conf installed using homebrew: /opt/homebrew/etc/telegraf.conf

Telegraf's configuration file is written using TOML and is composed of three sections: global tags, agent settings, and plugins (inputs, outputs, processors, and aggregators).

Once Telegraf collects the data, we need to transmit it to a designated endpoint for further processing. For this, we'll use the HTTP output plugin in Telegraf to send the data in JSON format to a Flask application (covered in the next section).

Below is what the telegraf.conf file looks like, with exec input plugin (format: influx) and HTTP output plugin (format: JSON).

[agent]
  interval = "10s"
  round_interval = true
  metric_buffer_limit = 10000
  flush_buffer_when_full = true
  collection_jitter = "0s"
  flush_interval = "10s"
  flush_jitter = "0s"
  precision = ""
  debug = false
  quiet = false
  logfile = "/path to telegraf log/telegraf.log"
  hostname = "host"
  omit_hostname = false

[[inputs.exec]]
  commands = ["/path to custom script/osx_metrics.sh"]
  timeout = "5s"
  name_suffix = "_custom"
  data_format = "influx"
  interval = "10s"

[[outputs.http]]
  url = "http://127.0.0.1:5000/metrics"
  method = "POST"
  timeout = "5s"
  data_format = "json"
  [outputs.http.headers]
    Content-Type = "application/json"

Edit telegraf.conf (use above config):
vi /opt/homebrew/etc/telegraf.conf

🚧: Don't forget to expore tons of other input and output plugins: docs.influxdata.com/telegraf/v1/plugins


1.5. Start Capture

Run telegraf (when installed from Homebrew):
/opt/homebrew/opt/telegraf/bin/telegraf -config /opt/homebrew/etc/telegraf.conf


2. Ingestion

2.1. Telemetry Server

The telemetry server layer is designed to be lightweight. Its primary function is to authenticate incoming requests and publish raw events directly to Message Broker/Kafka. Further processing of these events will be carried out by the stream processing framework.

For our example, the Flask application serves as the telemetry server, acting as the entry point (via load-balancer) for the requests. It receives the data from a POST request, validates it, and publishes the messages to a Kafka topic.

Topic partition is the unit of parallelism in Kafka. Choose a partition key (ex: client_id) that evenly distributes records to avoid hotspots and number of partitions to achieve good throughput.

🚧 Message Broker Alternatives: Amazon Kinesis, Redpanda


2.2. Dependencies
  • Using PIP: pip3 install Flask flask-cors kafka-python

  • For Local Kafka Set-up (Or use Docker from next sub-section):
  • Using Homebrew: brew install kafka
    Refer: Homebrew Kafka

    Start Zookeeper: zookeeper-server-start /opt/homebrew/etc/kafka/zookeeper.properties
    Start Kafka: brew services restart kafka

    Create Topic: kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic learn
    Usage: Kafka CLI


2.3. Docker Compose

To set up Kafka using Docker Compose, ensure Docker is installed on your machine by following the instructions on the Docker installation page. Once Docker is installed, create a docker-compose.yml for Kafka and Zookeeper:

version: '3.7'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.5
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.3.5
    ports:
      - "9092:9092"  # Internal port
      - "9094:9094"  # External port
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,OUTSIDE://localhost:9094
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:9092,OUTSIDE://0.0.0.0:9094
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      CONFLUENT_SUPPORT_METRICS_ENABLE: "false"
    depends_on:
      - zookeeper

  kafka-topics-creator:
    image: confluentinc/cp-kafka:7.3.5
    depends_on:
      - kafka
    entrypoint: ["/bin/sh", "-c"]
    command: |
      "
      # blocks until kafka is reachable
      kafka-topics --bootstrap-server kafka:9092 --list

      echo -e 'Creating kafka topics'
      kafka-topics --bootstrap-server kafka:9092 --create --if-not-exists --topic raw-events --replication-factor 1 --partitions 1

      echo -e 'Successfully created the following topics:'
      kafka-topics --bootstrap-server kafka:9092 --list
      "

  schema-registry:
    image: confluentinc/cp-schema-registry:7.3.5
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8085,http://localhost:8085
    ports:
      - 8085:8085
    depends_on: [zookeeper, kafka]

Run docker-compose up to start the services (Kafka + Zookeeper).


2.4. Start Server

The Flask application includes a /metrics endpoint, as configured in telegraf.conf output to collect metrics. When data is sent to this endpoint, the Flask app receives and publishes the message to Kafka.

New to Flask? Refer: Flask Quickstart

import os
from flask_cors import CORS
from flask import Flask, jsonify, request
from dotenv import load_dotenv
from kafka import KafkaProducer
import json


app = Flask(__name__)
cors = CORS(app)
load_dotenv()

producer = KafkaProducer(bootstrap_servers='localhost:9094', 
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

@app.route('/metrics', methods=['POST'])
def process_metrics():
    data = request.get_json()
    print(data)
    producer.send('raw-events', data)
    return jsonify({'status': 'success'}), 200


if __name__ == "__main__":
    app.run(debug=True, host="0.0.0.0", port=int(os.environ.get("PORT", 8080)))

Start all services 🚀:

  • Run Flask App (Telemetry Server):
    flask run

  • Ensure telegraf is running (Refer: Section 1.5)


3. Processing

3.1. Stream Processor

The Stream Processor is responsible for data transformation, enrichment, stateful computations/updates over unbounded (push-model) and bounded (pull-model) data streams and sink enriched and transformed data to various data stores or applications. Key Features to Look for in a Stream Processing Framework:

  • Scalability and Performance: Scale by adding nodes, efficiently use resources, process data with minimal delay, and handle large volumes

  • Fault Tolerance and Data Consistency: Ensure fault tolerance with state saving for failure recovery and exactly-once processing.

  • Ease of Use and Community Support: Provide user-friendly APIs in multiple languages, comprehensive documentation, and active community support.

Figure 3: Stateful Stream Processing

  • Integration and Compatibility: Seamlessly integrate with various data sources and sinks, and be compatible with other tools in your tech stack.

  • Windowing and Event Time Processing: Support various windowing strategies (tumbling, sliding, session) and manage late-arriving data based on event timestamps.

  • Security and Monitoring: Include security features like data encryption and robust access controls, and provide tools for monitoring performance and logging.

Although I have set the context to use Flink for this example;
☢️ Note: While Apache Flink is a powerful choice for stream processing due to its rich feature set, scalability, and advanced capabilities, it can be overkill for a lot of use cases, particularly those with simpler requirements and/or lower data volumes.

🚧 Open Source Alternatives: Apache Kafka Streams, Apache Storm, Apache Samza


3.2. Dependencies

3.3. Docker Compose
  • Create flink_init/Dockerfile file for Flink and Kafka Connector:

    FROM flink:1.18.1-scala_2.12
    
    RUN wget -P /opt/flink/lib https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-kafka/3.1.0-1.18/flink-connector-kafka-3.1.0-1.18.jar
    
    RUN chown -R flink:flink /opt/flink/lib
    
  • Add Flink to docker-compose.yml (in-addition to Kafka, from Section 2.3)

    version: '3.8'
    services:
      jobmanager:
        build: flink_init/.
        ports:
          - "8081:8081"
        command: jobmanager
        environment:
          - |
            FLINK_PROPERTIES=
            jobmanager.rpc.address: jobmanager
    
      taskmanager:
        build: flink_init/.
        depends_on:
          - jobmanager
        command: taskmanager
        environment:
          - |
            FLINK_PROPERTIES=
            jobmanager.rpc.address: jobmanager
            taskmanager.numberOfTaskSlots: 2
    
  • Run docker-compose up to start the services (Kafka + Zookeeper, Flink).


3.4. Start Cluster

⚠️ PyFlink Job:

Start all services 🚀:

  • Ensure all the services are running (Refer: Section 1.5, 2.4, 3.3)


4. Storage and Analysis

The code snippets - stops here! The rest of the post covers key conventions, strategies, and factors for selecting the right data store, performing real-time analytics, and alerts.

4.1. Datastore

When choosing the right database for telemetry data, it's crucial to consider several factors:

  • Read and Write Patterns: Understanding the frequency and volume of read and write operations is key. High write and read throughput require different database optimizations and consistencies.

  • Data Amplification: Be mindful of how the data volume might grow over time (+Write Amplification) and how the database handles this increase without significant performance degradation.

  • Cost: Evaluate the cost implications, including storage, processing, and any associated services.

  • Analytics Use Cases: Determine whether the primary need is for real-time analytics, historical data analysis, or both.

  • Transactions: Consider the nature and complexity of transactions that will be performed. For example: Batch write transactions

  • Read and Write Consistency: Decide on the level of consistency required for the application. For example, OLTP (Online Transaction Processing) systems prioritize consistency and transaction integrity, while OLAP (Online Analytical Processing) systems are optimized for complex queries and read-heavy workloads.

🌵 LSM-Tree favors write-intensive applications.


For example, to decide between Row-based vs Columar Storage. Or OLTP (Online Transaction Processing), OLAP (Online Analytical Processing), or a Hybrid approach:

Figure 4: Row vs Columnnar Storage

  • Transactional and High Throughput Needs: For high write throughput and transactional batches (all or nothing), with queries needing wide-column family fetches and indexed queries within the partition, Cassandra/ScyllaDB is better suited.

  • Complex Analytical Queries: For more complex analytical queries, aggregations on specific columns, and machine learning models, data store(s) such as ClickHouse or Druid is more appropriate. Its optimized columnar storage and powerful query capabilities make it ideal for handling large-scale analytical tasks. Several others include: VictoriaMetrics and InfluxDB (emphasis on time-series); closed-source: Snowflake, BigQuery and Redshift

  • Hybrid Approach: In scenarios requiring both fast write-heavy transactional processing and complex analytics, a common approach is to use Cassandra for real-time data ingestion and storage, and periodically perform ETL (Extract, Transform, Load) or CDC (Change Data Capture) processes to batch insert data into OLAP DB for analytical processing. This leverages the strengths of both databases, ensuring efficient data handling and comprehensive analytical capabilities. Proper indexing and data modeling goes unsaid 🧐

🌵 Debezium: Distributed platform for change data capture (more on previous post).


Using a HTAP (Hybrid Transactional/Analytical Processing) database that's suitable for both transactional and analytical workloads is worth considering. Example: TiDB, TimescaleDB (Kind of).

While you get some of the best from both worlds 🌎, you also inherit a few of the worst from each!
Lucky for you, I have first hand experience with it 🤭:

Figure 5: Detailed comparison of OLTP, OLAP and HTAP

Analogy: Choosing the right database is like picking the perfect ride. Need pay-as-you-go flexibility? Grab a taxi. Tackling heavy-duty tasks? 🚜 Bring in the bulldozer. For everyday use, 🚗 a Toyota fits. Bringing a war tank to a community center is overkill. Sometimes, you need a fleet—a car for daily use, and a truck for heavy loads.

☢️ InfluxDB: Stagnant contribution graph, Flux deprecation, but new benchmarks!


4.2. Partition and Indexes

Without getting into too much detail, it's crucial to choose the right partitioning strategy (Ex: Range, List, Hash) to ensure partitions don't bloat and effectively support primary read patterns (in this context, example: client_id + region + 1st Day of Month).

Figure 6: Types of Indexes and Materialized view

Following this, clustering columns and indexes help organize data within partitions to optimize range queries and sorting. Secondary indexes (within the partition/local or across partitions/global) are valuable for query patterns where partition or primary keys don't apply. Materialized views for precomputing and storing complex query results, speeding up read operations for frequently accessed data.

Figure 7: Partition Key, Clustering Keys, Local/Global Secondary Indexes and Materialized views

Multi-dimensional Index (Spatial/Spatio-temporal): Indexes such as B+ trees and LSM trees are not designed to directly store higher-dimensional data. Spatial indexing uses structures like R-trees and Quad-trees and techniques like geohash. Space-filling curves like Z-order (Morton) and Hilbert curves interleave spatial and temporal dimensions, preserving locality and enabling efficient queries.

Figure 8: Commonly Used: Types of Spatial Indexes

🌵 GeoMesa: spatio-temporal indexing on top of the Accumulo, HBase, Redis, Kafka, PostGIS and Cassandra. XZ-Ordering: Customizing Index Creation.

Next blog post is all about spatial indexes!


4.3. Analytics and Alerts

Typically, analytics are performed as batch queries on bounded datasets of recorded events, requiring reruns to incorporate new data.

Figure 9: Analytics on Static, Relative and In-Motion Data

In contrast, streaming queries ingest real-time event streams, continuously updating results as events are consumed, with outputs either written to an external database or maintained as internal state.

Figure 10: Batch Analytics vs Stream Analytics

Feature Batch Analytics Stream Analytics
Data Processing Processes large volumes of stored data Processes data in real-time as it arrives
Result Latency Produces results with some delay; near real-time results with frequent query runs Provides immediate insights and actions
Resource Efficiency Requires querying the database often for necessary data Continuously updates results in transient data stores without re-querying the database
Typical Use Ideal for historical analysis and periodic reporting Best for real-time monitoring, alerting, and dynamic applications
Complexity Handling Can handle complex queries and computations Less effective for highly complex queries
Backfill Easy to backfill historical data and re-run queries Backfill can potentially introduce complexity

🌵 Anomaly Detection and Remediation

🌵 MindsDB: Connect Data Source, Configure AI Engine, Create AI Tables, Query for predictions and Automate workflows.


5. References
1. Wikipedia, "Telemetry," available: https://en.wikipedia.org/wiki/Telemetry. [Accessed: June 5, 2024].
2. Apache Cassandra, "Cassandra," available: https://cassandra.apache.org. [Accessed: June 5, 2024].
3. VictoriaMetrics, "VictoriaMetrics," available: https://victoriametrics.com. [Accessed: June 6, 2024].
4. Fluentd, "Fluentd," available: https://www.fluentd.org. [Accessed: June 5, 2024].
5. Elasticsearch, "Elasticsearch," available: https://www.elastic.co. [Accessed: June 5, 2024].
6. InfluxData, "Telegraf," available: https://www.influxdata.com. [Accessed: June 5, 2024].
7. InfluxData, "Telegraf Plugins," available: https://docs.influxdata.com. [Accessed: June 5, 2024].
8. GitHub, "osx-cpu-temp," available: https://github.com/lavoiesl/osx-cpu-temp. [Accessed: June 5, 2024].
9. GitHub, "Inlets," available: https://github.com/inlets/inlets. [Accessed: June 5, 2024].
10. InfluxData, "Telegraf Installation," available: https://docs.influxdata.com/telegraf/v1. [Accessed: June 5, 2024].
11. InfluxData, "InfluxDB Line Protocol," available: https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol. [Accessed: June 5, 2024].
12. GitHub, "Telegraf Exec Plugin," available: https://github.com/influxdata/telegraf/tree/master/plugins/inputs/exec. [Accessed: June 5, 2024].
13. GitHub, "Telegraf Output Plugins," available: https://github.com/influxdata/telegraf/tree/master/plugins/outputs. [Accessed: June 5, 2024].
14. Pallets Projects, "Flask," available: https://flask.palletsprojects.com. [Accessed: June 5, 2024].
15. Apache Kafka, "Kafka," available: https://kafka.apache.org. [Accessed: June 5, 2024].
16. Confluent, "Kafka Partitions," available: https://www.confluent.io. [Accessed: June 5, 2024].
17. AWS, "Amazon Kinesis," available: https://aws.amazon.com/kinesis. [Accessed: June 5, 2024].
18. Redpanda, "Redpanda," available: https://redpanda.com. [Accessed: June 5, 2024].
19. Apache, "Apache Flink," available: https://flink.apache.org. [Accessed: June 6, 2024].
20. GitHub, "flink-python/pyflink/examples," available: https://github.com/apache/flink/tree/master/flink-python/pyflink/examples. [Accessed: June 6, 2024].
21. Apache, "Flink Download," available: https://www.apache.org/dyn/closer.lua/flink. [Accessed: June 6, 2024].
22. Apache, "Flink Kafka Connector," available: https://www.apache.org/dyn/closer.lua/flink/flink-connector-kafka-3.1.0. [Accessed: June 6, 2024].
23. Docker, "Docker Installation," available: https://docs.docker.com. [Accessed: June 6, 2024].
24. Apache Kafka, "Kafka CLI," available: https://kafka.apache.org/quickstart. [Accessed: June 6, 2024].
25. Homebrew, "Kafka Installation," available: https://formulae.brew.sh/formula/kafka. [Accessed: June 6, 2024].
26. Apache, "Apache Storm," available: https://storm.apache.org. [Accessed: June 6, 2024].
27. Apache, "Apache Samza," available: https://samza.apache.org. [Accessed: June 6, 2024].
28. ClickHouse, "ClickHouse," available: https://clickhouse.com. [Accessed: June 6, 2024].
29. InfluxData, "InfluxDB Benchmarks," available: https://www.influxdata.com/benchmarks. [Accessed: June 6, 2024].
30. TiDB, "TiDB," available: https://github.com/pingcap/tidb. [Accessed: June 6, 2024].
31. Timescale, "TimescaleDB," available: https://www.timescale.com. [Accessed: June 6, 2024].
32. MindsDB, "MindsDB," available: https://docs.mindsdb.com. [Accessed: June 6, 2024].
33. Wikipedia, "Write Amplification," available: https://en.wikipedia.org/wiki/Write_amplification. [Accessed: June 6, 2024].
34. GitHub, "LSM-Tree," available: https://tikv.github.io/deep-dive/introduction/theory/lsm-tree.html. [Accessed: June 6, 2024].

Cite this article as: Adesh Nalpet Adimurthy. (Jun 7, 2024). Real-time insights: Telemetry Pipeline. PyBlog. https://www.pyblog.xyz/telemetry-pipeline

#index table of contents