java - JDBC를 사용하여 사용자 정의 유형의 배열로 Postgres로 CSV 복사




database postgresql (2)

내 데이터베이스에 정의 된 사용자 정의 유형이 있습니다.

CREATE TYPE address AS (ip inet, port int);

그리고 배열에서이 유형을 사용하는 테이블 :

CREATE TABLE my_table (
  addresses  address[] NULL
)

다음 내용이 포함 된 샘플 CSV 파일이 있습니다.

{(10.10.10.1,80),(10.10.10.2,443)}
{(10.10.10.3,8080),(10.10.10.4,4040)}

그리고 다음 코드 스 니펫을 사용하여 COPY를 수행합니다.

    Class.forName("org.postgresql.Driver");

    String input = loadCsvFromFile();

    Reader reader = new StringReader(input);

    Connection connection = DriverManager.getConnection(
            "jdbc:postgresql://db_host:5432/db_name", "user",
            "password");

    CopyManager copyManager = connection.unwrap(PGConnection.class).getCopyAPI();

    String copyCommand = "COPY my_table (addresses) " + 
                         "FROM STDIN WITH (" + 
                           "DELIMITER '\t', " + 
                           "FORMAT csv, " + 
                           "NULL '\\N', " + 
                           "ESCAPE '\"', " +
                           "QUOTE '\"')";

    copyManager.copyIn(copyCommand, reader);

이 프로그램을 실행하면 다음 예외가 발생합니다.

Exception in thread "main" org.postgresql.util.PSQLException: ERROR: malformed record literal: "(10.10.10.1"
  Detail: Unexpected end of input.
  Where: COPY only_address, line 1, column addresses: "{(10.10.10.1,80),(10.10.10.2,443)}"
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2422)
    at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1114)
    at org.postgresql.core.v3.QueryExecutorImpl.endCopy(QueryExecutorImpl.java:963)
    at org.postgresql.core.v3.CopyInImpl.endCopy(CopyInImpl.java:43)
    at org.postgresql.copy.CopyManager.copyIn(CopyManager.java:185)
    at org.postgresql.copy.CopyManager.copyIn(CopyManager.java:160)

입력에서 괄호의 다른 조합을 시도했지만 COPY 작업을 수행 할 수 없습니다. 내가 잘못 가고있는 어떤 생각?


1NF

우선, 1NF 규격이 아니기 때문에 테이블 디자인이 잘못되었다고 생각합니다. 모든 필드는 원자 속성 만 포함해야하지만 그렇지 않습니다. 왜 테이블 같은 :

CREATE TABLE my_table (
    id,
    ip inet,
    port int
)

여기서 id 는 소스 파일의 라인 번호이고 ip / port 는이 라인의 주소 중 하나입니까? 샘플 데이터 :

id | ip         | port
-----------------------
1  | 10.10.10.1 | 80
1  | 10.10.10.2 | 443
2  | 10.10.10.3 | 8080
2  | 10.10.10.4 | 4040
...

따라서 단일 주소에서 데이터베이스를 쿼리 할 수 ​​있습니다 (관련된 모든 주소를 찾으십시오. 두 행의 주소가 같은 행에 있으면 true를 반환하고 원하는 것이 무엇이든간에 ...).

데이터로드

그러나 당신이하고있는 일을 알고 있다고 가정 해 봅시다. 여기에서 가장 중요한 문제는 입력 데이터 파일이 특별한 형식이라는 것입니다. 단일 열 CSV 파일 일 수도 있지만 매우 축약 된 CSV 파일입니다. 어쨌든, 당신은 데이터베이스에 삽입하기 전에 라인을 변환해야합니다. 두 가지 옵션이 있습니다.

  1. 당신은 입력 파일의 각 라인을 읽고 INSERT (이것은 INSERT 이 걸릴 것이다);
  2. 입력 파일을 예상 된 형식의 텍스트 파일로 변환하고 COPY 사용합니다.

하나씩 삽입

첫 번째 옵션은 쉽습니다 : {(10.10.10.1,80),(10.10.10.2,443)} csv 파일의 첫 번째 행에 대해 쿼리를 실행해야합니다.

INSERT INTO my_table VALUES (ARRAY[('10.10.10.1',80),('10.10.10.2',443)]::address[], 4)

이렇게하려면 새 문자열을 만들어야합니다.

String value = row.replaceAll("\\{", "ARRAY[")
                    .replaceAll("\\}", "]::address[]")
                    .replaceAll("\\(([0-9.]+),", "'$1'");
String sql = String.format("INSERT INTO my_table VALUES (%s)", value);

