Skip to content

Commit

Permalink
feat: add support for hash to bingads offline conversions (#5390)
Browse files Browse the repository at this point in the history
* feat: add support for hash to bingads offline conversions

* chore: fix lint errors

* fix: remove redundant parameters
  • Loading branch information
ItsSudip authored Dec 27, 2024
1 parent 46d9b6a commit 8a186e5
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,7 @@ var _ = Describe("Bing ads Offline Conversions", func() {
initBingads()
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{
URL: "http://localhost/upload1",
client: &http.Client{},
}
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, true)
bingAdsService.EXPECT().GetBulkUploadUrl().Return(&mockbingads.GetBulkUploadUrlResponse{
UploadUrl: "http://localhost/upload1",
RequestId: misc.FastUUID().URN(),
Expand Down Expand Up @@ -138,8 +134,7 @@ var _ = Describe("Bing ads Offline Conversions", func() {
initBingads()
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{}
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, true)
errorMsg := "Error in getting bulk upload url"
bingAdsService.EXPECT().GetBulkUploadUrl().Return(nil, errors.New(errorMsg))
bingAdsService.EXPECT().GetBulkUploadUrl().Return(nil, errors.New(errorMsg))
Expand Down Expand Up @@ -181,8 +176,7 @@ var _ = Describe("Bing ads Offline Conversions", func() {
initBingads()
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl)
ClientI := Client{}
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &ClientI)
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, true)
errMsg := "unable to get bulk upload url, check your credentials"
bingAdsService.EXPECT().GetBulkUploadUrl().Return(nil, errors.New(errMsg))

Expand Down Expand Up @@ -227,8 +221,7 @@ var _ = Describe("Bing ads Offline Conversions", func() {
initBingads()
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{}
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, true)
bingAdsService.EXPECT().GetBulkUploadUrl().Return(&mockbingads.GetBulkUploadUrlResponse{
UploadUrl: "http://localhost/upload1",
RequestId: misc.FastUUID().URN(),
Expand Down Expand Up @@ -285,8 +278,7 @@ var _ = Describe("Bing ads Offline Conversions", func() {
initBingads()
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{}
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, true)

bingAdsService.EXPECT().GetBulkUploadStatus("dummyRequestId123").Return(&mockbingads.GetBulkUploadStatusResponse{
PercentComplete: int64(100),
Expand All @@ -308,8 +300,7 @@ var _ = Describe("Bing ads Offline Conversions", func() {
initBingads()
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{}
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, true)

bingAdsService.EXPECT().GetBulkUploadStatus("dummyRequestId123").Return(nil, fmt.Errorf("failed to get bulk upload status:"))
pollInput := common.AsyncPoll{
Expand All @@ -327,8 +318,7 @@ var _ = Describe("Bing ads Offline Conversions", func() {
initBingads()
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{}
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, true)

bingAdsService.EXPECT().GetBulkUploadStatus("dummyRequestId123").Return(&mockbingads.GetBulkUploadStatusResponse{
PercentComplete: int64(100),
Expand Down Expand Up @@ -356,8 +346,7 @@ var _ = Describe("Bing ads Offline Conversions", func() {
initBingads()
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{}
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, true)

bingAdsService.EXPECT().GetBulkUploadStatus("dummyRequestId123").Return(&mockbingads.GetBulkUploadStatusResponse{
PercentComplete: int64(0),
Expand All @@ -383,8 +372,7 @@ var _ = Describe("Bing ads Offline Conversions", func() {
initBingads()
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{}
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, true)

bingAdsService.EXPECT().GetBulkUploadStatus("dummyRequestId123").Return(&mockbingads.GetBulkUploadStatusResponse{
PercentComplete: int64(0),
Expand All @@ -410,8 +398,7 @@ var _ = Describe("Bing ads Offline Conversions", func() {
initBingads()
ctrl := gomock.NewController(GinkgoT())
bingAdsService := mockbulkservice.NewMockBulkServiceI(ctrl)
clientI := Client{}
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, true)

bingAdsService.EXPECT().GetBulkUploadStatus("dummyRequestId456").Return(&mockbingads.GetBulkUploadStatusResponse{
PercentComplete: int64(100),
Expand Down Expand Up @@ -454,10 +441,8 @@ var _ = Describe("Bing ads Offline Conversions", func() {
http.ServeFile(w, r, errorsTemplateFilePath)
}))
defer ts.Close()
client := ts.Client()
modifiedURL := ts.URL // Use the test server URL
clientI := Client{client: client, URL: modifiedURL}
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, &clientI)
bulkUploader := NewBingAdsBulkUploader(logger.NOP, stats.NOP, "BING_ADS", bingAdsService, true)

UploadStatsInput := common.GetUploadStatsInput{
FailedJobParameters: modifiedURL,
Expand Down Expand Up @@ -531,11 +516,15 @@ var _ = Describe("Bing ads Offline Conversions", func() {
})
It("Transform() Test -> successful ", func() {
job := &jobsdb.JobT{
EventPayload: []byte("{\"type\": \"record\", \"action\": \"insert\", \"fields\": {\"conversionName\": \"Test-Integration\", \"conversionTime\": \"5/22/2023 6:27:54 AM\", \"conversionValue\": \"100\", \"microsoftClickId\": \"click_id\", \"conversionCurrencyCode\": \"USD\"}}"),
EventPayload: []byte("{\n \"type\": \"record\",\n \"action\": \"insert\",\n \"fields\": {\n \"conversionName\": \"Test-Integration\",\n \"conversionTime\": \"5/22/2023 6:27:54 AM\",\n \"conversionValue\": \"100\",\n \"microsoftClickId\": \"click_id\",\n \"conversionCurrencyCode\": \"USD\",\n \"email\":\"[email protected]\",\n \"phone\":\"+911234567890\"\n }\n}"),
}
uploader := &BingAdsBulkUploader{}
uploader := &BingAdsBulkUploader{
isHashRequired: true,
}
expectedResp := `{"message":{"fields":{"conversionCurrencyCode":"USD","conversionName":"Test-Integration","conversionTime":"5/22/2023 6:27:54 AM","conversionValue":"100","email":"28a4da98f8812110001ab8ffacde3b38b4725a9e3570c39299fbf2d12c5aa70e","microsoftClickId":"click_id","phone":"8c229df83de8ab269e90918846e326c4008c86481393223d17a30ff5a407b08e"},"action":"insert"},"metadata":{"jobId":0}}`
// Execute
_, err := uploader.Transform(job)
resp, err := uploader.Transform(job)
Expect(resp).To(Equal(expectedResp))
Expect(err).To(BeNil())
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,15 @@ import (
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common"
)

func NewBingAdsBulkUploader(logger logger.Logger, statsFactory stats.Stats, destName string, service bingads.BulkServiceI, client *Client) *BingAdsBulkUploader {
func NewBingAdsBulkUploader(logger logger.Logger, statsFactory stats.Stats, destName string, service bingads.BulkServiceI, isHashRequired bool) *BingAdsBulkUploader {
return &BingAdsBulkUploader{
destName: destName,
service: service,
logger: logger.Child("BingAds").Child("BingAdsBulkUploader"),
statsFactory: statsFactory,
client: *client,
fileSizeLimit: common.GetBatchRouterConfigInt64("MaxUploadLimit", destName, 100*bytesize.MB),
eventsLimit: common.GetBatchRouterConfigInt64("MaxEventsLimit", destName, 1000),
isHashRequired: isHashRequired,
destName: destName,
service: service,
logger: logger.Child("BingAds").Child("BingAdsBulkUploader"),
statsFactory: statsFactory,
fileSizeLimit: common.GetBatchRouterConfigInt64("MaxUploadLimit", destName, 100*bytesize.MB),
eventsLimit: common.GetBatchRouterConfigInt64("MaxEventsLimit", destName, 1000),
}
}

Expand Down Expand Up @@ -81,6 +81,12 @@ func (b *BingAdsBulkUploader) Transform(job *jobsdb.JobT) (string, error) {
}
}
}
if b.isHashRequired {
event.Fields, err = hashFields(fields)
if err != nil {
return payload, fmt.Errorf("Unable to hash fields.%w", err)
}
}
data := Data{
Message: Message{
Fields: event.Fields,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@ func newManagerInternal(logger logger.Logger, statsFactory stats.Stats, destinat
TokenSource: &token,
}
session := bingads.NewSession(sessionConfig)

clientNew := Client{}
bingUploader := NewBingAdsBulkUploader(logger, statsFactory, destination.DestinationDefinition.Name, bingads.NewBulkService(session), &clientNew)
bingUploader := NewBingAdsBulkUploader(logger, statsFactory, destination.DestinationDefinition.Name, bingads.NewBulkService(session), destConfig.IsHashRequired)
return bingUploader, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,20 @@ package offline_conversions
import (
"encoding/csv"
"encoding/json"
"net/http"

"github.com/rudderlabs/bing-ads-go-sdk/bingads"
"github.com/rudderlabs/rudder-go-kit/logger"
"github.com/rudderlabs/rudder-go-kit/stats"
)

type Client struct {
URL string
client *http.Client
}
type BingAdsBulkUploader struct {
destName string
service bingads.BulkServiceI
logger logger.Logger
statsFactory stats.Stats
client Client
fileSizeLimit int64
eventsLimit int64
destName string
service bingads.BulkServiceI
logger logger.Logger
statsFactory stats.Stats
fileSizeLimit int64
eventsLimit int64
isHashRequired bool
}
type Message struct {
Fields json.RawMessage `json:"fields"`
Expand All @@ -41,6 +36,7 @@ type DestinationConfig struct {
CustomerAccountID string `json:"customerAccountId"`
CustomerID string `json:"customerId"`
RudderAccountID string `json:"rudderAccountId"`
IsHashRequired bool `json:"isHashRequired"`
}

type ActionFileInfo struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ package offline_conversions
import (
"archive/zip"
"bufio"
"crypto/sha256"
"encoding/csv"
"encoding/json"
stdjson "encoding/json"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -466,3 +468,43 @@ func validateField(fields map[string]interface{}, field string) error {
}
return nil
}

func calculateHashCode(data string) string {
// Join the strings into a single string with a separator
hash := sha256.New()
hash.Write([]byte(data))
hashBytes := hash.Sum(nil)
hashCode := fmt.Sprintf("%x", hashBytes)

return hashCode
}

func hashFields(input map[string]interface{}) (stdjson.RawMessage, error) {
// Create a new map to hold the hashed fields
hashedMap := make(map[string]interface{})

// Iterate over the input map
for key, value := range input {
// Check if the key is "email" or "phone"
if key == "email" || key == "phone" {
// Ensure the value is a string before hashing
if strVal, ok := value.(string); ok {
hashedMap[key] = calculateHashCode(strVal)
} else {
// If not a string, preserve the original value
hashedMap[key] = value
}
} else {
// Preserve other fields unchanged
hashedMap[key] = value
}
}

// Convert the resulting map to JSON RawMessage
result, err := json.Marshal(hashedMap)
if err != nil {
return nil, err
}

return json.RawMessage(result), nil
}

0 comments on commit 8a186e5

Please sign in to comment.