working commit

This commit is contained in:
2026-02-04 18:17:36 +02:00
parent 219a4cb890
commit 55e8abcdd3
6 changed files with 287 additions and 25 deletions
+44 -3
View File
@@ -3,10 +3,17 @@ package handler
import (
"mstore/app/operator"
"mstore/app/router"
"sigs.k8s.io/yaml"
)
// HEAD /v2/<name>/blobs/<digest> 200 404
func (hand *Handler) DumpHeaders(message string, rctx *router.Context) {
headers := rctx.GetHeaders()
yamlData, _ := yaml.Marshal(headers)
hand.logg.Debugf("%s:\n%s\n", message, string(yamlData))
}
// 1 HEAD /v2/<name>/blobs/<digest> 200 404
func (hand *Handler) BlobExists(rctx *router.Context) {
name, _ := rctx.GetSubpath("name")
digest, _ := rctx.GetSubpath("digest")
@@ -27,7 +34,7 @@ func (hand *Handler) BlobExists(rctx *router.Context) {
rctx.SetStatus(code)
}
// POST /v2/<name>/blobs/uploads/ 202 404
// 2 POST /v2/<name>/blobs/uploads/ 202 404
func (hand *Handler) PostUpload(rctx *router.Context) {
name, _ := rctx.GetSubpath("name")
digest := rctx.GetQuery("digest")
@@ -56,9 +63,11 @@ func (hand *Handler) PostUpload(rctx *router.Context) {
// POST /v2/<name>/blobs/uploads/?digest=<digest> 201/202 404/400
// POST /v2/<name>/blobs/uploads/?mount=<digest>&from=<other_name> 201 404
// PATCH /v2/<name>/blobs/uploads/<reference> 202 404/416
// 3 PATCH /v2/<name>/blobs/uploads/<reference> 202 404/416
func (hand *Handler) PatchUpload(rctx *router.Context) {
hand.DumpHeaders("PatchUpload headers", rctx)
contentLength := rctx.GetHeader("Content-Length")
contentType := rctx.GetHeader("Content-Type")
name, _ := rctx.GetSubpath("name")
@@ -80,3 +89,35 @@ func (hand *Handler) PatchUpload(rctx *router.Context) {
rctx.SetHeader("Location", res.Location)
rctx.SetStatus(code)
}
// 4 PUT /v2/<name>/blobs/uploads/<reference>?digest=<digest> 202 404/416
//
// PUT /v2/<name>/uploads/<reference>?digest=<digest> 202 404/416
func (hand *Handler) PutUpload(rctx *router.Context) {
hand.DumpHeaders("PutUpload headers", rctx)
contentType := rctx.GetHeader("Content-Type")
contentLength := rctx.GetHeader("Content-Length")
contentRange := rctx.GetHeader("Content-Range")
name, _ := rctx.GetSubpath("name")
reference, _ := rctx.GetSubpath("reference")
digest := rctx.GetQuery("digest")
params := &operator.PutUploadParams{
ContentLength: contentLength,
ContentType: contentType,
ContentRange: contentRange,
Name: name,
Reference: reference,
Digest: digest,
}
res, code, err := hand.oper.PutUpload(rctx.Ctx, params)
if err != nil {
hand.logg.Errorf("PutUpload error: %v", err)
}
rctx.SetHeader("Location", res.Location)
rctx.SetStatus(code)
}
+72 -10
View File
@@ -128,26 +128,88 @@ func (oper *Operator) PatchUpload(ctx context.Context, params *PatchUploadParams
err = fmt.Errorf("Wrong Conten-Type header: %s", params.ContentType)
return res, http.StatusBadRequest, err
}
if params.ContentLength == "" {
err = fmt.Errorf("Empty Content-length header")
return res, http.StatusBadRequest, err
}
contentLength, err := strconv.ParseInt(params.ContentLength, 10, 64)
if err != nil {
err = fmt.Errorf("Wrong Content-length header")
return res, http.StatusBadRequest, err
var contentLength int64
// podman & github.com/containers/image don't set Content-length header for docker transport
if params.ContentLength != "" {
contentLength, err = strconv.ParseInt(params.ContentLength, 10, 64)
if err != nil {
err = fmt.Errorf("Wrong Content-length header")
return res, http.StatusBadRequest, err
}
}
recsize, err := oper.store.WriteUpload(params.Reference, params.Reader)
if err != nil {
return res, http.StatusInternalServerError, err
}
if recsize != contentLength {
if contentLength != 0 && recsize != contentLength {
oper.store.RemoveUpload(params.Reference)
err = fmt.Errorf("Mismatch upload recorded size and content length")
return res, http.StatusInternalServerError, err
}
res.Location = fmt.Sprintf("/v2/%s/uploads/%s", params.Name, params.Reference)
return res, http.StatusAccepted, err // http.StatusCreated
return res, http.StatusAccepted, err
}
type PutUploadParams struct {
ContentType string
ContentLength string
ContentRange string
Name string
Reference string
Digest string
Reader io.Reader
}
type PutUploadResult struct {
Location string
}
func (oper *Operator) PutUpload(ctx context.Context, params *PutUploadParams) (*PutUploadResult, int, error) {
var err error
res := &PutUploadResult{}
oper.logg.Debugf("Call PutUpload")
if params.Reference == "" {
err = fmt.Errorf("Empty reference")
return res, http.StatusBadRequest, err
}
if params.Name == "" {
err = fmt.Errorf("Empty name")
return res, http.StatusBadRequest, err
}
if params.Digest == "" {
err = fmt.Errorf("Empty digest")
return res, http.StatusBadRequest, err
}
if params.ContentType != "application/octet-stream" {
err = fmt.Errorf("Wrong conten type: %s", params.ContentType)
return res, http.StatusBadRequest, err
}
var contentLength int64
if params.ContentLength != "" {
contentLength, err = strconv.ParseInt(params.ContentLength, 10, 64)
if err != nil {
err = fmt.Errorf("Cannot convert Content-Length=%s to integer: %v", params.ContentLength, err)
return res, http.StatusBadRequest, err
}
}
if contentLength != 0 {
// TODO
err = fmt.Errorf("Unexpected Content-Length header: %s", params.ContentLength)
return res, http.StatusInternalServerError, err
Content - Range
}
err = oper.store.LinkUpload(params.Reference, params.Digest)
if err != nil {
err = fmt.Errorf("Failed to link upload %s, err: %v", params.Reference, err)
return res, http.StatusInternalServerError, err
}
res.Location = fmt.Sprintf("/v2/%s/blobs/%s", params.Name, params.Digest)
return res, http.StatusCreated, err
}
+14 -8
View File
@@ -25,6 +25,7 @@ func NewContext(writer http.ResponseWriter, request *http.Request) *Context {
return rctx
}
// Request
func (rctx *Context) GetSubpath(key string) (string, bool) {
value, exists := rctx.PathMap[key]
return value, exists
@@ -34,6 +35,19 @@ func (rctx *Context) GetQuery(key string) string {
return rctx.Request.URL.Query().Get(key)
}
func (rctx *Context) GetHeader(key string) string {
return rctx.Request.Header.Get(key)
}
func (rctx *Context) GetHeaders() http.Header {
return rctx.Request.Header
}
func (rctx *Context) GetContext() context.Context {
return rctx.Request.Context()
}
// Response
func (rctx *Context) SetHeader(key, value string) {
rctx.Writer.Header().Set(key, value)
}
@@ -52,11 +66,3 @@ func (rctx *Context) SendText(payload string) {
rctx.Writer.Header().Set("Content-Type", "text/plain")
rctx.Writer.Write([]byte(payload))
}
func (rctx *Context) GetHeader(key string) string {
return rctx.Request.Header.Get(key)
}
func (rctx *Context) GetContext() context.Context {
return rctx.Request.Context()
}
+24
View File
@@ -189,6 +189,30 @@ func (svc *Service) Build() error {
svc.rout.Patch(`/v2/{name}/blobs/uploads/{reference}`, svc.hand.PatchUpload)
// To close the session, issue a PUT request to a url in the following format,
// and with the following headers (and optional body, depending on whether or not
// the final chunk was uploaded already via a PATCH request):
//
// <location>?digest=<digest>
//
// Content-Length: <length of chunk, if present>
// Content-Range: <range of chunk, if present>
// Content-Type: application/octet-stream <if chunk provided>
//
// OPTIONAL: <final chunk byte stream>
//
// The closing PUT request MUST include the <digest> of the whole blob
// (not the final chunk) as a query parameter.
//
// The response to a successful closing of the session MUST be 201 Created,
// and MUST contain the following header:
//
// Location: <blob-location>
//
// Here, <blob-location> is a pullable blob URL.
svc.rout.Put(`/v2/{name}/uploads/{reference}`, svc.hand.PutUpload)
svc.rout.NotFound(svc.hand.NotFound)
selector := svc.rout.Selector()
+45 -4
View File
@@ -139,17 +139,36 @@ func (store *Storage) DeleteFile(collection, filename string) error {
}
const (
upsubdir = "uploads"
upsubdir = "uploads"
blobsubdir = "blobs"
)
func (store *Storage) makeUppath(upname string) string {
return filepath.Join(store.basepath, upsubdir, upname)
return filepath.Join(store.basepath, upsubdir, upname) + ".bin"
}
func (store *Storage) makeUpsubdir() string {
return filepath.Join(store.basepath, upsubdir)
}
func (store *Storage) makeBlobpath(upname string) string {
return filepath.Join(store.basepath, blobsubdir, upname) + ".bin"
}
func (store *Storage) makeBlobsubdir() string {
return filepath.Join(store.basepath, blobsubdir)
}
func (store *Storage) WriteUpload(digest string, source io.Reader) (int64, error) {
var err error
var recsize int64
uploadDir := store.makeUpsubdir()
err = os.MkdirAll(uploadDir, 0750)
if err != nil {
return recsize, err
}
uploadPath := store.makeUppath(digest)
uploadFile, err := os.OpenFile(uploadPath, os.O_WRONLY|os.O_CREATE, 0644)
if err != nil {
@@ -166,9 +185,31 @@ func (store *Storage) WriteUpload(digest string, source io.Reader) (int64, error
return recsize, err
}
func (st *Storage) RemoveUpload(digest string) error {
func (store *Storage) LinkUpload(reference, digest string) error {
var err error
uploadPath := st.makeUppath(digest)
uploadPath := store.makeUppath(reference)
blobPath := store.makeBlobpath(digest)
blobdir := store.makeBlobsubdir()
err = os.MkdirAll(blobdir, 0750)
if err != nil {
return err
}
err = os.Link(uploadPath, blobPath)
if err != nil {
return err
}
err = os.Remove(uploadPath)
if err != nil {
return err
}
return err
}
func (store *Storage) RemoveUpload(digest string) error {
var err error
uploadPath := store.makeUppath(digest)
err = os.Remove(uploadPath)
if err != nil {
return err