그리고 입력 파일의 모든 행에 대해 쿼리를 실행하십시오 (또는 보안을 강화하려면 준비된 명령문을 사용하십시오).

COPY 삽입

나는 두 번째 옵션에 대해 자세히 설명 할 것이다. Java 코드에서 사용해야합니다.

copyManager.copyIn(sql, from);

복사 질의는 COPY FROM STDIN 문이고 from 은 판독기입니다. 성명은 다음과 같습니다 :

COPY my_table (addresses) FROM STDIN WITH (FORMAT text);

사본 관리자에게 피드하려면 다음과 같은 데이터가 필요합니다 (따옴표에주의하십시오).

{"(10.10.10.1,80)","(10.10.10.2,443)"}
{"(10.10.10.3,8080)","(10.10.10.4,4040)"}

임시 파일 사용

올바른 형식으로 데이터를 가져 오는 더 간단한 방법은 임시 파일을 만드는 것입니다. 입력 파일의 각 행을 읽고 "( and ) by )" . 이 처리 된 행을 임시 파일에 씁니다. 그런 다음이 파일의 판독기를 사본 관리자에게 전달하십시오.

즉석에서

두 개의 스레드 사용 두 개의 스레드를 사용할 수 있습니다.

  • 스레드 1은 입력 파일을 읽고 라인을 하나씩 처리하여 PipedWriter 씁니다.

  • 스레드 2는 이전 PipedWriter 에 연결된 PipedReader 를 복사 관리자로 전달합니다.

스레드 1이 PipedWriter 데이터 쓰기를 시작하기 전에 스레드 2가 PipedReader 를 읽는 방식으로 스레드를 동기화하는 것이 가장 큰 어려움입니다. 예제 는이 프로젝트를 참조하십시오.

커스텀 리더로 from 독자는 (순진한 버전)과 같은 것의 인스턴스가 될 수 있습니다 :

class DataReader extends Reader {
    PushbackReader csvFileReader;
    private boolean wasParenthese;

    public DataReader(Reader csvFileReader) {
        this.csvFileReader = new PushbackReader(csvFileReader, 1);
        wasParenthese = false;
    }

    @Override
    public void close() throws IOException {
        this.csvFileReader.close();
    }

    @Override
    public int read(char[] cbuf, int off, int len) throws IOException {
        // rely on read()
        for (int i = off; i < off + len; i++) {
            int c = this.read();
            if (c == -1) {
                return i-off > 0 ? i-off : -1;
            }
            cbuf[i] = (char) c;
        }
        return len;
    }

    @Override
    public int read() throws IOException {
        final int c = this.csvFileReader.read();
        if (c == '(' && !this.wasParenthese) {
            this.wasParenthese = true;
            this.csvFileReader.unread('(');
            return '"'; // add " before (
        } else {
            this.wasParenthese = false;
            if (c == ')') {
                this.csvFileReader.unread('"');
                return ')';  // add " after )
            } else {
                return c;
            }
        }
    }
}

이것은 올바른 방법은 public int read(char[] cbuf, int off, int len) 만 덮어 public int read(char[] cbuf, int off, int len) 것이기 때문에 순진한 버전입니다. 그런 다음 cbuf 를 처리하여 따옴표를 추가하고 여분의 문자를 저장해야합니다 오른쪽으로 밀었다 : 이것은 약간 지루하다). 이제, r 이 파일의 독자라면 :

{(10.10.10.1,80),(10.10.10.2,443)}
{(10.10.10.3,8080),(10.10.10.4,4040)}

그냥 사용 :

Class.forName("org.postgresql.Driver");
Connection connection = DriverManager
        .getConnection("jdbc:postgresql://db_host:5432/db_base", "user", "passwd");

CopyManager copyManager = connection.unwrap(PGConnection.class).getCopyAPI();
copyManager.copyIn("COPY my_table FROM STDIN WITH (FORMAT text)", new DataReader(r));

일괄 적재 중

엄청난 양의 데이터를로드하는 경우 다음과 같이 기본 팁을 잊지 마십시오. 자동 커밋을 사용하지 않도록 설정하고 인덱스와 제약 조건을 제거한 다음 TRUNCATEANALYZE 를 사용하십시오.

TRUNCATE my_table;
COPY ...;
ANALYZE my_table;

그러면 로딩 속도가 빨라집니다.


원하는 작업을 수행하는 JUnit 테스트가있는 프로젝트는 https://git.mikael.io/mikaelhg/pg-object-csv-copy-poc/ 를 참조 https://git.mikael.io/mikaelhg/pg-object-csv-copy-poc/ .

