Libosmium  2.17.0
Fast and flexible C++ library for working with OpenStreetMap data
writer.hpp
Go to the documentation of this file.
1 #ifndef OSMIUM_IO_WRITER_HPP
2 #define OSMIUM_IO_WRITER_HPP
3 
4 /*
5 
6 This file is part of Osmium (https://osmcode.org/libosmium).
7 
8 Copyright 2013-2021 Jochen Topf <jochen@topf.org> and others (see README).
9 
10 Boost Software License - Version 1.0 - August 17th, 2003
11 
12 Permission is hereby granted, free of charge, to any person or organization
13 obtaining a copy of the software and accompanying documentation covered by
14 this license (the "Software") to use, reproduce, display, distribute,
15 execute, and transmit the Software, and to prepare derivative works of the
16 Software, and to permit third-parties to whom the Software is furnished to
17 do so, all subject to the following:
18 
19 The copyright notices in the Software and this entire statement, including
20 the above license grant, this restriction and the following disclaimer,
21 must be included in all copies of the Software, in whole or in part, and
22 all derivative works of the Software, unless such copies or derivative
23 works are solely in the form of machine-executable object code generated by
24 a source language processor.
25 
26 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
27 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
28 FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
29 SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
30 FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
31 ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
32 DEALINGS IN THE SOFTWARE.
33 
34 */
35 
#include <osmium/io/compression.hpp>
#include <osmium/io/detail/output_format.hpp>
#include <osmium/io/detail/queue_util.hpp>
#include <osmium/io/detail/read_write.hpp>
#include <osmium/io/detail/write_thread.hpp>
#include <osmium/io/error.hpp>
#include <osmium/io/file.hpp>
#include <osmium/io/header.hpp>
#include <osmium/io/writer_options.hpp>
#include <osmium/memory/buffer.hpp>
#include <osmium/thread/pool.hpp>
#include <osmium/thread/util.hpp>
#include <osmium/util/config.hpp>
#include <osmium/version.hpp>

