experiment with http stuff

seems to work!
This commit is contained in:
Lily Tsuru 2024-07-21 04:27:29 -04:00
parent 0379b729eb
commit 3c585b3a04
21 changed files with 623 additions and 49 deletions

3
.gitignore vendored
View file

@ -6,3 +6,6 @@
# on your own machine, please.
/speech2/build
/speech2/build-debug
# cmake tools is viciously unaware of subdirectories
/build

3
.gitmodules vendored
View file

@ -196,3 +196,6 @@
[submodule "speech2/third_party/boost/asio"]
path = speech2/third_party/boost/asio
url = https://github.com/boostorg/asio.git
[submodule "speech2/third_party/boost/beast"]
path = speech2/third_party/boost/beast
url = https://github.com/boostorg/beast.git

10
.vscode/settings.json vendored Normal file
View file

@ -0,0 +1,10 @@
{
"cmake.sourceDirectory": "${workspaceFolder}/speech2",
"cmake.configureArgs": [
"--toolchain cmake/clangcl-winxp.cmake"
],
"cmake.configureEnvironment": {
"VCDIR": "${env:HOME}/vs2022"
},
"cmake.ignoreCMakeListsMissing": true,
}

View file

@ -1,3 +1,5 @@
add_subdirectory(base)
add_subdirectory(impl)
add_subdirectory(sapi4)
@ -15,25 +17,23 @@ target_compile_definitions(sapiserver PRIVATE
# Need to force this on, since I think clang's msvc compatibility
# is deciding to set a wrong __cplusplus (like MSVC, so it's not *exactly* clang's fault).
# The best way to fix it would probably involve using clang-cl frontend and passing the option (I think.)
# (nevermind, it's just broken.)
-DBOOST_ASIO_HAS_STD_INVOKE_RESULT=1
# Disable the "helpful" auto-link Boost.Config tries to do. CMake already has a functional
# dependency graph, so we don't need it.
# dependency graph based on our input, so we don't need it.
-DBOOST_ALL_NO_LIB=1
)
target_link_libraries(sapiserver PRIVATE
# runtime libs
libc++
#libc++
# subprojects
speech2_sapi4
speech2::base
speech2::impl
speech2::api_sapi4
# SDK libraries
uuid.lib
ole32.lib
Boost::asio
Boost::coroutine
Boost::context
)

View file

@ -0,0 +1,14 @@
add_library(speech2_base STATIC
Thread.cpp
# Logging system
Logger.cpp
Logger_priv.cpp
StdoutSink.cpp
)
speech2_target(speech2_base)
add_library(speech2::base ALIAS speech2_base)

View file

@ -0,0 +1,41 @@
#include "Logger.hpp"
#include <base/Logger_priv.hpp>
#include <base/Thread.hpp>
namespace base {

/// Shorthand for the shared logger state singleton.
inline logger_impl::LoggerGlobalState& GlobalState() {
	return logger_impl::LoggerGlobalState::The();
}

/// Registers `sink` with the shared logger state.
void LoggerAttachSink(LoggerSink& sink) {
	auto& state = GlobalState();
	state.AttachSink(sink);
}

/// Returns the log level currently stored in the shared logger state.
MessageSeverity GetLogLevel() {
	const auto& state = GlobalState();
	return state.GetLogLevel();
}

/// Replaces the log level stored in the shared logger state.
void SetLogLevel(MessageSeverity newLevel) {
	auto& state = GlobalState();
	state.SetLogLevel(newLevel);
}

/// Gets (or lazily registers) the logger for the channel named `key`.
Logger& Logger::Get(std::string_view key) {
	Logger& registered = logger_impl::GetOrRegister(key);
	return registered;
}

Logger::Logger(ChannelId id) : channelId { id } {}

/// Formats the message on the calling thread, stamps it with the current
/// time, and hands it to the logger thread's queue.
void Logger::VOut(MessageSeverity severity, std::string_view format, std::format_args args) {
	logger_impl::MessageData entry {};

	// The timestamp is taken before formatting, so it reflects when the
	// call was made rather than when formatting finished.
	entry.time = std::chrono::system_clock::now();
	entry.severity = severity;
	entry.channelId = channelId;
	entry.message = std::vformat(format, args);

	// Push data into logger thread.
	logger_impl::PushMessage(std::move(entry));
}

} // namespace base

