Merge pull request #6 from Cleafy/feature/mixed-improvements
Feature/mixed improvements
nicolopastore authored Jun 7, 2018
2 parents 880d284 + 3cfe99d commit dbdf2ef
Showing 4 changed files with 132 additions and 44 deletions.
3 changes: 2 additions & 1 deletion README.md
@@ -62,7 +62,8 @@ usage: promplay [<flags>]
 Flags:
   --help            Show context-sensitive help (also try --help-long and --help-man).
-  --debug           Enable debug mode.
+  --debug           Enable debug mode. (VERY VERBOSE!)
+  --verbose (-v)    Enable info-level messages.
   --nopromcfg       Disable the generation of the prometheus cfg file (prometheus.yml)
   -d, --dir="/tmp"  Input directory.
   --version         Show application version.
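
For orientation, here is one plausible way to combine the updated flag set on the command line; the directory path is illustrative, not taken from the repository:

    promplay --dir /var/log/promqueen --verbose --nopromcfg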
154 changes: 116 additions & 38 deletions bin/promplay/main.go
@@ -8,39 +8,47 @@ import (
     "io/ioutil"
     "net/http"
     "os"
+    "path/filepath"
     "sort"
+    "strings"
     "time"
 
-    cm "github.com/cleafy/promqueen/model"
+    cm "github.com/Cleafy/promqueen/model"
 
     "github.com/mattetti/filebuffer"
     dto "github.com/prometheus/client_model/go"
     "github.com/prometheus/common/expfmt"
     "github.com/prometheus/common/model"
     "github.com/prometheus/prometheus/storage/local"
     "github.com/sirupsen/logrus"
     kingpin "gopkg.in/alecthomas/kingpin.v2"
+    pb "gopkg.in/cheggaaa/pb.v2"
     filetype "gopkg.in/h2non/filetype.v1"
 )
 
 var replayType = filetype.NewType("rep", "application/replay")
 
 func replayMatcher(buf []byte) bool {
     header, err := cm.ReadFrameHeader(filebuffer.New(buf))
-    if err != nil {
-        return false
+    if err == nil {
+        return cm.CheckVersion(header)
     }
-    return cm.CheckVersion(header)
+    logrus.Errorf("Malformed frame header!")
+    return false
 }
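
For readers unfamiliar with the h2non/filetype package imported above: a custom matcher only takes effect once it is registered with AddMatcher, which this same diff does in main() further down. A self-contained sketch of the registration flow, with a trivial matcher body and a hypothetical file name:

    package main

    import (
        "fmt"

        filetype "gopkg.in/h2non/filetype.v1"
    )

    // A custom type mirroring the one in this diff.
    var replayType = filetype.NewType("rep", "application/replay")

    // A stand-in matcher: the real one validates the promqueen frame header.
    func replayMatcher(buf []byte) bool {
        return len(buf) > 0
    }

    func main() {
        filetype.AddMatcher(replayType, replayMatcher)

        // "frames-0001.rep" is hypothetical; MatchFile feeds the file's
        // first bytes to every registered matcher.
        if ftype, err := filetype.MatchFile("frames-0001.rep"); err == nil {
            fmt.Println(ftype.MIME.Value) // "application/replay" on a match
        }
    }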

 var (
-    debug            = kingpin.Flag("debug", "Enable debug mode.").Bool()
-    nopromcfg        = kingpin.Flag("nopromcfg", "Disable the generation of the prometheus cfg file (prometheus.yml)").Bool()
-    dir              = kingpin.Flag("dir", "Input directory.").Short('d').OverrideDefaultFromEnvar("INPUT_DIRECTORY").Default(".").String()
-    framereader      = make(<-chan cm.Frame)
-    Version          = "unversioned"
-    cfgMemoryStorage = local.MemorySeriesStorageOptions{
-        MemoryChunks:       1024,
-        MaxChunksToPersist: 1024,
+    debug             = kingpin.Flag("debug", "Enable debug mode. More verbose than --verbose").Default("false").Bool()
+    verbose           = kingpin.Flag("verbose", "Enable info-only mode").Short('v').Default("false").Bool()
+    nopromcfg         = kingpin.Flag("nopromcfg", "Disable the generation of the prometheus cfg file (prometheus.yml)").Bool()
+    dir               = kingpin.Flag("dir", "Input directory.").Short('d').OverrideDefaultFromEnvar("INPUT_DIRECTORY").Default(".").String()
+    memoryChunk       = kingpin.Flag("memoryChunk", "Maximum number of chunks in memory").Default("100000000").Int()
+    maxChunkToPersist = kingpin.Flag("maxChunkToPersist", "Maximum number of chunks waiting, in memory, to be written on the disk").Default("100000000").Int()
+    framereader       = make(<-chan cm.Frame)
+    Version           = "unversioned"
+    cfgMemoryStorage  = local.MemorySeriesStorageOptions{
+        MemoryChunks:       0,
+        MaxChunksToPersist: 0,
         //PersistenceStoragePath:
         //PersistenceRetentionPeriod:
         //CheckpointInterval: time.Minute*30,
@@ -59,44 +59,67 @@ func osfile2fname(fss []os.FileInfo, dir string) []string {
     return out
 }
 
-func generateFramereader() {
+func generateFramereader() int {
+    defer func() {
+        if e := recover(); e != nil {
+            logrus.Errorf("Frame reader generation failed! MESSAGE: %v", e)
+        }
+    }()
+
+    logrus.Infoln("Preliminary file read started...")
+    count := 0
     // 1. Check for every file that is GZip or csave format and create the filemap
     files, err := ioutil.ReadDir(*dir)
     if err != nil {
-        logrus.Fatalf("generateFilemap: %v", err)
+        panic(err)
     }
     readers := make([]io.Reader, 0)
 
     fnames := osfile2fname(files, *dir)
-    sort.Sort(cm.ByNumber(fnames))
+    sort.Sort(sort.Reverse(cm.ByNumber(fnames)))
 
     logrus.Debugf("fnames: %v", fnames)
 
-    for _, filepath := range fnames {
-        logrus.Debugf("filepath: %v", filepath)
-        ftype, err := filetype.MatchFile(filepath)
+    for _, path := range fnames {
+        logrus.Debugf("filepath: %v", path)
+        ftype, err := filetype.MatchFile(path)
         if err != nil {
             logrus.Debugf("err %v", err)
         }
         if ftype.MIME.Value == "application/replay" {
-            f, _ := os.Open(filepath)
+            f, _ := os.Open(path)
+
+            count += len(cm.ReadAll(f).Data)
+            f.Seek(0, 0)
+
             readers = append(readers, f)
         }
 
         if ftype.MIME.Value == "application/gzip" {
-            f, _ := os.Open(filepath)
-            logrus.Debugf("reading header: %v", filepath)
-            gz, _ := gzip.NewReader(f)
-            header, err := cm.ReadFrameHeader(gz)
-            if err == nil && cm.CheckVersion(header) {
-                f.Seek(0, io.SeekStart)
-                gz, _ = gzip.NewReader(f)
-                readers = append(readers, gz)
-            }
+            filename := filepath.Base(path)
+            ungzip(path, "./tmp/"+trimSuffix(filename, ".gz"))
+
+            f, _ := os.Open("./tmp/" + trimSuffix(filename, ".gz"))
+
+            count += len(cm.ReadAll(f).Data)
+            f.Seek(0, 0)
+
+            readers = append(readers, f)
         }
     }
 
     framereader = cm.NewMultiReader(readers)
+    return count
 }
+
+func trimSuffix(s, suffix string) string {
+    if strings.HasSuffix(s, suffix) {
+        s = s[:len(s)-len(suffix)]
+    }
+    return s
+}
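
A side note on the new trimSuffix helper: the standard library already provides the same behavior, so an equivalent sketch using strings.TrimSuffix (not part of the commit) would be:

    package main

    import (
        "fmt"
        "strings"
    )

    func main() {
        // strings.TrimSuffix removes the suffix only when it is present,
        // exactly like the trimSuffix helper above.
        fmt.Println(strings.TrimSuffix("frames-0001.gz", ".gz")) // frames-0001
        fmt.Println(strings.TrimSuffix("frames-0001", ".gz"))    // frames-0001
    }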

 func updateURLTimestamp(timestamp int64, name string, url string, body io.Reader) io.Reader {
@@ -137,7 +163,7 @@ func updateURLTimestamp(timestamp int64, name string, url string, body io.Reader) io.Reader {
 
     enc.Encode(&metrics)
 
-    count += 1
+    count++
     }
 
     logrus.Printf("%d metrics unmarshalled for %s", count, url)
@@ -147,7 +173,40 @@ func updateURLTimestamp(timestamp int64, name string, url string, body io.Reader) io.Reader {
 
     return pr
 }
 
+func ungzip(source, target string) {
+    defer func() {
+        if e := recover(); e != nil {
+            logrus.Errorf("Errors during decompression of %v", source)
+        }
+    }()
+
+    reader, err := os.Open(source)
+    if err != nil {
+        panic(err)
+    }
+    defer reader.Close()
+
+    archive, err := gzip.NewReader(reader)
+    if err != nil {
+        panic(err)
+    }
+    defer archive.Close()
+
+    target = filepath.Join(target, archive.Name)
+    writer, err := os.Create(target)
+    if err != nil {
+        panic(err)
+    }
+    defer writer.Close()
+
+    _, err = io.Copy(writer, archive)
+    if err != nil {
+        panic(err)
+    }
+}
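
ungzip uses the panic/recover idiom for its error path and joins the target with the optional file name stored in the gzip header (archive.Name); that name is typically empty, in which case target itself becomes the output path. A self-contained sketch of the same flow that returns errors instead of panicking, with hypothetical paths:

    package main

    import (
        "compress/gzip"
        "io"
        "os"
        "path/filepath"
    )

    // decompress writes the gzip payload of source into target, mirroring
    // ungzip above but surfacing errors to the caller.
    func decompress(source, target string) error {
        reader, err := os.Open(source)
        if err != nil {
            return err
        }
        defer reader.Close()

        archive, err := gzip.NewReader(reader)
        if err != nil {
            return err
        }
        defer archive.Close()

        // archive.Name is the optional name from the gzip header; when it
        // is empty, filepath.Join leaves target effectively unchanged.
        writer, err := os.Create(filepath.Join(target, archive.Name))
        if err != nil {
            return err
        }
        defer writer.Close()

        _, err = io.Copy(writer, archive)
        return err
    }

    func main() {
        // Hypothetical paths, matching the "./tmp" convention used in main().
        _ = decompress("./frames-0001.gz", "./tmp/frames-0001")
    }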

 func main() {
+
     kingpin.Version(Version)
 
     kingpin.Flag("storage.path", "Directory path to create and fill the data store under.").Default("data").StringVar(&cfgMemoryStorage.PersistenceStoragePath)
@@ -163,7 +222,20 @@ func main() {
         flag.Set("log.level", "debug")
     }
 
+    if !*verbose {
+        logrus.SetLevel(logrus.ErrorLevel)
+        flag.Set("log.level", "error")
+    }
+
+    // create temp directory to store ungzipped files
+    os.Mkdir("./tmp", 0700)
+    defer os.RemoveAll("./tmp")
+
     logrus.Infoln("Prefilling into", cfgMemoryStorage.PersistenceStoragePath)
 
+    cfgMemoryStorage.MaxChunksToPersist = *maxChunkToPersist
+    cfgMemoryStorage.MemoryChunks = *memoryChunk
+
     localStorage := local.NewMemorySeriesStorage(&cfgMemoryStorage)
 
     sampleAppender := localStorage
@@ -181,19 +253,24 @@ func main() {
 
     filetype.AddMatcher(replayType, replayMatcher)
 
-    generateFramereader()
+    count := generateFramereader()
 
     logrus.Debugf("frameReader %+v", framereader)
 
     sout := bufio.NewWriter(os.Stdout)
     defer sout.Flush()
 
     r := &http.Request{}
 
+    bar := pb.ProgressBarTemplate(`{{ red "Frames processed:" }} {{bar . | green}} {{rtime . "ETA %s" | blue }} {{percent . }}`).Start(count)
+    defer bar.Finish()
+
     for frame := range framereader {
+        bar.Increment()
         response, err := http.ReadResponse(bufio.NewReader(filebuffer.New(frame.Data)), r)
         if err != nil {
-            logrus.Error(err)
-            return
+            logrus.Errorf("Errors occurred while reading frame %s, MESSAGE: %v", frame.NameString(), err)
+            continue
         }
         bytesReader := updateURLTimestamp(frame.Header.Timestamp, frame.NameString(), frame.URIString(), response.Body)
 
@@ -214,7 +291,7 @@ func main() {
     logrus.Infoln("Ingested", len(decSamples), "metrics")
 
     for sampleAppender.NeedsThrottling() {
-        logrus.Debugln("Waiting 100ms for appender to be ready for more data")
+        logrus.Debugln("THROTTLING: Waiting 100ms for appender to be ready for more data")
         time.Sleep(time.Millisecond * 100)
     }
 
@@ -224,25 +301,26 @@
     )
 
     for _, s := range model.Samples(decSamples) {
+
         if err := sampleAppender.Append(s); err != nil {
             switch err {
             case local.ErrOutOfOrderSample:
                 numOutOfOrder++
                 logrus.WithFields(logrus.Fields{
                     "sample": s,
                     "error":  err,
-                }).Info("Sample discarded")
+                }).Error("Sample discarded")
             case local.ErrDuplicateSampleForTimestamp:
                 numDuplicates++
                 logrus.WithFields(logrus.Fields{
                     "sample": s,
                     "error":  err,
-                }).Info("Sample discarded")
+                }).Error("Sample discarded")
             default:
                 logrus.WithFields(logrus.Fields{
                     "sample": s,
                     "error":  err,
-                }).Info("Sample discarded")
+                }).Error("Sample discarded")
             }
         }
     }
17 changes: 13 additions & 4 deletions model/reader.go
@@ -30,7 +30,6 @@ func NewMultiReader(r []io.Reader) <-chan Frame {
         }
         logrus.Infof("Frames ended")
     }()
-
     return chframe
 }

@@ -87,11 +86,19 @@ func ReadFrameHeader(r io.Reader) (*FrameHeader, error) {
 // ReadFrame reads the next frame from the Reader or returns an error in
 // case it cannot interpret the Frame
 func ReadFrame(r io.Reader) (frame *Frame, err error) {
+    defer func() {
+        if e := recover(); e != nil {
+            if e.(error).Error() != "EOF" {
+                logrus.Errorf("Errors occurred while reading frame %s, MESSAGE: %v", frame.NameString(), e)
+            }
+        }
+    }()
+
     frame = NewEmptyFrame()
     frame.Header, err = ReadFrameHeader(r)
 
     if err != nil {
-        return
+        panic(err)
     }
 
     // generate the correct framesize for .Data
@@ -100,14 +107,16 @@ func ReadFrame(r io.Reader) (frame *Frame, err error) {
     // read the frame Data
     data, err := readNextBytes(r, int64(len(frame.Data)))
     if err != nil {
-        return
+        panic(err)
     }
 
     buffer := bytes.NewBuffer(data)
 
     err = binary.Read(buffer, binary.BigEndian, frame.Data)
     if err != nil {
-        return
+        panic(err)
     }
 
     logrus.Debugf("ReadFrame: frame.Data %d", frame.Data)
 
     return
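
The change above turns each early return in ReadFrame into a panic that the deferred recover at the top of the function intercepts, so a single handler logs every failure point; because err is a named return assigned before each panic, the caller still receives the error. A minimal standalone illustration of that control flow, with hypothetical names:

    package main

    import "fmt"

    // parse mimics ReadFrame's structure: each step panics on error, and a
    // single deferred recover converts the panic into a log line.
    func parse(ok bool) (result string, err error) {
        defer func() {
            if e := recover(); e != nil {
                fmt.Println("recovered:", e)
            }
        }()

        if !ok {
            err = fmt.Errorf("step failed")
            panic(err) // err was assigned first, so the caller still sees it
        }
        return "done", nil
    }

    func main() {
        result, err := parse(false)
        fmt.Printf("%q %v\n", result, err) // "" step failed
    }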
2 changes: 1 addition & 1 deletion model/sort.go
@@ -8,7 +8,7 @@ import (
 var reNumber *regexp.Regexp
 
 func init() {
-    reNumber, _ = regexp.Compile("([0-9]*)$")
+    reNumber, _ = regexp.Compile("[0-9]+")
 }
 
 // ByNumber helper struct to sort by last number all the log files
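
The practical difference in sort.go: the old pattern is anchored to the end of the string and may match the empty string, so a name whose digits are not the final characters yields nothing; the new pattern picks out the first run of digits anywhere in the name. A small sketch with a hypothetical file name:

    package main

    import (
        "fmt"
        "regexp"
    )

    func main() {
        oldRe := regexp.MustCompile("([0-9]*)$")
        newRe := regexp.MustCompile("[0-9]+")

        // Hypothetical rotated-log name: the digits are not at the very end.
        name := "frames.log.12.gz"

        fmt.Printf("old: %q\n", oldRe.FindString(name)) // "" (anchored at the end, matches empty)
        fmt.Printf("new: %q\n", newRe.FindString(name)) // "12" (first run of digits anywhere)
    }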