/***
* ==++==
*
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* ==--==
* =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
*
* Asynchronous I/O: stream buffer implementation details
*
* We're going to some lengths to avoid exporting C++ class member functions and implementation details across
* module boundaries, and the factoring requires that we keep the implementation details away from the main header
* files. The supporting functions, which are in this file, use C-like signatures to avoid as many issues as
* possible.
*
* For the latest on this and related APIs, please see http://casablanca.codeplex.com.
*
* =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
****/
#include "stdafx.h"
#include "cpprest/details/fileio.h"
using namespace web;
using namespace utility;
using namespace concurrency;
using namespace utility::conversions;
namespace Concurrency { namespace streams { namespace details {
/***
* ==++==
*
* Implementation details of the file stream buffer
*
* =-=-=-
****/
///
/// The public parts of the file information record contain only what is implementation-
/// independent. The actual allocated record is larger and has the details that the
/// implementation requires in order to function.
///
struct _file_info_impl : _file_info
{
    ///
    /// Wrap an already opened Win32 file.
    ///
    /// handle:      the Win32 file handle.
    /// io_ctxt:     the thread pool I/O context for the handle; null when the build does not
    ///              use the Vista thread pool API.
    /// mode:        the C++ open mode, forwarded to the _file_info base.
    /// buffer_size: the read buffer size, forwarded to the _file_info base.
    _file_info_impl(HANDLE handle, _In_ void *io_ctxt, std::ios_base::openmode mode, size_t buffer_size) :
        _file_info(mode, buffer_size),
        // Initializers are listed in declaration order, which is the order they actually run in.
        m_handle(handle),
        m_io_context(io_ctxt)
    {
    }

    ///
    /// The Win32 file handle of the file
    ///
    HANDLE m_handle;

    ///
    /// A Win32 I/O context, used by the thread pool to schedule work.
    ///
    void *m_io_context;
};
}}}
using namespace streams::details;
///
/// Our extended OVERLAPPED record.
///
/// The standard OVERLAPPED structure has no room for application-specific data, so it is
/// extended with the stream callback to notify and the completion routine that does the
/// notifying.
///
struct EXTENDED_OVERLAPPED : OVERLAPPED
{
    EXTENDED_OVERLAPPED(LPOVERLAPPED_COMPLETION_ROUTINE func, streams::details::_filestream_callback *cb)
        : callback(cb)
        , func(func)
    {
        // Only the OVERLAPPED base is cleared; the two members below were just initialized
        // and must be left alone.
        OVERLAPPED *base = this;
        ZeroMemory(base, sizeof(*base));
    }

    /// The stream callback to notify once the request completes.
    streams::details::_filestream_callback *callback;

