再上一篇:14.4并行DDL
上一篇:14.5并行恢复
主页
下一篇:14.7小结
再下一篇:15.1 SQL*Loader
文章列表

14.6过程并行化

Oracle 9i 10g编程艺术:深入数据库体系结构

在此要讨论两种类型的过程并行化:
并行管道函数(parallel pipelined function),这是 Oracle的一个特性。
“DIY并行化“(DIY parallelism),这是指在你自己的应用上使用Oracle执行并行全表扫描所采用 的技术。与其说DIY并行化是一种直接内建的Oracle中的特性,不如说是一种开发技术。
你可能经常看到,设计为串行执行的应用(一般是批处理应用)往往类似于以下过程:
Create procedure process_data
As
Begin
For x in ( select * from some_table ) Perform complex process on X
Update some other table, or insert the record somewhere else

End loop end
在这种情况下,Oracle 的并行查询或PDML没有什么帮助(在这里,实际上Oracle的SQL并行执行 可能只会导致数据库占用更多的资源,而且花费更长的时间)。如果Oracle并行地执行简单的 SELECT * FROM SOME_TABLE,可能不会提供任何显著的速度提升。如果Oracle在完成复杂的处理之后再并行地执行 UPDATE或INSERT,则没有任何好处(毕竟,这里只会UPDATE/INSERT一行)。
在此显然可以做这样一件事:完成复杂处理之后,对UPDATE/INSERT使用批量处理。不过,这样不会 是运行时间减少50%(或更多),而通常这才是你的目标。别曲解我的意思,我是说:在此你可能想对修改 实现批量处理,但是这样做并不会使处理速度提高2 倍、2倍或更多倍。
现在,假设你晚上在一台4CPU的机器上运行这个过程,机器上只运行着这一个过程,而没有其他活 动。你会观察到,此时只会不充分地使用这个系统上的一个CPU,而且根本没有用多少磁盘。不仅如此, 这个过程的执行要花费数小时,随着增加更多的数据,每天需要的时间会越来越长。你需要把运行时间减 少几倍,它的速度应该是现在的4 倍或8倍才行,所以稍稍地改善百分之几只能算杯水车薪,是远远不够 的。能你该怎么做呢?
对此有两种办法。一种方法是实现一个并行管道函数,Oracle 会确定一个合适的并行度(这是推荐 的做法)。Oracle会创建会话,进行协调,并运行这些会话,这与前面使用了 CREATE TABLE AS SELECT OR INSERT /*+ APPEND */的并行DDL例子很相似。Oracle会为我们完全自动地实现并行直接路径加载。另一 种方法是DIY并行化。下面几节将分别介绍这两种方法。

14.6.1并行管道函数

还是用前面那个串行进程PROCESS_DATA,不过这一次让Oracle为我们并行地执行这个进程。为此, 需要把这个例程“倒过来“。并非从某个表中选择行,处理这些行,再插入到另一个表,而是向另一个表 中插入获取某些行并对其处理的结果。我们要删除循环最下面的INSERT,而代之以一个PIPE ROW子句。 PIPE ROW子句允许这个PL/SQL例程生成表数据作为输出,这样我们就能从这个PL/SQL过程SELECT数据。 原本处理数据的过程性PL/SQL例程实际上变成了一个表,我们获取并处理的行就是输出。其实这种情况在 这本书中已经屡屡出现,每次执行以下语句时都是如此:
Select * from table(dbms_xplan.display);

这是一个 PL/SQL例程,它要读PLAN_TABLE;重建输出,甚至会增加一些行;然后使用PIPE ROW输
出这个数据,将其发回给客户。我们在这里实际上要做同样的事情,只不过允许并行地处理。
这个例子中要使用两个表:T1和 T2。T1 是先读的表,T2 表用来移入这个信息。假设这是一种ETL 过程,我们要运行这个过程获得每天的事务性数据,并将其转换,作为第二天的报告信息。我们要用的两 个表如下:
ops$tkyte-ORA10G> create table t1
2 as
3 select object_id id, object_name text

4 from all_objects; Table created.
ops$tkyte-ORA10G> begin
2 dbms_stats.set_table_stats
3 ( user, 'T1', numrows=>10000000,numblks=>100000 );
4 end;
5 /
PL/SQL procedure successfully completed.
ops$tkyte-ORA10G> create table t2
2 as
3 select t1.*, 0 session_id
4 from t1
5 where 1=0; Table created.

