第六章:客户端 SDK

学习如何在应用中使用 Loki 客户端库直接发送日志。

最后更新: 2024-01-20
页面目录

Loki 客户端 SDK

Loki 提供多种语言的客户端库,可以直接从应用程序发送日志。

Go 客户端 (Promtail client 包)

安装

go get github.com/grafana/loki/clients/pkg/promtail/client

基本使用

package main

import (
    "log"
    "time"

    "github.com/grafana/loki/clients/pkg/promtail/client"
)

// main builds a Loki push client and hands it three sample log entries.
func main() {
    // Create the client: push endpoint, request timeout and batching settings.
    lokiClient, err := client.New(client.Config{
        URL:       "http://loki:3100/loki/api/v1/push",
        Timeout:   10 * time.Second,
        BatchWait: 1 * time.Second,
        BatchSize: 100 * 1024,
    })
    if err != nil {
        log.Fatalf("Failed to create client: %v", err)
    }
    // Stop the client when main returns.
    defer lokiClient.Stop()

    // Plain entry.
    lokiClient.Handle("my-app", "info", "Application started")

    // Entry with several labels attached.
    labeled := lokiClient.WithLabels("service", "api", "env", "prod")
    labeled.Handle("", "debug", "Debug message")

    // Entry with a single extra label.
    traced := lokiClient.WithLabel("trace_id", "abc123")
    traced.Handle("", "info", "Processing request")
}

完整示例

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/grafana/loki/clients/pkg/promtail/client"
    "github.com/grafana/loki/clients/pkg/promtail/client/fake"
)

// main runs the production example and then the test example.
func main() {
    // Production client.
    prodClient()

    // Test client.
    testClient()
}

// prodClient creates a client configured with batching and retry backoff
// settings and hands it 100 labelled log entries.
func prodClient() {
    cfg := client.Config{
        URL:       "http://loki:3100/loki/api/v1/push",
        Timeout:   10 * time.Second,
        BatchWait: 1 * time.Second,
        BatchSize: 100 * 1024,
        BackoffConfig: client.BackoffConfig{
            MinPeriod:  500 * time.Millisecond,
            MaxPeriod:  5 * time.Minute,
            MaxRetries: 20,
        },
    }

    c, err := client.New(cfg)
    if err != nil {
        log.Fatalf("Failed to create client: %v", err)
    }
    // Stop the client when the function returns.
    defer c.Stop()

    // Send a stream of entries.
    for i := 0; i < 100; i++ {
        // Format the message up front so Handle receives three string
        // arguments, the same form every other Handle call in this guide uses.
        msg := fmt.Sprintf("Processing user request %d", i)
        c.WithLabel("service", "api").
            WithLabel("endpoint", "/users").
            Handle("", "info", msg)
    }
}

// testClient builds a client wired to a fake instantiator, sends one entry
// and logs how many entries the fake reports as received.
func testClient() {
    // Fake client used for testing.
    f := fake.New(nil)

    c, err := client.New(client.Config{
        URL: "http://loki:3100/loki/api/v1/push",
    }, client.WithInstantiator(f))
    if err != nil {
        log.Fatalf("Failed to create client: %v", err)
    }
    // Stop the client on return, as the other examples in this guide do.
    defer c.Stop()

    // Send one entry through the client.
    c.Handle("test", "info", "Test message")

    // Inspect what the fake recorded.
    entries := f.Received()
    log.Printf("Received %d entries", len(entries))
}

Python 客户端 (python-logging-loki)

安装

pip install python-logging-loki

基本使用

import logging
from logging_loki import LokiHandler

# Create the Loki handler.
handler = LokiHandler(
    url="http://loki:3100/loki/api/v1/push",
    tags={"service": "myapp", "env": "production"},
    auth=("admin", "admin"),
)

# Create the logger.
logger = logging.getLogger("myapp")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Send logs.
logger.info("Application started")
# logging fills %-style placeholders from positional arguments; values passed
# via `extra` only become record attributes and are never substituted into
# the message text.
logger.warning("High memory usage: %d%%", 85, extra={"memory": 85})
# exc_info=True attaches a traceback only while an exception is being handled
# (i.e. when called inside an `except` block).
logger.error("Connection failed", exc_info=True)

异步客户端

import logging
from queue import Queue

from logging_loki import LokiHandler, LokiQueueHandler

