Implement TranslatePipeline() as generalized concurrent map iterator.

Integrate `TranslatePipeline()` into datamodel for schema components to replace specialized async logic.
This commit is contained in:
Shawn Poulson
2023-08-01 14:33:31 -04:00
parent 8c78208fb4
commit a8cf9fdaa9
3 changed files with 224 additions and 225 deletions

View File

@@ -4,6 +4,9 @@
package v3 package v3
import ( import (
"sync"
"github.com/pb33f/libopenapi/datamodel"
"github.com/pb33f/libopenapi/datamodel/high" "github.com/pb33f/libopenapi/datamodel/high"
highbase "github.com/pb33f/libopenapi/datamodel/high/base" highbase "github.com/pb33f/libopenapi/datamodel/high/base"
lowmodel "github.com/pb33f/libopenapi/datamodel/low" lowmodel "github.com/pb33f/libopenapi/datamodel/low"
@@ -12,18 +15,6 @@ import (
"gopkg.in/yaml.v3" "gopkg.in/yaml.v3"
) )
// used for internal channel co-ordination for building out different component types.
const (
responses = iota
parameters
examples
requestBodies
headers
securitySchemes
links
callbacks
)
// Components represents a high-level OpenAPI 3+ Components Object, that is backed by a low-level one. // Components represents a high-level OpenAPI 3+ Components Object, that is backed by a low-level one.
// //
// Holds a set of reusable objects for different aspects of the OAS. All objects defined within the components object // Holds a set of reusable objects for different aspects of the OAS. All objects defined within the components object
@@ -61,83 +52,48 @@ func NewComponents(comp *low.Components) *Components {
headerMap := make(map[string]*Header) headerMap := make(map[string]*Header)
securitySchemeMap := make(map[string]*SecurityScheme) securitySchemeMap := make(map[string]*SecurityScheme)
schemas := make(map[string]*highbase.SchemaProxy) schemas := make(map[string]*highbase.SchemaProxy)
schemaChan := make(chan componentResult[*highbase.SchemaProxy])
cbChan := make(chan componentResult[*Callback])
linkChan := make(chan componentResult[*Link])
responseChan := make(chan componentResult[*Response])
paramChan := make(chan componentResult[*Parameter])
exampleChan := make(chan componentResult[*highbase.Example])
requestBodyChan := make(chan componentResult[*RequestBody])
headerChan := make(chan componentResult[*Header])
securitySchemeChan := make(chan componentResult[*SecurityScheme])
// build all components asynchronously. // build all components asynchronously.
for k, v := range comp.Callbacks.Value { var wg sync.WaitGroup
go buildComponent[*Callback, *low.Callback](callbacks, k.Value, v.Value, cbChan, NewCallback) wg.Add(9)
} go func() {
for k, v := range comp.Links.Value { buildComponent[*low.Callback, *Callback](comp.Callbacks.Value, cbMap, NewCallback)
go buildComponent[*Link, *low.Link](links, k.Value, v.Value, linkChan, NewLink) wg.Done()
} }()
for k, v := range comp.Responses.Value { go func() {
go buildComponent[*Response, *low.Response](responses, k.Value, v.Value, responseChan, NewResponse) buildComponent[*low.Link, *Link](comp.Links.Value, linkMap, NewLink)
} wg.Done()
for k, v := range comp.Parameters.Value { }()
go buildComponent[*Parameter, *low.Parameter](parameters, k.Value, v.Value, paramChan, NewParameter) go func() {
} buildComponent[*low.Response, *Response](comp.Responses.Value, responseMap, NewResponse)
for k, v := range comp.Examples.Value { wg.Done()
go buildComponent[*highbase.Example, *base.Example](examples, k.Value, v.Value, exampleChan, highbase.NewExample) }()
} go func() {
for k, v := range comp.RequestBodies.Value { buildComponent[*low.Parameter, *Parameter](comp.Parameters.Value, parameterMap, NewParameter)
go buildComponent[*RequestBody, *low.RequestBody](requestBodies, k.Value, v.Value, wg.Done()
requestBodyChan, NewRequestBody) }()
} go func() {
for k, v := range comp.Headers.Value { buildComponent[*base.Example, *highbase.Example](comp.Examples.Value, exampleMap, highbase.NewExample)
go buildComponent[*Header, *low.Header](headers, k.Value, v.Value, headerChan, NewHeader) wg.Done()
} }()
for k, v := range comp.SecuritySchemes.Value { go func() {
go buildComponent[*SecurityScheme, *low.SecurityScheme](securitySchemes, k.Value, v.Value, buildComponent[*low.RequestBody, *RequestBody](comp.RequestBodies.Value, requestBodyMap, NewRequestBody)
securitySchemeChan, NewSecurityScheme) wg.Done()
} }()
for k, v := range comp.Schemas.Value { go func() {
go buildSchema(k, v, schemaChan) buildComponent[*low.Header, *Header](comp.Headers.Value, headerMap, NewHeader)
} wg.Done()
}()
go func() {
buildComponent[*low.SecurityScheme, *SecurityScheme](comp.SecuritySchemes.Value, securitySchemeMap, NewSecurityScheme)
wg.Done()
}()
go func() {
buildSchema(comp.Schemas.Value, schemas)
wg.Done()
}()
totalComponents := len(comp.Callbacks.Value) + len(comp.Links.Value) + len(comp.Responses.Value) + wg.Wait()
len(comp.Parameters.Value) + len(comp.Examples.Value) + len(comp.RequestBodies.Value) +
len(comp.Headers.Value) + len(comp.SecuritySchemes.Value) + len(comp.Schemas.Value)
processedComponents := 0
for processedComponents < totalComponents {
select {
case sRes := <-schemaChan:
processedComponents++
schemas[sRes.key] = sRes.res
case cbRes := <-cbChan:
processedComponents++
cbMap[cbRes.key] = cbRes.res
case lRes := <-linkChan:
processedComponents++
linkMap[lRes.key] = lRes.res
case respRes := <-responseChan:
processedComponents++
responseMap[respRes.key] = respRes.res
case pRes := <-paramChan:
processedComponents++
parameterMap[pRes.key] = pRes.res
case eRes := <-exampleChan:
processedComponents++
exampleMap[eRes.key] = eRes.res
case rbRes := <-requestBodyChan:
processedComponents++
requestBodyMap[rbRes.key] = rbRes.res
case hRes := <-headerChan:
processedComponents++
headerMap[hRes.key] = hRes.res
case ssRes := <-securitySchemeChan:
processedComponents++
securitySchemeMap[ssRes.key] = ssRes.res
}
}
c.Schemas = schemas c.Schemas = schemas
c.Callbacks = cbMap c.Callbacks = cbMap
c.Links = linkMap c.Links = linkMap
@@ -157,20 +113,33 @@ type componentResult[T any] struct {
comp int comp int
} }
// build out a component. // buildComponent builds component structs from low level structs.
func buildComponent[N any, O any](comp int, key string, orig O, c chan componentResult[N], f func(O) N) { func buildComponent[IN any, OUT any](inMap map[lowmodel.KeyReference[string]]lowmodel.ValueReference[IN], outMap map[string]OUT, translateItem func(IN) OUT) {
c <- componentResult[N]{comp: comp, res: f(orig), key: key} translateFunc := func(key lowmodel.KeyReference[string], value lowmodel.ValueReference[IN]) (componentResult[OUT], error) {
return componentResult[OUT]{key: key.Value, res: translateItem(value.Value)}, nil
}
resultFunc := func(value componentResult[OUT]) error {
outMap[value.key] = value.res
return nil
}
_ = datamodel.TranslateMapParallel(inMap, translateFunc, resultFunc)
} }
// build out a schema // buildSchema builds a schema from low level structs.
func buildSchema(key lowmodel.KeyReference[string], orig lowmodel.ValueReference[*base.SchemaProxy], func buildSchema(inMap map[lowmodel.KeyReference[string]]lowmodel.ValueReference[*base.SchemaProxy], outMap map[string]*highbase.SchemaProxy) {
c chan componentResult[*highbase.SchemaProxy]) { translateFunc := func(key lowmodel.KeyReference[string], value lowmodel.ValueReference[*base.SchemaProxy]) (componentResult[*highbase.SchemaProxy], error) {
var sch *highbase.SchemaProxy var sch *highbase.SchemaProxy
sch = highbase.NewSchemaProxy(&lowmodel.NodeReference[*base.SchemaProxy]{ sch = highbase.NewSchemaProxy(&lowmodel.NodeReference[*base.SchemaProxy]{
Value: orig.Value, Value: value.Value,
ValueNode: orig.ValueNode, ValueNode: value.ValueNode,
}) })
c <- componentResult[*highbase.SchemaProxy]{res: sch, key: key.Value} return componentResult[*highbase.SchemaProxy]{res: sch, key: key.Value}, nil
}
resultFunc := func(value componentResult[*highbase.SchemaProxy]) error {
outMap[value.key] = value.res
return nil
}
_ = datamodel.TranslateMapParallel(inMap, translateFunc, resultFunc)
} }
// GoLow returns the low-level Components instance used to create the high-level one. // GoLow returns the low-level Components instance used to create the high-level one.

View File

@@ -7,7 +7,9 @@ import (
"fmt" "fmt"
"sort" "sort"
"github.com/pb33f/libopenapi/datamodel"
"github.com/pb33f/libopenapi/datamodel/high" "github.com/pb33f/libopenapi/datamodel/high"
lowbase "github.com/pb33f/libopenapi/datamodel/low"
low "github.com/pb33f/libopenapi/datamodel/low/v3" low "github.com/pb33f/libopenapi/datamodel/low/v3"
"github.com/pb33f/libopenapi/utils" "github.com/pb33f/libopenapi/utils"
"gopkg.in/yaml.v3" "gopkg.in/yaml.v3"
@@ -45,29 +47,19 @@ func NewResponses(responses *low.Responses) *Responses {
} }
codes := make(map[string]*Response) codes := make(map[string]*Response)
// struct to hold response and code sent over chan.
type respRes struct { type respRes struct {
code string code string
resp *Response resp *Response
} }
// build each response async for speed translateFunc := func(key lowbase.KeyReference[string], value lowbase.ValueReference[*low.Response]) (respRes, error) {
rChan := make(chan respRes) return respRes{code: key.Value, resp: NewResponse(value.Value)}, nil
var buildResponse = func(code string, resp *low.Response, c chan respRes) {
c <- respRes{code: code, resp: NewResponse(resp)}
} }
for k, v := range responses.Codes { resultFunc := func(value respRes) error {
go buildResponse(k.Value, v.Value, rChan) codes[value.code] = value.resp
} return nil
totalCodes := len(responses.Codes)
codesParsed := 0
for codesParsed < totalCodes {
select {
case re := <-rChan:
codesParsed++
codes[re.code] = re.resp
}
} }
_ = datamodel.TranslateMapParallel[lowbase.KeyReference[string], lowbase.ValueReference[*low.Response], respRes](responses.Codes, translateFunc, resultFunc)
r.Codes = codes r.Codes = codes
return r return r
} }

