並列化による相関係数計算の高速化

VMAPOpenMPを使って高速化する

というプログラムをC++ Builderで書いていて、長年メンテナンスしてきましたが、高速化の必要があり、プラットフォームをMFCを用いたVisual C++での開発へ移行したことがあります。最終的にはGPUを駆動して高速化をするところまで到達していましたが。この記事ではOpenMPな改良の範囲を記述します。相関計算エンジン部分の速度評価をするために、全体を思いっきり簡素化して、

VMAPのサブセットですね。spiralsearchをしている部分つまり相関計算し局所的な最大値を探索している部分は、

void __fastcall TForm1::SpiralAnalysis(void)
{

#define FLIMIT 1
#define DLIMIT 10000

        int     offsetx=0,offsety=0;
        int     horpos;

        double   sum=0.0;
        double sumupper,sumdownleft,sumdownright,sumdown;
        double  p1,p2;
        double maxcorr = 0.0;
        int     maxposx,maxposy,currx,curry,maxofx,maxofy,maxindex;

        char obuf[100];
        TImage* targs;
        TImage* targd;


        TRect area(0,0,iw-1,ih-1);
        //TRect corr(0,0,400,200);


        if( dlimit > 389376 ){
                ShowMessage("delta limitは389376以下");
                return;
        }

        if( other )
                targd = Form3->Image1;
        else
                targd = Image2;

        if( other )
                targs = Form2->Image1;
        else
                targs = Image1;

        targd->Picture->Bitmap->PixelFormat=pf24bit;   // should be 8bit
        targd->Picture->Bitmap->Width = targd->Width;
        targd->Picture->Bitmap->Height = targd->Height;
//
        targd->Canvas->Pen->Mode = pmCopy;    //描画時に色を反転するように指定
        targd->Canvas->Pen->Color = clRed;     //青色を指定
        targd->Canvas->Brush->Style = bsClear;  //塗り潰しはしない

        //前回描いた四角を消す
        //Image2->Canvas->Rectangle(FClickPoint.x,FClickPoint.y,FPrevPoint.x,FPrevPoint.y);
        //新しく四角を描く
        //Image2->Canvas->Rectangle(FClickPoint.x,FClickPoint.y,X,Y);

//

        //targd->Canvas->Rectangle(0,0,targd->Width,targd->Height);
        if( nottarget )
                return;

        //Form2->disparea(roi);

        disparea(0,0,roi);

        targs->Canvas->Pen->Mode = pmCopy;    //描画時に色を反転するように指定
        targs->Canvas->Pen->Color = clRed;     //青色を指定
        targs->Canvas->Brush->Style = bsClear;  //塗り潰しはしない

        int     centerx = (RoiStartx + RoiEndx )/2;
        int     centery = (RoiStarty + RoiEndy )/2;


                targs->Canvas->MoveTo(centerx-2,centery);
                targs->Canvas->LineTo(centerx+2,centery);
                targs->Canvas->MoveTo(centerx,centery-2);
                targs->Canvas->LineTo(centerx,centery+2);




        int     targeframe = StrToInt(Edit4->Text);

        //for( int targetframe = framenum ; targetframe < framenum + FLIMIT ; targetframe++ ){
                // source display then copy
                //if( targetframe != framenum ){
                //        DispFrame(targetframe);
                //        Image1->Refresh();
                //        Edit1->Text = IntToStr(targetframe);
                //}
                //Image2->Canvas->CopyRect(area,Image1->Canvas,area) ;
                //Image2->Refresh();
                //Edit3->Text = IntToStr(targetframe);

                //PaintBox2->Canvas->Brush->Color = clDkGray;
                //PaintBox2->Canvas->Brush->Color = clMedGray;
                PaintBox2->Canvas->Brush->Color = Form1->Color; // back ground is clBtnFace
                PaintBox2->Canvas->FillRect(Rect(0,0,399,199));

                for( int deltaindex = 0 ; deltaindex < dlimit ; deltaindex++ ){
                        if( linearmode ){
                                if( horiz )
                                        offsetx += 1;
                                else
                                        offsety += 1;

                        }
                        else {
                                offsetx += dx[deltaindex];
                                offsety += dy[deltaindex];
                        }

                        if( RoiStartx + offsetx < 0 || RoiEndx + offsetx > iw )
                                continue;
                        if( RoiStarty + offsety < 0 || RoiEndy + offsety > ih )
                                continue;

                        sum = 0.0;
                        //Image2->Canvas->Rectangle(RoiStartx+offsetx,RoiStarty+offsety,RoiEndx+offsetx,RoiEndy+offsety);
                        //Image2->Refresh();
                        setdestbuf(offsetx,offsety);
                        //Form2->disparea(dest);
                        //ShowMessage("This is destination area");
                        //PaintBox2->Canvas->FillRect(area);

                        sumupper = 0.0;
                        sumdownleft = 0.0;
                        sumdownright = 0.0;

                        for(  int y = 0 ; y < RoiSizeY ; y++ ){
                                for( int x = 0 ; x < RoiSizeX ; x++ ){
                                        p1 = (double)(roi[y*RoiSizeX+x]-roiave);
                                        p2 = (double)(dest[y*RoiSizeX+x]-destave);
                                        sumupper += p1*p2;
                                        sumdownleft += p1*p1;
                                        sumdownright += p2*p2;

                                }
                        }
                        sumdown = sqrt(sumdownleft)*sqrt(sumdownright);
                        sum = sumupper/sumdown;

                        horpos = 400*deltaindex/dlimit;

                        PaintBox2->Canvas->MoveTo(horpos,199);

                        PaintBox2->Canvas->LineTo(horpos,200-200*sum);
                         currx = (RoiStartx+RoiEndx)/2 + offsetx;
                         curry = (RoiStarty+RoiEndy)/2 + offsety;
                        if( sum > maxcorr ){
                                maxcorr = sum;
                                maxposx = currx;
                                maxposy = curry;
                                maxofx = offsetx;
                                maxofy = offsety;
                                maxindex = deltaindex;

                        }

                        results[curry*iw+currx] = sum;
                        if( sum < 0.0 )
                                sum *= -1.0;
                        if( avimode || still )
                              brframe[curry*iw+currx] = (Byte)(255*sum);
                        else
                                rframe[curry*iw+currx] = (Word)(255*sum);

                        //      save to value matrix
                        //      max and two more values -> offset calc
                        //
                        //sprintf(obuf,"%d %g\n",deltaindex,sum);
                        //Memo1->Lines->Add(obuf);
                        free(dest);

                }
                interpolate(maxposx,maxposy);

                // results cursor  set at maxposx,maxposy in Image2

                targd->Canvas->MoveTo(maxposx-2,maxposy);
                targd->Canvas->LineTo(maxposx+2,maxposy);
                targd->Canvas->MoveTo(maxposx,maxposy-2);
                targd->Canvas->LineTo(maxposx,maxposy+2);


                sprintf(obuf,"max correlation is %g at %d,%d:%d\n\n",maxcorr,maxofx,maxofy,maxindex);
                Memo1->Lines->Add(obuf);
                sprintf(obuf,"offset comp. is %g,%g",ox,oy);
                Memo1->Lines->Add(obuf);
                //ShowMessage("kick outs");
                sprintf(obuf,"Object Motion is (%d,%d) -> (%g,%g)",centerx,centery,maxposx+ox,maxposy+oy);
                Memo1->Lines->Add(obuf) ;
                sprintf(obuf,"Object Displacement between fr %d->%d  is %g,%g",sourceframe,destframe,maxposx+ox-centerx,maxposy+oy-centery);
                Memo1->Lines->Add(obuf) ;
                sprintf(obuf,"in real dimension %g,%g",(maxposx+ox-centerx)*spacefactor,(maxposy+oy-centery)*spacefactor);
                setdestbuf(maxofx,maxofy);      // image in dest holds unrotated image
                disparea(RoiSizeX,0,dest);
                ShowMessage("source area and deteced area");

                int offset = 0;
                int     yoffset = 1;

                for( double angle = -30.0 ; angle < 30.5 ; angle += anglestep ,offset++ ){
                        showrotation(maxposx,maxposy,angle,1);      // rotate 10.0 image at maxofx,maxofy in destbuf


                        if( RoiSizeX*offset > iw ){
                                offset = 0;
                                yoffset++;
                        }
                        disparea(RoiSizeX*offset,yoffset*RoiSizeY,rdest);

                        sumupper = 0.0;
                        sumdownleft = 0.0;
                        sumdownright = 0.0;

                        for(  int y = 0 ; y < RoiSizeY ; y++ ){
                                for( int x = 0 ; x < RoiSizeX ; x++ ){
                                        p1 = (double)(roi[y*RoiSizeX+x]-roiave);
                                        p2 = (double)(rdest[y*RoiSizeX+x]-rdestave);
                                        sumupper += p1*p2;
                                        sumdownleft += p1*p1;
                                        sumdownright += p2*p2;

                                }
                        }
                        sumdown = sqrt(sumdownleft)*sqrt(sumdownright);
                        sum = sumupper/sumdown;

                        sprintf(obuf,"%g\t%g\t%g",angle,sum,maxcorr);
                        Memo1->Lines->Add(obuf);


                  }

//                if( avimode )
//                        dispwhole(brframe);
//                else
//                        dispwhole((Word *)rframe);

        //}


}