View file

@ -0,0 +1,75 @@
#pragma once
#include <base/Types.hpp>
#include <format>
#include <string_view>
namespace base {

/// A logger sink. Outputs messages to some device (a TTY), a file,
/// anything. An interface for the logger to spit stuff out.
///
/// # Notes
/// Sinks *do not* run on the main application thread. Instead, they run on a
/// single internal thread shared with the logging system.
/// Sinks should *not* block for large periods of time.
struct LoggerSink {
	/// Virtual so a sink destroyed through a `LoggerSink` pointer runs the
	/// derived destructor.
	virtual ~LoggerSink() = default;

	/// Receives one formatted log line.
	virtual void OutputMessage(std::string_view message) = 0;
};

/// Message severities, ordered from least to most severe. The enumerator
/// order is significant: it is compared against the current log level and is
/// used to index the severity name table.
enum class MessageSeverity { Debug, Info, Warning, Error, Fatal };

/// A channel ID. `enum class`es are used to avoid confusion with a normal u32,
/// and to also add additional interface type safety.
/// These are opaque, and only exposed here because it would be annoying to move elsewhere.
enum class ChannelId : u32 {};

/// Attach a sink to all loggers, allowing it to output logger messages.
void LoggerAttachSink(LoggerSink& sink);

/// Gets the current global log level.
MessageSeverity GetLogLevel();

/// Sets the current global log level. Messages with a lower severity are not output.
void SetLogLevel(MessageSeverity newLevel);

/// An (asynchronous) logger.
struct Logger {
	/// Gets or creates a logger for the specified channel.
	/// NOTE(review): the channel name appears to be stored as a non-owning
	/// view; pass storage that outlives the logger (e.g. a string literal).
	static Logger& Get(std::string_view channel);

	explicit Logger(ChannelId channel);

	Logger(const Logger&) = delete;
	Logger(Logger&&) = delete;

	/// Logs a message with Debug severity. `fmt` is a std::format-style
	/// format string that is evaluated at runtime.
	template <class... Args>
	inline void Debug(std::string_view fmt, Args&&... args) {
		VOut(MessageSeverity::Debug, fmt, std::make_format_args(args...));
	}

	/// Logs a message with Info severity.
	template <class... Args>
	inline void Info(std::string_view fmt, Args&&... args) {
		VOut(MessageSeverity::Info, fmt, std::make_format_args(args...));
	}

	/// Logs a message with Warning severity.
	template <class... Args>
	inline void Warning(std::string_view fmt, Args&&... args) {
		VOut(MessageSeverity::Warning, fmt, std::make_format_args(args...));
	}

	/// Logs a message with Error severity.
	template <class... Args>
	inline void Error(std::string_view fmt, Args&&... args) {
		VOut(MessageSeverity::Error, fmt, std::make_format_args(args...));
	}

	/// Logs a message with Fatal severity.
	template <class... Args>
	inline void Fatal(std::string_view fmt, Args&&... args) {
		VOut(MessageSeverity::Fatal, fmt, std::make_format_args(args...));
	}

   private:
	/// Type-erased backend shared by all of the severity helpers.
	void VOut(MessageSeverity severity, std::string_view format, std::format_args args);

	ChannelId channelId;
};

} // namespace base

View file

