Prevent GC of Buffer object vector via accumulate/lambda.

Removes need for naming Buffers and separate container struct.
This commit is contained in:
Lovell Fuller 2016-07-11 13:06:57 +01:00
parent d17e8d3450
commit fee3d882c7

View File

@ -9,9 +9,6 @@
#include <node.h> #include <node.h>
#include <node_buffer.h> #include <node_buffer.h>
#include <sstream>
#include <iomanip>
#include "nan.h" #include "nan.h"
#include "common.h" #include "common.h"
@ -84,19 +81,19 @@ using sharp::counterProcess;
using sharp::counterQueue; using sharp::counterQueue;
using sharp::GetBooleanOperation; using sharp::GetBooleanOperation;
typedef struct BufferContainer_t {
std::string name;
Local<Object> nodeBuf;
} BufferContainer;
class PipelineWorker : public AsyncWorker { class PipelineWorker : public AsyncWorker {
public: public:
PipelineWorker(Callback *callback, PipelineBaton *baton, Callback *queueListener, PipelineWorker(
const std::vector<BufferContainer> saveBuffers) : Callback *callback, PipelineBaton *baton, Callback *queueListener,
AsyncWorker(callback), baton(baton), queueListener(queueListener), saveBuffers(saveBuffers) { std::vector<Local<Object>> const buffersToPersist
for (const BufferContainer buf : saveBuffers) { ) : AsyncWorker(callback), baton(baton), queueListener(queueListener), buffersToPersist(buffersToPersist) {
SaveToPersistent(buf.name.c_str(), buf.nodeBuf); // Protect Buffer objects from GC, keyed on index
std::accumulate(buffersToPersist.begin(), buffersToPersist.end(), 0,
[this](uint32_t index, Local<Object> const buffer) -> uint32_t {
SaveToPersistent(index, buffer);
return index + 1;
} }
);
} }
~PipelineWorker() {} ~PipelineWorker() {}
@ -1052,9 +1049,12 @@ class PipelineWorker : public AsyncWorker {
} }
// Dispose of Persistent wrapper around input Buffers so they can be garbage collected // Dispose of Persistent wrapper around input Buffers so they can be garbage collected
for (const BufferContainer buf : saveBuffers) { std::accumulate(buffersToPersist.begin(), buffersToPersist.end(), 0,
GetFromPersistent(buf.name.c_str()); [this](uint32_t index, Local<Object> const buffer) -> uint32_t {
GetFromPersistent(index);
return index + 1;
} }
);
delete baton; delete baton;
// Decrement processing task counter // Decrement processing task counter
@ -1070,7 +1070,7 @@ class PipelineWorker : public AsyncWorker {
private: private:
PipelineBaton *baton; PipelineBaton *baton;
Callback *queueListener; Callback *queueListener;
std::vector<BufferContainer> saveBuffers; std::vector<Local<Object>> buffersToPersist;
/* /*
Calculate the angle of rotation and need-to-flip for the output image. Calculate the angle of rotation and need-to-flip for the output image.
@ -1134,6 +1134,9 @@ NAN_METHOD(pipeline) {
PipelineBaton *baton = new PipelineBaton; PipelineBaton *baton = new PipelineBaton;
Local<Object> options = info[0].As<Object>(); Local<Object> options = info[0].As<Object>();
// Input Buffers must not undergo GC compaction during processing
std::vector<Local<Object>> buffersToPersist;
// Input filename // Input filename
baton->fileIn = attrAsStr(options, "fileIn"); baton->fileIn = attrAsStr(options, "fileIn");
baton->accessMethod = attrAs<bool>(options, "sequentialRead") ? baton->accessMethod = attrAs<bool>(options, "sequentialRead") ?
@ -1144,6 +1147,7 @@ NAN_METHOD(pipeline) {
bufferIn = Get(options, New("bufferIn").ToLocalChecked()).ToLocalChecked().As<Object>(); bufferIn = Get(options, New("bufferIn").ToLocalChecked()).ToLocalChecked().As<Object>();
baton->bufferInLength = node::Buffer::Length(bufferIn); baton->bufferInLength = node::Buffer::Length(bufferIn);
baton->bufferIn = node::Buffer::Data(bufferIn); baton->bufferIn = node::Buffer::Data(bufferIn);
buffersToPersist.push_back(bufferIn);
} }
// ICC profile to use when input CMYK image has no embedded profile // ICC profile to use when input CMYK image has no embedded profile
baton->iccProfilePath = attrAsStr(options, "iccProfilePath"); baton->iccProfilePath = attrAsStr(options, "iccProfilePath");
@ -1192,6 +1196,7 @@ NAN_METHOD(pipeline) {
overlayBufferIn = Get(options, New("overlayBufferIn").ToLocalChecked()).ToLocalChecked().As<Object>(); overlayBufferIn = Get(options, New("overlayBufferIn").ToLocalChecked()).ToLocalChecked().As<Object>();
baton->overlayBufferInLength = node::Buffer::Length(overlayBufferIn); baton->overlayBufferInLength = node::Buffer::Length(overlayBufferIn);
baton->overlayBufferIn = node::Buffer::Data(overlayBufferIn); baton->overlayBufferIn = node::Buffer::Data(overlayBufferIn);
buffersToPersist.push_back(overlayBufferIn);
} }
baton->overlayGravity = attrAs<int32_t>(options, "overlayGravity"); baton->overlayGravity = attrAs<int32_t>(options, "overlayGravity");
baton->overlayXOffset = attrAs<int32_t>(options, "overlayXOffset"); baton->overlayXOffset = attrAs<int32_t>(options, "overlayXOffset");
@ -1205,6 +1210,7 @@ NAN_METHOD(pipeline) {
booleanBufferIn = Get(options, New("booleanBufferIn").ToLocalChecked()).ToLocalChecked().As<Object>(); booleanBufferIn = Get(options, New("booleanBufferIn").ToLocalChecked()).ToLocalChecked().As<Object>();
baton->booleanBufferInLength = node::Buffer::Length(booleanBufferIn); baton->booleanBufferInLength = node::Buffer::Length(booleanBufferIn);
baton->booleanBufferIn = node::Buffer::Data(booleanBufferIn); baton->booleanBufferIn = node::Buffer::Data(booleanBufferIn);
buffersToPersist.push_back(booleanBufferIn);
} }
// Resize options // Resize options
baton->withoutEnlargement = attrAs<bool>(options, "withoutEnlargement"); baton->withoutEnlargement = attrAs<bool>(options, "withoutEnlargement");
@ -1295,15 +1301,7 @@ NAN_METHOD(pipeline) {
// Join queue for worker thread // Join queue for worker thread
Callback *callback = new Callback(info[1].As<Function>()); Callback *callback = new Callback(info[1].As<Function>());
AsyncQueueWorker(new PipelineWorker(callback, baton, queueListener, buffersToPersist));
std::vector<BufferContainer> saveBuffers;
if (baton->bufferInLength)
saveBuffers.push_back({"bufferIn", bufferIn});
if (baton->overlayBufferInLength)
saveBuffers.push_back({"overlayBufferIn", overlayBufferIn});
if (baton->booleanBufferInLength)
saveBuffers.push_back({"booleanBufferIn", booleanBufferIn});
AsyncQueueWorker(new PipelineWorker(callback, baton, queueListener, saveBuffers));
// Increment queued task counter // Increment queued task counter
g_atomic_int_inc(&counterQueue); g_atomic_int_inc(&counterQueue);