File: service_logs.go

package singleprocess

import (
	// "sync"

	// "github.com/hashicorp/go-hclog"
	// "github.com/hashicorp/go-memdb"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"github.com/hashicorp/vagrant/internal/server/proto/vagrant_server"
	// "github.com/hashicorp/vagrant/internal/server/singleprocess/state"
)

// defaultLogLimitBacklog is the default number of backlog log entries to
// send down when a client opens a stream without specifying a limit.
const defaultLogLimitBacklog = 100

// GetLogStream streams log batches for the instances matched by the request
// scope. Most of the implementation below is currently commented out pending
// state store wiring; only scope validation is active.
//
// TODO: test
func (s *service) GetLogStream(
	req *vagrant_server.GetLogStreamRequest,
	srv vagrant_server.Vagrant_GetLogStreamServer,
) error {
	// log := hclog.FromContext(srv.Context())

	// Default the limit
	if req.LimitBacklog == 0 {
		req.LimitBacklog = defaultLogLimitBacklog
	}

	// var instanceFunc func(ws memdb.WatchSet) ([]*state.Instance, error)
	switch scope := req.Scope.(type) {
	// case *vagrant_server.GetLogStreamRequest_DeploymentId:
	// 	log = log.With("deployment_id", scope.DeploymentId)
	// 	instanceFunc = func(ws memdb.WatchSet) ([]*state.Instance, error) {
	// 		return s.state.InstancesByDeployment(scope.DeploymentId, ws)
	// 	}

	// case *vagrant_server.GetLogStreamRequest_Application_:
	// 	if scope.Application == nil ||
	// 		scope.Application.Application == nil ||
	// 		scope.Application.Workspace == nil {
	// 		return status.Errorf(
	// 			codes.FailedPrecondition,
	// 			"application scope requires the application and workspace fields to be set",
	// 		)
	// 	}

	// 	log = log.With(
	// 		"project", scope.Application.Application.Project,
	// 		"application", scope.Application.Application.Application,
	// 		"workspace", scope.Application.Workspace.Workspace,
	// 	)
	// 	instanceFunc = func(ws memdb.WatchSet) ([]*state.Instance, error) {
	// 		return s.state.InstancesByApp(
	// 			scope.Application.Application,
	// 			scope.Application.Workspace,
	// 			ws,
	// 		)
	// 	}

	default:
		return status.Errorf(
			codes.FailedPrecondition,
			"invalid scope supplied: %T",
			scope,
		)
	}

	// We track which instances we already have readers for here.
	// var instanceSetLock sync.Mutex
	// instanceSet := make(map[string]struct{})

	// // We loop forever so that we automatically pick up any new instances
	// // that join while the log stream is open.
	// for {
	// 	// Get all our records
	// 	ws := memdb.NewWatchSet()
	// 	records, err := instanceFunc(ws)
	// 	if err != nil {
	// 		return err
	// 	}
	// 	log.Trace("instances loaded", "len", len(records))

	// 	// For each record, start a goroutine that reads the log entries and sends them.
	// 	for _, record := range records {
	// 		instanceId := record.Id
	// 		deploymentId := record.DeploymentId

	// 		// If we already have a reader for this, then do nothing.
	// 		instanceSetLock.Lock()
	// 		_, exists := instanceSet[instanceId]
	// 		instanceSet[instanceId] = struct{}{}
	// 		instanceSetLock.Unlock()
	// 		if exists {
	// 			continue
	// 		}

	// 		// Start our reader up
	// 		r := record.LogBuffer.Reader(req.LimitBacklog)
	// 		instanceLog := log.With("instance_id", instanceId)
	// 		instanceLog.Trace("instance log stream starting")
	// 		go r.CloseContext(srv.Context())
	// 		go func() {
	// 			defer instanceLog.Debug("instance log stream ending")
	// 			defer func() {
	// 				instanceSetLock.Lock()
	// 				defer instanceSetLock.Unlock()
	// 				delete(instanceSet, instanceId)
	// 			}()

	// 			for {
	// 				entries := r.Read(64, true)
	// 				if entries == nil {
	// 					return
	// 				}

	// 				lines := make([]*vagrant_server.LogBatch_Entry, len(entries))
	// 				for i, v := range entries {
	// 					lines[i] = v.(*vagrant_server.LogBatch_Entry)
	// 				}

	// 				instanceLog.Trace("sending instance log data", "entries", len(entries))
	// 				if err := srv.Send(&vagrant_server.LogBatch{
	// 					DeploymentId: deploymentId,
	// 					InstanceId:   instanceId,
	// 					Lines:        lines,
	// 				}); err != nil {
	// 					return
	// 				}
	// 			}
	// 		}()
	// 	}

	// 	// Wait for changes or to be done
	// 	if err := ws.WatchCtx(srv.Context()); err != nil {
	// 		// If our context ended, exit with that
	// 		if err := srv.Context().Err(); err != nil {
	// 			return err
	// 		}

	// 		return err
	// 	}
	// }
}
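
// For reference, a client consumes this endpoint with the standard gRPC
// server-streaming pattern. The sketch below is illustrative only and is not
// part of this package: it assumes the protoc-generated VagrantClient and the
// GetLogStreamRequest_DeploymentId scope wrapper, plus a placeholder address.
// Note that the scope cases above are currently commented out, so the server
// rejects every scope with FailedPrecondition until they are restored.
//
//	conn, err := grpc.Dial("localhost:9701",
//		grpc.WithTransportCredentials(insecure.NewCredentials()))
//	if err != nil {
//		return err
//	}
//	defer conn.Close()
//
//	client := vagrant_server.NewVagrantClient(conn)
//	stream, err := client.GetLogStream(ctx, &vagrant_server.GetLogStreamRequest{
//		// Leaving LimitBacklog at 0 applies defaultLogLimitBacklog (100).
//		Scope: &vagrant_server.GetLogStreamRequest_DeploymentId{
//			DeploymentId: "example-deployment", // hypothetical ID
//		},
//	})
//	if err != nil {
//		return err
//	}
//	for {
//		batch, err := stream.Recv()
//		if err == io.EOF {
//			break
//		}
//		if err != nil {
//			return err
//		}
//		for _, entry := range batch.Lines {
//			fmt.Println(entry) // LogBatch_Entry fields omitted; see proto
//		}
//	}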