@ -0,0 +1,170 @@
#include <base/Logger_priv.hpp>
#include <base/Thread.hpp>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <queue>
#include <string>
#include <string_view>
#include <thread>
namespace base::logger_impl {
/// Maps a severity to the short label used in formatted log lines.
/// A value outside the known enumerators yields "Unknown" instead of
/// indexing past the end of the table.
static constexpr std::string_view SeverityToString(MessageSeverity sev) {
	// This must match order of MessageSeverity.
	constexpr std::string_view kSeverityNames[] = { "Debug", "Info", "Warn", "Error", "Fatal" };
	constexpr std::size_t kSeverityCount = sizeof(kSeverityNames) / sizeof(kSeverityNames[0]);

	// Negative underlying values wrap to a huge index and are rejected too.
	const auto index = static_cast<std::size_t>(sev);
	if(index >= kSeverityCount)
		return "Unknown";
	return kSeverityNames[index];
}
/// Hashes a NUL-terminated channel name into a channel ID using a
/// PJW/ELF-style hash. Collisions are not expected to be a problem; a
/// different hash (e.g. murmur) could be swapped in if they ever are.
/// NOTE(review): the classic ELF hash folds the top nibble with `>> 24`;
/// this uses `>> 23`. Confirm whether that is intentional.
ChannelId ChannelIDHash(const char* in) {
	u32 acc = 0;

	for(const char* cursor = in; *cursor != '\0'; ++cursor) {
		acc = (acc << 4) + *cursor;

		// Fold the top nibble back into the low bits, then clear it.
		const u32 topNibble = acc & 0xf0000000;
		if(topNibble != 0)
			acc ^= topNibble >> 23;
		acc &= ~topNibble;
	}

	return static_cast<ChannelId>(acc);
}
/// Looks up the name registered for a channel ID.
/// Returns an empty view when the ID was never registered.
std::string_view ChannelToString(ChannelId id) {
	auto& gs = LoggerGlobalState::The();
	std::unique_lock lk(gs.loggerMapLock);

	// find() rather than operator[]: the latter would default-insert an
	// entry (empty name, null logger) for an unknown ID.
	const auto it = gs.loggerMap.find(id);
	if(it == gs.loggerMap.end())
		return {};
	return it->second.channelName;
}
/// Comparator for [std::priority_queue]. Orders by timestamp so that the
/// oldest queued message is the one returned by top().
struct LogMessageComparator {
	// const-qualified so it can also be invoked through a const comparator object.
	constexpr bool operator()(const MessageData& mdLeft, const MessageData& mdRight) const { return mdLeft.time > mdRight.time; }
};
/// State owned by the logger thread: the thread itself, the message queue,
/// and the synchronization primitives guarding that queue.
struct LoggerThreadData {
	// Logger thread stuff
	std::thread loggerThread;
	std::mutex logQueueMutex;
	std::condition_variable logQueueCv;
	std::priority_queue<MessageData, std::deque<MessageData>, LogMessageComparator> logQueue;

	// Atomic because it is written by the thread requesting shutdown and read
	// by the logger thread.
	std::atomic<bool> logThreadShutdown { false };

	/// Wait predicate for the logger thread: wake on shutdown or pending messages.
	bool ShouldUnblock() {
		// N.B: ALL calls of this hold the lock.

		// Always unblock if the logger thread needs to be shut down.
		if(logThreadShutdown)
			return true;

		return !logQueue.empty();
	}

	/// Enqueues a message and wakes the logger thread.
	void PushMessage(MessageData&& md) {
		{
			std::unique_lock lk(logQueueMutex);
			logQueue.push(std::move(md));
		}
		// Notify after unlocking so the woken thread can take the mutex immediately.
		logQueueCv.notify_one();
	}

