Changed remote loader to use a timeout

Rather than blocking indefinitely, the loader now waits 50ms and then retries, regardless of the number of available cores, so it can never block completely.

Signed-off-by: quobix <dave@quobix.com>
This commit is contained in:
quobix
2023-11-01 14:29:52 -04:00
parent d096163f0e
commit 276c3959fd
3 changed files with 796 additions and 815 deletions

View File

@@ -4,17 +4,16 @@
package index package index
import ( import (
"context"
"errors" "errors"
"fmt" "fmt"
"github.com/pb33f/libopenapi/datamodel" "github.com/pb33f/libopenapi/datamodel"
"github.com/pb33f/libopenapi/utils" "github.com/pb33f/libopenapi/utils"
"log/slog"
"runtime"
"golang.org/x/sync/syncmap" "golang.org/x/sync/syncmap"
"gopkg.in/yaml.v3" "gopkg.in/yaml.v3"
"io" "io"
"io/fs" "io/fs"
"log/slog"
"net/http" "net/http"
"net/url" "net/url"
"os" "os"
@@ -261,11 +260,11 @@ func (i *RemoteFS) Open(remoteURL string) (fs.File, error) {
// if we're processing, we need to block and wait for the file to be processed // if we're processing, we need to block and wait for the file to be processed
// try path first // try path first
if _, ok := i.ProcessingFiles.Load(remoteParsedURL.Path); ok { if _, ok := i.ProcessingFiles.Load(remoteParsedURL.Path); ok {
// we can't block if we only have a couple of CPUs, as we'll deadlock / run super slow, only when we're running in parallel
// can we block threads.
if runtime.GOMAXPROCS(-1) > 2 {
i.logger.Debug("waiting for existing fetch to complete", "file", remoteURL, "remoteURL", remoteParsedURL.String())
i.logger.Debug("waiting for existing fetch to complete", "file", remoteURL, "remoteURL", remoteParsedURL.String())
// Create a context with a timeout of 50ms
ctxTimeout, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
defer cancel()
f := make(chan *RemoteFile) f := make(chan *RemoteFile)
fwait := func(path string, c chan *RemoteFile) { fwait := func(path string, c chan *RemoteFile) {
for { for {
@@ -275,7 +274,12 @@ func (i *RemoteFS) Open(remoteURL string) (fs.File, error) {
} }
} }
go fwait(remoteParsedURL.Path, f) go fwait(remoteParsedURL.Path, f)
return <-f, nil
select {
case <-ctxTimeout.Done():
i.logger.Info("waiting for remote file timed out, trying again", "file", remoteURL, "remoteURL", remoteParsedURL.String())
case v := <-f:
return v, nil
} }
} }

View File

@@ -13,19 +13,16 @@
package index package index
import ( import (
"context"
"fmt" "fmt"
"github.com/pb33f/libopenapi/utils"
"github.com/vmware-labs/yaml-jsonpath/pkg/yamlpath"
"golang.org/x/sync/syncmap" "golang.org/x/sync/syncmap"
"gopkg.in/yaml.v3"
"log/slog" "log/slog"
"os" "os"
"sort" "sort"
"strings" "strings"
"sync" "sync"
"time"
"github.com/pb33f/libopenapi/utils"
"github.com/vmware-labs/yaml-jsonpath/pkg/yamlpath"
"gopkg.in/yaml.v3"
) )
// NewSpecIndexWithConfig will create a new index of an OpenAPI or Swagger spec. It uses the same logic as NewSpecIndex // NewSpecIndexWithConfig will create a new index of an OpenAPI or Swagger spec. It uses the same logic as NewSpecIndex
@@ -696,29 +693,9 @@ func (index *SpecIndex) GetGlobalLinksCount() int {
// look through method for links // look through method for links
links, _ := yamlpath.NewPath("$..links") links, _ := yamlpath.NewPath("$..links")
// Channel used to receive the result from doSomething function
ch := make(chan string, 1)
// Create a context with a timeout of 5 seconds
ctxTimeout, cancel := context.WithTimeout(context.Background(), time.Millisecond*500)
defer cancel()
var res []*yaml.Node var res []*yaml.Node
doSomething := func(ctx context.Context, ch chan<- string) {
res, _ = links.Find(m.Node) res, _ = links.Find(m.Node)
ch <- m.Definition
}
// Start the doSomething function
go doSomething(ctxTimeout, ch)
select {
case <-ctxTimeout.Done():
fmt.Printf("Global links %d ref: Context cancelled: %v\n", m.Node.Line, ctxTimeout.Err())
case <-ch:
}
if len(res) > 0 { if len(res) > 0 {
for _, link := range res[0].Content { for _, link := range res[0].Content {

View File

@@ -142,7 +142,7 @@ func TestSpecIndex_DigitalOcean(t *testing.T) {
cf.AllowRemoteLookup = true cf.AllowRemoteLookup = true
cf.AvoidCircularReferenceCheck = true cf.AvoidCircularReferenceCheck = true
cf.Logger = slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ cf.Logger = slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelError, Level: slog.LevelInfo,
})) }))
// setting this baseURL will override the base // setting this baseURL will override the base