mirror of
https://github.com/LukeHagar/libopenapi.git
synced 2025-12-09 20:47:44 +00:00
Refactor v2 Paths to parse YAML using TranslatePipeline.
This commit is contained in:
@@ -4,6 +4,8 @@
|
|||||||
package v2
|
package v2
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/pb33f/libopenapi/datamodel/high"
|
"github.com/pb33f/libopenapi/datamodel/high"
|
||||||
low "github.com/pb33f/libopenapi/datamodel/low/v2"
|
low "github.com/pb33f/libopenapi/datamodel/low/v2"
|
||||||
)
|
)
|
||||||
@@ -40,71 +42,61 @@ func NewPathItem(pathItem *low.PathItem) *PathItem {
|
|||||||
}
|
}
|
||||||
p.Parameters = params
|
p.Parameters = params
|
||||||
}
|
}
|
||||||
var buildOperation = func(method string, op *low.Operation, resChan chan<- asyncResult[*Operation]) {
|
var buildOperation = func(method string, op *low.Operation) *Operation {
|
||||||
resChan <- asyncResult[*Operation]{
|
return NewOperation(op)
|
||||||
key: method,
|
|
||||||
result: NewOperation(op),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
totalOperations := 0
|
|
||||||
resChan := make(chan asyncResult[*Operation])
|
var wg sync.WaitGroup
|
||||||
if !pathItem.Get.IsEmpty() {
|
if !pathItem.Get.IsEmpty() {
|
||||||
totalOperations++
|
wg.Add(1)
|
||||||
go buildOperation(low.GetLabel, pathItem.Get.Value, resChan)
|
go func() {
|
||||||
|
p.Get = buildOperation(low.GetLabel, pathItem.Get.Value)
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
if !pathItem.Put.IsEmpty() {
|
if !pathItem.Put.IsEmpty() {
|
||||||
totalOperations++
|
wg.Add(1)
|
||||||
go buildOperation(low.PutLabel, pathItem.Put.Value, resChan)
|
go func() {
|
||||||
|
p.Put = buildOperation(low.PutLabel, pathItem.Put.Value)
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
if !pathItem.Post.IsEmpty() {
|
if !pathItem.Post.IsEmpty() {
|
||||||
totalOperations++
|
wg.Add(1)
|
||||||
go buildOperation(low.PostLabel, pathItem.Post.Value, resChan)
|
go func() {
|
||||||
|
p.Post = buildOperation(low.PostLabel, pathItem.Post.Value)
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
if !pathItem.Patch.IsEmpty() {
|
if !pathItem.Patch.IsEmpty() {
|
||||||
totalOperations++
|
wg.Add(1)
|
||||||
go buildOperation(low.PatchLabel, pathItem.Patch.Value, resChan)
|
go func() {
|
||||||
|
p.Patch = buildOperation(low.PatchLabel, pathItem.Patch.Value)
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
if !pathItem.Delete.IsEmpty() {
|
if !pathItem.Delete.IsEmpty() {
|
||||||
totalOperations++
|
wg.Add(1)
|
||||||
go buildOperation(low.DeleteLabel, pathItem.Delete.Value, resChan)
|
go func() {
|
||||||
|
p.Delete = buildOperation(low.DeleteLabel, pathItem.Delete.Value)
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
if !pathItem.Head.IsEmpty() {
|
if !pathItem.Head.IsEmpty() {
|
||||||
totalOperations++
|
wg.Add(1)
|
||||||
go buildOperation(low.HeadLabel, pathItem.Head.Value, resChan)
|
go func() {
|
||||||
|
p.Head = buildOperation(low.HeadLabel, pathItem.Head.Value)
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
if !pathItem.Options.IsEmpty() {
|
if !pathItem.Options.IsEmpty() {
|
||||||
totalOperations++
|
wg.Add(1)
|
||||||
go buildOperation(low.OptionsLabel, pathItem.Options.Value, resChan)
|
go func() {
|
||||||
}
|
p.Options = buildOperation(low.OptionsLabel, pathItem.Options.Value)
|
||||||
completedOperations := 0
|
wg.Done()
|
||||||
for completedOperations < totalOperations {
|
}()
|
||||||
select {
|
|
||||||
case r := <-resChan:
|
|
||||||
switch r.key {
|
|
||||||
case low.GetLabel:
|
|
||||||
completedOperations++
|
|
||||||
p.Get = r.result
|
|
||||||
case low.PutLabel:
|
|
||||||
completedOperations++
|
|
||||||
p.Put = r.result
|
|
||||||
case low.PostLabel:
|
|
||||||
completedOperations++
|
|
||||||
p.Post = r.result
|
|
||||||
case low.PatchLabel:
|
|
||||||
completedOperations++
|
|
||||||
p.Patch = r.result
|
|
||||||
case low.DeleteLabel:
|
|
||||||
completedOperations++
|
|
||||||
p.Delete = r.result
|
|
||||||
case low.HeadLabel:
|
|
||||||
completedOperations++
|
|
||||||
p.Head = r.result
|
|
||||||
case low.OptionsLabel:
|
|
||||||
completedOperations++
|
|
||||||
p.Options = r.result
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
wg.Wait()
|
||||||
return p
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -4,50 +4,44 @@
|
|||||||
package v2
|
package v2
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/pb33f/libopenapi/datamodel"
|
||||||
"github.com/pb33f/libopenapi/datamodel/high"
|
"github.com/pb33f/libopenapi/datamodel/high"
|
||||||
low "github.com/pb33f/libopenapi/datamodel/low/v2"
|
"github.com/pb33f/libopenapi/datamodel/low"
|
||||||
|
v2low "github.com/pb33f/libopenapi/datamodel/low/v2"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Paths represents a high-level Swagger / OpenAPI Paths object, backed by a low-level one.
|
// Paths represents a high-level Swagger / OpenAPI Paths object, backed by a low-level one.
|
||||||
type Paths struct {
|
type Paths struct {
|
||||||
PathItems map[string]*PathItem
|
PathItems map[string]*PathItem
|
||||||
Extensions map[string]any
|
Extensions map[string]any
|
||||||
low *low.Paths
|
low *v2low.Paths
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewPaths creates a new high-level instance of Paths from a low-level one.
|
// NewPaths creates a new high-level instance of Paths from a low-level one.
|
||||||
func NewPaths(paths *low.Paths) *Paths {
|
func NewPaths(paths *v2low.Paths) *Paths {
|
||||||
p := new(Paths)
|
p := new(Paths)
|
||||||
p.low = paths
|
p.low = paths
|
||||||
p.Extensions = high.ExtractExtensions(paths.Extensions)
|
p.Extensions = high.ExtractExtensions(paths.Extensions)
|
||||||
|
pathItems := make(map[string]*PathItem)
|
||||||
|
|
||||||
resultChan := make(chan asyncResult[*PathItem])
|
translateFunc := func(key low.KeyReference[string], value low.ValueReference[*v2low.PathItem]) (asyncResult[*PathItem], error) {
|
||||||
var buildPath = func(path string, pi *low.PathItem, rChan chan<- asyncResult[*PathItem]) {
|
return asyncResult[*PathItem]{
|
||||||
rChan <- asyncResult[*PathItem]{
|
key: key.Value,
|
||||||
key: path,
|
result: NewPathItem(value.Value),
|
||||||
result: NewPathItem(pi),
|
}, nil
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if len(paths.PathItems) > 0 {
|
resultFunc := func(result asyncResult[*PathItem]) error {
|
||||||
pathItems := make(map[string]*PathItem)
|
pathItems[result.key] = result.result
|
||||||
totalPaths := len(paths.PathItems)
|
return nil
|
||||||
for k := range paths.PathItems {
|
|
||||||
go buildPath(k.Value, paths.PathItems[k].Value, resultChan)
|
|
||||||
}
|
|
||||||
completedPaths := 0
|
|
||||||
for completedPaths < totalPaths {
|
|
||||||
select {
|
|
||||||
case res := <-resultChan:
|
|
||||||
completedPaths++
|
|
||||||
pathItems[res.key] = res.result
|
|
||||||
}
|
|
||||||
}
|
|
||||||
p.PathItems = pathItems
|
|
||||||
}
|
}
|
||||||
|
_ = datamodel.TranslateMapParallel[low.KeyReference[string], low.ValueReference[*v2low.PathItem], asyncResult[*PathItem]](
|
||||||
|
paths.PathItems, translateFunc, resultFunc,
|
||||||
|
)
|
||||||
|
p.PathItems = pathItems
|
||||||
return p
|
return p
|
||||||
}
|
}
|
||||||
|
|
||||||
// GoLow returns the low-level Paths instance that backs the high level one.
|
// GoLow returns the low-level Paths instance that backs the high level one.
|
||||||
func (p *Paths) GoLow() *low.Paths {
|
func (p *Paths) GoLow() *v2low.Paths {
|
||||||
return p.low
|
return p.low
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -33,19 +33,19 @@ func NewPaths(paths *v3low.Paths) *Paths {
|
|||||||
p.Extensions = high.ExtractExtensions(paths.Extensions)
|
p.Extensions = high.ExtractExtensions(paths.Extensions)
|
||||||
items := make(map[string]*PathItem)
|
items := make(map[string]*PathItem)
|
||||||
|
|
||||||
type pRes struct {
|
type pathItemResult struct {
|
||||||
key string
|
key string
|
||||||
value *PathItem
|
value *PathItem
|
||||||
}
|
}
|
||||||
|
|
||||||
translateFunc := func(key low.KeyReference[string], value low.ValueReference[*v3low.PathItem]) (pRes, error) {
|
translateFunc := func(key low.KeyReference[string], value low.ValueReference[*v3low.PathItem]) (pathItemResult, error) {
|
||||||
return pRes{key: key.Value, value: NewPathItem(value.Value)}, nil
|
return pathItemResult{key: key.Value, value: NewPathItem(value.Value)}, nil
|
||||||
}
|
}
|
||||||
resultFunc := func(value pRes) error {
|
resultFunc := func(value pathItemResult) error {
|
||||||
items[value.key] = value.value
|
items[value.key] = value.value
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
_ = datamodel.TranslateMapParallel[low.KeyReference[string], low.ValueReference[*v3low.PathItem], pRes](
|
_ = datamodel.TranslateMapParallel[low.KeyReference[string], low.ValueReference[*v3low.PathItem], pathItemResult](
|
||||||
paths.PathItems, translateFunc, resultFunc,
|
paths.PathItems, translateFunc, resultFunc,
|
||||||
)
|
)
|
||||||
p.PathItems = items
|
p.PathItems = items
|
||||||
|
|||||||
@@ -6,13 +6,14 @@ package v2
|
|||||||
import (
|
import (
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sort"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"github.com/pb33f/libopenapi/datamodel/low"
|
"github.com/pb33f/libopenapi/datamodel/low"
|
||||||
"github.com/pb33f/libopenapi/index"
|
"github.com/pb33f/libopenapi/index"
|
||||||
"github.com/pb33f/libopenapi/utils"
|
"github.com/pb33f/libopenapi/utils"
|
||||||
"gopkg.in/yaml.v3"
|
"gopkg.in/yaml.v3"
|
||||||
"sort"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// PathItem represents a low-level Swagger / OpenAPI 2 PathItem object.
|
// PathItem represents a low-level Swagger / OpenAPI 2 PathItem object.
|
||||||
|
|||||||
@@ -4,14 +4,18 @@
|
|||||||
package v2
|
package v2
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sort"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/pb33f/libopenapi/datamodel"
|
||||||
"github.com/pb33f/libopenapi/datamodel/low"
|
"github.com/pb33f/libopenapi/datamodel/low"
|
||||||
"github.com/pb33f/libopenapi/index"
|
"github.com/pb33f/libopenapi/index"
|
||||||
"github.com/pb33f/libopenapi/utils"
|
"github.com/pb33f/libopenapi/utils"
|
||||||
"gopkg.in/yaml.v3"
|
"gopkg.in/yaml.v3"
|
||||||
"sort"
|
|
||||||
"strings"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Paths represents a low-level Swagger / OpenAPI Paths object.
|
// Paths represents a low-level Swagger / OpenAPI Paths object.
|
||||||
@@ -55,65 +59,104 @@ func (p *Paths) Build(_, root *yaml.Node, idx *index.SpecIndex) error {
|
|||||||
root = utils.NodeAlias(root)
|
root = utils.NodeAlias(root)
|
||||||
utils.CheckForMergeNodes(root)
|
utils.CheckForMergeNodes(root)
|
||||||
p.Extensions = low.ExtractExtensions(root)
|
p.Extensions = low.ExtractExtensions(root)
|
||||||
skip := false
|
// skip := false
|
||||||
var currentNode *yaml.Node
|
// var currentNode *yaml.Node
|
||||||
|
|
||||||
pathsMap := make(map[low.KeyReference[string]]low.ValueReference[*PathItem])
|
// Translate YAML nodes to pathsMap using `TranslatePipeline`.
|
||||||
|
|
||||||
// build each new path, in a new thread.
|
|
||||||
type pathBuildResult struct {
|
type pathBuildResult struct {
|
||||||
k low.KeyReference[string]
|
key low.KeyReference[string]
|
||||||
v low.ValueReference[*PathItem]
|
value low.ValueReference[*PathItem]
|
||||||
}
|
}
|
||||||
|
type nodeItem struct {
|
||||||
|
currentNode *yaml.Node
|
||||||
|
pathNode *yaml.Node
|
||||||
|
}
|
||||||
|
pathsMap := make(map[low.KeyReference[string]]low.ValueReference[*PathItem])
|
||||||
|
in := make(chan nodeItem)
|
||||||
|
out := make(chan pathBuildResult)
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(2) // input and output goroutines.
|
||||||
|
|
||||||
bChan := make(chan pathBuildResult)
|
// TranslatePipeline input.
|
||||||
eChan := make(chan error)
|
go func() {
|
||||||
var buildPathItem = func(cNode, pNode *yaml.Node, b chan<- pathBuildResult, e chan<- error) {
|
defer func() {
|
||||||
|
close(in)
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
skip := false
|
||||||
|
var currentNode *yaml.Node
|
||||||
|
for i, pathNode := range root.Content {
|
||||||
|
if strings.HasPrefix(strings.ToLower(pathNode.Value), "x-") {
|
||||||
|
skip = true
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if skip {
|
||||||
|
skip = false
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if i%2 == 0 {
|
||||||
|
currentNode = pathNode
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case in <- nodeItem{
|
||||||
|
currentNode: currentNode,
|
||||||
|
pathNode: pathNode,
|
||||||
|
}:
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// TranslatePipeline output.
|
||||||
|
go func() {
|
||||||
|
defer func() {
|
||||||
|
cancel()
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case result, ok := <-out:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
pathsMap[result.key] = result.value
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
translateFunc := func(value nodeItem) (retval pathBuildResult, _ error) {
|
||||||
|
pNode := value.pathNode
|
||||||
|
cNode := value.currentNode
|
||||||
path := new(PathItem)
|
path := new(PathItem)
|
||||||
_ = low.BuildModel(pNode, path)
|
_ = low.BuildModel(pNode, path)
|
||||||
err := path.Build(cNode, pNode, idx)
|
err := path.Build(cNode, pNode, idx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e <- err
|
return retval, err
|
||||||
return
|
|
||||||
}
|
}
|
||||||
b <- pathBuildResult{
|
return pathBuildResult{
|
||||||
k: low.KeyReference[string]{
|
key: low.KeyReference[string]{
|
||||||
Value: cNode.Value,
|
Value: cNode.Value,
|
||||||
KeyNode: cNode,
|
KeyNode: cNode,
|
||||||
},
|
},
|
||||||
v: low.ValueReference[*PathItem]{
|
value: low.ValueReference[*PathItem]{
|
||||||
Value: path,
|
Value: path,
|
||||||
ValueNode: pNode,
|
ValueNode: pNode,
|
||||||
},
|
},
|
||||||
}
|
}, nil
|
||||||
}
|
}
|
||||||
pathCount := 0
|
err := datamodel.TranslatePipeline[nodeItem, pathBuildResult](in, out, translateFunc)
|
||||||
for i, pathNode := range root.Content {
|
wg.Wait()
|
||||||
if strings.HasPrefix(strings.ToLower(pathNode.Value), "x-") {
|
if err != nil {
|
||||||
skip = true
|
return err
|
||||||
continue
|
|
||||||
}
|
|
||||||
if skip {
|
|
||||||
skip = false
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if i%2 == 0 {
|
|
||||||
currentNode = pathNode
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
pathCount++
|
|
||||||
go buildPathItem(currentNode, pathNode, bChan, eChan)
|
|
||||||
}
|
|
||||||
completedItems := 0
|
|
||||||
for completedItems < pathCount {
|
|
||||||
select {
|
|
||||||
case err := <-eChan:
|
|
||||||
return err
|
|
||||||
case res := <-bChan:
|
|
||||||
completedItems++
|
|
||||||
pathsMap[res.k] = res.v
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
p.PathItems = pathsMap
|
p.PathItems = pathsMap
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,13 +4,13 @@
|
|||||||
package v3
|
package v3
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"github.com/pb33f/libopenapi/datamodel"
|
||||||
"github.com/pb33f/libopenapi/datamodel/low"
|
"github.com/pb33f/libopenapi/datamodel/low"
|
||||||
"github.com/pb33f/libopenapi/index"
|
"github.com/pb33f/libopenapi/index"
|
||||||
"github.com/pb33f/libopenapi/utils"
|
"github.com/pb33f/libopenapi/utils"
|
||||||
@@ -271,58 +271,24 @@ func (p *PathItem) Build(_, root *yaml.Node, idx *index.SpecIndex) error {
|
|||||||
|
|
||||||
// all operations have been superficially built,
|
// all operations have been superficially built,
|
||||||
// now we need to build out the operation, we will do this asynchronously for speed.
|
// now we need to build out the operation, we will do this asynchronously for speed.
|
||||||
opBuildChan := make(chan bool)
|
translateFunc := func(_ int, op low.NodeReference[*Operation]) (any, error) {
|
||||||
opErrorChan := make(chan error)
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
buildOpFunc := func(op low.NodeReference[*Operation], ch chan<- bool, errCh chan<- error, ref string) {
|
|
||||||
er := op.Value.Build(op.KeyNode, op.ValueNode, idx)
|
|
||||||
if ref != "" {
|
|
||||||
op.Value.Reference.Reference = ref
|
|
||||||
}
|
|
||||||
if er != nil {
|
|
||||||
select {
|
|
||||||
case errCh <- er:
|
|
||||||
case <-ctx.Done():
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case ch <- true:
|
|
||||||
case <-ctx.Done():
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(ops) <= 0 {
|
|
||||||
return nil // nothing to do.
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, op := range ops {
|
|
||||||
ref := ""
|
ref := ""
|
||||||
if op.ReferenceNode {
|
if op.ReferenceNode {
|
||||||
ref = op.Reference
|
ref = op.Reference
|
||||||
}
|
}
|
||||||
go buildOpFunc(op, opBuildChan, opErrorChan, ref)
|
|
||||||
}
|
|
||||||
|
|
||||||
n := 0
|
err := op.Value.Build(op.KeyNode, op.ValueNode, idx)
|
||||||
total := len(ops)
|
if ref != "" {
|
||||||
FORLOOP1:
|
op.Value.Reference.Reference = ref
|
||||||
for n < total {
|
|
||||||
select {
|
|
||||||
case buildError := <-opErrorChan:
|
|
||||||
return buildError
|
|
||||||
case <-opBuildChan:
|
|
||||||
n++
|
|
||||||
case <-ctx.Done():
|
|
||||||
break FORLOOP1
|
|
||||||
}
|
}
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
err := datamodel.TranslateSliceParallel[low.NodeReference[*Operation], any](ops, translateFunc, nil)
|
||||||
// make sure we don't exit before the path is finished building.
|
if err != nil {
|
||||||
if len(ops) > 0 {
|
return err
|
||||||
wg.Wait()
|
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -25,7 +25,7 @@ type jobStatus[OUT any] struct {
|
|||||||
result OUT
|
result OUT
|
||||||
}
|
}
|
||||||
|
|
||||||
type tpJobStatus[IN any, OUT any] struct {
|
type pipelineJobStatus[IN any, OUT any] struct {
|
||||||
done chan struct{}
|
done chan struct{}
|
||||||
cont bool
|
cont bool
|
||||||
eof bool
|
eof bool
|
||||||
@@ -201,8 +201,8 @@ func TranslatePipeline[IN any, OUT any](in <-chan IN, out chan<- OUT, translate
|
|||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
concurrency := runtime.NumCPU()
|
concurrency := runtime.NumCPU()
|
||||||
workChan := make(chan *tpJobStatus[IN, OUT])
|
workChan := make(chan *pipelineJobStatus[IN, OUT])
|
||||||
resultChan := make(chan *tpJobStatus[IN, OUT])
|
resultChan := make(chan *pipelineJobStatus[IN, OUT])
|
||||||
var reterr error
|
var reterr error
|
||||||
var mu sync.Mutex
|
var mu sync.Mutex
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
@@ -257,7 +257,7 @@ func TranslatePipeline[IN any, OUT any](in <-chan IN, out chan<- OUT, translate
|
|||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
j := &tpJobStatus[IN, OUT]{
|
j := &pipelineJobStatus[IN, OUT]{
|
||||||
done: make(chan struct{}),
|
done: make(chan struct{}),
|
||||||
input: value,
|
input: value,
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user