    /// The completion routine that translates the I/O result into a callback notification.
    LPOVERLAPPED_COMPLETION_ROUTINE func;
};
#if _WIN32_WINNT < _WIN32_WINNT_VISTA
///
/// I/O completion callback for the pre-Vista build (bound with BindIoCompletionCallback).
///
/// Forwards the result to the completion routine stored in the EXTENDED_OVERLAPPED record
/// and then frees the record.
///
void CALLBACK IoCompletionCallback(
    DWORD dwErrorCode,
    DWORD dwNumberOfBytesTransfered,
    LPOVERLAPPED pOverlapped)
{
    EXTENDED_OVERLAPPED *pExtOverlapped = static_cast<EXTENDED_OVERLAPPED *>(pOverlapped);

    // If dwErrorCode is 0xc0000011, it means STATUS_END_OF_FILE.
    // Map this error code to the system error code ERROR_HANDLE_EOF.
    if (dwErrorCode == 0xc0000011)
        dwErrorCode = ERROR_HANDLE_EOF;

    pExtOverlapped->func(dwErrorCode, dwNumberOfBytesTransfered, pOverlapped);

    // The record was allocated as an EXTENDED_OVERLAPPED, so it must be deleted through that
    // type; OVERLAPPED has no virtual destructor.
    delete pExtOverlapped;
}
#else
///
/// I/O completion callback for the thread pool I/O object created with CreateThreadpoolIo.
///
/// Forwards the result to the completion routine stored in the EXTENDED_OVERLAPPED record
/// and then frees the record.
///
void CALLBACK IoCompletionCallback(
    PTP_CALLBACK_INSTANCE instance,
    PVOID ctxt,
    PVOID pOverlapped,
    ULONG result,
    ULONG_PTR numberOfBytesTransferred,
    PTP_IO io)
{
    CASABLANCA_UNREFERENCED_PARAMETER(io);
    CASABLANCA_UNREFERENCED_PARAMETER(ctxt);
    CASABLANCA_UNREFERENCED_PARAMETER(instance);

    EXTENDED_OVERLAPPED *pExtOverlapped = static_cast<EXTENDED_OVERLAPPED *>(pOverlapped);
    pExtOverlapped->func(result, static_cast<DWORD>(numberOfBytesTransferred), static_cast<LPOVERLAPPED>(pExtOverlapped));

    // Deleting the raw PVOID would be undefined behaviour; delete through the real type.
    delete pExtOverlapped;
}
#endif
///
/// Translate from C++ STL file open modes to Win32 flags.
///
/// The C++ file open mode
/// The C++ file open protection
/// A pointer to a DWORD that will hold the desired access flags
/// A pointer to a DWORD that will hold the creation disposition
/// A pointer to a DWORD that will hold the share mode
void _get_create_flags(std::ios_base::openmode mode, int prot, DWORD &dwDesiredAccess, DWORD &dwCreationDisposition, DWORD &dwShareMode)
{
dwDesiredAccess = 0x0;
if ( mode & std::ios_base::in ) dwDesiredAccess |= GENERIC_READ;
if ( mode & std::ios_base::out ) dwDesiredAccess |= GENERIC_WRITE;
if ( mode & std::ios_base::in )
{
if ( mode & std::ios_base::out )
dwCreationDisposition = OPEN_ALWAYS;
else
dwCreationDisposition = OPEN_EXISTING;
}
else if ( mode & std::ios_base::trunc )
{
dwCreationDisposition = CREATE_ALWAYS;
}
else
{
dwCreationDisposition = OPEN_ALWAYS;
}
// C++ specifies what permissions to deny, Windows which permissions to give,
dwShareMode = 0x3;
switch (prot)
{
case _SH_DENYRW:
dwShareMode = 0x0;
break;
case _SH_DENYWR:
dwShareMode = 0x1;
break;
case _SH_DENYRD:
dwShareMode = 0x2;
break;
}
}
///
/// Perform post-CreateFile processing.
///
/// Hooks the handle up to the I/O completion machinery, allocates the file info record and
/// reports it through callback->on_opened. Every failure is reported through
/// callback->on_error; in that case the handle is closed here, because nobody else ever
/// receives it.
///
/// fh:       The Win32 file handle (INVALID_HANDLE_VALUE if CreateFile failed)
/// callback: The callback interface pointer
/// mode:     The C++ file open mode
/// prot:     The C++ file open protection
void _finish_create(HANDLE fh, _In_ _filestream_callback *callback, std::ios_base::openmode mode, int prot)
{
    if (fh == INVALID_HANDLE_VALUE)
    {
        callback->on_error(std::make_exception_ptr(utility::details::create_system_error(GetLastError())));
        return;
    }

    void *io_ctxt = nullptr;

#if _WIN32_WINNT < _WIN32_WINNT_VISTA
    if (!BindIoCompletionCallback(fh, IoCompletionCallback, 0))
    {
        // Capture the error before any cleanup call can overwrite it.
        const DWORD error = GetLastError();
        CloseHandle(fh);
        callback->on_error(std::make_exception_ptr(utility::details::create_system_error(error)));
        return;
    }
#else
    io_ctxt = CreateThreadpoolIo(fh, IoCompletionCallback, nullptr, nullptr);
    if (io_ctxt == nullptr)
    {
        const DWORD error = GetLastError();
        CloseHandle(fh);
        callback->on_error(std::make_exception_ptr(utility::details::create_system_error(error)));
        return;
    }

    if (!SetFileCompletionNotificationModes(fh, FILE_SKIP_COMPLETION_PORT_ON_SUCCESS))
    {
        // Capture the error before the cleanup calls below can overwrite it.
        const DWORD error = GetLastError();
        CloseThreadpoolIo(static_cast<PTP_IO>(io_ctxt));
        CloseHandle(fh);
        callback->on_error(std::make_exception_ptr(utility::details::create_system_error(error)));
        return;
    }
#endif

    // Buffer reads internally if and only if we're just reading (not also writing) and
    // if the file is opened exclusively. If either is false, we're better off just
    // letting the OS do its buffering, even if it means that prompt reads won't
    // happen.
    bool buffer = (mode == std::ios_base::in) && (prot == _SH_DENYRW);

    auto info = new _file_info_impl(fh, io_ctxt, mode, buffer ? 512 : 0);

    if (mode & std::ios_base::app || mode & std::ios_base::ate)
    {
        info->m_wrpos = static_cast<size_t>(-1); // Start at the end of the file.
    }

    callback->on_opened(info);
}
///
/// Open a file and create a streambuf instance to represent it.
///
/// callback: A pointer to the callback interface to invoke when the file has been opened.
/// filename: The name of the file to open
/// mode:     A creation mode for the stream buffer
/// prot:     A file protection mode to use for the file stream
/// Returns:  true if the opening operation could be initiated, false otherwise.
///
/// True does not signal that the file will eventually be successfully opened, just that the
/// process was started. The outcome is reported through the callback.
///
bool __cdecl _open_fsb_str(_In_ _filestream_callback *callback, const utility::char_t *filename, std::ios_base::openmode mode, int prot)
{
    _ASSERTE(callback != nullptr);
    _ASSERTE(filename != nullptr);

    // Take a copy of the name: the task below runs after this function has returned.
    const std::wstring path(filename);

    pplx::create_task([path, callback, mode, prot]()
        {
            DWORD access = 0;
            DWORD disposition = 0;
            DWORD share = 0;
            _get_create_flags(mode, prot, access, disposition, share);

            HANDLE fileHandle = ::CreateFileW(path.c_str(), access, share, nullptr, disposition, FILE_FLAG_OVERLAPPED, nullptr);

            // Success and failure alike are reported to the callback from here.
            _finish_create(fileHandle, callback, mode, prot);
        });

    return true;
}
///
/// Close a file stream buffer.
///
/// info:     The file info record of the file; *info is set to null once the close has
///           been initiated.
/// callback: A pointer to the callback interface to invoke when the file has been closed.
/// Returns:  true if the closing operation could be initiated, false otherwise.
///
/// True does not signal that the file will eventually be successfully closed, just that the
/// process was started.
///
bool __cdecl _close_fsb_nolock(_In_ _file_info **info, _In_ streams::details::_filestream_callback *callback)
{
    _ASSERTE(callback != nullptr);
    _ASSERTE(info != nullptr);
    _ASSERTE(*info != nullptr);

    _file_info_impl *fInfo = static_cast<_file_info_impl *>(*info);

    if ( fInfo->m_handle == INVALID_HANDLE_VALUE ) return false;

    // Since closing a file may involve waiting for outstanding writes which can take some time
    // if the file is on a network share, the close action is done in a separate task, as
    // CloseHandle doesn't have I/O completion events.
    pplx::create_task([fInfo, callback]()
        {
            bool result = false;
            // Reported if the handle turns out to be invalid by the time the task runs.
            DWORD error = ERROR_INVALID_HANDLE;

            {
                pplx::extensibility::scoped_recursive_lock_t lck(fInfo->m_lock);

                if (fInfo->m_handle != INVALID_HANDLE_VALUE)
                {
#if _WIN32_WINNT >= _WIN32_WINNT_VISTA
                    CloseThreadpoolIo(static_cast<PTP_IO>(fInfo->m_io_context));
#endif // _WIN32_WINNT >= _WIN32_WINNT_VISTA

                    result = CloseHandle(fInfo->m_handle) != FALSE;
                    // Read the error immediately, before any later call can overwrite it.
                    if (!result)
                        error = GetLastError();
                }

                // The buffer is allocated with new char[], so it needs the array form of delete.
                delete[] fInfo->m_buffer;
            }

            delete fInfo;

            if (result)
                callback->on_closed();
            else
                callback->on_error(std::make_exception_ptr(utility::details::create_system_error(error)));
        });

    *info = nullptr;
    return true;
}
///
/// Close a file stream buffer.
///
/// Validates its arguments in debug builds and then hands the work to _close_fsb_nolock.
///
/// info:     The file info record of the file
/// callback: A pointer to the callback interface to notify about the outcome
/// Returns:  whatever _close_fsb_nolock returns
bool __cdecl _close_fsb(_In_ _file_info **info, _In_ streams::details::_filestream_callback *callback)
{
    _ASSERTE(callback != nullptr);
    _ASSERTE(info != nullptr);
    _ASSERTE(*info != nullptr);

    const bool initiated = _close_fsb_nolock(info, callback);
    return initiated;
}
///
/// The completion routine used when a write request finishes.
///
///
/// The signature is the standard IO completion signature, documented on MSDN
///
template
VOID CALLBACK _WriteFileCompletionRoutine(DWORD dwErrorCode, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped)
{
EXTENDED_OVERLAPPED* pOverlapped = static_cast(lpOverlapped);
if (dwErrorCode != ERROR_SUCCESS && dwErrorCode != ERROR_HANDLE_EOF)
{
pOverlapped->callback->on_error(std::make_exception_ptr(utility::details::create_system_error(dwErrorCode)));
}
else
{
pOverlapped->callback->on_completed(static_cast(dwNumberOfBytesTransfered));
}
}
///
/// The completion routine used when a read request finishes.
///
///
/// The signature is the standard IO completion signature, documented on MSDN
///
template
VOID CALLBACK _ReadFileCompletionRoutine(DWORD dwErrorCode, DWORD dwNumberOfBytesTransfered, LPOVERLAPPED lpOverlapped)
{
EXTENDED_OVERLAPPED* pOverlapped = static_cast(lpOverlapped);
if (dwErrorCode != ERROR_SUCCESS && dwErrorCode != ERROR_HANDLE_EOF)
{
pOverlapped->callback->on_error(std::make_exception_ptr(utility::details::create_system_error(dwErrorCode)));
}
else
{
pOverlapped->callback->on_completed(static_cast(dwNumberOfBytesTransfered));
}
}
///
/// Initiate an asynchronous (overlapped) write to the file stream.
///
/// fInfo:    The file info record of the file
/// callback: A pointer to the callback interface to invoke when the write request is completed.
/// ptr:      A pointer to the data to write
/// count:    The size (in bytes) of the data
/// position: The byte offset in the file to write at; (size_t)-1 appends at the end of the file
/// Returns:  0 if the write request is still outstanding, -1 if the request failed, otherwise
///           the size of the data written
size_t _write_file_async(_In_ streams::details::_file_info_impl *fInfo, _In_ streams::details::_filestream_callback *callback, const void *ptr, size_t count, size_t position)
{
    auto pOverlapped = std::unique_ptr<EXTENDED_OVERLAPPED>(new EXTENDED_OVERLAPPED(_WriteFileCompletionRoutine<streams::details::_file_info_impl>, callback));

    if (position == static_cast<size_t>(-1))
    {
        // Both offset halves set to all ones asks WriteFile to append at the end of the file.
        pOverlapped->Offset = 0xFFFFFFFF;
        pOverlapped->OffsetHigh = 0xFFFFFFFF;
    }
    else
    {
        pOverlapped->Offset = static_cast<DWORD>(position);
#ifdef _WIN64
        pOverlapped->OffsetHigh = static_cast<DWORD>(position >> 32);
#else
        pOverlapped->OffsetHigh = 0;
#endif
    }

#if _WIN32_WINNT >= _WIN32_WINNT_VISTA
    StartThreadpoolIo(static_cast<PTP_IO>(fInfo->m_io_context));

    BOOL wrResult = WriteFile(fInfo->m_handle, ptr, static_cast<DWORD>(count), nullptr, pOverlapped.get());
    DWORD error = GetLastError();

    // WriteFile will return false when a) the operation failed, or b) when the request is still
    // pending. The error code will tell us which is which.
    if (wrResult == FALSE && error == ERROR_IO_PENDING)
    {
        // Overlapped is deleted in the threadpool callback.
        pOverlapped.release();
        return 0;
    }

    // Synchronous completion (success or failure): no completion callback will be delivered,
    // so the thread pool I/O request started above has to be cancelled.
    CancelThreadpoolIo(static_cast<PTP_IO>(fInfo->m_io_context));

    size_t result = static_cast<size_t>(-1);

    if (wrResult != FALSE)
    {
        // If WriteFile returned true, it must be because the operation completed immediately.
        // However, we didn't pass in an address for the number of bytes written, so
        // we have to retrieve it using 'GetOverlappedResult,' which may, in turn, fail.
        DWORD written = 0;
        if (GetOverlappedResult(fInfo->m_handle, pOverlapped.get(), &written, FALSE))
            result = static_cast<size_t>(written);
        else
            error = GetLastError(); // report this failure, not the stale WriteFile status
    }

    if (result == static_cast<size_t>(-1))
        callback->on_error(std::make_exception_ptr(utility::details::create_system_error(error)));

    return result;
#else
    BOOL wrResult = WriteFile(fInfo->m_handle, ptr, static_cast<DWORD>(count), nullptr, pOverlapped.get());
    DWORD error = GetLastError();

    // 1. If WriteFile returned true, it must be because the operation completed immediately.
    // The XP thread pool immediately creates a worker thread to run "_WriteFileCompletionRoutine".
    // If this function returned a value > 0, the condition "if (written == sizeof(_CharType))" in
    // the filestreams.h "_getcImpl()" function would be satisfied and the main thread would delete
    // the input "callback" while the worker thread is still accessing it: a race condition and
    // access violation. So we return 0 and leave all completion callbacks to the worker thread.
    // We do not need to call GetOverlappedResult; the worker thread will call "on_error()" if the
    // write failed. "pOverlapped" is deleted by the completion callback.
    if (wrResult != FALSE)
    {
        pOverlapped.release();
        return 0;
    }

    // 2. If WriteFile returned false and GetLastError is ERROR_IO_PENDING, return 0.
    // The XP thread pool will create a worker thread to run "_WriteFileCompletionRoutine" after the
    // operation has completed.
    if (wrResult == FALSE && error == ERROR_IO_PENDING)
    {
        // Overlapped is deleted in the threadpool callback.
        pOverlapped.release();
        return 0;
    }

    // 3. If WriteFile returned false and GetLastError is not ERROR_IO_PENDING, we must call
    // "callback->on_error()" ourselves. The thread pool will not start a worker thread.
    callback->on_error(std::make_exception_ptr(utility::details::create_system_error(error)));

    return static_cast<size_t>(-1);
#endif // _WIN32_WINNT >= _WIN32_WINNT_VISTA
}
///
/// Initiate an asynchronous (overlapped) read from the file stream.
///
/// fInfo:    The file info record of the file
/// callback: A pointer to the callback interface to invoke when the read request is completed.
/// ptr:      A pointer to a buffer where the data should be placed
/// count:    The size (in bytes) of the buffer
/// offset:   The offset in the file to read from
/// Returns:  0 if the read request is still outstanding (or the callback has already been
///           told about end-of-file), -1 if the request failed, otherwise the size of the
///           data read into the buffer
size_t _read_file_async(_In_ streams::details::_file_info_impl *fInfo, _In_ streams::details::_filestream_callback *callback, _Out_writes_ (count) void *ptr, _In_ size_t count, size_t offset)
{
    auto pOverlapped = std::unique_ptr<EXTENDED_OVERLAPPED>(new EXTENDED_OVERLAPPED(_ReadFileCompletionRoutine<streams::details::_file_info_impl>, callback));
    pOverlapped->Offset = static_cast<DWORD>(offset);
#ifdef _WIN64
    pOverlapped->OffsetHigh = static_cast<DWORD>(offset >> 32);
#else
    pOverlapped->OffsetHigh = 0;
#endif

#if _WIN32_WINNT >= _WIN32_WINNT_VISTA
    StartThreadpoolIo(static_cast<PTP_IO>(fInfo->m_io_context));

    BOOL wrResult = ReadFile(fInfo->m_handle, ptr, static_cast<DWORD>(count), nullptr, pOverlapped.get());
    DWORD error = GetLastError();

    // ReadFile will return false when a) the operation failed, or b) when the request is still
    // pending. The error code will tell us which is which.
    if (wrResult == FALSE && error == ERROR_IO_PENDING)
    {
        // Overlapped is deleted in the threadpool callback.
        pOverlapped.release();
        return 0;
    }

    // We find ourselves here because there was a synchronous completion, either with an error or
    // success. Either way, we don't need the thread pool I/O request here, or the request and
    // overlapped structures.
    CancelThreadpoolIo(static_cast<PTP_IO>(fInfo->m_io_context));

    size_t result = static_cast<size_t>(-1);

    if (wrResult != FALSE)
    {
        // If ReadFile returned true, it must be because the operation completed immediately.
        // However, we didn't pass in an address for the number of bytes read, so
        // we have to retrieve it using 'GetOverlappedResult,' which may, in turn, fail.
        DWORD read = 0;
        if (GetOverlappedResult(fInfo->m_handle, pOverlapped.get(), &read, FALSE))
            result = static_cast<size_t>(read);
        else
            error = GetLastError(); // report this failure, not the stale ReadFile status
    }

    if (wrResult == FALSE && error == ERROR_HANDLE_EOF)
    {
        // End of file is a successful zero-byte read, not an error.
        callback->on_completed(0);
        return 0;
    }

    if (result == static_cast<size_t>(-1))
        callback->on_error(std::make_exception_ptr(utility::details::create_system_error(error)));

    return result;
#else
    BOOL wrResult = ReadFile(fInfo->m_handle, ptr, static_cast<DWORD>(count), nullptr, pOverlapped.get());
    DWORD error = GetLastError();

    // 1. If ReadFile returned true, it must be because the operation completed immediately.
    // The XP thread pool immediately creates a worker thread to run "_ReadFileCompletionRoutine".
    // If this function returned a value > 0, the condition "if ( ch == sizeof(_CharType) )" in the
    // filestreams.h "_getcImpl()" function would be satisfied and the main thread would delete the
    // input "callback" while the worker thread is still accessing it: a race condition and access
    // violation. So we return 0 and leave all completion callbacks to the worker thread.
    // We do not need to call GetOverlappedResult; the worker thread will call "on_error()" if the
    // read failed. "pOverlapped" is deleted by the completion callback.
    if (wrResult != FALSE)
    {
        pOverlapped.release();
        return 0;
    }

    // 2. If ReadFile returned false and GetLastError is ERROR_IO_PENDING, return 0.
    // The XP thread pool will create a worker thread to run "_ReadFileCompletionRoutine" after the
    // operation has completed.
    if (wrResult == FALSE && error == ERROR_IO_PENDING)
    {
        // Overlapped is deleted in the threadpool callback.
        pOverlapped.release();
        return 0;
    }

    // 3. If ReadFile returned false and GetLastError is ERROR_HANDLE_EOF, we must call
    // "callback->on_completed(0)" ourselves. The thread pool will not start a worker thread.
    if (wrResult == FALSE && error == ERROR_HANDLE_EOF)
    {
        callback->on_completed(0);
        return 0;
    }

    // 4. If ReadFile returned false with any other error, we must call "callback->on_error()"
    // ourselves. The thread pool will not start a worker thread.
    callback->on_error(std::make_exception_ptr(utility::details::create_system_error(error)));

    return static_cast<size_t>(-1);
#endif // _WIN32_WINNT >= _WIN32_WINNT_VISTA
}
///
/// A self-deleting callback that runs a functor when a buffer-fill read completes.
///
/// Only on_completed is overridden here, and it deletes the object after running the
/// functor. NOTE(review): an error notification is not forwarded by this class and does not
/// free it; whether that matters depends on the base class defaults - confirm.
///
template<typename Func>
class _filestream_callback_fill_buffer : public _filestream_callback
{
public:
    // Initializers are listed in declaration order, which is the order they actually run in.
    _filestream_callback_fill_buffer(_In_ _file_info *info, const Func &func) : m_info(info), m_func(func) { }