	/// This thread drives the logging system.
	static void LoggerThread(LoggerThreadData* self) {
		// Fancy thread names.
		SetThreadName("AsyncLogger");

		auto& state = LoggerGlobalState::The();

		// The lock is held for the whole loop, except while blocked in wait().
		std::unique_lock lk(self->logQueueMutex);

		while(true) {
			// Await messages (or a shutdown request). wait() checks the
			// predicate first, so this returns at once if work is pending.
			self->logQueueCv.wait(lk, [self]() { return self->ShouldUnblock(); });

			// Flush the logger queue until there are no more messages.
			while(!self->logQueue.empty()) {
				const auto& msg = self->logQueue.top();
				state.OutputMessage(msg);
				self->logQueue.pop();
			}

			// Shutdown is checked only after draining, so messages queued
			// before the request are still delivered.
			if(self->logThreadShutdown)
				break;
		}
	}
};
/// Logger thread state. Null until LoggerGlobalState's constructor allocates it.
Unique<LoggerThreadData> threadData;
/// Returns the single shared LoggerGlobalState instance.
LoggerGlobalState& LoggerGlobalState::The() {
// Function-local static: constructed the first time this function is called.
static LoggerGlobalState storage;
return storage;
}
/// Brings up the logging backend: allocates the thread state, then starts
/// the logger thread with a pointer to it.
LoggerGlobalState::LoggerGlobalState() {
	threadData = std::make_unique<LoggerThreadData>();

	// Spawn the logger thread
	LoggerThreadData* const raw = threadData.get();
	raw->loggerThread = std::thread(&LoggerThreadData::LoggerThread, raw);
}
/// Shuts down the logger thread and waits for it to exit.
LoggerGlobalState::~LoggerGlobalState() {
	{
		// Set the flag while holding the queue mutex. Otherwise the logger
		// thread could evaluate its wait predicate (seeing no shutdown),
		// miss the notification below, and then block forever, which would
		// hang join().
		std::unique_lock lk(threadData->logQueueMutex);
		threadData->logThreadShutdown = true;
	}
	threadData->logQueueCv.notify_all();
	threadData->loggerThread.join();
}
/// Adds `sink` to the list of sinks every output message is sent to.
/// The sink is stored by pointer and is not owned.
/// NOTE(review): `sinks` is iterated by OutputMessage without a lock;
/// presumably sinks are meant to be attached before logging starts. Confirm.
void LoggerGlobalState::AttachSink(LoggerSink& sink) {
	sinks.emplace_back(&sink);
}
/// Formats one queued message and sends the resulting line to every
/// attached sink.
void LoggerGlobalState::OutputMessage(const MessageData& data) {
	// Give up early if no sinks are attached, or if the message is below
	// the current log level.
	const bool suppressed = sinks.empty() || data.severity < logLevel;
	if(suppressed)
		return;

	// Timestamps are printed with millisecond precision.
	const auto timestamp = std::chrono::floor<std::chrono::milliseconds>(data.time);
	const auto line = std::format("[{:%F %H:%M:%S}|{}|{}] {}", timestamp, SeverityToString(data.severity),
								  ChannelToString(data.channelId), data.message);

	for(auto* target : sinks)
		target->OutputMessage(line);
}
/// Returns the logger registered for `component`, creating and registering
/// one on first use.
/// NOTE(review): the registry keeps `component` as a non-owning view, so the
/// caller's storage must outlive the logger (string literals are fine).
Logger& GetOrRegister(std::string_view component) {
	auto& gs = LoggerGlobalState::The();

	// ChannelIDHash() reads until a NUL terminator, which a string_view does
	// not guarantee. Hash a terminated copy instead of component.data().
	const std::string terminated(component);
	const auto hash = ChannelIDHash(terminated.c_str());

	std::unique_lock lk(gs.loggerMapLock);

	auto it = gs.loggerMap.find(hash);
	if(it == gs.loggerMap.end()) {
		// Insert a new entry into the logger map.
		it = gs.loggerMap.insert_or_assign(hash, BoltedLoggerData { component, std::make_unique<Logger>(hash) }).first;
	}

	return *it->second.logger;
}
/// Forwards a message to the logger thread's queue. The message is dropped
/// if the thread state has not been created.
void PushMessage(MessageData&& md) {
	if(!threadData)
		return;

	threadData->PushMessage(std::move(md));
}
} // namespace base::logger_impl

View file

