Skip to content

Demo Client

Source Code

go
// Copyright The OpenTelemetry Authors
// Modifications copyright (C) 2022 Cisco Systems, Inc.
// Link to original file
// https://github.com/open-telemetry/opentelemetry-go-contrib/blob/v0.14.0/instrumentation/net/http/otelhttp/example/client/client.go
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//       http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Sample contains a simple client that periodically makes a simple http request
// to a server and exports to the OpenTelemetry service.
package main

import (
    "context"
    "fmt"
    "log"
    "math/rand"
    "net/http"
    "os"
    "time"

    "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/baggage"
    "go.opentelemetry.io/otel/exporters/otlp/otlptrace"
    "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
    "go.opentelemetry.io/otel/propagation"
    "go.opentelemetry.io/otel/sdk/resource"
    sdktrace "go.opentelemetry.io/otel/sdk/trace"
    semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
    "google.golang.org/grpc"
)

// Initializes an OTLP exporter, and configures the corresponding trace and
// metric providers.
func initProvider() func() {
    ctx := context.Background()

    res, err := resource.New(ctx,
        resource.WithFromEnv(),
        resource.WithProcess(),
        resource.WithTelemetrySDK(),
        resource.WithHost(),
        resource.WithAttributes(
            // the service name used to display traces in backends
            semconv.ServiceNameKey.String("demo-client"),
        ),
    )
    handleErr(err, "failed to create resource")

    otelAgentAddr, ok := os.LookupEnv("OTEL_EXPORTER_OTLP_ENDPOINT")
    if !ok {
        otelAgentAddr = "0.0.0.0:4317"
    }

    traceClient := otlptracegrpc.NewClient(
        otlptracegrpc.WithInsecure(),
        otlptracegrpc.WithEndpoint(otelAgentAddr),
        otlptracegrpc.WithDialOption(grpc.WithBlock()))
    sctx, cancel := context.WithTimeout(ctx, time.Second)
    defer cancel()
    traceExp, err := otlptrace.New(sctx, traceClient)
    handleErr(err, "Failed to create the collector trace exporter")

    bsp := sdktrace.NewBatchSpanProcessor(traceExp)
    tracerProvider := sdktrace.NewTracerProvider(
        sdktrace.WithSampler(sdktrace.AlwaysSample()),
        sdktrace.WithResource(res),
        sdktrace.WithSpanProcessor(bsp),
    )

    // set global propagator to tracecontext (the default is no-op).
    otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
    otel.SetTracerProvider(tracerProvider)

    return func() {
        cxt, cancel := context.WithTimeout(ctx, time.Second)
        defer cancel()
        if err := traceExp.Shutdown(cxt); err != nil {
            otel.Handle(err)
        }
    }
}

func handleErr(err error, message string) {
    if err != nil {
        log.Fatalf("%s: %v", message, err)
    }
}

func main() {
    shutdown := initProvider()
    defer shutdown()

    tracer := otel.Tracer("demo-client-tracer")

    method, _ := baggage.NewMember("method", "repl")
    client, _ := baggage.NewMember("client", "cli")
    bag, _ := baggage.New(method, client)

    defaultCtx := baggage.ContextWithBaggage(context.Background(), bag)
    rng := rand.New(rand.NewSource(time.Now().UnixNano()))
    for {
        startTime := time.Now()
        ctx, span := tracer.Start(defaultCtx, "ExecuteRequest")
        makeRequest(ctx)
        span.End()
        latencyMs := float64(time.Since(startTime)) / 1e6
        nr := int(rng.Int31n(7))
        for i := 0; i < nr; i++ {
            randLineLength := rng.Int63n(999)
            fmt.Printf("#%d: LineLength: %dBy\n", i, randLineLength)
        }

        fmt.Printf("Latency: %.3fms\n", latencyMs)
        time.Sleep(time.Duration(1) * time.Second)
    }
}

func makeRequest(ctx context.Context) {

    demoServerAddr, ok := os.LookupEnv("DEMO_SERVER_ENDPOINT")
    if !ok {
        demoServerAddr = "http://0.0.0.0:7080/hello"
    }

    // Trace an HTTP client by wrapping the transport
    client := http.Client{
        Transport: otelhttp.NewTransport(http.DefaultTransport),
    }

    // Make sure we pass the context to the request to avoid broken traces.
    req, err := http.NewRequestWithContext(ctx, "GET", demoServerAddr, nil)
    if err != nil {
        handleErr(err, "failed to http request")
    }

    // All requests made with this client will create spans.
    res, err := client.Do(req)
    if err != nil {
        panic(err)
    }
    res.Body.Close()
}

Init Provider

Create Resource

构造一个 Resource(资源),用于描述应用的上下文信息。

go
res, err := resource.New(ctx,
    resource.WithFromEnv(),
    resource.WithProcess(),
    resource.WithTelemetrySDK(),
    resource.WithHost(),
    resource.WithAttributes(
        semconv.ServiceNameKey.String("demo-client"),
    ),
)
  • 从环境变量读取配置。
  • 包括进程信息(如 PID)。
  • SDK 版本信息。
  • 主机名等主机信息。
  • 自定义属性:服务名为 "demo-client",会出现在后端展示中。

Get OTEL Collector Endpoint

读取环境变量 OTEL_EXPORTER_OTLP_ENDPOINT