# LokiQueueHandler takes the queue as its first argument and forwards the
# remaining keyword arguments to the LokiHandler it creates internally, so the
# URL and tags are passed here rather than through a separate handler object.
queue_handler = LokiQueueHandler(
    Queue(-1),
    url="http://loki:3100/loki/api/v1/push",
    tags={"service": "async-app"},
)

logger = logging.getLogger("async-app")
logger.addHandler(queue_handler)
logger.setLevel(logging.INFO)

# Send logs.
for i in range(1000):
    logger.info(f"Processing item {i}")

完整示例

import logging
from logging_loki import LokiHandler

class CustomLokiHandler(LokiHandler):
    """LokiHandler subclass that stamps fixed service/version fields on records.

    NOTE(review): formatRecord delegates to the base class; confirm that the
    installed LokiHandler actually defines and calls formatRecord.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to LokiHandler and set the default fields."""
        super().__init__(*args, **kwargs)
        self.defaults = dict(service="my-service", version="1.0.0")

    def formatRecord(self, record):
        """Return the base formatting with the service and version fields set."""
        formatted = super().formatRecord(record)
        for key in ("service", "version"):
            formatted[key] = self.defaults[key]
        return formatted

# Usage: attach the custom handler to a logger.
custom_handler = CustomLokiHandler(url="http://loki:3100/loki/api/v1/push")
logger = logging.getLogger("custom-app")
logger.setLevel(logging.INFO)
logger.addHandler(custom_handler)

# Send a log record carrying extra fields.
user_fields = {
    "user_id": 12345,
    "action": "login",
    "ip": "192.168.1.1",
}
logger.info("User action", extra=user_fields)

Java 客户端 (loki-logback-appender)

Maven 依赖

<!-- Loki4j Logback appender; the published artifactId is loki-logback-appender. -->
<dependency>
    <groupId>com.github.loki4j</groupId>
    <artifactId>loki-logback-appender</artifactId>
    <version>1.4.1</version>
</dependency>

Logback 配置

<!-- logback.xml -->
<configuration>
    <!-- Loki appender (loki4j 1.4.x layout): batches events and pushes them to Loki over HTTP. -->
    <appender name="LOKI" class="com.github.loki4j.logback.Loki4jAppender">
        <!-- Batch limits carried over from the previous buffering settings:
             500 items, 1 MiB, 2000 ms. -->
        <batchMaxItems>500</batchMaxItems>
        <batchMaxBytes>1048576</batchMaxBytes>
        <batchTimeoutMs>2000</batchTimeoutMs>
        <http>
            <url>http://loki:3100/loki/api/v1/push</url>
        </http>
        <format>
            <!-- Stream labels as comma-separated key=value pairs. -->
            <label>
                <pattern>service=my-spring-app,environment=production</pattern>
            </label>
            <!-- Layout of each log line. -->
            <message>
                <pattern>%-5level %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36} - %msg%n</pattern>
            </message>
        </format>
    </appender>

    <!-- Async appender wrapping the Loki appender. -->
    <appender name="ASYNC_LOKI" class="ch.qos.logback.classic.AsyncAppender">
        <appender-ref ref="LOKI"/>
        <queueSize>1024</queueSize>
        <discardingThreshold>0</discardingThreshold>
        <includeCallerData>false</includeCallerData>
    </appender>

    <root level="INFO">
        <appender-ref ref="ASYNC_LOKI"/>
    </root>

    <!-- additivity="false" keeps com.myapp events from also reaching the root appender. -->
    <logger name="com.myapp" level="DEBUG" additivity="false">
        <appender-ref ref="ASYNC_LOKI"/>
    </logger>
</configuration>

Java 代码使用

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Example service that writes info, warning and error messages through SLF4J. */
public class MyService {
    private static final Logger LOG = LoggerFactory.getLogger(MyService.class);

    /** Logs one message at each of the INFO, WARN and ERROR levels. */
    public void doSomething() {
        LOG.info("Processing request");
        LOG.warn("High memory usage detected");

        // The exception is passed as the final argument of the error call.
        Exception cause = new Exception("Connection timeout");
        LOG.error("Failed to connect to database", cause);
    }
}

JavaScript/TypeScript 客户端

安装

npm install @grafana/loki

基本使用

import { LokiBatchStream, LokiOptions } from '@grafana/loki';

// Milliseconds-to-nanoseconds factor used for entry timestamps.
const NS_PER_MS = 1000000;

