Skip to content

Commit 2ec9d02

Browse files
committed
add BoundedStream to return only a subset of a stream.
1 parent 36b8d1d commit 2ec9d02

File tree

3 files changed

+503
-0
lines changed

3 files changed

+503
-0
lines changed
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
using System;
2+
using System.IO;
3+
using System.Threading;
4+
using System.Threading.Tasks;
5+
6+
namespace AlibabaCloud.OSS.V2.IO
7+
{
8+
/// <summary>
9+
/// Represents read-only view over the portion of underlying stream.
10+
/// </summary>
11+
public sealed class BoundedStream : Stream
12+
{
13+
private long _length;
14+
private long _offset;
15+
16+
/// <summary>
17+
/// Creates a new BoundedStream that wraps the given stream.
18+
/// </summary>
19+
/// <param name="stream">The wrapped input stream.</param>
20+
public BoundedStream(Stream stream)
21+
{
22+
this._length = stream.Length;
23+
BaseStream = stream;
24+
}
25+
26+
/// <summary>
27+
/// Creates a new BoundedStream that wraps the given stream and limits it to a certain size.
28+
/// </summary>
29+
/// <param name="stream">The wrapped input stream.</param>
30+
/// <param name="offset">The offset in the underlying stream.</param>
31+
/// <param name="length">Total number of bytes to allow to be read from the stream. </param>
32+
public BoundedStream(Stream stream, long offset, long length)
33+
{
34+
BaseStream = stream;
35+
Adjust(offset, length);
36+
}
37+
38+
/// <summary>
39+
/// Gets underlying stream.
40+
/// </summary>
41+
public Stream BaseStream { get; private set; }
42+
43+
/// <summary>
44+
/// Adjust the stream bounds.
45+
/// </summary>
46+
/// <remarks>
47+
/// This method modifies <see cref="Stream.Position"/> property of the underlying stream.
48+
/// </remarks>
49+
/// <param name="offset">The offset in the underlying stream.</param>
50+
/// <param name="length">Total number of bytes to allow to be read from the stream. </param>
51+
/// <exception cref="ArgumentOutOfRangeException"><paramref name="length"/> is larger than the remaining length of the underlying stream; or <paramref name="offset"/> if greater than the length of the underlying stream.</exception>
52+
public void Adjust(long offset, long length)
53+
{
54+
ThrowIfGreaterThan((ulong)offset, (ulong)BaseStream.Length, nameof(offset));
55+
ThrowIfGreaterThan((ulong)length, (ulong)(BaseStream.Length - offset), nameof(length));
56+
this._length = length;
57+
this._offset = offset;
58+
BaseStream.Position = offset;
59+
}
60+
61+
/// <summary>
62+
/// Gets a value indicating whether the current stream supports reading.
63+
/// </summary>
64+
/// <value><see langword="true"/> if the stream supports reading; otherwise, <see langword="false"/>.</value>
65+
public override bool CanRead => BaseStream.CanRead;
66+
67+
/// <summary>
68+
/// Gets a value indicating whether the current stream supports seeking.
69+
/// </summary>
70+
/// <value><see langword="true"/> if the stream supports seeking; otherwise, <see langword="false"/>.</value>
71+
public override bool CanSeek => BaseStream.CanSeek;
72+
73+
/// <summary>
74+
/// Gets a value indicating whether the current stream supports writing.
75+
/// </summary>
76+
/// <value>Always <see langword="false"/>.</value>
77+
public override bool CanWrite => false;
78+
79+
/// <inheritdoc/>
80+
public override long Length => _length;
81+
82+
/// <inheritdoc/>
83+
public override long Position
84+
{
85+
get => BaseStream.Position - _offset;
86+
set
87+
{
88+
ThrowIfGreaterThan(value, _length, nameof(value));
89+
BaseStream.Position = _offset + value;
90+
}
91+
}
92+
93+
private long RemainingBytes => _length - Position;
94+
95+
/// <inheritdoc/>
96+
public override void Flush() => BaseStream.Flush();
97+
98+
/// <inheritdoc/>
99+
public override Task FlushAsync(CancellationToken token = default) => BaseStream.FlushAsync(token);
100+
101+
/// <inheritdoc/>
102+
public override bool CanTimeout => BaseStream.CanTimeout;
103+
104+
/// <inheritdoc/>
105+
public override int ReadByte()
106+
=> Position < _length ? BaseStream.ReadByte() : -1;
107+
108+
/// <inheritdoc/>
109+
public override void WriteByte(byte value) => throw new NotSupportedException();
110+
111+
/// <inheritdoc/>
112+
public override int Read(byte[] buffer, int offset, int count)
113+
{
114+
return BaseStream.Read(buffer, offset, (int)Math.Min(count, RemainingBytes));
115+
}
116+
117+
/// <inheritdoc/>
118+
public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
119+
{
120+
count = (int)Math.Min(count, RemainingBytes);
121+
return BaseStream.BeginRead(buffer, offset, count, callback, state);
122+
}
123+
124+
/// <inheritdoc/>
125+
public override int EndRead(IAsyncResult asyncResult) => BaseStream.EndRead(asyncResult);
126+
127+
/// <inheritdoc/>
128+
public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken token = default)
129+
=> BaseStream.ReadAsync(buffer, offset, (int)Math.Min(count, RemainingBytes), token);
130+
131+
/// <inheritdoc/>
132+
public override long Seek(long offset, SeekOrigin origin)
133+
{
134+
var newPosition = origin switch
135+
{
136+
SeekOrigin.Begin => offset,
137+
SeekOrigin.Current => Position + offset,
138+
SeekOrigin.End => _length + offset,
139+
_ => throw new ArgumentOutOfRangeException(nameof(origin))
140+
};
141+
142+
if (newPosition < 0L)
143+
throw new IOException();
144+
145+
ThrowIfGreaterThan(newPosition, _length, nameof(offset));
146+
147+
Position = newPosition;
148+
return newPosition;
149+
}
150+
151+
/// <inheritdoc/>
152+
public override void SetLength(long value)
153+
{
154+
ThrowIfGreaterThan((ulong)value, (ulong)(BaseStream.Length - BaseStream.Position), nameof(value));
155+
_length = value;
156+
}
157+
158+
/// <inheritdoc/>
159+
public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
160+
161+
/// <inheritdoc/>
162+
public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken token = default) => Task.FromException(new NotSupportedException());
163+
164+
/// <inheritdoc/>
165+
public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state)
166+
=> throw new NotSupportedException();
167+
168+
/// <inheritdoc/>
169+
public override void EndWrite(IAsyncResult asyncResult) => throw new InvalidOperationException();
170+
171+
/// <inheritdoc/>
172+
public override int ReadTimeout
173+
{
174+
get => BaseStream.ReadTimeout;
175+
set => BaseStream.ReadTimeout = value;
176+
}
177+
178+
/// <inheritdoc/>
179+
public override int WriteTimeout
180+
{
181+
get => BaseStream.WriteTimeout;
182+
set => BaseStream.WriteTimeout = value;
183+
}
184+
185+
/// <inheritdoc/>
186+
protected override void Dispose(bool disposing)
187+
{
188+
base.Dispose(disposing);
189+
BaseStream.Dispose();
190+
}
191+
192+
private static void ThrowIfGreaterThan<T>(T value, T other, string? paramName = null) where T : IComparable<T>
193+
{
194+
if (value.CompareTo(other) > 0)
195+
{
196+
throw new ArgumentOutOfRangeException(paramName, value, $"{paramName} ('{value}')must be less than or equal to '{other}'");
197+
}
198+
}
199+
}
200+
}