@ -0,0 +1,59 @@
#pragma once
#include <base/Logger.hpp>
#include <chrono>
#include <mutex>
#include <unordered_map>
#include <vector>
namespace base::logger_impl {
/// One queued log record. Created by Logger::VOut and consumed by the
/// logger thread, which formats it and passes the result to the sinks.
struct MessageData {
// Time the message was logged; also used to order the queue.
std::chrono::system_clock::time_point time;
MessageSeverity severity;
ChannelId channelId; // the channel ID.
std::string message; // DO NOT SET THIS, IT WILL BE OVERWRITTEN AND I WILL BE VERY SAD -lily
};
/// Registry entry for one channel: its name plus the Logger it owns.
struct BoltedLoggerData {
// Non-owning view of the channel name.
// NOTE(review): this presumably relies on callers passing storage that outlives the entry - confirm.
std::string_view channelName;
Unique<Logger> logger;
};
/// Shared global state all loggers use.
struct LoggerGlobalState {
/// Returns the single shared instance.
static LoggerGlobalState& The();
/// Adds a sink (stored by pointer) to the sink list.
void AttachSink(LoggerSink& sink);
/// Formats a message and sends it to every attached sink, unless its
/// severity is below the current log level.
void OutputMessage(const MessageData& data);
/// Get the current log level.
MessageSeverity GetLogLevel() const { return logLevel; }
/// Set the current log level.
void SetLogLevel(MessageSeverity newLogLevel) { logLevel = newLogLevel; }
private:
// Private: instances are only created through The().
LoggerGlobalState();
~LoggerGlobalState();
public:
// Attached sinks (not owned).
std::vector<LoggerSink*> sinks;
// Messages with a severity below this level are not output.
MessageSeverity logLevel { MessageSeverity::Info };
// Registered loggers, keyed by channel ID.
std::unordered_map<ChannelId, BoltedLoggerData> loggerMap;
// Held while reading or modifying loggerMap.
std::mutex loggerMapLock;
};
/// Gets or registers a new logger. This routine is threadsafe, and can be called
/// on any thread, like (most) parts of the logging system.
Logger& GetOrRegister(std::string_view component);
/// Push a logger message into the queue.
void PushMessage(MessageData&& md);
} // namespace base::logger_impl

View file

@ -0,0 +1,20 @@
#include <base/StdoutSink.hpp>
#include "Logger.hpp"
namespace base {

/// Returns the single shared stdout sink instance.
StdoutLoggerSink& StdoutLoggerSink::The() {
	static StdoutLoggerSink sink;
	return sink;
}

/// Writes the message followed by a newline to stdout, then flushes.
void StdoutLoggerSink::OutputMessage(std::string_view message) {
	// Write exactly message.size() bytes: a string_view is not guaranteed to
	// be NUL-terminated, so fputs(message.data()) could read past its end.
	fwrite(message.data(), sizeof(char), message.size(), stdout);
	fputc('\n', stdout);
	fflush(stdout);
}

/// Attaches the stdout sink to the logging system.
void LoggerAttachStdout() {
	LoggerAttachSink(StdoutLoggerSink::The());
}

} // namespace base

View file

@ -0,0 +1,17 @@
#pragma once
#include <base/Logger.hpp>
namespace base {
/// A logger sink implementation that prints to standard output.
struct StdoutLoggerSink : public LoggerSink {
/// Returns the single shared instance of this sink.
static StdoutLoggerSink& The();
/// Prints one log line to standard output.
void OutputMessage(std::string_view message) override;
};
/// Attach the stdout logger sink to the global logger.
void LoggerAttachStdout();
} // namespace base

View file

@ -0,0 +1,11 @@
#include <pthread.h>
#include <base/Thread.hpp>
namespace base {
void SetThreadNameImpl(const char* name, usize len) {
//COMMON_ASSERT(len <= 15, "name will overflow pthread_setname_np() buffer");
pthread_setname_np(pthread_self(), name);
}
}

View file

@ -0,0 +1,15 @@
#pragma once
#include <base/Types.hpp>
#include <string>
namespace base {
void SetThreadNameImpl(const char* name, usize len);
/// Sets the name of the current thread. Mainly for portability.
inline void SetThreadName(const std::string& name) {
SetThreadNameImpl(name.data(), name.length());
}
} // namespace collabvm

View file

@ -0,0 +1,52 @@
#pragma once
#include <cstdint>
#include <memory>
// Short fixed-width integer aliases used throughout the project.
using u8 = std::uint8_t;
using i8 = std::int8_t;
using u16 = std::uint16_t;
using i16 = std::int16_t;
using u32 = std::uint32_t;
using i32 = std::int32_t;
using u64 = std::uint64_t;
using i64 = std::int64_t;
using usize = std::size_t;
using isize = std::intptr_t;

namespace base {

/// A little ergonomic wrapper over
/// std::unique_ptr<T[]>, for a "kinda-vector"
/// that lives on the heap and is statically sized.
///
/// Move-only. A moved-from array has a null pointer and a size of 0.
template <class T>
struct UniqueArray final {
	/// Allocates `size` value-initialized elements.
	explicit UniqueArray(usize size)
		: array(std::make_unique<T[]>(size)),
		  size(size) {
	}

	UniqueArray(const UniqueArray&) = delete;
	UniqueArray& operator=(const UniqueArray&) = delete;