// Push endpoint, request timeout and batching settings for the stream.
const options: LokiOptions = {
    url: 'http://loki:3100/loki/api/v1/push',
    timeout: 10000,
    batchSize: 100,
    batchWait: 1000,
};

const stream = new LokiBatchStream(options);

// Push a single entry.
const startupEntry = {
    labels: '{service="my-app", env="prod"}',
    timestamp: Date.now() * NS_PER_MS,
    log: 'Application started',
};
stream.push(startupEntry);

// Push several entries in one call; the second is stamped one millisecond later.
const requestEntries = [
    {
        labels: '{service="my-app"}',
        timestamp: Date.now() * NS_PER_MS,
        log: 'Processing request 1',
    },
    {
        labels: '{service="my-app"}',
        timestamp: (Date.now() + 1) * NS_PER_MS,
        log: 'Processing request 2',
    },
];
stream.push(requestEntries);

// Wait for the flush to complete.
await stream.flush();

Loki HTTP 推送 API

const fetch = require('node-fetch');

/**
 * Push log entries to the Loki HTTP push endpoint.
 *
 * @param {Array<{service: string, env: string, message: string}>} logs
 *   Entries to send; each becomes one stream labelled with its service and env.
 * @returns {Promise<boolean>} The `ok` flag of the HTTP response.
 */
async function sendToLoki(logs) {
    const streams = logs.map((log) => ({
        stream: {
            service: log.service,
            env: log.env,
        },
        // Appending six zeros to the millisecond clock yields the nanosecond
        // timestamp string exactly; multiplying by 1e6 as a Number would
        // exceed Number.MAX_SAFE_INTEGER.
        values: [[`${Date.now()}000000`, log.message]],
    }));

    const response = await fetch('http://loki:3100/loki/api/v1/push', {
        method: 'POST',
        headers: {
            'Content-Type': 'application/json',
        },
        body: JSON.stringify({ streams }),
    });

    return response.ok;
}

// Usage: push one entry and report a failed request instead of leaving the
// returned promise unhandled.
sendToLoki([
    {
        service: 'api',
        env: 'prod',
        message: 'Request processed successfully'
    }
]).catch((err) => {
    console.error('Failed to push logs to Loki:', err);
});

Rust 客户端

Cargo.toml

[dependencies]
# NOTE(review): confirm this crate name and version exist on crates.io before copying.
loki-logback = "0.1"
# Async runtime used by the #[tokio::main] entry point below.
tokio = { version = "1", features = ["full"] }

基本使用

use loki_logback::{Builder, Level, Loki};

/// Builds a Loki logger with two labels and logs an info and a warning entry.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Configure the endpoint and the labels first, then build the logger.
    let builder = Builder::new("http://loki:3100/loki/api/v1/push")
        .with_label("service", "my-service")
        .with_label("env", "production");
    let loki = builder.build()?;

    // Startup entry.
    loki.log(Level::Info, "Application started", &["my-app", "startup"])
        .await?;

    // Performance warning entry.
    loki.log(Level::Warn, "High CPU usage detected", &["my-app", "performance"])
        .await?;

    Ok(())
}

最佳实践

1. 批量发送

// ✅ Recommended: send in batches.
// NOTE(review): a single Handle call does not by itself show batching;
// presumably the client batches according to its BatchWait/BatchSize
// settings — confirm.
c.Handle("service", "info", "Batch log message")

// ❌ Avoid: sending entries one at a time.
for _, msg := range messages {
    sendToLoki(msg)
}

2. 添加适当标签

# ✅ Recommended: add meaningful labels.
# NOTE(review): verify that these `extra` keys are actually sent as Loki
# labels by the handler in use — confirm against its documentation.
logger.info("User logged in", extra={
    "user_id": user.id,
    "method": "oauth"
})

# ❌ Avoid: too many labels.
logger.info("Message", extra={
    "tag1": "value1",
    "tag2": "value2",
    # ... 20+ labels
})

3. 使用异步发送

<!-- ✅ Recommended: async appender -->
<appender name="ASYNC_LOKI" class="ch.qos.logback.classic.AsyncAppender">
    <appender-ref ref="LOKI"/>
</appender>

<!-- ❌ Avoid: synchronous sending (blocks the application) -->

下一步

接下来让我们学习 LogQL 查询语言。

👉 查询语言