要するに積和計算の重いループですが、これをどう並列化するかですね。結果は、

spiralsearch::spiralsearch(roi& templa ,CRect& start)
{

	limit = 1000;	// for now
	maxcorrval = 0;
	//CRect next;
	double tempvalue;
	int	width;
	int height;
	//roi* c;


	width = selected->getframerect().Width();
	height = selected->getframerect().Height();
	curr = check;

/*
	results = (double *)malloc(width*height*sizeof(double));
	if( !results )
		exit(1);

	corrvalues = (double*)malloc(1000*sizeof(double));
	if( !corrvalues )
		exit(1);
*/
	//roi templa(templat);

	//next = matcht;
	//next = new roi(matcht);
	//__itt_resume();



#pragma omp parallel for
	//corrvalues[0] = 0.0;	// special case
	for( int x = 0 ; x < 1000 ; x++ ){
		int xpos;
		int ypos;


		//next += spiral[x];
		CRect next = start + spiral[x];

		try {	
			roi* c = new roi(next);
		

		xpos = (next.left+next.right)/2;
		ypos = (next.top+next.bottom)/2;

		//selected->disparect(*c,IDC_pict);
		//basedlg->DispRoi(*c);
		corrvalues[x] = templa*(*c);
		//results[ypos*width+xpos] = corrvalues[x];
		delete c;
		}
		catch( CMemoryException& e) {
			TRACE("new roi error trapped");
			exit(1);
		}
		
		//delete c;
	}

	for( int x = 1 ; x < 1000 ; x++ ){
			if( corrvalues[x] > maxcorrval ){
				maxcorrval = corrvalues[x];
				maxdelta = x;
				maxrect = start + spiral[x];
			}
	}
	//TRACE("%d:max at spiral %d\n",targetframe,maxdelta);
	//__itt_pause();
}

