第六章:客户端 SDK
学习如何在应用中使用 Loki 客户端库直接发送日志。
最后更新: 2024-01-20
页面目录
Loki 客户端 SDK
Loki 提供多种语言的客户端库,可以直接从应用程序发送日志。
Go 客户端 (Promtail client)
安装
go get github.com/grafana/loki/clients/pkg/promtail/client
基本使用
package main
import (
"log"
"time"
"github.com/grafana/loki/clients/pkg/promtail/client"
)
// main shows the minimal flow: build a push client, emit one plain entry and
// two labelled entries, and stop the client when main returns.
func main() {
	// Push endpoint plus timeout and batching limits.
	pushCfg := client.Config{
		URL:       "http://loki:3100/loki/api/v1/push",
		Timeout:   10 * time.Second,
		BatchWait: 1 * time.Second,
		BatchSize: 100 * 1024,
	}

	lokiClient, err := client.New(pushCfg)
	if err != nil {
		log.Fatalf("Failed to create client: %v", err)
	}
	defer lokiClient.Stop()

	// Plain entry.
	lokiClient.Handle("my-app", "info", "Application started")

	// Entry with several labels attached in one call.
	apiLogger := lokiClient.WithLabels("service", "api", "env", "prod")
	apiLogger.Handle("", "debug", "Debug message")

	// Entry carrying one extra label.
	tracedLogger := lokiClient.WithLabel("trace_id", "abc123")
	tracedLogger.Handle("", "info", "Processing request")
}
完整示例
package main
import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/grafana/loki/clients/pkg/promtail/client"
	"github.com/grafana/loki/clients/pkg/promtail/client/fake"
)
// main runs the production-style example and then the test example.
func main() {
// Production-environment client
prodClient()
// Test-environment client
testClient()
}
// prodClient builds a client with batching and retry backoff configured,
// sends 100 labelled log entries, and stops the client when it returns.
func prodClient() {
	cfg := client.Config{
		URL:       "http://loki:3100/loki/api/v1/push",
		Timeout:   10 * time.Second,
		BatchWait: 1 * time.Second,
		BatchSize: 100 * 1024,
		BackoffConfig: client.BackoffConfig{
			MinPeriod:  500 * time.Millisecond,
			MaxPeriod:  5 * time.Minute,
			MaxRetries: 20,
		},
	}

	c, err := client.New(cfg)
	if err != nil {
		log.Fatalf("Failed to create client: %v", err)
	}
	defer c.Stop()

	// Send a stream of entries. Every other Handle call in this chapter
	// passes three string arguments, so the message is formatted up front
	// instead of handing Handle a format string plus an int.
	for i := 0; i < 100; i++ {
		c.WithLabel("service", "api").
			WithLabel("endpoint", "/users").
			Handle("", "info", fmt.Sprintf("Processing user request %d", i))
	}
}
// testClient creates a client that uses fake.New(nil) as its instantiator,
// sends one entry, and logs how many entries the fake reports as received.
func testClient() {
	// Fake client used for tests.
	f := fake.New(nil)

	c, err := client.New(client.Config{
		URL: "http://loki:3100/loki/api/v1/push",
	}, client.WithInstantiator(f))
	if err != nil {
		log.Fatalf("Failed to create client: %v", err)
	}
	// Stop the client on return, as the other examples in this chapter do.
	defer c.Stop()

	// Send one entry through the client.
	c.Handle("test", "info", "Test message")

	// Inspect what the fake received.
	entries := f.Received()
	log.Printf("Received %d entries", len(entries))
}
Python 客户端 (python-logging-loki)
安装
pip install python-logging-loki
基本使用
import logging
from logging_loki import LokiHandler

# Create the Loki handler.
handler = LokiHandler(
    url="http://loki:3100/loki/api/v1/push",
    tags={"service": "myapp", "env": "production"},
    # Example credentials only; load real ones from configuration.
    auth=("admin", "admin"),
    # Use the v1 payload format that matches the /loki/api/v1/push endpoint.
    version="1",
)

# Create the logger.
logger = logging.getLogger("myapp")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Send logs.
logger.info("Application started")

# logging interpolates %-style placeholders from positional arguments; values
# passed via `extra` only become attributes on the record and are never
# substituted into the message text.
logger.warning("High memory usage: %d%%", 85, extra={"memory": 85})

# exc_info=True only captures a traceback while an exception is being handled,
# so log from inside the except block.
try:
    raise ConnectionError("Connection timeout")
