UPDATE: Add mutithread download

This commit is contained in:
2025-08-27 08:27:02 +07:00
parent a021658fa9
commit a4de02bc18
9 changed files with 164 additions and 225 deletions

View File

@@ -2,6 +2,8 @@ package gitService
import (
"archive/zip"
"encoding/json"
"firefly-launcher/pkg/models"
"fmt"
"io"
"math"
@@ -13,39 +15,37 @@ import (
"time"
)
func humanFormat(bytes int64) string {
n := float64(bytes)
func HumanFormat(bytes float64) string {
for _, unit := range []string{"", "Ki", "Mi", "Gi"} {
if math.Abs(n) < 1024.0 {
return fmt.Sprintf("%3.1f%sB", n, unit)
if math.Abs(bytes) < 1024.0 {
return fmt.Sprintf("%3.1f%sB", bytes, unit)
}
n /= 1024.0
bytes /= 1024.0
}
return fmt.Sprintf("%.1fTiB", n)
return fmt.Sprintf("%.1fTiB", bytes)
}
type WriteCounter struct {
Total uint64
StartTime time.Time
OnEmit func(percent float64, speedMBps float64)
TotalSize int64
lastLoggedPercent int
Total uint64
StartTime time.Time
OnEmit func(percent float64, speed string)
TotalSize int64
mu sync.Mutex
}
func NewWriteCounter(total int64, onEmit func(percent float64, speedMBps float64)) *WriteCounter {
func NewWriteCounter(total int64, onEmit func(percent float64, speed string)) *WriteCounter {
return &WriteCounter{
StartTime: time.Now(),
TotalSize: total,
lastLoggedPercent: -1,
OnEmit: onEmit,
StartTime: time.Now(),
TotalSize: total,
OnEmit: onEmit,
}
}
func (wc *WriteCounter) Write(p []byte) (int, error) {
n := len(p)
func (wc *WriteCounter) Add(n int) {
wc.mu.Lock()
defer wc.mu.Unlock()
wc.Total += uint64(n)
wc.PrintProgress()
return n, nil
}
func (wc *WriteCounter) PrintProgress() {
@@ -53,63 +53,109 @@ func (wc *WriteCounter) PrintProgress() {
if elapsed < 0.001 {
elapsed = 0.001
}
speed := float64(wc.Total) / 1024 / 1024 / elapsed // MB/s
speed := float64(wc.Total) / 1024 / 1024 / elapsed
percent := float64(wc.Total) / float64(wc.TotalSize) * 100
if wc.OnEmit != nil {
wc.OnEmit(percent, speed)
wc.OnEmit(percent, fmt.Sprintf("%s/s", HumanFormat(speed)))
}
}
func DownloadFile(filepath string, url string, onEmit func(percent float64, speed float64)) error {
tmpPath := filepath + ".tmp"
resp, err := http.Get(url)
// --- DownloadFileParallel ---
func (g *GitService) downloadFileParallel(filePath, url string, numParts int, onEmit func(percent float64, speed string)) (tmpPath string, err error) {
resp, err := http.Head(url)
if err != nil {
return fmt.Errorf("failed to get file: %w", err)
return "", fmt.Errorf("failed to get head: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("bad status: %s", resp.Status)
return "", fmt.Errorf("bad status: %s", resp.Status)
}
size := resp.ContentLength
tmpPath = filePath + ".tmp"
out, err := os.Create(tmpPath)
if err != nil {
return fmt.Errorf("failed to create tmp file: %w", err)
return "", fmt.Errorf("failed to create tmp file: %w", err)
}
defer out.Close()
counter := NewWriteCounter(resp.ContentLength, onEmit)
_, err = io.Copy(out, io.TeeReader(resp.Body, counter))
if closeErr := out.Close(); closeErr != nil {
return fmt.Errorf("failed to close tmp file: %w", closeErr)
}
if err != nil {
return fmt.Errorf("failed to download file: %w", err)
}
counter := NewWriteCounter(size, onEmit)
partSize := size / int64(numParts)
var wg sync.WaitGroup
var mu sync.Mutex
// Delete destination file if it exists
if _, err := os.Stat(filepath); err == nil {
if err := os.Remove(filepath); err != nil {
return fmt.Errorf("failed to remove existing file: %w", err)
for i := 0; i < numParts; i++ {
start := int64(i) * partSize
end := start + partSize - 1
if i == numParts-1 {
end = size - 1
}
_ = start
_ = end
wg.Go(func() {
req, _ := http.NewRequest("GET", url, nil)
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, end))
resp, err := http.DefaultClient.Do(req)
if err != nil {
return
}
defer resp.Body.Close()
buf := make([]byte, 32*1024)
var written int64
for {
n, err := resp.Body.Read(buf)
if n > 0 {
mu.Lock()
out.Seek(start+written, 0)
out.Write(buf[:n])
mu.Unlock()
written += int64(n)
counter.Add(n)
}
if err == io.EOF {
break
}
if err != nil {
break
}
}
})
}
for i := 0; i < 3; i++ {
err = os.Rename(tmpPath, filepath)
if err == nil {
break
}
time.Sleep(300 * time.Millisecond)
}
if err != nil {
return fmt.Errorf("failed to rename after retries: %w", err)
}
return nil
wg.Wait()
return tmpPath, nil
}
func unzipParallel(src string, dest string) error {
// --- Helper getReleaseAsset ---
func (g *GitService) getReleaseAsset(version, url, fileName string) (models.AssetType, bool) {
resp, err := http.Get(url)
if err != nil {
return models.AssetType{}, false
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
var releases []*models.ReleaseType
if err := json.Unmarshal(body, &releases); err != nil || len(releases) == 0 {
return models.AssetType{}, false
}
for _, release := range releases {
if release.TagName == version {
for _, asset := range release.Assets {
if asset.Name == fileName {
return asset, true
}
}
}
}
return models.AssetType{}, false
}
func (g *GitService) unzipParallel(src string, dest string) error {
numCPU := runtime.NumCPU()
reserved := 1
@@ -137,11 +183,11 @@ func unzipParallel(src string, dest string) error {
jobs := make(chan job)
var wg sync.WaitGroup
// Worker pool
for i := 0; i < maxWorkers; i++ {
wg.Go(func() {
for j := range jobs {
err := extractFile(j.f, dest)
err := g.extractFile(j.f, dest)
if err != nil {
fmt.Printf("Error extracting %s: %v\n", j.f.Name, err)
}
@@ -149,17 +195,17 @@ func unzipParallel(src string, dest string) error {
})
}
// Feed jobs
for _, f := range r.File {
jobs <- job{f}
}
close(jobs)
wg.Wait()
return nil
}
func extractFile(f *zip.File, dest string) error {
func (g *GitService) extractFile(f *zip.File, dest string) error {
fp := filepath.Join(dest, f.Name)
if f.FileInfo().IsDir() {