ここで高速化の観点で重要なのは、

#pragma omp parallel for
	//corrvalues[0] = 0.0;	// special case
	for( int x = 0 ; x < 1000 ; x++ ){
		int xpos;
		int ypos;


		//next += spiral[x];
		CRect next = start + spiral[x];

		try {	
			roi* c = new roi(next);
		

		xpos = (next.left+next.right)/2;
		ypos = (next.top+next.bottom)/2;

		//selected->disparect(*c,IDC_pict);
		//basedlg->DispRoi(*c);
		corrvalues[x] = templa*(*c);
		//results[ypos*width+xpos] = corrvalues[x];
		delete c;
		}
		catch( CMemoryException& e) {
			TRACE("new roi error trapped");
			exit(1);
		}
		
		//delete c;
	}

	for( int x = 1 ; x < 1000 ; x++ ){
			if( corrvalues[x] > maxcorrval ){
				maxcorrval = corrvalues[x];
				maxdelta = x;
				maxrect = start + spiral[x];
			}
	}

の部分です。OpenMPのディレクティブで、自動並列化が可能で、得られるリソースに適切に割り当てると高速化が可能です。VMAPの場合は、ROIで並列化しています。フレームで並列化することも原理的には可能ですが、前の結果が次の結果に影響しますからかえって複雑化します。OpenMPの利点は、並列化をしないシリアルなコードにpragmaを加えて、単一のソースコードでシリアルなコードとOpenMPによって並列化したコードを使い分けることができる点です。

OpenMPの扱いについてはさすがにM$ではなく純正のIntel C Compilerが優秀でした。並列化の結果はGPUをCuda経由で駆動した結果と合わせて別の記事で書く予定です。

コメント