diff --git a/jubatus/core/storage/abstract_column.hpp b/jubatus/core/storage/abstract_column.hpp index 8732bf43..40de311b 100644 --- a/jubatus/core/storage/abstract_column.hpp +++ b/jubatus/core/storage/abstract_column.hpp @@ -315,10 +315,41 @@ class typed_column : public detail::abstract_column_base { template void pack_array(msgpack::packer& packer) const { - packer.pack(array_); + static const uint64_t max_segment_size = (1ull << 32) - 1; + const uint64_t bytes = array_.size() * sizeof(uint64_t); + const uint64_t num_of_segments = + (bytes + max_segment_size - 1) / max_segment_size; + const char *p = reinterpret_cast(array_.data()); + packer.pack_array(num_of_segments); + for (uint64_t remains = bytes; remains > 0;) { + const uint64_t seg_size = std::min(remains, max_segment_size); + packer.pack_raw(seg_size); + packer.pack_raw_body(p, seg_size); + p += seg_size; + remains -= seg_size; + } } void unpack_array(msgpack::object o) { - o.convert(&array_); + if (o.type != msgpack::type::ARRAY) { + throw msgpack::type_error(); + } + size_t bytes = 0; + msgpack::object_array& ary = o.via.array; + for (uint32_t i = 0; i < ary.size; ++i) { + if (ary.ptr[i].type != msgpack::type::RAW) { + throw msgpack::type_error(); + } + bytes += ary.ptr[i].via.raw.size; + } + if (bytes % sizeof(uint64_t) != 0) { + throw msgpack::type_error(); + } + array_.resize(bytes / sizeof(uint64_t)); + char *p = reinterpret_cast(array_.data()); + for (uint32_t i = 0; i < ary.size; ++i) { + std::memcpy(p, ary.ptr[i].via.raw.ptr, ary.ptr[i].via.raw.size); + p += ary.ptr[i].via.raw.size; + } } private: diff --git a/jubatus/core/storage/column_table.hpp b/jubatus/core/storage/column_table.hpp index f3896848..05b62c06 100644 --- a/jubatus/core/storage/column_table.hpp +++ b/jubatus/core/storage/column_table.hpp @@ -385,7 +385,28 @@ class column_table { void unpack(msgpack::object o) { jubatus::util::concurrent::scoped_wlock lk(table_lock_); - o.convert(this); + + // inline expansion and memory optimization: o.convert(this); + if (o.type != msgpack::type::ARRAY || o.via.array.size != 6) { + throw msgpack::type_error(); + } + msgpack::object *ptr = o.via.array.ptr; + + // unpack num of tuples at first + ptr[1].convert(&tuples_); + + // reserve vector/unordered_map memory + keys_.clear(); + versions_.clear(); + index_.clear(); + keys_.reserve(tuples_); + versions_.reserve(tuples_); + index_.reserve(tuples_); + ptr[0].convert(&keys_); + ptr[2].convert(&versions_); + ptr[3].convert(&columns_); + ptr[4].convert(&clock_); + ptr[5].convert(&index_); } private: