/*** * ==++== * * Copyright (c) Microsoft Corporation. All rights reserved. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * * ==--== * =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ * * Asynchronous I/O: stream buffer implementation details * * We're going to some lengths to avoid exporting C++ class member functions and implementation details across * module boundaries, and the factoring requires that we keep the implementation details away from the main header * files. The supporting functions, which are in this file, use C-like signatures to avoid as many issues as * possible. * * For the latest on this and related APIs, please see http://casablanca.codeplex.com. * * =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- ****/ #include "stdafx.h" #include "cpprest/details/fileio.h" #include "robuffer.h" using namespace ::Windows::Foundation; using namespace ::Windows::Storage; using namespace ::Windows::Storage::Streams; using namespace ::Windows::Networking; using namespace ::Windows::Networking::Sockets; namespace Concurrency { namespace streams { namespace details { /// /// The public parts of the file information record contain only what is implementation- /// independent. The actual allocated record is larger and has details that the implementation /// require in order to function. /// struct _file_info_impl : _file_info { _file_info_impl(Streams::IRandomAccessStream^ stream, std::ios_base::openmode mode) : m_stream(stream), m_writer(nullptr), _file_info(mode, 0) { m_pendingWrites = pplx::task_from_result(); } _file_info_impl(Streams::IRandomAccessStream^ stream, std::ios_base::openmode mode, size_t buffer_size) : m_stream(stream), m_writer(nullptr), _file_info(mode, buffer_size) { m_pendingWrites = pplx::task_from_result(); } Streams::IRandomAccessStream^ m_stream; Streams::IDataWriter^ m_writer; pplx::task m_pendingWrites; }; }}} using namespace Concurrency::streams::details; #pragma warning(push) #pragma warning(disable : 4100) /// /// Translate from C++ STL file open modes to Win32 flags. /// /// The C++ file open mode /// The C++ file open protection /// A pointer to a DWORD that will hold the desired access flags /// A pointer to a DWORD that will hold the creation disposition /// A pointer to a DWORD that will hold the share mode void _get_create_flags(std::ios_base::openmode mode, int prot, FileAccessMode &acc_mode, CreationCollisionOption &options) { options = CreationCollisionOption::OpenIfExists; acc_mode = FileAccessMode::ReadWrite; if ( (mode & std::ios_base::in) && !(mode & std::ios_base::out) ) acc_mode = FileAccessMode::Read; if ( mode & std::ios_base::trunc ) options = CreationCollisionOption::ReplaceExisting; } /// /// Perform post-CreateFile processing. /// /// The Win32 file handle /// The callback interface pointer /// The C++ file open mode /// The error code if there was an error in file creation. void _finish_create(Streams::IRandomAccessStream^ stream, _In_ _filestream_callback *callback, std::ios_base::openmode mode, int prot) { _file_info_impl *info = nullptr; info = new _file_info_impl(stream, mode, 512); // Seek to end if it's in appending write mode if ((mode & std::ios_base::out) && (mode & std::ios_base::app || mode & std::ios_base::ate )) { _seekwrpos_fsb(info, static_cast(stream->Size), 1); } callback->on_opened(info); } /// /// Create a streambuf instance to represent a WinRT file. /// /// A pointer to the callback interface to invoke when the file has been opened. /// The file object /// A creation mode for the stream buffer /// True if the opening operation could be initiated, false otherwise. /// /// True does not signal that the file will eventually be successfully opened, just that the process was started. /// This is only available for WinRT. /// bool __cdecl _open_fsb_stf_str(_In_ Concurrency::streams::details::_filestream_callback *callback, ::Windows::Storage::StorageFile^ file, std::ios_base::openmode mode, int prot) { _ASSERTE(callback != nullptr); _ASSERTE(file != nullptr); CreationCollisionOption options; FileAccessMode acc_mode; _get_create_flags(mode, prot, acc_mode, options); pplx::create_task(file->OpenAsync(acc_mode)).then( [=](pplx::task sop) { try { _finish_create(sop.get(), callback, mode, prot); } catch(Platform::Exception^ exc) { callback->on_error(std::make_exception_ptr(utility::details::create_system_error(exc->HResult))); } }); return true; } bool __cdecl _sync_fsb_winrt(_In_ Concurrency::streams::details::_file_info *info, _In_opt_ Concurrency::streams::details::_filestream_callback *callback) { _ASSERTE(info != nullptr); _file_info_impl *fInfo = static_cast<_file_info_impl *>(info); pplx::extensibility::scoped_recursive_lock_t lck(fInfo->m_lock); if ( fInfo->m_stream == nullptr || fInfo->m_writer == nullptr || !fInfo->m_stream->CanWrite) return false; // take a snapshot of current writer, since writer can be replaced during flush auto writer = fInfo->m_writer; // Flush operation will not begin until all previous writes (StoreAsync) finished, thus it could avoid race. fInfo->m_pendingWrites = fInfo->m_pendingWrites.then([=] { return writer->StoreAsync(); }).then([=](unsigned int) { fInfo->m_buffill = 0; return writer->FlushAsync(); }).then([=] (pplx::task result) { // Rethrow exception if no callback attached. if (callback == nullptr) result.wait(); else { try { result.wait(); callback->on_completed(0); } catch (Platform::Exception^ exc) { callback->on_error(std::make_exception_ptr(utility::details::create_system_error(exc->HResult))); } } }); return true; } /// /// Close a file stream buffer. /// /// The file info record of the file /// A pointer to the callback interface to invoke when the file has been opened. /// True if the closing operation could be initiated, false otherwise. /// /// True does not signal that the file will eventually be successfully closed, just that the process was started. /// bool __cdecl _close_fsb_nolock(_In_ _file_info **info, _In_ Concurrency::streams::details::_filestream_callback *callback) { _ASSERTE(callback != nullptr); _ASSERTE(info != nullptr); _ASSERTE(*info != nullptr); _file_info_impl *fInfo = static_cast<_file_info_impl *>(*info); *info = nullptr; auto stream = fInfo->m_stream; if (fInfo->m_stream->CanWrite) { _sync_fsb_winrt(fInfo, nullptr); fInfo->m_pendingWrites.then([=] (pplx::task t) { try { // The lock fInfo->m_lock must not be held at this point delete fInfo; t.wait(); callback->on_closed(); } catch (Platform::Exception^ exc) { callback->on_error(std::make_exception_ptr(utility::details::create_system_error(exc->HResult))); } }); } else { // The lock fInfo->m_lock must not be held at this point delete fInfo; callback->on_closed(); } return true; } bool __cdecl _close_fsb(_In_ _file_info **info, _In_ Concurrency::streams::details::_filestream_callback *callback) { return _close_fsb_nolock(info, callback); } /// /// Initiate an asynchronous (overlapped) read from the file stream. /// /// The file info record of the file /// A pointer to the callback interface to invoke when the write request is completed. /// A pointer to a buffer where the data should be placed /// The size (in bytes) of the buffer /// The offset in the file to read from /// 0 if the read request is still outstanding, -1 if the request failed, otherwise the size of the data read into the buffer size_t __cdecl _read_file_async(_In_ Concurrency::streams::details::_file_info_impl *fInfo, _In_ Concurrency::streams::details::_filestream_callback *callback, _Out_writes_ (count) void *ptr, _In_ size_t count, size_t offset) { if ( fInfo->m_stream == nullptr ) { if ( callback != nullptr ) { // I don't know of a better error code, so this will have to do. callback->on_error(std::make_exception_ptr(utility::details::create_system_error(ERROR_INVALID_ADDRESS))); } return 0; } auto reader = ref new Streams::DataReader(fInfo->m_stream->GetInputStreamAt(offset)); pplx::create_task(reader->LoadAsync(static_cast(count))).then( [=](pplx::task result) { try { auto read = result.get(); if (read > 0) { reader->ReadBytes(Platform::ArrayReference(static_cast(ptr), read)); } callback->on_completed(read); } catch (Platform::Exception^ exc) { callback->on_error(std::make_exception_ptr(utility::details::create_system_error(exc->HResult))); } }); return 0; } template class _filestream_callback_fill_buffer : public _filestream_callback { public: _filestream_callback_fill_buffer(_In_ _file_info *info, const Func &func) : m_func(func), m_info(info) { } virtual void on_completed(size_t result) { m_func(result); delete this; } private: _file_info *m_info; Func m_func; }; template _filestream_callback_fill_buffer *create_callback(_In_ _file_info *info, const Func &func) { return new _filestream_callback_fill_buffer(info, func); } size_t _fill_buffer_fsb(_In_ _file_info_impl *fInfo, _In_ _filestream_callback *callback, size_t count, size_t char_size) { msl::safeint3::SafeInt safeCount = count; if ( fInfo->m_buffer == nullptr || safeCount > fInfo->m_bufsize ) { if ( fInfo->m_buffer != nullptr ) delete fInfo->m_buffer; fInfo->m_bufsize = safeCount.Max(fInfo->m_buffer_size); fInfo->m_buffer = new char[fInfo->m_bufsize*char_size]; fInfo->m_bufoff = fInfo->m_rdpos; auto cb = create_callback(fInfo, [=] (size_t result) { { pplx::extensibility::scoped_recursive_lock_t lck(fInfo->m_lock); fInfo->m_buffill = result / char_size; } callback->on_completed(result); }); auto read = _read_file_async(fInfo, cb, (uint8_t *)fInfo->m_buffer, fInfo->m_bufsize*char_size, fInfo->m_rdpos*char_size); switch (read) { case 0: // pending return read; case (-1): // error delete cb; return read; default: // operation is complete. The pattern of returning synchronously // has the expectation that we duplicate the callback code here... // Do the expedient thing for now. cb->on_completed(read); // return pending return 0; }; } // First, we need to understand how far into the buffer we have already read // and how much remains. size_t bufpos = fInfo->m_rdpos - fInfo->m_bufoff; size_t bufrem = fInfo->m_buffill - bufpos; // We have four different scenarios: // 1. The read position is before the start of the buffer, in which case we will just reuse the buffer. // 2. The read position is in the middle of the buffer, and we need to read some more. // 3. The read position is beyond the end of the buffer. Do as in #1. // 4. We have everything we need. if ( (fInfo->m_rdpos < fInfo->m_bufoff) || (fInfo->m_rdpos >= (fInfo->m_bufoff+fInfo->m_buffill)) ) { // Reuse the existing buffer. fInfo->m_bufoff = fInfo->m_rdpos; auto cb = create_callback(fInfo, [=] (size_t result) { { pplx::extensibility::scoped_recursive_lock_t lck(fInfo->m_lock); fInfo->m_buffill = result / char_size; } callback->on_completed(bufrem*char_size+result); }); auto read = _read_file_async(fInfo, cb, (uint8_t*)fInfo->m_buffer, fInfo->m_bufsize*char_size, fInfo->m_rdpos*char_size); switch (read) { case 0: // pending return read; case (-1): // error delete cb; return read; default: // operation is complete. The pattern of returning synchronously // has the expectation that we duplicate the callback code here... // Do the expedient thing for now. cb->on_completed(read); // return pending return 0; }; } else if ( bufrem < count ) { fInfo->m_bufsize = safeCount.Max(fInfo->m_buffer_size); // Then, we allocate a new buffer. char *newbuf = new char[fInfo->m_bufsize*char_size]; // Then, we copy the unread part to the new buffer and delete the old buffer if ( bufrem > 0 ) memcpy(newbuf, fInfo->m_buffer+bufpos*char_size, bufrem*char_size); delete fInfo->m_buffer; fInfo->m_buffer = newbuf; // Then, we read the remainder of the count into the new buffer fInfo->m_bufoff = fInfo->m_rdpos; auto cb = create_callback(fInfo, [=] (size_t result) { { pplx::extensibility::scoped_recursive_lock_t lck(fInfo->m_lock); fInfo->m_buffill = result / char_size; } callback->on_completed(bufrem*char_size+result); }); auto read = _read_file_async(fInfo, cb, (uint8_t*)fInfo->m_buffer+bufrem*char_size, (fInfo->m_bufsize-bufrem)*char_size, (fInfo->m_rdpos+bufrem)*char_size); switch (read) { case 0: // pending return read; case (-1): // error delete cb; return read; default: // operation is complete. The pattern of returning synchronously // has the expectation that we duplicate the callback code here... // Do the expedient thing for now. cb->on_completed(read); // return pending return 0; }; } else { // If we are here, it means that we didn't need to read, we already have enough data in the buffer return count*char_size; } } /// /// Read data from a file stream into a buffer /// /// The file info record of the file /// A pointer to the callback interface to invoke when the write request is completed. /// A pointer to a buffer where the data should be placed /// The size (in characters) of the buffer /// 0 if the read request is still outstanding, -1 if the request failed, otherwise the size of the data read into the buffer size_t __cdecl _getn_fsb(_In_ Concurrency::streams::details::_file_info *info, _In_ Concurrency::streams::details::_filestream_callback *callback, _Out_writes_ (count) void *ptr, _In_ size_t count, size_t char_size) { _ASSERTE(callback != nullptr); _ASSERTE(info != nullptr); _ASSERTE(count > 0); _file_info_impl *fInfo = static_cast<_file_info_impl *>(info); pplx::extensibility::scoped_recursive_lock_t lck(info->m_lock); if ( fInfo->m_buffer_size > 0 ) { auto cb = create_callback(fInfo, [=] (size_t read) { auto sz = count*char_size; auto copy = (read < sz) ? read : sz; auto bufoff = fInfo->m_rdpos - fInfo->m_bufoff; memcpy(ptr, fInfo->m_buffer+bufoff*char_size, copy); fInfo->m_atend = copy < sz; callback->on_completed(copy); }); size_t read = _fill_buffer_fsb(fInfo, cb, count, char_size); if ( read > 0 ) { auto sz = count*char_size; auto copy = (read < sz) ? read : sz; auto bufoff = fInfo->m_rdpos - fInfo->m_bufoff; memcpy(ptr, fInfo->m_buffer+bufoff*char_size, copy); fInfo->m_atend = copy < sz; return copy; } return (size_t)read; } else { return _read_file_async(fInfo, callback, ptr, count*char_size, fInfo->m_rdpos*char_size); } } /// /// Write data from a buffer into the file stream. /// /// The file info record of the file /// A pointer to the callback interface to invoke when the write request is completed. /// A pointer to a buffer where the data should be placed /// The size (in characters) of the buffer /// 0 if the write request is still outstanding, -1 if the request failed, otherwise the size of the data read into the buffer size_t __cdecl _putn_fsb(_In_ Concurrency::streams::details::_file_info *info, _In_ Concurrency::streams::details::_filestream_callback *callback, const void *ptr, size_t count, size_t char_size) { _ASSERTE(callback != nullptr); _ASSERTE(info != nullptr); _ASSERTE(count > 0); _file_info_impl *fInfo = static_cast<_file_info_impl *>(info); pplx::extensibility::scoped_recursive_lock_t lck(fInfo->m_lock); if ( fInfo->m_stream == nullptr ) return static_cast(-1); // To preserve the async write order, we have to move the write head before read. if (fInfo->m_wrpos != static_cast(-1)) fInfo->m_wrpos += count; msl::safeint3::SafeInt safeWriteSize = count; safeWriteSize *= char_size; // In most of the time, we preserve the writer so that it would have better performance. // However, after uer call seek, we will despose old writer. By doing so, users could // write to new writer in new position while the old writer is still flushing data into stream. if (fInfo->m_writer == nullptr) { fInfo->m_writer = ref new Streams::DataWriter(fInfo->m_stream); fInfo->m_buffill = 0; } // It keeps tracking the number of bytes written into m_writer buffer. fInfo->m_buffill += count; // ArrayReference here is for avoiding data copy. fInfo->m_writer->WriteBytes(Platform::ArrayReference(const_cast(static_cast(ptr)), safeWriteSize)); // Flush data from m_writer buffer into stream , if the buffer is full if (fInfo->m_buffill >= fInfo->m_buffer_size) { fInfo->m_buffill = 0; fInfo->m_pendingWrites = fInfo->m_pendingWrites.then([=] { return fInfo->m_writer->StoreAsync(); }).then([=] (pplx::task result) { try { result.wait(); callback->on_completed(safeWriteSize); } catch (Platform::Exception^ exc) { callback->on_error(std::make_exception_ptr(utility::details::create_system_error(exc->HResult))); } }); return 0; } else return safeWriteSize; } /// /// Flush all buffered data to the underlying file. /// /// The file info record of the file /// A pointer to the callback interface to invoke when the write request is completed. /// True if the request was initiated bool __cdecl _sync_fsb(_In_ Concurrency::streams::details::_file_info *info, _In_ Concurrency::streams::details::_filestream_callback *callback) { return _sync_fsb_winrt(info, callback); } /// /// Adjust the internal buffers and pointers when the application seeks to a new read location in the stream. /// /// The file info record of the file /// The new position (offset from the start) in the file stream /// New file position or -1 if error size_t __cdecl _seekrdpos_fsb(_In_ Concurrency::streams::details::_file_info *info, size_t pos, size_t char_size) { _ASSERTE(info != nullptr); _file_info_impl *fInfo = static_cast<_file_info_impl *>(info); pplx::extensibility::scoped_recursive_lock_t lck(info->m_lock); if ( fInfo->m_stream == nullptr) return static_cast(-1);; if ( pos < fInfo->m_bufoff || pos > (fInfo->m_bufoff+fInfo->m_buffill) ) { delete fInfo->m_buffer; fInfo->m_buffer = nullptr; fInfo->m_bufoff = fInfo->m_buffill = fInfo->m_bufsize = 0; } fInfo->m_rdpos = pos; return fInfo->m_rdpos; } /// /// Adjust the internal buffers and pointers when the application seeks to a new read location in the stream. /// /// The file info record of the file /// The new position (offset from the end of the stream) in the file stream /// The size of the character type used for this stream /// New file position or -1 if error _ASYNCRTIMP size_t __cdecl _seekrdtoend_fsb(_In_ Concurrency::streams::details::_file_info *info, int64_t offset, size_t char_size) { _ASSERTE(info != nullptr); _file_info_impl *fInfo = static_cast<_file_info_impl *>(info); return _seekrdpos_fsb(info, static_cast(fInfo->m_stream->Size / char_size + offset), char_size); } utility::size64_t __cdecl _get_size(_In_ concurrency::streams::details::_file_info *info, size_t char_size) { _ASSERTE(info != nullptr); _file_info_impl *fInfo = static_cast<_file_info_impl *>(info); pplx::extensibility::scoped_recursive_lock_t lck(info->m_lock); if ( fInfo->m_stream == nullptr) return 0; return utility::size64_t(fInfo->m_stream->Size/char_size); } /// /// Adjust the internal buffers and pointers when the application seeks to a new write location in the stream. /// /// The file info record of the file /// The new position (offset from the start) in the file stream /// New file position or -1 if error size_t __cdecl _seekwrpos_fsb(_In_ Concurrency::streams::details::_file_info *info, size_t pos, size_t char_size) { _ASSERTE(info != nullptr); _file_info_impl *fInfo = static_cast<_file_info_impl *>(info); pplx::extensibility::scoped_recursive_lock_t lck(info->m_lock); if ( fInfo->m_stream == nullptr) return static_cast(-1); fInfo->m_wrpos = pos; // m_buffill keeps number of chars written into the m_writer buffer. // We need to flush it into stream before seek the write head of the stream if (fInfo->m_buffill > 0) _sync_fsb_winrt(fInfo, nullptr); // Moving write head should follow the flush operation. is_done test is for perf optimization. if (fInfo->m_pendingWrites.is_done()) fInfo->m_stream->Seek(static_cast(pos) * char_size); else { auto lastWriter = fInfo->m_writer; fInfo->m_writer = nullptr; fInfo->m_pendingWrites = fInfo->m_pendingWrites.then([=] { // Detach stream could avoid stream destruction after writer get destructed. lastWriter->DetachStream(); fInfo->m_stream->Seek(static_cast(pos) * char_size); }); } return fInfo->m_wrpos; } namespace Concurrency { namespace streams { namespace details { /// /// This class acts as a bridge between WinRT input streams and Casablanca asynchronous streams. /// ref class IRandomAccessStream_bridge sealed : public Windows::Storage::Streams::IRandomAccessStream { public: virtual property bool CanRead { bool get() { return m_buffer.can_read(); } } virtual property bool CanWrite { bool get() { return m_buffer.can_write(); } } virtual property uint64_t Position { uint64_t get() { return m_position; } } virtual property uint64_t Size { uint64_t get() { if (!m_buffer.has_size()) return m_remembered_size; return m_buffer.size(); } void set(uint64_t sz) { if (!m_buffer.has_size() || !m_buffer.can_write()) m_remembered_size = sz; else m_buffer.seekoff(basic_streambuf::pos_type(sz), std::ios_base::beg, std::ios_base::out); } } virtual Windows::Storage::Streams::IRandomAccessStream^ CloneStream() { return ref new IRandomAccessStream_bridge(m_buffer); } virtual Windows::Storage::Streams::IInputStream^ GetInputStreamAt(uint64_t position) { if ( !m_buffer.can_read() ) return nullptr; concurrency::streams::streambuf::pos_type pos = position; if ( m_buffer.can_seek() || pos == m_buffer.getpos(std::ios_base::in) ) { return ref new IRandomAccessStream_bridge(m_buffer,position); } return nullptr; } virtual Windows::Storage::Streams::IOutputStream^ GetOutputStreamAt(uint64_t position) { if ( !m_buffer.can_write() ) return nullptr; concurrency::streams::streambuf::pos_type pos = position; if ( m_buffer.can_seek() || pos == m_buffer.getpos(std::ios_base::out) ) { return ref new IRandomAccessStream_bridge(m_buffer,position); } return nullptr; }; virtual void Seek(uint64_t position) { if (!m_buffer.can_seek()) throw ref new Platform::InvalidArgumentException(L"underlying buffer cannot seek"); m_position = position; m_buffer.seekpos(concurrency::streams::streambuf::pos_type(m_position),std::ios_base::in); m_buffer.seekpos(concurrency::streams::streambuf::pos_type(m_position),std::ios_base::out); } virtual Windows::Foundation::IAsyncOperationWithProgress^ WriteAsync(Windows::Storage::Streams::IBuffer^ buffer); virtual Windows::Foundation::IAsyncOperationWithProgress<::Windows::Storage::Streams::IBuffer^, unsigned int>^ ReadAsync(::Windows::Storage::Streams::IBuffer^ buffer, unsigned int count, Windows::Storage::Streams::InputStreamOptions options); virtual Windows::Foundation::IAsyncOperation^ FlushAsync(); virtual ~IRandomAccessStream_bridge() { } internal: IRandomAccessStream_bridge(const concurrency::streams::streambuf &buffer) : m_buffer(buffer), m_remembered_size(0), m_position(0) { } IRandomAccessStream_bridge(const concurrency::streams::streambuf &buffer, concurrency::streams::streambuf::pos_type position) : m_buffer(buffer), m_remembered_size(0), m_position(position) { } private: uint64_t m_remembered_size; concurrency::streams::streambuf::pos_type m_position; concurrency::streams::streambuf m_buffer; }; struct _alloc_protector { _alloc_protector(concurrency::streams::streambuf& buffer) : m_buffer(buffer), m_size(0) { } ~_alloc_protector() { m_buffer.commit(m_size); } size_t m_size; private: _alloc_protector& operator=(const _alloc_protector&); concurrency::streams::streambuf& m_buffer; }; struct _acquire_protector { _acquire_protector(concurrency::streams::streambuf& buffer, uint8_t* ptr) : m_buffer(buffer), m_ptr(ptr), m_size(0) { } ~_acquire_protector() { m_buffer.release(m_ptr, m_size); } size_t m_size; private: _acquire_protector& operator=(const _acquire_protector&); uint8_t* m_ptr; concurrency::streams::streambuf& m_buffer; }; // Rather than using ComPtr, which is somewhat complex, a simple RAII class // to make sure that Release() is called is useful here. struct _IUnknown_protector { _IUnknown_protector(IUnknown* unk_ptr) : m_unknown(unk_ptr) {} ~_IUnknown_protector() { if (m_unknown != nullptr) m_unknown->Release(); } private: IUnknown* m_unknown; }; Windows::Foundation::IAsyncOperationWithProgress<::Windows::Storage::Streams::IBuffer^, unsigned int>^ IRandomAccessStream_bridge::ReadAsync(::Windows::Storage::Streams::IBuffer^ buffer, unsigned int count, Windows::Storage::Streams::InputStreamOptions options) { if (!m_buffer.can_read()) { return pplx::create_async([buffer](pplx::progress_reporter reporter) { return buffer; }); } if (buffer->Capacity < count) return pplx::create_async([buffer](pplx::progress_reporter reporter) { return buffer; }); m_buffer.seekpos(concurrency::streams::streambuf::pos_type(m_position),std::ios_base::in); concurrency::streams::streambuf streambuf = m_buffer; return pplx::create_async( [streambuf,buffer,options,count](pplx::progress_reporter reporter) { auto sbuf = streambuf; auto local_buf = ref new ::Platform::Array(count); uint8_t* ptr = nullptr; size_t acquired_size = 0; if ( sbuf.acquire(ptr, acquired_size) && acquired_size >= count ) { _acquire_protector prot(sbuf, ptr); IUnknown* pUnk = reinterpret_cast(buffer); ::Windows::Storage::Streams::IBufferByteAccess* pBufferByteAccess = nullptr; HRESULT hr = pUnk->QueryInterface(IID_PPV_ARGS(&pBufferByteAccess)); __abi_ThrowIfFailed(hr); _IUnknown_protector unkprot(pBufferByteAccess); byte* buffer_data = nullptr; hr = pBufferByteAccess->Buffer(&buffer_data); __abi_ThrowIfFailed(hr); memcpy(buffer_data,ptr,count); prot.m_size = count; buffer->Length = count; return pplx::task_from_result(buffer); } else { if ( acquired_size > 0 ) { sbuf.release(ptr, 0); } IUnknown* pUnk = reinterpret_cast(buffer); ::Windows::Storage::Streams::IBufferByteAccess* pBufferByteAccess = nullptr; HRESULT hr = pUnk->QueryInterface(IID_PPV_ARGS(&pBufferByteAccess)); __abi_ThrowIfFailed(hr); _IUnknown_protector unkprot(pBufferByteAccess); byte* buffer_data = nullptr; hr = pBufferByteAccess->Buffer(&buffer_data); __abi_ThrowIfFailed(hr); pBufferByteAccess->AddRef(); return sbuf.getn(buffer_data,count).then( [buffer,pBufferByteAccess,count](pplx::task written) { _IUnknown_protector unkprot(pBufferByteAccess); buffer->Length = (unsigned int)written.get(); return pplx::task_from_result(buffer); }); } }); } Windows::Foundation::IAsyncOperationWithProgress^ IRandomAccessStream_bridge::WriteAsync(Windows::Storage::Streams::IBuffer^ buffer) { if (!m_buffer.can_write()) { return pplx::create_async([](pplx::progress_reporter reporter) { return 0U; }); } m_buffer.seekpos(concurrency::streams::streambuf::pos_type(m_position),std::ios_base::out); concurrency::streams::streambuf streambuf = m_buffer; return pplx::create_async( [buffer,streambuf](pplx::progress_reporter reporter) { auto size = buffer->Length; auto sbuf = streambuf; uint8_t* ptr = sbuf.alloc(size); if ( ptr != nullptr) { { _alloc_protector prot(sbuf); IUnknown* pUnk = reinterpret_cast(buffer); ::Windows::Storage::Streams::IBufferByteAccess* pBufferByteAccess = nullptr; HRESULT hr = pUnk->QueryInterface(IID_PPV_ARGS(&pBufferByteAccess)); __abi_ThrowIfFailed(hr); _IUnknown_protector unkprot(pBufferByteAccess); byte* buffer_data = nullptr; hr = pBufferByteAccess->Buffer(&buffer_data); __abi_ThrowIfFailed(hr); memcpy(ptr,buffer_data,size); prot.m_size = size; } return pplx::task_from_result((unsigned int)size); } else { IUnknown* pUnk = reinterpret_cast(buffer); ::Windows::Storage::Streams::IBufferByteAccess* pBufferByteAccess = nullptr; HRESULT hr = pUnk->QueryInterface(IID_PPV_ARGS(&pBufferByteAccess)); __abi_ThrowIfFailed(hr); _IUnknown_protector unkprot(pBufferByteAccess); byte* buffer_data = nullptr; hr = pBufferByteAccess->Buffer(&buffer_data); __abi_ThrowIfFailed(hr); pBufferByteAccess->AddRef(); return sbuf.putn_nocopy(buffer_data, size).then( [pBufferByteAccess](pplx::task size) { pBufferByteAccess->Release(); return (unsigned int)size.get(); }); } }); } Windows::Foundation::IAsyncOperation^ IRandomAccessStream_bridge::FlushAsync() { concurrency::streams::streambuf streambuf = m_buffer; return pplx::create_async([streambuf]() { if (!streambuf.can_write()) { return pplx::task_from_result(false); } auto sbuf = streambuf; return sbuf.sync().then([] { return pplx::task_from_result(true); }); }); } }}} // namespaces Windows::Storage::Streams::IInputStream^ Concurrency::streams::winrt_stream::create_input_stream(const concurrency::streams::streambuf &buffer) { return ref new ::Concurrency::streams::details::IRandomAccessStream_bridge(buffer,0); } Windows::Storage::Streams::IOutputStream^ Concurrency::streams::winrt_stream::create_output_stream(const concurrency::streams::streambuf &buffer) { return ref new Concurrency::streams::details::IRandomAccessStream_bridge(buffer,0); } Windows::Storage::Streams::IRandomAccessStream^ Concurrency::streams::winrt_stream::create_random_access_stream(const concurrency::streams::streambuf &buffer) { return ref new Concurrency::streams::details::IRandomAccessStream_bridge(buffer); } #pragma warning(pop)