diff --git a/src/pipeline.cc b/src/pipeline.cc index 64c8f4d7..f468a175 100644 --- a/src/pipeline.cc +++ b/src/pipeline.cc @@ -9,9 +9,6 @@ #include #include -#include -#include - #include "nan.h" #include "common.h" @@ -84,19 +81,19 @@ using sharp::counterProcess; using sharp::counterQueue; using sharp::GetBooleanOperation; -typedef struct BufferContainer_t { - std::string name; - Local nodeBuf; -} BufferContainer; - class PipelineWorker : public AsyncWorker { public: - PipelineWorker(Callback *callback, PipelineBaton *baton, Callback *queueListener, - const std::vector saveBuffers) : - AsyncWorker(callback), baton(baton), queueListener(queueListener), saveBuffers(saveBuffers) { - for (const BufferContainer buf : saveBuffers) { - SaveToPersistent(buf.name.c_str(), buf.nodeBuf); - } + PipelineWorker( + Callback *callback, PipelineBaton *baton, Callback *queueListener, + std::vector> const buffersToPersist + ) : AsyncWorker(callback), baton(baton), queueListener(queueListener), buffersToPersist(buffersToPersist) { + // Protect Buffer objects from GC, keyed on index + std::accumulate(buffersToPersist.begin(), buffersToPersist.end(), 0, + [this](uint32_t index, Local const buffer) -> uint32_t { + SaveToPersistent(index, buffer); + return index + 1; + } + ); } ~PipelineWorker() {} @@ -1052,9 +1049,12 @@ class PipelineWorker : public AsyncWorker { } // Dispose of Persistent wrapper around input Buffers so they can be garbage collected - for (const BufferContainer buf : saveBuffers) { - GetFromPersistent(buf.name.c_str()); - } + std::accumulate(buffersToPersist.begin(), buffersToPersist.end(), 0, + [this](uint32_t index, Local const buffer) -> uint32_t { + GetFromPersistent(index); + return index + 1; + } + ); delete baton; // Decrement processing task counter @@ -1070,7 +1070,7 @@ class PipelineWorker : public AsyncWorker { private: PipelineBaton *baton; Callback *queueListener; - std::vector saveBuffers; + std::vector> buffersToPersist; /* Calculate the angle of rotation and need-to-flip for the output image. @@ -1134,6 +1134,9 @@ NAN_METHOD(pipeline) { PipelineBaton *baton = new PipelineBaton; Local options = info[0].As(); + // Input Buffers must not undergo GC compaction during processing + std::vector> buffersToPersist; + // Input filename baton->fileIn = attrAsStr(options, "fileIn"); baton->accessMethod = attrAs(options, "sequentialRead") ? @@ -1144,6 +1147,7 @@ NAN_METHOD(pipeline) { bufferIn = Get(options, New("bufferIn").ToLocalChecked()).ToLocalChecked().As(); baton->bufferInLength = node::Buffer::Length(bufferIn); baton->bufferIn = node::Buffer::Data(bufferIn); + buffersToPersist.push_back(bufferIn); } // ICC profile to use when input CMYK image has no embedded profile baton->iccProfilePath = attrAsStr(options, "iccProfilePath"); @@ -1192,6 +1196,7 @@ NAN_METHOD(pipeline) { overlayBufferIn = Get(options, New("overlayBufferIn").ToLocalChecked()).ToLocalChecked().As(); baton->overlayBufferInLength = node::Buffer::Length(overlayBufferIn); baton->overlayBufferIn = node::Buffer::Data(overlayBufferIn); + buffersToPersist.push_back(overlayBufferIn); } baton->overlayGravity = attrAs(options, "overlayGravity"); baton->overlayXOffset = attrAs(options, "overlayXOffset"); @@ -1205,6 +1210,7 @@ NAN_METHOD(pipeline) { booleanBufferIn = Get(options, New("booleanBufferIn").ToLocalChecked()).ToLocalChecked().As(); baton->booleanBufferInLength = node::Buffer::Length(booleanBufferIn); baton->booleanBufferIn = node::Buffer::Data(booleanBufferIn); + buffersToPersist.push_back(booleanBufferIn); } // Resize options baton->withoutEnlargement = attrAs(options, "withoutEnlargement"); @@ -1295,15 +1301,7 @@ NAN_METHOD(pipeline) { // Join queue for worker thread Callback *callback = new Callback(info[1].As()); - - std::vector saveBuffers; - if (baton->bufferInLength) - saveBuffers.push_back({"bufferIn", bufferIn}); - if (baton->overlayBufferInLength) - saveBuffers.push_back({"overlayBufferIn", overlayBufferIn}); - if (baton->booleanBufferInLength) - saveBuffers.push_back({"booleanBufferIn", booleanBufferIn}); - AsyncQueueWorker(new PipelineWorker(callback, baton, queueListener, saveBuffers)); + AsyncQueueWorker(new PipelineWorker(callback, baton, queueListener, buffersToPersist)); // Increment queued task counter g_atomic_int_inc(&counterQueue);