PostgreSQL libpq connection pool
For working with PostgreSQL from C++ there is a wonderful library, libpq. It is very well documented, and there is even a complete Russian translation of the documentation from the company Postgres Professional.
Article based on information from habrahabr.ru
While writing a server backend, I ran into the fact that this library has no connection pool. The database was going to be used quite intensively, and a single connection was clearly not enough. Establishing a new connection every time received data had to be sent would be insane, because connecting is the slowest operation, so I decided to write a connection pool.
The idea is that at program start we create several connections and store them in a queue.
When data arrives, we take a free connection from the queue, or wait if there is none, use it to insert the data, and then put the connection back. The idea is quite simple, quick to implement, and, most importantly, very fast at run time.
Create a PostgreSQL database named demo and, in it, a table demo with the following structure:
-- Table: public.demo
-- DROP TABLE public.demo;
-- Note: the sequence demo_id_seq must already exist before this statement runs.
CREATE TABLE public.demo
(
    id integer NOT NULL DEFAULT nextval('demo_id_seq'::regclass),
    name character varying(256),
    CONSTRAINT demo_pk PRIMARY KEY (id)
)
WITH (
    OIDS=FALSE
);
ALTER TABLE public.demo
    OWNER TO postgres;
Next we write a class that represents a connection to the database. To keep things simple the connection parameters are hard-coded; in real code, of course, they should be stored in a configuration file and read at startup, so that changing the server settings does not require recompiling the program.
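As a rough illustration of reading the parameters from a file instead of hard-coding them, the sketch below parses simple key=value lines and builds a libpq keyword/value connection string of the kind accepted by PQconnectdb. The file format and the helper names readSettings and toConnInfo are assumptions made for this example; they are not part of the original code.

```cpp
#include <istream>
#include <map>
#include <sstream>
#include <string>

// Hypothetical helper: reads "key=value" lines, skipping blank lines and '#' comments.
std::map<std::string, std::string> readSettings(std::istream& in)
{
    std::map<std::string, std::string> settings;
    std::string line;
    while (std::getline(in, line)) {
        if (line.empty() || line[0] == '#') continue;
        const auto pos = line.find('=');
        if (pos == std::string::npos) continue;  // ignore malformed lines
        settings[line.substr(0, pos)] = line.substr(pos + 1);
    }
    return settings;
}

// Hypothetical helper: builds a keyword/value connection string.
// Every value is single-quoted; quotes and backslashes inside it are
// escaped with a backslash, as the libpq connection string syntax requires.
std::string toConnInfo(const std::map<std::string, std::string>& settings)
{
    std::string result;
    for (const auto& kv : settings) {
        if (!result.empty()) result += ' ';
        result += kv.first + "='";
        for (char c : kv.second) {
            if (c == '\'' || c == '\\') result += '\\';
            result += c;
        }
        result += '\'';
    }
    return result;
}
```

With a file containing host, port, dbname, user and password lines, the resulting string could be handed to PQconnectdb in place of the separate PQsetdbLogin arguments.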
pgconnection.h
#ifndef PGCONNECTION_H
#define PGCONNECTION_H

#include <memory>
#include <mutex>
#include <string>
#include <libpq-fe.h>

class PGConnection
{
public:
    PGConnection();
    std::shared_ptr<PGconn> connection() const;

private:
    void establish_connection();

    // Hard-coded for simplicity; read these from a configuration file in real code.
    std::string m_dbhost = "localhost";
    int         m_dbport = 5432;
    std::string m_dbname = "demo";
    std::string m_dbuser = "postgres";
    std::string m_dbpass = "postgres";

    std::shared_ptr<PGconn> m_connection;
};

#endif //PGCONNECTION_H
pgconnection.cpp
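#include <stdexcept>
#include <string>
#include "pgconnection.h"

PGConnection::PGConnection()
{
    // PQfinish is registered as the deleter, so the connection is closed automatically.
    m_connection.reset( PQsetdbLogin(m_dbhost.c_str(), std::to_string(m_dbport).c_str(), nullptr, nullptr, m_dbname.c_str(), m_dbuser.c_str(), m_dbpass.c_str()), &PQfinish );

    // Fail if the connection was not established OR non-blocking mode could not be set.
    // (With && the non-blocking call was skipped whenever the connection succeeded.)
    if (PQstatus( m_connection.get() ) != CONNECTION_OK || PQsetnonblocking(m_connection.get(), 1) != 0 )
    {
        throw std::runtime_error( PQerrorMessage( m_connection.get() ) );
    }
}

std::shared_ptr<PGconn> PGConnection::connection() const
{
    return m_connection;
}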
To prevent possible resource leaks, we store the connection in a smart pointer.
In the constructor we call PQsetdbLogin, which establishes a connection to the database and returns a pointer to the connection, PGconn*, and then we switch the connection to asynchronous (non-blocking) mode.
When we are done, the connection must be closed with PQfinish, which takes the pointer returned by PQsetdbLogin. That is why we pass the address of the function, &PQfinish, as the last argument of the m_connection.reset() call. When the last smart pointer that owns the connection goes away and the reference count drops to zero, it calls this function and thereby closes the connection correctly.
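To see the deleter mechanism in isolation, here is a minimal sketch that uses a stand-in resource instead of a real PGconn. FakeConn, openFake, closeFake and makeManaged are hypothetical names; the first three play the roles of PGconn, PQsetdbLogin and PQfinish.

```cpp
#include <memory>

// Stand-in for PGconn; this is not a libpq type.
struct FakeConn { int id; };

// Counts how many times the "close" function ran (for demonstration only).
int g_closed = 0;

// Stand-ins for PQsetdbLogin and PQfinish.
FakeConn* openFake(int id) { return new FakeConn{id}; }
void closeFake(FakeConn* c) { ++g_closed; delete c; }

// Wraps the raw pointer so that closeFake runs when the last owner goes away.
std::shared_ptr<FakeConn> makeManaged(int id)
{
    std::shared_ptr<FakeConn> p;
    p.reset(openFake(id), &closeFake);  // same pattern as m_connection.reset(..., &PQfinish)
    return p;
}
```

Copying the returned pointer only increases the reference count; closeFake runs once, when the last copy is destroyed or reset.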
Now we need a class that will create, store and manage a pool of connections.
pgbackend.h
#ifndef PGBACKEND_H
#define PGBACKEND_H

#include <memory>
#include <mutex>
#include <string>
#include <queue>
#include <condition_variable>
#include <libpq-fe.h>
#include "pgconnection.h"

class PGBackend
{
public:
    PGBackend();
    std::shared_ptr<PGConnection> connection();
    void freeConnection(std::shared_ptr<PGConnection>);

private:
    void createPool();

    std::mutex m_mutex;
    std::condition_variable m_condition;
    std::queue<std::shared_ptr<PGConnection>> m_pool;

    const int POOL = 10;
};

#endif //PGBACKEND_H
pgbackend.cpp
#include <iostream>
#include <thread>
#include <fstream>
#include <sstream>
#include "pgbackend.h"

PGBackend::PGBackend()
{
    createPool();
}

// Opens POOL connections and puts them in the queue.
void PGBackend::createPool()
{
    std::lock_guard<std::mutex> locker_( m_mutex );

    for ( auto i = 0; i < POOL; ++i ){
        m_pool.emplace( std::make_shared<PGConnection>() );
    }
}

// Takes a free connection from the queue, sleeping while the queue is empty.
std::shared_ptr<PGConnection> PGBackend::connection()
{
    std::unique_lock<std::mutex> lock_( m_mutex );

    while ( m_pool.empty() ){
        m_condition.wait( lock_ );
    }

    auto conn_ = m_pool.front();
    m_pool.pop();

    return conn_;
}

// Puts the connection back in the queue and wakes one waiting thread.
void PGBackend::freeConnection(std::shared_ptr<PGConnection> conn_)
{
    std::unique_lock<std::mutex> lock_( m_mutex );
    m_pool.push( conn_ );
    lock_.unlock();
    m_condition.notify_one();
}
The createPool function creates the pool of connections; I set it to 10 connections. After that we create an instance of the PGBackend class and work with it through two functions: connection, which returns a connection to the database, and freeConnection, which puts the connection back in the queue.
All of this works on the basis of a condition variable: if the queue is empty, there are no free connections, and the thread sleeps until it is woken through the condition variable.
A simple example that uses our backend with the connection pool is given in main.cpp. In "combat conditions" you will of course have some event loop that triggers the work with the database. In my case it is boost::asio, which works asynchronously and, on receiving events from the network, writes the data to the database. I do not show it here, so as not to complicate the idea of the connection pool. Here we simply create 50 threads that work with the server through a single PGBackend instance.
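One possible refinement, which is not part of the original code, is to return the connection automatically. The sketch below repeats the same queue, mutex and condition-variable logic in a small template and adds a guard object whose destructor puts the item back, so a thread cannot forget to release it even when an exception is thrown. The names SimplePool and Lease are hypothetical.

```cpp
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>

template <typename T>
class SimplePool
{
public:
    // Puts an item into the pool and wakes one waiting thread, if any.
    void release(std::shared_ptr<T> item)
    {
        {
            std::lock_guard<std::mutex> lock(m_mutex);
            m_items.push(std::move(item));
        }
        m_condition.notify_one();
    }

    // Blocks until an item is available, then removes it from the queue and returns it.
    std::shared_ptr<T> acquire()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_condition.wait(lock, [this] { return !m_items.empty(); });
        auto item = std::move(m_items.front());
        m_items.pop();
        return item;
    }

    // Number of items currently free.
    std::size_t available()
    {
        std::lock_guard<std::mutex> lock(m_mutex);
        return m_items.size();
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_condition;
    std::queue<std::shared_ptr<T>> m_items;
};

// Guard that takes an item in its constructor and gives it back in its destructor.
template <typename T>
class Lease
{
public:
    explicit Lease(SimplePool<T>& pool) : m_pool(pool), m_item(pool.acquire()) {}
    ~Lease() { m_pool.release(std::move(m_item)); }

    Lease(const Lease&) = delete;
    Lease& operator=(const Lease&) = delete;

    T& operator*() const { return *m_item; }
    T* operator->() const { return m_item.get(); }

private:
    SimplePool<T>& m_pool;
    std::shared_ptr<T> m_item;
};
```

With the article's classes, a guard of this kind would wrap the calls to PGBackend::connection() and PGBackend::freeConnection().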
main.cpp
#include <thread>
#include <iostream>
#include <memory>
#include <string>
#include <vector>
#include "pgbackend.h"

void testConnection(std::shared_ptr<PGBackend> pgbackend)
{
    // get a free connection (blocks while the pool is empty)
    auto conn = pgbackend->connection();

    std::string demo = "SELECT max(id) FROM demo;";
    PQsendQuery( conn->connection().get(), demo.c_str() );

    // read results until PQgetResult returns a null pointer
    while ( auto res_ = PQgetResult( conn->connection().get() ) ) {
        if (PQresultStatus(res_) == PGRES_TUPLES_OK && PQntuples(res_)) {
            auto ID = PQgetvalue(res_, 0, 0);
            std::cout << ID << std::endl;
        }

        if (PQresultStatus(res_) == PGRES_FATAL_ERROR){
            std::cout << PQresultErrorMessage(res_) << std::endl;
        }

        PQclear( res_ );
    }

    // return the connection to the queue
    pgbackend->freeConnection(conn);
}

int main(int argc, char const *argv[])
{
    auto pgbackend = std::make_shared<PGBackend>();

    std::vector<std::shared_ptr<std::thread>> vec;

    for ( size_t i = 0; i < 50; ++i ){
        vec.push_back(std::make_shared<std::thread>(std::thread(testConnection, pgbackend)));
    }

    for (auto &i : vec) {
        i.get()->join();
    }

    return 0;
}
It's all compiled with the command:
g++ main.cpp pgbackend.cpp pgconnection.cpp -o pool -std=c++14 -I/usr/include/postgresql/ -lpq -lpthread
Be careful with the number of database connections: the server limits it with the max_connections (integer) parameter.
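As a back-of-the-envelope check, the total number of connections opened by all running copies of the program has to stay within the server limit. The sketch below assumes, for illustration only, that a few connections are set aside and unavailable to the application (PostgreSQL has a superuser_reserved_connections setting for this purpose). The function name poolFits and the numbers in the note after it are hypothetical.

```cpp
// Hypothetical helper: returns true when `processes` copies of the program,
// each holding `poolSize` connections, fit into what the server leaves
// available after `reserved` connections are set aside.
bool poolFits(int poolSize, int processes, int maxConnections, int reserved)
{
    const long long needed = static_cast<long long>(poolSize) * processes;
    return needed <= static_cast<long long>(maxConnections) - reserved;
}
```

For example, with a pool of 10, a limit of 100 and 3 reserved connections, five copies of the program fit (50 connections), while ten copies do not (100 connections against 97 available).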
Source code