working commit
This commit is contained in:
@@ -19,6 +19,6 @@ func NewHandler(params *HandlerParams) (*Handler, error) {
|
||||
hand := &Handler{
|
||||
oper: params.Operator,
|
||||
}
|
||||
hand.logg = logger.NewLogger("handler")
|
||||
hand.logg = logger.NewLoggerWithSubject("handler")
|
||||
return hand, err
|
||||
}
|
||||
|
||||
+29
-6
@@ -11,25 +11,44 @@ import (
|
||||
|
||||
var (
|
||||
mtx sync.Mutex
|
||||
output io.Writer = os.Stderr
|
||||
output io.WriteCloser = os.Stderr
|
||||
)
|
||||
|
||||
type Logger struct {
|
||||
subject string
|
||||
writer io.WriteCloser
|
||||
mtx *sync.Mutex
|
||||
}
|
||||
|
||||
func NewLogger(subj string) *Logger {
|
||||
func NewLoggerWithSubject(subj string) *Logger {
|
||||
return &Logger{
|
||||
subject: subj,
|
||||
writer: output,
|
||||
mtx: &mtx,
|
||||
}
|
||||
}
|
||||
|
||||
func SetWriter(newOut io.Writer) {
|
||||
func NewLogger() *Logger {
|
||||
return &Logger{
|
||||
writer: output,
|
||||
mtx: &mtx,
|
||||
}
|
||||
}
|
||||
|
||||
func SetWriter(newOut io.WriteCloser) {
|
||||
mtx.Lock()
|
||||
output = newOut
|
||||
mtx.Unlock()
|
||||
}
|
||||
|
||||
func (logg *Logger) SetWriter(newOut io.WriteCloser) {
|
||||
mtx.Lock()
|
||||
logg.writer = newOut
|
||||
var newMtx sync.Mutex
|
||||
logg.mtx = &newMtx
|
||||
mtx.Unlock()
|
||||
}
|
||||
|
||||
func (logg *Logger) Debugf(message string, args ...any) {
|
||||
logg.printf("debug", message, args...)
|
||||
}
|
||||
@@ -49,10 +68,14 @@ func (logg *Logger) Errorf(message string, args ...any) {
|
||||
func (logg *Logger) printf(level, message string, args ...any) {
|
||||
timestamp := time.Now().Format(time.RFC3339)
|
||||
buffer := bytes.NewBuffer([]byte{})
|
||||
fmt.Fprintf(buffer, "%s %s.%s: ", timestamp, logg.subject, level)
|
||||
if logg.subject != "" {
|
||||
fmt.Fprintf(buffer, "%s %s.%s: ", timestamp, logg.subject, level)
|
||||
} else {
|
||||
fmt.Fprintf(buffer, "%s %s: ", timestamp, level)
|
||||
}
|
||||
fmt.Fprintf(buffer, message, args...)
|
||||
fmt.Fprintf(buffer, "\n")
|
||||
mtx.Lock()
|
||||
logg.mtx.Lock()
|
||||
fmt.Fprint(output, buffer.String())
|
||||
mtx.Unlock()
|
||||
logg.mtx.Unlock()
|
||||
}
|
||||
|
||||
@@ -19,7 +19,7 @@ type Database struct {
|
||||
func NewDatabase(datapath string) *Database {
|
||||
return &Database{
|
||||
datapath: datapath,
|
||||
log: logger.NewLogger("maindb"),
|
||||
log: logger.NewLoggerWithSubject("maindb"),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
+13
-7
@@ -100,6 +100,7 @@ type PatchUploadResult struct {
|
||||
Range string
|
||||
}
|
||||
|
||||
// TODO: partial uploading by range?
|
||||
func (oper *Operator) PatchUpload(ctx context.Context, params *PatchUploadParams) (*PatchUploadResult, int, error) {
|
||||
var err error
|
||||
res := &PatchUploadResult{}
|
||||
@@ -129,7 +130,9 @@ func (oper *Operator) PatchUpload(ctx context.Context, params *PatchUploadParams
|
||||
}
|
||||
var contentLength int64
|
||||
|
||||
// Unfortunately, podman & github.com/containers/image don't set Content-length header for docker transport
|
||||
// Unfortunately, podman and used github.com/containers/image don't sent
|
||||
// Content-length header for docker transport
|
||||
|
||||
if params.ContentLength != "" {
|
||||
contentLength, err = strconv.ParseInt(params.ContentLength, 10, 64)
|
||||
if err != nil {
|
||||
@@ -265,6 +268,11 @@ type DeleteBlobParams struct {
|
||||
}
|
||||
type DeleteBlobResult struct{}
|
||||
|
||||
// Removing an individual layer is very probably to compromise data integrity.
|
||||
// - If the data layers are to be reused, this is like shooting yourself in the foot.
|
||||
// - It also prevents the manifest from being issued, as one
|
||||
// of the layers is missing.
|
||||
|
||||
func (oper *Operator) DeleteBlob(ctx context.Context, params *DeleteBlobParams) (*DeleteBlobResult, int, error) {
|
||||
var err error
|
||||
res := &DeleteBlobResult{}
|
||||
@@ -282,25 +290,23 @@ func (oper *Operator) DeleteBlob(ctx context.Context, params *DeleteBlobParams)
|
||||
if err != nil {
|
||||
return res, http.StatusInternalServerError, err
|
||||
}
|
||||
oper.logg.Debugf("Blob %s:%s descr exists %v", params.Name, params.Digest, descrExists)
|
||||
|
||||
if !descrExists {
|
||||
return res, http.StatusNotFound, err
|
||||
}
|
||||
|
||||
// Deleting blob record
|
||||
oper.logg.Warningf("Deleting blob record %s:%s", params.Name, params.Digest)
|
||||
err = oper.mdb.DeleteBlobByNameDigest(ctx, params.Name, params.Digest)
|
||||
if err != nil {
|
||||
return res, http.StatusInternalServerError, err
|
||||
}
|
||||
// Removing blob file if usage == 0
|
||||
// Removing the blob binary if usage == 0
|
||||
blobUsage, err := oper.mdb.GetBlobUsage(ctx, params.Digest)
|
||||
|
||||
oper.logg.Debugf("Blob %s have usage %d", params.Digest, blobUsage)
|
||||
|
||||
if err != nil {
|
||||
return res, http.StatusInternalServerError, err
|
||||
}
|
||||
if blobUsage == 0 {
|
||||
oper.logg.Warningf("Deleting useless blob binary %s", params.Digest)
|
||||
oper.store.DeleteBlob(params.Digest)
|
||||
}
|
||||
return res, http.StatusOK, err
|
||||
|
||||
@@ -261,6 +261,7 @@ func (oper *Operator) GetManifest(ctx context.Context, params *GetManifestParams
|
||||
|
||||
manifestDescr := descr.Manifest{}
|
||||
var exists bool
|
||||
// TODO: checking layers?
|
||||
if stringLikeSHADigest(params.Reference) {
|
||||
digest := normalizeSHADigest(params.Reference)
|
||||
exists, manifestDescr, err = oper.mdb.GetManifestByDigest(ctx, params.Name, digest)
|
||||
|
||||
@@ -23,6 +23,6 @@ func NewOperator(params *OperatorParams) (*Operator, error) {
|
||||
mdb: params.MainDB,
|
||||
store: params.Store,
|
||||
}
|
||||
oper.logg = logger.NewLogger("operator")
|
||||
oper.logg = logger.NewLoggerWithSubject("operator")
|
||||
return oper, err
|
||||
}
|
||||
|
||||
@@ -30,7 +30,7 @@ type Server struct {
|
||||
func NewServer() (*Server, error) {
|
||||
var err error
|
||||
srv := &Server{}
|
||||
srv.logg = logger.NewLogger("server")
|
||||
srv.logg = logger.NewLoggerWithSubject("server")
|
||||
return srv, err
|
||||
}
|
||||
|
||||
|
||||
@@ -46,7 +46,7 @@ func NewService(params *ServiceParams) (*Service, error) {
|
||||
portnum: params.Portnum,
|
||||
address: params.Address,
|
||||
}
|
||||
svc.logg = logger.NewLogger("service")
|
||||
svc.logg = logger.NewLoggerWithSubject("service")
|
||||
return svc, err
|
||||
}
|
||||
|
||||
|
||||
@@ -29,7 +29,7 @@ func NewStorage(basepath string) *Storage {
|
||||
res := &Storage{
|
||||
basepath: basepath,
|
||||
}
|
||||
res.logg = logger.NewLogger("storage")
|
||||
res.logg = logger.NewLoggerWithSubject("storage")
|
||||
return res
|
||||
}
|
||||
|
||||
|
||||
@@ -69,7 +69,7 @@ func TestImageLife(t *testing.T) {
|
||||
fmt.Printf("=== PushImage ===\n")
|
||||
cli := NewClient()
|
||||
ctx := context.Background()
|
||||
err := cli.PushImage(ctx, "test-oci.img", srvaddr+"/foo/testapp:v123", 1*time.Second)
|
||||
err := cli.PushImage(ctx, "test-oci.img", srvaddr+"/foo/test:123", 1*time.Second)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
|
||||
@@ -1,85 +0,0 @@
|
||||
package test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"mstore/app/server"
|
||||
"mstore/pkg/client"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func xxxTestService(t *testing.T) {
|
||||
var srvport int64 = 10250
|
||||
srvdir := t.TempDir()
|
||||
srvaddr := fmt.Sprintf("127.0.0.1:%d", srvport)
|
||||
|
||||
srv, err := server.NewServer()
|
||||
require.NoError(t, err)
|
||||
{
|
||||
err = srv.Configure()
|
||||
require.NoError(t, err)
|
||||
|
||||
srv.SetDatadir(srvdir)
|
||||
srv.SetLogdir(srvdir)
|
||||
srv.SetRundir(srvdir)
|
||||
srv.SetPort(srvport)
|
||||
|
||||
err = srv.Build()
|
||||
require.NoError(t, err)
|
||||
|
||||
var svcWG sync.WaitGroup
|
||||
errPipe := make(chan error, 5)
|
||||
|
||||
startFunc := func() {
|
||||
err := srv.Service().Run()
|
||||
errPipe <- err
|
||||
svcWG.Done()
|
||||
}
|
||||
|
||||
stopFunc := func() {
|
||||
srv.Service().Stop()
|
||||
svcWG.Wait()
|
||||
err = <-errPipe
|
||||
require.NoError(t, err)
|
||||
}
|
||||
defer stopFunc()
|
||||
|
||||
svcWG.Add(1)
|
||||
go startFunc()
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
{
|
||||
fmt.Printf("=== ServiceHello ===\n")
|
||||
cli := client.NewClient()
|
||||
ctx := context.Background()
|
||||
ctx, _ = context.WithTimeout(ctx, 1*time.Second)
|
||||
|
||||
helloRes, err := cli.ServiceHello(ctx, srvaddr+"/hello")
|
||||
require.NoError(t, err)
|
||||
require.True(t, helloRes)
|
||||
}
|
||||
{
|
||||
tmpdir := t.TempDir()
|
||||
tmpfile := filepath.Join(tmpdir, "foo.bin")
|
||||
|
||||
filedata := make([]byte, 16)
|
||||
err := os.WriteFile(tmpfile, filedata, 0666)
|
||||
require.NoError(t, err)
|
||||
|
||||
fmt.Printf("=== PutFile ===\n")
|
||||
cli := client.NewClient()
|
||||
ctx := context.Background()
|
||||
ctx, _ = context.WithTimeout(ctx, 1*time.Second)
|
||||
|
||||
err = cli.PutFile(ctx, tmpfile, srvaddr+"/foo.bin")
|
||||
require.NoError(t, err)
|
||||
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user