Skip to content

Commit

Permalink
DNS V2 - Revise discovery result to have service and node name and ad…
Browse files Browse the repository at this point in the history
…dress fields. (hashicorp#20468)

* DNS V2 - Revise discovery result to have service and node name and address fields.

* NET-7488 - dns v2 add support for prepared queries in catalog v1 data model (hashicorp#20470)

NET-7488 - dns v2 add support for prepared queries in catalog v1 data model.
  • Loading branch information
jmurret authored Feb 3, 2024
1 parent 9602b43 commit 602e3c4
Show file tree
Hide file tree
Showing 13 changed files with 563 additions and 419 deletions.
22 changes: 13 additions & 9 deletions agent/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,11 @@ type QueryTenancy struct {
// QueryPayload represents all information needed by the data backend
// to decide which records to include.
type QueryPayload struct {
Name string
PortName string // v1 - this could optionally be "connect" or "ingress"; v2 - this is the service port name
Tag string // deprecated: use for V1 only
RemoteAddr net.Addr // deprecated: used for prepared queries
Tenancy QueryTenancy // tenancy includes any additional labels specified before the domain
Name string
PortName string // v1 - this could optionally be "connect" or "ingress"; v2 - this is the service port name
Tag string // deprecated: use for V1 only
SourceIP net.IP // deprecated: used for prepared queries
Tenancy QueryTenancy // tenancy includes any additional labels specified before the domain

// v2 fields only
EnableFailover bool
Expand All @@ -104,19 +104,23 @@ const (
// It is the responsibility of the DNS encoder to know what to do with
// each Result, based on the query type.
type Result struct {
Address string // A/AAAA/CNAME records - could be used in the Extra section. CNAME is required to handle hostname addresses in workloads & nodes.
Service *Location // The name and address of the service.
Node *Location // The name and address of the node.
Weight uint32 // SRV queries
PortName string // Used to generate a fgdn when a specifc port was queried
PortNumber uint32 // SRV queries
Metadata map[string]string // Used to collect metadata into TXT Records
Type ResultType // Used to reconstruct the fqdn name of the resource

// Used in SRV & PTR queries to point at an A/AAAA Record.
Target string

Tenancy ResultTenancy
}

// Location is used to represent a service, node, or workload.
type Location struct {
Name string
Address string
}

// ResultTenancy is used to reconstruct the fqdn name of the resource.
type ResultTenancy struct {
Namespace string
Expand Down
4 changes: 2 additions & 2 deletions agent/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ var (
}

testResult = &Result{
Address: "1.2.3.4",
Node: &Location{Address: "1.2.3.4"},
Type: ResultTypeNode, // This isn't correct for some test cases, but we are only asserting the right data fetcher functions are called
Target: "foo",
Service: &Location{Name: "foo"},
}
)

Expand Down
252 changes: 182 additions & 70 deletions agent/discovery/query_fetcher_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ type v1DataFetcherDynamicConfig struct {
// Default request tenancy
datacenter string

segmentName string
nodeName string
nodePartition string

// Catalog configuration
allowStale bool
maxStale time.Duration
Expand Down Expand Up @@ -115,17 +119,19 @@ func (f *V1DataFetcher) FetchNodes(ctx Context, req *QueryPayload) ([]*Result, e
}

results := make([]*Result, 0, 1)
node := out.NodeServices.Node
n := out.NodeServices.Node

results = append(results, &Result{
Address: node.Address,
Node: &Location{
Name: n.Node,
Address: n.Address,
},
Type: ResultTypeNode,
Metadata: node.Meta,
Target: node.Node,
Metadata: n.Meta,
Tenancy: ResultTenancy{
// Namespace is not required because nodes are not namespaced
Partition: node.GetEnterpriseMeta().PartitionOrDefault(),
Datacenter: node.Datacenter,
Partition: n.GetEnterpriseMeta().PartitionOrDefault(),
Datacenter: n.Datacenter,
},
})

Expand Down Expand Up @@ -163,8 +169,11 @@ func (f *V1DataFetcher) FetchVirtualIP(ctx Context, req *QueryPayload) (*Result,
}

result := &Result{
Address: out,
Type: ResultTypeVirtual,
Service: &Location{
Name: req.Name,
Address: out,
},
Type: ResultTypeVirtual,
}
return result, nil
}
Expand Down Expand Up @@ -196,9 +205,11 @@ func (f *V1DataFetcher) FetchRecordsByIp(reqCtx Context, ip net.IP) ([]*Result,
for _, n := range out.Nodes {
if targetIP == n.Address {
results = append(results, &Result{
Address: n.Address,
Type: ResultTypeNode,
Target: n.Node,
Node: &Location{
Name: n.Node,
Address: n.Address,
},
Type: ResultTypeNode,
Tenancy: ResultTenancy{
Namespace: f.defaultEnterpriseMeta.NamespaceOrDefault(),
Partition: f.defaultEnterpriseMeta.PartitionOrDefault(),
Expand Down Expand Up @@ -226,13 +237,19 @@ func (f *V1DataFetcher) FetchRecordsByIp(reqCtx Context, ip net.IP) ([]*Result,
for _, n := range sout.ServiceNodes {
if n.ServiceAddress == targetIP {
results = append(results, &Result{
Address: n.ServiceAddress,
Type: ResultTypeService,
Target: n.ServiceName,
Service: &Location{
Name: n.ServiceName,
Address: n.ServiceAddress,
},
Type: ResultTypeService,
Node: &Location{
Name: n.Node,
Address: n.Address,
},
Tenancy: ResultTenancy{
Namespace: f.defaultEnterpriseMeta.NamespaceOrDefault(),
Partition: f.defaultEnterpriseMeta.PartitionOrDefault(),
Datacenter: configCtx.datacenter,
Namespace: n.NamespaceOrEmpty(),
Partition: n.PartitionOrEmpty(),
Datacenter: n.Datacenter,
},
})
return results, nil
Expand All @@ -256,7 +273,119 @@ func (f *V1DataFetcher) FetchWorkload(ctx Context, req *QueryPayload) (*Result,
// FetchPreparedQuery evaluates the results of a prepared query.
// deprecated in V2
func (f *V1DataFetcher) FetchPreparedQuery(ctx Context, req *QueryPayload) ([]*Result, error) {
return nil, nil
cfg := f.dynamicConfig.Load().(*v1DataFetcherDynamicConfig)

// Execute the prepared query.
args := structs.PreparedQueryExecuteRequest{
Datacenter: req.Tenancy.Datacenter,
QueryIDOrName: req.Name,
QueryOptions: structs.QueryOptions{
Token: ctx.Token,
AllowStale: cfg.allowStale,
MaxAge: cfg.cacheMaxAge,
},

// Always pass the local agent through. In the DNS interface, there
// is no provision for passing additional query parameters, so we
// send the local agent's data through to allow distance sorting
// relative to ourself on the server side.
Agent: structs.QuerySource{
Datacenter: cfg.datacenter,
Segment: cfg.segmentName,
Node: cfg.nodeName,
NodePartition: cfg.nodePartition,
},
Source: structs.QuerySource{
Ip: req.SourceIP.String(),
},
}

out, err := f.executePreparedQuery(cfg, args)
if err != nil {
return nil, err
}

// (v2-dns) TODO: (v2-dns) get TTLS working. They come from the database so not having
// TTL on the discovery result poses challenges.

/*
// TODO (slackpad) - What's a safe limit we can set here? It seems like
// with dup filtering done at this level we need to get everything to
// match the previous behavior. We can optimize by pushing more filtering
// into the query execution, but for now I think we need to get the full
// response. We could also choose a large arbitrary number that will
// likely work in practice, like 10*maxUDPAnswerLimit which should help
// reduce bandwidth if there are thousands of nodes available.
// Determine the TTL. The parse should never fail since we vet it when
// the query is created, but we check anyway. If the query didn't
// specify a TTL then we will try to use the agent's service-specific
// TTL configs.
var ttl time.Duration
if out.DNS.TTL != "" {
var err error
ttl, err = time.ParseDuration(out.DNS.TTL)
if err != nil {
f.logger.Warn("Failed to parse TTL for prepared query , ignoring",
"ttl", out.DNS.TTL,
"prepared_query", req.Name,
)
}
} else {
ttl, _ = cfg.GetTTLForService(out.Service)
}
*/

// If we have no nodes, return not found!
if len(out.Nodes) == 0 {
return nil, ErrNoData
}

// Perform a random shuffle
out.Nodes.Shuffle()
return f.buildResultsFromServiceNodes(out.Nodes), nil
}

// executePreparedQuery is used to execute a PreparedQuery against the Consul catalog.
// If the config is set to UseCache, it will use agent cache.
func (f *V1DataFetcher) executePreparedQuery(cfg *v1DataFetcherDynamicConfig, args structs.PreparedQueryExecuteRequest) (*structs.PreparedQueryExecuteResponse, error) {
var out structs.PreparedQueryExecuteResponse

RPC:
if cfg.useCache {
raw, m, err := f.getFromCacheFunc(context.TODO(), cachetype.PreparedQueryName, &args)
if err != nil {
return nil, err
}
reply, ok := raw.(*structs.PreparedQueryExecuteResponse)
if !ok {
// This should never happen, but we want to protect against panics
return nil, err
}

f.logger.Trace("cache results for prepared query",
"cache_hit", m.Hit,
"prepared_query", args.QueryIDOrName,
)

out = *reply
} else {
if err := f.rpcFunc(context.Background(), "PreparedQuery.Execute", &args, &out); err != nil {
return nil, err
}
}

// Verify that request is not too stale, redo the request.
if args.AllowStale {
if out.LastContact > cfg.maxStale {
args.AllowStale = false
f.logger.Warn("Query results too stale, re-requesting")
goto RPC
} else if out.LastContact > staleCounterThreshold {
metrics.IncrCounter([]string{"dns", "stale_queries"}, 1)
}
}

return &out, nil
}

func (f *V1DataFetcher) ValidateRequest(_ Context, req *QueryPayload) error {
Expand All @@ -269,6 +398,34 @@ func (f *V1DataFetcher) ValidateRequest(_ Context, req *QueryPayload) error {
return validateEnterpriseTenancy(req.Tenancy)
}

// buildResultsFromServiceNodes builds a list of results from a list of nodes.
func (f *V1DataFetcher) buildResultsFromServiceNodes(nodes []structs.CheckServiceNode) []*Result {
results := make([]*Result, 0)
for _, n := range nodes {

results = append(results, &Result{
Service: &Location{
Name: n.Service.Service,
Address: n.Service.Address,
},
Node: &Location{
Name: n.Node.Node,
Address: n.Node.Address,
},
Type: ResultTypeService,
Weight: uint32(findWeight(n)),
PortNumber: uint32(f.translateServicePortFunc(n.Node.Datacenter, n.Service.Port, n.Service.TaggedAddresses)),
Metadata: n.Node.Meta,
Tenancy: ResultTenancy{
Namespace: n.Service.NamespaceOrEmpty(),
Partition: n.Service.PartitionOrEmpty(),
Datacenter: n.Node.Datacenter,
},
})
}
return results
}

// fetchNode is used to look up a node in the Consul catalog within NodeServices.
// If the config is set to UseCache, it will get the record from the agent cache.
func (f *V1DataFetcher) fetchNode(cfg *v1DataFetcherDynamicConfig, args *structs.NodeSpecificRequest) (*structs.IndexedNodeServices, error) {
Expand Down Expand Up @@ -353,7 +510,12 @@ func (f *V1DataFetcher) fetchServiceBasedOnTenancy(ctx Context, req *QueryPayloa

out, _, err := f.rpcFuncForServiceNodes(context.TODO(), args)
if err != nil {
return nil, err
return nil, fmt.Errorf("rpc request failed: %w", err)
}

// If we have no nodes, return not found!
if len(out.Nodes) == 0 {
return nil, ErrNoData
}

// Filter out any service nodes due to health checks
Expand All @@ -372,57 +534,7 @@ func (f *V1DataFetcher) fetchServiceBasedOnTenancy(ctx Context, req *QueryPayloa

// Perform a random shuffle
out.Nodes.Shuffle()
results := make([]*Result, 0, len(out.Nodes))
for _, node := range out.Nodes {
address, target, resultType := getAddressTargetAndResultType(node)

results = append(results, &Result{
Address: address,
Type: resultType,
Target: target,
Weight: uint32(findWeight(node)),
PortNumber: uint32(f.translateServicePortFunc(node.Node.Datacenter, node.Service.Port, node.Service.TaggedAddresses)),
Metadata: node.Node.Meta,
Tenancy: ResultTenancy{
Namespace: node.Service.NamespaceOrEmpty(),
Partition: node.Service.PartitionOrEmpty(),
Datacenter: node.Node.Datacenter,
},
})
}

return results, nil
}

// getAddressTargetAndResultType returns the address, target and result type for a check service node.
func getAddressTargetAndResultType(node structs.CheckServiceNode) (string, string, ResultType) {
// Set address and target
// if service address is present, set target and address based on service.
// otherwise get it from the node.
address := node.Service.Address
target := node.Service.Service
resultType := ResultTypeService

addressIP := net.ParseIP(address)
if addressIP == nil {
resultType = ResultTypeNode
if node.Service.Address != "" {
// cases where service address is foo or foo.node.consul
// For usage in DNS, these discovery results necessitate a CNAME record.
// These cases can be inferred from the discovery result when Type is Node and
// target is not an IP.
target = node.Service.Address
} else {
// cases where service address is empty and the service is bound to
// node with an address. These do not require a CNAME record in.
// For usage in DNS, these discovery results do not require a CNAME record.
// These cases can be inferred from the discovery result when Type is Node and
// target is not an IP.
target = node.Node.Node
}
address = node.Node.Address
}
return address, target, resultType
return f.buildResultsFromServiceNodes(out.Nodes), nil
}

// findWeight returns the weight of a service node.
Expand Down
Loading

0 comments on commit 602e3c4

Please sign in to comment.