#include <atomic>
#include <cassert>
#include <cstddef>
#include <exception>
#include <functional>
#include <future>
#include <initializer_list>
#include <memory>
#include <string>
#include <utility>
60 
61 namespace osmium {
62 
63  namespace memory {
64  class Item;
65  } //namespace memory
66 
67  namespace io {
68 
69  namespace detail {
70 
71  inline std::size_t get_output_queue_size() noexcept {
72  return osmium::config::get_max_queue_size("OUTPUT", 20);
73  }
74 
75  } // namespace detail
76 
100  class Writer {
101 
102  enum {
103  default_buffer_size = 10UL * 1024UL * 1024UL
104  };
105 
107 
108  detail::future_string_queue_type m_output_queue{detail::get_output_queue_size(), "raw_output"};
109 
110  std::unique_ptr<osmium::io::detail::OutputFormat> m_output{nullptr};
111 
112  osmium::memory::Buffer m_buffer{};
113 
115 
116  std::future<std::size_t> m_write_future{};
117 
119 
120  // Checking the m_write_future is much more expensive then checking
121  // one atomic bool, so we set this bool in the write_thread when
122  // the writer should check the future...
123  std::atomic_bool m_notification{false};
124 
125  enum class status {
126  okay = 0, // normal writing
127  error = 1, // some error occurred while writing
128  closed = 2 // close() called successfully
130 
131  // This function will run in a separate thread.
132  static void write_thread(detail::future_string_queue_type& output_queue,
133  std::unique_ptr<osmium::io::Compressor>&& compressor,
134  std::promise<std::size_t>&& write_promise,
135  std::atomic_bool* notification) {
136  detail::WriteThread write_thread{output_queue,
137  std::move(compressor),
138  std::move(write_promise),
139  notification};
140  write_thread();
141  }
142 
143  void do_write(osmium::memory::Buffer&& buffer) {
144  if (buffer && buffer.committed() > 0) {
145  m_output->write_buffer(std::move(buffer));
146  }
147  }
148 
149  void do_flush() {
150  if (m_notification) {
152  }
153  if (m_buffer && m_buffer.committed() > 0) {
154  osmium::memory::Buffer buffer{m_buffer_size,
155  osmium::memory::Buffer::auto_grow::no};
156  using std::swap;
157  swap(m_buffer, buffer);
158 
159  m_output->write_buffer(std::move(buffer));
160  }
161  }
162 
163  template <typename TFunction, typename... TArgs>
164  void ensure_cleanup(TFunction func, TArgs&&... args) {
165  if (m_status != status::okay) {
166  throw io_error("Can not write to writer when in status 'closed' or 'error'");
167  }
168 
169  try {
170  func(std::forward<TArgs>(args)...);
171  } catch (...) {
173  detail::add_to_queue(m_output_queue, std::current_exception());
174  detail::add_end_of_data_to_queue(m_output_queue);
175  throw;
176  }
177  }
178 
179  struct options_type {
184  };
185 
186  static void set_option(options_type& options, osmium::thread::Pool& pool) {
187  options.pool = &pool;
188  }
189 
190  static void set_option(options_type& options, const osmium::io::Header& header) {
191  options.header = header;
192  }
193 
194  static void set_option(options_type& options, overwrite value) {
195  options.allow_overwrite = value;
196  }
197 
198  static void set_option(options_type& options, fsync value) {
199  options.sync = value;
200  }
201 
202  void do_close() {
203  if (m_status == status::okay) {
204  ensure_cleanup([&](){
205  do_write(std::move(m_buffer));
206  m_output->write_end();
208  detail::add_end_of_data_to_queue(m_output_queue);
209  });
210  }
211  }
212 
213  public:
214 
245  template <typename... TArgs>
246  explicit Writer(const osmium::io::File& file, TArgs&&... args) :
247  m_file(file.check()) {
248  assert(!m_file.buffer()); // XXX can't handle pseudo-files
249 
250  options_type options;
251  (void)std::initializer_list<int>{
252  (set_option(options, args), 0)...
253  };
254 
255  if (!options.pool) {
257  }
258 
259  m_output = osmium::io::detail::OutputFormatFactory::instance().create_output(*options.pool, m_file, m_output_queue);
260 
261  if (options.header.get("generator").empty()) {
262  options.header.set("generator", "libosmium/" LIBOSMIUM_VERSION_STRING);
263  }
264 
265  std::unique_ptr<osmium::io::Compressor> compressor =
267  osmium::io::detail::open_for_writing(m_file.filename(), options.allow_overwrite),
268  options.sync);
269 
270  std::promise<std::size_t> write_promise;
271  m_write_future = write_promise.get_future();
272  m_thread = osmium::thread::thread_handler{write_thread, std::ref(m_output_queue), std::move(compressor), std::move(write_promise), &m_notification};
273 
274  ensure_cleanup([&](){
275  m_output->write_header(options.header);
276  });
277  }
278 
279  template <typename... TArgs>
280  explicit Writer(const std::string& filename, TArgs&&... args) :
281  Writer(osmium::io::File{filename}, std::forward<TArgs>(args)...) {
282  }
283 
284  template <typename... TArgs>
285  explicit Writer(const char* filename, TArgs&&... args) :
286  Writer(osmium::io::File{filename}, std::forward<TArgs>(args)...) {
287  }
288 
289  Writer(const Writer&) = delete;
290  Writer& operator=(const Writer&) = delete;
291 
292  Writer(Writer&&) = delete;
293  Writer& operator=(Writer&&) = delete;
294 
295  ~Writer() noexcept {
296  try {
297  do_close();
298  } catch (...) {
299  // Ignore any exceptions because destructor must not throw.
300  }
301  }
302 
306  size_t buffer_size() const noexcept {
307  return m_buffer_size;
308  }
309 
314  void set_buffer_size(size_t size) noexcept {
315  m_buffer_size = size;
316  }
317 
325  void flush() {
326  ensure_cleanup([&](){
327  do_flush();
328  });
329  }
330 
339  void operator()(osmium::memory::Buffer&& buffer) {
340  ensure_cleanup([&](){
341  do_flush();
342  do_write(std::move(buffer));
343  });
344  }
345 
353  void operator()(const osmium::memory::Item& item) {
354  ensure_cleanup([&](){
355  if (!m_buffer) {
356  m_buffer = osmium::memory::Buffer{m_buffer_size,
357  osmium::memory::Buffer::auto_grow::no};
358  }
359  try {
360  m_buffer.push_back(item);
361  } catch (const osmium::buffer_is_full&) {
362  do_flush();
363  m_buffer.push_back(item);
364  }
365  });
366  }
367 
379  std::size_t close() {
380  do_close();
381 
382  if (m_write_future.valid()) {
383  return m_write_future.get();
384  }
385 
386  return 0;
387  }
388 
389  }; // class Writer
390 
391  } // namespace io
392 
393 } // namespace osmium
394 
395 #endif // OSMIUM_IO_WRITER_HPP
std::unique_ptr< osmium::io::Compressor > create_compressor(const osmium::io::file_compression compression, TArgs &&... args) const
Definition: compression.hpp:204
static CompressionFactory & instance()
Definition: compression.hpp:184
Definition: file.hpp:72
File & filename(const std::string &filename)
Definition: file.hpp:312
const char * buffer() const noexcept
Definition: file.hpp:143
file_compression compression() const noexcept
Definition: file.hpp:294
Definition: header.hpp:68
Definition: writer.hpp:100
std::size_t close()
Definition: writer.hpp:379
@ default_buffer_size
Definition: writer.hpp:103
static void set_option(options_type &options, fsync value)
Definition: writer.hpp:198
static void set_option(options_type &options, const osmium::io::Header &header)
Definition: writer.hpp:190
static void set_option(options_type &options, osmium::thread::Pool &pool)
Definition: writer.hpp:186
size_t m_buffer_size
Definition: writer.hpp:114
Writer(Writer &&)=delete
Writer & operator=(Writer &&)=delete
void do_flush()
Definition: writer.hpp:149
Writer & operator=(const Writer &)=delete
Writer(const osmium::io::File &file, TArgs &&... args)
Definition: writer.hpp:246
Writer(const std::string &filename, TArgs &&... args)
Definition: writer.hpp:280
size_t buffer_size() const noexcept
Definition: writer.hpp:306
status
Definition: writer.hpp:125
osmium::thread::thread_handler m_thread
Definition: writer.hpp:118
void flush()
Definition: writer.hpp:325
void operator()(osmium::memory::Buffer &&buffer)
Definition: writer.hpp:339
osmium::memory::Buffer m_buffer
Definition: writer.hpp:112
std::atomic_bool m_notification
Definition: writer.hpp:123
~Writer() noexcept
Definition: writer.hpp:295
std::unique_ptr< osmium::io::detail::OutputFormat > m_output
Definition: writer.hpp:110
static void set_option(options_type &options, overwrite value)
Definition: writer.hpp:194
std::future< std::size_t > m_write_future
Definition: writer.hpp:116
void do_close()
Definition: writer.hpp:202
void do_write(osmium::memory::Buffer &&buffer)
Definition: writer.hpp:143
detail::future_string_queue_type m_output_queue
Definition: writer.hpp:108
void set_buffer_size(size_t size) noexcept
Definition: writer.hpp:314
Writer(const Writer &)=delete
osmium::io::File m_file
Definition: writer.hpp:106
void operator()(const osmium::memory::Item &item)
Definition: writer.hpp:353
void ensure_cleanup(TFunction func, TArgs &&... args)
Definition: writer.hpp:164
static void write_thread(detail::future_string_queue_type &output_queue, std::unique_ptr< osmium::io::Compressor > &&compressor, std::promise< std::size_t > &&write_promise, std::atomic_bool *notification)
Definition: writer.hpp:132
Writer(const char *filename, TArgs &&... args)
Definition: writer.hpp:285
enum osmium::io::Writer::status m_status
Definition: item.hpp:105
Definition: pool.hpp:90
static Pool & default_instance()
Definition: pool.hpp:186
Definition: util.hpp:85
Definition: attr.hpp:342
std::size_t get_max_queue_size(const char *queue_name, const std::size_t default_value) noexcept
Definition: config.hpp:83
fsync
Definition: writer_options.hpp:51
overwrite
Definition: writer_options.hpp:43
void check_for_exception(std::future< T > &future)
Definition: util.hpp:55
Namespace for everything in the Osmium library.
Definition: assembler.hpp:53
Definition: location.hpp:551
Definition: writer.hpp:179
overwrite allow_overwrite
Definition: writer.hpp:181
osmium::thread::Pool * pool
Definition: writer.hpp:183
fsync sync
Definition: writer.hpp:182
osmium::io::Header header
Definition: writer.hpp:180
Definition: error.hpp:44
#define LIBOSMIUM_VERSION_STRING
Definition: version.hpp:40