这里使用DBMS_STATS来“骗过“优化器,让它以为输入表中有10,000,000行,而且占用了100,000
个数据库块。在此我们想模拟一个大表。第二个表T2是第一个表的一个副本,只是在结构中增加了一个 SESSION_ID列。这个列很有用,可以通过它具体”看到“发生了并行化。

接下来,需要建立管道函数返回的对象类型。对于我们正在转换的这个过程,对象类型只是其“输出 “的一种结构化定义。在这个例子中,对象类型类似于T2:
ops$tkyte-ORA10G> CREATE OR REPLACE TYPE t2_type
2 AS OBJECT (
3 id number,
4 text varchar2(30),
5 session_id number
6 )
7 /

Type created.
ops$tkyte-ORA10G> create or replace type t2_tab_type
2 as table of t2_type
3 /
Type created.
现在来看管道函数,这只是重写了原来的PROCESS_DATA过程。现在这个过程是一个生成行的函数。
它接收数据作为输入,并在一个引用游标(ref cursor)中处理。这个函数返回一个T2_TAB_TYPE,这就 是我们刚才创建的对象类型。这是一个 PARALLEL_ENABLED(启用子并行)的管道函数。在此使用了分区
(partition)子句,这就告诉Oracle:“以任何最合适的方式划分或分解数据。我们不需要对数据的顺序 做任何假设。“
还 可以在引用游标中对特定列使用散列或区间分区。这就要使用一个强类型化的引用游标,从而使 编译器知道哪些列是可用的。根据所提供的散列,散列分区会向各个 并行执行服务器发送同样多的行来进 行处理。区间分区则基于分区键向各个并行执行服务器发送不重叠的数据区间。例如,如果在 ID上执行区 间分区,每个并行执行服务器可能会得到区间1…1000、1001…20000、20001…30000 等(该区间中的ID 值)。
在此,我们只想划分数据。数据如何划分对于我们的处理并不重要,所以定义如下:
ops$tkyte-ORA10G> create or replace
2 function parallel_pipelined( l_cursor in sys_refcursor )
3 return t2_tab_type
4 pipelined
5 parallel_enable ( partition l_cursor by any )

我们想查看哪些行由哪个并行执行服务器处理,所以声明一个局部变量 L_SESSION_ID,并从 V$MYSTAT

对其进行初始化:
6
7 is
8 l_session_id number;
9 l_rec t1%rowtype;
10 begin

11 select sid into l_session_id
12 from v$mystat
13 where rownum =1;
现在可以处理数据了。在此要获取一行(多多行,因为这里当然可以使用BULK COLLECT来实现对引
用游标的批量处理),执行复杂的处理,并输出。引用游标处理完成数时,将关闭这个游标,并返回:
14 loop
15 fetch l_cursor into l_rec;
16 exit when l_cursor%notfound;
17 -- complex process here
18 pipe row(t2_type(l_rec.id,l_rec.text,l_session_id));
19 end loop;
20 close l_cursor;
21 return;
22 end;
23 /
Function created.


这样就创建了函数。我们准备并行地处理数据,让Oracle根据可用的资源来确定最合适的并行度:
ops$tkyte-ORA10G> alter session enable parallel dml;
Session altered.
ops$tkyte-ORA10G> insert /*+ append */
2 into t2(id,text,session_id)
3 select *
4 from table(parallel_pipelined
5 (CURSOR(select /*+ parallel(t1) */ *

6 from t1 )
7 ))
8 /
48250 rows created.
ops$tkyte-ORA10G> commit; Commit complete.
为了查看这里发生了什么,可以查询新插入的数据,并按SESSION_ID分组,先来看使用了多少个并
行执行服务器,再看每个并行执行服务器处理了多少行:
ops$tkyte-ORA10G> select session_id, count(*)
2 from t2
3 group by session_id;
SESSION_ID COUNT(*)
---------- ----------
241 8040
246 8045
253 8042
254 8042
258 8040
260 8041
6 rows selected.

显然,对于这个并行操作的SELECT部分,我们使用了6个并行执行服务器,每个并行执行服务器处
理了大约8,040 记录。
可以看到,Oracle对我们的过程进行了并行化,但是为此需要从头重写原先的过程。从最初的实现
(串行过程)到现在(可以并行执行的过程),这实在是一条漫长的道理。所以,尽管Oracle可以并行地
处理我们的例程,但是并非所有例程都能编写为完成并行化。如果大规模重写你的过程不太可行,你可能 会对下一种实现感兴趣:DIY并行化。

