From a8cf9fdaa99abae4d883e8553c7fc9bee2031417 Mon Sep 17 00:00:00 2001 From: Shawn Poulson Date: Tue, 1 Aug 2023 14:33:31 -0400 Subject: [PATCH] Implement `TranslatePipeline()` as generalized concurrent map iterator. Integrate `TranslatePipeline()` into datamodel for schema components to replace specialized async logic. --- datamodel/high/v3/components.go | 165 ++++++++------------ datamodel/high/v3/responses.go | 24 +-- datamodel/low/v3/components.go | 260 ++++++++++++++++++-------------- 3 files changed, 224 insertions(+), 225 deletions(-) diff --git a/datamodel/high/v3/components.go b/datamodel/high/v3/components.go index afdc52a..cd9555f 100644 --- a/datamodel/high/v3/components.go +++ b/datamodel/high/v3/components.go @@ -4,6 +4,9 @@ package v3 import ( + "sync" + + "github.com/pb33f/libopenapi/datamodel" "github.com/pb33f/libopenapi/datamodel/high" highbase "github.com/pb33f/libopenapi/datamodel/high/base" lowmodel "github.com/pb33f/libopenapi/datamodel/low" @@ -12,18 +15,6 @@ import ( "gopkg.in/yaml.v3" ) -// used for internal channel co-ordination for building out different component types. -const ( - responses = iota - parameters - examples - requestBodies - headers - securitySchemes - links - callbacks -) - // Components represents a high-level OpenAPI 3+ Components Object, that is backed by a low-level one. // // Holds a set of reusable objects for different aspects of the OAS. All objects defined within the components object @@ -61,83 +52,48 @@ func NewComponents(comp *low.Components) *Components { headerMap := make(map[string]*Header) securitySchemeMap := make(map[string]*SecurityScheme) schemas := make(map[string]*highbase.SchemaProxy) - schemaChan := make(chan componentResult[*highbase.SchemaProxy]) - cbChan := make(chan componentResult[*Callback]) - linkChan := make(chan componentResult[*Link]) - responseChan := make(chan componentResult[*Response]) - paramChan := make(chan componentResult[*Parameter]) - exampleChan := make(chan componentResult[*highbase.Example]) - requestBodyChan := make(chan componentResult[*RequestBody]) - headerChan := make(chan componentResult[*Header]) - securitySchemeChan := make(chan componentResult[*SecurityScheme]) // build all components asynchronously. - for k, v := range comp.Callbacks.Value { - go buildComponent[*Callback, *low.Callback](callbacks, k.Value, v.Value, cbChan, NewCallback) - } - for k, v := range comp.Links.Value { - go buildComponent[*Link, *low.Link](links, k.Value, v.Value, linkChan, NewLink) - } - for k, v := range comp.Responses.Value { - go buildComponent[*Response, *low.Response](responses, k.Value, v.Value, responseChan, NewResponse) - } - for k, v := range comp.Parameters.Value { - go buildComponent[*Parameter, *low.Parameter](parameters, k.Value, v.Value, paramChan, NewParameter) - } - for k, v := range comp.Examples.Value { - go buildComponent[*highbase.Example, *base.Example](examples, k.Value, v.Value, exampleChan, highbase.NewExample) - } - for k, v := range comp.RequestBodies.Value { - go buildComponent[*RequestBody, *low.RequestBody](requestBodies, k.Value, v.Value, - requestBodyChan, NewRequestBody) - } - for k, v := range comp.Headers.Value { - go buildComponent[*Header, *low.Header](headers, k.Value, v.Value, headerChan, NewHeader) - } - for k, v := range comp.SecuritySchemes.Value { - go buildComponent[*SecurityScheme, *low.SecurityScheme](securitySchemes, k.Value, v.Value, - securitySchemeChan, NewSecurityScheme) - } - for k, v := range comp.Schemas.Value { - go buildSchema(k, v, schemaChan) - } + var wg sync.WaitGroup + wg.Add(9) + go func() { + buildComponent[*low.Callback, *Callback](comp.Callbacks.Value, cbMap, NewCallback) + wg.Done() + }() + go func() { + buildComponent[*low.Link, *Link](comp.Links.Value, linkMap, NewLink) + wg.Done() + }() + go func() { + buildComponent[*low.Response, *Response](comp.Responses.Value, responseMap, NewResponse) + wg.Done() + }() + go func() { + buildComponent[*low.Parameter, *Parameter](comp.Parameters.Value, parameterMap, NewParameter) + wg.Done() + }() + go func() { + buildComponent[*base.Example, *highbase.Example](comp.Examples.Value, exampleMap, highbase.NewExample) + wg.Done() + }() + go func() { + buildComponent[*low.RequestBody, *RequestBody](comp.RequestBodies.Value, requestBodyMap, NewRequestBody) + wg.Done() + }() + go func() { + buildComponent[*low.Header, *Header](comp.Headers.Value, headerMap, NewHeader) + wg.Done() + }() + go func() { + buildComponent[*low.SecurityScheme, *SecurityScheme](comp.SecuritySchemes.Value, securitySchemeMap, NewSecurityScheme) + wg.Done() + }() + go func() { + buildSchema(comp.Schemas.Value, schemas) + wg.Done() + }() - totalComponents := len(comp.Callbacks.Value) + len(comp.Links.Value) + len(comp.Responses.Value) + - len(comp.Parameters.Value) + len(comp.Examples.Value) + len(comp.RequestBodies.Value) + - len(comp.Headers.Value) + len(comp.SecuritySchemes.Value) + len(comp.Schemas.Value) - - processedComponents := 0 - for processedComponents < totalComponents { - select { - case sRes := <-schemaChan: - processedComponents++ - schemas[sRes.key] = sRes.res - case cbRes := <-cbChan: - processedComponents++ - cbMap[cbRes.key] = cbRes.res - case lRes := <-linkChan: - processedComponents++ - linkMap[lRes.key] = lRes.res - case respRes := <-responseChan: - processedComponents++ - responseMap[respRes.key] = respRes.res - case pRes := <-paramChan: - processedComponents++ - parameterMap[pRes.key] = pRes.res - case eRes := <-exampleChan: - processedComponents++ - exampleMap[eRes.key] = eRes.res - case rbRes := <-requestBodyChan: - processedComponents++ - requestBodyMap[rbRes.key] = rbRes.res - case hRes := <-headerChan: - processedComponents++ - headerMap[hRes.key] = hRes.res - case ssRes := <-securitySchemeChan: - processedComponents++ - securitySchemeMap[ssRes.key] = ssRes.res - } - } + wg.Wait() c.Schemas = schemas c.Callbacks = cbMap c.Links = linkMap @@ -157,20 +113,33 @@ type componentResult[T any] struct { comp int } -// build out a component. -func buildComponent[N any, O any](comp int, key string, orig O, c chan componentResult[N], f func(O) N) { - c <- componentResult[N]{comp: comp, res: f(orig), key: key} +// buildComponent builds component structs from low level structs. +func buildComponent[IN any, OUT any](inMap map[lowmodel.KeyReference[string]]lowmodel.ValueReference[IN], outMap map[string]OUT, translateItem func(IN) OUT) { + translateFunc := func(key lowmodel.KeyReference[string], value lowmodel.ValueReference[IN]) (componentResult[OUT], error) { + return componentResult[OUT]{key: key.Value, res: translateItem(value.Value)}, nil + } + resultFunc := func(value componentResult[OUT]) error { + outMap[value.key] = value.res + return nil + } + _ = datamodel.TranslateMapParallel(inMap, translateFunc, resultFunc) } -// build out a schema -func buildSchema(key lowmodel.KeyReference[string], orig lowmodel.ValueReference[*base.SchemaProxy], - c chan componentResult[*highbase.SchemaProxy]) { - var sch *highbase.SchemaProxy - sch = highbase.NewSchemaProxy(&lowmodel.NodeReference[*base.SchemaProxy]{ - Value: orig.Value, - ValueNode: orig.ValueNode, - }) - c <- componentResult[*highbase.SchemaProxy]{res: sch, key: key.Value} +// buildSchema builds a schema from low level structs. +func buildSchema(inMap map[lowmodel.KeyReference[string]]lowmodel.ValueReference[*base.SchemaProxy], outMap map[string]*highbase.SchemaProxy) { + translateFunc := func(key lowmodel.KeyReference[string], value lowmodel.ValueReference[*base.SchemaProxy]) (componentResult[*highbase.SchemaProxy], error) { + var sch *highbase.SchemaProxy + sch = highbase.NewSchemaProxy(&lowmodel.NodeReference[*base.SchemaProxy]{ + Value: value.Value, + ValueNode: value.ValueNode, + }) + return componentResult[*highbase.SchemaProxy]{res: sch, key: key.Value}, nil + } + resultFunc := func(value componentResult[*highbase.SchemaProxy]) error { + outMap[value.key] = value.res + return nil + } + _ = datamodel.TranslateMapParallel(inMap, translateFunc, resultFunc) } // GoLow returns the low-level Components instance used to create the high-level one. diff --git a/datamodel/high/v3/responses.go b/datamodel/high/v3/responses.go index 1a86d07..e1bd722 100644 --- a/datamodel/high/v3/responses.go +++ b/datamodel/high/v3/responses.go @@ -7,7 +7,9 @@ import ( "fmt" "sort" + "github.com/pb33f/libopenapi/datamodel" "github.com/pb33f/libopenapi/datamodel/high" + lowbase "github.com/pb33f/libopenapi/datamodel/low" low "github.com/pb33f/libopenapi/datamodel/low/v3" "github.com/pb33f/libopenapi/utils" "gopkg.in/yaml.v3" @@ -45,29 +47,19 @@ func NewResponses(responses *low.Responses) *Responses { } codes := make(map[string]*Response) - // struct to hold response and code sent over chan. type respRes struct { code string resp *Response } - // build each response async for speed - rChan := make(chan respRes) - var buildResponse = func(code string, resp *low.Response, c chan respRes) { - c <- respRes{code: code, resp: NewResponse(resp)} + translateFunc := func(key lowbase.KeyReference[string], value lowbase.ValueReference[*low.Response]) (respRes, error) { + return respRes{code: key.Value, resp: NewResponse(value.Value)}, nil } - for k, v := range responses.Codes { - go buildResponse(k.Value, v.Value, rChan) - } - totalCodes := len(responses.Codes) - codesParsed := 0 - for codesParsed < totalCodes { - select { - case re := <-rChan: - codesParsed++ - codes[re.code] = re.resp - } + resultFunc := func(value respRes) error { + codes[value.code] = value.resp + return nil } + _ = datamodel.TranslateMapParallel[lowbase.KeyReference[string], lowbase.ValueReference[*low.Response], respRes](responses.Codes, translateFunc, resultFunc) r.Codes = codes return r } diff --git a/datamodel/low/v3/components.go b/datamodel/low/v3/components.go index b32e53b..000b6d2 100644 --- a/datamodel/low/v3/components.go +++ b/datamodel/low/v3/components.go @@ -4,15 +4,19 @@ package v3 import ( + "context" "crypto/sha256" "fmt" + "sort" + "strings" + "sync" + + "github.com/pb33f/libopenapi/datamodel" "github.com/pb33f/libopenapi/datamodel/low" "github.com/pb33f/libopenapi/datamodel/low/base" "github.com/pb33f/libopenapi/index" "github.com/pb33f/libopenapi/utils" "gopkg.in/yaml.v3" - "sort" - "strings" ) // Components represents a low-level OpenAPI 3+ Components Object, that is backed by a low-level one. @@ -126,74 +130,84 @@ func (co *Components) FindCallback(callback string) *low.ValueReference[*Callbac return low.FindItemInMap[*Callback](callback, co.Callbacks.Value) } +// Build converts root YAML node containing components to low level model. +// Process each component in parallel. func (co *Components) Build(root *yaml.Node, idx *index.SpecIndex) error { root = utils.NodeAlias(root) utils.CheckForMergeNodes(root) co.Reference = new(low.Reference) co.Extensions = low.ExtractExtensions(root) - // build out components asynchronously for speed. There could be some significant weight here. - skipChan := make(chan bool) - errorChan := make(chan error) - paramChan := make(chan low.NodeReference[map[low.KeyReference[string]]low.ValueReference[*Parameter]]) - schemaChan := make(chan low.NodeReference[map[low.KeyReference[string]]low.ValueReference[*base.SchemaProxy]]) - responsesChan := make(chan low.NodeReference[map[low.KeyReference[string]]low.ValueReference[*Response]]) - examplesChan := make(chan low.NodeReference[map[low.KeyReference[string]]low.ValueReference[*base.Example]]) - requestBodiesChan := make(chan low.NodeReference[map[low.KeyReference[string]]low.ValueReference[*RequestBody]]) - headersChan := make(chan low.NodeReference[map[low.KeyReference[string]]low.ValueReference[*Header]]) - securitySchemesChan := make(chan low.NodeReference[map[low.KeyReference[string]]low.ValueReference[*SecurityScheme]]) - linkChan := make(chan low.NodeReference[map[low.KeyReference[string]]low.ValueReference[*Link]]) - callbackChan := make(chan low.NodeReference[map[low.KeyReference[string]]low.ValueReference[*Callback]]) + var reterr error + var ceMutex sync.Mutex + var wg sync.WaitGroup + wg.Add(9) - go extractComponentValues[*base.SchemaProxy](SchemasLabel, root, skipChan, errorChan, schemaChan, idx) - go extractComponentValues[*Parameter](ParametersLabel, root, skipChan, errorChan, paramChan, idx) - go extractComponentValues[*Response](ResponsesLabel, root, skipChan, errorChan, responsesChan, idx) - go extractComponentValues[*base.Example](base.ExamplesLabel, root, skipChan, errorChan, examplesChan, idx) - go extractComponentValues[*RequestBody](RequestBodiesLabel, root, skipChan, errorChan, requestBodiesChan, idx) - go extractComponentValues[*Header](HeadersLabel, root, skipChan, errorChan, headersChan, idx) - go extractComponentValues[*SecurityScheme](SecuritySchemesLabel, root, skipChan, errorChan, securitySchemesChan, idx) - go extractComponentValues[*Link](LinksLabel, root, skipChan, errorChan, linkChan, idx) - go extractComponentValues[*Callback](CallbacksLabel, root, skipChan, errorChan, callbackChan, idx) - - n := 0 - total := 9 - - for n < total { - select { - case buildError := <-errorChan: - return buildError - case <-skipChan: - n++ - case params := <-paramChan: - co.Parameters = params - n++ - case schemas := <-schemaChan: - co.Schemas = schemas - n++ - case responses := <-responsesChan: - co.Responses = responses - n++ - case examples := <-examplesChan: - co.Examples = examples - n++ - case reqBody := <-requestBodiesChan: - co.RequestBodies = reqBody - n++ - case headers := <-headersChan: - co.Headers = headers - n++ - case sScheme := <-securitySchemesChan: - co.SecuritySchemes = sScheme - n++ - case links := <-linkChan: - co.Links = links - n++ - case callbacks := <-callbackChan: - co.Callbacks = callbacks - n++ + captureError := func(err error) { + ceMutex.Lock() + defer ceMutex.Unlock() + if err != nil { + reterr = err } } - return nil + + go func() { + schemas, err := extractComponentValues[*base.SchemaProxy](SchemasLabel, root, idx) + captureError(err) + co.Schemas = schemas + wg.Done() + }() + go func() { + parameters, err := extractComponentValues[*Parameter](ParametersLabel, root, idx) + captureError(err) + co.Parameters = parameters + wg.Done() + }() + go func() { + responses, err := extractComponentValues[*Response](ResponsesLabel, root, idx) + captureError(err) + co.Responses = responses + wg.Done() + }() + go func() { + examples, err := extractComponentValues[*base.Example](base.ExamplesLabel, root, idx) + captureError(err) + co.Examples = examples + wg.Done() + }() + go func() { + requestBodies, err := extractComponentValues[*RequestBody](RequestBodiesLabel, root, idx) + captureError(err) + co.RequestBodies = requestBodies + wg.Done() + }() + go func() { + headers, err := extractComponentValues[*Header](HeadersLabel, root, idx) + captureError(err) + co.Headers = headers + wg.Done() + }() + go func() { + securitySchemes, err := extractComponentValues[*SecurityScheme](SecuritySchemesLabel, root, idx) + captureError(err) + co.SecuritySchemes = securitySchemes + wg.Done() + }() + go func() { + links, err := extractComponentValues[*Link](LinksLabel, root, idx) + captureError(err) + co.Links = links + wg.Done() + }() + go func() { + callbacks, err := extractComponentValues[*Callback](CallbacksLabel, root, idx) + captureError(err) + co.Callbacks = callbacks + wg.Done() + }() + + wg.Wait() + return reterr } type componentBuildResult[T any] struct { @@ -201,81 +215,105 @@ type componentBuildResult[T any] struct { v low.ValueReference[T] } -func extractComponentValues[T low.Buildable[N], N any](label string, root *yaml.Node, - skip chan bool, errorChan chan<- error, resultChan chan<- low.NodeReference[map[low.KeyReference[string]]low.ValueReference[T]], idx *index.SpecIndex) { +// extractComponentValues converts all the YAML nodes of a component type to +// low level model. +// Process each node in parallel. +func extractComponentValues[T low.Buildable[N], N any](label string, root *yaml.Node, idx *index.SpecIndex) (retval low.NodeReference[map[low.KeyReference[string]]low.ValueReference[T]], _ error) { _, nodeLabel, nodeValue := utils.FindKeyNodeFullTop(label, root.Content) if nodeValue == nil { - skip <- true - return + return retval, nil } - var currentLabel *yaml.Node componentValues := make(map[low.KeyReference[string]]low.ValueReference[T]) if utils.IsNodeArray(nodeValue) { - errorChan <- fmt.Errorf("node is array, cannot be used in components: line %d, column %d", nodeValue.Line, nodeValue.Column) - return + return retval, fmt.Errorf("node is array, cannot be used in components: line %d, column %d", nodeValue.Line, nodeValue.Column) } - // for every component, build in a new thread! - bChan := make(chan componentBuildResult[T]) - eChan := make(chan error) - var buildComponent = func(parentLabel string, label *yaml.Node, value *yaml.Node, c chan componentBuildResult[T], ec chan<- error) { + type inputValue struct { + node *yaml.Node + currentLabel *yaml.Node + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + in := make(chan inputValue) + out := make(chan componentBuildResult[T]) + var wg sync.WaitGroup + wg.Add(2) // input and output goroutines. + + // Send input. + go func() { + defer wg.Done() + var currentLabel *yaml.Node + for i, node := range nodeValue.Content { + // always ignore extensions + if i%2 == 0 { + currentLabel = node + continue + } + // only check for lowercase extensions as 'X-' is still valid as a key (annoyingly). + if strings.HasPrefix(currentLabel.Value, "x-") { + continue + } + + select { + case in <- inputValue{ + node: node, + currentLabel: currentLabel, + }: + case <-ctx.Done(): + return + } + } + close(in) + }() + + // Collect output. + go func() { + for result := range out { + componentValues[result.k] = result.v + } + cancel() + wg.Done() + }() + + // Translate. + translateFunc := func(value inputValue) (retval componentBuildResult[T], _ error) { var n T = new(N) + currentLabel := value.currentLabel + node := value.node // if this is a reference, extract it (although components with references is an antipattern) // If you're building components as references... pls... stop, this code should not need to be here. // TODO: check circular crazy on this. It may explode var err error - if h, _, _ := utils.IsNodeRefValue(value); h && parentLabel != SchemasLabel { - value, err = low.LocateRefNode(value, idx) + if h, _, _ := utils.IsNodeRefValue(node); h && label != SchemasLabel { + node, err = low.LocateRefNode(node, idx) } if err != nil { - ec <- err - return + return retval, err } // build. - _ = low.BuildModel(value, n) - // todo: label is a key or? - err = n.Build(label, value, idx) + _ = low.BuildModel(node, n) + err = n.Build(currentLabel, node, idx) if err != nil { - ec <- err - return + return retval, err } - c <- componentBuildResult[T]{ + return componentBuildResult[T]{ k: low.KeyReference[string]{ - KeyNode: label, - Value: label.Value, + KeyNode: currentLabel, + Value: currentLabel.Value, }, v: low.ValueReference[T]{ Value: n, - ValueNode: value, + ValueNode: node, }, - } + }, nil } - totalComponents := 0 - for i, v := range nodeValue.Content { - // always ignore extensions - if i%2 == 0 { - currentLabel = v - continue - } - // only check for lowercase extensions as 'X-' is still valid as a key (annoyingly). - if strings.HasPrefix(currentLabel.Value, "x-") { - continue - } - totalComponents++ - go buildComponent(label, currentLabel, v, bChan, eChan) - } - - completedComponents := 0 - for completedComponents < totalComponents { - select { - case e := <-eChan: - errorChan <- e - case r := <-bChan: - componentValues[r.k] = r.v - completedComponents++ - } + err := datamodel.TranslatePipeline[inputValue, componentBuildResult[T]](in, out, translateFunc) + wg.Wait() + if err != nil { + return retval, err } results := low.NodeReference[map[low.KeyReference[string]]low.ValueReference[T]]{ @@ -283,5 +321,5 @@ func extractComponentValues[T low.Buildable[N], N any](label string, root *yaml. ValueNode: nodeValue, Value: componentValues, } - resultChan <- results + return results, nil }