mirror of
https://github.com/LukeHagar/slinky.git
synced 2025-12-06 04:21:20 +00:00
Implement mutex for thread-safe URLCache operations and optimize job handling in CheckURLs function
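This commit introduces a mutex (sync.RWMutex) to the URLCache struct to ensure thread-safe access to cache entries during load, get, set, save, and clear operations. Additionally, it refines the job handling logic in the CheckURLs function: the atomic counters for processed and pending jobs are now declared before the jobs are seeded, and pending is set to the number of jobs actually enqueued (empty URLs are skipped) rather than to len(urls), improving concurrency management.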
This commit is contained in:
@@ -4,6 +4,7 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
@@ -18,6 +19,7 @@ type CacheEntry struct {
|
||||
|
||||
// URLCache manages URL result caching
|
||||
type URLCache struct {
|
||||
mu sync.RWMutex
|
||||
entries map[string]CacheEntry
|
||||
ttl time.Duration
|
||||
path string
|
||||
@@ -57,6 +59,8 @@ func (c *URLCache) Load() error {
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.entries = make(map[string]CacheEntry, len(entries))
|
||||
for _, entry := range entries {
|
||||
// Only load entries that haven't expired
|
||||
@@ -70,14 +74,18 @@ func (c *URLCache) Load() error {
|
||||
|
||||
// Get retrieves a cached result for a URL
|
||||
func (c *URLCache) Get(url string) (CacheEntry, bool) {
|
||||
c.mu.RLock()
|
||||
entry, ok := c.entries[url]
|
||||
c.mu.RUnlock()
|
||||
if !ok {
|
||||
return CacheEntry{}, false
|
||||
}
|
||||
|
||||
// Check if entry has expired
|
||||
if time.Since(entry.Checked) >= c.ttl {
|
||||
c.mu.Lock()
|
||||
delete(c.entries, url)
|
||||
c.mu.Unlock()
|
||||
return CacheEntry{}, false
|
||||
}
|
||||
|
||||
@@ -86,6 +94,8 @@ func (c *URLCache) Get(url string) (CacheEntry, bool) {
|
||||
|
||||
// Set stores a result in the cache
|
||||
func (c *URLCache) Set(url string, ok bool, status int, errMsg string) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.entries[url] = CacheEntry{
|
||||
URL: url,
|
||||
OK: ok,
|
||||
@@ -102,10 +112,12 @@ func (c *URLCache) Save() error {
|
||||
}
|
||||
|
||||
// Convert map to slice for JSON serialization
|
||||
c.mu.RLock()
|
||||
entries := make([]CacheEntry, 0, len(c.entries))
|
||||
for _, entry := range c.entries {
|
||||
entries = append(entries, entry)
|
||||
}
|
||||
c.mu.RUnlock()
|
||||
|
||||
data, err := json.MarshalIndent(entries, "", " ")
|
||||
if err != nil {
|
||||
@@ -127,6 +139,8 @@ func (c *URLCache) Save() error {
|
||||
|
||||
// Clear removes all entries from the cache
|
||||
func (c *URLCache) Clear() {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.entries = make(map[string]CacheEntry)
|
||||
}
|
||||
|
||||
|
||||
@@ -38,12 +38,18 @@ func CheckURLs(ctx context.Context, urls []string, sources map[string][]string,
|
||||
jobs := make(chan job, len(urls))
|
||||
done := make(chan struct{})
|
||||
|
||||
// Use atomic counters to avoid race conditions
|
||||
var processed int64
|
||||
var pending int64
|
||||
var jobCount int64
|
||||
|
||||
// Seed jobs (URLs are already deduplicated in check.go, so no need to deduplicate here)
|
||||
for _, u := range urls {
|
||||
if u == "" {
|
||||
continue
|
||||
}
|
||||
jobs <- job{url: u}
|
||||
jobCount++
|
||||
}
|
||||
close(jobs)
|
||||
|
||||
@@ -51,9 +57,8 @@ func CheckURLs(ctx context.Context, urls []string, sources map[string][]string,
|
||||
if concurrency <= 0 {
|
||||
concurrency = 8
|
||||
}
|
||||
// Use atomic counters to avoid race conditions
|
||||
var processed int64
|
||||
var pending int64 = int64(len(urls))
|
||||
// Set pending to actual number of jobs enqueued
|
||||
pending = jobCount
|
||||
|
||||
worker := func() {
|
||||
for j := range jobs {
|
||||
|
||||
Reference in New Issue
Block a user