Skip to content

Batch Send With Timeout

go
// Batch events read from k.msgChan: a batch is flushed once it holds BatchSize
// events or once idleDuration has elapsed since the batch was started,
// whichever comes first.
idleDuration := 10 * time.Second
idleDelay := time.NewTimer(idleDuration)

for {
    // Re-arm the flush timer for the new batch. Stop and drain first so that a
    // tick that fired while the previous batch was being closed by size cannot
    // linger in the channel and flush this batch immediately. The drain is
    // non-blocking, so it is safe whether or not a stale tick is present.
    if !idleDelay.Stop() {
        select {
        case <-idleDelay.C:
        default:
        }
    }
    idleDelay.Reset(idleDuration)
    msgQueue := make([]model.MinIOEvent, 0, BatchSize)

    var needToSend bool
    for !needToSend {
        // No default case: the goroutine blocks until a message arrives or the
        // timer fires instead of busy-spinning on an idle channel.
        select {
        case msg := <-k.msgChan:
            msgQueue = append(msgQueue, msg)
            log.Info("msg add one")
            // The queue only grows here, so this is the only place the size
            // threshold needs to be checked.
            if len(msgQueue) >= BatchSize {
                needToSend = true
            }
        case <-idleDelay.C:
            needToSend = true
            log.Info("ready to flush data to Kafka, for interval routinely ",
                zap.Int("queue insert length", len(msgQueue)))
        }
    }

    // The timer may fire before any message arrived; nothing to send then.
    if len(msgQueue) == 0 {
        log.Info("not need to batch insert")
        continue
    }
    // NOTE(review): the code that actually sends msgQueue is not part of this
    // excerpt; it presumably follows here — confirm against the full source.
}