diff --git a/api/client/commands.go b/api/client/commands.go index 4cfe97cdd..f9cc10079 100644 --- a/api/client/commands.go +++ b/api/client/commands.go @@ -16,14 +16,17 @@ import ( "path" "path/filepath" "runtime" + "sort" "strconv" "strings" + "sync" "text/tabwriter" "text/template" "time" log "github.com/Sirupsen/logrus" "github.com/docker/docker/api" + "github.com/docker/docker/api/stats" "github.com/docker/docker/dockerversion" "github.com/docker/docker/engine" "github.com/docker/docker/graph" @@ -2618,3 +2621,122 @@ func (cli *DockerCli) CmdExec(args ...string) error { return nil } + +type containerStats struct { + Name string + CpuPercentage float64 + Memory float64 + MemoryLimit float64 + MemoryPercentage float64 + NetworkRx float64 + NetworkTx float64 + mu sync.RWMutex + err error +} + +func (s *containerStats) Collect(stream io.ReadCloser) { + defer stream.Close() + var ( + previousCpu uint64 + previousSystem uint64 + start = true + dec = json.NewDecoder(stream) + ) + for { + var v *stats.Stats + if err := dec.Decode(&v); err != nil { + s.mu.Lock() + s.err = err + s.mu.Unlock() + return + } + var ( + memPercent = float64(v.MemoryStats.Usage) / float64(v.MemoryStats.Limit) * 100.0 + cpuPercent = 0.0 + ) + if !start { + cpuPercent = calcuateCpuPercent(previousCpu, previousSystem, v) + } + start = false + s.mu.Lock() + s.CpuPercentage = cpuPercent + s.Memory = float64(v.MemoryStats.Usage) + s.MemoryLimit = float64(v.MemoryStats.Limit) + s.MemoryPercentage = memPercent + s.NetworkRx = float64(v.Network.RxBytes) + s.NetworkTx = float64(v.Network.TxBytes) + s.mu.Unlock() + + previousCpu = v.CpuStats.CpuUsage.TotalUsage + previousSystem = v.CpuStats.SystemUsage + } +} + +func (s *containerStats) Display(w io.Writer) error { + s.mu.RLock() + defer s.mu.RUnlock() + if s.err != nil { + return s.err + } + fmt.Fprintf(w, "%s\t%.2f%%\t%s/%s\t%.2f%%\t%s/%s\n", + s.Name, + s.CpuPercentage, + units.BytesSize(s.Memory), units.BytesSize(s.MemoryLimit), + 
s.MemoryPercentage, + units.BytesSize(s.NetworkRx), units.BytesSize(s.NetworkTx)) + return nil +} + +func (cli *DockerCli) CmdStats(args ...string) error { + cmd := cli.Subcmd("stats", "CONTAINER", "Display live container stats based on resource usage", true) + cmd.Require(flag.Min, 1) + utils.ParseFlags(cmd, args, true) + + names := cmd.Args() + sort.Strings(names) + var cStats []*containerStats + for _, n := range names { + s := &containerStats{Name: n} + cStats = append(cStats, s) + stream, _, err := cli.call("GET", "/containers/"+n+"/stats", nil, false) + if err != nil { + return err + } + go s.Collect(stream) + } + w := tabwriter.NewWriter(cli.out, 20, 1, 3, ' ', 0) + for _ = range time.Tick(500 * time.Millisecond) { + fmt.Fprint(cli.out, "\033[2J") + fmt.Fprint(cli.out, "\033[H") + fmt.Fprintln(w, "CONTAINER\tCPU %\tMEM USAGE/LIMIT\tMEM %\tNET I/O") + toRemove := []int{} + for i, s := range cStats { + if err := s.Display(w); err != nil { + toRemove = append(toRemove, i) + } + } + for _, i := range toRemove { + cStats = append(cStats[:i], cStats[i+1:]...) 
+ } + if len(cStats) == 0 { + return nil + } + w.Flush() + } + return nil +} + +func calcuateCpuPercent(previousCpu, previousSystem uint64, v *stats.Stats) float64 { + var ( + cpuPercent = 0.0 + // calculate the change for the cpu usage of the container in between readings + cpuDelta = float64(v.CpuStats.CpuUsage.TotalUsage - previousCpu) + // calculate the change for the entire system between readings + systemDelta = float64(v.CpuStats.SystemUsage - previousSystem) + ) + + if systemDelta > 0.0 && cpuDelta > 0.0 { + cpuPercent = (cpuDelta / systemDelta) * float64(len(v.CpuStats.CpuUsage.PercpuUsage)) * 100.0 + } + return cpuPercent +} diff --git a/api/server/server.go b/api/server/server.go index d2715f1bc..d5cdbd00c 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -411,6 +411,19 @@ func getContainersJSON(eng *engine.Engine, version version.Version, w http.Respo return nil } +func getContainersStats(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error { + if err := parseForm(r); err != nil { + return err + } + if vars == nil { + return fmt.Errorf("Missing parameter") + } + name := vars["name"] + job := eng.Job("container_stats", name) + streamJSON(job, w, true) + return job.Run() +} + func getContainersLogs(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error { if err := parseForm(r); err != nil { return err @@ -1323,6 +1336,7 @@ func createRouter(eng *engine.Engine, logging, enableCors bool, dockerVersion st "/containers/{name:.*}/json": getContainersByName, "/containers/{name:.*}/top": getContainersTop, "/containers/{name:.*}/logs": getContainersLogs, + "/containers/{name:.*}/stats": getContainersStats, "/containers/{name:.*}/attach/ws": wsContainersAttach, "/exec/{id:.*}/json": getExecByID, }, diff --git a/api/stats/stats.go b/api/stats/stats.go new file mode 100644 index 000000000..d58fdd4f5 --- /dev/null +++ 
// The types below form the wire format of the container stats API endpoint.
// They are kept separate from the internal representation so that the
// response handed to API consumers stays stable.
type (
	// ThrottlingData reports how often the container was throttled.
	ThrottlingData struct {
		// Number of periods with throttling active
		Periods uint64 `json:"periods,omitempty"`
		// Number of periods when the container hit its throttling limit.
		ThrottledPeriods uint64 `json:"throttled_periods,omitempty"`
		// Aggregate time the container was throttled for in nanoseconds.
		ThrottledTime uint64 `json:"throttled_time,omitempty"`
	}

	// CpuUsage holds the CPU time counters of a container.
	// All CPU stats are aggregated since container inception.
	CpuUsage struct {
		// Total CPU time consumed.
		// Units: nanoseconds.
		TotalUsage uint64 `json:"total_usage,omitempty"`
		// Total CPU time consumed per core.
		// Units: nanoseconds.
		PercpuUsage []uint64 `json:"percpu_usage,omitempty"`
		// Time spent by tasks of the cgroup in kernel mode.
		// Units: nanoseconds.
		UsageInKernelmode uint64 `json:"usage_in_kernelmode"`
		// Time spent by tasks of the cgroup in user mode.
		// Units: nanoseconds.
		UsageInUsermode uint64 `json:"usage_in_usermode"`
	}

	// CpuStats combines the container's CPU counters with the system wide
	// CPU usage reading and the throttling counters.
	CpuStats struct {
		CpuUsage       CpuUsage       `json:"cpu_usage,omitempty"`
		SystemUsage    uint64         `json:"system_cpu_usage"`
		ThrottlingData ThrottlingData `json:"throttling_data,omitempty"`
	}

	// MemoryStats holds the memory counters of a container.
	MemoryStats struct {
		// current res_counter usage for memory
		Usage uint64 `json:"usage,omitempty"`
		// maximum usage ever recorded.
		MaxUsage uint64 `json:"max_usage,omitempty"`
		// TODO(vishh): Export these as stronger types.
		// all the stats exported via memory.stat.
		Stats map[string]uint64 `json:"stats,omitempty"`
		// number of times memory usage hits limits.
		Failcnt uint64 `json:"failcnt"`
		Limit   uint64 `json:"limit"`
	}

	// BlkioStatEntry is a single block I/O counter value for one device
	// (Major, Minor) and operation (Op).
	BlkioStatEntry struct {
		Major uint64 `json:"major,omitempty"`
		Minor uint64 `json:"minor,omitempty"`
		Op    string `json:"op,omitempty"`
		Value uint64 `json:"value,omitempty"`
	}

	// BlkioStats groups the block I/O counters of a container.
	BlkioStats struct {
		// number of bytes transferred to and from the block device
		IoServiceBytesRecursive []BlkioStatEntry `json:"io_service_bytes_recursive,omitempty"`
		IoServicedRecursive     []BlkioStatEntry `json:"io_serviced_recursive,omitempty"`
		IoQueuedRecursive       []BlkioStatEntry `json:"io_queue_recursive,omitempty"`
		IoServiceTimeRecursive  []BlkioStatEntry `json:"io_service_time_recursive,omitempty"`
		IoWaitTimeRecursive     []BlkioStatEntry `json:"io_wait_time_recursive,omitempty"`
		IoMergedRecursive       []BlkioStatEntry `json:"io_merged_recursive,omitempty"`
		IoTimeRecursive         []BlkioStatEntry `json:"io_time_recursive,omitempty"`
		SectorsRecursive        []BlkioStatEntry `json:"sectors_recursive,omitempty"`
	}

	// Network holds the receive (Rx) and transmit (Tx) counters of a
	// container.
	Network struct {
		RxBytes   uint64 `json:"rx_bytes"`
		RxPackets uint64 `json:"rx_packets"`
		RxErrors  uint64 `json:"rx_errors"`
		RxDropped uint64 `json:"rx_dropped"`
		TxBytes   uint64 `json:"tx_bytes"`
		TxPackets uint64 `json:"tx_packets"`
		TxErrors  uint64 `json:"tx_errors"`
		TxDropped uint64 `json:"tx_dropped"`
	}

	// Stats is one sample as streamed by the stats endpoint. Read is the
	// time at which the sample was taken.
	Stats struct {
		Read        time.Time   `json:"read"`
		Network     Network     `json:"network,omitempty"`
		CpuStats    CpuStats    `json:"cpu_stats,omitempty"`
		MemoryStats MemoryStats `json:"memory_stats,omitempty"`
		BlkioStats  BlkioStats  `json:"blkio_stats,omitempty"`
	}
)
diff --git a/daemon/daemon.go b/daemon/daemon.go index 1b2d13bb6..c03e9d7aa 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -104,6 +104,7 @@ type Daemon struct { driver graphdriver.Driver execDriver execdriver.Driver trustStore *trust.TrustStore + statsCollector *statsCollector } // Install installs daemon capabilities to eng. @@ -116,6 +117,7 @@ func (daemon *Daemon) Install(eng *engine.Engine) error { "container_copy": daemon.ContainerCopy, "container_rename": daemon.ContainerRename, "container_inspect": daemon.ContainerInspect, + "container_stats": daemon.ContainerStats, "containers": daemon.Containers, "create": daemon.ContainerCreate, "rm": daemon.ContainerRm, @@ -982,6 +984,7 @@ func NewDaemonFromDirectory(config *Config, eng *engine.Engine) (*Daemon, error) execDriver: ed, eng: eng, trustStore: t, + statsCollector: newStatsCollector(1 * time.Second), } if err := daemon.restore(); err != nil { return nil, err @@ -1092,6 +1095,28 @@ func (daemon *Daemon) Kill(c *Container, sig int) error { return daemon.execDriver.Kill(c.command, sig) } +func (daemon *Daemon) Stats(c *Container) (*execdriver.ResourceStats, error) { + return daemon.execDriver.Stats(c.ID) +} + +func (daemon *Daemon) SubscribeToContainerStats(name string) (chan interface{}, error) { + c := daemon.Get(name) + if c == nil { + return nil, fmt.Errorf("no such container") + } + ch := daemon.statsCollector.collect(c) + return ch, nil +} + +func (daemon *Daemon) UnsubscribeToContainerStats(name string, ch chan interface{}) error { + c := daemon.Get(name) + if c == nil { + return fmt.Errorf("no such container") + } + daemon.statsCollector.unsubscribe(c, ch) + return nil +} + // Nuke kills all containers then removes all content // from the content root, including images, volumes and // container filesystems. 
diff --git a/daemon/delete.go b/daemon/delete.go index 990e4b448..59c765178 100644 --- a/daemon/delete.go +++ b/daemon/delete.go @@ -49,6 +49,9 @@ func (daemon *Daemon) ContainerRm(job *engine.Job) engine.Status { } if container != nil { + // stop collection of stats for the container regardless + // if stats are currently getting collected. + daemon.statsCollector.stopCollection(container) if container.IsRunning() { if forceRemove { if err := container.Kill(); err != nil { diff --git a/daemon/execdriver/driver.go b/daemon/execdriver/driver.go index fe99e062d..2215d03cf 100644 --- a/daemon/execdriver/driver.go +++ b/daemon/execdriver/driver.go @@ -5,7 +5,9 @@ import ( "io" "os" "os/exec" + "time" + "github.com/docker/libcontainer" "github.com/docker/libcontainer/devices" ) @@ -14,7 +16,7 @@ import ( type Context map[string]string var ( - ErrNotRunning = errors.New("Process could not be started") + ErrNotRunning = errors.New("Container is not running") ErrWaitTimeoutReached = errors.New("Wait timeout reached") ErrDriverAlreadyRegistered = errors.New("A driver already registered this docker init function") ErrDriverNotFound = errors.New("The requested docker init has not been found") @@ -61,6 +63,7 @@ type Driver interface { GetPidsForContainer(id string) ([]int, error) // Returns a list of pids for the given container. 
Terminate(c *Command) error // kill it with fire Clean(id string) error // clean all traces of container exec + Stats(id string) (*ResourceStats, error) // Get resource stats for a running container } // Network settings of the container @@ -101,6 +104,13 @@ type Resources struct { Cpuset string `json:"cpuset"` } +type ResourceStats struct { + *libcontainer.ContainerStats + Read time.Time `json:"read"` + MemoryLimit int64 `json:"memory_limit"` + SystemUsage uint64 `json:"system_usage"` +} + type Mount struct { Source string `json:"source"` Destination string `json:"destination"` diff --git a/daemon/execdriver/execdrivers/execdrivers.go b/daemon/execdriver/execdrivers/execdrivers.go index 2a050b483..be3222a8b 100644 --- a/daemon/execdriver/execdrivers/execdrivers.go +++ b/daemon/execdriver/execdrivers/execdrivers.go @@ -2,11 +2,12 @@ package execdrivers import ( "fmt" + "path" + "github.com/docker/docker/daemon/execdriver" "github.com/docker/docker/daemon/execdriver/lxc" "github.com/docker/docker/daemon/execdriver/native" "github.com/docker/docker/pkg/sysinfo" - "path" ) func NewDriver(name, root, initPath string, sysInfo *sysinfo.SysInfo) (execdriver.Driver, error) { diff --git a/daemon/execdriver/lxc/driver.go b/daemon/execdriver/lxc/driver.go index c02ceae97..44942b1fe 100644 --- a/daemon/execdriver/lxc/driver.go +++ b/daemon/execdriver/lxc/driver.go @@ -524,3 +524,8 @@ func (t *TtyConsole) Close() error { func (d *driver) Exec(c *execdriver.Command, processConfig *execdriver.ProcessConfig, pipes *execdriver.Pipes, startCallback execdriver.StartCallback) (int, error) { return -1, ErrExec } + +func (d *driver) Stats(id string) (*execdriver.ResourceStats, error) { + return nil, fmt.Errorf("container stats are not supported with LXC") + +} diff --git a/daemon/execdriver/native/driver.go b/daemon/execdriver/native/driver.go index f6099bd04..533e6d61e 100644 --- a/daemon/execdriver/native/driver.go +++ b/daemon/execdriver/native/driver.go @@ -13,9 +13,11 @@ import ( 
"strings" "sync" "syscall" + "time" log "github.com/Sirupsen/logrus" "github.com/docker/docker/daemon/execdriver" + sysinfo "github.com/docker/docker/pkg/system" "github.com/docker/docker/pkg/term" "github.com/docker/libcontainer" "github.com/docker/libcontainer/apparmor" @@ -41,23 +43,28 @@ type driver struct { root string initPath string activeContainers map[string]*activeContainer + machineMemory int64 sync.Mutex } func NewDriver(root, initPath string) (*driver, error) { - if err := os.MkdirAll(root, 0700); err != nil { + meminfo, err := sysinfo.ReadMemInfo() + if err != nil { return nil, err } + if err := os.MkdirAll(root, 0700); err != nil { + return nil, err + } // native driver root is at docker_root/execdriver/native. Put apparmor at docker_root if err := apparmor.InstallDefaultProfile(); err != nil { return nil, err } - return &driver{ root: root, initPath: initPath, activeContainers: make(map[string]*activeContainer), + machineMemory: meminfo.MemTotal, }, nil } @@ -279,6 +286,33 @@ func (d *driver) Clean(id string) error { return os.RemoveAll(filepath.Join(d.root, id)) } +func (d *driver) Stats(id string) (*execdriver.ResourceStats, error) { + c := d.activeContainers[id] + state, err := libcontainer.GetState(filepath.Join(d.root, id)) + if err != nil { + if os.IsNotExist(err) { + return nil, execdriver.ErrNotRunning + } + return nil, err + } + now := time.Now() + stats, err := libcontainer.GetStats(nil, state) + if err != nil { + return nil, err + } + memoryLimit := c.container.Cgroups.Memory + // if the container does not have any memory limit specified set the + // limit to the machines memory + if memoryLimit == 0 { + memoryLimit = d.machineMemory + } + return &execdriver.ResourceStats{ + Read: now, + ContainerStats: stats, + MemoryLimit: memoryLimit, + }, nil +} + func getEnv(key string, env []string) string { for _, pair := range env { parts := strings.Split(pair, "=") diff --git a/daemon/stats.go b/daemon/stats.go new file mode 100644 index 
000000000..e047497ec --- /dev/null +++ b/daemon/stats.go @@ -0,0 +1,98 @@ +package daemon + +import ( + "encoding/json" + + "github.com/docker/docker/api/stats" + "github.com/docker/docker/daemon/execdriver" + "github.com/docker/docker/engine" + "github.com/docker/libcontainer" + "github.com/docker/libcontainer/cgroups" +) + +func (daemon *Daemon) ContainerStats(job *engine.Job) engine.Status { + updates, err := daemon.SubscribeToContainerStats(job.Args[0]) + if err != nil { + return job.Error(err) + } + enc := json.NewEncoder(job.Stdout) + for v := range updates { + update := v.(*execdriver.ResourceStats) + ss := convertToAPITypes(update.ContainerStats) + ss.MemoryStats.Limit = uint64(update.MemoryLimit) + ss.Read = update.Read + ss.CpuStats.SystemUsage = update.SystemUsage + if err := enc.Encode(ss); err != nil { + // TODO: handle the specific broken pipe + daemon.UnsubscribeToContainerStats(job.Args[0], updates) + return job.Error(err) + } + } + return engine.StatusOK +} + +// convertToAPITypes converts the libcontainer.ContainerStats to the api specific +// structs. This is done to preserve API compatibility and versioning. 
+func convertToAPITypes(ls *libcontainer.ContainerStats) *stats.Stats { + s := &stats.Stats{} + if ls.NetworkStats != nil { + s.Network = stats.Network{ + RxBytes: ls.NetworkStats.RxBytes, + RxPackets: ls.NetworkStats.RxPackets, + RxErrors: ls.NetworkStats.RxErrors, + RxDropped: ls.NetworkStats.RxDropped, + TxBytes: ls.NetworkStats.TxBytes, + TxPackets: ls.NetworkStats.TxPackets, + TxErrors: ls.NetworkStats.TxErrors, + TxDropped: ls.NetworkStats.TxDropped, + } + } + cs := ls.CgroupStats + if cs != nil { + s.BlkioStats = stats.BlkioStats{ + IoServiceBytesRecursive: copyBlkioEntry(cs.BlkioStats.IoServiceBytesRecursive), + IoServicedRecursive: copyBlkioEntry(cs.BlkioStats.IoServicedRecursive), + IoQueuedRecursive: copyBlkioEntry(cs.BlkioStats.IoQueuedRecursive), + IoServiceTimeRecursive: copyBlkioEntry(cs.BlkioStats.IoServiceTimeRecursive), + IoWaitTimeRecursive: copyBlkioEntry(cs.BlkioStats.IoWaitTimeRecursive), + IoMergedRecursive: copyBlkioEntry(cs.BlkioStats.IoMergedRecursive), + IoTimeRecursive: copyBlkioEntry(cs.BlkioStats.IoTimeRecursive), + SectorsRecursive: copyBlkioEntry(cs.BlkioStats.SectorsRecursive), + } + cpu := cs.CpuStats + s.CpuStats = stats.CpuStats{ + CpuUsage: stats.CpuUsage{ + TotalUsage: cpu.CpuUsage.TotalUsage, + PercpuUsage: cpu.CpuUsage.PercpuUsage, + UsageInKernelmode: cpu.CpuUsage.UsageInKernelmode, + UsageInUsermode: cpu.CpuUsage.UsageInUsermode, + }, + ThrottlingData: stats.ThrottlingData{ + Periods: cpu.ThrottlingData.Periods, + ThrottledPeriods: cpu.ThrottlingData.ThrottledPeriods, + ThrottledTime: cpu.ThrottlingData.ThrottledTime, + }, + } + mem := cs.MemoryStats + s.MemoryStats = stats.MemoryStats{ + Usage: mem.Usage, + MaxUsage: mem.MaxUsage, + Stats: mem.Stats, + Failcnt: mem.Failcnt, + } + } + return s +} + +func copyBlkioEntry(entries []cgroups.BlkioStatEntry) []stats.BlkioStatEntry { + out := make([]stats.BlkioStatEntry, len(entries)) + for i, re := range entries { + out[i] = stats.BlkioStatEntry{ + Major: re.Major, + Minor: 
re.Minor, + Op: re.Op, + Value: re.Value, + } + } + return out +} diff --git a/daemon/stats_collector.go b/daemon/stats_collector.go new file mode 100644 index 000000000..779bd1a59 --- /dev/null +++ b/daemon/stats_collector.go @@ -0,0 +1,129 @@ +package daemon + +import ( + "bufio" + "fmt" + "os" + "strconv" + "strings" + "sync" + "time" + + log "github.com/Sirupsen/logrus" + "github.com/docker/docker/daemon/execdriver" + "github.com/docker/docker/pkg/pubsub" + "github.com/docker/libcontainer/system" +) + +// newStatsCollector returns a new statsCollector that collections +// network and cgroup stats for a registered container at the specified +// interval. The collector allows non-running containers to be added +// and will start processing stats when they are started. +func newStatsCollector(interval time.Duration) *statsCollector { + s := &statsCollector{ + interval: interval, + publishers: make(map[*Container]*pubsub.Publisher), + clockTicks: uint64(system.GetClockTicks()), + } + go s.run() + return s +} + +// statsCollector manages and provides container resource stats +type statsCollector struct { + m sync.Mutex + interval time.Duration + clockTicks uint64 + publishers map[*Container]*pubsub.Publisher +} + +// collect registers the container with the collector and adds it to +// the event loop for collection on the specified interval returning +// a channel for the subscriber to receive on. +func (s *statsCollector) collect(c *Container) chan interface{} { + s.m.Lock() + defer s.m.Unlock() + publisher, exists := s.publishers[c] + if !exists { + publisher = pubsub.NewPublisher(100*time.Millisecond, 1024) + s.publishers[c] = publisher + } + return publisher.Subscribe() +} + +// stopCollection closes the channels for all subscribers and removes +// the container from metrics collection. 
+func (s *statsCollector) stopCollection(c *Container) { + s.m.Lock() + if publisher, exists := s.publishers[c]; exists { + publisher.Close() + delete(s.publishers, c) + } + s.m.Unlock() +} + +// unsubscribe removes a specific subscriber from receiving updates for a container's stats. +func (s *statsCollector) unsubscribe(c *Container, ch chan interface{}) { + s.m.Lock() + publisher := s.publishers[c] + if publisher != nil { + publisher.Evict(ch) + if publisher.Len() == 0 { + delete(s.publishers, c) + } + } + s.m.Unlock() +} + +func (s *statsCollector) run() { + for _ = range time.Tick(s.interval) { + for container, publisher := range s.publishers { + systemUsage, err := s.getSystemCpuUsage() + if err != nil { + log.Errorf("collecting system cpu usage for %s: %v", container.ID, err) + continue + } + stats, err := container.Stats() + if err != nil { + if err != execdriver.ErrNotRunning { + log.Errorf("collecting stats for %s: %v", container.ID, err) + } + continue + } + stats.SystemUsage = systemUsage + publisher.Publish(stats) + } + } +} + +const nanoSeconds = 1e9 + +// getSystemCpuUSage returns the host system's cpu usage in nanoseconds +// for the system to match the cgroup readings are returned in the same format. 
+func (s *statsCollector) getSystemCpuUsage() (uint64, error) { + f, err := os.Open("/proc/stat") + if err != nil { + return 0, err + } + defer f.Close() + sc := bufio.NewScanner(f) + for sc.Scan() { + parts := strings.Fields(sc.Text()) + switch parts[0] { + case "cpu": + if len(parts) < 8 { + return 0, fmt.Errorf("invalid number of cpu fields") + } + var sum uint64 + for _, i := range parts[1:8] { + v, err := strconv.ParseUint(i, 10, 64) + if err != nil { + return 0, fmt.Errorf("Unable to convert value %s to int: %s", i, err) + } + sum += v + } + return (sum * nanoSeconds) / s.clockTicks, nil + } + } + return 0, fmt.Errorf("invalid stat format") +} diff --git a/docker/flags.go b/docker/flags.go index 719acbe93..525e8cbfd 100644 --- a/docker/flags.go +++ b/docker/flags.go @@ -98,6 +98,7 @@ func init() { {"save", "Save an image to a tar archive"}, {"search", "Search for an image on the Docker Hub"}, {"start", "Start a stopped container"}, + {"stats", "Display live container stats based on resource usage"}, {"stop", "Stop a running container"}, {"tag", "Tag an image into a repository"}, {"top", "Lookup the running processes of a container"}, diff --git a/docs/man/docker-stats.1.md b/docs/man/docker-stats.1.md new file mode 100644 index 000000000..fdad99719 --- /dev/null +++ b/docs/man/docker-stats.1.md @@ -0,0 +1,28 @@ +% DOCKER(1) Docker User Manuals +% Docker Community +% JUNE 2014 +# NAME +docker-stats - Display live container stats based on resource usage. + +# SYNOPSIS +**docker stats** +[**--help**] +[CONTAINERS] + +# DESCRIPTION + +Display live container stats based on resource usage. + +# OPTIONS +**--help** + Print usage statement + +# EXAMPLES + +Run **docker stats** with multiple containers. 
+ + $ sudo docker stats redis1 redis2 + CONTAINER CPU % MEM USAGE/LIMIT MEM % NET I/O + redis1 0.07% 796 KiB/64 MiB 1.21% 788 B/648 B + redis2 0.07% 2.746 MiB/64 MiB 4.29% 1.266 KiB/648 B + diff --git a/docs/sources/reference/api/docker_remote_api.md b/docs/sources/reference/api/docker_remote_api.md index a36133795..2d52d53b8 100644 --- a/docs/sources/reference/api/docker_remote_api.md +++ b/docs/sources/reference/api/docker_remote_api.md @@ -68,6 +68,12 @@ New endpoint to rename a container `id` to a new name. (`ReadonlyRootfs`) can be passed in the host config to mount the container's root filesystem as read only. +`GET /containers/(id)/stats` + +**New!** +This endpoint returns a stream of container stats based on resource usage. + + ## v1.16 ### Full Documentation diff --git a/docs/sources/reference/api/docker_remote_api_v1.17.md b/docs/sources/reference/api/docker_remote_api_v1.17.md index a44dcbf3a..77e9a8713 100644 --- a/docs/sources/reference/api/docker_remote_api_v1.17.md +++ b/docs/sources/reference/api/docker_remote_api_v1.17.md @@ -514,6 +514,94 @@ Status Codes: - **404** – no such container - **500** – server error +### Get container stats based on resource usage + +`GET /containers/(id)/stats` + +Returns a stream of json objects of the container's stats + +**Example request**: + + GET /containers/redis1/stats HTTP/1.1 + +**Example response**: + + HTTP/1.1 200 OK + Content-Type: application/json + + { + "read" : "2015-01-08T22:57:31.547920715Z", + "network" : { + "rx_dropped" : 0, + "rx_bytes" : 648, + "rx_errors" : 0, + "tx_packets" : 8, + "tx_dropped" : 0, + "rx_packets" : 8, + "tx_errors" : 0, + "tx_bytes" : 648 + }, + "memory_stats" : { + "stats" : { + "total_pgmajfault" : 0, + "cache" : 0, + "mapped_file" : 0, + "total_inactive_file" : 0, + "pgpgout" : 414, + "rss" : 6537216, + "total_mapped_file" : 0, + "writeback" : 0, + "unevictable" : 0, + "pgpgin" : 477, + "total_unevictable" : 0, + "pgmajfault" : 0, + "total_rss" : 6537216, + "total_rss_huge" 
: 6291456, + "total_writeback" : 0, + "total_inactive_anon" : 0, + "rss_huge" : 6291456, + "hierarchical_memory_limit" : 67108864, + "total_pgfault" : 964, + "total_active_file" : 0, + "active_anon" : 6537216, + "total_active_anon" : 6537216, + "total_pgpgout" : 414, + "total_cache" : 0, + "inactive_anon" : 0, + "active_file" : 0, + "pgfault" : 964, + "inactive_file" : 0, + "total_pgpgin" : 477 + }, + "max_usage" : 6651904, + "usage" : 6537216, + "failcnt" : 0, + "limit" : 67108864 + }, + "blkio_stats" : {}, + "cpu_stats" : { + "cpu_usage" : { + "percpu_usage" : [ + 16970827, + 1839451, + 7107380, + 10571290 + ], + "usage_in_usermode" : 10000000, + "total_usage" : 36488948, + "usage_in_kernelmode" : 20000000 + }, + "system_cpu_usage" : 20091722000000000, + "throttling_data" : {} + } + } + +Status Codes: + +- **200** – no error +- **404** – no such container +- **500** – server error + ### Resize a container TTY `POST /containers/(id)/resize?h=&w=` diff --git a/docs/sources/reference/commandline/cli.md b/docs/sources/reference/commandline/cli.md index 36d0b18cc..b8af02da3 100644 --- a/docs/sources/reference/commandline/cli.md +++ b/docs/sources/reference/commandline/cli.md @@ -2001,8 +2001,28 @@ more details on finding shared images from the command line. -a, --attach=false Attach container's STDOUT and STDERR and forward all signals to the process -i, --interactive=false Attach container's STDIN -When run on a container that has already been started, -takes no action and succeeds unconditionally. 
+## stats + + Usage: docker stats [CONTAINERS] + + Display live container stats based on resource usage + + --help=false Print usage + +Running `docker stats` on two redis containers + + $ sudo docker stats redis1 redis2 + CONTAINER CPU % MEM USAGE/LIMIT MEM % NET I/O + redis1 0.07% 796 KiB/64 MiB 1.21% 788 B/648 B + redis2 0.07% 2.746 MiB/64 MiB 4.29% 1.266 KiB/648 B + + +When run on running containers live container stats will be streamed +back and displayed to the client. Stopped containers will not +receive any updates to their stats unless the container is started again. + +> **Note:** +> If you want more in depth resource usage for a container use the API endpoint ## stop diff --git a/integration-cli/docker_api_containers_test.go b/integration-cli/docker_api_containers_test.go index 8b0b8fd69..34cc82aff 100644 --- a/integration-cli/docker_api_containers_test.go +++ b/integration-cli/docker_api_containers_test.go @@ -9,7 +9,9 @@ import ( "os/exec" "strings" "testing" + "time" + "github.com/docker/docker/api/stats" "github.com/docker/docker/vendor/src/code.google.com/p/go/src/pkg/archive/tar" ) @@ -251,3 +253,31 @@ func TestVolumesFromHasPriority(t *testing.T) { logDone("container REST API - check VolumesFrom has priority") } + +func TestGetContainerStats(t *testing.T) { + defer deleteAllContainers() + name := "statscontainer" + + runCmd := exec.Command(dockerBinary, "run", "-d", "--name", name, "busybox", "top") + out, _, err := runCommandWithOutput(runCmd) + if err != nil { + t.Fatalf("Error on container creation: %v, output: %q", err, out) + } + go func() { + time.Sleep(4 * time.Second) + runCommand(exec.Command(dockerBinary, "kill", name)) + runCommand(exec.Command(dockerBinary, "rm", name)) + }() + + body, err := sockRequest("GET", "/containers/"+name+"/stats", nil) + if err != nil { + t.Fatalf("GET containers/stats sockRequest failed: %v", err) + } + + dec := json.NewDecoder(bytes.NewBuffer(body)) + var s *stats.Stats + if err := dec.Decode(&s); err != 
// NewPublisher creates a new pub/sub publisher to broadcast messages.
// The duration is used as the send timeout as to not block the publisher publishing
// messages to other clients if one client is slow or unresponsive.
// The buffer is used when creating new channels for subscribers.
func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
	return &Publisher{
		buffer:      buffer,
		timeout:     publishTimeout,
		subscribers: make(map[subscriber]struct{}),
	}
}

type subscriber chan interface{}

// Publisher broadcasts messages to a set of subscriber channels. All methods
// may be called from multiple goroutines.
type Publisher struct {
	m           sync.RWMutex
	buffer      int
	timeout     time.Duration
	subscribers map[subscriber]struct{}
}

// Len returns the number of subscribers for the publisher
func (p *Publisher) Len() int {
	p.m.RLock()
	i := len(p.subscribers)
	p.m.RUnlock()
	return i
}

// Subscribe adds a new subscriber to the publisher returning the channel.
func (p *Publisher) Subscribe() chan interface{} {
	ch := make(chan interface{}, p.buffer)
	p.m.Lock()
	p.subscribers[ch] = struct{}{}
	p.m.Unlock()
	return ch
}

// Evict removes the specified subscriber from receiving any more messages
// and closes its channel. Evicting a channel that is not (or no longer)
// subscribed is a no-op.
func (p *Publisher) Evict(sub chan interface{}) {
	p.m.Lock()
	defer p.m.Unlock()
	// Only close channels that are still registered: closing a channel that
	// Evict or Close already closed would panic.
	if _, ok := p.subscribers[sub]; !ok {
		return
	}
	delete(p.subscribers, sub)
	close(sub)
}

// Publish sends the data in v to all subscribers currently registered with the publisher.
// A subscriber that does not accept the message within the publisher's
// timeout is skipped for this message.
func (p *Publisher) Publish(v interface{}) {
	p.m.RLock()
	for sub := range p.subscribers {
		// send under a select as to not block if the receiver is unavailable
		select {
		case sub <- v:
		case <-time.After(p.timeout):
		}
	}
	p.m.RUnlock()
}

// Close closes the channels to all subscribers registered with the publisher
// and forgets them, so that a later Publish, Evict or Close never touches an
// already closed channel.
func (p *Publisher) Close() {
	p.m.Lock()
	for sub := range p.subscribers {
		delete(p.subscribers, sub)
		close(sub)
	}
	p.m.Unlock()
}