	/// Takes over the storage of `other`. noexcept so containers can move
	/// instead of falling back to (deleted) copies.
	UniqueArray(UniqueArray&& other) noexcept
		: array(std::move(other.array)),
		  size(other.size) {
		// invalidate (the unique_ptr move already nulled other.array)
		other.size = 0;
	}

	/// Releases the current storage and takes over the storage of `other`.
	UniqueArray& operator=(UniqueArray&& other) noexcept {
		if(this != &other) {
			array = std::move(other.array);
			size = other.size;
			other.size = 0;
		}
		return *this;
	}

	/// Unchecked element access.
	T& operator[](usize index) { return Get()[index]; }
	const T& operator[](usize index) const { return Get()[index]; }

	/// Pointer to the first element, or nullptr after being moved from.
	T* Get() { return array.get(); }
	const T* Get() const { return array.get(); }

	/// Number of elements.
	usize Size() const { return size; }

   private:
	std::unique_ptr<T[]> array {};
	usize size {};
};

template <class T>
using Unique = std::unique_ptr<T>;

} // namespace base

View file

@ -0,0 +1,35 @@
add_library(speech2_impl
asio_src.cpp
beast_src.cpp
)
speech2_target(speech2_impl)
target_compile_definitions(speech2_impl PUBLIC
# Need to force this on, since I think clang's msvc compatibility
# is deciding to set a wrong __cplusplus (like MSVC, so it's not *exactly* clang's fault).
# The best way to fix it would probably involve using clang-cl frontend and passing the option (I think.)
# (nevermind, it's just broken.)
-DBOOST_ASIO_HAS_STD_INVOKE_RESULT=1
# We compile all of these header-only libraries in separate .cpp source files
# to decrease build churn
-DBOOST_ASIO_SEPARATE_COMPILATION=1
-DBOOST_BEAST_SEPARATE_COMPILATION=1
# Disable deprecated functionality and some things which add additional dependencies or are
# simply baggage we aren't ever going to use
-DBOOST_ASIO_NO_DEPRECATED=1
-DBOOST_ASIO_DISABLE_BOOST_ARRAY=1
-DBOOST_ASIO_DISABLE_BOOST_BIND=1
)
target_link_libraries(speech2_impl PUBLIC
Boost::asio
Boost::beast
Boost::coroutine
Boost::context
)
add_library(speech2::impl ALIAS speech2_impl)

View file

@ -0,0 +1,5 @@
// Since we're using (BOOST_)ASIO_SEPARATE_COMPILATION, we need
// to include the <(boost/)asio/impl/src.hpp> header in some TU.
// We use this one to explicitly do so.
#include <boost/asio/impl/src.hpp>

View file

@ -0,0 +1 @@
#include <boost/beast/src.hpp>

View file

