working commit

This commit is contained in:
2026-02-05 14:37:54 +02:00
parent 2dfb4a88b8
commit e81037d75f
10 changed files with 408 additions and 119 deletions
+52
View File
@@ -1,6 +1,9 @@
package handler package handler
import ( import (
"io"
"net/http"
"mstore/app/operator" "mstore/app/operator"
"mstore/app/router" "mstore/app/router"
@@ -125,3 +128,52 @@ func (hand *Handler) PutUpload(rctx *router.Context) {
rctx.SetHeader("Location", res.Location) rctx.SetHeader("Location", res.Location)
rctx.SetStatus(code) rctx.SetStatus(code)
} }
// GET /v2/<name>/blobs/<digest> 200 404
func (hand *Handler) GetBlob(rctx *router.Context) {
name, _ := rctx.GetSubpath("name")
digest, _ := rctx.GetSubpath("digest")
params := &operator.GetBlobParams{
Name: name,
Digest: digest,
}
ctx := rctx.GetContext()
res, code, err := hand.oper.GetBlob(ctx, params)
if err != nil {
hand.logg.Errorf("GetBlob error: %v", err)
}
rctx.SetHeader("Content-Length", res.ContentLength)
rctx.SetHeader("Content-Type", res.ContentType)
rctx.SetHeader("Docker-Content-Digest", res.DockerContentDigest)
rctx.SetStatus(code)
defer res.ReadCloser.Close()
_, err = io.Copy(rctx.Writer, res.ReadCloser)
if err != nil {
hand.logg.Errorf("GetFile error: %v", err)
rctx.SetStatus(http.StatusInternalServerError)
return
}
}
// DELETE /v2/<name>/blobs/<digest> 202 404/405
func (hand *Handler) DeleteBlob(rctx *router.Context) {
name, _ := rctx.GetSubpath("name")
digest, _ := rctx.GetSubpath("digest")
params := &operator.DeleteBlobParams{
Name: name,
Digest: digest,
}
ctx := rctx.GetContext()
_, code, err := hand.oper.DeleteBlob(ctx, params)
if err != nil {
hand.logg.Errorf("DeleteBlob error: %v", err)
}
rctx.SetStatus(code)
}
+1 -1
View File
@@ -89,7 +89,7 @@ func (hand *Handler) ManifestExists(rctx *router.Context) {
func (hand *Handler) PutManifest(rctx *router.Context) { func (hand *Handler) PutManifest(rctx *router.Context) {
hand.DumpHeaders("PutManifest headers", rctx) //hand.DumpHeaders("PutManifest headers", rctx)
contentType := rctx.GetHeader("Content-Type") contentType := rctx.GetHeader("Content-Type")
contentLength := rctx.GetHeader("Content-Length") contentLength := rctx.GetHeader("Content-Length")
+29 -3
View File
@@ -37,6 +37,23 @@ func (db *Database) GetBlobByDigest(ctx context.Context, digest string) (bool, d
return exists, res, err return exists, res, err
} }
func (db *Database) GetBlobByNameDigest(ctx context.Context, name, digest string) (bool, descr.Blob, error) {
var err error
blobs := make([]descr.Blob, 0)
res := descr.Blob{}
exists := false
request := `SELECT * FROM blobs WHERE name = $1 AND digest = $1 LIMIT 1`
err = db.db.Select(&blobs, request, name, digest)
if err != nil {
return exists, res, err
}
if len(blobs) > 0 {
res = blobs[0]
exists = true
}
return exists, res, err
}
func (db *Database) ListAllBlobs(ctx context.Context) ([]descr.Blob, error) { func (db *Database) ListAllBlobs(ctx context.Context) ([]descr.Blob, error) {
var err error var err error
blobs := make([]descr.Blob, 0) blobs := make([]descr.Blob, 0)
@@ -79,8 +96,7 @@ func (db *Database) GetBlobUsage(ctx context.Context, digest string) (int64, err
var err error var err error
var usage int64 var usage int64
count := make([]int64, 0) count := make([]int64, 0)
request := ` request := `SELECT count(id) AS count FROM blobs WHERE digest = $1`
SELECT count(id) AS count FROM blobs WHERE digest = $1`
err = db.db.Select(&count, request, digest) err = db.db.Select(&count, request, digest)
if err != nil { if err != nil {
return usage, err return usage, err
@@ -89,7 +105,7 @@ func (db *Database) GetBlobUsage(ctx context.Context, digest string) (int64, err
return usage, err return usage, err
} }
func (db *Database) DeleteBlobByDigest(ctx context.Context, digest string) error { func (db *Database) xxxxDeleteBlobByDigest(ctx context.Context, digest string) error {
var err error var err error
request := `DELETE FROM blobs WHERE digest = $1;` request := `DELETE FROM blobs WHERE digest = $1;`
_, err = db.db.Exec(request, digest) _, err = db.db.Exec(request, digest)
@@ -98,3 +114,13 @@ func (db *Database) DeleteBlobByDigest(ctx context.Context, digest string) error
} }
return err return err
} }
func (db *Database) DeleteBlobByNameDigest(ctx context.Context, name, digest string) error {
var err error
request := `DELETE FROM blobs WHERE name = $1 AND digest = $2;`
_, err = db.db.Exec(request, name, digest)
if err != nil {
return err
}
return err
}
+88
View File
@@ -24,6 +24,7 @@ func (oper *Operator) BlobExists(ctx context.Context, params *BlobExistsParams)
var err error var err error
res := &BlobExistsResult{} res := &BlobExistsResult{}
oper.logg.Debugf("Call BlobExists") oper.logg.Debugf("Call BlobExists")
if params.Digest == "" { if params.Digest == "" {
err = fmt.Errorf("Empty reference") err = fmt.Errorf("Empty reference")
return res, http.StatusBadRequest, err return res, http.StatusBadRequest, err
@@ -224,3 +225,90 @@ func (oper *Operator) PutUpload(ctx context.Context, params *PutUploadParams) (*
res.Location = fmt.Sprintf("/v2/%s/blobs/%s", params.Name, params.Digest) res.Location = fmt.Sprintf("/v2/%s/blobs/%s", params.Name, params.Digest)
return res, http.StatusCreated, err return res, http.StatusCreated, err
} }
type GetBlobParams struct {
Name string
Digest string
}
type GetBlobResult struct {
ContentLength string
ContentType string
DockerContentDigest string
ReadCloser io.ReadCloser
}
func (oper *Operator) GetBlob(ctx context.Context, params *GetBlobParams) (*GetBlobResult, int, error) {
var err error
res := &GetBlobResult{}
oper.logg.Debugf("Calling GetBlob %s:%s", params.Name, params.Digest)
if params.Name == "" {
err = fmt.Errorf("Empty name")
return res, http.StatusBadRequest, err
}
if params.Digest == "" {
err = fmt.Errorf("Empty digest")
return res, http.StatusBadRequest, err
}
blobExists, blobSize, err := oper.store.BlobExists(params.Digest)
if err != nil {
return res, http.StatusInternalServerError, err
}
if !blobExists {
oper.logg.Debugf("Blob %s:%s not exists", params.Name, params.Digest)
return res, http.StatusNotFound, err
}
_, readCloser, err := oper.store.BlobReader(params.Digest)
if err != nil {
return res, http.StatusInternalServerError, err
}
res.ContentLength = strconv.FormatInt(blobSize, 10)
res.DockerContentDigest = params.Digest
res.ReadCloser = readCloser
return res, http.StatusOK, err
}
type DeleteBlobParams struct {
Name string
Digest string
}
type DeleteBlobResult struct{}
func (oper *Operator) DeleteBlob(ctx context.Context, params *DeleteBlobParams) (*DeleteBlobResult, int, error) {
var err error
res := &DeleteBlobResult{}
oper.logg.Debugf("DeleteBlob")
if params.Digest == "" {
err = fmt.Errorf("Empty digest")
return res, http.StatusBadRequest, err
}
if params.Name == "" {
err = fmt.Errorf("Empty name")
return res, http.StatusBadRequest, err
}
// Check namespace record
exists, _, err := oper.mdb.GetBlobByNameDigest(ctx, params.Name, params.Digest)
if err != nil {
return res, http.StatusInternalServerError, err
}
if !exists {
return res, http.StatusNotFound, err
}
err = oper.mdb.DeleteBlobByNameDigest(ctx, params.Name, params.Digest)
if err != nil {
return res, http.StatusInternalServerError, err
}
// Removing blob file if usage == 0
blobUsage, err := oper.mdb.GetBlobUsage(ctx, params.Digest)
if err != nil {
return res, http.StatusInternalServerError, err
}
if blobUsage == 0 {
oper.store.DeleteBlob(params.Digest)
}
return res, http.StatusOK, err
}
+70 -89
View File
@@ -90,8 +90,7 @@ type PutManifestResult struct {
Location string Location string
} }
// TODO: control size 413 Payload Too Large // TODO: lock for the name-reference or simular?
func (oper *Operator) PutManifest(ctx context.Context, params *PutManifestParams) (*PutManifestResult, int, error) { func (oper *Operator) PutManifest(ctx context.Context, params *PutManifestParams) (*PutManifestResult, int, error) {
var err error var err error
res := &PutManifestResult{} res := &PutManifestResult{}
@@ -128,6 +127,7 @@ func (oper *Operator) PutManifest(ctx context.Context, params *PutManifestParams
if err != nil { if err != nil {
return res, http.StatusInternalServerError, err return res, http.StatusInternalServerError, err
} }
incomingManifestBytes := buffer.Bytes() incomingManifestBytes := buffer.Bytes()
if int64(len(incomingManifestBytes)) != contentLength { if int64(len(incomingManifestBytes)) != contentLength {
err = fmt.Errorf("Mismatch Content-Length and received manifest size: %d vs %d", err = fmt.Errorf("Mismatch Content-Length and received manifest size: %d vs %d",
@@ -136,7 +136,12 @@ func (oper *Operator) PutManifest(ctx context.Context, params *PutManifestParams
return res, code, err return res, code, err
} }
oper.logg.Debugf("Manifest data: [%s]", string(incomingManifestBytes)) if len(incomingManifestBytes) > (4 * 1024 * 1024) {
err = fmt.Errorf("Payload more 4M: %d bytes", len(incomingManifestBytes))
code := http.StatusRequestEntityTooLarge
return res, code, err
}
//oper.logg.Debugf("Manifest data: [%s]", string(incomingManifestBytes))
incomingManifest, err := auxoci.ParseOCIManifest(incomingManifestBytes) incomingManifest, err := auxoci.ParseOCIManifest(incomingManifestBytes)
if err != nil { if err != nil {
@@ -147,100 +152,76 @@ func (oper *Operator) PutManifest(ctx context.Context, params *PutManifestParams
incomingManifest.MediaType = params.ContentType incomingManifest.MediaType = params.ContentType
} }
manifestExists, _, err := oper.mdb.GetManifestByReference(ctx, params.Name, params.Reference) manifestExists, existengManifestDescr, err := oper.mdb.GetManifestByReference(ctx, params.Name, params.Reference)
if err != nil { if err != nil {
return res, http.StatusInternalServerError, err return res, http.StatusInternalServerError, err
} }
if !manifestExists {
name := params.Name name := params.Name
reference := params.Reference reference := params.Reference
manifestDescr, layerDescrs, err := descrsFromManifest(name, reference, incomingManifest, incomingManifestBytes)
// Check layers incomingManifestDescr, incomingLayerDescrs, err := descrsFromManifest(name, reference, incomingManifest, incomingManifestBytes)
var blobError error // Always check layer files for availability
for _, layer := range layerDescrs { var blobError error
layerExists, _, err := oper.store.BlobFileExists(layer.Digest) for _, layer := range incomingLayerDescrs {
if err != nil { layerExists, _, err := oper.store.BlobExists(layer.Digest)
return res, http.StatusInternalServerError, err
}
if !layerExists {
err := fmt.Errorf("Layer %s not found", layer.Digest)
blobError = errors.Join(blobError, err)
}
}
if blobError != nil {
return res, http.StatusInternalServerError, blobError
}
// Store manifest and layesrs data
err = oper.mdb.InsertManifestWithLayers(ctx, &manifestDescr, layerDescrs)
if err != nil { if err != nil {
return res, http.StatusInternalServerError, err return res, http.StatusInternalServerError, err
} }
if !layerExists {
err := fmt.Errorf("Layer %s not found.", layer.Digest)
blobError = errors.Join(blobError, err)
}
} }
if blobError != nil {
return res, http.StatusInternalServerError, blobError
}
if !manifestExists {
// Store manifest and layesrs data
err = oper.mdb.InsertManifestWithLayers(ctx, &incomingManifestDescr, incomingLayerDescrs)
if err != nil {
return res, http.StatusInternalServerError, err
}
} else {
/* TODO: only update descr
if bytes.Equal(existingManifestBytes, incomingManifestBytes) {
return res, http.StatusCreated, err
}
*/
/* existingManifestBytes := []byte(existengManifestDescr.Payload)
exists, existingManifestDescr, err := lg.mdb.GetManifestByReference(params.Name, params.Reference) existingManifest, err := auxoci.ParseOCIManifest(existingManifestBytes)
if err != nil { if err != nil {
return res, http.StatusInternalServerError, err return res, http.StatusInternalServerError, err
} }
addedBlobDescrs, uselessBlobDescrs, err := layersDiff(name, reference,
if exists { existingManifest, incomingManifest, incomingManifestBytes)
existingManifestBytes := []byte(existingManifestDescr.Payload) if err != nil {
// Exist if incoming and existing manyfest is equal return res, http.StatusInternalServerError, err
if bytes.Equal(existingManifestBytes, incomingManifestBytes) { }
return res, http.StatusCreated, err // Starting manifest and blobs transaction
} err = oper.mdb.UpdateManifestWithBlobs(ctx, &incomingManifestDescr, addedBlobDescrs, uselessBlobDescrs)
name := params.Name if err != nil {
reference := params.Reference return res, http.StatusInternalServerError, err
manifestDescr, newBlobDescrs, delBlobDescrs, err := blobsDiff(name, reference, existingManifestBytes, incomingManifestBytes) }
if err != nil { for _, blob := range uselessBlobDescrs {
return res, http.StatusInternalServerError, err exists, _, err := oper.store.BlobExists(blob.Digest)
} if err != nil {
err = lg.maindb.UpdateManifest(ctx, &manifestDescr, newBlobDescrs, delBlobDescrs) return res, http.StatusInternalServerError, err
if err != nil { }
return res, http.StatusInternalServerError, err blobUsage, err := oper.mdb.GetBlobUsage(ctx, blob.Digest)
} if err != nil {
// Clean blobs return res, http.StatusInternalServerError, err
for _, blob := range delBlobDescrs { }
exists, _, err = lg.st.BlobFileExists(blob.Digest) if exists && blobUsage == 0 {
if err != nil { oper.logg.Debugf("Delete file %s:%s blob %s", name, reference, blob.Digest)
return res, http.StatusInternalServerError, err err = oper.store.DeleteBlob(blob.Digest)
} if err != nil {
blobUsage, err := lg.maindb.GetBlobUsage(blob.Digest) return res, http.StatusInternalServerError, err
if err != nil { }
return res, http.StatusInternalServerError, err }
} }
if exists && blobUsage == 0 { }
lg.log.Debugf("Delete file %s:%s blob %s", params.Name, params.Reference, blob.Digest)
err = lg.st.DeleteBlobFile(blob.Digest)
if err != nil {
return res, http.StatusInternalServerError, err
}
}
}
} else {
name := params.Name
reference := params.Reference
manifestDescr, configDescr, layerDescrs, err := descrsFromManifestBytes(name, reference, incomingManifestBytes)
// Check layer blobs
var blobError error
for _, layer := range layerDescrs {
exists, _, err = lg.st.BlobFileExists(layer.Digest)
if !exists {
err := fmt.Errorf("Blob %s not exists", layer.Digest)
blobError = errors.Join(blobError, err)
}
}
if blobError != nil {
return res, http.StatusInternalServerError, blobError
}
// Store manifest and blobs data
err = lg.maindb.InsertManifest(ctx, &manifestDescr, &configDescr, layerDescrs)
if err != nil {
return res, http.StatusInternalServerError, err
}
}
*/
res.Location = fmt.Sprintf(`/v2/%s/manifests/%s`, params.Name, params.Reference) res.Location = fmt.Sprintf(`/v2/%s/manifests/%s`, params.Name, params.Reference)
return res, http.StatusCreated, err return res, http.StatusCreated, err
+65
View File
@@ -63,3 +63,68 @@ func descrsFromManifest(name, reference string, manifest *ocispec.Manifest, rawM
} }
return manifestDescr, blobDescrs, err return manifestDescr, blobDescrs, err
} }
func layersDiff(name, reference string, existingManifest, incomingManifest *ocispec.Manifest, rawManifest []byte) ([]descr.Blob, []descr.Blob, error) {
var err error
newBlobDescrs := make([]descr.Blob, 0)
delBlobDescrs := make([]descr.Blob, 0)
// Calculating old layers
delLayers := make([]ocispec.Descriptor, 0)
for _, oldLayer := range existingManifest.Layers {
uniq := true
for _, newLayer := range incomingManifest.Layers {
if oldLayer.Digest == newLayer.Digest {
uniq = false
}
}
if uniq {
delLayers = append(delLayers, oldLayer)
}
}
// Calculating new layers
newLayers := make([]ocispec.Descriptor, 0)
for _, newLayer := range incomingManifest.Layers {
uniq := true
for _, oldLayer := range existingManifest.Layers {
if oldLayer.Digest == newLayer.Digest {
uniq = false
}
}
if uniq {
newLayers = append(newLayers, newLayer)
}
}
if incomingManifest.Config.Digest != existingManifest.Config.Digest {
delLayers = append(delLayers, existingManifest.Config)
newLayers = append(newLayers, incomingManifest.Config)
}
// Converting to new blob
timestamp := auxtool.TimeNow()
for _, layer := range newLayers {
blobDescr := descr.Blob{
ID: auxuuid.NewUUID(),
Name: name,
Reference: reference,
MediaType: layer.MediaType,
Digest: string(layer.Digest),
Size: layer.Size,
CreatedAt: timestamp,
UpdatedAt: timestamp,
}
newBlobDescrs = append(newBlobDescrs, blobDescr)
}
// Converting to old blobs
for _, layer := range delLayers {
blobDescr := descr.Blob{
ID: auxuuid.NewUUID(),
Name: name,
Reference: reference,
MediaType: layer.MediaType,
Digest: string(layer.Digest),
Size: layer.Size,
}
delBlobDescrs = append(delBlobDescrs, blobDescr)
}
return newBlobDescrs, delBlobDescrs, err
}
+38
View File
@@ -213,6 +213,44 @@ func (svc *Service) Build() error {
svc.rout.Put(`/v2/{name}/uploads/{reference}`, svc.hand.PutUpload) svc.rout.Put(`/v2/{name}/uploads/{reference}`, svc.hand.PutUpload)
// Pulling blobs
//
// To pull a blob, perform a GET request to a URL in the following form:
//
// /v2/<name>/blobs/<digest> end-2
//
// <name> is the namespace of the repository, and <digest> is the blob's digest.
//
// A GET request to an existing blob URL MUST provide the expected blob,
// with a response code that MUST be 200 OK.
// A successful response SHOULD contain the digest of the uploaded blob in the header Docker-Content-Digest.
// If present, the value of this header MUST be a digest matching that of the response body.
// Most clients MAY ignore the value, but if it is used, the client MUST verify the value
// matches the returned response body. Clients SHOULD verify that the response body
// matches the requested digest.
//
// If the blob is not found in the repository, the response code MUST be 404 Not Found.
//
// A registry SHOULD support the Range request header in accordance with RFC 9110.
svc.rout.Get(`/v2/{name}/blobs/{digest}`, svc.hand.GetBlob)
// Deleting Blobs
//
// To delete a blob, perform a DELETE request to a path in the following format:
//
// /v2/<name>/blobs/<digest>
//
// <name> is the namespace of the repository, and <digest> is the digest
// of the blob to be deleted.
//
// Upon success, the registry MUST respond with code 202 Accepted.
// If the blob is not found, a 404 Not Found code MUST be returned.
// If blob deletion is disabled, the registry MUST respond with either
// a 400 Bad Request or a 405 Method Not Allowed.
svc.rout.Delete(`/v2/{name}/blobs/{digest}`, svc.hand.DeleteBlob)
svc.rout.NotFound(svc.hand.NotFound) svc.rout.NotFound(svc.hand.NotFound)
selector := svc.rout.Selector() selector := svc.rout.Selector()
+39 -1
View File
@@ -1,6 +1,7 @@
package storage package storage
import ( import (
"bytes"
"crypto/sha256" "crypto/sha256"
"encoding/hex" "encoding/hex"
"errors" "errors"
@@ -309,7 +310,7 @@ func (store *Storage) WriteBlob(digest string, source io.Reader) (int64, string,
return recsize, recsum, err return recsize, recsum, err
} }
func (st *Storage) BlobFileExists(digest string) (bool, int64, error) { func (st *Storage) BlobExists(digest string) (bool, int64, error) {
var err error var err error
var fileSize int64 var fileSize int64
@@ -326,3 +327,40 @@ func (st *Storage) BlobFileExists(digest string) (bool, int64, error) {
fileSize = fileStat.Size() fileSize = fileStat.Size()
return true, fileSize, err return true, fileSize, err
} }
func (store *Storage) BlobReader(digest string) (int64, io.ReadCloser, error) {
var err error
var filesize int64
blobpath := store.makeBlobpath(digest)
emptyReadCloser := io.NopCloser(bytes.NewReader(nil))
file, err := os.OpenFile(blobpath, os.O_RDONLY, 0)
if err != nil {
return filesize, emptyReadCloser, err
}
defer func() {
if err != nil {
file.Close()
}
}()
filestat, err := file.Stat()
if err != nil {
return filesize, emptyReadCloser, err
}
filesize = filestat.Size()
return filesize, file, err
}
func (store *Storage) DeleteBlob(digest string) error {
var err error
blobpath := store.makeBlobpath(digest)
err = os.Remove(blobpath)
if errors.Is(err, os.ErrNotExist) {
err = nil
}
if err != nil {
return err
}
return err
}
+2 -2
View File
@@ -7,9 +7,9 @@ import (
ocispec "github.com/opencontainers/image-spec/specs-go/v1" ocispec "github.com/opencontainers/image-spec/specs-go/v1"
) )
func ParseOCIManifest(source []byte) (*ocispec.Manifest, error) { func ParseOCIManifest(rawManifest []byte) (*ocispec.Manifest, error) {
manifest := &ocispec.Manifest{} manifest := &ocispec.Manifest{}
err := json.Unmarshal(source, &manifest) err := json.Unmarshal(rawManifest, &manifest)
if err != nil { if err != nil {
err = fmt.Errorf("Manifest parsing error: %v", err) err = fmt.Errorf("Manifest parsing error: %v", err)
return manifest, err return manifest, err
+24 -23
View File
@@ -1,33 +1,34 @@
package auxtool package auxtool
import ( import (
"path/filepath" "os"
"os" "path/filepath"
"strings" "strings"
) )
// Clean only overbase elements of dir path if possible
func CleanDirs(basedir, datadir string) { func CleanDirs(basedir, datadir string) {
separator := string(os.PathSeparator) separator := string(os.PathSeparator)
basedir = filepath.Clean(separator + basedir) basedir = filepath.Clean(separator + basedir)
datadir = filepath.Clean(separator + datadir) datadir = filepath.Clean(separator + datadir)
items := strings.Split(datadir, separator) items := strings.Split(datadir, separator)
for i := len(items); i > 0; i-- { for i := len(items); i > 0; i-- {
p := filepath.Join(items[0:i]...) p := filepath.Join(items[0:i]...)
p = filepath.Clean(separator + p) p = filepath.Clean(separator + p)
if p == basedir { if p == basedir {
break break
} }
fileInfo, err := os.Stat(p) fileInfo, err := os.Stat(p)
if err != nil { if err != nil {
return return
} }
if fileInfo.IsDir() { if fileInfo.IsDir() {
err = os.Remove(p) err = os.Remove(p)
if err != nil { if err != nil {
return return
} }
} }
} }
} }