go
otelAgentAddr, ok := os.LookupEnv("OTEL_EXPORTER_OTLP_ENDPOINT")
if !ok {
    otelAgentAddr = "0.0.0.0:4317"
}

Create OTLP Trace Exporter

构建一个 gRPC 导出器客户端,用于将 Trace 数据发送到 Collector。

go
traceClient := otlptracegrpc.NewClient(
    otlptracegrpc.WithInsecure(),
    otlptracegrpc.WithEndpoint(otelAgentAddr),
    otlptracegrpc.WithDialOption(grpc.WithBlock()))

sctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
traceExp, err := otlptrace.New(sctx, traceClient)
handleErr(err, "Failed to create the collector trace exporter")
  • WithInsecure():不使用 TLS(开发测试环境用)。
  • WithBlock():连接过程中会阻塞,直到连接成功。

Create and Set TracerProvider

go
bsp := sdktrace.NewBatchSpanProcessor(traceExp)
tracerProvider := sdktrace.NewTracerProvider(
    sdktrace.WithSampler(sdktrace.AlwaysSample()),
    sdktrace.WithResource(res),
    sdktrace.WithSpanProcessor(bsp),
)

otel.SetTracerProvider(tracerProvider)
  • 使用 BatchSpanProcessor 批量处理和导出 spans。
  • 采样策略为 AlwaysSample():即每个 span 都采集。
  • 挂载上面创建的 Resource。
  • 设置全局的 TracerProvider

Set Global Propagator

设置全局的传播器,定义如何在 HTTP 请求之间传播 Trace 信息和上下文(如 Baggage)。

go
// set global propagator to tracecontext (the default is no-op).
// 默认传播器是 no-op,不会传任何 trace/baggage 信息
otel.SetTextMapPropagator(
    propagation.NewCompositeTextMapPropagator(
        propagation.TraceContext{}, propagation.Baggage{},
    ),
)
  • otel.SetTextMapPropagator

    传播器会决定如何从 HTTP 头(或其他 carrier)中提取 trace 和 baggage 信息,并注入到上下文中。

  • propagation.NewCompositeTextMapPropagator

    创建一个"组合传播器",可以同时支持多个传播格式。 它接受多个实现 TextMapPropagator 接口的组件,按顺序依次处理。

  • propagation.TraceContext{}

    实现 W3C 的 Trace Context 规范,使用 traceparenttracestate 两个 HTTP 头来传递 trace 信息。 如果请求的头部包含了 traceparent,那响应端创建的 span 是子 span,否则是根 span。

    Example Header
    txt
    traceparent: 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01
  • propagation.Baggage{}

    实现 W3C 的 Baggage 规范,用于传递自定义键值对。

    Example Header
    txt
    baggage: user.id=abc123,client=ios

Return Clean Func

关闭导出器,确保 Trace 数据 flush 完毕。

go
return func() {
    cxt, cancel := context.WithTimeout(ctx, time.Second)
    if err := traceExp.Shutdown(cxt); err != nil {
        otel.Handle(err)
    }
}

Create Baggage

创建跨服务之间传递的上下文信息。

go
method, _ := baggage.NewMember("method", "repl")
client, _ := baggage.NewMember("client", "cli")
bag, _ := baggage.New(method, client)

defaultCtx := baggage.ContextWithBaggage(context.Background(), bag)
  • 创建 method=replclient=cli 的 baggage。
  • 注入到 defaultCtx 中。
  • 请求头将包含:baggage: method=repl,client=cli

Tracer Start

tracer.Start 会创建一个 span。 如果 defaultCtx 包含一个 Span,那么新创建的 Span 将成为该 span 的子 span,否则它将是一个根 span

go
ctx, span := tracer.Start(defaultCtx, "ExecuteRequest")
makeRequest(ctx)
span.End()

Custom Http Transport

makeRequest
go
// Trace an HTTP client by wrapping the transport
client := http.Client{
    Transport: otelhttp.NewTransport(http.DefaultTransport),
}

Transport 参数指定了 HTTP 客户端执行 HTTP 请求时使用的底层传输机制,。

  • 默认的 HTTP 传输实现是 http.DefaultTransport
  • 它实现了 http.RoundTripper 接口,会在发起请求的时候被调用

Create HTTP Request

go
// Make sure we pass the context to the request to avoid broken traces.
req, err := http.NewRequestWithContext(ctx, "GET", demoServerAddr, nil)
if err != nil {
    handleErr(err, "failed to http request")
}
  • 创建一个带有 trace 信息的 req,ctx 中包含了 span

Send Request

go
// All requests made with this client will create spans.
res, err := client.Do(req)
  • client.Send 方法会调用 send(req, c.transport(), deadline)
  • c.transport() 会获取上面的 otelhttp.NewTransport
  • otelhttp.NewTransport 会调用 RoundTripper 创建 span
net > http > client.go
go
// send issues an HTTP request.
// Caller should close resp.Body when done reading from it.
func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
    // ...
    resp, err = rt.RoundTrip(req)
    // ...
}
otelhttp@0.37.0 > transport.go
go
// RoundTrip creates a Span and propagates its context via the provided request's headers
// before handing the request to the configured base RoundTripper. The created span will
// end when the response body is closed or when a read from the body returns io.EOF.
func (t *Transport) RoundTrip(r *http.Request) (*http.Response, error) {
    // ...
    ctx, span := tracer.Start(r.Context(), t.spanNameFormatter("", r), opts...)
    // ...
}