기본적으로 배열 항목을 구분하고 형식 필드를 구분하기 위해 쉼표를 사용할 수 있기를 원하지만 CSV 구문 분석에서 쉼표를 필드 구분자로 해석하지 않아야합니다.

그래서

  1. 당신은 CSV 파서에게 전체 행이 하나의 문자열, 하나의 필드라고 생각하게하고, 그것을 작은 따옴표로 묶고 이것을 CSV 파서에게 알려줌으로써 할 수 있습니다.
  2. PG 필드 파서가 각 배열 항목 유형 인스턴스를 큰 따옴표로 묶는 것으로 간주하기를 원합니다.

암호:

copyManager.copyIn("COPY my_table (addresses) FROM STDIN WITH CSV QUOTE ''''", reader);

DML 예제 1 :

COPY my_table (addresses) FROM STDIN WITH CSV QUOTE ''''

CSV 예 1 :

'{"(10.0.0.1,1)","(10.0.0.2,2)"}'
'{"(10.10.10.1,80)","(10.10.10.2,443)"}'
'{"(10.10.10.3,8080)","(10.10.10.4,4040)"}'

DML 예제 2, 큰 따옴표를 이스케이프 처리 :

COPY my_table (addresses) FROM STDIN WITH CSV

CSV 예제 2, 큰 따옴표를 이스케이프 처리 :

"{""(10.0.0.1,1)"",""(10.0.0.2,2)""}"
"{""(10.10.10.1,80)"",""(10.10.10.2,443)""}"
"{""(10.10.10.3,8080)"",""(10.10.10.4,4040)""}"

전체 JUnit 테스트 클래스 :

package io.mikael.poc;

import com.google.common.io.CharStreams;
import org.junit.*;
import org.postgresql.PGConnection;
import org.postgresql.copy.CopyManager;
import org.testcontainers.containers.PostgreSQLContainer;

import java.io.*;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;

import static java.nio.charset.StandardCharsets.UTF_8;

public class CopyTest {

    private Reader reader;

    private Connection connection;

    private CopyManager copyManager;

    private static final String CREATE_TYPE = "CREATE TYPE address AS (ip inet, port int)";

    private static final String CREATE_TABLE = "CREATE TABLE my_table (addresses  address[] NULL)";

    private String loadCsvFromFile(final String fileName) throws IOException {
        try (InputStream is = getClass().getResourceAsStream(fileName)) {
            return CharStreams.toString(new InputStreamReader(is, UTF_8));
        }
    }

    @ClassRule
    public static PostgreSQLContainer db = new PostgreSQLContainer("postgres:10-alpine");

    @BeforeClass
    public static void beforeClass() throws Exception {
        Class.forName("org.postgresql.Driver");
    }

    @Before
    public void before() throws Exception {
        String input = loadCsvFromFile("/data_01.csv");
        reader = new StringReader(input);

        connection = DriverManager.getConnection(db.getJdbcUrl(), db.getUsername(), db.getPassword());
        copyManager = connection.unwrap(PGConnection.class).getCopyAPI();

        connection.setAutoCommit(false);
        connection.beginRequest();

        connection.prepareCall(CREATE_TYPE).execute();
        connection.prepareCall(CREATE_TABLE).execute();
    }

    @After
    public void after() throws Exception {
        connection.rollback();
    }

    @Test
    public void copyTest01() throws Exception {
        copyManager.copyIn("COPY my_table (addresses) FROM STDIN WITH CSV QUOTE ''''", reader);

        final StringWriter writer = new StringWriter();
        copyManager.copyOut("COPY my_table TO STDOUT WITH CSV", writer);
        System.out.printf("roundtrip:%n%s%n", writer.toString());

        final ResultSet rs = connection.prepareStatement(
                "SELECT array_to_json(array_agg(t)) FROM (SELECT addresses FROM my_table) t")
                .executeQuery();
        rs.next();
        System.out.printf("json:%n%s%n", rs.getString(1));
    }

}

테스트 출력 :

roundtrip:
"{""(10.0.0.1,1)"",""(10.0.0.2,2)""}"
"{""(10.10.10.1,80)"",""(10.10.10.2,443)""}"
"{""(10.10.10.3,8080)"",""(10.10.10.4,4040)""}"

json:
[{"addresses":[{"ip":"10.0.0.1","port":1},{"ip":"10.0.0.2","port":2}]},{"addresses":[{"ip":"10.10.10.1","port":80},{"ip":"10.10.10.2","port":443}]},{"addresses":[{"ip":"10.10.10.3","port":8080},{"ip":"10.10.10.4","port":4040}]}]




postgresql-9.5