mirror of https://github.com/harness/drone.git
parent
a0461fd870
commit
32196d481f
|
@ -681,7 +681,7 @@ func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
|
|||
|
||||
// PutContent stores the []byte content at a location designated by "path".
|
||||
func (d *driver) PutContent(ctx context.Context, path string, contents []byte) error {
|
||||
log.Ctx(ctx).Debug().Msgf("[AWS] PutContent: %s", path)
|
||||
log.Ctx(ctx).Trace().Msgf("[AWS] PutContent: %s", path)
|
||||
_, err := d.S3.PutObjectWithContext(
|
||||
ctx, &s3.PutObjectInput{
|
||||
Bucket: aws.String(d.Bucket),
|
||||
|
@ -700,7 +700,7 @@ func (d *driver) PutContent(ctx context.Context, path string, contents []byte) e
|
|||
// Reader retrieves an io.ReadCloser for the content stored at "path" with a
|
||||
// given byte offset.
|
||||
func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
|
||||
log.Ctx(ctx).Debug().Msgf("[AWS] GetObject: %s", path)
|
||||
log.Ctx(ctx).Trace().Msgf("[AWS] GetObject: %s", path)
|
||||
resp, err := d.S3.GetObjectWithContext(
|
||||
ctx, &s3.GetObjectInput{
|
||||
Bucket: aws.String(d.Bucket),
|
||||
|
@ -728,7 +728,7 @@ func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.Read
|
|||
func (d *driver) Writer(ctx context.Context, path string, appendMode bool) (storagedriver.FileWriter, error) {
|
||||
key := d.s3Path(path)
|
||||
if !appendMode {
|
||||
log.Ctx(ctx).Debug().Msgf("[AWS] CreateMultipartUpload: %s", path)
|
||||
log.Ctx(ctx).Trace().Msgf("[AWS] CreateMultipartUpload: %s", path)
|
||||
resp, err := d.S3.CreateMultipartUploadWithContext(
|
||||
ctx, &s3.CreateMultipartUploadInput{
|
||||
Bucket: aws.String(d.Bucket),
|
||||
|
@ -751,7 +751,7 @@ func (d *driver) Writer(ctx context.Context, path string, appendMode bool) (stor
|
|||
Prefix: aws.String(key),
|
||||
}
|
||||
for {
|
||||
log.Ctx(ctx).Debug().Msgf("[AWS] ListMultipartUploads: %s", path)
|
||||
log.Ctx(ctx).Trace().Msgf("[AWS] ListMultipartUploads: %s", path)
|
||||
resp, err := d.S3.ListMultipartUploadsWithContext(ctx, listMultipartUploadsInput)
|
||||
if err != nil {
|
||||
return nil, parseError(path, err)
|
||||
|
@ -767,7 +767,7 @@ func (d *driver) Writer(ctx context.Context, path string, appendMode bool) (stor
|
|||
}
|
||||
|
||||
if fi.Size() == 0 {
|
||||
log.Ctx(ctx).Debug().Msgf("[AWS] CreateMultipartUpload: %s", path)
|
||||
log.Ctx(ctx).Trace().Msgf("[AWS] CreateMultipartUpload: %s", path)
|
||||
resp, err := d.S3.CreateMultipartUploadWithContext(
|
||||
ctx, &s3.CreateMultipartUploadInput{
|
||||
Bucket: aws.String(d.Bucket),
|
||||
|
@ -796,7 +796,7 @@ func (d *driver) Writer(ctx context.Context, path string, appendMode bool) (stor
|
|||
continue
|
||||
}
|
||||
|
||||
log.Debug().Msgf("[AWS] ListParts: %s", path)
|
||||
log.Ctx(ctx).Trace().Msgf("[AWS] ListParts: %s", path)
|
||||
partsList, err := d.S3.ListPartsWithContext(
|
||||
ctx, &s3.ListPartsInput{
|
||||
Bucket: aws.String(d.Bucket),
|
||||
|
@ -809,7 +809,7 @@ func (d *driver) Writer(ctx context.Context, path string, appendMode bool) (stor
|
|||
}
|
||||
allParts = append(allParts, partsList.Parts...)
|
||||
for *partsList.IsTruncated {
|
||||
log.Debug().Msgf("[AWS] ListParts: %s", path)
|
||||
log.Ctx(ctx).Trace().Msgf("[AWS] ListParts: %s", path)
|
||||
partsList, err = d.S3.ListPartsWithContext(
|
||||
ctx, &s3.ListPartsInput{
|
||||
Bucket: aws.String(d.Bucket),
|
||||
|
@ -841,7 +841,7 @@ func (d *driver) Writer(ctx context.Context, path string, appendMode bool) (stor
|
|||
// Stat retrieves the FileInfo for the given path, including the current size
|
||||
// in bytes and the creation time.
|
||||
func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
|
||||
log.Debug().Msgf("[AWS] ListObjectsV2: %s", path)
|
||||
log.Ctx(ctx).Trace().Msgf("[AWS] ListObjectsV2: %s", path)
|
||||
resp, err := d.S3.ListObjectsV2WithContext(
|
||||
ctx, &s3.ListObjectsV2Input{
|
||||
Bucket: aws.String(d.Bucket),
|
||||
|
@ -890,7 +890,7 @@ func (d *driver) List(ctx context.Context, opath string) ([]string, error) {
|
|||
prefix = "/"
|
||||
}
|
||||
|
||||
log.Debug().Msgf("[AWS] ListObjectsV2: %s", path)
|
||||
log.Ctx(ctx).Trace().Msgf("[AWS] ListObjectsV2: %s", path)
|
||||
resp, err := d.S3.ListObjectsV2WithContext(
|
||||
ctx, &s3.ListObjectsV2Input{
|
||||
Bucket: aws.String(d.Bucket),
|
||||
|
@ -920,7 +920,7 @@ func (d *driver) List(ctx context.Context, opath string) ([]string, error) {
|
|||
}
|
||||
|
||||
if *resp.IsTruncated {
|
||||
log.Debug().Msgf("[AWS] ListObjectsV2: %s", path)
|
||||
log.Ctx(ctx).Trace().Msgf("[AWS] ListObjectsV2: %s", path)
|
||||
resp, err = d.S3.ListObjectsV2WithContext(
|
||||
ctx, &s3.ListObjectsV2Input{
|
||||
Bucket: aws.String(d.Bucket),
|
||||
|
@ -973,7 +973,7 @@ func (d *driver) copy(ctx context.Context, sourcePath string, destPath string) e
|
|||
}
|
||||
|
||||
if fileInfo.Size() <= d.MultipartCopyThresholdSize {
|
||||
log.Debug().Msgf("[AWS] CopyObject: %s -> %s", sourcePath, destPath)
|
||||
log.Ctx(ctx).Trace().Msgf("[AWS] CopyObject: %s -> %s", sourcePath, destPath)
|
||||
_, err := d.S3.CopyObjectWithContext(
|
||||
ctx, &s3.CopyObjectInput{
|
||||
Bucket: aws.String(d.Bucket),
|
||||
|
@ -992,7 +992,7 @@ func (d *driver) copy(ctx context.Context, sourcePath string, destPath string) e
|
|||
return nil
|
||||
}
|
||||
|
||||
log.Debug().Msgf("[AWS] CreateMultipartUpload: %s", destPath)
|
||||
log.Ctx(ctx).Trace().Msgf("[AWS] CreateMultipartUpload: %s", destPath)
|
||||
createResp, err := d.S3.CreateMultipartUploadWithContext(
|
||||
ctx, &s3.CreateMultipartUploadInput{
|
||||
Bucket: aws.String(d.Bucket),
|
||||
|
@ -1022,7 +1022,7 @@ func (d *driver) copy(ctx context.Context, sourcePath string, destPath string) e
|
|||
if lastByte >= fileInfo.Size() {
|
||||
lastByte = fileInfo.Size() - 1
|
||||
}
|
||||
log.Debug().Msgf("[AWS] [%d] UploadPartCopy: %s -> %s", i, sourcePath, destPath)
|
||||
log.Ctx(ctx).Trace().Msgf("[AWS] [%d] UploadPartCopy: %s -> %s", i, sourcePath, destPath)
|
||||
uploadResp, err := d.S3.UploadPartCopyWithContext(
|
||||
ctx, &s3.UploadPartCopyInput{
|
||||
Bucket: aws.String(d.Bucket),
|
||||
|
@ -1051,7 +1051,7 @@ func (d *driver) copy(ctx context.Context, sourcePath string, destPath string) e
|
|||
}
|
||||
}
|
||||
|
||||
log.Debug().Msgf("[AWS] CompleteMultipartUpload: %s", destPath)
|
||||
log.Ctx(ctx).Trace().Msgf("[AWS] CompleteMultipartUpload: %s", destPath)
|
||||
_, err = d.S3.CompleteMultipartUploadWithContext(
|
||||
ctx, &s3.CompleteMultipartUploadInput{
|
||||
Bucket: aws.String(d.Bucket),
|
||||
|
@ -1075,7 +1075,7 @@ func (d *driver) Delete(ctx context.Context, path string) error {
|
|||
|
||||
for {
|
||||
// list all the objects
|
||||
log.Debug().Msgf("[AWS] List all the objects: %s", path)
|
||||
log.Ctx(ctx).Trace().Msgf("[AWS] List all the objects: %s", path)
|
||||
resp, err := d.S3.ListObjectsV2WithContext(ctx, listObjectsInput)
|
||||
|
||||
// resp.Contents can only be empty on the first call
|
||||
|
@ -1106,7 +1106,7 @@ func (d *driver) Delete(ctx context.Context, path string) error {
|
|||
// 10000 keys is coincidentally (?) also the max number of keys that can be
|
||||
// deleted in a single Delete operation, so we'll just smack
|
||||
// Delete here straight away and reset the object slice when successful.
|
||||
log.Debug().Msgf("[AWS] DeleteObjects: %s", path)
|
||||
log.Ctx(ctx).Trace().Msgf("[AWS] DeleteObjects: %s", path)
|
||||
resp, err := d.S3.DeleteObjectsWithContext(
|
||||
ctx, &s3.DeleteObjectsInput{
|
||||
Bucket: aws.String(d.Bucket),
|
||||
|
@ -1151,7 +1151,7 @@ func (d *driver) Delete(ctx context.Context, path string) error {
|
|||
}
|
||||
|
||||
// RedirectURL returns a URL which may be used to retrieve the content stored at the given path.
|
||||
func (d *driver) RedirectURL(_ context.Context, method string, path string) (string, error) {
|
||||
func (d *driver) RedirectURL(ctx context.Context, method string, path string) (string, error) {
|
||||
expiresIn := 20 * time.Minute
|
||||
|
||||
var req *request.Request
|
||||
|
@ -1175,7 +1175,7 @@ func (d *driver) RedirectURL(_ context.Context, method string, path string) (str
|
|||
return "", nil
|
||||
}
|
||||
|
||||
log.Debug().Msgf("[AWS] Generating presigned URL for %s %s", method, path)
|
||||
log.Ctx(ctx).Debug().Msgf("[AWS] Generating presigned URL for %s %s", method, path)
|
||||
return req.Presign(expiresIn)
|
||||
}
|
||||
|
||||
|
@ -1251,7 +1251,7 @@ func (d *driver) doWalk(
|
|||
// for extreme edge cases but for the general use case
|
||||
// in a registry, this is orders of magnitude
|
||||
// faster than a more explicit recursive implementation.
|
||||
log.Debug().Msgf("[AWS] Listing objects in %s", path)
|
||||
log.Ctx(ctx).Trace().Msgf("[AWS] Listing objects in %s", path)
|
||||
listObjectErr := d.S3.ListObjectsV2PagesWithContext(
|
||||
ctx,
|
||||
listObjectsInput,
|
||||
|
@ -1525,7 +1525,6 @@ func (w *writer) Write(p []byte) (int, error) {
|
|||
|
||||
sort.Sort(completedUploadedParts)
|
||||
|
||||
log.Debug().Msgf("[AWS] Completing multipart upload for %s", w.key)
|
||||
_, err := w.driver.S3.CompleteMultipartUploadWithContext(
|
||||
w.ctx, &s3.CompleteMultipartUploadInput{
|
||||
Bucket: aws.String(w.driver.Bucket),
|
||||
|
@ -1537,7 +1536,6 @@ func (w *writer) Write(p []byte) (int, error) {
|
|||
},
|
||||
)
|
||||
if err != nil {
|
||||
log.Debug().Msgf("[AWS] Abort multipart upload for %s: %v", w.key, err)
|
||||
if _, aErr := w.driver.S3.AbortMultipartUploadWithContext(
|
||||
w.ctx, &s3.AbortMultipartUploadInput{
|
||||
Bucket: aws.String(w.driver.Bucket),
|
||||
|
@ -1550,7 +1548,6 @@ func (w *writer) Write(p []byte) (int, error) {
|
|||
return 0, err
|
||||
}
|
||||
|
||||
log.Debug().Msgf("[AWS] Creating new multipart upload for %s", w.key)
|
||||
resp, err := w.driver.S3.CreateMultipartUploadWithContext(
|
||||
w.ctx, &s3.CreateMultipartUploadInput{
|
||||
Bucket: aws.String(w.driver.Bucket),
|
||||
|
@ -1569,7 +1566,6 @@ func (w *writer) Write(p []byte) (int, error) {
|
|||
// If the entire written file is smaller than minChunkSize, we need to make
|
||||
// a new part from scratch :double sad face:
|
||||
if w.size < minChunkSize {
|
||||
log.Debug().Msgf("[AWS] Uploading new part for %s", w.key)
|
||||
resp, err := w.driver.S3.GetObjectWithContext(
|
||||
w.ctx, &s3.GetObjectInput{
|
||||
Bucket: aws.String(w.driver.Bucket),
|
||||
|
@ -1594,7 +1590,6 @@ func (w *writer) Write(p []byte) (int, error) {
|
|||
}
|
||||
} else {
|
||||
// Otherwise we can use the old file as the new first part
|
||||
log.Debug().Msgf("[AWS] Upload copy part for %s", w.key)
|
||||
copyPartResp, err := w.driver.S3.UploadPartCopyWithContext(
|
||||
w.ctx, &s3.UploadPartCopyInput{
|
||||
Bucket: aws.String(w.driver.Bucket),
|
||||
|
@ -1682,7 +1677,7 @@ func (w *writer) Cancel(ctx context.Context) error {
|
|||
return fmt.Errorf("already committed")
|
||||
}
|
||||
w.cancelled = true
|
||||
log.Debug().Msgf("[AWS] Abort multipart upload for %s", w.key)
|
||||
log.Ctx(ctx).Trace().Msgf("[AWS] Abort multipart upload for %s", w.key)
|
||||
_, err := w.driver.S3.AbortMultipartUploadWithContext(
|
||||
ctx, &s3.AbortMultipartUploadInput{
|
||||
Bucket: aws.String(w.driver.Bucket),
|
||||
|
@ -1693,7 +1688,7 @@ func (w *writer) Cancel(ctx context.Context) error {
|
|||
return err
|
||||
}
|
||||
|
||||
func (w *writer) Commit(_ context.Context) error {
|
||||
func (w *writer) Commit(ctx context.Context) error {
|
||||
switch {
|
||||
case w.closed:
|
||||
return fmt.Errorf("already closed")
|
||||
|
@ -1726,7 +1721,7 @@ func (w *writer) Commit(_ context.Context) error {
|
|||
// Solution: we upload the empty i.e. 0 byte part as a single part and then append it
|
||||
// to the completedUploadedParts slice used to complete the Multipart upload.
|
||||
if len(w.parts) == 0 {
|
||||
log.Debug().Msgf("[AWS] Upload empty part for %s", w.key)
|
||||
log.Ctx(ctx).Trace().Msgf("[AWS] Upload empty part for %s", w.key)
|
||||
resp, err := w.driver.S3.UploadPartWithContext(
|
||||
w.ctx, &s3.UploadPartInput{
|
||||
Bucket: aws.String(w.driver.Bucket),
|
||||
|
@ -1753,7 +1748,7 @@ func (w *writer) Commit(_ context.Context) error {
|
|||
|
||||
sort.Sort(completedUploadedParts)
|
||||
|
||||
log.Debug().Msgf("[AWS] Complete multipart upload for %s", w.key)
|
||||
log.Ctx(ctx).Trace().Msgf("[AWS] Complete multipart upload for %s", w.key)
|
||||
_, err = w.driver.S3.CompleteMultipartUploadWithContext(
|
||||
w.ctx, &s3.CompleteMultipartUploadInput{
|
||||
Bucket: aws.String(w.driver.Bucket),
|
||||
|
@ -1765,7 +1760,7 @@ func (w *writer) Commit(_ context.Context) error {
|
|||
},
|
||||
)
|
||||
if err != nil {
|
||||
log.Debug().Msgf("[AWS] Abort multipart upload for %s: %v", w.key, err)
|
||||
log.Ctx(ctx).Trace().Msgf("[AWS] Abort multipart upload for %s: %v", w.key, err)
|
||||
if _, aErr := w.driver.S3.AbortMultipartUploadWithContext(
|
||||
w.ctx, &s3.AbortMultipartUploadInput{
|
||||
Bucket: aws.String(w.driver.Bucket),
|
||||
|
@ -1787,12 +1782,10 @@ func (w *writer) flush() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
buf := bytes.NewBuffer(w.ready.data)
|
||||
partSize := buf.Len()
|
||||
partNumber := aws.Int64(int64(len(w.parts) + 1))
|
||||
|
||||
log.Debug().Msgf("[AWS] Upload part %d for %s", *partNumber, w.key)
|
||||
resp, err := w.driver.S3.UploadPartWithContext(
|
||||
w.ctx, &s3.UploadPartInput{
|
||||
Bucket: aws.String(w.driver.Bucket),
|
||||
|
@ -1802,7 +1795,6 @@ func (w *writer) flush() error {
|
|||
Body: bytes.NewReader(buf.Bytes()),
|
||||
},
|
||||
)
|
||||
log.Debug().Msgf("Elapsed1: %d, %f, %s", *partNumber, time.Since(start).Seconds(), w.key)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -1814,19 +1806,15 @@ func (w *writer) flush() error {
|
|||
Size: aws.Int64(int64(partSize)),
|
||||
},
|
||||
)
|
||||
log.Debug().Msgf("Elapsed2: %d, %f, %s", *partNumber, time.Since(start).Seconds(), w.key)
|
||||
// reset the flushed buffer and swap buffers
|
||||
w.ready.Clear()
|
||||
w.ready, w.pending = w.pending, w.ready
|
||||
|
||||
// In case we have more data in the pending buffer (now ready), we need to flush it
|
||||
if w.ready.Len() > 0 {
|
||||
start = time.Now()
|
||||
err := w.flush()
|
||||
log.Debug().Msgf("Elapsed Recursive: %d, %f, %s", *partNumber, time.Since(start).Seconds(), w.key)
|
||||
return err
|
||||
}
|
||||
|
||||
log.Debug().Msgf("Elapsed exit: %d, %f, %s", *partNumber, time.Since(start).Seconds(), w.key)
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -696,7 +696,7 @@ func (r *LocalRegistry) PutManifest(
|
|||
var jsonBuf bytes.Buffer
|
||||
d, _ := digest.Parse(artInfo.Digest)
|
||||
tag := artInfo.Tag
|
||||
log.Ctx(ctx).Info().Msgf("Pushing manifest %s %s / %s", artInfo.RegIdentifier, d, tag)
|
||||
log.Ctx(ctx).Info().Msgf("Pushing manifest %s, digest: %q, tag: %s", artInfo.RegIdentifier, d, tag)
|
||||
|
||||
responseHeaders = &commons.ResponseHeaders{
|
||||
Headers: map[string]string{},
|
||||
|
@ -724,6 +724,7 @@ func (r *LocalRegistry) PutManifest(
|
|||
} else {
|
||||
if tag != "" {
|
||||
d = desc.Digest
|
||||
log.Ctx(ctx).Debug().Msgf("payload digest: %q", d)
|
||||
} else {
|
||||
errs = append(errs, errcode.ErrCodeTagInvalid.WithDetail("no tag or digest specified"))
|
||||
return responseHeaders, errs
|
||||
|
|
|
@ -499,20 +499,25 @@ func (l *manifestService) dbPutManifest(
|
|||
) error {
|
||||
switch reqManifest := manifest.(type) {
|
||||
case *schema2.DeserializedManifest:
|
||||
log.Ctx(ctx).Debug().Msgf("Putting schema2 manifest %s to database", d.String())
|
||||
if err := l.dbPutManifestSchema2(ctx, reqManifest, payload, d, repoKey, headers, info); err != nil {
|
||||
return err
|
||||
}
|
||||
return l.upsertImageAndArtifact(ctx, d, repoKey, info)
|
||||
case *ocischema.DeserializedManifest:
|
||||
log.Ctx(ctx).Debug().Msgf("Putting ocischema manifest %s to database", d.String())
|
||||
if err := l.dbPutManifestOCI(ctx, reqManifest, payload, d, repoKey, headers, info); err != nil {
|
||||
return err
|
||||
}
|
||||
return l.upsertImageAndArtifact(ctx, d, repoKey, info)
|
||||
case *manifestlist.DeserializedManifestList:
|
||||
log.Ctx(ctx).Debug().Msgf("Putting manifestlist manifest %s to database", d.String())
|
||||
return l.dbPutManifestList(ctx, reqManifest, payload, d, repoKey, headers, info)
|
||||
case *ocischema.DeserializedImageIndex:
|
||||
log.Ctx(ctx).Debug().Msgf("Putting ocischema image index %s to database", d.String())
|
||||
return l.dbPutImageIndex(ctx, reqManifest, payload, d, repoKey, headers, info)
|
||||
default:
|
||||
log.Ctx(ctx).Info().Msgf("Invalid manifest type: %T", reqManifest)
|
||||
return errcode.ErrorCodeManifestInvalid.WithDetail("manifest type unsupported")
|
||||
}
|
||||
}
|
||||
|
@ -701,6 +706,7 @@ func (l *manifestService) dbPutManifestV2(
|
|||
|
||||
// find and associate distributable manifest layer blobs
|
||||
for _, reqLayer := range mfst.DistributableLayers() {
|
||||
log.Ctx(ctx).Debug().Msgf("associating layer %s with manifest %s", reqLayer.Digest.String(), digest.String())
|
||||
dbBlob, err := l.DBFindRepositoryBlob(ctx, reqLayer, dbRepo.ID, info.Image)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
Loading…
Reference in New Issue