Skip to content

Demo Server

Source Code

go
// Copyright The OpenTelemetry Authors
// Modifications copyright (C) 2022 Cisco Systems, Inc.
// Link to original file
// https://github.com/open-telemetry/opentelemetry-go-contrib/blob/v0.14.0/instrumentation/net/http/otelhttp/example/server/server.go
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//       http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Sample contains a simple http server that exports to the OpenTelemetry agent.

package main

import (
    "context"
    "fmt"
    "log"
    "math/rand"
    "net/http"
    "os"
    "time"

    "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/baggage"
    "go.opentelemetry.io/otel/codes"
    "go.opentelemetry.io/otel/exporters/otlp/otlptrace"
    "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
    "go.opentelemetry.io/otel/propagation"
    "go.opentelemetry.io/otel/sdk/resource"
    sdktrace "go.opentelemetry.io/otel/sdk/trace"
    semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
    "go.opentelemetry.io/otel/trace"
    "google.golang.org/grpc"
)

var rng = rand.New(rand.NewSource(time.Now().UnixNano()))
var counter int
var errorProb int = 5

// Initializes an OTLP exporter, and configures the corresponding trace and
// metric providers.
func initProvider() func() {
    ctx := context.Background()

    res, err := resource.New(ctx,
        resource.WithFromEnv(),
        resource.WithProcess(),
        resource.WithTelemetrySDK(),
        resource.WithHost(),
        resource.WithAttributes(
            // the service name used to display traces in backends
            semconv.ServiceNameKey.String("demo-server"),
        ),
    )
    handleErr(err, "failed to create resource")
    otelAgentAddr, ok := os.LookupEnv("OTEL_EXPORTER_OTLP_ENDPOINT")
    if !ok {
        otelAgentAddr = "0.0.0.0:4317"
    }

    traceClient := otlptracegrpc.NewClient(
        otlptracegrpc.WithInsecure(),
        otlptracegrpc.WithEndpoint(otelAgentAddr),
        otlptracegrpc.WithDialOption(grpc.WithBlock()))
    traceExp, err := otlptrace.New(ctx, traceClient)
    handleErr(err, "Failed to create the collector trace exporter")

    bsp := sdktrace.NewBatchSpanProcessor(traceExp)
    tracerProvider := sdktrace.NewTracerProvider(
        sdktrace.WithSampler(sdktrace.AlwaysSample()),
        sdktrace.WithResource(res),
        sdktrace.WithSpanProcessor(bsp),
    )

    // set global propagator to tracecontext (the default is no-op).
    otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
    otel.SetTracerProvider(tracerProvider)

    return func() {
        cxt, cancel := context.WithTimeout(ctx, time.Second)
        defer cancel()
        if err := traceExp.Shutdown(cxt); err != nil {
            otel.Handle(err)
        }
    }
}

func handleErr(err error, message string) {
    if err != nil {
        log.Fatalf("%s: %v", message, err)
    }
}

func main() {
    shutdown := initProvider()
    defer shutdown()

    serverAttribute := attribute.String("server-attribute", "foo")

    // create a handler wrapped in OpenTelemetry instrumentation
    handler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
        //  random sleep to simulate latency
        var sleep int64
        counter++

        switch modulus := time.Now().Unix() % 5; modulus {
        case 0:
            sleep = rng.Int63n(2000)
        case 1:
            sleep = rng.Int63n(15)
        case 2:
            sleep = rng.Int63n(917)
        case 3:
            sleep = rng.Int63n(87)
        case 4:
            sleep = rng.Int63n(1173)
        }
        time.Sleep(time.Duration(sleep) * time.Millisecond)
        ctx := req.Context()
        span := trace.SpanFromContext(ctx)
        bag := baggage.FromContext(ctx)

        // Sdding errors generation
        if counter%errorProb == 0 {
            defer span.End()
            serverErr := fmt.Errorf("Internal Server Error - Randomized for Sampling")
            http.Error(w, serverErr.Error(), http.StatusInternalServerError)
            span.RecordError(serverErr)
            span.SetStatus(codes.Error, serverErr.Error())
            return
        }

        var baggageAttributes []attribute.KeyValue
        baggageAttributes = append(baggageAttributes, serverAttribute)
        for _, member := range bag.Members() {
            baggageAttributes = append(baggageAttributes, attribute.String("baggage key:"+member.Key(), member.Value()))
        }
        span.SetAttributes(baggageAttributes...)

        if _, err := w.Write([]byte("Hello World")); err != nil {
            http.Error(w, "write operation failed.", http.StatusInternalServerError)
            return
        }

    })

    mux := http.NewServeMux()
    mux.Handle("/hello", otelhttp.NewHandler(handler, "/hello"))
    server := &http.Server{
        Addr:    ":7080",
        Handler: mux,
    }
    if err := server.ListenAndServe(); err != http.ErrServerClosed {
        handleErr(err, "server failed to serve")
    }
}

Init Provider

参考 Client

OTEL HTTP Handler

go
mux := http.NewServeMux()
mux.Handle("/hello", otelhttp.NewHandler(handler, "/hello"))
server := &http.Server{
    Addr:    ":7080",
    Handler: mux,
}
if err := server.ListenAndServe(); err != http.ErrServerClosed {
    handleErr(err, "server failed to serve")
}
  • otelhttp.NewHandler 创建一个处理 span 的 Handler
  • server.ListenAndServe() 会调用 Handler 的 ServeHTTP()
  • ServeHTTP() 函数执行的时候会创建 span
  • 如果 Header 中带有 traceparent 头部,会创建子 span
  • 当 HTTP 请求处理完成后,span 自动结束并被加入上报队列
net > http > server.go
go
func (c *conn) serve(ctx context.Context) {
    // ...
    serverHandler{c.server}.ServeHTTP(w, w.req)
    // ...
}
otelhttp@v0.37.0 > handler.go
go
// ServeHTTP serves HTTP requests (http.Handler).
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    // ...
    ctx := h.propagators.Extract(r.Context(), propagation.HeaderCarrier(r.Header))
    // ...
    ctx, span := tracer.Start(ctx, h.spanNameFormatter(h.operation, r), opts...)
    defer span.End()
}
go.opentelemetry.io > otel > sdk@v1.11.2 > trace > trace.go
go
// Start starts a Span and returns it along with a context containing it.
//
// The Span is created with the provided name and as a child of any existing
// span context found in the passed context. The created Span will be
// configured appropriately by any SpanOption passed.
func (tr *tracer) Start(ctx context.Context, name string, options ...trace.SpanStartOption) (context.Context, trace.Span) {
    // ...
    s := tr.newSpan(ctx, name, &config)
    // ...
    return trace.ContextWithSpan(ctx, s), s
}

Get Header Context

go
ctx := req.Context()
span := trace.SpanFromContext(ctx)
bag := baggage.FromContext(ctx)
  • 从 ctx 中获取当前 span,这个 span 是在 ServeHTTP 中创建的
  • 从 Header 的 baggage 字段中提取信息

Simulate Error

go
// Sdding errors generation
if counter%errorProb == 0 {
    defer span.End()
    serverErr := fmt.Errorf("Internal Server Error - Randomized for Sampling")
    http.Error(w, serverErr.Error(), http.StatusInternalServerError)
    span.RecordError(serverErr)
    span.SetStatus(codes.Error, serverErr.Error())
    return
}