4#include "Stroika/Frameworks/StroikaPreComp.h"
9#include "Stroika/Foundation/Characters/FloatConversion.h"
11#include "Stroika/Foundation/Characters/String2Int.h"
13#include "Stroika/Foundation/Containers/Common.h"
14#include "Stroika/Foundation/DataExchange/BadFormatException.h"
20#include "Stroika/Foundation/Execution/Throw.h"
23#include "Stroika/Foundation/IO/Network/HTTP/ClientErrorException.h"
24#include "Stroika/Foundation/IO/Network/HTTP/Headers.h"
26#include "Stroika/Foundation/IO/Network/HTTP/Methods.h"
30#include "Connection.h"
38using namespace Stroika::Foundation::Memory;
39using namespace Stroika::Foundation::Traversal;
40using namespace Stroika::Foundation::Time;
42using namespace Stroika::Frameworks;
58 const Headers& defaultResponseHeaders,
const optional<bool> autoComputeETagResponse)
59 :
Message{
Request{socketStream}, Response{socket, socketStream, defaultResponseHeaders}, socket.GetPeerAddress ()}
60 , fMsgHeaderInTextStream{HTTP::MessageStartTextInputStreamBinaryAdapter::New (rwRequest ().GetInputStream ())}
62 if (autoComputeETagResponse) {
63 this->rwResponse ().autoComputeETag = *autoComputeETagResponse;
// Reads the HTTP request line and header lines from fMsgHeaderInTextStream into this message's
// request object (obtained via rwRequest ()).
//
// Returns a ReadHeadersResult:
//   - eCompleteGood: the header section was read to the end.
//   - eIncompleteButMoreMayBeAvailable: the header section is not yet fully available, and the
//     stream is not (known to be) at EOF.
//   - eIncompleteDeadEnd: the header section is incomplete and the stream hit EOF.
//
// The logMsg callback parameter exists only when
// qStroika_Framework_WebServer_Connection_DetailedMessagingLog is enabled.
//
// NOTE(review): this listing omits several original lines (braces, declarations, #endif lines),
// so some comments below are hedged where the governing statement is not visible.
67Connection::MyMessage_::ReadHeadersResult Connection::MyMessage_::ReadHeaders (
68#
if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
69 const function<
void (
const String&)>& logMsg
73#if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
74 logMsg (
"Starting ReadHeaders_"sv);
// Bail out early if the complete header section has not arrived yet.
80 if (not fMsgHeaderInTextStream.AssureHeaderSectionAvailable ()) {
81#if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
82 logMsg (
"got fMsgHeaderInTextStream.AssureHeaderSectionAvailable INCOMPLETE"sv);
// Non-blocking EOF probe: at EOF no more data can arrive, so this is a dead end; otherwise
// the caller may retry later.
// NOTE(review): the explicit '== true' suggests IsAtEOF (eDontBlock) may return something
// other than a plain bool (e.g. an optional) - confirm against the stream API.
84 if (fMsgHeaderInTextStream.IsAtEOF (Streams::eDontBlock) ==
true) {
85 return ReadHeadersResult::eIncompleteDeadEnd;
87 return ReadHeadersResult::eIncompleteButMoreMayBeAvailable;
93 Request& updatableRequest = this->rwRequest ();
// First line of the message (the request line).
// NOTE(review): the condition guarding the EOF return below is on a line not shown here.
96 String line = fMsgHeaderInTextStream.ReadLine ();
98#if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
99 logMsg (
"got EOF from src stream reading headers(incomplete)"sv);
101 return ReadHeadersResult::eIncompleteDeadEnd;
// Validation of the request line. 'tokens' is declared on a line not shown here - presumably the
// tokenized request line; confirm. Each failed check traces the tokens and the raw line; the
// statement that follows each trace (presumably rejecting the request) is not visible.
104 if (tokens.size () < 3) {
105 DbgTrace (
"tokens={}, line='{}', fMsgHeaderInTextStream={}"_f, tokens, line, fMsgHeaderInTextStream.ToString ());
110 if (tokens[1].empty ()) {
112 DbgTrace (
"tokens={}, line='{}'"_f, tokens, line);
116 if (updatableRequest.
httpMethod ().empty ()) {
118 DbgTrace (
"tokens={}, line='{}'"_f, tokens, line);
// Header lines: each is read with ReadLine (); a bare CRLF or an empty line is tested for
// (presumably marking the end of the header section - the action taken is not shown).
124 static const String kCRLF_{
"\r\n"sv};
125 String line = fMsgHeaderInTextStream.ReadLine ();
126 if (line == kCRLF_ or line.empty ()) {
// Locate the ':' in the header line; the npos case (no colon found) is checked separately.
// 'hdr' and 'value' are computed on lines not shown, then added to the request headers.
131 size_t i = line.
find (
':');
132 if (i == string::npos) {
139 updatableRequest.
rwHeaders ().Add (hdr, value);
142#if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
143 logMsg (
"ReadHeaders completed normally"sv);
145 return ReadHeadersResult::eCompleteGood;
167#if qStroika_Framework_WebServer_Connection_TrackExtraStats
171 sb <<
", state: " << fState;
172 if (fMostRecentMessage) {
173 sb <<
", mostRecentMessage: " << *fMostRecentMessage;
175 if (fHandlingThread) {
177 sb <<
", handlingThread: " << fHandlingThread;
180 sb <<
", thread: " << fHandlingThread;
200 const optional<Headers>& defaultGETResponseHeaders,
const optional<bool> autoComputeETagResponse)
201 :
Connection{s, Options{.fInterceptorChain = interceptorChain,
202 .fDefaultResponseHeaders = defaultResponseHeaders,
203 .fDefaultGETResponseHeaders = defaultGETResponseHeaders,
204 .fAutoComputeETagResponse = autoComputeETagResponse}}
209 :
socket{[qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] const auto* property) -> ConnectionOrientedStreamSocket::Ptr {
211 AssertExternallySynchronizedMutex::ReadContext declareContext{*thisObj};
212 return thisObj->fSocket_;
214 ,
request{[qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
const auto* property) ->
const Request& {
216 AssertExternallySynchronizedMutex::ReadContext declareContext{*thisObj};
217 return thisObj->fMessage_->request ();
219 ,
response{[qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
const auto* property) ->
const Response& {
221 AssertExternallySynchronizedMutex::ReadContext declareContext{*thisObj};
222 return thisObj->fMessage_->response ();
224 ,
rwResponse{[qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
const auto* property) -> Response& {
226 AssertExternallySynchronizedMutex::WriteContext declareContext{*thisObj};
227 return thisObj->fMessage_->rwResponse ();
229 ,
stats{[qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
const auto* property) -> Stats {
235#if qStroika_Framework_WebServer_Connection_TrackExtraStats
236 Stats2Capture_ statsCapturedDuringMessageProcessing = thisObj->fExtraStats_.
load ();
238 State state = [thisObj] () {
239 switch (thisObj->fState_) {
240 case State_Flag_::eNew:
242 case State_Flag_::eReadingHeaders_Started:
243 return State::eReadingHeaders;
244 case State_Flag_::eFinishedReadingHeaders_Success:
245 return State::eProcessingInterceptorChain;
246 case State_Flag_::eFinishedReadingHeaders_Incomplete:
247 return State::ePausedIncompleteHeaders;
248 case State_Flag_::eFinishedReadingHeaders_Failed:
249 return State::eClosing;
250 case State_Flag_::eInterceptorChain_Start:
251 return State::eProcessingInterceptorChain;
252 case State_Flag_::eInterceptorChain_Complete:
253 return State::eFlushing;
254 case State_Flag_::eFlushing_Start:
255 return State::eFlushing;
256 case State_Flag_::eFlushing_Done:
257 if (thisObj->fKeepAlive_) {
258 return State::eReadyForNextMessage;
261 return State::eClosing;
263 case State_Flag_::eAborting:
264 return State::eClosing;
272 .fSocketID = uniqueID,
273 .fCreatedAt = createdAt,
274#if qStroika_Framework_WebServer_Connection_TrackExtraStats
275 .fReadAndProcessMessageNumber = thisObj->fReadAndProcessMessageNumber_,
277 .fMostRecentMessage = statsCapturedDuringMessageProcessing.fMessageStart
279 statsCapturedDuringMessageProcessing.fMessageCompleted}
280 : optional<Range<TimePointSeconds>>{},
281 .fHandlingThread = statsCapturedDuringMessageProcessing.fHandlingThread,
282 .fRemotePeerAddress = statsCapturedDuringMessageProcessing.fPeer,
283 .fRequestWebMethod = statsCapturedDuringMessageProcessing.fWebMethod,
284 .fRequestURI = statsCapturedDuringMessageProcessing.fRequestURI,
290 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
const auto* property) -> optional<HTTP::KeepAlive> {
292 AssertExternallySynchronizedMutex::ReadContext declareContext{*thisObj};
293 return thisObj->fRemaining_;
295 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
auto* property,
const auto&
remainingConnectionLimits) {
297 AssertExternallySynchronizedMutex::WriteContext declareContext{*thisObj};
300 , fInterceptorChain_{options.fInterceptorChain}
301 , fDefaultResponseHeaders_{options.fDefaultResponseHeaders}
302 , fDefaultGETResponseHeaders_{options.fDefaultGETResponseHeaders}
303 , fAutoComputeETagResponse_{options.fAutoComputeETagResponse}
304 , fSupportedCompressionEncodings_{options.fSupportedCompressionEncodings}
306 , fConnectionStartedAt_{Time::GetTickCount ()}
308 Require (s !=
nullptr);
309#if USE_NOISY_TRACE_IN_THIS_MODULE_
310 DbgTrace (
"Created connection for socket {}"_f, s);
312 fSocketStream_ = SocketStream::New (fSocket_);
313#if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
315 String socketName =
"{}-{}"_f((
long)DateTime::Now ().As<time_t> (), (
int)s.GetNativeSocket ());
316 fSocketStream_ = Streams::LoggingInputOutputStream<byte>::New (
322 Streams::TextToBinary::Writer::Format::eUTF8WithoutBOM);
// Destructor. Under a WriteContext on this object:
//   - if a message object exists and its response is not yet completed, the response is aborted
//     (exceptions from Abort () are ignored via IgnoreExceptionsForCall), after which the response
//     is Require()d to be completed;
//   - an exception raised while closing the socket is traced as ignored (the close call itself is
//     on lines not shown in this listing).
// Optional tracing/logging is compiled in by USE_NOISY_TRACE_IN_THIS_MODULE_ and
// qStroika_Framework_WebServer_Connection_DetailedMessagingLog.
327Connection::~Connection ()
329#if USE_NOISY_TRACE_IN_THIS_MODULE_
330 DbgTrace (
"Destroying connection for socket {}, message={}"_f, fSocket_,
static_cast<const void*
> (fMessage_.get ()));
332 AssertExternallySynchronizedMutex::WriteContext declareContext{*
this};
333#if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
// NOTE(review): this call passes a wide literal (L"...") while other WriteLogConnectionMsg_ call
// sites in this file pass "..."_f formatted strings - consider making these consistent.
334 WriteLogConnectionMsg_ (L
"DestroyingConnection");
336 if (fMessage_ !=
nullptr) {
// Make sure an unfinished response is terminated before the connection goes away.
337 if (not fMessage_->response ().responseCompleted ()) {
338 IgnoreExceptionsForCall (fMessage_->rwResponse ().Abort ());
340 Require (fMessage_->response ().responseCompleted ());
353 DbgTrace (
"Exception ignored closing socket: {}"_f, current_exception ());
359 AssertExternallySynchronizedMutex::WriteContext declareContext{*
this};
361#if USE_NOISY_TRACE_IN_THIS_MODULE_
364 fMessage_ = make_unique<MyMessage_> (fSocket_, fSocketStream_, fDefaultResponseHeaders_, fAutoComputeETagResponse_);
365#if qStroika_Foundation_Debug_AssertExternallySynchronizedMutex_Enabled
366 fMessage_->SetAssertExternallySynchronizedMutexContext (GetSharedContext ());
370 auto readHeaders = [&] () -> optional<ReadAndProcessResult> {
371#if qStroika_Framework_WebServer_Connection_TrackExtraStats
372 ++fReadAndProcessMessageNumber_;
373 fState_ = State_Flag_::eReadingHeaders_Started;
374 fExtraStats_.
store (Stats2Capture_{
375 .fMessageStart = Time::GetTickCount (), .fPeer = fSocket_.
GetPeerAddress (), .fHandlingThread = std::this_thread::get_id ()});
377 switch (fMessage_->ReadHeaders (
378#
if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
379 [
this] (
const String& i) ->
void { WriteLogConnectionMsg_ (i); }
382 case MyMessage_::eIncompleteDeadEnd: {
383 DbgTrace (
"ReadHeaders failed (socket {}) - incomplete data read from client."_f,
385#if qStroika_Framework_WebServer_Connection_TrackExtraStats
386 fState_ = State_Flag_::eFinishedReadingHeaders_Failed;
390 case MyMessage_::eIncompleteButMoreMayBeAvailable: {
391 DbgTrace (
"ReadHeaders failed - incomplete header (most likely a DOS attack)."_f);
392#if qStroika_Framework_WebServer_Connection_TrackExtraStats
393 fState_ = State_Flag_::eFinishedReadingHeaders_Incomplete;
395 return ReadAndProcessResult::eTryAgainLater;
397 case MyMessage_::eCompleteGood: {
399#if qStroika_Framework_WebServer_Connection_TrackExtraStats
400 fState_ = State_Flag_::eFinishedReadingHeaders_Success;
410 if (
auto r = readHeaders ()) {
415#if qStroika_Foundation_Debug_AssertionsChecked
416 [[maybe_unused]]
auto&& cleanup =
Finally ([&] ()
noexcept { Ensure (fMessage_->response ().responseCompleted ()); });
419#if qStroika_Framework_WebServer_Connection_TrackExtraStats
420 [[maybe_unused]]
auto&& cleanup2 =
421 Finally ([&] ()
noexcept { fExtraStats_.
rwget ().rwref ().fMessageCompleted = Time::GetTickCount (); });
422 fExtraStats_.
store (Stats2Capture_{.fMessageStart = Time::GetTickCount (),
424 .fWebMethod = fMessage_->request ().httpMethod (),
425 .fRequestURI = fMessage_->request ().url (),
426 .fHandlingThread = std::this_thread::get_id ()});
429 auto applyDefaultsToResponseHeaders = [&] () ->
void {
430 if (fDefaultGETResponseHeaders_ and fMessage_->request ().httpMethod () == HTTP::Methods::kGet) {
431 fMessage_->rwResponse ().rwHeaders () += *fDefaultGETResponseHeaders_;
434 fMessage_->rwResponse ().rwHeaders ().date = DateTime::Now ();
439 if (optional<HTTP::ContentEncodings> acceptEncoding = fMessage_->request ().headers ().acceptEncoding) {
440 optional<HTTP::ContentEncodings> oBodyEncoding = fMessage_->rwResponse ().bodyEncoding ();
442 fMessage_->rwResponse ().bodyEncoding = [&] () {
444 auto bc = *oBodyEncoding;
445 bc += contentEncoding2Add;
449 return HTTP::ContentEncodings{contentEncoding2Add};
453 bool needBodyEncoding = not oBodyEncoding.has_value ();
456 if (needBodyEncoding and acceptEncoding->Contains (ce) and
457 (fSupportedCompressionEncodings_ == nullopt or fSupportedCompressionEncodings_->Contains (ce))) {
459 needBodyEncoding =
false;
462 if constexpr (DataExchange::Compression::Deflate::kSupported) {
465 if constexpr (DataExchange::Compression::GZip::kSupported) {
468 if constexpr (DataExchange::Compression::ZStd::kSupported) {
474 if (
auto requestedINoneMatch = this->
request ().headers ().ifNoneMatch ()) {
475 if (this->
response ().autoComputeETag ()) {
476 this->
rwResponse ().automaticTransferChunkSize =
477 Response::kNoChunkedTransfer;
481 applyDefaultsToResponseHeaders ();
486 auto applyKeepAliveLogic = [&] () ->
bool {
487 bool thisMessageKeepAlive = fMessage_->request ().keepAliveRequested;
488 if (thisMessageKeepAlive) {
492 if (
auto keepAliveValue = fMessage_->request ().headers ().keepAlive ()) {
497 if (oRemaining->fMessages) {
498 if (oRemaining->fMessages == 0u) {
499 thisMessageKeepAlive =
false;
502 oRemaining->fMessages = *oRemaining->fMessages - 1u;
505 if (oRemaining->fTimeout) {
506 if (fConnectionStartedAt_ + *oRemaining->fTimeout < Time::GetTickCount ()) {
507 thisMessageKeepAlive =
false;
514 this->
rwResponse ().rwHeaders ().connection = thisMessageKeepAlive ? Headers::eKeepAlive : Headers::eClose;
515 return thisMessageKeepAlive;
517 bool thisMessageKeepAlive = applyKeepAliveLogic ();
523 auto invokeInterceptorChain = [&] () {
524#if USE_NOISY_TRACE_IN_THIS_MODULE_
527#if qStroika_Framework_WebServer_Connection_TrackExtraStats
528 [[maybe_unused]]
auto&& cleanupFlags =
Finally ([&] ()
noexcept {
529 Assert (fState_ < State_Flag_::eInterceptorChain_Complete);
530 fState_ = State_Flag_::eInterceptorChain_Complete;
532 fState_ = State_Flag_::eInterceptorChain_Start;
534#if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
535 WriteLogConnectionMsg_ (
"Handing request {} to interceptor chain"_f(
request ()));
541#if USE_NOISY_TRACE_IN_THIS_MODULE_
542 DbgTrace (
"Interceptor-Chain caught exception handling message: {}"_f, current_exception ());
544#if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
545 WriteLogConnectionMsg_ (
"Interceptor-Chain caught exception handling message: {}"_f(current_exception ()));
549 invokeInterceptorChain ();
551 auto assureRequestFullyRead = [&] () {
552 if (thisMessageKeepAlive) {
565 if (
request ().headers ().contentLength ()) {
566#if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
567 WriteLogConnectionMsg_ (L
"msg is keepalive, and have content length, so making sure we read all of request body");
569#if USE_NOISY_TRACE_IN_THIS_MODULE_
573 (void)fMessage_->rwRequest ().GetBody ();
577 assureRequestFullyRead ();
584#if qStroika_Framework_WebServer_Connection_TrackExtraStats
585 fState_ = State_Flag_::eFlushing_Start;
588 if (
auto requestedINoneMatch = this->
request ().headers ().ifNoneMatch ()) {
589 if (
auto actualETag = this->
response ().headers ().ETag ()) {
590 bool ctm = this->
response ().chunkedTransferMode ();
592 DbgTrace (
"Warning - disregarding ifNoneMatch request (though it matched) - cuz in chunked transfer mode"_f);
594 if (requestedINoneMatch->fETags.Contains (*actualETag) and not ctm) {
595 DbgTrace (
"Updating OK response to NotModified (due to ETag match)"_f);
596 this->
rwResponse ().status = HTTP::StatusCodes::kNotModified;
602 thisMessageKeepAlive =
false;
604#if qStroika_Framework_WebServer_Connection_TrackExtraStats
605 fKeepAlive_ = thisMessageKeepAlive;
606 fState_ = State_Flag_::eFlushing_Done;
608#if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
609 WriteLogConnectionMsg_ (L
"Did GetResponse ().End ()");
613 return thisMessageKeepAlive ? eTryAgainLater : eClose;
616 DbgTrace (
"ReadAndProcessMessage Exception caught ({}), so returning ReadAndProcessResult::eClose"_f, current_exception ());
618#if qStroika_Framework_WebServer_Connection_TrackExtraStats
619 fState_ = State_Flag_::eAborting;
621 return Connection::ReadAndProcessResult::eClose;
625#if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
// Writes one line to fLogConnectionState_: the current date-time (DateTime::Now ().Format ()),
// the separator " -- ", and then msg with Trim () applied.
// Only compiled when qStroika_Framework_WebServer_Connection_DetailedMessagingLog is enabled.
626void Connection::WriteLogConnectionMsg_ (
const String& msg)
const
628 String useMsg = DateTime::Now ().Format () +
" -- "sv + msg.
Trim ();
629 fLogConnectionState_.WriteLn (useMsg);
635 AssertExternallySynchronizedMutex::ReadContext declareContext{*
this};
638 sb <<
"Socket: "sv << fSocket_;
639 if (not abbreviatedOutput) {
640 sb <<
", Message: "sv << fMessage_;
641 sb <<
", Remaining: "sv << fRemaining_;
643 sb <<
", Connection-Started-At: "sv << fConnectionStartedAt_;
#define AssertNotReached()
time_point< RealtimeClock, DurationSeconds > TimePointSeconds
TimePointSeconds is a simpler approach to chrono::time_point, which doesn't require using templates e...
Similar to String, but intended to more efficiently construct a String. Mutable type (String is large...
String is like std::u32string, except it is much easier to use, often much more space efficient,...
nonvirtual size_t length() const noexcept
nonvirtual String SubString(SZ from) const
nonvirtual String Trim(bool(*shouldBeTrimmed)(Character)=Character::IsWhitespace) const
nonvirtual Containers::Sequence< String > Tokenize() const
nonvirtual size_t find(Character c, size_t startAt=0) const
A generalization of a vector: a container whose elements are keyed by the natural numbers.
nonvirtual WritableReference rwget()
get a read-write smart pointer to the underlying Synchronized<> object, holding the full lock the who...
nonvirtual void store(const T &v)
nonvirtual T load() const
nonvirtual optional< IO::Network::SocketAddress > GetPeerAddress() const
nonvirtual void Close() const
ClientErrorException is to capture exceptions caused by a bad (e.g ill-formed) request.
Common::Property< String > httpMethod
typically HTTP::Methods::kGet
Common::Property< Headers & > rwHeaders
Common::Property< URI > url
Common::Property< String > httpVersion
nonvirtual PlatformNativeHandle GetNativeSocket() const
static URI ParseRelative(const String &rawRelativeURL)
A Connection object represents the state (and socket) for an ongoing, active, HTTP Connection,...
Common::Property< optional< HTTP::KeepAlive > > remainingConnectionLimits
const Common::ReadOnlyProperty< Stats > stats
Retrieve stats about this connection, such as threads used and start/end times. NB: INTERNALLY SYNCHRONIZED
const Common::ReadOnlyProperty< ConnectionOrientedStreamSocket::Ptr > socket
nonvirtual ReadAndProcessResult ReadAndProcessMessage() noexcept
const Common::ReadOnlyProperty< const Request & > request
Common::ReadOnlyProperty< Response & > rwResponse
const Common::ReadOnlyProperty< const Response & > response
nonvirtual String ToString(bool abbreviatedOutput=true) const
nonvirtual void HandleMessage(Message &m) const
this represents a HTTP request object for the WebServer module
CONTAINER::value_type * End(CONTAINER &c)
For a contiguous container (such as a vector or basic_string) - find the pointer to the end of the co...
void Throw(T &&e2Throw)
Identical to builtin C++ 'throw' except that it does helpful, type-dependent DbgTrace() messages first...
auto Finally(FUNCTION &&f) -> Private_::FinallySentry< FUNCTION >
filesystem::path GetTemporary()
constexpr bool IsOK(Status s)
several status codes considered OK, so check if it is among them
Ptr New(const Streams::OutputStream::Ptr< byte > &src, const Characters::CodeCvt<> &char2OutputConverter)
Content coding values indicate an encoding transformation that has been or can be applied to an entit...
static const ContentEncoding kDeflate
static const ContentEncoding kZStd
probably fastest/best, but NYI in Stroika as of 2024-06-20
static const ContentEncoding kGZip
optional< URI > fRequestURI
last requested URI (always relative uri)
State
Experimental state information - don't count on details or names. Subject to change....
nonvirtual String ToString() const
Socket::PlatformNativeHandle fSocketID
optional< SocketAddress > fRemotePeerAddress
the address of the client which is talking to the server
TimePointSeconds fCreatedAt
optional< String > fRequestWebMethod
last request
unsigned int fReadAndProcessMessageNumber