@ -6,70 +6,106 @@
#include <boost/asio/spawn.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/write.hpp>
#include <boost/beast/core/basic_stream.hpp>
#include <boost/beast/core/flat_buffer.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/http/message_generator.hpp>
#include <boost/beast/http/read.hpp>
#include <boost/beast/http/string_body.hpp>
#include <boost/beast/http/write.hpp>
#include <format>
#include <iostream>
#include <memory>
#include "base/Logger.hpp"
#include "base/StdoutSink.hpp"
#include "speechapi.hpp"
using boost::asio::ip::tcp;
namespace net = boost::asio;
namespace beast = boost::beast;
namespace bhttp = beast::http;
using net::ip::tcp;
auto coresv_to_cxx(boost::core::string_view sv) {
return std::string_view { sv.data(), sv.length() };
}
// A test coro
void test_out_of_line_coro(net::any_io_executor ioc,net::yield_context yc) {
net::steady_timer t{ioc};
t.expires_after(std::chrono::seconds(5));
t.async_wait(yc);
}
class session : public std::enable_shared_from_this<session> {
public:
explicit session(boost::asio::io_context& io_context, tcp::socket socket)
: socket_(std::move(socket)), timer_(io_context), strand_(io_context.get_executor()) {}
explicit session(net::io_context& io_context, beast::basic_stream<net::ip::tcp> socket)
: socket_(std::move(socket)), strand_(io_context.get_executor()) {}
void go() {
auto self(shared_from_this());
boost::asio::spawn(
net::spawn(
strand_,
[this, self](boost::asio::yield_context yield) {
[this, self](net::yield_context yield) {
try {
char data[128];
auto& logger = base::Logger::Get("HTTPSession");
bhttp::request<bhttp::string_body> req {};
beast::flat_buffer buffer {};
logger.Info("Wait test");
// mostly just to test if I can yield stuff from another member function.
// This seems to work, so /shrug
test_out_of_line_coro(socket_.get_executor(), yield);
logger.Info("Wait completed");
for(;;) {
timer_.expires_after(std::chrono::seconds(10));
std::size_t n = socket_.async_read_some(boost::asio::buffer(data), yield);
boost::asio::async_write(socket_, boost::asio::buffer(data, n), yield);
socket_.expires_after(std::chrono::seconds(10));
bhttp::async_read(socket_, buffer, req, yield);
auto const routeTest = [&]() -> bhttp::message_generator {
bhttp::response<bhttp::string_body> resp { bhttp::status::bad_request, req.version() };
resp.set(bhttp::field::server, "Fucker Google/1.0");
resp.set(bhttp::field::content_type, "text/plain");
resp.keep_alive(false);
resp.body() = std::format("You requested \"{} {}\"", coresv_to_cxx(req.method_string()), coresv_to_cxx(req.target()));
resp.prepare_payload();
return resp;
};
logger.Info("HTTP request to {}", coresv_to_cxx(req.target()));
auto res = routeTest();
socket_.expires_after(std::chrono::seconds(10));
beast::async_write(socket_, std::move(res), yield);
if(!req.keep_alive()) {
socket_.close();
return;
}
}
} catch(std::exception& e) {
socket_.close();
timer_.cancel();
}
},
boost::asio::detached);
boost::asio::spawn(
strand_,
[this, self](boost::asio::yield_context yield) {
while(socket_.is_open()) {
boost::system::error_code ignored_ec;
timer_.async_wait(yield[ignored_ec]);
if(timer_.expiry() <= boost::asio::steady_timer::clock_type::now())
socket_.close();
}
},
boost::asio::detached);
net::detached);
}
private:
tcp::socket socket_;
boost::asio::steady_timer timer_;
boost::asio::strand<boost::asio::io_context::executor_type> strand_;
beast::basic_stream<net::ip::tcp> socket_;
net::strand<net::io_context::executor_type> strand_;
};
int main(int argc, char** argv) {
// CoInitialize(nullptr);
#if 0
boost::asio::io_context iocMain(1);
printf("inited io context\n");
iocMain.post([&]() {
printf("Hello from Boost.ASIO + C++20 on XP\n");
iocMain.stop();
});
iocMain.run();
#endif
base::LoggerAttachStdout();
try {
if(argc != 2) {
@ -77,19 +113,21 @@ int main(int argc, char** argv) {
return 1;
}
boost::asio::io_context io_context;
net::io_context io_context;
boost::asio::spawn(
net::spawn(
io_context,
[&](boost::asio::yield_context yield) {
[&](net::yield_context yield) {
tcp::acceptor acceptor(io_context, tcp::endpoint(tcp::v4(), std::atoi(argv[1])));
for(;;) {
boost::system::error_code ec;
tcp::socket socket(io_context);
acceptor.async_accept(socket, yield[ec]);
if(!ec) {
std::make_shared<session>(io_context, std::move(socket))->go();
auto stream = beast::basic_stream<net::ip::tcp>(std::move(socket));
std::make_shared<session>(io_context, std::move(stream))->go();
}
}
},

View file

@ -1,3 +1,4 @@
# SAPI4 layer for speech2
add_library(speech2_sapi4
api_sapi4.cpp
@ -6,3 +7,5 @@ add_library(speech2_sapi4
)
speech2_target(speech2_sapi4)
add_library(speech2::api_sapi4 ALIAS speech2_sapi4)

1
speech2/third_party/boost/beast vendored Submodule

@ -0,0 +1 @@
Subproject commit 98b8be489fa7a74753a724d8d0772d90bcbed0fc

View file

@ -64,3 +64,4 @@ utility
variant2
winapi
asio
beast