View File

@@ -4,15 +4,19 @@
package v3 package v3
import ( import (
"context"
"crypto/sha256" "crypto/sha256"
"fmt" "fmt"
"sort"
"strings"
"sync"
"github.com/pb33f/libopenapi/datamodel"
"github.com/pb33f/libopenapi/datamodel/low" "github.com/pb33f/libopenapi/datamodel/low"
"github.com/pb33f/libopenapi/datamodel/low/base" "github.com/pb33f/libopenapi/datamodel/low/base"
"github.com/pb33f/libopenapi/index" "github.com/pb33f/libopenapi/index"
"github.com/pb33f/libopenapi/utils" "github.com/pb33f/libopenapi/utils"
"gopkg.in/yaml.v3" "gopkg.in/yaml.v3"
"sort"
"strings"
) )
// Components represents a low-level OpenAPI 3+ Components Object, that is backed by a low-level one. // Components represents a low-level OpenAPI 3+ Components Object, that is backed by a low-level one.
@@ -126,74 +130,84 @@ func (co *Components) FindCallback(callback string) *low.ValueReference[*Callbac
return low.FindItemInMap[*Callback](callback, co.Callbacks.Value) return low.FindItemInMap[*Callback](callback, co.Callbacks.Value)
} }
// Build converts root YAML node containing components to low level model.
// Process each component in parallel.
func (co *Components) Build(root *yaml.Node, idx *index.SpecIndex) error { func (co *Components) Build(root *yaml.Node, idx *index.SpecIndex) error {
root = utils.NodeAlias(root) root = utils.NodeAlias(root)
utils.CheckForMergeNodes(root) utils.CheckForMergeNodes(root)
co.Reference = new(low.Reference) co.Reference = new(low.Reference)
co.Extensions = low.ExtractExtensions(root) co.Extensions = low.ExtractExtensions(root)
// build out components asynchronously for speed. There could be some significant weight here. var reterr error
skipChan := make(chan bool) var ceMutex sync.Mutex
errorChan := make(chan error) var wg sync.WaitGroup
paramChan := make(chan low.NodeReference[map[low.KeyReference[string]]low.ValueReference[*Parameter]]) wg.Add(9)
schemaChan := make(chan low.NodeReference[map[low.KeyReference[string]]low.ValueReference[*base.SchemaProxy]])
responsesChan := make(chan low.NodeReference[map[low.KeyReference[string]]low.ValueReference[*Response]])
examplesChan := make(chan low.NodeReference[map[low.KeyReference[string]]low.ValueReference[*base.Example]])
requestBodiesChan := make(chan low.NodeReference[map[low.KeyReference[string]]low.ValueReference[*RequestBody]])
headersChan := make(chan low.NodeReference[map[low.KeyReference[string]]low.ValueReference[*Header]])
securitySchemesChan := make(chan low.NodeReference[map[low.KeyReference[string]]low.ValueReference[*SecurityScheme]])
linkChan := make(chan low.NodeReference[map[low.KeyReference[string]]low.ValueReference[*Link]])
callbackChan := make(chan low.NodeReference[map[low.KeyReference[string]]low.ValueReference[*Callback]])
go extractComponentValues[*base.SchemaProxy](SchemasLabel, root, skipChan, errorChan, schemaChan, idx) captureError := func(err error) {
go extractComponentValues[*Parameter](ParametersLabel, root, skipChan, errorChan, paramChan, idx) ceMutex.Lock()
go extractComponentValues[*Response](ResponsesLabel, root, skipChan, errorChan, responsesChan, idx) defer ceMutex.Unlock()
go extractComponentValues[*base.Example](base.ExamplesLabel, root, skipChan, errorChan, examplesChan, idx) if err != nil {
go extractComponentValues[*RequestBody](RequestBodiesLabel, root, skipChan, errorChan, requestBodiesChan, idx) reterr = err
go extractComponentValues[*Header](HeadersLabel, root, skipChan, errorChan, headersChan, idx)
go extractComponentValues[*SecurityScheme](SecuritySchemesLabel, root, skipChan, errorChan, securitySchemesChan, idx)
go extractComponentValues[*Link](LinksLabel, root, skipChan, errorChan, linkChan, idx)
go extractComponentValues[*Callback](CallbacksLabel, root, skipChan, errorChan, callbackChan, idx)
n := 0
total := 9
for n < total {
select {
case buildError := <-errorChan:
return buildError
case <-skipChan:
n++
case params := <-paramChan:
co.Parameters = params
n++
case schemas := <-schemaChan:
co.Schemas = schemas
n++
case responses := <-responsesChan:
co.Responses = responses
n++
case examples := <-examplesChan:
co.Examples = examples
n++
case reqBody := <-requestBodiesChan:
co.RequestBodies = reqBody
n++
case headers := <-headersChan:
co.Headers = headers
n++
case sScheme := <-securitySchemesChan:
co.SecuritySchemes = sScheme
n++
case links := <-linkChan:
co.Links = links
n++
case callbacks := <-callbackChan:
co.Callbacks = callbacks
n++
} }
} }
return nil
go func() {
schemas, err := extractComponentValues[*base.SchemaProxy](SchemasLabel, root, idx)
captureError(err)
co.Schemas = schemas
wg.Done()
}()
go func() {
parameters, err := extractComponentValues[*Parameter](ParametersLabel, root, idx)
captureError(err)
co.Parameters = parameters
wg.Done()
}()
go func() {
responses, err := extractComponentValues[*Response](ResponsesLabel, root, idx)
captureError(err)
co.Responses = responses
wg.Done()
}()
go func() {
examples, err := extractComponentValues[*base.Example](base.ExamplesLabel, root, idx)
captureError(err)
co.Examples = examples
wg.Done()
}()
go func() {
requestBodies, err := extractComponentValues[*RequestBody](RequestBodiesLabel, root, idx)
captureError(err)
co.RequestBodies = requestBodies
wg.Done()
}()
go func() {
headers, err := extractComponentValues[*Header](HeadersLabel, root, idx)
captureError(err)
co.Headers = headers
wg.Done()
}()
go func() {
securitySchemes, err := extractComponentValues[*SecurityScheme](SecuritySchemesLabel, root, idx)
captureError(err)
co.SecuritySchemes = securitySchemes
wg.Done()
}()
go func() {
links, err := extractComponentValues[*Link](LinksLabel, root, idx)
captureError(err)
co.Links = links
wg.Done()
}()
go func() {
callbacks, err := extractComponentValues[*Callback](CallbacksLabel, root, idx)
captureError(err)
co.Callbacks = callbacks
wg.Done()
}()
wg.Wait()
return reterr
} }
type componentBuildResult[T any] struct { type componentBuildResult[T any] struct {
@@ -201,81 +215,105 @@ type componentBuildResult[T any] struct {
v low.ValueReference[T] v low.ValueReference[T]
} }
func extractComponentValues[T low.Buildable[N], N any](label string, root *yaml.Node, // extractComponentValues converts all the YAML nodes of a component type to
skip chan bool, errorChan chan<- error, resultChan chan<- low.NodeReference[map[low.KeyReference[string]]low.ValueReference[T]], idx *index.SpecIndex) { // low level model.
// Process each node in parallel.
func extractComponentValues[T low.Buildable[N], N any](label string, root *yaml.Node, idx *index.SpecIndex) (retval low.NodeReference[map[low.KeyReference[string]]low.ValueReference[T]], _ error) {
_, nodeLabel, nodeValue := utils.FindKeyNodeFullTop(label, root.Content) _, nodeLabel, nodeValue := utils.FindKeyNodeFullTop(label, root.Content)
if nodeValue == nil { if nodeValue == nil {
skip <- true return retval, nil
return
} }
var currentLabel *yaml.Node
componentValues := make(map[low.KeyReference[string]]low.ValueReference[T]) componentValues := make(map[low.KeyReference[string]]low.ValueReference[T])
if utils.IsNodeArray(nodeValue) { if utils.IsNodeArray(nodeValue) {
errorChan <- fmt.Errorf("node is array, cannot be used in components: line %d, column %d", nodeValue.Line, nodeValue.Column) return retval, fmt.Errorf("node is array, cannot be used in components: line %d, column %d", nodeValue.Line, nodeValue.Column)
return
} }
// for every component, build in a new thread! type inputValue struct {
bChan := make(chan componentBuildResult[T]) node *yaml.Node
eChan := make(chan error) currentLabel *yaml.Node
var buildComponent = func(parentLabel string, label *yaml.Node, value *yaml.Node, c chan componentBuildResult[T], ec chan<- error) { }
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
in := make(chan inputValue)
out := make(chan componentBuildResult[T])
var wg sync.WaitGroup
wg.Add(2) // input and output goroutines.
// Send input.
go func() {
defer wg.Done()
var currentLabel *yaml.Node
for i, node := range nodeValue.Content {
// always ignore extensions
if i%2 == 0 {
currentLabel = node
continue
}
// only check for lowercase extensions as 'X-' is still valid as a key (annoyingly).
if strings.HasPrefix(currentLabel.Value, "x-") {
continue
}
select {
case in <- inputValue{
node: node,
currentLabel: currentLabel,
}:
case <-ctx.Done():
return
}
}
close(in)
}()
// Collect output.
go func() {
for result := range out {
componentValues[result.k] = result.v
}
cancel()
wg.Done()
}()
// Translate.
translateFunc := func(value inputValue) (retval componentBuildResult[T], _ error) {
var n T = new(N) var n T = new(N)
currentLabel := value.currentLabel
node := value.node
// if this is a reference, extract it (although components with references is an antipattern) // if this is a reference, extract it (although components with references is an antipattern)
// If you're building components as references... pls... stop, this code should not need to be here. // If you're building components as references... pls... stop, this code should not need to be here.
// TODO: check circular crazy on this. It may explode // TODO: check circular crazy on this. It may explode
var err error var err error
if h, _, _ := utils.IsNodeRefValue(value); h && parentLabel != SchemasLabel { if h, _, _ := utils.IsNodeRefValue(node); h && label != SchemasLabel {
value, err = low.LocateRefNode(value, idx) node, err = low.LocateRefNode(node, idx)
} }
if err != nil { if err != nil {
ec <- err return retval, err
return
} }
// build. // build.
_ = low.BuildModel(value, n) _ = low.BuildModel(node, n)
// todo: label is a key or? err = n.Build(currentLabel, node, idx)
err = n.Build(label, value, idx)
if err != nil { if err != nil {
ec <- err return retval, err
return
} }
c <- componentBuildResult[T]{ return componentBuildResult[T]{
k: low.KeyReference[string]{ k: low.KeyReference[string]{
KeyNode: label, KeyNode: currentLabel,
Value: label.Value, Value: currentLabel.Value,
}, },
v: low.ValueReference[T]{ v: low.ValueReference[T]{
Value: n, Value: n,
ValueNode: value, ValueNode: node,
}, },
} }, nil
} }
totalComponents := 0 err := datamodel.TranslatePipeline[inputValue, componentBuildResult[T]](in, out, translateFunc)
for i, v := range nodeValue.Content { wg.Wait()
// always ignore extensions if err != nil {
if i%2 == 0 { return retval, err
currentLabel = v
continue
}
// only check for lowercase extensions as 'X-' is still valid as a key (annoyingly).
if strings.HasPrefix(currentLabel.Value, "x-") {
continue
}
totalComponents++
go buildComponent(label, currentLabel, v, bChan, eChan)
}
completedComponents := 0
for completedComponents < totalComponents {
select {
case e := <-eChan:
errorChan <- e
case r := <-bChan:
componentValues[r.k] = r.v
completedComponents++
}
} }
results := low.NodeReference[map[low.KeyReference[string]]low.ValueReference[T]]{ results := low.NodeReference[map[low.KeyReference[string]]low.ValueReference[T]]{
@@ -283,5 +321,5 @@ func extractComponentValues[T low.Buildable[N], N any](label string, root *yaml.
ValueNode: nodeValue, ValueNode: nodeValue,
Value: componentValues, Value: componentValues,
} }
resultChan <- results return results, nil
} }