except ConnectionError:
    logger.error("Connection failed", exc_info=True)
异步客户端
import logging
from multiprocessing import Queue

from logging_loki import LokiHandler, LokiQueueHandler

# LokiQueueHandler takes the queue as its first argument and builds the
# underlying Loki handler itself from the remaining keyword arguments, so no
# separate LokiHandler instance is created here.
queue_handler = LokiQueueHandler(
    Queue(-1),
    url="http://loki:3100/loki/api/v1/push",
    tags={"service": "async-app"},
    # Use the v1 payload format that matches the /loki/api/v1/push endpoint.
    version="1",
)

logger = logging.getLogger("async-app")
logger.addHandler(queue_handler)
logger.setLevel(logging.INFO)

# Send logs; records are put on the queue instead of being pushed inline.
for i in range(1000):
    logger.info(f"Processing item {i}")
完整示例
import logging
from logging_loki import LokiHandler
# Handler subclass that adds fixed service metadata to formatted records.
class CustomLokiHandler(LokiHandler):
    """LokiHandler variant that stamps default fields onto each formatted record."""

    def __init__(self, *args, **kwargs):
        """Forward all arguments to LokiHandler and set the default fields."""
        super().__init__(*args, **kwargs)
        # Fields copied into every record by formatRecord().
        self.defaults = {
            "service": "my-service",
            "version": "1.0.0",
        }

    def formatRecord(self, record):
        """Return the parent's formatted record with the default fields added."""
        # NOTE(review): this relies on the base class providing formatRecord()
        # and returning a mutable mapping; confirm against the installed
        # logging_loki version, otherwise this hook is never reached.
        formatted = super().formatRecord(record)
        formatted["service"] = self.defaults["service"]
        formatted["version"] = self.defaults["version"]
        return formatted
# Usage: attach the custom handler to a logger.
logger = logging.getLogger("custom-app")
logger.addHandler(CustomLokiHandler(
    url="http://loki:3100/loki/api/v1/push",
    # Use the v1 payload format that matches the /loki/api/v1/push endpoint.
    version="1",
))
logger.setLevel(logging.INFO)

# Log a message with extra fields attached to the record.
logger.info(
    "User action",
    extra={
        "user_id": 12345,
        "action": "login",
        "ip": "192.168.1.1",
    },
)
Java 客户端 (logback-loki-appender)
Maven 依赖
<!-- Loki4j Logback appender; the published artifactId is loki-logback-appender -->
<dependency>
    <groupId>com.github.loki4j</groupId>
    <artifactId>loki-logback-appender</artifactId>
    <version>1.4.1</version>
</dependency>
Logback 配置
<!-- logback.xml -->
<configuration>
    <!-- Loki appender (Loki4j 1.4.x): batches log events and pushes them to Loki over HTTP -->
    <appender name="LOKI" class="com.github.loki4j.logback.Loki4jAppender">
        <!-- A batch is sent at 500 items, 1 MiB, or 2000 ms, whichever comes first -->
        <batchMaxItems>500</batchMaxItems>
        <batchMaxBytes>1048576</batchMaxBytes>
        <batchTimeoutMs>2000</batchTimeoutMs>
        <http>
            <url>http://loki:3100/loki/api/v1/push</url>
        </http>
        <format>
            <!-- Labels are given as comma-separated key=value pairs -->
            <label>
                <pattern>service=my-spring-app,environment=production</pattern>
            </label>
            <message>
                <pattern>%-5level %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %logger{36} - %msg%n</pattern>
            </message>
        </format>
    </appender>

    <!-- Async appender: queues events so application threads do not wait on the LOKI appender -->
    <appender name="ASYNC_LOKI" class="ch.qos.logback.classic.AsyncAppender">
        <appender-ref ref="LOKI"/>
        <queueSize>1024</queueSize>
        <!-- 0 keeps all events instead of discarding low-level ones when the queue fills -->
        <discardingThreshold>0</discardingThreshold>
        <includeCallerData>false</includeCallerData>
    </appender>

    <root level="INFO">
        <appender-ref ref="ASYNC_LOKI"/>
    </root>

    <!-- additivity="false" stops com.myapp events from also reaching the root appender -->
    <logger name="com.myapp" level="DEBUG" additivity="false">
        <appender-ref ref="ASYNC_LOKI"/>
    </logger>
