Demo Server
Source Code
go
// Copyright The OpenTelemetry Authors
// Modifications copyright (C) 2022 Cisco Systems, Inc.
// Link to original file
// https://github.com/open-telemetry/opentelemetry-go-contrib/blob/v0.14.0/instrumentation/net/http/otelhttp/example/server/server.go
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Sample contains a simple http server that exports to the OpenTelemetry agent.
package main
import (
"context"
"fmt"
"log"
"math/rand"
"net/http"
"os"
"time"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/baggage"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
)
var rng = rand.New(rand.NewSource(time.Now().UnixNano()))
var counter int
var errorProb int = 5
// Initializes an OTLP exporter, and configures the corresponding trace and
// metric providers.
func initProvider() func() {
ctx := context.Background()
res, err := resource.New(ctx,
resource.WithFromEnv(),
resource.WithProcess(),
resource.WithTelemetrySDK(),
resource.WithHost(),
resource.WithAttributes(
// the service name used to display traces in backends
semconv.ServiceNameKey.String("demo-server"),
),
)
handleErr(err, "failed to create resource")
otelAgentAddr, ok := os.LookupEnv("OTEL_EXPORTER_OTLP_ENDPOINT")
if !ok {
otelAgentAddr = "0.0.0.0:4317"
}
traceClient := otlptracegrpc.NewClient(
otlptracegrpc.WithInsecure(),
otlptracegrpc.WithEndpoint(otelAgentAddr),
otlptracegrpc.WithDialOption(grpc.WithBlock()))
traceExp, err := otlptrace.New(ctx, traceClient)
handleErr(err, "Failed to create the collector trace exporter")
bsp := sdktrace.NewBatchSpanProcessor(traceExp)
tracerProvider := sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.AlwaysSample()),
sdktrace.WithResource(res),
sdktrace.WithSpanProcessor(bsp),
)
// set global propagator to tracecontext (the default is no-op).
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
otel.SetTracerProvider(tracerProvider)
return func() {
cxt, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
if err := traceExp.Shutdown(cxt); err != nil {
otel.Handle(err)
}
}
}
func handleErr(err error, message string) {
if err != nil {
log.Fatalf("%s: %v", message, err)
}
}
func main() {
shutdown := initProvider()
defer shutdown()
serverAttribute := attribute.String("server-attribute", "foo")
// create a handler wrapped in OpenTelemetry instrumentation
handler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
// random sleep to simulate latency
var sleep int64
counter++
switch modulus := time.Now().Unix() % 5; modulus {
case 0:
sleep = rng.Int63n(2000)
case 1:
sleep = rng.Int63n(15)
case 2:
sleep = rng.Int63n(917)
case 3:
sleep = rng.Int63n(87)
case 4:
sleep = rng.Int63n(1173)
}
time.Sleep(time.Duration(sleep) * time.Millisecond)
ctx := req.Context()
span := trace.SpanFromContext(ctx)
bag := baggage.FromContext(ctx)
// Sdding errors generation
if counter%errorProb == 0 {
defer span.End()
serverErr := fmt.Errorf("Internal Server Error - Randomized for Sampling")
http.Error(w, serverErr.Error(), http.StatusInternalServerError)
span.RecordError(serverErr)
span.SetStatus(codes.Error, serverErr.Error())
return
}
var baggageAttributes []attribute.KeyValue
baggageAttributes = append(baggageAttributes, serverAttribute)
for _, member := range bag.Members() {
baggageAttributes = append(baggageAttributes, attribute.String("baggage key:"+member.Key(), member.Value()))
}
span.SetAttributes(baggageAttributes...)
if _, err := w.Write([]byte("Hello World")); err != nil {
http.Error(w, "write operation failed.", http.StatusInternalServerError)
return
}
})
mux := http.NewServeMux()
mux.Handle("/hello", otelhttp.NewHandler(handler, "/hello"))
server := &http.Server{
Addr: ":7080",
Handler: mux,
}
if err := server.ListenAndServe(); err != http.ErrServerClosed {
handleErr(err, "server failed to serve")
}
}
Init Provider
参考 Client
OTEL HTTP Handler
go
mux := http.NewServeMux()
mux.Handle("/hello", otelhttp.NewHandler(handler, "/hello"))
server := &http.Server{
Addr: ":7080",
Handler: mux,
}
if err := server.ListenAndServe(); err != http.ErrServerClosed {
handleErr(err, "server failed to serve")
}
otelhttp.NewHandler
创建一个处理 span 的 Handlerserver.ListenAndServe()
会调用 Handler 的ServeHTTP()
ServeHTTP()
函数执行的时候会创建 span- 如果 Header 中带有
traceparent
头部,会创建子 span - 当 HTTP 请求处理完成后,span 自动结束并被加入上报队列
go
func (c *conn) serve(ctx context.Context) {
// ...
serverHandler{c.server}.ServeHTTP(w, w.req)
// ...
}
go
// ServeHTTP serves HTTP requests (http.Handler).
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// ...
ctx := h.propagators.Extract(r.Context(), propagation.HeaderCarrier(r.Header))
// ...
ctx, span := tracer.Start(ctx, h.spanNameFormatter(h.operation, r), opts...)
defer span.End()
}
go
// Start starts a Span and returns it along with a context containing it.
//
// The Span is created with the provided name and as a child of any existing
// span context found in the passed context. The created Span will be
// configured appropriately by any SpanOption passed.
func (tr *tracer) Start(ctx context.Context, name string, options ...trace.SpanStartOption) (context.Context, trace.Span) {
// ...
s := tr.newSpan(ctx, name, &config)
// ...
return trace.ContextWithSpan(ctx, s), s
}
Get Header Context
go
ctx := req.Context()
span := trace.SpanFromContext(ctx)
bag := baggage.FromContext(ctx)
- 从 ctx 中获取当前 span,这个 span 是在
ServeHTTP
中创建的 - 从 Header 的
baggage
字段中提取信息
Simulate Error
go
// Sdding errors generation
if counter%errorProb == 0 {
defer span.End()
serverErr := fmt.Errorf("Internal Server Error - Randomized for Sampling")
http.Error(w, serverErr.Error(), http.StatusInternalServerError)
span.RecordError(serverErr)
span.SetStatus(codes.Error, serverErr.Error())
return
}