/*** * ==++== * * Copyright (c) Microsoft Corporation. All rights reserved. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * * ==--== * =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+ * * Asynchronous I/O: stream buffer implementation details * * We're going to some lengths to avoid exporting C++ class member functions and implementation details across * module boundaries, and the factoring requires that we keep the implementation details away from the main header * files. The supporting functions, which are in this file, use C-like signatures to avoid as many issues as * possible. * * For the latest on this and related APIs, please see http://casablanca.codeplex.com. * * =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=- ****/ #include "stdafx.h" #include "cpprest/details/fileio.h" using namespace boost::asio; using namespace Concurrency::streams::details; namespace Concurrency { namespace streams { namespace details { /*** * ==++== * * Implementation details of the file stream buffer * * =-=-=- ****/ /// /// The public parts of the file information record contain only what is implementation- /// independent. The actual allocated record is larger and has details that the implementation /// require in order to function. /// struct _file_info_impl : _file_info { _file_info_impl(int handle, std::ios_base::openmode mode, bool buffer_reads) : _file_info(mode, 512), m_handle(handle), m_buffer_reads(buffer_reads), m_outstanding_writes(0) { } /// /// The file handle of the file /// int m_handle; bool m_buffer_reads; /// /// A list of callback waiting to be signalled that there are no outstanding writes. /// std::vector<_filestream_callback *> m_sync_waiters; std::atomic m_outstanding_writes; }; }}} /// /// Perform post-CreateFile processing. /// /// The Win32 file handle /// The callback interface pointer /// The C++ file open mode /// The error code if there was an error in file creation. bool _finish_create(int fh, _filestream_callback *callback, std::ios_base::openmode mode, int /* prot */) { if (fh != -1) { // Buffer reads internally if and only if we're just reading (not also writing) and // if the file is opened exclusively. If either is false, we're better off just // letting the OS do its buffering, even if it means that prompt reads won't // happen. bool buffer = (mode == std::ios_base::in); // seek to end if requested if (mode & std::ios_base::ate) { lseek(fh, 0, SEEK_END); } auto info = new _file_info_impl(fh, mode, buffer); if (mode & std::ios_base::app || mode & std::ios_base::ate) { info->m_wrpos = static_cast(-1); // Start at the end of the file. } callback->on_opened(info); return true; } else { callback->on_error(std::make_exception_ptr(utility::details::create_system_error(errno))); return false; } } int get_open_flags(std::ios_base::openmode mode) { int result = 0; if (mode & std::ios_base::in) { if (mode & std::ios_base::out) { result = O_RDWR; } else { result = O_RDONLY; } } else if (mode & std::ios_base::out) { result = O_WRONLY|O_CREAT; } if (mode & std::ios_base::app) { result |= O_APPEND; } if (mode & std::ios_base::trunc) { result |= O_TRUNC|O_CREAT; } return result; } /// /// Open a file and create a streambuf instance to represent it. /// /// A pointer to the callback interface to invoke when the file has been opened. /// The name of the file to open /// A creation mode for the stream buffer /// A file protection mode to use for the file stream /// True if the opening operation could be initiated, false otherwise. /// /// True does not signal that the file will eventually be successfully opened, just that the process was started. /// bool _open_fsb_str(_filestream_callback *callback, const char *filename, std::ios_base::openmode mode, int prot) { if ( callback == nullptr || filename == nullptr) return false; std::string name(filename); pplx::create_task([=]() -> void { int cmode = get_open_flags(mode); if(cmode==O_RDWR) { cmode |= O_CREAT; } int f = open(name.c_str(), cmode, 0600); _finish_create(f, callback, mode, prot); }); return true; } /// /// Close a file stream buffer. /// /// The file info record of the file /// A pointer to the callback interface to invoke when the file has been opened. /// True if the closing operation could be initiated, false otherwise. /// /// True does not signal that the file will eventually be successfully closed, just that the process was started. /// bool _close_fsb_nolock(_file_info **info, Concurrency::streams::details::_filestream_callback *callback) { if ( callback == nullptr ) return false; if ( info == nullptr || *info == nullptr ) return false; _file_info_impl *fInfo = static_cast<_file_info_impl *>(*info); if ( fInfo->m_handle == -1 ) return false; // Since closing a file may involve waiting for outstanding writes which can take some time // if the file is on a network share, the close action is done in a separate task, as // CloseHandle doesn't have I/O completion events. pplx::create_task([=] () -> void { bool result = false; { pplx::extensibility::scoped_recursive_lock_t lock(fInfo->m_lock); if ( fInfo->m_handle != -1 ) { result = close(fInfo->m_handle) != -1; } if ( fInfo->m_buffer != nullptr ) { delete[] fInfo->m_buffer; } } delete fInfo; if (result) { callback->on_closed(); } else { callback->on_error(std::make_exception_ptr(utility::details::create_system_error(errno))); } }); *info = nullptr; return true; } bool _close_fsb(_file_info **info, Concurrency::streams::details::_filestream_callback *callback) { if ( callback == nullptr ) return false; if ( info == nullptr || *info == nullptr ) return false; pplx::extensibility::scoped_recursive_lock_t lock((*info)->m_lock); return _close_fsb_nolock(info, callback); } /// /// Initiate an asynchronous (overlapped) write to the file stream. /// /// The file info record of the file /// A pointer to the callback interface to invoke when the write request is completed. /// A pointer to the data to write /// The size (in bytes) of the data /// 0 if the write request is still outstanding, -1 if the request failed, otherwise the size of the data written size_t _write_file_async(Concurrency::streams::details::_file_info_impl *fInfo, Concurrency::streams::details::_filestream_callback *callback, const void *ptr, size_t count, size_t position) { ++fInfo->m_outstanding_writes; pplx::create_task([=]() -> void { off_t abs_position; bool must_restore_pos; off_t orig_pos; if( position == static_cast(-1) ) { orig_pos = lseek(fInfo->m_handle, 0, SEEK_CUR); abs_position = lseek(fInfo->m_handle, 0, SEEK_END); must_restore_pos = true; } else { abs_position = position; orig_pos = 0; must_restore_pos = false; } auto bytes_written = pwrite(fInfo->m_handle, ptr, count, abs_position); if (bytes_written == -1) { callback->on_error(std::make_exception_ptr(utility::details::create_system_error(errno))); } if(must_restore_pos) { lseek(fInfo->m_handle, orig_pos, SEEK_SET); } callback->on_completed(bytes_written); { pplx::extensibility::scoped_recursive_lock_t lock(fInfo->m_lock); // Decrement the counter of outstanding write events. if ( --fInfo->m_outstanding_writes == 0 ) { // If this was the last one, signal all objects waiting for it to complete. for (auto iter = fInfo->m_sync_waiters.begin(); iter != fInfo->m_sync_waiters.end(); iter++) { (*iter)->on_completed(0); } fInfo->m_sync_waiters.clear(); } } }); return 0; } /// /// Initiate an asynchronous (overlapped) read from the file stream. /// /// The file info record of the file /// A pointer to the callback interface to invoke when the write request is completed. /// A pointer to a buffer where the data should be placed /// The size (in bytes) of the buffer /// The offset in the file to read from /// 0 if the read request is still outstanding, -1 if the request failed, otherwise the size of the data read into the buffer size_t _read_file_async(Concurrency::streams::details::_file_info_impl *fInfo, Concurrency::streams::details::_filestream_callback *callback, void *ptr, size_t count, size_t offset) { pplx::create_task([=]() -> void { auto bytes_read = pread(fInfo->m_handle, ptr, count, offset); if (bytes_read < 0) { callback->on_error(std::make_exception_ptr(utility::details::create_system_error(errno))); } else { callback->on_completed(bytes_read); } }); return 0; } template class _filestream_callback_fill_buffer : public _filestream_callback { public: _filestream_callback_fill_buffer(_file_info *info, _filestream_callback *callback, const Func &func) : m_info(info), m_func(func), m_callback(callback) { } virtual void on_completed(size_t result) override { m_func(result); delete this; } virtual void on_error(const std::exception_ptr &e) override { auto exptr = std::make_exception_ptr(e); m_callback->on_error(exptr); delete this; } private: _file_info *m_info; Func m_func; _filestream_callback *m_callback; }; template _filestream_callback_fill_buffer *create_callback(_file_info *info, _filestream_callback *callback, const Func &func) { return new _filestream_callback_fill_buffer(info, callback, func); } static const size_t PageSize = 512; size_t _fill_buffer_fsb(_file_info_impl *fInfo, _filestream_callback *callback, size_t count, size_t charSize) { size_t byteCount = count * charSize; if ( fInfo->m_buffer == nullptr ) { fInfo->m_bufsize = std::max(PageSize, byteCount); fInfo->m_buffer = new char[static_cast(fInfo->m_bufsize)]; fInfo->m_bufoff = fInfo->m_rdpos; auto cb = create_callback(fInfo, callback, [=] (size_t result) { pplx::extensibility::scoped_recursive_lock_t lock(fInfo->m_lock); fInfo->m_buffill = result / charSize; callback->on_completed(result); }); return _read_file_async(fInfo, cb, (uint8_t *)fInfo->m_buffer, fInfo->m_bufsize, fInfo->m_rdpos * charSize); } // First, we need to understand how far into the buffer we have already read // and how much remains. size_t bufpos = fInfo->m_rdpos - fInfo->m_bufoff; size_t bufrem = fInfo->m_buffill - bufpos; if ( bufrem < count ) { fInfo->m_bufsize = std::max(PageSize, byteCount); // Then, we allocate a new buffer. char *newbuf = new char[static_cast(fInfo->m_bufsize)]; // Then, we copy the unread part to the new buffer and delete the old buffer if ( bufrem > 0 ) memcpy(newbuf, fInfo->m_buffer + bufpos * charSize, bufrem * charSize); delete[] fInfo->m_buffer; fInfo->m_buffer = newbuf; // Then, we read the remainder of the count into the new buffer fInfo->m_bufoff = fInfo->m_rdpos; auto cb = create_callback(fInfo, callback, [=] (size_t result) { pplx::extensibility::scoped_recursive_lock_t lock(fInfo->m_lock); fInfo->m_buffill = result / charSize; callback->on_completed(result + bufrem * charSize); }); return _read_file_async(fInfo, cb, (uint8_t*)fInfo->m_buffer + bufrem * charSize, fInfo->m_bufsize - bufrem * charSize, (fInfo->m_rdpos+bufrem)*charSize); } else return byteCount; } /// /// Read data from a file stream into a buffer /// /// The file info record of the file /// A pointer to the callback interface to invoke when the write request is completed. /// A pointer to a buffer where the data should be placed /// The size (in bytes) of the buffer /// 0 if the read request is still outstanding, -1 if the request failed, otherwise the size of the data read into the buffer size_t _getn_fsb(Concurrency::streams::details::_file_info *info, Concurrency::streams::details::_filestream_callback *callback, void *ptr, size_t count, size_t charSize) { if ( callback == nullptr || info == nullptr ) return static_cast(-1); _file_info_impl *fInfo = (_file_info_impl *)info; pplx::extensibility::scoped_recursive_lock_t lock(info->m_lock); if ( fInfo->m_handle == -1 ) return static_cast(-1); size_t byteCount = count * charSize; if ( fInfo->m_buffer_reads ) { auto cb = create_callback(fInfo, callback, [=] (size_t read) { auto copy = std::min(read, byteCount); auto bufoff = fInfo->m_rdpos - fInfo->m_bufoff; memcpy(ptr, fInfo->m_buffer + bufoff * charSize, copy); fInfo->m_atend = copy < byteCount; callback->on_completed(copy); }); size_t read = _fill_buffer_fsb(fInfo, cb, count, charSize); if ( static_cast(read) > 0 ) { auto copy = std::min(read, byteCount); auto bufoff = fInfo->m_rdpos - fInfo->m_bufoff; memcpy(ptr, fInfo->m_buffer + bufoff * charSize, copy); fInfo->m_atend = copy < byteCount; return copy; } return read; } else { return _read_file_async(fInfo, callback, ptr, count, fInfo->m_rdpos * charSize); } } /// /// Write data from a buffer into the file stream. /// /// The file info record of the file /// A pointer to the callback interface to invoke when the write request is completed. /// A pointer to a buffer where the data should be placed /// The size (in bytes) of the buffer /// 0 if the read request is still outstanding, -1 if the request failed, otherwise the size of the data read into the buffer size_t _putn_fsb(Concurrency::streams::details::_file_info *info, Concurrency::streams::details::_filestream_callback *callback, const void *ptr, size_t count, size_t charSize) { if (callback == nullptr || info == nullptr) return static_cast(-1); _file_info_impl *fInfo = static_cast<_file_info_impl *>(info); pplx::extensibility::scoped_recursive_lock_t lock(fInfo->m_lock); if ( fInfo->m_handle == -1 ) return static_cast(-1); size_t byteSize = count * charSize; // To preserve the async write order, we have to move the write head before read. auto lastPos = fInfo->m_wrpos; if (fInfo->m_wrpos != static_cast(-1)) { fInfo->m_wrpos += count; lastPos *= charSize; } return _write_file_async(fInfo, callback, ptr, byteSize, lastPos); } /// /// Flush all buffered data to the underlying file. /// /// The file info record of the file /// A pointer to the callback interface to invoke when the write request is completed. /// True if the request was initiated bool _sync_fsb(Concurrency::streams::details::_file_info *info, Concurrency::streams::details::_filestream_callback *callback) { if ( callback == nullptr ) return false; if ( info == nullptr ) return false; _file_info_impl *fInfo = static_cast<_file_info_impl *>(info); pplx::extensibility::scoped_recursive_lock_t lock(fInfo->m_lock); if ( fInfo->m_handle == -1 ) return false; if ( fInfo->m_outstanding_writes > 0 ) fInfo->m_sync_waiters.push_back(callback); else callback->on_completed(0); return true; } /// /// Adjust the internal buffers and pointers when the application seeks to a new read location in the stream. /// /// The file info record of the file /// The new position (offset from the start) in the file stream /// New file position or -1 if error size_t _seekrdtoend_fsb(Concurrency::streams::details::_file_info *info, int64_t offset, size_t char_size) { if ( info == nullptr ) return static_cast(-1); _file_info_impl *fInfo = static_cast<_file_info_impl *>(info); pplx::extensibility::scoped_recursive_lock_t lock(info->m_lock); if ( fInfo->m_handle == -1 ) return static_cast(-1); if ( fInfo->m_buffer != nullptr ) { delete[] fInfo->m_buffer; fInfo->m_buffer = nullptr; fInfo->m_bufoff = fInfo->m_buffill = fInfo->m_bufsize = 0; } auto newpos = lseek(fInfo->m_handle, static_cast(offset * char_size), SEEK_END); if ( newpos == -1 ) return static_cast(-1); fInfo->m_rdpos = newpos / char_size; return fInfo->m_rdpos; } utility::size64_t _get_size(_In_ concurrency::streams::details::_file_info *info, size_t char_size) { if ( info == nullptr ) return static_cast(-1); _file_info_impl *fInfo = static_cast<_file_info_impl *>(info); pplx::extensibility::scoped_recursive_lock_t lock(info->m_lock); if ( fInfo->m_handle == -1 ) return static_cast(-1); if ( fInfo->m_buffer != nullptr ) { delete[] fInfo->m_buffer; fInfo->m_buffer = nullptr; fInfo->m_bufoff = fInfo->m_buffill = fInfo->m_bufsize = 0; } auto oldpos = lseek(fInfo->m_handle, 0, SEEK_CUR); if ( oldpos == -1 ) return utility::size64_t(-1); auto newpos = lseek(fInfo->m_handle, 0, SEEK_END); if ( newpos == -1 ) return utility::size64_t(-1); lseek(fInfo->m_handle, oldpos, SEEK_SET); return utility::size64_t(newpos / char_size); } /// /// Adjust the internal buffers and pointers when the application seeks to a new read location in the stream. /// /// The file info record of the file /// The new position (offset from the start) in the file stream /// New file position or -1 if error size_t _seekrdpos_fsb(Concurrency::streams::details::_file_info *info, size_t pos, size_t) { if ( info == nullptr ) return static_cast(-1); _file_info_impl *fInfo = static_cast<_file_info_impl *>(info); pplx::extensibility::scoped_recursive_lock_t lock(info->m_lock); if ( fInfo->m_handle == -1 ) return static_cast(-1); if ( pos < fInfo->m_bufoff || pos > (fInfo->m_bufoff+fInfo->m_buffill) ) { delete[] fInfo->m_buffer; fInfo->m_buffer = nullptr; fInfo->m_bufoff = fInfo->m_buffill = fInfo->m_bufsize = 0; } fInfo->m_rdpos = pos; return fInfo->m_rdpos; } /// /// Adjust the internal buffers and pointers when the application seeks to a new write location in the stream. /// /// The file info record of the file /// The new position (offset from the start) in the file stream /// New file position or -1 if error size_t _seekwrpos_fsb(Concurrency::streams::details::_file_info *info, size_t pos, size_t) { if ( info == nullptr ) return static_cast(-1); _file_info_impl *fInfo = static_cast<_file_info_impl *>(info); pplx::extensibility::scoped_recursive_lock_t lock(info->m_lock); if ( fInfo->m_handle == -1 ) return static_cast(-1); fInfo->m_wrpos = pos; return fInfo->m_wrpos; }