4#include "Stroika/Frameworks/StroikaPreComp.h"
9#include "Stroika/Foundation/Characters/FloatConversion.h"
11#include "Stroika/Foundation/Characters/String2Int.h"
13#include "Stroika/Foundation/Containers/Common.h"
14#include "Stroika/Foundation/DataExchange/BadFormatException.h"
19#include "Stroika/Foundation/Execution/Throw.h"
22#include "Stroika/Foundation/IO/Network/HTTP/ClientErrorException.h"
23#include "Stroika/Foundation/IO/Network/HTTP/Headers.h"
25#include "Stroika/Foundation/IO/Network/HTTP/Methods.h"
29#include "Connection.h"
37using namespace Stroika::Foundation::Memory;
38using namespace Stroika::Foundation::Traversal;
39using namespace Stroika::Foundation::Time;
41using namespace Stroika::Frameworks;
57 const Headers& defaultResponseHeaders,
const optional<bool> autoComputeETagResponse)
58 :
Message{
Request{socketStream}, Response{socket, socketStream, defaultResponseHeaders}, socket.GetPeerAddress ()}
59 , fMsgHeaderInTextStream{HTTP::MessageStartTextInputStreamBinaryAdapter::New (rwRequest ().GetInputStream ())}
61 if (autoComputeETagResponse) {
62 this->rwResponse ().autoComputeETag = *autoComputeETagResponse;
// Reads the HTTP request start line and the header lines that follow from
// fMsgHeaderInTextStream, stores the headers on this message's request, and reports
// the outcome as a ReadHeadersResult.
// When qStroika_Framework_WebServer_Connection_DetailedMessagingLog is enabled, the
// function also takes a logMsg callback which receives progress messages.
// NOTE(review): this listing omits a number of source lines (conditions, braces,
// throws); the comments below describe only the statements that are visible.
66Connection::MyMessage_::ReadHeadersResult Connection::MyMessage_::ReadHeaders (
67#
if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
68 const function<
void (
const String&)>& logMsg
72#if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
73 logMsg (
"Starting ReadHeaders_"sv);
// The header section is not (yet) fully available: stop here and tell the caller
// that more data may still arrive.
79 if (not fMsgHeaderInTextStream.AssureHeaderSectionAvailable ()) {
80#if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
81 logMsg (
"got fMsgHeaderInTextStream.AssureHeaderSectionAvailable INCOMPLETE"sv);
83 return ReadHeadersResult::eIncompleteButMoreMayBeAvailable;
// Mutable view of the request; parsed values are written through this reference.
89 Request& updatableRequest = this->rwRequest ();
// First line of the message. NOTE(review): 'tokens' used below is presumably this
// line split into fields - the split itself is not visible here; confirm.
92 String line = fMsgHeaderInTextStream.ReadLine ();
// EOF path (the guarding condition is not visible in this listing): nothing more
// can be read, so the result is eIncompleteDeadEnd.
94#if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
95 logMsg (
"got EOF from src stream reading headers(incomplete)"sv);
97 return ReadHeadersResult::eIncompleteDeadEnd;
// Validation of the first line. Each failing check traces the offending tokens and
// line; NOTE(review): what follows each trace (likely an error throw) is elided.
100 if (tokens.size () < 3) {
101 DbgTrace (
"tokens={}, line='{}', fMsgHeaderInTextStream={}"_f, tokens, line, fMsgHeaderInTextStream.ToString ());
// The second token must not be empty.
106 if (tokens[1].empty ()) {
108 DbgTrace (
"tokens={}, line='{}'"_f, tokens, line);
// The request's HTTP method must have been set to something non-empty.
112 if (updatableRequest.
httpMethod ().empty ()) {
114 DbgTrace (
"tokens={}, line='{}'"_f, tokens, line);
// Header lines: read a line at a time; a bare CRLF or an empty line is tested for
// explicitly (presumably marking the end of the header section - the branch body
// is not visible).
120 static const String kCRLF_{
"\r\n"sv};
121 String line = fMsgHeaderInTextStream.ReadLine ();
122 if (line == kCRLF_ or line.empty ()) {
// Locate the ':' in the header line; npos means the line contains no colon.
127 size_t i = line.
find (
':');
128 if (i == string::npos) {
// Record the parsed header (hdr/value are computed on lines not shown here).
135 updatableRequest.
rwHeaders ().Add (hdr, value);
138#if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
139 logMsg (
"ReadHeaders completed normally"sv);
// All headers were read.
141 return ReadHeadersResult::eCompleteGood;
163#if qStroika_Framework_WebServer_Connection_TrackExtraStats
164 if (fMostRecentMessage) {
165 sb <<
", mostRecentMessage: " << *fMostRecentMessage;
167 if (fHandlingThread) {
169 sb <<
", handlingThread: " << fHandlingThread;
172 sb <<
", thread: " << fHandlingThread;
192 const optional<Headers>& defaultGETResponseHeaders,
const optional<bool> autoComputeETagResponse)
193 :
Connection{s, Options{.fInterceptorChain = interceptorChain,
194 .fDefaultResponseHeaders = defaultResponseHeaders,
195 .fDefaultGETResponseHeaders = defaultGETResponseHeaders,
196 .fAutoComputeETagResponse = autoComputeETagResponse}}
201 :
socket{[qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]] const auto* property) -> ConnectionOrientedStreamSocket::Ptr {
203 AssertExternallySynchronizedMutex::ReadContext declareContext{*thisObj};
204 return thisObj->fSocket_;
206 ,
request{[qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
const auto* property) ->
const Request& {
208 AssertExternallySynchronizedMutex::ReadContext declareContext{*thisObj};
209 return thisObj->fMessage_->request ();
211 ,
response{[qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
const auto* property) ->
const Response& {
213 AssertExternallySynchronizedMutex::ReadContext declareContext{*thisObj};
214 return thisObj->fMessage_->response ();
216 ,
rwResponse{[qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
const auto* property) -> Response& {
218 AssertExternallySynchronizedMutex::WriteContext declareContext{*thisObj};
219 return thisObj->fMessage_->rwResponse ();
221 ,
stats{[qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
const auto* property) -> Stats {
226#if qStroika_Framework_WebServer_Connection_TrackExtraStats
227 Stats2Capture_ statsCapturedDuringMessageProcessing = thisObj->fExtraStats_.
load ();
230 .fSocketID = uniqueID,
231 .fCreatedAt = createdAt,
232#if qStroika_Framework_WebServer_Connection_TrackExtraStats
233 .fMostRecentMessage = statsCapturedDuringMessageProcessing.fMessageStart
235 statsCapturedDuringMessageProcessing.fMessageCompleted}
236 : optional<Range<TimePointSeconds>>{},
237 .fHandlingThread = statsCapturedDuringMessageProcessing.fHandlingThread,
238 .fRemotePeerAddress = statsCapturedDuringMessageProcessing.fPeer,
239 .fRequestWebMethod = statsCapturedDuringMessageProcessing.fWebMethod,
240 .fRequestURI = statsCapturedDuringMessageProcessing.fRequestURI,
246 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
const auto* property) -> optional<HTTP::KeepAlive> {
248 AssertExternallySynchronizedMutex::ReadContext declareContext{*thisObj};
249 return thisObj->fRemaining_;
251 [qStroika_Foundation_Common_Property_ExtraCaptureStuff] ([[maybe_unused]]
auto* property,
const auto&
remainingConnectionLimits) {
253 AssertExternallySynchronizedMutex::WriteContext declareContext{*thisObj};
256 , fInterceptorChain_{options.fInterceptorChain}
257 , fDefaultResponseHeaders_{options.fDefaultResponseHeaders}
258 , fDefaultGETResponseHeaders_{options.fDefaultGETResponseHeaders}
259 , fAutoComputeETagResponse_{options.fAutoComputeETagResponse}
260 , fSupportedCompressionEncodings_{options.fSupportedCompressionEncodings}
262 , fConnectionStartedAt_{Time::GetTickCount ()}
264 Require (s !=
nullptr);
265#if USE_NOISY_TRACE_IN_THIS_MODULE_
266 DbgTrace (
"Created connection for socket {}"_f, s);
268 fSocketStream_ = SocketStream::New (fSocket_);
269#if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
271 String socketName =
"{}-{}"_f((
long)DateTime::Now ().As<time_t> (), (
int)s.GetNativeSocket ());
272 fSocketStream_ = Streams::LoggingInputOutputStream<byte>::New (
276 fLogConnectionState_ = Streams::TextToBinary::Writer::New (
278 Streams::TextToBinary::Writer::Format::eUTF8WithoutBOM);
// Destructor: aborts any response that has not completed.
// NOTE(review): several source lines are missing from this listing (including the
// socket close and its try/catch); comments describe only what is visible.
283Connection::~Connection ()
285#if USE_NOISY_TRACE_IN_THIS_MODULE_
286 DbgTrace (
"Destroying connection for socket {}, message={}"_f, fSocket_,
static_cast<const void*
> (fMessage_.get ()));
// Declare write access to this object for the debug-time synchronization checker.
288 AssertExternallySynchronizedMutex::WriteContext declareContext{*
this};
289#if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
290 WriteLogConnectionMsg_ (L
"DestroyingConnection");
// If a message exists and its response was never completed, abort the response,
// ignoring any exception Abort () raises.
292 if (fMessage_ !=
nullptr) {
293 if (not fMessage_->response ().responseCompleted ()) {
294 IgnoreExceptionsForCall (fMessage_->rwResponse ().Abort ());
// Whichever path was taken, the response must be completed by this point.
296 Require (fMessage_->response ().responseCompleted ());
// An exception raised while closing the socket is traced rather than propagated
// (per the message text; the enclosing try/catch is not visible here).
309 DbgTrace (
"Exception ignored closing socket: {}"_f, current_exception ());
315 AssertExternallySynchronizedMutex::WriteContext declareContext{*
this};
317#if USE_NOISY_TRACE_IN_THIS_MODULE_
320 fMessage_ = make_unique<MyMessage_> (fSocket_, fSocketStream_, fDefaultResponseHeaders_, fAutoComputeETagResponse_);
321#if qStroika_Foundation_Debug_AssertExternallySynchronizedMutex_Enabled
322 fMessage_->SetAssertExternallySynchronizedMutexContext (GetSharedContext ());
325 auto readHeaders = [&] () -> optional<ReadAndProcessResult> {
326 switch (fMessage_->ReadHeaders (
327#
if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
328 [
this] (
const String& i) ->
void { WriteLogConnectionMsg_ (i); }
331 case MyMessage_::eIncompleteDeadEnd: {
332 DbgTrace (
"ReadHeaders failed - typically because the client closed the connection before we could handle it (e.g. "
333 "in web browser hitting refresh button fast)."_f);
336 case MyMessage_::eIncompleteButMoreMayBeAvailable: {
337 DbgTrace (
"ReadHeaders failed - incomplete header (most likely a DOS attack)."_f);
338 return ReadAndProcessResult::eTryAgainLater;
340 case MyMessage_::eCompleteGood: {
350 if (
auto r = readHeaders ()) {
355#if qStroika_Foundation_Debug_AssertionsChecked
356 [[maybe_unused]]
auto&& cleanup =
Finally ([&] ()
noexcept { Ensure (fMessage_->response ().responseCompleted ()); });
359#if qStroika_Framework_WebServer_Connection_TrackExtraStats
360 [[maybe_unused]]
auto&& cleanup2 =
361 Finally ([&] ()
noexcept { fExtraStats_.
rwget ().rwref ().fMessageCompleted = Time::GetTickCount (); });
362 fExtraStats_.
store (Stats2Capture_{.fMessageStart = Time::GetTickCount (),
364 .fWebMethod = fMessage_->request ().httpMethod (),
365 .fRequestURI = fMessage_->request ().url (),
366 .fHandlingThread = std::this_thread::get_id ()});
369 auto applyDefaultsToResponseHeaders = [&] () ->
void {
370 if (fDefaultGETResponseHeaders_ and fMessage_->request ().httpMethod () == HTTP::Methods::kGet) {
371 fMessage_->rwResponse ().rwHeaders () += *fDefaultGETResponseHeaders_;
374 fMessage_->rwResponse ().rwHeaders ().date = DateTime::Now ();
379 if (optional<HTTP::ContentEncodings> acceptEncoding = fMessage_->request ().headers ().acceptEncoding) {
380 optional<HTTP::ContentEncodings> oBodyEncoding = fMessage_->rwResponse ().bodyEncoding ();
382 fMessage_->rwResponse ().bodyEncoding = [&] () {
384 auto bc = *oBodyEncoding;
385 bc += contentEncoding2Add;
389 return HTTP::ContentEncodings{contentEncoding2Add};
393 bool needBodyEncoding = not oBodyEncoding.has_value ();
396 if (needBodyEncoding and acceptEncoding->Contains (ce) and
397 (fSupportedCompressionEncodings_ == nullopt or fSupportedCompressionEncodings_->Contains (ce))) {
399 needBodyEncoding =
false;
402 if constexpr (DataExchange::Compression::Deflate::kSupported) {
405 if constexpr (DataExchange::Compression::GZip::kSupported) {
411 if (
auto requestedINoneMatch = this->
request ().headers ().ifNoneMatch ()) {
412 if (this->
response ().autoComputeETag ()) {
413 this->
rwResponse ().automaticTransferChunkSize =
414 Response::kNoChunkedTransfer;
418 applyDefaultsToResponseHeaders ();
423 auto applyKeepAliveLogic = [&] () ->
bool {
424 bool thisMessageKeepAlive = fMessage_->request ().keepAliveRequested;
425 if (thisMessageKeepAlive) {
429 if (
auto keepAliveValue = fMessage_->request ().headers ().keepAlive ()) {
434 if (oRemaining->fMessages) {
435 if (oRemaining->fMessages == 0u) {
436 thisMessageKeepAlive =
false;
439 oRemaining->fMessages = *oRemaining->fMessages - 1u;
442 if (oRemaining->fTimeout) {
443 if (fConnectionStartedAt_ + *oRemaining->fTimeout < Time::GetTickCount ()) {
444 thisMessageKeepAlive =
false;
451 this->
rwResponse ().rwHeaders ().connection = thisMessageKeepAlive ? Headers::eKeepAlive : Headers::eClose;
452 return thisMessageKeepAlive;
454 bool thisMessageKeepAlive = applyKeepAliveLogic ();
460 auto invokeInterceptorChain = [&] () {
461#if USE_NOISY_TRACE_IN_THIS_MODULE_
464#if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
465 WriteLogConnectionMsg_ (
"Handing request {} to interceptor chain"_f(
request ()));
471#if USE_NOISY_TRACE_IN_THIS_MODULE_
472 DbgTrace (
"Interceptor-Chain caught exception handling message: {}"_f, current_exception ());
474#if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
475 WriteLogConnectionMsg_ (
"Interceptor-Chain caught exception handling message: {}"_f(current_exception ()));
479 invokeInterceptorChain ();
481 auto assureRequestFullyRead = [&] () {
482 if (thisMessageKeepAlive) {
495 if (
request ().headers ().contentLength ()) {
496#if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
497 WriteLogConnectionMsg_ (L
"msg is keepalive, and have content length, so making sure we read all of request body");
499#if USE_NOISY_TRACE_IN_THIS_MODULE_
503 (void)fMessage_->rwRequest ().GetBody ();
507 assureRequestFullyRead ();
513 auto completeResponse = [&] () {
515 if (
auto requestedINoneMatch = this->
request ().headers ().ifNoneMatch ()) {
516 if (
auto actualETag = this->
response ().headers ().ETag ()) {
517 bool ctm = this->
response ().chunkedTransferMode ();
519 DbgTrace (
"Warning - disregarding ifNoneMatch request (though it matched) - cuz in chunked transfer mode"_f);
521 if (requestedINoneMatch->fETags.Contains (*actualETag) and not ctm) {
522 DbgTrace (
"Updating OK response to NotModified (due to ETag match)"_f);
523 this->
rwResponse ().status = HTTP::StatusCodes::kNotModified;
529 thisMessageKeepAlive =
false;
531#if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
532 WriteLogConnectionMsg_ (L
"Did GetResponse ().End ()");
537 return thisMessageKeepAlive ? eTryAgainLater : eClose;
540 DbgTrace (
"ReadAndProcessMessage Exception caught ({}), so returning ReadAndProcessResult::eClose"_f, current_exception ());
542 return Connection::ReadAndProcessResult::eClose;
546#if qStroika_Framework_WebServer_Connection_DetailedMessagingLog
// Writes one line to this connection's log writer (fLogConnectionState_).
// The line is the current date/time, then " -- ", then msg after Trim ().
// This definition sits under qStroika_Framework_WebServer_Connection_DetailedMessagingLog.
547void Connection::WriteLogConnectionMsg_ (
const String& msg)
const
// Build the timestamped text first, then emit it as a single line.
549 String useMsg = DateTime::Now ().Format () +
" -- "sv + msg.
Trim ();
550 fLogConnectionState_.WriteLn (useMsg);
556 AssertExternallySynchronizedMutex::ReadContext declareContext{*
this};
559 sb <<
"Socket: "sv << fSocket_;
560 if (not abbreviatedOutput) {
561 sb <<
", Message: "sv << fMessage_;
562 sb <<
", Remaining: "sv << fRemaining_;
564 sb <<
", Connection-Started-At: "sv << fConnectionStartedAt_;
#define AssertNotReached()
time_point< RealtimeClock, DurationSeconds > TimePointSeconds
TimePointSeconds is a simpler approach to chrono::time_point, which doesn't require using templates e...
Similar to String, but intended to more efficiently construct a String. Mutable type (String is large...
String is like std::u32string, except it is much easier to use, often much more space efficient,...
nonvirtual size_t length() const noexcept
nonvirtual String SubString(SZ from) const
nonvirtual String Trim(bool(*shouldBeTrimmed)(Character)=Character::IsWhitespace) const
nonvirtual Containers::Sequence< String > Tokenize() const
nonvirtual size_t find(Character c, size_t startAt=0) const
A generalization of a vector: a container whose elements are keyed by the natural numbers.
nonvirtual WritableReference rwget()
get a read-write smart pointer to the underlying Synchronized<> object, holding the full lock the who...
nonvirtual void store(const T &v)
nonvirtual T load() const
nonvirtual optional< IO::Network::SocketAddress > GetPeerAddress() const
nonvirtual void Close() const
ClientErrorException is to capture exceptions caused by a bad (e.g ill-formed) request.
Common::Property< String > httpMethod
typically HTTP::Methods::kGet
Common::Property< Headers & > rwHeaders
Common::Property< URI > url
Common::Property< String > httpVersion
nonvirtual PlatformNativeHandle GetNativeSocket() const
static URI ParseRelative(const String &rawRelativeURL)
A Connection object represents the state (and socket) for an ongoing, active, HTTP Connection,...
Common::Property< optional< HTTP::KeepAlive > > remainingConnectionLimits
const Common::ReadOnlyProperty< Stats > stats
retrieve stats about this connection, like threads used, start/end times. NB: INTERNALLY SYNCHRONIZED
const Common::ReadOnlyProperty< ConnectionOrientedStreamSocket::Ptr > socket
nonvirtual ReadAndProcessResult ReadAndProcessMessage() noexcept
const Common::ReadOnlyProperty< const Request & > request
Common::ReadOnlyProperty< Response & > rwResponse
const Common::ReadOnlyProperty< const Response & > response
nonvirtual String ToString(bool abbreviatedOutput=true) const
nonvirtual void HandleMessage(Message &m) const
this represents a HTTP request object for the WebServer module
CONTAINER::value_type * End(CONTAINER &c)
For a contiguous container (such as a vector or basic_string) - find the pointer to the end of the co...
void Throw(T &&e2Throw)
identical to builtin C++ 'throw' except that it does helpful, type dependent DbgTrace() messages firs...
auto Finally(FUNCTION &&f) -> Private_::FinallySentry< FUNCTION >
filesystem::path GetTemporary()
constexpr bool IsOK(Status s)
several status codes considered OK, so check if it is among them
Content coding values indicate an encoding transformation that has been or can be applied to an entit...
static const ContentEncoding kDeflate
static const ContentEncoding kGZip
optional< URI > fRequestURI
last requested URI (always relative uri)
nonvirtual String ToString() const
Socket::PlatformNativeHandle fSocketID
optional< SocketAddress > fRemotePeerAddress
the address of the client which is talking to the server
TimePointSeconds fCreatedAt
optional< String > fRequestWebMethod
last request