14.6.2 DIY并行化

假设与上一节一样,也有同样的一个简单的串行过程。重写这个过程实现让我们难以承受,这实在太 费劲了,但是我们又希望并行地执行这个过程。该怎么做呢?我通常采用的方法是:使用rowid区间将表 划分为多个不重叠的区间(但要覆盖整个表)。
从概念上讲,这与Oracle执行并行查询的做法很相似。考虑一个全表扫描,Oracle处理全表扫描时 会提出某种方法将这个表划分为多个“小表“,每个小表分别由一个并行执行服务器处理。我们要用 rowid 区间来做同样的事情。在较早的版本中,Oracle的并行实现实际上使用了rowid区间。
我们要使用一个1,000,000含更多BIG_TABLE,因为这种技术最适用于有大量区段的大表,创建rowid 区间所用的方法取决于区段边界。使用的区段越多,数据分布就越好。所以,创建 1,000,000行的BIG_TABLE 之后,我们将如下创建T2:
big_table-ORA10G> create table t2
2 as
3 select object_id id, object_name text, 0 session_id
4 from big_table
5 where 1=0; Table created.

这里将使用数据库内置的任务队列来并行处理我们的过程。我们将调度2数目的任务。每个认为都是
对我们的过程稍加修改,只处理某个给定rowid区间中的行。
注意 在 Oracle 10g中,这种简单的工作可以使用调度工具完成,但是为了使这个例子与9i兼容, 这里使用了任务队列。

为了高效地支持任务队列,我们要使用一个参数表,向任务传递输入:
big_table-ORA10G> create table job_parms
2 ( job number primary key,
3 lo_rid rowid,
4 hi_rid rowid
5 )
6 /


Table created.
这样一来,我们就能只向过程传入任务 ID,2再查询表来得到所要处理的rowid区间。再来看我们的
过程。粗体显示的代码是新增的:
big_table-ORA10G> create or replace
2 procedure serial( p_job in number )
3 is
4 l_rec job_parms%rowtype;
5 begin
6 select * into l_rec
7 from job_parms
8 where job = p_job;
9
10 for x in ( select object_id id, object_name text
11 from big_table
12 where rowid between l_rec.lo_rid
13 and l_rec.hi_rid )
14 loop
15 -- complex process here
16 insert into t2 (id, text, session_id )
17 values ( x.id, x.text, p_job );
18 end loop;
19
20 delete from job_parms where job = p_job;
21 commit;
22 end;

23 /
Procedure created.
可以看到,改动并不大。大多数新增的代码只是用于得到输入和要处理的 rowid区间。对逻辑只有一
处修改:就是在第12行和第13行增加了谓词。
下面来调度任务。在此使用一个相当复杂的查询,它使用分析函数来划分表,在这种情况下,第19~

26行上的最内层查询将数据划分为8组。第22行上的第一个求和用于计算块数的累计总计;第23行上的 第二个求和得出总块数。如果将累计总和整除所需的“块大小“(chunk size,在这里就是总计大小除以8), 可以创建覆盖相同数量数据的文件/块组。第8~28行上的查询按GRP找出最高和最低文件号以及块号,并 返回不同的grp。这就建立了输入,可以把这个输入发送给DBMS_ROWID,创建Oracle所要的rowid。得到 该输出,并使用DBMS_JOB提交一个任务来处理这个rowid区间:
big_table-ORA10G> declare
2 l_job number;
3 begin
4 for x in (
5 select dbms_rowid.rowid_create
( 1, data_object_id, lo_fno, lo_block, 0 ) min_rid,
6 dbms_rowid.rowid_create
( 1, data_object_id, hi_fno, hi_block, 10000 ) max_rid
7 from (
8 select distinct grp,
9 first_value(relative_fno)
block_id
over (partition by grp order by relative_fno,
10 rows between unbounded preceding and unbounded
following) lo_fno,
11 first_value(block_id )
block_id
over (partition by grp order by relative_fno,

12 rows between unbounded preceding and unbounded following) lo_block,
13 last_value(relative_fno)
block_id
over (partition by grp order by relative_fno,
14 rows between unbounded preceding and unbounded
following) hi_fno,
15 last_value(block_id+blocks-1)
block_id
over (partition by grp order by relative_fno,
16 rows between unbounded preceding and unbounded
following) hi_block,
17 sum(blocks) over (partition by grp) sum_blocks
18 from (
19 select relative_fno,
20 block_id,
21 blocks,
22 trunc( (sum(blocks) over (order by relative_fno, block_id)-0.01) /
23 (sum(blocks) over ()/8) ) grp
24 from dba_extents
25 where segment_name = upper('BIG_TABLE')
26 and owner = user order by block_id
27 )
28 ),
29 (select data_object_id
upper('BIG_TABLE') )
30 )
31 loop

