1. ホーム
  2. スクリプト・コラム
  3. パール

PerlによるMSSQLへのアクセスとMySQLデータベースへの移行スクリプト例

2022-01-29 09:56:26

LinuxでMSSQLにアクセスするための特定のライブラリはありませんが、MSSQLはもともとSybaseから派生したものなので、Sybaseにアクセスするためのライブラリは当然MSSQLにアクセスでき、FreeTDSはそのような実装になっています。
Perlは通常DBIを使ってデータベースにアクセスするので、FreeTDSをシステムにインストールした後は、DBIを使ってFreeTDS経由でMSSQLのデータベースにアクセスするなどの使い方ができます。

コピーコード コードは以下の通りです。

using DBI;
my $cs = "DRIVER={FreeTDS};SERVER=host;PORT=1433;DATABASE=database;UID=sa;PWD=password;TDS_VERSION=7.1;charset=gb2312";
my $dbh = DBI->connect("dbi:ODBC:$cs") or die $@;

Windowsはあまり使わないのですが、QQQデータベースを勉強するために、MSSQLからMySQLにデータを移行する必要があり、QQQデータベース用にWindows Server 2008とSQL Server 2008r2を導入しましたが、あと数日で評価が切れるので、MS SQL Serverからデータを移行する機能を持つMySQL Workbenchを勉強してきました。機能は、MS SQL Serverからデータを移行するが、このような巨大なデータとサブテーブルとサブバンクのデータのQQグループのためにあまりにも面倒なようなので、MSSQLからMySQLにデータベースを移行するための汎用Perlスクリプトを書いて、bashと組み合わせて、それは非常に過去にテーブルの数百の20以上の銀行に転送する便利ですが、Perlコードは次のようになります。
コピーコード コードは以下の通りです。

#! /usr/bin/perl
use strict;
use warnings;
use DBI;


die "Usage: qq db\n" if @ARGV ! = 1;
my $db = $ARGV[0];

print "Connectin to databases $db... \n";
my $cs = "DRIVER={FreeTDS};SERVER=MSSQL's server;PORT=1433;DATABASE=$db;UID=sa;PWD=MSSQL password;TDS_VERSION=7.1;charset=gb2312";

sub db_connect
{
    my $src = DBI->connect("dbi:ODBC:$cs") or die $@;
    my $target = DBI->connect("dbi:mysql:host=MySQL server", "MySQL username", "MySQL password") or die $@;
    return ($src, $target);
}
my ($src, $target) = db_connect;

print "Reading table schemas .... \n";

my $q_tables = $src->prepare("SELECT name FROM sysobjects WHERE xtype = 'U' AND name ! = 'dtproperties';");#Get all table names
my $q_key_usage = $src->prepare("SELECT TABLE_NAME, COLUMN_NAME from INFORMATION_SCHEMA.KEY_COLUMN_USAGE;");#Get the primary key of the table
$q_tables->execute;
my @tables = ();
my %keys = ();
push @tables, @_ while @_ = $q_tables->fetchrow_array;

$q_tables->finish;

$q_key_usage->execute();
$keys{$_[0]} = $_[1] while @_ = $q_key_usage->fetchrow_array;
$q_key_usage->finish;


#Get index information for the table
my $q_index = $src->prepare(qq(
    SELECT T.name, C.name
    FROM sys.index_columns I
    INNER JOIN sys.tables T ON T.object_id = I.object_id
    INNER JOIN sys.columns C ON C.column_id = I.column_id AND I.object_id = C.object_id;
));
$q_index->execute;
my %table_indices = ();
while(my @row = $q_index->fetchrow_array)
{
    my ($table, $column) = @row;
    my $columns = $table_indices{$table};
    $columns = $table_indices{$table} = [] if not $columns;
    push @$columns, $column;
}
$q_index->finish;

# Create the corresponding database on the target MySQL
$target->do("DROP DATABASE IF EXISTS `$db`;") or die "Cannot drop old database $db\n";
$target->do("CREATE DATABASE `$db` DEFAULT CHARSET = utf8 COLLATE utf8_general_ci;") or die "Cannot create database $db\n" ;;
$target->disconnect;
$src->disconnect;


my $total_start = time;
for my $table(@tables)
{
    my $pid = fork;
    unless($pid)
    {
        ($src, $target) = db_connect;
        my $start = time;
        $src->do("USE $db;");
        # Get the table structure used to generate the DDL for MySQL
        my $q_schema = $src->prepare("SELECT COLUMN_NAME, IS_NULLABLE, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH from INFORMATION_SCHEMA.COLUMNS where TABLE_NAME = ? ORDER BY ORDINAL_POSITION;");
        $target->do("USE `$db`;");
        $target->do("SET NAMES utf8;");
        my $key_column = $keys{$table};
        my $ddl = "CREATE TABLE `$table` ( \n";
        $q_schema->execute($table);
        my @fields = ();
        while(my @row = $q_schema->fetchrow_array)
        {
            my ($column, $nullable, $datatype, $length) = @row;
            my $field = "`$column` $datatype";
            $field . = "($length)" if $length;
            $field . = " PRIMARY KEY" if $key_column eq $column;
            push @fields, $field;
        }
        $ddl . = join(",\n", @fields);
        $ddl . = "\n) ENGINE = MyISAM;\n\n";
        $target->do($ddl) or die "Cannot create table $table\n";
        #Create index
        my $indices = $table_indices{$table};
        if($indices)
        {
            for(@$indices)
            {
                $target->do("CREATE INDEX `$_` ON `$table`(`$_`);\n") or die "Cannot create index on $db.$table$. $_\n";
            }
        }
        #transfer data
        my @placeholders = map {'?'} @fields;
        my $insert_sql = "INSERT DELAYED INTO $table VALUES(" . (join ', ', @placeholders) . ");\n";
        my $insert = $target->prepare($insert_sql);
        my $select = $src->prepare("SELECT * FROM $table;");
        $select->execute;
        $select->{'LongReadLen'} = 1000;
        $select->{'LongTruncOk'} = 1;
        $target->do("SET AUTOCOMMIT = 0;");
        $target->do("START TRANSACTION;");
        my $rows = 0;
        while(my @row = $select->fetchrow_array)
        {
            $insert->execute(@row);
            $rows++;
        }
        $target->do("COMMIT;");
        # End, output task information
        my $elapsed = time - $start;
        print "Child process $$ for table $db.$table done, $rows records, $elapsed seconds.\n";
        exit(0);
    }
}
print "Waiting for child processes\n";
#Waiting for all child processes to finish
while (wait() ! = -1) {}
my $total_elapsed = time - $total_start;
print "All tasks from $db finished, $total_elapsed seconds.\n";

このスクリプトは、各テーブルに基づいて子プロセスと対応するデータベース接続をフォークするので、この移行を行う前に、ターゲットMySQLデータベースが処理できる最大数の接続に設定されていることを確認する必要があります。
そして、bashで実行します。

コピーコード コードは以下の通りです。

for x in {1..11};do . /qq.pl QunInfo$x; done
for x in {1..11};do . /qq.pl GroupData$x; done

そのままにしておくと、スクリプトがMySQL側で同じ構造を作り、MSSQL側のテーブル構造に基づいてインデックスを構成してくれます。