4#include "Stroika/Frameworks/StroikaPreComp.h"
9#include "Stroika/Foundation/Characters/FloatConversion.h"
11#include "Stroika/Foundation/Characters/String2Int.h"
13#include "Stroika/Foundation/Containers/Common.h"
14#include "Stroika/Foundation/DataExchange/BadFormatException.h"
19#include "Stroika/Foundation/Execution/Throw.h"
22#include "Stroika/Foundation/IO/Network/HTTP/ClientErrorException.h"
23#include "Stroika/Foundation/IO/Network/HTTP/Headers.h"
25#include "Stroika/Foundation/IO/Network/HTTP/Methods.h"
29#include "Connection.h"
37using namespace Stroika::Foundation::Memory;
38using namespace Stroika::Foundation::Traversal;
39using namespace Stroika::Foundation::Time;
41using namespace Stroika::Frameworks;
57 const Headers& defaultResponseHeaders,
const optional<bool> autoComputeETagResponse)
58 :
Message{
Request{socketStream}, Response{socket, socketStream, defaultResponseHeaders}, socket.GetPeerAddress ()}
59 , fMsgHeaderInTextStream{HTTP::MessageStartTextInputStreamBinaryAdapter::New (rwRequest ().GetInputStream ())}
61 if (autoComputeETagResponse) {
62 this->rwResponse ().autoComputeETag = *autoComputeETagResponse;
66Connection::MyMessage_::ReadHeadersResult Connection::MyMessage_::ReadHeaders (
67#
if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
68 const function<
void (
const String&)>& logMsg
72#if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
73 logMsg (
"Starting ReadHeaders_"sv);
79 if (not fMsgHeaderInTextStream.AssureHeaderSectionAvailable ()) {
80#if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
81 logMsg (
"got fMsgHeaderInTextStream.AssureHeaderSectionAvailable INCOMPLETE"sv);
83 return ReadHeadersResult::eIncompleteButMoreMayBeAvailable;
89 Request& updatableRequest = this->rwRequest ();
92 String line = fMsgHeaderInTextStream.ReadLine ();
94#if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
95 logMsg (
"got EOF from src stream reading headers(incomplete)"sv);
97 return ReadHeadersResult::eIncompleteDeadEnd;
100 if (tokens.size () < 3) {
101 DbgTrace (
"tokens={}, line='{}', fMsgHeaderInTextStream={}"_f, tokens, line, fMsgHeaderInTextStream.ToString ());
106 if (tokens[1].empty ()) {
108 DbgTrace (
"tokens={}, line='{}'"_f, tokens, line);
112 if (updatableRequest.
httpMethod ().empty ()) {
114 DbgTrace (
"tokens={}, line='{}'"_f, tokens, line);
120 static const String kCRLF_{
"\r\n"sv};
121 String line = fMsgHeaderInTextStream.ReadLine ();
122 if (line == kCRLF_ or line.empty ()) {
127 size_t i = line.
find (
':');
128 if (i == string::npos) {
135 updatableRequest.
rwHeaders ().Add (hdr, value);
138#if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
139 logMsg (
"ReadHeaders completed normally"sv);
141 return ReadHeadersResult::eCompleteGood;
163#if qStroika_Framework_WebServer_Connection_TrackExtraStats
164 if (fMostRecentMessage) {
165 sb <<
", mostRecentMessage: " << *fMostRecentMessage;
167 if (fHandlingThread) {
169 sb <<
", handlingThread: " << fHandlingThread;
172 sb <<
", previousThread: " << fHandlingThread;
186 const optional<Headers>& defaultGETResponseHeaders,
const optional<bool> autoComputeETagResponse)
188 .fDefaultResponseHeaders = defaultResponseHeaders,
189 .fDefaultGETResponseHeaders = defaultGETResponseHeaders,
190 .fAutoComputeETagResponse = autoComputeETagResponse}}
195 :
socket{[qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] const auto* property) -> ConnectionOrientedStreamSocket::Ptr {
197 AssertExternallySynchronizedMutex::ReadContext declareContext{*thisObj};
198 return thisObj->fSocket_;
200 ,
request{[qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
const auto* property) ->
const Request& {
202 AssertExternallySynchronizedMutex::ReadContext declareContext{*thisObj};
203 return thisObj->fMessage_->request ();
205 ,
response{[qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
const auto* property) ->
const Response& {
207 AssertExternallySynchronizedMutex::ReadContext declareContext{*thisObj};
208 return thisObj->fMessage_->response ();
210 ,
rwResponse{[qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
const auto* property) -> Response& {
212 AssertExternallySynchronizedMutex::WriteContext declareContext{*thisObj};
213 return thisObj->fMessage_->rwResponse ();
215 ,
stats{[qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
const auto* property) -> Stats {
220#if qStroika_Framework_WebServer_Connection_TrackExtraStats
222 DurationSeconds::rep b1 = thisObj->fStartHandleMessage_.load ();
223 DurationSeconds::rep e1 = thisObj->fCompletedHandleMessage_.load ();
224 thread::id tid1 = thisObj->fHandlingThread_.load ();
227 optional<thread::id> tid = tid1 == thread::id{} ? optional<thread::id>{} : tid1;
229 Stats
stats{.fSocketID = uniqueID,
230 .fCreatedAt = createdAt,
231#if qStroika_Framework_WebServer_Connection_TrackExtraStats
233 .fHandlingThread = tid
239 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
const auto* property) -> optional<HTTP::KeepAlive> {
241 AssertExternallySynchronizedMutex::ReadContext declareContext{*thisObj};
242 return thisObj->fRemaining_;
244 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
auto* property,
const auto&
remainingConnectionLimits) {
246 AssertExternallySynchronizedMutex::WriteContext declareContext{*thisObj};
249 , fInterceptorChain_{options.fInterceptorChain}
250 , fDefaultResponseHeaders_{options.fDefaultResponseHeaders}
251 , fDefaultGETResponseHeaders_{options.fDefaultGETResponseHeaders}
252 , fAutoComputeETagResponse_{options.fAutoComputeETagResponse}
253 , fSupportedCompressionEncodings_{options.fSupportedCompressionEncodings}
255 , fConnectionStartedAt_{Time::GetTickCount ()}
257 Require (s !=
nullptr);
258#if USE_NOISY_TRACE_IN_THIS_MODULE_
259 DbgTrace (
"Created connection for socket {}"_f, s);
261 fSocketStream_ = SocketStream::New (fSocket_);
262#if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
264 String socketName =
"{}-{}"_f((
long)DateTime::Now ().As<time_t> (), (
int)s.GetNativeSocket ());
265 fSocketStream_ = Streams::LoggingInputOutputStream<byte>::New (
269 fLogConnectionState_ = Streams::TextToBinary::Writer::New (
271 Streams::TextToBinary::Writer::Format::eUTF8WithoutBOM);
276Connection::~Connection ()
278#if USE_NOISY_TRACE_IN_THIS_MODULE_
279 DbgTrace (
"Destroying connection for socket {}, message={}"_f, fSocket_,
static_cast<const void*
> (fMessage_.get ()));
281 AssertExternallySynchronizedMutex::WriteContext declareContext{*
this};
282#if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
283 WriteLogConnectionMsg_ (L
"DestroyingConnection");
285 if (fMessage_ !=
nullptr) {
286 if (not fMessage_->response ().responseCompleted ()) {
287 IgnoreExceptionsForCall (fMessage_->rwResponse ().Abort ());
289 Require (fMessage_->response ().responseCompleted ());
302 DbgTrace (
"Exception ignored closing socket: {}"_f, current_exception ());
308 AssertExternallySynchronizedMutex::WriteContext declareContext{*
this};
310#if USE_NOISY_TRACE_IN_THIS_MODULE_
313 fMessage_ = make_unique<MyMessage_> (fSocket_, fSocketStream_, fDefaultResponseHeaders_, fAutoComputeETagResponse_);
314#if qStroika_Foundation_Debug_AssertExternallySynchronizedMutex_Enabled
315 fMessage_->SetAssertExternallySynchronizedMutexContext (GetSharedContext ());
318 auto readHeaders = [&] () -> optional<ReadAndProcessResult> {
319 switch (fMessage_->ReadHeaders (
320#
if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
321 [
this] (
const String& i) ->
void { WriteLogConnectionMsg_ (i); }
324 case MyMessage_::eIncompleteDeadEnd: {
325 DbgTrace (
"ReadHeaders failed - typically because the client closed the connection before we could handle it (e.g. "
326 "in web browser hitting refresh button fast)."_f);
329 case MyMessage_::eIncompleteButMoreMayBeAvailable: {
330 DbgTrace (
"ReadHeaders failed - incomplete header (most likely a DOS attack)."_f);
331 return ReadAndProcessResult::eTryAgainLater;
333 case MyMessage_::eCompleteGood: {
343 if (
auto r = readHeaders ()) {
348#if qStroika_Foundation_Debug_AssertionsChecked
349 [[maybe_unused]]
auto&& cleanup =
Finally ([&] ()
noexcept { Ensure (fMessage_->response ().responseCompleted ()); });
352#if qStroika_Framework_WebServer_Connection_TrackExtraStats
353 fStartHandleMessage_ = Time::GetTickCount ().time_since_epoch ().count ();
354 fCompletedHandleMessage_ = kAtomicTimeSentinel_;
355 fHandlingThread_ = std::this_thread::get_id ();
356 [[maybe_unused]]
auto&& cleanup2 =
357 Finally ([&] ()
noexcept { fCompletedHandleMessage_ = Time::GetTickCount ().time_since_epoch ().count (); });
360 auto applyDefaultsToResponseHeaders = [&] () ->
void {
361 if (fDefaultGETResponseHeaders_ and fMessage_->request ().httpMethod () == HTTP::Methods::kGet) {
362 fMessage_->rwResponse ().rwHeaders () += *fDefaultGETResponseHeaders_;
365 fMessage_->rwResponse ().rwHeaders ().date = DateTime::Now ();
370 if (optional<HTTP::ContentEncodings> acceptEncoding = fMessage_->request ().headers ().acceptEncoding) {
371 optional<HTTP::ContentEncodings> oBodyEncoding = fMessage_->rwResponse ().bodyEncoding ();
373 fMessage_->rwResponse ().bodyEncoding = [&] () {
375 auto bc = *oBodyEncoding;
376 bc += contentEncoding2Add;
380 return HTTP::ContentEncodings{contentEncoding2Add};
384 bool needBodyEncoding = not oBodyEncoding.has_value ();
387 if (needBodyEncoding and acceptEncoding->Contains (ce) and
388 (fSupportedCompressionEncodings_ == nullopt or fSupportedCompressionEncodings_->Contains (ce))) {
390 needBodyEncoding =
false;
393 if constexpr (DataExchange::Compression::Deflate::kSupported) {
396 if constexpr (DataExchange::Compression::GZip::kSupported) {
402 if (
auto requestedINoneMatch = this->
request ().headers ().ifNoneMatch ()) {
403 if (this->
response ().autoComputeETag ()) {
404 this->
rwResponse ().automaticTransferChunkSize =
405 Response::kNoChunkedTransfer;
409 applyDefaultsToResponseHeaders ();
414 auto applyKeepAliveLogic = [&] () ->
bool {
415 bool thisMessageKeepAlive = fMessage_->request ().keepAliveRequested;
416 if (thisMessageKeepAlive) {
420 if (
auto keepAliveValue = fMessage_->request ().headers ().keepAlive ()) {
425 if (oRemaining->fMessages) {
426 if (oRemaining->fMessages == 0u) {
427 thisMessageKeepAlive =
false;
430 oRemaining->fMessages = *oRemaining->fMessages - 1u;
433 if (oRemaining->fTimeout) {
434 if (fConnectionStartedAt_ + *oRemaining->fTimeout < Time::GetTickCount ()) {
435 thisMessageKeepAlive =
false;
442 this->
rwResponse ().rwHeaders ().connection = thisMessageKeepAlive ? Headers::eKeepAlive : Headers::eClose;
443 return thisMessageKeepAlive;
445 bool thisMessageKeepAlive = applyKeepAliveLogic ();
451 auto invokeInterceptorChain = [&] () {
452#if USE_NOISY_TRACE_IN_THIS_MODULE_
455#if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
456 WriteLogConnectionMsg_ (
"Handing request {} to interceptor chain"_f(
request ()));
462#if USE_NOISY_TRACE_IN_THIS_MODULE_
463 DbgTrace (
"Interceptor-Chain caught exception handling message: {}"_f, current_exception ());
465#if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
466 WriteLogConnectionMsg_ (
"Interceptor-Chain caught exception handling message: {}"_f(current_exception ()));
470 invokeInterceptorChain ();
472 auto assureRequestFullyRead = [&] () {
473 if (thisMessageKeepAlive) {
486 if (
request ().headers ().contentLength ()) {
487#if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
488 WriteLogConnectionMsg_ (L
"msg is keepalive, and have content length, so making sure we read all of request body");
490#if USE_NOISY_TRACE_IN_THIS_MODULE_
494 (void)fMessage_->rwRequest ().GetBody ();
498 assureRequestFullyRead ();
504 auto completeResponse = [&] () {
506 if (
auto requestedINoneMatch = this->
request ().headers ().ifNoneMatch ()) {
507 if (
auto actualETag = this->
response ().headers ().ETag ()) {
508 bool ctm = this->
response ().chunkedTransferMode ();
510 DbgTrace (
"Warning - disregarding ifNoneMatch request (though it matched) - cuz in chunked transfer mode"_f);
512 if (requestedINoneMatch->fETags.Contains (*actualETag) and not ctm) {
513 DbgTrace (
"Updating OK response to NotModified (due to ETag match)"_f);
514 this->
rwResponse ().status = HTTP::StatusCodes::kNotModified;
520 thisMessageKeepAlive =
false;
522#if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
523 WriteLogConnectionMsg_ (L
"Did GetResponse ().End ()");
528 return thisMessageKeepAlive ? eTryAgainLater : eClose;
531 DbgTrace (
"ReadAndProcessMessage Exception caught ({}), so returning ReadAndProcessResult::eClose"_f, current_exception ());
533 return Connection::ReadAndProcessResult::eClose;
537#if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
538void Connection::WriteLogConnectionMsg_ (
const String& msg)
const
540 String useMsg = DateTime::Now ().Format () +
" -- "sv + msg.
Trim ();
541 fLogConnectionState_.WriteLn (useMsg);
547 AssertExternallySynchronizedMutex::ReadContext declareContext{*
this};
550 sb <<
"Socket: "sv << fSocket_;
551 if (not abbreviatedOutput) {
552 sb <<
", Message: "sv << fMessage_;
553 sb <<
", Remaining: "sv << fRemaining_;
555 sb <<
", Connection-Started-At: "sv << fConnectionStartedAt_;
#define AssertNotReached()
time_point< RealtimeClock, DurationSeconds > TimePointSeconds
TimePointSeconds is a simpler approach to chrono::time_point, which doesn't require using templates e...
chrono::duration< double > DurationSeconds
chrono::duration<double> - a time span (length of time) measured in seconds, but high precision.
Similar to String, but intended to more efficiently construct a String. Mutable type (String is large...
String is like std::u32string, except it is much easier to use, often much more space efficient,...
nonvirtual size_t length() const noexcept
nonvirtual String SubString(SZ from) const
nonvirtual String Trim(bool(*shouldBeTrimmed)(Character)=Character::IsWhitespace) const
nonvirtual Containers::Sequence< String > Tokenize() const
nonvirtual size_t find(Character c, size_t startAt=0) const
A generalization of a vector: a container whose elements are keyed by the natural numbers.
nonvirtual void Close() const
ClientErrorException is to capture exceptions caused by a bad (e.g. ill-formed) request.
Common::Property< String > httpMethod
typically HTTP::Methods::kGet
Common::Property< Headers & > rwHeaders
Common::Property< URI > url
Common::Property< String > httpVersion
nonvirtual PlatformNativeHandle GetNativeSocket() const
static URI ParseRelative(const String &rawRelativeURL)
A Connection object represents the state (and socket) for an ongoing, active, HTTP Connection,...
Common::Property< optional< HTTP::KeepAlive > > remainingConnectionLimits
const Common::ReadOnlyProperty< Stats > stats
Retrieve stats about this connection, like threads used, start/end times. NB: INTERNALLY SYNCHRONIZED
const Common::ReadOnlyProperty< ConnectionOrientedStreamSocket::Ptr > socket
nonvirtual ReadAndProcessResult ReadAndProcessMessage() noexcept
const Common::ReadOnlyProperty< const Request & > request
Common::ReadOnlyProperty< Response & > rwResponse
const Common::ReadOnlyProperty< const Response & > response
nonvirtual String ToString(bool abbreviatedOutput=true) const
nonvirtual void HandleMessage(Message &m) const
This represents an HTTP request object for the WebServer module
CONTAINER::value_type * End(CONTAINER &c)
For a contiguous container (such as a vector or basic_string) - find the pointer to the end of the co...
void Throw(T &&e2Throw)
identical to builtin C++ 'throw' except that it does helpful, type dependent DbgTrace() messages firs...
auto Finally(FUNCTION &&f) -> Private_::FinallySentry< FUNCTION >
filesystem::path GetTemporary()
constexpr bool IsOK(Status s)
several status codes considered OK, so check if it is among them
Content coding values indicate an encoding transformation that has been or can be applied to an entit...
static const ContentEncoding kDeflate
static const ContentEncoding kGZip
Time::TimePointSeconds fCreatedAt
nonvirtual String ToString() const
Socket::PlatformNativeHandle fSocketID
ConnectionManager::Options specify things like default headers, caching policies, binding flags (not ...