    /// Runs the functor with the completed size and then destroys this object.
    virtual void on_completed(size_t result)
    {
        m_func(result);
        delete this;
    }

private:
    _file_info *m_info;
    Func m_func;
};
template
_filestream_callback_fill_buffer *create_callback(_In_ _file_info *info, const Func &func)
{
return new _filestream_callback_fill_buffer(info, func);
}
size_t _fill_buffer_fsb(_In_ _file_info_impl *fInfo, _In_ _filestream_callback *callback, size_t count, size_t char_size)
{
msl::safeint3::SafeInt safeCount = count;
if ( fInfo->m_buffer == nullptr || safeCount > fInfo->m_bufsize )
{
if ( fInfo->m_buffer != nullptr )
delete fInfo->m_buffer;
fInfo->m_bufsize = safeCount.Max(fInfo->m_buffer_size);
fInfo->m_buffer = new char[fInfo->m_bufsize*char_size];
fInfo->m_bufoff = fInfo->m_rdpos;
auto cb = create_callback(fInfo,
[=] (size_t result)
{
pplx::extensibility::scoped_recursive_lock_t lck(fInfo->m_lock);
fInfo->m_buffill = result/char_size;
callback->on_completed(result);
});
auto read = _read_file_async(fInfo, cb, reinterpret_cast(fInfo->m_buffer), fInfo->m_bufsize*char_size, fInfo->m_rdpos*char_size);
switch (read)
{
case 0:
// pending
return read;
case (-1):
// error
delete cb;
return read;
default:
// operation is complete. The pattern of returning synchronously
// has the expectation that we duplicate the callback code here...
// Do the expedient thing for now.
cb->on_completed(read);
// return pending
return 0;
};
}
// First, we need to understand how far into the buffer we have already read
// and how much remains.
size_t bufpos = fInfo->m_rdpos - fInfo->m_bufoff;
size_t bufrem = fInfo->m_buffill - bufpos;
// We have four different scenarios:
// 1. The read position is before the start of the buffer, in which case we will just reuse the buffer.
// 2. The read position is in the middle of the buffer, and we need to read some more.
// 3. The read position is beyond the end of the buffer. Do as in #1.
// 4. We have everything we need.
if ( (fInfo->m_rdpos < fInfo->m_bufoff) || (fInfo->m_rdpos >= (fInfo->m_bufoff+fInfo->m_buffill)) )
{
// Reuse the existing buffer.
fInfo->m_bufoff = fInfo->m_rdpos;
auto cb = create_callback(fInfo,
[=] (size_t result)
{
pplx::extensibility::scoped_recursive_lock_t lck(fInfo->m_lock);
fInfo->m_buffill = result/char_size;
callback->on_completed(bufrem*char_size+result);
});
auto read = _read_file_async(fInfo, cb, reinterpret_cast(fInfo->m_buffer), fInfo->m_bufsize*char_size, fInfo->m_rdpos*char_size);
switch (read)
{
case 0:
// pending
return read;
case (-1):
// error
delete cb;
return read;
default:
// operation is complete. The pattern of returning synchronously
// has the expectation that we duplicate the callback code here...
// Do the expedient thing for now.
cb->on_completed(read);
// return pending
return 0;
};
}
else if ( bufrem < count )
{
fInfo->m_bufsize = safeCount.Max(fInfo->m_buffer_size);
// Then, we allocate a new buffer.
char *newbuf = new char[fInfo->m_bufsize*char_size];
// Then, we copy the unread part to the new buffer and delete the old buffer
if ( bufrem > 0 )
memcpy(newbuf, fInfo->m_buffer+bufpos*char_size, bufrem*char_size);
delete fInfo->m_buffer;
fInfo->m_buffer = newbuf;
// Then, we read the remainder of the count into the new buffer
fInfo->m_bufoff = fInfo->m_rdpos;
auto cb = create_callback(fInfo,
[=] (size_t result)
{
pplx::extensibility::scoped_recursive_lock_t lck(fInfo->m_lock);
fInfo->m_buffill = result/char_size;
callback->on_completed(bufrem*char_size+result);
});
auto read = _read_file_async(fInfo, cb, reinterpret_cast(fInfo->m_buffer) + bufrem*char_size, (fInfo->m_bufsize - bufrem)*char_size, (fInfo->m_rdpos + bufrem)*char_size);
switch (read)
{
case 0:
// pending
return read;
case (-1):
// error
delete cb;
return read;
default:
// operation is complete. The pattern of returning synchronously
// has the expectation that we duplicate the callback code here...
// Do the expedient thing for now.
cb->on_completed(read);
// return pending
return 0;
};
}
else
{
// If we are here, it means that we didn't need to read, we already have enough data in the buffer
return count*char_size;
}
}
///
/// Read data from a file stream into a buffer
///
/// info:      The file info record of the file
/// callback:  A pointer to the callback interface to invoke when the read request is completed.
/// ptr:       A pointer to a buffer where the data should be placed
/// count:     The size (in characters) of the buffer
/// char_size: The size (in bytes) of one character
/// Returns:   0 if the read request is still outstanding, -1 if the request failed, otherwise
///            the size of the data read into the buffer
size_t __cdecl _getn_fsb(_In_ streams::details::_file_info *info, _In_ streams::details::_filestream_callback *callback, _Out_writes_ (count) void *ptr, _In_ size_t count, size_t char_size)
{
    _ASSERTE(callback != nullptr);
    _ASSERTE(info != nullptr);

    _file_info_impl *fInfo = static_cast<_file_info_impl *>(info);

    pplx::extensibility::scoped_recursive_lock_t lck(info->m_lock);

    if ( fInfo->m_handle == INVALID_HANDLE_VALUE )
    {
        callback->on_error(std::make_exception_ptr(utility::details::create_system_error(ERROR_INVALID_HANDLE)));
        return static_cast<size_t>(-1);
    }

    if ( fInfo->m_buffer_size > 0 )
    {
        // Buffered path: fill the internal buffer, then copy out of it.
        auto cb = create_callback(fInfo,
            [=] (size_t read)
            {
                auto sz = count*char_size;
                auto copy = (read < sz) ? read : sz;
                auto bufoff = fInfo->m_rdpos - fInfo->m_bufoff;
                memcpy(ptr, fInfo->m_buffer+bufoff*char_size, copy);
                fInfo->m_atend = copy < sz;
                callback->on_completed(copy);
            });

        size_t read = _fill_buffer_fsb(fInfo, cb, count, char_size);

        if ( read == 0 )
        {
            // The wrapper has been run or will be run by the completion path, and deletes itself.
            return read;
        }

        // Any other result is synchronous and the wrapper was never invoked, so free it here.
        delete cb;

        if ( read == static_cast<size_t>(-1) )
        {
            // Failure. This must not reach the copy below: as an unsigned value, -1 is "> 0".
            return read;
        }

        auto sz = count*char_size;
        auto copy = (read < sz) ? read : sz;
        auto bufoff = fInfo->m_rdpos - fInfo->m_bufoff;
        memcpy(ptr, fInfo->m_buffer+bufoff*char_size, copy);
        fInfo->m_atend = copy < sz;
        return copy;
    }
    else
    {
        // Unbuffered path: read straight into the caller's buffer.
        return _read_file_async(fInfo, callback, ptr, count*char_size, fInfo->m_rdpos*char_size);
    }
}
///
/// Write data from a buffer into the file stream.
///
/// info:      The file info record of the file
/// callback:  A pointer to the callback interface to invoke when the write request is completed.
/// ptr:       A pointer to the data to write
/// count:     The size (in characters) of the data
/// char_size: The size (in bytes) of one character
/// Returns:   0 if the write request is still outstanding, -1 if the request failed, otherwise
///            the size of the data written
size_t __cdecl _putn_fsb(_In_ streams::details::_file_info *info, _In_ streams::details::_filestream_callback *callback, const void *ptr, size_t count, size_t char_size)
{
    _ASSERTE(callback != nullptr);
    _ASSERTE(info != nullptr);

    _file_info_impl *fInfo = static_cast<_file_info_impl *>(info);

    pplx::extensibility::scoped_recursive_lock_t lck(fInfo->m_lock);

    if ( fInfo->m_handle == INVALID_HANDLE_VALUE )
    {
        callback->on_error(std::make_exception_ptr(utility::details::create_system_error(ERROR_INVALID_HANDLE)));
        return static_cast<size_t>(-1);
    }

    // To preserve the async write order, the write head is advanced before the write is issued.
    // A write position of -1 means "append" and is passed through untouched.
    auto lastPos = fInfo->m_wrpos;
    if (fInfo->m_wrpos != static_cast<size_t>(-1))
    {
        fInfo->m_wrpos += count;
        lastPos *= char_size; // characters -> bytes
    }

    return _write_file_async(fInfo, callback, ptr, count*char_size, lastPos);
}
///
/// Flush all buffered data to the underlying file.
///
/// Writes are never cached by this implementation, so there is nothing to flush: completion
/// is reported to the callback immediately with a size of 0.
///
/// (unnamed): The file info record of the file (unused)
/// callback:  A pointer to the callback interface to notify
/// Returns:   true, always
bool __cdecl _sync_fsb(_In_ streams::details::_file_info *, _In_ streams::details::_filestream_callback *callback)
{
    _ASSERTE(callback != nullptr);

    // Writes are not cached
    const size_t nothingPending = 0;
    callback->on_completed(nothingPending);

    return true;
}
///
/// Adjust the internal buffers and pointers when the application seeks to a new read location in the stream.
///
/// info:    The file info record of the file
/// pos:     The new position (offset from the start) in the file stream
/// Returns: New file position or -1 if error
size_t __cdecl _seekrdpos_fsb(_In_ streams::details::_file_info *info, size_t pos, size_t)
{
    _ASSERTE(info != nullptr);

    _file_info_impl *fInfo = static_cast<_file_info_impl *>(info);

    pplx::extensibility::scoped_recursive_lock_t lck(info->m_lock);

    if (fInfo->m_handle == INVALID_HANDLE_VALUE) return static_cast<size_t>(-1);

    // A position outside the buffered range makes the buffer useless: drop it.
    if ( pos < fInfo->m_bufoff || pos > (fInfo->m_bufoff+fInfo->m_buffill) )
    {
        // The buffer is array storage, so it needs the array form of delete.
        delete[] fInfo->m_buffer;
        fInfo->m_buffer = nullptr;
        fInfo->m_bufoff = fInfo->m_buffill = fInfo->m_bufsize = 0;
    }

    fInfo->m_rdpos = pos;
    return fInfo->m_rdpos;
}
///
/// Adjust the internal buffers and pointers when the application seeks to a read location
/// relative to the end of the stream.
///
/// info:      The file info record of the file
/// offset:    The new position (offset from the end of the stream) in the file stream
/// char_size: The size of the character type used for this stream
/// Returns:   New file position or -1 if error
size_t __cdecl _seekrdtoend_fsb(_In_ streams::details::_file_info *info, int64_t offset, size_t char_size)
{
    _ASSERTE(info != nullptr);

    _file_info_impl *fInfo = static_cast<_file_info_impl *>(info);

    pplx::extensibility::scoped_recursive_lock_t lck(info->m_lock);

    if (fInfo->m_handle == INVALID_HANDLE_VALUE) return static_cast<size_t>(-1);

    if ( fInfo->m_buffer != nullptr )
    {
        // Clear the internal buffer. It is array storage, so it needs the array form of delete.
        delete[] fInfo->m_buffer;
        fInfo->m_buffer = nullptr;
        fInfo->m_bufoff = fInfo->m_buffill = fInfo->m_bufsize = 0;
    }

    // Use the 64-bit API: squeezing the distance into a LONG would truncate it for large
    // files, and the 32-bit API's failure value is also a valid low-order position.
    LARGE_INTEGER distance;
    distance.QuadPart = offset * static_cast<int64_t>(char_size);

    LARGE_INTEGER newpos;
    if (!SetFilePointerEx(fInfo->m_handle, distance, &newpos, FILE_END)) return static_cast<size_t>(-1);

    const uint64_t chars = static_cast<uint64_t>(newpos.QuadPart) / char_size;

    // A position that does not fit into size_t (possible on 32-bit builds) cannot be represented.
    if (chars >= static_cast<size_t>(-1)) return static_cast<size_t>(-1);

    fInfo->m_rdpos = static_cast<size_t>(chars);
    return fInfo->m_rdpos;
}
///
/// Get the size of the file, in characters.
///
/// info:      The file info record of the file
/// char_size: The size (in bytes) of one character
/// Returns:   The file size divided by char_size; -1 if the handle is invalid; 0 if the size
///            could not be queried
utility::size64_t __cdecl _get_size(_In_ concurrency::streams::details::_file_info *info, size_t char_size)
{
    _ASSERTE(info != nullptr);

    _file_info_impl *fInfo = static_cast<_file_info_impl *>(info);

    pplx::extensibility::scoped_recursive_lock_t lck(info->m_lock);

    if ( fInfo->m_handle == INVALID_HANDLE_VALUE ) return static_cast<utility::size64_t>(-1);

    LARGE_INTEGER size;

    // Win32 BOOL results signal success with any nonzero value, so test against FALSE.
    if ( GetFileSizeEx(fInfo->m_handle, &size) != FALSE )
        return utility::size64_t(size.QuadPart/char_size);
    else
        return 0;
}
///
/// Record the new write position when the application seeks to a new write location in the
/// stream. No file I/O is performed here.
///
/// info:    The file info record of the file
/// pos:     The new position (offset from the start) in the file stream
/// Returns: New file position or -1 if error
size_t __cdecl _seekwrpos_fsb(_In_ streams::details::_file_info *info, size_t pos, size_t)
{
    _ASSERTE(info != nullptr);

    _file_info_impl *fInfo = static_cast<_file_info_impl *>(info);

    pplx::extensibility::scoped_recursive_lock_t lck(info->m_lock);

    if (fInfo->m_handle == INVALID_HANDLE_VALUE) return static_cast<size_t>(-1);

    fInfo->m_wrpos = pos;
    return fInfo->m_wrpos;
}