Skip to content

Commit

Permalink
perf(streamer): improved telegram reader
Browse files Browse the repository at this point in the history
  • Loading branch information
EverythingSuckz committed Nov 27, 2023
1 parent b725269 commit 34e8298
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 130 deletions.
15 changes: 10 additions & 5 deletions routes/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@ import (
"strconv"

range_parser "github.com/quantumsheep/range-parser"
"go.uber.org/zap"

"github.com/gin-gonic/gin"
)

var log *zap.Logger

func (e *allRoutes) LoadHome(r *Route) {
defer e.log.Info("Loaded stream route")
log = e.log.Named("Stream")
defer log.Info("Loaded stream route")
r.Engine.GET("/stream/:messageID", getStreamRoute)
}

Expand Down Expand Up @@ -57,6 +61,7 @@ func getStreamRoute(ctx *gin.Context) {
start = ranges[0].Start
end = ranges[0].End
ctx.Header("Content-Range", fmt.Sprintf("bytes %d-%d/%d", start, end, file.FileSize))
log.Info("Content-Range", zap.Int64("start", start), zap.Int64("end", end), zap.Int64("fileSize", file.FileSize))
w.WriteHeader(http.StatusPartialContent)
}

Expand All @@ -79,13 +84,13 @@ func getStreamRoute(ctx *gin.Context) {
ctx.Header("Content-Disposition", fmt.Sprintf("%s; filename=\"%s\"", disposition, file.FileName))

if r.Method != "HEAD" {
parts, err := utils.GetParts(ctx, bot.Bot.Client, file)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
parts = utils.RangedParts(parts, start, end)
lr, _ := utils.NewLinearReader(ctx, bot.Bot.Client, parts, contentLength)
io.CopyN(w, lr, contentLength)
lr, _ := utils.NewTelegramReader(ctx, bot.Bot.Client, file.Location, start, end, contentLength)
if _, err := io.CopyN(w, lr, contentLength); err != nil {
log.WithOptions(zap.AddStacktrace(zap.DPanicLevel)).Error(err.Error())
}
}
}
65 changes: 33 additions & 32 deletions utils/linear_reader.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
package utils

import (
"EverythingSuckz/fsb/types"
"context"
"fmt"
"io"
"log"

"github.com/gotd/td/telegram"
"github.com/gotd/td/tg"
"go.uber.org/zap"
)

type linearReader struct {
type telegramReader struct {
ctx context.Context
parts []types.Part
pos int
log *zap.Logger
client *telegram.Client
location *tg.InputDocumentFileLocation
start int64
end int64
next func() ([]byte, error)
buffer []byte
bytesread int64
Expand All @@ -24,69 +25,69 @@ type linearReader struct {
contentLength int64
}

func (*linearReader) Close() error {
func (*telegramReader) Close() error {
return nil
}

func NewLinearReader(ctx context.Context, client *telegram.Client, parts []types.Part, contentLength int64) (io.ReadCloser, error) {
func NewTelegramReader(
ctx context.Context,
client *telegram.Client,
location *tg.InputDocumentFileLocation,
start int64,
end int64,
contentLength int64,
) (io.ReadCloser, error) {

r := &linearReader{
r := &telegramReader{
ctx: ctx,
parts: parts,
log: Logger.Named("telegramReader"),
location: location,
client: client,
start: start,
end: end,
chunkSize: int64(1024 * 1024),
contentLength: contentLength,
}

r.log.Sugar().Info("Linear Reader: Start")
r.next = r.partStream()

return r, nil
}

func (r *linearReader) Read(p []byte) (n int, err error) {
func (r *telegramReader) Read(p []byte) (n int, err error) {

if r.bytesread == r.contentLength {
log.Println("Linear Reader: EOF (bytesread == contentLength)")
r.log.Sugar().Info("Linear Reader: EOF (bytesread == contentLength)")
return 0, io.EOF
}

if r.i >= int64(len(r.buffer)) {
r.buffer, err = r.next()
r.log.Sugar().Infof("Next buffer: %d", len(r.buffer))
if err != nil {
return 0, err
}
if len(r.buffer) == 0 {
r.pos++
if r.pos == len(r.parts) {
log.Println("Linear Reader: EOF (pos==n(parts))")
return 0, io.EOF
} else {
r.next = r.partStream()
r.buffer, err = r.next()
if err != nil {
return 0, err
}
r.next = r.partStream()
r.buffer, err = r.next()
if err != nil {
return 0, err
}

}
r.i = 0
}

n = copy(p, r.buffer[r.i:])

r.i += int64(n)

r.bytesread += int64(n)

return n, nil
}

func (r *linearReader) chunk(offset int64, limit int64) ([]byte, error) {
func (r *telegramReader) chunk(offset int64, limit int64) ([]byte, error) {

req := &tg.UploadGetFileRequest{
Offset: offset,
Limit: int(limit),
Location: r.parts[r.pos].Location,
Location: r.location,
}

res, err := r.client.API().UploadGetFile(r.ctx, req)
Expand All @@ -103,10 +104,10 @@ func (r *linearReader) chunk(offset int64, limit int64) ([]byte, error) {
}
}

func (r *linearReader) partStream() func() ([]byte, error) {
func (r *telegramReader) partStream() func() ([]byte, error) {

start := r.parts[r.pos].Start
end := r.parts[r.pos].End
start := r.start
end := r.end
offset := start - (start % r.chunkSize)

firstPartCut := start - offset
Expand Down
93 changes: 0 additions & 93 deletions utils/stream_helpers.go

This file was deleted.

0 comments on commit 34e8298

Please sign in to comment.