</configuration>
Java 代码使用
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Example service that emits log events through SLF4J at three levels. */
public class MyService {
    private static final Logger LOG = LoggerFactory.getLogger(MyService.class);

    /** Logs one INFO, one WARN, and one ERROR event; the ERROR event carries an exception. */
    public void doSomething() {
        LOG.info("Processing request");
        LOG.warn("High memory usage detected");

        final Exception cause = new Exception("Connection timeout");
        LOG.error("Failed to connect to database", cause);
    }
}
JavaScript/TypeScript 客户端
安装
npm install @grafana/loki
基本使用
import { LokiBatchStream, LokiOptions } from '@grafana/loki';
// Connection and batching settings handed to the stream.
const options: LokiOptions = {
  url: 'http://loki:3100/loki/api/v1/push',
  timeout: 10000,
  batchSize: 100,
  batchWait: 1000,
};
const stream = new LokiBatchStream(options);

// Converts epoch milliseconds to the nanosecond value used for `timestamp`.
// NOTE(review): the product is above Number.MAX_SAFE_INTEGER, so its lowest
// digits are rounded.
const toNanos = (millis: number): number => millis * 1000000;
const appLabels = '{service="my-app"}';

// Push a single entry.
stream.push({
  labels: '{service="my-app", env="prod"}',
  timestamp: toNanos(Date.now()),
  log: 'Application started',
});

// Push several entries in one call.
const firstEntry = {
  labels: appLabels,
  timestamp: toNanos(Date.now()),
  log: 'Processing request 1',
};
const secondEntry = {
  labels: appLabels,
  timestamp: toNanos(Date.now() + 1),
  log: 'Processing request 2',
};
stream.push([firstEntry, secondEntry]);

// Wait for pending entries to be sent.
await stream.flush();
Loki HTTP API
const fetch = require('node-fetch');
async function sendToLoki(logs) {
const response = await fetch('http://loki:3100/loki/api/v1/push', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
streams: logs.map(log => ({
stream: {
service: log.service,
env: log.env,
},
values: [
[
String(Date.now() * 1000000),
log.message
]
]
}))
})
});
return response.ok;
}
// Usage: push one record and report failures instead of leaving the promise unhandled.
sendToLoki([
  {
    service: 'api',
    env: 'prod',
    message: 'Request processed successfully',
  },
])
  .then(ok => {
    // A false result means the response's `ok` flag was false.
    if (!ok) {
      console.error('Loki rejected the log push');
    }
  })
  .catch(err => {
    console.error('Failed to send logs to Loki:', err);
  });
Rust 客户端
Cargo.toml
[dependencies]
loki-logback = "0.1"
tokio = { version = "1", features = ["full"] }
基本使用
use loki_logback::{Builder, Level, Loki};
// Entry point: builds a client with two labels and sends two log entries,
// returning early with the error if any step fails.
// NOTE(review): the `loki_logback` crate API used here could not be verified;
// confirm the crate name and method signatures before relying on this example.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Configure the push URL and the labels passed to the builder.
let loki = Builder::new("http://loki:3100/loki/api/v1/push")
.with_label("service", "my-service")
.with_label("env", "production")
.build()?;
// First entry, logged at Info level.
loki.log(
Level::Info,
"Application started",
&["my-app", "startup"],
).await?;
// Second entry, logged at Warn level.
loki.log(
Level::Warn,
"High CPU usage detected",
&["my-app", "performance"],
).await?;
Ok(())
}
最佳实践
1. 批量发送
// ✅ Recommended: batch sending
// NOTE(review): this call alone does not show batching; presumably batching comes
// from the BatchWait/BatchSize settings on the client config. Confirm.
c.Handle("service", "info", "Batch log message")
// ❌ Avoid: sending entries one at a time
for _, msg := range messages {
sendToLoki(msg)
}
2. 添加适当标签
# ✅ Recommended: add a few meaningful labels
logger.info("User logged in", extra={
"user_id": user.id,
"method": "oauth"
})
# ❌ Avoid: too many labels
logger.info("Message", extra={
"tag1": "value1",
"tag2": "value2",
# ... 20+ labels
})
3. 使用异步发送
<!-- ✅ Recommended: async appender -->
<appender name="ASYNC_LOKI" class="ch.qos.logback.classic.AsyncAppender">
    <appender-ref ref="LOKI"/>
</appender>
<!-- ❌ Avoid: synchronous sending (blocks the application) -->
下一步
接下来让我们学习 LogQL 查询语言。
👉 查询语言