node.js - nodejs - Canalize um fluxo para s3.upload()




s3 node js sdk (6)

Embrulhe a função upload() do S3 com o fluxo stream.PassThrough() .

Aqui está um exemplo:

inputStream
  .pipe(uploadFromStream(s3));

function uploadFromStream(s3) {
  var pass = new stream.PassThrough();

  var params = {Bucket: BUCKET, Key: KEY, Body: pass};
  s3.upload(params, function(err, data) {
    console.log(err, data);
  });

  return pass;
}

Atualmente, estou usando um plugin node.js chamado s3-upload-stream para transmitir arquivos muito grandes para o Amazon S3. Ele usa a API de várias partes e, na maioria das vezes, funciona muito bem.

No entanto, este módulo está mostrando sua idade e eu já tive que fazer modificações nele (o autor também o reprovou). Hoje encontrei outro problema com a Amazon e gostaria muito de seguir a recomendação do autor e começar a usar o aws-sdk oficial para realizar meus envios.

MAS.

O SDK oficial não parece oferecer suporte à canalização para s3.upload() . A natureza do s3.upload é que você deve passar o fluxo legível como argumento para o construtor S3.

Tenho mais de 120 módulos de código de usuário que processam vários arquivos e são independentes do destino final de sua saída. O mecanismo entrega a eles um fluxo de saída gravável e canalizável, e eles passam por ele. Não posso entregá-los um objeto AWS.S3 e pedir que eles chamem upload() sem adicionar código a todos os módulos. O motivo pelo qual eu usei o s3-upload-stream foi porque ele suportava a tubulação.

Existe uma maneira de fazer algo aws-sdk s3.upload() qual eu possa canalizar o fluxo?


Estou usando o KnexJS e tive um problema ao usar a API de streaming. Eu finalmente consertei, espero que o seguinte ajude alguém.

const knexStream = knex.select('*').from('my_table').stream();
const passThroughStream = new stream.PassThrough();

knexStream.on('data', (chunk) => passThroughStream.write(JSON.stringify(chunk) + '\n'));
knexStream.on('end', () => passThroughStream.end());

const uploadResult = await s3
  .upload({
    Bucket: 'my-bucket',
    Key: 'stream-test.txt',
    Body: passThroughStream
  })
  .promise();

Nenhuma das respostas funcionou para mim porque eu queria:

  • Canalize para s3.upload()
  • Canalize o resultado de s3.upload() para outro fluxo

A resposta aceita não faz a última. Os outros confiam na API da promessa, que é difícil de trabalhar quando se trabalha com tubos de fluxo.

Esta é a minha modificação da resposta aceita.

const s3 = new S3();

function writeToS3({Key, Bucket}) {
  const Body = new stream.PassThrough();

  s3.upload({
    Body,
    Key,
    Bucket: process.env.adpBucket
  })
   .on('httpUploadProgress', progress => {
       console.log('progress', progress);
   })
   .send((err, data) => {
     if (err) {
       Body.destroy(err);
     } else {
       console.log(`File uploaded and available at ${data.Location}`);
       Body.destroy();
     }
  });

  return Body;
}

const pipeline = myReadableStream.pipe(writeToS3({Key, Bucket});

pipeline.on('close', () => {
  // upload finished, do something else
})
pipeline.on('error', () => {
  // upload wasn't successful. Handle it
})


Para aqueles que reclamam que quando usam a função de upload da API s3 e um arquivo de zero byte acaba na s3 (@ Radar155 e @gabo) - eu também tive esse problema.

Crie um segundo fluxo PassThrough e apenas canalize todos os dados do primeiro para o segundo e passe a referência a esse segundo para s3. Você pode fazer isso de duas maneiras diferentes - possivelmente uma maneira suja é ouvir o evento "data" no primeiro fluxo e depois gravar esses mesmos dados no segundo fluxo - da mesma forma que no evento "end" - basta chamar a função final no segundo fluxo. Não tenho idéia se isso é um bug no aws api, na versão do nó ou em algum outro problema - mas funcionou em torno do problema para mim.

Aqui está como isso pode parecer:

var PassThroughStream = require('stream').PassThrough;
var srcStream = new PassThroughStream();

var rstream = fs.createReadStream('Learning/stocktest.json');
var sameStream = rstream.pipe(srcStream);
// interesting note: (srcStream == sameStream) at this point
var destStream = new PassThroughStream();
// call your s3.upload function here - passing in the destStream as the Body parameter
srcStream.on('data', function (chunk) {
    destStream.write(chunk);
});

srcStream.on('end', function () {
    dataStream.end();
});

Se você conhece o tamanho do fluxo, pode usar minio-js para fazer upload do fluxo desta forma:

  s3Client.putObject('my-bucketname', 'my-objectname.ogg', stream, size, 'audio/ogg', function(e) {
    if (e) {
      return console.log(e)
    }
    console.log("Successfully uploaded the stream")
  })

Solução de Script de Tipo:
Este exemplo usa:

import * as AWS from "aws-sdk";
import * as fsExtra from "fs-extra";
import * as zlib from "zlib";
import * as stream from "stream";

E função assíncrona:

public async saveFile(filePath: string, s3Bucket: AWS.S3, key: string, bucketName: string): Promise<boolean> { 

         const uploadStream = (S3: AWS.S3, Bucket: string, Key: string) => {
            const passT = new stream.PassThrough();
            return {
              writeStream: passT,
              promise: S3.upload({ Bucket, Key, Body: passT }).promise(),
            };
          };
        const { writeStream, promise } = uploadStream(s3Bucket, bucketName, key);
        fsExtra.createReadStream(filePath).pipe(writeStream);     //  NOTE: Addition You can compress to zip by  .pipe(zlib.createGzip()).pipe(writeStream)
        let output = true;
        await promise.catch((reason)=> { output = false; console.log(reason);});
        return output;
}

Chame esse método em algum lugar como:

let result = await saveFileToS3(testFilePath, someS3Bucket, someKey, someBucketName);