test/AlibabaCloud.OSS.V2.IntegrationTests/ClientMiscTest.cs

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
using System.Text;
22
using AlibabaCloud.OSS.V2.Models;
3+
using AlibabaCloud.OSS.V2.IO;
34

45
namespace AlibabaCloud.OSS.V2.IntegrationTests;
56

@@ -1005,4 +1006,106 @@ public async Task TestQueryWithSpecialChar()
10051006
var got1 = await reader1.ReadToEndAsync();
10061007
Assert.Equal(content, got1);
10071008
}
1009+
1010+
[Fact]
1011+
public async Task TestUploadPartFromFile()
1012+
{
1013+
var client = Utils.GetDefaultClient();
1014+
1015+
//default
1016+
var bucketName = Utils.RandomBucketName(BucketNamePrefix);
1017+
1018+
var result = await client.PutBucketAsync(new()
1019+
{
1020+
Bucket = bucketName
1021+
});
1022+
1023+
Assert.NotNull(result);
1024+
Assert.Equal(200, result.StatusCode);
1025+
Assert.NotNull(result.RequestId);
1026+
1027+
var partSize = 120 * 1024;
1028+
var filepath = Utils.RandomFilePath(RootPath);
1029+
var saveFilepath = Utils.RandomFilePath(RootPath);
1030+
Utils.PrepareSampleFile(filepath, 512);
1031+
Assert.True(File.Exists(filepath));
1032+
1033+
// init
1034+
var objectName = Utils.RandomObjectName();
1035+
var initResult = await client.InitiateMultipartUploadAsync(new()
1036+
{
1037+
Bucket = bucketName,
1038+
Key = objectName
1039+
}
1040+
);
1041+
Assert.NotNull(initResult);
1042+
Assert.Equal(200, initResult.StatusCode);
1043+
Assert.NotNull(initResult.RequestId);
1044+
Assert.Equal("url", initResult.EncodingType);
1045+
1046+
// upload
1047+
using var file = File.OpenRead(filepath);
1048+
var fileSize = file.Length;
1049+
long partNumber = 1;
1050+
for (int offset = 0; offset < fileSize; offset += partSize)
1051+
{
1052+
var size = (long)Math.Min(partSize, fileSize - offset);
1053+
var upResult = await client.UploadPartAsync(new()
1054+
{
1055+
Bucket = bucketName,
1056+
Key = objectName,
1057+
PartNumber = partNumber,
1058+
UploadId = initResult.UploadId,
1059+
Body = new BoundedStream(file, (long)offset, size)
1060+
});
1061+
1062+
partNumber++;
1063+
1064+
Assert.NotNull(upResult);
1065+
Assert.Equal(200, upResult.StatusCode);
1066+
Assert.NotNull(upResult.RequestId);
1067+
}
1068+
1069+
// complete
1070+
var cmResult = await client.CompleteMultipartUploadAsync(new()
1071+
{
1072+
Bucket = bucketName,
1073+
Key = objectName,
1074+
UploadId = initResult.UploadId,
1075+
CompleteAll = "yes"
1076+
});
1077+
Assert.NotNull(cmResult);
1078+
Assert.Equal(200, cmResult.StatusCode);
1079+
Assert.NotNull(cmResult.RequestId);
1080+
Assert.Equal("url", cmResult.EncodingType);
1081+
1082+
// get object
1083+
var getObjectResult = await client.GetObjectAsync(
1084+
new()
1085+
{
1086+
Bucket = bucketName,
1087+
Key = objectName
1088+
}
1089+
);
1090+
Assert.NotNull(getObjectResult);
1091+
Assert.Equal(200, getObjectResult.StatusCode);
1092+
Assert.Equal((long)fileSize, getObjectResult.ContentLength);
1093+
Assert.Equal("Multipart", getObjectResult.ObjectType);
1094+
Assert.Null(getObjectResult.TaggingCount);
1095+
Assert.Null(getObjectResult.ContentMd5);
1096+
Assert.NotNull(getObjectResult.StorageClass);
1097+
Assert.NotNull(getObjectResult.Metadata);
1098+
Assert.Empty(getObjectResult.Metadata);
1099+
Assert.NotNull(getObjectResult.Body);
1100+
Assert.False(getObjectResult.Body.CanSeek);
1101+
1102+
using var saveStream = File.OpenWrite(saveFilepath);
1103+
await getObjectResult.Body.CopyToAsync(saveStream);
1104+
saveStream.Close();
1105+
1106+
var srcMd5 = Utils.ComputeContentMd5(filepath);
1107+
var dstMd5 = Utils.ComputeContentMd5(saveFilepath);
1108+
Assert.NotEmpty(srcMd5);
1109+
Assert.Equal(srcMd5, dstMd5);
1110+
}
10081111
}

0 commit comments

Comments
 (0)