from user_objects where object_name =
32 dbms_job.submit( l_job, 'serial(JOB);' );
33 insert into job_parms(job, lo_rid, hi_rid)
34 values ( l_job, x.min_rid, x.max_rid );
35 end loop;
36 end;
37 /
PL/SQL procedure successfully completed.
这个PL/SQL块会为我们调度8个任务(如果由于区段或空间大小不足,以至于无法将表划分为8个

部分,调度的任务可能会少一些)。如下可以看到调度了多少任务以及这些任务的输入:
big_table-ORA10G> select * from job_parms;
JOB LO_RID HI_RID
---------- ------------------ ------------------
172 AAAT7tAAEAAAAkpAAA AAAT7tAAEAAABQICcQ
173 AAAT7tAAEAAABQJAAA AAAT7tAAEAAABwICcQ
174 AAAT7tAAEAAABwJAAA AAAT7tAAEAAACUICcQ
175 AAAT7tAAEAAACUJAAA AAAT7tAAEAAAC0ICcQ
176 AAAT7tAAEAAAC0JAAA AAAT7tAAEAAADMICcQ
177 AAAT7tAAEAAADaJAAA AAAT7tAAEAAAD6ICcQ
178 AAAT7tAAEAAAD6JAAA AAAT7tAAEAAAEaICcQ
179 AAAT7tAAEAAAEaJAAA AAAT7tAAEAAAF4ICcQ

8 rows selected.
big_table-ORA10G> commit; Commit complete.
一旦提交就会开始处理我们的任务。在参数文件在我们将JOB_QUEUE_PROCESSES设置为8,所有8个
任务都开始运行,并很快完成。结果如下:
big_table-ORA10G> select session_id, count(*)
2 from t2
3 group by session_id;
SESSION_ID COUNT(*)
---------- ----------
172 130055
173 130978
174 130925
175 129863
176 106154
177 140772
178 140778
179 90475
8 rows selected.

在这里,虽然不如Oracle内置并行化分布那么均匀,但是已经很不错了。如果还记得,前面你曾经
看到过每个并行执行服务器处理了多少行,使用内置并行化时,行数彼此之间非常接近(只相差1或 2)。 在此,有一个任务只处理了90,475行,而另一个任务处理了多达140,778行,另外的大多数任务都处理了 大约130,000行。
不过,假设你不想使用rowid处理,允许是因为查询不像SELECT * FROM T那么简单,而涉及联结和 其他构造,这使得使用 rowid 是不切实际的。此时就可以使用某个表的主键。例如,假设你下把这个

BIG_TABLE划分为10个部分,按主键并发地处理。使用内置NTILE分析函数就能很轻松地做到。这个过程 相当简单:
big_table-ORA10G> select nt, min(id), max(id), count(*)
2 from (
3 select id, ntile(10) over (order by id) nt
4 from big_table
5 )
6 group by nt;
NT MIN(ID) MAX(ID) COUNT(*)
---------- ---------- ---------- ----------big_table-ORA10G> select nt, min(id), max(id), count(*)
2 from (
3 select id, ntile(10) over (order by id) nt
4 from big_table
5 )
6 group by nt;
NT MIN(ID) MAX(ID) COUNT(*)
---------- ---------- ---------- ----------
1 1 100000 100000
2 100001 200000 100000
3 200001 300000 100000
4 300001 400000 100000
5 400001 500000 100000
6 500001 600000 100000

7 600001 700000 100000
8 700001 800000 100000
9 800001 900000 100000
10 900001 1000000 100000
10 rows selected.
1 1 100000 100000
2 100001 200000 100000
3 200001 300000 100000
4 300001 400000 100000
5 400001 500000 100000
6 500001 600000 100000
7 600001 700000 100000
8 700001 800000 100000
9 800001 900000 100000
10 900001 1000000 100000
10 rows selected.
现在就有了10个互不重叠的主键区间,大小都一样,可以使用这些区间来实现如前所示的DBMS_JOB
技术,从而并行化你的过程。