File: stats.go

package info (click to toggle)
mirrorbits 0.6.1-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 984 kB
  • sloc: sh: 675; makefile: 93
file content (192 lines) | stat: -rw-r--r-- 4,170 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
// Copyright (c) 2014-2019 Ludovic Fauvet
// Licensed under the MIT license

package http

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/etix/mirrorbits/database"
	"github.com/etix/mirrorbits/filesystem"
	"github.com/etix/mirrorbits/mirrors"
)

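/*
	Redis keys used to store the download statistics.

	Total (all files, all mirrors):
	STATS_TOTAL

	List of hashes for a file:
	STATS_FILE							= path -> value		All time
	STATS_FILE_[year]					= path -> value		By year
	STATS_FILE_[year]_[month]			= path -> value		By month
	STATS_FILE_[year]_[month]_[day]		= path -> value		By day

	List of hashes for a mirror (the field is the mirror ID):
	STATS_MIRROR						= mirror -> value	All time
	STATS_MIRROR_[year]					= mirror -> value	By year
	STATS_MIRROR_[year]_[month]			= mirror -> value	By month
	STATS_MIRROR_[year]_[month]_[day]	= mirror -> value	By day

	List of hashes for the bytes served by a mirror (the field is the mirror ID):
	STATS_MIRROR_BYTES							= mirror -> bytes	All time
	STATS_MIRROR_BYTES_[year]					= mirror -> bytes	By year
	STATS_MIRROR_BYTES_[year]_[month]			= mirror -> bytes	By month
	STATS_MIRROR_BYTES_[year]_[month]_[day]		= mirror -> bytes	By day
*/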

// Sentinel errors returned by CountDownload when its arguments are
// incomplete: errEmptyFileError when the file path is empty and
// errUnknownMirror when the mirror has an empty name.
var (
	errEmptyFileError = errors.New("stats: file parameter is empty")
	errUnknownMirror  = errors.New("stats: unknown mirror")
)

// Stats is the internal structure for the download stats.
//
// Fields:
//   - r: redis handle from which pushStats obtains a connection.
//   - countChan: queue of downloads waiting to be aggregated; CountDownload
//     sends on it and processCountDownload receives from it.
//   - mapStats: pending counters not yet written to redis. Keys have the form
//     <type><year>_<month>_<day>|<object> where <type> is "f" (downloads per
//     file path), "m" (downloads per mirror ID) or "s" (bytes per mirror ID).
//   - stop: closed by Terminate to ask processCountDownload to do a last push
//     and return.
//   - wg: waited on by Terminate until processCountDownload has returned.
//   - downgraded: set while the redis connection reports an error, so that
//     the "kept in-memory" warning is only logged on the first failed push.
type Stats struct {
	r          *database.Redis
	countChan  chan countItem
	mapStats   map[string]int64
	stop       chan bool
	wg         sync.WaitGroup
	downgraded bool
}

// countItem describes a single download queued by CountDownload:
// the ID of the mirror that served it, the path and size of the file,
// and the (UTC) time at which the download was counted.
type countItem struct {
	mirrorID int
	filepath string
	size     int64
	time     time.Time
}

// NewStats returns an instance of the stats counter and starts the
// background goroutine that aggregates queued downloads and periodically
// pushes them to redis.
//
// The returned instance must be shut down with Terminate so that the
// counters still held in memory are committed.
func NewStats(redis *database.Redis) *Stats {
	s := &Stats{
		r:         redis,
		countChan: make(chan countItem, 1000),
		mapStats:  make(map[string]int64),
		stop:      make(chan bool),
	}

	// Register the worker on the wait group *before* it is started.
	// processCountDownload maintains its own Add/Done pair, but that Add runs
	// inside the goroutine: without this registration a Terminate called
	// right after NewStats could see a zero counter and return from Wait
	// before the final push happened.
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		s.processCountDownload()
	}()

	return s
}

// Terminate stops the stats handler and commits results to the database.
//
// Closing the stop channel makes processCountDownload push the pending
// stats and return; Terminate then blocks on the wait group until that has
// happened. It must not be called more than once, since closing an already
// closed channel panics.
func (s *Stats) Terminate() {
	close(s.stop)
	log.Notice("Saving stats")
	s.wg.Wait()
}

// CountDownload records one download of fileinfo served by mirror m.
//
// The download is only queued here; aggregation and the write to redis are
// done asynchronously by processCountDownload. The send blocks while the
// queue is full.
//
// It returns errUnknownMirror if the mirror has no name and
// errEmptyFileError if the file has no path; nothing is queued in either case.
func (s *Stats) CountDownload(m mirrors.Mirror, fileinfo filesystem.FileInfo) error {
	switch {
	case m.Name == "":
		return errUnknownMirror
	case fileinfo.Path == "":
		return errEmptyFileError
	}

	item := countItem{
		mirrorID: m.ID,
		filepath: fileinfo.Path,
		size:     fileinfo.Size,
		time:     time.Now().UTC(),
	}
	s.countChan <- item

	return nil
}

// processCountDownload is the aggregation loop of the stats handler. It is
// meant to run in its own goroutine and is the only place where mapStats is
// written.
//
// For every queued download it increments three in-memory counters
// (downloads per file, downloads per mirror, bytes per mirror), all keyed by
// the day of the download. Every 500ms the counters are pushed to redis.
// When the stop channel is closed, the downloads still sitting in the queue
// are aggregated, a last push is done and the function returns.
func (s *Stats) processCountDownload() {
	// NOTE: this Add runs inside the goroutine, so on its own it does not
	// protect against a Terminate that races with startup; it is kept
	// paired with the Done below so the wait group counter stays balanced.
	s.wg.Add(1)
	defer s.wg.Done()

	pushTicker := time.NewTicker(500 * time.Millisecond)
	// Release the ticker's resources once the loop exits.
	defer pushTicker.Stop()

	// record adds one download to the in-memory counters.
	record := func(c countItem) {
		date := c.time.Format("2006_01_02|") // Includes separator
		mirror := strconv.Itoa(c.mirrorID)
		s.mapStats["f"+date+c.filepath]++
		s.mapStats["m"+date+mirror]++
		s.mapStats["s"+date+mirror] += c.size
	}

	for {
		select {
		case <-s.stop:
			// select picks randomly among ready cases, so downloads may
			// still be buffered when stop fires. This goroutine is the only
			// receiver, hence at least len(countChan) items can be received
			// without blocking; count them before the final push instead of
			// silently dropping them.
			for pending := len(s.countChan); pending > 0; pending-- {
				record(<-s.countChan)
			}
			s.pushStats()
			return
		case c := <-s.countChan:
			record(c)
		case <-pushTicker.C:
			s.pushStats()
		}
	}
}

// pushStats writes the in-memory counters to redis and, on success, resets
// them.
//
// All increments are queued inside a single MULTI/EXEC transaction. Each
// counter is applied to four hashes: the per-day one, then per-month,
// per-year and all-time, obtained by repeatedly stripping the last "_xxx"
// component from the key. File counters additionally increase STATS_TOTAL.
//
// If the connection reports an error, or EXEC fails, the counters are left
// untouched so that a later call can retry; the "kept in-memory" warning is
// only logged for the first consecutive connection failure.
func (s *Stats) pushStats() {
	if len(s.mapStats) == 0 {
		return
	}

	conn := s.r.Get()
	defer conn.Close()

	if conn.Err() != nil {
		if !s.downgraded {
			log.Warningf("Uncommited stats kept in-memory: %v", conn.Err())
		}
		s.downgraded = true
		return
	}

	conn.Send("MULTI")

	for key, count := range s.mapStats {
		if count == 0 {
			continue
		}

		// Keys look like <type><date>|<object>.
		sep := strings.Index(key, "|")
		if sep <= 0 {
			log.Critical("Stats: separator not found")
			continue
		}
		kind, date, object := key[:1], key[1:sep], key[sep+1:]

		var hash string
		switch kind {
		case "f": // File
			hash = fmt.Sprintf("STATS_FILE_%s", date)
		case "m": // Mirror
			hash = fmt.Sprintf("STATS_MIRROR_%s", date)
		case "s": // Bytes
			hash = fmt.Sprintf("STATS_MIRROR_BYTES_%s", date)
		default:
			log.Warning("Stats: unknown type", kind)
			continue
		}

		// Day, month, year, then all-time.
		for level := 0; level < 4; level++ {
			conn.Send("HINCRBY", hash, object, count)
			hash = hash[:strings.LastIndex(hash, "_")]
		}

		if kind == "f" {
			// Increase the total too
			conn.Send("INCRBY", "STATS_TOTAL", count)
		}
	}

	if _, err := conn.Do("EXEC"); err != nil {
		log.Errorf("Stats: could not save stats to redis: %s", err.Error())
		return
	}

	s.downgraded = false

	// Clear the map
	s.mapStats = make(map[string]int64)
}