VMAPをOpenMPを使って高速化する
というプログラムをC++ Builderで書いていて、長年メンテナンスしてきましたが、高速化の必要があり、プラットフォームをMFCを用いたVisual C++での開発へ移行したことがあります。最終的にはGPUを駆動して高速化をするところまで到達していましたが。この記事ではOpenMPな改良の範囲を記述します。相関計算エンジン部分の速度評価をするために、全体を思いっきり簡素化して、

VMAPのサブセットですね。spiralsearchをしている部分つまり相関計算し局所的な最大値を探索している部分は、
void __fastcall TForm1::SpiralAnalysis(void)
{
#define FLIMIT 1
#define DLIMIT 10000
int offsetx=0,offsety=0;
int horpos;
double sum=0.0;
double sumupper,sumdownleft,sumdownright,sumdown;
double p1,p2;
double maxcorr = 0.0;
int maxposx,maxposy,currx,curry,maxofx,maxofy,maxindex;
char obuf[100];
TImage* targs;
TImage* targd;
TRect area(0,0,iw-1,ih-1);
//TRect corr(0,0,400,200);
if( dlimit > 389376 ){
ShowMessage("delta limitは389376以下");
return;
}
if( other )
targd = Form3->Image1;
else
targd = Image2;
if( other )
targs = Form2->Image1;
else
targs = Image1;
targd->Picture->Bitmap->PixelFormat=pf24bit; // should be 8bit
targd->Picture->Bitmap->Width = targd->Width;
targd->Picture->Bitmap->Height = targd->Height;
//
targd->Canvas->Pen->Mode = pmCopy; //描画時に色を反転するように指定
targd->Canvas->Pen->Color = clRed; //青色を指定
targd->Canvas->Brush->Style = bsClear; //塗り潰しはしない
//前回描いた四角を消す
//Image2->Canvas->Rectangle(FClickPoint.x,FClickPoint.y,FPrevPoint.x,FPrevPoint.y);
//新しく四角を描く
//Image2->Canvas->Rectangle(FClickPoint.x,FClickPoint.y,X,Y);
//
//targd->Canvas->Rectangle(0,0,targd->Width,targd->Height);
if( nottarget )
return;
//Form2->disparea(roi);
disparea(0,0,roi);
targs->Canvas->Pen->Mode = pmCopy; //描画時に色を反転するように指定
targs->Canvas->Pen->Color = clRed; //青色を指定
targs->Canvas->Brush->Style = bsClear; //塗り潰しはしない
int centerx = (RoiStartx + RoiEndx )/2;
int centery = (RoiStarty + RoiEndy )/2;
targs->Canvas->MoveTo(centerx-2,centery);
targs->Canvas->LineTo(centerx+2,centery);
targs->Canvas->MoveTo(centerx,centery-2);
targs->Canvas->LineTo(centerx,centery+2);
int targeframe = StrToInt(Edit4->Text);
//for( int targetframe = framenum ; targetframe < framenum + FLIMIT ; targetframe++ ){
// source display then copy
//if( targetframe != framenum ){
// DispFrame(targetframe);
// Image1->Refresh();
// Edit1->Text = IntToStr(targetframe);
//}
//Image2->Canvas->CopyRect(area,Image1->Canvas,area) ;
//Image2->Refresh();
//Edit3->Text = IntToStr(targetframe);
//PaintBox2->Canvas->Brush->Color = clDkGray;
//PaintBox2->Canvas->Brush->Color = clMedGray;
PaintBox2->Canvas->Brush->Color = Form1->Color; // back ground is clBtnFace
PaintBox2->Canvas->FillRect(Rect(0,0,399,199));
for( int deltaindex = 0 ; deltaindex < dlimit ; deltaindex++ ){
if( linearmode ){
if( horiz )
offsetx += 1;
else
offsety += 1;
}
else {
offsetx += dx[deltaindex];
offsety += dy[deltaindex];
}
if( RoiStartx + offsetx < 0 || RoiEndx + offsetx > iw )
continue;
if( RoiStarty + offsety < 0 || RoiEndy + offsety > ih )
continue;
sum = 0.0;
//Image2->Canvas->Rectangle(RoiStartx+offsetx,RoiStarty+offsety,RoiEndx+offsetx,RoiEndy+offsety);
//Image2->Refresh();
setdestbuf(offsetx,offsety);
//Form2->disparea(dest);
//ShowMessage("This is destination area");
//PaintBox2->Canvas->FillRect(area);
sumupper = 0.0;
sumdownleft = 0.0;
sumdownright = 0.0;
for( int y = 0 ; y < RoiSizeY ; y++ ){
for( int x = 0 ; x < RoiSizeX ; x++ ){
p1 = (double)(roi[y*RoiSizeX+x]-roiave);
p2 = (double)(dest[y*RoiSizeX+x]-destave);
sumupper += p1*p2;
sumdownleft += p1*p1;
sumdownright += p2*p2;
}
}
sumdown = sqrt(sumdownleft)*sqrt(sumdownright);
sum = sumupper/sumdown;
horpos = 400*deltaindex/dlimit;
PaintBox2->Canvas->MoveTo(horpos,199);
PaintBox2->Canvas->LineTo(horpos,200-200*sum);
currx = (RoiStartx+RoiEndx)/2 + offsetx;
curry = (RoiStarty+RoiEndy)/2 + offsety;
if( sum > maxcorr ){
maxcorr = sum;
maxposx = currx;
maxposy = curry;
maxofx = offsetx;
maxofy = offsety;
maxindex = deltaindex;
}
results[curry*iw+currx] = sum;
if( sum < 0.0 )
sum *= -1.0;
if( avimode || still )
brframe[curry*iw+currx] = (Byte)(255*sum);
else
rframe[curry*iw+currx] = (Word)(255*sum);
// save to value matrix
// max and two more values -> offset calc
//
//sprintf(obuf,"%d %g\n",deltaindex,sum);
//Memo1->Lines->Add(obuf);
free(dest);
}
interpolate(maxposx,maxposy);
// results cursor set at maxposx,maxposy in Image2
targd->Canvas->MoveTo(maxposx-2,maxposy);
targd->Canvas->LineTo(maxposx+2,maxposy);
targd->Canvas->MoveTo(maxposx,maxposy-2);
targd->Canvas->LineTo(maxposx,maxposy+2);
sprintf(obuf,"max correlation is %g at %d,%d:%d\n\n",maxcorr,maxofx,maxofy,maxindex);
Memo1->Lines->Add(obuf);
sprintf(obuf,"offset comp. is %g,%g",ox,oy);
Memo1->Lines->Add(obuf);
//ShowMessage("kick outs");
sprintf(obuf,"Object Motion is (%d,%d) -> (%g,%g)",centerx,centery,maxposx+ox,maxposy+oy);
Memo1->Lines->Add(obuf) ;
sprintf(obuf,"Object Displacement between fr %d->%d is %g,%g",sourceframe,destframe,maxposx+ox-centerx,maxposy+oy-centery);
Memo1->Lines->Add(obuf) ;
sprintf(obuf,"in real dimension %g,%g",(maxposx+ox-centerx)*spacefactor,(maxposy+oy-centery)*spacefactor);
setdestbuf(maxofx,maxofy); // image in dest holds unrotated image
disparea(RoiSizeX,0,dest);
ShowMessage("source area and deteced area");
int offset = 0;
int yoffset = 1;
for( double angle = -30.0 ; angle < 30.5 ; angle += anglestep ,offset++ ){
showrotation(maxposx,maxposy,angle,1); // rotate 10.0 image at maxofx,maxofy in destbuf
if( RoiSizeX*offset > iw ){
offset = 0;
yoffset++;
}
disparea(RoiSizeX*offset,yoffset*RoiSizeY,rdest);
sumupper = 0.0;
sumdownleft = 0.0;
sumdownright = 0.0;
for( int y = 0 ; y < RoiSizeY ; y++ ){
for( int x = 0 ; x < RoiSizeX ; x++ ){
p1 = (double)(roi[y*RoiSizeX+x]-roiave);
p2 = (double)(rdest[y*RoiSizeX+x]-rdestave);
sumupper += p1*p2;
sumdownleft += p1*p1;
sumdownright += p2*p2;
}
}
sumdown = sqrt(sumdownleft)*sqrt(sumdownright);
sum = sumupper/sumdown;
sprintf(obuf,"%g\t%g\t%g",angle,sum,maxcorr);
Memo1->Lines->Add(obuf);
}
// if( avimode )
// dispwhole(brframe);
// else
// dispwhole((Word *)rframe);
//}
}
要するに積和計算の重いループですが、これをどう並列化するかですね。結果は、
spiralsearch::spiralsearch(roi& templa ,CRect& start)
{
limit = 1000; // for now
maxcorrval = 0;
//CRect next;
double tempvalue;
int width;
int height;
//roi* c;
width = selected->getframerect().Width();
height = selected->getframerect().Height();
curr = check;
/*
results = (double *)malloc(width*height*sizeof(double));
if( !results )
exit(1);
corrvalues = (double*)malloc(1000*sizeof(double));
if( !corrvalues )
exit(1);
*/
//roi templa(templat);
//next = matcht;
//next = new roi(matcht);
//__itt_resume();
#pragma omp parallel for
//corrvalues[0] = 0.0; // special case
for( int x = 0 ; x < 1000 ; x++ ){
int xpos;
int ypos;
//next += spiral[x];
CRect next = start + spiral[x];
try {
roi* c = new roi(next);
xpos = (next.left+next.right)/2;
ypos = (next.top+next.bottom)/2;
//selected->disparect(*c,IDC_pict);
//basedlg->DispRoi(*c);
corrvalues[x] = templa*(*c);
//results[ypos*width+xpos] = corrvalues[x];
delete c;
}
catch( CMemoryException& e) {
TRACE("new roi error trapped");
exit(1);
}
//delete c;
}
for( int x = 1 ; x < 1000 ; x++ ){
if( corrvalues[x] > maxcorrval ){
maxcorrval = corrvalues[x];
maxdelta = x;
maxrect = start + spiral[x];
}
}
//TRACE("%d:max at spiral %d\n",targetframe,maxdelta);
//__itt_pause();
}ここで高速化の観点で重要なのは、
#pragma omp parallel for
//corrvalues[0] = 0.0; // special case
for( int x = 0 ; x < 1000 ; x++ ){
int xpos;
int ypos;
//next += spiral[x];
CRect next = start + spiral[x];
try {
roi* c = new roi(next);
xpos = (next.left+next.right)/2;
ypos = (next.top+next.bottom)/2;
//selected->disparect(*c,IDC_pict);
//basedlg->DispRoi(*c);
corrvalues[x] = templa*(*c);
//results[ypos*width+xpos] = corrvalues[x];
delete c;
}
catch( CMemoryException& e) {
TRACE("new roi error trapped");
exit(1);
}
//delete c;
}
for( int x = 1 ; x < 1000 ; x++ ){
if( corrvalues[x] > maxcorrval ){
maxcorrval = corrvalues[x];
maxdelta = x;
maxrect = start + spiral[x];
}
}
の部分です。OpenMPのディレクティブで、自動並列化が可能で、得られるリソースに適切に割り当てると高速化が可能です。VMAPの場合は、ROIで並列化しています。フレームで並列化することも原理的には可能ですが、前の結果が次の結果に影響しますからかえって複雑化します。OpenMPの利点は、並列化をしないシリアルなコードにpragmaを加えて、単一のソースコードでシリアルなコードとOpenMPによって並列化したコードを使い分けることができる点です。
OpenMPの扱いについてはさすがにM$ではなく純正のIntel C Compilerが優秀でした。並列化の結果はGPUをCuda経由で駆動した結果と合わせて別の記事で書く予定です。


コメント